From 6411f22d400331a122e8b71bb85a8126bd4acc20 Mon Sep 17 00:00:00 2001 From: lilla28 Date: Fri, 23 Feb 2024 18:11:49 +0100 Subject: [PATCH] chore(messaging): Added Publish-, Subscribe- and UnsubscribeResponse types on both .NET and Typescript side, fixed DesktopAgent ModuleNotFound exception when removing module from the dictionary -running the chart and grid example. --- .../src/DesktopAgent/Fdc3DesktopAgent.cs | 13 +- .../dotnet/examples/TestServer/Program.cs | 1 - .../src/Client/Client/MessageRouterClient.cs | 26 ++-- .../src/Core/Protocol/Messages/Message.cs | 3 + .../src/Core/Protocol/Messages/MessageType.cs | 15 ++- .../Core/Protocol/Messages/PublishMessage.cs | 2 +- .../Core/Protocol/Messages/PublishResponse.cs | 18 +++ .../Protocol/Messages/SubscribeMessage.cs | 2 +- .../Protocol/Messages/SubscribeResponse.cs | 18 +++ .../Protocol/Messages/UnsubscribeMessage.cs | 2 +- .../Protocol/Messages/UnsubscribeResponse.cs | 18 +++ ...nStanley.ComposeUI.Messaging.Server.csproj | 3 +- .../src/Server/Server/MessageRouterServer.cs | 116 +++++++++++++++--- .../Client/MessageRouterClient.Tests.cs | 116 +++++++++++++++++- .../Json/JsonMessageSerializer.Tests.cs | 30 ++++- .../IntegrationTests/EndToEndTestsBase.cs | 1 - .../src/client/MessageRouterClient.spec.ts | 93 +++++++++++++- .../src/client/MessageRouterClient.ts | 51 +++++--- .../src/protocol/messages/AbstractResponse.ts | 5 +- .../src/protocol/messages/MessageType.ts | 3 + .../src/protocol/messages/PublishMessage.ts | 4 +- .../src/protocol/messages/PublishResponse.ts | 18 +++ .../src/protocol/messages/SubscribeMessage.ts | 4 +- .../protocol/messages/SubscribeResponse.ts | 18 +++ .../protocol/messages/UnsubscribeMessage.ts | 4 +- .../protocol/messages/UnsubscribeResponse.ts | 18 +++ .../src/protocol/messages/index.ts | 3 + 27 files changed, 537 insertions(+), 68 deletions(-) create mode 100644 src/messaging/dotnet/src/Core/Protocol/Messages/PublishResponse.cs create mode 100644 src/messaging/dotnet/src/Core/Protocol/Messages/SubscribeResponse.cs create mode 100644 src/messaging/dotnet/src/Core/Protocol/Messages/UnsubscribeResponse.cs create mode 100644 src/messaging/js/composeui-messaging-client/src/protocol/messages/PublishResponse.ts create mode 100644 src/messaging/js/composeui-messaging-client/src/protocol/messages/SubscribeResponse.ts create mode 100644 src/messaging/js/composeui-messaging-client/src/protocol/messages/UnsubscribeResponse.ts diff --git a/src/fdc3/dotnet/DesktopAgent/src/DesktopAgent/Fdc3DesktopAgent.cs b/src/fdc3/dotnet/DesktopAgent/src/DesktopAgent/Fdc3DesktopAgent.cs index 817adfd7b..a28cad97d 100644 --- a/src/fdc3/dotnet/DesktopAgent/src/DesktopAgent/Fdc3DesktopAgent.cs +++ b/src/fdc3/dotnet/DesktopAgent/src/DesktopAgent/Fdc3DesktopAgent.cs @@ -695,10 +695,17 @@ private Dictionary GetAppIntentsFromIntentMetadaCollection( private Task RemoveModuleAsync(IModuleInstance instance) { - var fdc3InstanceId = GetFdc3InstanceId(instance); - if (!_runningModules.TryRemove(new(fdc3InstanceId), out _)) + try + { + var fdc3InstanceId = GetFdc3InstanceId(instance); + if (!_runningModules.TryRemove(new(fdc3InstanceId), out _)) + { + _logger.LogError($"Could not remove the closed window with instanceId: {fdc3InstanceId}."); + } + } + catch (Fdc3DesktopAgentException exception) { - _logger.LogError($"Could not remove the closed window with instanceId: {fdc3InstanceId}."); + _logger.LogError(exception, $"Exception thrown while removing module: {instance.Manifest.Id}, {instance.Manifest.Name} from running instances in FDC3DesktopAgent."); } return Task.CompletedTask; diff --git a/src/messaging/dotnet/examples/TestServer/Program.cs b/src/messaging/dotnet/examples/TestServer/Program.cs index 48a16fcf9..802535a42 100644 --- a/src/messaging/dotnet/examples/TestServer/Program.cs +++ b/src/messaging/dotnet/examples/TestServer/Program.cs @@ -3,7 +3,6 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; -using MorganStanley.ComposeUI.Messaging.Server.WebSocket; namespace TestServer; diff --git a/src/messaging/dotnet/src/Client/Client/MessageRouterClient.cs b/src/messaging/dotnet/src/Client/Client/MessageRouterClient.cs index a98d61e8d..e53c60d9c 100644 --- a/src/messaging/dotnet/src/Client/Client/MessageRouterClient.cs +++ b/src/messaging/dotnet/src/Client/Client/MessageRouterClient.cs @@ -67,7 +67,7 @@ public ValueTask SubscribeAsync( return SubscribeAsyncCore(GetTopic(topic), subscriber, cancellationToken); } - public ValueTask PublishAsync( + public async ValueTask PublishAsync( string topic, MessageBuffer? payload = null, PublishOptions options = default, @@ -75,9 +75,10 @@ public ValueTask PublishAsync( { Protocol.Topic.Validate(topic); - return SendMessageAsync( + await SendRequestAsync( new PublishMessage { + RequestId = GenerateRequestId(), Topic = topic, Payload = payload, CorrelationId = options.CorrelationId @@ -616,9 +617,10 @@ private async ValueTask SubscribeAsyncCore( try { - await SendMessageAsync( + await SendRequestAsync( new SubscribeMessage { + RequestId = GenerateRequestId(), Topic = topic.Name }, cancellationToken); @@ -797,11 +799,21 @@ async Task CloseTopics() } } - private ValueTask TryUnsubscribe(Topic topic) + private async ValueTask TryUnsubscribe(Topic topic) { - return topic.CanUnsubscribe - ? SendMessageAsync(new UnsubscribeMessage {Topic = topic.Name}, CancellationToken.None) - : default; + var requestId = GenerateRequestId(); + + try + { + if (topic.CanUnsubscribe) + { + await SendRequestAsync(new UnsubscribeMessage { RequestId = requestId, Topic = topic.Name }, CancellationToken.None); + } + } + catch (MessageRouterException exception) + { + _logger.LogWarning(exception, $"Exception thrown while unsubscribing, topic: {topic.Name}, request id: {requestId}."); + } } private void OnConnectStart() diff --git a/src/messaging/dotnet/src/Core/Protocol/Messages/Message.cs b/src/messaging/dotnet/src/Core/Protocol/Messages/Message.cs index 82313c6d5..b94d31f8f 100644 --- a/src/messaging/dotnet/src/Core/Protocol/Messages/Message.cs +++ b/src/messaging/dotnet/src/Core/Protocol/Messages/Message.cs @@ -28,8 +28,11 @@ public static Type ResolveMessageType(MessageType messageType) MessageType.Connect => typeof(ConnectRequest), MessageType.ConnectResponse => typeof(ConnectResponse), MessageType.Subscribe => typeof(SubscribeMessage), + MessageType.SubscribeResponse => typeof(SubscribeResponse), MessageType.Unsubscribe => typeof(UnsubscribeMessage), + MessageType.UnsubscribeResponse => typeof(UnsubscribeResponse), MessageType.Publish => typeof(PublishMessage), + MessageType.PublishResponse => typeof(PublishResponse), MessageType.Topic => typeof(TopicMessage), MessageType.Invoke => typeof(InvokeRequest), MessageType.RegisterService => typeof(RegisterServiceRequest), diff --git a/src/messaging/dotnet/src/Core/Protocol/Messages/MessageType.cs b/src/messaging/dotnet/src/Core/Protocol/Messages/MessageType.cs index 6d24862aa..963fe0f0e 100644 --- a/src/messaging/dotnet/src/Core/Protocol/Messages/MessageType.cs +++ b/src/messaging/dotnet/src/Core/Protocol/Messages/MessageType.cs @@ -29,21 +29,30 @@ public enum MessageType : int /// Subscribe, - // TODO: SubscribeResponse + /// + /// Server confirms that the client subscribed. + /// + SubscribeResponse, /// /// Client unsubscribes from a topic. /// Unsubscribe, - // TODO: UnsubscribeResponse + /// + /// Server confirms that the client unsubscribed. + /// + UnsubscribeResponse, /// /// Client publishes a message to a topic. /// Publish, - // TODO: PublishResponse + /// + /// Server confirms that the message was published to a topic by the client. + /// + PublishResponse, /// /// Server notifies client of a message from a subscribed topic. diff --git a/src/messaging/dotnet/src/Core/Protocol/Messages/PublishMessage.cs b/src/messaging/dotnet/src/Core/Protocol/Messages/PublishMessage.cs index 94553ed50..79ee6ff0d 100644 --- a/src/messaging/dotnet/src/Core/Protocol/Messages/PublishMessage.cs +++ b/src/messaging/dotnet/src/Core/Protocol/Messages/PublishMessage.cs @@ -12,7 +12,7 @@ namespace MorganStanley.ComposeUI.Messaging.Protocol.Messages; -public sealed class PublishMessage : Message +public sealed class PublishMessage : AbstractRequest { public override MessageType Type => MessageType.Publish; public string Topic { get; init; } = null!; diff --git a/src/messaging/dotnet/src/Core/Protocol/Messages/PublishResponse.cs b/src/messaging/dotnet/src/Core/Protocol/Messages/PublishResponse.cs new file mode 100644 index 000000000..24badcf9f --- /dev/null +++ b/src/messaging/dotnet/src/Core/Protocol/Messages/PublishResponse.cs @@ -0,0 +1,18 @@ +// Morgan Stanley makes this available to you under the Apache License, +// Version 2.0 (the "License"). You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0. +// +// See the NOTICE file distributed with this work for additional information +// regarding copyright ownership. Unless required by applicable law or agreed +// to in writing, software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions +// and limitations under the License. + +namespace MorganStanley.ComposeUI.Messaging.Protocol.Messages; + +public class PublishResponse : AbstractResponse +{ + public override MessageType Type => MessageType.PublishResponse; +} \ No newline at end of file diff --git a/src/messaging/dotnet/src/Core/Protocol/Messages/SubscribeMessage.cs b/src/messaging/dotnet/src/Core/Protocol/Messages/SubscribeMessage.cs index a5eb4e7c5..d8951ac9a 100644 --- a/src/messaging/dotnet/src/Core/Protocol/Messages/SubscribeMessage.cs +++ b/src/messaging/dotnet/src/Core/Protocol/Messages/SubscribeMessage.cs @@ -12,7 +12,7 @@ namespace MorganStanley.ComposeUI.Messaging.Protocol.Messages; -public sealed class SubscribeMessage : Message +public sealed class SubscribeMessage : AbstractRequest { public override MessageType Type => MessageType.Subscribe; public string Topic { get; init; } = null!; diff --git a/src/messaging/dotnet/src/Core/Protocol/Messages/SubscribeResponse.cs b/src/messaging/dotnet/src/Core/Protocol/Messages/SubscribeResponse.cs new file mode 100644 index 000000000..f6d0d4a06 --- /dev/null +++ b/src/messaging/dotnet/src/Core/Protocol/Messages/SubscribeResponse.cs @@ -0,0 +1,18 @@ +// Morgan Stanley makes this available to you under the Apache License, +// Version 2.0 (the "License"). You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0. +// +// See the NOTICE file distributed with this work for additional information +// regarding copyright ownership. Unless required by applicable law or agreed +// to in writing, software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions +// and limitations under the License. + +namespace MorganStanley.ComposeUI.Messaging.Protocol.Messages; + +public class SubscribeResponse : AbstractResponse +{ + public override MessageType Type => MessageType.SubscribeResponse; +} \ No newline at end of file diff --git a/src/messaging/dotnet/src/Core/Protocol/Messages/UnsubscribeMessage.cs b/src/messaging/dotnet/src/Core/Protocol/Messages/UnsubscribeMessage.cs index 825738c3e..16daffe41 100644 --- a/src/messaging/dotnet/src/Core/Protocol/Messages/UnsubscribeMessage.cs +++ b/src/messaging/dotnet/src/Core/Protocol/Messages/UnsubscribeMessage.cs @@ -12,7 +12,7 @@ namespace MorganStanley.ComposeUI.Messaging.Protocol.Messages; -public sealed class UnsubscribeMessage : Message +public sealed class UnsubscribeMessage : AbstractRequest { public override MessageType Type => MessageType.Unsubscribe; public string Topic { get; init; } = null!; diff --git a/src/messaging/dotnet/src/Core/Protocol/Messages/UnsubscribeResponse.cs b/src/messaging/dotnet/src/Core/Protocol/Messages/UnsubscribeResponse.cs new file mode 100644 index 000000000..9639f7f3d --- /dev/null +++ b/src/messaging/dotnet/src/Core/Protocol/Messages/UnsubscribeResponse.cs @@ -0,0 +1,18 @@ +// Morgan Stanley makes this available to you under the Apache License, +// Version 2.0 (the "License"). You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0. +// +// See the NOTICE file distributed with this work for additional information +// regarding copyright ownership. Unless required by applicable law or agreed +// to in writing, software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions +// and limitations under the License. + +namespace MorganStanley.ComposeUI.Messaging.Protocol.Messages; + +public class UnsubscribeResponse : AbstractResponse +{ + public override MessageType Type => MessageType.UnsubscribeResponse; +} \ No newline at end of file diff --git a/src/messaging/dotnet/src/Server/MorganStanley.ComposeUI.Messaging.Server.csproj b/src/messaging/dotnet/src/Server/MorganStanley.ComposeUI.Messaging.Server.csproj index 3ac7c0cbe..53ea4d881 100644 --- a/src/messaging/dotnet/src/Server/MorganStanley.ComposeUI.Messaging.Server.csproj +++ b/src/messaging/dotnet/src/Server/MorganStanley.ComposeUI.Messaging.Server.csproj @@ -1,4 +1,4 @@ - + net6.0 @@ -22,7 +22,6 @@ - \ No newline at end of file diff --git a/src/messaging/dotnet/src/Server/Server/MessageRouterServer.cs b/src/messaging/dotnet/src/Server/Server/MessageRouterServer.cs index 09b5642f0..83c741c65 100644 --- a/src/messaging/dotnet/src/Server/Server/MessageRouterServer.cs +++ b/src/messaging/dotnet/src/Server/Server/MessageRouterServer.cs @@ -135,7 +135,7 @@ private async Task HandleInvokeRequest( Client? serviceClient = null; if (!_serviceRegistrations.TryGetValue(message.Endpoint, out var serviceClientId) - || !_clients.TryGetValue(serviceClientId, out serviceClient)) + || !_clients.TryGetValue(serviceClientId, out serviceClient)) { throw ThrowHelper.UnknownEndpoint(message.Endpoint); } @@ -190,12 +190,26 @@ private async Task HandleInvokeResponse( CancellationToken cancellationToken) { if (!_serviceInvocations.TryRemove(message.RequestId, out var invocation)) - return; // TODO: Log warning + { + if (_logger.IsEnabled(LogLevel.Warning)) + { + _logger.LogWarning("ServiceInvocation could not be retrieved. RequestId: {0}", message.RequestId); + } + + return; + } try { if (!_clients.TryGetValue(invocation.CallerClientId, out var caller)) - return; // TODO: Log warning + { + if (_logger.IsEnabled(LogLevel.Warning)) + { + _logger.LogWarning("Client: {0}, could nt be retrieved when handling invoke response. Invocation: {1}, RequestId: {2}", invocation.CallerClientId, invocation, message.RequestId); + } + + return; + } var response = new InvokeResponse { @@ -245,12 +259,32 @@ await Task.WhenAll( await subscriber.Connection.SendAsync(outgoingMessage, cancellationToken); })); + await client.Connection.SendAsync( + new PublishResponse + { + RequestId = message.RequestId + }, + CancellationToken.None); + OnRequestStop(message); } - catch (Exception e) + catch (Exception exception) { - OnRequestStop(message, e); - + try + { + await client.Connection.SendAsync( + new PublishResponse + { + RequestId = message.RequestId, + Error = new Error(exception) + }, + CancellationToken.None); + } + finally + { + OnRequestStop(message, exception); + } + throw; } } @@ -297,7 +331,7 @@ await client.Connection.SendAsync( } } - private Task HandleSubscribeMessage( + private async Task HandleSubscribeMessage( Client client, SubscribeMessage message, CancellationToken cancellationToken) @@ -307,7 +341,7 @@ private Task HandleSubscribeMessage( try { if (!Protocol.Topic.IsValidTopicName(message.Topic)) - return Task.CompletedTask; + return; var topic = _topics.AddOrUpdate( message.Topic, @@ -322,11 +356,36 @@ private Task HandleSubscribeMessage( }, client); - return Task.CompletedTask; + await client.Connection.SendAsync( + new SubscribeResponse + { + RequestId = message.RequestId + }, + CancellationToken.None); + + OnRequestStop(message); } - finally + catch (Exception exception) { - OnRequestStop(message); + if (_logger.IsEnabled(LogLevel.Debug)) + { + _logger.LogDebug($"Exception thrown while handling subscription message: {0}.", exception); + } + + try + { + await client.Connection.SendAsync( + new SubscribeResponse + { + RequestId = message.RequestId, + Error = new Error(exception) + }, + CancellationToken.None); + } + finally + { + OnRequestStop(message); + } } } @@ -354,7 +413,7 @@ await client.Connection.SendAsync( } } - private Task HandleUnsubscribeMessage( + private async Task HandleUnsubscribeMessage( Client client, UnsubscribeMessage message, CancellationToken cancellationToken) @@ -364,7 +423,7 @@ private Task HandleUnsubscribeMessage( try { if (string.IsNullOrWhiteSpace(message.Topic)) - return Task.CompletedTask; + return; var topic = _topics.AddOrUpdate( message.Topic, @@ -378,12 +437,35 @@ private Task HandleUnsubscribeMessage( return topic; }, client); - - return Task.CompletedTask; + + await client.Connection.SendAsync( + new UnsubscribeResponse + { + RequestId = message.RequestId + }, + CancellationToken.None); + + OnRequestStop(message); } - finally + catch (Exception exception) { - OnRequestStop(message); + _logger.LogError( + exception, + "Exception thrown while handling unsubscribe message..."); + + try + { + await client.Connection.SendAsync( + new UnsubscribeResponse + { + RequestId= message.RequestId, + Error = new Error(exception) + }, CancellationToken.None); + } + finally + { + OnRequestStop(message); + } } } diff --git a/src/messaging/dotnet/test/Client.Tests/Client/MessageRouterClient.Tests.cs b/src/messaging/dotnet/test/Client.Tests/Client/MessageRouterClient.Tests.cs index 95e60705c..20aabe92d 100644 --- a/src/messaging/dotnet/test/Client.Tests/Client/MessageRouterClient.Tests.cs +++ b/src/messaging/dotnet/test/Client.Tests/Client/MessageRouterClient.Tests.cs @@ -10,7 +10,11 @@ // or implied. See the License for the specific language governing permissions // and limitations under the License. +using System.Diagnostics; using System.Linq.Expressions; +using Microsoft.CodeAnalysis.CSharp.Syntax; +using Microsoft.Extensions.Logging; +using Microsoft.VisualStudio.TestPlatform.ObjectModel.DataCollection; using MorganStanley.ComposeUI.Messaging.Client.Abstractions; using MorganStanley.ComposeUI.Messaging.Instrumentation; using MorganStanley.ComposeUI.Messaging.Protocol; @@ -63,6 +67,7 @@ public async Task DisposeAsync_disposes_the_connection_exactly_once() public async Task DisposeAsync_calls_OnError_on_active_subscribers() { await _messageRouter.ConnectAsync(); + _connectionMock.Handle(); var subscriber = new Mock>(); await _messageRouter.SubscribeAsync("test-topic", subscriber.Object); @@ -150,6 +155,7 @@ await Assert.ThrowsAsync( public async Task PublishAsync_sends_a_PublishMessage() { await _messageRouter.ConnectAsync(); + _connectionMock.Handle(); _diagnosticObserver.ExpectMessage(); await _messageRouter.PublishAsync( @@ -168,6 +174,32 @@ await _messageRouter.PublishAsync( && msg.CorrelationId == "test-correlation-id"); } + [Fact] + public async Task PublishAsync_throws_if_PublishResponse_contains_Error() + { + await _messageRouter.ConnectAsync(); + + async ValueTask SendPublishResponse(string requestId) + { + await _connectionMock.SendToClient(new PublishResponse() { RequestId = requestId, Error = new Error("testError-publish", null) }); + } + + _connectionMock.Handle( + async request => await SendPublishResponse(request.RequestId)); + + _diagnosticObserver.ExpectMessage(); + + var exception = + await Assert.ThrowsAsync( + async () => await _messageRouter.PublishAsync( + "test-topic", + "test-payload", + new PublishOptions + { CorrelationId = "test-correlation-id" })); + + exception.Name.Should().Be("testError-publish"); + } + [Fact] public async Task SubscribeAsync_throws_a_MessageRouterException_if_the_client_was_previously_closed() { @@ -186,10 +218,11 @@ await Assert.ThrowsAsync( public async Task SubscribeAsync_sends_a_Subscribe_message() { await _messageRouter.ConnectAsync(); + _connectionMock.Handle(); + _diagnosticObserver.ExpectMessage(); await _messageRouter.SubscribeAsync("test-topic", new Mock>().Object); - - _diagnosticObserver.ExpectMessage(); + await WaitForCompletionAsync(); _connectionMock.Expect(msg => msg.Topic == "test-topic"); @@ -199,6 +232,7 @@ public async Task SubscribeAsync_sends_a_Subscribe_message() public async Task SubscribeAsync_only_sends_a_Subscribe_message_on_the_first_subscription() { await _messageRouter.ConnectAsync(); + _connectionMock.Handle(); _diagnosticObserver.ExpectMessage(); await _messageRouter.SubscribeAsync("test-topic", new Mock>().Object); @@ -210,10 +244,33 @@ public async Task SubscribeAsync_only_sends_a_Subscribe_message_on_the_first_sub _connectionMock.Expect(msg => msg.Topic == "test-topic", Times.Once); } + [Fact] + public async Task SubscribeAsync_throws_if_SubscribeResponse_contains_Error() + { + await _messageRouter.ConnectAsync(); + + async ValueTask SendSubscribeResponse(string requestId) + { + await _connectionMock.SendToClient(new SubscribeResponse() { RequestId = requestId, Error = new Error("testError-subscribe", null) }); + } + + _connectionMock.Handle( + async request => await SendSubscribeResponse(request.RequestId)); + + _diagnosticObserver.ExpectMessage(); + + var exception = + await Assert.ThrowsAsync( + async () => await _messageRouter.SubscribeAsync("test-topic", new Mock>().Object)); + + exception.Name.Should().Be("testError-subscribe"); + } + [Fact] public async Task When_Topic_message_received_it_invokes_the_subscribers() { await _messageRouter.ConnectAsync(); + _connectionMock.Handle(); var sub1 = new Mock>(); var sub2 = new Mock>(); @@ -249,6 +306,7 @@ await _connectionMock.SendToClient( public async Task When_Topic_message_received_it_keeps_processing_messages_if_the_subscriber_calls_InvokeAsync() { await _messageRouter.ConnectAsync(); + _connectionMock.Handle(); // Register two subscribers, the first one will invoke a service that completes when // the second subscriber has been called twice. If the first subscriber could block @@ -307,6 +365,7 @@ await _connectionMock.SendToClient( [Fact] public async Task Topic_extension_sends_a_Subscribe_message_on_first_subscription() { + _connectionMock.Handle(); var topic = _messageRouter.Topic("test-topic"); _diagnosticObserver.ExpectMessage(); await using var sub1 = await topic.SubscribeAsync(_ => { }); @@ -323,8 +382,12 @@ public async Task Topic_extension_sends_a_Subscribe_message_on_first_subscriptio [Fact] public async Task Topic_extension_sends_an_Unsubscribe_message_after_the_last_subscription_is_disposed() { + _connectionMock.Handle(); + _connectionMock.Handle(); + var topic = _messageRouter.Topic("test-topic"); _diagnosticObserver.ExpectMessage(); + var sub1 = await topic.SubscribeAsync(_ => { }); var sub2 = await topic.SubscribeAsync(_ => { }); await WaitForCompletionAsync(); @@ -344,6 +407,7 @@ public async Task Topic_extension_sends_an_Unsubscribe_message_after_the_last_su public async Task When_the_last_subscription_is_disposed_it_sends_an_Unsubscribe_message() { await _messageRouter.ConnectAsync(); + _connectionMock.Handle(); var subscriber = new Mock>(); var sub1 = await _messageRouter.SubscribeAsync("test-topic", subscriber.Object); var sub2 = await _messageRouter.SubscribeAsync("test-topic", subscriber.Object); @@ -654,7 +718,7 @@ await _connectionMock.SendToClient( public async Task When_the_connection_closes_it_calls_OnErrorAsync_on_active_subscribers() { await _messageRouter.ConnectAsync(); - + _connectionMock.Handle(); var subscriberCalled = new AsyncManualResetEvent(); var subscriber = new Mock>(); subscriber.Setup(_ => _.OnErrorAsync(It.IsAny())).Callback(() => subscriberCalled.Set()); @@ -688,6 +752,7 @@ public async Task When_the_connection_closes_it_fails_pending_requests() public async Task When_a_subscription_is_disposed_there_will_be_no_further_calls_to_the_subscriber() { await _messageRouter.ConnectAsync(); + _connectionMock.Handle(); IAsyncDisposable subscription = null!; var subscriber = new Mock>(); @@ -716,13 +781,55 @@ await _connectionMock.SendToClient( subscriber.VerifyNoOtherCalls(); } + [Fact] + public async Task It_log_warning_when_a_subscription_is_disposed_and_the_UnsubscribeResponse_contains_Error() + { + await _messageRouter.ConnectAsync(); + + async ValueTask SendUnsubscribeResponse(string requestId) + { + await _connectionMock.SendToClient( + new UnsubscribeResponse { RequestId = requestId, Error = new Error("testError-unsubscribe", null) }); + } + + _connectionMock.Handle(); + _connectionMock.Handle( + request => SendUnsubscribeResponse(request.RequestId)); + + IAsyncDisposable subscription = null!; + var subscriber = new Mock>(); + + _diagnosticObserver.ExpectMessage(); + + subscription = await _messageRouter.SubscribeAsync("test-topic", subscriber.Object); + + await subscription.DisposeAsync(); + + await WaitForCompletionAsync(); + + Thread.Sleep(1); + + _loggerMock + .Verify( + _ => _.Log( + LogLevel.Warning, + It.IsAny(), + It.Is((message, _) => message.ToString()!.Contains("Exception thrown while unsubscribing, topic: test-topic")), + It.IsAny(), + It.IsAny>()), + Times.Once); + } + public MessageRouterClientTests() { _connectionMock = new MockConnection(); _connectionMock.AcceptConnections(); var connectionFactory = new Mock(); connectionFactory.Setup(_ => _.CreateConnection()).Returns(_connectionMock.Object); - _messageRouter = new MessageRouterClient(connectionFactory.Object, new MessageRouterOptions()); + _loggerMock = new Mock>(); + _loggerMock.Setup(_ => _.IsEnabled(It.IsAny())); + + _messageRouter = new MessageRouterClient(connectionFactory.Object, new MessageRouterOptions(), _loggerMock.Object); _diagnosticObserver = new MessageRouterDiagnosticObserver(_messageRouter); } @@ -741,6 +848,7 @@ public Task DisposeAsync() private readonly MockConnection _connectionMock; private readonly MessageRouterClient _messageRouter; private readonly MessageRouterDiagnosticObserver _diagnosticObserver; + private readonly Mock> _loggerMock; private TMessage RegisterRequest(TMessage message) where TMessage: Message { diff --git a/src/messaging/dotnet/test/Core.Tests/Protocol/Json/JsonMessageSerializer.Tests.cs b/src/messaging/dotnet/test/Core.Tests/Protocol/Json/JsonMessageSerializer.Tests.cs index bd9a3db3c..c4a07a543 100644 --- a/src/messaging/dotnet/test/Core.Tests/Protocol/Json/JsonMessageSerializer.Tests.cs +++ b/src/messaging/dotnet/test/Core.Tests/Protocol/Json/JsonMessageSerializer.Tests.cs @@ -36,11 +36,9 @@ public void Deserialize_works_when_the_type_property_is_not_the_first_element() var json = @"{ ""junk1"": ""junkText"", ""topic"": ""a/b/c"", ""type"": ""subscribe"" }"; var messageBytes = Encoding.UTF8.GetBytes(json); var buffer = new ReadOnlySequence(messageBytes); - - + var message = JsonMessageSerializer.DeserializeMessage(ref buffer); - - + message.Should().BeOfType(); ((SubscribeMessage)message).Topic.Should().Be("a/b/c"); } @@ -151,6 +149,30 @@ public SerializeDeserializeTheoryData() { Topic = "testTopic", }); + + Add(new SubscribeResponse()); + + Add( + new SubscribeResponse() + { + Error = new Error("testErrorName", "testErrorMessage") + }); + + Add(new PublishResponse()); + + Add( + new PublishResponse() + { + Error = new Error("testErrorName", "testErrorMessage") + }); + + Add(new UnsubscribeResponse()); + + Add( + new UnsubscribeResponse() + { + Error = new Error("testErrorName", "testErrorMessage") + }); } } } diff --git a/src/messaging/dotnet/test/IntegrationTests/EndToEndTestsBase.cs b/src/messaging/dotnet/test/IntegrationTests/EndToEndTestsBase.cs index 871277ce8..7e02d501d 100644 --- a/src/messaging/dotnet/test/IntegrationTests/EndToEndTestsBase.cs +++ b/src/messaging/dotnet/test/IntegrationTests/EndToEndTestsBase.cs @@ -15,7 +15,6 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Nito.AsyncEx; -using TaskExtensions = MorganStanley.ComposeUI.Testing.TaskExtensions; namespace MorganStanley.ComposeUI.Messaging; diff --git a/src/messaging/js/composeui-messaging-client/src/client/MessageRouterClient.spec.ts b/src/messaging/js/composeui-messaging-client/src/client/MessageRouterClient.spec.ts index e0e78e6d8..d515e8881 100644 --- a/src/messaging/js/composeui-messaging-client/src/client/MessageRouterClient.spec.ts +++ b/src/messaging/js/composeui-messaging-client/src/client/MessageRouterClient.spec.ts @@ -22,6 +22,7 @@ import { TopicSubscriber } from "../TopicSubscriber"; import { Connection, OnMessageCallback, OnErrorCallback, OnCloseCallback } from "./Connection"; import { ErrorNames } from "../ErrorNames"; import { MessageRouterError } from "../MessageRouterError"; +import { describe } from "node:test"; describe("MessageRouterClient", () => { @@ -155,6 +156,11 @@ describe("MessageRouterClient", () => { const subscriber: TopicSubscriber = { error: jest.fn() }; + + connection.handle( + "Subscribe", + msg => connection.sendToClient({ type: "SubscribeResponse", requestId: msg.requestId })); + await client.subscribe("test-topic", subscriber); await client.close(); @@ -184,6 +190,10 @@ describe("MessageRouterClient", () => { const client = new MessageRouterClient(connection, {}); + connection.handle( + "Publish", + msg => connection.sendToClient({ type: "PublishResponse", requestId: msg.requestId })); + await client.publish("test-topic", "test-payload", { correlationId: "test-correlation-id" }); expect(connection.mock.send).toHaveBeenCalledWith( @@ -207,6 +217,10 @@ describe("MessageRouterClient", () => { it("sends a Subscribe message", async () => { const client = new MessageRouterClient(connection, {}); + + connection.handle( + "Subscribe", + msg => connection.sendToClient({ type: "SubscribeResponse", requestId: msg.requestId })); await client.subscribe("test-topic", { next: () => { } }); @@ -229,6 +243,10 @@ describe("MessageRouterClient", () => { next: jest.fn() }; + connection.handle( + "Subscribe", + msg => connection.sendToClient({ type: "SubscribeResponse", requestId: msg.requestId })); + await client.subscribe("test-topic", observer); await connection.sendToClient({ @@ -257,6 +275,11 @@ describe("MessageRouterClient", () => { const client = new MessageRouterClient(connection, {}); const subscriber = jest.fn(); + + connection.handle( + "Subscribe", + msg => connection.sendToClient({ type: "SubscribeResponse", requestId: msg.requestId })); + await client.subscribe("test-topic", subscriber); await connection.sendToClient({ @@ -285,6 +308,11 @@ describe("MessageRouterClient", () => { const client = new MessageRouterClient(connection, {}); const subscriber = jest.fn((msg: TopicMessage) => client.invoke("test-service", msg.payload)); + + connection.handle( + "Subscribe", + msg => connection.sendToClient({ type: "SubscribeResponse", requestId: msg.requestId })); + await client.subscribe("test-topic", subscriber); await connection.sendToClient({ @@ -329,6 +357,10 @@ describe("MessageRouterClient", () => { next: jest.fn((msg: TopicMessage) => client.invoke("test-service", msg.payload)) }; + connection.handle( + "Subscribe", + msg => connection.sendToClient({ type: "SubscribeResponse", requestId: msg.requestId })); + await client.subscribe("test-topic", subscriber); await connection.sendToClient({ @@ -609,7 +641,13 @@ describe("MessageRouterClient", () => { const subscriber: TopicSubscriber = { error: jest.fn() }; + + connection.handle( + "Subscribe", + msg => connection.sendToClient({ type: "SubscribeResponse", requestId: msg.requestId })); + await client.subscribe("test-topic", subscriber); + await new Promise(process.nextTick); connection.raiseClose(); await new Promise(process.nextTick); @@ -638,6 +676,11 @@ describe("MessageRouterClient", () => { const subscriber: TopicSubscriber = { error: jest.fn() }; + + connection.handle( + "Subscribe", + msg => connection.sendToClient({ type: "SubscribeResponse", requestId: msg.requestId })); + await client.subscribe("test-topic", subscriber); const err = {}; @@ -661,7 +704,55 @@ describe("MessageRouterClient", () => { await expect(invokePromise).rejects.toThrow("Fail"); }); - }) + }); + + describe("when server raises error", () => { + + it("publish fails when PublishResponse contains error", async() => { + const client = new MessageRouterClient(connection, {}); + connection.handle( + "Publish", + msg => connection.sendToClient({ type: "PublishResponse", requestId: msg.requestId, error: new MessageRouterError("testError-publish") })); + + var publishPromise = client.publish("test-topic", "test-payload", { correlationId: "test-correlation-id" }); + + await expect(publishPromise).rejects.toThrowWithName(MessageRouterError, "testError-publish"); + }); + + it("subscribe fails when SubscribeResponse contains error", async() => { + const client = new MessageRouterClient(connection, {}); + connection.handle( + "Subscribe", + msg => connection.sendToClient({ type: "SubscribeResponse", requestId: msg.requestId, error: new MessageRouterError("testError-subscribe") })); + + var subscribePromise = client.subscribe("test-topic", { next: () => { } }); + + await expect(subscribePromise).rejects.toThrowWithName(MessageRouterError, "testError-subscribe"); + }); + + it("dispose logs warning when UnsubscribeResponse contains error", async() => { + const client = new MessageRouterClient(connection, {}); + const consoleWarnMock = jest.spyOn(console, 'warn').mockImplementation(); + connection.handle( + "Subscribe", + msg => connection.sendToClient({ type: "SubscribeResponse", requestId: msg.requestId })); + + connection.handle( + "Unsubscribe", + msg => connection.sendToClient({ type: "UnsubscribeResponse", requestId: msg.requestId, error: new MessageRouterError("testError-unsubscribe") })); + + var subscription = await client.subscribe("test-topic", { next: () => { } }); + await subscription.unsubscribe(); + + // Waiting for the background task to finish. + await new Promise(process.nextTick); + await new Promise(process.nextTick); + + expect(consoleWarnMock).toHaveBeenCalled(); + expect(consoleWarnMock).toHaveBeenCalledWith("Exception thrown while unsubscribing.", new MessageRouterError("testError-unsubscribe")); + consoleWarnMock.mockRestore(); + }); + }); }) type MockHandler = ((msg: TMessage) => Promise); diff --git a/src/messaging/js/composeui-messaging-client/src/client/MessageRouterClient.ts b/src/messaging/js/composeui-messaging-client/src/client/MessageRouterClient.ts index abf53c0d4..86117d024 100644 --- a/src/messaging/js/composeui-messaging-client/src/client/MessageRouterClient.ts +++ b/src/messaging/js/composeui-messaging-client/src/client/MessageRouterClient.ts @@ -83,11 +83,16 @@ export class MessageRouterClient implements MessageRouter { const subscription = topic.subscribe(subscriber); if (needsSubscription) { - await this.sendMessage( - { - type: "Subscribe", - topic: topicName - }); + try { + await this.sendRequest( + { + requestId: this.getRequestId(), + type: "Subscribe", + topic: topicName + }); + } catch (error) { + throw error; + } } return subscription; @@ -96,13 +101,18 @@ export class MessageRouterClient implements MessageRouter { async publish(topic: string, payload?: MessageBuffer, options?: PublishOptions): Promise { await this.checkState(); - return await this.sendMessage( - { - type: "Publish", - topic, - payload, - correlationId: options?.correlationId - }); + try { + await this.sendRequest( + { + type: "Publish", + requestId: this.getRequestId(), + topic, + payload, + correlationId: options?.correlationId + }); + } catch (error) { + throw error; + } } async invoke(endpoint: string, payload?: MessageBuffer, options?: InvokeOptions): Promise { @@ -410,12 +420,17 @@ export class MessageRouterClient implements MessageRouter { if (!topic) return; - await this.sendMessage( - { - type: "Unsubscribe", - topic: topicName - } - ); + try { + await this.sendRequest( + { + requestId: this.getRequestId(), + type: "Unsubscribe", + topic: topicName + } + ); + } catch (error) { + console.warn("Exception thrown while unsubscribing.", error); + } } private getRequestId(): string { diff --git a/src/messaging/js/composeui-messaging-client/src/protocol/messages/AbstractResponse.ts b/src/messaging/js/composeui-messaging-client/src/protocol/messages/AbstractResponse.ts index 3e0b4d76e..36f74f60f 100644 --- a/src/messaging/js/composeui-messaging-client/src/protocol/messages/AbstractResponse.ts +++ b/src/messaging/js/composeui-messaging-client/src/protocol/messages/AbstractResponse.ts @@ -22,5 +22,8 @@ export interface AbstractResponse extends Message { export function isResponse(message: Message): message is AbstractResponse { return (message.type === "InvokeResponse" || message.type === "RegisterServiceResponse" - || message.type === "UnregisterServiceResponse"); + || message.type === "UnregisterServiceResponse" + || message.type === "SubscribeResponse" + || message.type === "UnsubscribeResponse" + || message.type === "PublishResponse"); } diff --git a/src/messaging/js/composeui-messaging-client/src/protocol/messages/MessageType.ts b/src/messaging/js/composeui-messaging-client/src/protocol/messages/MessageType.ts index ca7b6271d..6d9325609 100644 --- a/src/messaging/js/composeui-messaging-client/src/protocol/messages/MessageType.ts +++ b/src/messaging/js/composeui-messaging-client/src/protocol/messages/MessageType.ts @@ -15,8 +15,11 @@ export type MessageType = ( "Connect" | "ConnectResponse" | "Subscribe" | + "SubscribeResponse" | "Unsubscribe" | + "UnsubscribeResponse" | "Publish" | + "PublishResponse" | "Topic" | "RegisterService" | "RegisterServiceResponse" | diff --git a/src/messaging/js/composeui-messaging-client/src/protocol/messages/PublishMessage.ts b/src/messaging/js/composeui-messaging-client/src/protocol/messages/PublishMessage.ts index 3b8163aa4..754ba3668 100644 --- a/src/messaging/js/composeui-messaging-client/src/protocol/messages/PublishMessage.ts +++ b/src/messaging/js/composeui-messaging-client/src/protocol/messages/PublishMessage.ts @@ -11,10 +11,12 @@ * */ +import { AbstractRequest } from "."; import { MessageBuffer } from "../../MessageBuffer"; import { Message } from "./Message"; +import { PublishResponse } from "./PublishResponse"; -export interface PublishMessage extends Message { +export interface PublishMessage extends AbstractRequest { type: "Publish"; topic: string; payload?: MessageBuffer; diff --git a/src/messaging/js/composeui-messaging-client/src/protocol/messages/PublishResponse.ts b/src/messaging/js/composeui-messaging-client/src/protocol/messages/PublishResponse.ts new file mode 100644 index 000000000..87275d8d4 --- /dev/null +++ b/src/messaging/js/composeui-messaging-client/src/protocol/messages/PublishResponse.ts @@ -0,0 +1,18 @@ +/* + * Morgan Stanley makes this available to you under the Apache License, + * Version 2.0 (the "License"). You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0. + * See the NOTICE file distributed with this work for additional information + * regarding copyright ownership. Unless required by applicable law or agreed + * to in writing, software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions + * and limitations under the License. + * + */ + +import { AbstractResponse } from "./AbstractResponse"; + +export interface PublishResponse extends AbstractResponse { + type: "PublishResponse"; +} \ No newline at end of file diff --git a/src/messaging/js/composeui-messaging-client/src/protocol/messages/SubscribeMessage.ts b/src/messaging/js/composeui-messaging-client/src/protocol/messages/SubscribeMessage.ts index ba745c9a1..58344d826 100644 --- a/src/messaging/js/composeui-messaging-client/src/protocol/messages/SubscribeMessage.ts +++ b/src/messaging/js/composeui-messaging-client/src/protocol/messages/SubscribeMessage.ts @@ -11,9 +11,11 @@ * */ +import { AbstractRequest } from "."; import { Message } from "./Message"; +import { SubscribeResponse } from "./SubscribeResponse"; -export interface SubscribeMessage extends Message { +export interface SubscribeMessage extends AbstractRequest { type: "Subscribe"; topic: string; } diff --git a/src/messaging/js/composeui-messaging-client/src/protocol/messages/SubscribeResponse.ts b/src/messaging/js/composeui-messaging-client/src/protocol/messages/SubscribeResponse.ts new file mode 100644 index 000000000..82dcdb023 --- /dev/null +++ b/src/messaging/js/composeui-messaging-client/src/protocol/messages/SubscribeResponse.ts @@ -0,0 +1,18 @@ +/* + * Morgan Stanley makes this available to you under the Apache License, + * Version 2.0 (the "License"). You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0. + * See the NOTICE file distributed with this work for additional information + * regarding copyright ownership. Unless required by applicable law or agreed + * to in writing, software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions + * and limitations under the License. + * + */ + +import { AbstractResponse } from "./AbstractResponse"; + +export interface SubscribeResponse extends AbstractResponse { + type: "SubscribeResponse"; +} \ No newline at end of file diff --git a/src/messaging/js/composeui-messaging-client/src/protocol/messages/UnsubscribeMessage.ts b/src/messaging/js/composeui-messaging-client/src/protocol/messages/UnsubscribeMessage.ts index 68b375af0..771c0cae0 100644 --- a/src/messaging/js/composeui-messaging-client/src/protocol/messages/UnsubscribeMessage.ts +++ b/src/messaging/js/composeui-messaging-client/src/protocol/messages/UnsubscribeMessage.ts @@ -11,9 +11,11 @@ * */ +import { AbstractRequest } from "."; import { Message } from "./Message"; +import { UnsubscribeResponse } from "./UnsubscribeResponse"; -export interface UnsubscribeMessage extends Message { +export interface UnsubscribeMessage extends AbstractRequest { type: "Unsubscribe"; topic: string; } diff --git a/src/messaging/js/composeui-messaging-client/src/protocol/messages/UnsubscribeResponse.ts b/src/messaging/js/composeui-messaging-client/src/protocol/messages/UnsubscribeResponse.ts new file mode 100644 index 000000000..4f1d3d766 --- /dev/null +++ b/src/messaging/js/composeui-messaging-client/src/protocol/messages/UnsubscribeResponse.ts @@ -0,0 +1,18 @@ +/* + * Morgan Stanley makes this available to you under the Apache License, + * Version 2.0 (the "License"). You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0. + * See the NOTICE file distributed with this work for additional information + * regarding copyright ownership. Unless required by applicable law or agreed + * to in writing, software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions + * and limitations under the License. + * + */ + +import { AbstractResponse } from "./AbstractResponse"; + +export interface UnsubscribeResponse extends AbstractResponse { + type: "UnsubscribeResponse"; +} \ No newline at end of file diff --git a/src/messaging/js/composeui-messaging-client/src/protocol/messages/index.ts b/src/messaging/js/composeui-messaging-client/src/protocol/messages/index.ts index f9d56b242..f9452ae26 100644 --- a/src/messaging/js/composeui-messaging-client/src/protocol/messages/index.ts +++ b/src/messaging/js/composeui-messaging-client/src/protocol/messages/index.ts @@ -20,10 +20,13 @@ export * from "./InvokeResponse"; export * from "./Message"; export * from "./MessageType"; export * from "./PublishMessage"; +export * from "./PublishResponse"; export * from "./RegisterServiceRequest"; export * from "./RegisterServiceResponse"; export * from "./SubscribeMessage"; +export * from "./SubscribeResponse"; export * from "./TopicMessage"; export * from "./UnregisterServiceRequest"; export * from "./UnregisterServiceResponse"; export * from "./UnsubscribeMessage"; +export * from "./UnsubscribeResponse";