Skip to content

Commit

Permalink
Attribute request failure blame before determining a destination's he…
Browse files Browse the repository at this point in the history
…alth
  • Loading branch information
MihaZupan committed Apr 11, 2022
1 parent cf3b6e1 commit 9e3c252
Show file tree
Hide file tree
Showing 2 changed files with 197 additions and 2 deletions.
27 changes: 25 additions & 2 deletions src/ReverseProxy/Health/TransportFailureRateHealthPolicy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@ public TransportFailureRateHealthPolicy(

public void RequestProxied(HttpContext context, ClusterState cluster, DestinationState destination)
{
var error = context.Features.Get<IForwarderErrorFeature>();
var newHealth = EvaluateProxiedRequest(cluster, destination, error != null);
var newHealth = EvaluateProxiedRequest(cluster, destination, DetermineIfDestinationFailed(context));
var clusterReactivationPeriod = cluster.Model.Config.HealthCheck?.Passive?.ReactivationPeriod ?? _defaultReactivationPeriod;
// Avoid reactivating until the history has expired so that it does not affect future health assessments.
var reactivationPeriod = clusterReactivationPeriod >= _policyOptions.DetectionWindowSize ? clusterReactivationPeriod : _policyOptions.DetectionWindowSize;
Expand Down Expand Up @@ -75,6 +74,30 @@ private static bool TryParse(string stringValue, out double parsedValue)
return double.TryParse(stringValue, NumberStyles.Float, CultureInfo.InvariantCulture, out parsedValue);
}

private static bool DetermineIfDestinationFailed(HttpContext context)
{
var errorFeature = context.Features.Get<IForwarderErrorFeature>();
if (errorFeature is null)
{
return false;
}

if (context.RequestAborted.IsCancellationRequested)
{
// The client disconnected/canceled the request - the failure may not be the destination's fault
return false;
}

var error = errorFeature.Error;

return error == ForwarderError.Request
|| error == ForwarderError.RequestTimedOut
|| error == ForwarderError.RequestBodyDestination
|| error == ForwarderError.ResponseBodyDestination
|| error == ForwarderError.UpgradeRequestDestination
|| error == ForwarderError.UpgradeResponseDestination;
}

private class ProxiedRequestHistory
{
private const long RecordWindowSize = 1000;
Expand Down
172 changes: 172 additions & 0 deletions test/ReverseProxy.FunctionalTests/PassiveHealthCheckTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System;
using System.IO;
using System.Net;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Xunit;
using Yarp.ReverseProxy.Common;
using Yarp.ReverseProxy.Configuration;
using Yarp.ReverseProxy.Forwarder;

namespace Yarp.ReverseProxy;

public class PassiveHealthCheckTests
{
private sealed class MockHttpClientFactory : IForwarderHttpClientFactory
{
private readonly Func<HttpRequestMessage, CancellationToken, Task<HttpResponseMessage>> _sendAsync;

public MockHttpClientFactory(Func<HttpRequestMessage, CancellationToken, Task<HttpResponseMessage>> sendAsync)
{
_sendAsync = sendAsync;
}

public HttpMessageInvoker CreateClient(ForwarderHttpClientContext context)
{
return new HttpMessageInvoker(new MockHandler(_sendAsync));
}

private sealed class MockHandler : HttpMessageHandler
{
private readonly Func<HttpRequestMessage, CancellationToken, Task<HttpResponseMessage>> _sendAsync;

public MockHandler(Func<HttpRequestMessage, CancellationToken, Task<HttpResponseMessage>> sendAsync)
{
_sendAsync = sendAsync;
}

protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
{
return await _sendAsync(request, cancellationToken);
}
}
}

[Fact]
public async Task PassiveHealthChecksEnabled_MultipleDestinationFailures_ProxyReturnsServiceUnavailable()
{
var destinationReached = false;

var test = new TestEnvironment(
context =>
{
destinationReached = true;
throw new InvalidOperationException();
},
proxyBuilder => proxyBuilder.Services.AddSingleton<IForwarderHttpClientFactory>(new MockHttpClientFactory((_, _) => throw new IOException())),
proxyApp => { },
configTransformer: (c, r) =>
{
c = c with
{
HealthCheck = new HealthCheckConfig
{
Passive = new PassiveHealthCheckConfig
{
Enabled = true
}
}
};

return (c, r);
});

await test.Invoke(async uri =>
{
using var client = new HttpClient();
for (var i = 0; i < 42; i++)
{
using var response = await client.SendAsync(new HttpRequestMessage(HttpMethod.Get, uri));

Assert.Equal(i < 10 ? HttpStatusCode.BadGateway : HttpStatusCode.ServiceUnavailable, response.StatusCode);
}
});

Assert.False(destinationReached);
}

[Fact]
public async Task PassiveHealthChecksEnabled_IncompleteClientRequests_ProxyHealthIsUnaffected()
{
var destinationReached = false;

var shouldThrow = true;
var requestStartedTcs = new TaskCompletionSource<byte>(TaskCreationOptions.RunContinuationsAsynchronously);

var proxySendAsync = async (HttpRequestMessage request, CancellationToken ct) =>
{
requestStartedTcs.SetResult(0);

if (shouldThrow)
{
await Task.Delay(-1, ct);

throw new OperationCanceledException(ct);
}
else
{
return new HttpResponseMessage((HttpStatusCode)418)
{
Content = new StringContent("Hello world")
};
}
};

var test = new TestEnvironment(
context =>
{
destinationReached = true;
throw new InvalidOperationException();
},
proxyBuilder => proxyBuilder.Services.AddSingleton<IForwarderHttpClientFactory>(new MockHttpClientFactory(proxySendAsync)),
proxyApp => { },
configTransformer: (c, r) =>
{
c = c with
{
HealthCheck = new HealthCheckConfig
{
Passive = new PassiveHealthCheckConfig
{
Enabled = true
}
}
};

return (c, r);
});

await test.Invoke(async uri =>
{
using var client = new HttpClient();
for (var i = 0; i < 42; i++)
{
using var cts = new CancellationTokenSource();
_ = requestStartedTcs.Task.ContinueWith(_ => cts.Cancel());

try
{
await client.SendAsync(new HttpRequestMessage(HttpMethod.Get, uri), cts.Token);
Assert.True(false);
}
catch { }

requestStartedTcs = new TaskCompletionSource<byte>(TaskCreationOptions.RunContinuationsAsynchronously);
}

shouldThrow = false;

using var response = await client.SendAsync(new HttpRequestMessage(HttpMethod.Get, uri));

Assert.Equal(418, (int)response.StatusCode);
Assert.Equal("Hello world", await response.Content.ReadAsStringAsync());
});

Assert.False(destinationReached);
}
}

0 comments on commit 9e3c252

Please sign in to comment.