Skip to content

Commit f52b3a2

Browse files
authored
Merge pull request #82 from leancodepl/feature/improve-funnel-publishing-consumers-registration
Add more ways for funnel publishing consumers registration
2 parents becc746 + de64993 commit f52b3a2

File tree

5 files changed

+112
-19
lines changed

5 files changed

+112
-19
lines changed

publisher/src/Funnel/Instance/RegistrationConfiguratorExtensions.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ public static class RegistrationConfiguratorExtensions
1616
/// </summary>
1717
/// <param name="configurator">MassTransit bus registration configurator.</param>
1818
/// <param name="configureHubLifetimeOptions">Configures hub lifetime.</param>
19-
public static void ConfigureLeanPipeFunnelConsumers(
19+
public static IRegistrationConfigurator ConfigureLeanPipeFunnelConsumers(
2020
this IRegistrationConfigurator configurator,
2121
Action<IHubLifetimeManagerOptions<LeanPipeSubscriber>>? configureHubLifetimeOptions = null
2222
)
@@ -41,6 +41,8 @@ public static void ConfigureLeanPipeFunnelConsumers(
4141
GroupConsumerDefinition<LeanPipeSubscriber>
4242
>();
4343
configurator.AddConsumer<GroupConsumer<LeanPipeSubscriber>>();
44+
45+
return configurator;
4446
}
4547

4648
private static MassTransitHubLifetimeManager<LeanPipeSubscriber> GetMassTransitHubLifetimeManager(

publisher/src/Funnel/Instance/ServiceCollectionExtensions.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ public static class ServiceCollectionExtensions
77
/// <summary>
88
/// Adds SignalR and other services required for the Funnel.
99
/// </summary>
10-
public static void AddLeanPipeFunnel(
10+
public static IServiceCollection AddLeanPipeFunnel(
1111
this IServiceCollection services,
1212
FunnelConfiguration? config = null
1313
)
@@ -22,5 +22,7 @@ public static void AddLeanPipeFunnel(
2222

2323
services.AddSingleton(config ?? FunnelConfiguration.Default);
2424
services.AddTransient<ISubscriptionExecutor, FunnelSubscriptionExecutor>();
25+
26+
return services;
2527
}
2628
}

publisher/src/Funnel/Publishing/RegistrationConfiguratorExtensions.cs

Lines changed: 51 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System.Reflection;
2+
using LeanCode.Components;
23
using LeanCode.Contracts;
34
using MassTransit;
45
using MassTransit.Internals;
@@ -8,18 +9,42 @@ namespace LeanCode.Pipe.Funnel.Publishing;
89

910
public static class RegistrationConfiguratorExtensions
1011
{
11-
/// <summary>
12-
/// Configures a consumer for each topic in the provided assemblies that will handle subscriptions
13-
/// in the LeanPipe with Funnel model.
14-
/// </summary>
15-
/// <param name="configurator">MassTransit bus registration configurator.</param>
16-
/// <param name="serviceName">Funnelled service name.</param>
12+
/// <inheritdoc cref="AddFunnelledLeanPipeConsumers(IRegistrationConfigurator,string,Type[],Type?)"/>
13+
/// <param name="leanPipeBuilder">Preconfigured LeanPipe services builder containing topics exposed by the service.</param>
14+
/// <remarks>This overload guarantees configuration of all consumers for each topics in the builder.</remarks>
15+
public static IRegistrationConfigurator AddFunnelledLeanPipeConsumers(
16+
this IRegistrationConfigurator configurator,
17+
string serviceName,
18+
LeanPipeServicesBuilder leanPipeBuilder,
19+
Type? funnelledSubscriberDefinitionOverride = null
20+
)
21+
{
22+
return configurator.AddFunnelledLeanPipeConsumers(
23+
serviceName,
24+
leanPipeBuilder.Topics,
25+
funnelledSubscriberDefinitionOverride
26+
);
27+
}
28+
29+
/// <inheritdoc cref="AddFunnelledLeanPipeConsumers(IRegistrationConfigurator,string,Type[],Type?)"/>
30+
/// <param name="topics">Catalog of topics exposed by the service.</param>
31+
public static IRegistrationConfigurator AddFunnelledLeanPipeConsumers(
32+
this IRegistrationConfigurator configurator,
33+
string serviceName,
34+
TypesCatalog topics,
35+
Type? funnelledSubscriberDefinitionOverride = null
36+
)
37+
{
38+
return configurator.AddFunnelledLeanPipeConsumers(
39+
serviceName,
40+
topics.Assemblies,
41+
funnelledSubscriberDefinitionOverride
42+
);
43+
}
44+
45+
/// <inheritdoc cref="AddFunnelledLeanPipeConsumers(IRegistrationConfigurator,string,Type[],Type?)"/>
1746
/// <param name="assembliesWithTopics">Assemblies that contain all topics exposed by the service.</param>
18-
/// <param name="funnelledSubscriberDefinitionOverride">
19-
/// Optional definition override for subscriber consumers.
20-
/// Should inherit <see cref="FunnelledSubscriber{TTopic}"/> and be generic of the topics.
21-
/// </param>
22-
public static void AddFunnelledLeanPipeConsumers(
47+
public static IRegistrationConfigurator AddFunnelledLeanPipeConsumers(
2348
this IRegistrationConfigurator configurator,
2449
string serviceName,
2550
IEnumerable<Assembly> assembliesWithTopics,
@@ -35,14 +60,25 @@ public static void AddFunnelledLeanPipeConsumers(
3560
.FindTypes(TypeClassification.Closed | TypeClassification.Concrete)
3661
.ToArray();
3762

38-
configurator.AddFunnelledLeanPipeConsumers(
63+
return configurator.AddFunnelledLeanPipeConsumers(
3964
serviceName,
4065
types,
4166
funnelledSubscriberDefinitionOverride
4267
);
4368
}
4469

45-
public static void AddFunnelledLeanPipeConsumers(
70+
/// <summary>
71+
/// Configures a consumer for each topic in the provided assemblies that will handle subscriptions
72+
/// in the LeanPipe with Funnel model.
73+
/// </summary>
74+
/// <param name="configurator">MassTransit bus registration configurator.</param>
75+
/// <param name="serviceName">Funnelled service name.</param>
76+
/// <param name="topicTypes">All topics exposed by the service.</param>
77+
/// <param name="funnelledSubscriberDefinitionOverride">
78+
/// Optional definition override for subscriber consumers.
79+
/// Should inherit <see cref="FunnelledSubscriber{TTopic}"/> and be generic of the topics.
80+
/// </param>
81+
public static IRegistrationConfigurator AddFunnelledLeanPipeConsumers(
4682
this IRegistrationConfigurator configurator,
4783
string serviceName,
4884
Type[] topicTypes,
@@ -80,6 +116,8 @@ public static void AddFunnelledLeanPipeConsumers(
80116
var consumerDefinitionType = subscriberDefinition.MakeGenericType(topicType);
81117
configurator.AddConsumer(consumerType, consumerDefinitionType);
82118
}
119+
120+
return configurator;
83121
}
84122

85123
private static bool IsTopic(Type type)

publisher/src/LeanCode.Pipe/Extensions/LeanPipeServiceCollectionExtensions.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,14 +42,14 @@ TypesCatalog handlers
4242
public class LeanPipeServicesBuilder
4343
{
4444
public IServiceCollection Services { get; }
45+
public TypesCatalog Topics { get; private set; }
4546

4647
private JsonSerializerOptions? options;
47-
private TypesCatalog topics;
4848

4949
public LeanPipeServicesBuilder(IServiceCollection services, TypesCatalog topics)
5050
{
5151
Services = services;
52-
this.topics = topics;
52+
Topics = topics;
5353

5454
Services.AddSingleton<ITopicExtractor>(new DefaultTopicExtractor(topics, null));
5555
}
@@ -78,7 +78,7 @@ public LeanPipeServicesBuilder WithEnvelopeDeserializer(ITopicExtractor deserial
7878
/// </summary>
7979
public LeanPipeServicesBuilder AddTopics(TypesCatalog newTopics)
8080
{
81-
topics = topics.Merge(newTopics);
81+
Topics = Topics.Merge(newTopics);
8282
ReplaceDefaultEnvelopeDeserializer();
8383
return this;
8484
}
@@ -118,7 +118,7 @@ private void ReplaceDefaultEnvelopeDeserializer()
118118
)
119119
{
120120
Services.RemoveAt(i);
121-
Services.AddSingleton<ITopicExtractor>(new DefaultTopicExtractor(topics, options));
121+
Services.AddSingleton<ITopicExtractor>(new DefaultTopicExtractor(Topics, options));
122122
break;
123123
}
124124
}

publisher/test/Funnel.Tests/LeanCode.Pipe.Funnel.Tests/Publishing/RegistrationConfiguratorExtensionsTests.cs

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,57 @@ public void Registers_consumers_with_default_definition_for_all_correct_specifie
4646
}
4747
}
4848

49+
[Fact]
50+
public void Registers_consumers_using_TypesCatalog_overload()
51+
{
52+
var collection = new ServiceCollection();
53+
var configurator = new ServiceCollectionBusConfigurator(collection);
54+
configurator.AddFunnelledLeanPipeConsumers(ServiceName, ThisCatalog);
55+
56+
var expectedTypesForConsumers = new[] { typeof(Topic1), typeof(Topic2) };
57+
58+
foreach (var type in expectedTypesForConsumers)
59+
{
60+
configurator
61+
.Should()
62+
.ContainSingle(d =>
63+
d.ServiceType == typeof(FunnelledSubscriber<>).MakeGenericType(type)
64+
)
65+
.And.ContainSingle(d =>
66+
d.ServiceType == typeof(FunnelledSubscriberDefinition<>).MakeGenericType(type)
67+
);
68+
}
69+
}
70+
71+
[Fact]
72+
public void Registers_consumers_using_LeanPipeServicesBuilder_overload()
73+
{
74+
var collection = new ServiceCollection();
75+
var leanPipeBuilder = new LeanPipeServicesBuilder(collection, ThisCatalog);
76+
leanPipeBuilder.AddTopics(ExternalCatalog);
77+
var configurator = new ServiceCollectionBusConfigurator(collection);
78+
configurator.AddFunnelledLeanPipeConsumers(ServiceName, leanPipeBuilder);
79+
80+
var expectedTypesForConsumers = new[]
81+
{
82+
typeof(Topic1),
83+
typeof(Topic2),
84+
typeof(ExternalTopic),
85+
};
86+
87+
foreach (var type in expectedTypesForConsumers)
88+
{
89+
configurator
90+
.Should()
91+
.ContainSingle(d =>
92+
d.ServiceType == typeof(FunnelledSubscriber<>).MakeGenericType(type)
93+
)
94+
.And.ContainSingle(d =>
95+
d.ServiceType == typeof(FunnelledSubscriberDefinition<>).MakeGenericType(type)
96+
);
97+
}
98+
}
99+
49100
[Fact]
50101
public void Skips_registering_consumers_with_default_definition_for_incorrect_specified_topics()
51102
{

0 commit comments

Comments
 (0)