Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WebSockets over HTTP/2 #1978

Merged
merged 20 commits into from
Jan 4, 2023
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<Copyright>© Microsoft Corporation. All rights reserved.</Copyright>
<PackageIcon></PackageIcon>
<PackageIconFullPath></PackageIconFullPath>
<LangVersion>10.0</LangVersion>
<LangVersion>11.0</LangVersion>
<PackageLicenseExpression>MIT</PackageLicenseExpression>
<StrongNameKeyId>Microsoft</StrongNameKeyId>
<EmbedUntrackedSources>true</EmbedUntrackedSources>
Expand Down
88 changes: 0 additions & 88 deletions src/ReverseProxy/Forwarder/AutoFlushingStream.cs

This file was deleted.

349 changes: 272 additions & 77 deletions src/ReverseProxy/Forwarder/HttpForwarder.cs

Large diffs are not rendered by default.

38 changes: 38 additions & 0 deletions src/ReverseProxy/Forwarder/ProtocolHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@
// Licensed under the MIT License.

using System;
using System.Diagnostics;
using System.Net;
using System.Net.Http;
using System.Security.Cryptography;
using System.Text;
using Microsoft.AspNetCore.Http;
using Microsoft.Net.Http.Headers;

Expand Down Expand Up @@ -37,4 +40,39 @@ contentType is not null
&& contentType.StartsWith(GrpcContentType, StringComparison.OrdinalIgnoreCase)
&& MediaTypeHeaderValue.TryParse(contentType, out var mediaType)
&& mediaType.MatchesMediaType(GrpcContentType);

/// <summary>
/// Creates a security key for sending in the Sec-WebSocket-Key header.
/// </summary>
internal static string CreateSecWebSocketKey()
{
Span<byte> bytes = stackalloc byte[16];
// Base64-encode a new Guid's bytes to get the security key
var success = Guid.NewGuid().TryWriteBytes(bytes);
Debug.Assert(success);
var secKey = Convert.ToBase64String(bytes);
return secKey;
}

/// <summary>
/// Creates the Accept response to a given security key for sending in or verifying the Sec-WebSocket-Accept header value.
/// </summary>
internal static string CreateSecWebSocketAccept(string? key)
Tratcher marked this conversation as resolved.
Show resolved Hide resolved
{
// GUID appended by the server as part of the security key response. Defined in the RFC.
var wsServerGuidBytes = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"u8;
Span<byte> bytes = stackalloc byte[24 /* Base64 guid length */ + wsServerGuidBytes.Length];

// Get the corresponding ASCII bytes for seckey+wsServerGuidBytes
var encodedSecKeyLength = Encoding.ASCII.GetBytes(key, bytes);
wsServerGuidBytes.CopyTo(bytes.Slice(encodedSecKeyLength));

// Hash the seckey+wsServerGuidBytes bytes
SHA1.TryHashData(bytes, bytes, out var bytesWritten);
Debug.Assert(bytesWritten == 20 /* SHA1 hash length */);
var accept = Convert.ToBase64String(bytes[..bytesWritten]);

// Return the security key + accept value
return accept;
}
}
13 changes: 11 additions & 2 deletions src/ReverseProxy/Forwarder/StreamCopier.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ internal static class StreamCopier
public const long UnknownLength = -1;

public static ValueTask<(StreamCopyResult, Exception?)> CopyAsync(bool isRequest, Stream input, Stream output, long promisedContentLength, IClock clock, ActivityCancellationTokenSource activityToken, CancellationToken cancellation)
=> CopyAsync(isRequest, input, output, promisedContentLength, clock, activityToken, autoFlush: false, cancellation);

public static ValueTask<(StreamCopyResult, Exception?)> CopyAsync(bool isRequest, Stream input, Stream output, long promisedContentLength, IClock clock, ActivityCancellationTokenSource activityToken, bool autoFlush, CancellationToken cancellation)
{
Debug.Assert(input is not null);
Debug.Assert(output is not null);
Expand All @@ -33,10 +36,10 @@ internal static class StreamCopier
? new StreamCopierTelemetry(isRequest, clock)
: null;

return CopyAsync(input, output, promisedContentLength, telemetry, activityToken, cancellation);
return CopyAsync(input, output, promisedContentLength, telemetry, activityToken, autoFlush, cancellation);
}

private static async ValueTask<(StreamCopyResult, Exception?)> CopyAsync(Stream input, Stream output, long promisedContentLength, StreamCopierTelemetry? telemetry, ActivityCancellationTokenSource activityToken, CancellationToken cancellation)
private static async ValueTask<(StreamCopyResult, Exception?)> CopyAsync(Stream input, Stream output, long promisedContentLength, StreamCopierTelemetry? telemetry, ActivityCancellationTokenSource activityToken, bool autoFlush, CancellationToken cancellation)
{
var buffer = ArrayPool<byte>.Shared.Rent(DefaultBufferSize);
var read = 0;
Expand Down Expand Up @@ -97,6 +100,12 @@ internal static class StreamCopier
}

await output.WriteAsync(buffer.AsMemory(0, read), cancellation);
if (autoFlush)
{
// HttpClient doesn't always flush outgoing data unless the buffer is full or the caller asks.
// This is a problem for streaming protocols like WebSockets and gRPC.
await output.FlushAsync(cancellation);
}

telemetry?.AfterWrite();

Expand Down
14 changes: 3 additions & 11 deletions src/ReverseProxy/Forwarder/StreamCopyHttpContent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ namespace Yarp.ReverseProxy.Forwarder;
internal sealed class StreamCopyHttpContent : HttpContent
{
private readonly HttpRequest _request;
// HttpClient's machinery keeps an internal buffer that doesn't get flushed to the socket on every write.
// Some protocols (e.g. gRPC) may rely on specific bytes being sent, and HttpClient's buffering would prevent it.
private readonly bool _autoFlushHttpClientOutgoingStream;
private readonly IClock _clock;
private readonly ActivityCancellationTokenSource _activityToken;
Expand Down Expand Up @@ -144,16 +146,6 @@ protected override async Task SerializeToStreamAsync(Stream stream, TransportCon

try
{
if (_autoFlushHttpClientOutgoingStream)
{
// HttpClient's machinery keeps an internal buffer that doesn't get flushed to the socket on every write.
// Some protocols (e.g. gRPC) may rely on specific bytes being sent, and HttpClient's buffering would prevent it.
// AutoFlushingStream delegates to the provided stream, adding calls to FlushAsync on every WriteAsync.
// Note that HttpClient does NOT call Flush on the underlying socket, so the perf impact of this is expected to be small.
// This statement is based on current knowledge as of .NET Core 3.1.201.
stream = new AutoFlushingStream(stream);
}

// Immediately flush request stream to send headers
// https://github.com/dotnet/corefx/issues/39586#issuecomment-516210081
try
Expand All @@ -172,7 +164,7 @@ protected override async Task SerializeToStreamAsync(Stream stream, TransportCon
}

// Check that the content-length matches the request body size. This can be removed in .NET 7 now that SocketsHttpHandler enforces this: https://github.com/dotnet/runtime/issues/62258.
var (result, error) = await StreamCopier.CopyAsync(isRequest: true, _request.Body, stream, Headers.ContentLength ?? StreamCopier.UnknownLength, _clock, _activityToken, cancellationToken);
var (result, error) = await StreamCopier.CopyAsync(isRequest: true, _request.Body, stream, Headers.ContentLength ?? StreamCopier.UnknownLength, _clock, _activityToken, _autoFlushHttpClientOutgoingStream, cancellationToken);
_tcs.TrySetResult((result, error));

// Check for errors that weren't the result of the destination failing.
Expand Down
2 changes: 2 additions & 0 deletions src/ReverseProxy/Utilities/EventIds.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,6 @@ internal static class EventIds
public static readonly EventId Http10RequestVersionDetected = new EventId(58, "Http10RequestVersionDetected");
public static readonly EventId NotForwarding = new EventId(59, "NotForwarding");
public static readonly EventId MaxRequestBodySizeSet = new EventId(60, "MaxRequestBodySizeSet");
public static readonly EventId RetryingWebSocketDowngradeNoConnect = new EventId(61, "RetryingWebSocketDowngradeNoConnect");
public static readonly EventId RetryingWebSocketDowngradeNoHttp2 = new EventId(62, "RetryingWebSocketDowngradeNoHttp2");
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

#if NET7_0_OR_GREATER

using System;
using System.Diagnostics;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Http.Features;
using Yarp.ReverseProxy.Utilities;

namespace Yarp.ReverseProxy.WebSocketsTelemetry;

internal sealed class HttpConnectFeatureWrapper : IHttpExtendedConnectFeature
{
private readonly IClock _clock;

public HttpContext HttpContext { get; private set; }

public IHttpExtendedConnectFeature InnerConnectFeature { get; private set; }

public WebSocketsTelemetryStream? TelemetryStream { get; private set; }

public bool IsExtendedConnect => InnerConnectFeature.IsExtendedConnect;

public string? Protocol => InnerConnectFeature.Protocol;

public HttpConnectFeatureWrapper(IClock clock, HttpContext httpContext, IHttpExtendedConnectFeature connectFeature)
{
_clock = clock ?? throw new ArgumentNullException(nameof(clock));
HttpContext = httpContext ?? throw new ArgumentNullException(nameof(httpContext));
InnerConnectFeature = connectFeature ?? throw new ArgumentNullException(nameof(connectFeature));
}

public async ValueTask<Stream> AcceptAsync()
{
Debug.Assert(TelemetryStream is null);
var opaqueTransport = await InnerConnectFeature.AcceptAsync();
TelemetryStream = new WebSocketsTelemetryStream(_clock, opaqueTransport);
return TelemetryStream;
}
}

#endif
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System;
using System.Diagnostics;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Http.Features;
using Microsoft.Net.Http.Headers;
using Yarp.ReverseProxy.Utilities;

namespace Yarp.ReverseProxy.WebSocketsTelemetry;

internal sealed class HttpUpgradeFeatureWrapper : IHttpUpgradeFeature
{
private readonly IClock _clock;

public HttpContext HttpContext { get; private set; }

public IHttpUpgradeFeature InnerUpgradeFeature { get; private set; }

public WebSocketsTelemetryStream? TelemetryStream { get; private set; }

public bool IsUpgradableRequest => InnerUpgradeFeature.IsUpgradableRequest;

public HttpUpgradeFeatureWrapper(IClock clock, HttpContext httpContext, IHttpUpgradeFeature upgradeFeature)
{
_clock = clock ?? throw new ArgumentNullException(nameof(clock));
HttpContext = httpContext ?? throw new ArgumentNullException(nameof(httpContext));
InnerUpgradeFeature = upgradeFeature ?? throw new ArgumentNullException(nameof(upgradeFeature));
}

public async Task<Stream> UpgradeAsync()
{
Debug.Assert(TelemetryStream is null);
var opaqueTransport = await InnerUpgradeFeature.UpgradeAsync();

if (HttpContext.Response.Headers.TryGetValue(HeaderNames.Upgrade, out var upgradeValues) &&
upgradeValues.Count == 1 &&
string.Equals("WebSocket", upgradeValues.ToString(), StringComparison.OrdinalIgnoreCase))
{
TelemetryStream = new WebSocketsTelemetryStream(_clock, opaqueTransport);
}

return TelemetryStream ?? opaqueTransport;
}
}
Loading