diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 5ea57460..0f4ac749 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -555,6 +555,9 @@ importers: ffmpeg-static: specifier: ^5.3.0 version: 5.3.0 + grammy: + specifier: ^1.42.0 + version: 1.42.0 hermes-paperclip-adapter: specifier: ^0.2.0 version: 0.2.1 @@ -2116,6 +2119,9 @@ packages: '@floating-ui/utils@0.2.10': resolution: {integrity: sha512-aGTxbpbg8/b5JfU1HXSrbH3wXZuLPJcNEcZQFMxLs3oSzgtVu6nFPkbbGGUvBcUjKV2YyB9Wxxabo+HEH9tcRQ==} + '@grammyjs/types@3.26.0': + resolution: {integrity: sha512-jlnyfxfev/2o68HlvAGRocAXgdPPX5QabG7jZlbqC2r9DZyWBfzTlg+nu3O3Fy4EhgLWu28hZ/8wr7DsNamP9A==} + '@iconify/types@2.0.0': resolution: {integrity: sha512-+wluvCrRhXrhyOmRDJ3q8mux9JkKy5SJ/v8ol2tu4FVjyYvtEzkc/3pK15ET6RKg4b4w4BmTk1+gsCUhf21Ykg==} @@ -4079,6 +4085,10 @@ packages: '@vitest/utils@3.2.4': resolution: {integrity: sha512-fB2V0JFrQSMsCo9HiSq3Ezpdv4iYaXRG1Sx8edX3MwxfyNn83mKiGzOcH+Fkxt4MHxr3y42fQi1oeAInqgX2QA==} + abort-controller@3.0.0: + resolution: {integrity: sha512-h8lQ8tacZYnR3vNQTgibj+tODHI5/+l06Au2Pcriv/Gmet0eaj4TwWH41sO9wnHDiQsEj19q0drzdWdeAHtweg==} + engines: {node: '>=6.5'} + accepts@2.0.0: resolution: {integrity: sha512-5cvg6CtKwfgdmVqY1WIiXKc3Q1bkRqGLi+2W/6ao+6Y7gu/RCwRuAhGEzh5B4KlszSuTLgZYuqFqo5bImjNKng==} engines: {node: '>= 0.6'} @@ -4967,6 +4977,10 @@ packages: event-emitter@0.3.5: resolution: {integrity: sha512-D9rRn9y7kLPnJ+hMq7S/nhvoKwwvVJahBi2BPmx3bvbsEdK3W9ii8cBSGjP+72/LnM4n6fo3+dkCX5FeTQruXA==} + event-target-shim@5.0.1: + resolution: {integrity: sha512-i/2XbnSz/uxRCU6+NdVJgKWDTM427+MqYbkQzD321DuCQJUqOuJKIA0IM2+W2xtYHdKOmZ4dR6fExsd4SXL+WQ==} + engines: {node: '>=6'} + expect-type@1.3.0: resolution: {integrity: sha512-knvyeauYhqjOYvQ66MznSMs83wmHrCycNEN6Ao+2AeYEfxUIkuiVxdEa1qlGEPK+We3n0THiDciYSsCcgW/DoA==} engines: {node: '>=12.0.0'} @@ -5091,6 +5105,10 @@ packages: graceful-fs@4.2.11: resolution: {integrity: sha512-RbJ5/jmFcNNCcDV5o9eTnBLJ/HszWV0P73bc+Ff4nS/rJj+YaS6IGyiOL0VoBYX+l1Wrl3k63h/KrH+nhJ0XvQ==} + grammy@1.42.0: + resolution: {integrity: sha512-1AdCge+AkjSdp2FwfICSFnVbl8Mq3KVHJDy+DgTI9+D6keJ0zWALPRKas5jv/8psiCzL4N2cEOcGW7O45Kn39g==} + engines: {node: ^12.20.0 || >=14.13.1} + guid-typescript@1.0.9: resolution: {integrity: sha512-Y8T4vYhEfwJOTbouREvG+3XDsjr8E3kIr7uf+JZ0BYloFsttiHU0WfvANVsR7TxNUJa/WpCnw/Ino/p+DeBhBQ==} @@ -8289,6 +8307,8 @@ snapshots: '@floating-ui/utils@0.2.10': {} + '@grammyjs/types@3.26.0': {} + '@iconify/types@2.0.0': {} '@iconify/utils@3.1.0': @@ -10569,6 +10589,10 @@ snapshots: loupe: 3.2.1 tinyrainbow: 2.0.0 + abort-controller@3.0.0: + dependencies: + event-target-shim: 5.0.1 + accepts@2.0.0: dependencies: mime-types: 3.0.2 @@ -11457,6 +11481,8 @@ snapshots: d: 1.0.2 es5-ext: 0.10.64 + event-target-shim@5.0.1: {} + expect-type@1.3.0: {} express@5.2.1: @@ -11609,6 +11635,16 @@ snapshots: graceful-fs@4.2.11: {} + grammy@1.42.0: + dependencies: + '@grammyjs/types': 3.26.0 + abort-controller: 3.0.0 + debug: 4.4.3 + node-fetch: 2.7.0 + transitivePeerDependencies: + - encoding + - supports-color + guid-typescript@1.0.9: {} hachure-fill@0.5.2: {} diff --git a/server/package.json b/server/package.json index 14d9fe8b..b6277823 100644 --- a/server/package.json +++ b/server/package.json @@ -69,6 +69,7 @@ "embedded-postgres": "^18.1.0-beta.16", "express": "^5.1.0", "ffmpeg-static": "^5.3.0", + "grammy": "^1.42.0", "hermes-paperclip-adapter": "^0.2.0", "jsdom": "^28.1.0", "multer": "^2.0.2", diff --git a/server/src/routes/telegram.ts b/server/src/routes/telegram.ts new file mode 100644 index 00000000..9d46c8ac --- /dev/null +++ b/server/src/routes/telegram.ts @@ -0,0 +1,70 @@ +import { Router } from "express"; +import { Bot } from "grammy"; +import { assertBoard } from "./authz.js"; +import { nexusSettingsService } from "../services/nexus-settings.js"; +import type { TelegramService } from "../services/telegram.js"; +import type { Db } from "@paperclipai/db"; + +export function telegramRoutes(db: Db, svc: TelegramService): Router { + const router = Router(); + + // POST /telegram/token — validate and save bot token, restart bot + router.post("/telegram/token", async (req, res) => { + try { + assertBoard(req); + + const { token } = req.body as { token?: string }; + if (!token || typeof token !== "string" || token.trim() === "") { + res.status(400).json({ error: "token is required" }); + return; + } + + // Validate token by calling getMe + let botUsername: string; + try { + const tempBot = new Bot(token.trim()); + const me = await tempBot.api.getMe(); + botUsername = me.username ?? me.first_name; + } catch { + res.status(400).json({ error: "Invalid Telegram bot token" }); + return; + } + + // Save token to nexus settings + await nexusSettingsService().set({ telegramToken: token.trim() }); + + // Stop old bot and start new one with new token + await svc.stop(); + svc.start(token.trim()).catch((err) => { + // Log but don't fail the request — token is saved + void err; + }); + + res.json({ ok: true, botUsername }); + } catch (err: unknown) { + if (err && typeof err === "object" && "status" in err) { + const e = err as { status: number; message: string }; + res.status(e.status).json({ error: e.message }); + return; + } + res.status(500).json({ error: "Unexpected error" }); + } + }); + + // GET /telegram/status — return running state + router.get("/telegram/status", async (req, res) => { + try { + assertBoard(req); + res.json({ running: svc.isRunning() }); + } catch (err: unknown) { + if (err && typeof err === "object" && "status" in err) { + const e = err as { status: number; message: string }; + res.status(e.status).json({ error: e.message }); + return; + } + res.status(500).json({ error: "Unexpected error" }); + } + }); + + return router; +} diff --git a/server/src/services/telegram.ts b/server/src/services/telegram.ts new file mode 100644 index 00000000..a0208563 --- /dev/null +++ b/server/src/services/telegram.ts @@ -0,0 +1,187 @@ +import { Bot } from "grammy"; +import type { Db } from "@paperclipai/db"; +import { chatService } from "./chat.js"; +import { agentService } from "./agents.js"; +import { companyService } from "./companies.js"; +import { puterProxyService } from "./puter-proxy.js"; +import { nexusSettingsService } from "./nexus-settings.js"; +import { logger } from "../middleware/logger.js"; + +// In-memory session map: `${chatId}:${agentId}` -> conversationId +const sessionMap = new Map(); + +interface ResolvedAgent { + companyId: string; + agentId: string; + agentName: string; +} + +async function resolveDefaultAgent(db: Db): Promise { + try { + const companies = await companyService(db).list(); + if (!companies || companies.length === 0) return null; + + const company = companies[0]!; + const agentsList = await agentService(db).list(company.id); + if (!agentsList || agentsList.length === 0) return null; + + const agent = agentsList[0]!; + return { + companyId: company.id, + agentId: agent.id, + agentName: agent.name, + }; + } catch (err) { + logger.error({ err }, "telegram: failed to resolve default agent"); + return null; + } +} + +async function getOrCreateConversation( + chatId: string, + agentId: string, + companyId: string, + db: Db, +): Promise { + const key = `${chatId}:${agentId}`; + const existing = sessionMap.get(key); + if (existing) return existing; + + const chatSvc = chatService(db); + const conv = await chatSvc.createConversation(companyId, { + title: `Telegram chat ${chatId}`, + agentId, + }); + + sessionMap.set(key, conv.id); + return conv.id; +} + +function splitMessage(text: string, limit = 4000): string[] { + if (text.length <= limit) return [text]; + const parts: string[] = []; + let remaining = text; + while (remaining.length > 0) { + parts.push(remaining.slice(0, limit)); + remaining = remaining.slice(limit); + } + return parts; +} + +export function telegramService(db: Db) { + let bot: Bot | null = null; + + async function start(token: string): Promise { + if (bot) { + await stop(); + } + + // Assign bot early so handlers reference the module-level ref + bot = new Bot(token); + + bot.on("message:text", async (ctx) => { + const chatId = String(ctx.chat.id); + + try { + const resolved = await resolveDefaultAgent(db); + if (!resolved) { + await ctx.reply("No agents configured. Please set up an agent first."); + return; + } + + const { companyId, agentId, agentName } = resolved; + const chatSvc = chatService(db); + + const convId = await getOrCreateConversation(chatId, agentId, companyId, db); + + // Persist user message + await chatSvc.addMessage(convId, { + role: "user", + content: ctx.message.text, + }); + + // Build messages array (last 20 messages) + const { items } = await chatSvc.listMessages(convId, { limit: 20 }); + // listMessages returns newest first, so reverse for LLM + const messages = [...items].reverse().map((m) => ({ + role: m.role as "user" | "assistant" | "system", + content: m.content, + })); + + // Collect response from puter proxy + const puterProxy = puterProxyService(db); + let fullResponse = ""; + for await (const chunk of puterProxy.chatStream( + companyId, + agentId, + messages, + undefined, + undefined, + )) { + fullResponse += chunk; + } + + // Persist assistant message + await chatSvc.addMessage(convId, { + role: "assistant", + content: fullResponse, + agentId, + }); + + // Prefix with agent name (TGRAM-02) + const prefixed = `[${agentName}]: ${fullResponse}`; + + // Send reply — split if over 4096 chars + const parts = splitMessage(prefixed, 4000); + for (const part of parts) { + await ctx.reply(part, { parse_mode: "Markdown" }); + } + } catch (err) { + logger.error({ err, chatId }, "telegram: error handling message"); + try { + await ctx.reply("Sorry, something went wrong."); + } catch { + // ignore reply errors + } + } + }); + + bot.catch((err) => { + logger.error({ err }, "Telegram bot error"); + }); + + // Delete webhook first to ensure long polling works (Pitfall 4) + await bot.api.deleteWebhook(); + + // Do NOT await — bot.start() is a never-resolving promise + bot.start().catch((err) => { + logger.error({ err }, "Telegram bot polling error"); + bot = null; + }); + + logger.info("Telegram bot started (long polling)"); + } + + async function stop(): Promise { + if (bot) { + try { + await bot.stop(); + } catch (err) { + logger.error({ err }, "Error stopping Telegram bot"); + } + bot = null; + logger.info("Telegram bot stopped"); + } + } + + function isRunning(): boolean { + return bot !== null; + } + + return { start, stop, isRunning }; +} + +export type TelegramService = ReturnType; + +// Re-export nexusSettingsService for convenience in routes +export { nexusSettingsService };