diff --git a/JustSaying.IntegrationTests/AwsTools/WhenSettingUpMultipleHandlers.cs b/JustSaying.IntegrationTests/AwsTools/WhenSettingUpMultipleHandlers.cs index 86f97bce3..8aa7156ec 100644 --- a/JustSaying.IntegrationTests/AwsTools/WhenSettingUpMultipleHandlers.cs +++ b/JustSaying.IntegrationTests/AwsTools/WhenSettingUpMultipleHandlers.cs @@ -1,4 +1,5 @@ using System.Linq; +using System.Threading; using System.Threading.Tasks; using JustBehave; using JustSaying.AwsTools.QueueCreation; @@ -14,6 +15,7 @@ public class WhenSettingUpMultipleHandlers : XAsyncBehaviourTest Task.CompletedTask; @@ -41,13 +43,14 @@ protected override Task CreateSystemUnde .IntoQueue(baseQueueName) .WithMessageHandlers(new OrderHandler(), new OrderHandler()); - subscription.StartListening(); + _subscriberCts = new CancellationTokenSource(); + subscription.StartListening(_subscriberCts.Token); return Task.FromResult(subscription); } protected override Task PostAssertTeardownAsync() { - SystemUnderTest.StopListening(); + _subscriberCts.Cancel(); return Task.CompletedTask; } diff --git a/JustSaying.UnitTests/AwsTools/MessageHandling/MessageDispatcherTests/WhenDispatchingMessage.cs b/JustSaying.UnitTests/AwsTools/MessageHandling/MessageDispatcherTests/WhenDispatchingMessage.cs index d2a7bb720..e66f00c7b 100644 --- a/JustSaying.UnitTests/AwsTools/MessageHandling/MessageDispatcherTests/WhenDispatchingMessage.cs +++ b/JustSaying.UnitTests/AwsTools/MessageHandling/MessageDispatcherTests/WhenDispatchingMessage.cs @@ -1,139 +1,141 @@ -using System; -using System.Threading.Tasks; -using Amazon; -using Amazon.SQS; -using Amazon.SQS.Model; -using JustBehave; -using JustSaying.AwsTools.MessageHandling; -using JustSaying.Messaging.MessageProcessingStrategies; -using JustSaying.Messaging.MessageSerialisation; -using JustSaying.Messaging.Monitoring; -using JustSaying.TestingFramework; -using Microsoft.Extensions.Logging; -using Newtonsoft.Json; -using NSubstitute; -using NSubstitute.ExceptionExtensions; -using Xunit; -using Message = JustSaying.Models.Message; -using SQSMessage = Amazon.SQS.Model.Message; - -namespace JustSaying.UnitTests.AwsTools.MessageHandling.MessageDispatcherTests -{ - public class DummySqsQueue : SqsQueueBase - { - public DummySqsQueue(Uri uri, IAmazonSQS client) : base(RegionEndpoint.EUWest1, client) - { - Uri = uri; - } - - public override Task ExistsAsync() => Task.FromResult(true); - } - - public class WhenDispatchingMessage : XAsyncBehaviourTest - { - private const string ExpectedQueueUrl = "http://testurl.com/queue"; - - private readonly IMessageSerialisationRegister _serialisationRegister = Substitute.For(); - private readonly IMessageMonitor _messageMonitor = Substitute.For(); - private readonly Action _onError = Substitute.For>(); - private readonly HandlerMap _handlerMap = new HandlerMap(); - private readonly ILoggerFactory _loggerFactory = Substitute.For(); - private readonly ILogger _logger = Substitute.For(); - private readonly IMessageBackoffStrategy _messageBackoffStrategy = Substitute.For(); - private readonly IAmazonSQS _amazonSqsClient = Substitute.For(); - - private DummySqsQueue _queue; - private SQSMessage _sqsMessage; - private Message _typedMessage; - - protected override Task Given() - { - _typedMessage = new OrderAccepted(); - - _sqsMessage = new SQSMessage - { - Body = JsonConvert.SerializeObject(_typedMessage), - ReceiptHandle = "i_am_receipt_handle" - }; - - _loggerFactory.CreateLogger(Arg.Any()).Returns(_logger); - _queue = new DummySqsQueue(new Uri(ExpectedQueueUrl), _amazonSqsClient); - _serialisationRegister.DeserializeMessage(Arg.Any()).Returns(_typedMessage); - return Task.CompletedTask; - } - - protected override async Task When() => await SystemUnderTest.DispatchMessage(_sqsMessage, CancellationToken.None); - - protected override Task CreateSystemUnderTestAsync() - { - return Task.FromResult(new MessageDispatcher(_queue, _serialisationRegister, _messageMonitor, _onError, _handlerMap, _loggerFactory, _messageBackoffStrategy)); - } - - public class AndMessageProcessingSucceeds : WhenDispatchingMessage - { - protected override async Task Given() - { - await base.Given(); - _handlerMap.Add(typeof(OrderAccepted), m => Task.FromResult(true)); - } - - [Fact] - public void ShouldDeserializeMessage() - { - _serialisationRegister.Received(1).DeserializeMessage(Arg.Is(x => x == _sqsMessage.Body)); - } - - [Fact] - public void ShouldDeleteMessageIfHandledSuccessfully() - { - _amazonSqsClient.Received(1).DeleteMessageAsync(Arg.Is(x => x.QueueUrl == ExpectedQueueUrl && x.ReceiptHandle == _sqsMessage.ReceiptHandle)); - } - } - - public class AndMessageProcessingFails : WhenDispatchingMessage - { - private const int ExpectedReceiveCount = 1; - private readonly TimeSpan _expectedBackoffTimeSpan = TimeSpan.FromMinutes(4); - private readonly Exception _expectedException = new Exception("Something failed when processing"); - - protected override async Task Given() - { - await base.Given(); - _messageBackoffStrategy.GetBackoffDuration(_typedMessage, 1, _expectedException).Returns(_expectedBackoffTimeSpan); - _handlerMap.Add(typeof(OrderAccepted), m => throw _expectedException); - _sqsMessage.Attributes.Add(MessageSystemAttributeName.ApproximateReceiveCount, ExpectedReceiveCount.ToString()); - } - - [Fact] - public void ShouldInvokeMessageBackoffStrategyWithNumberOfReceives() - { - _messageBackoffStrategy.Received(1).GetBackoffDuration(Arg.Is(_typedMessage), Arg.Is(ExpectedReceiveCount), Arg.Is(_expectedException)); - } - - [Fact] - public void ShouldUpdateMessageVisibility() - { - _amazonSqsClient.Received(1).ChangeMessageVisibilityAsync(Arg.Is(x => x.QueueUrl == ExpectedQueueUrl && x.ReceiptHandle == _sqsMessage.ReceiptHandle && x.VisibilityTimeout == (int) _expectedBackoffTimeSpan.TotalSeconds)); - } - } - - public class AndUpdatingMessageVisibilityErrors : WhenDispatchingMessage - { - protected override async Task Given() - { - await base.Given(); - _messageBackoffStrategy.GetBackoffDuration(_typedMessage, Arg.Any()).Returns(TimeSpan.FromMinutes(4)); - _amazonSqsClient.ChangeMessageVisibilityAsync(Arg.Any()).Throws(new Exception("Something gone wrong")); - - _handlerMap.Add(typeof(OrderAccepted), m => Task.FromResult(false)); - _sqsMessage.Attributes.Add(MessageSystemAttributeName.ApproximateReceiveCount, "1"); - } - - [Fact] - public void ShouldLogException() - { - _logger.ReceivedWithAnyArgs().LogError(0, null, "msg"); - } - } - } -} +using System; +using System.Globalization; +using System.Threading; +using System.Threading.Tasks; +using Amazon; +using Amazon.SQS; +using Amazon.SQS.Model; +using JustBehave; +using JustSaying.AwsTools.MessageHandling; +using JustSaying.Messaging.MessageProcessingStrategies; +using JustSaying.Messaging.MessageSerialisation; +using JustSaying.Messaging.Monitoring; +using JustSaying.TestingFramework; +using Microsoft.Extensions.Logging; +using Newtonsoft.Json; +using NSubstitute; +using NSubstitute.ExceptionExtensions; +using Xunit; +using Message = JustSaying.Models.Message; +using SQSMessage = Amazon.SQS.Model.Message; + +namespace JustSaying.UnitTests.AwsTools.MessageHandling.MessageDispatcherTests +{ + public class DummySqsQueue : SqsQueueBase + { + public DummySqsQueue(Uri uri, IAmazonSQS client) : base(RegionEndpoint.EUWest1, client) + { + Uri = uri; + } + + public override Task ExistsAsync() => Task.FromResult(true); + } + + public class WhenDispatchingMessage : XAsyncBehaviourTest + { + private const string ExpectedQueueUrl = "http://testurl.com/queue"; + + private readonly IMessageSerialisationRegister _serialisationRegister = Substitute.For(); + private readonly IMessageMonitor _messageMonitor = Substitute.For(); + private readonly Action _onError = Substitute.For>(); + private readonly HandlerMap _handlerMap = new HandlerMap(); + private readonly ILoggerFactory _loggerFactory = Substitute.For(); + private readonly ILogger _logger = Substitute.For(); + private readonly IMessageBackoffStrategy _messageBackoffStrategy = Substitute.For(); + private readonly IAmazonSQS _amazonSqsClient = Substitute.For(); + + private DummySqsQueue _queue; + private SQSMessage _sqsMessage; + private Message _typedMessage; + + protected override Task Given() + { + _typedMessage = new OrderAccepted(); + + _sqsMessage = new SQSMessage + { + Body = JsonConvert.SerializeObject(_typedMessage), + ReceiptHandle = "i_am_receipt_handle" + }; + + _loggerFactory.CreateLogger(Arg.Any()).Returns(_logger); + _queue = new DummySqsQueue(new Uri(ExpectedQueueUrl), _amazonSqsClient); + _serialisationRegister.DeserializeMessage(Arg.Any()).Returns(_typedMessage); + return Task.CompletedTask; + } + + protected override async Task When() => await SystemUnderTest.DispatchMessage(_sqsMessage, CancellationToken.None); + + protected override Task CreateSystemUnderTestAsync() + { + return Task.FromResult(new MessageDispatcher(_queue, _serialisationRegister, _messageMonitor, _onError, _handlerMap, _loggerFactory, _messageBackoffStrategy)); + } + + public class AndMessageProcessingSucceeds : WhenDispatchingMessage + { + protected override async Task Given() + { + await base.Given(); + _handlerMap.Add(typeof(OrderAccepted), m => Task.FromResult(true)); + } + + [Fact] + public void ShouldDeserializeMessage() + { + _serialisationRegister.Received(1).DeserializeMessage(Arg.Is(x => x == _sqsMessage.Body)); + } + + [Fact] + public void ShouldDeleteMessageIfHandledSuccessfully() + { + _amazonSqsClient.Received(1).DeleteMessageAsync(Arg.Is(x => x.QueueUrl == ExpectedQueueUrl && x.ReceiptHandle == _sqsMessage.ReceiptHandle)); + } + } + + public class AndMessageProcessingFails : WhenDispatchingMessage + { + private const int ExpectedReceiveCount = 1; + private readonly TimeSpan _expectedBackoffTimeSpan = TimeSpan.FromMinutes(4); + private readonly Exception _expectedException = new Exception("Something failed when processing"); + + protected override async Task Given() + { + await base.Given(); + _messageBackoffStrategy.GetBackoffDuration(_typedMessage, 1, _expectedException).Returns(_expectedBackoffTimeSpan); + _handlerMap.Add(typeof(OrderAccepted), m => throw _expectedException); + _sqsMessage.Attributes.Add(MessageSystemAttributeName.ApproximateReceiveCount, ExpectedReceiveCount.ToString(CultureInfo.InvariantCulture)); + } + + [Fact] + public void ShouldInvokeMessageBackoffStrategyWithNumberOfReceives() + { + _messageBackoffStrategy.Received(1).GetBackoffDuration(Arg.Is(_typedMessage), Arg.Is(ExpectedReceiveCount), Arg.Is(_expectedException)); + } + + [Fact] + public void ShouldUpdateMessageVisibility() + { + _amazonSqsClient.Received(1).ChangeMessageVisibilityAsync(Arg.Is(x => x.QueueUrl == ExpectedQueueUrl && x.ReceiptHandle == _sqsMessage.ReceiptHandle && x.VisibilityTimeout == (int) _expectedBackoffTimeSpan.TotalSeconds)); + } + } + + public class AndUpdatingMessageVisibilityErrors : WhenDispatchingMessage + { + protected override async Task Given() + { + await base.Given(); + _messageBackoffStrategy.GetBackoffDuration(_typedMessage, Arg.Any()).Returns(TimeSpan.FromMinutes(4)); + _amazonSqsClient.ChangeMessageVisibilityAsync(Arg.Any()).Throws(new Exception("Something gone wrong")); + + _handlerMap.Add(typeof(OrderAccepted), m => Task.FromResult(false)); + _sqsMessage.Attributes.Add(MessageSystemAttributeName.ApproximateReceiveCount, "1"); + } + + [Fact] + public void ShouldLogException() + { + _logger.ReceivedWithAnyArgs().LogError(0, null, "msg"); + } + } + } +} diff --git a/JustSaying.UnitTests/AwsTools/MessageHandling/SqsNotificationListener/BaseQueuePollingTest.cs b/JustSaying.UnitTests/AwsTools/MessageHandling/SqsNotificationListener/BaseQueuePollingTest.cs index de43f4011..bc17fcd00 100644 --- a/JustSaying.UnitTests/AwsTools/MessageHandling/SqsNotificationListener/BaseQueuePollingTest.cs +++ b/JustSaying.UnitTests/AwsTools/MessageHandling/SqsNotificationListener/BaseQueuePollingTest.cs @@ -1,105 +1,105 @@ -using System; -using System.Collections.Generic; -using System.Threading; -using System.Threading.Tasks; -using Amazon; -using Amazon.SQS; -using Amazon.SQS.Model; -using JustBehave; -using JustSaying.AwsTools.MessageHandling; -using JustSaying.Messaging.MessageHandling; -using JustSaying.Messaging.MessageSerialisation; -using JustSaying.Messaging.Monitoring; -using JustSaying.TestingFramework; -using JustSaying.UnitTests.AwsTools.MessageHandling.SqsNotificationListener.Support; -using Microsoft.Extensions.Logging; -using NSubstitute; -using Shouldly; - -namespace JustSaying.UnitTests.AwsTools.MessageHandling.SqsNotificationListener -{ - public abstract class BaseQueuePollingTest : XAsyncBehaviourTest - { - protected const string QueueUrl = "http://testurl.com/queue"; - protected IAmazonSQS Sqs; - protected SimpleMessage DeserialisedMessage; - protected const string MessageBody = "object"; - protected IHandlerAsync Handler; - protected IMessageMonitor Monitor; - protected ILoggerFactory LoggerFactory; - protected IMessageSerialisationRegister SerialisationRegister; - protected IMessageLockAsync MessageLock; - protected readonly string MessageTypeString = typeof(SimpleMessage).ToString(); - - protected override Task CreateSystemUnderTestAsync() - { - var queue = new SqsQueueByUrl(RegionEndpoint.EUWest1, new Uri(QueueUrl), Sqs); - return Task.FromResult(new JustSaying.AwsTools.MessageHandling.SqsNotificationListener(queue, SerialisationRegister, Monitor, LoggerFactory, null, MessageLock)); - } - - protected override Task Given() - { - LoggerFactory = new LoggerFactory(); - Sqs = Substitute.For(); - SerialisationRegister = Substitute.For(); - Monitor = Substitute.For(); - Handler = Substitute.For>(); - LoggerFactory = Substitute.For(); - - var response = GenerateResponseMessage(MessageTypeString, Guid.NewGuid()); - - Sqs.ReceiveMessageAsync( - Arg.Any(), - Arg.Any()) - .Returns( - x => Task.FromResult(response), - x => Task.FromResult(new ReceiveMessageResponse())); - - DeserialisedMessage = new SimpleMessage { RaisingComponent = "Component" }; - SerialisationRegister.DeserializeMessage(Arg.Any()).Returns(DeserialisedMessage); - return Task.CompletedTask; - } - - protected override async Task When() - { - var doneSignal = new TaskCompletionSource(); - var signallingHandler = new SignallingHandler(doneSignal, Handler); - - SystemUnderTest.AddMessageHandler(() => signallingHandler); - var cts = new CancellationTokenSource(); - SystemUnderTest.Listen(cts.Token); - - // wait until it's done - var doneOk = await Tasks.WaitWithTimeoutAsync(doneSignal.Task); - - cts.Cancel(); - - doneOk.ShouldBeTrue("Timeout occured before done signal"); - } - - protected ReceiveMessageResponse GenerateResponseMessage(string messageType, Guid messageId) - { - return new ReceiveMessageResponse - { - Messages = new List - { - new Message - { - MessageId = messageId.ToString(), - Body = SqsMessageBody(messageType) - }, - new Message - { - MessageId = messageId.ToString(), - Body = "{\"Subject\":\"SOME_UNKNOWN_MESSAGE\"," + "\"Message\":\"SOME_RANDOM_MESSAGE\"}" - } - } - }; - } - - protected string SqsMessageBody(string messageType) - { - return "{\"Subject\":\"" + messageType + "\"," + "\"Message\":\"" + MessageBody + "\"}"; - } - } -} +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Amazon; +using Amazon.SQS; +using Amazon.SQS.Model; +using JustBehave; +using JustSaying.AwsTools.MessageHandling; +using JustSaying.Messaging.MessageHandling; +using JustSaying.Messaging.MessageSerialisation; +using JustSaying.Messaging.Monitoring; +using JustSaying.TestingFramework; +using JustSaying.UnitTests.AwsTools.MessageHandling.SqsNotificationListener.Support; +using Microsoft.Extensions.Logging; +using NSubstitute; +using Shouldly; + +namespace JustSaying.UnitTests.AwsTools.MessageHandling.SqsNotificationListener +{ + public abstract class BaseQueuePollingTest : XAsyncBehaviourTest + { + protected const string QueueUrl = "http://testurl.com/queue"; + protected IAmazonSQS Sqs; + protected SimpleMessage DeserialisedMessage; + protected const string MessageBody = "object"; + protected IHandlerAsync Handler; + protected IMessageMonitor Monitor; + protected ILoggerFactory LoggerFactory; + protected IMessageSerialisationRegister SerialisationRegister; + protected IMessageLockAsync MessageLock; + protected readonly string MessageTypeString = typeof(SimpleMessage).ToString(); + + protected override Task CreateSystemUnderTestAsync() + { + var queue = new SqsQueueByUrl(RegionEndpoint.EUWest1, new Uri(QueueUrl), Sqs); + return Task.FromResult(new JustSaying.AwsTools.MessageHandling.SqsNotificationListener(queue, SerialisationRegister, Monitor, LoggerFactory, null, MessageLock)); + } + + protected override Task Given() + { + LoggerFactory = new LoggerFactory(); + Sqs = Substitute.For(); + SerialisationRegister = Substitute.For(); + Monitor = Substitute.For(); + Handler = Substitute.For>(); + LoggerFactory = Substitute.For(); + + var response = GenerateResponseMessage(MessageTypeString, Guid.NewGuid()); + + Sqs.ReceiveMessageAsync( + Arg.Any(), + Arg.Any()) + .Returns( + x => Task.FromResult(response), + x => Task.FromResult(new ReceiveMessageResponse())); + + DeserialisedMessage = new SimpleMessage { RaisingComponent = "Component" }; + SerialisationRegister.DeserializeMessage(Arg.Any()).Returns(DeserialisedMessage); + return Task.CompletedTask; + } + + protected override async Task When() + { + var doneSignal = new TaskCompletionSource(); + var signallingHandler = new SignallingHandler(doneSignal, Handler); + + SystemUnderTest.AddMessageHandler(() => signallingHandler); + var cts = new CancellationTokenSource(); + SystemUnderTest.Listen(cts.Token); + + // wait until it's done + var doneOk = await Tasks.WaitWithTimeoutAsync(doneSignal.Task); + + cts.Cancel(); + + doneOk.ShouldBeTrue("Timeout occured before done signal"); + } + + protected ReceiveMessageResponse GenerateResponseMessage(string messageType, Guid messageId) + { + return new ReceiveMessageResponse + { + Messages = new List + { + new Message + { + MessageId = messageId.ToString(), + Body = SqsMessageBody(messageType) + }, + new Message + { + MessageId = messageId.ToString(), + Body = "{\"Subject\":\"SOME_UNKNOWN_MESSAGE\"," + "\"Message\":\"SOME_RANDOM_MESSAGE\"}" + } + } + }; + } + + protected static string SqsMessageBody(string messageType) + { + return "{\"Subject\":\"" + messageType + "\"," + "\"Message\":\"" + MessageBody + "\"}"; + } + } +} diff --git a/JustSaying.UnitTests/JustSaying.UnitTests.csproj b/JustSaying.UnitTests/JustSaying.UnitTests.csproj index 0f50c2298..babb94453 100644 --- a/JustSaying.UnitTests/JustSaying.UnitTests.csproj +++ b/JustSaying.UnitTests/JustSaying.UnitTests.csproj @@ -1,25 +1,25 @@ - - - netcoreapp2.1 - + + + netcoreapp2.1 + - $(NoWarn);CA2007;CA1051;CA1707;CA1034;CA1812;CA1307 - - - - - - - - - - - - - + $(NoWarn);CA2007;CA1051;CA1707;CA1034;CA1812;CA1307;CA1001 + + + + + + + + + + + + + all runtime; build; native; contentfiles; analyzers - - - + + + diff --git a/JustSaying.UnitTests/JustSayingBus/WhenStartingThenStopping.cs b/JustSaying.UnitTests/JustSayingBus/WhenStartingThenStopping.cs deleted file mode 100644 index 2147086fb..000000000 --- a/JustSaying.UnitTests/JustSayingBus/WhenStartingThenStopping.cs +++ /dev/null @@ -1,41 +0,0 @@ -using System.Threading.Tasks; -using JustSaying.Messaging; -using NSubstitute; -using Shouldly; -using Xunit; - -namespace JustSaying.UnitTests.JustSayingBus -{ - public class WhenStartingThenStopping : GivenAServiceBus - { - private INotificationSubscriber _subscriber1; - - protected override async Task Given() - { - await base.Given(); - _subscriber1 = Substitute.For(); - } - - protected override Task When() - { - SystemUnderTest.AddNotificationSubscriber("region1", _subscriber1); - SystemUnderTest.Start(); - SystemUnderTest.Stop(); - - return Task.CompletedTask; - } - - [Fact] - public void StateIsNotListening() - { - SystemUnderTest.Listening.ShouldBeFalse(); - } - - [Fact] - public void CallingStopTwiceDoesNotStopListeningTwice() - { - SystemUnderTest.Stop(); - _subscriber1.Received(1).StopListening(); - } - } -}