Skip to content

Fix: Correctly set 'child data dir/output dir' in branch handler's Abort and Finalize#7117

Open
fg91 wants to merge 1 commit intomasterfrom
fg91/fix/branch-node-setup-branch-taken-paths
Open

Fix: Correctly set 'child data dir/output dir' in branch handler's Abort and Finalize#7117
fg91 wants to merge 1 commit intomasterfrom
fg91/fix/branch-node-setup-branch-taken-paths

Conversation

@fg91
Copy link
Copy Markdown
Member

@fg91 fg91 commented Mar 30, 2026

Why are the changes needed?

from flytekit import conditional, dynamic, task, workflow
from flytekit.core.task import Echo

echo = Echo(name="echo_str", inputs={"a": str})

@task
def test_task() -> str:
    import time
    time.sleep(600)  # Give time for aborting the execution while this task is running
    return "foo"


@dynamic
def dyn() -> str:
    return test_task()


@workflow
def wf(a: str = ""):
    bar = (
        conditional("foo-conditional")
        .if_(a == "")
        .then(dyn())
        .else_()
        .then(echo(a=a))
    )

When aborting the execution while the task called test_task within the @dynamic within the conditional is running, flytepropeller cannot find the futures.pb containing the dynamic workflow spec:

Failed to propagate Abort for workflow. Error: 0: 0: 0: [system] unable to read futures file, maybe corrupted, caused by: [system] Failed to read futures protobuf file., caused by: path: .../futures.pb: not found

Flytepropeller tries to abort the workflows until the system retry limit is exhausted and then stops trying. The resources are not cleaned up.

What changes were proposed in this pull request?

The reason this is happening is that the recurseDownstream method of branchHandler modifies the so-called "child data dir" and "child output dir" depending which of the branch of the conditional is taken. This influences where the futures.pb file is written. The Abort and Finalize methods don't modify these dirs, causing the dynamic handler to look for the futures.pb under the wrong path when aborting the workflow.

To avoid this, we need to modify the "child data dir" and "child output dir" in the Abort and Finalize methods as well.

How was this patch tested?

  • Extended/added unit tests
  • Tested that the minimal example workflow above passes with the fixed logic

Check all the applicable boxes

  • I updated the documentation accordingly.
  • All new and existing tests passed.
  • All commits are signed-off.

@codecov
Copy link
Copy Markdown

codecov bot commented Mar 30, 2026

Codecov Report

❌ Patch coverage is 50.00000% with 8 lines in your changes missing coverage. Please review.
✅ Project coverage is 56.96%. Comparing base (74618dd) to head (f6ebe2c).

Files with missing lines Patch % Lines
...tepropeller/pkg/controller/nodes/branch/handler.go 50.00% 4 Missing and 4 partials ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #7117      +/-   ##
==========================================
+ Coverage   56.94%   56.96%   +0.02%     
==========================================
  Files         931      931              
  Lines       58188    58197       +9     
==========================================
+ Hits        33135    33152      +17     
+ Misses      22011    21992      -19     
- Partials     3042     3053      +11     
Flag Coverage Δ
unittests-datacatalog 53.51% <ø> (ø)
unittests-flyteadmin 53.14% <ø> (+0.03%) ⬆️
unittests-flytecopilot 43.06% <ø> (ø)
unittests-flytectl 64.02% <ø> (ø)
unittests-flyteidl 75.71% <ø> (ø)
unittests-flyteplugins 60.15% <ø> (ø)
unittests-flytepropeller 53.68% <50.00%> (+0.03%) ⬆️
unittests-flytestdlib 63.02% <ø> (ø)

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@fg91 fg91 added the fixed For any bug fixes label Mar 30, 2026
// status using the branch node's OutputDir as the base, and returns the computed OutputDir.
// This must be called before recursing into the branch-taken node so that any nested handler
// (e.g. a dynamic node) finds the correct paths.
func (b *branchHandler) setupBranchTakenNodePaths(ctx context.Context, nCtx interfaces.NodeExecutionContext, branchTakenNode v1alpha1.ExecutableNode) (storage.DataReference, error) {
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved here from recurseDownstream.

@fg91 fg91 requested review from Sovietaced and pingsutw March 30, 2026 16:54
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

fixed For any bug fixes

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant