feat(scheduling): reuse output storage across region re-executions #5707
aglinxinyuan wants to merge 4 commits into
Add a `reusesOutputStorageOnReExecution` flag to `PhysicalOp` (default false) plus a `withReusesOutputStorageOnReExecution` builder. When set, `RegionExecutionCoordinator` reuses an operator's existing iceberg output and state documents on a region re-run instead of recreating them, via a new pure `provisionOutputDocument` decision function unit-tested by `RegionOutputProvisioningSpec`. The flag is named for the behavior the scheduler checks, not the operator that sets it, so any future operator needing output accumulated across region re-executions can opt in. No operator sets it yet, so the path is dormant and behavior-preserving. Split out of the loop-operators PR (apache#5700) to keep that PR reviewable; the loop feature will set the flag on Loop End.
Codecov Report
✅ All modified and coverable lines are covered by tests.
Coverage Diff
| | main | #5707 | +/- |
|---|---|---|---|
| Coverage | 52.86% | 52.54% | -0.32% |
| Complexity | 2621 | 2616 | -5 |
| Files | 1090 | 1087 | -3 |
| Lines | 42184 | 41867 | -317 |
| Branches | 4530 | 4481 | -49 |
| Hits | 22301 | 22001 | -300 |
| Misses | 18576 | 18604 | +28 |
| Partials | 1307 | 1262 | -45 |
This pull request uses carry forward flags.
Pull request overview
This PR introduces an opt-in mechanism for operators to preserve their output/state storage across region re-executions (e.g., loop iterations), by adding a flag on PhysicalOp and updating the region scheduler’s output-document provisioning logic to conditionally reuse existing documents. It also adds focused unit tests for the new create-vs-reuse decision function.
Changes:
- Add `reusesOutputStorageOnReExecution: Boolean` and a `withReusesOutputStorageOnReExecution` builder to `PhysicalOp`.
- Extract the output document provisioning decision into `RegionExecutionCoordinator.provisionOutputDocument` and use it when provisioning per-output-port result/state documents.
- Add `RegionOutputProvisioningSpec` unit tests covering the reuse×exists matrix and the non-reuse short-circuit.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/PhysicalOp.scala | Adds a new operator-level flag + builder to signal output-storage reuse across region re-executions. |
| amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala | Adds a pure provisioning decision function and uses it to create-or-reuse result/state documents per output port based on the owning operator’s flag. |
| amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionOutputProvisioningSpec.scala | Introduces unit tests validating the provisioning decision logic without needing an Iceberg backend. |
| | config | throughput (tuples/sec) | MB/s | latency p50/p95/p99 | max Δ latest / 7d |
|---|---|---|---|---|---|
| 🔴 | bs=10 sw=10 sl=64 | 403 | 0.246 | 24,589/34,888/34,888 us | 🔴 +9.6% / ⚪ within ±5% |
| ⚪ | bs=100 sw=10 sl=64 | 828 | 0.506 | 121,351/138,921/138,921 us | ⚪ within ±5% / 🔴 +8.1% |
| ⚪ | bs=1000 sw=10 sl=64 | 926 | 0.565 | 1,074,723/1,121,818/1,121,818 us | ⚪ within ±5% / 🔴 -11.1% |
Baseline details
Latest main b8859b4 from same runner
| config | metric | PR | latest main | 7d avg | Δ latest | Δ 7d |
|---|---|---|---|---|---|---|
| bs=10 sw=10 sl=64 | throughput | 403 tuples/sec | 429 tuples/sec | 410.82 tuples/sec | -6.1% | -1.9% |
| bs=10 sw=10 sl=64 | MB/s | 0.246 MB/s | 0.262 MB/s | 0.251 MB/s | -6.1% | -1.9% |
| bs=10 sw=10 sl=64 | p50 | 24,589 us | 23,165 us | 23,785 us | +6.1% | +3.4% |
| bs=10 sw=10 sl=64 | p95 | 34,888 us | 31,840 us | 34,980 us | +9.6% | -0.3% |
| bs=10 sw=10 sl=64 | p99 | 34,888 us | 31,840 us | 34,980 us | +9.6% | -0.3% |
| bs=100 sw=10 sl=64 | throughput | 828 tuples/sec | 820 tuples/sec | 891.94 tuples/sec | +1.0% | -7.2% |
| bs=100 sw=10 sl=64 | MB/s | 0.506 MB/s | 0.501 MB/s | 0.544 MB/s | +1.0% | -7.1% |
| bs=100 sw=10 sl=64 | p50 | 121,351 us | 119,738 us | 112,277 us | +1.3% | +8.1% |
| bs=100 sw=10 sl=64 | p95 | 138,921 us | 143,895 us | 139,802 us | -3.5% | -0.6% |
| bs=100 sw=10 sl=64 | p99 | 138,921 us | 143,895 us | 139,802 us | -3.5% | -0.6% |
| bs=1000 sw=10 sl=64 | throughput | 926 tuples/sec | 923 tuples/sec | 1,041 tuples/sec | +0.3% | -11.0% |
| bs=1000 sw=10 sl=64 | MB/s | 0.565 MB/s | 0.563 MB/s | 0.635 MB/s | +0.4% | -11.1% |
| bs=1000 sw=10 sl=64 | p50 | 1,074,723 us | 1,081,143 us | 972,714 us | -0.6% | +10.5% |
| bs=1000 sw=10 sl=64 | p95 | 1,121,818 us | 1,140,179 us | 1,023,057 us | -1.6% | +9.7% |
| bs=1000 sw=10 sl=64 | p99 | 1,121,818 us | 1,140,179 us | 1,023,057 us | -1.6% | +9.7% |
Raw CSV
config_idx,batch_size,schema_width,string_len,num_batches,total_ms,total_tuples,total_bytes,tuples_per_sec,mb_per_sec,lat_p50_us,lat_p95_us,lat_p99_us
0,10,10,64,20,496.08,200,128000,403,0.246,24589.00,34888.46,34888.46
1,100,10,64,20,2414.64,2000,1280000,828,0.506,121351.03,138921.47,138921.47
2,1000,10,64,20,21606.84,20000,12800000,926,0.565,1074723.03,1121817.80
Add a PhysicalOp builder test alongside the existing withParallelizable case, exercising the previously-uncovered `this.copy(...)` line that Codecov flagged on apache#5707 (patch coverage 85.71%, 1 missing line). Asserts the default `false`, the flipped value, and immutability of the original instance.
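The three assertions named in that commit message can be sketched as follows. This is only an illustration: `PhysicalOpSketch` is a hypothetical, stripped-down stand-in for `PhysicalOp`, its `id` field is invented for the example, and the `Boolean` parameter on the builder is an assumption (the PR text names the builder but not its signature).

```scala
// Hypothetical stand-in for PhysicalOp; the real class has many more fields.
final case class PhysicalOpSketch(
    id: String, // illustrative field, not taken from the PR
    reusesOutputStorageOnReExecution: Boolean = false
) {
  // Assumed signature: returns a modified copy and leaves `this` untouched.
  def withReusesOutputStorageOnReExecution(reuse: Boolean): PhysicalOpSketch =
    this.copy(reusesOutputStorageOnReExecution = reuse)
}

object PhysicalOpSketchCheck {
  def main(args: Array[String]): Unit = {
    val original = PhysicalOpSketch("loop-end")
    val flipped = original.withReusesOutputStorageOnReExecution(true)

    assert(!original.reusesOutputStorageOnReExecution) // default is false
    assert(flipped.reusesOutputStorageOnReExecution) // builder flips the flag
    assert(!original.reusesOutputStorageOnReExecution) // original is unchanged
    println("builder checks passed")
  }
}
```

Because the builder goes through `copy`, immutability of the original instance follows from the case class itself rather than from any extra bookkeeping.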
What changes were proposed in this PR?
Adds a mechanism for an operator to reuse its output storage when its region re-executes, instead of having the documents recreated each time:
- `PhysicalOp` gains a `reusesOutputStorageOnReExecution: Boolean = false` field and a `withReusesOutputStorageOnReExecution` builder.
- `RegionExecutionCoordinator` gains a pure `provisionOutputDocument(uri, reuseExistingStorage, documentExists, createDocument)` decision function, used per output port to decide create-vs-reuse based on the owning operator's flag.
- `RegionOutputProvisioningSpec` unit-tests the decision function (the reuse×exists matrix plus the "no-reuse never probes existence" short-circuit).

Flag values:
- `false` (every operator today): the output documents are recreated each time the region executes.
- `true` (set by Loop End in the loop PR): the existing output documents are reused when the region re-executes.

Named for the behavior the scheduler checks, not the operator that sets it, so any future operator needing output accumulated across region re-executions can opt in. Dormant and behavior-preserving: no operator sets the flag in this PR.
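A minimal sketch of what such a pure decision function might look like is given below. Only the function name and its four parameter names come from the description above; the parameter types, the `Outcome` return type, the in-memory store, and the choice to create the document when reuse is requested but nothing exists yet are all assumptions made for illustration, not the code in `RegionExecutionCoordinator`.

```scala
import java.net.URI
import scala.collection.mutable

object ProvisioningSketch {
  // Hypothetical outcome type; the PR does not state what the function returns.
  sealed trait Outcome
  case object Created extends Outcome
  case object Reused extends Outcome

  def provisionOutputDocument(
      uri: URI,
      reuseExistingStorage: Boolean,
      documentExists: URI => Boolean, // assumed type
      createDocument: URI => Unit // assumed type
  ): Outcome =
    // `&&` short-circuits: without reuse, documentExists is never called.
    if (reuseExistingStorage && documentExists(uri)) Reused
    else {
      createDocument(uri)
      Created
    }
}

object ProvisioningSketchDemo {
  import ProvisioningSketch._

  def main(args: Array[String]): Unit = {
    // In-memory stand-ins for the storage backend (hypothetical).
    val store = mutable.Set.empty[URI]
    var probes = 0
    val exists: URI => Boolean = u => { probes += 1; store.contains(u) }
    val create: URI => Unit = u => { store += u; () }
    val uri = URI.create("mem:///region-1/op-a/port-0/result")

    // First execution without reuse: created, existence never probed.
    assert(provisionOutputDocument(uri, false, exists, create) == Created)
    assert(probes == 0)
    // Re-execution with reuse: the existing document is kept.
    assert(provisionOutputDocument(uri, true, exists, create) == Reused)
    assert(probes == 1)
    println("provisioning checks passed")
  }
}
```

Passing `documentExists` and `createDocument` as functions is what lets a test exercise the decision without a real storage backend, which matches the stated goal of unit-testing it in isolation.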
Any related issues, documentation, discussions?
Resolves #5709 (sub-issue of #4442 "Introduce for loop"). Split out of #5700 to keep that PR reviewable, per @Xiao-zhen-Liu's review.
How was this PR tested?
`sbt "WorkflowExecutionService/testOnly *RegionOutputProvisioningSpec"`: 5 passing; `WorkflowExecutionService/Test/compile` clean; scalafmt clean on the changed projects (`WorkflowCore`, `WorkflowExecutionService`).
Was this PR authored or co-authored using generative AI tooling?
Co-authored with Claude Opus 4.8 in compliance with ASF.