Skip to content

Commit f310727

Browse files
authored
Task cancellation support (#200)
* Move task processing into separate method * Add initial implementation * Add initial implementation * Add debug logs * Do not create cts on kafka event * Make background service more robust * Remove package, add null checks * Suppress warnings * Create experimental version * Add github action step to build KafkaCancellationNotifier * Fix log messages * Bump version * Bump version
1 parent 7de8bae commit f310727

File tree

18 files changed

+420
-19
lines changed

18 files changed

+420
-19
lines changed

.github/workflows/release.yml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@ jobs:
2929
- name: Create ConductorSharp.Patterns package
3030
run: dotnet pack --no-restore ./src/ConductorSharp.Patterns -c Release
3131
- name: Create ConductorSharp.Toolkit package
32-
run: dotnet pack --no-restore ./src/ConductorSharp.Toolkit -c Release
32+
run: dotnet pack --no-restore ./src/ConductorSharp.Toolkit -c Release
33+
- name: Create ConductorSharp.KafkaCancellationNotifier package
34+
run: dotnet pack --no-restore ./src/ConductorSharp.KafkaCancellationNotifier -c Release
3335

3436
- name: Publish ConductorSharp.Client package
3537
run: dotnet nuget push ./src/ConductorSharp.Client/bin/Release/ConductorSharp.Client*.nupkg --skip-duplicate --api-key ${{secrets.NUGET_API_KEY}} --source https://api.nuget.org/v3/index.json
@@ -39,3 +41,5 @@ jobs:
3941
run: dotnet nuget push ./src/ConductorSharp.Patterns/bin/Release/ConductorSharp.Patterns*.nupkg --skip-duplicate --api-key ${{secrets.NUGET_API_KEY}} --source https://api.nuget.org/v3/index.json
4042
- name: Publish ConductorSharp.Toolkit package
4143
run: dotnet nuget push ./src/ConductorSharp.Toolkit/bin/Release/ConductorSharp.Toolkit*.nupkg --skip-duplicate --api-key ${{secrets.NUGET_API_KEY}} --source https://api.nuget.org/v3/index.json
44+
- name: Publish ConductorSharp.KafkaCancellationNotifier package
45+
run: dotnet nuget push ./src/ConductorSharp.KafkaCancellationNotifier/bin/Release/ConductorSharp.KafkaCancellationNotifier*.nupkg --skip-duplicate --api-key ${{secrets.NUGET_API_KEY}} --source https://api.nuget.org/v3/index.json

ConductorSharp.sln

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution
2626
.editorconfig = .editorconfig
2727
EndProjectSection
2828
EndProject
29-
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ConductorSharp.Patterns", "src\ConductorSharp.Patterns\ConductorSharp.Patterns.csproj", "{9A55F1CF-2553-4F44-A8BC-0063A402582F}"
29+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ConductorSharp.Patterns", "src\ConductorSharp.Patterns\ConductorSharp.Patterns.csproj", "{9A55F1CF-2553-4F44-A8BC-0063A402582F}"
30+
EndProject
31+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ConductorSharp.KafkaCancellationNotifier", "src\ConductorSharp.KafkaCancellationNotifier\ConductorSharp.KafkaCancellationNotifier.csproj", "{A94EE48D-17F3-432A-A47D-BCB9B1EF2670}"
3032
EndProject
3133
Global
3234
GlobalSection(SolutionConfigurationPlatforms) = preSolution
@@ -70,6 +72,10 @@ Global
7072
{9A55F1CF-2553-4F44-A8BC-0063A402582F}.Debug|Any CPU.Build.0 = Debug|Any CPU
7173
{9A55F1CF-2553-4F44-A8BC-0063A402582F}.Release|Any CPU.ActiveCfg = Release|Any CPU
7274
{9A55F1CF-2553-4F44-A8BC-0063A402582F}.Release|Any CPU.Build.0 = Release|Any CPU
75+
{A94EE48D-17F3-432A-A47D-BCB9B1EF2670}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
76+
{A94EE48D-17F3-432A-A47D-BCB9B1EF2670}.Debug|Any CPU.Build.0 = Debug|Any CPU
77+
{A94EE48D-17F3-432A-A47D-BCB9B1EF2670}.Release|Any CPU.ActiveCfg = Release|Any CPU
78+
{A94EE48D-17F3-432A-A47D-BCB9B1EF2670}.Release|Any CPU.Build.0 = Release|Any CPU
7379
EndGlobalSection
7480
GlobalSection(SolutionProperties) = preSolution
7581
HideSolutionNode = FALSE

examples/ConductorSharp.NoApi/ConductorSharp.NoApi.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939

4040
<ItemGroup>
4141
<ProjectReference Include="..\..\src\ConductorSharp.Engine\ConductorSharp.Engine.csproj" />
42+
<ProjectReference Include="..\..\src\ConductorSharp.KafkaCancellationNotifier\ConductorSharp.KafkaCancellationNotifier.csproj" />
4243
<ProjectReference Include="..\..\src\ConductorSharp.Patterns\ConductorSharp.Patterns.csproj" />
4344
</ItemGroup>
4445

examples/ConductorSharp.NoApi/Program.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using ConductorSharp.Engine.Extensions;
22
using ConductorSharp.Engine.Health;
3+
using ConductorSharp.KafkaCancellationNotifier.Extensions;
34
using ConductorSharp.NoApi.Behaviors;
45
using ConductorSharp.NoApi.Handlers;
56
using ConductorSharp.Patterns.Extensions;
@@ -39,6 +40,11 @@
3940
pipelines.AddCustomBehavior<PrepareEmailBehavior, PrepareEmailRequest, PrepareEmailResponse>();
4041
})
4142
.AddConductorSharpPatterns();
43+
//.AddKafkaCancellationNotifier(
44+
// kafkaBootstrapServers: configuration.GetValue<string>(
45+
// "Conductor:KafkaCancellationNotifier:BootstrapServers"),
46+
// topicName: configuration.GetValue<string>("Conductor:KafkaCancellationNotifier:TopicName"),
47+
// groupId: configuration.GetValue<string>("Conductor:KafkaCancellationNotifier:GroupId"));
4248

4349
services.RegisterWorkerTask<GetCustomerHandler>();
4450
services.RegisterWorkerTask<PrepareEmailHandler>();

src/ConductorSharp.Client/ConductorSharp.Client.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
<Authors>Codaxy</Authors>
77
<Company>Codaxy</Company>
88
<PackageId>ConductorSharp.Client</PackageId>
9-
<Version>3.1.1</Version>
9+
<Version>3.2.0</Version>
1010
<Description>Client library for Netflix Conductor, with some additional quality of life features.</Description>
1111
<RepositoryUrl>https://github.com/codaxy/conductor-sharp</RepositoryUrl>
1212
<PackageTags>netflix;conductor</PackageTags>

src/ConductorSharp.Engine/Behaviors/RequestResponseLoggingBehavior.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,16 @@ public async Task<TResponse> Handle(TRequest request, RequestHandlerDelegate<TRe
5252

5353
return response;
5454
}
55+
catch (TaskCanceledException)
56+
{
57+
_logger.LogWarning(
58+
$"Request {{Request}} cancelled with payload {{@{requestName}}} and with id {{RequestId}}",
59+
requestName,
60+
request,
61+
requestId
62+
);
63+
throw;
64+
}
5565
catch (Exception exc)
5666
{
5767
stopwatch.Stop();

src/ConductorSharp.Engine/ConductorSharp.Engine.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
<Authors>Codaxy</Authors>
77
<Company>Codaxy</Company>
88
<PackageId>ConductorSharp.Engine</PackageId>
9-
<Version>3.1.1</Version>
9+
<Version>3.2.0</Version>
1010
<Description>Client library for Netflix Conductor, with some additional quality of life features.</Description>
1111
<RepositoryUrl>https://github.com/codaxy/conductor-sharp</RepositoryUrl>
1212
<PackageTags>netflix;conductor</PackageTags>

src/ConductorSharp.Engine/ExecutionManager.cs

Lines changed: 57 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
using System.IO;
44
using System.Linq;
55
using System.Threading;
6+
using System.Threading.Tasks;
67
using ConductorSharp.Client;
78
using ConductorSharp.Client.Generated;
89
using ConductorSharp.Client.Service;
@@ -30,6 +31,7 @@ internal class ExecutionManager
3031
private readonly IServiceScopeFactory _lifetimeScopeFactory;
3132
private readonly IPollTimingStrategy _pollTimingStrategy;
3233
private readonly IPollOrderStrategy _pollOrderStrategy;
34+
private readonly ICancellationNotifier _cancellationNotifier;
3335

3436
public ExecutionManager(
3537
WorkerSetConfig options,
@@ -39,7 +41,8 @@ public ExecutionManager(
3941
IExternalPayloadService externalPayloadService,
4042
IServiceScopeFactory lifetimeScope,
4143
IPollTimingStrategy pollTimingStrategy,
42-
IPollOrderStrategy pollOrderStrategy
44+
IPollOrderStrategy pollOrderStrategy,
45+
ICancellationNotifier cancellationNotifier
4346
)
4447
{
4548
_configuration = options;
@@ -50,6 +53,7 @@ IPollOrderStrategy pollOrderStrategy
5053
_lifetimeScopeFactory = lifetimeScope;
5154
_pollTimingStrategy = pollTimingStrategy;
5255
_pollOrderStrategy = pollOrderStrategy;
56+
_cancellationNotifier = cancellationNotifier;
5357
_externalPayloadService = externalPayloadService;
5458
}
5559

@@ -110,18 +114,58 @@ private static Type GetInputType(Type workerType)
110114

111115
private async Task PollAndHandle(TaskToWorker scheduledWorker, CancellationToken cancellationToken)
112116
{
113-
Client.Generated.Task pollResponse = null;
117+
Client.Generated.Task pollResponse;
118+
119+
// TODO: Maybe this should be configurable
120+
var workerId = Guid.NewGuid().ToString();
114121
try
115122
{
116-
var workerId = Guid.NewGuid().ToString();
117-
118123
pollResponse = await _taskManager.PollAsync(
119124
scheduledWorker.TaskName,
120125
workerId,
121126
scheduledWorker.TaskDomain ?? _configuration.Domain,
122127
cancellationToken
123128
);
129+
}
130+
catch (ApiException exception) when (exception.StatusCode == 204)
131+
{
132+
// This handles the case when PollAsync throws exception in case there are no tasks in queue
133+
// Even though Conductor reports 1 task in queue for particular task type this endpoint won't return scheduled task immmediately
134+
// We skip the further handling as task will be handled in next call to this method
135+
return;
136+
}
137+
catch (Exception exception)
138+
{
139+
_logger.LogError(exception, "Exception during the task polling");
140+
return;
141+
}
142+
143+
try
144+
{
145+
using var tokenHolder = _cancellationNotifier.GetCancellationToken(pollResponse.TaskId, cancellationToken);
146+
await ProcessPolledTask(pollResponse, workerId, scheduledWorker, tokenHolder.CancellationToken);
147+
}
148+
catch (TaskCanceledException)
149+
{
150+
_logger.LogWarning(
151+
"Polled task {Task}(id={TaskId}) of workflow {Workflow}(id={WorkflowId}) is cancelled",
152+
pollResponse.TaskDefName,
153+
pollResponse.TaskId,
154+
pollResponse.WorkflowType,
155+
pollResponse.WorkflowInstanceId
156+
);
157+
}
158+
}
124159

160+
private async Task ProcessPolledTask(
161+
Client.Generated.Task pollResponse,
162+
string workerId,
163+
TaskToWorker scheduledWorker,
164+
CancellationToken cancellationToken
165+
)
166+
{
167+
try
168+
{
125169
if (!string.IsNullOrEmpty(pollResponse.ExternalInputPayloadStoragePath))
126170
{
127171
_logger.LogDebug("Fetching storage {location}", pollResponse.ExternalInputPayloadStoragePath);
@@ -180,20 +224,19 @@ await _taskManager.UpdateAsync(
180224
cancellationToken
181225
);
182226
}
183-
catch (ApiException exception) when (exception.StatusCode == 204)
227+
catch (TaskCanceledException)
184228
{
185-
// This handles the case when PollAsync throws exception in case there are no tasks in queue
186-
// Even though Conductor reports 1 task in queue for particular task type this endpoint won't return scheduled task immmediately
187-
// This causes NullReferenceException in logging code below hence why we ignore the exception
229+
// Propagate this exception to outer handler
230+
throw;
188231
}
189232
catch (Exception exception)
190233
{
191234
_logger.LogError(
192235
"{@Exception} while executing {Task} as part of {Workflow} with id {WorkflowId}",
193236
exception,
194-
pollResponse?.TaskDefName,
195-
pollResponse?.WorkflowType,
196-
pollResponse?.WorkflowInstanceId
237+
pollResponse.TaskDefName,
238+
pollResponse.WorkflowType,
239+
pollResponse.WorkflowInstanceId
197240
);
198241

199242
var errorMessage = new ErrorOutput { ErrorMessage = exception.Message };
@@ -207,16 +250,16 @@ await Task.WhenAll(
207250
_taskManager.UpdateAsync(
208251
new TaskResult
209252
{
210-
TaskId = pollResponse?.TaskId,
253+
TaskId = pollResponse.TaskId,
211254
Status = TaskResultStatus.FAILED,
212255
ReasonForIncompletion = exception.Message,
213256
OutputData = SerializationHelper.ObjectToDictionary(errorMessage, ConductorConstants.IoJsonSerializerSettings),
214257
WorkflowInstanceId = pollResponse?.WorkflowInstanceId
215258
},
216259
cancellationToken
217260
),
218-
_taskManager.LogAsync(pollResponse?.TaskId, exception.Message, cancellationToken),
219-
_taskManager.LogAsync(pollResponse?.TaskId, exception.StackTrace, cancellationToken)
261+
_taskManager.LogAsync(pollResponse.TaskId, exception.Message, cancellationToken),
262+
_taskManager.LogAsync(pollResponse.TaskId, exception.StackTrace, cancellationToken)
220263
]
221264
);
222265
}

src/ConductorSharp.Engine/Extensions/ConductorSharpBuilder.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ params Assembly[] handlerAssemblies
5353

5454
Builder.AddTransient<IPollOrderStrategy, RandomOrdering>();
5555

56+
Builder.AddSingleton<ICancellationNotifier, NoOpCancellationNotifier>();
57+
5658
Builder.AddMediatR(cfg => cfg.RegisterServicesFromAssemblies(handlerAssemblies));
5759

5860
return this;
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
using System;
2+
using System.Threading;
3+
4+
namespace ConductorSharp.Engine.Interface
5+
{
6+
public interface ICancellationNotifier
7+
{
8+
public interface ICancellationTokenHolder : IDisposable
9+
{
10+
CancellationToken CancellationToken { get; }
11+
}
12+
13+
ICancellationTokenHolder GetCancellationToken(string taskId, CancellationToken engineCancellationToken);
14+
}
15+
}

0 commit comments

Comments
 (0)