From 3e7411188c4a27a588d9c9a342a5557dfd31a667 Mon Sep 17 00:00:00 2001 From: Eirik Tsarpalis Date: Thu, 14 Nov 2024 17:14:56 +0000 Subject: [PATCH 01/11] Add SseFormatter. --- .../ref/System.Net.ServerSentEvents.cs | 8 +- .../ref/System.Net.ServerSentEvents.csproj | 4 + .../src/Resources/Strings.resx | 3 + .../src/System.Net.ServerSentEvents.csproj | 7 + .../System/Net/ServerSentEvents/Helpers.cs | 76 ++++++++ .../PooledByteBufferWriter.cs | 150 ++++++++++++++++ .../Net/ServerSentEvents/SseFormatter.cs | 152 ++++++++++++++++ .../System/Net/ServerSentEvents/SseItem.cs | 18 +- .../Net/ServerSentEvents/SseParser_1.cs | 17 +- .../tests/SseFormatterTests.cs | 162 ++++++++++++++++++ .../tests/SseItemTests.cs | 15 +- .../tests/SseParserTests.cs | 32 ++-- .../System.Net.ServerSentEvents.Tests.csproj | 5 + 13 files changed, 621 insertions(+), 28 deletions(-) create mode 100644 src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/Helpers.cs create mode 100644 src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/PooledByteBufferWriter.cs create mode 100644 src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseFormatter.cs create mode 100644 src/libraries/System.Net.ServerSentEvents/tests/SseFormatterTests.cs diff --git a/src/libraries/System.Net.ServerSentEvents/ref/System.Net.ServerSentEvents.cs b/src/libraries/System.Net.ServerSentEvents/ref/System.Net.ServerSentEvents.cs index 10b97e15d7cc0..ec1bc2abc05e3 100644 --- a/src/libraries/System.Net.ServerSentEvents/ref/System.Net.ServerSentEvents.cs +++ b/src/libraries/System.Net.ServerSentEvents/ref/System.Net.ServerSentEvents.cs @@ -6,14 +6,20 @@ namespace System.Net.ServerSentEvents { + public static partial class SseFormatter + { + public static System.Threading.Tasks.Task WriteAsync(System.Collections.Generic.IAsyncEnumerable> source, System.IO.Stream destination, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } + public static System.Threading.Tasks.Task WriteAsync(System.Collections.Generic.IAsyncEnumerable> source, System.IO.Stream destination, System.Action, System.Net.ServerSentEvents.SseItem> itemFormatter, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } + } public delegate T SseItemParser(string eventType, System.ReadOnlySpan data); public readonly partial struct SseItem { private readonly T _Data_k__BackingField; private readonly object _dummy; private readonly int _dummyPrimitive; - public SseItem(T data, string? eventType) { throw null; } + public SseItem(T data, string? eventType = null) { throw null; } public T Data { get { throw null; } } + public string? EventId { get { throw null; } init { } } public string EventType { get { throw null; } } } public static partial class SseParser diff --git a/src/libraries/System.Net.ServerSentEvents/ref/System.Net.ServerSentEvents.csproj b/src/libraries/System.Net.ServerSentEvents/ref/System.Net.ServerSentEvents.csproj index 50bc340a7191a..bb3a652626696 100644 --- a/src/libraries/System.Net.ServerSentEvents/ref/System.Net.ServerSentEvents.csproj +++ b/src/libraries/System.Net.ServerSentEvents/ref/System.Net.ServerSentEvents.csproj @@ -8,6 +8,10 @@ + + + + diff --git a/src/libraries/System.Net.ServerSentEvents/src/Resources/Strings.resx b/src/libraries/System.Net.ServerSentEvents/src/Resources/Strings.resx index a0ea42d131d14..cf01ed771b737 100644 --- a/src/libraries/System.Net.ServerSentEvents/src/Resources/Strings.resx +++ b/src/libraries/System.Net.ServerSentEvents/src/Resources/Strings.resx @@ -120,4 +120,7 @@ The enumerable may be enumerated only once. + + Parameter should not contain any line breaks. + diff --git a/src/libraries/System.Net.ServerSentEvents/src/System.Net.ServerSentEvents.csproj b/src/libraries/System.Net.ServerSentEvents/src/System.Net.ServerSentEvents.csproj index 65c1959305695..98d6577797157 100644 --- a/src/libraries/System.Net.ServerSentEvents/src/System.Net.ServerSentEvents.csproj +++ b/src/libraries/System.Net.ServerSentEvents/src/System.Net.ServerSentEvents.csproj @@ -11,12 +11,19 @@ System.Net.ServerSentEvents.SseParser + + + + + + + diff --git a/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/Helpers.cs b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/Helpers.cs new file mode 100644 index 0000000000000..8baf1d6db6979 --- /dev/null +++ b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/Helpers.cs @@ -0,0 +1,76 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Buffers; +using System.IO; +using System.Runtime.InteropServices; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace System.Net.ServerSentEvents +{ + internal static class Helpers + { + public static unsafe void WriteAsUtf8String(this IBufferWriter bufferWriter, ReadOnlySpan value) + { + if (value.IsEmpty) + { + return; + } + + int maxByteCount = Encoding.UTF8.GetMaxByteCount(value.Length); + Span buffer = bufferWriter.GetSpan(maxByteCount); + int bytesWritten; +#if NET + bytesWritten = Encoding.UTF8.GetBytes(value, buffer); +#else + fixed (char* chars = value) + fixed (byte* bytes = buffer) + { + bytesWritten = Encoding.UTF8.GetBytes(chars, value.Length, bytes, maxByteCount); + } +#endif + bufferWriter.Advance(bytesWritten); + } + + public static void ValidateParameterDoesNotContainLineBreaks(string? input, string paramName) + { + if (input?.Contains('\n') is true) + { + Throw(paramName); + static void Throw(string parameterName) => throw new ArgumentException(SR.ArgumentException_MustNotContainLineBreaks, parameterName); + } + } + +#if !NET + public static bool Contains(this string text, char character) => text.IndexOf(character) >= 0; + + public static ValueTask WriteAsync(this Stream stream, ReadOnlyMemory buffer, CancellationToken cancellationToken = default) + { + if (MemoryMarshal.TryGetArray(buffer, out ArraySegment segment)) + { + return new ValueTask(stream.WriteAsync(segment.Array, segment.Offset, segment.Count, cancellationToken)); + } + else + { + return WriteAsyncUsingPooledBuffer(stream, buffer, cancellationToken); + + static async ValueTask WriteAsyncUsingPooledBuffer(Stream stream, ReadOnlyMemory buffer, CancellationToken cancellationToken) + { + byte[] sharedBuffer = ArrayPool.Shared.Rent(buffer.Length); + buffer.Span.CopyTo(sharedBuffer); + try + { + await stream.WriteAsync(sharedBuffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false); + } + finally + { + ArrayPool.Shared.Return(sharedBuffer); + } + } + } + } +#endif + } +} diff --git a/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/PooledByteBufferWriter.cs b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/PooledByteBufferWriter.cs new file mode 100644 index 0000000000000..f4e0ba33c16ab --- /dev/null +++ b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/PooledByteBufferWriter.cs @@ -0,0 +1,150 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Buffers; +using System.Diagnostics; + +namespace System.Net.ServerSentEvents +{ + internal sealed class PooledByteBufferWriter : IBufferWriter, IDisposable + { + private byte[] _rentedBuffer; + private int _index; + private const int MinimumBufferSize = 256; + + // Value copied from Array.MaxLength in System.Private.CoreLib/src/libraries/System.Private.CoreLib/src/System/Array.cs. + public const int MaximumBufferSize = 0X7FFFFFC7; + + public PooledByteBufferWriter(int initialCapacity = MinimumBufferSize) + { + Debug.Assert(initialCapacity > 0); + + _rentedBuffer = ArrayPool.Shared.Rent(initialCapacity); + _index = 0; + } + + public ReadOnlyMemory WrittenMemory + { + get + { + Debug.Assert(_rentedBuffer != null); + Debug.Assert(_index <= _rentedBuffer.Length); + return _rentedBuffer.AsMemory(0, _index); + } + } + + public int WrittenCount + { + get + { + Debug.Assert(_rentedBuffer != null); + return _index; + } + } + + public int Capacity + { + get + { + Debug.Assert(_rentedBuffer != null); + return _rentedBuffer.Length; + } + } + + public void Reset() + { + Debug.Assert(_rentedBuffer != null); + Debug.Assert(_index <= _rentedBuffer.Length); + + _rentedBuffer.AsSpan(0, _index).Clear(); + _index = 0; + } + + // Returns the rented buffer back to the pool + public void Dispose() + { + if (_rentedBuffer == null) + { + return; + } + + Reset(); + byte[] toReturn = _rentedBuffer; + _rentedBuffer = null!; + ArrayPool.Shared.Return(toReturn); + } + + public void Advance(int count) + { + Debug.Assert(_rentedBuffer != null); + Debug.Assert(count >= 0); + Debug.Assert(_index <= _rentedBuffer.Length - count); + _index += count; + } + + public Memory GetMemory(int sizeHint = 0) + { + CheckAndResizeBuffer(sizeHint); + return _rentedBuffer.AsMemory(_index); + } + + public Span GetSpan(int sizeHint = 0) + { + CheckAndResizeBuffer(sizeHint); + return _rentedBuffer.AsSpan(_index); + } + + private void CheckAndResizeBuffer(int sizeHint) + { + Debug.Assert(_rentedBuffer != null); + Debug.Assert(sizeHint >= 0); + + int currentLength = _rentedBuffer.Length; + int availableSpace = currentLength - _index; + + if (sizeHint == 0) + { + sizeHint = MinimumBufferSize; + } + + // If we've reached ~1GB written, grow to the maximum buffer + // length to avoid incessant minimal growths causing perf issues. + if (_index >= MaximumBufferSize / 2) + { + sizeHint = Math.Max(sizeHint, MaximumBufferSize - currentLength); + } + + if (sizeHint > availableSpace) + { + int growBy = Math.Max(sizeHint, currentLength); + + int newSize = currentLength + growBy; + + if ((uint)newSize > MaximumBufferSize) + { + newSize = currentLength + sizeHint; + if ((uint)newSize > MaximumBufferSize) + { + Throw(); + static void Throw() => throw new OutOfMemoryException(); + } + } + + byte[] oldBuffer = _rentedBuffer; + + _rentedBuffer = ArrayPool.Shared.Rent(newSize); + + Debug.Assert(oldBuffer.Length >= _index); + Debug.Assert(_rentedBuffer.Length >= _index); + + Span oldBufferAsSpan = oldBuffer.AsSpan(0, _index); + oldBufferAsSpan.CopyTo(_rentedBuffer); + oldBufferAsSpan.Clear(); + ArrayPool.Shared.Return(oldBuffer); + } + + Debug.Assert(_rentedBuffer.Length - _index > 0); + Debug.Assert(_rentedBuffer.Length - _index >= sizeHint); + } + } +} diff --git a/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseFormatter.cs b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseFormatter.cs new file mode 100644 index 0000000000000..e860c0b72c531 --- /dev/null +++ b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseFormatter.cs @@ -0,0 +1,152 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Buffers; +using System.Collections.Generic; +using System.Diagnostics; +using System.IO; +using System.Threading; +using System.Threading.Tasks; + +namespace System.Net.ServerSentEvents +{ + /// + /// Provides methods for formatting server-sent events. + /// + public static class SseFormatter + { + private static readonly byte[] s_newLine = "\n"u8.ToArray(); + + /// + /// Writes the of server-sent events to the stream. + /// + /// The events to write to the stream. + /// The destination stream to write the events. + /// The that can be used to cancel the write operation. + /// A task that represents the asynchronous write operation. + public static Task WriteAsync(IAsyncEnumerable> source, Stream destination, CancellationToken cancellationToken = default) + { + if (source is null) + { + throw new ArgumentNullException(nameof(source)); + } + + if (destination is null) + { + throw new ArgumentNullException(nameof(destination)); + } + + return WriteAsyncCore(source, destination, static (writer, item) => writer.WriteAsUtf8String(item.Data), cancellationToken); + } + + /// + /// Writes the of server-sent events to the stream. + /// + /// The data type of the event. + /// The events to write to the stream. + /// The destination stream to write the events. + /// The formatter for the data field of given event. + /// The that can be used to cancel the write operation. + /// A task that represents the asynchronous write operation. + public static Task WriteAsync(IAsyncEnumerable> source, Stream destination, Action, SseItem> itemFormatter, CancellationToken cancellationToken = default) + { + if (source is null) + { + throw new ArgumentNullException(nameof(source)); + } + + if (destination is null) + { + throw new ArgumentNullException(nameof(destination)); + } + + if (itemFormatter is null) + { + throw new ArgumentNullException(nameof(itemFormatter)); + } + + return WriteAsyncCore(source, destination, itemFormatter, cancellationToken); + } + + private static async Task WriteAsyncCore(IAsyncEnumerable> source, Stream destination, Action, SseItem> itemFormatter, CancellationToken cancellationToken) + { + using PooledByteBufferWriter bufferWriter = new(); + using PooledByteBufferWriter userDataBufferWriter = new(); + + await foreach (SseItem item in source.WithCancellation(cancellationToken).ConfigureAwait(false)) + { + FormatSseEvent(bufferWriter, userDataBufferWriter, itemFormatter, item); + + await destination.WriteAsync(bufferWriter.WrittenMemory, cancellationToken).ConfigureAwait(false); + + userDataBufferWriter.Reset(); + bufferWriter.Reset(); + } + } + + private static void FormatSseEvent( + PooledByteBufferWriter bufferWriter, + PooledByteBufferWriter userDataBufferWriter, + Action, SseItem> itemFormatter, + SseItem sseItem) + { + Debug.Assert(bufferWriter.WrittenCount is 0, "Must not contain any data"); + Debug.Assert(userDataBufferWriter.WrittenCount is 0, "Must not contain any data"); + + if (sseItem._eventType is { } eventType) + { + Debug.Assert(!eventType.Contains('\n'), "Event type must not contain line breaks"); + + bufferWriter.Write("event: "u8); + bufferWriter.WriteAsUtf8String(eventType); + bufferWriter.Write(s_newLine); + } + + itemFormatter(userDataBufferWriter, sseItem); + WriteDataWithLineBreakHandling(bufferWriter, userDataBufferWriter.WrittenMemory.Span); + + if (sseItem.EventId is { } eventId) + { + Debug.Assert(!eventId.Contains('\n'), "Event id must not contain line breaks"); + + bufferWriter.Write("id: "u8); + bufferWriter.WriteAsUtf8String(eventId); + bufferWriter.Write(s_newLine); + } + + bufferWriter.Write(s_newLine); + } + + private static void WriteDataWithLineBreakHandling(PooledByteBufferWriter bufferWriter, ReadOnlySpan data) + { + // The data field can contain multiple lines, each line must be prefixed with "data: " and suffixed with a line break. + + ReadOnlySpan nextLine; + int lineBreak; + + do + { + lineBreak = data.IndexOf((byte)'\n'); + if (lineBreak < 0) + { + nextLine = data; + data = default; + } + else + { + int lineLength = lineBreak > 0 && data[lineBreak - 1] == '\r' + ? lineBreak - 1 + : lineBreak; + + nextLine = data.Slice(0, lineLength); + data = data.Slice(lineBreak + 1); + } + + bufferWriter.Write("data: "u8); + bufferWriter.Write(nextLine); + bufferWriter.Write(s_newLine); + + } while (lineBreak >= 0); + } + } +} diff --git a/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseItem.cs b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseItem.cs index c4f966d62b779..c973527d9f371 100644 --- a/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseItem.cs +++ b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseItem.cs @@ -9,12 +9,16 @@ public readonly struct SseItem { /// The event's type. internal readonly string? _eventType; + /// The event's id. + private readonly string? _eventId; /// Initializes the server-sent event. /// The event's payload. /// The event's type. - public SseItem(T data, string? eventType) + public SseItem(T data, string? eventType = null) { + Helpers.ValidateParameterDoesNotContainLineBreaks(eventType, nameof(eventType)); + Data = data; _eventType = eventType; } @@ -24,5 +28,17 @@ public SseItem(T data, string? eventType) /// Gets the event's type. public string EventType => _eventType ?? SseParser.EventTypeDefault; + + /// Gets the event's id. + public string? EventId + { + get => _eventId; + init + { + Helpers.ValidateParameterDoesNotContainLineBreaks(value, nameof(EventId)); + + _eventId = value; + } + } } } diff --git a/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseParser_1.cs b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseParser_1.cs index 98beedb7048fa..41dddf3233797 100644 --- a/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseParser_1.cs +++ b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseParser_1.cs @@ -71,7 +71,10 @@ public sealed class SseParser private bool _dataAppended; /// The event type for the next event. - private string _eventType = SseParser.EventTypeDefault; + private string? _eventType; + + /// The event id for the next event. + private string? _eventId; /// Initialize the enumerable. /// The stream to parse. @@ -314,8 +317,9 @@ private bool ProcessLine(out SseItem sseItem, out int advance) if (_dataAppended) { - sseItem = new SseItem(_itemParser(_eventType, _dataBuffer.AsSpan(0, _dataLength)), _eventType); - _eventType = SseParser.EventTypeDefault; + sseItem = new SseItem(_itemParser(_eventType ?? SseParser.EventTypeDefault, _dataBuffer.AsSpan(0, _dataLength)), _eventType) { EventId = _eventId }; + _eventType = null; + _eventId = null; _dataLength = 0; _dataAppended = false; return true; @@ -365,8 +369,9 @@ private bool ProcessLine(out SseItem sseItem, out int advance) (remainder[0] is LF || (remainder[0] is CR && remainder.Length > 1))) { advance = line.Length + newlineLength + (remainder.StartsWith(CRLF) ? 2 : 1); - sseItem = new SseItem(_itemParser(_eventType, fieldValue), _eventType); - _eventType = SseParser.EventTypeDefault; + sseItem = new SseItem(_itemParser(_eventType ?? SseParser.EventTypeDefault, fieldValue), _eventType) { EventId = _eventId }; + _eventType = null; + _eventId = null; return true; } } @@ -398,7 +403,7 @@ private bool ProcessLine(out SseItem sseItem, out int advance) if (fieldValue.IndexOf((byte)'\0') < 0) { // Note that fieldValue might be empty, in which case LastEventId will naturally be reset to the empty string. This is per spec. - LastEventId = SseParser.Utf8GetString(fieldValue); + LastEventId = _eventId = SseParser.Utf8GetString(fieldValue); } } else if (fieldName.SequenceEqual("retry"u8)) diff --git a/src/libraries/System.Net.ServerSentEvents/tests/SseFormatterTests.cs b/src/libraries/System.Net.ServerSentEvents/tests/SseFormatterTests.cs new file mode 100644 index 0000000000000..d869e13518051 --- /dev/null +++ b/src/libraries/System.Net.ServerSentEvents/tests/SseFormatterTests.cs @@ -0,0 +1,162 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Buffers; +using System.Collections.Generic; +using System.IO; +using System.Runtime.CompilerServices; +using System.Text; +using System.Text.Json; +using System.Text.Json.Serialization; +using System.Threading; +using System.Threading.Tasks; +using Xunit; + +namespace System.Net.ServerSentEvents.Tests +{ + public static partial class SseFormatterTests + { + [Fact] + public static void WriteAsync_InvalidArguments_Throws() + { + AssertExtensions.Throws("source", () => SseFormatter.WriteAsync(source: null, new MemoryStream())); + AssertExtensions.Throws("source", () => SseFormatter.WriteAsync(source: null, new MemoryStream(), (_,_) => { })); + + AssertExtensions.Throws("destination", () => SseFormatter.WriteAsync(GetItemsAsync(), destination: null)); + AssertExtensions.Throws("destination", () => SseFormatter.WriteAsync(GetItemsAsync(), destination: null, (_,_) => { })); + + AssertExtensions.Throws("itemFormatter", () => SseFormatter.WriteAsync(GetItemsAsync(), new MemoryStream(), itemFormatter: null)); + } + + [Fact] + public static async Task WriteAsync_HasExpectedFormat() + { + // Arrange + string expectedFormat = + "event: eventType1\ndata: data1\n\n" + + "event: eventType2\ndata: data2\n\n" + + "data: data3\n\n" + + "data: \n\n" + + "event: eventType4\ndata: data4\nid: id4\n\n" + + "event: eventType4\ndata: data with\ndata: multiple \rline\ndata: breaks\n\n" + + "data: line break at end\ndata: \n\n"; + + using MemoryStream stream = new(); + + // Act + await SseFormatter.WriteAsync(GetItemsAsync(), stream); + + // Assert + string actualFormat = Encoding.UTF8.GetString(stream.ToArray()); + Assert.Equal(expectedFormat, actualFormat); + } + + [Fact] + public static async Task WriteAsync_ItemFormatter_HasExpectedFormat() + { + // Arrange + string expectedFormat = + "event: eventType1\ndata: data1_suffix\n\n" + + "event: eventType2\ndata: data2_suffix\n\n" + + "data: data3_suffix\n\n" + + "data: _suffix\n\n" + + "event: eventType4\ndata: data4_suffix\nid: id4\n\n" + + "event: eventType4\ndata: data with\ndata: multiple \rline\ndata: breaks_suffix\n\n" + + "data: line break at end\ndata: _suffix\n\n"; + + using MemoryStream stream = new(); + + // Act + await SseFormatter.WriteAsync(GetItemsAsync(), stream, (writer, item) => writer.Write(Encoding.UTF8.GetBytes(item.Data + "_suffix"))); + + // Assert + string actualFormat = Encoding.UTF8.GetString(stream.ToArray()); + Assert.Equal(expectedFormat, actualFormat); + } + + private static async IAsyncEnumerable> GetItemsAsync() + { + yield return new SseItem("data1", "eventType1"); + yield return new SseItem("data2", "eventType2"); + await Task.Yield(); + yield return new SseItem("data3", null); + yield return new SseItem(data: null!, null); + yield return new SseItem("data4", "eventType4") { EventId = "id4" }; + await Task.Yield(); + yield return new SseItem("data with\nmultiple \rline\r\nbreaks", "eventType4"); + yield return new SseItem("line break at end\n", null); + } + + [Fact] + public static async Task WriteAsync_HonorsCancellationToken() + { + CancellationToken token = new(canceled: true); + + await Assert.ThrowsAsync(() => SseFormatter.WriteAsync(GetItemsAsync(), new MemoryStream(), token)); + await Assert.ThrowsAsync(() => + SseFormatter.WriteAsync( + GetItemsAsync(), + new MemoryStream(), + (writer, item) => writer.Write(Encoding.UTF8.GetBytes(item.Data)), + token)); + + async IAsyncEnumerable> GetItemsAsync([EnumeratorCancellation] CancellationToken token = default) + { + yield return new SseItem("data"); + await Task.Delay(20); + token.ThrowIfCancellationRequested(); + } + } + + [Fact] + public static async Task WriteAsync_ParserCanRoundtripJsonEvents() + { + MemoryStream stream = new(); + await SseFormatter.WriteAsync(GetItemsAsync(), stream, FormatJson); + + stream.Position = 0; + SseParser parser = SseParser.Create(stream, ParseJson); + await ValidateParseResults(parser.EnumerateAsync()); + + async IAsyncEnumerable> GetItemsAsync() + { + for (int i = 0; i < 50; i++) + { + string? eventType = i % 2 == 0 ? null : "eventType"; + string? eventId = i % 3 == 2 ? i.ToString() : null; + yield return new SseItem(new MyPoco(i), eventType) { EventId = eventId }; + await Task.Yield(); + } + } + + async Task ValidateParseResults(IAsyncEnumerable> results) + { + int i = 0; + await foreach (SseItem item in results) + { + Assert.Equal(i % 2 == 0 ? "message" : "eventType", item.EventType); + Assert.Equal(i % 3 == 2 ? i.ToString() : null, item.EventId); + Assert.Equal(i, item.Data.Value); + i++; + } + } + + static void FormatJson(IBufferWriter writer, SseItem item) + { + using Utf8JsonWriter jsonWriter = new Utf8JsonWriter(writer); + JsonSerializer.Serialize(jsonWriter, item.Data, JsonContext.Default.MyPoco); + } + + static MyPoco ParseJson(string eventType, ReadOnlySpan data) + { + return JsonSerializer.Deserialize(data, JsonContext.Default.MyPoco); + } + } + + public record MyPoco(int Value); + + [JsonSourceGenerationOptions(WriteIndented = true)] + [JsonSerializable(typeof(MyPoco))] + partial class JsonContext : JsonSerializerContext; + } +} diff --git a/src/libraries/System.Net.ServerSentEvents/tests/SseItemTests.cs b/src/libraries/System.Net.ServerSentEvents/tests/SseItemTests.cs index cf5a0d06382b6..4ec24bdd90bd1 100644 --- a/src/libraries/System.Net.ServerSentEvents/tests/SseItemTests.cs +++ b/src/libraries/System.Net.ServerSentEvents/tests/SseItemTests.cs @@ -15,14 +15,27 @@ public void SseItem_Roundtrips() item = default; Assert.Null(item.Data); Assert.Equal(SseParser.EventTypeDefault, item.EventType); + Assert.Null(item.EventId); item = new SseItem("some data", null); Assert.Equal("some data", item.Data); Assert.Equal(SseParser.EventTypeDefault, item.EventType); + Assert.Null(item.EventId); - item = new SseItem("some data", "eventType"); + item = new SseItem("some data", "eventType") { EventId = "eventId" }; Assert.Equal("some data", item.Data); Assert.Equal("eventType", item.EventType); + Assert.Equal("eventId", item.EventId); + } + + [Theory] + [InlineData("\n")] + [InlineData("Hello, World!\n")] + [InlineData("Hello, \r\nWorld!")] + public void SseItem_MetadataWithLineBreak_ThrowsArgumentException(string metadataWithLineBreak) + { + Assert.Throws(() => new SseItem("data", eventType: metadataWithLineBreak)); + Assert.Throws(() => new SseItem("data", "eventType") { EventId = metadataWithLineBreak }); } } } diff --git a/src/libraries/System.Net.ServerSentEvents/tests/SseParserTests.cs b/src/libraries/System.Net.ServerSentEvents/tests/SseParserTests.cs index d0004ec47d476..86005176d67c4 100644 --- a/src/libraries/System.Net.ServerSentEvents/tests/SseParserTests.cs +++ b/src/libraries/System.Net.ServerSentEvents/tests/SseParserTests.cs @@ -121,9 +121,9 @@ await ReadAllEventsAsync(stream) : Assert.Equal(stream.Length, stream.Position); Assert.Equal(3, items.Count); - AssertSseItemEqual(new SseItem("1", "A"), items[0]); - AssertSseItemEqual(new SseItem("4", "B"), items[1]); - AssertSseItemEqual(new SseItem("7", "C"), items[2]); + AssertSseItemEqual(new SseItem("1", "A") { EventId = "2" }, items[0]); + AssertSseItemEqual(new SseItem("4", "B") { EventId = "5" }, items[1]); + AssertSseItemEqual(new SseItem("7", "C") { EventId = "8" }, items[2]); } [Theory] @@ -217,11 +217,11 @@ public async Task Parse_HtmlSpec_Example4(string newline, bool trickle, bool use using IEnumerator> e = parser.Enumerate().GetEnumerator(); Assert.True(e.MoveNext()); - AssertSseItemEqual(new SseItem("first event", "message"), e.Current); + AssertSseItemEqual(new SseItem("first event", "message") { EventId = "1" }, e.Current); Assert.Equal("1", parser.LastEventId); Assert.True(e.MoveNext()); - AssertSseItemEqual(new SseItem("second event", "message"), e.Current); + AssertSseItemEqual(new SseItem("second event", "message") { EventId = "" }, e.Current); Assert.Equal(string.Empty, parser.LastEventId); Assert.True(e.MoveNext()); @@ -235,11 +235,11 @@ public async Task Parse_HtmlSpec_Example4(string newline, bool trickle, bool use await using IAsyncEnumerator> e = parser.EnumerateAsync().GetAsyncEnumerator(); Assert.True(await e.MoveNextAsync()); - AssertSseItemEqual(new SseItem("first event", "message"), e.Current); + AssertSseItemEqual(new SseItem("first event", "message") { EventId = "1" }, e.Current); Assert.Equal("1", parser.LastEventId); Assert.True(await e.MoveNextAsync()); - AssertSseItemEqual(new SseItem("second event", "message"), e.Current); + AssertSseItemEqual(new SseItem("second event", "message") { EventId = "" }, e.Current); Assert.Equal(string.Empty, parser.LastEventId); Assert.True(await e.MoveNextAsync()); @@ -273,7 +273,7 @@ public async Task Parse_HtmlSpec_Example4_InheritedIDs(string newline, bool tric using IEnumerator> e = parser.Enumerate().GetEnumerator(); Assert.True(e.MoveNext()); - AssertSseItemEqual(new SseItem("first event", "message"), e.Current); + AssertSseItemEqual(new SseItem("first event", "message") { EventId = "1" }, e.Current); Assert.Equal("1", parser.LastEventId); Assert.True(e.MoveNext()); @@ -281,7 +281,7 @@ public async Task Parse_HtmlSpec_Example4_InheritedIDs(string newline, bool tric Assert.Equal("1", parser.LastEventId); Assert.True(e.MoveNext()); - AssertSseItemEqual(new SseItem(" third event", "message"), e.Current); + AssertSseItemEqual(new SseItem(" third event", "message") { EventId = "42" }, e.Current); Assert.Equal("42", parser.LastEventId); } else @@ -291,7 +291,7 @@ public async Task Parse_HtmlSpec_Example4_InheritedIDs(string newline, bool tric await using IAsyncEnumerator> e = parser.EnumerateAsync().GetAsyncEnumerator(); Assert.True(await e.MoveNextAsync()); - AssertSseItemEqual(new SseItem("first event", "message"), e.Current); + AssertSseItemEqual(new SseItem("first event", "message") { EventId = "1" }, e.Current); Assert.Equal("1", parser.LastEventId); Assert.True(await e.MoveNextAsync()); @@ -299,7 +299,7 @@ public async Task Parse_HtmlSpec_Example4_InheritedIDs(string newline, bool tric Assert.Equal("1", parser.LastEventId); Assert.True(await e.MoveNextAsync()); - AssertSseItemEqual(new SseItem(" third event", "message"), e.Current); + AssertSseItemEqual(new SseItem(" third event", "message") { EventId = "42" }, e.Current); Assert.Equal("42", parser.LastEventId); } } @@ -865,14 +865,8 @@ public async Task ArrayPoolRental_Closure(string newline, bool trickle, bool use private static void AssertSseItemEqual(SseItem left, SseItem right) { Assert.Equal(left.EventType, right.EventType); - if (left.Data is string leftData && right.Data is string rightData) - { - Assert.Equal($"{leftData.Length} {leftData}", $"{rightData.Length} {rightData}"); - } - else - { - Assert.Equal(left.Data, right.Data); - } + Assert.Equal(left.EventId, right.EventId); + Assert.Equal(left.Data, right.Data); } public static IEnumerable NewlineTrickleAsyncData() => diff --git a/src/libraries/System.Net.ServerSentEvents/tests/System.Net.ServerSentEvents.Tests.csproj b/src/libraries/System.Net.ServerSentEvents/tests/System.Net.ServerSentEvents.Tests.csproj index d0f19860f75dc..56cf948fb7259 100644 --- a/src/libraries/System.Net.ServerSentEvents/tests/System.Net.ServerSentEvents.Tests.csproj +++ b/src/libraries/System.Net.ServerSentEvents/tests/System.Net.ServerSentEvents.Tests.csproj @@ -5,10 +5,15 @@ + + + + + From 5664a32687813391c42443beffd19ff474f6cd53 Mon Sep 17 00:00:00 2001 From: Eirik Tsarpalis Date: Thu, 14 Nov 2024 18:56:12 +0000 Subject: [PATCH 02/11] Update src/libraries/System.Net.ServerSentEvents/src/Resources/Strings.resx Co-authored-by: Stephen Toub --- .../System.Net.ServerSentEvents/src/Resources/Strings.resx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/libraries/System.Net.ServerSentEvents/src/Resources/Strings.resx b/src/libraries/System.Net.ServerSentEvents/src/Resources/Strings.resx index cf01ed771b737..833ae31c9871c 100644 --- a/src/libraries/System.Net.ServerSentEvents/src/Resources/Strings.resx +++ b/src/libraries/System.Net.ServerSentEvents/src/Resources/Strings.resx @@ -121,6 +121,6 @@ The enumerable may be enumerated only once. - Parameter should not contain any line breaks. + Argument must not contain line breaks. From 4d8d36336c06003bd270cef66823613305327abb Mon Sep 17 00:00:00 2001 From: Eirik Tsarpalis Date: Thu, 14 Nov 2024 19:02:10 +0000 Subject: [PATCH 03/11] Document SseItem exceptions. --- .../src/System/Net/ServerSentEvents/SseItem.cs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseItem.cs b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseItem.cs index c973527d9f371..39ecd9023fe7a 100644 --- a/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseItem.cs +++ b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseItem.cs @@ -15,6 +15,7 @@ public readonly struct SseItem /// Initializes the server-sent event. /// The event's payload. /// The event's type. + /// Thrown when contains a line break. public SseItem(T data, string? eventType = null) { Helpers.ValidateParameterDoesNotContainLineBreaks(eventType, nameof(eventType)); @@ -30,6 +31,7 @@ public SseItem(T data, string? eventType = null) public string EventType => _eventType ?? SseParser.EventTypeDefault; /// Gets the event's id. + /// Thrown when the value contains a line break. public string? EventId { get => _eventId; From 7a476b154ebd1f7f400e89d109e8901860588eca Mon Sep 17 00:00:00 2001 From: Eirik Tsarpalis Date: Fri, 15 Nov 2024 10:54:37 +0000 Subject: [PATCH 04/11] Misc improvements and fixes. --- .../System/Net/ServerSentEvents/Helpers.cs | 21 ++++- .../Net/ServerSentEvents/SseFormatter.cs | 86 +++++++++---------- .../tests/SseFormatterTests.cs | 4 +- 3 files changed, 62 insertions(+), 49 deletions(-) diff --git a/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/Helpers.cs b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/Helpers.cs index 8baf1d6db6979..9cd6f3e4a65fc 100644 --- a/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/Helpers.cs +++ b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/Helpers.cs @@ -2,6 +2,7 @@ // The .NET Foundation licenses this file to you under the MIT license. using System.Buffers; +using System.Diagnostics; using System.IO; using System.Runtime.InteropServices; using System.Text; @@ -12,7 +13,20 @@ namespace System.Net.ServerSentEvents { internal static class Helpers { - public static unsafe void WriteAsUtf8String(this IBufferWriter bufferWriter, ReadOnlySpan value) + public static void WriteUtf8String(this IBufferWriter writer, ReadOnlySpan value) + { + if (value.IsEmpty) + { + return; + } + + Span buffer = writer.GetSpan(value.Length); + Debug.Assert(value.Length <= buffer.Length); + value.CopyTo(buffer); + writer.Advance(value.Length); + } + + public static unsafe void WriteUtf8String(this IBufferWriter writer, ReadOnlySpan value) { if (value.IsEmpty) { @@ -20,7 +34,8 @@ public static unsafe void WriteAsUtf8String(this IBufferWriter bufferWrite } int maxByteCount = Encoding.UTF8.GetMaxByteCount(value.Length); - Span buffer = bufferWriter.GetSpan(maxByteCount); + Span buffer = writer.GetSpan(maxByteCount); + Debug.Assert(maxByteCount <= buffer.Length); int bytesWritten; #if NET bytesWritten = Encoding.UTF8.GetBytes(value, buffer); @@ -31,7 +46,7 @@ public static unsafe void WriteAsUtf8String(this IBufferWriter bufferWrite bytesWritten = Encoding.UTF8.GetBytes(chars, value.Length, bytes, maxByteCount); } #endif - bufferWriter.Advance(bytesWritten); + writer.Advance(bytesWritten); } public static void ValidateParameterDoesNotContainLineBreaks(string? input, string paramName) diff --git a/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseFormatter.cs b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseFormatter.cs index e860c0b72c531..db4ff5c3dff14 100644 --- a/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseFormatter.cs +++ b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseFormatter.cs @@ -36,7 +36,7 @@ public static Task WriteAsync(IAsyncEnumerable> source, Stream d throw new ArgumentNullException(nameof(destination)); } - return WriteAsyncCore(source, destination, static (writer, item) => writer.WriteAsUtf8String(item.Data), cancellationToken); + return WriteAsyncCore(source, destination, static (writer, item) => writer.WriteUtf8String(item.Data), cancellationToken); } /// @@ -75,7 +75,13 @@ private static async Task WriteAsyncCore(IAsyncEnumerable> source, await foreach (SseItem item in source.WithCancellation(cancellationToken).ConfigureAwait(false)) { - FormatSseEvent(bufferWriter, userDataBufferWriter, itemFormatter, item); + itemFormatter(userDataBufferWriter, item); + + FormatSseEvent( + bufferWriter, + eventType: item._eventType, // Do not use the public property since it normalizes to "message" if null + data: userDataBufferWriter.WrittenMemory.Span, + eventId: item.EventId); await destination.WriteAsync(bufferWriter.WrittenMemory, cancellationToken).ConfigureAwait(false); @@ -84,69 +90,61 @@ private static async Task WriteAsyncCore(IAsyncEnumerable> source, } } - private static void FormatSseEvent( + private static void FormatSseEvent( PooledByteBufferWriter bufferWriter, - PooledByteBufferWriter userDataBufferWriter, - Action, SseItem> itemFormatter, - SseItem sseItem) + string? eventType, + ReadOnlySpan data, + string? eventId) { - Debug.Assert(bufferWriter.WrittenCount is 0, "Must not contain any data"); - Debug.Assert(userDataBufferWriter.WrittenCount is 0, "Must not contain any data"); + Debug.Assert(bufferWriter.WrittenCount is 0); - if (sseItem._eventType is { } eventType) + if (eventType is not null) { - Debug.Assert(!eventType.Contains('\n'), "Event type must not contain line breaks"); + Debug.Assert(!eventType.Contains('\n')); - bufferWriter.Write("event: "u8); - bufferWriter.WriteAsUtf8String(eventType); - bufferWriter.Write(s_newLine); + bufferWriter.WriteUtf8String("event: "u8); + bufferWriter.WriteUtf8String(eventType); + bufferWriter.WriteUtf8String(s_newLine); } - itemFormatter(userDataBufferWriter, sseItem); - WriteDataWithLineBreakHandling(bufferWriter, userDataBufferWriter.WrittenMemory.Span); + WriteLinesWithPrefix(bufferWriter, prefix: "data: "u8, data); + bufferWriter.Write(s_newLine); - if (sseItem.EventId is { } eventId) + if (eventId is not null) { - Debug.Assert(!eventId.Contains('\n'), "Event id must not contain line breaks"); + Debug.Assert(!eventId.Contains('\n')); - bufferWriter.Write("id: "u8); - bufferWriter.WriteAsUtf8String(eventId); - bufferWriter.Write(s_newLine); + bufferWriter.WriteUtf8String("id: "u8); + bufferWriter.WriteUtf8String(eventId); + bufferWriter.WriteUtf8String(s_newLine); } - bufferWriter.Write(s_newLine); + bufferWriter.WriteUtf8String(s_newLine); } - private static void WriteDataWithLineBreakHandling(PooledByteBufferWriter bufferWriter, ReadOnlySpan data) + private static void WriteLinesWithPrefix(PooledByteBufferWriter writer, ReadOnlySpan prefix, ReadOnlySpan data) { - // The data field can contain multiple lines, each line must be prefixed with "data: " and suffixed with a line break. + // Writes a potentially multi-line string, prefixing each line with the given prefix. + // Both \n and \r\n sequences are normalized to \n. - ReadOnlySpan nextLine; - int lineBreak; - - do + while (true) { - lineBreak = data.IndexOf((byte)'\n'); - if (lineBreak < 0) - { - nextLine = data; - data = default; - } - else - { - int lineLength = lineBreak > 0 && data[lineBreak - 1] == '\r' - ? lineBreak - 1 - : lineBreak; + writer.WriteUtf8String(prefix); - nextLine = data.Slice(0, lineLength); - data = data.Slice(lineBreak + 1); + int i = data.IndexOf((byte)'\n'); + if (i is -1) + { + writer.WriteUtf8String(data); + return; } - bufferWriter.Write("data: "u8); - bufferWriter.Write(nextLine); - bufferWriter.Write(s_newLine); + int lineLength = i > 0 && data[i - 1] == '\r' ? i - 1 : i; + ReadOnlySpan nextLine = data.Slice(0, lineLength); + data = data.Slice(i + 1); - } while (lineBreak >= 0); + writer.WriteUtf8String(nextLine); + writer.WriteUtf8String(s_newLine); + } } } } diff --git a/src/libraries/System.Net.ServerSentEvents/tests/SseFormatterTests.cs b/src/libraries/System.Net.ServerSentEvents/tests/SseFormatterTests.cs index d869e13518051..ddee7cfc4a89b 100644 --- a/src/libraries/System.Net.ServerSentEvents/tests/SseFormatterTests.cs +++ b/src/libraries/System.Net.ServerSentEvents/tests/SseFormatterTests.cs @@ -143,7 +143,8 @@ async Task ValidateParseResults(IAsyncEnumerable> results) static void FormatJson(IBufferWriter writer, SseItem item) { - using Utf8JsonWriter jsonWriter = new Utf8JsonWriter(writer); + JsonWriterOptions writerOptions = new() { Indented = true }; + using Utf8JsonWriter jsonWriter = new(writer, writerOptions); JsonSerializer.Serialize(jsonWriter, item.Data, JsonContext.Default.MyPoco); } @@ -155,7 +156,6 @@ static MyPoco ParseJson(string eventType, ReadOnlySpan data) public record MyPoco(int Value); - [JsonSourceGenerationOptions(WriteIndented = true)] [JsonSerializable(typeof(MyPoco))] partial class JsonContext : JsonSerializerContext; } From 1b7a3e534a9d8a2f15ac155c2352c37740da3346 Mon Sep 17 00:00:00 2001 From: Eirik Tsarpalis Date: Fri, 15 Nov 2024 10:58:24 +0000 Subject: [PATCH 05/11] Reinstate ordering of parameters in serialization callback. --- .../ref/System.Net.ServerSentEvents.cs | 2 +- .../src/System/Net/ServerSentEvents/SseFormatter.cs | 8 ++++---- .../tests/SseFormatterTests.cs | 6 +++--- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/libraries/System.Net.ServerSentEvents/ref/System.Net.ServerSentEvents.cs b/src/libraries/System.Net.ServerSentEvents/ref/System.Net.ServerSentEvents.cs index ec1bc2abc05e3..63ea89eebdff0 100644 --- a/src/libraries/System.Net.ServerSentEvents/ref/System.Net.ServerSentEvents.cs +++ b/src/libraries/System.Net.ServerSentEvents/ref/System.Net.ServerSentEvents.cs @@ -9,7 +9,7 @@ namespace System.Net.ServerSentEvents public static partial class SseFormatter { public static System.Threading.Tasks.Task WriteAsync(System.Collections.Generic.IAsyncEnumerable> source, System.IO.Stream destination, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } - public static System.Threading.Tasks.Task WriteAsync(System.Collections.Generic.IAsyncEnumerable> source, System.IO.Stream destination, System.Action, System.Net.ServerSentEvents.SseItem> itemFormatter, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } + public static System.Threading.Tasks.Task WriteAsync(System.Collections.Generic.IAsyncEnumerable> source, System.IO.Stream destination, System.Action, System.Buffers.IBufferWriter> itemFormatter, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } } public delegate T SseItemParser(string eventType, System.ReadOnlySpan data); public readonly partial struct SseItem diff --git a/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseFormatter.cs b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseFormatter.cs index db4ff5c3dff14..d87e8499aa17f 100644 --- a/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseFormatter.cs +++ b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseFormatter.cs @@ -36,7 +36,7 @@ public static Task WriteAsync(IAsyncEnumerable> source, Stream d throw new ArgumentNullException(nameof(destination)); } - return WriteAsyncCore(source, destination, static (writer, item) => writer.WriteUtf8String(item.Data), cancellationToken); + return WriteAsyncCore(source, destination, static (item, writer) => writer.WriteUtf8String(item.Data), cancellationToken); } /// @@ -48,7 +48,7 @@ public static Task WriteAsync(IAsyncEnumerable> source, Stream d /// The formatter for the data field of given event. /// The that can be used to cancel the write operation. /// A task that represents the asynchronous write operation. - public static Task WriteAsync(IAsyncEnumerable> source, Stream destination, Action, SseItem> itemFormatter, CancellationToken cancellationToken = default) + public static Task WriteAsync(IAsyncEnumerable> source, Stream destination, Action, IBufferWriter> itemFormatter, CancellationToken cancellationToken = default) { if (source is null) { @@ -68,14 +68,14 @@ public static Task WriteAsync(IAsyncEnumerable> source, Stream des return WriteAsyncCore(source, destination, itemFormatter, cancellationToken); } - private static async Task WriteAsyncCore(IAsyncEnumerable> source, Stream destination, Action, SseItem> itemFormatter, CancellationToken cancellationToken) + private static async Task WriteAsyncCore(IAsyncEnumerable> source, Stream destination, Action, IBufferWriter> itemFormatter, CancellationToken cancellationToken) { using PooledByteBufferWriter bufferWriter = new(); using PooledByteBufferWriter userDataBufferWriter = new(); await foreach (SseItem item in source.WithCancellation(cancellationToken).ConfigureAwait(false)) { - itemFormatter(userDataBufferWriter, item); + itemFormatter(item, userDataBufferWriter); FormatSseEvent( bufferWriter, diff --git a/src/libraries/System.Net.ServerSentEvents/tests/SseFormatterTests.cs b/src/libraries/System.Net.ServerSentEvents/tests/SseFormatterTests.cs index ddee7cfc4a89b..dc4df1877d69a 100644 --- a/src/libraries/System.Net.ServerSentEvents/tests/SseFormatterTests.cs +++ b/src/libraries/System.Net.ServerSentEvents/tests/SseFormatterTests.cs @@ -67,7 +67,7 @@ public static async Task WriteAsync_ItemFormatter_HasExpectedFormat() using MemoryStream stream = new(); // Act - await SseFormatter.WriteAsync(GetItemsAsync(), stream, (writer, item) => writer.Write(Encoding.UTF8.GetBytes(item.Data + "_suffix"))); + await SseFormatter.WriteAsync(GetItemsAsync(), stream, (item, writer) => writer.Write(Encoding.UTF8.GetBytes(item.Data + "_suffix"))); // Assert string actualFormat = Encoding.UTF8.GetString(stream.ToArray()); @@ -97,7 +97,7 @@ await Assert.ThrowsAsync(() => SseFormatter.WriteAsync( GetItemsAsync(), new MemoryStream(), - (writer, item) => writer.Write(Encoding.UTF8.GetBytes(item.Data)), + (item, writer) => writer.Write(Encoding.UTF8.GetBytes(item.Data)), token)); async IAsyncEnumerable> GetItemsAsync([EnumeratorCancellation] CancellationToken token = default) @@ -141,7 +141,7 @@ async Task ValidateParseResults(IAsyncEnumerable> results) } } - static void FormatJson(IBufferWriter writer, SseItem item) + static void FormatJson(SseItem item, IBufferWriter writer) { JsonWriterOptions writerOptions = new() { Indented = true }; using Utf8JsonWriter jsonWriter = new(writer, writerOptions); From 1a12c51ab65811d328388ca8c5b40cce19153ac0 Mon Sep 17 00:00:00 2001 From: Eirik Tsarpalis Date: Mon, 18 Nov 2024 16:46:43 +0000 Subject: [PATCH 06/11] Add SseItem.ReconnectionInterval. --- .../ref/System.Net.ServerSentEvents.cs | 1 + .../src/Resources/Strings.resx | 7 +++- .../src/System.Net.ServerSentEvents.csproj | 3 +- .../System/Net/ServerSentEvents/Helpers.cs | 9 ----- .../Net/ServerSentEvents/SseFormatter.cs | 37 +++++++++++++++---- .../System/Net/ServerSentEvents/SseItem.cs | 30 ++++++++++++++- .../System/Net/ServerSentEvents/SseParser.cs | 18 +++++++-- .../Net/ServerSentEvents/SseParser_1.cs | 13 +++++-- .../Net/ServerSentEvents/ThrowHelper.cs | 26 +++++++++++++ .../tests/SseFormatterTests.cs | 10 +++-- .../tests/SseItemTests.cs | 15 ++++++-- .../tests/SseParserTests.cs | 35 ++++++++++-------- 12 files changed, 154 insertions(+), 50 deletions(-) create mode 100644 src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/ThrowHelper.cs diff --git a/src/libraries/System.Net.ServerSentEvents/ref/System.Net.ServerSentEvents.cs b/src/libraries/System.Net.ServerSentEvents/ref/System.Net.ServerSentEvents.cs index 63ea89eebdff0..1fb04dfab99c2 100644 --- a/src/libraries/System.Net.ServerSentEvents/ref/System.Net.ServerSentEvents.cs +++ b/src/libraries/System.Net.ServerSentEvents/ref/System.Net.ServerSentEvents.cs @@ -21,6 +21,7 @@ public readonly partial struct SseItem public T Data { get { throw null; } } public string? EventId { get { throw null; } init { } } public string EventType { get { throw null; } } + public System.TimeSpan? ReconnectionInterval { get { throw null; } init { } } } public static partial class SseParser { diff --git a/src/libraries/System.Net.ServerSentEvents/src/Resources/Strings.resx b/src/libraries/System.Net.ServerSentEvents/src/Resources/Strings.resx index 833ae31c9871c..e414cea32c071 100644 --- a/src/libraries/System.Net.ServerSentEvents/src/Resources/Strings.resx +++ b/src/libraries/System.Net.ServerSentEvents/src/Resources/Strings.resx @@ -120,7 +120,10 @@ The enumerable may be enumerated only once. - - Argument must not contain line breaks. + + The argument cannot contain line breaks. + + + The argument cannot be a negative value. diff --git a/src/libraries/System.Net.ServerSentEvents/src/System.Net.ServerSentEvents.csproj b/src/libraries/System.Net.ServerSentEvents/src/System.Net.ServerSentEvents.csproj index 98d6577797157..65d80337f74c2 100644 --- a/src/libraries/System.Net.ServerSentEvents/src/System.Net.ServerSentEvents.csproj +++ b/src/libraries/System.Net.ServerSentEvents/src/System.Net.ServerSentEvents.csproj @@ -1,4 +1,4 @@ - + $(NetCoreAppCurrent);$(NetCoreAppPrevious);$(NetCoreAppMinimum);netstandard2.0;$(NetFrameworkMinimum) @@ -18,6 +18,7 @@ System.Net.ServerSentEvents.SseParser + diff --git a/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/Helpers.cs b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/Helpers.cs index 9cd6f3e4a65fc..4136709f2eea9 100644 --- a/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/Helpers.cs +++ b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/Helpers.cs @@ -49,15 +49,6 @@ public static unsafe void WriteUtf8String(this IBufferWriter writer, ReadO writer.Advance(bytesWritten); } - public static void ValidateParameterDoesNotContainLineBreaks(string? input, string paramName) - { - if (input?.Contains('\n') is true) - { - Throw(paramName); - static void Throw(string parameterName) => throw new ArgumentException(SR.ArgumentException_MustNotContainLineBreaks, parameterName); - } - } - #if !NET public static bool Contains(this string text, char character) => text.IndexOf(character) >= 0; diff --git a/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseFormatter.cs b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseFormatter.cs index d87e8499aa17f..71a0de610aedd 100644 --- a/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseFormatter.cs +++ b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseFormatter.cs @@ -4,6 +4,7 @@ using System.Buffers; using System.Collections.Generic; using System.Diagnostics; +using System.Globalization; using System.IO; using System.Threading; using System.Threading.Tasks; @@ -28,12 +29,12 @@ public static Task WriteAsync(IAsyncEnumerable> source, Stream d { if (source is null) { - throw new ArgumentNullException(nameof(source)); + ThrowHelper.ThrowArgumentNullException(nameof(source)); } if (destination is null) { - throw new ArgumentNullException(nameof(destination)); + ThrowHelper.ThrowArgumentNullException(nameof(destination)); } return WriteAsyncCore(source, destination, static (item, writer) => writer.WriteUtf8String(item.Data), cancellationToken); @@ -52,17 +53,17 @@ public static Task WriteAsync(IAsyncEnumerable> source, Stream des { if (source is null) { - throw new ArgumentNullException(nameof(source)); + ThrowHelper.ThrowArgumentNullException(nameof(source)); } if (destination is null) { - throw new ArgumentNullException(nameof(destination)); + ThrowHelper.ThrowArgumentNullException(nameof(destination)); } if (itemFormatter is null) { - throw new ArgumentNullException(nameof(itemFormatter)); + ThrowHelper.ThrowArgumentNullException(nameof(itemFormatter)); } return WriteAsyncCore(source, destination, itemFormatter, cancellationToken); @@ -81,7 +82,8 @@ private static async Task WriteAsyncCore(IAsyncEnumerable> source, bufferWriter, eventType: item._eventType, // Do not use the public property since it normalizes to "message" if null data: userDataBufferWriter.WrittenMemory.Span, - eventId: item.EventId); + eventId: item.EventId, + reconnectionInterval: item.ReconnectionInterval); await destination.WriteAsync(bufferWriter.WrittenMemory, cancellationToken).ConfigureAwait(false); @@ -94,7 +96,8 @@ private static void FormatSseEvent( PooledByteBufferWriter bufferWriter, string? eventType, ReadOnlySpan data, - string? eventId) + string? eventId, + TimeSpan? reconnectionInterval) { Debug.Assert(bufferWriter.WrittenCount is 0); @@ -119,6 +122,26 @@ private static void FormatSseEvent( bufferWriter.WriteUtf8String(s_newLine); } + if (reconnectionInterval is { } retry) + { + Debug.Assert(retry >= TimeSpan.Zero); + + long retryMs = (long)retry.TotalMilliseconds; + + bufferWriter.WriteUtf8String("retry: "u8); +#if NET + const int MaxDigits = 20; + Span buffer = stackalloc byte[MaxDigits]; + bool success = retryMs.TryFormat(buffer, out int bytesWritten, provider: CultureInfo.InvariantCulture); + Debug.Assert(success); + + bufferWriter.Write(buffer.Slice(0, bytesWritten)); +#else + bufferWriter.WriteUtf8String(retryMs.ToString(CultureInfo.InvariantCulture)); +#endif + bufferWriter.WriteUtf8String(s_newLine); + } + bufferWriter.WriteUtf8String(s_newLine); } diff --git a/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseItem.cs b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseItem.cs index 39ecd9023fe7a..d0f7c9cd9808c 100644 --- a/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseItem.cs +++ b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseItem.cs @@ -11,6 +11,8 @@ public readonly struct SseItem internal readonly string? _eventType; /// The event's id. private readonly string? _eventId; + /// The event's reconnection interval. + private readonly TimeSpan? _reconnectionInterval; /// Initializes the server-sent event. /// The event's payload. @@ -18,7 +20,10 @@ public readonly struct SseItem /// Thrown when contains a line break. public SseItem(T data, string? eventType = null) { - Helpers.ValidateParameterDoesNotContainLineBreaks(eventType, nameof(eventType)); + if (eventType?.Contains('\n') is true) + { + ThrowHelper.ThrowArgumentException_CannotContainLineBreaks(nameof(eventType)); + } Data = data; _eventType = eventType; @@ -37,10 +42,31 @@ public string? EventId get => _eventId; init { - Helpers.ValidateParameterDoesNotContainLineBreaks(value, nameof(EventId)); + if (value?.Contains('\n') is true) + { + ThrowHelper.ThrowArgumentException_CannotContainLineBreaks(nameof(EventId)); + } _eventId = value; } } + + /// Gets the event's retry interval. + /// + /// When specified on an event, instructs the client to update its reconnection time to the specified value. + /// + public TimeSpan? ReconnectionInterval + { + get => _reconnectionInterval; + init + { + if (value < TimeSpan.Zero) + { + ThrowHelper.ThrowArgumentException_CannotBeNegative(nameof(ReconnectionInterval)); + } + + _reconnectionInterval = value; + } + } } } diff --git a/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseParser.cs b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseParser.cs index d25a10a4ec54f..733bfcc3978f9 100644 --- a/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseParser.cs +++ b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseParser.cs @@ -34,10 +34,20 @@ public static SseParser Create(Stream sseStream) => /// The parser to use to transform each payload of bytes into a data element. /// The enumerable, which can be enumerated synchronously or asynchronously. /// or is null. - public static SseParser Create(Stream sseStream, SseItemParser itemParser) => - new SseParser( - sseStream ?? throw new ArgumentNullException(nameof(sseStream)), - itemParser ?? throw new ArgumentNullException(nameof(itemParser))); + public static SseParser Create(Stream sseStream, SseItemParser itemParser) + { + if (sseStream is null) + { + ThrowHelper.ThrowArgumentNullException(nameof(sseStream)); + } + + if (itemParser is null) + { + ThrowHelper.ThrowArgumentNullException(nameof(itemParser)); + } + + return new SseParser(sseStream, itemParser); + } /// Encoding.UTF8.GetString(bytes) internal static unsafe string Utf8GetString(ReadOnlySpan bytes) diff --git a/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseParser_1.cs b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseParser_1.cs index 41dddf3233797..25e0076b3492b 100644 --- a/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseParser_1.cs +++ b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseParser_1.cs @@ -76,6 +76,9 @@ public sealed class SseParser /// The event id for the next event. private string? _eventId; + /// The reconnection interval for the next event. + private TimeSpan? _nextReconnectionInterval; + /// Initialize the enumerable. /// The stream to parse. /// The function to use to parse payload bytes into a . @@ -317,9 +320,11 @@ private bool ProcessLine(out SseItem sseItem, out int advance) if (_dataAppended) { - sseItem = new SseItem(_itemParser(_eventType ?? SseParser.EventTypeDefault, _dataBuffer.AsSpan(0, _dataLength)), _eventType) { EventId = _eventId }; + T data = _itemParser(_eventType ?? SseParser.EventTypeDefault, _dataBuffer.AsSpan(0, _dataLength)); + sseItem = new SseItem(data, _eventType) { EventId = _eventId, ReconnectionInterval = _nextReconnectionInterval }; _eventType = null; _eventId = null; + _nextReconnectionInterval = null; _dataLength = 0; _dataAppended = false; return true; @@ -369,9 +374,11 @@ private bool ProcessLine(out SseItem sseItem, out int advance) (remainder[0] is LF || (remainder[0] is CR && remainder.Length > 1))) { advance = line.Length + newlineLength + (remainder.StartsWith(CRLF) ? 2 : 1); - sseItem = new SseItem(_itemParser(_eventType ?? SseParser.EventTypeDefault, fieldValue), _eventType) { EventId = _eventId }; + T data = _itemParser(_eventType ?? SseParser.EventTypeDefault, fieldValue); + sseItem = new SseItem(data, _eventType) { EventId = _eventId, ReconnectionInterval = _nextReconnectionInterval }; _eventType = null; _eventId = null; + _nextReconnectionInterval = null; return true; } } @@ -418,7 +425,7 @@ private bool ProcessLine(out SseItem sseItem, out int advance) #endif NumberStyles.None, CultureInfo.InvariantCulture, out long milliseconds)) { - ReconnectionInterval = TimeSpan.FromMilliseconds(milliseconds); + _nextReconnectionInterval = ReconnectionInterval = TimeSpan.FromMilliseconds(milliseconds); } } else diff --git a/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/ThrowHelper.cs b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/ThrowHelper.cs new file mode 100644 index 0000000000000..cb6a4c58132bc --- /dev/null +++ b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/ThrowHelper.cs @@ -0,0 +1,26 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Diagnostics.CodeAnalysis; + +namespace System.Net.ServerSentEvents +{ + internal static class ThrowHelper + { + [DoesNotReturn] + public static void ThrowArgumentNullException(string parameterName) + { + throw new ArgumentNullException(parameterName); + } + + public static void ThrowArgumentException_CannotContainLineBreaks(string parameterName) + { + throw new ArgumentException(SR.ArgumentException_CannotContainLineBreaks, parameterName); + } + + public static void ThrowArgumentException_CannotBeNegative(string parameterName) + { + throw new ArgumentException(SR.ArgumentException_CannotBeNegative, parameterName); + } + } +} diff --git a/src/libraries/System.Net.ServerSentEvents/tests/SseFormatterTests.cs b/src/libraries/System.Net.ServerSentEvents/tests/SseFormatterTests.cs index dc4df1877d69a..522a092f2c111 100644 --- a/src/libraries/System.Net.ServerSentEvents/tests/SseFormatterTests.cs +++ b/src/libraries/System.Net.ServerSentEvents/tests/SseFormatterTests.cs @@ -34,7 +34,7 @@ public static async Task WriteAsync_HasExpectedFormat() // Arrange string expectedFormat = "event: eventType1\ndata: data1\n\n" + - "event: eventType2\ndata: data2\n\n" + + "event: eventType2\ndata: data2\nretry: 300\n\n" + "data: data3\n\n" + "data: \n\n" + "event: eventType4\ndata: data4\nid: id4\n\n" + @@ -57,7 +57,7 @@ public static async Task WriteAsync_ItemFormatter_HasExpectedFormat() // Arrange string expectedFormat = "event: eventType1\ndata: data1_suffix\n\n" + - "event: eventType2\ndata: data2_suffix\n\n" + + "event: eventType2\ndata: data2_suffix\nretry: 300\n\n" + "data: data3_suffix\n\n" + "data: _suffix\n\n" + "event: eventType4\ndata: data4_suffix\nid: id4\n\n" + @@ -77,7 +77,7 @@ public static async Task WriteAsync_ItemFormatter_HasExpectedFormat() private static async IAsyncEnumerable> GetItemsAsync() { yield return new SseItem("data1", "eventType1"); - yield return new SseItem("data2", "eventType2"); + yield return new SseItem("data2", "eventType2") { ReconnectionInterval = TimeSpan.FromMilliseconds(300) }; await Task.Yield(); yield return new SseItem("data3", null); yield return new SseItem(data: null!, null); @@ -124,7 +124,8 @@ async IAsyncEnumerable> GetItemsAsync() { string? eventType = i % 2 == 0 ? null : "eventType"; string? eventId = i % 3 == 2 ? i.ToString() : null; - yield return new SseItem(new MyPoco(i), eventType) { EventId = eventId }; + TimeSpan? reconnectionInterval = i % 5 == 4 ? TimeSpan.FromSeconds(i) : null; + yield return new SseItem(new MyPoco(i), eventType) { EventId = eventId, ReconnectionInterval = reconnectionInterval }; await Task.Yield(); } } @@ -136,6 +137,7 @@ async Task ValidateParseResults(IAsyncEnumerable> results) { Assert.Equal(i % 2 == 0 ? "message" : "eventType", item.EventType); Assert.Equal(i % 3 == 2 ? i.ToString() : null, item.EventId); + Assert.Equal(i % 5 == 4 ? TimeSpan.FromSeconds(i) : null, item.ReconnectionInterval); Assert.Equal(i, item.Data.Value); i++; } diff --git a/src/libraries/System.Net.ServerSentEvents/tests/SseItemTests.cs b/src/libraries/System.Net.ServerSentEvents/tests/SseItemTests.cs index 4ec24bdd90bd1..54609d068faa7 100644 --- a/src/libraries/System.Net.ServerSentEvents/tests/SseItemTests.cs +++ b/src/libraries/System.Net.ServerSentEvents/tests/SseItemTests.cs @@ -16,16 +16,19 @@ public void SseItem_Roundtrips() Assert.Null(item.Data); Assert.Equal(SseParser.EventTypeDefault, item.EventType); Assert.Null(item.EventId); + Assert.Null(item.ReconnectionInterval); item = new SseItem("some data", null); Assert.Equal("some data", item.Data); Assert.Equal(SseParser.EventTypeDefault, item.EventType); Assert.Null(item.EventId); + Assert.Null(item.ReconnectionInterval); - item = new SseItem("some data", "eventType") { EventId = "eventId" }; + item = new SseItem("some data", "eventType") { EventId = "eventId", ReconnectionInterval = TimeSpan.FromSeconds(3) }; Assert.Equal("some data", item.Data); Assert.Equal("eventType", item.EventType); Assert.Equal("eventId", item.EventId); + Assert.Equal(TimeSpan.FromSeconds(3), item.ReconnectionInterval); } [Theory] @@ -34,8 +37,14 @@ public void SseItem_Roundtrips() [InlineData("Hello, \r\nWorld!")] public void SseItem_MetadataWithLineBreak_ThrowsArgumentException(string metadataWithLineBreak) { - Assert.Throws(() => new SseItem("data", eventType: metadataWithLineBreak)); - Assert.Throws(() => new SseItem("data", "eventType") { EventId = metadataWithLineBreak }); + Assert.Throws("eventType", () => new SseItem("data", eventType: metadataWithLineBreak)); + Assert.Throws("EventId", () => new SseItem("data", "eventType") { EventId = metadataWithLineBreak }); + } + + [Fact] + public void SseItem_ReconnectionInterval_NegativeTimeSpan_ThrowsArgumentException() + { + Assert.Throws("ReconnectionInterval", () => new SseItem("data") { ReconnectionInterval = TimeSpan.FromSeconds(-1) }); } } } diff --git a/src/libraries/System.Net.ServerSentEvents/tests/SseParserTests.cs b/src/libraries/System.Net.ServerSentEvents/tests/SseParserTests.cs index 86005176d67c4..c6cad40f34cab 100644 --- a/src/libraries/System.Net.ServerSentEvents/tests/SseParserTests.cs +++ b/src/libraries/System.Net.ServerSentEvents/tests/SseParserTests.cs @@ -121,9 +121,9 @@ await ReadAllEventsAsync(stream) : Assert.Equal(stream.Length, stream.Position); Assert.Equal(3, items.Count); - AssertSseItemEqual(new SseItem("1", "A") { EventId = "2" }, items[0]); - AssertSseItemEqual(new SseItem("4", "B") { EventId = "5" }, items[1]); - AssertSseItemEqual(new SseItem("7", "C") { EventId = "8" }, items[2]); + AssertSseItemEqual(new SseItem("1", "A") { EventId = "2", ReconnectionInterval = TimeSpan.FromMilliseconds(300) }, items[0]); + AssertSseItemEqual(new SseItem("4", "B") { EventId = "5", ReconnectionInterval = TimeSpan.FromMilliseconds(600) }, items[1]); + AssertSseItemEqual(new SseItem("7", "C") { EventId = "8", ReconnectionInterval = TimeSpan.FromMilliseconds(900) }, items[2]); } [Theory] @@ -430,20 +430,22 @@ public async Task Retry_SetsReconnectionInterval(string newline, bool trickle, b Assert.Equal(Timeout.InfiniteTimeSpan, parser.ReconnectionInterval); Assert.True(e.MoveNext()); - AssertSseItemEqual(new SseItem("second event", "message"), e.Current); - Assert.Equal(TimeSpan.FromMilliseconds(42), parser.ReconnectionInterval); + TimeSpan firstRetry = TimeSpan.FromMilliseconds(42); + AssertSseItemEqual(new SseItem("second event", "message") { ReconnectionInterval = firstRetry}, e.Current); + Assert.Equal(firstRetry, parser.ReconnectionInterval); Assert.True(e.MoveNext()); - AssertSseItemEqual(new SseItem(" third event", "message"), e.Current); - Assert.Equal(TimeSpan.FromMilliseconds(12345678910), parser.ReconnectionInterval); + TimeSpan secondRetry = TimeSpan.FromMilliseconds(12345678910); + AssertSseItemEqual(new SseItem(" third event", "message") { ReconnectionInterval = secondRetry }, e.Current); + Assert.Equal(secondRetry, parser.ReconnectionInterval); Assert.True(e.MoveNext()); AssertSseItemEqual(new SseItem("fourth event", "message"), e.Current); - Assert.Equal(TimeSpan.FromMilliseconds(12345678910), parser.ReconnectionInterval); + Assert.Equal(secondRetry, parser.ReconnectionInterval); Assert.True(e.MoveNext()); AssertSseItemEqual(new SseItem("fifth event", "message"), e.Current); - Assert.Equal(TimeSpan.FromMilliseconds(12345678910), parser.ReconnectionInterval); + Assert.Equal(secondRetry, parser.ReconnectionInterval); } else { @@ -456,20 +458,22 @@ public async Task Retry_SetsReconnectionInterval(string newline, bool trickle, b AssertSseItemEqual(new SseItem("first event", "message"), e.Current); Assert.True(await e.MoveNextAsync()); - AssertSseItemEqual(new SseItem("second event", "message"), e.Current); - Assert.Equal(TimeSpan.FromMilliseconds(42), parser.ReconnectionInterval); + TimeSpan firstRetry = TimeSpan.FromMilliseconds(42); + AssertSseItemEqual(new SseItem("second event", "message") { ReconnectionInterval = firstRetry }, e.Current); + Assert.Equal(firstRetry, parser.ReconnectionInterval); Assert.True(await e.MoveNextAsync()); - AssertSseItemEqual(new SseItem(" third event", "message"), e.Current); - Assert.Equal(TimeSpan.FromMilliseconds(12345678910), parser.ReconnectionInterval); + TimeSpan secondRetry = TimeSpan.FromMilliseconds(12345678910); + AssertSseItemEqual(new SseItem(" third event", "message") { ReconnectionInterval = secondRetry }, e.Current); + Assert.Equal(secondRetry, parser.ReconnectionInterval); Assert.True(await e.MoveNextAsync()); AssertSseItemEqual(new SseItem("fourth event", "message"), e.Current); - Assert.Equal(TimeSpan.FromMilliseconds(12345678910), parser.ReconnectionInterval); + Assert.Equal(secondRetry, parser.ReconnectionInterval); Assert.True(await e.MoveNextAsync()); AssertSseItemEqual(new SseItem("fifth event", "message"), e.Current); - Assert.Equal(TimeSpan.FromMilliseconds(12345678910), parser.ReconnectionInterval); + Assert.Equal(secondRetry, parser.ReconnectionInterval); } } @@ -866,6 +870,7 @@ private static void AssertSseItemEqual(SseItem left, SseItem right) { Assert.Equal(left.EventType, right.EventType); Assert.Equal(left.EventId, right.EventId); + Assert.Equal(left.ReconnectionInterval, right.ReconnectionInterval); Assert.Equal(left.Data, right.Data); } From 2bae6f9a163aeff5a1c56a268715061be6786494 Mon Sep 17 00:00:00 2001 From: Eirik Tsarpalis Date: Mon, 18 Nov 2024 17:12:42 +0000 Subject: [PATCH 07/11] Address feedback. --- .../src/System/Net/ServerSentEvents/Helpers.cs | 16 ++++++++++++++++ .../System/Net/ServerSentEvents/SseFormatter.cs | 14 +------------- 2 files changed, 17 insertions(+), 13 deletions(-) diff --git a/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/Helpers.cs b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/Helpers.cs index 4136709f2eea9..d2e5d3fef83b0 100644 --- a/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/Helpers.cs +++ b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/Helpers.cs @@ -3,6 +3,7 @@ using System.Buffers; using System.Diagnostics; +using System.Globalization; using System.IO; using System.Runtime.InteropServices; using System.Text; @@ -13,6 +14,21 @@ namespace System.Net.ServerSentEvents { internal static class Helpers { + public static void WriteUtf8Number(this IBufferWriter writer, long value) + { +#if NET + const int MaxDecimalDigits = 20; + Span buffer = writer.GetSpan(MaxDecimalDigits); + Debug.Assert(MaxDecimalDigits <= buffer.Length); + + bool success = value.TryFormat(buffer, out int bytesWritten, provider: CultureInfo.InvariantCulture); + Debug.Assert(success); + writer.Advance(bytesWritten); +#else + writer.WriteUtf8String(value.ToString(CultureInfo.InvariantCulture)); +#endif + } + public static void WriteUtf8String(this IBufferWriter writer, ReadOnlySpan value) { if (value.IsEmpty) diff --git a/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseFormatter.cs b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseFormatter.cs index 71a0de610aedd..78c24504f3a4b 100644 --- a/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseFormatter.cs +++ b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseFormatter.cs @@ -4,7 +4,6 @@ using System.Buffers; using System.Collections.Generic; using System.Diagnostics; -using System.Globalization; using System.IO; using System.Threading; using System.Threading.Tasks; @@ -126,19 +125,8 @@ private static void FormatSseEvent( { Debug.Assert(retry >= TimeSpan.Zero); - long retryMs = (long)retry.TotalMilliseconds; - bufferWriter.WriteUtf8String("retry: "u8); -#if NET - const int MaxDigits = 20; - Span buffer = stackalloc byte[MaxDigits]; - bool success = retryMs.TryFormat(buffer, out int bytesWritten, provider: CultureInfo.InvariantCulture); - Debug.Assert(success); - - bufferWriter.Write(buffer.Slice(0, bytesWritten)); -#else - bufferWriter.WriteUtf8String(retryMs.ToString(CultureInfo.InvariantCulture)); -#endif + bufferWriter.WriteUtf8Number((long)retry.TotalMilliseconds); bufferWriter.WriteUtf8String(s_newLine); } From 06951dcbdf8d4bee142fe2d68428195c34efcacf Mon Sep 17 00:00:00 2001 From: Eirik Tsarpalis Date: Mon, 18 Nov 2024 17:56:50 +0000 Subject: [PATCH 08/11] Add parser validation for too small or too large retry intervals. --- .../Net/ServerSentEvents/SseParser_1.cs | 10 ++++-- .../tests/SseParserTests.cs | 34 +++++++++++++++++++ 2 files changed, 42 insertions(+), 2 deletions(-) diff --git a/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseParser_1.cs b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseParser_1.cs index 25e0076b3492b..bff2149a6b9f2 100644 --- a/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseParser_1.cs +++ b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseParser_1.cs @@ -28,6 +28,9 @@ public sealed class SseParser /// Carriage Return Line Feed. private static ReadOnlySpan CRLF => "\r\n"u8; + /// The maximum number of milliseconds representible by . + private readonly long TimeSpan_MaxValueMilliseconds = (long)TimeSpan.MaxValue.TotalMilliseconds; + /// The default size of an ArrayPool buffer to rent. /// Larger size used by default to minimize number of reads. Smaller size used in debug to stress growth/shifting logic. private const int DefaultArrayPoolRentSize = @@ -423,9 +426,12 @@ private bool ProcessLine(out SseItem sseItem, out int advance) #else SseParser.Utf8GetString(fieldValue), #endif - NumberStyles.None, CultureInfo.InvariantCulture, out long milliseconds)) + NumberStyles.None, CultureInfo.InvariantCulture, out long milliseconds) && + 0 <= milliseconds && milliseconds <= TimeSpan_MaxValueMilliseconds) { - _nextReconnectionInterval = ReconnectionInterval = TimeSpan.FromMilliseconds(milliseconds); + // Workaround for TimeSpan.FromMilliseconds not being able to roundtrip TimeSpan.MaxValue + TimeSpan timeSpan = milliseconds == TimeSpan_MaxValueMilliseconds ? TimeSpan.MaxValue : TimeSpan.FromMilliseconds(milliseconds); + _nextReconnectionInterval = ReconnectionInterval = timeSpan; } } else diff --git a/src/libraries/System.Net.ServerSentEvents/tests/SseParserTests.cs b/src/libraries/System.Net.ServerSentEvents/tests/SseParserTests.cs index c6cad40f34cab..13b2747af7905 100644 --- a/src/libraries/System.Net.ServerSentEvents/tests/SseParserTests.cs +++ b/src/libraries/System.Net.ServerSentEvents/tests/SseParserTests.cs @@ -477,6 +477,40 @@ public async Task Retry_SetsReconnectionInterval(string newline, bool trickle, b } } + [Theory] + [InlineData(0)] + [InlineData(1)] + [InlineData(922337203685477)] // TimeSpan.MaxValue.TotalMilliseconds + [InlineData(922337203685476)] + public async Task Retry_ValidRetryField_IsReturned(long retryValue) + { + // Workaround for TimeSpan.FromMillisecond not being able to roundtrip TimeSpan.MaxValue + TimeSpan expectedInterval = retryValue == TimeSpan.MaxValue.TotalMilliseconds ? TimeSpan.MaxValue : TimeSpan.FromMilliseconds(retryValue); + using Stream stream = GetStream($"data: test\nretry: {retryValue}\n\n", trickle: false); + + List> items = await ReadAllEventsAsync(stream); + + Assert.Equal(1, items.Count); + Assert.Equal(expectedInterval, items[0].ReconnectionInterval); + } + + [Theory] + [InlineData("")] + [InlineData("-1")] + [InlineData("-922337203685477")] // TimeSpan.MinValue.TotalMilliseconds + [InlineData("922337203685478")] // TimeSpan.MaxValue.TotalMilliseconds + 1 + [InlineData("9223372036854775807")] // long.MaxValue + [InlineData("invalidmilliseconds")] + public async Task Retry_InvalidRetryField_IsIgnored(string retryValue) + { + using Stream stream = GetStream($"data: test\nretry: {retryValue}\n\n", trickle: false); + + List> items = await ReadAllEventsAsync(stream); + + Assert.Equal(1, items.Count); + Assert.Null(items[0].ReconnectionInterval); + } + [Theory] [MemberData(nameof(NewlineTrickleAsyncData))] public async Task JsonContent_DelegateInvoked(string newline, bool trickle, bool useAsync) From da3ef4d3dc9e224bb81f1609ad996c89c1ceaeb1 Mon Sep 17 00:00:00 2001 From: Eirik Tsarpalis Date: Tue, 19 Nov 2024 19:37:40 +0000 Subject: [PATCH 09/11] Update src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseFormatter.cs Co-authored-by: Stephen Toub --- .../src/System/Net/ServerSentEvents/SseFormatter.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseFormatter.cs b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseFormatter.cs index 78c24504f3a4b..0e096df7f2b10 100644 --- a/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseFormatter.cs +++ b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseFormatter.cs @@ -143,7 +143,7 @@ private static void WriteLinesWithPrefix(PooledByteBufferWriter writer, ReadOnly writer.WriteUtf8String(prefix); int i = data.IndexOf((byte)'\n'); - if (i is -1) + if (i < 0) { writer.WriteUtf8String(data); return; From aa27ac480f9f821b50d36f5f59222389d7ce6360 Mon Sep 17 00:00:00 2001 From: Eirik Tsarpalis Date: Tue, 19 Nov 2024 20:12:01 +0000 Subject: [PATCH 10/11] Handle CR line breaks. --- .../System/Net/ServerSentEvents/Helpers.cs | 4 +++- .../PooledByteBufferWriter.cs | 3 +-- .../Net/ServerSentEvents/SseFormatter.cs | 15 ++++++++----- .../System/Net/ServerSentEvents/SseItem.cs | 4 ++-- .../tests/SseFormatterTests.cs | 21 +++++++++++++------ .../tests/SseItemTests.cs | 1 + 6 files changed, 32 insertions(+), 16 deletions(-) diff --git a/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/Helpers.cs b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/Helpers.cs index d2e5d3fef83b0..4639c84cd3ded 100644 --- a/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/Helpers.cs +++ b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/Helpers.cs @@ -65,8 +65,10 @@ public static unsafe void WriteUtf8String(this IBufferWriter writer, ReadO writer.Advance(bytesWritten); } + public static bool ContainsLineBreaks(this ReadOnlySpan text) => + text.IndexOfAny('\r', '\n') >= 0; + #if !NET - public static bool Contains(this string text, char character) => text.IndexOf(character) >= 0; public static ValueTask WriteAsync(this Stream stream, ReadOnlyMemory buffer, CancellationToken cancellationToken = default) { diff --git a/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/PooledByteBufferWriter.cs b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/PooledByteBufferWriter.cs index f4e0ba33c16ab..f4af617cfced3 100644 --- a/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/PooledByteBufferWriter.cs +++ b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/PooledByteBufferWriter.cs @@ -125,8 +125,7 @@ private void CheckAndResizeBuffer(int sizeHint) newSize = currentLength + sizeHint; if ((uint)newSize > MaximumBufferSize) { - Throw(); - static void Throw() => throw new OutOfMemoryException(); + throw new OutOfMemoryException(); } } diff --git a/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseFormatter.cs b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseFormatter.cs index 0e096df7f2b10..3b9c950f4594e 100644 --- a/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseFormatter.cs +++ b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseFormatter.cs @@ -102,7 +102,7 @@ private static void FormatSseEvent( if (eventType is not null) { - Debug.Assert(!eventType.Contains('\n')); + Debug.Assert(!eventType.ContainsLineBreaks()); bufferWriter.WriteUtf8String("event: "u8); bufferWriter.WriteUtf8String(eventType); @@ -114,7 +114,7 @@ private static void FormatSseEvent( if (eventId is not null) { - Debug.Assert(!eventId.Contains('\n')); + Debug.Assert(!eventId.ContainsLineBreaks()); bufferWriter.WriteUtf8String("id: "u8); bufferWriter.WriteUtf8String(eventId); @@ -142,16 +142,21 @@ private static void WriteLinesWithPrefix(PooledByteBufferWriter writer, ReadOnly { writer.WriteUtf8String(prefix); - int i = data.IndexOf((byte)'\n'); + int i = data.IndexOfAny((byte)'\r', (byte)'\n'); if (i < 0) { writer.WriteUtf8String(data); return; } - int lineLength = i > 0 && data[i - 1] == '\r' ? i - 1 : i; + int lineLength = i; + if (data[i++] == '\r' && i < data.Length && data[i] == '\n') + { + i++; + } + ReadOnlySpan nextLine = data.Slice(0, lineLength); - data = data.Slice(i + 1); + data = data.Slice(i); writer.WriteUtf8String(nextLine); writer.WriteUtf8String(s_newLine); diff --git a/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseItem.cs b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseItem.cs index d0f7c9cd9808c..b73e4aef46e5f 100644 --- a/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseItem.cs +++ b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseItem.cs @@ -20,7 +20,7 @@ public readonly struct SseItem /// Thrown when contains a line break. public SseItem(T data, string? eventType = null) { - if (eventType?.Contains('\n') is true) + if (eventType?.ContainsLineBreaks() is true) { ThrowHelper.ThrowArgumentException_CannotContainLineBreaks(nameof(eventType)); } @@ -42,7 +42,7 @@ public string? EventId get => _eventId; init { - if (value?.Contains('\n') is true) + if (value?.ContainsLineBreaks() is true) { ThrowHelper.ThrowArgumentException_CannotContainLineBreaks(nameof(EventId)); } diff --git a/src/libraries/System.Net.ServerSentEvents/tests/SseFormatterTests.cs b/src/libraries/System.Net.ServerSentEvents/tests/SseFormatterTests.cs index 522a092f2c111..a21f74f8cacb2 100644 --- a/src/libraries/System.Net.ServerSentEvents/tests/SseFormatterTests.cs +++ b/src/libraries/System.Net.ServerSentEvents/tests/SseFormatterTests.cs @@ -38,8 +38,11 @@ public static async Task WriteAsync_HasExpectedFormat() "data: data3\n\n" + "data: \n\n" + "event: eventType4\ndata: data4\nid: id4\n\n" + - "event: eventType4\ndata: data with\ndata: multiple \rline\ndata: breaks\n\n" + - "data: line break at end\ndata: \n\n"; + "event: eventType4\ndata: data\ndata: \ndata: with\ndata: multiple \ndata: line\ndata: breaks\n\n" + + "data: LF at end\ndata: \n\n" + + "data: CR at end\ndata: \n\n" + + "data: CRLF at end\ndata: \n\n" + + "data: LFCR at end\ndata: \ndata: \n\n"; using MemoryStream stream = new(); @@ -61,8 +64,11 @@ public static async Task WriteAsync_ItemFormatter_HasExpectedFormat() "data: data3_suffix\n\n" + "data: _suffix\n\n" + "event: eventType4\ndata: data4_suffix\nid: id4\n\n" + - "event: eventType4\ndata: data with\ndata: multiple \rline\ndata: breaks_suffix\n\n" + - "data: line break at end\ndata: _suffix\n\n"; + "event: eventType4\ndata: data\ndata: \ndata: with\ndata: multiple \ndata: line\ndata: breaks_suffix\n\n" + + "data: LF at end\ndata: _suffix\n\n" + + "data: CR at end\ndata: _suffix\n\n" + + "data: CRLF at end\ndata: _suffix\n\n" + + "data: LFCR at end\ndata: \ndata: _suffix\n\n"; using MemoryStream stream = new(); @@ -83,8 +89,11 @@ private static async IAsyncEnumerable> GetItemsAsync() yield return new SseItem(data: null!, null); yield return new SseItem("data4", "eventType4") { EventId = "id4" }; await Task.Yield(); - yield return new SseItem("data with\nmultiple \rline\r\nbreaks", "eventType4"); - yield return new SseItem("line break at end\n", null); + yield return new SseItem("data\n\r with\nmultiple \rline\r\nbreaks", "eventType4"); + yield return new SseItem("LF at end\n", null); + yield return new SseItem("CR at end\r", null); + yield return new SseItem("CRLF at end\r\n", null); + yield return new SseItem("LFCR at end\n\r", null); } [Fact] diff --git a/src/libraries/System.Net.ServerSentEvents/tests/SseItemTests.cs b/src/libraries/System.Net.ServerSentEvents/tests/SseItemTests.cs index 54609d068faa7..75b5bf1b8cec7 100644 --- a/src/libraries/System.Net.ServerSentEvents/tests/SseItemTests.cs +++ b/src/libraries/System.Net.ServerSentEvents/tests/SseItemTests.cs @@ -35,6 +35,7 @@ public void SseItem_Roundtrips() [InlineData("\n")] [InlineData("Hello, World!\n")] [InlineData("Hello, \r\nWorld!")] + [InlineData("Hello, \rWorld!")] public void SseItem_MetadataWithLineBreak_ThrowsArgumentException(string metadataWithLineBreak) { Assert.Throws("eventType", () => new SseItem("data", eventType: metadataWithLineBreak)); From b35c650395bf400250c9b94b3ad78af52a6df18a Mon Sep 17 00:00:00 2001 From: Eirik Tsarpalis Date: Tue, 19 Nov 2024 20:15:46 +0000 Subject: [PATCH 11/11] Simplify PooledByteBufferWriter. --- .../src/System.Net.ServerSentEvents.csproj | 1 + .../PooledByteBufferWriter.cs | 138 ++---------------- 2 files changed, 12 insertions(+), 127 deletions(-) diff --git a/src/libraries/System.Net.ServerSentEvents/src/System.Net.ServerSentEvents.csproj b/src/libraries/System.Net.ServerSentEvents/src/System.Net.ServerSentEvents.csproj index 65d80337f74c2..7ca797752fa43 100644 --- a/src/libraries/System.Net.ServerSentEvents/src/System.Net.ServerSentEvents.csproj +++ b/src/libraries/System.Net.ServerSentEvents/src/System.Net.ServerSentEvents.csproj @@ -11,6 +11,7 @@ System.Net.ServerSentEvents.SseParser + diff --git a/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/PooledByteBufferWriter.cs b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/PooledByteBufferWriter.cs index f4af617cfced3..81e5070b765d0 100644 --- a/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/PooledByteBufferWriter.cs +++ b/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/PooledByteBufferWriter.cs @@ -8,142 +8,26 @@ namespace System.Net.ServerSentEvents { internal sealed class PooledByteBufferWriter : IBufferWriter, IDisposable { - private byte[] _rentedBuffer; - private int _index; - private const int MinimumBufferSize = 256; + private ArrayBuffer _buffer = new(initialSize: 256, usePool: true); - // Value copied from Array.MaxLength in System.Private.CoreLib/src/libraries/System.Private.CoreLib/src/System/Array.cs. - public const int MaximumBufferSize = 0X7FFFFFC7; - - public PooledByteBufferWriter(int initialCapacity = MinimumBufferSize) - { - Debug.Assert(initialCapacity > 0); - - _rentedBuffer = ArrayPool.Shared.Rent(initialCapacity); - _index = 0; - } - - public ReadOnlyMemory WrittenMemory - { - get - { - Debug.Assert(_rentedBuffer != null); - Debug.Assert(_index <= _rentedBuffer.Length); - return _rentedBuffer.AsMemory(0, _index); - } - } - - public int WrittenCount - { - get - { - Debug.Assert(_rentedBuffer != null); - return _index; - } - } - - public int Capacity - { - get - { - Debug.Assert(_rentedBuffer != null); - return _rentedBuffer.Length; - } - } - - public void Reset() - { - Debug.Assert(_rentedBuffer != null); - Debug.Assert(_index <= _rentedBuffer.Length); - - _rentedBuffer.AsSpan(0, _index).Clear(); - _index = 0; - } - - // Returns the rented buffer back to the pool - public void Dispose() - { - if (_rentedBuffer == null) - { - return; - } - - Reset(); - byte[] toReturn = _rentedBuffer; - _rentedBuffer = null!; - ArrayPool.Shared.Return(toReturn); - } - - public void Advance(int count) - { - Debug.Assert(_rentedBuffer != null); - Debug.Assert(count >= 0); - Debug.Assert(_index <= _rentedBuffer.Length - count); - _index += count; - } + public void Advance(int count) => _buffer.Commit(count); public Memory GetMemory(int sizeHint = 0) { - CheckAndResizeBuffer(sizeHint); - return _rentedBuffer.AsMemory(_index); + _buffer.EnsureAvailableSpace(sizeHint); + return _buffer.AvailableMemory; } public Span GetSpan(int sizeHint = 0) { - CheckAndResizeBuffer(sizeHint); - return _rentedBuffer.AsSpan(_index); + _buffer.EnsureAvailableSpace(sizeHint); + return _buffer.AvailableSpan; } - private void CheckAndResizeBuffer(int sizeHint) - { - Debug.Assert(_rentedBuffer != null); - Debug.Assert(sizeHint >= 0); - - int currentLength = _rentedBuffer.Length; - int availableSpace = currentLength - _index; - - if (sizeHint == 0) - { - sizeHint = MinimumBufferSize; - } - - // If we've reached ~1GB written, grow to the maximum buffer - // length to avoid incessant minimal growths causing perf issues. - if (_index >= MaximumBufferSize / 2) - { - sizeHint = Math.Max(sizeHint, MaximumBufferSize - currentLength); - } - - if (sizeHint > availableSpace) - { - int growBy = Math.Max(sizeHint, currentLength); - - int newSize = currentLength + growBy; - - if ((uint)newSize > MaximumBufferSize) - { - newSize = currentLength + sizeHint; - if ((uint)newSize > MaximumBufferSize) - { - throw new OutOfMemoryException(); - } - } - - byte[] oldBuffer = _rentedBuffer; - - _rentedBuffer = ArrayPool.Shared.Rent(newSize); - - Debug.Assert(oldBuffer.Length >= _index); - Debug.Assert(_rentedBuffer.Length >= _index); - - Span oldBufferAsSpan = oldBuffer.AsSpan(0, _index); - oldBufferAsSpan.CopyTo(_rentedBuffer); - oldBufferAsSpan.Clear(); - ArrayPool.Shared.Return(oldBuffer); - } - - Debug.Assert(_rentedBuffer.Length - _index > 0); - Debug.Assert(_rentedBuffer.Length - _index >= sizeHint); - } + public ReadOnlyMemory WrittenMemory => _buffer.ActiveMemory; + public int Capacity => _buffer.Capacity; + public int WrittenCount => _buffer.ActiveLength; + public void Reset() => _buffer.Discard(_buffer.ActiveLength); + public void Dispose() => _buffer.Dispose(); } }