From 2aa800f63705155db66772382d6e37675afae83a Mon Sep 17 00:00:00 2001 From: Marton Balassa <7115274+BalassaMarton@users.noreply.github.com> Date: Thu, 22 Jun 2023 12:21:20 +0200 Subject: [PATCH] Removed dependency on RecyclableMemoryStream in favor of ArrayPoolBufferWriter --- .../Client/WebSocket/WebSocketConnection.cs | 23 +++++++++++-------- .../src/Core/MessageBufferJsonExtensions.cs | 1 - ...ganStanley.ComposeUI.Messaging.Core.csproj | 1 - .../Server/WebSocket/WebSocketConnection.cs | 11 ++++----- 4 files changed, 17 insertions(+), 19 deletions(-) diff --git a/src/messaging/dotnet/src/Client/Client/WebSocket/WebSocketConnection.cs b/src/messaging/dotnet/src/Client/Client/WebSocket/WebSocketConnection.cs index 09f6877b7..9f6706f68 100644 --- a/src/messaging/dotnet/src/Client/Client/WebSocket/WebSocketConnection.cs +++ b/src/messaging/dotnet/src/Client/Client/WebSocket/WebSocketConnection.cs @@ -15,10 +15,10 @@ using System.IO.Pipelines; using System.Net.WebSockets; using System.Threading.Channels; +using CommunityToolkit.HighPerformance.Buffers; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; using Microsoft.Extensions.Options; -using Microsoft.IO; using MorganStanley.ComposeUI.Messaging.Client.Abstractions; using MorganStanley.ComposeUI.Messaging.Exceptions; using MorganStanley.ComposeUI.Messaging.Protocol.Json; @@ -69,12 +69,11 @@ public async ValueTask SendAsync(Message message, CancellationToken cancellation { try { - // TODO: Instead of a pooled buffer, we could have an IBufferWriter that writes directly to the websocket - await using var stream = new RecyclableMemoryStream(MemoryStreamManager); - JsonMessageSerializer.SerializeMessage(message, stream); + using var bufferWriter = new ArrayPoolBufferWriter(ArrayPool.Shared); + JsonMessageSerializer.SerializeMessage(message, bufferWriter); await _webSocket.SendAsync( - new ArraySegment(stream.GetBuffer(), 0, (int)stream.Length), + bufferWriter.WrittenMemory, WebSocketMessageType.Text, WebSocketMessageFlags.EndOfMessage, _stopTokenSource.Token); @@ -85,7 +84,10 @@ await _webSocket.SendAsync( } catch (Exception e) { - _logger.LogError(e, "Exception thrown while trying to send a message over the WebSocket: {ExceptionMessage}", e.Message); + _logger.LogError( + e, + "Exception thrown while trying to send a message over the WebSocket: {ExceptionMessage}", + e.Message); } } @@ -167,7 +169,10 @@ private async Task ReceiveMessages() } catch (Exception e) { - _logger.LogError(e, "Exception thrown while trying to read a message from the WebSocket: {ExceptionMessage}", e.Message); + _logger.LogError( + e, + "Exception thrown while trying to read a message from the WebSocket: {ExceptionMessage}", + e.Message); _receiveChannel.Writer.TryComplete(); } } @@ -190,6 +195,4 @@ private static bool TryReadMessage(ref ReadOnlySequence buffer, [NotNullWh return false; } } - - private static readonly RecyclableMemoryStreamManager MemoryStreamManager = new(); -} +} \ No newline at end of file diff --git a/src/messaging/dotnet/src/Core/MessageBufferJsonExtensions.cs b/src/messaging/dotnet/src/Core/MessageBufferJsonExtensions.cs index 7dcdeab49..491bffb99 100644 --- a/src/messaging/dotnet/src/Core/MessageBufferJsonExtensions.cs +++ b/src/messaging/dotnet/src/Core/MessageBufferJsonExtensions.cs @@ -11,7 +11,6 @@ // and limitations under the License. using System.Text.Json; -using Microsoft.IO; namespace MorganStanley.ComposeUI.Messaging; diff --git a/src/messaging/dotnet/src/Core/MorganStanley.ComposeUI.Messaging.Core.csproj b/src/messaging/dotnet/src/Core/MorganStanley.ComposeUI.Messaging.Core.csproj index 8b4f6946d..25bd31b33 100644 --- a/src/messaging/dotnet/src/Core/MorganStanley.ComposeUI.Messaging.Core.csproj +++ b/src/messaging/dotnet/src/Core/MorganStanley.ComposeUI.Messaging.Core.csproj @@ -15,7 +15,6 @@ - diff --git a/src/messaging/dotnet/src/Server/Server/WebSocket/WebSocketConnection.cs b/src/messaging/dotnet/src/Server/Server/WebSocket/WebSocketConnection.cs index adc9702bb..2c9d7f133 100644 --- a/src/messaging/dotnet/src/Server/Server/WebSocket/WebSocketConnection.cs +++ b/src/messaging/dotnet/src/Server/Server/WebSocket/WebSocketConnection.cs @@ -15,9 +15,9 @@ using System.IO.Pipelines; using System.Net.WebSockets; using System.Threading.Channels; +using CommunityToolkit.HighPerformance.Buffers; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; -using Microsoft.IO; using MorganStanley.ComposeUI.Messaging.Exceptions; using MorganStanley.ComposeUI.Messaging.Protocol.Json; using MorganStanley.ComposeUI.Messaging.Protocol.Messages; @@ -225,12 +225,11 @@ private async Task SendMessagesAsync( if (webSocket.State != WebSocketState.Open || cancellationToken.IsCancellationRequested) break; - // TODO: Instead of a pooled buffer, we could have an IBufferWriter that writes directly to the websocket - await using var stream = new RecyclableMemoryStream(MemoryStreamManager); - JsonMessageSerializer.SerializeMessage(message, stream); + using var bufferWriter = new ArrayPoolBufferWriter(ArrayPool.Shared); + JsonMessageSerializer.SerializeMessage(message, bufferWriter); await webSocket.SendAsync( - new ArraySegment(stream.GetBuffer(), 0, (int)stream.Length), + bufferWriter.WrittenMemory, WebSocketMessageType.Text, WebSocketMessageFlags.EndOfMessage, cancellationToken); @@ -264,6 +263,4 @@ private bool TryReadMessage(ref ReadOnlySequence buffer, [NotNullWhen(true return false; } } - - private static readonly RecyclableMemoryStreamManager MemoryStreamManager = new(); }