Skip to content

Commit 35e04a2

Browse files
authored
fix: handle graceful cancellation in CloudEventsMiddleware to prevent 499 errors (#1713)
* fix: handle graceful cancellation in CloudEventsMiddleware to prevent 499 errors Propagate CancellationToken to JsonSerializer and handle OperationCanceledException gracefully when the response has already started. This prevents erroneous 499 Client Closed Connection errors in PubSub consumers under high load. Fixes #1237 Signed-off-by: ali-Hamza817 <alihamzaminhas.21@gmail.com> * test: fix cancellation test ordering and add response-not-started test - Fix test ordering: move app.Run() before app.Build() to match the pattern of all other tests in the file - Use IHttpResponseFeature to properly simulate HasStarted=true since DefaultHttpContext doesn't set it from WriteAsync - Add new test: PropagatesCancellation_WhenResponseHasNotStarted to verify exceptions are NOT swallowed when the response has not started (correct ASP.NET Core behavior) Signed-off-by: ali-Hamza817 <alihamzaminhas.21@gmail.com> --------- Signed-off-by: ali-Hamza817 <alihamzaminhas.21@gmail.com>
1 parent 961f8ee commit 35e04a2

File tree

2 files changed

+95
-19
lines changed

2 files changed

+95
-19
lines changed

src/Dapr.AspNetCore/CloudEventsMiddleware.cs

Lines changed: 13 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -48,36 +48,30 @@ public CloudEventsMiddleware(RequestDelegate next, CloudEventsMiddlewareOptions
4848
this.options = options;
4949
}
5050

51-
public Task InvokeAsync(HttpContext httpContext)
51+
public async Task InvokeAsync(HttpContext httpContext)
5252
{
53-
// This middleware unwraps any requests with a cloud events (JSON) content type
54-
// and replaces the request body + request content type so that it can be read by a
55-
// non-cloud-events-aware piece of code.
56-
//
57-
// This corresponds to cloud events in the *structured* format:
58-
// https://github.com/cloudevents/spec/blob/master/http-transport-binding.md#13-content-modes
59-
//
60-
// For *binary* format, we don't have to do anything
61-
//
62-
// We don't support batching.
63-
//
64-
// The philosophy here is that we don't report an error for things we don't support, because
65-
// that would block someone from implementing their own support for it. We only report an error
66-
// when something we do support isn't correct.
6753
if (!MatchesContentType(httpContext, out var charSet))
6854
{
69-
return this.next(httpContext);
55+
await this.next(httpContext);
56+
return;
7057
}
7158

72-
return this.ProcessBodyAsync(httpContext, charSet);
59+
try
60+
{
61+
await this.ProcessBodyAsync(httpContext, charSet);
62+
}
63+
catch (OperationCanceledException) when (httpContext.RequestAborted.IsCancellationRequested && httpContext.Response.HasStarted)
64+
{
65+
// Swallow
66+
}
7367
}
7468

7569
private async Task ProcessBodyAsync(HttpContext httpContext, string charSet)
7670
{
7771
JsonElement json;
7872
if (string.Equals(charSet, Encoding.UTF8.WebName, StringComparison.OrdinalIgnoreCase))
7973
{
80-
json = await JsonSerializer.DeserializeAsync<JsonElement>(httpContext.Request.Body);
74+
json = await JsonSerializer.DeserializeAsync<JsonElement>(httpContext.Request.Body, cancellationToken: httpContext.RequestAborted);
8175
}
8276
else
8377
{
@@ -142,7 +136,7 @@ private async Task ProcessBodyAsync(HttpContext httpContext, string charSet)
142136
if (isJson || options.SuppressJsonDecodingOfTextPayloads)
143137
{
144138
// Rehydrate body from JSON payload
145-
await JsonSerializer.SerializeAsync<JsonElement>(body, data);
139+
await JsonSerializer.SerializeAsync<JsonElement>(body, data, cancellationToken: httpContext.RequestAborted);
146140
}
147141
else
148142
{

test/Dapr.AspNetCore.Test/CloudEventsMiddlewareTest.cs

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,12 @@ namespace Dapr.AspNetCore.Test;
2020
using System.Text;
2121
using System.Text.Json;
2222
using System.Threading.Tasks;
23+
using System.Threading;
24+
using System;
2325
using Shouldly;
2426
using Microsoft.AspNetCore.Builder;
2527
using Microsoft.AspNetCore.Http;
28+
using Microsoft.AspNetCore.Http.Features;
2629
using Xunit;
2730

2831
public class CloudEventsMiddlewareTest
@@ -478,4 +481,83 @@ private static string ReadBody(Stream stream, Encoding encoding = null)
478481
var str = encoding.GetString(bytes);
479482
return str;
480483
}
484+
485+
[Fact]
486+
public async Task InvokeAsync_SwallowsCancellation_WhenResponseHasStarted()
487+
{
488+
using var cts = new CancellationTokenSource();
489+
490+
var serviceCollection = new ServiceCollection();
491+
var provider = serviceCollection.BuildServiceProvider();
492+
493+
var app = new ApplicationBuilder(provider);
494+
app.UseCloudEvents();
495+
496+
// Register terminal middleware BEFORE building the pipeline
497+
app.Run(httpContext =>
498+
{
499+
// Simulate that the response has already started by replacing the response feature
500+
httpContext.Features.Set<IHttpResponseFeature>(new TestHttpResponseFeature { HasStarted = true });
501+
502+
// Simulate the client disconnecting after the response started
503+
cts.Cancel();
504+
throw new OperationCanceledException(cts.Token);
505+
});
506+
507+
var pipeline = app.Build();
508+
509+
var context = new DefaultHttpContext();
510+
context.Request.ContentType = "application/cloudevents+json";
511+
context.Request.Body = MakeBody("{ \"data\": { \"name\":\"jimmy\" } }");
512+
context.RequestAborted = cts.Token;
513+
514+
// The middleware should catch and swallow the OperationCanceledException
515+
// because the response has already started and the request was aborted
516+
await pipeline.Invoke(context);
517+
}
518+
519+
[Fact]
520+
public async Task InvokeAsync_PropagatesCancellation_WhenResponseHasNotStarted()
521+
{
522+
using var cts = new CancellationTokenSource();
523+
524+
var serviceCollection = new ServiceCollection();
525+
var provider = serviceCollection.BuildServiceProvider();
526+
527+
var app = new ApplicationBuilder(provider);
528+
app.UseCloudEvents();
529+
530+
// Register terminal middleware that cancels BEFORE the response starts
531+
app.Run(httpContext =>
532+
{
533+
// Response has NOT started (default), client disconnects
534+
cts.Cancel();
535+
throw new OperationCanceledException(cts.Token);
536+
});
537+
538+
var pipeline = app.Build();
539+
540+
var context = new DefaultHttpContext();
541+
context.Request.ContentType = "application/cloudevents+json";
542+
context.Request.Body = MakeBody("{ \"data\": { \"name\":\"jimmy\" } }");
543+
context.RequestAborted = cts.Token;
544+
545+
// The middleware should NOT swallow the exception when the response has not started
546+
await Should.ThrowAsync<OperationCanceledException>(() => pipeline.Invoke(context));
547+
}
548+
549+
/// <summary>
550+
/// A test implementation of <see cref="IHttpResponseFeature"/> that allows controlling
551+
/// the <see cref="HasStarted"/> property for unit testing.
552+
/// </summary>
553+
private sealed class TestHttpResponseFeature : IHttpResponseFeature
554+
{
555+
public int StatusCode { get; set; } = 200;
556+
public string ReasonPhrase { get; set; }
557+
public IHeaderDictionary Headers { get; set; } = new HeaderDictionary();
558+
public Stream Body { get; set; } = new MemoryStream();
559+
public bool HasStarted { get; set; }
560+
public void OnStarting(Func<object, Task> callback, object state) { }
561+
public void OnCompleted(Func<object, Task> callback, object state) { }
562+
}
481563
}

0 commit comments

Comments
 (0)