Feature Request: Add YENTE_INDEX_CONCURRENCY setting for parallel dataset indexing #989

@CodeUltimate

Description

Problem

When indexing a large number of datasets (e.g., thousands of individual sanctions lists), the current sequential indexing approach becomes a significant bottleneck. Each dataset is processed one at a time due to the threading.Lock() in indexer.py:

for dataset in catalog.datasets:
    with lock:
        await index_entities(provider, dataset, force=force, lock_session=lock_session)

For users with many separate datasets, total indexing time scales linearly with dataset count, even when a well-provisioned Elasticsearch cluster sits largely idle waiting for Yente to feed it data.

Proposed Solution

Add a YENTE_INDEX_CONCURRENCY environment variable (default: 1 for backward compatibility) that controls how many datasets can be indexed in parallel:

# settings.py
INDEX_CONCURRENCY: int = int(env.get("YENTE_INDEX_CONCURRENCY", "1"))

# indexer.py
# Bound the number of datasets indexed at once.
semaphore = asyncio.Semaphore(settings.INDEX_CONCURRENCY)

async def index_with_concurrency(dataset):
    async with semaphore:
        await index_entities(provider, dataset, force=force, lock_session=lock_session)

await asyncio.gather(*[
    index_with_concurrency(dataset)
    for dataset in catalog.datasets
])
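The semaphore-bounded gather pattern above can be demonstrated in isolation. The sketch below is self-contained: the real index_entities, provider, and dataset objects from yente are replaced with a stub and a hypothetical CONCURRENCY constant standing in for settings.INDEX_CONCURRENCY, so it only shows that no more than N tasks ever run at once.

```python
import asyncio

CONCURRENCY = 2  # stand-in for settings.INDEX_CONCURRENCY

running = 0  # tasks currently "indexing"
peak = 0     # highest observed concurrency

async def index_entities(dataset: str) -> str:
    # Stub for yente's index_entities: records how many
    # tasks are in flight simultaneously.
    global running, peak
    running += 1
    peak = max(peak, running)
    await asyncio.sleep(0.01)  # simulate bulk-write work
    running -= 1
    return dataset

async def main() -> int:
    semaphore = asyncio.Semaphore(CONCURRENCY)

    async def index_with_concurrency(dataset: str) -> str:
        async with semaphore:
            return await index_entities(dataset)

    datasets = [f"dataset_{i}" for i in range(8)]
    await asyncio.gather(*[index_with_concurrency(d) for d in datasets])
    return peak

peak_concurrency = asyncio.run(main())
print(peak_concurrency)  # never exceeds CONCURRENCY
```

All eight tasks are created up front, but the semaphore admits only two into index_entities at a time; with CONCURRENCY set to 1, this degrades to the current sequential behavior.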

Benefits

  • Backward compatible - default of 1 preserves current behavior
  • User configurable - operators can tune based on their ES cluster capacity
  • Significant speedup - estimated 3-4x improvement with concurrency of 4
  • Better resource utilization - ES clusters can handle parallel bulk writes
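To keep the default of 1 truly safe for existing deployments, the setting could be parsed defensively. This is a hedged sketch, not yente's actual settings code: parse_index_concurrency is a hypothetical helper that falls back to 1 for missing, non-numeric, or non-positive values.

```python
# Hypothetical defensive parser for YENTE_INDEX_CONCURRENCY;
# falls back to the backward-compatible default of 1.
def parse_index_concurrency(env: dict[str, str]) -> int:
    raw = env.get("YENTE_INDEX_CONCURRENCY", "1")
    try:
        value = int(raw)
    except ValueError:
        return 1  # ignore non-numeric values
    return value if value >= 1 else 1  # reject 0 and negatives

print(parse_index_concurrency({}))                                # 1
print(parse_index_concurrency({"YENTE_INDEX_CONCURRENCY": "4"}))  # 4
```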
