Conversation
…exes (COE-371)
Add the usage_requests table and supporting code for tracking all LiteLLM
traffic with idempotent ingestion and optional benchmark session linkage.
Changes:
- Alembic migration (359fff0c443a) creating usage_requests table with:
  - request/log IDs (litellm_call_id, request_id)
  - key attribution fields (key_alias, litellm_key_id)
  - optional FKs to proxy_keys and sessions
  - provider, route, and model fields
  - timing, token, cost, status, error, and cache fields
  - safe metadata JSON (no prompt/response content)
  - unique constraint on litellm_call_id for idempotency
  - 10 indexes for time, key, model, provider, session, status, error
- UsageRequest ORM model in db/models.py
- UsageRequest domain model in models.py
- SQLUsageRequestRepository with:
  - create_many idempotent bulk insert (ON CONFLICT DO NOTHING on PG,
    check-and-insert fallback on other dialects)
  - lookup, list, and delete helpers, including delete_by_benchmark_session
- Exports and tests (…, 18 new usage tests)
- …e unused SQLProxyKeyRepository import from test_repositories.py
- Add # type: ignore[attr-defined] for SQLAlchemy rowcount access
- Run ruff format on db/models.py, models.py, usage_request_repository.py
- Fix import ordering in test_list_by_time_range
🟡 Acceptable overall — schema design is solid and tests cover the SQLite path well, but there are a few issues that need attention before merge.
Key concerns:
- _create_many_postgres returns wrong data (includes skipped items as "created")
- requested_model has index=True in ORM but no migration index — model/migration drift
- Production PostgreSQL path is completely untested
- Evidence section only has unit test output
```python
)
provider: Mapped[str | None] = mapped_column(String(255), nullable=True, index=True)
provider_route: Mapped[str | None] = mapped_column(String(255), nullable=True)
requested_model: Mapped[str | None] = mapped_column(String(255), nullable=True, index=True)
```
🟠 Important: requested_model has index=True in the ORM model, but the migration (359fff0c443a) has no corresponding index for this column. This means:
- In tests (where tables are created from models via init_db), SQLAlchemy will auto-create the index.
- In production (where Alembic runs migrations), the index won't exist.
- An alembic check will flag a model/migration drift.
Either add ix_usage_requests_requested_model to the migration, or remove index=True from the ORM column if the index isn't needed.
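If the index is intended, the migration-side fix is two lines; the calls below follow the naming convention the migration already uses for its other single-column indexes:

```python
# In upgrade():
op.create_index(
    "ix_usage_requests_requested_model", "usage_requests", ["requested_model"]
)

# In downgrade():
op.drop_index("ix_usage_requests_requested_model", table_name="usage_requests")
```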
```python
async def list_by_time_range(
    self,
    start: str,
    end: str,
    limit: int = 1000,
    offset: int = 0,
) -> list[UsageRequestORM]:
    """List usage requests within a time range.

    Args:
        start: Start of time range (ISO format).
        end: End of time range (ISO format).
        limit: Maximum number of results.
        offset: Number of results to skip.

    Returns:
        List of usage requests in the time range.
    """
    from datetime import datetime

    stmt = (
        select(UsageRequestORM)
        .where(
            UsageRequestORM.started_at >= datetime.fromisoformat(start),
            UsageRequestORM.started_at <= datetime.fromisoformat(end),
        )
        .order_by(UsageRequestORM.started_at.desc())
        .limit(limit)
        .offset(offset)
    )
    return list(self._session.execute(stmt).scalars().all())
```
🟡 Suggestion: list_by_time_range accepts str parameters and parses them internally with datetime.fromisoformat(). This is a poor API — it forces string parsing at the repository layer and will raise unhandled ValueError on malformed input. Accept datetime objects instead and let the caller handle parsing/validation.
Also, the upper bound uses <= (inclusive), which is unusual for time ranges and can cause overlapping boundaries at the millisecond level. Consider using < for the end bound, or document that both bounds are inclusive.
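A sketch of the half-open variant, assuming the datetime parameters suggested above:

```python
# Half-open interval [start, end): a row stamped exactly at `end` belongs
# to the next window, so adjacent windows never count it twice.
stmt = (
    select(UsageRequestORM)
    .where(
        UsageRequestORM.started_at >= start,
        UsageRequestORM.started_at < end,
    )
    .order_by(UsageRequestORM.started_at.desc())
)
```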
```python
from datetime import datetime
```
🟡 Suggestion: from datetime import datetime is imported inside the method body. This should be at module level like other imports. Same issue on line 406 with from sqlalchemy import delete.
```python
async def delete(self, id: UUID) -> bool:
    """Delete a usage request by its ID.

    Args:
        id: The UUID of the usage request to delete.

    Returns:
        True if deleted, False if not found.
    """
    return await super().delete(id)
```
🟢 Nit: This delete override just delegates to super().delete(id) with no additional logic. It adds 10 lines of noise for zero value. Remove it and let the inherited method handle it.
```python
async def _create_many_generic(
    self, requests: list[UsageRequestORM]
) -> tuple[list[UsageRequestORM], int]:
    """Fallback check-and-insert for SQLite and other dialects."""
    created: list[UsageRequestORM] = []
    skipped = 0

    for request in requests:
        existing = await self.get_by_litellm_call_id(request.litellm_call_id)
        if existing is not None:
            skipped += 1
            continue

        self._session.add(request)
        created.append(request)

    if created:
        self._session.flush()

    return created, skipped
```
🟠 Important: The _create_many_generic check-then-insert pattern has a TOCTOU race condition. Two concurrent calls can both check that a call_id doesn't exist, then both try to insert. One will succeed, the other will raise an IntegrityError. Since this is documented as the SQLite fallback, wrap the self._session.add(request) in a try/except for IntegrityError and count it as skipped, rather than letting it propagate as an unhandled exception.
```python
insert_stmt = pg_insert(UsageRequestORM).values(
    [
        {
            "id": r.id,
            "litellm_call_id": r.litellm_call_id,
            "request_id": r.request_id,
            "key_alias": r.key_alias,
            "litellm_key_id": r.litellm_key_id,
            "proxy_key_id": r.proxy_key_id,
            "benchmark_session_id": r.benchmark_session_id,
            "provider": r.provider,
            "provider_route": r.provider_route,
            "requested_model": r.requested_model,
            "resolved_model": r.resolved_model,
            "route": r.route,
            "started_at": r.started_at,
            "finished_at": r.finished_at,
            "latency_ms": r.latency_ms,
            "ttft_ms": r.ttft_ms,
            "input_tokens": r.input_tokens,
            "output_tokens": r.output_tokens,
            "cached_input_tokens": r.cached_input_tokens,
            "cache_write_tokens": r.cache_write_tokens,
            "cost_usd": r.cost_usd,
            "status": r.status,
            "error_code": r.error_code,
            "error_message": r.error_message,
            "cache_hit": r.cache_hit,
            "request_metadata": r.request_metadata,
            "created_at": r.created_at,
        }
        for r in requests
    ]
)
```
🟡 Suggestion: The _create_many_postgres method manually enumerates all 27 columns in the values dict. This is fragile — any column addition/removal will silently break this method. Consider using SQLAlchemy's __table__.columns to build the dict dynamically, or at minimum add a comment noting that this must stay in sync with the model.
| sa.Column("error_code", sa.String(50), nullable=True), | ||
| sa.Column("error_message", sa.Text(), nullable=True), | ||
| sa.Column("cache_hit", sa.Boolean(), nullable=True), | ||
| sa.Column("request_metadata", sa.JSON(), nullable=False, server_default="{}"), |
🟡 Suggestion: server_default="{}" for a JSON column. On PostgreSQL this works (PG casts the string), but on SQLite the JSON type is stored as text and {} will be the literal string {}, not a JSON object. Since tests run on SQLite, verify this doesn't cause issues when request_metadata is queried as a dict. If it does, consider using server_default=sa.text("'{}'::jsonb") for PG or removing the server_default and letting the ORM default=dict handle it.
…ostgres: return only successfully inserted records by querying back inserted rows instead of returning all input items. This aligns with the generic check-and-insert semantics.
- Add missing ix_usage_requests_requested_model index to migration and corresponding downgrade cleanup.
- Run ruff format on usage_request_repository.py.
🟡 Acceptable with changes needed — the requested_model index drift and _create_many_postgres return-value bugs are fixed from previous review, but several previously flagged issues remain unresolved, and there's a missing ABC interface that breaks the established pattern.
Must-fix items:
- _create_many_generic TOCTOU race — catch IntegrityError on flush instead of check-then-insert
- Missing UsageRequestRepository ABC in repositories_abc.py — every other repository has one; this breaks the pattern and prevents dependency inversion
- 27-column manual dict in _create_many_postgres is fragile — any column addition silently breaks it
- delete override is pure noise — delegates to super().delete(id) with zero added logic
- list_by_time_range should accept datetime, not str — string parsing at the repo layer raises unhandled ValueError on malformed input
Improvement items:
- Inline imports (datetime, delete) should be at module level
- Pydantic UsageRequest domain model in models.py is never imported or used — dead code
Evidence note: PR description only has unit test output, which per project review rules is insufficient. A migration run against an actual database or at minimum the Alembic upgrade/downgrade validation would strengthen the evidence.
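A minimal validation run would close that gap; the commands below assume a standard Alembic setup pointed at a real database:

```console
$ alembic upgrade head      # apply 359fff0c443a
$ alembic check             # confirm no model/migration drift remains
$ alembic downgrade -1      # exercise the downgrade path
$ alembic upgrade head
```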
```python
async def _create_many_generic(
    self, requests: list[UsageRequestORM]
) -> tuple[list[UsageRequestORM], int]:
    """Fallback check-and-insert for SQLite and other dialects."""
    created: list[UsageRequestORM] = []
    skipped = 0

    for request in requests:
        existing = await self.get_by_litellm_call_id(request.litellm_call_id)
        if existing is not None:
            skipped += 1
            continue

        self._session.add(request)
        created.append(request)

    if created:
        self._session.flush()

    return created, skipped
```
🟠 Important: The check-then-insert pattern has a TOCTOU race condition. Two concurrent calls can both pass the existing is None check, then both add, and one will blow up on IntegrityError during flush.
Fix: drop the pre-check; add and flush each row inside a per-item SAVEPOINT, catching IntegrityError and counting duplicates as skipped. This is both simpler and correct:
```python
async def _create_many_generic(
    self, requests: list[UsageRequestORM]
) -> tuple[list[UsageRequestORM], int]:
    """Fallback insert for SQLite and other dialects.

    Flushes each row inside a per-item SAVEPOINT and catches
    IntegrityError, avoiding the TOCTOU race of check-then-insert.
    A plain session.rollback() would also discard earlier rows in
    the batch, so only the nested transaction is rolled back.
    """
    from sqlalchemy.exc import IntegrityError

    created: list[UsageRequestORM] = []
    skipped = 0
    for request in requests:
        try:
            with self._session.begin_nested():  # SAVEPOINT per item
                self._session.add(request)
                self._session.flush()
            created.append(request)
        except IntegrityError:
            skipped += 1
    return created, skipped
```
```python
insert_stmt = pg_insert(UsageRequestORM).values(
    [
        {
            "id": r.id,
            "litellm_call_id": r.litellm_call_id,
            "request_id": r.request_id,
            "key_alias": r.key_alias,
            "litellm_key_id": r.litellm_key_id,
            "proxy_key_id": r.proxy_key_id,
            "benchmark_session_id": r.benchmark_session_id,
            "provider": r.provider,
            "provider_route": r.provider_route,
            "requested_model": r.requested_model,
            "resolved_model": r.resolved_model,
            "route": r.route,
            "started_at": r.started_at,
            "finished_at": r.finished_at,
            "latency_ms": r.latency_ms,
            "ttft_ms": r.ttft_ms,
            "input_tokens": r.input_tokens,
            "output_tokens": r.output_tokens,
            "cached_input_tokens": r.cached_input_tokens,
            "cache_write_tokens": r.cache_write_tokens,
            "cost_usd": r.cost_usd,
            "status": r.status,
            "error_code": r.error_code,
            "error_message": r.error_message,
            "cache_hit": r.cache_hit,
            "request_metadata": r.request_metadata,
            "created_at": r.created_at,
        }
        for r in requests
    ]
)
```
🟡 Suggestion: Manually enumerating all 27 columns in the values dict is fragile — any column addition or removal will silently break this method. Consider building the dict from the ORM model's __table__.columns:
```python
col_names = [c.name for c in UsageRequestORM.__table__.columns]
insert_stmt = pg_insert(UsageRequestORM).values(
    [{col: getattr(r, col) for col in col_names} for r in requests]
)
```

This way the insert statement always stays in sync with the model definition. If you prefer the explicit approach for auditability, at minimum add a comment noting the coupling risk.
```python
async def delete(self, id: UUID) -> bool:
    """Delete a usage request by its ID.

    Args:
        id: The UUID of the usage request to delete.

    Returns:
        True if deleted, False if not found.
    """
    return await super().delete(id)
```
🟠 Important: This delete override delegates entirely to super().delete(id) with zero additional logic. It's 10 lines of noise. Remove it and let the inherited method handle it — callers already work through the base class contract.
```python
async def list_by_time_range(
    self,
    start: str,
    end: str,
    limit: int = 1000,
    offset: int = 0,
) -> list[UsageRequestORM]:
    """List usage requests within a time range.

    Args:
        start: Start of time range (ISO format).
        end: End of time range (ISO format).
        limit: Maximum number of results.
        offset: Number of results to skip.

    Returns:
        List of usage requests in the time range.
    """
    from datetime import datetime

    stmt = (
        select(UsageRequestORM)
        .where(
            UsageRequestORM.started_at >= datetime.fromisoformat(start),
            UsageRequestORM.started_at <= datetime.fromisoformat(end),
        )
        .order_by(UsageRequestORM.started_at.desc())
        .limit(limit)
        .offset(offset)
    )
    return list(self._session.execute(stmt).scalars().all())
```
🟠 Important: list_by_time_range accepts str parameters and parses them internally with datetime.fromisoformat(). This is a poor API — it forces string parsing at the repository layer and will raise an unhandled ValueError on malformed input. Accept datetime objects instead and let the caller handle parsing:
```python
async def list_by_time_range(
    self,
    start: datetime,
    end: datetime,
    limit: int = 1000,
    offset: int = 0,
) -> list[UsageRequestORM]:
    """List usage requests within a time range.

    Args:
        start: Start of time range.
        end: End of time range.
        limit: Maximum number of results.
        offset: Number of results to skip.

    Returns:
        List of usage requests in the time range.
    """
    stmt = (
        select(UsageRequestORM)
        .where(
            UsageRequestORM.started_at >= start,
            UsageRequestORM.started_at <= end,
        )
        .order_by(UsageRequestORM.started_at.desc())
        .limit(limit)
        .offset(offset)
    )
    return list(self._session.execute(stmt).scalars().all())
```
```python
class UsageRequest(BaseModel):
    """One normalized LiteLLM usage record for traffic ingestion.

    No prompt or response content is stored by default.
    Designed for sessionless usage tracking with optional benchmark linkage.
    """

    usage_request_id: UUID = Field(default_factory=uuid4)
    litellm_call_id: str = Field(..., description="Primary LiteLLM call/request ID for idempotency")
    request_id: str | None = Field(default=None, description="Alternate request ID")
    key_alias: str | None = Field(default=None, description="Denormalized key alias")
    litellm_key_id: str | None = Field(default=None, description="LiteLLM internal key ID")
    proxy_key_id: UUID | None = Field(default=None, description="Optional FK to proxy_keys")
    benchmark_session_id: UUID | None = Field(
        default=None, description="Optional FK to benchmark session"
    )
    provider: str | None = Field(default=None, description="Provider slug")
    provider_route: str | None = Field(default=None, description="Full provider route string")
    requested_model: str | None = Field(default=None, description="Client-requested model alias")
    resolved_model: str | None = Field(default=None, description="Resolved upstream model name")
    route: str | None = Field(default=None, description="API route/endpoint")
    started_at: datetime | None = Field(default=None, description="Request start time (UTC)")
    finished_at: datetime | None = Field(default=None, description="Request end time (UTC)")
    latency_ms: float | None = Field(default=None, description="Total latency in milliseconds")
    ttft_ms: float | None = Field(default=None, description="Time to first token in milliseconds")
    input_tokens: int | None = Field(default=None, description="Input/prompt token count")
    output_tokens: int | None = Field(default=None, description="Output/completion token count")
    cached_input_tokens: int | None = Field(default=None, description="Cached input token count")
    cache_write_tokens: int | None = Field(default=None, description="Tokens written to cache")
    cost_usd: float | None = Field(default=None, description="Spend in USD")
    status: str | None = Field(default=None, description="Request status (success/failure/pending)")
    error_code: str | None = Field(default=None, description="Error code (e.g. HTTP 429)")
    error_message: str | None = Field(default=None, description="Error message")
    cache_hit: bool | None = Field(default=None, description="Cache hit flag")
    request_metadata: dict[str, Any] = Field(
        default_factory=dict, description="Safe metadata (no content)"
    )
    created_at: datetime = Field(default_factory=_utc_now)
```
🟡 Suggestion: The Pydantic UsageRequest domain model is defined but never imported or used anywhere in the codebase — the repository works exclusively with ORM objects. Every other domain model (Request, Session, ProxyCredential) has a corresponding ABC and conversion logic. Either wire this up or remove it to avoid dead code confusion. If it's being added speculatively for future use, note that in a comment.
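If the model is kept, the wiring can start as a single converter; this is a hypothetical sketch (the to_domain name is an assumption, and the ORM's id column must be renamed to the domain's usage_request_id, since the remaining field names line up):

```python
def to_domain(row: UsageRequestORM) -> UsageRequest:
    """Convert an ORM row into the Pydantic domain model (hypothetical helper)."""
    data = {c.name: getattr(row, c.name) for c in UsageRequestORM.__table__.columns}
    data["usage_request_id"] = data.pop("id")  # ORM pk -> domain field name
    return UsageRequest(**data)
```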
```python
    Returns:
        List of usage requests in the time range.
    """
    from datetime import datetime
```
🟡 Suggestion: from datetime import datetime is imported inside the method body. Move it to module level alongside the other imports. Same issue on line 419 with from sqlalchemy import delete.
- _create_many_generic: use per-item savepoint to catch IntegrityError, avoiding TOCTOU races between existence check and insert.
- _create_many_postgres: replace 27-column manual dict with dynamic SQLAlchemy inspect() column mapping for forward-compatibility.
- list_by_time_range: accept datetime objects instead of ISO strings.
- delete: remove pure-noise override that only delegated to super().delete().
- delete_by_benchmark_session: move inline 'from sqlalchemy import delete' to module-level import.
- Add UsageRequestRepository ABC to repositories_abc.py with all methods.
- Update test_list_by_time_range to pass datetime objects.
🟡 Acceptable with changes needed — Most previously flagged issues have been addressed (TOCTOU fix, ABC addition, datetime params, import cleanup, dynamic column mapping, index drift). One new semantic bug remains in _create_many_postgres, and the PR evidence is weak.
Must-fix:
_create_many_postgresquery-back includes pre-existing rows in thecreatedlist, violating the "duplicates omitted" contract- PR evidence is unit-test-only; add a real end-to-end demonstration
Should-fix:
3. 13 indexes on a high-write ingestion table — consider dropping redundant single-column indexes subsumed by composites
```python
result = self._session.execute(insert_stmt)
skipped = len(requests) - result.rowcount  # type: ignore[attr-defined]

# Query back inserted rows so callers receive only persisted records
if result.rowcount:  # type: ignore[attr-defined]
    stmt = select(UsageRequestORM).where(UsageRequestORM.litellm_call_id.in_(call_ids))
    created = list(self._session.execute(stmt).scalars().all())
else:
    created = []

return created, skipped
```
🟠 Important: The query-back select().where(call_id.in_(call_ids)) returns ALL rows matching those call_ids — including rows that already existed before this batch was inserted. If create_many(["a", "b"]) is called and "a" already exists, result.rowcount = 1 (only "b" inserted), skipped = 1, but the query-back returns 2 rows. The created list then contains 2 items while only 1 was actually created, violating the docstring's "duplicates are omitted" contract. The generic path correctly omits duplicates; this path should too.
Fix: filter the query-back to only rows created in this transaction, or simply return the count and let callers query separately if they need the objects. The simplest fix is to change the return to (result.rowcount, skipped) and let callers fetch by call_ids if needed.
```python
result = self._session.execute(insert_stmt)
inserted_count = result.rowcount  # type: ignore[attr-defined]
skipped = len(requests) - inserted_count

# Only return actually-inserted rows, not pre-existing ones. With
# ON CONFLICT DO NOTHING, only rows inserted by this batch carry the
# input UUIDs, so filtering on id excludes pre-existing duplicates.
if inserted_count:
    stmt = select(UsageRequestORM).where(
        UsageRequestORM.id.in_([r.id for r in requests])
    )
    created = list(self._session.execute(stmt).scalars().all())
else:
    created = []

return created, skipped
```

The filter uses the input id values (UUIDs) rather than litellm_call_id because ON CONFLICT DO NOTHING guarantees that only newly inserted rows carry the batch's UUIDs.
```python
# Core lookup indexes
op.create_index(
    "ix_usage_requests_started_at", "usage_requests", ["started_at"]
)
op.create_index(
    "ix_usage_requests_key_alias", "usage_requests", ["key_alias"]
)
op.create_index(
    "ix_usage_requests_litellm_key_id", "usage_requests", ["litellm_key_id"]
)
op.create_index(
    "ix_usage_requests_proxy_key_id", "usage_requests", ["proxy_key_id"]
)
op.create_index(
    "ix_usage_requests_benchmark_session_id",
    "usage_requests",
    ["benchmark_session_id"],
)
op.create_index(
    "ix_usage_requests_requested_model", "usage_requests", ["requested_model"]
)
op.create_index(
    "ix_usage_requests_resolved_model", "usage_requests", ["resolved_model"]
)
op.create_index(
    "ix_usage_requests_provider", "usage_requests", ["provider"]
)
op.create_index(
    "ix_usage_requests_status", "usage_requests", ["status"]
)
op.create_index(
    "ix_usage_requests_error_code", "usage_requests", ["error_code"]
)
# Composite for time-window + key attribution queries
op.create_index(
    "ix_usage_requests_key_alias_started_at",
    "usage_requests",
    ["key_alias", "started_at"],
)
# Composite for provider + model queries
op.create_index(
    "ix_usage_requests_provider_resolved_model",
    "usage_requests",
    ["provider", "resolved_model"],
)
# Composite for session + time queries
op.create_index(
    "ix_usage_requests_session_started_at",
    "usage_requests",
    ["benchmark_session_id", "started_at"],
)
```
🟡 Suggestion: 13 indexes on a table designed for bulk ingestion is heavy write amplification. Every INSERT updates 13+ B-tree structures. The composite indexes partially subsume the single-column ones:
- ix_usage_requests_key_alias_started_at covers key_alias-only lookups on PG (prefix scan)
- ix_usage_requests_provider_resolved_model covers provider-only lookups
- ix_usage_requests_session_started_at covers benchmark_session_id-only lookups
Consider dropping the redundant single-column indexes ix_usage_requests_key_alias, ix_usage_requests_provider, and ix_usage_requests_benchmark_session_id if query patterns allow. This is a write-amplification vs. index-only-scan tradeoff — your call, but worth benchmarking with realistic ingestion volumes.
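If query patterns allow it, the cleanup is a small follow-up migration; a sketch under that assumption:

```python
# Hypothetical follow-up migration: drop single-column indexes whose
# lookups are covered by a composite's leading column (PG prefix scan).
def upgrade() -> None:
    op.drop_index("ix_usage_requests_key_alias", table_name="usage_requests")
    op.drop_index("ix_usage_requests_provider", table_name="usage_requests")
    op.drop_index(
        "ix_usage_requests_benchmark_session_id", table_name="usage_requests"
    )


def downgrade() -> None:
    op.create_index("ix_usage_requests_key_alias", "usage_requests", ["key_alias"])
    op.create_index("ix_usage_requests_provider", "usage_requests", ["provider"])
    op.create_index(
        "ix_usage_requests_benchmark_session_id",
        "usage_requests",
        ["benchmark_session_id"],
    )
```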
- _create_many_postgres: snapshot pre-existing call_ids before insert and exclude them from query-back, so the returned created list only contains genuinely new rows (not pre-existing duplicates).
- Add test_create_many_preserves_first_record end-to-end test: verifies that duplicate inserts do not overwrite existing rows and that the caller receives only newly inserted items in the result tuple.
Addressed round 3 AI review feedback in 7b26ec8:
Note on other open items:
Summary
Add the usage_requests table, ORM model, domain model, repository, and comprehensive tests for tracking all LiteLLM traffic with idempotent ingestion and optional benchmark session linkage.
Changes
```console
$ python3 -m pytest tests/
598 passed in 3.51s
```
- Manual inspection of the usage_requests table definition via SQLAlchemy
Additional notes