diff --git a/src/libraries/Common/tests/System/Net/Http/HttpClientHandlerTest.Cancellation.cs b/src/libraries/Common/tests/System/Net/Http/HttpClientHandlerTest.Cancellation.cs index 4afc8a7d0dfaa..dea298f2ee90e 100644 --- a/src/libraries/Common/tests/System/Net/Http/HttpClientHandlerTest.Cancellation.cs +++ b/src/libraries/Common/tests/System/Net/Http/HttpClientHandlerTest.Cancellation.cs @@ -28,7 +28,6 @@ public HttpClientHandler_Cancellation_Test(ITestOutputHelper output) : base(outp [Theory] [InlineData(false, CancellationMode.Token)] [InlineData(true, CancellationMode.Token)] - [ActiveIssue("https://github.com/dotnet/runtime/issues/36634", TestPlatforms.Browser)] // out of memory public async Task PostAsync_CancelDuringRequestContentSend_TaskCanceledQuickly(bool chunkedTransfer, CancellationMode mode) { if (LoopbackServerFactory.Version >= HttpVersion20.Value && chunkedTransfer) @@ -42,6 +41,12 @@ public async Task PostAsync_CancelDuringRequestContentSend_TaskCanceledQuickly(b return; } + if (PlatformDetection.IsBrowser && LoopbackServerFactory.Version < HttpVersion20.Value) + { + // Browser request streaming is only supported on HTTP/2 or higher + return; + } + var serverRelease = new TaskCompletionSource(); await LoopbackServerFactory.CreateClientAndServerAsync(async uri => { @@ -58,6 +63,13 @@ await LoopbackServerFactory.CreateClientAndServerAsync(async uri => req.Content = new ByteAtATimeContent(int.MaxValue, waitToSend.Task, contentSending, millisecondDelayBetweenBytes: 1); req.Headers.TransferEncodingChunked = chunkedTransfer; + if (PlatformDetection.IsBrowser) + { +#if !NETFRAMEWORK + req.Options.Set(new HttpRequestOptionsKey("WebAssemblyEnableStreamingRequest"), true); +#endif + } + Task resp = client.SendAsync(TestAsync, req, HttpCompletionOption.ResponseHeadersRead, cts.Token); waitToSend.SetResult(true); await Task.WhenAny(contentSending.Task, resp); diff --git a/src/libraries/Common/tests/System/Net/Http/HttpClientHandlerTest.cs b/src/libraries/Common/tests/System/Net/Http/HttpClientHandlerTest.cs index fe4c61a9577b6..d7acae1b1c940 100644 --- a/src/libraries/Common/tests/System/Net/Http/HttpClientHandlerTest.cs +++ b/src/libraries/Common/tests/System/Net/Http/HttpClientHandlerTest.cs @@ -1886,9 +1886,11 @@ await connection.ReadRequestHeaderAndSendCustomResponseAsync( } [Theory] - [InlineData(false)] - [InlineData(true)] - public async Task PostAsync_ThrowFromContentCopy_RequestFails(bool syncFailure) + [InlineData(false, false)] + [InlineData(false, true)] + [InlineData(true, false)] + [InlineData(true, true)] + public async Task PostAsync_ThrowFromContentCopy_RequestFails(bool syncFailure, bool enableWasmStreaming) { if (UseVersion == HttpVersion30) { @@ -1896,6 +1898,18 @@ public async Task PostAsync_ThrowFromContentCopy_RequestFails(bool syncFailure) return; } + if (enableWasmStreaming && !PlatformDetection.IsBrowser) + { + // enableWasmStreaming makes only sense on Browser platform + return; + } + + if (enableWasmStreaming && PlatformDetection.IsBrowser && UseVersion < HttpVersion20.Value) + { + // Browser request streaming is only supported on HTTP/2 or higher + return; + } + await LoopbackServer.CreateServerAsync(async (server, uri) => { Task responseTask = server.AcceptConnectionAsync(async connection => @@ -1914,8 +1928,20 @@ await LoopbackServer.CreateServerAsync(async (server, uri) => canReadFunc: () => true, readFunc: (buffer, offset, count) => throw error, readAsyncFunc: (buffer, offset, count, cancellationToken) => syncFailure ? throw error : Task.Delay(1).ContinueWith(_ => throw error))); + var request = new HttpRequestMessage(HttpMethod.Post, uri); + request.Content = content; + + if (PlatformDetection.IsBrowser) + { + if (enableWasmStreaming) + { +#if !NETFRAMEWORK + request.Options.Set(new HttpRequestOptionsKey("WebAssemblyEnableStreamingRequest"), true); +#endif + } + } - Assert.Same(error, await Assert.ThrowsAsync(() => client.PostAsync(uri, content))); + Assert.Same(error, await Assert.ThrowsAsync(() => client.SendAsync(request))); } }); } diff --git a/src/libraries/Common/tests/System/Net/Http/ResponseStreamTest.cs b/src/libraries/Common/tests/System/Net/Http/ResponseStreamTest.cs index 992851b166bef..c63e4de335a9d 100644 --- a/src/libraries/Common/tests/System/Net/Http/ResponseStreamTest.cs +++ b/src/libraries/Common/tests/System/Net/Http/ResponseStreamTest.cs @@ -229,9 +229,147 @@ await client.GetAsync(remoteServer.EchoUri, HttpCompletionOption.ResponseHeaders } #if NETCOREAPP - [OuterLoop] + [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsBrowser))] public async Task BrowserHttpHandler_Streaming() + { + var WebAssemblyEnableStreamingRequestKey = new HttpRequestOptionsKey("WebAssemblyEnableStreamingRequest"); + var WebAssemblyEnableStreamingResponseKey = new HttpRequestOptionsKey("WebAssemblyEnableStreamingResponse"); + + var req = new HttpRequestMessage(HttpMethod.Post, Configuration.Http.RemoteHttp2Server.BaseUri + "echobody.ashx"); + + req.Options.Set(WebAssemblyEnableStreamingRequestKey, true); + req.Options.Set(WebAssemblyEnableStreamingResponseKey, true); + + byte[] body = new byte[1024 * 1024]; + Random.Shared.NextBytes(body); + + int readOffset = 0; + req.Content = new StreamContent(new DelegateStream( + readAsyncFunc: async (buffer, offset, count, cancellationToken) => + { + await Task.Delay(1); + if (readOffset < body.Length) + { + int send = Math.Min(body.Length - readOffset, count); + body.AsSpan(readOffset, send).CopyTo(buffer.AsSpan(offset, send)); + readOffset += send; + return send; + } + return 0; + })); + + using (HttpClient client = CreateHttpClientForRemoteServer(Configuration.Http.RemoteHttp2Server)) + // we need to switch off Response buffering of default ResponseContentRead option + using (HttpResponseMessage response = await client.SendAsync(req, HttpCompletionOption.ResponseHeadersRead)) + { + Assert.Equal(HttpStatusCode.OK, response.StatusCode); + // Streaming requests can't set Content-Length + Assert.False(response.Headers.Contains("X-HttpRequest-Headers-ContentLength")); + // Streaming response uses StreamContent + Assert.Equal(typeof(StreamContent), response.Content.GetType()); + + var stream = await response.Content.ReadAsStreamAsync(); + Assert.Equal("ReadOnlyStream", stream.GetType().Name); + var buffer = new byte[1024 * 1024]; + int totalCount = 0; + int fetchedCount = 0; + do + { + fetchedCount = await stream.ReadAsync(buffer, 0, buffer.Length); + Assert.True(body.AsSpan(totalCount, fetchedCount).SequenceEqual(buffer.AsSpan(0, fetchedCount))); + totalCount += fetchedCount; + } while (fetchedCount != 0); + Assert.Equal(body.Length, totalCount); + } + } + + [OuterLoop] + [ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsBrowser))] + [InlineData(true)] + [InlineData(false)] + public async Task BrowserHttpHandler_StreamingRequest(bool useStringContent) + { + var WebAssemblyEnableStreamingRequestKey = new HttpRequestOptionsKey("WebAssemblyEnableStreamingRequest"); + + var req = new HttpRequestMessage(HttpMethod.Post, Configuration.Http.Http2RemoteVerifyUploadServer); + + req.Options.Set(WebAssemblyEnableStreamingRequestKey, true); + + int size; + if (useStringContent) + { + string bodyContent = "Hello World"; + size = bodyContent.Length; + req.Content = new StringContent(bodyContent); + } + else + { + size = 1500 * 1024 * 1024; + int remaining = size; + req.Content = new StreamContent(new DelegateStream( + readAsyncFunc: (buffer, offset, count, cancellationToken) => + { + if (remaining > 0) + { + int send = Math.Min(remaining, count); + buffer.AsSpan(offset, send).Fill(65); + remaining -= send; + return Task.FromResult(send); + } + return Task.FromResult(0); + })); + } + + req.Content.Headers.Add("Content-MD5-Skip", "browser"); + + using (HttpClient client = CreateHttpClientForRemoteServer(Configuration.Http.RemoteHttp2Server)) + using (HttpResponseMessage response = await client.SendAsync(req)) + { + Assert.Equal(HttpStatusCode.OK, response.StatusCode); + Assert.Equal(size.ToString(), Assert.Single(response.Headers.GetValues("X-HttpRequest-Body-Length"))); + // Streaming requests can't set Content-Length + Assert.Equal(useStringContent, response.Headers.Contains("X-HttpRequest-Headers-ContentLength")); + if (useStringContent) + { + Assert.Equal(size.ToString(), Assert.Single(response.Headers.GetValues("X-HttpRequest-Headers-ContentLength"))); + } + } + } + + // Duplicate of PostAsync_ThrowFromContentCopy_RequestFails using remote server + [OuterLoop] + [ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsBrowser))] + [InlineData(false)] + [InlineData(true)] + public async Task BrowserHttpHandler_StreamingRequest_ThrowFromContentCopy_RequestFails(bool syncFailure) + { + var WebAssemblyEnableStreamingRequestKey = new HttpRequestOptionsKey("WebAssemblyEnableStreamingRequest"); + + var req = new HttpRequestMessage(HttpMethod.Post, Configuration.Http.Http2RemoteEchoServer); + + req.Options.Set(WebAssemblyEnableStreamingRequestKey, true); + + Exception error = new FormatException(); + var content = new StreamContent(new DelegateStream( + canSeekFunc: () => true, + lengthFunc: () => 12345678, + positionGetFunc: () => 0, + canReadFunc: () => true, + readFunc: (buffer, offset, count) => throw error, + readAsyncFunc: (buffer, offset, count, cancellationToken) => syncFailure ? throw error : Task.Delay(1).ContinueWith(_ => throw error))); + + req.Content = content; + + using (HttpClient client = CreateHttpClientForRemoteServer(Configuration.Http.RemoteHttp2Server)) + { + Assert.Same(error, await Assert.ThrowsAsync(() => client.SendAsync(req))); + } + } + + [OuterLoop] + [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsBrowser))] + public async Task BrowserHttpHandler_StreamingResponse() { var WebAssemblyEnableStreamingResponseKey = new HttpRequestOptionsKey("WebAssemblyEnableStreamingResponse"); @@ -244,6 +382,7 @@ public async Task BrowserHttpHandler_Streaming() // we need to switch off Response buffering of default ResponseContentRead option using (HttpResponseMessage response = await client.SendAsync(req, HttpCompletionOption.ResponseHeadersRead)) { + // Streaming response uses StreamContent Assert.Equal(typeof(StreamContent), response.Content.GetType()); Assert.Equal("application/octet-stream", response.Content.Headers.ContentType.MediaType); diff --git a/src/libraries/Common/tests/System/Net/Prerequisites/NetCoreServer/GenericHandler.cs b/src/libraries/Common/tests/System/Net/Prerequisites/NetCoreServer/GenericHandler.cs index 846a30fd9951e..31fa23f16de68 100644 --- a/src/libraries/Common/tests/System/Net/Prerequisites/NetCoreServer/GenericHandler.cs +++ b/src/libraries/Common/tests/System/Net/Prerequisites/NetCoreServer/GenericHandler.cs @@ -88,6 +88,11 @@ public async Task Invoke(HttpContext context) await LargeResponseHandler.InvokeAsync(context); return; } + if (path.Equals(new PathString("/echobody.ashx"))) + { + await EchoBodyHandler.InvokeAsync(context); + return; + } // Default handling. await EchoHandler.InvokeAsync(context); diff --git a/src/libraries/Common/tests/System/Net/Prerequisites/NetCoreServer/Handlers/EchoBodyHandler.cs b/src/libraries/Common/tests/System/Net/Prerequisites/NetCoreServer/Handlers/EchoBodyHandler.cs new file mode 100644 index 0000000000000..3ddf4f62535e6 --- /dev/null +++ b/src/libraries/Common/tests/System/Net/Prerequisites/NetCoreServer/Handlers/EchoBodyHandler.cs @@ -0,0 +1,38 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System; +using System.Threading.Tasks; +using Microsoft.AspNetCore.Http; +using Microsoft.AspNetCore.Http.Features; + +namespace NetCoreServer +{ + public class EchoBodyHandler + { + public static async Task InvokeAsync(HttpContext context) + { + context.Features.Get().MaxRequestBodySize = null; + + // Report back original request method verb. + context.Response.Headers["X-HttpRequest-Method"] = context.Request.Method; + + // Report back original entity-body related request headers. + string contentLength = context.Request.Headers["Content-Length"]; + if (!string.IsNullOrEmpty(contentLength)) + { + context.Response.Headers["X-HttpRequest-Headers-ContentLength"] = contentLength; + } + + string transferEncoding = context.Request.Headers["Transfer-Encoding"]; + if (!string.IsNullOrEmpty(transferEncoding)) + { + context.Response.Headers["X-HttpRequest-Headers-TransferEncoding"] = transferEncoding; + } + + context.Response.StatusCode = 200; + context.Response.ContentType = context.Request.ContentType; + await context.Request.Body.CopyToAsync(context.Response.Body); + } + } +} diff --git a/src/libraries/Common/tests/System/Net/Prerequisites/NetCoreServer/Handlers/VerifyUploadHandler.cs b/src/libraries/Common/tests/System/Net/Prerequisites/NetCoreServer/Handlers/VerifyUploadHandler.cs index e4c6cb63c5560..5876d6303caff 100644 --- a/src/libraries/Common/tests/System/Net/Prerequisites/NetCoreServer/Handlers/VerifyUploadHandler.cs +++ b/src/libraries/Common/tests/System/Net/Prerequisites/NetCoreServer/Handlers/VerifyUploadHandler.cs @@ -6,6 +6,7 @@ using System.Security.Cryptography; using System.Threading.Tasks; using Microsoft.AspNetCore.Http; +using Microsoft.AspNetCore.Http.Features; namespace NetCoreServer { @@ -13,6 +14,8 @@ public class VerifyUploadHandler { public static async Task InvokeAsync(HttpContext context) { + context.Features.Get().MaxRequestBodySize = null; + // Report back original request method verb. context.Response.Headers["X-HttpRequest-Method"] = context.Request.Method; @@ -29,12 +32,15 @@ public static async Task InvokeAsync(HttpContext context) context.Response.Headers["X-HttpRequest-Headers-TransferEncoding"] = transferEncoding; } - // Get request body. - byte[] requestBodyBytes = await ReadAllRequestBytesAsync(context); + // Compute MD5 hash of received request body. + (byte[] md5Bytes, int bodyLength) = await ComputeMD5HashRequestBodyAsync(context); + + // Report back the actual body length. + context.Response.Headers["X-HttpRequest-Body-Length"] = bodyLength.ToString(); - // Skip MD5 checksum for empty request body + // Skip MD5 checksum for empty request body // or for requests which opt to skip it due to [ActiveIssue("https://github.com/dotnet/runtime/issues/37669", TestPlatforms.Browser)] - if (requestBodyBytes.Length == 0 || !string.IsNullOrEmpty(context.Request.Headers["Content-MD5-Skip"])) + if (bodyLength == 0 || !string.IsNullOrEmpty(context.Request.Headers["Content-MD5-Skip"])) { context.Response.StatusCode = 200; return; @@ -49,13 +55,7 @@ public static async Task InvokeAsync(HttpContext context) return; } - // Compute MD5 hash of received request body. - string actualHash; - using (MD5 md5 = MD5.Create()) - { - byte[] hash = md5.ComputeHash(requestBodyBytes); - actualHash = Convert.ToBase64String(hash); - } + string actualHash = Convert.ToBase64String(md5Bytes); if (expectedHash == actualHash) { @@ -66,21 +66,22 @@ public static async Task InvokeAsync(HttpContext context) context.Response.StatusCode = 400; context.Response.SetStatusDescription("Received request body fails MD5 checksum"); } - } - private static async Task ReadAllRequestBytesAsync(HttpContext context) + private static async Task<(byte[] MD5Hash, int BodyLength)> ComputeMD5HashRequestBodyAsync(HttpContext context) { Stream requestStream = context.Request.Body; byte[] buffer = new byte[16 * 1024]; - using (MemoryStream ms = new MemoryStream()) + using (MD5 md5 = MD5.Create()) { - int read; + int read, size = 0; while ((read = await requestStream.ReadAsync(buffer, 0, buffer.Length)) > 0) { - ms.Write(buffer, 0, read); + size += read; + md5.TransformBlock(buffer, 0, read, buffer, 0); } - return ms.ToArray(); + md5.TransformFinalBlock(buffer, 0, read); + return (md5.Hash, size); } } } diff --git a/src/libraries/Common/tests/System/Net/Prerequisites/NetCoreServer/NetCoreServer.csproj b/src/libraries/Common/tests/System/Net/Prerequisites/NetCoreServer/NetCoreServer.csproj index a654a21e298c0..a458eb3eeec5e 100644 --- a/src/libraries/Common/tests/System/Net/Prerequisites/NetCoreServer/NetCoreServer.csproj +++ b/src/libraries/Common/tests/System/Net/Prerequisites/NetCoreServer/NetCoreServer.csproj @@ -26,6 +26,7 @@ + diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/BrowserHttpHandler/BrowserHttpHandler.cs b/src/libraries/System.Net.Http/src/System/Net/Http/BrowserHttpHandler/BrowserHttpHandler.cs index 2cf419e72a5f7..57eb7e52658cf 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/BrowserHttpHandler/BrowserHttpHandler.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/BrowserHttpHandler/BrowserHttpHandler.cs @@ -15,6 +15,7 @@ namespace System.Net.Http // the JavaScript objects have thread affinity, it is necessary that the continuations run the same thread as the start of the async method. internal sealed class BrowserHttpHandler : HttpMessageHandler { + private static readonly HttpRequestOptionsKey EnableStreamingRequest = new HttpRequestOptionsKey("WebAssemblyEnableStreamingRequest"); private static readonly HttpRequestOptionsKey EnableStreamingResponse = new HttpRequestOptionsKey("WebAssemblyEnableStreamingResponse"); private static readonly HttpRequestOptionsKey> FetchOptions = new HttpRequestOptionsKey>("WebAssemblyFetchOptions"); private bool _allowAutoRedirect = HttpHandlerDefaults.DefaultAutomaticRedirection; @@ -220,10 +221,28 @@ private static async Task CallFetch(HttpRequestMessage reques } else { - byte[] buffer = await request.Content.ReadAsByteArrayAsync(cancellationToken).ConfigureAwait(true); - cancellationToken.ThrowIfCancellationRequested(); + bool streamingEnabled = false; + if (BrowserHttpInterop.SupportsStreamingRequest()) + { + request.Options.TryGetValue(EnableStreamingRequest, out streamingEnabled); + } + + if (streamingEnabled) + { + Stream stream = await request.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(true); + cancellationToken.ThrowIfCancellationRequested(); + + ReadableStreamPullState pullState = new ReadableStreamPullState(stream, cancellationToken); + + promise = BrowserHttpInterop.Fetch(uri, headerNames.ToArray(), headerValues.ToArray(), optionNames, optionValues, abortController, ReadableStreamPull, pullState); + } + else + { + byte[] buffer = await request.Content.ReadAsByteArrayAsync(cancellationToken).ConfigureAwait(true); + cancellationToken.ThrowIfCancellationRequested(); - promise = BrowserHttpInterop.Fetch(uri, headerNames.ToArray(), headerValues.ToArray(), optionNames, optionValues, abortController, buffer); + promise = BrowserHttpInterop.Fetch(uri, headerNames.ToArray(), headerValues.ToArray(), optionNames, optionValues, abortController, buffer); + } } } else @@ -248,6 +267,14 @@ private static async Task CallFetch(HttpRequestMessage reques } } + private static void ReadableStreamPull(object state) + { + ReadableStreamPullState pullState = (ReadableStreamPullState)state; +#pragma warning disable CS4014 // intentionally not awaited + pullState.PullAsync(); +#pragma warning restore CS4014 + } + private static HttpResponseMessage ConvertResponse(HttpRequestMessage request, WasmFetchResponse fetchResponse) { #if FEATURE_WASM_THREADS @@ -312,6 +339,43 @@ static async Task Impl(HttpRequestMessage request, Cancella } } + internal sealed class ReadableStreamPullState + { + private readonly Stream _stream; + private readonly CancellationToken _cancellationToken; + private readonly byte[] _buffer; + + public ReadableStreamPullState(Stream stream, CancellationToken cancellationToken) + { + ArgumentNullException.ThrowIfNull(stream); + + _stream = stream; + _cancellationToken = cancellationToken; + _buffer = new byte[65536]; + } + + public async Task PullAsync() + { + try + { + int length = await _stream.ReadAsync(_buffer, _cancellationToken).ConfigureAwait(true); + ReadableStreamControllerEnqueueUnsafe(this, _buffer, length); + } + catch (Exception ex) + { + BrowserHttpInterop.ReadableStreamControllerError(this, ex); + } + } + + private static unsafe void ReadableStreamControllerEnqueueUnsafe(object pullState, byte[] buffer, int length) + { + fixed (byte* ptr = buffer) + { + BrowserHttpInterop.ReadableStreamControllerEnqueue(pullState, (nint)ptr, length); + } + } + } + internal sealed class WasmFetchResponse : IDisposable { #if FEATURE_WASM_THREADS diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/BrowserHttpHandler/BrowserHttpInterop.cs b/src/libraries/System.Net.Http/src/System/Net/Http/BrowserHttpHandler/BrowserHttpInterop.cs index 07bbf0f7145a8..89de8b54187e9 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/BrowserHttpHandler/BrowserHttpInterop.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/BrowserHttpHandler/BrowserHttpInterop.cs @@ -11,6 +11,9 @@ namespace System.Net.Http { internal static partial class BrowserHttpInterop { + [JSImport("INTERNAL.http_wasm_supports_streaming_request")] + public static partial bool SupportsStreamingRequest(); + [JSImport("INTERNAL.http_wasm_supports_streaming_response")] public static partial bool SupportsStreamingResponse(); @@ -25,6 +28,17 @@ public static partial void AbortRequest( public static partial void AbortResponse( JSObject fetchResponse); + [JSImport("INTERNAL.http_wasm_readable_stream_controller_enqueue")] + public static partial void ReadableStreamControllerEnqueue( + [JSMarshalAs] object pullState, + IntPtr bufferPtr, + int bufferLength); + + [JSImport("INTERNAL.http_wasm_readable_stream_controller_error")] + public static partial void ReadableStreamControllerError( + [JSMarshalAs] object pullState, + Exception error); + [JSImport("INTERNAL.http_wasm_get_response_header_names")] private static partial string[] _GetResponseHeaderNames( JSObject fetchResponse); @@ -58,6 +72,17 @@ public static partial Task Fetch( JSObject abortControler, string? body = null); + [JSImport("INTERNAL.http_wasm_fetch_stream")] + public static partial Task Fetch( + string uri, + string[] headerNames, + string[] headerValues, + string[] optionNames, + [JSMarshalAs>] object?[] optionValues, + JSObject abortControler, + [JSMarshalAs>] Action pull, + [JSMarshalAs] object pullState); + [JSImport("INTERNAL.http_wasm_fetch_bytes")] private static partial Task FetchBytes( string uri, @@ -67,8 +92,7 @@ private static partial Task FetchBytes( [JSMarshalAs>] object?[] optionValues, JSObject abortControler, IntPtr bodyPtr, - int bodyLength - ); + int bodyLength); public static unsafe Task Fetch(string uri, string[] headerNames, string[] headerValues, string[] optionNames, object?[] optionValues, JSObject abortControler, byte[] body) { diff --git a/src/mono/wasm/runtime/exports-internal.ts b/src/mono/wasm/runtime/exports-internal.ts index 826180282e3bb..43011bfaa4127 100644 --- a/src/mono/wasm/runtime/exports-internal.ts +++ b/src/mono/wasm/runtime/exports-internal.ts @@ -4,7 +4,7 @@ import { mono_wasm_cancel_promise } from "./cancelable-promise"; import cwraps, { profiler_c_functions } from "./cwraps"; import { mono_wasm_send_dbg_command_with_parms, mono_wasm_send_dbg_command, mono_wasm_get_dbg_command_info, mono_wasm_get_details, mono_wasm_release_object, mono_wasm_call_function_on, mono_wasm_debugger_resume, mono_wasm_detach_debugger, mono_wasm_raise_debug_event, mono_wasm_change_debugger_log_level, mono_wasm_debugger_attached } from "./debug"; -import { http_wasm_supports_streaming_response, http_wasm_create_abort_controler, http_wasm_abort_request, http_wasm_abort_response, http_wasm_fetch, http_wasm_fetch_bytes, http_wasm_get_response_header_names, http_wasm_get_response_header_values, http_wasm_get_response_bytes, http_wasm_get_response_length, http_wasm_get_streamed_response_bytes } from "./http"; +import { http_wasm_supports_streaming_request, http_wasm_supports_streaming_response, http_wasm_create_abort_controler, http_wasm_abort_request, http_wasm_abort_response, http_wasm_readable_stream_controller_enqueue, http_wasm_readable_stream_controller_error, http_wasm_fetch, http_wasm_fetch_stream, http_wasm_fetch_bytes, http_wasm_get_response_header_names, http_wasm_get_response_header_values, http_wasm_get_response_bytes, http_wasm_get_response_length, http_wasm_get_streamed_response_bytes } from "./http"; import { exportedRuntimeAPI, Module, runtimeHelpers } from "./globals"; import { get_property, set_property, has_property, get_typeof_property, get_global_this, dynamic_import } from "./invoke-js"; import { mono_wasm_stringify_as_error_with_stack } from "./logging"; @@ -64,11 +64,15 @@ export function export_internal(): any { ws_wasm_abort, // BrowserHttpHandler + http_wasm_supports_streaming_request, http_wasm_supports_streaming_response, http_wasm_create_abort_controler, http_wasm_abort_request, http_wasm_abort_response, + http_wasm_readable_stream_controller_enqueue, + http_wasm_readable_stream_controller_error, http_wasm_fetch, + http_wasm_fetch_stream, http_wasm_fetch_bytes, http_wasm_get_response_header_names, http_wasm_get_response_header_values, diff --git a/src/mono/wasm/runtime/http.ts b/src/mono/wasm/runtime/http.ts index 1ad6a4fc457bb..36b34ebfcad27 100644 --- a/src/mono/wasm/runtime/http.ts +++ b/src/mono/wasm/runtime/http.ts @@ -2,9 +2,10 @@ // The .NET Foundation licenses this file to you under the MIT license. import { wrap_as_cancelable_promise } from "./cancelable-promise"; -import { ENVIRONMENT_IS_NODE, Module, loaderHelpers, mono_assert } from "./globals"; -import { MemoryViewType, Span } from "./marshal"; +import { ENVIRONMENT_IS_NODE, Module, createPromiseController, loaderHelpers, mono_assert } from "./globals"; +import { ManagedObject, MemoryViewType, Span } from "./marshal"; import type { VoidPtr } from "./types/emscripten"; +import { ControllablePromise, PromiseController } from "./types/internal"; function verifyEnvironment() { @@ -16,6 +17,29 @@ function verifyEnvironment() { } } +export function http_wasm_supports_streaming_request(): boolean { + // Detecting streaming request support works like this: + // If the browser doesn't support a particular body type, it calls toString() on the object and uses the result as the body. + // So, if the browser doesn't support request streams, the request body becomes the string "[object ReadableStream]". + // When a string is used as a body, it conveniently sets the Content-Type header to text/plain;charset=UTF-8. + // So, if that header is set, then we know the browser doesn't support streams in request objects, and we can exit early. + // Safari does support streams in request objects, but doesn't allow them to be used with fetch, so the duplex option is tested, which Safari doesn't currently support. + // See https://developer.chrome.com/articles/fetch-streaming-requests/ + if (typeof Request !== "undefined" && "body" in Request.prototype && typeof ReadableStream === "function") { + let duplexAccessed = false; + const hasContentType = new Request("", { + body: new ReadableStream(), + method: "POST", + get duplex() { + duplexAccessed = true; + return "half"; + }, + } as RequestInit /* https://github.com/microsoft/TypeScript-DOM-lib-generator/issues/1483 */).headers.has("Content-Type"); + return duplexAccessed && !hasContentType; + } + return false; +} + export function http_wasm_supports_streaming_response(): boolean { return typeof Response !== "undefined" && "body" in Response.prototype && typeof ReadableStream === "function"; } @@ -41,14 +65,90 @@ export function http_wasm_abort_response(res: ResponseExtension): void { } } +export function http_wasm_readable_stream_controller_enqueue(pull_state: PullStateExtension, bufferPtr: VoidPtr, bufferLength: number): void { + const controller = pull_state.__controller; + const pull_promise_control = pull_state.__pull_promise_control; + mono_assert(controller, "expected controller"); + mono_assert(pull_promise_control, "expected pull_promise_control"); + try { + if (bufferLength === 0) { + controller.close(); + pull_state.dispose(); + pull_state.__pull_promise_control = null; + pull_state.__controller = null; + } else { + // the bufferPtr is pinned by the caller + const view = new Span(bufferPtr, bufferLength, MemoryViewType.Byte); + // because https://github.com/WebAssembly/design/issues/1162 we need to copy the buffer + // also it doesn't make much sense to use byob + const copy = view.slice() as Uint8Array; + controller.enqueue(copy); + } + pull_promise_control.resolve(); + } + catch (err) { + pull_state.dispose(); + pull_promise_control.reject(err); + } + finally { + pull_state.__pull_promise_control = null; + } +} + +export function http_wasm_readable_stream_controller_error(pull_state: PullStateExtension, error: Error): void { + const controller = pull_state.__controller; + mono_assert(controller, "expected controller"); + pull_state.__pull_promise_control?.reject(error); + pull_state.__fetch_promise_control?.reject(error); + pull_state.dispose(); + controller.error(error); + pull_state.__pull_promise_control = null; +} + +export function http_wasm_fetch_stream(url: string, header_names: string[], header_values: string[], option_names: string[], option_values: any[], abort_controller: AbortController, + pull_delegate: (pull_state: PullStateExtension) => void, + pull_state: PullStateExtension): Promise { + function pull(controller: ReadableByteStreamController): Promise { + const { promise, promise_control } = createPromiseController(); + try { + mono_assert(!pull_state.__pull_promise_control, "expected pull_promise_control to be null"); + pull_state.__controller = controller; + pull_state.__pull_promise_control = promise_control; + pull_delegate(pull_state); + return promise; + } + catch (error) { + pull_state.dispose(); + pull_state.__controller = null; + pull_state.__pull_promise_control = null; + pull_state.__fetch_promise_control?.reject(error); + return Promise.reject(error); + } + } + + function cancel(error: any) { + pull_state.__fetch_promise_control?.reject(error); + } + + const body = new ReadableStream({ + type: "bytes", + pull, + cancel + }); + + const cancelable_promise = http_wasm_fetch(url, header_names, header_values, option_names, option_values, abort_controller, body); + pull_state.__fetch_promise_control = loaderHelpers.getPromiseController(cancelable_promise); + return cancelable_promise; +} + export function http_wasm_fetch_bytes(url: string, header_names: string[], header_values: string[], option_names: string[], option_values: any[], abort_controller: AbortController, bodyPtr: VoidPtr, bodyLength: number): Promise { - // the bufferPtr is pinned by the caller + // the bodyPtr is pinned by the caller const view = new Span(bodyPtr, bodyLength, MemoryViewType.Byte); const copy = view.slice() as Uint8Array; return http_wasm_fetch(url, header_names, header_values, option_names, option_values, abort_controller, copy); } -export function http_wasm_fetch(url: string, header_names: string[], header_values: string[], option_names: string[], option_values: any[], abort_controller: AbortController, body: string | Uint8Array | null): Promise { +export function http_wasm_fetch(url: string, header_names: string[], header_values: string[], option_names: string[], option_values: any[], abort_controller: AbortController, body: string | Uint8Array | ReadableStream | null): ControllablePromise { verifyEnvironment(); mono_assert(url && typeof url === "string", "expected url string"); mono_assert(header_names && header_values && Array.isArray(header_names) && Array.isArray(header_values) && header_names.length === header_values.length, "expected headerNames and headerValues arrays"); @@ -62,6 +162,9 @@ export function http_wasm_fetch(url: string, header_names: string[], header_valu headers, signal: abort_controller.signal }; + if (typeof ReadableStream !== "undefined" && body instanceof ReadableStream) { + options.duplex = "half"; + } for (let i = 0; i < option_names.length; i++) { options[option_names[i]] = option_values[i]; } @@ -149,6 +252,12 @@ export function http_wasm_get_streamed_response_bytes(res: ResponseExtension, bu }); } +interface PullStateExtension extends ManagedObject { + __pull_promise_control: PromiseController | null + __fetch_promise_control: PromiseController | null + __controller: ReadableByteStreamController | null +} + interface ResponseExtension extends Response { __buffer?: ArrayBuffer __reader?: ReadableStreamDefaultReader