Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use System.Reactive.Async and IAsyncObservable instead of custom ISubscriber abstraction #220

Merged
merged 1 commit into from
May 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// * and limitations under the License.
// */

using System.Reactive;
using System.Reactive.Linq;
using Microsoft.Extensions.Logging;
using MorganStanley.ComposeUI.Messaging;

Expand All @@ -38,7 +40,7 @@ public async Task Start()
_subscriptions.Add(
await _messageRouter.SubscribeAsync(
"proto_select_marketData",
Subscriber.Create<TopicMessage>(OnSymbolSelected, OnSymbolSelectionError)));
AsyncObserver.Create<TopicMessage>(OnSymbolSelected, OnSymbolSelectionError, () => default)));
}
catch (Exception exception)
{
Expand Down
41 changes: 20 additions & 21 deletions src/messaging/dotnet/src/Client/Client/MessageRouterClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,10 @@ public ValueTask ConnectAsync(CancellationToken cancellationToken = default)

public ValueTask<IDisposable> SubscribeAsync(
string topicName,
ISubscriber<TopicMessage> subscriber,
IAsyncObserver<TopicMessage> subscriber,
CancellationToken cancellationToken = default)
{
Topic.Validate(topicName);
Protocol.Topic.Validate(topicName);
CheckState();

var needsSubscription = false;
Expand All @@ -69,11 +69,11 @@ public ValueTask<IDisposable> SubscribeAsync(
_ =>
{
needsSubscription = true;
return new Topic<TopicMessage>(topicName, _logger);
return new Topic(topicName, _logger);
});

return needsSubscription
? SubscribeAsyncCore(topicName, topic, subscriber, cancellationToken)
? SubscribeAsyncCore(topic, subscriber, cancellationToken)
: ValueTask.FromResult(topic.Subscribe(subscriber));
}

Expand All @@ -83,7 +83,7 @@ public ValueTask PublishAsync(
PublishOptions options = default,
CancellationToken cancellationToken = default)
{
Topic.Validate(topic);
Protocol.Topic.Validate(topic);

return SendMessageAsync(
new PublishMessage
Expand Down Expand Up @@ -192,7 +192,7 @@ public ValueTask DisposeAsync()

private readonly ConcurrentDictionary<string, TaskCompletionSource<AbstractResponse>> _pendingRequests = new();
private readonly ConcurrentDictionary<string, MessageHandler> _endpointHandlers = new();
private readonly ConcurrentDictionary<string, Topic<TopicMessage>> _topics = new();
private readonly ConcurrentDictionary<string, Topic> _topics = new();
private readonly MessageRouterOptions _options;
private readonly ILogger<MessageRouterClient> _logger;

Expand Down Expand Up @@ -537,9 +537,8 @@ private async ValueTask RegisterServiceCore(
}

private async ValueTask<IDisposable> SubscribeAsyncCore(
string topicName,
Topic<TopicMessage> topic,
ISubscriber<TopicMessage> subscriber,
Topic topic,
IAsyncObserver<TopicMessage> subscriber,
CancellationToken cancellationToken)
{
var subscription = topic.Subscribe(subscriber);
Expand All @@ -549,7 +548,7 @@ private async ValueTask<IDisposable> SubscribeAsyncCore(
await SendMessageAsync(
new SubscribeMessage
{
Topic = topicName,
Topic = topic.Name
},
cancellationToken);

Expand Down Expand Up @@ -726,7 +725,7 @@ private class StateChangeEvents
public Task? SendReceiveCompleted;
}

private class Topic<T>
private class Topic
{
public string Name { get; }

Expand All @@ -736,7 +735,7 @@ public Topic(string name, ILogger logger)
_logger = logger;
}

public IDisposable Subscribe(ISubscriber<T> subscriber)
public IDisposable Subscribe(IAsyncObserver<TopicMessage> subscriber)
{
lock (_mutex)
{
Expand All @@ -762,7 +761,7 @@ private void Unsubscribe(Subscription subscription)
// TODO: Unsubscribe from the topic completely if no more subscribers
}

public void OnNext(T value)
public void OnNext(TopicMessage value)
{
lock (_mutex)
{
Expand Down Expand Up @@ -817,7 +816,7 @@ public void Complete()

private class Subscription : IDisposable
{
public Subscription(Topic<T> topic, ISubscriber<T> subscriber, ILogger logger)
public Subscription(Topic topic, IAsyncObserver<TopicMessage> subscriber, ILogger logger)
{
_subscriber = subscriber;
_topic = topic;
Expand All @@ -830,7 +829,7 @@ public void Dispose()
_topic.Unsubscribe(this);
}

public void OnNext(T value)
public void OnNext(TopicMessage value)
{
_queue.Writer.TryWrite(
value); // Since the queue is unbounded, this will succeed unless the channel was completed
Expand Down Expand Up @@ -862,7 +861,7 @@ private async Task ProcessMessages()
{
_logger.LogError(
e,
$"Exception thrown while invoking {nameof(ISubscriber<T>.OnNextAsync)} on a subscriber of topic '{{TopicName}}': {{ExceptionMessage}}",
$"Exception thrown while invoking {nameof(IAsyncObserver<TopicMessage>.OnNextAsync)} on a subscriber of topic '{{TopicName}}': {{ExceptionMessage}}",
_topic.Name,
e.Message);
}
Expand All @@ -877,7 +876,7 @@ private async Task ProcessMessages()
{
_logger.LogError(
e,
$"Exception thrown while invoking {nameof(ISubscriber<T>.OnCompletedAsync)} on a subscriber of topic '{{TopicName}}': {{ExceptionMessage}}",
$"Exception thrown while invoking {nameof(IAsyncObserver<TopicMessage>.OnCompletedAsync)} on a subscriber of topic '{{TopicName}}': {{ExceptionMessage}}",
_topic.Name,
e.Message);
}
Expand All @@ -892,24 +891,24 @@ private async Task ProcessMessages()
{
_logger.LogError(
e2,
$"Exception thrown while invoking {nameof(ISubscriber<T>.OnErrorAsync)} on a subscriber of topic '{{TopicName}}': {{ExceptionMessage}}",
$"Exception thrown while invoking {nameof(IAsyncObserver<TopicMessage>.OnErrorAsync)} on a subscriber of topic '{{TopicName}}': {{ExceptionMessage}}",
_topic.Name,
e2.Message);
}
}
}

private readonly ISubscriber<T> _subscriber;
private readonly IAsyncObserver<TopicMessage> _subscriber;

private readonly Channel<T> _queue = Channel.CreateUnbounded<T>(
private readonly Channel<TopicMessage> _queue = Channel.CreateUnbounded<TopicMessage>(
new UnboundedChannelOptions
{
AllowSynchronousContinuations = false,
SingleReader = true,
SingleWriter = false
});

private readonly Topic<T> _topic;
private readonly Topic _topic;
private readonly ILogger _logger;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
<PackageReference Include="Microsoft.Extensions.Options" Version="6.0.0" />
<PackageReference Include="Nito.AsyncEx" Version="5.1.2" />
<PackageReference Include="System.IO.Pipelines" Version="6.0.3" />
<PackageReference Include="System.Reactive" Version="5.0.0" />
<PackageReference Include="System.Reactive" Version="6.0.0" />
<PackageReference Include="System.Reactive.Async" Version="6.0.0-alpha.3" />
</ItemGroup>

<ItemGroup>
Expand Down
2 changes: 1 addition & 1 deletion src/messaging/dotnet/src/Core/IMessageRouter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public interface IMessageRouter : IAsyncDisposable
/// <returns></returns>
ValueTask<IDisposable> SubscribeAsync(
string topic,
ISubscriber<TopicMessage> subscriber,
IAsyncObserver<TopicMessage> subscriber,
CancellationToken cancellationToken = default);

/// <summary>
Expand Down
24 changes: 0 additions & 24 deletions src/messaging/dotnet/src/Core/ISubscriber.cs

This file was deleted.

18 changes: 10 additions & 8 deletions src/messaging/dotnet/src/Core/MessageRouterExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
// or implied. See the License for the specific language governing permissions
// and limitations under the License.

using System.Reactive;
using System.Reactive.Linq;
using System.Runtime.CompilerServices;
using System.Threading.Channels;

Expand Down Expand Up @@ -87,7 +89,7 @@ public static ValueTask<IDisposable> SubscribeAsync(
IObserver<TopicMessage> observer,
CancellationToken cancellationToken = default)
{
var innerSubscriber = Subscriber.Create<TopicMessage>(
var innerSubscriber = AsyncObserver.Create<TopicMessage>(
message =>
{
observer.OnNext(message);
Expand Down Expand Up @@ -126,7 +128,7 @@ public static ValueTask<IDisposable> SubscribeAsync(
IObserver<string?> observer,
CancellationToken cancellationToken = default)
{
var innerSubscriber = Subscriber.Create<TopicMessage>(
var innerSubscriber = AsyncObserver.Create<TopicMessage>(
message =>
{
observer.OnNext(message.Payload?.GetString());
Expand Down Expand Up @@ -162,10 +164,10 @@ public static ValueTask<IDisposable> SubscribeAsync(
public static ValueTask<IDisposable> SubscribeAsync(
this IMessageRouter messageRouter,
string topic,
ISubscriber<string?> subscriber,
IAsyncObserver<string?> subscriber,
CancellationToken cancellationToken = default)
{
var innerSubscriber = Subscriber.Create<TopicMessage>(
var innerSubscriber = AsyncObserver.Create<TopicMessage>(
message => subscriber.OnNextAsync(message.Payload?.GetString()),
subscriber.OnErrorAsync,
subscriber.OnCompletedAsync);
Expand All @@ -189,15 +191,15 @@ public static async IAsyncEnumerable<TopicMessage> SubscribeAsync(

using var subscription = await messageRouter.SubscribeAsync(
topic,
Subscriber.Create<TopicMessage>(
onNext: message => channel.Writer.WriteAsync(message, cancellationToken),
onError: exception =>
AsyncObserver.Create<TopicMessage>(
onNextAsync: message => channel.Writer.WriteAsync(message, cancellationToken),
onErrorAsync: exception =>
{
channel.Writer.TryComplete(exception);

return default;
},
onCompleted: () =>
onCompletedAsync: () =>
{
channel.Writer.TryComplete();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@

<ItemGroup>
<PackageReference Include="Microsoft.IO.RecyclableMemoryStream" Version="2.2.1" />
<PackageReference Include="System.Reactive" Version="5.0.0" />
<PackageReference Include="System.Reactive" Version="6.0.0" />
<PackageReference Include="System.Reactive.Async" Version="6.0.0-alpha.3" />
<PackageReference Include="System.Text.Json" Version="6.0.7" />
</ItemGroup>

Expand Down
95 changes: 0 additions & 95 deletions src/messaging/dotnet/src/Core/Subscriber.cs

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="6.0.1" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="6.0.0" />
ztanczos marked this conversation as resolved.
Show resolved Hide resolved
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="6.0.3" />
<PackageReference Include="Microsoft.Extensions.Options" Version="6.0.0" />
Expand Down
Loading