Problem
When indexing a large number of datasets (e.g., thousands of individual sanctions lists), the current sequential indexing approach becomes a significant bottleneck. Each dataset is processed one at a time due to the `threading.Lock()` in `indexer.py`:

```python
for dataset in catalog.datasets:
    with lock:
        await index_entities(provider, dataset, force=force, lock_session=lock_session)
```

For users with many separate datasets, indexing can take a long time even with a well-provisioned Elasticsearch cluster that sits largely idle waiting for Yente to feed it data.
Proposed Solution
Add a `YENTE_INDEX_CONCURRENCY` environment variable (default: `1` for backward compatibility) that controls how many datasets can be indexed in parallel:

```python
# settings.py
INDEX_CONCURRENCY: int = int(env.get("YENTE_INDEX_CONCURRENCY", "1"))
```

```python
# indexer.py
semaphore = asyncio.Semaphore(settings.INDEX_CONCURRENCY)

async def index_with_concurrency(dataset):
    async with semaphore:
        await index_entities(provider, dataset, force=force, lock_session=lock_session)

await asyncio.gather(*[
    index_with_concurrency(dataset)
    for dataset in catalog.datasets
])
```

Benefits
- Backward compatible - the default of `1` preserves the current sequential behavior
- User configurable - operators can tune concurrency to their Elasticsearch cluster's capacity
- Significant speedup - an estimated 3-4x improvement with a concurrency of 4 on catalogs with many datasets
- Better resource utilization - Elasticsearch clusters handle parallel bulk writes well
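The semaphore pattern proposed above can be sketched as a self-contained script. Here `index_entities`, the dataset names, and the `INDEX_CONCURRENCY` constant are stand-in stubs for illustration, not yente's real API:

```python
import asyncio

# Hypothetical stand-ins for yente's catalog and settings.
DATASETS = [f"dataset_{i}" for i in range(8)]
INDEX_CONCURRENCY = 4  # would come from YENTE_INDEX_CONCURRENCY

indexed: list[str] = []

async def index_entities(dataset: str) -> None:
    # Simulates a slow bulk-indexing call to Elasticsearch.
    await asyncio.sleep(0.01)
    indexed.append(dataset)

async def index_all() -> None:
    # Created inside the running loop; caps in-flight tasks at INDEX_CONCURRENCY.
    semaphore = asyncio.Semaphore(INDEX_CONCURRENCY)

    async def index_with_concurrency(dataset: str) -> None:
        async with semaphore:
            await index_entities(dataset)

    # All tasks are scheduled at once; the semaphore throttles them.
    await asyncio.gather(*(index_with_concurrency(d) for d in DATASETS))

asyncio.run(index_all())
print(sorted(indexed))
```

Note that all coroutines are handed to `asyncio.gather` immediately; the semaphore, not the loop over datasets, is what bounds how many run concurrently, so a concurrency of `1` degrades gracefully to the current sequential behavior.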