Skip to content

Commit

Permalink
Forwarder blame improvements (#2167)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tratcher authored Jun 26, 2023
1 parent 07955da commit 01715d1
Show file tree
Hide file tree
Showing 6 changed files with 161 additions and 132 deletions.
124 changes: 40 additions & 84 deletions src/ReverseProxy/Forwarder/HttpForwarder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ public async ValueTask<ForwarderError> SendAsync(

// :: Step 2: Setup copy of request body (background) Client --► Proxy --► Destination
// Note that we must do this before step (3) because step (3) may also add headers to the HttpContent that we set up here.
var requestContent = SetupRequestBodyCopy(context.Request, isStreamingRequest, activityToken);
var requestContent = SetupRequestBodyCopy(context, isStreamingRequest, activityToken);
destinationRequest.Content = requestContent;

// :: Step 3: Copy request headers Client --► Proxy --► Destination
Expand Down Expand Up @@ -496,12 +496,13 @@ private void FixupUpgradeRequestHeaders(HttpContext context, HttpRequestMessage
// else not an upgrade, or H2->H2, no changes needed
}

private StreamCopyHttpContent? SetupRequestBodyCopy(HttpRequest request, bool isStreamingRequest, ActivityCancellationTokenSource activityToken)
private StreamCopyHttpContent? SetupRequestBodyCopy(HttpContext context, bool isStreamingRequest, ActivityCancellationTokenSource activityToken)
{
// If we generate an HttpContent without a Content-Length then for HTTP/1.1 HttpClient will add a Transfer-Encoding: chunked header
// even if it's a GET request. Some servers reject requests containing a Transfer-Encoding header if they're not expecting a body.
// Try to be as specific as possible about the client's intent to send a body. The one thing we don't want to do is to start
// reading the body early because that has side-effects like 100-continue.
var request = context.Request;
var hasBody = true;
var contentLength = request.Headers.ContentLength;
var method = request.Method;
Expand All @@ -512,8 +513,9 @@ private void FixupUpgradeRequestHeaders(HttpContext context, HttpRequestMessage
// 5.0 servers provide a definitive answer for us.
hasBody = canHaveBodyFeature.CanHaveBody;

// TODO: Kestrel bug, this shouldn't be true for ExtendedConnect.
#if NET7_0_OR_GREATER
#if NET7_0
// TODO: Kestrel 7.0 bug only, hasBody shouldn't be true for ExtendedConnect.
// https://github.com/dotnet/aspnetcore/issues/46002 Fixed in 8.0
var connectFeature = request.HttpContext.Features.Get<IHttpExtendedConnectFeature>();
if (connectFeature?.IsExtendedConnect == true)
{
Expand Down Expand Up @@ -560,31 +562,13 @@ private void FixupUpgradeRequestHeaders(HttpContext context, HttpRequestMessage

if (hasBody)
{
if (isStreamingRequest)
{
DisableMinRequestBodyDataRateAndMaxRequestBodySize(request.HttpContext);
}

// Note on `autoFlushHttpClientOutgoingStream: isStreamingRequest`:
// The.NET Core HttpClient stack keeps its own buffers on top of the underlying outgoing connection socket.
// We flush those buffers down to the socket on every write when this is set,
// but it does NOT result in calls to flush on the underlying socket.
// This is necessary because we proxy http2 transparently,
// and we are deliberately unaware of packet structure used e.g. in gRPC duplex channels.
// Because the sockets aren't flushed, the perf impact of this choice is expected to be small.
// Future: It may be wise to set this to true for *all* http2 incoming requests,
// but for now, out of an abundance of caution, we only do it for requests that look like gRPC.
return new StreamCopyHttpContent(
request: request,
autoFlushHttpClientOutgoingStream: isStreamingRequest,
timeProvider: _timeProvider,
activityToken);
return new StreamCopyHttpContent(context, isStreamingRequest, _timeProvider, _logger, activityToken);
}

return null;
}

private ForwarderError HandleRequestBodyFailure(HttpContext context, StreamCopyResult requestBodyCopyResult, Exception requestBodyException, Exception additionalException)
private ForwarderError HandleRequestBodyFailure(HttpContext context, StreamCopyResult requestBodyCopyResult, Exception requestBodyException, Exception additionalException, bool timedOut)
{
ForwarderError requestBodyError;
int statusCode;
Expand All @@ -593,19 +577,12 @@ private ForwarderError HandleRequestBodyFailure(HttpContext context, StreamCopyR
// Failed while trying to copy the request body from the client. It's ambiguous if the request or response failed first.
case StreamCopyResult.InputError:
requestBodyError = ForwarderError.RequestBodyClient;
statusCode = StatusCodes.Status400BadRequest;
statusCode = timedOut ? StatusCodes.Status408RequestTimeout : StatusCodes.Status400BadRequest;
break;
// Failed while trying to copy the request body to the destination. It's ambiguous if the request or response failed first.
case StreamCopyResult.OutputError:
requestBodyError = ForwarderError.RequestBodyDestination;
statusCode = StatusCodes.Status502BadGateway;
break;
// Canceled while trying to copy the request body, either due to a client disconnect or a timeout. This probably caused the response to fail as a secondary error.
case StreamCopyResult.Canceled:
requestBodyError = ForwarderError.RequestBodyCanceled;
// Timeouts (504s) are handled at the SendAsync call site.
// The request body should only be canceled by the RequestAborted token.
statusCode = StatusCodes.Status502BadGateway;
statusCode = timedOut ? StatusCodes.Status504GatewayTimeout : StatusCodes.Status502BadGateway;
break;
default:
throw new NotImplementedException(requestBodyCopyResult.ToString());
Expand All @@ -630,33 +607,46 @@ private ForwarderError HandleRequestBodyFailure(HttpContext context, StreamCopyR
private async ValueTask<ForwarderError> HandleRequestFailureAsync(HttpContext context, StreamCopyHttpContent? requestContent, Exception requestException,
HttpTransformer transformer, ActivityCancellationTokenSource requestCancellationSource, bool failedDuringRequestCreation)
{
if (requestException is OperationCanceledException)
var triedRequestBody = requestContent?.ConsumptionTask.IsCompleted == true;

if (requestCancellationSource.CancelledByLinkedToken)
{
if (requestCancellationSource.CancelledByLinkedToken)
var requestBodyCanceled = false;
if (triedRequestBody)
{
// Either the client went away (HttpContext.RequestAborted) or the CancellationToken provided to SendAsync was signaled.
return await ReportErrorAsync(ForwarderError.RequestCanceled, StatusCodes.Status502BadGateway);
}
else
{
Debug.Assert(requestCancellationSource.IsCancellationRequested || requestException.ToString().Contains("ConnectTimeout"), requestException.ToString());
return await ReportErrorAsync(ForwarderError.RequestTimedOut, StatusCodes.Status504GatewayTimeout);
var (requestBodyCopyResult, requestBodyException) = requestContent!.ConsumptionTask.Result;
requestBodyCanceled = requestBodyCopyResult == StreamCopyResult.Canceled;
if (requestBodyCanceled)
{
requestException = new AggregateException(requestException, requestBodyException!);
}
}
// Either the client went away (HttpContext.RequestAborted) or the CancellationToken provided to SendAsync was signaled.
return await ReportErrorAsync(requestBodyCanceled ? ForwarderError.RequestBodyCanceled : ForwarderError.RequestCanceled,
context.RequestAborted.IsCancellationRequested ? StatusCodes.Status400BadRequest : StatusCodes.Status502BadGateway);
}

// Check for request body errors, these may have triggered the response error.
if (requestContent?.ConsumptionTask.IsCompleted == true)
if (triedRequestBody)
{
var (requestBodyCopyResult, requestBodyException) = requestContent.ConsumptionTask.Result;
var (requestBodyCopyResult, requestBodyException) = requestContent!.ConsumptionTask.Result;

if (requestBodyCopyResult != StreamCopyResult.Success)
if (requestBodyCopyResult is StreamCopyResult.InputError or StreamCopyResult.OutputError)
{
var error = HandleRequestBodyFailure(context, requestBodyCopyResult, requestBodyException!, requestException);
var error = HandleRequestBodyFailure(context, requestBodyCopyResult, requestBodyException!, requestException,
timedOut: requestCancellationSource.IsCancellationRequested);
await transformer.TransformResponseAsync(context, proxyResponse: null, requestCancellationSource.Token);
return error;
}
}

if (requestException is OperationCanceledException)
{
Debug.Assert(requestCancellationSource.IsCancellationRequested || requestException.ToString().Contains("ConnectTimeout"), requestException.ToString());

return await ReportErrorAsync(ForwarderError.RequestTimedOut, StatusCodes.Status504GatewayTimeout);
}

// We couldn't communicate with the destination.
return await ReportErrorAsync(failedDuringRequestCreation ? ForwarderError.RequestCreation : ForwarderError.Request, StatusCodes.Status502BadGateway);

Expand Down Expand Up @@ -870,7 +860,7 @@ private ForwarderError FixupUpgradeResponseHeaders(HttpContext context, HttpResp
return (StreamCopyResult.Success, null);
}

private async ValueTask<ForwarderError> HandleResponseBodyErrorAsync(HttpContext context, StreamCopyHttpContent? requestContent, StreamCopyResult responseBodyCopyResult, Exception responseBodyException, CancellationTokenSource requestCancellationSource)
private async ValueTask<ForwarderError> HandleResponseBodyErrorAsync(HttpContext context, StreamCopyHttpContent? requestContent, StreamCopyResult responseBodyCopyResult, Exception responseBodyException, ActivityCancellationTokenSource requestCancellationSource)
{
if (requestContent is not null && requestContent.Started)
{
Expand All @@ -884,9 +874,10 @@ private async ValueTask<ForwarderError> HandleResponseBodyErrorAsync(HttpContext
var (requestBodyCopyResult, requestBodyError) = await requestContent.ConsumptionTask;

// Check for request body errors, these may have triggered the response error.
if (alreadyFinished && requestBodyCopyResult != StreamCopyResult.Success)
if (alreadyFinished && requestBodyCopyResult is StreamCopyResult.InputError or StreamCopyResult.OutputError)
{
return HandleRequestBodyFailure(context, requestBodyCopyResult, requestBodyError!, responseBodyException);
return HandleRequestBodyFailure(context, requestBodyCopyResult, requestBodyError!, responseBodyException,
timedOut: requestCancellationSource.IsCancellationRequested && !requestCancellationSource.CancelledByLinkedToken);
}
}

Expand Down Expand Up @@ -920,41 +911,6 @@ private static ValueTask CopyResponseTrailingHeadersAsync(HttpResponseMessage so
return transformer.TransformResponseTrailersAsync(context, source, cancellationToken);
}


/// <summary>
/// Disable some ASP .NET Core server limits so that we can handle long-running gRPC requests unconstrained.
/// Note that the gRPC server implementation on ASP .NET Core does the same for client-streaming and duplex methods.
/// Since in Gateway we have no way to determine if the current request requires client-streaming or duplex comm,
/// we do this for *all* incoming requests that look like they might be gRPC.
/// </summary>
/// <remarks>
/// Inspired on
/// <see href="https://github.com/grpc/grpc-dotnet/blob/3ce9b104524a4929f5014c13cd99ba9a1c2431d4/src/Grpc.AspNetCore.Server/Internal/CallHandlers/ServerCallHandlerBase.cs#L127"/>.
/// </remarks>
private void DisableMinRequestBodyDataRateAndMaxRequestBodySize(HttpContext httpContext)
{
var minRequestBodyDataRateFeature = httpContext.Features.Get<IHttpMinRequestBodyDataRateFeature>();
if (minRequestBodyDataRateFeature is not null)
{
minRequestBodyDataRateFeature.MinDataRate = null;
}

var maxRequestBodySizeFeature = httpContext.Features.Get<IHttpMaxRequestBodySizeFeature>();
if (maxRequestBodySizeFeature is not null)
{
if (!maxRequestBodySizeFeature.IsReadOnly)
{
maxRequestBodySizeFeature.MaxRequestBodySize = null;
}
else
{
// IsReadOnly could be true if middleware has already started reading the request body
// In that case we can't disable the max request body size for the request stream
_logger.LogWarning("Unable to disable max request body size.");
}
}
}

private void ReportProxyError(HttpContext context, ForwarderError error, Exception ex)
{
context.Features.Set<IForwarderErrorFeature>(new ForwarderErrorFeature(error, ex));
Expand Down
8 changes: 6 additions & 2 deletions src/ReverseProxy/Forwarder/StreamCopier.cs
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,13 @@ internal static class StreamCopier
telemetry?.AfterWrite();
}

var result = ex is OperationCanceledException ? StreamCopyResult.Canceled :
(read == 0 ? StreamCopyResult.InputError : StreamCopyResult.OutputError);
if (activityToken.CancelledByLinkedToken)
{
return (StreamCopyResult.Canceled, ex);
}

// If the activity timeout triggered while reading or writing, blame the sender or receiver.
var result = read == 0 ? StreamCopyResult.InputError : StreamCopyResult.OutputError;
return (result, ex);
}
finally
Expand Down
Loading

0 comments on commit 01715d1

Please sign in to comment.