Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Messaging: Added IConnectionFactory for correctly implementing disposable pattern #239

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/messaging/dotnet/Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@
<PropertyGroup>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<NoWarn>CS1066;CS1591</NoWarn>
</PropertyGroup>
</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@

namespace MorganStanley.ComposeUI.Messaging.Client.Abstractions;

// TODO: Add IConnectionFactory abstraction to adhere to DI best practices

/// <summary>
/// Represents a connection that can communicate with the server.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Morgan Stanley makes this available to you under the Apache License,
// Version 2.0 (the "License"). You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0.
//
// See the NOTICE file distributed with this work for additional information
// regarding copyright ownership. Unless required by applicable law or agreed
// to in writing, software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions
// and limitations under the License.

namespace MorganStanley.ComposeUI.Messaging.Client.Abstractions;

/// <summary>
/// Creates instances of <see cref="IConnection"/>
/// </summary>
public interface IConnectionFactory
{
/// <summary>
/// Creates a new <see cref="IConnection"/> instance.
/// </summary>
/// <returns></returns>
IConnection CreateConnection();
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ namespace MorganStanley.ComposeUI.Messaging.Client;
internal sealed class MessageRouterClient : IMessageRouter
{
public MessageRouterClient(
IConnection connection,
IConnectionFactory connectionFactory,
MessageRouterOptions options,
ILogger<MessageRouterClient>? logger = null)
{
_connection = connection;
_connection = connectionFactory.CreateConnection();
_options = options;
_logger = logger ?? NullLogger<MessageRouterClient>.Instance;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public static MessageRouterBuilder UseWebSocket(
MessageRouterWebSocketOptions options)
{
builder.ServiceCollection.AddSingleton<IOptions<MessageRouterWebSocketOptions>>(options);
builder.ServiceCollection.AddTransient<IConnection, WebSocketConnection>();
builder.ServiceCollection.AddSingleton<IConnectionFactory, WebSocketConnectionFactory>();
return builder;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Morgan Stanley makes this available to you under the Apache License,
// Version 2.0 (the "License"). You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0.
//
// See the NOTICE file distributed with this work for additional information
// regarding copyright ownership. Unless required by applicable law or agreed
// to in writing, software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions
// and limitations under the License.

using Microsoft.Extensions.DependencyInjection;
using MorganStanley.ComposeUI.Messaging.Client.Abstractions;

namespace MorganStanley.ComposeUI.Messaging.Client.WebSocket;

internal class WebSocketConnectionFactory : IConnectionFactory
{
public WebSocketConnectionFactory(IServiceProvider serviceProvider)
{
_serviceProvider = serviceProvider;
}

public IConnection CreateConnection()
{
return ActivatorUtilities.CreateInstance<WebSocketConnection>(_serviceProvider);
}

private readonly IServiceProvider _serviceProvider;
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@ internal class MessageBufferConverter : JsonConverter<MessageBuffer>
return null;

case JsonTokenType.String:
{
var length = reader.HasValueSequence ? checked((int)reader.ValueSequence.Length) : reader.ValueSpan.Length;
var buffer = MessageBuffer.GetBuffer(length);
length = reader.CopyString(buffer);
{
var length = reader.HasValueSequence
? checked((int) reader.ValueSequence.Length)
: reader.ValueSpan.Length;
var buffer = MessageBuffer.GetBuffer(length);
length = reader.CopyString(buffer);

return new MessageBuffer(buffer, length);
}
return new MessageBuffer(buffer, length);
}
}

throw new JsonException();
Expand All @@ -44,4 +46,4 @@ public override void Write(Utf8JsonWriter writer, MessageBuffer value, JsonSeria
{
writer.WriteStringValue(value.GetSpan());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,19 @@ ValueTask<Message> IClientConnection.ReceiveAsync(CancellationToken cancellation
return _clientToServer.Reader.ReadAsync(cancellationToken);
}

public ValueTask DisposeAsync()
public ValueTask CloseAsync()
{
_clientToServer.Writer.TryComplete();
_serverToClient.Writer.TryComplete();

return ValueTask.CompletedTask;
}

public ValueTask DisposeAsync()
{
return CloseAsync();
}

ValueTask IConnection.ConnectAsync(CancellationToken cancellationToken = default)
{
return _server.ClientConnected(this);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Morgan Stanley makes this available to you under the Apache License,
// Version 2.0 (the "License"). You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0.
//
// See the NOTICE file distributed with this work for additional information
// regarding copyright ownership. Unless required by applicable law or agreed
// to in writing, software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions
// and limitations under the License.

using MorganStanley.ComposeUI.Messaging.Client.Abstractions;
using MorganStanley.ComposeUI.Messaging.Server;

namespace MorganStanley.ComposeUI.Messaging.Client.Internal;

internal sealed class InProcessConnectionFactory : IConnectionFactory
{
private readonly IMessageRouterServer _server;

public InProcessConnectionFactory(IMessageRouterServer server)
{
_server = server;
}

public IConnection CreateConnection()
{
return new InProcessConnection(_server);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public static class MessageRouterBuilderInProcessExtensions
/// <returns></returns>
public static MessageRouterBuilder UseServer(this MessageRouterBuilder builder)
{
builder.ServiceCollection.AddTransient<IConnection, InProcessConnection>();
builder.ServiceCollection.AddSingleton<IConnectionFactory, InProcessConnectionFactory>();

return builder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ namespace MorganStanley.ComposeUI.Messaging.Server.Abstractions;
/// <summary>
/// Abstraction of a client connected to the Message Router server.
/// </summary>
public interface IClientConnection : IAsyncDisposable
public interface IClientConnection
{
/// <summary>
/// Sends a message to the client.
Expand All @@ -37,4 +37,10 @@ public interface IClientConnection : IAsyncDisposable
/// <exception cref="MessageRouterException">With name <see cref="MessageRouterErrors.ConnectionClosed"/> if the connection was closed by either party while sending the request</exception>
/// <exception cref="MessageRouterException">With name <see cref="MessageRouterErrors.ConnectionAborted"/> if the connection was closed due to an unexpected error</exception>
ValueTask<Message> ReceiveAsync(CancellationToken cancellationToken = default);

/// <summary>
/// Invoked by the server to notify the object of a disconnection.
/// </summary>
/// <returns></returns>
ValueTask CloseAsync();
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ public async ValueTask DisposeAsync()
{
_stopTokenSource.Cancel();

// TODO: Don't dispose objects that were created by someone else. Signal disconnection using a dedicated method.
await Task.WhenAll(_clients.Values.Select(client => client.Connection.DisposeAsync().AsTask()));
await Task.WhenAll(_clients.Values.Select(client => client.Connection.CloseAsync().AsTask()));
}

public ValueTask ClientConnected(IClientConnection connection)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,16 @@ public async ValueTask<Message> ReceiveAsync(CancellationToken cancellationToken
}
}

public ValueTask DisposeAsync()
public ValueTask CloseAsync()
{
_stopTokenSource.Cancel();

return default;
return ValueTask.CompletedTask;
}

public ValueTask DisposeAsync()
{
return CloseAsync();
}

public async Task HandleWebSocketRequest(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
// and limitations under the License.

using System.Linq.Expressions;
using MorganStanley.ComposeUI.Messaging.Client.Abstractions;
using MorganStanley.ComposeUI.Messaging.Protocol;
using MorganStanley.ComposeUI.Messaging.Protocol.Messages;
using MorganStanley.ComposeUI.Messaging.TestUtils;
Expand Down Expand Up @@ -682,6 +683,9 @@ public Task DisposeAsync()

private MessageRouterClient CreateMessageRouter(MessageRouterOptions? options = null)
{
return new MessageRouterClient(_connectionMock.Object, options ?? DefaultOptions);
var connectionFactory = new Mock<IConnectionFactory>();
connectionFactory.Setup(_ => _.CreateConnection()).Returns(_connectionMock.Object);

return new MessageRouterClient(connectionFactory.Object, options ?? DefaultOptions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
using System.Linq.Expressions;
using MorganStanley.ComposeUI.Messaging.Protocol;
using MorganStanley.ComposeUI.Messaging.Protocol.Messages;
using MorganStanley.ComposeUI.Messaging.Server.Abstractions;
using MorganStanley.ComposeUI.Messaging.TestUtils;
using TaskExtensions = MorganStanley.ComposeUI.Testing.TaskExtensions;

Expand Down Expand Up @@ -347,14 +348,32 @@ await client.SendToServer(
Times.Once);
}

[Fact]
public async Task When_disposed_it_calls_CloseAsync_on_active_connections()
{
var server = CreateServer();
var connection = new Mock<IClientConnection>();

connection.SetupSequence(_ => _.ReceiveAsync(It.IsAny<CancellationToken>()))
.Returns(new ValueTask<Message>(
new ConnectRequest()))
.Returns(new ValueTask<Message>(
Task.Delay(1000).ContinueWith(_ => (Message)new PublishMessage {Topic = "dummy"})));

await server.ClientConnected(connection.Object);
await TaskExtensions.WaitForBackgroundTasksAsync();
await server.DisposeAsync();

connection.Verify(_ => _.CloseAsync(), Times.Once);
}

private MessageRouterServer CreateServer(IAccessTokenValidator? accessTokenValidator = null) =>
new MessageRouterServer(new MessageRouterServerDependencies(accessTokenValidator));

private MockClientConnection CreateClient() => new MockClientConnection();

private async Task<MockClientConnection> CreateAndConnectClient(MessageRouterServer server)
{
var connectResponseReceived = new TaskCompletionSource<ConnectResponse>();
var client = CreateClient();
await server.ClientConnected(client.Object);
await client.Connect();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public MockClientConnection()
Setup(_ => _.ReceiveAsync(It.IsAny<CancellationToken>()))
.Returns((CancellationToken ct) => _sendChannel.Reader.ReadAsync(ct));

Setup(_ => _.DisposeAsync())
Setup(_ => _.CloseAsync())
.Callback(
() => { _sendChannel.Writer.TryComplete(); });
}
Expand Down