Library for messaging between services.
Travis | |
---|---|
Infrastructure.Messaging |
Packages | Travis |
---|---|
Infrastructure.Messaging | |
Infrastructure.Messaging.DependencyInjection | |
Infrastructure.Messaging.RabbitMQ | |
Infrastructure.Messaging.InMemory |
- .NETStandard 2.1
- Microsoft.Extensions.DependencyInjection (>= 3.0.0)
- Microsoft.Extensions.Logging.Abstractions (>= 3.0.0)
- System.Text.Json (>= 4.6.0)
Register Messaging via .AddMessaging()
extension method provided by Infrastructure.Messaging.Extensions.DependencyInjection
package.
var rabbitMQUri = configuration.GetSection("RabbitMq")["uri"];
var serviceProvider =
new ServiceCollection()
.AddLogging(lb => lb.AddConsole())
.AddMessaging(mc =>
{
mc.UseRabbitMQ(cf => cf.Uri = new Uri(rabbitMQUri));
mc.UseJsonPacker(jso => jso.PropertyNamingPolicy = JsonNamingPolicy.CamelCase);
},
sc => sc
.AddSingleton<IMessageHandler<TestMessage>, TestMessageHandler>()
.AddSingleton<IMessageHandler<TestMessageWithEventId>, TestMessageHandler>()
)
.BuildServiceProvider();
Implement handlers.
public sealed class TestMessageHandler :
IMessageHandler<TestMessage>,
IMessageHandler<TestMessageWithEventId>
{
private readonly ILogger<TestMessageHandler> _logger;
public TestMessageHandler(ILogger<TestMessageHandler> logger)
{
_logger = logger;
}
public async Task Handle(TestMessage message, CancellationToken cancellationToken)
{
await Task.Delay(100);
_logger.LogInformation("Test message was dispached {message}", message.Ping);
throw new Exception("AASDSD");
}
public Task Handle(TestMessageWithEventId message, CancellationToken cancellationToken)
{
_logger.LogInformation("TestMessageWithEventId was dispatched. {eventId}, {text}", message.EventId, message.Text);
return Task.CompletedTask;
}
}
Use IMessageRouter.Route()
to start the routing process and IMessagePublisher.Publish(IMessage message)
to publish a message.
public async Task Main()
{
CancellationTokenSource = new CancellationTokenSource();
//..
MessageRouter = serviceProvider.GetRequiredService<IMessageRouter>();
Publisher = serviceProvider.GetRequiredService<IMessagePublisher>();
//..
await Publisher.Publish(new TestMessage { Ping = "A" }, cancellationToken: CancellationTokenSource.Token);
// ..
await MessageRouter.Route(cancellationToken: CancellationTokenSource.Token);
}