Skip to content

Commit

Permalink
Separated messaging layer from UserChannel (#689)
Browse files Browse the repository at this point in the history
Separated messaging layer from UserChannel.
  • Loading branch information
fhubi authored Jun 24, 2024
1 parent 0445f8b commit 72ed8cd
Show file tree
Hide file tree
Showing 35 changed files with 369 additions and 201 deletions.
7 changes: 7 additions & 0 deletions src/fdc3/dotnet/DesktopAgent/DesktopAgent.sln
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MorganStanley.ComposeUI.App
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MorganStanley.ComposeUI.ModuleLoader", "..\..\..\module-loader\dotnet\src\MorganStanley.ComposeUI.ModuleLoader\MorganStanley.ComposeUI.ModuleLoader.csproj", "{4207C810-55CC-433E-BD5B-B9B4D639C7A2}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MorganStanley.ComposeUI.Messaging.Abstractions", "..\..\..\messaging\dotnet\src\Abstractions\MorganStanley.ComposeUI.Messaging.Abstractions.csproj", "{4FC1B3E0-6DAC-4868-AD68-08093B2555DF}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -75,6 +77,10 @@ Global
{4207C810-55CC-433E-BD5B-B9B4D639C7A2}.Debug|Any CPU.Build.0 = Debug|Any CPU
{4207C810-55CC-433E-BD5B-B9B4D639C7A2}.Release|Any CPU.ActiveCfg = Release|Any CPU
{4207C810-55CC-433E-BD5B-B9B4D639C7A2}.Release|Any CPU.Build.0 = Release|Any CPU
{4FC1B3E0-6DAC-4868-AD68-08093B2555DF}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{4FC1B3E0-6DAC-4868-AD68-08093B2555DF}.Debug|Any CPU.Build.0 = Debug|Any CPU
{4FC1B3E0-6DAC-4868-AD68-08093B2555DF}.Release|Any CPU.ActiveCfg = Release|Any CPU
{4FC1B3E0-6DAC-4868-AD68-08093B2555DF}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -90,6 +96,7 @@ Global
{2C25BA40-0103-4EB5-B181-98362F563B0F} = {2CE54671-6C98-455E-B2A0-52F8564DDA5D}
{50BD1318-FC44-4582-B333-3033076164E7} = {2CE54671-6C98-455E-B2A0-52F8564DDA5D}
{4207C810-55CC-433E-BD5B-B9B4D639C7A2} = {2CE54671-6C98-455E-B2A0-52F8564DDA5D}
{4FC1B3E0-6DAC-4868-AD68-08093B2555DF} = {2CE54671-6C98-455E-B2A0-52F8564DDA5D}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {70FE103D-26D8-42D8-88BA-1BECB1F73A18}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@
* and limitations under the License.
*/

using Microsoft.Extensions.Hosting;
using MorganStanley.ComposeUI.Fdc3.DesktopAgent;
using MorganStanley.ComposeUI.Fdc3.DesktopAgent.DependencyInjection;
using MorganStanley.ComposeUI.Fdc3.DesktopAgent.Infrastructure.Internal;
using MorganStanley.ComposeUI.Messaging;
using MorganStanley.ComposeUI.Messaging.Abstractions;
using MorganStanley.ComposeUI.ModuleLoader;
using MorganStanley.ComposeUI.Shell.Fdc3;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@
using MorganStanley.ComposeUI.Fdc3.DesktopAgent.Contracts;
using MorganStanley.ComposeUI.Fdc3.DesktopAgent.Converters;
using MorganStanley.ComposeUI.Fdc3.DesktopAgent.DependencyInjection;
using MorganStanley.ComposeUI.Fdc3.DesktopAgent.Exceptions;
using MorganStanley.ComposeUI.Messaging;
using Finos.Fdc3;
using MorganStanley.ComposeUI.Messaging.Abstractions;

namespace MorganStanley.ComposeUI.Fdc3.DesktopAgent.Infrastructure.Internal;

internal class Fdc3DesktopAgentMessageRouterService : IHostedService
{
private readonly IMessageRouter _messageRouter;
private readonly IMessagingService _messageRouter;
private readonly IFdc3DesktopAgentBridge _desktopAgent;
private readonly Fdc3DesktopAgentOptions _options;
private readonly ILoggerFactory _loggerFactory;
Expand All @@ -51,7 +51,7 @@ internal class Fdc3DesktopAgentMessageRouterService : IHostedService
public JsonSerializerOptions JsonMessageSerializerOptions => new(_jsonSerializerOptions);

public Fdc3DesktopAgentMessageRouterService(
IMessageRouter messageRouter,
IMessagingService messageRouter,
IFdc3DesktopAgentBridge desktopAgent,
IOptions<Fdc3DesktopAgentOptions> options,
ILoggerFactory? loggerFactory = null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,33 +13,33 @@
*/

using System.Collections.Concurrent;
using System.Reactive.Linq;
using System.Text.Json;
using System.Text.Json.Nodes;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using MorganStanley.ComposeUI.Fdc3.DesktopAgent.Contracts;
using MorganStanley.ComposeUI.Messaging;
using MorganStanley.ComposeUI.Messaging.Abstractions;

namespace MorganStanley.ComposeUI.Fdc3.DesktopAgent;

internal class UserChannel : IAsyncDisposable
{
public string Id { get; }
private readonly ConcurrentDictionary<string, MessageBuffer> _contexts = new ConcurrentDictionary<string, MessageBuffer>();
private MessageBuffer? _lastContext = null;
private readonly ConcurrentDictionary<string, IMessageBuffer> _contexts = new();
private IMessageBuffer? _lastContext = null;
private readonly UserChannelTopics _topics;
private readonly IMessageRouter _messageRouter;
private readonly IMessagingService _messagingService;
private IAsyncDisposable? _broadcastSubscription;
private bool _disposed = false;
private readonly ILogger _logger;


public UserChannel(string id, IMessageRouter messageRouter, ILogger<UserChannel>? logger)
public UserChannel(string id, IMessagingService messageRouter, ILogger<UserChannel>? logger)
{
Id = id;
_topics = Fdc3Topic.UserChannel(id);
_messageRouter = messageRouter;
_messagingService = messageRouter;
_logger = (ILogger?) logger ?? NullLogger.Instance;
}

Expand All @@ -50,19 +50,18 @@ public async ValueTask Connect()
throw new ObjectDisposedException(nameof(UserChannel));
}

await _messageRouter.ConnectAsync();
await _messagingService.ConnectAsync();

var broadcastObserver = AsyncObserver.Create<TopicMessage>(x => HandleBroadcast(x.Payload));
var broadcastHandler = new Func<IMessageBuffer, ValueTask>(HandleBroadcast);
var broadcastSubscription = _messagingService.SubscribeAsync(_topics.Broadcast, broadcastHandler);

var broadcastSubscribing = _messageRouter.SubscribeAsync(_topics.Broadcast, broadcastObserver);

await _messageRouter.RegisterServiceAsync(_topics.GetCurrentContext, GetCurrentContext);
_broadcastSubscription = await broadcastSubscribing;
await _messagingService.RegisterServiceAsync(_topics.GetCurrentContext, GetCurrentContext);
_broadcastSubscription = await broadcastSubscription;

LogConnected();
}

internal ValueTask HandleBroadcast(MessageBuffer? payloadBuffer)
internal ValueTask HandleBroadcast(IMessageBuffer? payloadBuffer)
{
if (payloadBuffer == null)
{
Expand Down Expand Up @@ -101,24 +100,24 @@ internal ValueTask HandleBroadcast(MessageBuffer? payloadBuffer)
return ValueTask.CompletedTask;
}

internal ValueTask<MessageBuffer?> GetCurrentContext(string endpoint, MessageBuffer? payloadBuffer, MessageContext? context)
internal ValueTask<IMessageBuffer?> GetCurrentContext(string endpoint, IMessageBuffer? payloadBuffer, MessageContext? context)
{
if (payloadBuffer == null)
{
return ValueTask.FromResult<MessageBuffer?>(_lastContext);
return ValueTask.FromResult(_lastContext);
}

var payload = payloadBuffer.ReadJson<GetCurrentContextRequest>();
if (payload?.ContextType == null)
{
return ValueTask.FromResult<MessageBuffer?>(_lastContext);
return ValueTask.FromResult(_lastContext);
}

if (_contexts.TryGetValue(payload.ContextType, out MessageBuffer? messageBuffer))
if (_contexts.TryGetValue(payload.ContextType, out IMessageBuffer? messageBuffer))
{
return ValueTask.FromResult<MessageBuffer?>(messageBuffer);
return ValueTask.FromResult<IMessageBuffer?>(messageBuffer);
}
return ValueTask.FromResult<MessageBuffer?>(null);
return ValueTask.FromResult<IMessageBuffer?>(null);
}

public async ValueTask DisposeAsync()
Expand All @@ -130,7 +129,7 @@ public async ValueTask DisposeAsync()

_broadcastSubscription = null;

await _messageRouter.UnregisterServiceAsync(_topics.GetCurrentContext);
await _messagingService.UnregisterServiceAsync(_topics.GetCurrentContext);

_disposed = true;
}
Expand All @@ -139,7 +138,7 @@ private void LogConnected()
{
if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug($"UserChannel {Id} connected to messagerouter with client id {_messageRouter.ClientId}");
_logger.LogDebug($"UserChannel {Id} connected to messagerouter with client id {_messagingService.ClientId}");
}
}

Expand All @@ -159,7 +158,7 @@ private void LogInvalidPayloadJson()
}
}

private void LogPayload(MessageBuffer payload)
private void LogPayload(IMessageBuffer payload)
{
if (_logger.IsEnabled(LogLevel.Debug))
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
using MorganStanley.ComposeUI.Fdc3.DesktopAgent.Infrastructure.Internal;
using MorganStanley.ComposeUI.Fdc3.DesktopAgent.Tests.Helpers;
using MorganStanley.ComposeUI.Fdc3.DesktopAgent.Tests.TestUtils;
using MorganStanley.ComposeUI.Messaging.Abstractions;
using MorganStanley.ComposeUI.ModuleLoader;
using AppIdentifier = MorganStanley.ComposeUI.Fdc3.DesktopAgent.Protocol.AppIdentifier;
using AppIntent = MorganStanley.ComposeUI.Fdc3.DesktopAgent.Protocol.AppIntent;
Expand Down Expand Up @@ -829,7 +830,7 @@ public async Task AddIntentListener_subscribes()
Intent = "intentMetadataCustom",
Selected = false,
Context = new Context("contextCustom"),
TargetAppIdentifier = new AppIdentifier {AppId = "appId4", InstanceId = targetFdc3InstanceId}
TargetAppIdentifier = new AppIdentifier { AppId = "appId4", InstanceId = targetFdc3InstanceId }
};

var raiseIntentResult = await _fdc3.HandleRaiseIntent(raiseIntentRequest, new MessageContext());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/

using Microsoft.Extensions.Logging;
using MorganStanley.ComposeUI.Messaging.Abstractions;

namespace MorganStanley.ComposeUI.Fdc3.DesktopAgent.Tests;

Expand Down Expand Up @@ -51,7 +52,7 @@ public void Log<TState>(LogLevel logLevel, EventId eventId, TState state, Except
public UserChannelErrorsAndDiagnosticsTests()
{
_logger = new TestLogger();
_channel = new UserChannel(TestChannel, new Mock<IMessageRouter>().Object, _logger);
_channel = new UserChannel(TestChannel, new Mock<IMessagingService>().Object, _logger);
}

[Fact]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@
*/

using MorganStanley.ComposeUI.Fdc3.DesktopAgent.Contracts;
using MorganStanley.ComposeUI.Messaging.Abstractions;
using Finos.Fdc3.Context;

namespace MorganStanley.ComposeUI.Fdc3.DesktopAgent.Tests;

public class UserChannelTests
{
private const string TestChannel = "testChannel";
UserChannel _channel = new UserChannel(TestChannel, new Mock<IMessageRouter>().Object, null);
UserChannel _channel = new UserChannel(TestChannel, new Mock<IMessagingService>().Object, null);
UserChannelTopics _topics = new UserChannelTopics(TestChannel);

[Theory]
Expand Down
7 changes: 7 additions & 0 deletions src/messaging/dotnet/Messaging.sln
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 17
VisualStudioVersion = 17.0.31903.59
MinimumVisualStudioVersion = 10.0.40219.1
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MorganStanley.ComposeUI.Messaging.Abstractions", "src\Abstractions\MorganStanley.ComposeUI.Messaging.Abstractions.csproj", "{2F0D6AA5-E922-4EAE-B627-4449A92CCD43}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MorganStanley.ComposeUI.Messaging.Core", "src\Core\MorganStanley.ComposeUI.Messaging.Core.csproj", "{FC647551-B014-49A8-817E-F5FE6FEBF452}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MorganStanley.ComposeUI.Messaging.Client", "src\Client\MorganStanley.ComposeUI.Messaging.Client.csproj", "{A3156691-0B46-4C3D-B042-E71D9BAF02EC}"
Expand Down Expand Up @@ -83,6 +85,10 @@ Global
{F1B0C089-450E-4B02-881F-F41A8B7F5867}.Debug|Any CPU.Build.0 = Debug|Any CPU
{F1B0C089-450E-4B02-881F-F41A8B7F5867}.Release|Any CPU.ActiveCfg = Release|Any CPU
{F1B0C089-450E-4B02-881F-F41A8B7F5867}.Release|Any CPU.Build.0 = Release|Any CPU
{2F0D6AA5-E922-4EAE-B627-4449A92CCD43}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{2F0D6AA5-E922-4EAE-B627-4449A92CCD43}.Debug|Any CPU.Build.0 = Debug|Any CPU
{2F0D6AA5-E922-4EAE-B627-4449A92CCD43}.Release|Any CPU.ActiveCfg = Release|Any CPU
{2F0D6AA5-E922-4EAE-B627-4449A92CCD43}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -99,6 +105,7 @@ Global
{AA63BC7C-9DB8-4062-A689-7FA9689EF6B1} = {78E94277-B0B4-425C-9C47-81E09933FE5B}
{CEF78D3F-C645-4471-BAD2-9C538A0CA763} = {78E94277-B0B4-425C-9C47-81E09933FE5B}
{F1B0C089-450E-4B02-881F-F41A8B7F5867} = {B7E63957-3C1B-4A16-B530-3EAD0D0C5F07}
{2F0D6AA5-E922-4EAE-B627-4449A92CCD43} = {37DE16A7-F731-4206-B650-8A1D9D066B60}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {38CFF5DF-459A-49E1-8189-2DB429A99B3A}
Expand Down
29 changes: 29 additions & 0 deletions src/messaging/dotnet/src/Abstractions/IMessageBuffer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Morgan Stanley makes this available to you under the Apache License,
// Version 2.0 (the "License"). You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0.
//
// See the NOTICE file distributed with this work for additional information
// regarding copyright ownership. Unless required by applicable law or agreed
// to in writing, software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions
// and limitations under the License.

namespace MorganStanley.ComposeUI.Messaging.Abstractions
{
public interface IMessageBuffer
{
/// <summary>
/// Gets the bytes of the underlying buffer as a <see cref="ReadOnlySpan{T}" />
/// </summary>
/// <returns></returns>
ReadOnlySpan<byte> GetSpan();

/// <summary>
/// Gets the string value of the buffer.
/// </summary>
/// <returns></returns>
string GetString();
}
}
93 changes: 93 additions & 0 deletions src/messaging/dotnet/src/Abstractions/IMessagingService.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Morgan Stanley makes this available to you under the Apache License,
// Version 2.0 (the "License"). You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0.
//
// See the NOTICE file distributed with this work for additional information
// regarding copyright ownership. Unless required by applicable law or agreed
// to in writing, software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions
// and limitations under the License.

namespace MorganStanley.ComposeUI.Messaging.Abstractions
{
public interface IMessagingService : IAsyncDisposable
{
/// <summary>
/// Gets the client ID of the current connection.
/// </summary>
/// <remarks>
/// The returned value will be <value>null</value> if the client is not connected.
/// </remarks>
string? ClientId { get; }

/// <summary>
/// Asynchronously connects to the Message Router server endpoint.
/// </summary>
/// <param name="cancellationToken"></param>
/// <returns></returns>
/// <remarks>
/// Clients don't need to call this method before calling other methods on this type.
/// The client should automatically establish a connection when needed.
/// </remarks>
ValueTask ConnectAsync(CancellationToken cancellationToken = default);

/// <summary>
/// Gets an observable that represents a topic.
/// </summary>
/// <param name="topic"></param>
/// <param name="subscriber"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
ValueTask<IAsyncDisposable> SubscribeAsync(string topic,
Func<IMessageBuffer, ValueTask> subscriber,
CancellationToken cancellationToken = default);

/// <summary>
/// Publishes a message to a topic.
/// </summary>
/// <param name="topic"></param>
/// <param name="payload"></param>
/// <param name="options"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
ValueTask PublishAsync(string topic,
IMessageBuffer? message = null,
PublishOptions optinos = default,
CancellationToken cancellationToken = default);

/// <summary>
/// Registers a service by providing a name and handler.
/// </summary>
/// <param name="endpoint"></param>
/// <param name="subscriber"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
ValueTask RegisterServiceAsync(string endpoint,
Func<string, IMessageBuffer?, MessageContext?, ValueTask<IMessageBuffer?>> subscriber,
CancellationToken cancellationToken = default);

/// <summary>
/// Removes a service registration.
/// </summary>
/// <param name="endpoint"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
ValueTask UnregisterServiceAsync(string endpoint, CancellationToken cancellationToken = default);

/// <summary>
/// Invokes a named service.
/// </summary>
/// <param name="endpoint"></param>
/// <param name="payload"></param>
/// <param name="options"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
ValueTask<IMessageBuffer?> InvokeAsync(
string endpoint,
IMessageBuffer? payload = null,
InvokeOptions options = default,
CancellationToken cancellationToken = default);
}
}
Loading

0 comments on commit 72ed8cd

Please sign in to comment.