From 526acbe8aa060e03ca657bcfeba350ffd00952d7 Mon Sep 17 00:00:00 2001 From: Nexus Dev Date: Fri, 3 Apr 2026 00:35:11 +0000 Subject: [PATCH] feat(31-01): implement puterProxyService, puterProxyRoutes, and unit tests - puterProxyService with storeToken (create/rotate idempotent), resolveToken, chatStream - chatStream relays to Puter OpenAI-compat endpoint with SSE streaming - Cost recording with provider=puter, billingType=subscription_included, costCents=0 - Cost recording skipped when agentId is null/undefined (no FK violation) - puterProxyRoutes with POST /puter-proxy/token and POST /puter-proxy/chat - Board auth (assertBoard + assertCompanyAccess) on all routes - All 10 TDD tests passing --- server/src/__tests__/31-puter-proxy.test.ts | 389 ++++++++++++++++++++ server/src/routes/puter-proxy.ts | 81 ++++ server/src/services/puter-proxy.ts | 127 +++++++ 3 files changed, 597 insertions(+) create mode 100644 server/src/__tests__/31-puter-proxy.test.ts create mode 100644 server/src/routes/puter-proxy.ts create mode 100644 server/src/services/puter-proxy.ts diff --git a/server/src/__tests__/31-puter-proxy.test.ts b/server/src/__tests__/31-puter-proxy.test.ts new file mode 100644 index 00000000..18badc51 --- /dev/null +++ b/server/src/__tests__/31-puter-proxy.test.ts @@ -0,0 +1,389 @@ +import { describe, it, expect, vi, beforeEach, afterEach } from "vitest"; +import express from "express"; +import request from "supertest"; +import { puterProxyService } from "../services/puter-proxy.js"; +import { puterProxyRoutes } from "../routes/puter-proxy.js"; + +// ─── Helpers ────────────────────────────────────────────────────────────────── + +function makeTextEncoder() { + return new TextEncoder(); +} + +/** + * Build a minimal ReadableStream that emits pre-formatted SSE lines. + */ +function buildSseStream(chunks: string[]): ReadableStream { + const enc = makeTextEncoder(); + let idx = 0; + return new ReadableStream({ + pull(controller) { + if (idx < chunks.length) { + controller.enqueue(enc.encode(chunks[idx++])); + } else { + controller.close(); + } + }, + }); +} + +function sseChunk(data: object): string { + return `data: ${JSON.stringify(data)}\n\n`; +} + +// ─── Shared mock db factory ─────────────────────────────────────────────────── + +function makeMockDb(overrides: { + existingSecret?: object | null; + resolvedToken?: string; + createSecret?: ReturnType; + rotateSecret?: ReturnType; + createCostEvent?: ReturnType; +} = {}) { + const { + existingSecret = null, + resolvedToken = "test-bearer-token", + createSecret = vi.fn().mockResolvedValue({ id: "secret-1", name: "puter_auth_token" }), + rotateSecret = vi.fn().mockResolvedValue({ id: "secret-1", name: "puter_auth_token" }), + createCostEvent = vi.fn().mockResolvedValue({ id: "event-1" }), + } = overrides; + + // We return a db object that puterProxyService will pass to secretService and costService. + // Since we mock at the module level via the db interactions, we need the service + // to call these. We'll mock the actual db calls. + return { + _mocks: { createSecret, rotateSecret, createCostEvent }, + // The secretService and costService are created inside puterProxyService using the db. + // We inject mock data via the module mocking strategy below. + existingSecret, + resolvedToken, + }; +} + +// ─── Mock secretService and costService at module level ────────────────────── + +const mockGetByName = vi.fn(); +const mockCreate = vi.fn(); +const mockRotate = vi.fn(); +const mockResolveSecretValue = vi.fn(); +const mockCreateEvent = vi.fn(); + +vi.mock("../services/secrets.js", () => ({ + secretService: vi.fn(() => ({ + getByName: mockGetByName, + create: mockCreate, + rotate: mockRotate, + resolveSecretValue: mockResolveSecretValue, + })), +})); + +vi.mock("../services/costs.js", () => ({ + costService: vi.fn(() => ({ + createEvent: mockCreateEvent, + })), +})); + +// ─── Test suite ─────────────────────────────────────────────────────────────── + +describe("puterProxyService", () => { + const companyId = "company-123"; + const db = {} as any; + + beforeEach(() => { + vi.clearAllMocks(); + }); + + afterEach(() => { + vi.unstubAllGlobals(); + }); + + // Test 1: storeToken creates a new secret when none exists + it("storeToken creates a new secret via secretService when none exists", async () => { + mockGetByName.mockResolvedValue(null); + mockCreate.mockResolvedValue({ id: "secret-1", name: "puter_auth_token" }); + + const svc = puterProxyService(db); + await svc.storeToken(companyId, "my-puter-token"); + + expect(mockGetByName).toHaveBeenCalledWith(companyId, "puter_auth_token"); + expect(mockCreate).toHaveBeenCalledWith( + companyId, + expect.objectContaining({ + name: "puter_auth_token", + provider: "local_encrypted", + value: "my-puter-token", + }), + ); + expect(mockRotate).not.toHaveBeenCalled(); + }); + + // Test 2: storeToken rotates existing secret + it("storeToken rotates an existing secret when puter_auth_token already exists", async () => { + const existingSecret = { id: "existing-secret-id", name: "puter_auth_token" }; + mockGetByName.mockResolvedValue(existingSecret); + mockRotate.mockResolvedValue({ id: "existing-secret-id", name: "puter_auth_token" }); + + const svc = puterProxyService(db); + await svc.storeToken(companyId, "new-puter-token"); + + expect(mockGetByName).toHaveBeenCalledWith(companyId, "puter_auth_token"); + expect(mockRotate).toHaveBeenCalledWith( + "existing-secret-id", + expect.objectContaining({ value: "new-puter-token" }), + ); + expect(mockCreate).not.toHaveBeenCalled(); + }); + + // Test 3: resolveToken retrieves stored token value + it("resolveToken retrieves the stored token value via secretService.resolveSecretValue", async () => { + const existingSecret = { id: "secret-id", name: "puter_auth_token" }; + mockGetByName.mockResolvedValue(existingSecret); + mockResolveSecretValue.mockResolvedValue("resolved-bearer-token"); + + const svc = puterProxyService(db); + const token = await svc.resolveToken(companyId); + + expect(mockGetByName).toHaveBeenCalledWith(companyId, "puter_auth_token"); + expect(mockResolveSecretValue).toHaveBeenCalledWith(companyId, "secret-id", "latest"); + expect(token).toBe("resolved-bearer-token"); + }); + + // Test 4: chatStream sends POST to Puter with correct headers and body + it("chatStream sends POST to Puter OpenAI-compat endpoint with Authorization Bearer header, stream: true, stream_options", async () => { + mockGetByName.mockResolvedValue({ id: "sec-1", name: "puter_auth_token" }); + mockResolveSecretValue.mockResolvedValue("my-bearer-token"); + mockCreateEvent.mockResolvedValue({ id: "ev-1" }); + + const enc = new TextEncoder(); + const chunks = [ + sseChunk({ choices: [{ delta: { content: "Hello" } }] }), + sseChunk({ choices: [{ delta: { content: " world" } }] }), + sseChunk({ choices: [{ delta: {} }], usage: { prompt_tokens: 10, completion_tokens: 5 } }), + "data: [DONE]\n\n", + ]; + + const mockStream = buildSseStream(chunks); + const mockResponse = { + ok: true, + body: mockStream, + }; + const fetchSpy = vi.fn().mockResolvedValue(mockResponse); + vi.stubGlobal("fetch", fetchSpy); + + const svc = puterProxyService(db); + const messages = [{ role: "user", content: "Hi" }]; + const result: string[] = []; + for await (const chunk of svc.chatStream(companyId, "agent-1", messages, "claude-3-5-haiku-20241022", undefined)) { + result.push(chunk); + } + + expect(fetchSpy).toHaveBeenCalledOnce(); + const [url, opts] = fetchSpy.mock.calls[0]; + expect(url).toContain("api.puter.com/puterai/openai/v1/chat/completions"); + expect(opts.method).toBe("POST"); + expect(opts.headers["Authorization"]).toBe("Bearer my-bearer-token"); + const body = JSON.parse(opts.body); + expect(body.stream).toBe(true); + expect(body.stream_options).toEqual({ include_usage: true }); + }); + + // Test 5: chatStream yields content strings from SSE data chunks + it("chatStream yields content strings from SSE data chunks", async () => { + mockGetByName.mockResolvedValue({ id: "sec-1", name: "puter_auth_token" }); + mockResolveSecretValue.mockResolvedValue("my-bearer-token"); + mockCreateEvent.mockResolvedValue({ id: "ev-1" }); + + const chunks = [ + sseChunk({ choices: [{ delta: { content: "Hello" } }] }), + sseChunk({ choices: [{ delta: { content: " world" } }] }), + sseChunk({ choices: [{ delta: {} }], usage: { prompt_tokens: 5, completion_tokens: 3 } }), + "data: [DONE]\n\n", + ]; + + vi.stubGlobal("fetch", vi.fn().mockResolvedValue({ + ok: true, + body: buildSseStream(chunks), + })); + + const svc = puterProxyService(db); + const result: string[] = []; + for await (const chunk of svc.chatStream(companyId, "agent-1", [{ role: "user", content: "hi" }], undefined, undefined)) { + if (chunk) result.push(chunk); + } + + expect(result).toContain("Hello"); + expect(result).toContain(" world"); + }); + + // Test 6: chatStream records cost event when agentId is provided + it("chatStream records a cost event with provider=puter, billingType=subscription_included, costCents=0 when agentId is provided", async () => { + mockGetByName.mockResolvedValue({ id: "sec-1", name: "puter_auth_token" }); + mockResolveSecretValue.mockResolvedValue("my-bearer-token"); + mockCreateEvent.mockResolvedValue({ id: "ev-1" }); + + const chunks = [ + sseChunk({ choices: [{ delta: { content: "Hi" } }], usage: { prompt_tokens: 10, completion_tokens: 5 } }), + "data: [DONE]\n\n", + ]; + + vi.stubGlobal("fetch", vi.fn().mockResolvedValue({ + ok: true, + body: buildSseStream(chunks), + })); + + const svc = puterProxyService(db); + for await (const _ of svc.chatStream(companyId, "agent-xyz", [{ role: "user", content: "hi" }], "claude-3-5-haiku-20241022", undefined)) { + // consume stream + } + + // Allow microtask queue to flush (non-blocking cost recording) + await new Promise((resolve) => setTimeout(resolve, 50)); + + expect(mockCreateEvent).toHaveBeenCalledOnce(); + expect(mockCreateEvent).toHaveBeenCalledWith( + companyId, + expect.objectContaining({ + agentId: "agent-xyz", + provider: "puter", + billingType: "subscription_included", + costCents: 0, + }), + ); + }); + + // Test 7: chatStream skips cost recording when agentId is null/undefined + it("chatStream skips cost recording when agentId is null/undefined", async () => { + mockGetByName.mockResolvedValue({ id: "sec-1", name: "puter_auth_token" }); + mockResolveSecretValue.mockResolvedValue("my-bearer-token"); + + const chunks = [ + sseChunk({ choices: [{ delta: { content: "No cost" } }] }), + "data: [DONE]\n\n", + ]; + + vi.stubGlobal("fetch", vi.fn().mockResolvedValue({ + ok: true, + body: buildSseStream(chunks), + })); + + const svc = puterProxyService(db); + for await (const _ of svc.chatStream(companyId, null, [{ role: "user", content: "hi" }], undefined, undefined)) { + // consume + } + + await new Promise((resolve) => setTimeout(resolve, 50)); + expect(mockCreateEvent).not.toHaveBeenCalled(); + }); +}); + +// ─── Route tests ───────────────────────────────────────────────────────────── + +function buildTestApp(db: any) { + const app = express(); + app.use(express.json()); + // Inject a board actor for all requests + app.use((req: any, _res: any, next: any) => { + req.actor = { + type: "board", + source: "local_implicit", + companyIds: ["company-123"], + isInstanceAdmin: true, + }; + next(); + }); + app.use(puterProxyRoutes(db)); + return app; +} + +describe("puterProxyRoutes", () => { + const db = {} as any; + + beforeEach(() => { + vi.clearAllMocks(); + }); + + afterEach(() => { + vi.unstubAllGlobals(); + }); + + // Test 8: POST /api/puter-proxy/token stores token and returns 200 + it("POST /puter-proxy/token stores token and returns 200", async () => { + mockGetByName.mockResolvedValue(null); + mockCreate.mockResolvedValue({ id: "sec-1", name: "puter_auth_token" }); + + const app = buildTestApp(db); + const res = await request(app) + .post("/puter-proxy/token") + .send({ companyId: "company-123", token: "puter-abc123" }); + + expect(res.status).toBe(200); + expect(res.body).toEqual({ ok: true }); + expect(mockCreate).toHaveBeenCalledWith( + "company-123", + expect.objectContaining({ name: "puter_auth_token", value: "puter-abc123" }), + ); + }); + + // Test 9: POST /api/puter-proxy/chat sets SSE headers and streams response chunks + it("POST /puter-proxy/chat sets SSE headers and streams response chunks", async () => { + mockGetByName.mockResolvedValue({ id: "sec-1", name: "puter_auth_token" }); + mockResolveSecretValue.mockResolvedValue("bearer-xyz"); + mockCreateEvent.mockResolvedValue({ id: "ev-1" }); + + const chunks = [ + sseChunk({ choices: [{ delta: { content: "Stream token" } }] }), + "data: [DONE]\n\n", + ]; + + vi.stubGlobal("fetch", vi.fn().mockResolvedValue({ + ok: true, + body: buildSseStream(chunks), + })); + + const app = buildTestApp(db); + const res = await request(app) + .post("/puter-proxy/chat") + .send({ + companyId: "company-123", + agentId: "agent-1", + messages: [{ role: "user", content: "Hello" }], + }); + + expect(res.status).toBe(200); + expect(res.headers["content-type"]).toMatch(/text\/event-stream/); + expect(res.text).toContain("Stream token"); + }); + + // Test 10: POST /api/puter-proxy/chat works without agentId in request body + it("POST /puter-proxy/chat works without agentId in request body (agentId is optional)", async () => { + mockGetByName.mockResolvedValue({ id: "sec-1", name: "puter_auth_token" }); + mockResolveSecretValue.mockResolvedValue("bearer-xyz"); + + const chunks = [ + sseChunk({ choices: [{ delta: { content: "No agent" } }] }), + "data: [DONE]\n\n", + ]; + + vi.stubGlobal("fetch", vi.fn().mockResolvedValue({ + ok: true, + body: buildSseStream(chunks), + })); + + const app = buildTestApp(db); + const res = await request(app) + .post("/puter-proxy/chat") + .send({ + companyId: "company-123", + // No agentId + messages: [{ role: "user", content: "Hello without agent" }], + }); + + expect(res.status).toBe(200); + expect(res.headers["content-type"]).toMatch(/text\/event-stream/); + expect(res.text).toContain("No agent"); + + await new Promise((resolve) => setTimeout(resolve, 50)); + // No cost event since no agentId + expect(mockCreateEvent).not.toHaveBeenCalled(); + }); +}); diff --git a/server/src/routes/puter-proxy.ts b/server/src/routes/puter-proxy.ts new file mode 100644 index 00000000..f4de2944 --- /dev/null +++ b/server/src/routes/puter-proxy.ts @@ -0,0 +1,81 @@ +import { Router } from "express"; +import type { Db } from "@paperclipai/db"; +import { assertBoard, assertCompanyAccess } from "./authz.js"; +import { puterProxyService } from "../services/puter-proxy.js"; +import { badRequest } from "../errors.js"; + +export function puterProxyRoutes(db: Db): Router { + const router = Router(); + + // POST /puter-proxy/token — store or rotate Puter auth token + router.post("/puter-proxy/token", async (req, res) => { + assertBoard(req); + const { companyId, token } = req.body; + + if (!companyId || typeof companyId !== "string") { + throw badRequest("companyId is required"); + } + if (!token || typeof token !== "string") { + throw badRequest("token is required"); + } + + assertCompanyAccess(req, companyId); + + await puterProxyService(db).storeToken(companyId, token); + res.json({ ok: true }); + }); + + // POST /puter-proxy/chat — stream AI chat via Puter proxy + router.post("/puter-proxy/chat", async (req, res) => { + assertBoard(req); + const { companyId, agentId, messages, model } = req.body; + + if (!companyId || typeof companyId !== "string") { + res.status(400).json({ error: "companyId is required" }); + return; + } + if (!Array.isArray(messages) || messages.length === 0) { + res.status(400).json({ error: "messages must be a non-empty array" }); + return; + } + + assertCompanyAccess(req, companyId); + + // Set SSE headers and flush immediately (same pattern as chat.ts) + res.setHeader("Content-Type", "text/event-stream"); + res.setHeader("Cache-Control", "no-cache"); + res.setHeader("Connection", "keep-alive"); + res.setHeader("X-Accel-Buffering", "no"); + res.flushHeaders(); + res.write(":ok\n\n"); + + const abort = new AbortController(); + req.on("close", () => abort.abort()); + + try { + const svc = puterProxyService(db); + // agentId is optional — pass through as-is (string or undefined) + for await (const chunk of svc.chatStream( + companyId, + agentId as string | undefined, + messages, + model as string | undefined, + abort.signal, + )) { + if (!res.writable) break; + res.write(`data: ${JSON.stringify({ token: chunk })}\n\n`); + } + if (res.writable && !abort.signal.aborted) { + res.write(`data: ${JSON.stringify({ done: true })}\n\n`); + } + } catch (err) { + if (res.writable) { + res.write(`data: ${JSON.stringify({ error: "Puter stream error" })}\n\n`); + } + } finally { + res.end(); + } + }); + + return router; +} diff --git a/server/src/services/puter-proxy.ts b/server/src/services/puter-proxy.ts new file mode 100644 index 00000000..b75a605b --- /dev/null +++ b/server/src/services/puter-proxy.ts @@ -0,0 +1,127 @@ +import type { Db } from "@paperclipai/db"; +import { secretService } from "./secrets.js"; +import { costService } from "./costs.js"; +import { unprocessable } from "../errors.js"; + +const PUTER_BASE_URL = "https://api.puter.com/puterai/openai/v1"; +const PUTER_DEFAULT_MODEL = "claude-3-5-haiku-20241022"; +const PUTER_TOKEN_SECRET_NAME = "puter_auth_token"; + +export function puterProxyService(db: Db) { + const secrets = secretService(db); + const costs = costService(db); + + async function storeToken(companyId: string, token: string) { + const existing = await secrets.getByName(companyId, PUTER_TOKEN_SECRET_NAME); + if (existing) { + return secrets.rotate(existing.id, { value: token }); + } + return secrets.create(companyId, { + name: PUTER_TOKEN_SECRET_NAME, + provider: "local_encrypted", + value: token, + description: "Puter.com auth token for AI proxy", + }); + } + + async function resolveToken(companyId: string): Promise { + const secret = await secrets.getByName(companyId, PUTER_TOKEN_SECRET_NAME); + if (!secret) { + throw unprocessable("Puter auth token not configured"); + } + return secrets.resolveSecretValue(companyId, secret.id, "latest"); + } + + async function* chatStream( + companyId: string, + agentId: string | null | undefined, + messages: unknown[], + model: string | undefined, + signal: AbortSignal | undefined, + ): AsyncGenerator { + const token = await resolveToken(companyId); + const resolvedModel = model ?? PUTER_DEFAULT_MODEL; + + const response = await fetch(`${PUTER_BASE_URL}/chat/completions`, { + method: "POST", + headers: { + Authorization: `Bearer ${token}`, + "Content-Type": "application/json", + }, + body: JSON.stringify({ + model: resolvedModel, + messages, + stream: true, + stream_options: { include_usage: true }, + }), + signal, + }); + + if (!response.ok || !response.body) { + const text = await response.text(); + throw new Error(`Puter API error ${response.status}: ${text}`); + } + + const reader = response.body.getReader(); + const decoder = new TextDecoder(); + let inputTokens = 0; + let outputTokens = 0; + let buffer = ""; + + try { + while (true) { + const { done, value } = await reader.read(); + if (done) break; + + buffer += decoder.decode(value, { stream: true }); + const lines = buffer.split("\n"); + // Keep last incomplete line in buffer + buffer = lines.pop() ?? ""; + + for (const line of lines) { + const trimmed = line.trim(); + if (!trimmed.startsWith("data: ")) continue; + const data = trimmed.slice("data: ".length); + if (data === "[DONE]") continue; + + let chunk: any; + try { + chunk = JSON.parse(data); + } catch { + continue; + } + + if (chunk.usage) { + inputTokens = chunk.usage.prompt_tokens ?? inputTokens; + outputTokens = chunk.usage.completion_tokens ?? outputTokens; + } + + const content = chunk.choices?.[0]?.delta?.content; + if (content) { + yield content; + } + } + } + } finally { + reader.releaseLock(); + + if (agentId) { + costs + .createEvent(companyId, { + agentId, + provider: "puter", + biller: "puter", + billingType: "subscription_included", + model: resolvedModel, + inputTokens, + outputTokens, + costCents: 0, + occurredAt: new Date(), + }) + .catch(() => {}); + } + } + } + + return { storeToken, resolveToken, chatStream }; +}