Skip to content

Commit

Permalink
Upgrade to Rabbit.NET v7 RC12
Browse files Browse the repository at this point in the history
  • Loading branch information
Hawxy authored and jeremydmiller committed Oct 9, 2024
1 parent bc67040 commit bb6697f
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ public async Task ConnectAsync()
{
_connection = await _transport.CreateConnectionAsync();

_connection.ConnectionShutdown += connectionOnConnectionShutdown;
_connection.ConnectionUnblocked += connectionOnConnectionUnblocked;
_connection.ConnectionBlocked += connectionOnConnectionBlocked;
_connection.CallbackException += connectionOnCallbackException;
_connection.ConnectionShutdownAsync += connectionOnConnectionShutdownAsync;
_connection.ConnectionUnblockedAsync += connectionOnConnectionUnblockedAsync;
_connection.ConnectionBlockedAsync += connectionOnConnectionBlockedAsync;
_connection.CallbackExceptionAsync += connectionOnCallbackExceptionAsync;
}

public Task<IChannel> CreateChannelAsync()
Expand Down Expand Up @@ -73,32 +73,37 @@ public void Track(RabbitMqChannelAgent agent)
_agents.Add(agent);
}

private void connectionOnCallbackException(object? sender, CallbackExceptionEventArgs e)
private Task connectionOnCallbackExceptionAsync(object? sender, CallbackExceptionEventArgs e)
{
if (e.Exception != null)
{
_logger.LogError(e.Exception, "Rabbit MQ connection error on callback");
}

return Task.CompletedTask;
}

private void connectionOnConnectionBlocked(object? sender, ConnectionBlockedEventArgs e)
private Task connectionOnConnectionBlockedAsync(object? sender, ConnectionBlockedEventArgs e)
{
_logger.LogInformation("Rabbit MQ connection is blocked because of {Reason}", e.Reason);
return Task.CompletedTask;
}

private void connectionOnConnectionUnblocked(object? sender, EventArgs e)
private Task connectionOnConnectionUnblockedAsync(object? sender, AsyncEventArgs e)
{
_logger.LogInformation("Rabbit MQ connection unblocked");
return Task.CompletedTask;
}

private void connectionOnConnectionShutdown(object? sender, ShutdownEventArgs e)
private Task connectionOnConnectionShutdownAsync(object? sender, ShutdownEventArgs e)
{
if (e.Initiator == ShutdownInitiator.Application) return;
if (e.Initiator == ShutdownInitiator.Application) return Task.CompletedTask;

if (e.Exception != null)
{
_logger.LogError(e.Exception, "Unexpected Rabbit MQ connection shutdown");
}
return Task.CompletedTask;
}

public void Remove(RabbitMqChannelAgent agent)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace Wolverine.RabbitMQ.Internal;

Expand Down Expand Up @@ -64,19 +65,20 @@ protected async Task startNewChannel()
{
Channel = await _monitor.CreateChannelAsync();

Channel.CallbackException += (sender, args) =>
Channel.CallbackExceptionAsync += (sender, args) =>
{
Logger.LogError(args.Exception, "Callback error in Rabbit Mq agent");
return Task.CompletedTask;
};

Channel.ChannelShutdown += ChannelOnModelShutdown;
Channel.ChannelShutdownAsync += ChannelOnModelShutdown;

Logger.LogInformation("Opened a new channel for Wolverine endpoint {Endpoint}", this);
}

private void ChannelOnModelShutdown(object? sender, ShutdownEventArgs e)
private Task ChannelOnModelShutdown(object? sender, ShutdownEventArgs e)
{
if (e.Initiator == ShutdownInitiator.Application) return;
if (e.Initiator == ShutdownInitiator.Application) return Task.CompletedTask;

if (e.Exception != null)
{
Expand All @@ -85,13 +87,15 @@ private void ChannelOnModelShutdown(object? sender, ShutdownEventArgs e)
}

_ = EnsureConnected();

return Task.CompletedTask;
}

protected async Task teardownChannel()
{
if (Channel != null)
{
Channel.ChannelShutdown -= ChannelOnModelShutdown;
Channel.ChannelShutdownAsync -= ChannelOnModelShutdown;
await Channel.CloseAsync();
await Channel.AbortAsync();
Channel.Dispose();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,10 @@ public void Dispose()
_latched = true;
}

public override Task HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange,
string routingKey, IReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
{
return HandleBasicDeliverImpl(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);
}

public async Task HandleBasicDeliverImpl(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey,
IReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
//TODO do something with the token passed in here
public override async Task HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange,
string routingKey, IReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body,
CancellationToken cancellationToken = new())
{
if (_latched || _cancellation.IsCancellationRequested)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
<ProjectReference Include="..\..\..\Wolverine\Wolverine.csproj"/>
</ItemGroup>
<ItemGroup>
<PackageReference Include="RabbitMQ.Client" Version="7.0.0-rc.11" />
<PackageReference Include="RabbitMQ.Client" Version="7.0.0-rc.12" />
<PackageReference Update="Microsoft.SourceLink.GitHub" Version="8.0.0" />
</ItemGroup>

Expand Down

0 comments on commit bb6697f

Please sign in to comment.