-
Notifications
You must be signed in to change notification settings - Fork 0
feat(db): add usage_requests schema, migration, repositories, and indexes (COE-371) #44
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
46ed3a3
df39c8d
2d0ca63
2debf7c
7b26ec8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,140 @@ | ||
| """Add usage_requests table for LiteLLM traffic ingestion. | ||
|
|
||
| Revision ID: 359fff0c443a | ||
| Revises: b356c861829f | ||
| Create Date: 2025-01-21 21:00:00.000000+00:00 | ||
|
|
||
| """ | ||
|
|
||
| from typing import Sequence, Union | ||
|
|
||
| from alembic import op | ||
| import sqlalchemy as sa | ||
| from sqlalchemy.dialects import postgresql | ||
|
|
||
| # revision identifiers, used by Alembic. | ||
| revision: str = "359fff0c443a" | ||
| down_revision: Union[str, None] = "b356c861829f" | ||
| branch_labels: Union[str, Sequence[str], None] = None | ||
| depends_on: Union[str, Sequence[str], None] = None | ||
|
|
||
|
|
||
| def upgrade() -> None: | ||
| """Create usage_requests table with indexes and idempotency constraint.""" | ||
| op.create_table( | ||
| "usage_requests", | ||
| sa.Column("id", postgresql.UUID(as_uuid=True), nullable=False), | ||
| sa.Column("litellm_call_id", sa.String(255), nullable=False), | ||
| sa.Column("request_id", sa.String(255), nullable=True), | ||
| sa.Column("key_alias", sa.String(255), nullable=True), | ||
| sa.Column("litellm_key_id", sa.String(255), nullable=True), | ||
| sa.Column("proxy_key_id", postgresql.UUID(as_uuid=True), nullable=True), | ||
| sa.Column("benchmark_session_id", postgresql.UUID(as_uuid=True), nullable=True), | ||
| sa.Column("provider", sa.String(255), nullable=True), | ||
| sa.Column("provider_route", sa.String(255), nullable=True), | ||
| sa.Column("requested_model", sa.String(255), nullable=True), | ||
| sa.Column("resolved_model", sa.String(255), nullable=True), | ||
| sa.Column("route", sa.String(255), nullable=True), | ||
| sa.Column("started_at", sa.DateTime(timezone=True), nullable=True), | ||
| sa.Column("finished_at", sa.DateTime(timezone=True), nullable=True), | ||
| sa.Column("latency_ms", sa.Float(), nullable=True), | ||
| sa.Column("ttft_ms", sa.Float(), nullable=True), | ||
| sa.Column("input_tokens", sa.Integer(), nullable=True), | ||
| sa.Column("output_tokens", sa.Integer(), nullable=True), | ||
| sa.Column("cached_input_tokens", sa.Integer(), nullable=True), | ||
| sa.Column("cache_write_tokens", sa.Integer(), nullable=True), | ||
| sa.Column("cost_usd", sa.Float(), nullable=True), | ||
| sa.Column("status", sa.String(50), nullable=True), | ||
| sa.Column("error_code", sa.String(50), nullable=True), | ||
| sa.Column("error_message", sa.Text(), nullable=True), | ||
| sa.Column("cache_hit", sa.Boolean(), nullable=True), | ||
| sa.Column("request_metadata", sa.JSON(), nullable=False, server_default="{}"), | ||
| sa.Column( | ||
| "created_at", | ||
| sa.DateTime(timezone=True), | ||
| nullable=False, | ||
| server_default=sa.text("NOW()"), | ||
| ), | ||
| sa.PrimaryKeyConstraint("id"), | ||
| sa.UniqueConstraint("litellm_call_id", name="uq_usage_requests_litellm_call_id"), | ||
| sa.ForeignKeyConstraint( | ||
| ["proxy_key_id"], | ||
| ["proxy_keys.id"], | ||
| ondelete="SET NULL", | ||
| ), | ||
| sa.ForeignKeyConstraint( | ||
| ["benchmark_session_id"], | ||
| ["sessions.id"], | ||
| ondelete="SET NULL", | ||
| ), | ||
| ) | ||
|
|
||
| # Core lookup indexes | ||
| op.create_index( | ||
| "ix_usage_requests_started_at", "usage_requests", ["started_at"] | ||
| ) | ||
| op.create_index( | ||
| "ix_usage_requests_key_alias", "usage_requests", ["key_alias"] | ||
| ) | ||
| op.create_index( | ||
| "ix_usage_requests_litellm_key_id", "usage_requests", ["litellm_key_id"] | ||
| ) | ||
| op.create_index( | ||
| "ix_usage_requests_proxy_key_id", "usage_requests", ["proxy_key_id"] | ||
| ) | ||
| op.create_index( | ||
| "ix_usage_requests_benchmark_session_id", | ||
| "usage_requests", | ||
| ["benchmark_session_id"], | ||
| ) | ||
| op.create_index( | ||
| "ix_usage_requests_requested_model", "usage_requests", ["requested_model"] | ||
| ) | ||
| op.create_index( | ||
| "ix_usage_requests_resolved_model", "usage_requests", ["resolved_model"] | ||
| ) | ||
| op.create_index( | ||
| "ix_usage_requests_provider", "usage_requests", ["provider"] | ||
| ) | ||
| op.create_index( | ||
| "ix_usage_requests_status", "usage_requests", ["status"] | ||
| ) | ||
| op.create_index( | ||
| "ix_usage_requests_error_code", "usage_requests", ["error_code"] | ||
| ) | ||
| # Composite for time-window + key attribution queries | ||
| op.create_index( | ||
| "ix_usage_requests_key_alias_started_at", | ||
| "usage_requests", | ||
| ["key_alias", "started_at"], | ||
| ) | ||
| # Composite for provider + model queries | ||
| op.create_index( | ||
| "ix_usage_requests_provider_resolved_model", | ||
| "usage_requests", | ||
| ["provider", "resolved_model"], | ||
| ) | ||
| # Composite for session + time queries | ||
| op.create_index( | ||
| "ix_usage_requests_session_started_at", | ||
| "usage_requests", | ||
| ["benchmark_session_id", "started_at"], | ||
| ) | ||
|
Comment on lines
+72
to
+122
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 🟡 Suggestion: 13 indexes on a table designed for bulk ingestion cause heavy write amplification — every INSERT updates 13+ B-tree structures. The composite indexes partially subsume the single-column ones:
Consider dropping the redundant single-column indexes |
||
|
|
||
|
|
||
| def downgrade() -> None: | ||
| """Drop usage_requests table and indexes.""" | ||
| op.drop_index("ix_usage_requests_session_started_at", table_name="usage_requests") | ||
| op.drop_index("ix_usage_requests_provider_resolved_model", table_name="usage_requests") | ||
| op.drop_index("ix_usage_requests_key_alias_started_at", table_name="usage_requests") | ||
| op.drop_index("ix_usage_requests_error_code", table_name="usage_requests") | ||
| op.drop_index("ix_usage_requests_status", table_name="usage_requests") | ||
| op.drop_index("ix_usage_requests_provider", table_name="usage_requests") | ||
| op.drop_index("ix_usage_requests_resolved_model", table_name="usage_requests") | ||
| op.drop_index("ix_usage_requests_requested_model", table_name="usage_requests") | ||
| op.drop_index("ix_usage_requests_benchmark_session_id", table_name="usage_requests") | ||
| op.drop_index("ix_usage_requests_proxy_key_id", table_name="usage_requests") | ||
| op.drop_index("ix_usage_requests_litellm_key_id", table_name="usage_requests") | ||
| op.drop_index("ix_usage_requests_key_alias", table_name="usage_requests") | ||
| op.drop_index("ix_usage_requests_started_at", table_name="usage_requests") | ||
| op.drop_table("usage_requests") | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -365,6 +365,53 @@ class Request(Base): | |
| ) | ||
|
|
||
|
|
||
| class UsageRequest(Base): | ||
| """One normalized LiteLLM usage record for traffic ingestion. | ||
|
|
||
| Stores request timing, routing, token counts, cost, and error metadata. | ||
| No prompt or response content is stored by default. | ||
| Designed for sessionless usage tracking with optional benchmark linkage. | ||
| """ | ||
|
|
||
| __tablename__ = "usage_requests" | ||
|
|
||
| id: Mapped[uuid.UUID] = mapped_column(Uuid(as_uuid=True), primary_key=True, default=uuid.uuid4) | ||
| litellm_call_id: Mapped[str] = mapped_column(String(255), nullable=False, unique=True) | ||
| request_id: Mapped[str | None] = mapped_column(String(255), nullable=True) | ||
| key_alias: Mapped[str | None] = mapped_column(String(255), nullable=True, index=True) | ||
| litellm_key_id: Mapped[str | None] = mapped_column(String(255), nullable=True, index=True) | ||
| proxy_key_id: Mapped[uuid.UUID | None] = mapped_column( | ||
| ForeignKey("proxy_keys.id", ondelete="SET NULL"), nullable=True, index=True | ||
| ) | ||
| benchmark_session_id: Mapped[uuid.UUID | None] = mapped_column( | ||
| ForeignKey("sessions.id", ondelete="SET NULL"), nullable=True, index=True | ||
| ) | ||
| provider: Mapped[str | None] = mapped_column(String(255), nullable=True, index=True) | ||
| provider_route: Mapped[str | None] = mapped_column(String(255), nullable=True) | ||
| requested_model: Mapped[str | None] = mapped_column(String(255), nullable=True, index=True) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 🟠 Important:
Either add |
||
| resolved_model: Mapped[str | None] = mapped_column(String(255), nullable=True, index=True) | ||
| route: Mapped[str | None] = mapped_column(String(255), nullable=True) | ||
| started_at: Mapped[datetime | None] = mapped_column( | ||
| DateTime(timezone=True), nullable=True, index=True | ||
| ) | ||
| finished_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True) | ||
| latency_ms: Mapped[float | None] = mapped_column(Float, nullable=True) | ||
| ttft_ms: Mapped[float | None] = mapped_column(Float, nullable=True) | ||
| input_tokens: Mapped[int | None] = mapped_column(Integer, nullable=True) | ||
| output_tokens: Mapped[int | None] = mapped_column(Integer, nullable=True) | ||
| cached_input_tokens: Mapped[int | None] = mapped_column(Integer, nullable=True) | ||
| cache_write_tokens: Mapped[int | None] = mapped_column(Integer, nullable=True) | ||
| cost_usd: Mapped[float | None] = mapped_column(Float, nullable=True) | ||
| status: Mapped[str | None] = mapped_column(String(50), nullable=True, index=True) | ||
| error_code: Mapped[str | None] = mapped_column(String(50), nullable=True, index=True) | ||
| error_message: Mapped[str | None] = mapped_column(Text, nullable=True) | ||
| cache_hit: Mapped[bool | None] = mapped_column(Boolean, nullable=True) | ||
| request_metadata: Mapped[dict[str, Any]] = mapped_column(JSON, default=dict) | ||
| created_at: Mapped[datetime] = mapped_column( | ||
| DateTime(timezone=True), default=lambda: datetime.now(UTC) | ||
| ) | ||
|
|
||
|
|
||
| class MetricRollup(Base): | ||
| """Derived latency, throughput, error, and cache metrics.""" | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -159,6 +159,46 @@ class Request(BaseModel): | |
| metadata: dict[str, Any] = Field(default_factory=dict, description="Additional metadata") | ||
|
|
||
|
|
||
| class UsageRequest(BaseModel): | ||
| """One normalized LiteLLM usage record for traffic ingestion. | ||
|
|
||
| No prompt or response content is stored by default. | ||
| Designed for sessionless usage tracking with optional benchmark linkage. | ||
| """ | ||
|
|
||
| usage_request_id: UUID = Field(default_factory=uuid4) | ||
| litellm_call_id: str = Field(..., description="Primary LiteLLM call/request ID for idempotency") | ||
| request_id: str | None = Field(default=None, description="Alternate request ID") | ||
| key_alias: str | None = Field(default=None, description="Denormalized key alias") | ||
| litellm_key_id: str | None = Field(default=None, description="LiteLLM internal key ID") | ||
| proxy_key_id: UUID | None = Field(default=None, description="Optional FK to proxy_keys") | ||
| benchmark_session_id: UUID | None = Field( | ||
| default=None, description="Optional FK to benchmark session" | ||
| ) | ||
| provider: str | None = Field(default=None, description="Provider slug") | ||
| provider_route: str | None = Field(default=None, description="Full provider route string") | ||
| requested_model: str | None = Field(default=None, description="Client-requested model alias") | ||
| resolved_model: str | None = Field(default=None, description="Resolved upstream model name") | ||
| route: str | None = Field(default=None, description="API route/endpoint") | ||
| started_at: datetime | None = Field(default=None, description="Request start time (UTC)") | ||
| finished_at: datetime | None = Field(default=None, description="Request end time (UTC)") | ||
| latency_ms: float | None = Field(default=None, description="Total latency in milliseconds") | ||
| ttft_ms: float | None = Field(default=None, description="Time to first token in milliseconds") | ||
| input_tokens: int | None = Field(default=None, description="Input/prompt token count") | ||
| output_tokens: int | None = Field(default=None, description="Output/completion token count") | ||
| cached_input_tokens: int | None = Field(default=None, description="Cached input token count") | ||
| cache_write_tokens: int | None = Field(default=None, description="Tokens written to cache") | ||
| cost_usd: float | None = Field(default=None, description="Spend in USD") | ||
| status: str | None = Field(default=None, description="Request status (success/failure/pending)") | ||
| error_code: str | None = Field(default=None, description="Error code (e.g. HTTP 429)") | ||
| error_message: str | None = Field(default=None, description="Error message") | ||
| cache_hit: bool | None = Field(default=None, description="Cache hit flag") | ||
| request_metadata: dict[str, Any] = Field( | ||
| default_factory=dict, description="Safe metadata (no content)" | ||
| ) | ||
| created_at: datetime = Field(default_factory=_utc_now) | ||
|
Comment on lines
+162
to
+199
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 🟡 Suggestion: The Pydantic |
||
|
|
||
|
|
||
| class MetricRollup(BaseModel): | ||
| """Derived latency, throughput, error, and cache metrics.""" | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🟡 Suggestion:
`server_default="{}"` for a JSON column. On PostgreSQL this works (PG casts the string), but on SQLite the JSON type is stored as text and `{}` will be the literal string `{}`, not a JSON object. Since tests run on SQLite, verify this doesn't cause issues when `request_metadata` is queried as a dict. If it does, consider using `server_default=sa.text("'{}'::jsonb")` for PG, or removing the server_default and letting the ORM `default=dict` handle it.