Skip to content

Commit

Permalink
Support dynamic publishers (#1010)
Browse files Browse the repository at this point in the history
* WithName => WithQueueName and WithName => WithTopicName, to make the fluent api a bit easier to read

* - Add new static/dynamic publication configuration to support both startup and late-bound topic publishers.

- Make test pass (it fails sometimes for some reason still but let's agree on the shape)

* - ContainsKey => TryGetValue

- ConfigureAwait

* - Push localstack logs to debug for more useful info (e.g. to debug publishes)

- Add unique test id to each queue so tests are independent
- Switch publisher cache to use concurrent dictionary to protect against reads if a publisher is being added by another thread.

* - Fix tests so they don't use conditional access as it doesn't work well with shouldly

* - Share SNS publisher between all topic publishers that share a topic name customiser

* - Fix some broken xml docs

- Add xml docs for new API
- Minor renames

* - Add interrogation support for dynamic publishers

* - Undo unnecessary changes to MessagingConfigurationBuilder

- Publish second message to same topic to make sure that works
- Add some more docs

* - Publish two messages for each tenant to ensure it works

* Update src/JustSaying/Fluent/PublishConfig/DynamicMessagePublisher.cs

Co-authored-by: Martin Costello <[email protected]>

* Update src/JustSaying/Fluent/PublishConfig/DynamicMessagePublisher.cs

Co-authored-by: Martin Costello <[email protected]>

* Update src/JustSaying/Fluent/TopicPublicationBuilder`1.cs

Co-authored-by: Martin Costello <[email protected]>

* Update src/JustSaying/Fluent/PublishConfig/DynamicMessagePublisher.cs

Co-authored-by: Martin Costello <[email protected]>

* Update src/JustSaying/Fluent/PublishConfig/DynamicMessagePublisher.cs

Co-authored-by: Martin Costello <[email protected]>

* Update src/JustSaying/Fluent/TopicPublicationBuilder`1.cs

Co-authored-by: Martin Costello <[email protected]>

* Update src/JustSaying/Fluent/TopicPublicationBuilder`1.cs

Co-authored-by: Martin Costello <[email protected]>

* Update src/JustSaying/Fluent/TopicPublicationBuilder`1.cs

Co-authored-by: Martin Costello <[email protected]>

* Update src/JustSaying/Fluent/PublishConfig/DynamicPublicationConfiguration.cs

Co-authored-by: Martin Costello <[email protected]>

* Update src/JustSaying/Fluent/PublishConfig/ITopicPublisher.cs

Co-authored-by: Martin Costello <[email protected]>

* Update src/JustSaying/Fluent/PublishConfig/StaticPublicationConfiguration.cs

Co-authored-by: Martin Costello <[email protected]>

* - Add missing xmldoc

* - Fix build

Co-authored-by: George Kinsman <[email protected]>
Co-authored-by: Martin Costello <[email protected]>
  • Loading branch information
3 people authored May 19, 2022
1 parent db12efc commit 002db5b
Show file tree
Hide file tree
Showing 25 changed files with 387 additions and 87 deletions.
74 changes: 74 additions & 0 deletions src/JustSaying/Fluent/PublishConfig/DynamicMessagePublisher.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
using System.Collections.Concurrent;
using JustSaying.Messaging;
using JustSaying.Messaging.Interrogation;
using JustSaying.Models;
using Microsoft.Extensions.Logging;

namespace JustSaying.Fluent;

internal sealed class DynamicMessagePublisher : IMessagePublisher
{
private readonly ConcurrentDictionary<string, IMessagePublisher> _publisherCache = new();
private readonly Func<Message, string> _topicNameCustomizer;
private readonly Func<string, StaticPublicationConfiguration> _staticConfigBuilder;

private readonly ConcurrentDictionary<string, SemaphoreSlim> _topicCreationLocks = new();
private readonly ILogger<DynamicMessagePublisher> _logger;

public DynamicMessagePublisher(Func<Message, string> topicNameCustomizer, Func<string, StaticPublicationConfiguration> staticConfigBuilder, ILoggerFactory loggerFactory)
{
_topicNameCustomizer = topicNameCustomizer;
_staticConfigBuilder = staticConfigBuilder;
_logger = loggerFactory.CreateLogger<DynamicMessagePublisher>();
}

public InterrogationResult Interrogate()
{
var pairs = _publisherCache.Keys.OrderBy(x => x)
.ToDictionary(x => x, x => _publisherCache[x].Interrogate());

return new InterrogationResult(new
{
Publishers = pairs
});
}

public Task StartAsync(CancellationToken stoppingToken)
{
return Task.CompletedTask;
}

public async Task PublishAsync(Message message, PublishMetadata metadata, CancellationToken cancellationToken)
{
var topicName = _topicNameCustomizer(message);
if (_publisherCache.TryGetValue(topicName, out var publisher))
{
await publisher.PublishAsync(message, metadata, cancellationToken).ConfigureAwait(false);
return;
}

var lockObj = _topicCreationLocks.GetOrAdd(topicName, _ => new SemaphoreSlim(1, 1));

_logger.LogDebug("Publisher for topic {TopicName} not found, waiting on creation lock", topicName);
await lockObj.WaitAsync(cancellationToken).ConfigureAwait(false);
if (_publisherCache.TryGetValue(topicName, out var thePublisher))
{
_logger.LogDebug("Lock re-entrancy detected, returning existing publisher");
await thePublisher.PublishAsync(message, metadata, cancellationToken).ConfigureAwait(false);
return;
}

_logger.LogDebug("Lock acquired to initialize topic {TopicName}", topicName);
var config = _staticConfigBuilder(topicName);
_logger.LogDebug("Executing startup task for topic {TopicName}", topicName);
await config.StartupTask(cancellationToken).ConfigureAwait(false);

_ = _publisherCache.TryAdd(topicName, config.Publisher);

_logger.LogDebug("Publishing message on newly created topic {TopicName}", topicName);
await config.Publisher.PublishAsync(message, metadata, cancellationToken).ConfigureAwait(false);
}

public Task PublishAsync(Message message, CancellationToken cancellationToken)
=> PublishAsync(message, null, cancellationToken);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
using System.ComponentModel;
using System.Xml.Linq;
using JustSaying.AwsTools;
using JustSaying.AwsTools.QueueCreation;
using JustSaying.Messaging;
using JustSaying.Models;
using Microsoft.Extensions.Logging;

namespace JustSaying.Fluent;

internal sealed class DynamicPublicationConfiguration : ITopicPublisher
{
public DynamicPublicationConfiguration(IMessagePublisher publisher)
{
Publisher = publisher;
}

public Func<CancellationToken, Task> StartupTask => _ => Task.CompletedTask;
public IMessagePublisher Publisher { get; }

public static DynamicPublicationConfiguration Build<T>(
Func<Message, string> topicNameCustomizer,
Func<string, StaticPublicationConfiguration> staticConfigBuilder,
ILoggerFactory loggerFactory)
{
var publisher = new DynamicMessagePublisher(topicNameCustomizer, staticConfigBuilder, loggerFactory);

return new DynamicPublicationConfiguration(publisher);
}
}
9 changes: 9 additions & 0 deletions src/JustSaying/Fluent/PublishConfig/ITopicPublisher.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
using JustSaying.Messaging;

namespace JustSaying.Fluent;

internal interface ITopicPublisher
{
Func<CancellationToken, Task> StartupTask { get; }
IMessagePublisher Publisher { get; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
using Amazon;
using Amazon.Internal;
using Amazon.SimpleNotificationService;
using Amazon.SimpleNotificationService.Model;
using JustSaying.AwsTools;
using JustSaying.AwsTools.MessageHandling;
using JustSaying.AwsTools.QueueCreation;
using JustSaying.Messaging;
using Microsoft.Extensions.Logging;

#pragma warning disable CS0618

namespace JustSaying.Fluent;

internal sealed class StaticPublicationConfiguration : ITopicPublisher
{
public Func<CancellationToken, Task> StartupTask { get; }
public IMessagePublisher Publisher { get; }

public StaticPublicationConfiguration(
Func<CancellationToken, Task> startupTask,
IMessagePublisher publisher)
{
StartupTask = startupTask;
Publisher = publisher;
}

public static StaticPublicationConfiguration Build<T>(
string topicName,
Dictionary<string, string> tags,
SnsWriteConfiguration writeConfiguration,
IAmazonSimpleNotificationService snsClient,
ILoggerFactory loggerFactory,
JustSayingBus bus)
{
var readConfiguration = new SqsReadConfiguration(SubscriptionType.ToTopic)
{
TopicName = topicName
};

readConfiguration.ApplyTopicNamingConvention<T>(bus.Config.TopicNamingConvention);

var eventPublisher = new SnsMessagePublisher(
snsClient,
bus.SerializationRegister,
loggerFactory,
bus.Config.MessageSubjectProvider)
{
MessageResponseLogger = bus.Config.MessageResponseLogger,
};

var snsTopic = new SnsTopicByName(
readConfiguration.TopicName,
snsClient,
loggerFactory)
{
Tags = tags
};

async Task StartupTask(CancellationToken cancellationToken)
{
if (writeConfiguration.Encryption != null)
{
await snsTopic.CreateWithEncryptionAsync(writeConfiguration.Encryption, cancellationToken)
.ConfigureAwait(false);
}
else
{
await snsTopic.CreateAsync(cancellationToken).ConfigureAwait(false);
}

await snsTopic.EnsurePolicyIsUpdatedAsync(bus.Config.AdditionalSubscriberAccounts)
.ConfigureAwait(false);

await snsTopic.ApplyTagsAsync(cancellationToken).ConfigureAwait(false);

eventPublisher.Arn = snsTopic.Arn;

loggerFactory.CreateLogger<StaticPublicationConfiguration>().LogInformation(
"Created SNS topic publisher on topic '{TopicName}' for message type '{MessageType}'.",
snsTopic.TopicName,
typeof(T));
}

return new StaticPublicationConfiguration(StartupTask, eventPublisher);
}
}
6 changes: 3 additions & 3 deletions src/JustSaying/Fluent/QueueSubscriptionBuilder`1.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ internal QueueSubscriptionBuilder()
/// The current <see cref="QueueSubscriptionBuilder{T}"/>.
/// </returns>
public QueueSubscriptionBuilder<T> WithDefaultQueue()
=> WithName(string.Empty);
=> WithQueueName(string.Empty);

/// <summary>
/// Configures the name of the queue.
Expand All @@ -62,7 +62,7 @@ public QueueSubscriptionBuilder<T> WithDefaultQueue()
/// <exception cref="ArgumentNullException">
/// <paramref name="name"/> is <see langword="null"/>.
/// </exception>
public QueueSubscriptionBuilder<T> WithName(string name)
public QueueSubscriptionBuilder<T> WithQueueName(string name)
{
QueueName = name ?? throw new ArgumentNullException(nameof(name));
return this;
Expand Down Expand Up @@ -212,4 +212,4 @@ void ISubscriptionBuilder<T>.Configure(
subscriptionConfig.TopicName,
subscriptionConfig.QueueName);
}
}
}
95 changes: 42 additions & 53 deletions src/JustSaying/Fluent/TopicPublicationBuilder`1.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Amazon;
using Amazon.SimpleNotificationService.Model;
using JustSaying.AwsTools;
using JustSaying.AwsTools.MessageHandling;
using JustSaying.AwsTools.QueueCreation;
Expand Down Expand Up @@ -37,6 +38,12 @@ internal TopicPublicationBuilder()
/// </summary>
private string TopicName { get; set; } = string.Empty;

/// <summary>
/// Function that will produce a topic name dynamically from a Message at publish time.
/// If the topic doesn't exist, it will be created at that point.
/// </summary>
public Func<Message,string> TopicNameCustomizer { get; set; }

/// <summary>
/// Configures the SNS write configuration.
/// </summary>
Expand Down Expand Up @@ -126,12 +133,31 @@ public TopicPublicationBuilder<T> WithTag(string key, string value)
/// <exception cref="ArgumentNullException">
/// <paramref name="name"/> is <see langword="null"/>.
/// </exception>
public TopicPublicationBuilder<T> WithName(string name)
public TopicPublicationBuilder<T> WithTopicName(string name)
{
TopicName = name ?? throw new ArgumentNullException(nameof(name));
return this;
}

/// <summary>
/// Configures the name of the topic by calling this func at publish time to determine the name of the topic.
/// If the topic does not exist, it will be created on first publish.
/// </summary>
/// <param name="topicNameCustomizer">Function that will be called at publish time to determine the name of the target topic for this <see cref="T"/>.
/// <para>
/// For example: <c>WithTopicName(msg => $"{msg.Tenant}-mymessage")</c> with <c>msg.Tenant</c> of <c>["uk", "au"]</c> would
/// create topics <c>"uk-mymessage"</c> and <c>"au-mymessage"</c> when a message is published with those tenants.
/// </para>
/// </param>
/// <returns>
/// The current <see cref="TopicSubscriptionBuilder{T}"/>.
/// </returns>
public TopicPublicationBuilder<T> WithTopicName(Func<Message, string> topicNameCustomizer)
{
TopicNameCustomizer = topicNameCustomizer;
return this;
}

/// <inheritdoc />
void IPublicationBuilder<T>.Configure(
JustSayingBus bus,
Expand All @@ -143,65 +169,28 @@ void IPublicationBuilder<T>.Configure(
logger.LogInformation("Adding SNS publisher for message type '{MessageType}'.",
typeof(T));

var config = bus.Config;
var region = config.Region ?? throw new InvalidOperationException($"Config cannot have a blank entry for the {nameof(config.Region)} property.");
var region = bus.Config.Region ?? throw new InvalidOperationException($"Config cannot have a blank entry for the {nameof(bus.Config.Region)} property.");

var readConfiguration = new SqsReadConfiguration(SubscriptionType.ToTopic)
{
TopicName = TopicName
};
var writeConfiguration = new SnsWriteConfiguration();
ConfigureWrites?.Invoke(writeConfiguration);
readConfiguration.ApplyTopicNamingConvention<T>(config.TopicNamingConvention);

bus.SerializationRegister.AddSerializer<T>();

var eventPublisher = new SnsMessagePublisher(
proxy.GetAwsClientFactory().GetSnsClient(RegionEndpoint.GetBySystemName(region)),
bus.SerializationRegister,
loggerFactory,
config.MessageSubjectProvider)
{
MessageResponseLogger = config.MessageResponseLogger,
};

#pragma warning disable 618
var snsTopic = new SnsTopicByName(
readConfiguration.TopicName,
proxy.GetAwsClientFactory().GetSnsClient(RegionEndpoint.GetBySystemName(region)),
loggerFactory)
{
Tags = Tags
};
#pragma warning restore 618
var client = proxy.GetAwsClientFactory().GetSnsClient(RegionEndpoint.GetBySystemName(region));

async Task StartupTask(CancellationToken cancellationToken)
{
if (writeConfiguration.Encryption != null)
{
await snsTopic.CreateWithEncryptionAsync(writeConfiguration.Encryption, cancellationToken)
.ConfigureAwait(false);
}
else
{
await snsTopic.CreateAsync(cancellationToken).ConfigureAwait(false);
}

await snsTopic.EnsurePolicyIsUpdatedAsync(config.AdditionalSubscriberAccounts)
.ConfigureAwait(false);

await snsTopic.ApplyTagsAsync(cancellationToken).ConfigureAwait(false);

eventPublisher.Arn = snsTopic.Arn;
}
StaticPublicationConfiguration BuildConfiguration(string topicName)
=> StaticPublicationConfiguration.Build<T>(topicName,
Tags,
writeConfiguration,
client,
loggerFactory,
bus);

bus.AddStartupTask(StartupTask);
ITopicPublisher config = TopicNameCustomizer != null
? DynamicPublicationConfiguration.Build<T>(TopicNameCustomizer, BuildConfiguration, loggerFactory)
: BuildConfiguration(TopicName);

bus.AddMessagePublisher<T>(eventPublisher);
bus.AddStartupTask(config.StartupTask);
bus.AddMessagePublisher<T>(config.Publisher);

logger.LogInformation(
"Created SNS topic publisher on topic '{TopicName}' for message type '{MessageType}'.",
readConfiguration.TopicName,
typeof(T));
bus.SerializationRegister.AddSerializer<T>();
}
}
4 changes: 2 additions & 2 deletions src/JustSaying/Fluent/TopicSubscriptionBuilder`1.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ internal TopicSubscriptionBuilder()
/// The current <see cref="TopicSubscriptionBuilder{T}"/>.
/// </returns>
public TopicSubscriptionBuilder<T> IntoDefaultTopic()
=> WithName(string.Empty);
=> WithQueueName(string.Empty);

/// <summary>
/// Configures the name of the queue that will be subscribed to.
Expand All @@ -62,7 +62,7 @@ public TopicSubscriptionBuilder<T> IntoDefaultTopic()
/// <exception cref="ArgumentNullException">
/// <paramref name="name"/> is <see langword="null"/>.
/// </exception>
public TopicSubscriptionBuilder<T> WithName(string name)
public TopicSubscriptionBuilder<T> WithQueueName(string name)
{
QueueName = name ?? throw new ArgumentNullException(nameof(name));
return this;
Expand Down
Loading

0 comments on commit 002db5b

Please sign in to comment.