From 72ed8cd7e6d584fe95924d959482ae9daf3f0f14 Mon Sep 17 00:00:00 2001 From: Ferenc Hubicsak <4752853+fhubi@users.noreply.github.com> Date: Mon, 24 Jun 2024 16:59:11 +0200 Subject: [PATCH] Separated messaging layer from UserChannel (#689) Separated messaging layer from UserChannel. --- src/fdc3/dotnet/DesktopAgent/DesktopAgent.sln | 7 ++ .../ServiceCollectionExtensions.cs | 3 +- .../Fdc3DesktopAgentMessageRouterService.cs | 6 +- .../UserChannel.cs | 43 +++++---- ...3DesktopAgentMessageRouterService.Tests.cs | 3 +- .../UserChannelErrorsAndDiagnosticsTests.cs | 3 +- .../UserChannelTests.cs | 3 +- src/messaging/dotnet/Messaging.sln | 7 ++ .../dotnet/src/Abstractions/IMessageBuffer.cs | 29 ++++++ .../src/Abstractions/IMessagingService.cs | 93 +++++++++++++++++++ .../{Core => Abstractions}/InvokeOptions.cs | 2 +- .../{Core => Abstractions}/MessageContext.cs | 2 +- ...ey.ComposeUI.Messaging.Abstractions.csproj | 9 ++ .../{Core => Abstractions}/PublishOptions.cs | 2 +- .../src/Client/Client/MessageRouterClient.cs | 75 ++++++++++++++- .../ServiceCollectionExtensions.cs | 2 + .../Client/MessageRouterClientExtensions.cs | 10 +- .../dotnet/src/Core/Exceptions/ThrowHelper.cs | 6 ++ .../dotnet/src/Core/IMessageRouter.cs | 81 ++++------------ .../dotnet/src/Core/MessageBuffer.cs | 19 ++-- .../src/Core/MessageBufferJsonExtensions.cs | 5 +- .../dotnet/src/Core/MessageHandler.cs | 4 +- .../src/Core/MessageRouterExtensions.cs | 90 ++++++------------ ...sions.cs => MessageRouterJsonExtension.cs} | 1 + ...ganStanley.ComposeUI.Messaging.Core.csproj | 4 + .../Protocol/Json/MessageBufferConverter.cs | 27 +++--- .../Core/Protocol/Messages/InvokeRequest.cs | 4 +- .../Core/Protocol/Messages/InvokeResponse.cs | 4 +- .../Core/Protocol/Messages/PublishMessage.cs | 4 +- .../Core/Protocol/Messages/TopicMessage.cs | 4 +- src/messaging/dotnet/src/Core/TopicMessage.cs | 6 +- .../Client/MessageRouterClient.Tests.cs | 4 +- .../MessageRouterJsonExtensions.Tests.cs | 1 + .../IntegrationTests/EndToEndTestsBase.cs | 3 +- src/shell/dotnet/Shell/App.xaml.cs | 4 +- 35 files changed, 369 insertions(+), 201 deletions(-) create mode 100644 src/messaging/dotnet/src/Abstractions/IMessageBuffer.cs create mode 100644 src/messaging/dotnet/src/Abstractions/IMessagingService.cs rename src/messaging/dotnet/src/{Core => Abstractions}/InvokeOptions.cs (92%) rename src/messaging/dotnet/src/{Core => Abstractions}/MessageContext.cs (94%) create mode 100644 src/messaging/dotnet/src/Abstractions/MorganStanley.ComposeUI.Messaging.Abstractions.csproj rename src/messaging/dotnet/src/{Core => Abstractions}/PublishOptions.cs (92%) rename src/messaging/dotnet/src/Core/{MessageRouterJsonExtensions.cs => MessageRouterJsonExtension.cs} (97%) diff --git a/src/fdc3/dotnet/DesktopAgent/DesktopAgent.sln b/src/fdc3/dotnet/DesktopAgent/DesktopAgent.sln index 024a2f3a5..511b79401 100644 --- a/src/fdc3/dotnet/DesktopAgent/DesktopAgent.sln +++ b/src/fdc3/dotnet/DesktopAgent/DesktopAgent.sln @@ -29,6 +29,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MorganStanley.ComposeUI.App EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MorganStanley.ComposeUI.ModuleLoader", "..\..\..\module-loader\dotnet\src\MorganStanley.ComposeUI.ModuleLoader\MorganStanley.ComposeUI.ModuleLoader.csproj", "{4207C810-55CC-433E-BD5B-B9B4D639C7A2}" EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MorganStanley.ComposeUI.Messaging.Abstractions", "..\..\..\messaging\dotnet\src\Abstractions\MorganStanley.ComposeUI.Messaging.Abstractions.csproj", "{4FC1B3E0-6DAC-4868-AD68-08093B2555DF}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -75,6 +77,10 @@ Global {4207C810-55CC-433E-BD5B-B9B4D639C7A2}.Debug|Any CPU.Build.0 = Debug|Any CPU {4207C810-55CC-433E-BD5B-B9B4D639C7A2}.Release|Any CPU.ActiveCfg = Release|Any CPU {4207C810-55CC-433E-BD5B-B9B4D639C7A2}.Release|Any CPU.Build.0 = Release|Any CPU + {4FC1B3E0-6DAC-4868-AD68-08093B2555DF}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {4FC1B3E0-6DAC-4868-AD68-08093B2555DF}.Debug|Any CPU.Build.0 = Debug|Any CPU + {4FC1B3E0-6DAC-4868-AD68-08093B2555DF}.Release|Any CPU.ActiveCfg = Release|Any CPU + {4FC1B3E0-6DAC-4868-AD68-08093B2555DF}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -90,6 +96,7 @@ Global {2C25BA40-0103-4EB5-B181-98362F563B0F} = {2CE54671-6C98-455E-B2A0-52F8564DDA5D} {50BD1318-FC44-4582-B333-3033076164E7} = {2CE54671-6C98-455E-B2A0-52F8564DDA5D} {4207C810-55CC-433E-BD5B-B9B4D639C7A2} = {2CE54671-6C98-455E-B2A0-52F8564DDA5D} + {4FC1B3E0-6DAC-4868-AD68-08093B2555DF} = {2CE54671-6C98-455E-B2A0-52F8564DDA5D} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {70FE103D-26D8-42D8-88BA-1BECB1F73A18} diff --git a/src/fdc3/dotnet/DesktopAgent/src/MorganStanley.ComposeUI.DesktopAgent/DependencyInjection/ServiceCollectionExtensions.cs b/src/fdc3/dotnet/DesktopAgent/src/MorganStanley.ComposeUI.DesktopAgent/DependencyInjection/ServiceCollectionExtensions.cs index d253a7dd8..643b02277 100644 --- a/src/fdc3/dotnet/DesktopAgent/src/MorganStanley.ComposeUI.DesktopAgent/DependencyInjection/ServiceCollectionExtensions.cs +++ b/src/fdc3/dotnet/DesktopAgent/src/MorganStanley.ComposeUI.DesktopAgent/DependencyInjection/ServiceCollectionExtensions.cs @@ -12,10 +12,11 @@ * and limitations under the License. */ -using Microsoft.Extensions.Hosting; using MorganStanley.ComposeUI.Fdc3.DesktopAgent; using MorganStanley.ComposeUI.Fdc3.DesktopAgent.DependencyInjection; using MorganStanley.ComposeUI.Fdc3.DesktopAgent.Infrastructure.Internal; +using MorganStanley.ComposeUI.Messaging; +using MorganStanley.ComposeUI.Messaging.Abstractions; using MorganStanley.ComposeUI.ModuleLoader; using MorganStanley.ComposeUI.Shell.Fdc3; diff --git a/src/fdc3/dotnet/DesktopAgent/src/MorganStanley.ComposeUI.DesktopAgent/Infrastructure/Internal/Fdc3DesktopAgentMessageRouterService.cs b/src/fdc3/dotnet/DesktopAgent/src/MorganStanley.ComposeUI.DesktopAgent/Infrastructure/Internal/Fdc3DesktopAgentMessageRouterService.cs index fdba69c2f..c9b1d82a5 100644 --- a/src/fdc3/dotnet/DesktopAgent/src/MorganStanley.ComposeUI.DesktopAgent/Infrastructure/Internal/Fdc3DesktopAgentMessageRouterService.cs +++ b/src/fdc3/dotnet/DesktopAgent/src/MorganStanley.ComposeUI.DesktopAgent/Infrastructure/Internal/Fdc3DesktopAgentMessageRouterService.cs @@ -20,15 +20,15 @@ using MorganStanley.ComposeUI.Fdc3.DesktopAgent.Contracts; using MorganStanley.ComposeUI.Fdc3.DesktopAgent.Converters; using MorganStanley.ComposeUI.Fdc3.DesktopAgent.DependencyInjection; -using MorganStanley.ComposeUI.Fdc3.DesktopAgent.Exceptions; using MorganStanley.ComposeUI.Messaging; using Finos.Fdc3; +using MorganStanley.ComposeUI.Messaging.Abstractions; namespace MorganStanley.ComposeUI.Fdc3.DesktopAgent.Infrastructure.Internal; internal class Fdc3DesktopAgentMessageRouterService : IHostedService { - private readonly IMessageRouter _messageRouter; + private readonly IMessagingService _messageRouter; private readonly IFdc3DesktopAgentBridge _desktopAgent; private readonly Fdc3DesktopAgentOptions _options; private readonly ILoggerFactory _loggerFactory; @@ -51,7 +51,7 @@ internal class Fdc3DesktopAgentMessageRouterService : IHostedService public JsonSerializerOptions JsonMessageSerializerOptions => new(_jsonSerializerOptions); public Fdc3DesktopAgentMessageRouterService( - IMessageRouter messageRouter, + IMessagingService messageRouter, IFdc3DesktopAgentBridge desktopAgent, IOptions options, ILoggerFactory? loggerFactory = null) diff --git a/src/fdc3/dotnet/DesktopAgent/src/MorganStanley.ComposeUI.DesktopAgent/UserChannel.cs b/src/fdc3/dotnet/DesktopAgent/src/MorganStanley.ComposeUI.DesktopAgent/UserChannel.cs index 1f886fcf8..433c03e84 100644 --- a/src/fdc3/dotnet/DesktopAgent/src/MorganStanley.ComposeUI.DesktopAgent/UserChannel.cs +++ b/src/fdc3/dotnet/DesktopAgent/src/MorganStanley.ComposeUI.DesktopAgent/UserChannel.cs @@ -13,33 +13,33 @@ */ using System.Collections.Concurrent; -using System.Reactive.Linq; using System.Text.Json; using System.Text.Json.Nodes; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; using MorganStanley.ComposeUI.Fdc3.DesktopAgent.Contracts; using MorganStanley.ComposeUI.Messaging; +using MorganStanley.ComposeUI.Messaging.Abstractions; namespace MorganStanley.ComposeUI.Fdc3.DesktopAgent; internal class UserChannel : IAsyncDisposable { public string Id { get; } - private readonly ConcurrentDictionary _contexts = new ConcurrentDictionary(); - private MessageBuffer? _lastContext = null; + private readonly ConcurrentDictionary _contexts = new(); + private IMessageBuffer? _lastContext = null; private readonly UserChannelTopics _topics; - private readonly IMessageRouter _messageRouter; + private readonly IMessagingService _messagingService; private IAsyncDisposable? _broadcastSubscription; private bool _disposed = false; private readonly ILogger _logger; - public UserChannel(string id, IMessageRouter messageRouter, ILogger? logger) + public UserChannel(string id, IMessagingService messageRouter, ILogger? logger) { Id = id; _topics = Fdc3Topic.UserChannel(id); - _messageRouter = messageRouter; + _messagingService = messageRouter; _logger = (ILogger?) logger ?? NullLogger.Instance; } @@ -50,19 +50,18 @@ public async ValueTask Connect() throw new ObjectDisposedException(nameof(UserChannel)); } - await _messageRouter.ConnectAsync(); + await _messagingService.ConnectAsync(); - var broadcastObserver = AsyncObserver.Create(x => HandleBroadcast(x.Payload)); + var broadcastHandler = new Func(HandleBroadcast); + var broadcastSubscription = _messagingService.SubscribeAsync(_topics.Broadcast, broadcastHandler); - var broadcastSubscribing = _messageRouter.SubscribeAsync(_topics.Broadcast, broadcastObserver); - - await _messageRouter.RegisterServiceAsync(_topics.GetCurrentContext, GetCurrentContext); - _broadcastSubscription = await broadcastSubscribing; + await _messagingService.RegisterServiceAsync(_topics.GetCurrentContext, GetCurrentContext); + _broadcastSubscription = await broadcastSubscription; LogConnected(); } - internal ValueTask HandleBroadcast(MessageBuffer? payloadBuffer) + internal ValueTask HandleBroadcast(IMessageBuffer? payloadBuffer) { if (payloadBuffer == null) { @@ -101,24 +100,24 @@ internal ValueTask HandleBroadcast(MessageBuffer? payloadBuffer) return ValueTask.CompletedTask; } - internal ValueTask GetCurrentContext(string endpoint, MessageBuffer? payloadBuffer, MessageContext? context) + internal ValueTask GetCurrentContext(string endpoint, IMessageBuffer? payloadBuffer, MessageContext? context) { if (payloadBuffer == null) { - return ValueTask.FromResult(_lastContext); + return ValueTask.FromResult(_lastContext); } var payload = payloadBuffer.ReadJson(); if (payload?.ContextType == null) { - return ValueTask.FromResult(_lastContext); + return ValueTask.FromResult(_lastContext); } - if (_contexts.TryGetValue(payload.ContextType, out MessageBuffer? messageBuffer)) + if (_contexts.TryGetValue(payload.ContextType, out IMessageBuffer? messageBuffer)) { - return ValueTask.FromResult(messageBuffer); + return ValueTask.FromResult(messageBuffer); } - return ValueTask.FromResult(null); + return ValueTask.FromResult(null); } public async ValueTask DisposeAsync() @@ -130,7 +129,7 @@ public async ValueTask DisposeAsync() _broadcastSubscription = null; - await _messageRouter.UnregisterServiceAsync(_topics.GetCurrentContext); + await _messagingService.UnregisterServiceAsync(_topics.GetCurrentContext); _disposed = true; } @@ -139,7 +138,7 @@ private void LogConnected() { if (_logger.IsEnabled(LogLevel.Debug)) { - _logger.LogDebug($"UserChannel {Id} connected to messagerouter with client id {_messageRouter.ClientId}"); + _logger.LogDebug($"UserChannel {Id} connected to messagerouter with client id {_messagingService.ClientId}"); } } @@ -159,7 +158,7 @@ private void LogInvalidPayloadJson() } } - private void LogPayload(MessageBuffer payload) + private void LogPayload(IMessageBuffer payload) { if (_logger.IsEnabled(LogLevel.Debug)) { diff --git a/src/fdc3/dotnet/DesktopAgent/test/MorganStanley.ComposeUI.DesktopAgent.Tests/Infrastructure/Internal/Fdc3DesktopAgentMessageRouterService.Tests.cs b/src/fdc3/dotnet/DesktopAgent/test/MorganStanley.ComposeUI.DesktopAgent.Tests/Infrastructure/Internal/Fdc3DesktopAgentMessageRouterService.Tests.cs index 671dbf085..c0e7e2159 100644 --- a/src/fdc3/dotnet/DesktopAgent/test/MorganStanley.ComposeUI.DesktopAgent.Tests/Infrastructure/Internal/Fdc3DesktopAgentMessageRouterService.Tests.cs +++ b/src/fdc3/dotnet/DesktopAgent/test/MorganStanley.ComposeUI.DesktopAgent.Tests/Infrastructure/Internal/Fdc3DesktopAgentMessageRouterService.Tests.cs @@ -23,6 +23,7 @@ using MorganStanley.ComposeUI.Fdc3.DesktopAgent.Infrastructure.Internal; using MorganStanley.ComposeUI.Fdc3.DesktopAgent.Tests.Helpers; using MorganStanley.ComposeUI.Fdc3.DesktopAgent.Tests.TestUtils; +using MorganStanley.ComposeUI.Messaging.Abstractions; using MorganStanley.ComposeUI.ModuleLoader; using AppIdentifier = MorganStanley.ComposeUI.Fdc3.DesktopAgent.Protocol.AppIdentifier; using AppIntent = MorganStanley.ComposeUI.Fdc3.DesktopAgent.Protocol.AppIntent; @@ -829,7 +830,7 @@ public async Task AddIntentListener_subscribes() Intent = "intentMetadataCustom", Selected = false, Context = new Context("contextCustom"), - TargetAppIdentifier = new AppIdentifier {AppId = "appId4", InstanceId = targetFdc3InstanceId} + TargetAppIdentifier = new AppIdentifier { AppId = "appId4", InstanceId = targetFdc3InstanceId } }; var raiseIntentResult = await _fdc3.HandleRaiseIntent(raiseIntentRequest, new MessageContext()); diff --git a/src/fdc3/dotnet/DesktopAgent/test/MorganStanley.ComposeUI.DesktopAgent.Tests/UserChannelErrorsAndDiagnosticsTests.cs b/src/fdc3/dotnet/DesktopAgent/test/MorganStanley.ComposeUI.DesktopAgent.Tests/UserChannelErrorsAndDiagnosticsTests.cs index 35ade79a8..cd72bca7e 100644 --- a/src/fdc3/dotnet/DesktopAgent/test/MorganStanley.ComposeUI.DesktopAgent.Tests/UserChannelErrorsAndDiagnosticsTests.cs +++ b/src/fdc3/dotnet/DesktopAgent/test/MorganStanley.ComposeUI.DesktopAgent.Tests/UserChannelErrorsAndDiagnosticsTests.cs @@ -13,6 +13,7 @@ */ using Microsoft.Extensions.Logging; +using MorganStanley.ComposeUI.Messaging.Abstractions; namespace MorganStanley.ComposeUI.Fdc3.DesktopAgent.Tests; @@ -51,7 +52,7 @@ public void Log(LogLevel logLevel, EventId eventId, TState state, Except public UserChannelErrorsAndDiagnosticsTests() { _logger = new TestLogger(); - _channel = new UserChannel(TestChannel, new Mock().Object, _logger); + _channel = new UserChannel(TestChannel, new Mock().Object, _logger); } [Fact] diff --git a/src/fdc3/dotnet/DesktopAgent/test/MorganStanley.ComposeUI.DesktopAgent.Tests/UserChannelTests.cs b/src/fdc3/dotnet/DesktopAgent/test/MorganStanley.ComposeUI.DesktopAgent.Tests/UserChannelTests.cs index a0323ee94..923e99366 100644 --- a/src/fdc3/dotnet/DesktopAgent/test/MorganStanley.ComposeUI.DesktopAgent.Tests/UserChannelTests.cs +++ b/src/fdc3/dotnet/DesktopAgent/test/MorganStanley.ComposeUI.DesktopAgent.Tests/UserChannelTests.cs @@ -13,6 +13,7 @@ */ using MorganStanley.ComposeUI.Fdc3.DesktopAgent.Contracts; +using MorganStanley.ComposeUI.Messaging.Abstractions; using Finos.Fdc3.Context; namespace MorganStanley.ComposeUI.Fdc3.DesktopAgent.Tests; @@ -20,7 +21,7 @@ namespace MorganStanley.ComposeUI.Fdc3.DesktopAgent.Tests; public class UserChannelTests { private const string TestChannel = "testChannel"; - UserChannel _channel = new UserChannel(TestChannel, new Mock().Object, null); + UserChannel _channel = new UserChannel(TestChannel, new Mock().Object, null); UserChannelTopics _topics = new UserChannelTopics(TestChannel); [Theory] diff --git a/src/messaging/dotnet/Messaging.sln b/src/messaging/dotnet/Messaging.sln index f6fa697b4..245eb15cd 100644 --- a/src/messaging/dotnet/Messaging.sln +++ b/src/messaging/dotnet/Messaging.sln @@ -3,6 +3,8 @@ Microsoft Visual Studio Solution File, Format Version 12.00 # Visual Studio Version 17 VisualStudioVersion = 17.0.31903.59 MinimumVisualStudioVersion = 10.0.40219.1 +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MorganStanley.ComposeUI.Messaging.Abstractions", "src\Abstractions\MorganStanley.ComposeUI.Messaging.Abstractions.csproj", "{2F0D6AA5-E922-4EAE-B627-4449A92CCD43}" +EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MorganStanley.ComposeUI.Messaging.Core", "src\Core\MorganStanley.ComposeUI.Messaging.Core.csproj", "{FC647551-B014-49A8-817E-F5FE6FEBF452}" EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MorganStanley.ComposeUI.Messaging.Client", "src\Client\MorganStanley.ComposeUI.Messaging.Client.csproj", "{A3156691-0B46-4C3D-B042-E71D9BAF02EC}" @@ -83,6 +85,10 @@ Global {F1B0C089-450E-4B02-881F-F41A8B7F5867}.Debug|Any CPU.Build.0 = Debug|Any CPU {F1B0C089-450E-4B02-881F-F41A8B7F5867}.Release|Any CPU.ActiveCfg = Release|Any CPU {F1B0C089-450E-4B02-881F-F41A8B7F5867}.Release|Any CPU.Build.0 = Release|Any CPU + {2F0D6AA5-E922-4EAE-B627-4449A92CCD43}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {2F0D6AA5-E922-4EAE-B627-4449A92CCD43}.Debug|Any CPU.Build.0 = Debug|Any CPU + {2F0D6AA5-E922-4EAE-B627-4449A92CCD43}.Release|Any CPU.ActiveCfg = Release|Any CPU + {2F0D6AA5-E922-4EAE-B627-4449A92CCD43}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -99,6 +105,7 @@ Global {AA63BC7C-9DB8-4062-A689-7FA9689EF6B1} = {78E94277-B0B4-425C-9C47-81E09933FE5B} {CEF78D3F-C645-4471-BAD2-9C538A0CA763} = {78E94277-B0B4-425C-9C47-81E09933FE5B} {F1B0C089-450E-4B02-881F-F41A8B7F5867} = {B7E63957-3C1B-4A16-B530-3EAD0D0C5F07} + {2F0D6AA5-E922-4EAE-B627-4449A92CCD43} = {37DE16A7-F731-4206-B650-8A1D9D066B60} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {38CFF5DF-459A-49E1-8189-2DB429A99B3A} diff --git a/src/messaging/dotnet/src/Abstractions/IMessageBuffer.cs b/src/messaging/dotnet/src/Abstractions/IMessageBuffer.cs new file mode 100644 index 000000000..669357e30 --- /dev/null +++ b/src/messaging/dotnet/src/Abstractions/IMessageBuffer.cs @@ -0,0 +1,29 @@ +// Morgan Stanley makes this available to you under the Apache License, +// Version 2.0 (the "License"). You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0. +// +// See the NOTICE file distributed with this work for additional information +// regarding copyright ownership. Unless required by applicable law or agreed +// to in writing, software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions +// and limitations under the License. + +namespace MorganStanley.ComposeUI.Messaging.Abstractions +{ + public interface IMessageBuffer + { + /// + /// Gets the bytes of the underlying buffer as a + /// + /// + ReadOnlySpan GetSpan(); + + /// + /// Gets the string value of the buffer. + /// + /// + string GetString(); + } +} diff --git a/src/messaging/dotnet/src/Abstractions/IMessagingService.cs b/src/messaging/dotnet/src/Abstractions/IMessagingService.cs new file mode 100644 index 000000000..8d358ee8a --- /dev/null +++ b/src/messaging/dotnet/src/Abstractions/IMessagingService.cs @@ -0,0 +1,93 @@ +// Morgan Stanley makes this available to you under the Apache License, +// Version 2.0 (the "License"). You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0. +// +// See the NOTICE file distributed with this work for additional information +// regarding copyright ownership. Unless required by applicable law or agreed +// to in writing, software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions +// and limitations under the License. + +namespace MorganStanley.ComposeUI.Messaging.Abstractions +{ + public interface IMessagingService : IAsyncDisposable + { + /// + /// Gets the client ID of the current connection. + /// + /// + /// The returned value will be null if the client is not connected. + /// + string? ClientId { get; } + + /// + /// Asynchronously connects to the Message Router server endpoint. + /// + /// + /// + /// + /// Clients don't need to call this method before calling other methods on this type. + /// The client should automatically establish a connection when needed. + /// + ValueTask ConnectAsync(CancellationToken cancellationToken = default); + + /// + /// Gets an observable that represents a topic. + /// + /// + /// + /// + /// + ValueTask SubscribeAsync(string topic, + Func subscriber, + CancellationToken cancellationToken = default); + + /// + /// Publishes a message to a topic. + /// + /// + /// + /// + /// + /// + ValueTask PublishAsync(string topic, + IMessageBuffer? message = null, + PublishOptions optinos = default, + CancellationToken cancellationToken = default); + + /// + /// Registers a service by providing a name and handler. + /// + /// + /// + /// + /// + ValueTask RegisterServiceAsync(string endpoint, + Func> subscriber, + CancellationToken cancellationToken = default); + + /// + /// Removes a service registration. + /// + /// + /// + /// + ValueTask UnregisterServiceAsync(string endpoint, CancellationToken cancellationToken = default); + + /// + /// Invokes a named service. + /// + /// + /// + /// + /// + /// + ValueTask InvokeAsync( + string endpoint, + IMessageBuffer? payload = null, + InvokeOptions options = default, + CancellationToken cancellationToken = default); + } +} diff --git a/src/messaging/dotnet/src/Core/InvokeOptions.cs b/src/messaging/dotnet/src/Abstractions/InvokeOptions.cs similarity index 92% rename from src/messaging/dotnet/src/Core/InvokeOptions.cs rename to src/messaging/dotnet/src/Abstractions/InvokeOptions.cs index a0502a806..40fcc1b6c 100644 --- a/src/messaging/dotnet/src/Core/InvokeOptions.cs +++ b/src/messaging/dotnet/src/Abstractions/InvokeOptions.cs @@ -10,7 +10,7 @@ // or implied. See the License for the specific language governing permissions // and limitations under the License. -namespace MorganStanley.ComposeUI.Messaging; +namespace MorganStanley.ComposeUI.Messaging.Abstractions; public readonly record struct InvokeOptions { diff --git a/src/messaging/dotnet/src/Core/MessageContext.cs b/src/messaging/dotnet/src/Abstractions/MessageContext.cs similarity index 94% rename from src/messaging/dotnet/src/Core/MessageContext.cs rename to src/messaging/dotnet/src/Abstractions/MessageContext.cs index f9a5b2f4f..9d39edee1 100644 --- a/src/messaging/dotnet/src/Core/MessageContext.cs +++ b/src/messaging/dotnet/src/Abstractions/MessageContext.cs @@ -10,7 +10,7 @@ // or implied. See the License for the specific language governing permissions // and limitations under the License. -namespace MorganStanley.ComposeUI.Messaging; +namespace MorganStanley.ComposeUI.Messaging.Abstractions; /// /// Provides contextual information for a message received from the Message Router. diff --git a/src/messaging/dotnet/src/Abstractions/MorganStanley.ComposeUI.Messaging.Abstractions.csproj b/src/messaging/dotnet/src/Abstractions/MorganStanley.ComposeUI.Messaging.Abstractions.csproj new file mode 100644 index 000000000..132c02c59 --- /dev/null +++ b/src/messaging/dotnet/src/Abstractions/MorganStanley.ComposeUI.Messaging.Abstractions.csproj @@ -0,0 +1,9 @@ + + + + net6.0 + enable + enable + + + diff --git a/src/messaging/dotnet/src/Core/PublishOptions.cs b/src/messaging/dotnet/src/Abstractions/PublishOptions.cs similarity index 92% rename from src/messaging/dotnet/src/Core/PublishOptions.cs rename to src/messaging/dotnet/src/Abstractions/PublishOptions.cs index e8066f57b..50c208f19 100644 --- a/src/messaging/dotnet/src/Core/PublishOptions.cs +++ b/src/messaging/dotnet/src/Abstractions/PublishOptions.cs @@ -10,7 +10,7 @@ // or implied. See the License for the specific language governing permissions // and limitations under the License. -namespace MorganStanley.ComposeUI.Messaging; +namespace MorganStanley.ComposeUI.Messaging.Abstractions; public readonly record struct PublishOptions { diff --git a/src/messaging/dotnet/src/Client/Client/MessageRouterClient.cs b/src/messaging/dotnet/src/Client/Client/MessageRouterClient.cs index 5bc35d727..f873294d7 100644 --- a/src/messaging/dotnet/src/Client/Client/MessageRouterClient.cs +++ b/src/messaging/dotnet/src/Client/Client/MessageRouterClient.cs @@ -10,11 +10,15 @@ // or implied. See the License for the specific language governing permissions // and limitations under the License. +using System; using System.Collections.Concurrent; using System.Diagnostics; +using System.Reactive; +using System.Reactive.Linq; using System.Threading.Channels; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; +using MorganStanley.ComposeUI.Messaging.Abstractions; using MorganStanley.ComposeUI.Messaging.Client.Abstractions; using MorganStanley.ComposeUI.Messaging.Exceptions; using MorganStanley.ComposeUI.Messaging.Instrumentation; @@ -58,7 +62,7 @@ public ValueTask ConnectAsync(CancellationToken cancellationToken = default) public ValueTask SubscribeAsync( string topic, - IAsyncObserver subscriber, + Func subscriber, CancellationToken cancellationToken = default) { Protocol.Topic.Validate(topic); @@ -67,9 +71,20 @@ public ValueTask SubscribeAsync( return SubscribeAsyncCore(GetTopic(topic), subscriber, cancellationToken); } + public ValueTask SubscribeAsync( + string topic, + IAsyncObserver subscriber, + CancellationToken cancellationToken = default) + { + Protocol.Topic.Validate(topic); + CheckState(); + + return SubscribeAsyncCore(GetTopic(topic), subscriber, cancellationToken); + } + public async ValueTask PublishAsync( string topic, - MessageBuffer? payload = null, + IMessageBuffer? payload = null, PublishOptions options = default, CancellationToken cancellationToken = default) { @@ -86,9 +101,9 @@ await SendRequestAsync( cancellationToken); } - public async ValueTask InvokeAsync( + public async ValueTask InvokeAsync( string endpoint, - MessageBuffer? payload = null, + IMessageBuffer? payload = null, InvokeOptions options = default, CancellationToken cancellationToken = default) { @@ -108,6 +123,14 @@ await SendRequestAsync( return response.Payload; } + /// + /// Registers a service by providing a name and handler. + /// + /// + /// + /// + /// + /// public ValueTask RegisterServiceAsync( string endpoint, MessageHandler handler, @@ -132,6 +155,19 @@ public ValueTask RegisterServiceAsync( } } + public ValueTask RegisterServiceAsync( + string endpoint, + Func> subscriber, + CancellationToken cancellationToken = default) + { + MessageHandler handler = async (endpoint, payload, context) => + { + return await subscriber(endpoint, payload, context); + }; + + return RegisterServiceAsync(endpoint, handler, null, cancellationToken); + } + public ValueTask UnregisterServiceAsync(string endpoint, CancellationToken cancellationToken = default) { CheckState(); @@ -606,6 +642,37 @@ private async ValueTask RegisterServiceCore( await SendRequestAsync(request, cancellationToken); } + private async ValueTask SubscribeAsyncCore( + Topic topic, + Func handler, + CancellationToken cancellationToken) + { + var subscriber = AsyncObserver.Create( + async message => + { + if (message?.Payload != null) + { + await handler(message.Payload); + } + else + { + ThrowHelper.MessageOrPayloadNull(); + } + }, + async ex => + { + ThrowHelper.ErrorInObserver(ex); + await Task.CompletedTask; + }, + + async () => + { + await Task.CompletedTask; + }); + + return await SubscribeAsyncCore(topic, subscriber, cancellationToken); + } + private async ValueTask SubscribeAsyncCore( Topic topic, IAsyncObserver subscriber, diff --git a/src/messaging/dotnet/src/Client/DependencyInjection/ServiceCollectionExtensions.cs b/src/messaging/dotnet/src/Client/DependencyInjection/ServiceCollectionExtensions.cs index a2dc38e27..d3302bf3b 100644 --- a/src/messaging/dotnet/src/Client/DependencyInjection/ServiceCollectionExtensions.cs +++ b/src/messaging/dotnet/src/Client/DependencyInjection/ServiceCollectionExtensions.cs @@ -11,6 +11,7 @@ // and limitations under the License. using MorganStanley.ComposeUI.Messaging; +using MorganStanley.ComposeUI.Messaging.Abstractions; using MorganStanley.ComposeUI.Messaging.Client; // ReSharper disable once CheckNamespace @@ -35,6 +36,7 @@ public static IServiceCollection AddMessageRouter( { var builder = new MessageRouterBuilder(serviceCollection); builderAction(builder); + serviceCollection.AddSingleton(); serviceCollection.AddSingleton(); serviceCollection.AddSingleton( diff --git a/src/messaging/dotnet/src/Client/MessageRouterClientExtensions.cs b/src/messaging/dotnet/src/Client/MessageRouterClientExtensions.cs index a59fa70e5..cc4f0d79d 100644 --- a/src/messaging/dotnet/src/Client/MessageRouterClientExtensions.cs +++ b/src/messaging/dotnet/src/Client/MessageRouterClientExtensions.cs @@ -10,6 +10,7 @@ // or implied. See the License for the specific language governing permissions // and limitations under the License. +using MorganStanley.ComposeUI.Messaging.Abstractions; using MorganStanley.ComposeUI.Messaging.Client; namespace MorganStanley.ComposeUI.Messaging; @@ -37,7 +38,14 @@ public TopicObservable(IMessageRouter messageRouter, string topic) public ValueTask SubscribeAsync(IAsyncObserver observer) { - return _messageRouter.SubscribeAsync(_topic, observer, CancellationToken.None); + Func handler = async messageBuffer => + { + var context = new MessageContext(); + var topicMessage = new TopicMessage(_topic, messageBuffer, context); + await observer.OnNextAsync(topicMessage); + }; + + return _messageRouter.SubscribeAsync(_topic, handler, CancellationToken.None); } private readonly IMessageRouter _messageRouter; diff --git a/src/messaging/dotnet/src/Core/Exceptions/ThrowHelper.cs b/src/messaging/dotnet/src/Core/Exceptions/ThrowHelper.cs index 10be83b57..0c32ed855 100644 --- a/src/messaging/dotnet/src/Core/Exceptions/ThrowHelper.cs +++ b/src/messaging/dotnet/src/Core/Exceptions/ThrowHelper.cs @@ -48,4 +48,10 @@ public static MessageRouterException ConnectionAborted() => public static MessageRouterException ConnectionAborted(Exception innerException) => new(MessageRouterErrors.ConnectionAborted, $"The connection dropped unexpectedly.\n\r{innerException.Message}", innerException); + + public static MessageRouterException MessageOrPayloadNull() => + new(MessageRouterErrors.ConnectionAborted, "The TopicMessage or its payload is null."); + + public static MessageRouterException ErrorInObserver(Exception innerException) => + new(MessageRouterErrors.ConnectionAborted, $"Error in Observer.\n\r{innerException.Message}", innerException); } diff --git a/src/messaging/dotnet/src/Core/IMessageRouter.cs b/src/messaging/dotnet/src/Core/IMessageRouter.cs index 23019c6dd..12a6089c4 100644 --- a/src/messaging/dotnet/src/Core/IMessageRouter.cs +++ b/src/messaging/dotnet/src/Core/IMessageRouter.cs @@ -10,68 +10,15 @@ // or implied. See the License for the specific language governing permissions // and limitations under the License. +using MorganStanley.ComposeUI.Messaging.Abstractions; + namespace MorganStanley.ComposeUI.Messaging; /// /// Message Router client interface. /// -public interface IMessageRouter : IAsyncDisposable -{ - /// - /// Gets the client ID of the current connection. - /// - /// - /// The returned value will be null if the client is not connected. - /// - string? ClientId { get; } - - /// - /// Asynchronously connects to the Message Router server endpoint. - /// - /// - /// - /// - /// Clients don't need to call this method before calling other methods on this type. - /// The client should automatically establish a connection when needed. - /// - ValueTask ConnectAsync(CancellationToken cancellationToken = default); - - /// - /// Gets an observable that represents a topic. - /// - /// - /// - /// - /// - ValueTask SubscribeAsync( - string topic, - IAsyncObserver subscriber, - CancellationToken cancellationToken = default); - - /// - /// Publishes a message to a topic. - /// - /// - /// - /// - /// - /// - ValueTask PublishAsync(string topic, MessageBuffer? payload = null, PublishOptions options = default, CancellationToken cancellationToken = default); - - /// - /// Invokes a named service. - /// - /// - /// - /// - /// - /// - ValueTask InvokeAsync( - string endpoint, - MessageBuffer? payload = null, - InvokeOptions options = default, - CancellationToken cancellationToken = default); - +public interface IMessageRouter : IMessagingService, IAsyncDisposable +{ /// /// Registers a service by providing a name and handler. /// @@ -86,14 +33,6 @@ ValueTask RegisterServiceAsync( EndpointDescriptor? descriptor = null, CancellationToken cancellationToken = default); - /// - /// Removes a service registration. - /// - /// - /// - /// - ValueTask UnregisterServiceAsync(string endpoint, CancellationToken cancellationToken = default); - /// /// Registers an endpoint by providing a name, handler and optional descriptor. /// @@ -108,6 +47,18 @@ ValueTask RegisterEndpointAsync( EndpointDescriptor? descriptor = null, CancellationToken cancellationToken = default); + /// + /// Gets an observable that represents a topic. + /// + /// + /// + /// + /// + ValueTask SubscribeAsync( + string topic, + IAsyncObserver subscriber, + CancellationToken cancellationToken = default); + /// /// Removes an endpoint registration. /// diff --git a/src/messaging/dotnet/src/Core/MessageBuffer.cs b/src/messaging/dotnet/src/Core/MessageBuffer.cs index 4ee512529..96b83d60d 100644 --- a/src/messaging/dotnet/src/Core/MessageBuffer.cs +++ b/src/messaging/dotnet/src/Core/MessageBuffer.cs @@ -15,7 +15,9 @@ using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using System.Text; +using System.Text.Json; using CommunityToolkit.HighPerformance.Buffers; +using MorganStanley.ComposeUI.Messaging.Abstractions; namespace MorganStanley.ComposeUI.Messaging; @@ -23,12 +25,8 @@ namespace MorganStanley.ComposeUI.Messaging; /// Represents an UTF8-encoded string buffer that uses pooled memory. /// Instances of this type typically represent message payloads. /// -public sealed class MessageBuffer : IDisposable +public sealed class MessageBuffer : IMessageBuffer, IDisposable { - /// - /// Gets the string value of the buffer. - /// - /// public string GetString() { ThrowIfDisposed(); @@ -36,10 +34,6 @@ public string GetString() return Encoding.GetString(_bytes, 0, _length); } - /// - /// Gets the bytes of the underlying buffer as a - /// - /// public ReadOnlySpan GetSpan() { ThrowIfDisposed(); @@ -344,6 +338,13 @@ private bool TryGetBase64BytesCore(IBufferWriter bufferWriter) return true; } + public T? ReadJson(JsonSerializerOptions? options = null) + { + var reader = new Utf8JsonReader(GetSpan()); + + return JsonSerializer.Deserialize(ref reader, options); + } + ~MessageBuffer() { DisposeCore(); diff --git a/src/messaging/dotnet/src/Core/MessageBufferJsonExtensions.cs b/src/messaging/dotnet/src/Core/MessageBufferJsonExtensions.cs index 491bffb99..35b58c771 100644 --- a/src/messaging/dotnet/src/Core/MessageBufferJsonExtensions.cs +++ b/src/messaging/dotnet/src/Core/MessageBufferJsonExtensions.cs @@ -11,6 +11,7 @@ // and limitations under the License. using System.Text.Json; +using MorganStanley.ComposeUI.Messaging.Abstractions; namespace MorganStanley.ComposeUI.Messaging; @@ -26,7 +27,7 @@ public static class MessageBufferJsonExtensions /// /// /// - public static T? ReadJson(this MessageBuffer buffer, JsonSerializerOptions? options = null) + public static T? ReadJson(this IMessageBuffer buffer, JsonSerializerOptions? options = null) { var reader = new Utf8JsonReader(buffer.GetSpan()); @@ -50,7 +51,7 @@ public static MessageBuffer CreateJson( using var jsonWriter = new Utf8JsonWriter(bufferWriter); JsonSerializer.Serialize(jsonWriter, value, options); jsonWriter.Flush(); - + return MessageBuffer.Create(bufferWriter.WrittenMemory); } } \ No newline at end of file diff --git a/src/messaging/dotnet/src/Core/MessageHandler.cs b/src/messaging/dotnet/src/Core/MessageHandler.cs index f7120fb5c..2e8241b13 100644 --- a/src/messaging/dotnet/src/Core/MessageHandler.cs +++ b/src/messaging/dotnet/src/Core/MessageHandler.cs @@ -10,12 +10,14 @@ // or implied. See the License for the specific language governing permissions // and limitations under the License. +using MorganStanley.ComposeUI.Messaging.Abstractions; + namespace MorganStanley.ComposeUI.Messaging; /// /// The delegate type that gets called when an endpoint is invoked. /// -public delegate ValueTask MessageHandler(string endpoint, MessageBuffer? payload, MessageContext context); +public delegate ValueTask MessageHandler(string endpoint, IMessageBuffer? payload, MessageContext context); /// /// The delegate type that gets called when an endpoint is invoked (plain text version). diff --git a/src/messaging/dotnet/src/Core/MessageRouterExtensions.cs b/src/messaging/dotnet/src/Core/MessageRouterExtensions.cs index 6083fc26e..ca4496493 100644 --- a/src/messaging/dotnet/src/Core/MessageRouterExtensions.cs +++ b/src/messaging/dotnet/src/Core/MessageRouterExtensions.cs @@ -10,10 +10,9 @@ // or implied. See the License for the specific language governing permissions // and limitations under the License. -using System.Reactive; -using System.Reactive.Linq; using System.Runtime.CompilerServices; using System.Threading.Channels; +using MorganStanley.ComposeUI.Messaging.Abstractions; using MorganStanley.ComposeUI.Messaging.Internal; namespace MorganStanley.ComposeUI.Messaging; @@ -90,26 +89,14 @@ public static ValueTask SubscribeAsync( IObserver observer, CancellationToken cancellationToken = default) { - var innerSubscriber = AsyncObserver.Create( - message => - { - observer.OnNext(message); - - return default; - - }, - exception => - { - observer.OnError(exception); - - return default; - }, - () => - { - observer.OnCompleted(); + Func innerSubscriber = async messageBuffer => + { + MessageContext context = new MessageContext(); + var topicMessage = new TopicMessage(topic, messageBuffer, context); - return default; - }); + observer.OnNext(topicMessage); + await ValueTask.CompletedTask; + }; return Disposable.FromAsyncDisposable( messageRouter.SubscribeAsync(topic, innerSubscriber, cancellationToken)); @@ -130,26 +117,12 @@ public static ValueTask SubscribeAsync( IObserver observer, CancellationToken cancellationToken = default) { - var innerSubscriber = AsyncObserver.Create( - message => - { - observer.OnNext(message.Payload?.GetString()); - - return default; - - }, - exception => - { - observer.OnError(exception); - - return default; - }, - () => - { - observer.OnCompleted(); - - return default; - }); + Func innerSubscriber = async messageBuffer => + { + string? message = messageBuffer?.GetString(); + observer.OnNext(message); + await ValueTask.CompletedTask; + }; return Disposable.FromAsyncDisposable( messageRouter.SubscribeAsync(topic, innerSubscriber, cancellationToken)); @@ -170,10 +143,12 @@ public static ValueTask SubscribeAsync( IAsyncObserver subscriber, CancellationToken cancellationToken = default) { - var innerSubscriber = AsyncObserver.Create( - message => subscriber.OnNextAsync(message.Payload?.GetString()), - subscriber.OnErrorAsync, - subscriber.OnCompletedAsync); + + Func innerSubscriber = async messageBuffer => + { + string? message = messageBuffer?.GetString(); + await subscriber.OnNextAsync(message); + }; return messageRouter.SubscribeAsync(topic, innerSubscriber, cancellationToken); } @@ -192,23 +167,14 @@ public static async IAsyncEnumerable SubscribeAsync( { var channel = Channel.CreateUnbounded(); - await using var subscription = await messageRouter.SubscribeAsync( - topic, - AsyncObserver.Create( - onNextAsync: message => channel.Writer.WriteAsync(message, cancellationToken), - onErrorAsync: exception => - { - channel.Writer.TryComplete(exception); - - return default; - }, - onCompletedAsync: () => - { - channel.Writer.TryComplete(); - - return default; - }), - cancellationToken); + Func handler = async messageBuffer => + { + var context = new MessageContext(); + var topicMessage = new TopicMessage(topic, messageBuffer, context); + await channel.Writer.WriteAsync(topicMessage, cancellationToken); + }; + + await using var subscription = await messageRouter.SubscribeAsync(topic, handler, cancellationToken); await foreach (var message in channel.Reader.ReadAllAsync(cancellationToken).WithCancellation(cancellationToken)) { diff --git a/src/messaging/dotnet/src/Core/MessageRouterJsonExtensions.cs b/src/messaging/dotnet/src/Core/MessageRouterJsonExtension.cs similarity index 97% rename from src/messaging/dotnet/src/Core/MessageRouterJsonExtensions.cs rename to src/messaging/dotnet/src/Core/MessageRouterJsonExtension.cs index b29bf22b5..fee258c45 100644 --- a/src/messaging/dotnet/src/Core/MessageRouterJsonExtensions.cs +++ b/src/messaging/dotnet/src/Core/MessageRouterJsonExtension.cs @@ -11,6 +11,7 @@ // and limitations under the License. using System.Text.Json; +using MorganStanley.ComposeUI.Messaging.Abstractions; namespace MorganStanley.ComposeUI.Messaging; diff --git a/src/messaging/dotnet/src/Core/MorganStanley.ComposeUI.Messaging.Core.csproj b/src/messaging/dotnet/src/Core/MorganStanley.ComposeUI.Messaging.Core.csproj index 7db87fb24..3545d62b1 100644 --- a/src/messaging/dotnet/src/Core/MorganStanley.ComposeUI.Messaging.Core.csproj +++ b/src/messaging/dotnet/src/Core/MorganStanley.ComposeUI.Messaging.Core.csproj @@ -29,4 +29,8 @@ + + + + \ No newline at end of file diff --git a/src/messaging/dotnet/src/Core/Protocol/Json/MessageBufferConverter.cs b/src/messaging/dotnet/src/Core/Protocol/Json/MessageBufferConverter.cs index 8b2805848..c929bcf18 100644 --- a/src/messaging/dotnet/src/Core/Protocol/Json/MessageBufferConverter.cs +++ b/src/messaging/dotnet/src/Core/Protocol/Json/MessageBufferConverter.cs @@ -12,37 +12,38 @@ using System.Text.Json; using System.Text.Json.Serialization; +using MorganStanley.ComposeUI.Messaging.Abstractions; namespace MorganStanley.ComposeUI.Messaging.Protocol.Json; /// -/// A JSON converter that reads and writes objects. +/// A JSON converter that reads and writes objects. /// -internal class MessageBufferConverter : JsonConverter +internal class MessageBufferConverter : JsonConverter { - public override MessageBuffer? Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + public override IMessageBuffer? Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) { switch (reader.TokenType) { case JsonTokenType.Null: - return null; + return default; case JsonTokenType.String: - { - var length = reader.HasValueSequence - ? checked((int) reader.ValueSequence.Length) - : reader.ValueSpan.Length; - var buffer = MessageBuffer.GetBuffer(length); - length = reader.CopyString(buffer); + { + var length = reader.HasValueSequence + ? checked((int) reader.ValueSequence.Length) + : reader.ValueSpan.Length; + var buffer = MessageBuffer.GetBuffer(length); + length = reader.CopyString(buffer); - return new MessageBuffer(buffer, length); - } + return new MessageBuffer(buffer, length); + } } throw new JsonException(); } - public override void Write(Utf8JsonWriter writer, MessageBuffer value, JsonSerializerOptions options) + public override void Write(Utf8JsonWriter writer, IMessageBuffer value, JsonSerializerOptions options) { writer.WriteStringValue(value.GetSpan()); } diff --git a/src/messaging/dotnet/src/Core/Protocol/Messages/InvokeRequest.cs b/src/messaging/dotnet/src/Core/Protocol/Messages/InvokeRequest.cs index bfed896cd..999e436cd 100644 --- a/src/messaging/dotnet/src/Core/Protocol/Messages/InvokeRequest.cs +++ b/src/messaging/dotnet/src/Core/Protocol/Messages/InvokeRequest.cs @@ -10,13 +10,15 @@ // or implied. See the License for the specific language governing permissions // and limitations under the License. +using MorganStanley.ComposeUI.Messaging.Abstractions; + namespace MorganStanley.ComposeUI.Messaging.Protocol.Messages; public sealed class InvokeRequest : AbstractRequest { public override MessageType Type => MessageType.Invoke; public string Endpoint { get; init; } = null!; - public MessageBuffer? Payload { get; init; } + public IMessageBuffer? Payload { get; init; } public string? SourceId { get; init; } public string? CorrelationId { get; init; } } \ No newline at end of file diff --git a/src/messaging/dotnet/src/Core/Protocol/Messages/InvokeResponse.cs b/src/messaging/dotnet/src/Core/Protocol/Messages/InvokeResponse.cs index 75688880b..744e438ab 100644 --- a/src/messaging/dotnet/src/Core/Protocol/Messages/InvokeResponse.cs +++ b/src/messaging/dotnet/src/Core/Protocol/Messages/InvokeResponse.cs @@ -10,10 +10,12 @@ // or implied. See the License for the specific language governing permissions // and limitations under the License. +using MorganStanley.ComposeUI.Messaging.Abstractions; + namespace MorganStanley.ComposeUI.Messaging.Protocol.Messages; public sealed class InvokeResponse : AbstractResponse { public override MessageType Type => MessageType.InvokeResponse; - public MessageBuffer? Payload { get; init; } + public IMessageBuffer? Payload { get; init; } } \ No newline at end of file diff --git a/src/messaging/dotnet/src/Core/Protocol/Messages/PublishMessage.cs b/src/messaging/dotnet/src/Core/Protocol/Messages/PublishMessage.cs index 79ee6ff0d..a9a989d8e 100644 --- a/src/messaging/dotnet/src/Core/Protocol/Messages/PublishMessage.cs +++ b/src/messaging/dotnet/src/Core/Protocol/Messages/PublishMessage.cs @@ -10,12 +10,14 @@ // or implied. See the License for the specific language governing permissions // and limitations under the License. +using MorganStanley.ComposeUI.Messaging.Abstractions; + namespace MorganStanley.ComposeUI.Messaging.Protocol.Messages; public sealed class PublishMessage : AbstractRequest { public override MessageType Type => MessageType.Publish; public string Topic { get; init; } = null!; - public MessageBuffer? Payload { get; init; } + public IMessageBuffer? Payload { get; init; } public string? CorrelationId { get; init; } } \ No newline at end of file diff --git a/src/messaging/dotnet/src/Core/Protocol/Messages/TopicMessage.cs b/src/messaging/dotnet/src/Core/Protocol/Messages/TopicMessage.cs index 2dd8914bd..ec270bfb1 100644 --- a/src/messaging/dotnet/src/Core/Protocol/Messages/TopicMessage.cs +++ b/src/messaging/dotnet/src/Core/Protocol/Messages/TopicMessage.cs @@ -10,13 +10,15 @@ // or implied. See the License for the specific language governing permissions // and limitations under the License. +using MorganStanley.ComposeUI.Messaging.Abstractions; + namespace MorganStanley.ComposeUI.Messaging.Protocol.Messages; public sealed class TopicMessage : Message { public override MessageType Type => MessageType.Topic; public string Topic { get; init; } = null!; - public MessageBuffer? Payload { get; init; } + public IMessageBuffer? Payload { get; init; } public string SourceId { get; init; } = null!; public string? CorrelationId { get; init; } } \ No newline at end of file diff --git a/src/messaging/dotnet/src/Core/TopicMessage.cs b/src/messaging/dotnet/src/Core/TopicMessage.cs index 8b7592c0f..c6b394c29 100644 --- a/src/messaging/dotnet/src/Core/TopicMessage.cs +++ b/src/messaging/dotnet/src/Core/TopicMessage.cs @@ -10,6 +10,8 @@ // or implied. See the License for the specific language governing permissions // and limitations under the License. +using MorganStanley.ComposeUI.Messaging.Abstractions; + namespace MorganStanley.ComposeUI.Messaging; /// @@ -24,7 +26,7 @@ public sealed class TopicMessage /// /// /// - public TopicMessage(string topic, MessageBuffer? payload, MessageContext context) + public TopicMessage(string topic, IMessageBuffer? payload, MessageContext context) { Topic = topic; Payload = payload; @@ -40,7 +42,7 @@ public TopicMessage(string topic, MessageBuffer? payload, MessageContext context /// The payload of the message. The format of the message is arbitrary and should /// be defined and documented with the message definition. /// - public MessageBuffer? Payload { get; } + public IMessageBuffer? Payload { get; } /// /// Gets contextual information about the message. diff --git a/src/messaging/dotnet/test/Client.Tests/Client/MessageRouterClient.Tests.cs b/src/messaging/dotnet/test/Client.Tests/Client/MessageRouterClient.Tests.cs index ab99873b7..cee1aa3cf 100644 --- a/src/messaging/dotnet/test/Client.Tests/Client/MessageRouterClient.Tests.cs +++ b/src/messaging/dotnet/test/Client.Tests/Client/MessageRouterClient.Tests.cs @@ -10,11 +10,9 @@ // or implied. See the License for the specific language governing permissions // and limitations under the License. -using System.Diagnostics; using System.Linq.Expressions; -using Microsoft.CodeAnalysis.CSharp.Syntax; using Microsoft.Extensions.Logging; -using Microsoft.VisualStudio.TestPlatform.ObjectModel.DataCollection; +using MorganStanley.ComposeUI.Messaging.Abstractions; using MorganStanley.ComposeUI.Messaging.Client.Abstractions; using MorganStanley.ComposeUI.Messaging.Instrumentation; using MorganStanley.ComposeUI.Messaging.Protocol; diff --git a/src/messaging/dotnet/test/Core.Tests/MessageRouterJsonExtensions.Tests.cs b/src/messaging/dotnet/test/Core.Tests/MessageRouterJsonExtensions.Tests.cs index 58bb0448b..d0139e27e 100644 --- a/src/messaging/dotnet/test/Core.Tests/MessageRouterJsonExtensions.Tests.cs +++ b/src/messaging/dotnet/test/Core.Tests/MessageRouterJsonExtensions.Tests.cs @@ -13,6 +13,7 @@ using System.Text.Json; using System.Text.Json.Serialization; using FluentAssertions.Json; +using MorganStanley.ComposeUI.Messaging.Abstractions; using Newtonsoft.Json.Linq; namespace MorganStanley.ComposeUI.Messaging; diff --git a/src/messaging/dotnet/test/IntegrationTests/EndToEndTestsBase.cs b/src/messaging/dotnet/test/IntegrationTests/EndToEndTestsBase.cs index 7e02d501d..fa86ccb8b 100644 --- a/src/messaging/dotnet/test/IntegrationTests/EndToEndTestsBase.cs +++ b/src/messaging/dotnet/test/IntegrationTests/EndToEndTestsBase.cs @@ -14,6 +14,7 @@ using System.Text.Json; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; +using MorganStanley.ComposeUI.Messaging.Abstractions; using Nito.AsyncEx; namespace MorganStanley.ComposeUI.Messaging; @@ -73,7 +74,7 @@ public async Task Client_can_invoke_a_registered_service() handlerMock .Setup(_ => _.Invoke("test-service", It.IsAny(), It.IsAny())) - .Returns(new ValueTask(MessageBuffer.Create("test-response"))); + .Returns(new ValueTask(MessageBuffer.Create("test-response"))); await service.RegisterServiceAsync(endpoint: "test-service", handlerMock.Object); diff --git a/src/shell/dotnet/Shell/App.xaml.cs b/src/shell/dotnet/Shell/App.xaml.cs index 4451fccf2..156c20f35 100644 --- a/src/shell/dotnet/Shell/App.xaml.cs +++ b/src/shell/dotnet/Shell/App.xaml.cs @@ -133,8 +133,8 @@ private async Task StartAsync(StartupEventArgs e) ShellVersion = Assembly.GetExecutingAssembly().FullName }; - await _host.Services.GetRequiredService().RegisterServiceAsync("Diagnostics", (e, m, t) => - ValueTask.FromResult(MessageBuffer.Factory.CreateJson(diagnostics))!); + await _host.Services.GetRequiredService().RegisterServiceAsync("Diagnostics", (e, m, t) => + ValueTask.FromResult(MessageBuffer.Factory.CreateJson(diagnostics).GetString())!); await OnHostInitializedAsync();