feat(38-02): add voice message handling + TTS reply to Telegram bridge

- Refactor text relay into shared relayToAgent() used by both text/voice handlers
- Add bot.on('message:voice') handler: send 'Transcribing...' immediately, process async
- Download OGG from Telegram CDN via ctx.getFile() + fetch, transcribe via voicePipelineService
- Synthesize agent responses to OGG Opus via transcodeToOggOpus() and ctx.replyWithVoice()
- TTS failure degrades gracefully (text reply already sent, voice is bonus)
- telegram.ts stays at 322 lines (under 500-line TGRAM-06 constraint)
This commit is contained in:
Nexus Dev 2026-04-04 03:18:07 +00:00
parent 0cd6f2b8e1
commit 3b329cf251

View file

@ -1,10 +1,14 @@
import { Bot } from "grammy";
import { Bot, InputFile } from "grammy";
import type { Context } from "grammy";
import { spawn } from "node:child_process";
import ffmpegPath from "ffmpeg-static";
import type { Db } from "@paperclipai/db";
import { chatService } from "./chat.js";
import { agentService } from "./agents.js";
import { companyService } from "./companies.js";
import { puterProxyService } from "./puter-proxy.js";
import { nexusSettingsService } from "./nexus-settings.js";
import { voicePipelineService } from "./voice-pipeline.js";
import { logger } from "../middleware/logger.js";
// In-memory session map: `${chatId}:${agentId}` -> conversationId
@ -68,14 +72,175 @@ function splitMessage(text: string, limit = 4000): string[] {
return parts;
}
/**
 * Transcode raw PCM s16le (from Piper TTS) to OGG Opus for Telegram voice notes.
 *
 * The input is declared to ffmpeg as 22050 Hz mono (the rate the original
 * author notes for Piper en_US-lessac-medium) and is re-encoded as 48000 Hz
 * Opus inside an OGG container. Audio is streamed through ffmpeg's
 * stdin/stdout; nothing is written to disk by this function.
 *
 * @param rawPcmBuffer - Headerless signed 16-bit little-endian PCM samples.
 * @returns The encoded OGG Opus bytes collected from ffmpeg's stdout.
 * @throws Rejects when the ffmpeg-static binary path is missing, the process
 *   cannot be spawned, ffmpeg exits with a non-zero code, or the input could
 *   not be fully written to ffmpeg's stdin.
 */
function transcodeToOggOpus(rawPcmBuffer: Buffer): Promise<Buffer> {
  // Keep only the end of ffmpeg's diagnostics so a chatty run cannot grow memory unbounded.
  const stderrTailLimit = 2000;

  return new Promise<Buffer>((resolve, reject) => {
    if (!ffmpegPath) {
      reject(new Error("ffmpeg-static binary not found"));
      return;
    }
    const ffmpegBin = ffmpegPath as unknown as string;

    // Input: raw PCM s16le at 22050Hz mono (Piper output rate)
    // Output: OGG Opus at 48000Hz (Telegram requirement)
    const ffmpeg = spawn(ffmpegBin, [
      "-f", "s16le", "-ar", "22050", "-ac", "1", "-i", "pipe:0",
      "-c:a", "libopus", "-ar", "48000", "-f", "ogg", "pipe:1",
    ], { stdio: ["pipe", "pipe", "pipe"] });

    const chunks: Buffer[] = [];
    let stderrTail = "";
    let stdinError: Error | null = null;

    ffmpeg.stdout.on("data", (chunk: Buffer) => chunks.push(chunk));
    ffmpeg.stderr.on("data", (chunk: Buffer) => {
      stderrTail = (stderrTail + chunk.toString("utf8")).slice(-stderrTailLimit);
    });

    // Without a listener, an EPIPE on stdin (ffmpeg exiting before it has read
    // all input) is an unhandled stream 'error' and would crash the process.
    // Remember it and let the 'close' handler decide the final outcome.
    ffmpeg.stdin.on("error", (err: Error) => {
      stdinError = err;
    });

    ffmpeg.on("close", (code) => {
      if (code !== 0) {
        const detail = stderrTail.trim();
        reject(new Error(
          detail ? `ffmpeg exited with code ${code}: ${detail}` : `ffmpeg exited with code ${code}`,
        ));
        return;
      }
      if (stdinError) {
        // Exit code 0 but the input was not fully delivered: the output may be truncated.
        reject(stdinError);
        return;
      }
      resolve(Buffer.concat(chunks));
    });
    ffmpeg.on("error", reject);

    ffmpeg.stdin.end(rawPcmBuffer);
  });
}
/**
 * True when `err` looks like Telegram rejecting a message because its
 * Markdown entities could not be parsed.
 *
 * NOTE(review): relies on the Bot API description ("can't parse entities")
 * being part of the thrown error's message — confirm against the installed
 * grammY version.
 */
function isMarkdownParseError(err: unknown): boolean {
  return err instanceof Error && /can't parse entities/i.test(err.message);
}

/**
 * Send one reply part as Markdown; if Telegram refuses to parse the markup,
 * resend the same text without a parse mode so the user still gets the answer.
 * Any other failure is rethrown unchanged.
 */
async function replyMarkdownWithFallback(ctx: Context, text: string): Promise<void> {
  try {
    await ctx.reply(text, { parse_mode: "Markdown" });
  } catch (err: unknown) {
    if (!isMarkdownParseError(err)) {
      throw err;
    }
    logger.warn({ err }, "telegram: Markdown parse failed, resending part as plain text");
    await ctx.reply(text);
  }
}

/**
 * Core relay: persist user message, collect LLM stream, send reply parts.
 * Both text and voice handlers share this function.
 *
 * Steps, in order: resolve the default agent (replying with a notice and
 * returning if none exists), get or create the conversation for this chat,
 * store the user message, load the last 20 messages as LLM context, collect
 * the streamed completion into one string, store it as the assistant message,
 * then send it to the chat prefixed with the agent name.
 *
 * @param ctx - grammY context used to send replies to the originating chat.
 * @param chatId - Telegram chat id (as a string) used to key the conversation.
 * @param userText - Text to relay to the agent (typed or transcribed).
 * @param db - Database handle passed to the chat and proxy services.
 * @param voiceMode - If true, attempt a TTS voice reply after sending text.
 */
async function relayToAgent(
  ctx: Context,
  chatId: string,
  userText: string,
  db: Db,
  voiceMode = false,
): Promise<void> {
  const resolved = await resolveDefaultAgent(db);
  if (!resolved) {
    await ctx.reply("No agents configured. Please set up an agent first.");
    return;
  }
  const { companyId, agentId, agentName } = resolved;
  const chatSvc = chatService(db);
  const convId = await getOrCreateConversation(chatId, agentId, companyId, db);

  // Persist user message
  await chatSvc.addMessage(convId, { role: "user", content: userText });

  // Build messages array (last 20 messages); listMessages returns newest
  // first, so reverse for the LLM.
  const { items } = await chatSvc.listMessages(convId, { limit: 20 });
  const messages = [...items].reverse().map((m) => ({
    role: m.role as "user" | "assistant" | "system",
    content: m.content,
  }));

  // Collect response from puter proxy
  const puterProxy = puterProxyService(db);
  let fullResponse = "";
  for await (const chunk of puterProxy.chatStream(
    companyId,
    agentId,
    messages,
    undefined,
    undefined,
  )) {
    fullResponse += chunk;
  }

  // Persist assistant message
  await chatSvc.addMessage(convId, {
    role: "assistant",
    content: fullResponse,
    agentId,
  });

  // Prefix with agent name (TGRAM-02) and send reply parts. Model output is
  // not guaranteed to be valid Telegram Markdown (and splitting can cut an
  // entity in half), so each part falls back to plain text on a parse error.
  const prefixed = `[${agentName}]: ${fullResponse}`;
  const parts = splitMessage(prefixed, 4000);
  for (const part of parts) {
    await replyMarkdownWithFallback(ctx, part);
  }

  // If voice mode: attempt TTS voice reply as bonus (text already sent)
  if (voiceMode) {
    try {
      const voiceSvc = voicePipelineService();
      const voiceText = voiceSvc.formatForVoice(fullResponse);
      if (voiceText) {
        const pcmBuffer = await voiceSvc.synthesize(voiceText);
        const oggBuffer = await transcodeToOggOpus(pcmBuffer);
        await ctx.replyWithVoice(new InputFile(oggBuffer, "response.ogg"));
      }
    } catch (ttsErr) {
      // TTS is optional — log warning but don't fail (text reply already sent)
      logger.warn({ err: ttsErr }, "telegram: TTS voice reply failed (degrading to text-only)");
    }
  }
}
/**
 * Background half of voice handling, run after the handler has already sent
 * its "Transcribing..." acknowledgement.
 *
 * Fetches the voice attachment from Telegram's file endpoint, transcribes it
 * through voicePipelineService, echoes the first 200 characters of the
 * transcript to the chat, and then relays the trimmed transcript to the agent
 * with voice replies enabled. Each recoverable failure (no file path, failed
 * download, empty transcript) is reported to the chat and ends processing.
 *
 * @param ctx - grammY context of the voice message update.
 * @param chatId - Telegram chat id as a string.
 * @param token - Bot token, needed to build the file download URL.
 * @param db - Database handle forwarded to relayToAgent.
 */
async function processVoiceMessage(
  ctx: Context,
  chatId: string,
  token: string,
  db: Db,
): Promise<void> {
  // Resolve where Telegram stored the attachment.
  const { file_path: filePath } = await ctx.getFile();
  if (!filePath) {
    await ctx.reply("Could not retrieve voice message file.");
    return;
  }

  // Pull the OGG bytes from the file CDN.
  const download = await fetch(`https://api.telegram.org/file/bot${token}/${filePath}`);
  if (!download.ok) {
    await ctx.reply("Could not download voice message.");
    return;
  }
  const audio = Buffer.from(await download.arrayBuffer());

  // Transcribe — voicePipelineService handles OGG->WAV16k internally
  const { text } = await voicePipelineService().transcribe(audio, "ogg");
  const trimmed = text ? text.trim() : "";
  if (!trimmed) {
    await ctx.reply("Could not transcribe voice message.");
    return;
  }

  // Show the user what was understood, capped at 200 characters for readability.
  await ctx.reply(`Heard: ${text.slice(0, 200)}`);

  // Forward the transcript to the agent and ask for a spoken reply as well.
  await relayToAgent(ctx, chatId, trimmed, db, true);
}
export function telegramService(db: Db) {
let bot: Bot | null = null;
let botToken: string | null = null;
async function start(token: string): Promise<void> {
if (bot) {
await stop();
}
botToken = token;
// Assign bot early so handlers reference the service-scoped ref (the `bot` variable declared in telegramService)
bot = new Bot(token);
@ -83,61 +248,9 @@ export function telegramService(db: Db) {
const chatId = String(ctx.chat.id);
try {
const resolved = await resolveDefaultAgent(db);
if (!resolved) {
await ctx.reply("No agents configured. Please set up an agent first.");
return;
}
const { companyId, agentId, agentName } = resolved;
const chatSvc = chatService(db);
const convId = await getOrCreateConversation(chatId, agentId, companyId, db);
// Persist user message
await chatSvc.addMessage(convId, {
role: "user",
content: ctx.message.text,
});
// Build messages array (last 20 messages)
const { items } = await chatSvc.listMessages(convId, { limit: 20 });
// listMessages returns newest first, so reverse for LLM
const messages = [...items].reverse().map((m) => ({
role: m.role as "user" | "assistant" | "system",
content: m.content,
}));
// Collect response from puter proxy
const puterProxy = puterProxyService(db);
let fullResponse = "";
for await (const chunk of puterProxy.chatStream(
companyId,
agentId,
messages,
undefined,
undefined,
)) {
fullResponse += chunk;
}
// Persist assistant message
await chatSvc.addMessage(convId, {
role: "assistant",
content: fullResponse,
agentId,
});
// Prefix with agent name (TGRAM-02)
const prefixed = `[${agentName}]: ${fullResponse}`;
// Send reply — split if over 4096 chars
const parts = splitMessage(prefixed, 4000);
for (const part of parts) {
await ctx.reply(part, { parse_mode: "Markdown" });
}
await relayToAgent(ctx, chatId, ctx.message.text, db);
} catch (err) {
logger.error({ err, chatId }, "telegram: error handling message");
logger.error({ err, chatId }, "telegram: error handling text message");
try {
await ctx.reply("Sorry, something went wrong.");
} catch {
@ -146,6 +259,27 @@ export function telegramService(db: Db) {
}
});
// Voice messages: acknowledge right away, then transcribe and relay in the
// background so the handler itself returns quickly.
bot.on("message:voice", async (ctx) => {
  const chatId = String(ctx.chat.id);

  // Snapshot the token before any await: stop() resets botToken to null, and
  // reading it later through a non-null assertion could put a literal "null"
  // into the file download URL.
  const token = botToken;
  if (!token) {
    logger.warn({ chatId }, "telegram: voice message received without an active bot token; ignoring");
    return;
  }

  // Reply immediately to prevent Telegram from resending the update (Pitfall 1)
  try {
    await ctx.reply("Transcribing...");
  } catch (ackErr: unknown) {
    // Best-effort acknowledgement — keep processing even if it could not be sent.
    logger.warn({ err: ackErr, chatId }, "telegram: failed to send transcription acknowledgement");
  }

  // Process async — do NOT await in handler body. `void` marks the promise as
  // intentionally fire-and-forget; failures are handled in the catch below.
  void processVoiceMessage(ctx, chatId, token, db).catch(async (err: unknown) => {
    logger.error({ err, chatId }, "telegram: error processing voice message");
    try {
      await ctx.reply("Voice transcription failed.");
    } catch {
      // ignore reply errors — the failure has already been logged above
    }
  });
});
bot.catch((err) => {
logger.error({ err }, "Telegram bot error");
});
@ -170,6 +304,7 @@ export function telegramService(db: Db) {
logger.error({ err }, "Error stopping Telegram bot");
}
bot = null;
botToken = null;
logger.info("Telegram bot stopped");
}
}