Skip to content

Commit

Permalink
Removed dependency on RecyclableMemoryStream in favor of ArrayPoolBuf…
Browse files Browse the repository at this point in the history
…ferWriter
  • Loading branch information
BalassaMarton committed Jun 22, 2023
1 parent 833c7c4 commit 2aa800f
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
using System.IO.Pipelines;
using System.Net.WebSockets;
using System.Threading.Channels;
using CommunityToolkit.HighPerformance.Buffers;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using Microsoft.IO;
using MorganStanley.ComposeUI.Messaging.Client.Abstractions;
using MorganStanley.ComposeUI.Messaging.Exceptions;
using MorganStanley.ComposeUI.Messaging.Protocol.Json;
Expand Down Expand Up @@ -69,12 +69,11 @@ public async ValueTask SendAsync(Message message, CancellationToken cancellation
{
try
{
// TODO: Instead of a pooled buffer, we could have an IBufferWriter that writes directly to the websocket
await using var stream = new RecyclableMemoryStream(MemoryStreamManager);
JsonMessageSerializer.SerializeMessage(message, stream);
using var bufferWriter = new ArrayPoolBufferWriter<byte>(ArrayPool<byte>.Shared);
JsonMessageSerializer.SerializeMessage(message, bufferWriter);

await _webSocket.SendAsync(
new ArraySegment<byte>(stream.GetBuffer(), 0, (int)stream.Length),
bufferWriter.WrittenMemory,
WebSocketMessageType.Text,
WebSocketMessageFlags.EndOfMessage,
_stopTokenSource.Token);
Expand All @@ -85,7 +84,10 @@ await _webSocket.SendAsync(
}
catch (Exception e)
{
_logger.LogError(e, "Exception thrown while trying to send a message over the WebSocket: {ExceptionMessage}", e.Message);
_logger.LogError(
e,
"Exception thrown while trying to send a message over the WebSocket: {ExceptionMessage}",
e.Message);
}
}

Expand Down Expand Up @@ -167,7 +169,10 @@ private async Task ReceiveMessages()
}
catch (Exception e)
{
_logger.LogError(e, "Exception thrown while trying to read a message from the WebSocket: {ExceptionMessage}", e.Message);
_logger.LogError(
e,
"Exception thrown while trying to read a message from the WebSocket: {ExceptionMessage}",
e.Message);
_receiveChannel.Writer.TryComplete();
}
}
Expand All @@ -190,6 +195,4 @@ private static bool TryReadMessage(ref ReadOnlySequence<byte> buffer, [NotNullWh
return false;
}
}

private static readonly RecyclableMemoryStreamManager MemoryStreamManager = new();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
// and limitations under the License.

using System.Text.Json;
using Microsoft.IO;

namespace MorganStanley.ComposeUI.Messaging;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

<ItemGroup>
<PackageReference Include="CommunityToolkit.HighPerformance" Version="8.0.0" />
<PackageReference Include="Microsoft.IO.RecyclableMemoryStream" Version="2.2.1" />
<PackageReference Include="System.Reactive" Version="6.0.0" />
<PackageReference Include="System.Reactive.Async" Version="6.0.0-alpha.3" />
<PackageReference Include="System.Text.Json" Version="6.0.7" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
using System.IO.Pipelines;
using System.Net.WebSockets;
using System.Threading.Channels;
using CommunityToolkit.HighPerformance.Buffers;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.IO;
using MorganStanley.ComposeUI.Messaging.Exceptions;
using MorganStanley.ComposeUI.Messaging.Protocol.Json;
using MorganStanley.ComposeUI.Messaging.Protocol.Messages;
Expand Down Expand Up @@ -225,12 +225,11 @@ private async Task SendMessagesAsync(
if (webSocket.State != WebSocketState.Open || cancellationToken.IsCancellationRequested)
break;

// TODO: Instead of a pooled buffer, we could have an IBufferWriter that writes directly to the websocket
await using var stream = new RecyclableMemoryStream(MemoryStreamManager);
JsonMessageSerializer.SerializeMessage(message, stream);
using var bufferWriter = new ArrayPoolBufferWriter<byte>(ArrayPool<byte>.Shared);
JsonMessageSerializer.SerializeMessage(message, bufferWriter);

await webSocket.SendAsync(
new ArraySegment<byte>(stream.GetBuffer(), 0, (int)stream.Length),
bufferWriter.WrittenMemory,
WebSocketMessageType.Text,
WebSocketMessageFlags.EndOfMessage,
cancellationToken);
Expand Down Expand Up @@ -264,6 +263,4 @@ private bool TryReadMessage(ref ReadOnlySequence<byte> buffer, [NotNullWhen(true
return false;
}
}

private static readonly RecyclableMemoryStreamManager MemoryStreamManager = new();
}

0 comments on commit 2aa800f

Please sign in to comment.