From 6435ccb1c479b0cda5749d68f8b5d32aa5849b6a Mon Sep 17 00:00:00 2001
From: Nexus Dev
Date: Sat, 4 Apr 2026 03:18:07 +0000
Subject: [PATCH] feat(38-02): add voice message handling + TTS reply to Telegram bridge

- Refactor text relay into shared relayToAgent() used by both text/voice handlers
- Add bot.on('message:voice') handler: send 'Transcribing...' immediately, process async
- Download OGG from Telegram CDN via ctx.getFile() + fetch, transcribe via voicePipelineService
- Synthesize agent responses to OGG Opus via transcodeToOggOpus() and ctx.replyWithVoice()
- TTS failure degrades gracefully (text reply already sent, voice is bonus)
- telegram.ts stays at 322 lines (under 500-line TGRAM-06 constraint)
---
 server/src/services/telegram.ts | 245 +++++++++++++++++++++++++-------
 1 file changed, 190 insertions(+), 55 deletions(-)

diff --git a/server/src/services/telegram.ts b/server/src/services/telegram.ts
index a0208563..4e86766f 100644
--- a/server/src/services/telegram.ts
+++ b/server/src/services/telegram.ts
@@ -1,10 +1,14 @@
-import { Bot } from "grammy";
+import { Bot, InputFile } from "grammy";
+import type { Context } from "grammy";
+import { spawn } from "node:child_process";
+import ffmpegPath from "ffmpeg-static";
 import type { Db } from "@paperclipai/db";
 import { chatService } from "./chat.js";
 import { agentService } from "./agents.js";
 import { companyService } from "./companies.js";
 import { puterProxyService } from "./puter-proxy.js";
 import { nexusSettingsService } from "./nexus-settings.js";
+import { voicePipelineService } from "./voice-pipeline.js";
 import { logger } from "../middleware/logger.js";
 
 // In-memory session map: `${chatId}:${agentId}` -> conversationId
@@ -68,14 +72,175 @@ function splitMessage(text: string, limit = 4000): string[] {
   return parts;
 }
 
+/**
+ * Transcode raw PCM s16le (from Piper TTS) to OGG Opus for Telegram voice notes.
+ * Piper en_US-lessac-medium outputs 22050Hz.
+ */
+function transcodeToOggOpus(rawPcmBuffer: Buffer): Promise<Buffer> {
+  return new Promise((resolve, reject) => {
+    if (!ffmpegPath) {
+      reject(new Error("ffmpeg-static binary not found"));
+      return;
+    }
+    const ffmpegBin = ffmpegPath as unknown as string;
+
+    // Input: raw PCM s16le at 22050Hz mono (Piper output rate)
+    // Output: OGG Opus at 48000Hz (Telegram requirement)
+    const ffmpeg = spawn(ffmpegBin, [
+      "-f", "s16le", "-ar", "22050", "-ac", "1", "-i", "pipe:0",
+      "-c:a", "libopus", "-ar", "48000", "-f", "ogg", "pipe:1",
+    ], { stdio: ["pipe", "pipe", "pipe"] });
+
+    const chunks: Buffer[] = [];
+
+    ffmpeg.stdout.on("data", (chunk: Buffer) => chunks.push(chunk));
+    ffmpeg.stderr.on("data", () => { /* discard */ });
+
+    ffmpeg.on("close", (code) => {
+      if (code === 0) {
+        resolve(Buffer.concat(chunks));
+      } else {
+        reject(new Error(`ffmpeg exited with code ${code}`));
+      }
+    });
+
+    ffmpeg.on("error", reject);
+
+    ffmpeg.stdin.on("error", reject); // EPIPE if ffmpeg exits early must reject, not crash the process
+    ffmpeg.stdin.end(rawPcmBuffer);
+  });
+}
+
+/**
+ * Core relay: persist user message, collect LLM stream, send reply parts.
+ * Both text and voice handlers share this function.
+ *
+ * @param voiceMode - If true, attempt a TTS voice reply after sending text.
+ */
+async function relayToAgent(
+  ctx: Context,
+  chatId: string,
+  userText: string,
+  db: Db,
+  voiceMode = false,
+): Promise<void> {
+  const resolved = await resolveDefaultAgent(db);
+  if (!resolved) {
+    await ctx.reply("No agents configured. Please set up an agent first.");
+    return;
+  }
+
+  const { companyId, agentId, agentName } = resolved;
+  const chatSvc = chatService(db);
+
+  const convId = await getOrCreateConversation(chatId, agentId, companyId, db);
+
+  // Persist user message
+  await chatSvc.addMessage(convId, { role: "user", content: userText });
+
+  // Build messages array (last 20 messages, reverse for LLM)
+  const { items } = await chatSvc.listMessages(convId, { limit: 20 });
+  const messages = [...items].reverse().map((m) => ({
+    role: m.role as "user" | "assistant" | "system",
+    content: m.content,
+  }));
+
+  // Collect response from puter proxy
+  const puterProxy = puterProxyService(db);
+  let fullResponse = "";
+  for await (const chunk of puterProxy.chatStream(
+    companyId,
+    agentId,
+    messages,
+    undefined,
+    undefined,
+  )) {
+    fullResponse += chunk;
+  }
+
+  // Persist assistant message
+  await chatSvc.addMessage(convId, {
+    role: "assistant",
+    content: fullResponse,
+    agentId,
+  });
+
+  // Prefix with agent name (TGRAM-02) and send reply parts
+  const prefixed = `[${agentName}]: ${fullResponse}`;
+  const parts = splitMessage(prefixed, 4000);
+  for (const part of parts) {
+    await ctx.reply(part, { parse_mode: "Markdown" });
+  }
+
+  // If voice mode: attempt TTS voice reply as bonus (text already sent)
+  if (voiceMode) {
+    try {
+      const voiceSvc = voicePipelineService();
+      const voiceText = voiceSvc.formatForVoice(fullResponse);
+      if (voiceText) {
+        const pcmBuffer = await voiceSvc.synthesize(voiceText);
+        const oggBuffer = await transcodeToOggOpus(pcmBuffer);
+        await ctx.replyWithVoice(new InputFile(oggBuffer, "response.ogg"));
+      }
+    } catch (ttsErr) {
+      // TTS is optional — log warning but don't fail (text reply already sent)
+      logger.warn({ err: ttsErr }, "telegram: TTS voice reply failed (degrading to text-only)");
+    }
+  }
+}
+
+/**
+ * Process a voice message asynchronously after sending the immediate "Transcribing..." reply.
+ * OGG is downloaded from Telegram CDN, transcribed via voicePipelineService, then relayed.
+ */
+async function processVoiceMessage(
+  ctx: Context,
+  chatId: string,
+  token: string,
+  db: Db,
+): Promise<void> {
+  // Download OGG from Telegram file CDN
+  const file = await ctx.getFile();
+  if (!file.file_path) {
+    await ctx.reply("Could not retrieve voice message file.");
+    return;
+  }
+
+  const downloadUrl = `https://api.telegram.org/file/bot${token}/${file.file_path}`;
+  const response = await fetch(downloadUrl);
+  if (!response.ok) {
+    await ctx.reply("Could not download voice message.");
+    return;
+  }
+  const arrayBuf = await response.arrayBuffer();
+  const oggBuffer = Buffer.from(arrayBuf);
+
+  // Transcribe — voicePipelineService handles OGG->WAV16k internally
+  const voiceSvc = voicePipelineService();
+  const { text } = await voiceSvc.transcribe(oggBuffer, "ogg");
+
+  if (!text || !text.trim()) {
+    await ctx.reply("Could not transcribe voice message.");
+    return;
+  }
+
+  // Confirm transcription to user (truncate for readability)
+  await ctx.reply(`Heard: ${text.slice(0, 200)}`);
+
+  // Relay transcribed text to agent (with voice reply enabled)
+  await relayToAgent(ctx, chatId, text.trim(), db, true);
+}
+
 export function telegramService(db: Db) {
   let bot: Bot | null = null;
+  let botToken: string | null = null;
 
   async function start(token: string): Promise<void> {
     if (bot) {
       await stop();
     }
 
+    botToken = token;
     // Assign bot early so handlers reference the module-level ref
     bot = new Bot(token);
 
@@ -83,61 +248,9 @@ export function telegramService(db: Db) {
       const chatId = String(ctx.chat.id);
 
       try {
-        const resolved = await resolveDefaultAgent(db);
-        if (!resolved) {
-          await ctx.reply("No agents configured. Please set up an agent first.");
-          return;
-        }
-
-        const { companyId, agentId, agentName } = resolved;
-        const chatSvc = chatService(db);
-
-        const convId = await getOrCreateConversation(chatId, agentId, companyId, db);
-
-        // Persist user message
-        await chatSvc.addMessage(convId, {
-          role: "user",
-          content: ctx.message.text,
-        });
-
-        // Build messages array (last 20 messages)
-        const { items } = await chatSvc.listMessages(convId, { limit: 20 });
-        // listMessages returns newest first, so reverse for LLM
-        const messages = [...items].reverse().map((m) => ({
-          role: m.role as "user" | "assistant" | "system",
-          content: m.content,
-        }));
-
-        // Collect response from puter proxy
-        const puterProxy = puterProxyService(db);
-        let fullResponse = "";
-        for await (const chunk of puterProxy.chatStream(
-          companyId,
-          agentId,
-          messages,
-          undefined,
-          undefined,
-        )) {
-          fullResponse += chunk;
-        }
-
-        // Persist assistant message
-        await chatSvc.addMessage(convId, {
-          role: "assistant",
-          content: fullResponse,
-          agentId,
-        });
-
-        // Prefix with agent name (TGRAM-02)
-        const prefixed = `[${agentName}]: ${fullResponse}`;
-
-        // Send reply — split if over 4096 chars
-        const parts = splitMessage(prefixed, 4000);
-        for (const part of parts) {
-          await ctx.reply(part, { parse_mode: "Markdown" });
-        }
+        await relayToAgent(ctx, chatId, ctx.message.text, db);
       } catch (err) {
-        logger.error({ err, chatId }, "telegram: error handling message");
+        logger.error({ err, chatId }, "telegram: error handling text message");
         try {
           await ctx.reply("Sorry, something went wrong.");
         } catch {
@@ -146,6 +259,27 @@ export function telegramService(db: Db) {
         }
       }
     });
+    bot.on("message:voice", async (ctx) => {
+      const chatId = String(ctx.chat.id);
+
+      // Reply immediately to prevent Telegram from resending the update (Pitfall 1)
+      try {
+        await ctx.reply("Transcribing...");
+      } catch {
+        // ignore — continue processing
+      }
+
+      // Process async — do NOT await in handler body
+      processVoiceMessage(ctx, chatId, botToken!, db).catch(async (err) => {
+        logger.error({ err, chatId }, "telegram: error processing voice message");
+        try {
+          await ctx.reply("Voice transcription failed.");
+        } catch {
+          // ignore reply errors
+        }
+      });
+    });
+
     bot.catch((err) => {
       logger.error({ err }, "Telegram bot error");
     });
@@ -170,6 +304,7 @@ export function telegramService(db: Db) {
         logger.error({ err }, "Error stopping Telegram bot");
       }
       bot = null;
+      botToken = null;
       logger.info("Telegram bot stopped");
     }
   }