Fix Pi adapter execution and improve transcript parsing
- Changed from RPC mode to JSON print mode (--mode json -p) - Added prompt as CLI argument instead of stdin RPC command - Rewrote transcript parser to properly handle Pi's JSONL output - Added toolUseId to tool_call entries for proper matching with tool_result - Filter out RPC protocol messages from transcript - Extract thinking blocks and usage statistics
This commit is contained in:
parent
af1b08fdf4
commit
15f6079c6b
2 changed files with 175 additions and 49 deletions
|
|
@ -326,8 +326,9 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
|||
const buildArgs = (sessionFile: string): string[] => {
|
||||
const args: string[] = [];
|
||||
|
||||
// Use RPC mode for proper lifecycle management (waits for agent completion)
|
||||
args.push("--mode", "rpc");
|
||||
// Use JSON mode for structured output with print mode (non-interactive)
|
||||
args.push("--mode", "json");
|
||||
args.push("-p"); // Non-interactive mode: process prompt and exit
|
||||
|
||||
// Use --append-system-prompt to extend Pi's default system prompt
|
||||
args.push("--append-system-prompt", renderedSystemPromptExtension);
|
||||
|
|
@ -343,19 +344,13 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
|||
args.push("--skill", PI_AGENT_SKILLS_DIR);
|
||||
|
||||
if (extraArgs.length > 0) args.push(...extraArgs);
|
||||
|
||||
// Add the user prompt as the last argument
|
||||
args.push(userPrompt);
|
||||
|
||||
return args;
|
||||
};
|
||||
|
||||
const buildRpcStdin = (): string => {
|
||||
// Send the prompt as an RPC command
|
||||
const promptCommand = {
|
||||
type: "prompt",
|
||||
message: userPrompt,
|
||||
};
|
||||
return JSON.stringify(promptCommand) + "\n";
|
||||
};
|
||||
|
||||
const runAttempt = async (sessionFile: string) => {
|
||||
const args = buildArgs(sessionFile);
|
||||
if (onMeta) {
|
||||
|
|
@ -402,7 +397,6 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
|||
graceSec,
|
||||
onSpawn,
|
||||
onLog: bufferedOnLog,
|
||||
stdin: buildRpcStdin(),
|
||||
});
|
||||
|
||||
// Flush any remaining buffer content
|
||||
|
|
|
|||
|
|
@ -17,19 +17,39 @@ function asString(value: unknown, fallback = ""): string {
|
|||
return typeof value === "string" ? value : fallback;
|
||||
}
|
||||
|
||||
function extractTextContent(content: string | Array<{ type: string; text?: string }>): string {
|
||||
if (typeof content === "string") return content;
|
||||
if (!Array.isArray(content)) return "";
|
||||
return content
|
||||
.filter((c) => c.type === "text" && c.text)
|
||||
.map((c) => c.text!)
|
||||
.join("");
|
||||
function extractTextContent(content: string | Array<{ type: string; text?: string; thinking?: string }>): { text: string; thinking: string } {
|
||||
if (typeof content === "string") return { text: content, thinking: "" };
|
||||
if (!Array.isArray(content)) return { text: "", thinking: "" };
|
||||
|
||||
let text = "";
|
||||
let thinking = "";
|
||||
|
||||
for (const c of content) {
|
||||
if (c.type === "text" && c.text) {
|
||||
text += c.text;
|
||||
}
|
||||
if (c.type === "thinking" && c.thinking) {
|
||||
thinking += c.thinking;
|
||||
}
|
||||
}
|
||||
|
||||
return { text, thinking };
|
||||
}
|
||||
|
||||
// Track pending tool calls for proper toolUseId matching
|
||||
let pendingToolCalls = new Map<string, { toolName: string; args: unknown }>();
|
||||
|
||||
export function resetParserState(): void {
|
||||
pendingToolCalls.clear();
|
||||
}
|
||||
|
||||
export function parsePiStdoutLine(line: string, ts: string): TranscriptEntry[] {
|
||||
const parsed = asRecord(safeJsonParse(line));
|
||||
if (!parsed) {
|
||||
return [{ kind: "stdout", ts, text: line }];
|
||||
// Non-JSON line, treat as raw stdout
|
||||
const trimmed = line.trim();
|
||||
if (!trimmed) return [];
|
||||
return [{ kind: "stdout", ts, text: trimmed }];
|
||||
}
|
||||
|
||||
const type = asString(parsed.type);
|
||||
|
|
@ -41,16 +61,64 @@ export function parsePiStdoutLine(line: string, ts: string): TranscriptEntry[] {
|
|||
|
||||
// Agent lifecycle
|
||||
if (type === "agent_start") {
|
||||
return [{ kind: "system", ts, text: "Pi agent started" }];
|
||||
return [{ kind: "system", ts, text: "🚀 Pi agent started" }];
|
||||
}
|
||||
|
||||
if (type === "agent_end") {
|
||||
return [{ kind: "system", ts, text: "Pi agent finished" }];
|
||||
const entries: TranscriptEntry[] = [];
|
||||
|
||||
// Extract final message from messages array if available
|
||||
const messages = parsed.messages as Array<Record<string, unknown>> | undefined;
|
||||
if (messages && messages.length > 0) {
|
||||
const lastMessage = messages[messages.length - 1];
|
||||
if (lastMessage?.role === "assistant") {
|
||||
const content = lastMessage.content as string | Array<{ type: string; text?: string; thinking?: string }>;
|
||||
const { text, thinking } = extractTextContent(content);
|
||||
|
||||
if (thinking) {
|
||||
entries.push({ kind: "thinking", ts, text: thinking });
|
||||
}
|
||||
if (text) {
|
||||
entries.push({ kind: "assistant", ts, text });
|
||||
}
|
||||
|
||||
// Extract usage
|
||||
const usage = asRecord(lastMessage.usage);
|
||||
if (usage) {
|
||||
const inputTokens = (usage.inputTokens ?? usage.input ?? 0) as number;
|
||||
const outputTokens = (usage.outputTokens ?? usage.output ?? 0) as number;
|
||||
const cachedTokens = (usage.cacheRead ?? usage.cachedInputTokens ?? 0) as number;
|
||||
const costRecord = asRecord(usage.cost);
|
||||
const costUsd = (costRecord?.total ?? usage.costUsd ?? 0) as number;
|
||||
|
||||
if (inputTokens > 0 || outputTokens > 0) {
|
||||
entries.push({
|
||||
kind: "result",
|
||||
ts,
|
||||
text: "Run completed",
|
||||
inputTokens,
|
||||
outputTokens,
|
||||
cachedTokens,
|
||||
costUsd,
|
||||
subtype: "end",
|
||||
isError: false,
|
||||
errors: [],
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (entries.length === 0) {
|
||||
entries.push({ kind: "system", ts, text: "✅ Pi agent finished" });
|
||||
}
|
||||
|
||||
return entries;
|
||||
}
|
||||
|
||||
// Turn lifecycle
|
||||
if (type === "turn_start") {
|
||||
return [{ kind: "system", ts, text: "Turn started" }];
|
||||
return []; // Skip noisy lifecycle events
|
||||
}
|
||||
|
||||
if (type === "turn_end") {
|
||||
|
|
@ -60,16 +128,21 @@ export function parsePiStdoutLine(line: string, ts: string): TranscriptEntry[] {
|
|||
const entries: TranscriptEntry[] = [];
|
||||
|
||||
if (message) {
|
||||
const content = message.content as string | Array<{ type: string; text?: string }>;
|
||||
const text = extractTextContent(content);
|
||||
const content = message.content as string | Array<{ type: string; text?: string; thinking?: string }>;
|
||||
const { text, thinking } = extractTextContent(content);
|
||||
|
||||
if (thinking) {
|
||||
entries.push({ kind: "thinking", ts, text: thinking });
|
||||
}
|
||||
if (text) {
|
||||
entries.push({ kind: "assistant", ts, text });
|
||||
}
|
||||
}
|
||||
|
||||
// Process tool results
|
||||
// Process tool results - match with pending tool calls
|
||||
if (toolResults) {
|
||||
for (const tr of toolResults) {
|
||||
const toolCallId = asString(tr.toolCallId, `tool-${Date.now()}`);
|
||||
const content = tr.content;
|
||||
const isError = tr.isError === true;
|
||||
|
||||
|
|
@ -78,23 +151,31 @@ export function parsePiStdoutLine(line: string, ts: string): TranscriptEntry[] {
|
|||
if (typeof content === "string") {
|
||||
contentStr = content;
|
||||
} else if (Array.isArray(content)) {
|
||||
contentStr = extractTextContent(content as Array<{ type: string; text?: string }>);
|
||||
const extracted = extractTextContent(content as Array<{ type: string; text?: string }>);
|
||||
contentStr = extracted.text || JSON.stringify(content);
|
||||
} else {
|
||||
contentStr = JSON.stringify(content);
|
||||
}
|
||||
|
||||
// Get tool name from pending calls if available
|
||||
const pendingCall = pendingToolCalls.get(toolCallId);
|
||||
const toolName = asString(tr.toolName, pendingCall?.toolName || "tool");
|
||||
|
||||
entries.push({
|
||||
kind: "tool_result",
|
||||
ts,
|
||||
toolUseId: asString(tr.toolCallId, "unknown"),
|
||||
toolName: asString(tr.toolName),
|
||||
toolUseId: toolCallId,
|
||||
toolName,
|
||||
content: contentStr,
|
||||
isError,
|
||||
});
|
||||
|
||||
// Clean up pending call
|
||||
pendingToolCalls.delete(toolCallId);
|
||||
}
|
||||
}
|
||||
|
||||
return entries.length > 0 ? entries : [{ kind: "system", ts, text: "Turn ended" }];
|
||||
return entries;
|
||||
}
|
||||
|
||||
// Message streaming
|
||||
|
|
@ -106,33 +187,81 @@ export function parsePiStdoutLine(line: string, ts: string): TranscriptEntry[] {
|
|||
const assistantEvent = asRecord(parsed.assistantMessageEvent);
|
||||
if (assistantEvent) {
|
||||
const msgType = asString(assistantEvent.type);
|
||||
|
||||
// Handle thinking deltas
|
||||
if (msgType === "thinking_delta") {
|
||||
const delta = asString(assistantEvent.delta);
|
||||
if (delta) {
|
||||
return [{ kind: "thinking", ts, text: delta, delta: true }];
|
||||
}
|
||||
}
|
||||
|
||||
// Handle text deltas
|
||||
if (msgType === "text_delta") {
|
||||
const delta = asString(assistantEvent.delta);
|
||||
if (delta) {
|
||||
return [{ kind: "assistant", ts, text: delta, delta: true }];
|
||||
}
|
||||
}
|
||||
|
||||
// Handle thinking end - emit full thinking block
|
||||
if (msgType === "thinking_end") {
|
||||
const content = asString(assistantEvent.content);
|
||||
if (content) {
|
||||
return [{ kind: "thinking", ts, text: content }];
|
||||
}
|
||||
}
|
||||
|
||||
// Handle text end - emit full text block
|
||||
if (msgType === "text_end") {
|
||||
const content = asString(assistantEvent.content);
|
||||
if (content) {
|
||||
return [{ kind: "assistant", ts, text: content }];
|
||||
}
|
||||
}
|
||||
}
|
||||
return [];
|
||||
}
|
||||
|
||||
if (type === "message_end") {
|
||||
const message = asRecord(parsed.message);
|
||||
if (message) {
|
||||
const content = message.content as string | Array<{ type: string; text?: string; thinking?: string }>;
|
||||
const { text, thinking } = extractTextContent(content);
|
||||
|
||||
const entries: TranscriptEntry[] = [];
|
||||
|
||||
// Emit final thinking block if present
|
||||
if (thinking) {
|
||||
entries.push({ kind: "thinking", ts, text: thinking });
|
||||
}
|
||||
|
||||
// Emit final text block if present
|
||||
if (text) {
|
||||
entries.push({ kind: "assistant", ts, text });
|
||||
}
|
||||
|
||||
return entries;
|
||||
}
|
||||
return [];
|
||||
}
|
||||
|
||||
// Tool execution
|
||||
if (type === "tool_execution_start") {
|
||||
const toolName = asString(parsed.toolName);
|
||||
const toolCallId = asString(parsed.toolCallId, `tool-${Date.now()}`);
|
||||
const toolName = asString(parsed.toolName, "tool");
|
||||
const args = parsed.args;
|
||||
if (toolName) {
|
||||
return [{
|
||||
kind: "tool_call",
|
||||
ts,
|
||||
name: toolName,
|
||||
input: args,
|
||||
}];
|
||||
}
|
||||
return [{ kind: "system", ts, text: `Tool started` }];
|
||||
|
||||
// Track this tool call for later matching
|
||||
pendingToolCalls.set(toolCallId, { toolName, args });
|
||||
|
||||
return [{
|
||||
kind: "tool_call",
|
||||
ts,
|
||||
name: toolName,
|
||||
input: args,
|
||||
toolUseId: toolCallId,
|
||||
}];
|
||||
}
|
||||
|
||||
if (type === "tool_execution_update") {
|
||||
|
|
@ -140,40 +269,43 @@ export function parsePiStdoutLine(line: string, ts: string): TranscriptEntry[] {
|
|||
}
|
||||
|
||||
if (type === "tool_execution_end") {
|
||||
const toolCallId = asString(parsed.toolCallId);
|
||||
const toolName = asString(parsed.toolName);
|
||||
const toolCallId = asString(parsed.toolCallId, `tool-${Date.now()}`);
|
||||
const toolName = asString(parsed.toolName, "tool");
|
||||
const result = parsed.result;
|
||||
const isError = parsed.isError === true;
|
||||
|
||||
// Extract text from Pi's content array format
|
||||
// Can be: {"content": [{"type": "text", "text": "..."}]} or [{"type": "text", "text": "..."}]
|
||||
let contentStr: string;
|
||||
if (typeof result === "string") {
|
||||
contentStr = result;
|
||||
} else if (Array.isArray(result)) {
|
||||
// Direct array format: result is [{"type": "text", "text": "..."}]
|
||||
contentStr = extractTextContent(result as Array<{ type: string; text?: string }>);
|
||||
const extracted = extractTextContent(result as Array<{ type: string; text?: string }>);
|
||||
contentStr = extracted.text || JSON.stringify(result);
|
||||
} else if (result && typeof result === "object") {
|
||||
const resultObj = result as Record<string, unknown>;
|
||||
if (Array.isArray(resultObj.content)) {
|
||||
// Wrapped format: result is {"content": [{"type": "text", "text": "..."}]}
|
||||
contentStr = extractTextContent(resultObj.content as Array<{ type: string; text?: string }>);
|
||||
const extracted = extractTextContent(resultObj.content as Array<{ type: string; text?: string }>);
|
||||
contentStr = extracted.text || JSON.stringify(result);
|
||||
} else {
|
||||
contentStr = JSON.stringify(result);
|
||||
}
|
||||
} else {
|
||||
contentStr = JSON.stringify(result);
|
||||
contentStr = String(result);
|
||||
}
|
||||
|
||||
// Clean up pending call
|
||||
pendingToolCalls.delete(toolCallId);
|
||||
|
||||
return [{
|
||||
kind: "tool_result",
|
||||
ts,
|
||||
toolUseId: toolCallId || "unknown",
|
||||
toolUseId: toolCallId,
|
||||
toolName,
|
||||
content: contentStr,
|
||||
isError,
|
||||
}];
|
||||
}
|
||||
|
||||
// Fallback for unknown event types
|
||||
return [{ kind: "stdout", ts, text: line }];
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue