A lightweight mediator + messaging library for .NET 10. Like MediatR and MassTransit, but without the licensing drama.
| Package | Description |
|---|---|
Conduit |
In-process mediator — request/response, notifications, streaming, pipeline behaviors, causality tracking |
Conduit.Messaging |
Cross-process messaging abstractions — publisher/consumer interfaces, serialization, pipeline context bridge, in-memory transport |
Conduit.Messaging.RabbitMq |
RabbitMQ transport provider for Conduit.Messaging |
# Mediator only
dotnet add package Conduit
# Messaging (includes Conduit core)
dotnet add package Conduit.Messaging
# RabbitMQ transport
dotnet add package Conduit.Messaging.RabbitMqpublic record Ping(string Message) : IRequest<Pong>;
public record Pong(string Reply);
public class PingHandler : IRequestHandler<Ping, Pong>
{
public Task<Pong> Handle(Ping request, CancellationToken cancellationToken)
{
return Task.FromResult(new Pong($"Pong: {request.Message}"));
}
}services.AddMediator(cfg => cfg.RegisterServicesFromAssemblyContaining<Program>());var dispatcher = serviceProvider.GetRequiredService<IDispatcher>();
var response = await dispatcher.Send(new Ping("Hello"));
// response.Reply == "Pong: Hello"public record GetUser(int Id) : IRequest<User>;
public class GetUserHandler : IRequestHandler<GetUser, User>
{
public async Task<User> Handle(GetUser request, CancellationToken ct)
{
// fetch user...
}
}public record DeleteUser(int Id) : IRequest;
public class DeleteUserHandler : IRequestHandler<DeleteUser>
{
public async Task<Unit> Handle(DeleteUser request, CancellationToken ct)
{
// delete user...
return Unit.Value;
}
}public record UserCreated(int UserId) : INotification;
public class SendWelcomeEmail : INotificationHandler<UserCreated>
{
public async Task Handle(UserCreated notification, CancellationToken ct)
{
// send email...
}
}
// Publish to all handlers
await dispatcher.Publish(new UserCreated(42));public record GetItems(string Query) : IStreamRequest<Item>;
public class GetItemsHandler : IStreamRequestHandler<GetItems, Item>
{
public async IAsyncEnumerable<Item> Handle(
GetItems request,
[EnumeratorCancellation] CancellationToken ct)
{
await foreach (var item in FetchItemsAsync(request.Query, ct))
{
yield return item;
}
}
}
// Consume the stream
await foreach (var item in dispatcher.CreateStream(new GetItems("search")))
{
// process item...
}Wrap request handling with cross-cutting concerns:
public class LoggingBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse>
where TRequest : IRequest<TResponse>
{
public async Task<TResponse> Handle(
TRequest request,
RequestHandlerDelegate<TResponse> next,
CancellationToken ct)
{
Console.WriteLine($"Handling {typeof(TRequest).Name}");
var response = await next();
Console.WriteLine($"Handled {typeof(TRequest).Name}");
return response;
}
}
// Register
services.AddMediator(cfg =>
{
cfg.RegisterServicesFromAssemblyContaining<Program>();
cfg.AddBehavior(typeof(LoggingBehavior<,>));
});public class ValidationPreProcessor<TRequest> : IRequestPreProcessor<TRequest>
where TRequest : notnull
{
public Task Process(TRequest request, CancellationToken ct)
{
// validate request...
}
}
public class AuditPostProcessor<TRequest, TResponse> : IRequestPostProcessor<TRequest, TResponse>
where TRequest : IRequest<TResponse>
{
public Task Process(TRequest request, TResponse response, CancellationToken ct)
{
// audit logging...
}
}public class GlobalExceptionHandler<TRequest, TResponse> : IRequestExceptionHandler<TRequest, TResponse>
where TRequest : IRequest<TResponse>
{
public Task Handle(
TRequest request,
Exception exception,
RequestExceptionHandlerState<TResponse> state,
CancellationToken ct)
{
// log, set state.Response to provide fallback, or let it bubble up
}
}The IPipelineContext is a scoped, thread-safe context that flows through your entire pipeline - across behaviors, pre/post processors, handlers, and even nested requests within the same DI scope. This is the killer feature for cross-cutting concerns that need to share state.
services.AddMediator(cfg =>
{
cfg.RegisterServicesFromAssemblyContaining<Program>();
cfg.EnablePipelineContext = true;
});Inject IPipelineContext anywhere in your pipeline:
public class MyBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse>
{
private readonly IPipelineContext _context;
public MyBehavior(IPipelineContext context) => _context = context;
public async Task<TResponse> Handle(
TRequest request,
RequestHandlerDelegate<TResponse> next,
CancellationToken ct)
{
// Store arbitrary data - accessible from ANY pipeline component
_context.Items["UserId"] = GetCurrentUserId();
_context.Items["CorrelationId"] = Guid.NewGuid().ToString();
var response = await next();
return response;
}
}Measure execution time across pipeline stages:
public class TimingBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse>
{
private readonly IPipelineContext _context;
public async Task<TResponse> Handle(...)
{
using var timer = _context.StartTimer($"Handler:{typeof(TRequest).Name}");
var response = await next();
// timer automatically records elapsed time on dispose
return response;
}
}
// Later, in a post-processor or logging middleware:
var timings = _context.GetTimings();
foreach (var t in timings)
{
Console.WriteLine($"{t.Name}: {t.Elapsed.TotalMilliseconds}ms");
}Counters and aggregates that accumulate across the pipeline:
// Increment counters
_context.Increment("db.queries");
_context.Increment("cache.hits", 5);
// Record values (tracks count, total, min, max)
_context.Record("response.size", responseBytes);
// Read metrics
var metrics = _context.GetMetrics();
var dbQueries = metrics["db.queries"]; // Count, Total, Min, MaxKey-value pairs that propagate through all requests in the scope:
// Set in an early behavior
_context.SetBaggage("tenant-id", "acme-corp");
_context.SetBaggage("feature-flags", "new-ui,beta");
// Read anywhere in the pipeline, including nested Send() calls
var tenantId = _context.GetBaggage("tenant-id");
var allBaggage = _context.GetAllBaggage();The context is scoped to the DI scope (typically an HTTP request), so when a handler dispatches additional requests, they all share the same context:
public class CreateOrderHandler : IRequestHandler<CreateOrder, Order>
{
private readonly IDispatcher _dispatcher;
private readonly IPipelineContext _context;
public async Task<Order> Handle(CreateOrder request, CancellationToken ct)
{
// This nested request shares the same IPipelineContext
var inventory = await _dispatcher.Send(new CheckInventory(request.ProductId), ct);
// Metrics from CheckInventory's pipeline are already in _context
_context.Increment("orders.created");
return new Order(...);
}
}PipelineContext has an AsyncLocal static accessor, so you can access it anywhere in the async flow — not just where DI is available:
// Set ambient context (returns IDisposable that restores previous)
using var scope = PipelineContext.SetCurrent(myContext);
// Access anywhere
var ctx = PipelineContext.Current; // null if not set
ctx?.SetBaggage("tenant-id", tenantId);
var tenant = ctx?.GetBaggage("tenant-id");This is especially useful for:
- Background services where DI scopes don't exist
- Static helpers that need context without constructor injection
- Cross-process consumers where context is hydrated from message headers
The ambient context nests correctly — SetCurrent returns an IDisposable that restores the previous context on dispose, making it safe for concurrent async flows.
When enabled, Conduit automatically tracks parent-child relationships between requests, giving you a full call graph:
services.AddMediator(cfg =>
{
cfg.RegisterServicesFromAssemblyContaining<Program>();
cfg.EnablePipelineContext = true;
cfg.EnableCausalityTracking = true;
});
// In any pipeline component:
var currentId = _context.GetCurrentRequestId();
var parentId = _context.GetParentRequestId();
var fullChain = _context.GetCausalityChain();
foreach (var entry in fullChain)
{
Console.WriteLine($"{entry.RequestId} <- {entry.ParentId}: {entry.RequestType} @ {entry.Timestamp}");
}This is invaluable for debugging complex flows, distributed tracing integration, and understanding "who called whom" in your request pipeline.
Catch missing handlers early:
var app = builder.Build();
app.Services.ValidateConduitRegistrations(typeof(Program).Assembly);services.AddMediator(cfg =>
{
// Assembly scanning
cfg.RegisterServicesFromAssembly(typeof(Program).Assembly);
cfg.RegisterServicesFromAssemblyContaining<Program>();
// Service lifetime (default: Transient)
cfg.Lifetime = ServiceLifetime.Scoped;
// Pipeline features
cfg.EnablePipelineContext = true;
cfg.EnableCausalityTracking = true;
// Custom notification publishing strategy
cfg.NotificationPublisherType = typeof(TaskWhenAllPublisher); // parallel
// cfg.NotificationPublisherType = typeof(ForeachAwaitPublisher); // sequential (default)
// Register pipeline components
cfg.AddBehavior(typeof(LoggingBehavior<,>));
cfg.AddPreProcessor(typeof(ValidationPreProcessor<>));
cfg.AddPostProcessor(typeof(AuditPostProcessor<,>));
cfg.AddExceptionHandler(typeof(GlobalExceptionHandler<,>));
cfg.AddStreamBehavior(typeof(StreamLoggingBehavior<,>));
});Cross-process messaging with pluggable transport providers. Define message contracts once, swap transports without changing application code.
public record OrderPlaced : EventMessage
{
public required string OrderId { get; init; }
public required decimal Total { get; init; }
}Three base types are provided:
EventMessage— something happened (pub/sub, fan-out)CommandMessage— do something (point-to-point)QueryMessage— request data (point-to-point)
All include MessageId, CreatedAt, CorrelationId, TenantId, SessionId, UserId, and Metadata.
public class OrderPlacedConsumer : IMessageConsumer<OrderPlaced>
{
public async Task ConsumeAsync(
OrderPlaced message,
MessageContext context,
CancellationToken cancellationToken = default)
{
// Handle the message
// context.MessageId, context.CorrelationId, context.Headers available
}
}services.AddConduitMessaging(cfg =>
{
cfg.ServiceName = "service-orders";
cfg.UseRabbitMq(settings); // or cfg.UseInMemory() for tests
cfg.PropagateContextHeaders = true; // auto-propagate PipelineContext baggage
cfg.AddConsumer<OrderPlacedConsumer>();
cfg.AddConsumersFromAssembly(typeof(Program).Assembly);
});public class PlaceOrderHandler(IMessagePublisher publisher)
{
public async Task Handle(PlaceOrder request, CancellationToken ct)
{
// Process order...
await publisher.PublishAsync(new OrderPlaced
{
OrderId = order.Id,
Total = order.Total,
TenantId = request.TenantId,
SessionId = request.SessionId,
UserId = request.UserId
}, ct);
}
}// Pub/sub — all subscribers receive the message
await publisher.PublishAsync(message, ct);
// Topic-based routing
await publisher.PublishAsync(message, "orders.eu", ct);
// Point-to-point — exactly one consumer receives it
await publisher.SendAsync(message, "process-payment-queue", ct);All methods accept optional IReadOnlyDictionary<string, string>? contextHeaders for explicit cross-process context propagation:
var headers = PipelineContextBridge.ExtractHeaders(pipelineContext);
await publisher.PublishAsync(message, headers, ct);services.AddConduitMessaging(cfg =>
{
cfg.ServiceName = "service-audit";
cfg.UseRabbitMq(new RabbitMqSettings
{
Host = "rabbitmq",
Port = 5671,
UseSsl = true,
VirtualHost = "myapp",
Username = "myapp",
Password = "secret",
PrefetchCount = 10,
RetryCount = 3
});
cfg.AddConsumersFromAssembly(typeof(Program).Assembly);
});Features:
- Auto-topology: exchanges and queues declared on first use
- Dead-letter queues: failed messages routed to
*.dlqafter retry limit - Retry with requeue: configurable retry count before dead-lettering
- Persistent delivery: messages survive broker restart
- SSL/TLS: TLS 1.2/1.3 support
- Auto-recovery: reconnects on connection loss
Exchange strategy:
PublishAsync→ fanout exchange namedNamespace:TypeNamePublishAsyncwith topic → topic exchange with routing keySendAsync→ direct to queue via default exchange
services.AddConduitMessaging(cfg =>
{
cfg.UseInMemory();
cfg.AddConsumer<OrderPlacedConsumer>();
});The in-memory transport dispatches synchronously to consumers and records all messages for assertions:
var bus = sp.GetRequiredService<InMemoryMessageBus>();
// Query recorded messages
var published = bus.GetPublished<OrderPlaced>().ToList();
var consumed = bus.GetConsumed<OrderPlaced>().ToList();
// Wait for async consumer completion
var result = await bus.WaitForConsume<OrderPlaced>(
m => m.OrderId == "123",
timeout: TimeSpan.FromSeconds(5));
// Reset state between tests
bus.Clear();Implement a transport by providing an extension method on MessagingConfiguration:
public static class MyTransportExtensions
{
public static void UseMyTransport(this MessagingConfiguration config, MySettings settings)
{
config.TransportRegistrar = (services, cfg) =>
{
foreach (var reg in cfg.ConsumerRegistrations)
services.AddScoped(reg.ConsumerType);
services.AddSingleton<IMessageBus>(sp =>
new MyMessageBus(settings, cfg.ConsumerRegistrations, sp));
services.AddSingleton<IMessagePublisher>(sp =>
sp.GetRequiredService<IMessageBus>().Publisher);
};
}
}The MessageBusHostedService is registered by the core and will call StartAsync/StopAsync on your IMessageBus automatically.
When IPipelineContext is enabled in the mediator, the bridge propagates context (baggage, causality chain, correlation ID) across process boundaries via message headers.
Enable PropagateContextHeaders to automatically extract PipelineContext.Current baggage into message headers on every publish/send — no manual header extraction needed:
services.AddConduitMessaging(cfg =>
{
cfg.ServiceName = "my-service";
cfg.UseRabbitMq(settings);
cfg.PropagateContextHeaders = true; // auto-propagate context
});
// Now every publish automatically includes baggage, causality, correlation headers
await publisher.PublishAsync(new OrderPlaced { ... }, ct);This wraps IMessagePublisher with a ContextPropagatingPublisher decorator that reads from PipelineContext.Current and merges headers via PipelineContextBridge.ExtractHeaders. The decorator uses lazy publisher resolution, so it works even when resolved before the bus hosted service has started.
If you prefer explicit control, extract headers manually:
var headers = PipelineContextBridge.ExtractHeaders(pipelineContext);
await publisher.PublishAsync(message, headers, ct);Serialized headers:
conduit.baggage.*— arbitrary key-value pairsconduit.correlation-id— correlation ID for tracingconduit.origin-request-id— publishing process request IDconduit.causality-chain— pipe-delimited causality entries
Both RabbitMQ and in-memory transports automatically hydrate IPipelineContext in the consumer's DI scope before invoking the consumer. No manual code needed — baggage, correlation, and causality are restored transparently.
Messages are wrapped in a JSON envelope for transport:
{
"messageType": "MyApp.Orders.OrderPlaced",
"payload": { "orderId": "123", "total": 99.99 },
"headers": { "conduit.baggage.tenant-id": "acme" },
"timestamp": "2026-03-16T10:30:00Z"
}Exchange names: Namespace:TypeName. Queue names: serviceName:ConsumerTypeName.
MIT