feat(31-01): implement puterProxyService, puterProxyRoutes, and unit tests
- puterProxyService with storeToken (create/rotate idempotent), resolveToken, chatStream - chatStream relays to Puter OpenAI-compat endpoint with SSE streaming - Cost recording with provider=puter, billingType=subscription_included, costCents=0 - Cost recording skipped when agentId is null/undefined (no FK violation) - puterProxyRoutes with POST /puter-proxy/token and POST /puter-proxy/chat - Board auth (assertBoard + assertCompanyAccess) on all routes - All 10 TDD tests passing
This commit is contained in:
parent
c41ec162d0
commit
13bc39b1d4
3 changed files with 597 additions and 0 deletions
389
server/src/__tests__/31-puter-proxy.test.ts
Normal file
389
server/src/__tests__/31-puter-proxy.test.ts
Normal file
|
|
@ -0,0 +1,389 @@
|
|||
import { describe, it, expect, vi, beforeEach, afterEach } from "vitest";
|
||||
import express from "express";
|
||||
import request from "supertest";
|
||||
import { puterProxyService } from "../services/puter-proxy.js";
|
||||
import { puterProxyRoutes } from "../routes/puter-proxy.js";
|
||||
|
||||
// ─── Helpers ──────────────────────────────────────────────────────────────────
|
||||
|
||||
function makeTextEncoder() {
|
||||
return new TextEncoder();
|
||||
}
|
||||
|
||||
/**
|
||||
* Build a minimal ReadableStream that emits pre-formatted SSE lines.
|
||||
*/
|
||||
function buildSseStream(chunks: string[]): ReadableStream<Uint8Array> {
|
||||
const enc = makeTextEncoder();
|
||||
let idx = 0;
|
||||
return new ReadableStream<Uint8Array>({
|
||||
pull(controller) {
|
||||
if (idx < chunks.length) {
|
||||
controller.enqueue(enc.encode(chunks[idx++]));
|
||||
} else {
|
||||
controller.close();
|
||||
}
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
function sseChunk(data: object): string {
|
||||
return `data: ${JSON.stringify(data)}\n\n`;
|
||||
}
|
||||
|
||||
// ─── Shared mock db factory ───────────────────────────────────────────────────
|
||||
|
||||
function makeMockDb(overrides: {
|
||||
existingSecret?: object | null;
|
||||
resolvedToken?: string;
|
||||
createSecret?: ReturnType<typeof vi.fn>;
|
||||
rotateSecret?: ReturnType<typeof vi.fn>;
|
||||
createCostEvent?: ReturnType<typeof vi.fn>;
|
||||
} = {}) {
|
||||
const {
|
||||
existingSecret = null,
|
||||
resolvedToken = "test-bearer-token",
|
||||
createSecret = vi.fn().mockResolvedValue({ id: "secret-1", name: "puter_auth_token" }),
|
||||
rotateSecret = vi.fn().mockResolvedValue({ id: "secret-1", name: "puter_auth_token" }),
|
||||
createCostEvent = vi.fn().mockResolvedValue({ id: "event-1" }),
|
||||
} = overrides;
|
||||
|
||||
// We return a db object that puterProxyService will pass to secretService and costService.
|
||||
// Since we mock at the module level via the db interactions, we need the service
|
||||
// to call these. We'll mock the actual db calls.
|
||||
return {
|
||||
_mocks: { createSecret, rotateSecret, createCostEvent },
|
||||
// The secretService and costService are created inside puterProxyService using the db.
|
||||
// We inject mock data via the module mocking strategy below.
|
||||
existingSecret,
|
||||
resolvedToken,
|
||||
};
|
||||
}
|
||||
|
||||
// ─── Mock secretService and costService at module level ──────────────────────
|
||||
|
||||
const mockGetByName = vi.fn();
|
||||
const mockCreate = vi.fn();
|
||||
const mockRotate = vi.fn();
|
||||
const mockResolveSecretValue = vi.fn();
|
||||
const mockCreateEvent = vi.fn();
|
||||
|
||||
vi.mock("../services/secrets.js", () => ({
|
||||
secretService: vi.fn(() => ({
|
||||
getByName: mockGetByName,
|
||||
create: mockCreate,
|
||||
rotate: mockRotate,
|
||||
resolveSecretValue: mockResolveSecretValue,
|
||||
})),
|
||||
}));
|
||||
|
||||
vi.mock("../services/costs.js", () => ({
|
||||
costService: vi.fn(() => ({
|
||||
createEvent: mockCreateEvent,
|
||||
})),
|
||||
}));
|
||||
|
||||
// ─── Test suite ───────────────────────────────────────────────────────────────
|
||||
|
||||
describe("puterProxyService", () => {
|
||||
const companyId = "company-123";
|
||||
const db = {} as any;
|
||||
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks();
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
vi.unstubAllGlobals();
|
||||
});
|
||||
|
||||
// Test 1: storeToken creates a new secret when none exists
|
||||
it("storeToken creates a new secret via secretService when none exists", async () => {
|
||||
mockGetByName.mockResolvedValue(null);
|
||||
mockCreate.mockResolvedValue({ id: "secret-1", name: "puter_auth_token" });
|
||||
|
||||
const svc = puterProxyService(db);
|
||||
await svc.storeToken(companyId, "my-puter-token");
|
||||
|
||||
expect(mockGetByName).toHaveBeenCalledWith(companyId, "puter_auth_token");
|
||||
expect(mockCreate).toHaveBeenCalledWith(
|
||||
companyId,
|
||||
expect.objectContaining({
|
||||
name: "puter_auth_token",
|
||||
provider: "local_encrypted",
|
||||
value: "my-puter-token",
|
||||
}),
|
||||
);
|
||||
expect(mockRotate).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
// Test 2: storeToken rotates existing secret
|
||||
it("storeToken rotates an existing secret when puter_auth_token already exists", async () => {
|
||||
const existingSecret = { id: "existing-secret-id", name: "puter_auth_token" };
|
||||
mockGetByName.mockResolvedValue(existingSecret);
|
||||
mockRotate.mockResolvedValue({ id: "existing-secret-id", name: "puter_auth_token" });
|
||||
|
||||
const svc = puterProxyService(db);
|
||||
await svc.storeToken(companyId, "new-puter-token");
|
||||
|
||||
expect(mockGetByName).toHaveBeenCalledWith(companyId, "puter_auth_token");
|
||||
expect(mockRotate).toHaveBeenCalledWith(
|
||||
"existing-secret-id",
|
||||
expect.objectContaining({ value: "new-puter-token" }),
|
||||
);
|
||||
expect(mockCreate).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
// Test 3: resolveToken retrieves stored token value
|
||||
it("resolveToken retrieves the stored token value via secretService.resolveSecretValue", async () => {
|
||||
const existingSecret = { id: "secret-id", name: "puter_auth_token" };
|
||||
mockGetByName.mockResolvedValue(existingSecret);
|
||||
mockResolveSecretValue.mockResolvedValue("resolved-bearer-token");
|
||||
|
||||
const svc = puterProxyService(db);
|
||||
const token = await svc.resolveToken(companyId);
|
||||
|
||||
expect(mockGetByName).toHaveBeenCalledWith(companyId, "puter_auth_token");
|
||||
expect(mockResolveSecretValue).toHaveBeenCalledWith(companyId, "secret-id", "latest");
|
||||
expect(token).toBe("resolved-bearer-token");
|
||||
});
|
||||
|
||||
// Test 4: chatStream sends POST to Puter with correct headers and body
|
||||
it("chatStream sends POST to Puter OpenAI-compat endpoint with Authorization Bearer header, stream: true, stream_options", async () => {
|
||||
mockGetByName.mockResolvedValue({ id: "sec-1", name: "puter_auth_token" });
|
||||
mockResolveSecretValue.mockResolvedValue("my-bearer-token");
|
||||
mockCreateEvent.mockResolvedValue({ id: "ev-1" });
|
||||
|
||||
const enc = new TextEncoder();
|
||||
const chunks = [
|
||||
sseChunk({ choices: [{ delta: { content: "Hello" } }] }),
|
||||
sseChunk({ choices: [{ delta: { content: " world" } }] }),
|
||||
sseChunk({ choices: [{ delta: {} }], usage: { prompt_tokens: 10, completion_tokens: 5 } }),
|
||||
"data: [DONE]\n\n",
|
||||
];
|
||||
|
||||
const mockStream = buildSseStream(chunks);
|
||||
const mockResponse = {
|
||||
ok: true,
|
||||
body: mockStream,
|
||||
};
|
||||
const fetchSpy = vi.fn().mockResolvedValue(mockResponse);
|
||||
vi.stubGlobal("fetch", fetchSpy);
|
||||
|
||||
const svc = puterProxyService(db);
|
||||
const messages = [{ role: "user", content: "Hi" }];
|
||||
const result: string[] = [];
|
||||
for await (const chunk of svc.chatStream(companyId, "agent-1", messages, "claude-3-5-haiku-20241022", undefined)) {
|
||||
result.push(chunk);
|
||||
}
|
||||
|
||||
expect(fetchSpy).toHaveBeenCalledOnce();
|
||||
const [url, opts] = fetchSpy.mock.calls[0];
|
||||
expect(url).toContain("api.puter.com/puterai/openai/v1/chat/completions");
|
||||
expect(opts.method).toBe("POST");
|
||||
expect(opts.headers["Authorization"]).toBe("Bearer my-bearer-token");
|
||||
const body = JSON.parse(opts.body);
|
||||
expect(body.stream).toBe(true);
|
||||
expect(body.stream_options).toEqual({ include_usage: true });
|
||||
});
|
||||
|
||||
// Test 5: chatStream yields content strings from SSE data chunks
|
||||
it("chatStream yields content strings from SSE data chunks", async () => {
|
||||
mockGetByName.mockResolvedValue({ id: "sec-1", name: "puter_auth_token" });
|
||||
mockResolveSecretValue.mockResolvedValue("my-bearer-token");
|
||||
mockCreateEvent.mockResolvedValue({ id: "ev-1" });
|
||||
|
||||
const chunks = [
|
||||
sseChunk({ choices: [{ delta: { content: "Hello" } }] }),
|
||||
sseChunk({ choices: [{ delta: { content: " world" } }] }),
|
||||
sseChunk({ choices: [{ delta: {} }], usage: { prompt_tokens: 5, completion_tokens: 3 } }),
|
||||
"data: [DONE]\n\n",
|
||||
];
|
||||
|
||||
vi.stubGlobal("fetch", vi.fn().mockResolvedValue({
|
||||
ok: true,
|
||||
body: buildSseStream(chunks),
|
||||
}));
|
||||
|
||||
const svc = puterProxyService(db);
|
||||
const result: string[] = [];
|
||||
for await (const chunk of svc.chatStream(companyId, "agent-1", [{ role: "user", content: "hi" }], undefined, undefined)) {
|
||||
if (chunk) result.push(chunk);
|
||||
}
|
||||
|
||||
expect(result).toContain("Hello");
|
||||
expect(result).toContain(" world");
|
||||
});
|
||||
|
||||
// Test 6: chatStream records cost event when agentId is provided
|
||||
it("chatStream records a cost event with provider=puter, billingType=subscription_included, costCents=0 when agentId is provided", async () => {
|
||||
mockGetByName.mockResolvedValue({ id: "sec-1", name: "puter_auth_token" });
|
||||
mockResolveSecretValue.mockResolvedValue("my-bearer-token");
|
||||
mockCreateEvent.mockResolvedValue({ id: "ev-1" });
|
||||
|
||||
const chunks = [
|
||||
sseChunk({ choices: [{ delta: { content: "Hi" } }], usage: { prompt_tokens: 10, completion_tokens: 5 } }),
|
||||
"data: [DONE]\n\n",
|
||||
];
|
||||
|
||||
vi.stubGlobal("fetch", vi.fn().mockResolvedValue({
|
||||
ok: true,
|
||||
body: buildSseStream(chunks),
|
||||
}));
|
||||
|
||||
const svc = puterProxyService(db);
|
||||
for await (const _ of svc.chatStream(companyId, "agent-xyz", [{ role: "user", content: "hi" }], "claude-3-5-haiku-20241022", undefined)) {
|
||||
// consume stream
|
||||
}
|
||||
|
||||
// Allow microtask queue to flush (non-blocking cost recording)
|
||||
await new Promise((resolve) => setTimeout(resolve, 50));
|
||||
|
||||
expect(mockCreateEvent).toHaveBeenCalledOnce();
|
||||
expect(mockCreateEvent).toHaveBeenCalledWith(
|
||||
companyId,
|
||||
expect.objectContaining({
|
||||
agentId: "agent-xyz",
|
||||
provider: "puter",
|
||||
billingType: "subscription_included",
|
||||
costCents: 0,
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
// Test 7: chatStream skips cost recording when agentId is null/undefined
|
||||
it("chatStream skips cost recording when agentId is null/undefined", async () => {
|
||||
mockGetByName.mockResolvedValue({ id: "sec-1", name: "puter_auth_token" });
|
||||
mockResolveSecretValue.mockResolvedValue("my-bearer-token");
|
||||
|
||||
const chunks = [
|
||||
sseChunk({ choices: [{ delta: { content: "No cost" } }] }),
|
||||
"data: [DONE]\n\n",
|
||||
];
|
||||
|
||||
vi.stubGlobal("fetch", vi.fn().mockResolvedValue({
|
||||
ok: true,
|
||||
body: buildSseStream(chunks),
|
||||
}));
|
||||
|
||||
const svc = puterProxyService(db);
|
||||
for await (const _ of svc.chatStream(companyId, null, [{ role: "user", content: "hi" }], undefined, undefined)) {
|
||||
// consume
|
||||
}
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 50));
|
||||
expect(mockCreateEvent).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
// ─── Route tests ─────────────────────────────────────────────────────────────
|
||||
|
||||
function buildTestApp(db: any) {
|
||||
const app = express();
|
||||
app.use(express.json());
|
||||
// Inject a board actor for all requests
|
||||
app.use((req: any, _res: any, next: any) => {
|
||||
req.actor = {
|
||||
type: "board",
|
||||
source: "local_implicit",
|
||||
companyIds: ["company-123"],
|
||||
isInstanceAdmin: true,
|
||||
};
|
||||
next();
|
||||
});
|
||||
app.use(puterProxyRoutes(db));
|
||||
return app;
|
||||
}
|
||||
|
||||
describe("puterProxyRoutes", () => {
|
||||
const db = {} as any;
|
||||
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks();
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
vi.unstubAllGlobals();
|
||||
});
|
||||
|
||||
// Test 8: POST /api/puter-proxy/token stores token and returns 200
|
||||
it("POST /puter-proxy/token stores token and returns 200", async () => {
|
||||
mockGetByName.mockResolvedValue(null);
|
||||
mockCreate.mockResolvedValue({ id: "sec-1", name: "puter_auth_token" });
|
||||
|
||||
const app = buildTestApp(db);
|
||||
const res = await request(app)
|
||||
.post("/puter-proxy/token")
|
||||
.send({ companyId: "company-123", token: "puter-abc123" });
|
||||
|
||||
expect(res.status).toBe(200);
|
||||
expect(res.body).toEqual({ ok: true });
|
||||
expect(mockCreate).toHaveBeenCalledWith(
|
||||
"company-123",
|
||||
expect.objectContaining({ name: "puter_auth_token", value: "puter-abc123" }),
|
||||
);
|
||||
});
|
||||
|
||||
// Test 9: POST /api/puter-proxy/chat sets SSE headers and streams response chunks
|
||||
it("POST /puter-proxy/chat sets SSE headers and streams response chunks", async () => {
|
||||
mockGetByName.mockResolvedValue({ id: "sec-1", name: "puter_auth_token" });
|
||||
mockResolveSecretValue.mockResolvedValue("bearer-xyz");
|
||||
mockCreateEvent.mockResolvedValue({ id: "ev-1" });
|
||||
|
||||
const chunks = [
|
||||
sseChunk({ choices: [{ delta: { content: "Stream token" } }] }),
|
||||
"data: [DONE]\n\n",
|
||||
];
|
||||
|
||||
vi.stubGlobal("fetch", vi.fn().mockResolvedValue({
|
||||
ok: true,
|
||||
body: buildSseStream(chunks),
|
||||
}));
|
||||
|
||||
const app = buildTestApp(db);
|
||||
const res = await request(app)
|
||||
.post("/puter-proxy/chat")
|
||||
.send({
|
||||
companyId: "company-123",
|
||||
agentId: "agent-1",
|
||||
messages: [{ role: "user", content: "Hello" }],
|
||||
});
|
||||
|
||||
expect(res.status).toBe(200);
|
||||
expect(res.headers["content-type"]).toMatch(/text\/event-stream/);
|
||||
expect(res.text).toContain("Stream token");
|
||||
});
|
||||
|
||||
// Test 10: POST /api/puter-proxy/chat works without agentId in request body
|
||||
it("POST /puter-proxy/chat works without agentId in request body (agentId is optional)", async () => {
|
||||
mockGetByName.mockResolvedValue({ id: "sec-1", name: "puter_auth_token" });
|
||||
mockResolveSecretValue.mockResolvedValue("bearer-xyz");
|
||||
|
||||
const chunks = [
|
||||
sseChunk({ choices: [{ delta: { content: "No agent" } }] }),
|
||||
"data: [DONE]\n\n",
|
||||
];
|
||||
|
||||
vi.stubGlobal("fetch", vi.fn().mockResolvedValue({
|
||||
ok: true,
|
||||
body: buildSseStream(chunks),
|
||||
}));
|
||||
|
||||
const app = buildTestApp(db);
|
||||
const res = await request(app)
|
||||
.post("/puter-proxy/chat")
|
||||
.send({
|
||||
companyId: "company-123",
|
||||
// No agentId
|
||||
messages: [{ role: "user", content: "Hello without agent" }],
|
||||
});
|
||||
|
||||
expect(res.status).toBe(200);
|
||||
expect(res.headers["content-type"]).toMatch(/text\/event-stream/);
|
||||
expect(res.text).toContain("No agent");
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 50));
|
||||
// No cost event since no agentId
|
||||
expect(mockCreateEvent).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
81
server/src/routes/puter-proxy.ts
Normal file
81
server/src/routes/puter-proxy.ts
Normal file
|
|
@ -0,0 +1,81 @@
|
|||
import { Router } from "express";
|
||||
import type { Db } from "@paperclipai/db";
|
||||
import { assertBoard, assertCompanyAccess } from "./authz.js";
|
||||
import { puterProxyService } from "../services/puter-proxy.js";
|
||||
import { badRequest } from "../errors.js";
|
||||
|
||||
export function puterProxyRoutes(db: Db): Router {
|
||||
const router = Router();
|
||||
|
||||
// POST /puter-proxy/token — store or rotate Puter auth token
|
||||
router.post("/puter-proxy/token", async (req, res) => {
|
||||
assertBoard(req);
|
||||
const { companyId, token } = req.body;
|
||||
|
||||
if (!companyId || typeof companyId !== "string") {
|
||||
throw badRequest("companyId is required");
|
||||
}
|
||||
if (!token || typeof token !== "string") {
|
||||
throw badRequest("token is required");
|
||||
}
|
||||
|
||||
assertCompanyAccess(req, companyId);
|
||||
|
||||
await puterProxyService(db).storeToken(companyId, token);
|
||||
res.json({ ok: true });
|
||||
});
|
||||
|
||||
// POST /puter-proxy/chat — stream AI chat via Puter proxy
|
||||
router.post("/puter-proxy/chat", async (req, res) => {
|
||||
assertBoard(req);
|
||||
const { companyId, agentId, messages, model } = req.body;
|
||||
|
||||
if (!companyId || typeof companyId !== "string") {
|
||||
res.status(400).json({ error: "companyId is required" });
|
||||
return;
|
||||
}
|
||||
if (!Array.isArray(messages) || messages.length === 0) {
|
||||
res.status(400).json({ error: "messages must be a non-empty array" });
|
||||
return;
|
||||
}
|
||||
|
||||
assertCompanyAccess(req, companyId);
|
||||
|
||||
// Set SSE headers and flush immediately (same pattern as chat.ts)
|
||||
res.setHeader("Content-Type", "text/event-stream");
|
||||
res.setHeader("Cache-Control", "no-cache");
|
||||
res.setHeader("Connection", "keep-alive");
|
||||
res.setHeader("X-Accel-Buffering", "no");
|
||||
res.flushHeaders();
|
||||
res.write(":ok\n\n");
|
||||
|
||||
const abort = new AbortController();
|
||||
req.on("close", () => abort.abort());
|
||||
|
||||
try {
|
||||
const svc = puterProxyService(db);
|
||||
// agentId is optional — pass through as-is (string or undefined)
|
||||
for await (const chunk of svc.chatStream(
|
||||
companyId,
|
||||
agentId as string | undefined,
|
||||
messages,
|
||||
model as string | undefined,
|
||||
abort.signal,
|
||||
)) {
|
||||
if (!res.writable) break;
|
||||
res.write(`data: ${JSON.stringify({ token: chunk })}\n\n`);
|
||||
}
|
||||
if (res.writable && !abort.signal.aborted) {
|
||||
res.write(`data: ${JSON.stringify({ done: true })}\n\n`);
|
||||
}
|
||||
} catch (err) {
|
||||
if (res.writable) {
|
||||
res.write(`data: ${JSON.stringify({ error: "Puter stream error" })}\n\n`);
|
||||
}
|
||||
} finally {
|
||||
res.end();
|
||||
}
|
||||
});
|
||||
|
||||
return router;
|
||||
}
|
||||
127
server/src/services/puter-proxy.ts
Normal file
127
server/src/services/puter-proxy.ts
Normal file
|
|
@ -0,0 +1,127 @@
|
|||
import type { Db } from "@paperclipai/db";
|
||||
import { secretService } from "./secrets.js";
|
||||
import { costService } from "./costs.js";
|
||||
import { unprocessable } from "../errors.js";
|
||||
|
||||
const PUTER_BASE_URL = "https://api.puter.com/puterai/openai/v1";
|
||||
const PUTER_DEFAULT_MODEL = "claude-3-5-haiku-20241022";
|
||||
const PUTER_TOKEN_SECRET_NAME = "puter_auth_token";
|
||||
|
||||
export function puterProxyService(db: Db) {
|
||||
const secrets = secretService(db);
|
||||
const costs = costService(db);
|
||||
|
||||
async function storeToken(companyId: string, token: string) {
|
||||
const existing = await secrets.getByName(companyId, PUTER_TOKEN_SECRET_NAME);
|
||||
if (existing) {
|
||||
return secrets.rotate(existing.id, { value: token });
|
||||
}
|
||||
return secrets.create(companyId, {
|
||||
name: PUTER_TOKEN_SECRET_NAME,
|
||||
provider: "local_encrypted",
|
||||
value: token,
|
||||
description: "Puter.com auth token for AI proxy",
|
||||
});
|
||||
}
|
||||
|
||||
async function resolveToken(companyId: string): Promise<string> {
|
||||
const secret = await secrets.getByName(companyId, PUTER_TOKEN_SECRET_NAME);
|
||||
if (!secret) {
|
||||
throw unprocessable("Puter auth token not configured");
|
||||
}
|
||||
return secrets.resolveSecretValue(companyId, secret.id, "latest");
|
||||
}
|
||||
|
||||
async function* chatStream(
|
||||
companyId: string,
|
||||
agentId: string | null | undefined,
|
||||
messages: unknown[],
|
||||
model: string | undefined,
|
||||
signal: AbortSignal | undefined,
|
||||
): AsyncGenerator<string> {
|
||||
const token = await resolveToken(companyId);
|
||||
const resolvedModel = model ?? PUTER_DEFAULT_MODEL;
|
||||
|
||||
const response = await fetch(`${PUTER_BASE_URL}/chat/completions`, {
|
||||
method: "POST",
|
||||
headers: {
|
||||
Authorization: `Bearer ${token}`,
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
body: JSON.stringify({
|
||||
model: resolvedModel,
|
||||
messages,
|
||||
stream: true,
|
||||
stream_options: { include_usage: true },
|
||||
}),
|
||||
signal,
|
||||
});
|
||||
|
||||
if (!response.ok || !response.body) {
|
||||
const text = await response.text();
|
||||
throw new Error(`Puter API error ${response.status}: ${text}`);
|
||||
}
|
||||
|
||||
const reader = response.body.getReader();
|
||||
const decoder = new TextDecoder();
|
||||
let inputTokens = 0;
|
||||
let outputTokens = 0;
|
||||
let buffer = "";
|
||||
|
||||
try {
|
||||
while (true) {
|
||||
const { done, value } = await reader.read();
|
||||
if (done) break;
|
||||
|
||||
buffer += decoder.decode(value, { stream: true });
|
||||
const lines = buffer.split("\n");
|
||||
// Keep last incomplete line in buffer
|
||||
buffer = lines.pop() ?? "";
|
||||
|
||||
for (const line of lines) {
|
||||
const trimmed = line.trim();
|
||||
if (!trimmed.startsWith("data: ")) continue;
|
||||
const data = trimmed.slice("data: ".length);
|
||||
if (data === "[DONE]") continue;
|
||||
|
||||
let chunk: any;
|
||||
try {
|
||||
chunk = JSON.parse(data);
|
||||
} catch {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (chunk.usage) {
|
||||
inputTokens = chunk.usage.prompt_tokens ?? inputTokens;
|
||||
outputTokens = chunk.usage.completion_tokens ?? outputTokens;
|
||||
}
|
||||
|
||||
const content = chunk.choices?.[0]?.delta?.content;
|
||||
if (content) {
|
||||
yield content;
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
reader.releaseLock();
|
||||
|
||||
if (agentId) {
|
||||
costs
|
||||
.createEvent(companyId, {
|
||||
agentId,
|
||||
provider: "puter",
|
||||
biller: "puter",
|
||||
billingType: "subscription_included",
|
||||
model: resolvedModel,
|
||||
inputTokens,
|
||||
outputTokens,
|
||||
costCents: 0,
|
||||
occurredAt: new Date(),
|
||||
})
|
||||
.catch(() => {});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return { storeToken, resolveToken, chatStream };
|
||||
}
|
||||
Loading…
Add table
Reference in a new issue