import { useEffect, useMemo, useRef, useState, type MutableRefObject } from "react"; import { Link } from "react-router-dom"; import { useQuery, useQueryClient } from "@tanstack/react-query"; import type { LiveEvent } from "@paperclip/shared"; import { heartbeatsApi, type LiveRunForIssue } from "../api/heartbeats"; import { getUIAdapter } from "../adapters"; import type { TranscriptEntry } from "../adapters"; import { queryKeys } from "../lib/queryKeys"; import { cn, relativeTime } from "../lib/utils"; import { ExternalLink, Square } from "lucide-react"; import { Identity } from "./Identity"; interface LiveRunWidgetProps { issueId: string; companyId?: string | null; } type FeedTone = "info" | "warn" | "error" | "assistant" | "tool"; interface FeedItem { id: string; ts: string; runId: string; agentId: string; agentName: string; text: string; tone: FeedTone; } const MAX_FEED_ITEMS = 80; function readString(value: unknown): string | null { return typeof value === "string" && value.trim().length > 0 ? value : null; } function summarizeEntry(entry: TranscriptEntry): { text: string; tone: FeedTone } | null { if (entry.kind === "assistant") { const text = entry.text.trim(); return text ? { text, tone: "assistant" } : null; } if (entry.kind === "thinking") { const text = entry.text.trim(); return text ? { text: `[thinking] ${text}`, tone: "info" } : null; } if (entry.kind === "tool_call") { return { text: `tool ${entry.name}`, tone: "tool" }; } if (entry.kind === "tool_result") { const base = entry.content.trim(); return { text: entry.isError ? `tool error: ${base}` : `tool result: ${base}`, tone: entry.isError ? "error" : "tool", }; } if (entry.kind === "stderr") { const text = entry.text.trim(); return text ? { text, tone: "error" } : null; } if (entry.kind === "system") { const text = entry.text.trim(); return text ? { text, tone: "warn" } : null; } if (entry.kind === "stdout") { const text = entry.text.trim(); return text ? { text, tone: "info" } : null; } return null; } function createFeedItem( run: LiveRunForIssue, ts: string, text: string, tone: FeedTone, nextId: number, ): FeedItem | null { const trimmed = text.trim(); if (!trimmed) return null; return { id: `${run.id}:${nextId}`, ts, runId: run.id, agentId: run.agentId, agentName: run.agentName, text: trimmed.slice(0, 220), tone, }; } function parseStdoutChunk( run: LiveRunForIssue, chunk: string, ts: string, pendingByRun: Map, nextIdRef: MutableRefObject, ): FeedItem[] { const pendingKey = `${run.id}:stdout`; const combined = `${pendingByRun.get(pendingKey) ?? ""}${chunk}`; const split = combined.split(/\r?\n/); pendingByRun.set(pendingKey, split.pop() ?? ""); const adapter = getUIAdapter(run.adapterType); const items: FeedItem[] = []; for (const line of split.slice(-8)) { const trimmed = line.trim(); if (!trimmed) continue; const parsed = adapter.parseStdoutLine(trimmed, ts); if (parsed.length === 0) { const fallback = createFeedItem(run, ts, trimmed, "info", nextIdRef.current++); if (fallback) items.push(fallback); continue; } for (const entry of parsed) { const summary = summarizeEntry(entry); if (!summary) continue; const item = createFeedItem(run, ts, summary.text, summary.tone, nextIdRef.current++); if (item) items.push(item); } } return items; } function parseStderrChunk( run: LiveRunForIssue, chunk: string, ts: string, pendingByRun: Map, nextIdRef: MutableRefObject, ): FeedItem[] { const pendingKey = `${run.id}:stderr`; const combined = `${pendingByRun.get(pendingKey) ?? ""}${chunk}`; const split = combined.split(/\r?\n/); pendingByRun.set(pendingKey, split.pop() ?? ""); const items: FeedItem[] = []; for (const line of split.slice(-8)) { const item = createFeedItem(run, ts, line, "error", nextIdRef.current++); if (item) items.push(item); } return items; } export function LiveRunWidget({ issueId, companyId }: LiveRunWidgetProps) { const queryClient = useQueryClient(); const [feed, setFeed] = useState([]); const [cancellingRunIds, setCancellingRunIds] = useState(new Set()); const seenKeysRef = useRef(new Set()); const pendingByRunRef = useRef(new Map()); const runMetaByIdRef = useRef(new Map()); const nextIdRef = useRef(1); const bodyRef = useRef(null); const handleCancelRun = async (runId: string) => { setCancellingRunIds((prev) => new Set(prev).add(runId)); try { await heartbeatsApi.cancel(runId); queryClient.invalidateQueries({ queryKey: queryKeys.issues.liveRuns(issueId) }); } finally { setCancellingRunIds((prev) => { const next = new Set(prev); next.delete(runId); return next; }); } }; const { data: liveRuns } = useQuery({ queryKey: queryKeys.issues.liveRuns(issueId), queryFn: () => heartbeatsApi.liveRunsForIssue(issueId), enabled: !!companyId, refetchInterval: 3000, }); const runs = liveRuns ?? []; const runById = useMemo(() => new Map(runs.map((run) => [run.id, run])), [runs]); const activeRunIds = useMemo(() => new Set(runs.map((run) => run.id)), [runs]); useEffect(() => { const body = bodyRef.current; if (!body) return; body.scrollTo({ top: body.scrollHeight, behavior: "smooth" }); }, [feed.length]); useEffect(() => { for (const run of runs) { runMetaByIdRef.current.set(run.id, { agentId: run.agentId, agentName: run.agentName }); } }, [runs]); useEffect(() => { const stillActive = new Set(); for (const runId of activeRunIds) { stillActive.add(`${runId}:stdout`); stillActive.add(`${runId}:stderr`); } for (const key of pendingByRunRef.current.keys()) { if (!stillActive.has(key)) { pendingByRunRef.current.delete(key); } } }, [activeRunIds]); useEffect(() => { if (!companyId || activeRunIds.size === 0) return; let closed = false; let reconnectTimer: number | null = null; let socket: WebSocket | null = null; const appendItems = (items: FeedItem[]) => { if (items.length === 0) return; setFeed((prev) => [...prev, ...items].slice(-MAX_FEED_ITEMS)); }; const scheduleReconnect = () => { if (closed) return; reconnectTimer = window.setTimeout(connect, 1500); }; const connect = () => { if (closed) return; const protocol = window.location.protocol === "https:" ? "wss" : "ws"; const url = `${protocol}://${window.location.host}/api/companies/${encodeURIComponent(companyId)}/events/ws`; socket = new WebSocket(url); socket.onmessage = (message) => { const raw = typeof message.data === "string" ? message.data : ""; if (!raw) return; let event: LiveEvent; try { event = JSON.parse(raw) as LiveEvent; } catch { return; } if (event.companyId !== companyId) return; const payload = event.payload ?? {}; const runId = readString(payload["runId"]); if (!runId || !activeRunIds.has(runId)) return; const run = runById.get(runId); if (!run) return; if (event.type === "heartbeat.run.event") { const seq = typeof payload["seq"] === "number" ? payload["seq"] : null; const eventType = readString(payload["eventType"]) ?? "event"; const messageText = readString(payload["message"]) ?? eventType; const dedupeKey = `${runId}:event:${seq ?? `${eventType}:${messageText}:${event.createdAt}`}`; if (seenKeysRef.current.has(dedupeKey)) return; seenKeysRef.current.add(dedupeKey); if (seenKeysRef.current.size > 2000) { seenKeysRef.current.clear(); } const tone = eventType === "error" ? "error" : eventType === "lifecycle" ? "warn" : "info"; const item = createFeedItem(run, event.createdAt, messageText, tone, nextIdRef.current++); if (item) appendItems([item]); return; } if (event.type === "heartbeat.run.status") { const status = readString(payload["status"]) ?? "updated"; const dedupeKey = `${runId}:status:${status}:${readString(payload["finishedAt"]) ?? ""}`; if (seenKeysRef.current.has(dedupeKey)) return; seenKeysRef.current.add(dedupeKey); if (seenKeysRef.current.size > 2000) { seenKeysRef.current.clear(); } const tone = status === "failed" || status === "timed_out" ? "error" : "warn"; const item = createFeedItem(run, event.createdAt, `run ${status}`, tone, nextIdRef.current++); if (item) appendItems([item]); return; } if (event.type === "heartbeat.run.log") { const chunk = readString(payload["chunk"]); if (!chunk) return; const stream = readString(payload["stream"]) === "stderr" ? "stderr" : "stdout"; if (stream === "stderr") { appendItems(parseStderrChunk(run, chunk, event.createdAt, pendingByRunRef.current, nextIdRef)); return; } appendItems(parseStdoutChunk(run, chunk, event.createdAt, pendingByRunRef.current, nextIdRef)); } }; socket.onerror = () => { socket?.close(); }; socket.onclose = () => { scheduleReconnect(); }; }; connect(); return () => { closed = true; if (reconnectTimer !== null) window.clearTimeout(reconnectTimer); if (socket) { socket.onmessage = null; socket.onerror = null; socket.onclose = null; socket.close(1000, "issue_live_widget_unmount"); } }; }, [activeRunIds, companyId, runById]); if (runs.length === 0 && feed.length === 0) return null; const recent = feed.slice(-25); const headerRun = runs[0] ?? (() => { const last = recent[recent.length - 1]; if (!last) return null; const meta = runMetaByIdRef.current.get(last.runId); if (!meta) return null; return { id: last.runId, agentId: meta.agentId, }; })(); return (
{runs.length > 0 && ( )} {runs.length > 0 ? `Live issue runs (${runs.length})` : "Recent run updates"}
{headerRun && (
{runs.length > 0 && ( )} Open run
)}
{recent.length === 0 && (
Waiting for run output...
)} {recent.map((item, index) => (
{relativeTime(item.ts)}
[{item.runId.slice(0, 8)}] {item.text}
))}
{runs.length > 1 && (
{runs.map((run) => (
{run.id.slice(0, 8)}
))}
)}
); }