Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public static class RegistrationConfiguratorExtensions
/// </summary>
/// <param name="configurator">MassTransit bus registration configurator.</param>
/// <param name="configureHubLifetimeOptions">Configures hub lifetime.</param>
public static void ConfigureLeanPipeFunnelConsumers(
public static IRegistrationConfigurator ConfigureLeanPipeFunnelConsumers(
this IRegistrationConfigurator configurator,
Action<IHubLifetimeManagerOptions<LeanPipeSubscriber>>? configureHubLifetimeOptions = null
)
Expand All @@ -41,6 +41,8 @@ public static void ConfigureLeanPipeFunnelConsumers(
GroupConsumerDefinition<LeanPipeSubscriber>
>();
configurator.AddConsumer<GroupConsumer<LeanPipeSubscriber>>();

return configurator;
}

private static MassTransitHubLifetimeManager<LeanPipeSubscriber> GetMassTransitHubLifetimeManager(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ public static class ServiceCollectionExtensions
/// <summary>
/// Adds SignalR and other services required for the Funnel.
/// </summary>
public static void AddLeanPipeFunnel(
public static IServiceCollection AddLeanPipeFunnel(
this IServiceCollection services,
FunnelConfiguration? config = null
)
Expand All @@ -22,5 +22,7 @@ public static void AddLeanPipeFunnel(

services.AddSingleton(config ?? FunnelConfiguration.Default);
services.AddTransient<ISubscriptionExecutor, FunnelSubscriptionExecutor>();

return services;
}
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be good to explain in the docstrings here when one might want to use each of those AddFunnelledLeanPipeConsumers overloads

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a note that hints what is the default overload

Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Reflection;
using LeanCode.Components;
using LeanCode.Contracts;
using MassTransit;
using MassTransit.Internals;
Expand All @@ -8,18 +9,42 @@ namespace LeanCode.Pipe.Funnel.Publishing;

public static class RegistrationConfiguratorExtensions
{
/// <summary>
/// Configures a consumer for each topic in the provided assemblies that will handle subscriptions
/// in the LeanPipe with Funnel model.
/// </summary>
/// <param name="configurator">MassTransit bus registration configurator.</param>
/// <param name="serviceName">Funnelled service name.</param>
/// <inheritdoc cref="AddFunnelledLeanPipeConsumers(IRegistrationConfigurator,string,Type[],Type?)"/>
/// <param name="leanPipeBuilder">Preconfigured LeanPipe services builder containing topics exposed by the service.</param>
/// <remarks>This overload guarantees configuration of all consumers for each topics in the builder.</remarks>
public static IRegistrationConfigurator AddFunnelledLeanPipeConsumers(
this IRegistrationConfigurator configurator,
string serviceName,
LeanPipeServicesBuilder leanPipeBuilder,
Type? funnelledSubscriberDefinitionOverride = null
)
{
return configurator.AddFunnelledLeanPipeConsumers(
serviceName,
leanPipeBuilder.Topics,
funnelledSubscriberDefinitionOverride
);
}

/// <inheritdoc cref="AddFunnelledLeanPipeConsumers(IRegistrationConfigurator,string,Type[],Type?)"/>
/// <param name="topics">Catalog of topics exposed by the service.</param>
public static IRegistrationConfigurator AddFunnelledLeanPipeConsumers(
this IRegistrationConfigurator configurator,
string serviceName,
TypesCatalog topics,
Type? funnelledSubscriberDefinitionOverride = null
)
{
return configurator.AddFunnelledLeanPipeConsumers(
serviceName,
topics.Assemblies,
funnelledSubscriberDefinitionOverride
);
}

/// <inheritdoc cref="AddFunnelledLeanPipeConsumers(IRegistrationConfigurator,string,Type[],Type?)"/>
/// <param name="assembliesWithTopics">Assemblies that contain all topics exposed by the service.</param>
/// <param name="funnelledSubscriberDefinitionOverride">
/// Optional definition override for subscriber consumers.
/// Should inherit <see cref="FunnelledSubscriber{TTopic}"/> and be generic of the topics.
/// </param>
public static void AddFunnelledLeanPipeConsumers(
public static IRegistrationConfigurator AddFunnelledLeanPipeConsumers(
this IRegistrationConfigurator configurator,
string serviceName,
IEnumerable<Assembly> assembliesWithTopics,
Expand All @@ -35,14 +60,25 @@ public static void AddFunnelledLeanPipeConsumers(
.FindTypes(TypeClassification.Closed | TypeClassification.Concrete)
.ToArray();

configurator.AddFunnelledLeanPipeConsumers(
return configurator.AddFunnelledLeanPipeConsumers(
serviceName,
types,
funnelledSubscriberDefinitionOverride
);
}

public static void AddFunnelledLeanPipeConsumers(
/// <summary>
/// Configures a consumer for each topic in the provided assemblies that will handle subscriptions
/// in the LeanPipe with Funnel model.
/// </summary>
/// <param name="configurator">MassTransit bus registration configurator.</param>
/// <param name="serviceName">Funnelled service name.</param>
/// <param name="topicTypes">All topics exposed by the service.</param>
/// <param name="funnelledSubscriberDefinitionOverride">
/// Optional definition override for subscriber consumers.
/// Should inherit <see cref="FunnelledSubscriber{TTopic}"/> and be generic of the topics.
/// </param>
public static IRegistrationConfigurator AddFunnelledLeanPipeConsumers(
this IRegistrationConfigurator configurator,
string serviceName,
Type[] topicTypes,
Expand Down Expand Up @@ -80,6 +116,8 @@ public static void AddFunnelledLeanPipeConsumers(
var consumerDefinitionType = subscriberDefinition.MakeGenericType(topicType);
configurator.AddConsumer(consumerType, consumerDefinitionType);
}

return configurator;
}

private static bool IsTopic(Type type)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,14 @@ TypesCatalog handlers
public class LeanPipeServicesBuilder
{
public IServiceCollection Services { get; }
public TypesCatalog Topics { get; private set; }

private JsonSerializerOptions? options;
private TypesCatalog topics;

public LeanPipeServicesBuilder(IServiceCollection services, TypesCatalog topics)
{
Services = services;
this.topics = topics;
Topics = topics;

Services.AddSingleton<ITopicExtractor>(new DefaultTopicExtractor(topics, null));
}
Expand Down Expand Up @@ -78,7 +78,7 @@ public LeanPipeServicesBuilder WithEnvelopeDeserializer(ITopicExtractor deserial
/// </summary>
public LeanPipeServicesBuilder AddTopics(TypesCatalog newTopics)
{
topics = topics.Merge(newTopics);
Topics = Topics.Merge(newTopics);
ReplaceDefaultEnvelopeDeserializer();
return this;
}
Expand Down Expand Up @@ -118,7 +118,7 @@ private void ReplaceDefaultEnvelopeDeserializer()
)
{
Services.RemoveAt(i);
Services.AddSingleton<ITopicExtractor>(new DefaultTopicExtractor(topics, options));
Services.AddSingleton<ITopicExtractor>(new DefaultTopicExtractor(Topics, options));
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,57 @@ public void Registers_consumers_with_default_definition_for_all_correct_specifie
}
}

[Fact]
public void Registers_consumers_using_TypesCatalog_overload()
{
var collection = new ServiceCollection();
var configurator = new ServiceCollectionBusConfigurator(collection);
configurator.AddFunnelledLeanPipeConsumers(ServiceName, ThisCatalog);

var expectedTypesForConsumers = new[] { typeof(Topic1), typeof(Topic2) };

foreach (var type in expectedTypesForConsumers)
{
configurator
.Should()
.ContainSingle(d =>
d.ServiceType == typeof(FunnelledSubscriber<>).MakeGenericType(type)
)
.And.ContainSingle(d =>
d.ServiceType == typeof(FunnelledSubscriberDefinition<>).MakeGenericType(type)
);
}
}

[Fact]
public void Registers_consumers_using_LeanPipeServicesBuilder_overload()
{
var collection = new ServiceCollection();
var leanPipeBuilder = new LeanPipeServicesBuilder(collection, ThisCatalog);
leanPipeBuilder.AddTopics(ExternalCatalog);
var configurator = new ServiceCollectionBusConfigurator(collection);
configurator.AddFunnelledLeanPipeConsumers(ServiceName, leanPipeBuilder);

var expectedTypesForConsumers = new[]
{
typeof(Topic1),
typeof(Topic2),
typeof(ExternalTopic),
};

foreach (var type in expectedTypesForConsumers)
{
configurator
.Should()
.ContainSingle(d =>
d.ServiceType == typeof(FunnelledSubscriber<>).MakeGenericType(type)
)
.And.ContainSingle(d =>
d.ServiceType == typeof(FunnelledSubscriberDefinition<>).MakeGenericType(type)
);
}
}

[Fact]
public void Skips_registering_consumers_with_default_definition_for_incorrect_specified_topics()
{
Expand Down