Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,12 @@ function expandMessageParts(ctx) {
const finishReason = mapFinishReason(stringValue(ctx.message.stop_reason))
const messageCreatedAt = stringValue(ctx.message.message_created_at) ?? ctx.tsStart
const messageAttributes = ctx.message.attributes
// @ref LLP 0035#one-carrier [implements] — response-level `usage` is
// per-response, so a multi-block carrier message must not replicate it onto
// every part. Strip `usage` from all but the last part; the last block (the
// terminal output item, which also carries `stop_reason`/status) is the sole
// carrier. Closes the residual edge case that LLP 0035 assumed was never produced.
const nonCarrierAttributes = stripUsage(messageAttributes)
const baseClientAttributes = withClientAttributes(
undefined,
stringValue(ctx.projection.client_version),
Expand Down Expand Up @@ -725,7 +731,7 @@ function expandMessageParts(ctx) {
: undefined,
is_error: readKey(block, 'is_error') === true ? true : undefined,
status: buildStatus(block, isLast, ctx.role, finishReason),
attributes: mergeJsonObjects(baseClientAttributes, messageAttributes),
attributes: mergeJsonObjects(baseClientAttributes, isLast ? messageAttributes : nonCarrierAttributes),
raw_frame: isPlainObject(ctx.message.raw_frame) ? ctx.message.raw_frame : undefined,
}
if (
Expand Down Expand Up @@ -968,6 +974,22 @@ function buildGatewayAttributes(exchange) {
return attrs
}

/**
 * Build the attribute bag for the non-carrier parts of a multi-block
 * message: the same attributes with the response-level `usage` entry
 * removed.
 *
 * - Anything that is not a plain object, or a plain object whose `usage`
 *   is `undefined`, is handed back as-is (same reference, no copy).
 * - Otherwise a shallow copy without `usage` is returned; the input is
 *   never mutated.
 * - If `usage` was the only key the result is `undefined`, so non-carrier
 *   parts fall back to client attributes.
 *
 * @ref LLP 0035#one-carrier — usage is per-response; only the last part carries it.
 * @param {Record<string, unknown> | undefined} attributes
 * @returns {Record<string, unknown> | undefined}
 */
function stripUsage(attributes) {
  // Nothing to strip: pass the caller's value straight through.
  if (!isPlainObject(attributes)) return attributes
  if (attributes.usage === undefined) return attributes

  // Shallow-copy every key except `usage`.
  const { usage: _omitted, ...remaining } = attributes
  if (Object.keys(remaining).length === 0) return undefined
  return remaining
}

/**
* Right-biased deep merge for one level of nested objects. Used both
* to fold projection-level attributes into per-row attributes and to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ For exact columns in the installed version: `hyp query schema <table> --format j
- `is_error`, `is_sidechain`, `is_compact_summary` are direct boolean columns — prefer them over JSON probing or `content_text` substring matches.
- Token usage is recorded at `attributes.$.usage.*` when present, but **for Claude-via-gateway recordings this is typically null** — fall back to `attributes.$.gateway.request_bytes` and `attributes.$.gateway.response_bytes` as size proxies.
- Latency lives at `attributes.$.timing.latency_ms` (note: `latency_ms`, not `duration_ms`).
- Dedup usage/timing per message before summing: those fields can repeat across the parts of one message — `GROUP BY session_id, message_id` with `MAX(...)` first, then aggregate per session/user/etc. (group/key on `session_id`, not `conversation_id`, which is null for Claude).
- Token usage now rides exactly **one** row per response — the last assistant part — so a plain `SUM(CAST(JSON_VALUE(attributes, '$.usage.<field>') AS BIGINT))` over assistant rows is correct with **no dedupe** (non-carrier parts are null and ignored). This is the one-carrier-per-response rule (LLP 0035 / LLP 0026), which superseded the older "usage repeats across parts" shape; a defensive `MAX(...) GROUP BY session_id, message_id` still returns the right number if you'd rather not assume the rule. `input_tokens` is net of cache for every provider, so `SUM(input_tokens)` (or `SUM(input_tokens + cache_read_tokens)` for gross prompt) means the same thing across rows and never double-counts cache.
- Timing can still repeat across the parts of one message — `MAX(latency_ms) GROUP BY session_id, message_id` before aggregating per session/user/etc. (group/key on `session_id`, not `conversation_id`, which is null for Claude).
- Tool call / result pairs join on `tool_call_id`. The natural ordering key for `ai_gateway_messages` is `(session_id, message_index, part_index)`; add `conversation_id` only to separate threads within one Codex session.
- Table names are resolved from the SQL AST; only built-ins and registered collection tables are valid.

Expand Down
29 changes: 23 additions & 6 deletions hypaware-core/plugins-workspace/claude/src/backfill.js
Original file line number Diff line number Diff line change
Expand Up @@ -298,12 +298,26 @@ async function projectedExchangeFromEntries(args) {
let startedAtMs
/** @type {string | undefined} */
let transcriptCwd
for (const entry of entries) {
// Usage is a response-level (per API message) figure that Claude Code
// duplicates onto every block line of an assistant turn. Record the last
// block line per API message id so usage is stamped on only that one block —
// matching the live projector, so each response contributes usage to exactly
// one row and live/backfill dedupe onto the same row. @ref LLP 0035#one-carrier
/** @type {Map<string, number>} */
const lastBlockIndexByMessageId = new Map()
entries.forEach((entry, index) => {
if (entry.messageId) lastBlockIndexByMessageId.set(entry.messageId, index)
})
for (let index = 0; index < entries.length; index++) {
const entry = entries[index]
// Capture before the message filter: cwd rides every transcript line, not
// only the ones that project to a message, and it's the only repo signal a
// pre-0032 session carries.
if (!transcriptCwd && entry.cwd) transcriptCwd = entry.cwd
const message = projectedMessageFromEntry(entry, agentMeta)
// A line with no API message id is its own single-block message → keep its
// usage; otherwise only the last block of the message carries it.
const stampUsage = !entry.messageId || lastBlockIndexByMessageId.get(entry.messageId) === index
const message = projectedMessageFromEntry(entry, agentMeta, stampUsage)
if (!message) continue
messages.push(message)
if (!clientVersion && entry.client_version) clientVersion = entry.client_version
Expand Down Expand Up @@ -366,9 +380,11 @@ async function projectedExchangeFromEntries(args) {
*
* @param {TranscriptEntry} entry
* @param {Map<string, { tool_use_id: string }>} agentMeta
* @param {boolean} stampUsage fold attributes.usage onto this block (true only
* for the last block of an API message, so usage lands once per response)
* @returns {AiGatewayProjectedMessage | undefined}
*/
function projectedMessageFromEntry(entry, agentMeta) {
function projectedMessageFromEntry(entry, agentMeta, stampUsage) {
const role = entry.role
if (!role) return undefined

Expand Down Expand Up @@ -410,9 +426,10 @@ function projectedMessageFromEntry(entry, agentMeta) {
}
// Mirror live capture: fold the assistant turn's token usage into
// attributes.usage (anthropic.js owns the cache_*_input_tokens →
// cache_{read,write}_tokens normalization). Merged, not assigned, so a
// subagent's `claude.spawned_by_tool_use_id` above survives.
const usageAttrs = anthropicMessageAttributes(entry)
// cache_{read,write}_tokens normalization), but only on the last block of the
// API message so usage lands once per response. Merged, not assigned, so a
// subagent's `claude.spawned_by_tool_use_id` above survives. @ref LLP 0035#one-carrier
const usageAttrs = stampUsage ? anthropicMessageAttributes(entry) : undefined
if (usageAttrs) message.attributes = { ...(message.attributes ?? {}), ...usageAttrs }
if (entry.attachment_type) message.attachment_type = entry.attachment_type
if (entry.hook_event) message.hook_event = entry.hook_event
Expand Down
14 changes: 8 additions & 6 deletions hypaware-core/plugins-workspace/claude/src/projector.js
Original file line number Diff line number Diff line change
Expand Up @@ -378,12 +378,14 @@ function projectAssistantMessage(args) {
match = findTranscriptMatch(transcriptIndex, { role: 'assistant', content: unitContent, agentId })
}
applyTranscriptMatch(projected, match)
if (messageAttrs) projected.attributes = messageAttrs
if (stopReason && i === unitCount - 1) {
// The gateway core reads `stop_reason` off the projected message;
// on a split turn it rides the last block's message so
// `status.finish_reason` lands exactly once per API message.
projected.stop_reason = stopReason
if (i === unitCount - 1) {
// `usage` and `stop_reason` are response-level envelope fields, not
// per-block. On a split turn they ride ONLY the last block's message so
// each API message contributes its usage to exactly one row (a SUM over
// rows isn't multiplied by the per-block fanout) and `finish_reason`
// lands once. @ref LLP 0035#one-carrier @ref LLP 0026#consequences
if (messageAttrs) projected.attributes = messageAttrs
if (stopReason) projected.stop_reason = stopReason
}
out.push(projected)
}
Expand Down
132 changes: 130 additions & 2 deletions hypaware-core/plugins-workspace/codex/src/backfill.js
Original file line number Diff line number Diff line change
Expand Up @@ -371,8 +371,10 @@ function parseLegacyDoc(text, filePath) {
/**
* Modern rollout: line-delimited `{ timestamp, type, payload }` records —
* one `session_meta`, zero+ `turn_context`, and the conversation's
* `response_item`s. `event_msg` / `compacted` and any unknown line types are
* skipped. A blank or truncated line never aborts the parse.
* `response_item`s. A `token_count` `event_msg` carries the turn's token
* usage and is captured as a synthetic turn-boundary marker (no message);
* all other `event_msg` / `compacted` and unknown line types are skipped. A
* blank or truncated line never aborts the parse.
*
* @param {string} text
* @param {string} filePath
Expand Down Expand Up @@ -407,6 +409,15 @@ function parseJsonlRollout(text, filePath) {
turnPayloads.push(payload)
} else if (type === 'response_item' && payload) {
items.push({ payload, timestampMs: timestampToMs(row.timestamp) })
} else if (type === 'event_msg' && payload) {
// The one event_msg we keep: token_count. It is NOT a message — it is a
// turn-boundary marker carrying that turn's normalized usage. Its slot in
// the items stream is preserved (so the projector can attribute it to the
// preceding assistant message), but it never projects a row of its own.
const usageAttributes = codexUsageFromTokenCount(payload)
if (usageAttributes) {
items.push({ payload: { type: 'token_count' }, timestampMs: timestampToMs(row.timestamp), usageAttributes })
}
}
}

Expand Down Expand Up @@ -475,7 +486,19 @@ function projectedExchangeFromSession(args) {
/** @type {AiGatewayProjectedMessage[]} */
const messages = []
let nativeIdCount = 0
// Index in `messages` where the current turn began. A token_count marker
// closes the turn: its usage is stamped onto that turn's LAST assistant
// message (mirroring the live projector's stampUsageOnLastAssistant), then
// the next turn starts. Reasoning-only assistant messages are skipped as
// stamp targets so live and backfilled rows carry usage on the same logical
// message and dedupe to one row. @ref LLP 0035#per-turn
let turnStartIndex = 0
for (const item of items) {
if (item.usageAttributes) {
stampUsageOnTurn(messages, turnStartIndex, item.usageAttributes)
turnStartIndex = messages.length
continue
}
const message = projectedMessageFromItem(item)
if (!message) continue
if (message.message_id) nativeIdCount += 1
Expand Down Expand Up @@ -681,6 +704,90 @@ function reasoningItemToProjected(payload) {
return { role: 'assistant', content: [{ type: 'thinking', thinking: text }] }
}

// ---------------------------------------------------------------------
// Usage extraction
// ---------------------------------------------------------------------

/**
 * Extract one turn's normalized token usage from an `event_msg` payload.
 *
 * Only `token_count` payloads qualify. The figure read is the per-turn
 * delta at `info.last_token_usage`, never the cumulative session total at
 * `info.total_token_usage` — stamping the running total on every turn would
 * multiply-count once usage is summed across a conversation.
 *
 * @param {Record<string, unknown>} payload
 * @returns {JsonObject | undefined} the normalized usage attributes, or
 *   `undefined` for any other event_msg, a payload whose `info` is not a
 *   plain object, or a payload that yields no usage.
 */
function codexUsageFromTokenCount(payload) {
  const isTokenCount = stringValue(payload.type) === 'token_count'
  if (!isTokenCount) return undefined

  const { info } = payload
  return isPlainObject(info) ? codexUsageAttributes(info.last_token_usage) : undefined
}

/**
 * Convert a Codex `last_token_usage` block into the gateway's
 * `attributes.usage` shape, matching the live Codex projector and Claude.
 *
 * Field mapping (emitted in this order, each only when numeric):
 *   input_tokens             -> usage.input_tokens (net of cache, see below)
 *   cached_input_tokens      -> usage.cache_read_tokens
 *   output_tokens            -> usage.output_tokens
 *   reasoning_output_tokens  -> usage.reasoning_tokens
 *   total_tokens             -> usage.total_tokens
 *
 * @param {unknown} rawUsage
 * @returns {JsonObject | undefined} `{ usage }`, or `undefined` when the
 *   input is not a plain object or none of the fields were numeric.
 */
function codexUsageAttributes(rawUsage) {
  if (!isPlainObject(rawUsage)) return undefined

  const cacheRead = numberValue(rawUsage.cached_input_tokens)
  const inputGross = numberValue(rawUsage.input_tokens)
  const hasCacheRead = cacheRead !== undefined

  /** @type {JsonObject} */
  const usage = {}

  // @ref LLP 0035#net-input — Codex reports `input_tokens` gross (cached
  // tokens included). HypAware stores it NET of cache so it never
  // double-counts against cache_read_tokens and lines up with the Claude /
  // live-Codex convention. total_tokens is left raw, so
  // net + cache_read + output == total. The clamp keeps the net figure from
  // going negative if the cached count exceeds the gross count.
  if (inputGross !== undefined) {
    let inputNet = inputGross
    if (hasCacheRead) inputNet = Math.max(0, inputGross - cacheRead)
    usage.input_tokens = inputNet
  }
  if (hasCacheRead) usage.cache_read_tokens = cacheRead

  // Straight numeric copies, renaming where the gateway key differs.
  const aliases = [
    ['output_tokens', 'output_tokens'],
    ['reasoning_output_tokens', 'reasoning_tokens'],
    ['total_tokens', 'total_tokens'],
  ]
  for (const [sourceKey, targetKey] of aliases) {
    copyNumberAlias(rawUsage, usage, sourceKey, targetKey)
  }

  if (Object.keys(usage).length === 0) return undefined
  return { usage }
}

/**
 * Attach a turn's usage to exactly one message: scanning backwards from the
 * end of `messages` down to `startIndex` (inclusive), the first assistant
 * message that carries a text or tool_use block receives `usageAttributes`,
 * shallow-merged over its existing attributes (usage keys win).
 *
 * This is meant to be the same target the live projector picks (its terminal
 * output item), so live and backfilled rows fold usage onto the one logical
 * message and dedupe cleanly. Reasoning-only (thinking) messages are passed
 * over. If the turn has no eligible message (e.g. it was windowed out) the
 * usage is dropped rather than mis-attributed. @ref LLP 0035#one-carrier
 *
 * @param {AiGatewayProjectedMessage[]} messages mutated in place (at most one element)
 * @param {number} startIndex index in `messages` where the current turn began
 * @param {JsonObject} usageAttributes
 */
function stampUsageOnTurn(messages, startIndex, usageAttributes) {
  let cursor = messages.length
  while (cursor > startIndex) {
    cursor -= 1
    const candidate = messages[cursor]
    const isCarrier = candidate.role === 'assistant' && hasTextOrToolUse(candidate)
    if (isCarrier) {
      const existing = candidate.attributes ?? {}
      candidate.attributes = { ...existing, ...usageAttributes }
      return
    }
  }
}

/**
 * Report whether a message's content array holds at least one plain-object
 * block of type `text` or `tool_use`. Non-array content, and blocks that are
 * not plain objects, never match.
 *
 * @param {AiGatewayProjectedMessage} message
 * @returns {boolean}
 */
function hasTextOrToolUse(message) {
  const { content } = message
  if (!Array.isArray(content)) return false
  for (const block of content) {
    if (!isPlainObject(block)) continue
    const kind = block.type
    if (kind === 'text' || kind === 'tool_use') return true
  }
  return false
}

// ---------------------------------------------------------------------
// Value helpers
// ---------------------------------------------------------------------
Expand Down Expand Up @@ -888,6 +995,27 @@ function boolValue(value) {
return typeof value === 'boolean' ? value : undefined
}

/**
 * Coerce a loosely-typed value to a finite number.
 *
 * Accepts finite numbers as-is and strings that `Number()` parses to a
 * finite value. Everything else — NaN, ±Infinity, non-numeric strings,
 * and other types — yields `undefined`.
 *
 * Blank strings are rejected explicitly: `Number('')` and `Number('   ')`
 * both evaluate to `0`, which would otherwise turn an empty field into a
 * fabricated zero count instead of "no value".
 *
 * @param {unknown} value
 * @returns {number | undefined}
 */
function numberValue(value) {
  if (typeof value === 'number') return Number.isFinite(value) ? value : undefined
  if (typeof value !== 'string' || value.trim() === '') return undefined
  const parsed = Number(value)
  return Number.isFinite(parsed) ? parsed : undefined
}

/**
 * Copy one numeric field from `source` to `target`, optionally under a
 * different key. The value is passed through `numberValue`; when that
 * yields `undefined` the target is left untouched.
 *
 * @param {Record<string, unknown>} source object to read from
 * @param {JsonObject} target object to write to (mutated in place)
 * @param {string} sourceKey key to read on `source`
 * @param {string} targetKey key to write on `target`
 */
function copyNumberAlias(source, target, sourceKey, targetKey) {
  const parsed = numberValue(source[sourceKey])
  if (parsed === undefined) return
  target[targetKey] = parsed
}

/** @param {JsonObject} target @param {string} key @param {string | undefined} value */
function setIfString(target, key, value) {
if (value !== undefined) target[key] = value
Expand Down
Loading