Skip to content

Commit

Permalink
Merge pull request #238 from BalassaMarton/messagerouter-in-process
Browse files Browse the repository at this point in the history
In-process Message Router
  • Loading branch information
BalassaMarton authored Jun 20, 2023
2 parents d966c88 + 2d18256 commit 34ad3a0
Show file tree
Hide file tree
Showing 22 changed files with 645 additions and 315 deletions.
14 changes: 14 additions & 0 deletions src/messaging/dotnet/Messaging.sln
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MorganStanley.ComposeUI.Mes
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MorganStanley.ComposeUI.Messaging.Server.Tests", "test\Server.Tests\MorganStanley.ComposeUI.Messaging.Server.Tests.csproj", "{C95F8B4A-7E42-43A6-A57E-C3ACB6255EA3}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MorganStanley.ComposeUI.Messaging.Host", "src\Host\MorganStanley.ComposeUI.Messaging.Host.csproj", "{C53E9D27-790A-450D-8E29-9B40E7D586CB}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MorganStanley.ComposeUI.Messaging.IntegrationTests", "test\IntegrationTests\MorganStanley.ComposeUI.Messaging.IntegrationTests.csproj", "{684367EE-406C-4727-993D-EF4B6FF5C37C}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MorganStanley.ComposeUI.Messaging.Core.Tests", "test\Core.Tests\MorganStanley.ComposeUI.Messaging.Core.Tests.csproj", "{CDC30566-1801-40CD-9EBD-2913418FC2F9}"
Expand All @@ -29,6 +31,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "lib", "lib", "{B7E63957-3C1
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MorganStanley.ComposeUI.Testing", "..\..\shared\dotnet\MorganStanley.ComposeUI.Testing\MorganStanley.ComposeUI.Testing.csproj", "{AE71CBC4-FD4E-4C66-B894-D7C31DE4D1BE}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MorganStanley.ComposeUI.Messaging.Host.Tests", "test\Host.Tests\MorganStanley.ComposeUI.Messaging.Host.Tests.csproj", "{CEF78D3F-C645-4471-BAD2-9C538A0CA763}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand All @@ -51,6 +55,10 @@ Global
{C95F8B4A-7E42-43A6-A57E-C3ACB6255EA3}.Debug|Any CPU.Build.0 = Debug|Any CPU
{C95F8B4A-7E42-43A6-A57E-C3ACB6255EA3}.Release|Any CPU.ActiveCfg = Release|Any CPU
{C95F8B4A-7E42-43A6-A57E-C3ACB6255EA3}.Release|Any CPU.Build.0 = Release|Any CPU
{C53E9D27-790A-450D-8E29-9B40E7D586CB}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{C53E9D27-790A-450D-8E29-9B40E7D586CB}.Debug|Any CPU.Build.0 = Debug|Any CPU
{C53E9D27-790A-450D-8E29-9B40E7D586CB}.Release|Any CPU.ActiveCfg = Release|Any CPU
{C53E9D27-790A-450D-8E29-9B40E7D586CB}.Release|Any CPU.Build.0 = Release|Any CPU
{684367EE-406C-4727-993D-EF4B6FF5C37C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{684367EE-406C-4727-993D-EF4B6FF5C37C}.Debug|Any CPU.Build.0 = Debug|Any CPU
{684367EE-406C-4727-993D-EF4B6FF5C37C}.Release|Any CPU.ActiveCfg = Release|Any CPU
Expand All @@ -71,6 +79,10 @@ Global
{AE71CBC4-FD4E-4C66-B894-D7C31DE4D1BE}.Debug|Any CPU.Build.0 = Debug|Any CPU
{AE71CBC4-FD4E-4C66-B894-D7C31DE4D1BE}.Release|Any CPU.ActiveCfg = Release|Any CPU
{AE71CBC4-FD4E-4C66-B894-D7C31DE4D1BE}.Release|Any CPU.Build.0 = Release|Any CPU
{CEF78D3F-C645-4471-BAD2-9C538A0CA763}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{CEF78D3F-C645-4471-BAD2-9C538A0CA763}.Debug|Any CPU.Build.0 = Debug|Any CPU
{CEF78D3F-C645-4471-BAD2-9C538A0CA763}.Release|Any CPU.ActiveCfg = Release|Any CPU
{CEF78D3F-C645-4471-BAD2-9C538A0CA763}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -80,11 +92,13 @@ Global
{A3156691-0B46-4C3D-B042-E71D9BAF02EC} = {37DE16A7-F731-4206-B650-8A1D9D066B60}
{4AEFD2AA-CF42-401D-A058-E2B4C8E12283} = {37DE16A7-F731-4206-B650-8A1D9D066B60}
{C95F8B4A-7E42-43A6-A57E-C3ACB6255EA3} = {78E94277-B0B4-425C-9C47-81E09933FE5B}
{C53E9D27-790A-450D-8E29-9B40E7D586CB} = {37DE16A7-F731-4206-B650-8A1D9D066B60}
{684367EE-406C-4727-993D-EF4B6FF5C37C} = {78E94277-B0B4-425C-9C47-81E09933FE5B}
{CDC30566-1801-40CD-9EBD-2913418FC2F9} = {78E94277-B0B4-425C-9C47-81E09933FE5B}
{29122DF1-0AE3-4101-9D96-4711553C7696} = {B245E9BB-69CC-4457-9FDF-635E742993CB}
{AA63BC7C-9DB8-4062-A689-7FA9689EF6B1} = {78E94277-B0B4-425C-9C47-81E09933FE5B}
{AE71CBC4-FD4E-4C66-B894-D7C31DE4D1BE} = {B7E63957-3C1B-4A16-B530-3EAD0D0C5F07}
{CEF78D3F-C645-4471-BAD2-9C538A0CA763} = {78E94277-B0B4-425C-9C47-81E09933FE5B}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {38CFF5DF-459A-49E1-8189-2DB429A99B3A}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

namespace MorganStanley.ComposeUI.Messaging.Client.Abstractions;

// TODO: Add IConnectionFactory abstraction to adhere to DI best practices

/// <summary>
/// Represents a connection that can communicate with the server.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@
// or implied. See the License for the specific language governing permissions
// and limitations under the License.

using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using MorganStanley.ComposeUI.Messaging.Client.Abstractions;
using MorganStanley.ComposeUI.Messaging.Client.WebSocket;

namespace MorganStanley.ComposeUI.Messaging.Client.WebSocket;
// ReSharper disable once CheckNamespace
namespace Microsoft.Extensions.DependencyInjection;

/// <summary>
/// Static utilities for configuring WebSocket connections.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ internal MessageRouterBuilder(IServiceCollection serviceCollection)
ServiceCollection = serviceCollection;
}

internal IServiceCollection ServiceCollection { get; }
public IServiceCollection ServiceCollection { get; }

internal string? AccessToken { get; set; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ namespace MorganStanley.ComposeUI.Messaging;
/// </summary>
public static class MessageRouterClientExtensions
{
// TODO: Move this to IMessageRouter - this should be available even if only the Core package is referenced
public static IAsyncObservable<TopicMessage> Topic(this IMessageRouter messageRouter, string topic)
{
return messageRouter is MessageRouterClient messageRouterClient
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Morgan Stanley makes this available to you under the Apache License,
// Version 2.0 (the "License"). You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0.
//
// See the NOTICE file distributed with this work for additional information
// regarding copyright ownership. Unless required by applicable law or agreed
// to in writing, software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions
// and limitations under the License.

using System.Threading.Channels;
using MorganStanley.ComposeUI.Messaging.Client.Abstractions;
using MorganStanley.ComposeUI.Messaging.Protocol.Messages;
using MorganStanley.ComposeUI.Messaging.Server;
using MorganStanley.ComposeUI.Messaging.Server.Abstractions;

namespace MorganStanley.ComposeUI.Messaging.Client.Internal;

internal sealed class InProcessConnection : IConnection, IClientConnection
{
public InProcessConnection(IMessageRouterServer server)
{
_server = server;
}

ValueTask IClientConnection.SendAsync(Message message, CancellationToken cancellationToken = default)
{
return _serverToClient.Writer.WriteAsync(message, cancellationToken);
}

ValueTask<Message> IClientConnection.ReceiveAsync(CancellationToken cancellationToken = default)
{
return _clientToServer.Reader.ReadAsync(cancellationToken);
}

public ValueTask DisposeAsync()
{
_clientToServer.Writer.TryComplete();
_serverToClient.Writer.TryComplete();

return ValueTask.CompletedTask;
}

ValueTask IConnection.ConnectAsync(CancellationToken cancellationToken = default)
{
return _server.ClientConnected(this);
}

ValueTask IConnection.SendAsync(Message message, CancellationToken cancellationToken = default)
{
return _clientToServer.Writer.WriteAsync(message, cancellationToken);
}

ValueTask<Message> IConnection.ReceiveAsync(CancellationToken cancellationToken = default)
{
return _serverToClient.Reader.ReadAsync(cancellationToken);
}

private readonly IMessageRouterServer _server;
private readonly Channel<Message> _clientToServer = Channel.CreateUnbounded<Message>();
private readonly Channel<Message> _serverToClient = Channel.CreateUnbounded<Message>();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Morgan Stanley makes this available to you under the Apache License,
// Version 2.0 (the "License"). You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0.
//
// See the NOTICE file distributed with this work for additional information
// regarding copyright ownership. Unless required by applicable law or agreed
// to in writing, software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions
// and limitations under the License.

using MorganStanley.ComposeUI.Messaging.Client.Abstractions;
using MorganStanley.ComposeUI.Messaging.Client.Internal;

// ReSharper disable once CheckNamespace
namespace Microsoft.Extensions.DependencyInjection;

/// <summary>
/// Adds extension methods for configuring an in-process Message Router client.
/// </summary>
public static class MessageRouterBuilderInProcessExtensions
{
/// <summary>
/// Configures the Message Router to connect to the in-process server.
/// The server must be set up using <see cref="ServiceCollectionMessageRouterServerExtensions.AddMessageRouterServer"/>.
/// </summary>
/// <returns></returns>
public static MessageRouterBuilder UseServer(this MessageRouterBuilder builder)
{
builder.ServiceCollection.AddTransient<IConnection, InProcessConnection>();

return builder;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<Nullable>enable</Nullable>
<ImplicitUsings>enable</ImplicitUsings>
<GenerateDocumentationFile>True</GenerateDocumentationFile>
<RootNamespace>MorganStanley.ComposeUI.Messaging</RootNamespace>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="../Core/MorganStanley.ComposeUI.Messaging.Core.csproj" />
<ProjectReference Include="../Client/MorganStanley.ComposeUI.Messaging.Client.csproj" />
<ProjectReference Include="../Server/MorganStanley.ComposeUI.Messaging.Server.csproj" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -15,28 +15,28 @@
// ReSharper disable once CheckNamespace
namespace Microsoft.Extensions.DependencyInjection;

public sealed class MessageRouterBuilder
public sealed class MessageRouterServerBuilder
{
public MessageRouterBuilder(IServiceCollection serviceCollection)
public MessageRouterServerBuilder(IServiceCollection serviceCollection)
{
ServiceCollection = serviceCollection;
}

public MessageRouterBuilder UseAccessTokenValidator(IAccessTokenValidator validator)
public MessageRouterServerBuilder UseAccessTokenValidator(IAccessTokenValidator validator)
{
ServiceCollection.AddSingleton(validator);

return this;
}

public MessageRouterBuilder UseAccessTokenValidator(Func<IServiceProvider, IAccessTokenValidator> factory)
public MessageRouterServerBuilder UseAccessTokenValidator(Func<IServiceProvider, IAccessTokenValidator> factory)
{
ServiceCollection.AddSingleton(factory);

return this;
}

public MessageRouterBuilder UseAccessTokenValidator(AccessTokenValidatorCallback validatorCallback)
public MessageRouterServerBuilder UseAccessTokenValidator(AccessTokenValidatorCallback validatorCallback)
{
ServiceCollection.AddSingleton<IAccessTokenValidator>(
new AccessTokenValidator(
Expand All @@ -50,14 +50,14 @@ public MessageRouterBuilder UseAccessTokenValidator(AccessTokenValidatorCallback
return this;
}

public MessageRouterBuilder UseAccessTokenValidator(AsyncAccessTokenValidatorCallback validatorCallback)
public MessageRouterServerBuilder UseAccessTokenValidator(AsyncAccessTokenValidatorCallback validatorCallback)
{
ServiceCollection.AddSingleton<IAccessTokenValidator>(new AccessTokenValidator(validatorCallback));

return this;
}

public MessageRouterBuilder UseAccessTokenValidator(Func<string, string?, Task> validatorCallback)
public MessageRouterServerBuilder UseAccessTokenValidator(Func<string, string?, Task> validatorCallback)
{
ServiceCollection.AddSingleton<IAccessTokenValidator>(
new AccessTokenValidator(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ namespace Microsoft.Extensions.DependencyInjection;
/// <summary>
/// Contains extension methods for adding the Message Router server to a service collection.
/// </summary>
public static class ServiceCollectionMessageRouterExceptions
public static class ServiceCollectionMessageRouterServerExtensions
{
/// <summary>
/// Adds <see cref="IMessageRouterServer"/> and related types to the service collection.
Expand All @@ -28,12 +28,12 @@ public static class ServiceCollectionMessageRouterExceptions
/// <returns></returns>
public static IServiceCollection AddMessageRouterServer(
this IServiceCollection serviceCollection,
Action<MessageRouterBuilder> builderAction)
Action<MessageRouterServerBuilder> builderAction)
{
serviceCollection.AddSingleton<IMessageRouterServer, MessageRouterServer>();
serviceCollection.AddSingleton<MessageRouterServerDependencies>();

var builder = new MessageRouterBuilder(serviceCollection);
var builder = new MessageRouterServerBuilder(serviceCollection);
builderAction(builder);

return serviceCollection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public async ValueTask DisposeAsync()
{
_stopTokenSource.Cancel();

// TODO: Don't dispose objects that were created by someone else. Signal disconnection using a dedicated method.
await Task.WhenAll(_clients.Values.Select(client => client.Connection.DisposeAsync().AsTask()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,16 @@
// or implied. See the License for the specific language governing permissions
// and limitations under the License.

using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using MorganStanley.ComposeUI.Messaging.Server.WebSocket;

namespace MorganStanley.ComposeUI.Messaging.Server.WebSocket;
// ReSharper disable once CheckNamespace
namespace Microsoft.Extensions.DependencyInjection;

public static class MessageRouterBuilderWebSocketExtensions
public static class MessageRouterServerBuilderWebSocketExtensions
{
public static MessageRouterBuilder UseWebSockets(
this MessageRouterBuilder builder,
public static MessageRouterServerBuilder UseWebSockets(
this MessageRouterServerBuilder builder,
Action<MessageRouterWebSocketServerOptions>? configureOptions = null)
{
if (configureOptions != null)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Morgan Stanley makes this available to you under the Apache License,
// Version 2.0 (the "License"). You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0.
//
// See the NOTICE file distributed with this work for additional information
// regarding copyright ownership. Unless required by applicable law or agreed
// to in writing, software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions
// and limitations under the License.

using MorganStanley.ComposeUI.Messaging.Client.Abstractions;
using MorganStanley.ComposeUI.Messaging.Protocol.Messages;
using MorganStanley.ComposeUI.Messaging.Server;
using MorganStanley.ComposeUI.Messaging.Server.Abstractions;

namespace MorganStanley.ComposeUI.Messaging.Client.Internal;

public class InProcessConnectionTests
{
[Fact]
public async Task Connect_registers_itself_at_the_server()
{
var server = new Mock<IMessageRouterServer>();
var connection = (IConnection)new InProcessConnection(server.Object);

await connection.ConnectAsync();

server.Verify(_ => _.ClientConnected(It.IsAny<IClientConnection>()),Times.Once());
}

[Fact]
public async Task The_client_can_send_messages_to_the_server()
{
var connection = new InProcessConnection(Mock.Of<IMessageRouterServer>());
var clientSideConnection = (IConnection) connection;
var serverSideConnection = (IClientConnection) connection;

await clientSideConnection.ConnectAsync();

var message = new ConnectRequest();

await clientSideConnection.SendAsync(message);
var received = await serverSideConnection.ReceiveAsync();

received.Should().Be(message);
}

[Fact]
public async Task The_server_can_send_messages_to_the_client()
{
var connection = new InProcessConnection(Mock.Of<IMessageRouterServer>());
var clientSideConnection = (IConnection) connection;
var serverSideConnection = (IClientConnection) connection;

await clientSideConnection.ConnectAsync();

var message = new Protocol.Messages.TopicMessage();

await serverSideConnection.SendAsync(message);
var received = await clientSideConnection.ReceiveAsync();

received.Should().Be(message);
}
}
Loading

0 comments on commit 34ad3a0

Please sign in to comment.