bulkload-db: snapshot source once instead of per-chunk LIMIT/OFFSET#22
Merged
Conversation
Fixes #19, #20, #21. The prior implementation paginated the source by issuing a fresh `SELECT * FROM <table> [ORDER BY ...] LIMIT N OFFSET M` from each parallel worker. Three problems followed from that one choice: * Without ORDER BY, separate connections were free to return rows in different orders, so chunks could overlap (rows overwritten by id in Solr) and miss rows entirely. Loads silently dropped ~20% of input while reporting success. * With ORDER BY, every worker re-sorted the entire source. On multi-GB inputs the simultaneous sorts exhausted memory; failed chunks contributed zero with no propagated error. * On a DuckDB view, every chunk re-evaluated the full view. Replace the offset paradigm with: materialize the (filtered/ordered) source once into a tempfile DuckDB via ATTACH ... READ_ONLY, stream rows through a single read cursor with fetchmany, and fan out only the Solr POSTs across a thread pool with backpressure. One pass, bounded memory, every row uploaded exactly once. Defaults: parallel-workers drops from `cpu * 1.5 (max 12)` to 4 — benchmarks on the Monarch KG (1.46M nodes, 500k joined edges) show ~2x going from 1→4 workers and a hard plateau after that, because Solr indexing becomes the bottleneck.
GitHub now hard-fails workflows that use actions/cache@v2, which the previous pr-test workflow did — so this PR's tests couldn't run at all. Bump checkout@v4, setup-python@v5, cache@v4, install-poetry@v1, and move the matrix off EOL 3.8/3.9 onto 3.10/3.11. Includes the Python version in the venv cache key so the two matrix entries don't collide.
2 tasks
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Fixes #19, #20, #21.
Summary
The prior
bulkload_duckdbpaginated the source by issuing a freshSELECT * FROM <table> [ORDER BY ...] LIMIT N OFFSET Mfrom each parallel worker, each on its own connection. That one design choice caused three separate problems:ORDER BY, different connections were free to return rows in different orders, so chunks could overlap (rows overwritten by id in Solr) and miss rows entirely. Loads silently dropped ~20% of input while reporting success.ORDER BY, every worker re-sorted the entire source. On multi-GB inputs the 12-way simultaneous sorts exhausted memory; failed chunks contributed zero with no propagated error.Change
Replace the offset paradigm with a snapshot-and-stream model in
bulkload_duckdb:CREATE TABLE snapshot AS SELECT … FROM src.<table> [WHERE …] [ORDER BY …]— one materialization.fetchmany(chunk_size)).ThreadPoolExecutorwith backpressure (at most2 × workerschunks in flight). DuckDB reads stay serial in one connection — the OOM and view-recomputation pathologies can't happen.Each row in the snapshot is yielded exactly once by the cursor, so the #19 row-loss is gone even without an explicit
ORDER BY.--order-byis now optional and only useful for run-to-run reproducibility.table_namemay also be a parenthesized subquery (e.g.'(SELECT ... FROM src.edges JOIN src.nodes ...) v') so callers can drive joins through the snapshot without first defining a view in the source DB.query_duckdb_chunkandupload_duckdb_chunk(the old offset workers) are removed — they only existed to serve the broken pattern.Default workers: 12 → 4
get_optimal_worker_countused to scale tocpu * 1.5capped at 12. Benchmarks (below) show parallelism past 4 has no benefit because Solr indexing dominates. New default: 4. Override with--parallel-workers.Benchmarks (M-series Mac, Solr 8 in Docker, separate
lsolr_bench_solron :8984)Source: monarch-kg.duckdb (~7.7 GB) from monarch-ingest-graph-schema-verify.
Correctness — synthetic view (LEFT JOIN edges → nodes ×2 → closure_label ×2, LIMIT 500 000)
Same workload, same data — reproduces #19 (silent ~40% loss with parallel + no
ORDER BYon a view) and shows the fix.Throughput —
nodestable (1 457 305 rows), NEW code~2× from 1→4 workers, then plateau — Solr indexing is the bottleneck. Hence the new default of 4.
Smoke —
denormalized_edgestable (14 988 609 rows), OLD code w=12~3 161 s, 4 742 docs/s, full count. (The OLD
nodesrun at default workers also happened to land the full count this run; #19 is timing-dependent, but the view test above demonstrates the failure mode reliably.)A heavier ordered LEFT JOIN view (the shape #21 describes) couldn't complete a single OLD chunk in 5 minutes locally, consistent with the report there.
Test plan
make test(unit suite, unaffected — onlybulkload_fileis exercised)--order-bystill works and produces a stable snapshot