Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 145 additions & 6 deletions packages/opencode/src/altimate/observability/tracing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,15 @@ export interface TraceFile {
export interface TraceExporter {
readonly name: string
export(trace: TraceFile): Promise<string | undefined>
// altimate_change start — M3 fix: optional crash-guard hook.
// If implemented, Trace.flushSync() calls this with the crashing session's
// id BEFORE writing the canonical crashed file. Exporters that write to a
// shared canonical location (e.g. FileExporter on the local filesystem)
// should use it to suppress in-flight or subsequent exports that would
// race the synchronous crash write. Exporters that POST to remote
// endpoints (e.g. HttpExporter) don't need to implement it.
markCrashed?(sessionId: string): void
// altimate_change end
}

// ---------------------------------------------------------------------------
Expand All @@ -165,13 +174,41 @@ export class FileExporter implements TraceExporter {
readonly name = "file"
private dir: string
private maxFiles: number
// altimate_change start — M3 fix: per-session crash-guard.
// We track which sessions have been crashed-out via flushSync rather than
// a single boolean, because the same FileExporter instance is shared
// across many Trace instances in the TUI worker (worker.ts caches a
// single exporter array at module init, then every session does
// `Trace.withExporters([...tracingExporters], ...)` which spreads the
// array but shares the inner objects). A single boolean would cause one
// crashed session to permanently suppress export() for all subsequent
// sessions in the same worker. Keying by sessionId scopes the guard to
// the actual crashing trace.
private crashedSessions = new Set<string>()
/** Mark a session's exports as superseded by a synchronous crash write.
* Subsequent or in-flight export() calls for that session bail at the
* next checkpoint (entry, pre-writeFile, or pre-rename). Idempotent.
* The sessionId is normalized through the same sanitization that
* buildTraceFile / export use for the on-disk file name, so callers
* can pass the raw `this.sessionId` without worrying about whether
* it contains path-unsafe characters. */
markCrashed(sessionId: string) {
if (!sessionId) return
const safeId = sessionId.replace(/[/\\.:]/g, "_") || "unknown"
this.crashedSessions.add(safeId)
}
// altimate_change end

constructor(dir?: string, maxFiles?: number) {
this.dir = dir ?? DEFAULT_TRACES_DIR
this.maxFiles = maxFiles ?? DEFAULT_MAX_FILES
}

async export(trace: TraceFile): Promise<string | undefined> {
// altimate_change start — M3 fix: bail at entry if this session has been
// marked crashed by a concurrent flushSync.
if (this.crashedSessions.has(trace.sessionId)) return undefined
// altimate_change end
let tmpPath: string | undefined
try {
await fs.mkdir(this.dir, { recursive: true })
Expand All @@ -181,7 +218,20 @@ export class FileExporter implements TraceExporter {
// Atomic write: write to temp file, then rename — prevents partial reads
// when concurrent snapshots or exports target the same file
tmpPath = filePath + `.tmp.${Date.now()}.${Math.random().toString(36).slice(2, 8)}`
// altimate_change start — M3 fix: re-check before the long writeFile.
// For large traces the writeFile takes 100+ms — a wide window for
// flushSync to interleave.
if (this.crashedSessions.has(trace.sessionId)) return undefined
// altimate_change end
await fs.writeFile(tmpPath, JSON.stringify(trace, null, 2))
// altimate_change start — M3 fix: re-check before the rename. If
// flushSync ran during the writeFile, drop the tmp instead of
// overwriting flushSync's crashed canonical file.
if (this.crashedSessions.has(trace.sessionId)) {
await fs.unlink(tmpPath).catch(() => {})
return undefined
}
// altimate_change end
await fs.rename(tmpPath, filePath)

if (this.maxFiles > 0) {
Expand Down Expand Up @@ -337,6 +387,17 @@ export class Trace {
private metadata: TraceFile["metadata"] = {}
private snapshotDir: string | undefined
private snapshotPending = false
// altimate_change start — M2 fix: if a snapshot was requested while another
// was in flight (and thus dropped by the snapshotPending debounce), we
// schedule exactly one follow-up snapshot when the in-flight one finishes.
// Without this, the on-disk file lags memory until a fresh event arrives
// — and if the process exits in that gap, the burst's tail is lost.
private snapshotRequestedDuringPending = false
// Once endTrace begins its canonical write, no more snapshots may run —
// they would race endTrace's mutation of the root span (endTime, status)
// and could clobber endTrace's content with stale pre-end state.
private endTraceStarted = false
// altimate_change end
private snapshotPromise: Promise<void> | undefined
// Set true when flushSync runs. Prevents in-flight async snapshot()
// calls from racing with the synchronous crash write — without this,
Expand Down Expand Up @@ -750,9 +811,25 @@ export class Trace {
*/
private snapshot() {
if (!this.snapshotDir || !this.sessionId) return
if (this.snapshotPending) return // Debounce — only one in flight at a time
// altimate_change start — M2 fix: when debounce drops this call, record
// that another snapshot is needed once the in-flight one settles.
// Without this, the disk file lags memory whenever events arrive faster
// than snapshots can complete (typical for bursty LLM turns).
if (this.snapshotPending) {
this.snapshotRequestedDuringPending = true
return
}
// No new snapshots once endTrace has begun: the canonical write is now
// endTrace's job, and a concurrent snapshot would capture state from
// BEFORE endTrace's root-span mutation and could lose to-end-only fields.
if (this.endTraceStarted) return
// altimate_change end
if (this.crashed) return // flushSync wrote the canonical crashed file; do NOT race it
this.snapshotPending = true
// altimate_change start — M2 fix: reset before kicking off, so any
// snapshot() calls during this write set the dirty flag for our .finally.
this.snapshotRequestedDuringPending = false
// altimate_change end

const trace = this.buildTraceFile()
const safeId = (this.sessionId || "unknown").replace(/[/\\.:]/g, "_") || "unknown"
Expand All @@ -779,6 +856,17 @@ export class Trace {
.finally(() => {
this.snapshotPending = false
this.snapshotPromise = undefined
// altimate_change start — M2 fix: if any span was added (or any
// snapshot was requested) while we were writing, schedule exactly
// one follow-up to flush those changes. Bounded: at most one extra
// write per "burst → quiet" cycle, regardless of burst size.
// Skip the follow-up if endTrace has taken over the canonical write,
// or if flushSync has marked the trace crashed.
if (this.snapshotRequestedDuringPending && !this.crashed && !this.endTraceStarted) {
this.snapshotRequestedDuringPending = false
queueMicrotask(() => this.snapshot())
}
// altimate_change end
})
}

Expand All @@ -793,13 +881,26 @@ export class Trace {
}

/**
* Wait for any pending async snapshot to complete.
* Use from tests that need to read the trace file deterministically after
* a span completion (instead of `await sleep(50)` which races on slow CI runners).
* No-op if no snapshot is in flight.
* Wait for the on-disk snapshot to catch up with in-memory state.
*
* Drains both the current in-flight snapshot and any M2 follow-up snapshot
* scheduled by its `.finally`. Bounded by `MAX_DRAIN_ITER` to guard against
* pathological cases where events arrive faster than they can be drained.
*/
async flush(): Promise<void> {
if (this.snapshotPromise) await this.snapshotPromise.catch(() => {})
// altimate_change start — M2 fix companion: drain follow-up snapshots so
// callers (and tests) see disk == memory after flush() returns. Without
// this drain loop, the M2 fix's queueMicrotask follow-up runs AFTER
// flush() resolves and the disk still trails by one snapshot.
const MAX_DRAIN_ITER = 16
for (let i = 0; i < MAX_DRAIN_ITER; i++) {
if (this.snapshotPromise) await this.snapshotPromise.catch(() => {})
if (!this.snapshotRequestedDuringPending && !this.snapshotPromise) return
// Yield so any queued microtask (the M2 follow-up) gets to run and
// populate snapshotPromise for the next iteration to await.
await new Promise<void>((r) => queueMicrotask(r))
}
// altimate_change end
}

/**
Expand Down Expand Up @@ -871,6 +972,29 @@ export class Trace {
* Returns the result from the first exporter that succeeds (typically the file path).
*/
async endTrace(error?: string): Promise<string | undefined> {
// altimate_change start — M2 fix companion: claim the canonical write
// BEFORE awaiting the in-flight snapshot so any follow-up snapshot the
// in-flight one might schedule in its `.finally` is suppressed.
//
// Microtask-ordering invariant we rely on below: when an in-flight
// snapshot resolves, its `.finally` runs synchronously inside the
// promise chain — and only THEN does the `await this.snapshotPromise`
// (line below) hand control back to this function. By that point, if
// the .finally tried to schedule a follow-up snapshot via
// `queueMicrotask`, the follow-up is queued but hasn't run yet. We set
// `endTraceStarted=true` BEFORE the await, so whether the .finally
// already checked the flag (during the await) or the queued microtask
// runs after this point (calling `snapshot()` which re-checks at line
// ~803), both paths see the flag and bail.
this.endTraceStarted = true
// M3 fix: if flushSync already wrote the canonical crashed file, return
// the path to that file directly. Calling exporters would either bail
// (FileExporter, via the per-session crash guard) or POST stale
// "completed" data (HttpExporter). The crash signal is authoritative
// across exporters once flushSync has fired.
if (this.crashed) return this.getTracePath()
// altimate_change end

// Wait for any in-flight snapshot to complete before final write
if (this.snapshotPromise) await this.snapshotPromise.catch(() => {})

Expand Down Expand Up @@ -980,6 +1104,21 @@ export class Trace {
// Set crashed BEFORE writing so any in-flight async snapshot() will
// bail out at its rename step instead of clobbering our crashed file.
this.crashed = true
// altimate_change start — M3 fix: notify every exporter that the current
// session is crashed BEFORE writing the canonical file synchronously
// below. Exporters that share a canonical write target (e.g. FileExporter)
// can suppress in-flight or future writes for this session. Polymorphic
// dispatch via the optional `markCrashed?` method — no instanceof check,
// so custom exporters with similar semantics participate automatically.
// Without this, an in-flight `await endTrace()` whose export() is
// mid-writeFile (a 100+ms window on multi-MB traces) lands its rename
// after this synchronous write and clobbers the crashed content.
if (this.sessionId) {
for (const exp of this.exporters) {
exp.markCrashed?.(this.sessionId)
Comment thread
cubic-dev-ai[bot] marked this conversation as resolved.
}
}
// altimate_change end
this.currentGenerationSpanId = undefined
const rootSpan = this.spans.find((s) => s.spanId === this.rootSpanId)
if (rootSpan) {
Expand Down
17 changes: 11 additions & 6 deletions packages/opencode/test/altimate/tracing-display-crash.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -497,18 +497,23 @@ describe("flushSync — multiple calls", () => {
expect(traceFile.summary.status).toBe("crashed")
})

test("flushSync then endTrace — endTrace overwrites crashed status", async () => {
test("flushSync then endTrace — flushSync's crashed status is preserved (M3 fix)", async () => {
const tracer = Recap.withExporters([new FileExporter(tmpDir)])
tracer.startTrace("s-flush-then-end", { prompt: "test" })
await new Promise((r) => setTimeout(r, 50))

tracer.flushSync("early crash")
const flushedPath = tracer.getTracePath()!

// But actually the process survived — endTrace completes normally
const filePath = await tracer.endTrace()
const traceFile: TraceFile = JSON.parse(await fs.readFile(filePath!, "utf-8"))
// endTrace should overwrite with "completed"
expect(traceFile.summary.status).toBe("completed")
// After flushSync, endTrace short-circuits and returns the canonical
// crashed file path. The crashed write is authoritative — endTrace
// does not call any exporter once Trace.crashed is set.
const endPath = await tracer.endTrace()
expect(endPath).toBe(flushedPath)

const traceFile: TraceFile = JSON.parse(await fs.readFile(flushedPath, "utf-8"))
expect(traceFile.summary.status).toBe("crashed")
expect(traceFile.summary.error).toContain("early crash")
})
})

Expand Down
Loading
Loading