Skip to content

Commit

Permalink
Fix using disposed token in connect and resulting status (#1873)
Browse files Browse the repository at this point in the history
  • Loading branch information
JamesNK authored Sep 1, 2022
1 parent 0964116 commit a57f1e5
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 3 deletions.
2 changes: 2 additions & 0 deletions src/Grpc.Net.Client/Balancer/Internal/ErrorPicker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#if SUPPORT_LOAD_BALANCING
using System;
using System.Diagnostics;
using Grpc.Core;

namespace Grpc.Net.Client.Balancer.Internal
Expand All @@ -28,6 +29,7 @@ internal class ErrorPicker : SubchannelPicker

public ErrorPicker(Status status)
{
Debug.Assert(status.StatusCode != StatusCode.OK, "Error status code must not be OK.");
_status = status;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,18 +46,22 @@ internal interface ISubchannelTransport : IDisposable
internal sealed class ConnectContext
{
private readonly CancellationTokenSource _cts;
private readonly CancellationToken _token;
private bool _disposed;

// This flag allows the transport to determine why the cancellation token was canceled.
// - Explicit cancellation, e.g. the channel was disposed.
// - Connection timeout, e.g. SocketsHttpHandler.ConnectTimeout was exceeded.
public bool IsConnectCanceled { get; private set; }

public CancellationToken CancellationToken => _cts.Token;
public CancellationToken CancellationToken => _token;

public ConnectContext(TimeSpan connectTimeout)
{
_cts = new CancellationTokenSource(connectTimeout);

// Take a copy of the token to avoid ObjectDisposedException when accessing _cts.Token after CTS is disposed.
_token = _cts.Token;
}

public void CancelConnect()
Expand Down
2 changes: 1 addition & 1 deletion src/Grpc.Net.Client/Balancer/Subchannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ private async Task ConnectTransportAsync()
{
SubchannelLog.ConnectError(_logger, Id, ex);

UpdateConnectivityState(ConnectivityState.TransientFailure, "Error connecting to subchannel.");
UpdateConnectivityState(ConnectivityState.TransientFailure, new Status(StatusCode.Unavailable, "Error connecting to subchannel.", ex));
}
finally
{
Expand Down
37 changes: 37 additions & 0 deletions test/Grpc.Net.Client.Tests/Balancer/ConnectionManagerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,43 @@ public async Task PickAsync_WaitForReadyWithDrop_ThrowsError()
Assert.AreEqual(StatusCode.DataLoss, ex.StatusCode);
}

[Test]
public async Task PickAsync_ErrorConnectingToSubchannel_ThrowsError()
{
// Arrange
var services = new ServiceCollection();
services.AddNUnitLogger();
var serviceProvider = services.BuildServiceProvider();
var loggerFactory = serviceProvider.GetRequiredService<ILoggerFactory>();

var resolver = new TestResolver(loggerFactory);
resolver.UpdateAddresses(new List<BalancerAddress>
{
new BalancerAddress("localhost", 80)
});

var transportFactory = new TestSubchannelTransportFactory((s, c) =>
{
return Task.FromException<ConnectivityState>(new Exception("Test error!"));
});
var clientChannel = CreateConnectionManager(loggerFactory, resolver, transportFactory);
clientChannel.ConfigureBalancer(c => new PickFirstBalancer(c, loggerFactory));

// Act
_ = clientChannel.ConnectAsync(waitForReady: false, CancellationToken.None).ConfigureAwait(false);

var pickTask = clientChannel.PickAsync(
new PickContext { Request = new HttpRequestMessage() },
waitForReady: false,
CancellationToken.None).AsTask();

// Assert
var ex = await ExceptionAssert.ThrowsAsync<RpcException>(() => pickTask).DefaultTimeout();
Assert.AreEqual(StatusCode.Unavailable, ex.StatusCode);
Assert.AreEqual("Error connecting to subchannel.", ex.Status.Detail);
Assert.AreEqual("Test error!", ex.Status.DebugException?.Message);
}

[Test]
public async Task PickAsync_RetryWithDrop_ThrowsError()
{
Expand Down
1 change: 1 addition & 0 deletions test/Grpc.Net.Client.Tests/GrpcChannelTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,7 @@ public async Task ConnectAsync_ShiftThroughStates_CompleteOnReady()
var currentConnectivityState = ConnectivityState.TransientFailure;

var services = new ServiceCollection();
services.AddNUnitLogger();
services.AddSingleton<ResolverFactory, ChannelTestResolverFactory>();
services.AddSingleton<ISubchannelTransportFactory>(new TestSubchannelTransportFactory(async (s, c) =>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ public async
var newState = await (_onTryConnect?.Invoke(context.CancellationToken) ?? Task.FromResult(ConnectivityState.Ready));

CurrentAddress = Subchannel._addresses[0];
Subchannel.UpdateConnectivityState(newState, Status.DefaultSuccess);
var newStatus = newState == ConnectivityState.TransientFailure ? new Status(StatusCode.Internal, "") : Status.DefaultSuccess;
Subchannel.UpdateConnectivityState(newState, newStatus);

_connectTcs.TrySetResult(null);

Expand Down

0 comments on commit a57f1e5

Please sign in to comment.