Skip to content

Commit

Permalink
Merge pull request #237 from BalassaMarton/messagebuffer-factory
Browse files Browse the repository at this point in the history
MessageBuffer.Factory and JSON extensions
  • Loading branch information
BalassaMarton authored Jun 19, 2023
2 parents 7ec6dfd + e36257a commit d966c88
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 18 deletions.
67 changes: 57 additions & 10 deletions src/messaging/dotnet/src/Core/MessageBuffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Text;
using CommunityToolkit.HighPerformance.Buffers;

namespace MorganStanley.ComposeUI.Messaging;

Expand Down Expand Up @@ -53,16 +54,9 @@ public ReadOnlySpan<byte> GetSpan()
/// <returns>True, if the decoding was successful, False otherwise.</returns>
public bool TryGetBase64Bytes(IBufferWriter<byte> bufferWriter)
{
var utf8Span = new Span<byte>(_bytes, 0, _length);
var span = bufferWriter.GetSpan(Base64.GetMaxDecodedFromUtf8Length(utf8Span.Length));
var status = Base64.DecodeFromUtf8(utf8Span, span, out _, out var bytesWritten);

if (status != OperationStatus.Done)
return false;

bufferWriter.Advance(bytesWritten);
ThrowIfDisposed();

return true;
return TryGetBase64BytesCore(bufferWriter);
}

/// <summary>
Expand Down Expand Up @@ -123,6 +117,11 @@ public void Dispose()
GC.SuppressFinalize(this);
}

/// <summary>
/// Acts as a stub for extensions methods that create <see cref="MessageBuffer"/> instances with various formats.
/// </summary>
public static MessageBufferFactory Factory { get; } = new();

/// <summary>
/// Creates a new <see cref="MessageBuffer" /> from a string.
/// </summary>
Expand Down Expand Up @@ -161,6 +160,14 @@ public static MessageBuffer Create(ReadOnlySpan<byte> utf8Bytes)
return new MessageBuffer(buffer, utf8Bytes.Length);
}

/// <summary>
/// Creates a new <see cref="MessageBuffer" /> from a memory block containing the raw UTF8 bytes.
/// </summary>
/// <param name="utf8Bytes"></param>
/// <returns></returns>
/// <exception cref="InvalidOperationException">The content of the buffer is not a valid UTF8 byte sequence.</exception>
public static MessageBuffer Create(ReadOnlyMemory<byte> utf8Bytes) => Create(utf8Bytes.Span);

/// <summary>
/// Creates a new <see cref="MessageBuffer" /> from a sequence containing the raw UTF8 bytes.
/// </summary>
Expand Down Expand Up @@ -247,6 +254,25 @@ public static MessageBuffer CreateBase64(ReadOnlySequence<byte> bytes)
}
}

/// <summary>
/// Returns an <see cref="ArrayPoolBufferWriter{T}"/> that can be used to build byte arrays in a memory-efficient way.
/// </summary>
/// <returns></returns>
public static ArrayPoolBufferWriter<byte> GetBufferWriter()
{
return new ArrayPoolBufferWriter<byte>(Pool);
}

/// <summary>
/// Returns an <see cref="ArrayPoolBufferWriter{T}"/> that can be used to build byte arrays in a memory-efficient way.
/// </summary>
/// <param name="capacity">The initial capacity of the buffer</param>
/// <returns></returns>
public static ArrayPoolBufferWriter<byte> GetBufferWriter(int capacity)
{
return new ArrayPoolBufferWriter<byte>(Pool, capacity);
}

/// <summary>
/// Creates a new <see cref="MessageBuffer" /> using the provided buffer.
/// The buffer must have been allocated by calling <see cref="MessageBuffer.GetBuffer" />
Expand Down Expand Up @@ -304,16 +330,37 @@ private static void ValidateUtf8Bytes(ReadOnlySpan<byte> bytes)
}
}

private bool TryGetBase64BytesCore(IBufferWriter<byte> bufferWriter)
{
var utf8Span = new Span<byte>(_bytes, 0, _length);
var span = bufferWriter.GetSpan(Base64.GetMaxDecodedFromUtf8Length(utf8Span.Length));
var status = Base64.DecodeFromUtf8(utf8Span, span, out _, out var bytesWritten);

if (status != OperationStatus.Done)
return false;

bufferWriter.Advance(bytesWritten);

return true;
}

~MessageBuffer()
{
DisposeCore();
}

private static readonly UTF8Encoding Encoding = new(false, true);

/// <summary>
/// <seealso cref="MessageBuffer.Factory"/>
/// </summary>
public sealed class MessageBufferFactory
{
}

private static class ThrowHelper
{
public static InvalidOperationException InvalidBase64()
public static FormatException InvalidBase64()
{
return new("The current buffer is not Base64-encoded");
}
Expand Down
19 changes: 12 additions & 7 deletions src/messaging/dotnet/src/Core/MessageBufferJsonExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,21 @@ public static class MessageBufferJsonExtensions
/// <summary>
/// Creates a <see cref="MessageBuffer"/> from the provided value serialized to JSON.
/// </summary>
/// <param name="factory"></param>
/// <param name="value"></param>
/// <param name="options"></param>
/// <typeparam name="T"></typeparam>
/// <returns></returns>
public static MessageBuffer CreateJson<T>(T value, JsonSerializerOptions? options = null)
public static MessageBuffer CreateJson<T>(
this MessageBuffer.MessageBufferFactory factory,
T value,
JsonSerializerOptions? options = null)
{
using var stream = new RecyclableMemoryStream(RecyclableMemoryStreamManager);
JsonSerializer.Serialize(stream, value, options);
return MessageBuffer.Create(stream.GetReadOnlySequence());
using var bufferWriter = MessageBuffer.GetBufferWriter();
using var jsonWriter = new Utf8JsonWriter(bufferWriter);
JsonSerializer.Serialize(jsonWriter, value, options);
jsonWriter.Flush();

return MessageBuffer.Create(bufferWriter.WrittenMemory);
}

private static readonly RecyclableMemoryStreamManager RecyclableMemoryStreamManager = new();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public static ValueTask PublishJsonAsync<TPayload>(
{
return messageRouter.PublishAsync(
topic,
MessageBufferJsonExtensions.CreateJson(payload, jsonSerializerOptions),
MessageBuffer.Factory.CreateJson(payload, jsonSerializerOptions),
publishOptions,
cancellationToken);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="CommunityToolkit.HighPerformance" Version="8.0.0" />
<PackageReference Include="Microsoft.IO.RecyclableMemoryStream" Version="2.2.1" />
<PackageReference Include="System.Reactive" Version="6.0.0" />
<PackageReference Include="System.Reactive.Async" Version="6.0.0-alpha.3" />
Expand Down
71 changes: 71 additions & 0 deletions src/messaging/dotnet/test/Core.Tests/MessageBuffer.Tests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
// and limitations under the License.

using System.Buffers;
using System.Buffers.Text;
using System.Text;
using MorganStanley.ComposeUI.Messaging.TestUtils;

Expand All @@ -29,6 +30,15 @@ public void GetString_returns_the_original_string(string input)
buffer.GetString().Should().Be(input);
}

[Fact]
public void GetString_throws_if_disposed()
{
var buffer = MessageBuffer.Create("x");
buffer.Dispose();

Assert.Throws<ObjectDisposedException>(() => buffer.GetString());
}

[Theory]
[InlineData("")]
[InlineData(StringConstants.LoremIpsum)]
Expand All @@ -41,6 +51,15 @@ public void GetSpan_returns_the_original_string_encoded_as_UTF8(string input)
span.ToArray().Should().Equal(Encoding.UTF8.GetBytes(input));
}

[Fact]
public void GetSpan_throws_if_disposed()
{
var buffer = MessageBuffer.Create("x");
buffer.Dispose();

Assert.Throws<ObjectDisposedException>(() => buffer.GetSpan());
}

[Theory]
[InlineData("")]
[InlineData(StringConstants.LoremIpsum)]
Expand All @@ -65,6 +84,58 @@ public void GetSpan_returns_the_original_UTF8_bytes(string input)
span.ToArray().Should().Equal(bytes);
}

[Fact]
public void TryGetBase64Bytes_gets_the_decoded_bytes_and_returns_true_when_the_data_is_valid_Base64()
{
var bytes = GetRandomBytes(100);
var base64String = Convert.ToBase64String(bytes);
var bufferWriter = new ArrayBufferWriter<byte>();
var buffer = MessageBuffer.Create(base64String);

buffer.TryGetBase64Bytes(out var decodedBytes).Should().BeTrue();
buffer.TryGetBase64Bytes(bufferWriter).Should().BeTrue();
decodedBytes.Should().Equal(bytes);
bufferWriter.WrittenMemory.ToArray().Should().Equal(bytes);
}

[Fact]
public void TryGetBase64Bytes_throws_if_disposed()
{
var buffer = MessageBuffer.Create("x");
buffer.Dispose();

Assert.Throws<ObjectDisposedException>(() => buffer.TryGetBase64Bytes(out _));
Assert.Throws<ObjectDisposedException>(() => buffer.TryGetBase64Bytes(new ArrayBufferWriter<byte>()));
}

[Fact]
public void GetBase64Bytes_throws_if_disposed()
{
var buffer = MessageBuffer.Create("x");
buffer.Dispose();

Assert.Throws<ObjectDisposedException>(() => buffer.GetBase64Bytes());
Assert.Throws<ObjectDisposedException>(() => buffer.GetBase64Bytes(new ArrayBufferWriter<byte>()));
}

[Fact]
public void TryGetBase64Bytes_returns_false_if_the_data_is_not_valid_Base64()
{
var buffer = MessageBuffer.Create("****");

buffer.TryGetBase64Bytes(out _).Should().BeFalse();
buffer.TryGetBase64Bytes(new ArrayBufferWriter<byte>()).Should().BeFalse();
}

[Fact]
public void GetBase64Bytes_throws_if_the_data_is_not_valid_Base64()
{
var buffer = MessageBuffer.Create("****");

Assert.Throws<FormatException>(() => buffer.GetBase64Bytes());
Assert.Throws<FormatException>(() => buffer.GetBase64Bytes(new ArrayBufferWriter<byte>()));
}

[Fact]
public void Create_throws_when_called_with_invalid_UTF8()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,20 @@ public void ReadJson_respects_the_provided_JsonSerializerOptions()
});
}

[Fact]
public void CreateJson_creates_a_MessageBuffer_with_the_JSON_string()
{
var payload = new TestPayload
{
Name = "test-name",
Value = "test-value"
};

var buffer = MessageBuffer.Factory.CreateJson(payload);

JsonSerializer.Deserialize<TestPayload>(buffer.GetString()).Should().BeEquivalentTo(payload);
}

private class TestPayload
{
public string Name { get; set; }
Expand Down

0 comments on commit d966c88

Please sign in to comment.