From 2d1825610141af575c1ac383b12ddc21c787fd59 Mon Sep 17 00:00:00 2001
From: Marton Balassa <7115274+BalassaMarton@users.noreply.github.com>
Date: Mon, 19 Jun 2023 16:31:19 +0200
Subject: [PATCH] Added Messaging.Host packge with in-process connection
implementation
---
src/messaging/dotnet/Messaging.sln | 14 +
.../Client/Client/Abstractions/IConnection.cs | 2 +
...MessageRouterBuilderWebSocketExtensions.cs | 5 +-
.../MessageRouterBuilder.cs | 2 +-
.../Client/MessageRouterClientExtensions.cs | 1 +
.../Client/Internal/InProcessConnection.cs | 64 ++++
...MessageRouterBuilderInProcessExtensions.cs | 35 ++
...ganStanley.ComposeUI.Messaging.Host.csproj | 17 +
...ilder.cs => MessageRouterServerBuilder.cs} | 14 +-
...ollectionMessageRouterServerExtensions.cs} | 6 +-
.../src/Server/Server/MessageRouterServer.cs | 1 +
...RouterServerBuilderWebSocketExtensions.cs} | 11 +-
.../Internal/InProcessConnection.Tests.cs | 66 ++++
.../dotnet/test/Host.Tests/GlobalUsings.cs | 16 +
...nley.ComposeUI.Messaging.Host.Tests.csproj | 14 +
.../IntegrationTests/EndToEndTestsBase.cs | 335 ++++++++++++++++++
.../InProcessEndToEndTests.cs | 33 ++
...omposeUI.Messaging.IntegrationTests.csproj | 1 +
.../WebSocketEndToEndTests.cs | 304 +---------------
src/shell/dotnet/Shell.sln | 7 +
src/shell/dotnet/Shell/App.xaml.cs | 9 +-
src/shell/dotnet/Shell/Shell.csproj | 3 +-
22 files changed, 645 insertions(+), 315 deletions(-)
create mode 100644 src/messaging/dotnet/src/Host/Client/Internal/InProcessConnection.cs
create mode 100644 src/messaging/dotnet/src/Host/Client/Internal/MessageRouterBuilderInProcessExtensions.cs
create mode 100644 src/messaging/dotnet/src/Host/MorganStanley.ComposeUI.Messaging.Host.csproj
rename src/messaging/dotnet/src/Server/DependencyInjection/{MessageRouterBuilder.cs => MessageRouterServerBuilder.cs} (72%)
rename src/messaging/dotnet/src/Server/DependencyInjection/{ServiceCollectionMessageRouterExceptions.cs => ServiceCollectionMessageRouterServerExtensions.cs} (88%)
rename src/messaging/dotnet/src/Server/Server/WebSocket/{MessageRouterBuilderWebSocketExtensions.cs => MessageRouterServerBuilderWebSocketExtensions.cs} (80%)
create mode 100644 src/messaging/dotnet/test/Host.Tests/Client/Internal/InProcessConnection.Tests.cs
create mode 100644 src/messaging/dotnet/test/Host.Tests/GlobalUsings.cs
create mode 100644 src/messaging/dotnet/test/Host.Tests/MorganStanley.ComposeUI.Messaging.Host.Tests.csproj
create mode 100644 src/messaging/dotnet/test/IntegrationTests/EndToEndTestsBase.cs
create mode 100644 src/messaging/dotnet/test/IntegrationTests/InProcessEndToEndTests.cs
diff --git a/src/messaging/dotnet/Messaging.sln b/src/messaging/dotnet/Messaging.sln
index 4ea5f1210..e56f5423f 100644
--- a/src/messaging/dotnet/Messaging.sln
+++ b/src/messaging/dotnet/Messaging.sln
@@ -11,6 +11,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MorganStanley.ComposeUI.Mes
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MorganStanley.ComposeUI.Messaging.Server.Tests", "test\Server.Tests\MorganStanley.ComposeUI.Messaging.Server.Tests.csproj", "{C95F8B4A-7E42-43A6-A57E-C3ACB6255EA3}"
EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MorganStanley.ComposeUI.Messaging.Host", "src\Host\MorganStanley.ComposeUI.Messaging.Host.csproj", "{C53E9D27-790A-450D-8E29-9B40E7D586CB}"
+EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MorganStanley.ComposeUI.Messaging.IntegrationTests", "test\IntegrationTests\MorganStanley.ComposeUI.Messaging.IntegrationTests.csproj", "{684367EE-406C-4727-993D-EF4B6FF5C37C}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MorganStanley.ComposeUI.Messaging.Core.Tests", "test\Core.Tests\MorganStanley.ComposeUI.Messaging.Core.Tests.csproj", "{CDC30566-1801-40CD-9EBD-2913418FC2F9}"
@@ -29,6 +31,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "lib", "lib", "{B7E63957-3C1
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MorganStanley.ComposeUI.Testing", "..\..\shared\dotnet\MorganStanley.ComposeUI.Testing\MorganStanley.ComposeUI.Testing.csproj", "{AE71CBC4-FD4E-4C66-B894-D7C31DE4D1BE}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MorganStanley.ComposeUI.Messaging.Host.Tests", "test\Host.Tests\MorganStanley.ComposeUI.Messaging.Host.Tests.csproj", "{CEF78D3F-C645-4471-BAD2-9C538A0CA763}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -51,6 +55,10 @@ Global
{C95F8B4A-7E42-43A6-A57E-C3ACB6255EA3}.Debug|Any CPU.Build.0 = Debug|Any CPU
{C95F8B4A-7E42-43A6-A57E-C3ACB6255EA3}.Release|Any CPU.ActiveCfg = Release|Any CPU
{C95F8B4A-7E42-43A6-A57E-C3ACB6255EA3}.Release|Any CPU.Build.0 = Release|Any CPU
+ {C53E9D27-790A-450D-8E29-9B40E7D586CB}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {C53E9D27-790A-450D-8E29-9B40E7D586CB}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {C53E9D27-790A-450D-8E29-9B40E7D586CB}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {C53E9D27-790A-450D-8E29-9B40E7D586CB}.Release|Any CPU.Build.0 = Release|Any CPU
{684367EE-406C-4727-993D-EF4B6FF5C37C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{684367EE-406C-4727-993D-EF4B6FF5C37C}.Debug|Any CPU.Build.0 = Debug|Any CPU
{684367EE-406C-4727-993D-EF4B6FF5C37C}.Release|Any CPU.ActiveCfg = Release|Any CPU
@@ -71,6 +79,10 @@ Global
{AE71CBC4-FD4E-4C66-B894-D7C31DE4D1BE}.Debug|Any CPU.Build.0 = Debug|Any CPU
{AE71CBC4-FD4E-4C66-B894-D7C31DE4D1BE}.Release|Any CPU.ActiveCfg = Release|Any CPU
{AE71CBC4-FD4E-4C66-B894-D7C31DE4D1BE}.Release|Any CPU.Build.0 = Release|Any CPU
+ {CEF78D3F-C645-4471-BAD2-9C538A0CA763}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {CEF78D3F-C645-4471-BAD2-9C538A0CA763}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {CEF78D3F-C645-4471-BAD2-9C538A0CA763}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {CEF78D3F-C645-4471-BAD2-9C538A0CA763}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -80,11 +92,13 @@ Global
{A3156691-0B46-4C3D-B042-E71D9BAF02EC} = {37DE16A7-F731-4206-B650-8A1D9D066B60}
{4AEFD2AA-CF42-401D-A058-E2B4C8E12283} = {37DE16A7-F731-4206-B650-8A1D9D066B60}
{C95F8B4A-7E42-43A6-A57E-C3ACB6255EA3} = {78E94277-B0B4-425C-9C47-81E09933FE5B}
+ {C53E9D27-790A-450D-8E29-9B40E7D586CB} = {37DE16A7-F731-4206-B650-8A1D9D066B60}
{684367EE-406C-4727-993D-EF4B6FF5C37C} = {78E94277-B0B4-425C-9C47-81E09933FE5B}
{CDC30566-1801-40CD-9EBD-2913418FC2F9} = {78E94277-B0B4-425C-9C47-81E09933FE5B}
{29122DF1-0AE3-4101-9D96-4711553C7696} = {B245E9BB-69CC-4457-9FDF-635E742993CB}
{AA63BC7C-9DB8-4062-A689-7FA9689EF6B1} = {78E94277-B0B4-425C-9C47-81E09933FE5B}
{AE71CBC4-FD4E-4C66-B894-D7C31DE4D1BE} = {B7E63957-3C1B-4A16-B530-3EAD0D0C5F07}
+ {CEF78D3F-C645-4471-BAD2-9C538A0CA763} = {78E94277-B0B4-425C-9C47-81E09933FE5B}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {38CFF5DF-459A-49E1-8189-2DB429A99B3A}
diff --git a/src/messaging/dotnet/src/Client/Client/Abstractions/IConnection.cs b/src/messaging/dotnet/src/Client/Client/Abstractions/IConnection.cs
index b972fb9a3..7b238e010 100644
--- a/src/messaging/dotnet/src/Client/Client/Abstractions/IConnection.cs
+++ b/src/messaging/dotnet/src/Client/Client/Abstractions/IConnection.cs
@@ -14,6 +14,8 @@
namespace MorganStanley.ComposeUI.Messaging.Client.Abstractions;
+// TODO: Add IConnectionFactory abstraction to adhere to DI best practices
+
///
/// Represents a connection that can communicate with the server.
///
diff --git a/src/messaging/dotnet/src/Client/Client/WebSocket/MessageRouterBuilderWebSocketExtensions.cs b/src/messaging/dotnet/src/Client/Client/WebSocket/MessageRouterBuilderWebSocketExtensions.cs
index 56d769c9e..ecb29e435 100644
--- a/src/messaging/dotnet/src/Client/Client/WebSocket/MessageRouterBuilderWebSocketExtensions.cs
+++ b/src/messaging/dotnet/src/Client/Client/WebSocket/MessageRouterBuilderWebSocketExtensions.cs
@@ -10,11 +10,12 @@
// or implied. See the License for the specific language governing permissions
// and limitations under the License.
-using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using MorganStanley.ComposeUI.Messaging.Client.Abstractions;
+using MorganStanley.ComposeUI.Messaging.Client.WebSocket;
-namespace MorganStanley.ComposeUI.Messaging.Client.WebSocket;
+// ReSharper disable once CheckNamespace
+namespace Microsoft.Extensions.DependencyInjection;
///
/// Static utilities for configuring WebSocket connections.
diff --git a/src/messaging/dotnet/src/Client/DependencyInjection/MessageRouterBuilder.cs b/src/messaging/dotnet/src/Client/DependencyInjection/MessageRouterBuilder.cs
index bffc4f46b..7aef0dcb6 100644
--- a/src/messaging/dotnet/src/Client/DependencyInjection/MessageRouterBuilder.cs
+++ b/src/messaging/dotnet/src/Client/DependencyInjection/MessageRouterBuilder.cs
@@ -27,7 +27,7 @@ internal MessageRouterBuilder(IServiceCollection serviceCollection)
ServiceCollection = serviceCollection;
}
- internal IServiceCollection ServiceCollection { get; }
+ public IServiceCollection ServiceCollection { get; }
internal string? AccessToken { get; set; }
}
\ No newline at end of file
diff --git a/src/messaging/dotnet/src/Client/MessageRouterClientExtensions.cs b/src/messaging/dotnet/src/Client/MessageRouterClientExtensions.cs
index 301db8867..a59fa70e5 100644
--- a/src/messaging/dotnet/src/Client/MessageRouterClientExtensions.cs
+++ b/src/messaging/dotnet/src/Client/MessageRouterClientExtensions.cs
@@ -19,6 +19,7 @@ namespace MorganStanley.ComposeUI.Messaging;
///
public static class MessageRouterClientExtensions
{
+ // TODO: Move this to IMessageRouter - this should be available even if only the Core package is referenced
public static IAsyncObservable Topic(this IMessageRouter messageRouter, string topic)
{
return messageRouter is MessageRouterClient messageRouterClient
diff --git a/src/messaging/dotnet/src/Host/Client/Internal/InProcessConnection.cs b/src/messaging/dotnet/src/Host/Client/Internal/InProcessConnection.cs
new file mode 100644
index 000000000..f65895e10
--- /dev/null
+++ b/src/messaging/dotnet/src/Host/Client/Internal/InProcessConnection.cs
@@ -0,0 +1,64 @@
+// Morgan Stanley makes this available to you under the Apache License,
+// Version 2.0 (the "License"). You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0.
+//
+// See the NOTICE file distributed with this work for additional information
+// regarding copyright ownership. Unless required by applicable law or agreed
+// to in writing, software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+// or implied. See the License for the specific language governing permissions
+// and limitations under the License.
+
+using System.Threading.Channels;
+using MorganStanley.ComposeUI.Messaging.Client.Abstractions;
+using MorganStanley.ComposeUI.Messaging.Protocol.Messages;
+using MorganStanley.ComposeUI.Messaging.Server;
+using MorganStanley.ComposeUI.Messaging.Server.Abstractions;
+
+namespace MorganStanley.ComposeUI.Messaging.Client.Internal;
+
+internal sealed class InProcessConnection : IConnection, IClientConnection
+{
+ public InProcessConnection(IMessageRouterServer server)
+ {
+ _server = server;
+ }
+
+ ValueTask IClientConnection.SendAsync(Message message, CancellationToken cancellationToken = default)
+ {
+ return _serverToClient.Writer.WriteAsync(message, cancellationToken);
+ }
+
+ ValueTask IClientConnection.ReceiveAsync(CancellationToken cancellationToken = default)
+ {
+ return _clientToServer.Reader.ReadAsync(cancellationToken);
+ }
+
+ public ValueTask DisposeAsync()
+ {
+ _clientToServer.Writer.TryComplete();
+ _serverToClient.Writer.TryComplete();
+
+ return ValueTask.CompletedTask;
+ }
+
+ ValueTask IConnection.ConnectAsync(CancellationToken cancellationToken = default)
+ {
+ return _server.ClientConnected(this);
+ }
+
+ ValueTask IConnection.SendAsync(Message message, CancellationToken cancellationToken = default)
+ {
+ return _clientToServer.Writer.WriteAsync(message, cancellationToken);
+ }
+
+ ValueTask IConnection.ReceiveAsync(CancellationToken cancellationToken = default)
+ {
+ return _serverToClient.Reader.ReadAsync(cancellationToken);
+ }
+
+ private readonly IMessageRouterServer _server;
+ private readonly Channel _clientToServer = Channel.CreateUnbounded();
+ private readonly Channel _serverToClient = Channel.CreateUnbounded();
+}
\ No newline at end of file
diff --git a/src/messaging/dotnet/src/Host/Client/Internal/MessageRouterBuilderInProcessExtensions.cs b/src/messaging/dotnet/src/Host/Client/Internal/MessageRouterBuilderInProcessExtensions.cs
new file mode 100644
index 000000000..0add6304d
--- /dev/null
+++ b/src/messaging/dotnet/src/Host/Client/Internal/MessageRouterBuilderInProcessExtensions.cs
@@ -0,0 +1,35 @@
+// Morgan Stanley makes this available to you under the Apache License,
+// Version 2.0 (the "License"). You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0.
+//
+// See the NOTICE file distributed with this work for additional information
+// regarding copyright ownership. Unless required by applicable law or agreed
+// to in writing, software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+// or implied. See the License for the specific language governing permissions
+// and limitations under the License.
+
+using MorganStanley.ComposeUI.Messaging.Client.Abstractions;
+using MorganStanley.ComposeUI.Messaging.Client.Internal;
+
+// ReSharper disable once CheckNamespace
+namespace Microsoft.Extensions.DependencyInjection;
+
+///
+/// Adds extension methods for configuring an in-process Message Router client.
+///
+public static class MessageRouterBuilderInProcessExtensions
+{
+ ///
+ /// Configures the Message Router to connect to the in-process server.
+ /// The server must be set up using .
+ ///
+ ///
+ public static MessageRouterBuilder UseServer(this MessageRouterBuilder builder)
+ {
+ builder.ServiceCollection.AddTransient();
+
+ return builder;
+ }
+}
\ No newline at end of file
diff --git a/src/messaging/dotnet/src/Host/MorganStanley.ComposeUI.Messaging.Host.csproj b/src/messaging/dotnet/src/Host/MorganStanley.ComposeUI.Messaging.Host.csproj
new file mode 100644
index 000000000..bf9d04edb
--- /dev/null
+++ b/src/messaging/dotnet/src/Host/MorganStanley.ComposeUI.Messaging.Host.csproj
@@ -0,0 +1,17 @@
+
+
+
+ net6.0
+ enable
+ enable
+ True
+ MorganStanley.ComposeUI.Messaging
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/src/messaging/dotnet/src/Server/DependencyInjection/MessageRouterBuilder.cs b/src/messaging/dotnet/src/Server/DependencyInjection/MessageRouterServerBuilder.cs
similarity index 72%
rename from src/messaging/dotnet/src/Server/DependencyInjection/MessageRouterBuilder.cs
rename to src/messaging/dotnet/src/Server/DependencyInjection/MessageRouterServerBuilder.cs
index 7c8d4cda0..ca648f929 100644
--- a/src/messaging/dotnet/src/Server/DependencyInjection/MessageRouterBuilder.cs
+++ b/src/messaging/dotnet/src/Server/DependencyInjection/MessageRouterServerBuilder.cs
@@ -15,28 +15,28 @@
// ReSharper disable once CheckNamespace
namespace Microsoft.Extensions.DependencyInjection;
-public sealed class MessageRouterBuilder
+public sealed class MessageRouterServerBuilder
{
- public MessageRouterBuilder(IServiceCollection serviceCollection)
+ public MessageRouterServerBuilder(IServiceCollection serviceCollection)
{
ServiceCollection = serviceCollection;
}
- public MessageRouterBuilder UseAccessTokenValidator(IAccessTokenValidator validator)
+ public MessageRouterServerBuilder UseAccessTokenValidator(IAccessTokenValidator validator)
{
ServiceCollection.AddSingleton(validator);
return this;
}
- public MessageRouterBuilder UseAccessTokenValidator(Func factory)
+ public MessageRouterServerBuilder UseAccessTokenValidator(Func factory)
{
ServiceCollection.AddSingleton(factory);
return this;
}
- public MessageRouterBuilder UseAccessTokenValidator(AccessTokenValidatorCallback validatorCallback)
+ public MessageRouterServerBuilder UseAccessTokenValidator(AccessTokenValidatorCallback validatorCallback)
{
ServiceCollection.AddSingleton(
new AccessTokenValidator(
@@ -50,14 +50,14 @@ public MessageRouterBuilder UseAccessTokenValidator(AccessTokenValidatorCallback
return this;
}
- public MessageRouterBuilder UseAccessTokenValidator(AsyncAccessTokenValidatorCallback validatorCallback)
+ public MessageRouterServerBuilder UseAccessTokenValidator(AsyncAccessTokenValidatorCallback validatorCallback)
{
ServiceCollection.AddSingleton(new AccessTokenValidator(validatorCallback));
return this;
}
- public MessageRouterBuilder UseAccessTokenValidator(Func validatorCallback)
+ public MessageRouterServerBuilder UseAccessTokenValidator(Func validatorCallback)
{
ServiceCollection.AddSingleton(
new AccessTokenValidator(
diff --git a/src/messaging/dotnet/src/Server/DependencyInjection/ServiceCollectionMessageRouterExceptions.cs b/src/messaging/dotnet/src/Server/DependencyInjection/ServiceCollectionMessageRouterServerExtensions.cs
similarity index 88%
rename from src/messaging/dotnet/src/Server/DependencyInjection/ServiceCollectionMessageRouterExceptions.cs
rename to src/messaging/dotnet/src/Server/DependencyInjection/ServiceCollectionMessageRouterServerExtensions.cs
index 0010bf081..e1e54a083 100644
--- a/src/messaging/dotnet/src/Server/DependencyInjection/ServiceCollectionMessageRouterExceptions.cs
+++ b/src/messaging/dotnet/src/Server/DependencyInjection/ServiceCollectionMessageRouterServerExtensions.cs
@@ -18,7 +18,7 @@ namespace Microsoft.Extensions.DependencyInjection;
///
/// Contains extension methods for adding the Message Router server to a service collection.
///
-public static class ServiceCollectionMessageRouterExceptions
+public static class ServiceCollectionMessageRouterServerExtensions
{
///
/// Adds and related types to the service collection.
@@ -28,12 +28,12 @@ public static class ServiceCollectionMessageRouterExceptions
///
public static IServiceCollection AddMessageRouterServer(
this IServiceCollection serviceCollection,
- Action builderAction)
+ Action builderAction)
{
serviceCollection.AddSingleton();
serviceCollection.AddSingleton();
- var builder = new MessageRouterBuilder(serviceCollection);
+ var builder = new MessageRouterServerBuilder(serviceCollection);
builderAction(builder);
return serviceCollection;
diff --git a/src/messaging/dotnet/src/Server/Server/MessageRouterServer.cs b/src/messaging/dotnet/src/Server/Server/MessageRouterServer.cs
index fbf2df99f..5c973f4f9 100644
--- a/src/messaging/dotnet/src/Server/Server/MessageRouterServer.cs
+++ b/src/messaging/dotnet/src/Server/Server/MessageRouterServer.cs
@@ -36,6 +36,7 @@ public async ValueTask DisposeAsync()
{
_stopTokenSource.Cancel();
+ // TODO: Don't dispose objects that were created by someone else. Signal disconnection using a dedicated method.
await Task.WhenAll(_clients.Values.Select(client => client.Connection.DisposeAsync().AsTask()));
}
diff --git a/src/messaging/dotnet/src/Server/Server/WebSocket/MessageRouterBuilderWebSocketExtensions.cs b/src/messaging/dotnet/src/Server/Server/WebSocket/MessageRouterServerBuilderWebSocketExtensions.cs
similarity index 80%
rename from src/messaging/dotnet/src/Server/Server/WebSocket/MessageRouterBuilderWebSocketExtensions.cs
rename to src/messaging/dotnet/src/Server/Server/WebSocket/MessageRouterServerBuilderWebSocketExtensions.cs
index 45091f768..460d354f9 100644
--- a/src/messaging/dotnet/src/Server/Server/WebSocket/MessageRouterBuilderWebSocketExtensions.cs
+++ b/src/messaging/dotnet/src/Server/Server/WebSocket/MessageRouterServerBuilderWebSocketExtensions.cs
@@ -10,15 +10,16 @@
// or implied. See the License for the specific language governing permissions
// and limitations under the License.
-using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
+using MorganStanley.ComposeUI.Messaging.Server.WebSocket;
-namespace MorganStanley.ComposeUI.Messaging.Server.WebSocket;
+// ReSharper disable once CheckNamespace
+namespace Microsoft.Extensions.DependencyInjection;
-public static class MessageRouterBuilderWebSocketExtensions
+public static class MessageRouterServerBuilderWebSocketExtensions
{
- public static MessageRouterBuilder UseWebSockets(
- this MessageRouterBuilder builder,
+ public static MessageRouterServerBuilder UseWebSockets(
+ this MessageRouterServerBuilder builder,
Action? configureOptions = null)
{
if (configureOptions != null)
diff --git a/src/messaging/dotnet/test/Host.Tests/Client/Internal/InProcessConnection.Tests.cs b/src/messaging/dotnet/test/Host.Tests/Client/Internal/InProcessConnection.Tests.cs
new file mode 100644
index 000000000..72c97913a
--- /dev/null
+++ b/src/messaging/dotnet/test/Host.Tests/Client/Internal/InProcessConnection.Tests.cs
@@ -0,0 +1,66 @@
+// Morgan Stanley makes this available to you under the Apache License,
+// Version 2.0 (the "License"). You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0.
+//
+// See the NOTICE file distributed with this work for additional information
+// regarding copyright ownership. Unless required by applicable law or agreed
+// to in writing, software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+// or implied. See the License for the specific language governing permissions
+// and limitations under the License.
+
+using MorganStanley.ComposeUI.Messaging.Client.Abstractions;
+using MorganStanley.ComposeUI.Messaging.Protocol.Messages;
+using MorganStanley.ComposeUI.Messaging.Server;
+using MorganStanley.ComposeUI.Messaging.Server.Abstractions;
+
+namespace MorganStanley.ComposeUI.Messaging.Client.Internal;
+
+public class InProcessConnectionTests
+{
+ [Fact]
+ public async Task Connect_registers_itself_at_the_server()
+ {
+ var server = new Mock();
+ var connection = (IConnection)new InProcessConnection(server.Object);
+
+ await connection.ConnectAsync();
+
+ server.Verify(_ => _.ClientConnected(It.IsAny()),Times.Once());
+ }
+
+ [Fact]
+ public async Task The_client_can_send_messages_to_the_server()
+ {
+ var connection = new InProcessConnection(Mock.Of());
+ var clientSideConnection = (IConnection) connection;
+ var serverSideConnection = (IClientConnection) connection;
+
+ await clientSideConnection.ConnectAsync();
+
+ var message = new ConnectRequest();
+
+ await clientSideConnection.SendAsync(message);
+ var received = await serverSideConnection.ReceiveAsync();
+
+ received.Should().Be(message);
+ }
+
+ [Fact]
+ public async Task The_server_can_send_messages_to_the_client()
+ {
+ var connection = new InProcessConnection(Mock.Of());
+ var clientSideConnection = (IConnection) connection;
+ var serverSideConnection = (IClientConnection) connection;
+
+ await clientSideConnection.ConnectAsync();
+
+ var message = new Protocol.Messages.TopicMessage();
+
+ await serverSideConnection.SendAsync(message);
+ var received = await clientSideConnection.ReceiveAsync();
+
+ received.Should().Be(message);
+ }
+}
\ No newline at end of file
diff --git a/src/messaging/dotnet/test/Host.Tests/GlobalUsings.cs b/src/messaging/dotnet/test/Host.Tests/GlobalUsings.cs
new file mode 100644
index 000000000..e18ed8249
--- /dev/null
+++ b/src/messaging/dotnet/test/Host.Tests/GlobalUsings.cs
@@ -0,0 +1,16 @@
+// Morgan Stanley makes this available to you under the Apache License,
+// Version 2.0 (the "License"). You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0.
+//
+// See the NOTICE file distributed with this work for additional information
+// regarding copyright ownership. Unless required by applicable law or agreed
+// to in writing, software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+// or implied. See the License for the specific language governing permissions
+// and limitations under the License.
+
+global using FluentAssertions;
+global using Xunit;
+global using Moq;
+
diff --git a/src/messaging/dotnet/test/Host.Tests/MorganStanley.ComposeUI.Messaging.Host.Tests.csproj b/src/messaging/dotnet/test/Host.Tests/MorganStanley.ComposeUI.Messaging.Host.Tests.csproj
new file mode 100644
index 000000000..e68ad34d8
--- /dev/null
+++ b/src/messaging/dotnet/test/Host.Tests/MorganStanley.ComposeUI.Messaging.Host.Tests.csproj
@@ -0,0 +1,14 @@
+
+
+
+ net6.0
+ false
+ MorganStanley.ComposeUI.Messaging
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/src/messaging/dotnet/test/IntegrationTests/EndToEndTestsBase.cs b/src/messaging/dotnet/test/IntegrationTests/EndToEndTestsBase.cs
new file mode 100644
index 000000000..98ed46c7f
--- /dev/null
+++ b/src/messaging/dotnet/test/IntegrationTests/EndToEndTestsBase.cs
@@ -0,0 +1,335 @@
+// Morgan Stanley makes this available to you under the Apache License,
+// Version 2.0 (the "License"). You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0.
+//
+// See the NOTICE file distributed with this work for additional information
+// regarding copyright ownership. Unless required by applicable law or agreed
+// to in writing, software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+// or implied. See the License for the specific language governing permissions
+// and limitations under the License.
+
+using System.Reactive.Linq;
+using System.Text.Json;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Hosting;
+using Nito.AsyncEx;
+using TaskExtensions = MorganStanley.ComposeUI.Testing.TaskExtensions;
+
+namespace MorganStanley.ComposeUI.Messaging;
+
+public abstract class EndToEndTestsBase : IAsyncLifetime
+{
+ [Fact]
+ public async Task Client_can_connect()
+ {
+ await using var client = CreateClient();
+ await client.ConnectAsync();
+ }
+
+ [Fact]
+ public async Task Client_can_subscribe_and_receive_messages()
+ {
+ await using var publisher = CreateClient();
+ await using var subscriber = CreateClient();
+ var observerMock = new Mock>();
+ var receivedMessages = new List();
+ observerMock.Setup(x => x.OnNext(Capture.In(receivedMessages)));
+
+ await subscriber.SubscribeAsync(topic: "test-topic", observerMock.Object);
+ await TaskExtensions.WaitForBackgroundTasksAsync(DefaultTestTimeout);
+
+ var publishedPayload = new TestPayload
+ {
+ IntProperty = 0x10203040,
+ StringProperty = "Compose UI 🔥"
+ };
+
+ await publisher.PublishAsync(
+ topic: "test-topic",
+ MessageBuffer.Create(JsonSerializer.SerializeToUtf8Bytes(publishedPayload)));
+
+ await Task.Delay(
+ 10); // TODO: Investigate why WaitForBackgroundTasksAsync is unreliable in this particular scenario
+
+ var receivedPayload = JsonSerializer.Deserialize(receivedMessages.Single().Payload!.GetSpan());
+
+ receivedPayload.Should().BeEquivalentTo(publishedPayload);
+ }
+
+ [Fact]
+ public async Task Client_can_register_itself_as_a_service()
+ {
+ await using var client = CreateClient();
+ await client.RegisterServiceAsync(endpoint: "test-service", (name, payload, context) => default);
+ await client.UnregisterServiceAsync("test-service");
+ }
+
+ [Fact]
+ public async Task Client_can_invoke_a_registered_service()
+ {
+ await using var service = CreateClient();
+
+ var handlerMock = new Mock();
+
+ handlerMock
+ .Setup(_ => _.Invoke("test-service", It.IsAny(), It.IsAny()))
+ .Returns(new ValueTask(MessageBuffer.Create("test-response")));
+
+ await service.RegisterServiceAsync(endpoint: "test-service", handlerMock.Object);
+
+ await using var client = CreateClient();
+
+ var response = await client.InvokeAsync(endpoint: "test-service", payload: "test-request");
+
+ response.Should().BeEquivalentTo("test-response");
+
+ handlerMock.Verify(
+ _ => _.Invoke(
+ "test-service",
+ It.Is(buf => buf.GetString() == "test-request"),
+ It.IsAny()));
+
+ await service.UnregisterServiceAsync("test-service");
+ }
+
+ [Fact]
+ public async Task Client_can_invoke_another_client_by_id_as_long_as_it_is_registered()
+ {
+ await using var callee = CreateClient();
+ await using var caller = CreateClient();
+
+ var handlerMock = new Mock();
+
+ handlerMock.Setup(_ => _.Invoke(It.IsAny(), It.IsAny(), It.IsAny()))
+ .ReturnsAsync(
+ (string endpoint, MessageBuffer? payload, MessageContext context) =>
+ MessageBuffer.Create("test-response"));
+
+ await callee.RegisterEndpointAsync(endpoint: "test-endpoint", handlerMock.Object);
+
+ var response = await caller.InvokeAsync(
+ endpoint: "test-endpoint",
+ payload: "test-request",
+ new InvokeOptions
+ {
+ Scope = MessageScope.FromClientId(callee.ClientId!)
+ });
+
+ response.Should().BeEquivalentTo("test-response");
+
+ handlerMock.Verify(
+ _ => _.Invoke(
+ "test-endpoint",
+ It.Is(buf => buf.GetString() == "test-request"),
+ It.IsAny()));
+
+ await callee.UnregisterEndpointAsync("test-endpoint");
+
+ await Assert.ThrowsAsync(
+ async () => await caller.InvokeAsync(
+ endpoint: "test-endpoint",
+ payload: "test-request",
+ new InvokeOptions
+ {
+ Scope = MessageScope.FromClientId(callee.ClientId!)
+ }));
+ }
+
+ [Fact]
+ public async Task Subscriber_can_invoke_a_service_without_deadlock()
+ {
+ await using var subscriber = CreateClient();
+ await using var service = CreateClient();
+ await using var publisher = CreateClient();
+ await service.RegisterServiceAsync(endpoint: "test-service", new Mock().Object);
+ var tcs = new TaskCompletionSource();
+
+ await subscriber.SubscribeAsync(
+ topic: "test-topic",
+ AsyncObserver.Create(
+ new Func(
+ async msg =>
+ {
+ await subscriber.InvokeAsync("test-service");
+ tcs.SetResult();
+ })));
+
+ await publisher.PublishAsync("test-topic");
+ await tcs.Task;
+ }
+
+ [Fact]
+ public async Task Subscriber_is_called_sequentially_without_concurrency()
+ {
+ await using var subscriber = CreateClient();
+ await using var publisher = CreateClient();
+ var semaphore = new AsyncSemaphore(1);
+
+ var tcs = new TaskCompletionSource();
+
+ await subscriber.SubscribeAsync(
+ topic: "test-topic",
+ AsyncObserver.Create(
+ async msg =>
+ {
+ using (await semaphore.LockAsync(new CancellationTokenSource(TimeSpan.Zero).Token))
+ {
+ await TaskExtensions.WaitForBackgroundTasksAsync(DefaultTestTimeout);
+ }
+
+ if (msg.Payload?.GetString() == "done")
+ {
+ tcs.SetResult();
+ }
+ }));
+
+ for (var i = 0; i < 10; i++)
+ {
+ await publisher.PublishAsync("test-topic");
+ }
+
+ await publisher.PublishAsync(topic: "test-topic", payload: "done");
+ await tcs.Task;
+ }
+
+ [Fact]
+ public async Task Endpoint_handler_can_invoke_a_service_without_deadlock()
+ {
+ await using var listener = CreateClient();
+ await using var caller = CreateClient();
+ await using var service = CreateClient();
+ var tcs = new TaskCompletionSource();
+
+ await listener.RegisterEndpointAsync(
+ endpoint: "test-endpoint",
+ new MessageHandler(
+ async (endpoint, payload, context) =>
+ {
+ await listener.InvokeAsync("test-service");
+ tcs.SetResult();
+ return null;
+ }));
+
+ await service.RegisterServiceAsync(endpoint: "test-service", new Mock().Object);
+ await caller.InvokeAsync(
+ endpoint: "test-endpoint",
+ options: new InvokeOptions {Scope = MessageScope.FromClientId(listener.ClientId!)});
+ await tcs.Task;
+ }
+
+ [Fact]
+ public async Task Endpoint_handler_can_be_invoked_recursively_without_deadlock()
+ {
+ await using var serviceA = CreateClient();
+ await using var serviceB = CreateClient();
+ var tcs = new TaskCompletionSource();
+
+ await serviceA.RegisterServiceAsync(
+ endpoint: "test-service-a",
+ new MessageHandler(
+ async (endpoint, payload, context) =>
+ {
+ if (payload?.GetString() == "done")
+ {
+ tcs.SetResult();
+ }
+ else
+ {
+ await serviceA.InvokeAsync(endpoint: "test-service-b", payload: "hello");
+ }
+
+ return null;
+ }));
+
+ await serviceB.RegisterServiceAsync(
+ endpoint: "test-service-b",
+ new MessageHandler(
+ async (endpoint, payload, context) =>
+ {
+ await serviceB.InvokeAsync(endpoint: "test-service-a", payload: "done");
+
+ return null;
+ }));
+
+ await serviceB.InvokeAsync("test-service-a");
+
+ await tcs.Task;
+ }
+
+ public async Task InitializeAsync()
+ {
+ IHostBuilder builder = new HostBuilder();
+
+ builder.ConfigureServices(
+ services =>
+ {
+ services.AddMessageRouterServer(
+ server =>
+ {
+ server.UseAccessTokenValidator(
+ (clientId, token) =>
+ {
+ if (token != AccessToken)
+ throw new InvalidOperationException("Invalid access token");
+ });
+ ConfigureServer(server);
+ });
+
+ ConfigureServices(services);
+ });
+
+ _host = builder.Build();
+ await _host.StartAsync();
+ }
+
+ public async Task DisposeAsync()
+ {
+ foreach (var disposable in _cleanup)
+ {
+ if (disposable is IAsyncDisposable asyncDisposable)
+ {
+ await asyncDisposable.DisposeAsync();
+ }
+ else
+ {
+ disposable.Dispose();
+ }
+ }
+
+ await _host.StopAsync();
+ _host.Dispose();
+ }
+
+ public static readonly TimeSpan DefaultTestTimeout = TimeSpan.FromSeconds(1);
+
+ protected IHost Host => _host ?? throw new InvalidOperationException("Host is not initialized yet.");
+
+ protected void AddDisposable(IDisposable disposable)
+ {
+ _cleanup.Add(disposable);
+ }
+
+ protected const string AccessToken = "token";
+
+ protected virtual void ConfigureServices(IServiceCollection services)
+ {
+ // Add any additional service registrations.
+ // Don't call AddMessageRouterServer.
+ }
+
+ protected abstract void ConfigureServer(MessageRouterServerBuilder serverBuilder);
+
+ protected abstract IMessageRouter CreateClient();
+
+ private IHost _host = null!;
+
+ private readonly List _cleanup = new();
+
+ private class TestPayload
+ {
+ public int IntProperty { get; set; }
+ public string StringProperty { get; set; }
+ }
+}
\ No newline at end of file
diff --git a/src/messaging/dotnet/test/IntegrationTests/InProcessEndToEndTests.cs b/src/messaging/dotnet/test/IntegrationTests/InProcessEndToEndTests.cs
new file mode 100644
index 000000000..bed1c7eb5
--- /dev/null
+++ b/src/messaging/dotnet/test/IntegrationTests/InProcessEndToEndTests.cs
@@ -0,0 +1,33 @@
+// Morgan Stanley makes this available to you under the Apache License,
+// Version 2.0 (the "License"). You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0.
+//
+// See the NOTICE file distributed with this work for additional information
+// regarding copyright ownership. Unless required by applicable law or agreed
+// to in writing, software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+// or implied. See the License for the specific language governing permissions
+// and limitations under the License.
+
+using Microsoft.Extensions.DependencyInjection;
+
+namespace MorganStanley.ComposeUI.Messaging;
+
+public class InProcessEndToEndTests : EndToEndTestsBase
+{
+ protected override IMessageRouter CreateClient()
+ {
+ return Host.Services.GetRequiredService();
+ }
+
+ protected override void ConfigureServer(MessageRouterServerBuilder serverBuilder)
+ {
+ }
+
+ protected override void ConfigureServices(IServiceCollection services)
+ {
+ base.ConfigureServices(services);
+ services.AddMessageRouter(mr => mr.UseServer().UseAccessToken(AccessToken));
+ }
+}
\ No newline at end of file
diff --git a/src/messaging/dotnet/test/IntegrationTests/MorganStanley.ComposeUI.Messaging.IntegrationTests.csproj b/src/messaging/dotnet/test/IntegrationTests/MorganStanley.ComposeUI.Messaging.IntegrationTests.csproj
index dbc145af5..633b7721a 100644
--- a/src/messaging/dotnet/test/IntegrationTests/MorganStanley.ComposeUI.Messaging.IntegrationTests.csproj
+++ b/src/messaging/dotnet/test/IntegrationTests/MorganStanley.ComposeUI.Messaging.IntegrationTests.csproj
@@ -14,6 +14,7 @@
+
diff --git a/src/messaging/dotnet/test/IntegrationTests/WebSocketEndToEndTests.cs b/src/messaging/dotnet/test/IntegrationTests/WebSocketEndToEndTests.cs
index a22c26098..15b7099a7 100644
--- a/src/messaging/dotnet/test/IntegrationTests/WebSocketEndToEndTests.cs
+++ b/src/messaging/dotnet/test/IntegrationTests/WebSocketEndToEndTests.cs
@@ -10,305 +10,24 @@
// or implied. See the License for the specific language governing permissions
// and limitations under the License.
-using System.Reactive;
-using System.Reactive.Linq;
-using System.Text.Json;
using Microsoft.Extensions.DependencyInjection;
-using Microsoft.Extensions.Hosting;
using MorganStanley.ComposeUI.Messaging.Client.WebSocket;
-using MorganStanley.ComposeUI.Messaging.Server.WebSocket;
-using Nito.AsyncEx;
-using TaskExtensions = MorganStanley.ComposeUI.Testing.TaskExtensions;
namespace MorganStanley.ComposeUI.Messaging;
-public class WebSocketEndToEndTests : IAsyncLifetime
+public class WebSocketEndToEndTests : EndToEndTestsBase
{
- [Fact]
- public async Task Client_can_connect()
+ protected override void ConfigureServer(MessageRouterServerBuilder serverBuilder)
{
- await using var client = CreateClient();
- await client.ConnectAsync();
- }
-
- [Fact]
- public async Task Client_can_subscribe_and_receive_messages()
- {
- await using var publisher = CreateClient();
- await using var subscriber = CreateClient();
- var observerMock = new Mock>();
- var receivedMessages = new List();
- observerMock.Setup(x => x.OnNext(Capture.In(receivedMessages)));
-
- await subscriber.SubscribeAsync("test-topic", observerMock.Object);
- await TaskExtensions.WaitForBackgroundTasksAsync(DefaultTestTimeout);
-
- var publishedPayload = new TestPayload
- {
- IntProperty = 0x10203040,
- StringProperty = "Compose UI 🔥"
- };
-
- await publisher.PublishAsync(
- "test-topic",
- MessageBuffer.Create(JsonSerializer.SerializeToUtf8Bytes(publishedPayload)));
-
- await Task.Delay(10); // TODO: Investigate why WaitForBackgroundTasksAsync is unreliable in this particular scenario
-
- var receivedPayload = JsonSerializer.Deserialize(receivedMessages.Single().Payload!.GetSpan());
-
- receivedPayload.Should().BeEquivalentTo(publishedPayload);
- }
-
- [Fact]
- public async Task Client_can_register_itself_as_a_service()
- {
- await using var client = CreateClient();
- await client.RegisterServiceAsync("test-service", (name, payload, context) => default);
- await client.UnregisterServiceAsync("test-service");
- }
-
- [Fact]
- public async Task Client_can_invoke_a_registered_service()
- {
- await using var service = CreateClient();
-
- var handlerMock = new Mock();
-
- handlerMock
- .Setup(_ => _.Invoke("test-service", It.IsAny(), It.IsAny()))
- .Returns(new ValueTask(MessageBuffer.Create("test-response")));
-
- await service.RegisterServiceAsync("test-service", handlerMock.Object);
-
- await using var client = CreateClient();
-
- var response = await client.InvokeAsync("test-service", "test-request");
-
- response.Should().BeEquivalentTo("test-response");
-
- handlerMock.Verify(
- _ => _.Invoke(
- "test-service",
- It.Is(buf => buf.GetString() == "test-request"),
- It.IsAny()));
-
- await service.UnregisterServiceAsync("test-service");
- }
-
- [Fact]
- public async Task Client_can_invoke_another_client_by_id_as_long_as_it_is_registered()
- {
- await using var callee = CreateClient();
- await using var caller = CreateClient();
-
- var handlerMock = new Mock();
-
- handlerMock.Setup(_ => _.Invoke(It.IsAny(), It.IsAny(), It.IsAny()))
- .ReturnsAsync(
- (string endpoint, MessageBuffer? payload, MessageContext context) =>
- MessageBuffer.Create("test-response"));
-
- await callee.RegisterEndpointAsync("test-endpoint", handlerMock.Object);
-
- var response = await caller.InvokeAsync(
- "test-endpoint",
- "test-request",
- new InvokeOptions
+ serverBuilder.UseWebSockets(
+ opt =>
{
- Scope = MessageScope.FromClientId(callee.ClientId!)
+ opt.RootPath = _webSocketUri.AbsolutePath;
+ opt.Port = _webSocketUri.Port;
});
-
- response.Should().BeEquivalentTo("test-response");
-
- handlerMock.Verify(
- _ => _.Invoke(
- "test-endpoint",
- It.Is(buf => buf.GetString() == "test-request"),
- It.IsAny()));
-
- await callee.UnregisterEndpointAsync("test-endpoint");
-
- await Assert.ThrowsAsync(
- async () => await caller.InvokeAsync(
- "test-endpoint",
- "test-request",
- new InvokeOptions
- {
- Scope = MessageScope.FromClientId(callee.ClientId!)
- }));
- }
-
- [Fact]
- public async Task Subscriber_can_invoke_a_service_without_deadlock()
- {
- await using var subscriber = CreateClient();
- await using var service = CreateClient();
- await using var publisher = CreateClient();
- await service.RegisterServiceAsync("test-service", new Mock().Object);
- var tcs = new TaskCompletionSource();
-
- await subscriber.SubscribeAsync(
- "test-topic",
- AsyncObserver.Create(
- new Func(
- async msg =>
- {
- await subscriber.InvokeAsync("test-service");
- tcs.SetResult();
- })));
-
- await publisher.PublishAsync("test-topic");
- await tcs.Task;
- }
-
- [Fact]
- public async Task Subscriber_is_called_sequentially_without_concurrency()
- {
- await using var subscriber = CreateClient();
- await using var publisher = CreateClient();
- var semaphore = new AsyncSemaphore(1);
-
- var tcs = new TaskCompletionSource();
-
- await subscriber.SubscribeAsync(
- "test-topic",
- AsyncObserver.Create(
- async msg =>
- {
- using (await semaphore.LockAsync(new CancellationTokenSource(TimeSpan.Zero).Token))
- {
- await TaskExtensions.WaitForBackgroundTasksAsync(DefaultTestTimeout);
- }
-
- if (msg.Payload?.GetString() == "done")
- {
- tcs.SetResult();
- }
- }));
-
- for (var i = 0; i < 10; i++)
- {
- await publisher.PublishAsync("test-topic");
- }
-
- await publisher.PublishAsync("test-topic", "done");
- await tcs.Task;
- }
-
- [Fact]
- public async Task Endpoint_handler_can_invoke_a_service_without_deadlock()
- {
- await using var listener = CreateClient();
- await using var caller = CreateClient();
- await using var service = CreateClient();
- var tcs = new TaskCompletionSource();
-
- await listener.RegisterEndpointAsync(
- "test-endpoint",
- new MessageHandler(
- async (endpoint, payload, context) =>
- {
- await listener.InvokeAsync("test-service");
- tcs.SetResult();
- return null;
- }));
-
- await service.RegisterServiceAsync("test-service", new Mock().Object);
- await caller.InvokeAsync(
- "test-endpoint",
- options: new InvokeOptions {Scope = MessageScope.FromClientId(listener.ClientId!)});
- await tcs.Task;
}
- [Fact]
- public async Task Endpoint_handler_can_be_invoked_recursively_without_deadlock()
- {
- await using var serviceA = CreateClient();
- await using var serviceB = CreateClient();
- var tcs = new TaskCompletionSource();
-
- await serviceA.RegisterServiceAsync(
- "test-service-a",
- new MessageHandler(
- async (endpoint, payload, context) =>
- {
- if (payload?.GetString() == "done")
- {
- tcs.SetResult();
- }
- else
- {
- await serviceA.InvokeAsync("test-service-b", "hello");
- }
-
- return null;
- }));
-
- await serviceB.RegisterServiceAsync(
- "test-service-b",
- new MessageHandler(
- async (endpoint, payload, context) =>
- {
- await serviceB.InvokeAsync("test-service-a", "done");
-
- return null;
- }));
-
- await serviceB.InvokeAsync("test-service-a");
-
- await tcs.Task;
- }
-
- public async Task InitializeAsync()
- {
- IHostBuilder builder = new HostBuilder();
-
- builder.ConfigureServices(
- services => services.AddMessageRouterServer(
- mr => mr.UseWebSockets(
- opt =>
- {
- opt.RootPath = _webSocketUri.AbsolutePath;
- opt.Port = _webSocketUri.Port;
- })
- .UseAccessTokenValidator(
- (clientId, token) =>
- {
- if (token != AccessToken)
- throw new InvalidOperationException("Invalid access token");
- })));
-
- _host = builder.Build();
- await _host.StartAsync();
- }
-
- public async Task DisposeAsync()
- {
- foreach (var disposable in _cleanup)
- {
- if (disposable is IAsyncDisposable asyncDisposable)
- {
- await asyncDisposable.DisposeAsync();
- }
- else
- {
- disposable.Dispose();
- }
- }
-
- await _host.StopAsync();
- _host.Dispose();
- }
-
- public static readonly TimeSpan DefaultTestTimeout = TimeSpan.FromSeconds(1);
-
- private IHost _host = null!;
- private readonly Uri _webSocketUri = new("ws://localhost:7098/ws");
- private const string AccessToken = "token";
- private readonly List _cleanup = new();
-
- private IMessageRouter CreateClient()
+ protected override IMessageRouter CreateClient()
{
var services = new ServiceCollection()
.AddMessageRouter(
@@ -321,14 +40,11 @@ private IMessageRouter CreateClient()
.UseAccessToken(AccessToken))
.BuildServiceProvider();
- _cleanup.Add(services);
+ AddDisposable(services);
return services.GetRequiredService();
}
- private class TestPayload
- {
- public int IntProperty { get; set; }
- public string StringProperty { get; set; }
- }
+
+ private readonly Uri _webSocketUri = new("ws://localhost:7098/ws");
}
\ No newline at end of file
diff --git a/src/shell/dotnet/Shell.sln b/src/shell/dotnet/Shell.sln
index a2c843ede..d21482d6c 100644
--- a/src/shell/dotnet/Shell.sln
+++ b/src/shell/dotnet/Shell.sln
@@ -15,6 +15,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "tests", "tests", "{BEA71FE5
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Shell.Tests", "tests\Shell.Tests\Shell.Tests.csproj", "{70491F36-A27B-4E8C-AFD5-B49480EC1343}"
EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MorganStanley.ComposeUI.Messaging.Host", "..\..\messaging\dotnet\src\Host\MorganStanley.ComposeUI.Messaging.Host.csproj", "{DD91C297-06D4-4578-9C75-1BA5D8595AA0}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -37,6 +39,10 @@ Global
{70491F36-A27B-4E8C-AFD5-B49480EC1343}.Debug|Any CPU.Build.0 = Debug|Any CPU
{70491F36-A27B-4E8C-AFD5-B49480EC1343}.Release|Any CPU.ActiveCfg = Release|Any CPU
{70491F36-A27B-4E8C-AFD5-B49480EC1343}.Release|Any CPU.Build.0 = Release|Any CPU
+ {DD91C297-06D4-4578-9C75-1BA5D8595AA0}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {DD91C297-06D4-4578-9C75-1BA5D8595AA0}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {DD91C297-06D4-4578-9C75-1BA5D8595AA0}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {DD91C297-06D4-4578-9C75-1BA5D8595AA0}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -45,6 +51,7 @@ Global
{B2AA5676-2776-4BDA-95F8-BE09D7642393} = {E7A2C581-4BF4-47A5-8A11-59B2DEBADCA7}
{F0DFAB55-14F5-4FF6-A12F-37B1745D4551} = {E7A2C581-4BF4-47A5-8A11-59B2DEBADCA7}
{70491F36-A27B-4E8C-AFD5-B49480EC1343} = {BEA71FE5-7EC8-4BEA-819B-C0DB33125D91}
+ {DD91C297-06D4-4578-9C75-1BA5D8595AA0} = {E7A2C581-4BF4-47A5-8A11-59B2DEBADCA7}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {4C901E6C-4B9A-48C2-AB16-461040DC57B4}
diff --git a/src/shell/dotnet/Shell/App.xaml.cs b/src/shell/dotnet/Shell/App.xaml.cs
index eb4be75eb..77ee4e005 100644
--- a/src/shell/dotnet/Shell/App.xaml.cs
+++ b/src/shell/dotnet/Shell/App.xaml.cs
@@ -80,7 +80,7 @@ private async Task StartAsync(StartupEventArgs e)
config => config.AddJsonFile("appsettings.json", optional: true))
.ConfigureLogging(l => l.AddDebug().SetMinimumLevel(LogLevel.Debug))
.ConfigureServices(ConfigureServices)
- .Build();
+ .Build();
await host.StartAsync();
_host = host;
@@ -99,9 +99,16 @@ private void ConfigureServices(HostBuilderContext context, IServiceCollection se
.UseAccessTokenValidator(
(clientId, token) =>
{
+ // TODO: Assign a separate token for each client and only allow a single connection with each token
if (_messageRouterAccessToken != token)
throw new InvalidOperationException("The provided access token is invalid");
} ));
+
+ services.AddMessageRouter(
+ mr => mr
+ .UseServer()
+ .UseAccessToken(_messageRouterAccessToken));
+
services.Configure(context.Configuration.GetSection("Logging"));
}
diff --git a/src/shell/dotnet/Shell/Shell.csproj b/src/shell/dotnet/Shell/Shell.csproj
index 988dc322d..e56a28cfa 100644
--- a/src/shell/dotnet/Shell/Shell.csproj
+++ b/src/shell/dotnet/Shell/Shell.csproj
@@ -42,8 +42,7 @@
-
-
+
\ No newline at end of file