1 change: 1 addition & 0 deletions docs/SUMMARY.md
@@ -28,6 +28,7 @@
- [Test Failures in CI](./operations/testfailures.md)
- [Configs](./operations/configs.md)
- [Elasticsearch](./operations/elasticsearch.md)
- [Curated Recommendations](./operations/curated-recommendations/corpus-cache.md)
- [Jobs](./operations/jobs.md)
- [Navigational Suggestions](./operations/jobs/navigational_suggestions.md)
- [Dynamic Wikipedia Indexer](./operations/jobs/dynamic-wiki-indexer.md)
114 changes: 114 additions & 0 deletions docs/operations/curated-recommendations/corpus-cache.md
@@ -0,0 +1,114 @@
# Corpus Cache (Redis L2)

A shared Redis cache (L2) that sits between each pod's in-memory cache (L1) and the Corpus GraphQL API.

## Why

Merino pods each independently fetch from the Corpus API on a short interval. This puts unnecessary load on Apollo/Client-API and creates risk as we expand internationally or scale pod count.

## How it works

```mermaid
flowchart TB
req["Firefox NewTab Request"]

subgraph L1 ["L1 — Per-Pod In-Memory SWR"]
check_l1{{"Check in-memory cache"}}
end

respond_fresh["Respond with fresh data"]
respond_stale["Respond with stale data"]

subgraph bg ["Background Revalidation Task"]
direction TB

subgraph L2 ["L2 — Shared Redis"]
check_l2{{"Check Redis cache"}}
acquire_lock{{"Try distributed lock"}}
end

api["Fetch from Corpus GraphQL API"]
write["Write to Redis + release lock + update L1"]
end

req --> check_l1

check_l1 -- "FRESH HIT" --> respond_fresh
check_l1 -- "STALE" --> respond_stale
check_l1 -. "MISS (cold start, blocks)" .-> check_l2

respond_stale -. "spawns task" .-> check_l2

check_l2 -- "FRESH HIT" --> done_l2["Update L1 cache"]
check_l2 -. "STALE" .-> acquire_lock
check_l2 -. "MISS" .-> acquire_lock

acquire_lock -- "LOCK ACQUIRED" --> api
acquire_lock -. "LOCK HELD + stale exists" .-> serve_stale["Return stale data"]
acquire_lock -. "LOCK HELD + no data" .-> retry["Wait, retry Redis, or raise"]

api --> write --> done_api["Update L1 cache"]

style req fill:#2c3e50,stroke:#1a252f,color:#ecf0f1,stroke-width:2px
style check_l1 fill:#2980b9,stroke:#1f6da0,color:#fff,stroke-width:2px
style check_l2 fill:#d35400,stroke:#a04000,color:#fff,stroke-width:2px
style acquire_lock fill:#e67e22,stroke:#bf6516,color:#fff,stroke-width:2px
style api fill:#1e8449,stroke:#145a32,color:#fff,stroke-width:2px
style write fill:#1e8449,stroke:#145a32,color:#fff,stroke-width:2px
style respond_fresh fill:#27ae60,stroke:#1e8449,color:#fff,stroke-width:2px
style respond_stale fill:#27ae60,stroke:#1e8449,color:#fff,stroke-width:2px
style serve_stale fill:#f4d03f,stroke:#d4ac0f,color:#333
style retry fill:#e74c3c,stroke:#c0392b,color:#fff
style done_l2 fill:#27ae60,stroke:#1e8449,color:#fff
style done_api fill:#27ae60,stroke:#1e8449,color:#fff
style L1 fill:#eaf2f8,stroke:#2980b9,stroke-width:2px,color:#2c3e50
style L2 fill:#fef5e7,stroke:#d35400,stroke-width:2px,color:#2c3e50
style bg fill:#f4f6f7,stroke:#95a5a6,stroke-width:2px,stroke-dasharray: 8 4,color:#2c3e50
```

Two layers of caching sit in front of the Corpus GraphQL API:

- **L1 (in-memory SWR)** — per-pod. Serves requests immediately. On stale, spawns a background task to revalidate.
- **L2 (Redis)** — shared across all pods. The background task checks Redis before hitting the API.

When L2 is stale, one pod acquires a distributed lock, fetches from the API, and writes to Redis. Other pods serve stale data until the winner finishes.

On cold start (no L1 or L2 data), the request blocks until data is fetched. All pods may hit the API simultaneously in this case — same as today without the cache.
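The flow above can be sketched end to end. This is a minimal, single-process model of the two-layer stale-while-revalidate scheme, not Merino's implementation: the dicts and the `LOCKS` set stand in for the per-pod cache, Redis, and the `SET NX EX` lock, and all names (`l1`, `revalidate`, `get_recommendations`, etc.) are made up for illustration.

```python
import asyncio
import time

# Hypothetical in-memory stores standing in for L1 and Redis (L2).
L1: dict[str, tuple[float, object]] = {}  # key -> (fetched_at, value)
L2: dict[str, tuple[float, object]] = {}  # shared across pods in real life
LOCKS: set[str] = set()                   # stands in for Redis SET NX EX

SOFT_TTL = 120.0  # seconds until an entry counts as stale


def is_fresh(entry, now):
    return entry is not None and now - entry[0] < SOFT_TTL


async def fetch_from_api(key):
    """Stand-in for the Corpus GraphQL API call."""
    return {"items": [f"rec-for-{key}"]}


async def revalidate(key):
    """Background task: consult L2 first, then (holding the lock) the API."""
    now = time.monotonic()
    l2 = L2.get(key)
    if is_fresh(l2, now):
        L1[key] = l2              # L2 fresh hit: just refill this pod's L1
        return
    if key in LOCKS:              # another pod is already fetching;
        return                    # keep serving stale until it finishes
    LOCKS.add(key)                # acquired (SET NX EX in Redis)
    try:
        value = await fetch_from_api(key)
        entry = (time.monotonic(), value)
        L2[key] = entry           # write to Redis...
        L1[key] = entry           # ...and update L1
    finally:
        LOCKS.discard(key)        # release (DEL) the lock


async def get_recommendations(key):
    entry = L1.get(key)
    if is_fresh(entry, time.monotonic()):
        return entry[1]           # fresh L1 hit: fastest path
    if entry is not None:
        asyncio.create_task(revalidate(key))
        return entry[1]           # stale: serve now, revalidate in background
    await revalidate(key)         # cold start: block until data exists
    return L1[key][1]
```

The key property is that the stale path never awaits the network: the caller gets old data immediately while at most one "pod" refreshes it.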

## Configuration

Config section: `[default.curated_recommendations.corpus_cache]` in `merino/configs/default.toml`.

Key settings:
- `cache` — `"redis"` to enable, `"none"` to disable (default: disabled)
- `soft_ttl_sec` — when a cached entry is considered stale and triggers revalidation
- `hard_ttl_sec` — when Redis evicts the key entirely (safety net)
> **Collaborator Author:** We probably want this to be high: more like 1-24 hours? Days? To match our response time.

- `lock_ttl_sec` — auto-release timeout if the lock holder crashes
- `key_prefix` — bump the version on schema changes to avoid deserialization errors

Env var override pattern: `MERINO__CURATED_RECOMMENDATIONS__CORPUS_CACHE__CACHE=redis`
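The override pattern follows the usual double-underscore nesting convention for mapping dotted config paths to env vars. A small illustrative helper (Merino's settings library performs this mapping itself; the function name here is invented):

```python
import os


def setting_to_env(path: str, prefix: str = "MERINO") -> str:
    """Map a dotted settings path to its env var override name,
    using the double-underscore nesting convention shown above."""
    return "__".join([prefix, *path.upper().split(".")])


name = setting_to_env("curated_recommendations.corpus_cache.cache")
# Set the override before process start, e.g. in the pod spec:
os.environ[name] = "redis"
```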

Uses the shared Redis cluster (`[default.redis]`). No separate instance needed.

## Design decisions

| Decision | Choice | Why |
|---|---|---|
| Cache layer | Redis L2 behind existing in-memory L1 | Keeps per-pod latency low, Redis only consulted on L1 miss |
| Write pattern | Distributed stale-while-revalidate | One pod revalidates, others serve stale. Avoids thundering herd |
| Lock mechanism | `SET NX EX` with TTL | Simple, self-expiring. Worst case on timeout: one extra API call |
| Cache format | Pydantic model dicts via orjson | Saves CPU across pods vs re-parsing raw GraphQL |
| Failure mode | All Redis errors fall through to API | Redis is an optimization, never a requirement |
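The cache-format row boils down to a serialize-once, reuse-everywhere round trip: one pod parses and validates the GraphQL response, and every other pod deserializes straight into model objects. A dependency-free sketch, using stdlib `json` and a dataclass in place of orjson and Pydantic; the field names are invented for illustration:

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class CorpusItem:
    """Illustrative stand-in for a cached corpus payload entry."""
    corpus_item_id: str
    title: str
    url: str


def encode(items: list[CorpusItem]) -> bytes:
    # Serialize already-validated model dicts once, so other pods can
    # skip re-parsing the raw GraphQL response.
    return json.dumps([asdict(i) for i in items]).encode()


def decode(raw: bytes) -> list[CorpusItem]:
    return [CorpusItem(**d) for d in json.loads(raw)]
```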

## Rollout

1. Deploy with cache disabled (no behavior change)
2. Enable in staging
3. Monitor metrics, validate API call reduction
4. Enable in production

## Key files

- `merino/curated_recommendations/corpus_backends/redis_cache.py` — cache logic
- `merino/curated_recommendations/__init__.py` — wiring (`_init_corpus_cache`)
- `merino/configs/default.toml` — config section with defaults and documentation
6 changes: 6 additions & 0 deletions merino/cache/none.py
@@ -35,3 +35,9 @@ async def sismember(self, key: str, value: str) -> bool:  # noqa: D102

async def scard(self, key: str) -> int: # noqa: D102
return 0

async def set_nx(self, key: str, ttl_sec: int) -> bool: # noqa: D102
return True

async def delete(self, key: str) -> None: # noqa: D102
pass
19 changes: 19 additions & 0 deletions merino/cache/protocol.py
@@ -78,3 +78,22 @@ async def sismember(self, key: str, value: str) -> bool:
async def scard(self, key: str) -> int:
"""Get the number of members in a Redis set."""
...

async def set_nx(self, key: str, ttl_sec: int) -> bool: # pragma: no cover
"""Set the key only if it does not already exist, with a TTL in seconds.

Returns:
True if the key was set, False if it already existed.

Raises:
- `CacheAdapterError` for cache backend errors.
"""
...

async def delete(self, key: str) -> None: # pragma: no cover
"""Delete a key from the cache.

Raises:
- `CacheAdapterError` for cache backend errors.
"""
...
25 changes: 25 additions & 0 deletions merino/cache/redis.py
@@ -127,6 +127,31 @@ async def scard(self, key: str) -> int:
except RedisError as exc:
raise CacheAdapterError(f"Failed to SCARD {key} with error: {exc}") from exc

async def set_nx(self, key: str, ttl_sec: int) -> bool:
"""Set the key only if it does not exist, with a TTL in seconds.

Returns:
True if the key was set, False if it already existed.

Raises:
- `CacheAdapterError` if Redis returns an error.
"""
try:
return bool(await self.primary.set(key, b"1", nx=True, ex=ttl_sec))
except RedisError as exc:
raise CacheAdapterError(f"Failed to SETNX `{repr(key)}` with error: `{exc}`") from exc

async def delete(self, key: str) -> None:
"""Delete a key from Redis.

Raises:
- `CacheAdapterError` if Redis returns an error.
"""
try:
await self.primary.delete(key)
except RedisError as exc:
raise CacheAdapterError(f"Failed to DELETE `{repr(key)}` with error: `{exc}`") from exc

> **Collaborator Author:** Again, check if this is a new pattern, and whether it should be one.

async def close(self) -> None:
"""Close the Redis connection."""
if self.primary is self.replica:
22 changes: 22 additions & 0 deletions merino/configs/default.toml
@@ -1061,6 +1061,28 @@ blob_name = "contextual_ts/cohort_model_v2.safetensors"
cron_interval_seconds = 600


[default.curated_recommendations.corpus_cache]
# MERINO__CURATED_RECOMMENDATIONS__CORPUS_CACHE__CACHE
# Shared corpus cache backend. "redis" enables Redis as an L2 cache
# between in-memory SWR (L1) and the Corpus GraphQL API. "none" disables it.
cache = "none"

# MERINO__CURATED_RECOMMENDATIONS__CORPUS_CACHE__SOFT_TTL_SEC
# Soft TTL in seconds. After this, one pod revalidates while others serve stale data.
soft_ttl_sec = 120

# MERINO__CURATED_RECOMMENDATIONS__CORPUS_CACHE__HARD_TTL_SEC
# Hard TTL in seconds. Redis evicts the key after this. Should be much longer than soft TTL.
hard_ttl_sec = 600
> **Collaborator Author:** Suggested change: `hard_ttl_sec = 600` → `hard_ttl_sec = 3600`. Long hard ttl?


# MERINO__CURATED_RECOMMENDATIONS__CORPUS_CACHE__LOCK_TTL_SEC
# Distributed lock TTL in seconds. Auto-releases if the lock holder crashes.
lock_ttl_sec = 30
> **Collaborator Author:** TODO: Check if there's a client network timeout of 30s?


# MERINO__CURATED_RECOMMENDATIONS__CORPUS_CACHE__KEY_PREFIX
# Prefix for all Redis keys. Bump the version on schema changes.
key_prefix = "curated:v1"


[default.curated_recommendations.corpus_api]
# MERINO__CURATED_RECOMMENDATIONS__CORPUS_API__RETRY_COUNT
67 changes: 65 additions & 2 deletions merino/curated_recommendations/__init__.py
@@ -6,7 +6,17 @@
import logging
import random

from merino.cache.redis import RedisAdapter, create_redis_clients
from merino.configs import settings
from merino.curated_recommendations.corpus_backends.protocol import (
ScheduledSurfaceProtocol,
SectionsProtocol,
)
from merino.curated_recommendations.corpus_backends.redis_cache import (
CorpusCacheConfig,
RedisCachedScheduledSurface,
RedisCachedSections,
)
from merino.curated_recommendations.corpus_backends.scheduled_surface_backend import (
ScheduledSurfaceBackend,
CorpusApiGraphConfig,
@@ -169,6 +179,46 @@ def init_ml_cohort_model_backend() -> CohortModelBackend:
return EmptyCohortModel()


def _init_corpus_cache(
scheduled_surface_backend: ScheduledSurfaceProtocol,
sections_backend: SectionsProtocol,
) -> tuple[ScheduledSurfaceProtocol, SectionsProtocol, RedisAdapter | None]:
"""Optionally wrap corpus backends with a Redis L2 cache layer.

Returns the backends (possibly wrapped) and the Redis adapter (if created).
The caller owns the adapter and is responsible for closing it on shutdown.
"""
cache_settings = settings.curated_recommendations.corpus_cache
if cache_settings.cache != "redis":
return scheduled_surface_backend, sections_backend, None

try:
logger.info("Initializing Redis L2 cache for corpus backends")
adapter = RedisAdapter(
*create_redis_clients(
primary=settings.redis.server,
replica=settings.redis.replica,
max_connections=settings.redis.max_connections,
socket_connect_timeout=settings.redis.socket_connect_timeout_sec,
socket_timeout=settings.redis.socket_timeout_sec,
)
)
config = CorpusCacheConfig(
soft_ttl_sec=cache_settings.soft_ttl_sec,
hard_ttl_sec=cache_settings.hard_ttl_sec,
lock_ttl_sec=cache_settings.lock_ttl_sec,
key_prefix=cache_settings.key_prefix,
)
return (
RedisCachedScheduledSurface(scheduled_surface_backend, adapter, config),
RedisCachedSections(sections_backend, adapter, config),
adapter,
)
except Exception as e:
logger.error("Failed to initialize Redis corpus cache, proceeding without it: %s", e)
return scheduled_surface_backend, sections_backend, None


def init_provider() -> None:
"""Initialize the curated recommendations' provider."""
global _provider
@@ -179,20 +229,24 @@ def init_provider() -> None:
ml_recommendations_backend = init_ml_recommendations_backend()
cohort_model_backend = init_ml_cohort_model_backend()

scheduled_surface_backend = ScheduledSurfaceBackend(
scheduled_surface_backend: ScheduledSurfaceProtocol = ScheduledSurfaceBackend(
http_client=create_http_client(base_url=""),
graph_config=CorpusApiGraphConfig(),
metrics_client=get_metrics_client(),
manifest_provider=get_manifest_provider(),
)

sections_backend = SectionsBackend(
sections_backend: SectionsProtocol = SectionsBackend(
http_client=create_http_client(base_url=""),
graph_config=CorpusApiGraphConfig(),
metrics_client=get_metrics_client(),
manifest_provider=get_manifest_provider(),
)

scheduled_surface_backend, sections_backend, cache_adapter = _init_corpus_cache(
scheduled_surface_backend, sections_backend
)

_provider = CuratedRecommendationsProvider(
scheduled_surface_backend=scheduled_surface_backend,
engagement_backend=engagement_backend,
@@ -201,6 +255,7 @@
local_model_backend=local_model_backend,
ml_recommendations_backend=ml_recommendations_backend,
cohort_model_backend=cohort_model_backend,
cache_adapter=cache_adapter,
)
_legacy_provider = LegacyCuratedRecommendationsProvider()

@@ -215,3 +270,11 @@
"""Return the legacy curated recommendations provider"""
global _legacy_provider
return _legacy_provider


async def shutdown() -> None:
"""Clean up resources used by curated recommendations."""
try:
await _provider.shutdown()
except NameError:
pass