Skip to content

Commit

Permalink
Merge pull request #431 from martincostello/Builder-RFC
Browse files Browse the repository at this point in the history
BusBuilder prototype first-step
  • Loading branch information
martincostello authored Dec 13, 2018
2 parents aa3460f + 1b61c41 commit 6c0a5ab
Show file tree
Hide file tree
Showing 30 changed files with 2,744 additions and 6 deletions.
271 changes: 271 additions & 0 deletions JustSaying.IntegrationTests/Fluent/MessagingBusBuilderTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,271 @@
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using JustSaying.Fluent;
using JustSaying.Messaging;
using JustSaying.Messaging.MessageHandling;
using JustSaying.Messaging.Monitoring;
using JustSaying.Models;
using JustSaying.TestingFramework;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Shouldly;
using Xunit.Abstractions;

namespace JustSaying.IntegrationTests
{
public class MessagingBusBuilderTests
{
public MessagingBusBuilderTests(ITestOutputHelper outputHelper)
{
OutputHelper = outputHelper;
}

private ITestOutputHelper OutputHelper { get; }

[AwsFact]
public async Task Can_Create_Messaging_Bus_Fluently_For_A_Queue()
{
// Arrange
var services = new ServiceCollection()
.AddLogging((p) => p.AddXUnit(OutputHelper))
.AddJustSaying(
(builder) =>
{
builder.Client((options) => options.WithBasicCredentials("accessKey", "secretKey").WithServiceUri(TestEnvironment.SimulatorUrl))
.Messaging((options) => options.WithRegions("eu-west-1"))
.Publications((options) => options.WithQueue<QueueMessage>())
.Subscriptions((options) => options.ForQueue<QueueMessage>())
.Services((options) => options.WithMessageMonitoring(() => new MyMonitor()));
})
.AddJustSayingHandler<QueueMessage, QueueHandler>();

IServiceProvider serviceProvider = services.BuildServiceProvider();

IMessagePublisher publisher = serviceProvider.GetRequiredService<IMessagePublisher>();
IMessagingBus listener = serviceProvider.GetRequiredService<IMessagingBus>();

using (var source = new CancellationTokenSource(TimeSpan.FromSeconds(20)))
{
// Act
listener.Start(source.Token);

var message = new QueueMessage();

await publisher.PublishAsync(message, source.Token);

// Assert
while (!source.IsCancellationRequested && !QueueHandler.MessageIds.Contains(message.Id))
{
await Task.Delay(TimeSpan.FromSeconds(0.2), source.Token);
}

QueueHandler.MessageIds.ShouldContain(message.Id);
}
}

[AwsFact]
public async Task Can_Create_Messaging_Bus_Fluently_For_A_Topic()
{
// Arrange
var services = new ServiceCollection()
.AddLogging((p) => p.AddXUnit(OutputHelper))
.AddJustSaying(
(builder) =>
{
builder.Client((options) => options.WithBasicCredentials("accessKey", "secretKey").WithServiceUri(TestEnvironment.SimulatorUrl))
.Messaging((options) => options.WithRegions("eu-west-1"))
.Publications((options) => options.WithTopic<TopicMessage>())
.Subscriptions((options) => options.ForTopic<TopicMessage>());
})
.AddJustSayingHandler<TopicMessage, TopicHandler>();

IServiceProvider serviceProvider = services.BuildServiceProvider();

IMessagePublisher publisher = serviceProvider.GetRequiredService<IMessagePublisher>();
IMessagingBus listener = serviceProvider.GetRequiredService<IMessagingBus>();

using (var source = new CancellationTokenSource(TimeSpan.FromSeconds(20)))
{
// Act
listener.Start(source.Token);

var message = new TopicMessage();

await publisher.PublishAsync(message, source.Token);

// Assert
while (!source.IsCancellationRequested && !TopicHandler.MessageIds.Contains(message.Id))
{
await Task.Delay(TimeSpan.FromSeconds(0.2), source.Token);
}

TopicHandler.MessageIds.ShouldContain(message.Id);
}
}

[AwsFact]
public void Can_Create_Messaging_Bus()
{
// Arrange
var services = new ServiceCollection()
.AddLogging((p) => p.AddXUnit(OutputHelper))
.AddJustSaying("eu-west-1")
.AddJustSayingHandler<QueueMessage, QueueHandler>();

IServiceProvider serviceProvider = services.BuildServiceProvider();

IMessagePublisher publisher = serviceProvider.GetRequiredService<IMessagePublisher>();
IMessagingBus listener = serviceProvider.GetRequiredService<IMessagingBus>();

using (var source = new CancellationTokenSource(TimeSpan.FromSeconds(20)))
{
// Act
listener.Start(source.Token);
}
}

[AwsFact]
public async Task Can_Create_Messaging_Bus_With_Contributors()
{
// Arrange
var services = new ServiceCollection()
.AddLogging((p) => p.AddXUnit(OutputHelper))
.AddJustSaying()
.AddSingleton<IMessageBusConfigurationContributor, AwsContributor>()
.AddSingleton<IMessageBusConfigurationContributor, MessagingContributor>()
.AddSingleton<IMessageBusConfigurationContributor, QueueContributor>()
.AddSingleton<IMessageBusConfigurationContributor, RegionContributor>()
.AddJustSayingHandler<QueueMessage, QueueHandler>()
.AddSingleton<MyMonitor>();

IServiceProvider serviceProvider = services.BuildServiceProvider();

IMessagePublisher publisher = serviceProvider.GetRequiredService<IMessagePublisher>();
IMessagingBus listener = serviceProvider.GetRequiredService<IMessagingBus>();

using (var source = new CancellationTokenSource(TimeSpan.FromSeconds(20)))
{
// Act
listener.Start(source.Token);

var message = new QueueMessage();

await publisher.PublishAsync(message, source.Token);

// Assert
while (!source.IsCancellationRequested && !QueueHandler.MessageIds.Contains(message.Id))
{
await Task.Delay(TimeSpan.FromSeconds(0.2), source.Token);
}

QueueHandler.MessageIds.ShouldContain(message.Id);
}
}

private sealed class AwsContributor : IMessageBusConfigurationContributor
{
public void Configure(MessagingBusBuilder builder)
{
builder.Client(
(options) => options.WithSessionCredentials("accessKeyId", "secretKeyId", "token")
.WithServiceUri(TestEnvironment.SimulatorUrl));
}
}

private sealed class MessagingContributor : IMessageBusConfigurationContributor
{
public MessagingContributor(IServiceProvider serviceProvider)
{
ServiceProvider = serviceProvider;
}

private IServiceProvider ServiceProvider { get; }

public void Configure(MessagingBusBuilder builder)
{
builder.Services((p) => p.WithMessageMonitoring(ServiceProvider.GetRequiredService<MyMonitor>));
}
}

private sealed class QueueContributor : IMessageBusConfigurationContributor
{
public void Configure(MessagingBusBuilder builder)
{
builder.Publications((p) => p.WithQueue<QueueMessage>())
.Subscriptions((p) => p.ForQueue<QueueMessage>());
}
}

private sealed class RegionContributor : IMessageBusConfigurationContributor
{
public void Configure(MessagingBusBuilder builder)
{
builder.Messaging((p) => p.WithRegion("eu-west-1"));
}
}

private sealed class QueueMessage : Message
{
}

private sealed class QueueHandler : IHandlerAsync<QueueMessage>
{
internal static ConcurrentBag<Guid> MessageIds { get; } = new ConcurrentBag<Guid>();

public Task<bool> Handle(QueueMessage message)
{
MessageIds.Add(message.Id);
return Task.FromResult(true);
}
}

private sealed class TopicMessage : Message
{
}

private sealed class TopicHandler : IHandlerAsync<TopicMessage>
{
internal static ConcurrentBag<Guid> MessageIds { get; } = new ConcurrentBag<Guid>();

public Task<bool> Handle(TopicMessage message)
{
MessageIds.Add(message.Id);
return Task.FromResult(true);
}
}

private sealed class MyMonitor : IMessageMonitor
{
public void HandleException(Type messageType)
{
}

public void HandleThrottlingTime(long handleTimeMs)
{
}

public void HandleTime(long handleTimeMs)
{
}

public void IncrementThrottlingStatistic()
{
}

public void IssuePublishingMessage()
{
}

public void PublishMessageTime(long handleTimeMs)
{
}

public void ReceiveMessageTime(long handleTimeMs, string queueName, string region)
{
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netcoreapp2.2</TargetFramework>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
<NoWarn>$(NoWarn);CA1001;CA1034;CA1052;CA1054;CA1063;CA1307;CA1816;CA1822;CA2007</NoWarn>
<NoWarn>$(NoWarn);CA1001;CA1034;CA1052;CA1054;CA1063;CA1307;CA1707;CA1812;CA1816;CA1822;CA2007</NoWarn>
</PropertyGroup>
<ItemGroup>
<Content Include="xunit.runner.json" CopyToOutputDirectory="PreserveNewest" />
Expand All @@ -15,6 +15,7 @@
<PackageReference Include="AWSSDK.Core" Version="3.3.25.3" />
<PackageReference Include="AWSSDK.SimpleNotificationService" Version="3.3.1.11" />
<PackageReference Include="AWSSDK.SQS" Version="3.3.3.19" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="2.1.1" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.9.0" />
<PackageReference Include="Newtonsoft.Json" Version="11.0.2" />
<PackageReference Include="StructureMap" Version="4.7.0" />
Expand Down
2 changes: 2 additions & 0 deletions JustSaying.TestingFramework/LocalAwsClientFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public IAmazonSimpleNotificationService GetSnsClient(RegionEndpoint region)
var credentials = new AnonymousAWSCredentials();
var clientConfig = new AmazonSimpleNotificationServiceConfig
{
RegionEndpoint = region,
ServiceURL = ServiceUrl.ToString()
};

Expand All @@ -32,6 +33,7 @@ public IAmazonSQS GetSqsClient(RegionEndpoint region)
var credentials = new AnonymousAWSCredentials();
var clientConfig = new AmazonSQSConfig
{
RegionEndpoint = region,
ServiceURL = ServiceUrl.ToString()
};

Expand Down
2 changes: 1 addition & 1 deletion JustSaying/CreateMeABus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public static class CreateMeABus
public static Func<IAwsClientFactory> DefaultClientFactory { get; set; }
= () => new DefaultAwsClientFactory();

public static JustSayingFluentlyDependencies WithLogging(ILoggerFactory loggerFactory) =>
public static JustSayingFluentlyDependencies WithLogging(ILoggerFactory loggerFactory) =>
new JustSayingFluentlyDependencies { LoggerFactory = loggerFactory};
}
}
Loading

0 comments on commit 6c0a5ab

Please sign in to comment.