Skip to content

Commit 9d20941

Browse files
committed
intermediate, investigating weird connection oddity
1 parent ae771f2 commit 9d20941

19 files changed

+857
-754
lines changed

StackExchange.Redis.sln

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -117,8 +117,6 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ConsoleTest", "tests\Consol
117117
EndProject
118118
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ConsoleTestBaseline", "tests\ConsoleTestBaseline\ConsoleTestBaseline.csproj", "{69A0ACF2-DF1F-4F49-B554-F732DCA938A3}"
119119
EndProject
120-
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SimpleCancellationDemo", "tests\SimpleCancellationDemo\SimpleCancellationDemo.csproj", "{368A06AA-9DEB-4C55-B9CF-A1070B85E502}"
121-
EndProject
122120
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "docs", "docs\docs.csproj", "{64CF03B6-6B29-4C4C-88B8-7B9E317D631A}"
123121
EndProject
124122
Global
@@ -171,10 +169,6 @@ Global
171169
{69A0ACF2-DF1F-4F49-B554-F732DCA938A3}.Debug|Any CPU.Build.0 = Debug|Any CPU
172170
{69A0ACF2-DF1F-4F49-B554-F732DCA938A3}.Release|Any CPU.ActiveCfg = Release|Any CPU
173171
{69A0ACF2-DF1F-4F49-B554-F732DCA938A3}.Release|Any CPU.Build.0 = Release|Any CPU
174-
{368A06AA-9DEB-4C55-B9CF-A1070B85E502}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
175-
{368A06AA-9DEB-4C55-B9CF-A1070B85E502}.Debug|Any CPU.Build.0 = Debug|Any CPU
176-
{368A06AA-9DEB-4C55-B9CF-A1070B85E502}.Release|Any CPU.ActiveCfg = Release|Any CPU
177-
{368A06AA-9DEB-4C55-B9CF-A1070B85E502}.Release|Any CPU.Build.0 = Release|Any CPU
178172
{64CF03B6-6B29-4C4C-88B8-7B9E317D631A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
179173
{64CF03B6-6B29-4C4C-88B8-7B9E317D631A}.Debug|Any CPU.Build.0 = Debug|Any CPU
180174
{64CF03B6-6B29-4C4C-88B8-7B9E317D631A}.Release|Any CPU.ActiveCfg = Release|Any CPU
@@ -200,7 +194,6 @@ Global
200194
{A9F81DA3-DA82-423E-A5DD-B11C37548E06} = {96E891CD-2ED7-4293-A7AB-4C6F5D8D2B05}
201195
{A0F89B8B-32A3-4C28-8F1B-ADE343F16137} = {73A5C363-CA1F-44C4-9A9B-EF791A76BA6A}
202196
{69A0ACF2-DF1F-4F49-B554-F732DCA938A3} = {73A5C363-CA1F-44C4-9A9B-EF791A76BA6A}
203-
{368A06AA-9DEB-4C55-B9CF-A1070B85E502} = {73A5C363-CA1F-44C4-9A9B-EF791A76BA6A}
204197
EndGlobalSection
205198
GlobalSection(ExtensibilityGlobals) = postSolution
206199
SolutionGuid = {193AA352-6748-47C1-A5FC-C9AA6B5F000B}

src/StackExchange.Redis/ChannelMessageQueue.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ private async Task OnMessageSyncImpl()
184184
catch (ChannelClosedException) { break; } // expected
185185
catch (Exception ex)
186186
{
187-
_parent?.multiplexer?.OnInternalError(ex);
187+
_parent?.Multiplexer?.OnInternalError(ex);
188188
break;
189189
}
190190

@@ -305,7 +305,7 @@ private async Task OnMessageAsyncImpl()
305305
catch (ChannelClosedException) { break; } // expected
306306
catch (Exception ex)
307307
{
308-
_parent?.multiplexer?.OnInternalError(ex);
308+
_parent?.Multiplexer?.OnInternalError(ex);
309309
break;
310310
}
311311

src/StackExchange.Redis/CursorEnumerable.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ private void ThrowTimeout(Message message)
246246
{
247247
try
248248
{
249-
throw ExceptionFactory.Timeout(parent.redis.multiplexer, null, message, parent.server);
249+
throw ExceptionFactory.Timeout(parent.redis.Multiplexer, null, message, parent.server);
250250
}
251251
catch (Exception ex)
252252
{

src/StackExchange.Redis/Exceptions.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System;
22
using System.ComponentModel;
3+
using System.Diagnostics;
34
using System.Runtime.Serialization;
45

56
namespace StackExchange.Redis
@@ -106,6 +107,7 @@ public RedisConnectionException(ConnectionFailureType failureType, string messag
106107
/// <param name="commandStatus">The status of the command.</param>
107108
public RedisConnectionException(ConnectionFailureType failureType, string message, Exception? innerException, CommandStatus commandStatus) : base(message, innerException)
108109
{
110+
Debug.WriteLine($"{failureType}: {message}");
109111
FailureType = failureType;
110112
CommandStatus = commandStatus;
111113
}

src/StackExchange.Redis/PublicAPI/PublicAPI.Unshipped.txt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
#nullable enable
22
StackExchange.Redis.RedisCancellationExtensions
3-
static StackExchange.Redis.RedisCancellationExtensions.GetEffectiveCancellationToken(this StackExchange.Redis.IConnectionMultiplexer! redis) -> System.Threading.CancellationToken
43
static StackExchange.Redis.RedisCancellationExtensions.WithCancellation(this StackExchange.Redis.IConnectionMultiplexer! redis, System.Threading.CancellationToken cancellationToken) -> System.IDisposable!
54
static StackExchange.Redis.RedisCancellationExtensions.WithCancellationAndTimeout(this StackExchange.Redis.IConnectionMultiplexer! redis, System.Threading.CancellationToken cancellationToken, int milliseconds) -> System.IDisposable!
65
static StackExchange.Redis.RedisCancellationExtensions.WithCancellationAndTimeout(this StackExchange.Redis.IConnectionMultiplexer! redis, System.Threading.CancellationToken cancellationToken, System.TimeSpan timeout) -> System.IDisposable!

src/StackExchange.Redis/RedisBase.cs

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -5,21 +5,21 @@
55

66
namespace StackExchange.Redis
77
{
8-
internal abstract partial class RedisBase : IRedis
8+
internal abstract class RedisBase : IRedis
99
{
1010
internal static readonly DateTime UnixEpoch = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);
11-
internal readonly ConnectionMultiplexer multiplexer;
12-
protected readonly object? asyncState;
11+
internal readonly ConnectionMultiplexer Multiplexer;
12+
internal readonly object? AsyncState;
1313

1414
internal RedisBase(ConnectionMultiplexer multiplexer, object? asyncState)
1515
{
16-
this.multiplexer = multiplexer;
17-
this.asyncState = asyncState;
16+
Multiplexer = multiplexer;
17+
AsyncState = asyncState;
1818
}
1919

20-
IConnectionMultiplexer IRedisAsync.Multiplexer => multiplexer;
20+
IConnectionMultiplexer IRedisAsync.Multiplexer => Multiplexer;
2121

22-
internal CancellationToken GetEffectiveCancellationToken() => multiplexer.GetEffectiveCancellationToken();
22+
internal CancellationToken GetEffectiveCancellationToken() => Multiplexer.GetEffectiveCancellationToken();
2323

2424
public virtual TimeSpan Ping(CommandFlags flags = CommandFlags.None)
2525
{
@@ -33,48 +33,48 @@ public virtual Task<TimeSpan> PingAsync(CommandFlags flags = CommandFlags.None)
3333
return ExecuteAsync(msg, ResultProcessor.ResponseTimer);
3434
}
3535

36-
public override string ToString() => multiplexer.ToString();
36+
public override string ToString() => Multiplexer.ToString();
3737

38-
public bool TryWait(Task task) => task.Wait(multiplexer.TimeoutMilliseconds);
38+
public bool TryWait(Task task) => task.Wait(Multiplexer.TimeoutMilliseconds);
3939

40-
public void Wait(Task task) => multiplexer.Wait(task);
40+
public void Wait(Task task) => Multiplexer.Wait(task);
4141

42-
public T Wait<T>(Task<T> task) => multiplexer.Wait(task);
42+
public T Wait<T>(Task<T> task) => Multiplexer.Wait(task);
4343

44-
public void WaitAll(params Task[] tasks) => multiplexer.WaitAll(tasks);
44+
public void WaitAll(params Task[] tasks) => Multiplexer.WaitAll(tasks);
4545

4646
internal virtual Task<T> ExecuteAsync<T>(Message? message, ResultProcessor<T>? processor, T defaultValue, ServerEndPoint? server = null)
4747
{
48-
if (message is null) return CompletedTask<T>.FromDefault(defaultValue, asyncState);
49-
multiplexer.CheckMessage(message);
48+
if (message is null) return CompletedTask<T>.FromDefault(defaultValue, AsyncState);
49+
Multiplexer.CheckMessage(message);
5050

5151
// The message already captures the ambient cancellation token when it was created,
5252
// so we don't need to pass it again. This ensures resent messages preserve their original cancellation context.
53-
return multiplexer.ExecuteAsyncImpl<T>(message, processor, asyncState, server, defaultValue);
53+
return Multiplexer.ExecuteAsyncImpl<T>(message, processor, AsyncState, server, defaultValue);
5454
}
5555

5656
internal virtual Task<T?> ExecuteAsync<T>(Message? message, ResultProcessor<T>? processor, ServerEndPoint? server = null)
5757
{
58-
if (message is null) return CompletedTask<T>.Default(asyncState);
59-
multiplexer.CheckMessage(message);
58+
if (message is null) return CompletedTask<T>.Default(AsyncState);
59+
Multiplexer.CheckMessage(message);
6060

6161
// The message already captures the ambient cancellation token when it was created,
6262
// so we don't need to pass it again. This ensures resent messages preserve their original cancellation context.
63-
return multiplexer.ExecuteAsyncImpl<T>(message, processor, asyncState, server);
63+
return Multiplexer.ExecuteAsyncImpl<T>(message, processor, AsyncState, server);
6464
}
6565

6666
[return: NotNullIfNotNull("defaultValue")]
6767
internal virtual T? ExecuteSync<T>(Message? message, ResultProcessor<T>? processor, ServerEndPoint? server = null, T? defaultValue = default)
6868
{
6969
if (message is null) return defaultValue; // no-op
70-
multiplexer.CheckMessage(message);
71-
return multiplexer.ExecuteSyncImpl<T>(message, processor, server, defaultValue);
70+
Multiplexer.CheckMessage(message);
71+
return Multiplexer.ExecuteSyncImpl<T>(message, processor, server, defaultValue);
7272
}
7373

7474
internal virtual RedisFeatures GetFeatures(in RedisKey key, CommandFlags flags, RedisCommand command, out ServerEndPoint? server)
7575
{
76-
server = multiplexer.SelectServer(command, flags, key);
77-
var version = server == null ? multiplexer.RawConfig.DefaultVersion : server.Version;
76+
server = Multiplexer.SelectServer(command, flags, key);
77+
var version = server == null ? Multiplexer.RawConfig.DefaultVersion : server.Version;
7878
return new RedisFeatures(version);
7979
}
8080

@@ -118,7 +118,7 @@ protected static void WhenAlwaysOrNotExists(When when)
118118
private ResultProcessor.TimingProcessor.TimerMessage GetTimerMessage(CommandFlags flags)
119119
{
120120
// do the best we can with available commands
121-
var map = multiplexer.CommandMap;
121+
var map = Multiplexer.CommandMap;
122122
var cancellationToken = GetEffectiveCancellationToken();
123123
if (map.IsAvailable(RedisCommand.PING))
124124
return ResultProcessor.TimingProcessor.CreateMessage(-1, flags, RedisCommand.PING, default, cancellationToken);
@@ -128,7 +128,7 @@ private ResultProcessor.TimingProcessor.TimerMessage GetTimerMessage(CommandFlag
128128
return ResultProcessor.TimingProcessor.CreateMessage(-1, flags, RedisCommand.ECHO, RedisLiterals.PING, cancellationToken);
129129
// as our fallback, we'll do something odd... we'll treat a key like a value, out of sheer desperation
130130
// note: this usually means: twemproxy/envoyproxy - in which case we're fine anyway, since the proxy does the routing
131-
return ResultProcessor.TimingProcessor.CreateMessage(0, flags, RedisCommand.EXISTS, (RedisValue)multiplexer.UniqueId, cancellationToken);
131+
return ResultProcessor.TimingProcessor.CreateMessage(0, flags, RedisCommand.EXISTS, (RedisValue)Multiplexer.UniqueId, cancellationToken);
132132
}
133133

134134
internal static class CursorUtils

src/StackExchange.Redis/RedisBatch.cs

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ internal class RedisBatch : RedisDatabase, IBatch
88
{
99
private List<Message>? pending;
1010

11-
public RedisBatch(RedisDatabase wrapped, object? asyncState) : base(wrapped.multiplexer, wrapped.Database, asyncState ?? wrapped.AsyncState) { }
11+
public RedisBatch(RedisDatabase wrapped, object? asyncState) : base(wrapped.Multiplexer, wrapped.Database, asyncState ?? wrapped.AsyncState) { }
1212

1313
public void Execute()
1414
{
@@ -24,17 +24,17 @@ public void Execute()
2424
List<Message>? lastList = null;
2525
foreach (var message in snapshot)
2626
{
27-
var server = multiplexer.SelectServer(message);
27+
var server = Multiplexer.SelectServer(message);
2828
if (server == null)
2929
{
30-
FailNoServer(multiplexer, snapshot);
31-
throw ExceptionFactory.NoConnectionAvailable(multiplexer, message, server);
30+
FailNoServer(Multiplexer, snapshot);
31+
throw ExceptionFactory.NoConnectionAvailable(Multiplexer, message, server);
3232
}
3333
var bridge = server.GetBridge(message);
3434
if (bridge == null)
3535
{
36-
FailNoServer(multiplexer, snapshot);
37-
throw ExceptionFactory.NoConnectionAvailable(multiplexer, message, server);
36+
FailNoServer(Multiplexer, snapshot);
37+
throw ExceptionFactory.NoConnectionAvailable(Multiplexer, message, server);
3838
}
3939

4040
// identity a list
@@ -58,15 +58,15 @@ public void Execute()
5858
{
5959
if (!pair.Key.TryEnqueue(pair.Value, pair.Key.ServerEndPoint.IsReplica))
6060
{
61-
FailNoServer(multiplexer, pair.Value);
61+
FailNoServer(Multiplexer, pair.Value);
6262
}
6363
}
6464
}
6565

6666
internal override Task<T> ExecuteAsync<T>(Message? message, ResultProcessor<T>? processor, T defaultValue, ServerEndPoint? server = null)
6767
{
68-
if (message == null) return CompletedTask<T>.FromDefault(defaultValue, asyncState);
69-
multiplexer.CheckMessage(message);
68+
if (message == null) return CompletedTask<T>.FromDefault(defaultValue, ((RedisBase)this).AsyncState);
69+
Multiplexer.CheckMessage(message);
7070

7171
// prepare the inner command as a task
7272
Task<T> task;
@@ -76,7 +76,7 @@ internal override Task<T> ExecuteAsync<T>(Message? message, ResultProcessor<T>?
7676
}
7777
else
7878
{
79-
var source = TaskResultBox<T>.Create(GetEffectiveCancellationToken(), out var tcs, asyncState);
79+
var source = TaskResultBox<T>.Create(GetEffectiveCancellationToken(), out var tcs, ((RedisBase)this).AsyncState);
8080
task = tcs.Task;
8181
message.SetSource(source, processor);
8282
}
@@ -88,8 +88,8 @@ internal override Task<T> ExecuteAsync<T>(Message? message, ResultProcessor<T>?
8888

8989
internal override Task<T?> ExecuteAsync<T>(Message? message, ResultProcessor<T>? processor, ServerEndPoint? server = null) where T : default
9090
{
91-
if (message == null) return CompletedTask<T>.Default(asyncState);
92-
multiplexer.CheckMessage(message);
91+
if (message == null) return CompletedTask<T>.Default(((RedisBase)this).AsyncState);
92+
Multiplexer.CheckMessage(message);
9393

9494
// prepare the inner command as a task
9595
Task<T?> task;
@@ -99,7 +99,7 @@ internal override Task<T> ExecuteAsync<T>(Message? message, ResultProcessor<T>?
9999
}
100100
else
101101
{
102-
var source = TaskResultBox<T?>.Create(message.CancellationToken, out var tcs, asyncState);
102+
var source = TaskResultBox<T?>.Create(message.CancellationToken, out var tcs, ((RedisBase)this).AsyncState);
103103
task = tcs.Task;
104104
message.SetSource(source!, processor);
105105
}

src/StackExchange.Redis/RedisCancellationExtensions.cs

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ public static IDisposable WithCancellationAndTimeout(
112112
/// combining any ambient cancellation token and timeout.
113113
/// </summary>
114114
/// <returns>The effective cancellation token, or CancellationToken.None if no ambient context is set.</returns>
115-
public static CancellationToken GetEffectiveCancellationToken(this IConnectionMultiplexer redis)
115+
internal static CancellationToken GetEffectiveCancellationToken(this IConnectionMultiplexer redis, bool checkForCancellation = true)
116116
{
117117
var scope = _context.Value;
118118

@@ -126,7 +126,11 @@ public static CancellationToken GetEffectiveCancellationToken(this IConnectionMu
126126
if (fromScope is not null && fromScope.Equals(redis))
127127
{
128128
var token = scope.Token;
129-
token.ThrowIfCancellationRequested();
129+
if (checkForCancellation)
130+
{
131+
token.ThrowIfCancellationRequested();
132+
}
133+
130134
return token;
131135
}
132136
scope = scope.Previous;
@@ -137,10 +141,30 @@ public static CancellationToken GetEffectiveCancellationToken(this IConnectionMu
137141
}
138142

139143
/// <summary>
140-
/// Gets the current cancellation context for diagnostic purposes.
144+
/// Gets the current cancellation scope for diagnostic purposes.
141145
/// </summary>
142-
/// <returns>The current context, or null if no ambient context is set.</returns>
143-
internal static object? GetCurrentScope() => _context.Value;
146+
/// <returns>The current scope, or null if no ambient context is set.</returns>
147+
internal static object? GetCurrentScope(this IConnectionMultiplexer redis)
148+
{
149+
var scope = _context.Value;
150+
151+
// ReSharper disable once ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract - trust no-one
152+
if (redis is not null)
153+
{
154+
// check for the top scope *that relates to this redis instance*
155+
while (scope is not null)
156+
{
157+
var scopeMuxer = scope.Target; // need to null-check because weak-ref / GC
158+
if (scopeMuxer is not null && scopeMuxer.Equals(redis))
159+
{
160+
return scope;
161+
}
162+
scope = scope.Previous;
163+
}
164+
}
165+
166+
return null;
167+
}
144168

145169
private static readonly AsyncLocal<CancellationScope?> _context = new();
146170

0 commit comments

Comments
 (0)