Skip to content

Commit

Permalink
Merged PR 31986: [2.0] Forwarder blame improvements
Browse files Browse the repository at this point in the history
Passive health checks rely on the HttpForwarder to categorize failures so the check can understand if it was caused by the client, destination, or some other reason. Incorrect attribution can allow the client to induce an error and cause the destination to be marked unhealthy.

Scenarios:
1. Blame the request body (client/destination) for timeouts. Today if the ActivityTimeout triggers while uploading the request body that will be reported as RequestTimeout and blamed on the destination. However, if the proxy was waiting for the client to send more data, that should be blamed on the client instead (RequestBodyClient).
2. A malicious client may pretend to be a gRPC request by setting the expected Content-Type, but then getting proxied to a HTTP/1.1 destination. This causes the proxy request data rate and size limit to be disabled, but then you can induce an error against the destination's size limit that would be blamed on the destination. Fix: Delay disabling the limits until we're sure we were assigned to an HTTP/2 connection.
3. I've included #2119. This works around a Kestrel bug which caused a body length exception that we blamed on the destination. This fix is already in main.
  • Loading branch information
Tratcher committed Jun 21, 2023
1 parent a4febbb commit 9bf4d83
Show file tree
Hide file tree
Showing 6 changed files with 162 additions and 129 deletions.
126 changes: 41 additions & 85 deletions src/ReverseProxy/Forwarder/HttpForwarder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ public async ValueTask<ForwarderError> SendAsync(

// :: Step 2: Setup copy of request body (background) Client --► Proxy --► Destination
// Note that we must do this before step (3) because step (3) may also add headers to the HttpContent that we set up here.
var requestContent = SetupRequestBodyCopy(context.Request, isStreamingRequest, activityToken);
var requestContent = SetupRequestBodyCopy(context, isStreamingRequest, activityToken);
destinationRequest.Content = requestContent;

// :: Step 3: Copy request headers Client --► Proxy --► Destination
Expand Down Expand Up @@ -496,12 +496,13 @@ private void FixupUpgradeRequestHeaders(HttpContext context, HttpRequestMessage
// else not an upgrade, or H2->H2, no changes needed
}

private StreamCopyHttpContent? SetupRequestBodyCopy(HttpRequest request, bool isStreamingRequest, ActivityCancellationTokenSource activityToken)
private StreamCopyHttpContent? SetupRequestBodyCopy(HttpContext context, bool isStreamingRequest, ActivityCancellationTokenSource activityToken)
{
// If we generate an HttpContent without a Content-Length then for HTTP/1.1 HttpClient will add a Transfer-Encoding: chunked header
// even if it's a GET request. Some servers reject requests containing a Transfer-Encoding header if they're not expecting a body.
// Try to be as specific as possible about the client's intent to send a body. The one thing we don't want to do is to start
// reading the body early because that has side-effects like 100-continue.
var request = context.Request;
var hasBody = true;
var contentLength = request.Headers.ContentLength;
var method = request.Method;
Expand All @@ -512,10 +513,11 @@ private void FixupUpgradeRequestHeaders(HttpContext context, HttpRequestMessage
// 5.0 servers provide a definitive answer for us.
hasBody = canHaveBodyFeature.CanHaveBody;

// TODO: Kestrel bug, this shouldn't be true for ExtendedConnect.
#if NET7_0_OR_GREATER
#if NET7_0
// TODO: Kestrel 7.0 bug only, hasBody shouldn't be true for ExtendedConnect.
// https://github.com/dotnet/aspnetcore/issues/46002 Fixed in 8.0
var connectFeature = request.HttpContext.Features.Get<IHttpExtendedConnectFeature>();
if (connectFeature?.Protocol != null)
if (connectFeature?.IsExtendedConnect == true)
{
hasBody = false;
}
Expand Down Expand Up @@ -560,31 +562,13 @@ private void FixupUpgradeRequestHeaders(HttpContext context, HttpRequestMessage

if (hasBody)
{
if (isStreamingRequest)
{
DisableMinRequestBodyDataRateAndMaxRequestBodySize(request.HttpContext);
}

// Note on `autoFlushHttpClientOutgoingStream: isStreamingRequest`:
// The.NET Core HttpClient stack keeps its own buffers on top of the underlying outgoing connection socket.
// We flush those buffers down to the socket on every write when this is set,
// but it does NOT result in calls to flush on the underlying socket.
// This is necessary because we proxy http2 transparently,
// and we are deliberately unaware of packet structure used e.g. in gRPC duplex channels.
// Because the sockets aren't flushed, the perf impact of this choice is expected to be small.
// Future: It may be wise to set this to true for *all* http2 incoming requests,
// but for now, out of an abundance of caution, we only do it for requests that look like gRPC.
return new StreamCopyHttpContent(
request: request,
autoFlushHttpClientOutgoingStream: isStreamingRequest,
clock: _clock,
activityToken);
return new StreamCopyHttpContent(context, isStreamingRequest, _clock, _logger, activityToken);
}

return null;
}

private ForwarderError HandleRequestBodyFailure(HttpContext context, StreamCopyResult requestBodyCopyResult, Exception requestBodyException, Exception additionalException)
private ForwarderError HandleRequestBodyFailure(HttpContext context, StreamCopyResult requestBodyCopyResult, Exception requestBodyException, Exception additionalException, bool timedOut)
{
ForwarderError requestBodyError;
int statusCode;
Expand All @@ -593,19 +577,12 @@ private ForwarderError HandleRequestBodyFailure(HttpContext context, StreamCopyR
// Failed while trying to copy the request body from the client. It's ambiguous if the request or response failed first.
case StreamCopyResult.InputError:
requestBodyError = ForwarderError.RequestBodyClient;
statusCode = StatusCodes.Status400BadRequest;
statusCode = timedOut ? StatusCodes.Status408RequestTimeout : StatusCodes.Status400BadRequest;
break;
// Failed while trying to copy the request body to the destination. It's ambiguous if the request or response failed first.
case StreamCopyResult.OutputError:
requestBodyError = ForwarderError.RequestBodyDestination;
statusCode = StatusCodes.Status502BadGateway;
break;
// Canceled while trying to copy the request body, either due to a client disconnect or a timeout. This probably caused the response to fail as a secondary error.
case StreamCopyResult.Canceled:
requestBodyError = ForwarderError.RequestBodyCanceled;
// Timeouts (504s) are handled at the SendAsync call site.
// The request body should only be canceled by the RequestAborted token.
statusCode = StatusCodes.Status502BadGateway;
statusCode = timedOut ? StatusCodes.Status504GatewayTimeout : StatusCodes.Status502BadGateway;
break;
default:
throw new NotImplementedException(requestBodyCopyResult.ToString());
Expand All @@ -630,33 +607,46 @@ private ForwarderError HandleRequestBodyFailure(HttpContext context, StreamCopyR
private async ValueTask<ForwarderError> HandleRequestFailureAsync(HttpContext context, StreamCopyHttpContent? requestContent, Exception requestException,
HttpTransformer transformer, ActivityCancellationTokenSource requestCancellationSource, bool failedDuringRequestCreation)
{
if (requestException is OperationCanceledException)
var triedRequestBody = requestContent?.ConsumptionTask.IsCompleted == true;

if (requestCancellationSource.CancelledByLinkedToken)
{
if (requestCancellationSource.CancelledByLinkedToken)
var requestBodyCanceled = false;
if (triedRequestBody)
{
// Either the client went away (HttpContext.RequestAborted) or the CancellationToken provided to SendAsync was signaled.
return await ReportErrorAsync(ForwarderError.RequestCanceled, StatusCodes.Status502BadGateway);
}
else
{
Debug.Assert(requestCancellationSource.IsCancellationRequested || requestException.ToString().Contains("ConnectTimeout"), requestException.ToString());
return await ReportErrorAsync(ForwarderError.RequestTimedOut, StatusCodes.Status504GatewayTimeout);
var (requestBodyCopyResult, requestBodyException) = requestContent!.ConsumptionTask.Result;
requestBodyCanceled = requestBodyCopyResult == StreamCopyResult.Canceled;
if (requestBodyCanceled)
{
requestException = new AggregateException(requestException, requestBodyException!);
}
}
// Either the client went away (HttpContext.RequestAborted) or the CancellationToken provided to SendAsync was signaled.
return await ReportErrorAsync(requestBodyCanceled ? ForwarderError.RequestBodyCanceled : ForwarderError.RequestCanceled,
context.RequestAborted.IsCancellationRequested ? StatusCodes.Status400BadRequest : StatusCodes.Status502BadGateway);
}

// Check for request body errors, these may have triggered the response error.
if (requestContent?.ConsumptionTask.IsCompleted == true)
if (triedRequestBody)
{
var (requestBodyCopyResult, requestBodyException) = requestContent.ConsumptionTask.Result;
var (requestBodyCopyResult, requestBodyException) = requestContent!.ConsumptionTask.Result;

if (requestBodyCopyResult != StreamCopyResult.Success)
if (requestBodyCopyResult is StreamCopyResult.InputError or StreamCopyResult.OutputError)
{
var error = HandleRequestBodyFailure(context, requestBodyCopyResult, requestBodyException!, requestException);
var error = HandleRequestBodyFailure(context, requestBodyCopyResult, requestBodyException!, requestException,
timedOut: requestCancellationSource.IsCancellationRequested);
await transformer.TransformResponseAsync(context, proxyResponse: null, requestCancellationSource.Token);
return error;
}
}

if (requestException is OperationCanceledException)
{
Debug.Assert(requestCancellationSource.IsCancellationRequested || requestException.ToString().Contains("ConnectTimeout"), requestException.ToString());

return await ReportErrorAsync(ForwarderError.RequestTimedOut, StatusCodes.Status504GatewayTimeout);
}

// We couldn't communicate with the destination.
return await ReportErrorAsync(failedDuringRequestCreation ? ForwarderError.RequestCreation : ForwarderError.Request, StatusCodes.Status502BadGateway);

Expand Down Expand Up @@ -870,7 +860,7 @@ private ForwarderError FixupUpgradeResponseHeaders(HttpContext context, HttpResp
return (StreamCopyResult.Success, null);
}

private async ValueTask<ForwarderError> HandleResponseBodyErrorAsync(HttpContext context, StreamCopyHttpContent? requestContent, StreamCopyResult responseBodyCopyResult, Exception responseBodyException, CancellationTokenSource requestCancellationSource)
private async ValueTask<ForwarderError> HandleResponseBodyErrorAsync(HttpContext context, StreamCopyHttpContent? requestContent, StreamCopyResult responseBodyCopyResult, Exception responseBodyException, ActivityCancellationTokenSource requestCancellationSource)
{
if (requestContent is not null && requestContent.Started)
{
Expand All @@ -884,9 +874,10 @@ private async ValueTask<ForwarderError> HandleResponseBodyErrorAsync(HttpContext
var (requestBodyCopyResult, requestBodyError) = await requestContent.ConsumptionTask;

// Check for request body errors, these may have triggered the response error.
if (alreadyFinished && requestBodyCopyResult != StreamCopyResult.Success)
if (alreadyFinished && requestBodyCopyResult is StreamCopyResult.InputError or StreamCopyResult.OutputError)
{
return HandleRequestBodyFailure(context, requestBodyCopyResult, requestBodyError!, responseBodyException);
return HandleRequestBodyFailure(context, requestBodyCopyResult, requestBodyError!, responseBodyException,
timedOut: requestCancellationSource.IsCancellationRequested && !requestCancellationSource.CancelledByLinkedToken);
}
}

Expand Down Expand Up @@ -920,41 +911,6 @@ private static ValueTask CopyResponseTrailingHeadersAsync(HttpResponseMessage so
return transformer.TransformResponseTrailersAsync(context, source, cancellationToken);
}


/// <summary>
/// Disable some ASP .NET Core server limits so that we can handle long-running gRPC requests unconstrained.
/// Note that the gRPC server implementation on ASP .NET Core does the same for client-streaming and duplex methods.
/// Since in Gateway we have no way to determine if the current request requires client-streaming or duplex comm,
/// we do this for *all* incoming requests that look like they might be gRPC.
/// </summary>
/// <remarks>
/// Inspired on
/// <see href="https://github.com/grpc/grpc-dotnet/blob/3ce9b104524a4929f5014c13cd99ba9a1c2431d4/src/Grpc.AspNetCore.Server/Internal/CallHandlers/ServerCallHandlerBase.cs#L127"/>.
/// </remarks>
private void DisableMinRequestBodyDataRateAndMaxRequestBodySize(HttpContext httpContext)
{
var minRequestBodyDataRateFeature = httpContext.Features.Get<IHttpMinRequestBodyDataRateFeature>();
if (minRequestBodyDataRateFeature is not null)
{
minRequestBodyDataRateFeature.MinDataRate = null;
}

var maxRequestBodySizeFeature = httpContext.Features.Get<IHttpMaxRequestBodySizeFeature>();
if (maxRequestBodySizeFeature is not null)
{
if (!maxRequestBodySizeFeature.IsReadOnly)
{
maxRequestBodySizeFeature.MaxRequestBodySize = null;
}
else
{
// IsReadOnly could be true if middleware has already started reading the request body
// In that case we can't disable the max request body size for the request stream
_logger.LogWarning("Unable to disable max request body size.");
}
}
}

private void ReportProxyError(HttpContext context, ForwarderError error, Exception ex)
{
context.Features.Set<IForwarderErrorFeature>(new ForwarderErrorFeature(error, ex));
Expand Down
8 changes: 6 additions & 2 deletions src/ReverseProxy/Forwarder/StreamCopier.cs
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,13 @@ internal static class StreamCopier
telemetry?.AfterWrite();
}

var result = ex is OperationCanceledException ? StreamCopyResult.Canceled :
(read == 0 ? StreamCopyResult.InputError : StreamCopyResult.OutputError);
if (activityToken.CancelledByLinkedToken)
{
return (StreamCopyResult.Canceled, ex);
}

// If the activity timeout triggered while reading or writing, blame the sender or receiver.
var result = read == 0 ? StreamCopyResult.InputError : StreamCopyResult.OutputError;
return (result, ex);
}
finally
Expand Down
Loading

0 comments on commit 9bf4d83

Please sign in to comment.