Skip to content

Commit 9d13d4b

Browse files
javier-aliagasalaboy
authored andcommitted
feat: Add workflow versioning (dapr#1624)
Signed-off-by: Javier Aliaga <[email protected]> Signed-off-by: salaboy <[email protected]>
1 parent da76b19 commit 9d13d4b

37 files changed

+1355
-59
lines changed

durabletask-client/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,10 @@
5959
<groupId>com.fasterxml.jackson.datatype</groupId>
6060
<artifactId>jackson-datatype-jsr310</artifactId>
6161
</dependency>
62+
<dependency>
63+
<groupId>org.apache.commons</groupId>
64+
<artifactId>commons-lang3</artifactId>
65+
</dependency>
6266
<dependency>
6367
<groupId>io.grpc</groupId>
6468
<artifactId>grpc-testing</artifactId>

durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ public void raiseEvent(String instanceId, String eventName) {
145145
* Waits for an orchestration to start running and returns an {@link OrchestrationMetadata} object that contains
146146
* metadata about the started instance.
147147
*
148-
* <p> A "started" orchestration instance is any instance not in the <code>Pending</code> state. </p>
148+
* <p>A "started" orchestration instance is any instance not in the <code>Pending</code> state. </p>
149149
*
150150
* <p>If an orchestration instance is already running when this method is called, the method will return immediately.
151151
*</p>

durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,13 @@
1717
import io.dapr.durabletask.implementation.protobuf.OrchestratorService;
1818
import io.dapr.durabletask.implementation.protobuf.OrchestratorService.TaskFailureDetails;
1919
import io.dapr.durabletask.implementation.protobuf.TaskHubSidecarServiceGrpc;
20+
import io.dapr.durabletask.orchestration.TaskOrchestrationFactories;
2021
import io.grpc.Channel;
2122
import io.grpc.ManagedChannel;
2223
import io.grpc.ManagedChannelBuilder;
2324
import io.grpc.Status;
2425
import io.grpc.StatusRuntimeException;
26+
import org.apache.commons.lang3.StringUtils;
2527

2628
import java.time.Duration;
2729
import java.util.HashMap;
@@ -42,7 +44,8 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
4244
private static final Logger logger = Logger.getLogger(DurableTaskGrpcWorker.class.getPackage().getName());
4345
private static final Duration DEFAULT_MAXIMUM_TIMER_INTERVAL = Duration.ofDays(3);
4446

45-
private final HashMap<String, TaskOrchestrationFactory> orchestrationFactories = new HashMap<>();
47+
private final TaskOrchestrationFactories orchestrationFactories;
48+
4649
private final HashMap<String, TaskActivityFactory> activityFactories = new HashMap<>();
4750

4851
private final ManagedChannel managedSidecarChannel;
@@ -57,7 +60,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
5760
private Thread workerThread;
5861

5962
DurableTaskGrpcWorker(DurableTaskGrpcWorkerBuilder builder) {
60-
this.orchestrationFactories.putAll(builder.orchestrationFactories);
63+
this.orchestrationFactories = builder.orchestrationFactories;
6164
this.activityFactories.putAll(builder.activityFactories);
6265
this.appId = builder.appId;
6366

@@ -115,7 +118,9 @@ public void start() {
115118
*
116119
*/
117120
public void close() {
118-
this.workerThread.interrupt();
121+
if (this.workerThread != null) {
122+
this.workerThread.interrupt();
123+
}
119124
this.isNormalShutdown = true;
120125
this.shutDownWorkerPool();
121126
this.closeSideCarChannel();
@@ -161,6 +166,7 @@ public void startAndBlock() {
161166
OrchestratorService.WorkItem.RequestCase requestType = workItem.getRequestCase();
162167
if (requestType == OrchestratorService.WorkItem.RequestCase.ORCHESTRATORREQUEST) {
163168
OrchestratorService.OrchestratorRequest orchestratorRequest = workItem.getOrchestratorRequest();
169+
164170
logger.log(Level.FINEST,
165171
String.format("Processing orchestrator request for instance: {0}",
166172
orchestratorRequest.getInstanceId()));
@@ -171,11 +177,22 @@ public void startAndBlock() {
171177
orchestratorRequest.getPastEventsList(),
172178
orchestratorRequest.getNewEventsList());
173179

180+
var versionBuilder = OrchestratorService.OrchestrationVersion.newBuilder();
181+
182+
if (StringUtils.isNotEmpty(taskOrchestratorResult.getVersion())) {
183+
versionBuilder.setName(taskOrchestratorResult.getVersion());
184+
}
185+
186+
if (taskOrchestratorResult.getPatches() != null) {
187+
versionBuilder.addAllPatches(taskOrchestratorResult.getPatches());
188+
}
189+
174190
OrchestratorService.OrchestratorResponse response = OrchestratorService.OrchestratorResponse.newBuilder()
175191
.setInstanceId(orchestratorRequest.getInstanceId())
176192
.addAllActions(taskOrchestratorResult.getActions())
177193
.setCustomStatus(StringValue.of(taskOrchestratorResult.getCustomStatus()))
178194
.setCompletionToken(workItem.getCompletionToken())
195+
.setVersion(versionBuilder)
179196
.build();
180197

181198
try {

durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorkerBuilder.java

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313

1414
package io.dapr.durabletask;
1515

16+
import io.dapr.durabletask.orchestration.TaskOrchestrationFactories;
17+
import io.dapr.durabletask.orchestration.TaskOrchestrationFactory;
1618
import io.grpc.Channel;
1719

1820
import java.time.Duration;
@@ -24,7 +26,7 @@
2426
*
2527
*/
2628
public final class DurableTaskGrpcWorkerBuilder {
27-
final HashMap<String, TaskOrchestrationFactory> orchestrationFactories = new HashMap<>();
29+
TaskOrchestrationFactories orchestrationFactories = new TaskOrchestrationFactories();
2830
final HashMap<String, TaskActivityFactory> activityFactories = new HashMap<>();
2931
int port;
3032
Channel channel;
@@ -40,17 +42,7 @@ public final class DurableTaskGrpcWorkerBuilder {
4042
* @return this builder object
4143
*/
4244
public DurableTaskGrpcWorkerBuilder addOrchestration(TaskOrchestrationFactory factory) {
43-
String key = factory.getName();
44-
if (key == null || key.length() == 0) {
45-
throw new IllegalArgumentException("A non-empty task orchestration name is required.");
46-
}
47-
48-
if (this.orchestrationFactories.containsKey(key)) {
49-
throw new IllegalArgumentException(
50-
String.format("A task orchestration factory named %s is already registered.", key));
51-
}
52-
53-
this.orchestrationFactories.put(key, factory);
45+
this.orchestrationFactories.addOrchestration(factory);
5446
return this;
5547
}
5648

durabletask-client/src/main/java/io/dapr/durabletask/OrchestrationMetadata.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ public boolean isCustomStatusFetched() {
226226
private <T> T readPayloadAs(Class<T> type, String payload) {
227227
if (!this.requestedInputsAndOutputs) {
228228
throw new IllegalStateException("This method can only be used when instance metadata is fetched with the option "
229-
+ "to include input and output data.");
229+
+ "to include input and output data.");
230230
}
231231

232232
// Note that the Java gRPC implementation converts null protobuf strings into empty Java strings

durabletask-client/src/main/java/io/dapr/durabletask/OrchestrationRunner.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,11 @@
1616
import com.google.protobuf.InvalidProtocolBufferException;
1717
import com.google.protobuf.StringValue;
1818
import io.dapr.durabletask.implementation.protobuf.OrchestratorService;
19+
import io.dapr.durabletask.orchestration.TaskOrchestrationFactories;
20+
import io.dapr.durabletask.orchestration.TaskOrchestrationFactory;
1921

2022
import java.time.Duration;
2123
import java.util.Base64;
22-
import java.util.HashMap;
2324
import java.util.logging.Logger;
2425

2526
/**
@@ -134,8 +135,8 @@ public static byte[] loadAndRun(byte[] orchestratorRequestBytes, TaskOrchestrati
134135
}
135136

136137
// Register the passed orchestration as the default ("*") orchestration
137-
HashMap<String, TaskOrchestrationFactory> orchestrationFactories = new HashMap<>();
138-
orchestrationFactories.put("*", new TaskOrchestrationFactory() {
138+
TaskOrchestrationFactories orchestrationFactories = new TaskOrchestrationFactories();
139+
orchestrationFactories.addOrchestration(new TaskOrchestrationFactory() {
139140
@Override
140141
public String getName() {
141142
return "*";
@@ -145,6 +146,16 @@ public String getName() {
145146
public TaskOrchestration create() {
146147
return orchestration;
147148
}
149+
150+
@Override
151+
public String getVersionName() {
152+
return "";
153+
}
154+
155+
@Override
156+
public Boolean isLatestVersion() {
157+
return false;
158+
}
148159
});
149160

150161
TaskOrchestrationExecutor taskOrchestrationExecutor = new TaskOrchestrationExecutor(

durabletask-client/src/main/java/io/dapr/durabletask/OrchestrationRuntimeStatus.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,12 @@ public enum OrchestrationRuntimeStatus {
6868
/**
6969
* The orchestration is in a suspended state.
7070
*/
71-
SUSPENDED;
71+
SUSPENDED,
72+
73+
/**
74+
* The orchestration is in a stalled state.
75+
*/
76+
STALLED;
7277

7378
static OrchestrationRuntimeStatus fromProtobuf(OrchestratorService.OrchestrationStatus status) {
7479
switch (status) {
@@ -88,6 +93,8 @@ static OrchestrationRuntimeStatus fromProtobuf(OrchestratorService.Orchestration
8893
return PENDING;
8994
case ORCHESTRATION_STATUS_SUSPENDED:
9095
return SUSPENDED;
96+
case ORCHESTRATION_STATUS_STALLED:
97+
return STALLED;
9198
default:
9299
throw new IllegalArgumentException(String.format("Unknown status value: %s", status));
93100
}

durabletask-client/src/main/java/io/dapr/durabletask/Task.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,14 @@
2727
* <pre>
2828
* Task{@literal <}int{@literal >} activityTask = ctx.callActivity("MyActivity", someInput, int.class);
2929
* </pre>
30+
*
3031
* <p>Orchestrator code uses the {@link #await()} method to block on the completion of the task and retrieve the result.
3132
* If the task is not yet complete, the {@code await()} method will throw an {@link OrchestratorBlockedException}, which
3233
* pauses the orchestrator's execution so that it can save its progress into durable storage and schedule any
3334
* outstanding work. When the task is complete, the orchestrator will run again from the beginning and the next time
3435
* the task's {@code await()} method is called, the result will be returned, or a {@link TaskFailedException} will be
3536
* thrown if the result of the task was an unhandled exception.</p>
37+
*
3638
* <p>Note that orchestrator code must never catch {@code OrchestratorBlockedException} because doing so can cause the
3739
* orchestration instance to get permanently stuck.</p>
3840
*

durabletask-client/src/main/java/io/dapr/durabletask/TaskActivityContext.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ public interface TaskActivityContext {
3434
*/
3535
<T> T getInput(Class<T> targetType);
3636

37-
3837
/**
3938
* Gets the execution id of the current task activity.
4039
*

durabletask-client/src/main/java/io/dapr/durabletask/TaskFailedException.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
/**
1717
* Exception that gets thrown when awaiting a {@link Task} for an activity or sub-orchestration that fails with an
1818
* unhandled exception.
19+
*
1920
* <p>Detailed information associated with a particular task failure can be retrieved
2021
* using the {@link #getErrorDetails()} method.</p>
2122
*/

0 commit comments

Comments
 (0)