Skip to content

Commit 0c16b7e

Browse files
committed
Merge branch 'dev-1.21.0' into dev-1.21.0-hadoop3
2 parents b269d69 + b244e73 commit 0c16b7e

File tree

1 file changed

+24
-15
lines changed
  • dss-orchestrator/orchestrators/dss-workflow/dss-flow-execution-server/src/main/scala/com/webank/wedatasphere/dss/flow/execution/entrance/resolver

1 file changed

+24
-15
lines changed

dss-orchestrator/orchestrators/dss-workflow/dss-flow-execution-server/src/main/scala/com/webank/wedatasphere/dss/flow/execution/entrance/resolver/FlowDependencyResolverImpl.scala

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ class FlowDependencyResolverImpl extends FlowDependencyResolver with Logging {
4444
.map(_.toString)
4545
.orNull
4646

47+
val isSelectedExecute = ExecuteStrategyEnum.IS_SELECTED_EXECUTE.getValue.equalsIgnoreCase(executeStrategy)
4748
def incomingEdges(node: WorkflowNode) = workflowEdges.filter(_.getTarget == node.getId)
4849

4950
def isAllParentDependencyCompleted(parents: util.List[String]): Boolean = {
@@ -60,30 +61,38 @@ class FlowDependencyResolverImpl extends FlowDependencyResolver with Logging {
6061
}
6162

6263
def shouldSkipByBranch(node: WorkflowNode): Boolean = {
63-
incomingEdges(node).exists { edge =>
64-
workflowNodesById.get(edge.getSource).exists { sourceNode =>
65-
BranchExpressionUtils.isBranchNode(sourceNode) &&
66-
flowContext.isNodeCompleted(sourceNode.getName) &&
67-
(flowContext.isNodeSkipped(sourceNode.getName) ||
68-
!flowJob.hasBranchSelection(sourceNode.getId) ||
69-
!flowJob.isBranchTargetSelected(sourceNode.getId, node.getId))
64+
if (isSelectedExecute) {
65+
false
66+
} else {
67+
incomingEdges(node).exists { edge =>
68+
workflowNodesById.get(edge.getSource).exists { sourceNode =>
69+
BranchExpressionUtils.isBranchNode(sourceNode) &&
70+
flowContext.isNodeCompleted(sourceNode.getName) &&
71+
(flowContext.isNodeSkipped(sourceNode.getName) ||
72+
!flowJob.hasBranchSelection(sourceNode.getId) ||
73+
!flowJob.isBranchTargetSelected(sourceNode.getId, node.getId))
74+
}
7075
}
7176
}
7277
}
7378

7479
def shouldSkip(node: WorkflowNode): Boolean = {
7580
shouldSkipByBranch(node) ||
76-
(!ExecuteStrategyEnum.IS_SELECTED_EXECUTE.getValue.equalsIgnoreCase(executeStrategy) && areAllParentsSkipped(node))
81+
(!isSelectedExecute && areAllParentsSkipped(node))
7782
}
7883

7984
def isBranchRouteMatched(node: WorkflowNode): Boolean = {
80-
incomingEdges(node).forall { edge =>
81-
workflowNodesById.get(edge.getSource) match {
82-
case Some(sourceNode) if BranchExpressionUtils.isBranchNode(sourceNode) =>
83-
flowContext.isNodeSucceed(sourceNode.getName) &&
84-
flowJob.hasBranchSelection(sourceNode.getId) &&
85-
flowJob.isBranchTargetSelected(sourceNode.getId, node.getId)
86-
case _ => true
85+
if (isSelectedExecute) {
86+
true
87+
} else {
88+
incomingEdges(node).forall { edge =>
89+
workflowNodesById.get(edge.getSource) match {
90+
case Some(sourceNode) if BranchExpressionUtils.isBranchNode(sourceNode) =>
91+
flowContext.isNodeSucceed(sourceNode.getName) &&
92+
flowJob.hasBranchSelection(sourceNode.getId) &&
93+
flowJob.isBranchTargetSelected(sourceNode.getId, node.getId)
94+
case _ => true
95+
}
8796
}
8897
}
8998
}

0 commit comments

Comments
 (0)