Skip to content

Commit ea32837

Browse files
authored
fix(bigquery): replace unbounded Flux.create BUFFER with Flux.fromStream in storeResult (#605)
Fixes #604: OOM on parallel Query tasks with fetchType=STORE.
1 parent 8eebb99 commit ea32837

1 file changed

Lines changed: 4 additions & 16 deletions

File tree

  • src/main/java/io/kestra/plugin/gcp/bigquery

src/main/java/io/kestra/plugin/gcp/bigquery/Query.java

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@
4646
import lombok.*;
4747
import lombok.experimental.SuperBuilder;
4848
import reactor.core.publisher.Flux;
49-
import reactor.core.publisher.FluxSink;
5049
import reactor.core.publisher.Mono;
5150

5251
@SuperBuilder
@@ -661,21 +660,10 @@ private Map.Entry<URI, Long> storeResult(TableResult result, RunContext runConte
661660
try (
662661
var output = new BufferedWriter(new FileWriter(tempFile), FileSerde.BUFFER_SIZE)
663662
) {
664-
Flux<Object> flowable = Flux
665-
.create(
666-
s ->
667-
{
668-
StreamSupport
669-
.stream(result.iterateAll().spliterator(), false)
670-
.forEach(fieldValues ->
671-
{
672-
s.next(this.convertRows(result, fieldValues));
673-
});
674-
675-
s.complete();
676-
},
677-
FluxSink.OverflowStrategy.BUFFER
678-
);
663+
Flux<Object> flowable = Flux.fromStream(
664+
StreamSupport.stream(result.iterateAll().spliterator(), false)
665+
.map(fieldValues -> (Object) this.convertRows(result, fieldValues))
666+
);
679667
Mono<Long> longMono = FileSerde.writeAll(output, flowable);
680668

681669
// metrics & finalize

0 commit comments

Comments
 (0)