fix: add periodic flush and graceful shutdown for server-side telemetry
The TelemetryClient only flushed at 50 events, so the server silently lost all queued telemetry on restart. Add startPeriodicFlush/stop methods to TelemetryClient, wire up 60s periodic flush in server initTelemetry, and flush on SIGTERM/SIGINT before exit. Co-Authored-By: Paperclip <noreply@paperclip.ing>
This commit is contained in:
parent
34044cdfce
commit
f16de6026d
4 changed files with 104 additions and 10 deletions
|
|
@ -18,6 +18,7 @@ export class TelemetryClient {
|
||||||
private readonly version: string;
|
private readonly version: string;
|
||||||
private readonly sessionId: string;
|
private readonly sessionId: string;
|
||||||
private state: TelemetryState | null = null;
|
private state: TelemetryState | null = null;
|
||||||
|
private flushInterval: ReturnType<typeof setInterval> | null = null;
|
||||||
|
|
||||||
constructor(config: TelemetryConfig, stateFactory: () => TelemetryState, version: string) {
|
constructor(config: TelemetryConfig, stateFactory: () => TelemetryState, version: string) {
|
||||||
this.config = config;
|
this.config = config;
|
||||||
|
|
@ -68,6 +69,24 @@ export class TelemetryClient {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
startPeriodicFlush(intervalMs: number = 60_000): void {
|
||||||
|
if (this.flushInterval) return;
|
||||||
|
this.flushInterval = setInterval(() => {
|
||||||
|
void this.flush();
|
||||||
|
}, intervalMs);
|
||||||
|
// Allow the process to exit even if the interval is still active
|
||||||
|
if (typeof this.flushInterval === "object" && "unref" in this.flushInterval) {
|
||||||
|
this.flushInterval.unref();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
stop(): void {
|
||||||
|
if (this.flushInterval) {
|
||||||
|
clearInterval(this.flushInterval);
|
||||||
|
this.flushInterval = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
hashPrivateRef(value: string): string {
|
hashPrivateRef(value: string): string {
|
||||||
const state = this.getState();
|
const state = this.getState();
|
||||||
return createHash("sha256")
|
return createHash("sha256")
|
||||||
|
|
|
||||||
66
server/src/__tests__/telemetry-client-flush.test.ts
Normal file
66
server/src/__tests__/telemetry-client-flush.test.ts
Normal file
|
|
@ -0,0 +1,66 @@
|
||||||
|
import { describe, expect, it, vi, beforeEach, afterEach } from "vitest";
|
||||||
|
import { TelemetryClient } from "../../../packages/shared/src/telemetry/client.js";
|
||||||
|
import type { TelemetryConfig, TelemetryState } from "../../../packages/shared/src/telemetry/types.js";
|
||||||
|
|
||||||
|
function makeClient(config?: Partial<TelemetryConfig>) {
|
||||||
|
const merged: TelemetryConfig = { enabled: true, endpoint: "http://localhost:9999/ingest", ...config };
|
||||||
|
const state: TelemetryState = {
|
||||||
|
installId: "test-install",
|
||||||
|
salt: "test-salt",
|
||||||
|
createdAt: "2026-01-01T00:00:00Z",
|
||||||
|
firstSeenVersion: "0.0.0",
|
||||||
|
};
|
||||||
|
return new TelemetryClient(merged, () => state, "0.0.0-test");
|
||||||
|
}
|
||||||
|
|
||||||
|
describe("TelemetryClient periodic flush", () => {
|
||||||
|
beforeEach(() => {
|
||||||
|
vi.useFakeTimers();
|
||||||
|
vi.stubGlobal("fetch", vi.fn().mockResolvedValue({ ok: true }));
|
||||||
|
});
|
||||||
|
|
||||||
|
afterEach(() => {
|
||||||
|
vi.useRealTimers();
|
||||||
|
vi.restoreAllMocks();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("flushes queued events on interval", async () => {
|
||||||
|
const client = makeClient();
|
||||||
|
client.startPeriodicFlush(1000);
|
||||||
|
|
||||||
|
client.track("install.started");
|
||||||
|
expect(fetch).not.toHaveBeenCalled();
|
||||||
|
|
||||||
|
await vi.advanceTimersByTimeAsync(1000);
|
||||||
|
expect(fetch).toHaveBeenCalledTimes(1);
|
||||||
|
|
||||||
|
// Second tick with no new events — no additional call
|
||||||
|
await vi.advanceTimersByTimeAsync(1000);
|
||||||
|
expect(fetch).toHaveBeenCalledTimes(1);
|
||||||
|
|
||||||
|
// New event gets flushed on next tick
|
||||||
|
client.track("install.started");
|
||||||
|
await vi.advanceTimersByTimeAsync(1000);
|
||||||
|
expect(fetch).toHaveBeenCalledTimes(2);
|
||||||
|
|
||||||
|
client.stop();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("stop() prevents further flushes", async () => {
|
||||||
|
const client = makeClient();
|
||||||
|
client.startPeriodicFlush(1000);
|
||||||
|
|
||||||
|
client.track("install.started");
|
||||||
|
client.stop();
|
||||||
|
|
||||||
|
await vi.advanceTimersByTimeAsync(2000);
|
||||||
|
expect(fetch).not.toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("startPeriodicFlush is idempotent", () => {
|
||||||
|
const client = makeClient();
|
||||||
|
client.startPeriodicFlush(1000);
|
||||||
|
client.startPeriodicFlush(1000); // should not throw or double-fire
|
||||||
|
client.stop();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
@ -39,7 +39,7 @@ import { createStorageServiceFromConfig } from "./storage/index.js";
|
||||||
import { printStartupBanner } from "./startup-banner.js";
|
import { printStartupBanner } from "./startup-banner.js";
|
||||||
import { getBoardClaimWarningUrl, initializeBoardClaimChallenge } from "./board-claim.js";
|
import { getBoardClaimWarningUrl, initializeBoardClaimChallenge } from "./board-claim.js";
|
||||||
import { maybePersistWorktreeRuntimePorts } from "./worktree-config.js";
|
import { maybePersistWorktreeRuntimePorts } from "./worktree-config.js";
|
||||||
import { initTelemetry } from "./telemetry.js";
|
import { initTelemetry, getTelemetryClient } from "./telemetry.js";
|
||||||
|
|
||||||
type BetterAuthSessionUser = {
|
type BetterAuthSessionUser = {
|
||||||
id: string;
|
id: string;
|
||||||
|
|
@ -728,18 +728,26 @@ export async function startServer(): Promise<StartedServer> {
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
if (embeddedPostgres && embeddedPostgresStartedByThisProcess) {
|
{
|
||||||
const shutdown = async (signal: "SIGINT" | "SIGTERM") => {
|
const shutdown = async (signal: "SIGINT" | "SIGTERM") => {
|
||||||
logger.info({ signal }, "Stopping embedded PostgreSQL");
|
const telemetryClient = getTelemetryClient();
|
||||||
try {
|
if (telemetryClient) {
|
||||||
await embeddedPostgres?.stop();
|
telemetryClient.stop();
|
||||||
} catch (err) {
|
await telemetryClient.flush();
|
||||||
logger.error({ err }, "Failed to stop embedded PostgreSQL cleanly");
|
|
||||||
} finally {
|
|
||||||
process.exit(0);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (embeddedPostgres && embeddedPostgresStartedByThisProcess) {
|
||||||
|
logger.info({ signal }, "Stopping embedded PostgreSQL");
|
||||||
|
try {
|
||||||
|
await embeddedPostgres?.stop();
|
||||||
|
} catch (err) {
|
||||||
|
logger.error({ err }, "Failed to stop embedded PostgreSQL cleanly");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
process.exit(0);
|
||||||
};
|
};
|
||||||
|
|
||||||
process.once("SIGINT", () => {
|
process.once("SIGINT", () => {
|
||||||
void shutdown("SIGINT");
|
void shutdown("SIGINT");
|
||||||
});
|
});
|
||||||
|
|
|
||||||
|
|
@ -21,6 +21,7 @@ export function initTelemetry(fileConfig?: { enabled?: boolean }): TelemetryClie
|
||||||
() => loadOrCreateState(stateDir, serverVersion),
|
() => loadOrCreateState(stateDir, serverVersion),
|
||||||
serverVersion,
|
serverVersion,
|
||||||
);
|
);
|
||||||
|
client.startPeriodicFlush(60_000);
|
||||||
return client;
|
return client;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue