Skip to content

Commit 3ed3c8d

Browse files
committed
feat(Pub/Sub): Support WaitForProcessing and NackInmediately shutdown options.
Previous shutdown mechanism is now obsolete.
1 parent 94a52d6 commit 3ed3c8d

File tree

11 files changed

+1029
-179
lines changed

11 files changed

+1029
-179
lines changed

apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.IntegrationTests/PubSubClientTest.cs

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
using System.Threading;
2424
using System.Threading.Tasks;
2525
using Xunit;
26+
using static Google.Cloud.PubSub.V1.SubscriberClient;
2627

2728
// Tests create quite a few tasks that don't need awaiting.
2829
#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
@@ -159,7 +160,12 @@ private async Task RunBulkMessagingImpl(
159160
{
160161
// Test finished, so stop subscriber
161162
Console.WriteLine("All msgs received, stopping subscriber.");
162-
Task unused = subscriber.StopAsync(TimeSpan.FromSeconds(15));
163+
var shutdownOptions = new ShutdownOptions
164+
{
165+
Mode = ShutdownMode.NackImmediately,
166+
Timeout = TimeSpan.FromSeconds(15)
167+
};
168+
Task unused = subscriber.StopAsync(shutdownOptions);
163169
}
164170
}
165171
else
@@ -194,11 +200,15 @@ private async Task RunBulkMessagingImpl(
194200
{
195201
if (noProgressCount > 60)
196202
{
197-
// Deadlock, shutdown subscriber, and cancel
198-
Console.WriteLine("Deadlock detected. Cancelling test");
199-
subscriber.StopAsync(new CancellationToken(true));
200-
watchdogCts.Cancel();
201-
break;
203+
// Deadlock, shutdown subscriber, and cancel
204+
Console.WriteLine("Deadlock detected. Cancelling test");
205+
var shutdownOptions = new ShutdownOptions
206+
{
207+
Mode = ShutdownMode.NackImmediately
208+
};
209+
subscriber.StopAsync(shutdownOptions, cancellationToken: new CancellationToken(true));
210+
watchdogCts.Cancel();
211+
break;
202212
}
203213
noProgressCount += 1;
204214
}
@@ -431,7 +441,12 @@ public async Task StopStartSubscriber(int totalMessageCount, double publisherFre
431441
});
432442
await Task.Delay(subscriberLifetime);
433443
Console.WriteLine("Stopping subscriber");
434-
Task stopTask = subscriber.StopAsync(TimeSpan.FromSeconds(15));
444+
var shutdownOptions = new ShutdownOptions
445+
{
446+
Mode = ShutdownMode.NackImmediately,
447+
Timeout = TimeSpan.FromSeconds(15)
448+
};
449+
Task stopTask = subscriber.StopAsync(shutdownOptions);
435450
// If shutdown times-out then stopTask, and also Task.WhenAll will cancel, causing the test to fail.
436451
await Task.WhenAll(subscribeTask, stopTask);
437452
int recvCount = recvedMsgs.Locked(() => recvedMsgs.Count);
@@ -538,8 +553,13 @@ await subscriberApi.IAMPolicyClient.SetIamPolicyAsync(new SetIamPolicyRequest
538553
{
539554
result.Add((msg.GetDeliveryAttempt(), true));
540555
// Received DLQ message, so stop test.
541-
sub.StopAsync(TimeSpan.FromSeconds(10));
542-
dlqSub.StopAsync(TimeSpan.FromSeconds(10));
556+
var shutdownOptions = new ShutdownOptions
557+
{
558+
Mode = ShutdownMode.NackImmediately,
559+
Timeout = TimeSpan.FromSeconds(10)
560+
};
561+
sub.StopAsync(shutdownOptions);
562+
dlqSub.StopAsync(shutdownOptions);
543563
return Task.FromResult(SubscriberClient.Reply.Ack);
544564
});
545565

apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.OrderingKeyTester/Program.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ Task Subscribe()
133133
if (recvCount == inputLines.Count)
134134
{
135135
Console.WriteLine("Received all messages, shutting down");
136-
var dummyTask = sub.StopAsync(CancellationToken.None);
136+
var dummyTask = sub.StopAsync(new SubscriberClient.ShutdownOptions {Mode = SubscriberClient.ShutdownMode.NackImmediately});
137137
}
138138
}
139139
if (rnd.Next(3) == 0)

apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.Snippets/SubscriberClientSnippets.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -142,8 +142,11 @@ await _subscriberClient.StartAsync((msg, token) =>
142142
return Task.FromResult(SubscriberClient.Reply.Ack);
143143
});
144144

145-
public override async Task StopAsync(CancellationToken stoppingToken) =>
146-
await _subscriberClient.StopAsync(stoppingToken);
145+
public override async Task StopAsync(CancellationToken stoppingToken)
146+
{
147+
var shutdownOptions = new SubscriberClient.ShutdownOptions {Mode = SubscriberClient.ShutdownMode.NackImmediately};
148+
await _subscriberClient.StopAsync(shutdownOptions, cancellationToken: stoppingToken);
149+
}
147150
}
148151
// End sample
149152
}

apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.Snippets/SubscriberServiceApiClientSnippets.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,11 @@ await subscriber.StartAsync((msg, cancellationToken) =>
129129
Console.WriteLine($"Text: '{msg.Data.ToStringUtf8()}'");
130130
// Stop this subscriber after one message is received.
131131
// This is non-blocking, and the returned Task may be awaited.
132-
subscriber.StopAsync(TimeSpan.FromSeconds(15));
132+
subscriber.StopAsync(new SubscriberClient.ShutdownOptions
133+
{
134+
Mode = SubscriberClient.ShutdownMode.NackImmediately,
135+
Timeout =TimeSpan.FromSeconds(15)
136+
});
133137
// Return Reply.Ack to indicate this message has been handled.
134138
return Task.FromResult(SubscriberClient.Reply.Ack);
135139
});

0 commit comments

Comments
 (0)