140 changes: 140 additions & 0 deletions migrations/versions/359fff0c443a_add_usage_requests_table.py
@@ -0,0 +1,140 @@
"""Add usage_requests table for LiteLLM traffic ingestion.

Revision ID: 359fff0c443a
Revises: b356c861829f
Create Date: 2025-01-21 21:00:00.000000+00:00

"""

from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision: str = "359fff0c443a"
down_revision: Union[str, None] = "b356c861829f"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Create usage_requests table with indexes and idempotency constraint."""
    op.create_table(
        "usage_requests",
        sa.Column("id", postgresql.UUID(as_uuid=True), nullable=False),
        sa.Column("litellm_call_id", sa.String(255), nullable=False),
        sa.Column("request_id", sa.String(255), nullable=True),
        sa.Column("key_alias", sa.String(255), nullable=True),
        sa.Column("litellm_key_id", sa.String(255), nullable=True),
        sa.Column("proxy_key_id", postgresql.UUID(as_uuid=True), nullable=True),
        sa.Column("benchmark_session_id", postgresql.UUID(as_uuid=True), nullable=True),
        sa.Column("provider", sa.String(255), nullable=True),
        sa.Column("provider_route", sa.String(255), nullable=True),
        sa.Column("requested_model", sa.String(255), nullable=True),
        sa.Column("resolved_model", sa.String(255), nullable=True),
        sa.Column("route", sa.String(255), nullable=True),
        sa.Column("started_at", sa.DateTime(timezone=True), nullable=True),
        sa.Column("finished_at", sa.DateTime(timezone=True), nullable=True),
        sa.Column("latency_ms", sa.Float(), nullable=True),
        sa.Column("ttft_ms", sa.Float(), nullable=True),
        sa.Column("input_tokens", sa.Integer(), nullable=True),
        sa.Column("output_tokens", sa.Integer(), nullable=True),
        sa.Column("cached_input_tokens", sa.Integer(), nullable=True),
        sa.Column("cache_write_tokens", sa.Integer(), nullable=True),
        sa.Column("cost_usd", sa.Float(), nullable=True),
        sa.Column("status", sa.String(50), nullable=True),
        sa.Column("error_code", sa.String(50), nullable=True),
        sa.Column("error_message", sa.Text(), nullable=True),
        sa.Column("cache_hit", sa.Boolean(), nullable=True),
        sa.Column("request_metadata", sa.JSON(), nullable=False, server_default="{}"),

🟡 Suggestion: server_default="{}" is set on a JSON column. On PostgreSQL this works (PG casts the string), but on SQLite the JSON type is stored as text, so {} ends up as the literal string {} rather than a JSON object. Since tests run on SQLite, verify this doesn't cause issues when request_metadata is queried as a dict. If it does, consider using server_default=sa.text("'{}'::jsonb") for PG, or remove the server_default and let the ORM default=dict handle it.
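A minimal sketch of the two alternatives suggested above, using the same SQLAlchemy constructs as this migration; neither is the PR's code as written, and the variable names are illustrative only.

```python
import sqlalchemy as sa

# Option A (assumes a PostgreSQL-only target): spell the default as an explicit
# JSON literal so the stored value is a real JSON object rather than whatever the
# dialect makes of the bare string "{}".
request_metadata_pg = sa.Column(
    "request_metadata",
    sa.JSON(),
    nullable=False,
    server_default=sa.text("'{}'::jsonb"),
)

# Option B (portable across PostgreSQL and the SQLite test database): drop the
# server default and let the ORM-side default=dict supply {} on insert.
request_metadata_portable = sa.Column("request_metadata", sa.JSON(), nullable=False)
```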

        sa.Column(
            "created_at",
            sa.DateTime(timezone=True),
            nullable=False,
            server_default=sa.text("NOW()"),
        ),
        sa.PrimaryKeyConstraint("id"),
        sa.UniqueConstraint("litellm_call_id", name="uq_usage_requests_litellm_call_id"),
        sa.ForeignKeyConstraint(
            ["proxy_key_id"],
            ["proxy_keys.id"],
            ondelete="SET NULL",
        ),
        sa.ForeignKeyConstraint(
            ["benchmark_session_id"],
            ["sessions.id"],
            ondelete="SET NULL",
        ),
    )

    # Core lookup indexes
    op.create_index(
        "ix_usage_requests_started_at", "usage_requests", ["started_at"]
    )
    op.create_index(
        "ix_usage_requests_key_alias", "usage_requests", ["key_alias"]
    )
    op.create_index(
        "ix_usage_requests_litellm_key_id", "usage_requests", ["litellm_key_id"]
    )
    op.create_index(
        "ix_usage_requests_proxy_key_id", "usage_requests", ["proxy_key_id"]
    )
    op.create_index(
        "ix_usage_requests_benchmark_session_id",
        "usage_requests",
        ["benchmark_session_id"],
    )
    op.create_index(
        "ix_usage_requests_requested_model", "usage_requests", ["requested_model"]
    )
    op.create_index(
        "ix_usage_requests_resolved_model", "usage_requests", ["resolved_model"]
    )
    op.create_index(
        "ix_usage_requests_provider", "usage_requests", ["provider"]
    )
    op.create_index(
        "ix_usage_requests_status", "usage_requests", ["status"]
    )
    op.create_index(
        "ix_usage_requests_error_code", "usage_requests", ["error_code"]
    )
    # Composite for time-window + key attribution queries
    op.create_index(
        "ix_usage_requests_key_alias_started_at",
        "usage_requests",
        ["key_alias", "started_at"],
    )
    # Composite for provider + model queries
    op.create_index(
        "ix_usage_requests_provider_resolved_model",
        "usage_requests",
        ["provider", "resolved_model"],
    )
    # Composite for session + time queries
    op.create_index(
        "ix_usage_requests_session_started_at",
        "usage_requests",
        ["benchmark_session_id", "started_at"],
    )
Comment on lines +72 to +122

🟡 Suggestion: 13 indexes on a table designed for bulk ingestion means heavy write amplification; every INSERT updates 13+ B-tree structures. The composite indexes partially subsume the single-column ones:

  • ix_usage_requests_key_alias_started_at covers key_alias-only lookups on PG (prefix scan)
  • ix_usage_requests_provider_resolved_model covers provider-only lookups
  • ix_usage_requests_session_started_at covers benchmark_session_id-only lookups

Consider dropping the redundant single-column indexes ix_usage_requests_key_alias, ix_usage_requests_provider, and ix_usage_requests_benchmark_session_id if query patterns allow. This is a write-amplification vs. index-only-scan tradeoff — your call, but worth benchmarking with realistic ingestion volumes.
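A sketch of what the slimmer index set could look like if those three single-column indexes are dropped and their lookups ride on the composites' leading columns; the function name is illustrative, and whether prefix scans suffice should be confirmed against real query plans.

```python
from alembic import op


def create_lean_indexes() -> None:
    """Illustrative composite-only index set for usage_requests."""
    # Leading key_alias column also serves key_alias-only lookups via a prefix scan.
    op.create_index(
        "ix_usage_requests_key_alias_started_at",
        "usage_requests",
        ["key_alias", "started_at"],
    )
    # Leading provider column also serves provider-only lookups.
    op.create_index(
        "ix_usage_requests_provider_resolved_model",
        "usage_requests",
        ["provider", "resolved_model"],
    )
    # Leading benchmark_session_id column also serves session-only lookups.
    op.create_index(
        "ix_usage_requests_session_started_at",
        "usage_requests",
        ["benchmark_session_id", "started_at"],
    )
```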



def downgrade() -> None:
    """Drop usage_requests table and indexes."""
    op.drop_index("ix_usage_requests_session_started_at", table_name="usage_requests")
    op.drop_index("ix_usage_requests_provider_resolved_model", table_name="usage_requests")
    op.drop_index("ix_usage_requests_key_alias_started_at", table_name="usage_requests")
    op.drop_index("ix_usage_requests_error_code", table_name="usage_requests")
    op.drop_index("ix_usage_requests_status", table_name="usage_requests")
    op.drop_index("ix_usage_requests_provider", table_name="usage_requests")
    op.drop_index("ix_usage_requests_resolved_model", table_name="usage_requests")
    op.drop_index("ix_usage_requests_requested_model", table_name="usage_requests")
    op.drop_index("ix_usage_requests_benchmark_session_id", table_name="usage_requests")
    op.drop_index("ix_usage_requests_proxy_key_id", table_name="usage_requests")
    op.drop_index("ix_usage_requests_litellm_key_id", table_name="usage_requests")
    op.drop_index("ix_usage_requests_key_alias", table_name="usage_requests")
    op.drop_index("ix_usage_requests_started_at", table_name="usage_requests")
    op.drop_table("usage_requests")
47 changes: 47 additions & 0 deletions src/benchmark_core/db/models.py
@@ -365,6 +365,53 @@ class Request(Base):
    )


class UsageRequest(Base):
    """One normalized LiteLLM usage record for traffic ingestion.

    Stores request timing, routing, token counts, cost, and error metadata.
    No prompt or response content is stored by default.
    Designed for sessionless usage tracking with optional benchmark linkage.
    """

    __tablename__ = "usage_requests"

    id: Mapped[uuid.UUID] = mapped_column(Uuid(as_uuid=True), primary_key=True, default=uuid.uuid4)
    litellm_call_id: Mapped[str] = mapped_column(String(255), nullable=False, unique=True)
    request_id: Mapped[str | None] = mapped_column(String(255), nullable=True)
    key_alias: Mapped[str | None] = mapped_column(String(255), nullable=True, index=True)
    litellm_key_id: Mapped[str | None] = mapped_column(String(255), nullable=True, index=True)
    proxy_key_id: Mapped[uuid.UUID | None] = mapped_column(
        ForeignKey("proxy_keys.id", ondelete="SET NULL"), nullable=True, index=True
    )
    benchmark_session_id: Mapped[uuid.UUID | None] = mapped_column(
        ForeignKey("sessions.id", ondelete="SET NULL"), nullable=True, index=True
    )
    provider: Mapped[str | None] = mapped_column(String(255), nullable=True, index=True)
    provider_route: Mapped[str | None] = mapped_column(String(255), nullable=True)
    requested_model: Mapped[str | None] = mapped_column(String(255), nullable=True, index=True)

🟠 Important: requested_model has index=True in the ORM model, but the migration (359fff0c443a) has no corresponding index for this column. This means:

  1. In tests (where tables are created from models via init_db), SQLAlchemy will auto-create the index.
  2. In production (where Alembic runs migrations), the index won't exist.
  3. Running alembic check will flag the model/migration drift.

Either add ix_usage_requests_requested_model to the migration, or remove index=True from the ORM column if the index isn't needed.
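Hypothetical sketches of the two fixes named above (pick one, not both); names follow the conventions already used in this PR, and the helper function is illustrative only.

```python
import sqlalchemy as sa
from alembic import op
from sqlalchemy.orm import Mapped, mapped_column


def add_requested_model_index() -> None:
    # Fix (a): add the matching index to migration 359fff0c443a so Alembic and the
    # ORM definition agree. (In practice this call belongs inside upgrade().)
    op.create_index(
        "ix_usage_requests_requested_model", "usage_requests", ["requested_model"]
    )


# Fix (b): or drop index=True from the ORM column if the index isn't needed.
# (Shown standalone here; it lives on the UsageRequest declarative model.)
requested_model: Mapped[str | None] = mapped_column(sa.String(255), nullable=True)
```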

    resolved_model: Mapped[str | None] = mapped_column(String(255), nullable=True, index=True)
    route: Mapped[str | None] = mapped_column(String(255), nullable=True)
    started_at: Mapped[datetime | None] = mapped_column(
        DateTime(timezone=True), nullable=True, index=True
    )
    finished_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True)
    latency_ms: Mapped[float | None] = mapped_column(Float, nullable=True)
    ttft_ms: Mapped[float | None] = mapped_column(Float, nullable=True)
    input_tokens: Mapped[int | None] = mapped_column(Integer, nullable=True)
    output_tokens: Mapped[int | None] = mapped_column(Integer, nullable=True)
    cached_input_tokens: Mapped[int | None] = mapped_column(Integer, nullable=True)
    cache_write_tokens: Mapped[int | None] = mapped_column(Integer, nullable=True)
    cost_usd: Mapped[float | None] = mapped_column(Float, nullable=True)
    status: Mapped[str | None] = mapped_column(String(50), nullable=True, index=True)
    error_code: Mapped[str | None] = mapped_column(String(50), nullable=True, index=True)
    error_message: Mapped[str | None] = mapped_column(Text, nullable=True)
    cache_hit: Mapped[bool | None] = mapped_column(Boolean, nullable=True)
    request_metadata: Mapped[dict[str, Any]] = mapped_column(JSON, default=dict)
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), default=lambda: datetime.now(UTC)
    )


class MetricRollup(Base):
    """Derived latency, throughput, error, and cache metrics."""

40 changes: 40 additions & 0 deletions src/benchmark_core/models.py
@@ -159,6 +159,46 @@ class Request(BaseModel):
    metadata: dict[str, Any] = Field(default_factory=dict, description="Additional metadata")


class UsageRequest(BaseModel):
    """One normalized LiteLLM usage record for traffic ingestion.

    No prompt or response content is stored by default.
    Designed for sessionless usage tracking with optional benchmark linkage.
    """

    usage_request_id: UUID = Field(default_factory=uuid4)
    litellm_call_id: str = Field(..., description="Primary LiteLLM call/request ID for idempotency")
    request_id: str | None = Field(default=None, description="Alternate request ID")
    key_alias: str | None = Field(default=None, description="Denormalized key alias")
    litellm_key_id: str | None = Field(default=None, description="LiteLLM internal key ID")
    proxy_key_id: UUID | None = Field(default=None, description="Optional FK to proxy_keys")
    benchmark_session_id: UUID | None = Field(
        default=None, description="Optional FK to benchmark session"
    )
    provider: str | None = Field(default=None, description="Provider slug")
    provider_route: str | None = Field(default=None, description="Full provider route string")
    requested_model: str | None = Field(default=None, description="Client-requested model alias")
    resolved_model: str | None = Field(default=None, description="Resolved upstream model name")
    route: str | None = Field(default=None, description="API route/endpoint")
    started_at: datetime | None = Field(default=None, description="Request start time (UTC)")
    finished_at: datetime | None = Field(default=None, description="Request end time (UTC)")
    latency_ms: float | None = Field(default=None, description="Total latency in milliseconds")
    ttft_ms: float | None = Field(default=None, description="Time to first token in milliseconds")
    input_tokens: int | None = Field(default=None, description="Input/prompt token count")
    output_tokens: int | None = Field(default=None, description="Output/completion token count")
    cached_input_tokens: int | None = Field(default=None, description="Cached input token count")
    cache_write_tokens: int | None = Field(default=None, description="Tokens written to cache")
    cost_usd: float | None = Field(default=None, description="Spend in USD")
    status: str | None = Field(default=None, description="Request status (success/failure/pending)")
    error_code: str | None = Field(default=None, description="Error code (e.g. HTTP 429)")
    error_message: str | None = Field(default=None, description="Error message")
    cache_hit: bool | None = Field(default=None, description="Cache hit flag")
    request_metadata: dict[str, Any] = Field(
        default_factory=dict, description="Safe metadata (no content)"
    )
    created_at: datetime = Field(default_factory=_utc_now)
Comment on lines +162 to +199

🟡 Suggestion: The Pydantic UsageRequest domain model is defined but never imported or used anywhere in the codebase — the repository works exclusively with ORM objects. Every other domain model (Request, Session, ProxyCredential) has a corresponding ABC and conversion logic. Either wire this up or remove it to avoid dead code confusion. If it's being added speculatively for future use, note that in a comment.
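If the model is wired up rather than removed, a minimal ORM-to-domain conversion could look like the sketch below; the module paths come from this PR, but the to_domain helper and where it lives are assumptions.

```python
from benchmark_core.db.models import UsageRequest as UsageRequestRow
from benchmark_core.models import UsageRequest


def to_domain(row: UsageRequestRow) -> UsageRequest:
    """Map an ORM usage_requests row onto the Pydantic domain model."""
    data = {column.name: getattr(row, column.name) for column in row.__table__.columns}
    # The ORM primary key is "id"; the domain model names it usage_request_id.
    data["usage_request_id"] = data.pop("id")
    return UsageRequest(**data)
```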



class MetricRollup(BaseModel):
    """Derived latency, throughput, error, and cache metrics."""

2 changes: 2 additions & 0 deletions src/benchmark_core/repositories/__init__.py
@@ -22,6 +22,7 @@
from benchmark_core.repositories.request_repository import SQLRequestRepository
from benchmark_core.repositories.session_repository import SQLSessionRepository
from benchmark_core.repositories.task_card_repository import SQLTaskCardRepository
from benchmark_core.repositories.usage_request_repository import SQLUsageRequestRepository
from benchmark_core.repositories.variant_repository import SQLVariantRepository
from benchmark_core.repositories_abc import (
    ArtifactRepository,
@@ -53,4 +54,5 @@
"SQLHarnessProfileRepository",
"SQLArtifactRepository",
"SQLProxyKeyRepository",
"SQLUsageRequestRepository",
]