From 15f6079c6b2d7b97de0ffe84061ae6dc519e586f Mon Sep 17 00:00:00 2001 From: HenkDz Date: Thu, 26 Mar 2026 10:59:58 +0100 Subject: [PATCH] Fix Pi adapter execution and improve transcript parsing - Changed from RPC mode to JSON print mode (--mode json -p) - Added prompt as CLI argument instead of stdin RPC command - Rewrote transcript parser to properly handle Pi's JSONL output - Added toolUseId to tool_call entries for proper matching with tool_result - Filter out RPC protocol messages from transcript - Extract thinking blocks and usage statistics --- .../adapters/pi-local/src/server/execute.ts | 18 +- .../adapters/pi-local/src/ui/parse-stdout.ts | 206 ++++++++++++++---- 2 files changed, 175 insertions(+), 49 deletions(-) diff --git a/packages/adapters/pi-local/src/server/execute.ts b/packages/adapters/pi-local/src/server/execute.ts index 96588f18..a78bc1d4 100644 --- a/packages/adapters/pi-local/src/server/execute.ts +++ b/packages/adapters/pi-local/src/server/execute.ts @@ -326,8 +326,9 @@ export async function execute(ctx: AdapterExecutionContext): Promise { const args: string[] = []; - // Use RPC mode for proper lifecycle management (waits for agent completion) - args.push("--mode", "rpc"); + // Use JSON mode for structured output with print mode (non-interactive) + args.push("--mode", "json"); + args.push("-p"); // Non-interactive mode: process prompt and exit // Use --append-system-prompt to extend Pi's default system prompt args.push("--append-system-prompt", renderedSystemPromptExtension); @@ -343,19 +344,13 @@ export async function execute(ctx: AdapterExecutionContext): Promise 0) args.push(...extraArgs); + + // Add the user prompt as the last argument + args.push(userPrompt); return args; }; - const buildRpcStdin = (): string => { - // Send the prompt as an RPC command - const promptCommand = { - type: "prompt", - message: userPrompt, - }; - return JSON.stringify(promptCommand) + "\n"; - }; - const runAttempt = async (sessionFile: string) => { const args = buildArgs(sessionFile); if (onMeta) { @@ -402,7 +397,6 @@ export async function execute(ctx: AdapterExecutionContext): Promise): string { - if (typeof content === "string") return content; - if (!Array.isArray(content)) return ""; - return content - .filter((c) => c.type === "text" && c.text) - .map((c) => c.text!) - .join(""); +function extractTextContent(content: string | Array<{ type: string; text?: string; thinking?: string }>): { text: string; thinking: string } { + if (typeof content === "string") return { text: content, thinking: "" }; + if (!Array.isArray(content)) return { text: "", thinking: "" }; + + let text = ""; + let thinking = ""; + + for (const c of content) { + if (c.type === "text" && c.text) { + text += c.text; + } + if (c.type === "thinking" && c.thinking) { + thinking += c.thinking; + } + } + + return { text, thinking }; +} + +// Track pending tool calls for proper toolUseId matching +let pendingToolCalls = new Map(); + +export function resetParserState(): void { + pendingToolCalls.clear(); } export function parsePiStdoutLine(line: string, ts: string): TranscriptEntry[] { const parsed = asRecord(safeJsonParse(line)); if (!parsed) { - return [{ kind: "stdout", ts, text: line }]; + // Non-JSON line, treat as raw stdout + const trimmed = line.trim(); + if (!trimmed) return []; + return [{ kind: "stdout", ts, text: trimmed }]; } const type = asString(parsed.type); @@ -41,16 +61,64 @@ export function parsePiStdoutLine(line: string, ts: string): TranscriptEntry[] { // Agent lifecycle if (type === "agent_start") { - return [{ kind: "system", ts, text: "Pi agent started" }]; + return [{ kind: "system", ts, text: "🚀 Pi agent started" }]; } if (type === "agent_end") { - return [{ kind: "system", ts, text: "Pi agent finished" }]; + const entries: TranscriptEntry[] = []; + + // Extract final message from messages array if available + const messages = parsed.messages as Array> | undefined; + if (messages && messages.length > 0) { + const lastMessage = messages[messages.length - 1]; + if (lastMessage?.role === "assistant") { + const content = lastMessage.content as string | Array<{ type: string; text?: string; thinking?: string }>; + const { text, thinking } = extractTextContent(content); + + if (thinking) { + entries.push({ kind: "thinking", ts, text: thinking }); + } + if (text) { + entries.push({ kind: "assistant", ts, text }); + } + + // Extract usage + const usage = asRecord(lastMessage.usage); + if (usage) { + const inputTokens = (usage.inputTokens ?? usage.input ?? 0) as number; + const outputTokens = (usage.outputTokens ?? usage.output ?? 0) as number; + const cachedTokens = (usage.cacheRead ?? usage.cachedInputTokens ?? 0) as number; + const costRecord = asRecord(usage.cost); + const costUsd = (costRecord?.total ?? usage.costUsd ?? 0) as number; + + if (inputTokens > 0 || outputTokens > 0) { + entries.push({ + kind: "result", + ts, + text: "Run completed", + inputTokens, + outputTokens, + cachedTokens, + costUsd, + subtype: "end", + isError: false, + errors: [], + }); + } + } + } + } + + if (entries.length === 0) { + entries.push({ kind: "system", ts, text: "✅ Pi agent finished" }); + } + + return entries; } // Turn lifecycle if (type === "turn_start") { - return [{ kind: "system", ts, text: "Turn started" }]; + return []; // Skip noisy lifecycle events } if (type === "turn_end") { @@ -60,16 +128,21 @@ export function parsePiStdoutLine(line: string, ts: string): TranscriptEntry[] { const entries: TranscriptEntry[] = []; if (message) { - const content = message.content as string | Array<{ type: string; text?: string }>; - const text = extractTextContent(content); + const content = message.content as string | Array<{ type: string; text?: string; thinking?: string }>; + const { text, thinking } = extractTextContent(content); + + if (thinking) { + entries.push({ kind: "thinking", ts, text: thinking }); + } if (text) { entries.push({ kind: "assistant", ts, text }); } } - // Process tool results + // Process tool results - match with pending tool calls if (toolResults) { for (const tr of toolResults) { + const toolCallId = asString(tr.toolCallId, `tool-${Date.now()}`); const content = tr.content; const isError = tr.isError === true; @@ -78,23 +151,31 @@ export function parsePiStdoutLine(line: string, ts: string): TranscriptEntry[] { if (typeof content === "string") { contentStr = content; } else if (Array.isArray(content)) { - contentStr = extractTextContent(content as Array<{ type: string; text?: string }>); + const extracted = extractTextContent(content as Array<{ type: string; text?: string }>); + contentStr = extracted.text || JSON.stringify(content); } else { contentStr = JSON.stringify(content); } + // Get tool name from pending calls if available + const pendingCall = pendingToolCalls.get(toolCallId); + const toolName = asString(tr.toolName, pendingCall?.toolName || "tool"); + entries.push({ kind: "tool_result", ts, - toolUseId: asString(tr.toolCallId, "unknown"), - toolName: asString(tr.toolName), + toolUseId: toolCallId, + toolName, content: contentStr, isError, }); + + // Clean up pending call + pendingToolCalls.delete(toolCallId); } } - return entries.length > 0 ? entries : [{ kind: "system", ts, text: "Turn ended" }]; + return entries; } // Message streaming @@ -106,33 +187,81 @@ export function parsePiStdoutLine(line: string, ts: string): TranscriptEntry[] { const assistantEvent = asRecord(parsed.assistantMessageEvent); if (assistantEvent) { const msgType = asString(assistantEvent.type); + + // Handle thinking deltas + if (msgType === "thinking_delta") { + const delta = asString(assistantEvent.delta); + if (delta) { + return [{ kind: "thinking", ts, text: delta, delta: true }]; + } + } + + // Handle text deltas if (msgType === "text_delta") { const delta = asString(assistantEvent.delta); if (delta) { return [{ kind: "assistant", ts, text: delta, delta: true }]; } } + + // Handle thinking end - emit full thinking block + if (msgType === "thinking_end") { + const content = asString(assistantEvent.content); + if (content) { + return [{ kind: "thinking", ts, text: content }]; + } + } + + // Handle text end - emit full text block + if (msgType === "text_end") { + const content = asString(assistantEvent.content); + if (content) { + return [{ kind: "assistant", ts, text: content }]; + } + } } return []; } if (type === "message_end") { + const message = asRecord(parsed.message); + if (message) { + const content = message.content as string | Array<{ type: string; text?: string; thinking?: string }>; + const { text, thinking } = extractTextContent(content); + + const entries: TranscriptEntry[] = []; + + // Emit final thinking block if present + if (thinking) { + entries.push({ kind: "thinking", ts, text: thinking }); + } + + // Emit final text block if present + if (text) { + entries.push({ kind: "assistant", ts, text }); + } + + return entries; + } return []; } // Tool execution if (type === "tool_execution_start") { - const toolName = asString(parsed.toolName); + const toolCallId = asString(parsed.toolCallId, `tool-${Date.now()}`); + const toolName = asString(parsed.toolName, "tool"); const args = parsed.args; - if (toolName) { - return [{ - kind: "tool_call", - ts, - name: toolName, - input: args, - }]; - } - return [{ kind: "system", ts, text: `Tool started` }]; + + // Track this tool call for later matching + pendingToolCalls.set(toolCallId, { toolName, args }); + + return [{ + kind: "tool_call", + ts, + name: toolName, + input: args, + toolUseId: toolCallId, + }]; } if (type === "tool_execution_update") { @@ -140,40 +269,43 @@ export function parsePiStdoutLine(line: string, ts: string): TranscriptEntry[] { } if (type === "tool_execution_end") { - const toolCallId = asString(parsed.toolCallId); - const toolName = asString(parsed.toolName); + const toolCallId = asString(parsed.toolCallId, `tool-${Date.now()}`); + const toolName = asString(parsed.toolName, "tool"); const result = parsed.result; const isError = parsed.isError === true; // Extract text from Pi's content array format - // Can be: {"content": [{"type": "text", "text": "..."}]} or [{"type": "text", "text": "..."}] let contentStr: string; if (typeof result === "string") { contentStr = result; } else if (Array.isArray(result)) { - // Direct array format: result is [{"type": "text", "text": "..."}] - contentStr = extractTextContent(result as Array<{ type: string; text?: string }>); + const extracted = extractTextContent(result as Array<{ type: string; text?: string }>); + contentStr = extracted.text || JSON.stringify(result); } else if (result && typeof result === "object") { const resultObj = result as Record; if (Array.isArray(resultObj.content)) { - // Wrapped format: result is {"content": [{"type": "text", "text": "..."}]} - contentStr = extractTextContent(resultObj.content as Array<{ type: string; text?: string }>); + const extracted = extractTextContent(resultObj.content as Array<{ type: string; text?: string }>); + contentStr = extracted.text || JSON.stringify(result); } else { contentStr = JSON.stringify(result); } } else { - contentStr = JSON.stringify(result); + contentStr = String(result); } + // Clean up pending call + pendingToolCalls.delete(toolCallId); + return [{ kind: "tool_result", ts, - toolUseId: toolCallId || "unknown", + toolUseId: toolCallId, toolName, content: contentStr, isError, }]; } + // Fallback for unknown event types return [{ kind: "stdout", ts, text: line }]; }