Skip to content
281 changes: 243 additions & 38 deletions packages/opencode/src/cli/cmd/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,15 @@ import { Filesystem } from "../../util/filesystem"
import { createOpencodeClient, type Message, type OpencodeClient, type ToolPart } from "@opencode-ai/sdk/v2"
import { Server } from "../../server/server"
import { Provider } from "../../provider/provider"
// altimate_change start — verifier-gated router (run cheap, verify, escalate)
import { Router } from "../../router/router"
import { Verifier } from "../../router/verifier"
import { Verdict } from "../../router/verdict"
import { Policy } from "../../router/policy"
import { EquivalenceVerifier } from "../../router/equivalence-verifier"
import { ReferenceResolver } from "../../router/reference"
import * as Dispatcher from "../../altimate/native/dispatcher"
// altimate_change end
import { Agent } from "../../agent/agent"
import { PermissionNext } from "../../permission/next"
import { Tool } from "../../tool/tool"
Expand Down Expand Up @@ -816,46 +825,53 @@ You are speaking to a non-technical business executive. Follow these rules stric
process.exit(1)
})

if (args.command) {
await sdk.session.command({
sessionID,
agent,
model: args.model,
command: args.command,
arguments: message,
variant: args.variant,
})
} else {
const model = args.model ? Provider.parseModel(args.model) : undefined
await sdk.session.prompt({
sessionID,
agent,
model,
variant: args.variant,
parts: [...files, { type: "text", text: message }],
...(audienceSystem ? { system: audienceSystem } : {}),
})
}
// altimate_change start — per-run finally cleanup. The verifier-gated router catches a
// thrown tier (router.ts) and escalates to the next tier within the SAME process; without
// this finally, a tier whose prompt throws would leak its SIGINT/SIGTERM/beforeExit handlers
// and leave its tracer active, accumulating across tiers. Cleanup now always runs.
try {
if (args.command) {
await sdk.session.command({
sessionID,
agent,
model: args.model,
command: args.command,
arguments: message,
variant: args.variant,
})
} else {
const model = args.model ? Provider.parseModel(args.model) : undefined
await sdk.session.prompt({
sessionID,
agent,
model,
variant: args.variant,
parts: [...files, { type: "text", text: message }],
...(audienceSystem ? { system: audienceSystem } : {}),
})
}

// Wait for the event loop to drain (breaks when session reaches idle)
await loopPromise

// Remove crash handlers — trace will be finalized cleanly
process.removeListener("SIGINT", onSigint)
process.removeListener("SIGTERM", onSigterm)
process.removeListener("beforeExit", onBeforeExit)

// Finalize trace and save to disk
if (tracer) {
Tracer.setActive(null)
const tracePath = await tracer.endTrace(error)
if (tracePath) {
emit("trace_saved", { path: tracePath })
if (args.format !== "json" && process.stdout.isTTY) {
UI.println(UI.Style.TEXT_DIM + `Trace saved: ${tracePath}` + UI.Style.TEXT_NORMAL)
// Wait for the event loop to drain (breaks when session reaches idle)
await loopPromise
} finally {
// Remove crash handlers — trace will be finalized cleanly
process.removeListener("SIGINT", onSigint)
process.removeListener("SIGTERM", onSigterm)
process.removeListener("beforeExit", onBeforeExit)

// Finalize trace and save to disk (with `error` if the run failed)
if (tracer) {
Tracer.setActive(null)
const tracePath = await tracer.endTrace(error)
if (tracePath) {
emit("trace_saved", { path: tracePath })
if (args.format !== "json" && process.stdout.isTTY) {
UI.println(UI.Style.TEXT_DIM + `Trace saved: ${tracePath}` + UI.Style.TEXT_NORMAL)
}
}
}
}
// altimate_change end

// Write accumulated text output to file if --output was specified
if (args.output) {
Expand All @@ -864,8 +880,190 @@ You are speaking to a non-technical business executive. Follow these rules stric
await Bun.write(outputPath, content)
process.stderr.write(`\n✓ Output saved to: ${outputPath}\n`)
}

// altimate_change start — expose the session id so the router can reuse one session
// across tiers (escalation continues the same session instead of starting fresh).
return sessionID
// altimate_change end
}

// altimate_change start — verifier-gated router orchestration
// Deterministic-verify the dbt workspace in cwd (`dbt build`, judged by Verifier).
// Only gates real dbt projects; with nothing to prove it returns ok (no escalation).
async function verifyWorkspace(): Promise<Verifier.Verdict> {
const root = process.cwd()
if (!(await Filesystem.exists(path.join(root, "dbt_project.yml"))))
return {
ok: true,
unverifiable: true,
strength: Verifier.Strength.UNVERIFIABLE,
decision: Verifier.Decision.OK,
reason: "no dbt project to verify",
checks: [],
}

// Reference-free gate: `dbt build` in `dir`, judged by Verifier. Used directly (default)
// and as the fallback for the equivalence verifier (greenfield / undecidable).
const buildVerify = async (dir: string): Promise<Verifier.Verdict> => {
try {
const proc = Bun.spawn(["dbt", "build"], { cwd: dir, stdout: "pipe", stderr: "pipe" })
// Hard timeout so a hung dbt (lock, prompt, runaway query) can't stall the run.
let timedOut = false
const timer = setTimeout(() => {
timedOut = true
proc.kill()
}, 300_000)
const out = (await new Response(proc.stdout).text()) + (await new Response(proc.stderr).text())
const code = await proc.exited
clearTimeout(timer)
if (timedOut)
return {
ok: false,
strength: Verifier.Strength.BUILD,
decision: Verifier.Decision.FAILED,
reason: "dbt build timed out after 300s",
checks: [{ name: "dbt build", ok: false, detail: "timed out after 300s" }],
}
return Verifier.fromDbt(out, code)
} catch (e) {
// dbt binary missing / spawn failure → can't verify; mark unverifiable (fail-open, but honest).
return {
ok: true,
unverifiable: true,
strength: Verifier.Strength.UNVERIFIABLE,
decision: Verifier.Decision.OK,
reason: `verify skipped: ${String(e)}`,
checks: [],
}
}
}

// EXPERIMENTAL (flag-gated, default off): equivalence-backed verification in the
// reference-available regime — proven-equivalent vs the model's base version. Always
// falls back to `buildVerify` on greenfield / undecidable / any error, so it can never
// be less safe than the build gate. Value is gated on altimate-core dialect + schema
// coverage (altimate-core-internal #128 / #130); ships dormant until those land.
if (process.env["ALTIMATE_ROUTER_EQUIVALENCE"] === "1") {
try {
const exec: ReferenceResolver.Exec = async (cmd, args, cwd) => {
const p = Bun.spawn([cmd, ...args], { cwd, stdout: "pipe", stderr: "pipe" })
const stdout = await new Response(p.stdout).text()
return { stdout, code: await p.exited }
}
const readCompiled = async (dir: string): Promise<Map<string, string>> => {
const { readdir } = await import("node:fs/promises")
const map = new Map<string, string>()
const baseDir = path.join(dir, "target", "compiled")
if (!(await Filesystem.exists(baseDir))) return map
const walk = async (d: string) => {
for (const e of await readdir(d, { withFileTypes: true })) {
const fp = path.join(d, e.name)
if (e.isDirectory()) await walk(fp)
else if (e.name.endsWith(".sql")) map.set(e.name.replace(/\.sql$/, ""), await Bun.file(fp).text())
}
}
await walk(baseDir)
return map
}
const checkoutBase = async (workdir: string, ref: string) => {
const dir = path.join("/tmp", `altimate-base-${Date.now()}`)
await exec("git", ["worktree", "add", "--detach", dir, ref], workdir)
return {
dir,
cleanup: async () => {
await exec("git", ["worktree", "remove", "--force", dir], workdir)
},
}
}
const deps = ReferenceResolver.gitDbtDeps(exec, {
readCompiled,
// Best-effort: empty schema ⇒ the engine abstains on table refs ⇒ build fallback.
// A warehouse schema resolver lands with the dialect coverage work.
buildSchema: async () => undefined,
checkoutBase,
})
const check: EquivalenceVerifier.CheckEquivalence = async (head, base, schema) => {
const r = await Dispatcher.call("altimate_core.equivalence", {
sql1: head,
sql2: base,
schema_context: schema as Record<string, unknown> | undefined,
})
const d = ((r as { data?: Record<string, unknown> }).data ?? {}) as {
equivalent?: boolean
validation_errors?: string[]
differences?: { severity?: string; description?: string }[]
confidence?: number
}
return {
equivalent: !!d.equivalent,
validation_errors: d.validation_errors ?? [],
differences: d.differences ?? [],
confidence: d.confidence,
}
}
return await EquivalenceVerifier.create(check, ReferenceResolver.create(deps), {
verify: buildVerify,
}).verify(root)
} catch {
return buildVerify(root) // the experimental path must never break the run
}
}

return buildVerify(root)
}

// Run the tier ladder: cheap → verify → escalate with failing-check context, stop at first pass.
// Each tier re-invokes the existing single-run path with that model (and the escalation note
// prepended) in the SAME workspace, so a later tier fixes the prior attempt rather than restarting.
async function runRouted(sdk: OpencodeClient) {
// Only route when the workspace is verifiable. Without a deterministic gate, routing
// would accept the cheapest tier with no way to verify or escalate — silently
// downgrading quality. In a non-dbt project, run once with the user's model instead.
if (!(await Filesystem.exists(path.join(process.cwd(), "dbt_project.yml")))) {
await execute(sdk)
return
}
const baseMessage = message
const originalModel = args.model
const originalSession = args.session
// Reuse ONE session across tiers: tier-1 creates it; escalation tiers continue the
// same session so the stronger model sees the prior attempt + the failing-check note,
// rather than starting cold. Captured from execute()'s returned session id.
let sharedSessionID: string | undefined
const policy = Policy.resolve()
const tiers = await policy.tiers({ prompt: baseMessage })
let result
try {
result = await Router.route({
tiers,
runAgent: async (model, note) => {
args.model = model
message = note ? `${note}\n\n${baseMessage}` : baseMessage
if (sharedSessionID) args.session = sharedSessionID // continue tier-1's session
const sid = await execute(sdk)
if (sid && !sharedSessionID) sharedSessionID = sid // capture tier-1's session
},
verify: verifyWorkspace,
})
} finally {
// Always restore the mutated request state, even if a tier throws — otherwise
// `message`/`args.model`/`args.session` leak the last tier's state to any
// downstream logging/telemetry/retry.
message = baseMessage
args.model = originalModel
args.session = originalSession
}
const envelope = Verdict.build(result, { now: new Date().toISOString() })
if (args.format === "json") {
process.stdout.write(JSON.stringify({ type: "verdict", timestamp: Date.now(), ...envelope }) + EOL)
} else {
const tag = envelope.solved ? `✓ verified by ${envelope.solvedBy}` : "✗ unverified after all tiers"
UI.println(UI.Style.TEXT_INFO_BOLD + `~ router: ${tag} (policy: ${policy.source})`)
}
await Policy.reportOutcome(envelope)
}
// altimate_change end

if (args.attach) {
const headers = (() => {
const password = args.password ?? process.env.OPENCODE_SERVER_PASSWORD
Expand All @@ -875,7 +1073,11 @@ You are speaking to a non-technical business executive. Follow these rules stric
return { Authorization: auth }
})()
const sdk = createOpencodeClient({ baseUrl: args.attach, directory, headers })
return await execute(sdk)
// altimate_change start — route when enabled, else single run
if (Router.enabled()) await runRouted(sdk)
else await execute(sdk) // discard execute()'s returned session id (handler returns void)
return
// altimate_change end
}

await bootstrap(process.cwd(), async () => {
Expand All @@ -884,7 +1086,10 @@ You are speaking to a non-technical business executive. Follow these rules stric
return Server.Default().fetch(request)
}) as typeof globalThis.fetch
const sdk = createOpencodeClient({ baseUrl: "http://altimate-code.internal", fetch: fetchFn })
await execute(sdk)
// altimate_change start — route when enabled, else single run
if (Router.enabled()) await runRouted(sdk)
else await execute(sdk)
// altimate_change end
})
},
})
Loading
Loading