diff --git a/hypaware-core/plugins-workspace/ai-gateway/src/message_projector.js b/hypaware-core/plugins-workspace/ai-gateway/src/message_projector.js index b421379..b9d175a 100644 --- a/hypaware-core/plugins-workspace/ai-gateway/src/message_projector.js +++ b/hypaware-core/plugins-workspace/ai-gateway/src/message_projector.js @@ -637,6 +637,12 @@ function expandMessageParts(ctx) { const finishReason = mapFinishReason(stringValue(ctx.message.stop_reason)) const messageCreatedAt = stringValue(ctx.message.message_created_at) ?? ctx.tsStart const messageAttributes = ctx.message.attributes + // @ref LLP 0035#one-carrier [implements] — response-level `usage` is + // per-response, so a multi-block carrier message must not replicate it onto + // every part. Strip `usage` from all but the last part; the last block (the + // terminal output item, which also carries `stop_reason`/status) is the sole + // carrier. Closes the residual edge LLP 0035 assumed wasn't produced. + const nonCarrierAttributes = stripUsage(messageAttributes) const baseClientAttributes = withClientAttributes( undefined, stringValue(ctx.projection.client_version), @@ -725,7 +731,7 @@ function expandMessageParts(ctx) { : undefined, is_error: readKey(block, 'is_error') === true ? true : undefined, status: buildStatus(block, isLast, ctx.role, finishReason), - attributes: mergeJsonObjects(baseClientAttributes, messageAttributes), + attributes: mergeJsonObjects(baseClientAttributes, isLast ? messageAttributes : nonCarrierAttributes), raw_frame: isPlainObject(ctx.message.raw_frame) ? ctx.message.raw_frame : undefined, } if ( @@ -968,6 +974,22 @@ function buildGatewayAttributes(exchange) { return attrs } +/** + * Drop response-level `usage` from a message's attributes, for the + * non-carrier parts of a multi-block message. Returns the input + * unchanged when there's no `usage`, and `undefined` when `usage` was + * the only key (so non-carrier parts fall back to client attributes). + * + * @ref LLP 0035#one-carrier — usage is per-response; only the last part carries it. + * @param {Record | undefined} attributes + * @returns {Record | undefined} + */ +function stripUsage(attributes) { + if (!isPlainObject(attributes) || attributes.usage === undefined) return attributes + const { usage, ...rest } = attributes + return Object.keys(rest).length > 0 ? rest : undefined +} + /** * Right-biased deep merge for one level of nested objects. Used both * to fold projection-level attributes into per-row attributes and to diff --git a/hypaware-core/plugins-workspace/claude/agents/hypaware-analyst.md b/hypaware-core/plugins-workspace/claude/agents/hypaware-analyst.md index b991516..ec52241 100644 --- a/hypaware-core/plugins-workspace/claude/agents/hypaware-analyst.md +++ b/hypaware-core/plugins-workspace/claude/agents/hypaware-analyst.md @@ -36,7 +36,8 @@ For exact columns in the installed version: `hyp query schema --format j - `is_error`, `is_sidechain`, `is_compact_summary` are direct boolean columns — prefer them over JSON probing or `content_text` substring matches. - Token usage is recorded at `attributes.$.usage.*` when present, but **for Claude-via-gateway recordings this is typically null** — fall back to `attributes.$.gateway.request_bytes` and `attributes.$.gateway.response_bytes` as size proxies. - Latency lives at `attributes.$.timing.latency_ms` (note: `latency_ms`, not `duration_ms`). -- Dedup usage/timing per message before summing: those fields can repeat across the parts of one message — `GROUP BY session_id, message_id` with `MAX(...)` first, then aggregate per session/user/etc. (group/key on `session_id`, not `conversation_id`, which is null for Claude). +- Token usage now rides exactly **one** row per response — the last assistant part — so a plain `SUM(CAST(JSON_VALUE(attributes, '$.usage.') AS BIGINT))` over assistant rows is correct with **no dedupe** (non-carrier parts are null and ignored). This is the one-carrier-per-response rule (LLP 0035 / LLP 0026), which superseded the older "usage repeats across parts" shape; a defensive `MAX(...) GROUP BY session_id, message_id` still returns the right number if you'd rather not assume the rule. `input_tokens` is net of cache for every provider, so `SUM(input_tokens)` (or `SUM(input_tokens + cache_read_tokens)` for gross prompt) means the same thing across rows and never double-counts cache. +- Timing can still repeat across the parts of one message — `MAX(latency_ms) GROUP BY session_id, message_id` before aggregating per session/user/etc. (group/key on `session_id`, not `conversation_id`, which is null for Claude). - Tool call / result pairs join on `tool_call_id`. The natural ordering key for `ai_gateway_messages` is `(session_id, message_index, part_index)`; add `conversation_id` only to separate threads within one Codex session. - Table names are resolved from the SQL AST; only built-ins and registered collection tables are valid. diff --git a/hypaware-core/plugins-workspace/claude/src/backfill.js b/hypaware-core/plugins-workspace/claude/src/backfill.js index 2030580..d59c5b9 100644 --- a/hypaware-core/plugins-workspace/claude/src/backfill.js +++ b/hypaware-core/plugins-workspace/claude/src/backfill.js @@ -298,12 +298,26 @@ async function projectedExchangeFromEntries(args) { let startedAtMs /** @type {string | undefined} */ let transcriptCwd - for (const entry of entries) { + // Usage is a response-level (per API message) figure that Claude Code + // duplicates onto every block line of an assistant turn. Record the last + // block line per API message id so usage is stamped on only that one block — + // matching the live projector, so each response contributes usage to exactly + // one row and live/backfill dedupe onto the same row. @ref LLP 0035#one-carrier + /** @type {Map} */ + const lastBlockIndexByMessageId = new Map() + entries.forEach((entry, index) => { + if (entry.messageId) lastBlockIndexByMessageId.set(entry.messageId, index) + }) + for (let index = 0; index < entries.length; index++) { + const entry = entries[index] // Capture before the message filter: cwd rides every transcript line, not // only the ones that project to a message, and it's the only repo signal a // pre-0032 session carries. if (!transcriptCwd && entry.cwd) transcriptCwd = entry.cwd - const message = projectedMessageFromEntry(entry, agentMeta) + // A line with no API message id is its own single-block message → keep its + // usage; otherwise only the last block of the message carries it. + const stampUsage = !entry.messageId || lastBlockIndexByMessageId.get(entry.messageId) === index + const message = projectedMessageFromEntry(entry, agentMeta, stampUsage) if (!message) continue messages.push(message) if (!clientVersion && entry.client_version) clientVersion = entry.client_version @@ -366,9 +380,11 @@ async function projectedExchangeFromEntries(args) { * * @param {TranscriptEntry} entry * @param {Map} agentMeta + * @param {boolean} stampUsage fold attributes.usage onto this block (true only + * for the last block of an API message, so usage lands once per response) * @returns {AiGatewayProjectedMessage | undefined} */ -function projectedMessageFromEntry(entry, agentMeta) { +function projectedMessageFromEntry(entry, agentMeta, stampUsage) { const role = entry.role if (!role) return undefined @@ -410,9 +426,10 @@ function projectedMessageFromEntry(entry, agentMeta) { } // Mirror live capture: fold the assistant turn's token usage into // attributes.usage (anthropic.js owns the cache_*_input_tokens → - // cache_{read,write}_tokens normalization). Merged, not assigned, so a - // subagent's `claude.spawned_by_tool_use_id` above survives. - const usageAttrs = anthropicMessageAttributes(entry) + // cache_{read,write}_tokens normalization), but only on the last block of the + // API message so usage lands once per response. Merged, not assigned, so a + // subagent's `claude.spawned_by_tool_use_id` above survives. @ref LLP 0035#one-carrier + const usageAttrs = stampUsage ? anthropicMessageAttributes(entry) : undefined if (usageAttrs) message.attributes = { ...(message.attributes ?? {}), ...usageAttrs } if (entry.attachment_type) message.attachment_type = entry.attachment_type if (entry.hook_event) message.hook_event = entry.hook_event diff --git a/hypaware-core/plugins-workspace/claude/src/projector.js b/hypaware-core/plugins-workspace/claude/src/projector.js index 1760625..7221092 100644 --- a/hypaware-core/plugins-workspace/claude/src/projector.js +++ b/hypaware-core/plugins-workspace/claude/src/projector.js @@ -378,12 +378,14 @@ function projectAssistantMessage(args) { match = findTranscriptMatch(transcriptIndex, { role: 'assistant', content: unitContent, agentId }) } applyTranscriptMatch(projected, match) - if (messageAttrs) projected.attributes = messageAttrs - if (stopReason && i === unitCount - 1) { - // The gateway core reads `stop_reason` off the projected message; - // on a split turn it rides the last block's message so - // `status.finish_reason` lands exactly once per API message. - projected.stop_reason = stopReason + if (i === unitCount - 1) { + // `usage` and `stop_reason` are response-level envelope fields, not + // per-block. On a split turn they ride ONLY the last block's message so + // each API message contributes its usage to exactly one row (a SUM over + // rows isn't multiplied by the per-block fanout) and `finish_reason` + // lands once. @ref LLP 0035#one-carrier @ref LLP 0026#consequences + if (messageAttrs) projected.attributes = messageAttrs + if (stopReason) projected.stop_reason = stopReason } out.push(projected) } diff --git a/hypaware-core/plugins-workspace/codex/src/backfill.js b/hypaware-core/plugins-workspace/codex/src/backfill.js index 12e28b3..13447a0 100644 --- a/hypaware-core/plugins-workspace/codex/src/backfill.js +++ b/hypaware-core/plugins-workspace/codex/src/backfill.js @@ -371,8 +371,10 @@ function parseLegacyDoc(text, filePath) { /** * Modern rollout: line-delimited `{ timestamp, type, payload }` records — * one `session_meta`, zero+ `turn_context`, and the conversation's - * `response_item`s. `event_msg` / `compacted` and any unknown line types are - * skipped. A blank or truncated line never aborts the parse. + * `response_item`s. A `token_count` `event_msg` carries the turn's token + * usage and is captured as a synthetic turn-boundary marker (no message); + * all other `event_msg` / `compacted` and unknown line types are skipped. A + * blank or truncated line never aborts the parse. * * @param {string} text * @param {string} filePath @@ -407,6 +409,15 @@ function parseJsonlRollout(text, filePath) { turnPayloads.push(payload) } else if (type === 'response_item' && payload) { items.push({ payload, timestampMs: timestampToMs(row.timestamp) }) + } else if (type === 'event_msg' && payload) { + // The one event_msg we keep: token_count. It is NOT a message — it is a + // turn-boundary marker carrying that turn's normalized usage. Its slot in + // the items stream is preserved (so the projector can attribute it to the + // preceding assistant message), but it never projects a row of its own. + const usageAttributes = codexUsageFromTokenCount(payload) + if (usageAttributes) { + items.push({ payload: { type: 'token_count' }, timestampMs: timestampToMs(row.timestamp), usageAttributes }) + } } } @@ -475,7 +486,19 @@ function projectedExchangeFromSession(args) { /** @type {AiGatewayProjectedMessage[]} */ const messages = [] let nativeIdCount = 0 + // Index in `messages` where the current turn began. A token_count marker + // closes the turn: its usage is stamped onto that turn's LAST assistant + // message (mirroring the live projector's stampUsageOnLastAssistant), then + // the next turn starts. Reasoning-only assistant messages are skipped as + // stamp targets so live and backfilled rows carry usage on the same logical + // message and dedupe to one row. @ref LLP 0035#per-turn + let turnStartIndex = 0 for (const item of items) { + if (item.usageAttributes) { + stampUsageOnTurn(messages, turnStartIndex, item.usageAttributes) + turnStartIndex = messages.length + continue + } const message = projectedMessageFromItem(item) if (!message) continue if (message.message_id) nativeIdCount += 1 @@ -681,6 +704,90 @@ function reasoningItemToProjected(payload) { return { role: 'assistant', content: [{ type: 'thinking', thinking: text }] } } +// --------------------------------------------------------------------- +// Usage extraction +// --------------------------------------------------------------------- + +/** + * Pull a turn's normalized token usage from a `token_count` event_msg + * payload. Reads the per-turn delta (`info.last_token_usage`), NOT the + * cumulative session running total (`info.total_token_usage`) — stamping the + * cumulative would multiply-count when usage is summed across a conversation. + * Returns `undefined` for any other event_msg or a usage-less payload. + * + * @param {Record} payload + * @returns {JsonObject | undefined} + */ +function codexUsageFromTokenCount(payload) { + if (stringValue(payload.type) !== 'token_count') return undefined + const info = payload.info + if (!isPlainObject(info)) return undefined + return codexUsageAttributes(info.last_token_usage) +} + +/** + * Normalize a Codex `last_token_usage` block into the gateway's + * `attributes.usage` shape, matching the live Codex projector and Claude. + * + * @param {unknown} rawUsage + * @returns {JsonObject | undefined} + */ +function codexUsageAttributes(rawUsage) { + if (!isPlainObject(rawUsage)) return undefined + /** @type {JsonObject} */ + const usage = {} + + // @ref LLP 0035#net-input — Codex `input_tokens` is gross (it includes + // `cached_input_tokens`); HypAware stores input_tokens NET of cache so it + // never double-counts against cache_read_tokens and matches the Claude / + // live-Codex convention. total_tokens stays raw, so net + cache_read + + // output == total. + const cachedInput = numberValue(rawUsage.cached_input_tokens) + const grossInput = numberValue(rawUsage.input_tokens) + if (grossInput !== undefined) { + usage.input_tokens = cachedInput !== undefined ? Math.max(0, grossInput - cachedInput) : grossInput + } + if (cachedInput !== undefined) usage.cache_read_tokens = cachedInput + + copyNumberAlias(rawUsage, usage, 'output_tokens', 'output_tokens') + copyNumberAlias(rawUsage, usage, 'reasoning_output_tokens', 'reasoning_tokens') + copyNumberAlias(rawUsage, usage, 'total_tokens', 'total_tokens') + + return Object.keys(usage).length === 0 ? undefined : { usage } +} + +/** + * Stamp a turn's usage onto the LAST assistant message at or after + * `startIndex` that carries text or a tool_use — the same target the live + * projector picks (its terminal output item), so live and backfilled rows fold + * usage onto the one logical message and dedupe cleanly, and the row carrying + * usage is the response's last assistant row for both Codex and Claude. + * Reasoning-only (thinking) messages are skipped; if the turn produced no + * eligible message (e.g. windowed out) the usage is dropped rather than + * mis-attributed. @ref LLP 0035#one-carrier + * + * @param {AiGatewayProjectedMessage[]} messages + * @param {number} startIndex + * @param {JsonObject} usageAttributes + */ +function stampUsageOnTurn(messages, startIndex, usageAttributes) { + for (let i = messages.length - 1; i >= startIndex; i--) { + const message = messages[i] + if (message.role !== 'assistant' || !hasTextOrToolUse(message)) continue + message.attributes = { ...(message.attributes ?? {}), ...usageAttributes } + return + } +} + +/** @param {AiGatewayProjectedMessage} message */ +function hasTextOrToolUse(message) { + if (!Array.isArray(message.content)) return false + return message.content.some((block) => { + const type = isPlainObject(block) ? block.type : undefined + return type === 'text' || type === 'tool_use' + }) +} + // --------------------------------------------------------------------- // Value helpers // --------------------------------------------------------------------- @@ -888,6 +995,27 @@ function boolValue(value) { return typeof value === 'boolean' ? value : undefined } +/** @param {unknown} value @returns {number | undefined} */ +function numberValue(value) { + if (typeof value === 'number' && Number.isFinite(value)) return value + if (typeof value === 'string') { + const n = Number(value) + if (Number.isFinite(n)) return n + } + return undefined +} + +/** + * @param {Record} source + * @param {JsonObject} target + * @param {string} sourceKey + * @param {string} targetKey + */ +function copyNumberAlias(source, target, sourceKey, targetKey) { + const value = numberValue(source[sourceKey]) + if (value !== undefined) target[targetKey] = value +} + /** @param {JsonObject} target @param {string} key @param {string | undefined} value */ function setIfString(target, key, value) { if (value !== undefined) target[key] = value diff --git a/hypaware-core/plugins-workspace/codex/src/exchange-projector.js b/hypaware-core/plugins-workspace/codex/src/exchange-projector.js index 4881515..39dced4 100644 --- a/hypaware-core/plugins-workspace/codex/src/exchange-projector.js +++ b/hypaware-core/plugins-workspace/codex/src/exchange-projector.js @@ -220,6 +220,8 @@ function openAiChatMessages(reqBody, responseBody) { if (assistant) { const finish = stringValue(choice.finish_reason) if (finish) assistant.raw_frame = { ...assistant.raw_frame, finish_reason: finish } + const usageAttributes = openAiUsageAttributes(readOpenAiUsage(responseBody)) + if (usageAttributes) assistant.attributes = mergeJsonObjects(assistant.attributes, usageAttributes) messages.push(assistant) } } @@ -279,6 +281,10 @@ function openAiResponsesMessages(reqBody, responseBody, streamEvents) { const messages = responsesInputMessages(reqBody.input) let assistant = responsesAssistantMessagesFromBody(responseBody) if (assistant.length === 0) assistant = responsesAssistantMessagesFromStream(streamEvents) + const usageAttributes = openAiUsageAttributes( + readOpenAiUsage(responseBody) ?? readOpenAiUsageFromResponsesStream(streamEvents) + ) + stampUsageOnLastAssistant(assistant, usageAttributes) for (const msg of assistant) messages.push(msg) return messages } @@ -454,6 +460,158 @@ function responsesAssistantMessagesFromStream(streamEvents) { return messages } +// --------------------------------------------------------------------- +// Usage extraction +// --------------------------------------------------------------------- + +/** + * @param {unknown} responseBody + * @returns {Record | undefined} + */ +function readOpenAiUsage(responseBody) { + if (!isPlainObject(responseBody)) return undefined + const usage = readKey(responseBody, 'usage') + return isPlainObject(usage) ? usage : undefined +} + +/** + * Pull usage from terminal Responses streaming events. The public + * Responses API carries usage on `response.completed.response.usage`; + * ChatGPT Codex has used the same event family while sometimes placing + * the response fields directly on the payload, so accept both shapes. + * + * @param {Array<{ event: string, data: string }>} streamEvents + * @returns {Record | undefined} + */ +function readOpenAiUsageFromResponsesStream(streamEvents) { + /** @type {Record | undefined} */ + let found + for (const row of streamEvents) { + const payload = parseEventData(row.data) + if (!isPlainObject(payload)) continue + const type = stringValue(payload.type) ?? stringValue(row.event) + if (type !== 'response.completed' && type !== 'response.incomplete' && type !== 'response.failed') continue + const response = isPlainObject(payload.response) ? payload.response : payload + const usage = readOpenAiUsage(response) + if (usage) found = usage + } + return found +} + +/** + * Normalize OpenAI Chat Completions and Responses usage into the + * `attributes.usage` shape already used by Claude rows. The provider's + * usage object is response-scoped, so callers stamp it onto exactly one + * response assistant message — the LAST one — rather than every fanned-out + * output item. @ref LLP 0035#one-carrier + * + * @param {Record | undefined} rawUsage + * @returns {JsonObject | undefined} + */ +function openAiUsageAttributes(rawUsage) { + if (!isPlainObject(rawUsage)) return undefined + /** @type {JsonObject} */ + const usage = {} + + // @ref LLP 0035#net-input — OpenAI/Codex report input_tokens INCLUSIVE of + // cached prompt reads; HypAware stores input_tokens NET of cache so it never + // double-counts against cache_read_tokens and matches the Claude convention + // (input + cache_read [+ cache_write] = total prompt). total_tokens stays the + // provider's raw value, so net_input + cache_read + output == total holds. + const inputDetails = firstPlainObject( + readKey(rawUsage, 'input_tokens_details'), + readKey(rawUsage, 'prompt_tokens_details') + ) + const cachedInput = inputDetails ? numberValue(inputDetails.cached_tokens) : undefined + const grossInput = numberValue(rawUsage.input_tokens) ?? numberValue(rawUsage.prompt_tokens) + if (grossInput !== undefined) { + usage.input_tokens = cachedInput !== undefined ? Math.max(0, grossInput - cachedInput) : grossInput + } + if (cachedInput !== undefined) usage.cache_read_tokens = cachedInput + + copyNumberAlias(rawUsage, usage, 'output_tokens', 'output_tokens') + copyNumberAlias(rawUsage, usage, 'completion_tokens', 'output_tokens') + copyNumberAlias(rawUsage, usage, 'total_tokens', 'total_tokens') + + if (inputDetails) { + copyNumberAlias(inputDetails, usage, 'audio_tokens', 'input_audio_tokens') + } + + const outputDetails = firstPlainObject( + readKey(rawUsage, 'output_tokens_details'), + readKey(rawUsage, 'completion_tokens_details') + ) + if (outputDetails) { + copyNumberAlias(outputDetails, usage, 'reasoning_tokens', 'reasoning_tokens') + copyNumberAlias(outputDetails, usage, 'audio_tokens', 'output_audio_tokens') + copyNumberAlias(outputDetails, usage, 'accepted_prediction_tokens', 'accepted_prediction_tokens') + copyNumberAlias(outputDetails, usage, 'rejected_prediction_tokens', 'rejected_prediction_tokens') + } + + return Object.keys(usage).length === 0 ? undefined : { usage } +} + +/** + * Stamp response-level usage onto the LAST assistant message of the response + * that carries text or a tool_use (the terminal output item — a tool_use on + * tool-calling turns, else the final text). One carrier per response keeps a + * SUM over rows honest, and "last text/tool_use assistant" is the SAME + * predicate the backfill path applies (`backfill.js#stampUsageOnTurn` → + * `hasTextOrToolUse`), so live and backfilled rows fold usage onto the same + * logical row and dedupe to one. Sharing the predicate means the rule no longer + * rests on the implicit invariant that a live Responses reply never ends in a + * reasoning-only assistant message — today it never does (reasoning isn't + * projected as an assistant message live), and if that ever changed the usage + * would be dropped rather than mis-attributed, exactly as backfill does. + * @ref LLP 0035#one-carrier + * + * @param {AiGatewayProjectedMessage[]} messages + * @param {JsonObject | undefined} usageAttributes + */ +function stampUsageOnLastAssistant(messages, usageAttributes) { + if (!usageAttributes) return + for (let i = messages.length - 1; i >= 0; i--) { + const message = messages[i] + if (message.role !== 'assistant' || !hasTextOrToolUse(message)) continue + message.attributes = mergeJsonObjects(message.attributes, usageAttributes) + return + } +} + +/** + * A message is an eligible usage carrier when it has a text or tool_use block. + * Mirrors `backfill.js#hasTextOrToolUse` so the live and backfill carrier + * predicates stay identical. @ref LLP 0035#one-carrier + * + * @param {AiGatewayProjectedMessage} message + */ +function hasTextOrToolUse(message) { + if (!Array.isArray(message.content)) return false + return message.content.some((block) => { + const type = isPlainObject(block) ? block.type : undefined + return type === 'text' || type === 'tool_use' + }) +} + +/** + * @param {unknown[]} values + * @returns {Record | undefined} + */ +function firstPlainObject(...values) { + return values.find(isPlainObject) +} + +/** + * @param {Record} source + * @param {JsonObject} target + * @param {string} sourceKey + * @param {string} targetKey + */ +function copyNumberAlias(source, target, sourceKey, targetKey) { + const value = numberValue(source[sourceKey]) + if (value !== undefined) target[targetKey] = value +} + // --------------------------------------------------------------------- // Codex header + workspace metadata // --------------------------------------------------------------------- @@ -893,6 +1051,26 @@ function parseMaybeJson(value) { try { return JSON.parse(value) } catch { return value } } +/** + * @param {JsonObject | undefined} a + * @param {JsonObject | undefined} b + * @returns {JsonObject | undefined} + */ +function mergeJsonObjects(a, b) { + if (!a) return b + if (!b) return a + /** @type {JsonObject} */ + const out = { ...a } + for (const [key, value] of Object.entries(b)) { + if (isPlainObject(value) && isPlainObject(out[key])) { + out[key] = { ...(/** @type {JsonObject} */ (out[key])), ...value } + } else { + out[key] = value + } + } + return out +} + /** * @param {unknown} value * @returns {value is Record} diff --git a/hypaware-core/plugins-workspace/codex/src/types.d.ts b/hypaware-core/plugins-workspace/codex/src/types.d.ts index 65aef7e..1d9838b 100644 --- a/hypaware-core/plugins-workspace/codex/src/types.d.ts +++ b/hypaware-core/plugins-workspace/codex/src/types.d.ts @@ -1,4 +1,4 @@ -import type { AiGatewayExchangeInput } from '../../../../collectivus-plugin-kernel-types.d.ts' +import type { AiGatewayExchangeInput, JsonObject } from '../../../../collectivus-plugin-kernel-types.d.ts' export interface CodexLogReader { /** Identifier used in telemetry and de-dup. */ @@ -58,6 +58,14 @@ export interface CodexRolloutItem { payload: Record /** Envelope timestamp in epoch millis, when present (modern JSONL only). */ timestampMs?: number + /** + * Per-turn token usage recovered from a `token_count` event_msg, already + * normalized into the gateway's `{ usage: {...} }` attributes shape. Present + * only on the synthetic marker item the parser inserts at a turn boundary; + * the projector consumes it to stamp usage onto that turn's assistant + * message rather than projecting a message of its own. + */ + usageAttributes?: JsonObject } export interface CodexAttachOptions { diff --git a/llp/0000-hypaware.explainer.md b/llp/0000-hypaware.explainer.md index 7197e23..067de6a 100644 --- a/llp/0000-hypaware.explainer.md +++ b/llp/0000-hypaware.explainer.md @@ -78,6 +78,7 @@ plugin that registers a dataset gets query and formatting for free. | Context-graph T1/T2 enrichment + completion | [0028](./0028-context-graph-enrichment.decision.md) | Decision | | MCP hosting intrinsic (verbs → tools) | [0034](./0034-mcp-host-intrinsic.decision.md) | Decision | | Remote attach (client consumer half) | [0033](./0033-remote-query-attach.spec.md) | Spec | +| Token-usage normalization (net input) | [0035](./0035-token-usage-normalization.decision.md) | Decision | ## Where to start diff --git a/llp/0026-claude-native-granularity.decision.md b/llp/0026-claude-native-granularity.decision.md index 4de4dc1..6c8fde0 100644 --- a/llp/0026-claude-native-granularity.decision.md +++ b/llp/0026-claude-native-granularity.decision.md @@ -165,9 +165,19 @@ user tool_result batches have no API message id, so backfill would have to - "Message" in this table means *DAG node*, not *API message*. Counting messages per turn yields more rows than before for live Claude data (matching what backfill already produced). -- The API message envelope (`usage`, `stop_reason`, `model`) is duplicated - across a group's rows, mirroring the transcript. **Consumers summing token - usage must dedupe by API message id, not by row.** +- The API message envelope is split between two scopes. `model` (and the raw + transcript line) is duplicated across a group's rows, mirroring the + transcript. The **response-level** fields — `usage` and `stop_reason` — are + NOT: they ride **only the last block's row** of a group, so each API message + contributes its `usage` to exactly one row and a `SUM` over rows is not + multiplied by the per-block fanout. `stop_reason`-derived + `status.finish_reason` lands on that same last-block row. + - This supersedes the earlier rule that `usage` was duplicated and consumers + "must dedupe by API message id." Usage is now stamped once; a plain + per-row `SUM(attributes.usage.*)` is correct, and a dedupe-by-API-message-id + query still works (the non-carrier blocks are simply null). See + [LLP 0035](./0035-token-usage-normalization.decision.md) for the + cross-provider one-carrier invariant and the canonical accounting query. - **`model` granularity differs by path, by design.** Live capture has one model per exchange and the gateway stamps it on every row (user, assistant, and tool_result alike). Backfill has no exchange-level model — a session can @@ -181,8 +191,6 @@ user tool_result batches have no API message id, so backfill would have to responding assistant's model onto its user turn would have to choose among several responses after a mid-turn switch — ambiguous — and is deliberately out of scope; the per-line `message.model` is the unambiguous native unit.) -- `stop_reason`-derived `status.finish_reason` lands on the last block's - message of a group. - `previous_message_id` stores only the **immediate predecessor** (a 0/1-element array), not the full ancestry. The split multiplies the rows carrying this column, and a full per-row chain is O(N) per message → O(N²) per thread; as a @@ -217,6 +225,8 @@ user tool_result batches have no API message id, so backfill would have to - [LLP 0012](./0012-sources.spec.md) — sources (live + backfill paths) - [LLP 0016](./0016-ai-gateway.decision.md) — gateway owns the schema; adapters own message shape +- [LLP 0035](./0035-token-usage-normalization.decision.md) — revises the + `usage` duplication consequence: usage now lands once, on the last block - `hypaware-core/plugins-workspace/claude/src/projector.js` — the splitter and the `aux_kind` tagging (decision point 6) - `hypaware-core/plugins-workspace/claude/src/anthropic.js` — `claudeAuxKind`, diff --git a/llp/0035-token-usage-normalization.decision.md b/llp/0035-token-usage-normalization.decision.md new file mode 100644 index 0000000..4811510 --- /dev/null +++ b/llp/0035-token-usage-normalization.decision.md @@ -0,0 +1,167 @@ +# LLP 0035: Normalize token-usage semantics in `attributes.usage` + +**Type:** Decision +**Status:** Draft +**Systems:** Gateway, Plugins +**Author:** Brendan / Claude +**Date:** 2026-06-23 +**Related:** LLP 0016 (ai-gateway), LLP 0026 (claude-native-granularity), LLP 0030 (session-id-partition-key) + +## Summary + +`attributes.usage` on `ai_gateway_messages` carries token counts from multiple +providers. Two cross-cutting rules keep the column analyzable without the +analyst (human or model) having to special-case the provider: + +1. **`input_tokens` is net of cache, everywhere.** It counts only *uncached* + prompt tokens. Cached prompt reads ride `cache_read_tokens`, and (Claude + only) cache writes ride `cache_write_tokens`. So for every provider + `input_tokens + cache_read_tokens [+ cache_write_tokens] = total prompt`, + and a naive `SUM(input_tokens)` or `SUM(input_tokens + cache_read_tokens)` + means the same thing across rows. +2. **Per-message usage is per-response (a delta), never cumulative.** A row's + usage describes the one model response that produced it; summing rows over a + conversation reconstructs the conversation total. + +`total_tokens`, when a provider supplies it, is stored **raw** (the provider's +own total, which is gross-input + output). Because input is stored net, the +identity `input_tokens + cache_read_tokens + output_tokens == total_tokens` +holds — a cheap reconciliation check. + +## Context + +`attributes.usage` was first shaped by the Claude adapter, whose transcript +`usage` block is already per-response and **net**: Anthropic reports +`input_tokens` (uncached), `cache_read_input_tokens`, and +`cache_creation_input_tokens` as three additive, non-overlapping fields +(`anthropic.js#anthropicMessageAttributes` maps the latter two to +`cache_read_tokens` / `cache_write_tokens`). + +OpenAI and ChatGPT Codex report usage differently: + +- `input_tokens` (Responses) / `prompt_tokens` (Chat) is **gross** — it + *includes* the cached reads. The cached subset is + `input_tokens_details.cached_tokens` (live) or `cached_input_tokens` (the + Codex rollout `token_count` event). +- The Codex rollout emits a `token_count` event after each turn carrying both + `total_token_usage` (cumulative session running total) and + `last_token_usage` (this turn). + +If those raw shapes were stored as-is, `usage.input_tokens` would mean +"uncached input" for Claude and "input incl. cache" for Codex — the same +column, two meanings. Any cross-provider `SUM`/comparison would silently +mismix net and gross, and `input_tokens + cache_read_tokens` would +double-count cache for Codex. That is a confidently-wrong-numbers trap for an +LLM querying the data, which is HypAware's primary consumer. + +## Decision + +- **Net input.** Both the live Codex exchange projector + (`exchange-projector.js#openAiUsageAttributes`) and the Codex backfill + (`backfill.js#codexUsageAttributes`) compute + `input_tokens = grossInput − cachedInput` (floored at 0) and put the cached + count on `cache_read_tokens`. The Claude adapter already produces net input + and is unchanged — Claude/net is the anchor convention. +- **Per-turn, not cumulative.** Codex backfill reads the + `token_count` event's `last_token_usage`, never `total_token_usage`. The + event is consumed as a turn-boundary marker (it never projects a row); its + usage is stamped per the one-carrier rule below. +- **One carrier per response, on the last assistant + row.** A billed response fans into several rows — Claude splits one API + message into one row per content block (LLP 0026); Codex fans a response into + separate messages and a turn into reasoning/text/tool rows. Response-level + `usage` is stamped onto exactly **one** of those rows: the **last** assistant + row of the response (the terminal output item — a `tool_use` on tool-calling + turns, else the final `text`). This holds for all four paths: + - Claude live (`projector.js#projectAssistantMessage`) and backfill + (`backfill.js`, last block per `messageId`) — usage rides the same + last-block row as `stop_reason`, instead of being duplicated onto every + block. + - Codex live (`exchange-projector.js#stampUsageOnLastAssistant`) and backfill + (`backfill.js#stampUsageOnTurn`, last eligible) — switched from first to + last so the carrier row matches Claude. Both apply the **same** eligibility + predicate (`hasTextOrToolUse`: the last assistant row carrying text or a + tool_use, skipping reasoning-only rows), so the two paths select the same + carrier *by rule* rather than by the coincidence that a live Responses reply + never ends in a reasoning-only assistant message. A turn with no eligible + assistant (e.g. windowed out) drops its usage rather than mis-attributing + it to an earlier row. + + Two payoffs: a plain `SUM(attributes.usage.*)` over rows is correct with no + dedupe, and a human scanning the table sees one identical shape for both + providers (a run of assistant rows, usage on the final one). Live and backfill + pick the same last row, so they dedupe onto it (the dedup hash excludes + `attributes`, so placement is safe). The within-message carrier rule is + enforced structurally too: when a single usage-bearing *message* has multiple + content blocks, `expandMessageParts` stamps `usage` on only its **last** part + (`message_projector.js#stripUsage`), so a multi-block carrier no longer + replicates (over-counts) its usage across every block. This edge was assumed + not to occur ("carrier messages are single-block"), but Claude backfill does + emit multi-block carrier messages — `reasoning + text`, `reasoning + tool_use`, + and parallel-tool-call turns (`reasoning + reasoning + tool_use + tool_use`) — + where the transcript records several blocks under one `messageId`. Those were + the only rows where a plain `SUM` over-counted before this rule was made + unconditional. +- **Raw total.** `total_tokens` is passed through unmodified; net input keeps + the reconciliation identity intact. + +No information is lost: the provider's gross input is recoverable as +`input_tokens + cache_read_tokens`. + +## Canonical query surface + +Token accounting reads **`attributes.usage`** (a JSON column), never +`raw_frame`. `attributes.usage` is the only path populated for every provider +and capture mode (Claude live + backfill, Codex live + backfill). The +provider-raw frame is unreliable: `raw_frame` is null for Claude *live* and all +Codex; only Claude *backfill* stashes the transcript line. And the id, when +present, is the **flat `raw_frame.message_id`** — not the nested +`raw_frame.message.id` / `raw_frame.message.usage` some older notes cite (both +null in the data). + +With the one-carrier rule above, each response contributes usage to exactly one +row, so a plain sum is correct: + +```sql +SELECT + SUM(CAST(JSON_EXTRACT(attributes, '$.usage.input_tokens') AS BIGINT)) AS input, + SUM(CAST(JSON_EXTRACT(attributes, '$.usage.output_tokens') AS BIGINT)) AS output, + SUM(CAST(JSON_EXTRACT(attributes, '$.usage.cache_read_tokens') AS BIGINT)) AS cache_read, + SUM(CAST(JSON_EXTRACT(attributes, '$.usage.cache_write_tokens')AS BIGINT)) AS cache_write, + SUM(CAST(JSON_EXTRACT(attributes, '$.usage.reasoning_tokens') AS BIGINT)) AS reasoning +FROM ai_gateway_messages +WHERE role = 'assistant' AND JSON_EXTRACT(attributes, '$.usage') IS NOT NULL +``` + +A defensive `max()`-per-`COALESCE(raw_frame.message_id, message_id)` rollup also +remains correct (the non-carrier blocks are null and ignored), so it's safe to +keep in queries written before this decision. Field union across providers: +Codex carries `reasoning_tokens` and no `cache_write_tokens`; Claude is the +reverse; `input_tokens` is net for both (#net-input). `COALESCE(..., 0)` the +union. + +### Consequences + +- Cross-provider input-token analysis is apples-to-apples; cache is never + double-counted in a prompt-token sum. +- **Codex `input_tokens` no longer equals the provider's raw number.** Anyone + comparing a HypAware row against an OpenAI dashboard must add + `cache_read_tokens` back. This is the deliberate cost of one consistent + column. +- The live Codex usage extraction was uncommitted when this decision landed, so + no shipped Codex rows used the gross form. +- Claude **field values** are unchanged (already net), but Claude usage + **placement** changed: it now rides one row (the last block) instead of every + block — see #one-carrier and the LLP 0026 consequence revision. No in-app + consumer reads `attributes.usage` (verified: context graph, enrichment, sinks, + datasets, and vector search all ignore it), so only ad-hoc/skill SQL is + affected, and the `max()`-per-message-id form still works. + +## Alternatives considered + +- **Gross everywhere** (fold Claude's cache into `input_tokens`): rewrites the + meaning of already-shipped Claude rows and touches more adapters. Rejected — + larger blast radius, and it discards the clean additive cache breakdown. +- **Leave provider-native, document the asymmetry**: zero code, but the + footgun stays in the data forever and every consumer must re-learn it. + Rejected — pushes the cost onto every future query. diff --git a/test/plugins/ai-gateway-message-projector.test.js b/test/plugins/ai-gateway-message-projector.test.js index f4f7d26..43b3f40 100644 --- a/test/plugins/ai-gateway-message-projector.test.js +++ b/test/plugins/ai-gateway-message-projector.test.js @@ -494,6 +494,46 @@ test('row output is stripped to the schema (no extra fields leak)', async () => } }) +test('a multi-block usage-bearing message stamps usage on only the last part', () => { + // @ref LLP 0035#one-carrier — Claude backfill emits multi-block carrier + // messages (e.g. reasoning + parallel tool_use under one messageId). Usage is + // per-response, so it must ride exactly one row (the last block), not every + // block, or a plain SUM(attributes.usage.*) over-counts within the message. + const rows = aiGatewayRowsFromProjectedExchange({ + provider: 'anthropic', + session_id: 'sess-usage', + messages: [ + { + role: 'assistant', + message_id: 'msg-multiblock', + attributes: { usage: { input_tokens: 100, output_tokens: 42, cache_read_tokens: 9 } }, + content: [ + { type: 'thinking', thinking: 'hmm', signature: 'sig' }, + { type: 'tool_use', id: 'call-a', name: 'Bash', input: {} }, + { type: 'tool_use', id: 'call-b', name: 'Bash', input: {} }, + ], + }, + ], + }, { gatewayId: 'gw', state: createAiGatewayConversationState() }) + + assert.equal(rows.length, 3) + const usageRows = rows.filter((r) => isPlainObject(r.attributes) && r.attributes.usage !== undefined) + assert.equal(usageRows.length, 1, 'exactly one row carries usage') + // The carrier is the last block (highest part_index), where stop_reason rides too. + const carrier = usageRows[0] + assert.equal(carrier.part_index, 2) + assert.equal(carrier.part_type, 'tool_call') + const usage = isPlainObject(carrier.attributes) ? carrier.attributes.usage : undefined + assert.deepEqual(usage, { input_tokens: 100, output_tokens: 42, cache_read_tokens: 9 }) + // A plain SUM over the message's rows equals the single response's usage — + // no per-block over-count. + const summedOutput = rows.reduce((acc, r) => { + const u = isPlainObject(r.attributes) ? r.attributes.usage : undefined + return acc + (isPlainObject(u) && typeof u.output_tokens === 'number' ? u.output_tokens : 0) + }, 0) + assert.equal(summedOutput, 42) +}) + test('two Codex threads sharing a session_id keep separate start time and tool lookup', () => { // A Codex session_id can carry several thread conversation_ids. Per-thread // state (conversation_started_at, tool_call→tool_name) must scope by the diff --git a/test/plugins/claude-backfill.test.js b/test/plugins/claude-backfill.test.js index 8bf65cd..5455947 100644 --- a/test/plugins/claude-backfill.test.js +++ b/test/plugins/claude-backfill.test.js @@ -365,6 +365,55 @@ test('assistant token usage is folded into attributes.usage like live capture', } }) +test('usage lands once — on the last block of a split assistant API message', async () => { + const env = await stageEnv() + try { + // Claude Code writes one transcript line per content block; both lines of + // one API response share message.id and repeat the usage envelope. Usage is + // response-level, so it must land on exactly one row. @ref LLP 0035#one-carrier + const usage = { input_tokens: 200, output_tokens: 60, cache_read_input_tokens: 500 } + await writeTranscript(env, 'repo-split', 'sess-split', [ + { + sessionId: 'sess-split', uuid: 'u-user', parentUuid: null, type: 'user', + message: { role: 'user', content: 'do it' }, timestamp: '2026-05-20T10:00:00.000Z', + }, + { + sessionId: 'sess-split', uuid: 'u-text', parentUuid: 'u-user', type: 'assistant', + message: { id: 'msg-split', role: 'assistant', content: [{ type: 'text', text: 'on it' }], usage }, + timestamp: '2026-05-20T10:00:05.000Z', + }, + { + sessionId: 'sess-split', uuid: 'u-tool', parentUuid: 'u-text', type: 'assistant', + message: { + id: 'msg-split', role: 'assistant', + content: [{ type: 'tool_use', id: 't1', name: 'Bash', input: { command: 'ls' } }], + usage, + }, + timestamp: '2026-05-20T10:00:06.000Z', + }, + ]) + const provider = createClaudeBackfillProvider({ homeDir: env.homeDir, stateFile: env.stateFile }) + const [item] = await collectItems(provider.run(runContext().ctx)) + assert.ok(item) + + const rows = await materialize(item) + // The two assistant blocks are distinct rows; usage rides only the last + // (the tool_call), the earlier text block carries none. + const textRow = rows.find((r) => r.role === 'assistant' && r.part_type === 'text') + const toolRow = rows.find((r) => r.role === 'assistant' && r.part_type === 'tool_call') + assert.ok(textRow) + assert.ok(toolRow) + assert.equal(/** @type {any} */ (textRow.attributes)?.usage, undefined) + assert.deepEqual(/** @type {any} */ (toolRow.attributes).usage, { + input_tokens: 200, + output_tokens: 60, + cache_read_tokens: 500, + }) + } finally { + await env.cleanup() + } +}) + test('assistant model is surfaced per message, switches mid-session, and drops ', async () => { const env = await stageEnv() try { diff --git a/test/plugins/claude-projector-identity.test.js b/test/plugins/claude-projector-identity.test.js index c8a1184..2c95148 100644 --- a/test/plugins/claude-projector-identity.test.js +++ b/test/plugins/claude-projector-identity.test.js @@ -84,6 +84,53 @@ test('native DAG identity: uuid from JSONL transcript becomes message_id and pro } }) +test('live: response-level usage lands once, on the last block of a split turn', async () => { + const env = await stageClaudeEnv() + try { + // Transcript splits the assistant turn one line per block (text, then + // tool_use), both sharing message id msg_u. + await writeTranscript(env, 'sess-u', [ + jsonlRow({ sessionId: 'sess-u', uuid: 'u-user', parentUuid: null, type: 'user', message: { role: 'user', content: 'go' }, timestamp: '2026-05-22T10:00:00.000Z' }), + jsonlRow({ sessionId: 'sess-u', uuid: 'u-text', parentUuid: 'u-user', type: 'assistant', message: { role: 'assistant', id: 'msg_u', content: [{ type: 'text', text: 'on it' }] }, timestamp: '2026-05-22T10:00:01.000Z' }), + jsonlRow({ sessionId: 'sess-u', uuid: 'u-tool', parentUuid: 'u-text', type: 'assistant', message: { role: 'assistant', id: 'msg_u', content: [{ type: 'tool_use', id: 't1', name: 'Bash', input: { command: 'ls' } }] }, timestamp: '2026-05-22T10:00:02.000Z' }), + ]) + + const rows = await projectViaGateway(env, { + reqBody: { + model: 'claude-3-opus', + metadata: { user_id: JSON.stringify({ session_id: 'sess-u' }) }, + messages: [{ role: 'user', content: 'go' }], + }, + // Wire response carries both blocks and one response-level usage block. + responseBody: { + id: 'msg_u', + role: 'assistant', + content: [ + { type: 'text', text: 'on it' }, + { type: 'tool_use', id: 't1', name: 'Bash', input: { command: 'ls' } }, + ], + stop_reason: 'tool_use', + usage: { input_tokens: 200, output_tokens: 60, cache_read_input_tokens: 500 }, + }, + }) + + // The turn fans into two assistant rows; usage rides ONLY the last block + // (the tool_call), not the text block. @ref LLP 0035#one-carrier + const textRow = rows.find((r) => r.role === 'assistant' && r.part_type === 'text') + const toolRow = rows.find((r) => r.role === 'assistant' && r.part_type === 'tool_call') + assert.ok(textRow) + assert.ok(toolRow) + assert.equal(readAttrPath(textRow, ['attributes', 'usage']), undefined) + assert.deepEqual(readAttrPath(toolRow, ['attributes', 'usage']), { + input_tokens: 200, + output_tokens: 60, + cache_read_tokens: 500, + }) + } finally { + await env.cleanup() + } +}) + test('root message gets previous_message_id = [] when parentUuid is null', async () => { const env = await stageClaudeEnv() try { diff --git a/test/plugins/codex-backfill.test.js b/test/plugins/codex-backfill.test.js index c0a592e..c52b8f7 100644 --- a/test/plugins/codex-backfill.test.js +++ b/test/plugins/codex-backfill.test.js @@ -41,10 +41,14 @@ async function stageEnv() { * * @param {{ homeDir: string }} env * @param {string} relPath path under the sessions dir (may include subdirs) + * Each item's envelope `type` defaults to `response_item`; pass an explicit + * `type` (e.g. `event_msg`) to interleave non-message records like + * `token_count` in stream order. + * * @param {{ * meta: Record, * turns?: Array>, - * items: Array<{ timestamp?: string, payload: Record }>, + * items: Array<{ type?: string, timestamp?: string, payload: Record }>, * }} doc */ async function writeModernRollout(env, relPath, doc) { @@ -57,7 +61,7 @@ async function writeModernRollout(env, relPath, doc) { lines.push(JSON.stringify({ type: 'turn_context', timestamp: doc.meta.timestamp, payload: turn })) } for (const item of doc.items) { - lines.push(JSON.stringify({ type: 'response_item', timestamp: item.timestamp, payload: item.payload })) + lines.push(JSON.stringify({ type: item.type ?? 'response_item', timestamp: item.timestamp, payload: item.payload })) } await fs.writeFile(filePath, lines.join('\n') + '\n', 'utf8') return filePath @@ -314,6 +318,193 @@ test('modern rollout projects into canonical ai_gateway_messages rows', async () } }) +test('token_count event folds per-turn usage (net of cache) onto the turn assistant message', async () => { + const env = await stageEnv() + try { + // One turn: reasoning, assistant text, a tool call + its output, then the + // turn's token_count event_msg. The per-turn delta is `last_token_usage`; + // `total_token_usage` is the session's cumulative running total. They are + // set to DIFFERENT (and deliberately larger) values here so a regression + // that read the cumulative total — the multiply-count trap — would fail the + // net-input assertion below instead of passing by coincidence. + // @ref LLP 0035#per-turn + const lastUsage = { + input_tokens: 13761, // gross — includes the 9600 cached + cached_input_tokens: 9600, + output_tokens: 484, + reasoning_output_tokens: 189, + total_tokens: 14245, + } + // Cumulative total (as if prior turns were folded in). NOT the stamped value. + const cumulativeUsage = { + input_tokens: 90000, + cached_input_tokens: 50000, + output_tokens: 4000, + reasoning_output_tokens: 1200, + total_tokens: 99999, + } + await writeModernRollout(env, '2026/06/23/rollout-usage.jsonl', { + meta: { id: 'sess-usage', timestamp: '2026-06-23T00:00:00.000Z' }, + items: [ + { timestamp: '2026-06-23T00:00:01.000Z', payload: { type: 'message', role: 'user', content: [{ type: 'input_text', text: 'go' }] } }, + { timestamp: '2026-06-23T00:00:02.000Z', payload: { type: 'reasoning', summary: [{ type: 'summary_text', text: 'thinking' }] } }, + { timestamp: '2026-06-23T00:00:03.000Z', payload: { type: 'message', role: 'assistant', content: [{ type: 'output_text', text: 'on it' }] } }, + { timestamp: '2026-06-23T00:00:04.000Z', payload: { type: 'function_call', name: 'shell', call_id: 'c1', arguments: '{"command":"ls"}' } }, + { timestamp: '2026-06-23T00:00:05.000Z', payload: { type: 'function_call_output', call_id: 'c1', output: 'a' } }, + { + type: 'event_msg', + timestamp: '2026-06-23T00:00:06.000Z', + payload: { type: 'token_count', info: { total_token_usage: cumulativeUsage, last_token_usage: lastUsage } }, + }, + ], + }) + + const provider = createCodexBackfillProvider({ homeDir: env.homeDir }) + const { items } = await collect(provider.run(runContext().ctx)) + assert.equal(items.length, 1) + const item = items[0] + assert.ok(item) + const exchange = value(item) + + // The token_count marker does NOT add a message: user, reasoning, assistant + // text, assistant tool_use, tool result = 5. + assert.equal(exchange.messages.length, 5) + + // Usage lands on the LAST text/tool_use assistant message of the turn (the + // function_call here), one carrier per response — same row Claude uses. + // Derived from `last_token_usage` (NOT the cumulative `total_token_usage`): + // NET of cache 13761 − 9600 = 4161; 4161 + 9600 + 484 == 14245 total. Were + // the cumulative read instead, input would be 90000 − 50000 = 40000 here. + // @ref LLP 0035#one-carrier @ref LLP 0035#per-turn + const toolUseMsg = exchange.messages.find( + (/** @type {any} */ m) => m.role === 'assistant' && m.content[0].type === 'tool_use' + ) + assert.deepEqual(toolUseMsg.attributes, { + usage: { + input_tokens: 4161, + cache_read_tokens: 9600, + output_tokens: 484, + reasoning_tokens: 189, + total_tokens: 14245, + }, + }) + + // The earlier assistant text and the reasoning (thinking) message carry no usage. + const textMsg = exchange.messages.find((/** @type {any} */ m) => m.role === 'assistant' && m.content[0].type === 'text') + assert.equal(textMsg.attributes, undefined) + const thinkingMsg = exchange.messages.find((/** @type {any} */ m) => m.content[0].type === 'thinking') + assert.equal(thinkingMsg.attributes, undefined) + + // Usage survives materialization onto the assistant tool_call row. + const rows = await materialize(item) + const toolRow = rows.find((r) => r.part_type === 'tool_call' && r.role === 'assistant') + assert.ok(toolRow) + assert.deepEqual(/** @type {any} */ (toolRow.attributes).usage, { + input_tokens: 4161, + cache_read_tokens: 9600, + output_tokens: 484, + reasoning_tokens: 189, + total_tokens: 14245, + }) + } finally { + await env.cleanup() + } +}) + +test('multi-turn token_count: each turn stamps its own per-turn delta on its own last assistant row', async () => { + const env = await stageEnv() + try { + // Three turns in one session, each closed by a token_count marker. The + // turnStartIndex advance is what stops a later turn's usage from being + // stamped onto (or summed into) an earlier turn's row. Each token_count + // carries a distinct per-turn `last_token_usage` delta and a cumulative + // `total_token_usage` running total; the projector must read the delta. + // @ref LLP 0035#per-turn @ref LLP 0035#one-carrier + const turn1Delta = { input_tokens: 13761, cached_input_tokens: 9600, output_tokens: 484, reasoning_output_tokens: 189, total_tokens: 14245 } + const turn2Delta = { input_tokens: 5000, cached_input_tokens: 1000, output_tokens: 200, reasoning_output_tokens: 50, total_tokens: 5200 } + const turn3Delta = { input_tokens: 7000, cached_input_tokens: 2000, output_tokens: 300, reasoning_output_tokens: 80, total_tokens: 7600 } + // Cumulative running totals (what `total_token_usage` actually carries). + // Set so that reading them instead of the delta would yield wrong numbers: + // e.g. turn 2 cumulative net input is 18761 − 10600 = 8161, never 4000. + const turn1Total = turn1Delta + const turn2Total = { input_tokens: 18761, cached_input_tokens: 10600, output_tokens: 684, reasoning_output_tokens: 239, total_tokens: 19445 } + const turn3Total = { input_tokens: 25761, cached_input_tokens: 12600, output_tokens: 984, reasoning_output_tokens: 319, total_tokens: 27045 } + + await writeModernRollout(env, '2026/06/24/rollout-multiturn.jsonl', { + meta: { id: 'sess-multi', timestamp: '2026-06-24T00:00:00.000Z' }, + items: [ + // Turn 1: reasoning + assistant text + a tool call; last eligible = tool_use. + { timestamp: '2026-06-24T00:00:01.000Z', payload: { type: 'message', role: 'user', content: [{ type: 'input_text', text: 'turn one' }] } }, + { timestamp: '2026-06-24T00:00:02.000Z', payload: { type: 'reasoning', summary: [{ type: 'summary_text', text: 'thinking 1' }] } }, + { timestamp: '2026-06-24T00:00:03.000Z', payload: { type: 'message', role: 'assistant', content: [{ type: 'output_text', text: 'on it' }] } }, + { timestamp: '2026-06-24T00:00:04.000Z', payload: { type: 'function_call', name: 'shell', call_id: 'c1', arguments: '{"command":"ls"}' } }, + { timestamp: '2026-06-24T00:00:05.000Z', payload: { type: 'function_call_output', call_id: 'c1', output: 'a' } }, + { type: 'event_msg', timestamp: '2026-06-24T00:00:06.000Z', payload: { type: 'token_count', info: { total_token_usage: turn1Total, last_token_usage: turn1Delta } } }, + // A non-token_count event_msg must be skipped entirely (no row, no usage). + { type: 'event_msg', timestamp: '2026-06-24T00:00:07.000Z', payload: { type: 'agent_reasoning', text: 'internal note' } }, + // Turn 2: a single assistant text reply; last eligible = that text. + { timestamp: '2026-06-24T00:00:08.000Z', payload: { type: 'message', role: 'user', content: [{ type: 'input_text', text: 'turn two' }] } }, + { timestamp: '2026-06-24T00:00:09.000Z', payload: { type: 'message', role: 'assistant', content: [{ type: 'output_text', text: 'final answer' }] } }, + { type: 'event_msg', timestamp: '2026-06-24T00:00:10.000Z', payload: { type: 'token_count', info: { total_token_usage: turn2Total, last_token_usage: turn2Delta } } }, + // Turn 3: reasoning ONLY — no text/tool_use assistant in range, so its + // usage is DROPPED rather than mis-attributed to an earlier turn's row. + { timestamp: '2026-06-24T00:00:11.000Z', payload: { type: 'reasoning', summary: [{ type: 'summary_text', text: 'thinking 3' }] } }, + { type: 'event_msg', timestamp: '2026-06-24T00:00:12.000Z', payload: { type: 'token_count', info: { total_token_usage: turn3Total, last_token_usage: turn3Delta } } }, + ], + }) + + const provider = createCodexBackfillProvider({ homeDir: env.homeDir }) + const { items } = await collect(provider.run(runContext().ctx)) + assert.equal(items.length, 1) + const item = items[0] + assert.ok(item) + const exchange = value(item) + + // Messages: u1, think1, asst"on it", tool_use, tool_result, u2, + // asst"final answer", think3 = 8. The three token_count markers and the + // stray agent_reasoning event_msg add no messages. + assert.equal(exchange.messages.length, 8) + + const findMsg = (/** @type {(m: any) => boolean} */ pred) => exchange.messages.find(pred) + + // Turn 1's usage rides its tool_use (last eligible), net of cache. + const turn1Carrier = findMsg((m) => m.role === 'assistant' && m.content[0].type === 'tool_use') + assert.deepEqual(turn1Carrier.attributes, { + usage: { input_tokens: 4161, cache_read_tokens: 9600, output_tokens: 484, reasoning_tokens: 189, total_tokens: 14245 }, + }) + + // Turn 2's usage rides its own text reply — its own delta, NOT turn 1's and + // NOT the cumulative total (which would give input 8161, output 684). + const turn2Carrier = findMsg((m) => m.role === 'assistant' && m.content[0].type === 'text' && m.content[0].text === 'final answer') + assert.deepEqual(turn2Carrier.attributes, { + usage: { input_tokens: 4000, cache_read_tokens: 1000, output_tokens: 200, reasoning_tokens: 50, total_tokens: 5200 }, + }) + + // Turn 1's earlier text ("on it") and both reasoning messages carry NO usage. + // Crucially, turn 1's carrier was NOT overwritten by turn 2 or turn 3 (asserted + // above), and turn 3's dropped usage never leaked onto an earlier row. + const turn1Text = findMsg((m) => m.role === 'assistant' && m.content[0].type === 'text' && m.content[0].text === 'on it') + assert.equal(turn1Text.attributes, undefined) + for (const m of exchange.messages.filter((/** @type {any} */ x) => x.content[0].type === 'thinking')) { + assert.equal(m.attributes, undefined) + } + + // No row anywhere carries turn 3's delta (it was dropped, not mis-stamped). + const stamped = exchange.messages.filter((/** @type {any} */ m) => m.attributes?.usage) + assert.equal(stamped.length, 2, 'exactly two carrier rows — turns 1 and 2') + for (const m of stamped) { + assert.notEqual(/** @type {any} */ (m.attributes).usage.total_tokens, 7600, 'turn 3 usage never stamped') + } + + // Survives materialization: exactly two assistant rows carry usage. + const rows = await materialize(item) + const usageRows = rows.filter((r) => /** @type {any} */ (r.attributes)?.usage) + assert.equal(usageRows.length, 2) + } finally { + await env.cleanup() + } +}) + test('backfill redacts credential userinfo from the git remote (LLP 0032)', async () => { const env = await stageEnv() try { diff --git a/test/plugins/codex-exchange-projector.test.js b/test/plugins/codex-exchange-projector.test.js index 777d8b0..714f92d 100644 --- a/test/plugins/codex-exchange-projector.test.js +++ b/test/plugins/codex-exchange-projector.test.js @@ -56,6 +56,48 @@ test('OpenAI Chat projection: request+response messages roll up into user+assist assert.deepEqual(projection.messages[1].content, [{ type: 'text', text: 'ok' }]) }) +test('OpenAI Chat projection normalizes usage onto the assistant response', () => { + const projector = createCodexExchangeProjector({ env: {} }) + const projection = /** @type {any} */ (projector.project(exchange({ + path: '/v1/chat/completions', + provider: 'openai', + request_body: JSON.stringify({ + model: 'gpt-4o-mini', + messages: [{ role: 'user', content: 'hi' }], + }), + response_body: JSON.stringify({ + choices: [{ message: { role: 'assistant', content: 'ok' } }], + usage: { + prompt_tokens: 12, + completion_tokens: 5, + total_tokens: 17, + prompt_tokens_details: { cached_tokens: 7, audio_tokens: 2 }, + completion_tokens_details: { + reasoning_tokens: 3, + accepted_prediction_tokens: 1, + rejected_prediction_tokens: 4, + }, + }, + }), + }), context())) + + assert.deepEqual(projection.messages[1].attributes, { + usage: { + // input_tokens is stored NET of cache (12 gross − 7 cached = 5); the + // 7 cached reads ride cache_read_tokens, so net + cache_read + output + // (5 + 7 + 5) reconciles to total_tokens 17. @ref LLP 0035#net-input + input_tokens: 5, + output_tokens: 5, + total_tokens: 17, + cache_read_tokens: 7, + input_audio_tokens: 2, + reasoning_tokens: 3, + accepted_prediction_tokens: 1, + rejected_prediction_tokens: 4, + }, + }) +}) + test('OpenAI Chat tool messages map to tool_result blocks', () => { const projector = createCodexExchangeProjector({ env: {} }) const projection = /** @type {any} */ (projector.project(exchange({ @@ -104,6 +146,48 @@ test('OpenAI Responses with output_text in the body produces an assistant messag assert.deepEqual(projection.messages[1].content, [{ type: 'text', text: 'because' }]) }) +test('OpenAI Responses body usage is normalized onto one assistant response item', () => { + const projector = createCodexExchangeProjector({ env: {} }) + const projection = /** @type {any} */ (projector.project(exchange({ + path: '/v1/responses', + provider: 'openai', + request_body: JSON.stringify({ + model: 'gpt-5', + input: [{ role: 'user', content: [{ type: 'input_text', text: 'list files' }] }], + }), + response_body: JSON.stringify({ + id: 'resp_3', + output: [ + { type: 'message', role: 'assistant', content: [{ type: 'output_text', text: 'on it' }] }, + { type: 'function_call', call_id: 'call_42', name: 'exec_command', arguments: '{"cmd":"ls"}' }, + ], + usage: { + input_tokens: 30, + output_tokens: 11, + total_tokens: 41, + input_tokens_details: { cached_tokens: 18 }, + output_tokens_details: { reasoning_tokens: 6 }, + }, + }), + }), context())) + + assert.deepEqual(projection.messages.map((/** @type {any} */ m) => m.role), ['user', 'assistant', 'assistant']) + // Response-level usage rides the LAST assistant output item (here the + // function_call), not the first — one carrier per response, same row Claude + // uses. @ref LLP 0035#one-carrier + assert.equal(projection.messages[1].attributes, undefined) + assert.deepEqual(projection.messages[2].attributes, { + usage: { + // 30 gross input − 18 cached = 12 net; 12 + 18 + 11 == 41 total. + input_tokens: 12, + output_tokens: 11, + total_tokens: 41, + cache_read_tokens: 18, + reasoning_tokens: 6, + }, + }) +}) + test('OpenAI Responses captures top-level instructions into system_text', () => { const projector = createCodexExchangeProjector({ env: {} }) const projection = /** @type {any} */ (projector.project(exchange({ @@ -161,6 +245,53 @@ test('OpenAI Responses SSE deltas reconstruct the assistant body', () => { assert.deepEqual(projection.messages[1].raw_frame, { response_id: 'resp_2' }) }) +test('OpenAI Responses SSE completed usage is normalized onto the assistant response', () => { + const projector = createCodexExchangeProjector({ env: {} }) + const projection = /** @type {any} */ (projector.project(exchange({ + path: '/v1/responses', + is_sse: true, + request_body: JSON.stringify({ + model: 'gpt-5', + input: [{ role: 'user', content: [{ type: 'input_text', text: 'why' }] }], + }), + response_body: '', + stream_events: [ + { kind: 'stream_event', exchange_id: 'ex-1', t_ms: 0, event: 'response.created', data: JSON.stringify({ id: 'resp_2', type: 'response.created' }) }, + { kind: 'stream_event', exchange_id: 'ex-1', t_ms: 5, event: 'response.output_text.delta', data: JSON.stringify({ type: 'response.output_text.delta', delta: 'be' }) }, + { kind: 'stream_event', exchange_id: 'ex-1', t_ms: 6, event: 'response.output_text.delta', data: JSON.stringify({ type: 'response.output_text.delta', delta: 'cause' }) }, + { + kind: 'stream_event', + exchange_id: 'ex-1', + t_ms: 9, + event: 'response.completed', + data: JSON.stringify({ + type: 'response.completed', + id: 'resp_2', + status: 'completed', + usage: { + input_tokens: 8, + output_tokens: 4, + total_tokens: 12, + input_tokens_details: { cached_tokens: 3 }, + output_tokens_details: { reasoning_tokens: 2 }, + }, + }), + }, + ], + }), context())) + + assert.deepEqual(projection.messages[1].attributes, { + usage: { + // 8 gross input − 3 cached = 5 net; 5 + 3 + 4 == 12 total. + input_tokens: 5, + output_tokens: 4, + total_tokens: 12, + cache_read_tokens: 3, + reasoning_tokens: 2, + }, + }) +}) + test('OpenAI Responses function_call in input becomes an assistant tool_use message', () => { const projector = createCodexExchangeProjector({ env: {} }) const projection = /** @type {any} */ (projector.project(exchange({