Skip to content

Commit

Permalink
Merge pull request #239 from BalassaMarton/messagerouter-connectionfa…
Browse files Browse the repository at this point in the history
…ctory

Messaging: Added IConnectionFactory for correctly implementing disposable pattern
  • Loading branch information
BalassaMarton authored Jun 20, 2023
2 parents 34ad3a0 + fd222c3 commit e0cd1df
Show file tree
Hide file tree
Showing 16 changed files with 149 additions and 23 deletions.
1 change: 1 addition & 0 deletions src/messaging/dotnet/Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@
<PropertyGroup>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<NoWarn>CS1066;CS1591</NoWarn>
</PropertyGroup>
</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@

namespace MorganStanley.ComposeUI.Messaging.Client.Abstractions;

// TODO: Add IConnectionFactory abstraction to adhere to DI best practices

/// <summary>
/// Represents a connection that can communicate with the server.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Morgan Stanley makes this available to you under the Apache License,
// Version 2.0 (the "License"). You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0.
//
// See the NOTICE file distributed with this work for additional information
// regarding copyright ownership. Unless required by applicable law or agreed
// to in writing, software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions
// and limitations under the License.

namespace MorganStanley.ComposeUI.Messaging.Client.Abstractions;

/// <summary>
/// Creates instances of <see cref="IConnection"/>
/// </summary>
public interface IConnectionFactory
{
/// <summary>
/// Creates a new <see cref="IConnection"/> instance.
/// </summary>
/// <returns></returns>
IConnection CreateConnection();
}
4 changes: 2 additions & 2 deletions src/messaging/dotnet/src/Client/Client/MessageRouterClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ namespace MorganStanley.ComposeUI.Messaging.Client;
internal sealed class MessageRouterClient : IMessageRouter
{
public MessageRouterClient(
IConnection connection,
IConnectionFactory connectionFactory,
MessageRouterOptions options,
ILogger<MessageRouterClient>? logger = null)
{
_connection = connection;
_connection = connectionFactory.CreateConnection();
_options = options;
_logger = logger ?? NullLogger<MessageRouterClient>.Instance;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public static MessageRouterBuilder UseWebSocket(
MessageRouterWebSocketOptions options)
{
builder.ServiceCollection.AddSingleton<IOptions<MessageRouterWebSocketOptions>>(options);
builder.ServiceCollection.AddTransient<IConnection, WebSocketConnection>();
builder.ServiceCollection.AddSingleton<IConnectionFactory, WebSocketConnectionFactory>();
return builder;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Morgan Stanley makes this available to you under the Apache License,
// Version 2.0 (the "License"). You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0.
//
// See the NOTICE file distributed with this work for additional information
// regarding copyright ownership. Unless required by applicable law or agreed
// to in writing, software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions
// and limitations under the License.

using Microsoft.Extensions.DependencyInjection;
using MorganStanley.ComposeUI.Messaging.Client.Abstractions;

namespace MorganStanley.ComposeUI.Messaging.Client.WebSocket;

internal class WebSocketConnectionFactory : IConnectionFactory
{
public WebSocketConnectionFactory(IServiceProvider serviceProvider)
{
_serviceProvider = serviceProvider;
}

public IConnection CreateConnection()
{
return ActivatorUtilities.CreateInstance<WebSocketConnection>(_serviceProvider);
}

private readonly IServiceProvider _serviceProvider;
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@ internal class MessageBufferConverter : JsonConverter<MessageBuffer>
return null;

case JsonTokenType.String:
{
var length = reader.HasValueSequence ? checked((int)reader.ValueSequence.Length) : reader.ValueSpan.Length;
var buffer = MessageBuffer.GetBuffer(length);
length = reader.CopyString(buffer);
{
var length = reader.HasValueSequence
? checked((int) reader.ValueSequence.Length)
: reader.ValueSpan.Length;
var buffer = MessageBuffer.GetBuffer(length);
length = reader.CopyString(buffer);

return new MessageBuffer(buffer, length);
}
return new MessageBuffer(buffer, length);
}
}

throw new JsonException();
Expand All @@ -44,4 +46,4 @@ public override void Write(Utf8JsonWriter writer, MessageBuffer value, JsonSeria
{
writer.WriteStringValue(value.GetSpan());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,19 @@ ValueTask<Message> IClientConnection.ReceiveAsync(CancellationToken cancellation
return _clientToServer.Reader.ReadAsync(cancellationToken);
}

public ValueTask DisposeAsync()
public ValueTask CloseAsync()
{
_clientToServer.Writer.TryComplete();
_serverToClient.Writer.TryComplete();

return ValueTask.CompletedTask;
}

public ValueTask DisposeAsync()
{
return CloseAsync();
}

ValueTask IConnection.ConnectAsync(CancellationToken cancellationToken = default)
{
return _server.ClientConnected(this);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Morgan Stanley makes this available to you under the Apache License,
// Version 2.0 (the "License"). You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0.
//
// See the NOTICE file distributed with this work for additional information
// regarding copyright ownership. Unless required by applicable law or agreed
// to in writing, software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions
// and limitations under the License.

using MorganStanley.ComposeUI.Messaging.Client.Abstractions;
using MorganStanley.ComposeUI.Messaging.Server;

namespace MorganStanley.ComposeUI.Messaging.Client.Internal;

internal sealed class InProcessConnectionFactory : IConnectionFactory
{
private readonly IMessageRouterServer _server;

public InProcessConnectionFactory(IMessageRouterServer server)
{
_server = server;
}

public IConnection CreateConnection()
{
return new InProcessConnection(_server);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public static class MessageRouterBuilderInProcessExtensions
/// <returns></returns>
public static MessageRouterBuilder UseServer(this MessageRouterBuilder builder)
{
builder.ServiceCollection.AddTransient<IConnection, InProcessConnection>();
builder.ServiceCollection.AddSingleton<IConnectionFactory, InProcessConnectionFactory>();

return builder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ namespace MorganStanley.ComposeUI.Messaging.Server.Abstractions;
/// <summary>
/// Abstraction of a client connected to the Message Router server.
/// </summary>
public interface IClientConnection : IAsyncDisposable
public interface IClientConnection
{
/// <summary>
/// Sends a message to the client.
Expand All @@ -37,4 +37,10 @@ public interface IClientConnection : IAsyncDisposable
/// <exception cref="MessageRouterException">With name <see cref="MessageRouterErrors.ConnectionClosed"/> if the connection was closed by either party while sending the request</exception>
/// <exception cref="MessageRouterException">With name <see cref="MessageRouterErrors.ConnectionAborted"/> if the connection was closed due to an unexpected error</exception>
ValueTask<Message> ReceiveAsync(CancellationToken cancellationToken = default);

/// <summary>
/// Invoked by the server to notify the object of a disconnection.
/// </summary>
/// <returns></returns>
ValueTask CloseAsync();
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ public async ValueTask DisposeAsync()
{
_stopTokenSource.Cancel();

// TODO: Don't dispose objects that were created by someone else. Signal disconnection using a dedicated method.
await Task.WhenAll(_clients.Values.Select(client => client.Connection.DisposeAsync().AsTask()));
await Task.WhenAll(_clients.Values.Select(client => client.Connection.CloseAsync().AsTask()));
}

public ValueTask ClientConnected(IClientConnection connection)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,16 @@ public async ValueTask<Message> ReceiveAsync(CancellationToken cancellationToken
}
}

public ValueTask DisposeAsync()
public ValueTask CloseAsync()
{
_stopTokenSource.Cancel();

return default;
return ValueTask.CompletedTask;
}

public ValueTask DisposeAsync()
{
return CloseAsync();
}

public async Task HandleWebSocketRequest(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
// and limitations under the License.

using System.Linq.Expressions;
using MorganStanley.ComposeUI.Messaging.Client.Abstractions;
using MorganStanley.ComposeUI.Messaging.Protocol;
using MorganStanley.ComposeUI.Messaging.Protocol.Messages;
using MorganStanley.ComposeUI.Messaging.TestUtils;
Expand Down Expand Up @@ -682,6 +683,9 @@ public Task DisposeAsync()

private MessageRouterClient CreateMessageRouter(MessageRouterOptions? options = null)
{
return new MessageRouterClient(_connectionMock.Object, options ?? DefaultOptions);
var connectionFactory = new Mock<IConnectionFactory>();
connectionFactory.Setup(_ => _.CreateConnection()).Returns(_connectionMock.Object);

return new MessageRouterClient(connectionFactory.Object, options ?? DefaultOptions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
using System.Linq.Expressions;
using MorganStanley.ComposeUI.Messaging.Protocol;
using MorganStanley.ComposeUI.Messaging.Protocol.Messages;
using MorganStanley.ComposeUI.Messaging.Server.Abstractions;
using MorganStanley.ComposeUI.Messaging.TestUtils;
using TaskExtensions = MorganStanley.ComposeUI.Testing.TaskExtensions;

Expand Down Expand Up @@ -347,14 +348,32 @@ await client.SendToServer(
Times.Once);
}

[Fact]
public async Task When_disposed_it_calls_CloseAsync_on_active_connections()
{
var server = CreateServer();
var connection = new Mock<IClientConnection>();

connection.SetupSequence(_ => _.ReceiveAsync(It.IsAny<CancellationToken>()))
.Returns(new ValueTask<Message>(
new ConnectRequest()))
.Returns(new ValueTask<Message>(
Task.Delay(1000).ContinueWith(_ => (Message)new PublishMessage {Topic = "dummy"})));

await server.ClientConnected(connection.Object);
await TaskExtensions.WaitForBackgroundTasksAsync();
await server.DisposeAsync();

connection.Verify(_ => _.CloseAsync(), Times.Once);
}

private MessageRouterServer CreateServer(IAccessTokenValidator? accessTokenValidator = null) =>
new MessageRouterServer(new MessageRouterServerDependencies(accessTokenValidator));

private MockClientConnection CreateClient() => new MockClientConnection();

private async Task<MockClientConnection> CreateAndConnectClient(MessageRouterServer server)
{
var connectResponseReceived = new TaskCompletionSource<ConnectResponse>();
var client = CreateClient();
await server.ClientConnected(client.Object);
await client.Connect();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public MockClientConnection()
Setup(_ => _.ReceiveAsync(It.IsAny<CancellationToken>()))
.Returns((CancellationToken ct) => _sendChannel.Reader.ReadAsync(ct));

Setup(_ => _.DisposeAsync())
Setup(_ => _.CloseAsync())
.Callback(
() => { _sendChannel.Writer.TryComplete(); });
}
Expand Down

0 comments on commit e0cd1df

Please sign in to comment.