Skip to content

Commit

Permalink
swap delegate allocation with state allocation
Browse files Browse the repository at this point in the history
  • Loading branch information
campersau committed Aug 30, 2023
1 parent e2193e6 commit 92ae132
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -232,48 +232,9 @@ private static async Task<WasmFetchResponse> CallFetch(HttpRequestMessage reques
Stream stream = await request.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(true);
cancellationToken.ThrowIfCancellationRequested();

byte[]? buffer = null;

var pull = async void (JSObject controller, int desiredSize) =>
{
Memory<byte> view;
if (desiredSize > 0)
{
if (buffer is null || buffer.Length < desiredSize)
{
view = buffer = new byte[desiredSize];
}
else
{
view = buffer.AsMemory(0, desiredSize);
}
}
else
{
view = buffer ??= new byte[65536];
}

using (controller)
{
try
{
int length = await stream.ReadAsync(view, cancellationToken).ConfigureAwait(true);
using (Buffers.MemoryHandle handle = view.Pin())
{
ReadableStreamControllerEnqueueUnsafe(controller, handle, length);
}
}
catch (Exception ex)
{
BrowserHttpInterop.ReadableStreamControllerError(controller, ex);
}
}
};

unsafe static void ReadableStreamControllerEnqueueUnsafe(JSObject controller, Buffers.MemoryHandle handle, int length) =>
BrowserHttpInterop.ReadableStreamControllerEnqueue(controller, (IntPtr)handle.Pointer, length);

promise = BrowserHttpInterop.Fetch(uri, headerNames.ToArray(), headerValues.ToArray(), optionNames, optionValues, abortController, pull);
ReadableStreamPullState pullState = new ReadableStreamPullState(stream, cancellationToken);

promise = BrowserHttpInterop.Fetch(uri, headerNames.ToArray(), headerValues.ToArray(), optionNames, optionValues, abortController, ReadableStreamPull, pullState);
}
else
{
Expand Down Expand Up @@ -306,6 +267,49 @@ unsafe static void ReadableStreamControllerEnqueueUnsafe(JSObject controller, Bu
}
}

private static async void ReadableStreamPull(JSObject controller, int desiredSize, object state)
{
using (controller)
{
try
{
ReadableStreamPullState pullState = (ReadableStreamPullState)state;

byte[]? buffer = pullState.Buffer;
Memory<byte> view;
if (desiredSize > 0)
{
if (buffer is null || buffer.Length < desiredSize)
{
view = buffer = new byte[desiredSize];
}
else
{
view = buffer.AsMemory(0, desiredSize);
}
}
else
{
view = buffer ??= new byte[65536];
}
pullState.Buffer = buffer;

int length = await pullState.Stream.ReadAsync(view, pullState.CancellationToken).ConfigureAwait(true);
using (Buffers.MemoryHandle handle = view.Pin())
{
ReadableStreamControllerEnqueueUnsafe(controller, handle, length);
}
}
catch (Exception ex)
{
BrowserHttpInterop.ReadableStreamControllerError(controller, ex);
}
}

unsafe static void ReadableStreamControllerEnqueueUnsafe(JSObject controller, Buffers.MemoryHandle handle, int length) =>
BrowserHttpInterop.ReadableStreamControllerEnqueue(controller, (IntPtr)handle.Pointer, length);
}

private static HttpResponseMessage ConvertResponse(HttpRequestMessage request, WasmFetchResponse fetchResponse)
{
#if FEATURE_WASM_THREADS
Expand Down Expand Up @@ -370,6 +374,21 @@ static async Task<HttpResponseMessage> Impl(HttpRequestMessage request, Cancella
}
}

internal sealed class ReadableStreamPullState
{
public ReadableStreamPullState(Stream stream, CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(stream);

Stream = stream;
CancellationToken = cancellationToken;
}

public Stream Stream { get; }
public CancellationToken CancellationToken { get; }
public byte[]? Buffer { get; set; }
}

internal sealed class WasmFetchResponse : IDisposable
{
#if FEATURE_WASM_THREADS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ public static partial Task<JSObject> Fetch(
string[] optionNames,
[JSMarshalAs<JSType.Array<JSType.Any>>] object?[] optionValues,
JSObject abortControler,
[JSMarshalAs<JSType.Function<JSType.Object, JSType.Number>>] Action<JSObject, int> pull);
[JSMarshalAs<JSType.Function<JSType.Object, JSType.Number, JSType.Any>>] Action<JSObject, int, object> pull,
[JSMarshalAs<JSType.Any>] object pullState);

[JSImport("INTERNAL.http_wasm_fetch_bytes")]
private static partial Task<JSObject> FetchBytes(
Expand Down
4 changes: 2 additions & 2 deletions src/mono/wasm/runtime/http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ export function http_wasm_readable_stream_controller_error(controller: ReadableB
controller.__resolve_pull();
}

export function http_wasm_fetch_stream(url: string, header_names: string[], header_values: string[], option_names: string[], option_values: any[], abort_controller: AbortController, pull: (controller: ReadableByteStreamControllerExtension, desired_size: number) => void): Promise<ResponseExtension> {
export function http_wasm_fetch_stream(url: string, header_names: string[], header_values: string[], option_names: string[], option_values: any[], abort_controller: AbortController, pull: (controller: ReadableByteStreamControllerExtension, desired_size: number, pull_state: any) => void, pull_state: any): Promise<ResponseExtension> {
return new Promise((resolve_fetch, reject_fetch) => {
const body = new ReadableStream({
type: "bytes",
Expand All @@ -95,7 +95,7 @@ export function http_wasm_fetch_stream(url: string, header_names: string[], head
}
c.__resolve_pull = pull_promise_control.resolve;
c.__reject_fetch = reject_fetch;
pull(c, desired_size || 0);
pull(c, desired_size || 0, pull_state);
return pull_promise;
},
cancel(reason) {
Expand Down

0 comments on commit 92ae132

Please sign in to comment.