feat(scheduling): reuse output storage across region re-executions#5707

Open
aglinxinyuan wants to merge 4 commits into
apache:mainfrom
aglinxinyuan:loop-reuse-output-storage

@aglinxinyuan aglinxinyuan commented Jun 14, 2026

What changes were proposed in this PR?

Adds a mechanism for an operator to reuse its output storage when its region re-executes, instead of having the documents recreated each time:

  • PhysicalOp gains a reusesOutputStorageOnReExecution: Boolean = false field + a withReusesOutputStorageOnReExecution builder.
  • RegionExecutionCoordinator gains a pure provisionOutputDocument(uri, reuseExistingStorage, documentExists, createDocument) decision function, used per output port to decide create-vs-reuse based on the owning operator's flag.
  • New RegionOutputProvisioningSpec unit-tests the decision function (the reuse×exists matrix plus the "no-reuse never probes existence" short-circuit).
| operator flag | region re-run behavior |
| --- | --- |
| false (every operator today) | recreate output/state documents (unchanged) |
| true (set by Loop End in the loop PR) | reuse existing documents |
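
The create-vs-reuse decision described above can be sketched roughly as follows. This is a minimal illustration, not the code from the PR: the description only names the four parameters, so the parameter types, the `Provisioned` result type, and the enclosing `ProvisioningSketch` object are all assumptions made for the example.

```scala
import java.net.URI

object ProvisioningSketch {
  // Hypothetical result type; the PR does not say what the real function returns.
  sealed trait Provisioned
  case object Created extends Provisioned
  case object Reused extends Provisioned

  // Parameter names follow the PR description; their types are assumed here.
  def provisionOutputDocument(
      uri: URI,
      reuseExistingStorage: Boolean,
      documentExists: URI => Boolean,
      createDocument: URI => Unit
  ): Provisioned =
    // `&&` short-circuits, so documentExists is never called when reuse is off.
    if (reuseExistingStorage && documentExists(uri)) Reused
    else {
      // Either reuse is off, or there is nothing to reuse yet: (re)create the document.
      createDocument(uri)
      Created
    }
}
```

Passing the existence probe and the creation step as functions is what would let a spec exercise the reuse×exists matrix with plain counters instead of a storage backend.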

The flag is named for the behavior the scheduler checks, not for the operator that sets it, so any future operator that needs output accumulated across region re-executions can opt in. The path is dormant and behavior-preserving: no operator sets the flag in this PR.

Any related issues, documentation, discussions?

Resolves #5709 (sub-issue of #4442 "Introduce for loop"). Split out of #5700 to keep that PR reviewable, per @Xiao-zhen-Liu's review.

How was this PR tested?

sbt "WorkflowExecutionService/testOnly *RegionOutputProvisioningSpec" — 5 passing; WorkflowExecutionService/Test/compile clean; scalafmt clean on the changed projects (WorkflowCore, WorkflowExecutionService).

Was this PR authored or co-authored using generative AI tooling?

Co-authored with Claude Opus 4.8 in compliance with ASF.

Add a `reusesOutputStorageOnReExecution` flag to `PhysicalOp` (default false)
plus a `withReusesOutputStorageOnReExecution` builder. When set,
`RegionExecutionCoordinator` reuses an operator's existing iceberg output and
state documents on a region re-run instead of recreating them, via a new pure
`provisionOutputDocument` decision function unit-tested by
`RegionOutputProvisioningSpec`.

The flag is named for the behavior the scheduler checks, not the operator that
sets it, so any future operator needing output accumulated across region
re-executions can opt in. No operator sets it yet, so the path is dormant and
behavior-preserving.

Split out of the loop-operators PR (apache#5700) to keep that PR
reviewable; the loop feature will set the flag on Loop End.
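
For illustration, the flag and its builder might look like the sketch below. `PhysicalOpSketch` is a hypothetical stand-in with only three fields; the real `PhysicalOp` has many more, and whether the real builder takes a Boolean argument is an assumption here.

```scala
// Hypothetical stand-in for PhysicalOp, reduced to the fields needed for the example.
final case class PhysicalOpSketch(
    id: String,
    parallelizable: Boolean = true,
    // Defaults to false, so existing operators keep recreating their documents.
    reusesOutputStorageOnReExecution: Boolean = false
) {
  def withParallelizable(parallelizable: Boolean): PhysicalOpSketch =
    this.copy(parallelizable = parallelizable)

  // Returns a modified copy; the original instance is left untouched.
  def withReusesOutputStorageOnReExecution(reuse: Boolean): PhysicalOpSketch =
    this.copy(reusesOutputStorageOnReExecution = reuse)
}
```

Under these assumptions, `PhysicalOpSketch("loop-end").withReusesOutputStorageOnReExecution(true)` yields an operator that opts in, while every operator built without the call stays on the default.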

codecov-commenter commented Jun 14, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 52.54%. Comparing base (b8859b4) to head (def4ce8).

Additional details and impacted files
@@             Coverage Diff              @@
##               main    #5707      +/-   ##
============================================
- Coverage     52.86%   52.54%   -0.32%     
+ Complexity     2621     2616       -5     
============================================
  Files          1090     1087       -3     
  Lines         42184    41867     -317     
  Branches       4530     4481      -49     
============================================
- Hits          22301    22001     -300     
- Misses        18576    18604      +28     
+ Partials       1307     1262      -45     
| Flag | Coverage Δ | *Carryforward flag |
| --- | --- | --- |
| access-control-service | 70.91% <ø> (ø) | |
| agent-service | 34.36% <ø> (ø) | Carriedforward from 397d275 |
| amber | 52.61% <100.00%> (-0.36%) ⬇️ | |
| computing-unit-managing-service | 1.65% <ø> (ø) | |
| config-service | 56.71% <ø> (ø) | |
| file-service | 57.06% <ø> (ø) | |
| frontend | 47.36% <ø> (-0.51%) ⬇️ | Carriedforward from 397d275 |
| pyamber | 90.70% <ø> (-0.04%) ⬇️ | Carriedforward from 397d275 |
| python | 90.73% <ø> (ø) | Carriedforward from 397d275 |
| workflow-compiling-service | 58.69% <ø> (ø) | |

*This pull request uses carry forward flags.

Copilot AI left a comment

Pull request overview

This PR introduces an opt-in mechanism for operators to preserve their output/state storage across region re-executions (e.g., loop iterations), by adding a flag on PhysicalOp and updating the region scheduler’s output-document provisioning logic to conditionally reuse existing documents. It also adds focused unit tests for the new create-vs-reuse decision function.

Changes:

  • Add reusesOutputStorageOnReExecution: Boolean and a withReusesOutputStorageOnReExecution builder to PhysicalOp.
  • Extract output document provisioning decision into RegionExecutionCoordinator.provisionOutputDocument and use it when provisioning per-output-port result/state documents.
  • Add RegionOutputProvisioningSpec unit tests covering the reuse×exists matrix and the non-reuse short-circuit.
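
To see how the per-port decision plays out across two executions of the same region, here is a small in-memory simulation. Everything in it is hypothetical (`InMemoryStore`, `OutputPort`, `runRegion`, string URIs); it only mirrors the behavior described in this PR and does not touch the actual scheduler or an Iceberg backend.

```scala
import scala.collection.mutable

object RegionRerunSketch {
  // In-memory stand-in for document storage: URI string -> rows written so far.
  final class InMemoryStore {
    private val docs = mutable.Map.empty[String, mutable.Buffer[String]]
    def exists(uri: String): Boolean = docs.contains(uri)
    // Creating a document replaces any previous document at the same URI.
    def create(uri: String): Unit = docs.update(uri, mutable.Buffer.empty[String])
    def append(uri: String, row: String): Unit = { docs(uri).append(row); () }
    def rows(uri: String): List[String] = docs(uri).toList
  }

  // One output port plus the reuse flag of the operator that owns it.
  final case class OutputPort(uri: String, ownerReusesStorage: Boolean)

  // Create unless the owning operator opted in and a document already exists.
  def provision(store: InMemoryStore, port: OutputPort): Unit =
    if (!(port.ownerReusesStorage && store.exists(port.uri))) store.create(port.uri)

  // One region execution: provision every port, then write one row to each.
  def runRegion(store: InMemoryStore, ports: Seq[OutputPort], iteration: Int): Unit =
    ports.foreach { port =>
      provision(store, port)
      store.append(port.uri, s"row-from-run-$iteration")
    }
}
```

Running the region twice with one opted-in port and one default port leaves the opted-in port holding rows from both runs, while the default port holds only the rows from the second run.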

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated no comments.

| File | Description |
| --- | --- |
| common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/PhysicalOp.scala | Adds a new operator-level flag + builder to signal output-storage reuse across region re-executions. |
| amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala | Adds a pure provisioning decision function and uses it to create-or-reuse result/state documents per output port based on the owning operator’s flag. |
| amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionOutputProvisioningSpec.scala | Introduces unit tests validating the provisioning decision logic without needing an Iceberg backend. |

github-actions Bot commented Jun 14, 2026

⚠️ Benchmark changes need a look

🟢 0 better · 🔴 5 worse · ⚪ 10 noise (<±5%) · 0 without baseline

Compared against main b8859b4 benchmarked on this same runner, so the delta is largely free of cross-runner hardware noise. The "7d avg" column still reflects the gh-pages dashboard. Treat <±5% as noise unless repeated.
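
The percentages in this report are consistent with a plain relative change against the baseline. The helper below is a sketch under that assumption; `BenchmarkDeltaSketch` and its function names are made up for the example and are not part of the benchmark tooling.

```scala
object BenchmarkDeltaSketch {
  // Relative change of the PR value against a baseline value, in percent.
  def percentDelta(pr: Double, baseline: Double): Double =
    (pr - baseline) / baseline * 100.0

  // The report treats changes smaller than ±5% as noise.
  def isNoise(deltaPercent: Double, thresholdPercent: Double = 5.0): Boolean =
    math.abs(deltaPercent) < thresholdPercent
}
```

For the bs=10 configuration, throughput of 403 against 429 tuples/sec works out to about -6.1%, and p95 latency of 34,888 against 31,840 us to about +9.6%, matching the baseline rows reported for this run.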

| config | throughput | MB/s | latency (p50/p95/p99) | max Δ latest / 7d |
| --- | --- | --- | --- | --- |
| 🔴 bs=10 sw=10 sl=64 | 403 | 0.246 | 24,589/34,888/34,888 us | 🔴 +9.6% / ⚪ within ±5% |
| bs=100 sw=10 sl=64 | 828 | 0.506 | 121,351/138,921/138,921 us | ⚪ within ±5% / 🔴 +8.1% |
| bs=1000 sw=10 sl=64 | 926 | 0.565 | 1,074,723/1,121,818/1,121,818 us | ⚪ within ±5% / 🔴 -11.1% |

Baseline details

Latest main b8859b4 from same runner

| config | metric | PR | latest main | 7d avg | Δ latest | Δ 7d |
| --- | --- | --- | --- | --- | --- | --- |
| bs=10 sw=10 sl=64 | throughput | 403 tuples/sec | 429 tuples/sec | 410.82 tuples/sec | -6.1% | -1.9% |
| bs=10 sw=10 sl=64 | MB/s | 0.246 MB/s | 0.262 MB/s | 0.251 MB/s | -6.1% | -1.9% |
| bs=10 sw=10 sl=64 | p50 | 24,589 us | 23,165 us | 23,785 us | +6.1% | +3.4% |
| bs=10 sw=10 sl=64 | p95 | 34,888 us | 31,840 us | 34,980 us | +9.6% | -0.3% |
| bs=10 sw=10 sl=64 | p99 | 34,888 us | 31,840 us | 34,980 us | +9.6% | -0.3% |
| bs=100 sw=10 sl=64 | throughput | 828 tuples/sec | 820 tuples/sec | 891.94 tuples/sec | +1.0% | -7.2% |
| bs=100 sw=10 sl=64 | MB/s | 0.506 MB/s | 0.501 MB/s | 0.544 MB/s | +1.0% | -7.1% |
| bs=100 sw=10 sl=64 | p50 | 121,351 us | 119,738 us | 112,277 us | +1.3% | +8.1% |
| bs=100 sw=10 sl=64 | p95 | 138,921 us | 143,895 us | 139,802 us | -3.5% | -0.6% |
| bs=100 sw=10 sl=64 | p99 | 138,921 us | 143,895 us | 139,802 us | -3.5% | -0.6% |
| bs=1000 sw=10 sl=64 | throughput | 926 tuples/sec | 923 tuples/sec | 1,041 tuples/sec | +0.3% | -11.0% |
| bs=1000 sw=10 sl=64 | MB/s | 0.565 MB/s | 0.563 MB/s | 0.635 MB/s | +0.4% | -11.1% |
| bs=1000 sw=10 sl=64 | p50 | 1,074,723 us | 1,081,143 us | 972,714 us | -0.6% | +10.5% |
| bs=1000 sw=10 sl=64 | p95 | 1,121,818 us | 1,140,179 us | 1,023,057 us | -1.6% | +9.7% |
| bs=1000 sw=10 sl=64 | p99 | 1,121,818 us | 1,140,179 us | 1,023,057 us | -1.6% | +9.7% |

Raw CSV
config_idx,batch_size,schema_width,string_len,num_batches,total_ms,total_tuples,total_bytes,tuples_per_sec,mb_per_sec,lat_p50_us,lat_p95_us,lat_p99_us
0,10,10,64,20,496.08,200,128000,403,0.246,24589.00,34888.46,34888.46
1,100,10,64,20,2414.64,2000,1280000,828,0.506,121351.03,138921.47,138921.47
2,1000,10,64,20,21606.84,20000,12800000,926,0.565,1074723.03,1121817.80,1121817.80

Add a PhysicalOp builder test alongside the existing withParallelizable case,
exercising the previously-uncovered `this.copy(...)` line that Codecov flagged
on apache#5707 (patch coverage 85.71%, 1 missing line). Asserts the
default false, the flipped value, and immutability of the original instance.