Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MessageBuffer.Factory and JSON extensions #237

Merged
merged 2 commits into from
Jun 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 57 additions & 10 deletions src/messaging/dotnet/src/Core/MessageBuffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Text;
using CommunityToolkit.HighPerformance.Buffers;

namespace MorganStanley.ComposeUI.Messaging;

Expand Down Expand Up @@ -53,16 +54,9 @@ public ReadOnlySpan<byte> GetSpan()
/// <returns>True, if the decoding was successful, False otherwise.</returns>
public bool TryGetBase64Bytes(IBufferWriter<byte> bufferWriter)
{
var utf8Span = new Span<byte>(_bytes, 0, _length);
var span = bufferWriter.GetSpan(Base64.GetMaxDecodedFromUtf8Length(utf8Span.Length));
var status = Base64.DecodeFromUtf8(utf8Span, span, out _, out var bytesWritten);

if (status != OperationStatus.Done)
return false;

bufferWriter.Advance(bytesWritten);
ThrowIfDisposed();

return true;
return TryGetBase64BytesCore(bufferWriter);
}

/// <summary>
Expand Down Expand Up @@ -123,6 +117,11 @@ public void Dispose()
GC.SuppressFinalize(this);
}

/// <summary>
/// Acts as a stub for extensions methods that create <see cref="MessageBuffer"/> instances with various formats.
/// </summary>
public static MessageBufferFactory Factory { get; } = new();

/// <summary>
/// Creates a new <see cref="MessageBuffer" /> from a string.
/// </summary>
Expand Down Expand Up @@ -161,6 +160,14 @@ public static MessageBuffer Create(ReadOnlySpan<byte> utf8Bytes)
return new MessageBuffer(buffer, utf8Bytes.Length);
}

/// <summary>
/// Creates a new <see cref="MessageBuffer" /> from a memory block containing the raw UTF8 bytes.
/// </summary>
/// <param name="utf8Bytes"></param>
/// <returns></returns>
/// <exception cref="InvalidOperationException">The content of the buffer is not a valid UTF8 byte sequence.</exception>
public static MessageBuffer Create(ReadOnlyMemory<byte> utf8Bytes) => Create(utf8Bytes.Span);

/// <summary>
/// Creates a new <see cref="MessageBuffer" /> from a sequence containing the raw UTF8 bytes.
/// </summary>
Expand Down Expand Up @@ -247,6 +254,25 @@ public static MessageBuffer CreateBase64(ReadOnlySequence<byte> bytes)
}
}

/// <summary>
/// Returns an <see cref="ArrayPoolBufferWriter{T}"/> that can be used to build byte arrays in a memory-efficient way.
/// </summary>
/// <returns></returns>
public static ArrayPoolBufferWriter<byte> GetBufferWriter()
{
return new ArrayPoolBufferWriter<byte>(Pool);
}

/// <summary>
/// Returns an <see cref="ArrayPoolBufferWriter{T}"/> that can be used to build byte arrays in a memory-efficient way.
/// </summary>
/// <param name="capacity">The initial capacity of the buffer</param>
/// <returns></returns>
public static ArrayPoolBufferWriter<byte> GetBufferWriter(int capacity)
{
return new ArrayPoolBufferWriter<byte>(Pool, capacity);
}

/// <summary>
/// Creates a new <see cref="MessageBuffer" /> using the provided buffer.
/// The buffer must have been allocated by calling <see cref="MessageBuffer.GetBuffer" />
Expand Down Expand Up @@ -304,16 +330,37 @@ private static void ValidateUtf8Bytes(ReadOnlySpan<byte> bytes)
}
}

private bool TryGetBase64BytesCore(IBufferWriter<byte> bufferWriter)
{
var utf8Span = new Span<byte>(_bytes, 0, _length);
var span = bufferWriter.GetSpan(Base64.GetMaxDecodedFromUtf8Length(utf8Span.Length));
var status = Base64.DecodeFromUtf8(utf8Span, span, out _, out var bytesWritten);

if (status != OperationStatus.Done)
return false;

bufferWriter.Advance(bytesWritten);

return true;
}

~MessageBuffer()
{
DisposeCore();
}

private static readonly UTF8Encoding Encoding = new(false, true);

/// <summary>
/// <seealso cref="MessageBuffer.Factory"/>
/// </summary>
public sealed class MessageBufferFactory
{
}

private static class ThrowHelper
{
public static InvalidOperationException InvalidBase64()
public static FormatException InvalidBase64()
{
return new("The current buffer is not Base64-encoded");
}
Expand Down
19 changes: 12 additions & 7 deletions src/messaging/dotnet/src/Core/MessageBufferJsonExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,21 @@ public static class MessageBufferJsonExtensions
/// <summary>
/// Creates a <see cref="MessageBuffer"/> from the provided value serialized to JSON.
/// </summary>
/// <param name="factory"></param>
/// <param name="value"></param>
/// <param name="options"></param>
/// <typeparam name="T"></typeparam>
/// <returns></returns>
public static MessageBuffer CreateJson<T>(T value, JsonSerializerOptions? options = null)
public static MessageBuffer CreateJson<T>(
this MessageBuffer.MessageBufferFactory factory,
T value,
JsonSerializerOptions? options = null)
{
using var stream = new RecyclableMemoryStream(RecyclableMemoryStreamManager);
JsonSerializer.Serialize(stream, value, options);
return MessageBuffer.Create(stream.GetReadOnlySequence());
using var bufferWriter = MessageBuffer.GetBufferWriter();
using var jsonWriter = new Utf8JsonWriter(bufferWriter);
JsonSerializer.Serialize(jsonWriter, value, options);
jsonWriter.Flush();

return MessageBuffer.Create(bufferWriter.WrittenMemory);
}

private static readonly RecyclableMemoryStreamManager RecyclableMemoryStreamManager = new();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public static ValueTask PublishJsonAsync<TPayload>(
{
return messageRouter.PublishAsync(
topic,
MessageBufferJsonExtensions.CreateJson(payload, jsonSerializerOptions),
MessageBuffer.Factory.CreateJson(payload, jsonSerializerOptions),
publishOptions,
cancellationToken);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="CommunityToolkit.HighPerformance" Version="8.0.0" />
<PackageReference Include="Microsoft.IO.RecyclableMemoryStream" Version="2.2.1" />
<PackageReference Include="System.Reactive" Version="6.0.0" />
<PackageReference Include="System.Reactive.Async" Version="6.0.0-alpha.3" />
Expand Down
71 changes: 71 additions & 0 deletions src/messaging/dotnet/test/Core.Tests/MessageBuffer.Tests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
// and limitations under the License.

using System.Buffers;
using System.Buffers.Text;
using System.Text;
using MorganStanley.ComposeUI.Messaging.TestUtils;

Expand All @@ -29,6 +30,15 @@ public void GetString_returns_the_original_string(string input)
buffer.GetString().Should().Be(input);
}

[Fact]
public void GetString_throws_if_disposed()
{
var buffer = MessageBuffer.Create("x");
buffer.Dispose();

Assert.Throws<ObjectDisposedException>(() => buffer.GetString());
}

[Theory]
[InlineData("")]
[InlineData(StringConstants.LoremIpsum)]
Expand All @@ -41,6 +51,15 @@ public void GetSpan_returns_the_original_string_encoded_as_UTF8(string input)
span.ToArray().Should().Equal(Encoding.UTF8.GetBytes(input));
}

[Fact]
public void GetSpan_throws_if_disposed()
{
var buffer = MessageBuffer.Create("x");
buffer.Dispose();

Assert.Throws<ObjectDisposedException>(() => buffer.GetSpan());
}

[Theory]
[InlineData("")]
[InlineData(StringConstants.LoremIpsum)]
Expand All @@ -65,6 +84,58 @@ public void GetSpan_returns_the_original_UTF8_bytes(string input)
span.ToArray().Should().Equal(bytes);
}

[Fact]
public void TryGetBase64Bytes_gets_the_decoded_bytes_and_returns_true_when_the_data_is_valid_Base64()
{
var bytes = GetRandomBytes(100);
var base64String = Convert.ToBase64String(bytes);
var bufferWriter = new ArrayBufferWriter<byte>();
var buffer = MessageBuffer.Create(base64String);

buffer.TryGetBase64Bytes(out var decodedBytes).Should().BeTrue();
buffer.TryGetBase64Bytes(bufferWriter).Should().BeTrue();
decodedBytes.Should().Equal(bytes);
bufferWriter.WrittenMemory.ToArray().Should().Equal(bytes);
}

[Fact]
public void TryGetBase64Bytes_throws_if_disposed()
{
var buffer = MessageBuffer.Create("x");
buffer.Dispose();

Assert.Throws<ObjectDisposedException>(() => buffer.TryGetBase64Bytes(out _));
Assert.Throws<ObjectDisposedException>(() => buffer.TryGetBase64Bytes(new ArrayBufferWriter<byte>()));
}

[Fact]
public void GetBase64Bytes_throws_if_disposed()
{
var buffer = MessageBuffer.Create("x");
buffer.Dispose();

Assert.Throws<ObjectDisposedException>(() => buffer.GetBase64Bytes());
Assert.Throws<ObjectDisposedException>(() => buffer.GetBase64Bytes(new ArrayBufferWriter<byte>()));
}

[Fact]
public void TryGetBase64Bytes_returns_false_if_the_data_is_not_valid_Base64()
{
var buffer = MessageBuffer.Create("****");

buffer.TryGetBase64Bytes(out _).Should().BeFalse();
buffer.TryGetBase64Bytes(new ArrayBufferWriter<byte>()).Should().BeFalse();
}

[Fact]
public void GetBase64Bytes_throws_if_the_data_is_not_valid_Base64()
{
var buffer = MessageBuffer.Create("****");

Assert.Throws<FormatException>(() => buffer.GetBase64Bytes());
Assert.Throws<FormatException>(() => buffer.GetBase64Bytes(new ArrayBufferWriter<byte>()));
}

[Fact]
public void Create_throws_when_called_with_invalid_UTF8()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,20 @@ public void ReadJson_respects_the_provided_JsonSerializerOptions()
});
}

[Fact]
public void CreateJson_creates_a_MessageBuffer_with_the_JSON_string()
{
var payload = new TestPayload
{
Name = "test-name",
Value = "test-value"
};

var buffer = MessageBuffer.Factory.CreateJson(payload);

JsonSerializer.Deserialize<TestPayload>(buffer.GetString()).Should().BeEquivalentTo(payload);
}

private class TestPayload
{
public string Name { get; set; }
Expand Down