nexus/server/src/services/voice-pipeline.ts
Nexus Dev 22beb245f2 feat(39-01): sentence-buffered TTS streaming + multi-language synthesis
- Export splitSentences() with title-abbreviation protection (Dr., Mr. etc.)
- Add synthesizeSentenceStream() AsyncGenerator yielding per-sentence audio chunks
- Add synthesizeMultiLang() synthesizing same text in N voices via Promise.all
- Add POST /api/synthesize/stream SSE endpoint with base64 audio per sentence
- Add POST /api/synthesize/multi-lang returning array of voiceId+audio pairs
- Existing POST /api/synthesize unchanged (backward compatible)
2026-04-04 03:55:50 +00:00

276 lines
8.7 KiB
TypeScript

import { spawn, execFile as execFileCb } from "node:child_process";
import { randomUUID } from "node:crypto";
import { writeFile, unlink } from "node:fs/promises";
import { tmpdir } from "node:os";
import path from "node:path";
import ffmpegPath from "ffmpeg-static";
/**
 * Promise wrapper around `execFile` that always resolves with string
 * `{ stdout, stderr }`, so callers and test doubles see one consistent shape.
 *
 * The asynchronous `execFile` has no `input` option (only the synchronous
 * variants do), so when `opts.input` is given it is written to the child's
 * stdin explicitly and stdin is then closed.
 *
 * @param cmd - Executable to run.
 * @param args - Arguments passed to the executable.
 * @param opts - `timeout` and `maxBuffer` are forwarded to `execFile`;
 *   `input`, when present, is piped to the child's stdin.
 * @returns The child's stdout and stderr, normalised to strings.
 * @throws Rejects with the error `execFile` hands to its callback.
 */
function execFileAsync(
  cmd: string,
  args: string[],
  opts: { timeout?: number; maxBuffer?: number; input?: string }
): Promise<{ stdout: string; stderr: string }> {
  // Keep `input` out of the options handed to execFile: it is not an option
  // execFile understands, and stripping it lets the call type-check without `any`.
  const { input, ...execOpts } = opts;
  return new Promise((resolve, reject) => {
    const child = execFileCb(
      cmd,
      args,
      execOpts,
      (err: Error | null, stdout: string | Buffer, stderr: string | Buffer) => {
        if (err) {
          reject(err);
          return;
        }
        // Normalise defensively: a stubbed execFile may hand back Buffers or nothing at all.
        resolve({
          stdout: Buffer.isBuffer(stdout) ? stdout.toString() : String(stdout ?? ""),
          stderr: Buffer.isBuffer(stderr) ? stderr.toString() : String(stderr ?? ""),
        });
      }
    );
    if (input !== undefined) {
      // Optional chaining tolerates a stubbed execFile that returns no child handle.
      const stdin = child?.stdin;
      // A child that exits before draining stdin raises EPIPE on this stream; the
      // execFile callback already reports the failure, so only prevent the
      // unhandled 'error' event from crashing the process.
      stdin?.on("error", () => {});
      stdin?.end(input);
    }
  });
}
/**
 * Breaks `text` into trimmed sentences.
 *
 * A sentence boundary is whitespace that directly follows `.`, `!` or `?`.
 * Common title abbreviations (Dr., Mr., Mrs., ...) are shielded first so the
 * period inside them is not treated as a boundary; the whitespace after such
 * an abbreviation comes back as a single space. Acronyms such as "D.C." or
 * "U.S." are not shielded and still split when followed by whitespace.
 *
 * @param text - Arbitrary input text.
 * @returns The non-empty sentences in order; `[]` for empty or whitespace-only input.
 */
export function splitSentences(text: string): string[] {
  if (!text || text.trim() === "") return [];

  // NUL stands in for the whitespace after a title abbreviation while splitting.
  const guard = "\x00";
  const shielded = text.replace(
    /\b(Mr|Mrs|Ms|Dr|Prof|Sr|Jr|Rev|Gen|Col|Sgt|Cpl|Pvt|Lt|Cmdr|Capt|Gov|Rep|Sen)\.\s+/g,
    (_match, title: string) => title + "." + guard
  );

  const sentences: string[] = [];
  for (const piece of shielded.split(/(?<=[.!?])\s+/)) {
    // Swap every guard back to a plain space, then drop surrounding whitespace.
    const sentence = piece.split(guard).join(" ").trim();
    if (sentence.length > 0) {
      sentences.push(sentence);
    }
  }
  return sentences;
}
export function voicePipelineService() {
if (!ffmpegPath) {
throw new Error("ffmpeg-static binary not found on this platform");
}
const ffmpegBin = ffmpegPath as unknown as string;
/**
 * Races `promise` against a timer and rejects if the timer fires first.
 *
 * The timer is cancelled as soon as the race settles, so a promise that
 * finishes early does not leave a pending timeout keeping the event loop
 * alive for the remainder of `ms`.
 *
 * @param promise - The operation to bound.
 * @param ms - Maximum time to wait, in milliseconds.
 * @returns Whatever `promise` settles with, if it settles within `ms`.
 * @throws Error with message `Timed out after ${ms}ms` when the timer wins.
 */
function withTimeout<T>(promise: Promise<T>, ms: number): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const deadline = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new Error(`Timed out after ${ms}ms`)), ms);
  });
  // Clearing an already-fired timer is a no-op, so this is safe on every path.
  return Promise.race([promise, deadline]).finally(() => clearTimeout(timer));
}
/**
 * Transcodes an audio buffer to WAV by piping it through ffmpeg
 * (`-ar 16000 -ac 1 -f wav`: 16 kHz sample rate, one channel).
 *
 * @param inputBuffer - Encoded audio to feed to ffmpeg's stdin.
 * @param inputFormat - ffmpeg demuxer name passed to `-f` for the input.
 * @returns Everything ffmpeg wrote to stdout, concatenated.
 * @throws Rejects with `ffmpeg exited with code N` on a non-zero exit, or with
 *   the spawn/stream error if the process could not be started or fed.
 */
async function transcodeToWav16k(inputBuffer: Buffer, inputFormat: string): Promise<Buffer> {
  return new Promise<Buffer>((resolve, reject) => {
    const ffmpeg = spawn(
      ffmpegBin,
      ["-f", inputFormat, "-i", "pipe:0", "-ar", "16000", "-ac", "1", "-f", "wav", "pipe:1"],
      { stdio: ["pipe", "pipe", "pipe"] }
    );
    const chunks: Buffer[] = [];
    ffmpeg.stdout.on("data", (chunk: Buffer) => {
      chunks.push(chunk);
    });
    ffmpeg.stderr.on("data", () => {
      // Drain stderr so a full pipe never stalls ffmpeg; the content is not used.
    });
    ffmpeg.on("close", (code) => {
      if (code === 0) {
        resolve(Buffer.concat(chunks));
      } else {
        reject(new Error(`ffmpeg exited with code ${code}`));
      }
    });
    ffmpeg.on("error", (err) => {
      reject(err);
    });
    // If ffmpeg rejects the input and exits before reading all of it, the write
    // below fails with EPIPE. Without a listener that stream error is unhandled
    // and takes the whole process down. EPIPE itself is ignored because the
    // 'close' handler reports the exit code; any other stdin failure is surfaced.
    ffmpeg.stdin.on("error", (err: NodeJS.ErrnoException) => {
      if (err.code !== "EPIPE") {
        reject(err);
      }
    });
    ffmpeg.stdin.end(inputBuffer);
  });
}
/**
 * Transcribes audio by shelling out to a local Whisper CLI.
 *
 * Non-WAV input is first transcoded with `transcodeToWav16k`. The WAV is
 * written to a uniquely named temp file, then `whisper-cpp` is tried; if that
 * call fails for any reason the `whisper` CLI is tried. Temp files are removed
 * afterwards on a best-effort basis.
 *
 * @param buffer - Encoded audio.
 * @param format - Container/codec of `buffer`.
 * @returns The trimmed CLI stdout as `text`, plus `language` when the
 *   `whisper-cpp` output contains an "auto-detected language" line.
 * @throws Error "Whisper not available. ..." when both CLIs fail; also
 *   propagates transcoding and temp-file write errors.
 */
async function transcribe(
  buffer: Buffer,
  format: "webm" | "ogg" | "wav"
): Promise<{ text: string; language?: string }> {
  const wavBuffer = format !== "wav" ? await transcodeToWav16k(buffer, format) : buffer;
  // A timestamp alone collides when two requests arrive in the same millisecond;
  // the UUID keeps concurrent transcriptions from overwriting or deleting each
  // other's input file.
  const baseName = `nexus-audio-${Date.now()}-${randomUUID()}`;
  const tmpPath = path.join(tmpdir(), `${baseName}.wav`);
  try {
    await writeFile(tmpPath, wavBuffer);

    // Try whisper-cpp first
    try {
      const { stdout, stderr } = await execFileAsync(
        "whisper-cpp",
        ["--model", "base.en", "--file", tmpPath, "--no-timestamps", "--output-txt", "--language", "auto"],
        { timeout: 30000 }
      );
      // Parse language if present (e.g. "auto-detected language: en").
      // NOTE(review): whisper.cpp appears to print this banner on its log stream
      // (stderr) rather than stdout, so both streams are searched — confirm.
      const langMatch = `${stdout}\n${stderr}`.match(/auto-detected language[:\s]+([a-z]{2})/i);
      return { text: stdout.trim(), language: langMatch?.[1] };
    } catch {
      // Fall through to openai-whisper
    }

    // Try openai-whisper Python CLI as fallback
    try {
      const { stdout } = await execFileAsync(
        "whisper",
        [tmpPath, "--model", "base.en", "--output_format", "txt", "--output_dir", tmpdir()],
        { timeout: 60000 }
      );
      return { text: stdout.trim() };
    } catch {
      // Both failed
    }

    throw new Error(
      "Whisper not available. Install whisper-cpp or openai-whisper for voice input."
    );
  } finally {
    // Best-effort cleanup; failures (e.g. a file that was never created) are ignored.
    // Besides the input WAV, both CLIs are asked to write a .txt transcript:
    // presumably `<input>.txt` for whisper-cpp and `<output_dir>/<stem>.txt`
    // for openai-whisper — TODO confirm the exact names.
    await Promise.allSettled([
      unlink(tmpPath),
      unlink(`${tmpPath}.txt`),
      unlink(path.join(tmpdir(), `${baseName}.txt`)),
    ]);
  }
}
/**
 * Synthesizes one sentence with the `piper` CLI and returns the bytes piper
 * writes to stdout (invoked with `--output-raw`).
 *
 * The sentence is delivered on piper's stdin. The asynchronous `execFile` has
 * no `input` option, so the text is written to the child's stdin explicitly.
 * stdout is requested as a Buffer (`encoding: "buffer"`) because decoding
 * binary audio with the default `utf8` encoding corrupts it.
 *
 * @param sentence - Text to speak.
 * @param voiceId - Piper model name; falls back to "en_US-lessac-medium" when
 *   omitted or empty.
 * @returns The audio bytes produced by piper.
 * @throws Rejects with the `execFile` error (its `code` is "ENOENT" when piper
 *   is not installed) or with a timeout error after 8000 ms.
 */
async function synthesizeSentence(sentence: string, voiceId?: string): Promise<Buffer> {
  return withTimeout(
    new Promise<Buffer>((resolve, reject) => {
      const child = execFileCb(
        "piper",
        // `||` rather than `??` on purpose: an empty voiceId also selects the default model.
        ["--model", voiceId || "en_US-lessac-medium", "--output-raw"],
        {
          timeout: 8000,
          maxBuffer: 10 * 1024 * 1024,
          encoding: "buffer",
        },
        (err: Error | null, stdout: string | Buffer) => {
          if (err) {
            reject(err);
          } else {
            // Still accept a string so a stubbed execFile that yields text keeps working.
            resolve(Buffer.isBuffer(stdout) ? stdout : Buffer.from(stdout));
          }
        }
      );
      // Optional chaining tolerates a stubbed execFile that returns no child handle.
      const stdin = child?.stdin;
      // When piper is missing or exits early, writing to stdin raises a stream
      // error. The execFile callback already reports the real failure, so only
      // keep the unhandled 'error' event from crashing the process.
      stdin?.on("error", () => {});
      stdin?.end(sentence);
    }),
    8000
  );
}
/**
 * Synthesizes `text` sentence by sentence, in order, and returns the audio of
 * all sentences concatenated into one buffer.
 *
 * @param text - Text to speak; it is split with `splitSentences`.
 * @param voiceId - Optional voice passed through to `synthesizeSentence`.
 * @returns Concatenated audio (an empty buffer when there are no sentences).
 * @throws Error "Piper TTS not available. ..." when a sentence fails with
 *   code "ENOENT"; any other failure is rethrown unchanged.
 */
async function synthesize(text: string, voiceId?: string): Promise<Buffer> {
  const sentences = splitSentences(text);
  const clips: Buffer[] = [];
  try {
    // Sequential on purpose: clips must be appended in sentence order.
    for (const sentence of sentences) {
      clips.push(await synthesizeSentence(sentence, voiceId));
    }
  } catch (failure) {
    if ((failure as NodeJS.ErrnoException).code === "ENOENT") {
      throw new Error("Piper TTS not available. Install piper for voice output.");
    }
    throw failure;
  }
  return Buffer.concat(clips);
}
/**
 * Async generator that synthesizes `text` one sentence at a time and yields
 * each sentence's audio as soon as it is ready.
 *
 * @param text - Text to speak; it is split with `splitSentences`.
 * @param voiceId - Optional voice passed through to `synthesizeSentence`.
 * @returns Yields `{ index, total, audio }` per sentence, where `index` is the
 *   zero-based sentence position and `total` the number of sentences.
 * @throws Error "Piper TTS not available. ..." when a failure has code
 *   "ENOENT"; any other failure is rethrown unchanged.
 */
async function* synthesizeSentenceStream(
  text: string,
  voiceId?: string
): AsyncGenerator<{ index: number; total: number; audio: Buffer }> {
  const sentences = splitSentences(text);
  const total = sentences.length;
  for (const [index, sentence] of sentences.entries()) {
    try {
      yield { index, total, audio: await synthesizeSentence(sentence, voiceId) };
    } catch (failure) {
      if ((failure as NodeJS.ErrnoException).code === "ENOENT") {
        throw new Error("Piper TTS not available. Install piper for voice output.");
      }
      throw failure;
    }
  }
}
/**
 * Synthesizes the same text once per voice, running the voices concurrently.
 *
 * Repeated ids in `voiceIds` are synthesized only once: the returned Map can
 * hold a single entry per id anyway, so extra runs would be discarded work.
 *
 * @param text - Text to speak in every voice.
 * @param voiceIds - Voice ids to synthesize with.
 * @returns Map from voice id to that voice's audio, in first-seen id order.
 * @throws Rejects with the first failure from `synthesize`.
 */
async function synthesizeMultiLang(text: string, voiceIds: string[]): Promise<Map<string, Buffer>> {
  const uniqueVoiceIds = [...new Set(voiceIds)];
  const entries = await Promise.all(
    uniqueVoiceIds.map(
      async (voiceId): Promise<[string, Buffer]> => [voiceId, await synthesize(text, voiceId)]
    )
  );
  return new Map(entries);
}
/**
 * Reduces a text response to what should be read aloud.
 *
 * If the text contains a `SPOKEN:` marker, the section after it is returned
 * verbatim (trimmed). That section ends at a line starting with `DETAILED:`,
 * at a blank line followed by an upper-case `LABEL:`, or at the end of the
 * text. Otherwise common Markdown syntax is stripped: code fences (content
 * kept), inline backticks, heading markers, bullet prefixes, bold/italic
 * markers; runs of blank lines are collapsed.
 *
 * @param text - Raw response text, possibly Markdown.
 * @returns Plain text suitable for speech; `""` for empty input.
 */
function formatForVoice(text: string): string {
  if (!text) return "";

  // `$` lets a SPOKEN section that runs to the end of the text match too;
  // without it a response consisting only of "SPOKEN: ..." kept its marker.
  const spokenMatch = text.match(/SPOKEN:\s*([\s\S]*?)(?=\nDETAILED:|\n\n[A-Z]+:|$)/);
  if (spokenMatch) {
    return spokenMatch[1].trim();
  }

  return (
    text
      // Code fences: drop the fence and keep the code. The rest of the opening
      // line counts as a language tag only when a newline follows it, so a
      // single-line fence such as ```npm test``` keeps all of its words.
      .replace(/```(?:[^\n`]*\n)?([\s\S]*?)```/g, (_fence, code: string) => code.trim())
      // Inline backticks
      .replace(/`([^`]+)`/g, "$1")
      // Heading markers (## Heading -> Heading)
      .replace(/^#{1,6}\s+/gm, "")
      // Bullet prefixes (- item or * item). This must run before emphasis
      // stripping, otherwise two consecutive "* " bullets look like *italic*.
      .replace(/^[-*]\s+/gm, "")
      // Bold markers (**text** -> text)
      .replace(/\*\*([^*]+)\*\*/g, "$1")
      // Italic markers (*text* -> text)
      .replace(/\*([^*]+)\*/g, "$1")
      // Collapse three or more newlines into one blank line
      .replace(/\n{3,}/g, "\n\n")
      .trim()
  );
}
return { transcribe, synthesize, synthesizeSentenceStream, synthesizeMultiLang, formatForVoice, transcodeToWav16k };
}