Skip to content

feat(metrics): add in-process CostTracker with thread-safe aggregation and full test coverage (#515)#797

Open
gudiwadasruthi wants to merge 1 commit intollm4s:mainfrom
gudiwadasruthi:feature/cost-tracker-515
Open

feat(metrics): add in-process CostTracker with thread-safe aggregation and full test coverage (#515)#797
gudiwadasruthi wants to merge 1 commit intollm4s:mainfrom
gudiwadasruthi:feature/cost-tracker-515

Conversation

@gudiwadasruthi
Copy link
Contributor

What does this PR do?

Implements Issue #515 by introducing a lightweight, thread-safe in-process CostTracker that allows users to aggregate request counts, token usage, and total cost within a session — without requiring Prometheus or external observability systems.

Previously, users needed PrometheusMetrics or tracing integrations to inspect cost metrics. This PR provides a simple in-memory API suitable for scripts, agents, and applications that want direct programmatic access to cost totals.

Key additions:

  • CostTracker trait extending MetricsCollector
  • InMemoryCostTracker (thread-safe implementation)
  • NoopCostTracker
  • CostTracker.create() factory
  • MetricsCollector.compose(...) combinator for composing collectors

The tracker supports both:

  • Split metric calls (observeRequest, addTokens, recordCost)
  • Single-call API via record(model, usage, cost: Option[Double])

Thread safety is achieved using:

  • ConcurrentHashMap
  • LongAdder
  • DoubleAdder

No synchronized or mutable var state is used.


Related issue

Fixes #515
Parent: #15


How was this tested?

Ran full cross-build test suite:
sbt clean
sbt +test

Added CostTrackerSpec.scala covering:

  • Single-record aggregation
  • Multiple model aggregation
  • record(Some) and record(None) paths
  • Reset behavior
  • Summary (empty and non-empty branches)
  • NoopCostTracker behavior
  • MetricsCollector.compose (empty, 2 collectors, 3 collectors)
  • Concurrent update safety using Futures
  • Int.MaxValue clamp branches

All tests pass on Scala 2.13 and 3.x.


Checklist

  • I have read the Contributing Guide
  • PR is small and focused — one change, one reason
  • sbt scalafmtAll — code is formatted
  • sbt +test — tests pass on both Scala 2.13 and 3.x
  • New code includes tests
  • No unrelated changes included (branched from main)
  • Commit messages explain the "why"

@github-actions
Copy link
Contributor

github-actions bot commented Mar 2, 2026

🔒 Claude Code Review Status

Thank you for your contribution! This PR is from an external repository, so automated Claude review is disabled for security reasons.

For maintainers: To get Claude review for this PR, comment @claude and I'll trigger a manual review.

PR Summary:

@codecov
Copy link

codecov bot commented Mar 2, 2026

Codecov Report

❌ Patch coverage is 84.37500% with 15 lines in your changes missing coverage. Please review.

Files with missing lines Patch % Lines
...src/main/scala/org/llm4s/metrics/CostTracker.scala 88.88% 9 Missing ⚠️
...ain/scala/org/llm4s/metrics/MetricsCollector.scala 60.00% 6 Missing ⚠️

📢 Thoughts on this report? Let us know!

outcome: Outcome,
duration: FiniteDuration
): Unit =
collectors.foreach(_.observeRequest(provider, model, outcome, duration))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

compose currently forwards calls directly. If one collector throws, later collectors are skipped and exception leaks to caller. This breaks the MetricsCollector safety contract. Please wrap each delegate call with fail-safe handling. Guideline principle: telemetry failures must never affect core request flow.

outputTokens = usage.completionTokens.toLong
)
cost.foreach(incCost(model, _))
} catch {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This catch swallows all throwables with no log. The contract says failures should be caught and logged. Please at least log at debug or warn with safe context. Guideline principle: silent failure hides operational defects.

inputTokens: Long,
outputTokens: Long
): Unit =
collectors.foreach(_.addTokens(provider, model, inputTokens, outputTokens))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same concern for token path: one bad collector can break all others and propagate upward. Please isolate collector failures per delegate call. Guideline principle: side systems must fail independently.

ModelCostSummary(
inputTokens = inputTokens.sum(),
outputTokens = outputTokens.sum(),
requestCount = requestCount.sum().toInt,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

requestCount is downcast to Int here without clamp, unlike totalRequests. This can overflow for long-running services. Prefer Long in summary model or clamp to Int.MaxValue. Guideline principle: avoid silent numeric overflow in telemetry paths.

s should include("costUsd=")
}

it should "compose and forward calls to all collectors" in {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add one test where first collector throws and second still receives metrics. That validates compose meets the safety contract in real failure conditions. Guideline principle: tests must lock resilience behavior, not only happy path fan-out.

Copy link
Member

@vim89 vim89 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Request changes.
This is useful and aligns with issue #515 plus roadmap cost visibility, but core resilience contract is broken in MetricsCollector.compose because delegate failures can leak and stop fan-out.
CostTracker also swallows failures silently, and ModelCostSummary has an unchecked Int downcast risk for long-running sessions.
Please make compose fail-safe per collector, add a failure-path test, and close the overflow and logging gaps.
I did not run tests / build.

Copy link
Collaborator

@rorygraves rorygraves left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PR Review: #797 — Add in-process CostTracker with thread-safe aggregation

Status: Changes Requested

Good concept — a lightweight in-memory cost tracker extending MetricsCollector without requiring Prometheus. Thread-safe design using ConcurrentHashMap + LongAdder/DoubleAdder is the right approach. Test coverage is solid (242 lines for 200 lines of implementation).

Issues Found

# File Line Severity Issue Suggestion
1 MetricsCollector.scala 120-153 High compose() doesn't isolate failures. collectors.foreach(_.observeRequest(...)) — if the first collector throws, remaining collectors are skipped and the exception propagates to the caller. This directly violates the MetricsCollector contract: "failures must not propagate to callers" (line 8). Wrap each delegate call: collectors.foreach { c => try c.observeRequest(...) catch { case NonFatal(e) => /* log */ } }. Apply to all 6 methods in compose.
2 CostTracker.scala 88, 99, 111, 121, 147 High Catching Throwable swallows fatal errors. catch { case _: Throwable => () } catches OutOfMemoryError, StackOverflowError, VirtualMachineError, etc. Swallowing these silently is dangerous — the JVM may be in an unrecoverable state. Use scala.util.control.NonFatal: catch { case NonFatal(_) => /* log */ }. This lets fatal errors propagate correctly.
3 CostTracker.scala 88, 99, 111, 121, 147 Medium Silent error swallowing with no logging. The MetricsCollector contract says "catch and log errors internally". Currently errors are caught and discarded with (), making operational issues invisible. Add at minimum a System.err.println or use a logger. Even catch { case NonFatal(e) => System.err.println(s"CostTracker: ${e.getMessage}") } is better than nothing.
4 CostTracker.scala 45 Medium Per-model requestCount overflow not clamped. ModelAccumulators.snapshot does requestCount.sum().toInt without overflow protection, while the global totalRequests (line 132) correctly clamps to Int.MaxValue. Inconsistent. Either clamp here too, or better — change ModelCostSummary.requestCount to Long to match the LongAdder storage.
5 CostTracker.scala 17-18, 128-133 Medium totalTokens and totalRequests return Int but are tracked as Long. For a metrics API designed for long-running services, clamping Long to Int at ~2.1B is lossy. The internal LongAdder already stores Long. Change the trait methods totalTokens and totalRequests to return Long. This eliminates the clamping workaround entirely and is more correct for production metrics.
6 CostTracker.scala 93-105 Medium provider parameter silently ignored. All MetricsCollector methods receive provider and model, but only model is used as the aggregation key. If two providers serve the same model name (e.g., gpt-4 via OpenAI and OpenRouter), their metrics merge silently. Use s"$provider/$model" as the map key, or document that aggregation is model-only by design. At minimum, add a ScalaDoc note.
7 CostTrackerSpec.scala Medium No test for compose failure isolation. There's no test verifying that if one collector in compose() throws, the remaining collectors still receive the call. This is the most critical behavioral property of compose. Add a test with a throwing collector first and a mock second, assert the mock still receives the call.
8 CostTracker.scala 30 Low CostTracker.noop is val new NoopCostTracker(). Since NoopCostTracker is stateless, this could be a simple object. Minor allocation concern. Use private[metrics] object NoopCostTracker extends CostTracker { ... }

Positive Notes

  • Thread-safe design with ConcurrentHashMap + LongAdder/DoubleAdder — no synchronized, no mutable var
  • record() convenience method combining request + tokens + cost in one call is nice API design
  • Test coverage is strong: concurrent updates, overflow clamping, reset, noop, compose fan-out
  • Clean separation between InMemoryCostTracker and NoopCostTracker
  • private[metrics] visibility on implementation classes is correct

Summary

2 high, 5 medium, 1 low severity issues. The two high issues are critical: compose() violating the safety contract (one bad collector kills the rest) and catching fatal Throwable errors. These must be fixed before merge.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Cost Tracking: Add CostTracker for in-process cost aggregation

3 participants