feat(metrics): add in-process CostTracker with thread-safe aggregation and full test coverage (#515) #797
        outcome: Outcome,
        duration: FiniteDuration
      ): Unit =
        collectors.foreach(_.observeRequest(provider, model, outcome, duration))
`compose` currently forwards calls directly. If one collector throws, later collectors are skipped and the exception leaks to the caller. This breaks the MetricsCollector safety contract. Please wrap each delegate call with fail-safe handling. Guideline principle: telemetry failures must never affect the core request flow.
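A minimal sketch of the per-delegate isolation requested here. The `Collector` trait, `safely` helper, and the `Throwing` and `Recording` classes are hypothetical stand-ins reduced to a single method; they are not the PR's actual `MetricsCollector` API, and the same wrapper would presumably apply to every method that `compose` forwards.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.util.control.NonFatal

object ComposeSketch {
  // Hypothetical, reduced stand-in for the real MetricsCollector trait.
  trait Collector {
    def addTokens(provider: String, model: String, inputTokens: Long, outputTokens: Long): Unit
  }

  // Runs one delegate call. A non-fatal failure is logged and swallowed so
  // that the remaining collectors still run and the caller never sees it.
  private def safely(c: Collector)(call: Collector => Unit): Unit =
    try call(c)
    catch {
      case NonFatal(e) =>
        System.err.println(s"metrics collector failed: ${e.getMessage}")
    }

  // Fans each call out to every collector, isolating failures per delegate.
  def compose(collectors: Collector*): Collector = new Collector {
    def addTokens(provider: String, model: String, inputTokens: Long, outputTokens: Long): Unit =
      collectors.foreach(c => safely(c)(_.addTokens(provider, model, inputTokens, outputTokens)))
  }

  // Test doubles: one collector that always fails, one that counts calls.
  final class Throwing extends Collector {
    def addTokens(provider: String, model: String, inputTokens: Long, outputTokens: Long): Unit =
      throw new RuntimeException("boom")
  }

  final class Recording extends Collector {
    val calls = new AtomicInteger(0)
    def addTokens(provider: String, model: String, inputTokens: Long, outputTokens: Long): Unit = {
      calls.incrementAndGet()
      ()
    }
  }
}
```

With this shape, `ComposeSketch.compose(new ComposeSketch.Throwing, recording)` still delivers the call to `recording`, which is also the property a failure-path test would assert.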
          outputTokens = usage.completionTokens.toLong
        )
        cost.foreach(incCost(model, _))
      } catch {
This catch swallows all throwables without logging. The contract says failures should be caught and logged. Please at least log at debug or warn level with safe context. Guideline principle: silent failure hides operational defects.
        inputTokens: Long,
        outputTokens: Long
      ): Unit =
        collectors.foreach(_.addTokens(provider, model, inputTokens, outputTokens))
Same concern for the token path: one bad collector can break all the others and propagate upward. Please isolate collector failures per delegate call. Guideline principle: side systems must fail independently.
        ModelCostSummary(
          inputTokens = inputTokens.sum(),
          outputTokens = outputTokens.sum(),
          requestCount = requestCount.sum().toInt,
`requestCount` is downcast to `Int` here without a clamp, unlike `totalRequests`. This can overflow for long-running services. Prefer `Long` in the summary model, or clamp to `Int.MaxValue`. Guideline principle: avoid silent numeric overflow in telemetry paths.
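To illustrate the overflow risk, here is a small sketch of a saturating conversion. `clampToInt` is a hypothetical helper name, not something taken from the PR; changing the summary field to `Long` would make it unnecessary.

```scala
object ClampSketch {
  // Saturating Long -> Int conversion: out-of-range values pin to the
  // nearest Int bound instead of wrapping around.
  def clampToInt(n: Long): Int =
    if (n > Int.MaxValue) Int.MaxValue
    else if (n < Int.MinValue) Int.MinValue
    else n.toInt

  // One past Int.MaxValue, e.g. a request counter on a long-running service.
  val justOverflowed: Long = Int.MaxValue.toLong + 1L

  // Plain .toInt keeps only the low 32 bits, so this wraps to Int.MinValue.
  val wrapped: Int = justOverflowed.toInt

  // The clamped conversion stays at Int.MaxValue.
  val clamped: Int = clampToInt(justOverflowed)
}
```

The unclamped path turns a large positive count into a negative one, which is harder to spot in a dashboard than a value stuck at `Int.MaxValue`.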
        s should include("costUsd=")
      }

      it should "compose and forward calls to all collectors" in {
Please add one test where the first collector throws and the second still receives metrics. That validates that `compose` meets the safety contract under real failure conditions. Guideline principle: tests must lock in resilience behavior, not only the happy-path fan-out.
vim89
left a comment
Request changes.
This is useful and aligns with issue #515 and the roadmap's cost-visibility goal, but the core resilience contract is broken in `MetricsCollector.compose` because delegate failures can leak and stop the fan-out.
`CostTracker` also swallows failures silently, and `ModelCostSummary` has an unchecked `Int` downcast risk for long-running sessions.
Please make `compose` fail-safe per collector, add a failure-path test, and close the overflow and logging gaps.
I did not run the tests or the build.
rorygraves
left a comment
PR Review: #797 — Add in-process CostTracker with thread-safe aggregation
Status: Changes Requested
Good concept — a lightweight in-memory cost tracker extending MetricsCollector without requiring Prometheus. Thread-safe design using ConcurrentHashMap + LongAdder/DoubleAdder is the right approach. Test coverage is solid (242 lines for 200 lines of implementation).
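For readers unfamiliar with the pattern, the following is a rough sketch of lock-free per-model aggregation with `ConcurrentHashMap` and `LongAdder`/`DoubleAdder`. The `ModelTotals` and `Tracker` names and their methods are illustrative assumptions, not the PR's `InMemoryCostTracker`.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.{DoubleAdder, LongAdder}

object AccumulatorSketch {
  // Hypothetical per-model accumulators; adders tolerate concurrent updates.
  final class ModelTotals {
    val inputTokens  = new LongAdder
    val outputTokens = new LongAdder
    val costUsd      = new DoubleAdder
  }

  final class Tracker {
    private val byModel = new ConcurrentHashMap[String, ModelTotals]()

    // computeIfAbsent is atomic, so each model gets exactly one ModelTotals
    // even when several threads see the model for the first time together.
    private def totalsFor(model: String): ModelTotals =
      byModel.computeIfAbsent(model, _ => new ModelTotals)

    def addTokens(model: String, input: Long, output: Long): Unit = {
      val totals = totalsFor(model)
      totals.inputTokens.add(input)
      totals.outputTokens.add(output)
    }

    def addCost(model: String, usd: Double): Unit =
      totalsFor(model).costUsd.add(usd)

    // Reads return Long, so no narrowing is needed; unknown models report 0.
    def tokens(model: String): Long =
      Option(byModel.get(model))
        .map(t => t.inputTokens.sum() + t.outputTokens.sum())
        .getOrElse(0L)

    def cost(model: String): Double =
      Option(byModel.get(model)).map(_.costUsd.sum()).getOrElse(0.0)
  }
}
```

Note that `sum()` on an adder is not an atomic snapshot while writers are active, so a summary taken mid-update may be slightly stale; that is usually acceptable for telemetry.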
Issues Found
| # | File | Line | Severity | Issue | Suggestion |
|---|---|---|---|---|---|
| 1 | MetricsCollector.scala | 120-153 | High | `compose()` doesn't isolate failures. With `collectors.foreach(_.observeRequest(...))`, if the first collector throws, the remaining collectors are skipped and the exception propagates to the caller. This directly violates the MetricsCollector contract: "failures must not propagate to callers" (line 8). | Wrap each delegate call: `collectors.foreach { c => try c.observeRequest(...) catch { case NonFatal(e) => /* log */ } }`. Apply to all 6 methods in compose. |
| 2 | CostTracker.scala | 88, 99, 111, 121, 147 | High | Catching `Throwable` swallows fatal errors. `catch { case _: Throwable => () }` catches OutOfMemoryError, StackOverflowError, VirtualMachineError, etc. Swallowing these silently is dangerous: the JVM may be in an unrecoverable state. | Use `scala.util.control.NonFatal`: `catch { case NonFatal(_) => /* log */ }`. This lets fatal errors propagate correctly. |
| 3 | CostTracker.scala | 88, 99, 111, 121, 147 | Medium | Silent error swallowing with no logging. The MetricsCollector contract says "catch and log errors internally". Currently errors are caught and discarded with `()`, making operational issues invisible. | Add at minimum a `System.err.println` or use a logger. Even `catch { case NonFatal(e) => System.err.println(s"CostTracker: ${e.getMessage}") }` is better than nothing. |
| 4 | CostTracker.scala | 45 | Medium | Per-model `requestCount` overflow not clamped. `ModelAccumulators.snapshot` does `requestCount.sum().toInt` without overflow protection, while the global `totalRequests` (line 132) correctly clamps to `Int.MaxValue`. Inconsistent. | Either clamp here too or, better, change `ModelCostSummary.requestCount` to `Long` to match the `LongAdder` storage. |
| 5 | CostTracker.scala | 17-18, 128-133 | Medium | `totalTokens` and `totalRequests` return `Int` but are tracked as `Long`. For a metrics API designed for long-running services, clamping `Long` to `Int` at ~2.1B is lossy. The internal `LongAdder` already stores `Long`. | Change the trait methods `totalTokens` and `totalRequests` to return `Long`. This eliminates the clamping workaround entirely and is more correct for production metrics. |
| 6 | CostTracker.scala | 93-105 | Medium | `provider` parameter silently ignored. All MetricsCollector methods receive `provider` and `model`, but only `model` is used as the aggregation key. If two providers serve the same model name (e.g., gpt-4 via OpenAI and OpenRouter), their metrics merge silently. | Use `s"$provider/$model"` as the map key, or document that aggregation is model-only by design. At minimum, add a ScalaDoc note. |
| 7 | CostTrackerSpec.scala | - | Medium | No test for compose failure isolation. There's no test verifying that if one collector in `compose()` throws, the remaining collectors still receive the call. This is the most critical behavioral property of compose. | Add a test with a throwing collector first and a mock second, and assert the mock still receives the call. |
| 8 | CostTracker.scala | 30 | Low | `CostTracker.noop` is a `val` holding `new NoopCostTracker()`. Since `NoopCostTracker` is stateless, this could be a simple object. Minor allocation concern. | Use `private[metrics] object NoopCostTracker extends CostTracker { ... }` |
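Issues 2 and 3 in the table can be sketched together as one guard that logs non-fatal failures and lets fatal ones propagate. `guarded` and its `context` parameter are hypothetical names introduced for illustration; a real implementation would likely use the project's logger rather than `System.err`.

```scala
import scala.util.control.NonFatal

object NonFatalSketch {
  // Runs a metrics update. Returns true on success, false if a non-fatal
  // exception was caught and logged. Fatal throwables such as
  // VirtualMachineError subclasses are not matched by NonFatal, so they
  // propagate to the caller unchanged.
  def guarded(context: String)(update: => Unit): Boolean =
    try {
      update
      true
    } catch {
      case NonFatal(e) =>
        System.err.println(s"CostTracker [$context]: ${e.getClass.getSimpleName}: ${e.getMessage}")
        false
    }

  // NonFatal.apply reports how a given throwable would be classified.
  val runtimeIsNonFatal: Boolean = NonFatal(new RuntimeException("recoverable"))
  val stackOverflowIsNonFatal: Boolean = NonFatal(new StackOverflowError())
}
```

Compared with `catch { case _: Throwable => () }`, this keeps the "never break the request path" guarantee for ordinary exceptions while leaving the JVM's own error signalling intact.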
Positive Notes
- Thread-safe design with `ConcurrentHashMap` + `LongAdder`/`DoubleAdder`: no `synchronized`, no mutable `var`
- `record()` convenience method combining request + tokens + cost in one call is nice API design
- Test coverage is strong: concurrent updates, overflow clamping, reset, noop, compose fan-out
- Clean separation between `InMemoryCostTracker` and `NoopCostTracker`
- `private[metrics]` visibility on implementation classes is correct
Summary
2 high, 5 medium, 1 low severity issues. The two high issues are critical: compose() violating the safety contract (one bad collector kills the rest) and catching fatal Throwable errors. These must be fixed before merge.
What does this PR do?
Implements Issue #515 by introducing a lightweight, thread-safe in-process `CostTracker` that allows users to aggregate request counts, token usage, and total cost within a session, without requiring Prometheus or external observability systems.

Previously, users needed PrometheusMetrics or tracing integrations to inspect cost metrics. This PR provides a simple in-memory API suitable for scripts, agents, and applications that want direct programmatic access to cost totals.

Key additions:
- `CostTracker` trait extending `MetricsCollector`
- `InMemoryCostTracker` (thread-safe implementation)
- `NoopCostTracker`
- `CostTracker.create()` factory
- `MetricsCollector.compose(...)` combinator for composing collectors

The tracker supports both:
- the individual collector methods (`observeRequest`, `addTokens`, `recordCost`)
- the combined `record(model, usage, cost: Option[Double])` call

Thread safety is achieved using:
- `ConcurrentHashMap`
- `LongAdder`
- `DoubleAdder`

No `synchronized` or mutable `var` state is used.

Related issue
Fixes #515
Parent: #15
How was this tested?
Ran full cross-build test suite:
sbt clean
sbt +test
Added `CostTrackerSpec.scala` covering:
- `record(Some)` and `record(None)` paths
- `MetricsCollector.compose` (empty, 2 collectors, 3 collectors)

All tests pass on Scala 2.13 and 3.x.
Checklist
- `sbt scalafmtAll`: code is formatted
- `sbt +test`: tests pass on both Scala 2.13 and 3.x
- … `main`)