Skip to content

Commit

Permalink
Merge pull request #220 from BalassaMarton/messaging-reactive
Browse files Browse the repository at this point in the history
Use System.Reactive.Async and IAsyncObservable instead of custom ISubscriber abstraction
  • Loading branch information
BalassaMarton authored May 22, 2023
2 parents f9f9f1d + 944e549 commit 469139e
Show file tree
Hide file tree
Showing 11 changed files with 56 additions and 168 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// * and limitations under the License.
// */

using System.Reactive;
using System.Reactive.Linq;
using Microsoft.Extensions.Logging;
using MorganStanley.ComposeUI.Messaging;

Expand All @@ -38,7 +40,7 @@ public async Task Start()
_subscriptions.Add(
await _messageRouter.SubscribeAsync(
"proto_select_marketData",
Subscriber.Create<TopicMessage>(OnSymbolSelected, OnSymbolSelectionError)));
AsyncObserver.Create<TopicMessage>(OnSymbolSelected, OnSymbolSelectionError, () => default)));
}
catch (Exception exception)
{
Expand Down
41 changes: 20 additions & 21 deletions src/messaging/dotnet/src/Client/Client/MessageRouterClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,10 @@ public ValueTask ConnectAsync(CancellationToken cancellationToken = default)

public ValueTask<IDisposable> SubscribeAsync(
string topicName,
ISubscriber<TopicMessage> subscriber,
IAsyncObserver<TopicMessage> subscriber,
CancellationToken cancellationToken = default)
{
Topic.Validate(topicName);
Protocol.Topic.Validate(topicName);
CheckState();

var needsSubscription = false;
Expand All @@ -69,11 +69,11 @@ public ValueTask<IDisposable> SubscribeAsync(
_ =>
{
needsSubscription = true;
return new Topic<TopicMessage>(topicName, _logger);
return new Topic(topicName, _logger);
});

return needsSubscription
? SubscribeAsyncCore(topicName, topic, subscriber, cancellationToken)
? SubscribeAsyncCore(topic, subscriber, cancellationToken)
: ValueTask.FromResult(topic.Subscribe(subscriber));
}

Expand All @@ -83,7 +83,7 @@ public ValueTask PublishAsync(
PublishOptions options = default,
CancellationToken cancellationToken = default)
{
Topic.Validate(topic);
Protocol.Topic.Validate(topic);

return SendMessageAsync(
new PublishMessage
Expand Down Expand Up @@ -192,7 +192,7 @@ public ValueTask DisposeAsync()

private readonly ConcurrentDictionary<string, TaskCompletionSource<AbstractResponse>> _pendingRequests = new();
private readonly ConcurrentDictionary<string, MessageHandler> _endpointHandlers = new();
private readonly ConcurrentDictionary<string, Topic<TopicMessage>> _topics = new();
private readonly ConcurrentDictionary<string, Topic> _topics = new();
private readonly MessageRouterOptions _options;
private readonly ILogger<MessageRouterClient> _logger;

Expand Down Expand Up @@ -537,9 +537,8 @@ private async ValueTask RegisterServiceCore(
}

private async ValueTask<IDisposable> SubscribeAsyncCore(
string topicName,
Topic<TopicMessage> topic,
ISubscriber<TopicMessage> subscriber,
Topic topic,
IAsyncObserver<TopicMessage> subscriber,
CancellationToken cancellationToken)
{
var subscription = topic.Subscribe(subscriber);
Expand All @@ -549,7 +548,7 @@ private async ValueTask<IDisposable> SubscribeAsyncCore(
await SendMessageAsync(
new SubscribeMessage
{
Topic = topicName,
Topic = topic.Name
},
cancellationToken);

Expand Down Expand Up @@ -726,7 +725,7 @@ private class StateChangeEvents
public Task? SendReceiveCompleted;
}

private class Topic<T>
private class Topic
{
public string Name { get; }

Expand All @@ -736,7 +735,7 @@ public Topic(string name, ILogger logger)
_logger = logger;
}

public IDisposable Subscribe(ISubscriber<T> subscriber)
public IDisposable Subscribe(IAsyncObserver<TopicMessage> subscriber)
{
lock (_mutex)
{
Expand All @@ -762,7 +761,7 @@ private void Unsubscribe(Subscription subscription)
// TODO: Unsubscribe from the topic completely if no more subscribers
}

public void OnNext(T value)
public void OnNext(TopicMessage value)
{
lock (_mutex)
{
Expand Down Expand Up @@ -817,7 +816,7 @@ public void Complete()

private class Subscription : IDisposable
{
public Subscription(Topic<T> topic, ISubscriber<T> subscriber, ILogger logger)
public Subscription(Topic topic, IAsyncObserver<TopicMessage> subscriber, ILogger logger)
{
_subscriber = subscriber;
_topic = topic;
Expand All @@ -830,7 +829,7 @@ public void Dispose()
_topic.Unsubscribe(this);
}

public void OnNext(T value)
public void OnNext(TopicMessage value)
{
_queue.Writer.TryWrite(
value); // Since the queue is unbounded, this will succeed unless the channel was completed
Expand Down Expand Up @@ -862,7 +861,7 @@ private async Task ProcessMessages()
{
_logger.LogError(
e,
$"Exception thrown while invoking {nameof(ISubscriber<T>.OnNextAsync)} on a subscriber of topic '{{TopicName}}': {{ExceptionMessage}}",
$"Exception thrown while invoking {nameof(IAsyncObserver<TopicMessage>.OnNextAsync)} on a subscriber of topic '{{TopicName}}': {{ExceptionMessage}}",
_topic.Name,
e.Message);
}
Expand All @@ -877,7 +876,7 @@ private async Task ProcessMessages()
{
_logger.LogError(
e,
$"Exception thrown while invoking {nameof(ISubscriber<T>.OnCompletedAsync)} on a subscriber of topic '{{TopicName}}': {{ExceptionMessage}}",
$"Exception thrown while invoking {nameof(IAsyncObserver<TopicMessage>.OnCompletedAsync)} on a subscriber of topic '{{TopicName}}': {{ExceptionMessage}}",
_topic.Name,
e.Message);
}
Expand All @@ -892,24 +891,24 @@ private async Task ProcessMessages()
{
_logger.LogError(
e2,
$"Exception thrown while invoking {nameof(ISubscriber<T>.OnErrorAsync)} on a subscriber of topic '{{TopicName}}': {{ExceptionMessage}}",
$"Exception thrown while invoking {nameof(IAsyncObserver<TopicMessage>.OnErrorAsync)} on a subscriber of topic '{{TopicName}}': {{ExceptionMessage}}",
_topic.Name,
e2.Message);
}
}
}

private readonly ISubscriber<T> _subscriber;
private readonly IAsyncObserver<TopicMessage> _subscriber;

private readonly Channel<T> _queue = Channel.CreateUnbounded<T>(
private readonly Channel<TopicMessage> _queue = Channel.CreateUnbounded<TopicMessage>(
new UnboundedChannelOptions
{
AllowSynchronousContinuations = false,
SingleReader = true,
SingleWriter = false
});

private readonly Topic<T> _topic;
private readonly Topic _topic;
private readonly ILogger _logger;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
<PackageReference Include="Microsoft.Extensions.Options" Version="6.0.0" />
<PackageReference Include="Nito.AsyncEx" Version="5.1.2" />
<PackageReference Include="System.IO.Pipelines" Version="6.0.3" />
<PackageReference Include="System.Reactive" Version="5.0.0" />
<PackageReference Include="System.Reactive" Version="6.0.0" />
<PackageReference Include="System.Reactive.Async" Version="6.0.0-alpha.3" />
</ItemGroup>

<ItemGroup>
Expand Down
2 changes: 1 addition & 1 deletion src/messaging/dotnet/src/Core/IMessageRouter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public interface IMessageRouter : IAsyncDisposable
/// <returns></returns>
ValueTask<IDisposable> SubscribeAsync(
string topic,
ISubscriber<TopicMessage> subscriber,
IAsyncObserver<TopicMessage> subscriber,
CancellationToken cancellationToken = default);

/// <summary>
Expand Down
24 changes: 0 additions & 24 deletions src/messaging/dotnet/src/Core/ISubscriber.cs

This file was deleted.

18 changes: 10 additions & 8 deletions src/messaging/dotnet/src/Core/MessageRouterExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
// or implied. See the License for the specific language governing permissions
// and limitations under the License.

using System.Reactive;
using System.Reactive.Linq;
using System.Runtime.CompilerServices;
using System.Threading.Channels;

Expand Down Expand Up @@ -87,7 +89,7 @@ public static ValueTask<IDisposable> SubscribeAsync(
IObserver<TopicMessage> observer,
CancellationToken cancellationToken = default)
{
var innerSubscriber = Subscriber.Create<TopicMessage>(
var innerSubscriber = AsyncObserver.Create<TopicMessage>(
message =>
{
observer.OnNext(message);
Expand Down Expand Up @@ -126,7 +128,7 @@ public static ValueTask<IDisposable> SubscribeAsync(
IObserver<string?> observer,
CancellationToken cancellationToken = default)
{
var innerSubscriber = Subscriber.Create<TopicMessage>(
var innerSubscriber = AsyncObserver.Create<TopicMessage>(
message =>
{
observer.OnNext(message.Payload?.GetString());
Expand Down Expand Up @@ -162,10 +164,10 @@ public static ValueTask<IDisposable> SubscribeAsync(
public static ValueTask<IDisposable> SubscribeAsync(
this IMessageRouter messageRouter,
string topic,
ISubscriber<string?> subscriber,
IAsyncObserver<string?> subscriber,
CancellationToken cancellationToken = default)
{
var innerSubscriber = Subscriber.Create<TopicMessage>(
var innerSubscriber = AsyncObserver.Create<TopicMessage>(
message => subscriber.OnNextAsync(message.Payload?.GetString()),
subscriber.OnErrorAsync,
subscriber.OnCompletedAsync);
Expand All @@ -189,15 +191,15 @@ public static async IAsyncEnumerable<TopicMessage> SubscribeAsync(

using var subscription = await messageRouter.SubscribeAsync(
topic,
Subscriber.Create<TopicMessage>(
onNext: message => channel.Writer.WriteAsync(message, cancellationToken),
onError: exception =>
AsyncObserver.Create<TopicMessage>(
onNextAsync: message => channel.Writer.WriteAsync(message, cancellationToken),
onErrorAsync: exception =>
{
channel.Writer.TryComplete(exception);

return default;
},
onCompleted: () =>
onCompletedAsync: () =>
{
channel.Writer.TryComplete();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@

<ItemGroup>
<PackageReference Include="Microsoft.IO.RecyclableMemoryStream" Version="2.2.1" />
<PackageReference Include="System.Reactive" Version="5.0.0" />
<PackageReference Include="System.Reactive" Version="6.0.0" />
<PackageReference Include="System.Reactive.Async" Version="6.0.0-alpha.3" />
<PackageReference Include="System.Text.Json" Version="6.0.7" />
</ItemGroup>

Expand Down
95 changes: 0 additions & 95 deletions src/messaging/dotnet/src/Core/Subscriber.cs

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="6.0.1" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="6.0.3" />
<PackageReference Include="Microsoft.Extensions.Options" Version="6.0.0" />
Expand Down
Loading

0 comments on commit 469139e

Please sign in to comment.