Skip to content

Commit fdae09b

Browse files
committed
Works for sync
1 parent b4d2799 commit fdae09b

File tree

3 files changed

+98
-39
lines changed

3 files changed

+98
-39
lines changed

src/StackExchange.Redis/RedisDatabase.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4973,7 +4973,8 @@ protected override void WriteImpl(PhysicalConnection physical)
49734973
physical.Write(channel);
49744974
}
49754975
else
4976-
{ // recognises well-known types
4976+
{
4977+
// recognises well-known types
49774978
var val = RedisValue.TryParse(arg, out var valid);
49784979
if (!valid) throw new InvalidCastException($"Unable to parse value: '{arg}'");
49794980
physical.WriteBulkString(val);

src/StackExchange.Redis/ResultBox.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,9 @@ internal abstract class SimpleResultBox : IResultBox
3030
void IResultBox.SetException(Exception exception) => _exception = exception ?? CancelledException;
3131

3232
void IResultBox.Cancel(CancellationToken cancellationToken) =>
33-
_exception = GetCancelledException(CancellationToken, cancellationToken);
33+
_exception = cancellationToken.IsCancellationRequested
34+
? new OperationCanceledException(cancellationToken) // for sync, need to capture this eagerly
35+
: GetCancelledException(CancellationToken, cancellationToken);
3436

3537
void IResultBox.ActivateContinuations()
3638
{

tests/StackExchange.Redis.Tests/CancellationTests.cs

Lines changed: 93 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ public async Task GetEffectiveCancellationToken_Nesting()
5252
if (active.IsCancellationRequested) break;
5353
await Task.Delay(TimeSpan.FromSeconds(0.1));
5454
}
55+
5556
Assert.True(active.IsCancellationRequested);
5657
Assert.Equal(active, muxerA.GetEffectiveCancellationToken(checkForCancellation: false));
5758
}
@@ -76,6 +77,7 @@ public async Task GetEffectiveCancellationToken_Nesting()
7677
Assert.Null(muxerB.GetCurrentScope()); // B unaffected
7778
Assert.Equal(CancellationToken.None, muxerB.GetEffectiveCancellationToken());
7879
}
80+
7981
Assert.Null(muxerA.GetCurrentScope());
8082
Assert.Equal(CancellationToken.None, muxerA.GetEffectiveCancellationToken());
8183
Assert.Null(muxerB.GetCurrentScope()); // B unaffected
@@ -120,7 +122,8 @@ int IConnectionMultiplexer.StormLogThreshold
120122
set { }
121123
}
122124

123-
void IConnectionMultiplexer.RegisterProfiler(Func<ProfilingSession?> profilingSessionProvider) => throw new NotImplementedException();
125+
void IConnectionMultiplexer.RegisterProfiler(Func<ProfilingSession?> profilingSessionProvider) =>
126+
throw new NotImplementedException();
124127

125128
ServerCounters IConnectionMultiplexer.GetCounters() => throw new NotImplementedException();
126129

@@ -184,15 +187,19 @@ event EventHandler<HashSlotMovedEventArgs>? IConnectionMultiplexer.HashSlotMoved
184187

185188
ISubscriber IConnectionMultiplexer.GetSubscriber(object? asyncState) => throw new NotImplementedException();
186189

187-
IDatabase IConnectionMultiplexer.GetDatabase(int db, object? asyncState) => throw new NotImplementedException();
190+
IDatabase IConnectionMultiplexer.GetDatabase(int db, object? asyncState) =>
191+
throw new NotImplementedException();
188192

189-
IServer IConnectionMultiplexer.GetServer(string host, int port, object? asyncState) => throw new NotImplementedException();
193+
IServer IConnectionMultiplexer.GetServer(string host, int port, object? asyncState) =>
194+
throw new NotImplementedException();
190195

191-
IServer IConnectionMultiplexer.GetServer(string hostAndPort, object? asyncState) => throw new NotImplementedException();
196+
IServer IConnectionMultiplexer.GetServer(string hostAndPort, object? asyncState) =>
197+
throw new NotImplementedException();
192198

193199
IServer IConnectionMultiplexer.GetServer(IPAddress host, int port) => throw new NotImplementedException();
194200

195-
IServer IConnectionMultiplexer.GetServer(EndPoint endpoint, object? asyncState) => throw new NotImplementedException();
201+
IServer IConnectionMultiplexer.GetServer(EndPoint endpoint, object? asyncState) =>
202+
throw new NotImplementedException();
196203

197204
IServer[] IConnectionMultiplexer.GetServers() => throw new NotImplementedException();
198205

@@ -214,11 +221,13 @@ event EventHandler<HashSlotMovedEventArgs>? IConnectionMultiplexer.HashSlotMoved
214221

215222
long IConnectionMultiplexer.PublishReconfigure(CommandFlags flags) => throw new NotImplementedException();
216223

217-
Task<long> IConnectionMultiplexer.PublishReconfigureAsync(CommandFlags flags) => throw new NotImplementedException();
224+
Task<long> IConnectionMultiplexer.PublishReconfigureAsync(CommandFlags flags) =>
225+
throw new NotImplementedException();
218226

219227
int IConnectionMultiplexer.GetHashSlot(RedisKey key) => throw new NotImplementedException();
220228

221-
void IConnectionMultiplexer.ExportConfiguration(Stream destination, ExportOptions options) => throw new NotImplementedException();
229+
void IConnectionMultiplexer.ExportConfiguration(Stream destination, ExportOptions options) =>
230+
throw new NotImplementedException();
222231

223232
void IConnectionMultiplexer.AddLibraryNameSuffix(string suffix) => throw new NotImplementedException();
224233
}
@@ -247,6 +256,8 @@ await Assert.ThrowsAnyAsync<OperationCanceledException>(async () =>
247256
}
248257
}
249258

259+
private IInternalConnectionMultiplexer Create() => Create(syncTimeout: 10_000);
260+
250261
[Fact]
251262
public async Task WithCancellation_ValidToken_OperationSucceeds()
252263
{
@@ -267,10 +278,19 @@ public async Task WithCancellation_ValidToken_OperationSucceeds()
267278

268279
private void Pause(IDatabase db)
269280
{
270-
db.Execute("client", "pause", ConnectionPauseMilliseconds, CommandFlags.FireAndForget);
281+
db.Execute("client", new object[] { "pause", ConnectionPauseMilliseconds }, CommandFlags.FireAndForget);
271282
}
272283

273-
private ConnectionMultiplexer Create() => ConnectionMultiplexer.Connect("127.0.0.1:4000");
284+
/*
285+
private ConnectionMultiplexer Create([CallerMemberName] string caller = "")
286+
{
287+
var options = ConfigurationOptions.Parse("127.0.0.1:4000");
288+
#pragma warning disable CS0618
289+
LoggingTunnel.LogToDirectory(options, @"/home/marc/logs/");
290+
#pragma warning restore CS0618
291+
return ConnectionMultiplexer.Connect(options);
292+
}
293+
*/
274294

275295
[Fact]
276296
public async Task WithTimeout_ShortTimeout_Async_ThrowsOperationCanceledException()
@@ -289,11 +309,12 @@ public async Task WithTimeout_ShortTimeout_Async_ThrowsOperationCanceledExceptio
289309
{
290310
await pending;
291311
// If it succeeds, that's fine too - Redis is fast
292-
Skip.Inconclusive(TooFast + ": " + watch.ElapsedMilliseconds + "ms");
312+
Assert.Fail(ExpectedCancel + ": " + watch.ElapsedMilliseconds + "ms");
293313
}
294314
catch (OperationCanceledException)
295315
{
296316
// Expected for very short timeouts
317+
Log($"Cancelled after {watch.ElapsedMilliseconds}ms");
297318
}
298319
}
299320
}
@@ -314,16 +335,17 @@ public void WithTimeout_ShortTimeout_Sync_ThrowsOperationCanceledException()
314335
{
315336
db.StringSet(Me(), "value"); // check we get past this
316337
// If it succeeds, that's fine too - Redis is fast
317-
Skip.Inconclusive(TooFast + ": " + watch.ElapsedMilliseconds + "ms");
338+
Assert.Fail(ExpectedCancel + ": " + watch.ElapsedMilliseconds + "ms");
318339
}
319340
catch (OperationCanceledException)
320341
{
321342
// Expected for very short timeouts
343+
Log($"Cancelled after {watch.ElapsedMilliseconds}ms");
322344
}
323345
}
324346
}
325347

326-
private const string TooFast = "This operation completed too quickly to verify this behaviour.";
348+
private const string ExpectedCancel = "This operation should have been cancelled";
327349

328350
[Fact]
329351
public async Task WithoutAmbientCancellation_OperationsWorkNormally()
@@ -347,38 +369,72 @@ public enum CancelStrategy
347369

348370
private const int ConnectionPauseMilliseconds = 50, ShortDelayMilliseconds = 5;
349371

372+
private static CancellationTokenSource CreateCts(CancelStrategy strategy)
373+
{
374+
switch (strategy)
375+
{
376+
case CancelStrategy.Constructor:
377+
return new CancellationTokenSource(TimeSpan.FromMilliseconds(ShortDelayMilliseconds));
378+
case CancelStrategy.Method:
379+
var cts = new CancellationTokenSource();
380+
cts.CancelAfter(TimeSpan.FromMilliseconds(ShortDelayMilliseconds));
381+
return cts;
382+
case CancelStrategy.Manual:
383+
cts = new();
384+
_ = Task.Run(async () =>
385+
{
386+
await Task.Delay(ShortDelayMilliseconds);
387+
// ReSharper disable once MethodHasAsyncOverload - TFM-dependent
388+
cts.Cancel();
389+
});
390+
return cts;
391+
default:
392+
throw new ArgumentOutOfRangeException(nameof(strategy));
393+
}
394+
}
395+
350396
[Theory]
351397
[InlineData(CancelStrategy.Constructor)]
352398
[InlineData(CancelStrategy.Method)]
353399
[InlineData(CancelStrategy.Manual)]
354-
public async Task CancellationDuringOperation_CancelsGracefully(CancelStrategy strategy)
400+
public async Task CancellationDuringOperation_Async_CancelsGracefully(CancelStrategy strategy)
355401
{
356402
using var conn = Create();
357403
var db = conn.GetDatabase();
358404

359-
static CancellationTokenSource CreateCts(CancelStrategy strategy)
405+
var watch = Stopwatch.StartNew();
406+
Pause(db);
407+
408+
using var cts = CreateCts(strategy);
409+
410+
// Cancel after a short delay
411+
using (db.Multiplexer.WithCancellation(cts.Token))
360412
{
361-
switch (strategy)
413+
// Start an operation and cancel it mid-flight
414+
var pending = db.StringSetAsync($"{Me()}:{strategy}", "value");
415+
416+
try
362417
{
363-
case CancelStrategy.Constructor:
364-
return new CancellationTokenSource(TimeSpan.FromMilliseconds(ShortDelayMilliseconds));
365-
case CancelStrategy.Method:
366-
var cts = new CancellationTokenSource();
367-
cts.CancelAfter(TimeSpan.FromMilliseconds(ShortDelayMilliseconds));
368-
return cts;
369-
case CancelStrategy.Manual:
370-
cts = new();
371-
_ = Task.Run(async () =>
372-
{
373-
await Task.Delay(ShortDelayMilliseconds);
374-
// ReSharper disable once MethodHasAsyncOverload - TFM-dependent
375-
cts.Cancel();
376-
});
377-
return cts;
378-
default:
379-
throw new ArgumentOutOfRangeException(nameof(strategy));
418+
await pending;
419+
Assert.Fail(ExpectedCancel + ": " + watch.ElapsedMilliseconds + "ms");
420+
}
421+
catch (OperationCanceledException oce)
422+
{
423+
// Expected if cancellation happens during operation
424+
Log($"Cancelled after {watch.ElapsedMilliseconds}ms");
425+
Assert.Equal(cts.Token, oce.CancellationToken);
380426
}
381427
}
428+
}
429+
430+
[Theory]
431+
[InlineData(CancelStrategy.Constructor)]
432+
[InlineData(CancelStrategy.Method)]
433+
[InlineData(CancelStrategy.Manual)]
434+
public void CancellationDuringOperation_Sync_CancelsGracefully(CancelStrategy strategy)
435+
{
436+
using var conn = Create();
437+
var db = conn.GetDatabase();
382438

383439
var watch = Stopwatch.StartNew();
384440
Pause(db);
@@ -389,16 +445,16 @@ static CancellationTokenSource CreateCts(CancelStrategy strategy)
389445
using (db.Multiplexer.WithCancellation(cts.Token))
390446
{
391447
// Start an operation and cancel it mid-flight
392-
var pending = db.StringSetAsync($"{Me()}:{strategy}", "value");
393-
394448
try
395449
{
396-
await pending;
397-
Skip.Inconclusive(TooFast + ": " + watch.ElapsedMilliseconds + "ms");
450+
db.StringSet($"{Me()}:{strategy}", "value");
451+
Assert.Fail(ExpectedCancel + ": " + watch.ElapsedMilliseconds + "ms");
398452
}
399-
catch (OperationCanceledException oce) when (oce.CancellationToken == cts.Token)
453+
catch (OperationCanceledException oce)
400454
{
401455
// Expected if cancellation happens during operation
456+
Log($"Cancelled after {watch.ElapsedMilliseconds}ms");
457+
Assert.Equal(cts.Token, oce.CancellationToken);
402458
}
403459
}
404460
}

0 commit comments

Comments
 (0)