fix(bigquery): replace unbounded Flux BUFFER with Flux.fromStream in Query.storeResult #605

Merged
fdelbrayelle merged 1 commit into main from fix/bigquery-store-oom-flux-buffer on Apr 28, 2026

@fdelbrayelle
Member

Fix #604

Root cause

Query.storeResult used Flux.create(..., FluxSink.OverflowStrategy.BUFFER) to drive FileSerde.writeAll:

Flux<Object> flowable = Flux.create(
    s -> {
        StreamSupport.stream(result.iterateAll().spliterator(), false)
            .forEach(fv -> s.next(this.convertRows(result, fv)));
        s.complete();
    },
    FluxSink.OverflowStrategy.BUFFER
);

Flux.create is push-based: the producer (forEach) runs synchronously to completion before the subscriber drains, pumping every converted row into Reactor's internal unbounded queue. iterateAll() pages lazily from BigQuery, but that lazy paging is invisible to Reactor here: every fetched row is materialised as a LinkedHashMap and queued on the JVM heap before any byte hits disk.
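
The buffering pattern described above can be modelled without Reactor. The sketch below is a plain-JDK stand-in, not the plugin's code: PushBufferModel, convert, and peakQueued are hypothetical names, and an ArrayDeque plays the role of the unbounded queue. It only illustrates that when the producer runs to completion before the consumer drains, the peak number of queued rows equals the row count.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-JDK model of a push-style producer feeding an unbounded queue.
// Assumption: this is an illustration only; no Reactor classes are involved.
final class PushBufferModel {

    // Hypothetical stand-in for convertRows: one map per row.
    static Map<String, Object> convert(int rowId) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("id", rowId);
        return row;
    }

    // Pushes every row into the queue first, then drains it.
    // Returns the largest number of rows held in the queue at once.
    static int peakQueued(int rowCount) {
        Deque<Map<String, Object>> queue = new ArrayDeque<>();
        int peak = 0;

        // Producer phase: runs to completion before anything is consumed.
        for (int i = 0; i < rowCount; i++) {
            queue.add(convert(i));
            peak = Math.max(peak, queue.size());
        }

        // Consumer phase: the "write" step only starts once the producer is done.
        while (!queue.isEmpty()) {
            queue.poll();
        }
        return peak;
    }

    public static void main(String[] args) {
        // Row count taken from the first customer task described in this PR.
        System.out.println(peakQueued(19_681)); // prints 19681
    }
}
```

In this model the peak grows linearly with the result size, which is the shape of the heap growth reported in the issue.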

Customer had two parallel tasks:

  • 19,681 rows × 12 cols (some REPEATED STRING) → 75.5 MiB ION → ~4 KB/row in memory
  • 3,324 rows × 10 cols (same) → 60.1 MiB ION → ~18 KB/row in memory

With N parallel executions, heap spiked to ~2.5 GiB (visible in Grafana) and triggered an OOM kill at ~01:00 CEST on 2026-04-27.
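
The per-row figures above appear to be the ION file size divided by the row count. A small sketch of that arithmetic follows; RowSizeEstimate and bytesPerRow are hypothetical names, and the result is an estimate derived from serialized size, not a measured heap footprint.

```java
// Reproduces the per-row size estimate from the two customer tasks.
// Assumption: "MiB" means 1024 * 1024 bytes and size is spread evenly across rows.
final class RowSizeEstimate {

    // Average bytes per row for a file of the given size in MiB.
    static double bytesPerRow(double fileMiB, long rows) {
        return fileMiB * 1024 * 1024 / rows;
    }

    public static void main(String[] args) {
        // Task 1: 75.5 MiB over 19,681 rows.
        System.out.printf("task 1: %.1f KiB/row%n", bytesPerRow(75.5, 19_681) / 1024);
        // Task 2: 60.1 MiB over 3,324 rows.
        System.out.printf("task 2: %.1f KiB/row%n", bytesPerRow(60.1, 3_324) / 1024);
    }
}
```

This works out to roughly 3.9 KiB and 18.5 KiB per row, consistent with the ~4 KB and ~18 KB quoted in the list.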

Fix

Replace Flux.create(..., BUFFER) with Flux.fromStream, which is pull-based (synchronous-fuseable source). The FileSerde.writeAll chain pulls one row at a time via doOnNext, so only one LinkedHashMap is live during each write cycle:

Flux<Object> flowable = Flux.fromStream(
    StreamSupport.stream(result.iterateAll().spliterator(), false)
        .map(fieldValues -> (Object) this.convertRows(result, fieldValues))
);
Mono<Long> longMono = FileSerde.writeAll(output, flowable);

This matches the pattern already used in firestore/Query.java (Flux.fromIterable + FileSerde.writeAll).

The FluxSink import is removed as it is no longer referenced.
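
The pull-based behaviour that the fix relies on can be illustrated with a lazy java.util.stream.Stream consumed through its iterator. This is a sketch under assumptions, not Reactor's internals: PullInterleavingModel and trace are hypothetical names, and the "write" step stands in for what FileSerde.writeAll does per row. It shows that each row is converted only when the consumer asks for it, so conversions and writes interleave.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.IntStream;
import java.util.stream.Stream;

// Plain-JDK model of pull-based consumption of a lazily mapped stream.
// Assumption: this is an illustration only; no Reactor classes are involved.
final class PullInterleavingModel {

    // Records the order of convert/write steps when rows are pulled one at a time.
    static List<String> trace(int rowCount) {
        List<String> events = new ArrayList<>();

        // map() is lazy: the lambda runs only when the consumer pulls an element.
        Stream<String> rows = IntStream.range(0, rowCount)
            .boxed()
            .map(i -> {
                events.add("convert " + i);
                return "row-" + i;
            });

        // Consumer: pulls a row, "writes" it, then pulls the next one.
        Iterator<String> it = rows.iterator();
        while (it.hasNext()) {
            events.add("write " + it.next());
        }
        return events;
    }

    public static void main(String[] args) {
        // prints [convert 0, write row-0, convert 1, write row-1]
        System.out.println(trace(2));
    }
}
```

Because each conversion is followed directly by its write, only one converted row needs to be reachable at a time in this model, in contrast to the queue that fills up under the push-based variant.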

Out of scope

fetchResult (used by FETCH / FETCH_ONE) also collects to a List. This is the same class of memory issue and is left for a separate follow-up.

…eam in storeResult

Fixes #604: OOM on parallel Query tasks with fetchType=STORE.
@fdelbrayelle fdelbrayelle self-assigned this Apr 28, 2026
@github-actions
Contributor

github-actions Bot commented Apr 28, 2026

📦 Artifacts

| Name | Size | Updated | Expiration |
| --- | --- | --- | --- |
| jar | 108.49 MB | Apr 28, 26, 1:39:22 PM UTC | May 5, 26, 1:39:17 PM UTC |

🛡 Trivy

Vulnerability in: Java

| Vulnerability | Severity | Package | Installed Version | Fixed Version |
| --- | --- | --- | --- | --- |
| GHSA-72hv-8253-57qq | MEDIUM | com.fasterxml.jackson.core:jackson-core | 2.20.1 | 2.21.1, 2.18.6 |

🧪 Java Unit Tests

| Report | Tests | Passed ✅ | Skipped ⚠️ | Failed | Time ⏱ |
| --- | --- | --- | --- | --- | --- |
| Java Tests Report | 117 ran | 107 ✅ | 10 ⚠️ | 0 ❌ | 7m 14s 803ms |

🔁 Unreleased Commits

3 commits since v2.2.0

| SHA | Title | Author | Date |
| --- | --- | --- | --- |
| 4fa4ff3 | docs: add/update AGENTS.md with clear Why/What | François Delbrayelle | Apr 18, 26, 7:07:31 AM UTC |
| 044d7b4 | docs: rewrite AGENTS.md Why section | François Delbrayelle | Apr 19, 26, 7:46:28 AM UTC |
| 8eebb99 | docs: normalize README with Why and What | François Delbrayelle | Apr 19, 26, 8:06:11 AM UTC |

@fdelbrayelle fdelbrayelle merged commit ea32837 into main Apr 28, 2026
7 checks passed
@fdelbrayelle fdelbrayelle deleted the fix/bigquery-store-oom-flux-buffer branch April 28, 2026 14:23
Development

Successfully merging this pull request may close these issues.

Investigate memory usage based on the size of Query
