From 53a5d6f5969c1f390a0b12357fcdbb77b81a0a4a Mon Sep 17 00:00:00 2001 From: Stuart Lang Date: Mon, 19 Nov 2018 22:40:27 +0000 Subject: [PATCH 1/6] Make IMessageProcessingStrategy non-blocking for queued workers --- ...ThrowingBeforeMessageProcessingStrategy.cs | 111 +++--- ...ThrowingDuringMessageProcessingStrategy.cs | 90 ++--- .../MessageLoopTests.cs | 344 +++++++++--------- .../ThrottledTests.cs | 274 +++++++------- .../SqsNotificationListener.cs | 6 +- .../IMessageProcessingStrategy.cs | 70 ++-- .../MessageProcessingStrategies/Throttled.cs | 170 ++++----- 7 files changed, 534 insertions(+), 531 deletions(-) diff --git a/JustSaying.UnitTests/AwsTools/MessageHandling/SqsNotificationListener/Support/ThrowingBeforeMessageProcessingStrategy.cs b/JustSaying.UnitTests/AwsTools/MessageHandling/SqsNotificationListener/Support/ThrowingBeforeMessageProcessingStrategy.cs index ef47a3663..053bf7d34 100644 --- a/JustSaying.UnitTests/AwsTools/MessageHandling/SqsNotificationListener/Support/ThrowingBeforeMessageProcessingStrategy.cs +++ b/JustSaying.UnitTests/AwsTools/MessageHandling/SqsNotificationListener/Support/ThrowingBeforeMessageProcessingStrategy.cs @@ -1,55 +1,56 @@ -using System; -using System.Threading; -using System.Threading.Tasks; -using JustSaying.Messaging.MessageProcessingStrategies; -using JustSaying.TestingFramework; - -namespace JustSaying.UnitTests.AwsTools.MessageHandling.SqsNotificationListener.Support -{ - public class ThrowingBeforeMessageProcessingStrategy : IMessageProcessingStrategy - { - public int MaxWorkers => int.MaxValue; - - public int AvailableWorkers - { - get - { - if (_firstTime) - { - return 0; - } - - return int.MaxValue; - } - } - - private readonly TaskCompletionSource _doneSignal; - private bool _firstTime = true; - - public ThrowingBeforeMessageProcessingStrategy(TaskCompletionSource doneSignal) - { - _doneSignal = doneSignal; - } - - public Task WaitForAvailableWorkers() - { - if (_firstTime) - { - _firstTime = false; - Fail(); - } - return Task.FromResult(true); - } - - public void StartWorker(Func action, CancellationToken cancellationToken) - { - } - - private void Fail() - { - Tasks.DelaySendDone(_doneSignal); - throw new TestException("Thrown by test ProcessMessage"); - } - - } -} +using System; +using System.Threading; +using System.Threading.Tasks; +using JustSaying.Messaging.MessageProcessingStrategies; +using JustSaying.TestingFramework; + +namespace JustSaying.UnitTests.AwsTools.MessageHandling.SqsNotificationListener.Support +{ + public class ThrowingBeforeMessageProcessingStrategy : IMessageProcessingStrategy + { + public int MaxWorkers => int.MaxValue; + + public int AvailableWorkers + { + get + { + if (_firstTime) + { + return 0; + } + + return int.MaxValue; + } + } + + private readonly TaskCompletionSource _doneSignal; + private bool _firstTime = true; + + public ThrowingBeforeMessageProcessingStrategy(TaskCompletionSource doneSignal) + { + _doneSignal = doneSignal; + } + + public Task WaitForAvailableWorkers() + { + if (_firstTime) + { + _firstTime = false; + Fail(); + } + return Task.FromResult(true); + } + + public Task StartWorker(Func action, CancellationToken cancellationToken) + { + return Task.CompletedTask; + } + + private void Fail() + { + Tasks.DelaySendDone(_doneSignal); + throw new TestException("Thrown by test ProcessMessage"); + } + + } +} diff --git a/JustSaying.UnitTests/AwsTools/MessageHandling/SqsNotificationListener/Support/ThrowingDuringMessageProcessingStrategy.cs b/JustSaying.UnitTests/AwsTools/MessageHandling/SqsNotificationListener/Support/ThrowingDuringMessageProcessingStrategy.cs index 3ad067b49..3f0d03041 100644 --- a/JustSaying.UnitTests/AwsTools/MessageHandling/SqsNotificationListener/Support/ThrowingDuringMessageProcessingStrategy.cs +++ b/JustSaying.UnitTests/AwsTools/MessageHandling/SqsNotificationListener/Support/ThrowingDuringMessageProcessingStrategy.cs @@ -1,44 +1,46 @@ -using System; -using System.Threading; -using System.Threading.Tasks; -using JustSaying.Messaging.MessageProcessingStrategies; -using JustSaying.TestingFramework; - -namespace JustSaying.UnitTests.AwsTools.MessageHandling.SqsNotificationListener.Support -{ - public class ThrowingDuringMessageProcessingStrategy : IMessageProcessingStrategy - { - public int MaxWorkers => int.MaxValue; - - public int AvailableWorkers => int.MaxValue; - - private readonly TaskCompletionSource _doneSignal; - private bool _firstTime = true; - - public ThrowingDuringMessageProcessingStrategy(TaskCompletionSource doneSignal) - { - _doneSignal = doneSignal; - } - - public async Task WaitForAvailableWorkers() - { - await Task.Yield(); - } - - public void StartWorker(Func action, CancellationToken cancellationToken) - { - if (_firstTime) - { - _firstTime = false; - Fail(); - } - } - - private void Fail() - { - Tasks.DelaySendDone(_doneSignal); - throw new TestException("Thrown by test ProcessMessage"); - } - - } -} +using System; +using System.Threading; +using System.Threading.Tasks; +using JustSaying.Messaging.MessageProcessingStrategies; +using JustSaying.TestingFramework; + +namespace JustSaying.UnitTests.AwsTools.MessageHandling.SqsNotificationListener.Support +{ + public class ThrowingDuringMessageProcessingStrategy : IMessageProcessingStrategy + { + public int MaxWorkers => int.MaxValue; + + public int AvailableWorkers => int.MaxValue; + + private readonly TaskCompletionSource _doneSignal; + private bool _firstTime = true; + + public ThrowingDuringMessageProcessingStrategy(TaskCompletionSource doneSignal) + { + _doneSignal = doneSignal; + } + + public async Task WaitForAvailableWorkers() + { + await Task.Yield(); + } + + public Task StartWorker(Func action, CancellationToken cancellationToken) + { + if (_firstTime) + { + _firstTime = false; + return Fail(); + } + + return Task.CompletedTask; + } + + private Task Fail() + { + Tasks.DelaySendDone(_doneSignal); + throw new TestException("Thrown by test ProcessMessage"); + } + + } +} diff --git a/JustSaying.UnitTests/Messaging/MessageProcessingStrategies/MessageLoopTests.cs b/JustSaying.UnitTests/Messaging/MessageProcessingStrategies/MessageLoopTests.cs index 20cade719..59f04dfef 100644 --- a/JustSaying.UnitTests/Messaging/MessageProcessingStrategies/MessageLoopTests.cs +++ b/JustSaying.UnitTests/Messaging/MessageProcessingStrategies/MessageLoopTests.cs @@ -1,172 +1,172 @@ -using System; -using System.Collections.Generic; -using System.Diagnostics; -using System.Linq; -using System.Threading; -using System.Threading.Tasks; -using JustSaying.Messaging.MessageProcessingStrategies; -using JustSaying.Messaging.Monitoring; -using NSubstitute; -using Shouldly; -using Xunit; - -namespace JustSaying.UnitTests.Messaging.MessageProcessingStrategies -{ - public class ThreadSafeCounter - { - private int _count; - - public void Increment() - { - Interlocked.Increment(ref _count); - } - - public int Count => _count; - } - - public class MessageLoopTests - { - private const int MinTaskDuration = 10; - private const int TaskDurationVariance = 20; - - private const int ConcurrencyLevel = 20; - private const int MaxAmazonBatchSize = 10; - - [Theory] - [InlineData(1)] - [InlineData(2)] - [InlineData(10)] - [InlineData(20)] - public async Task SimulatedListenLoop_ProcessedAllMessages(int numberOfMessagesToProcess) - { - var fakeMonitor = Substitute.For(); - var messageProcessingStrategy = new Throttled(ConcurrencyLevel, fakeMonitor); - var counter = new ThreadSafeCounter(); - - var watch = new Stopwatch(); - watch.Start(); - - var actions = BuildFakeIncomingMessages(numberOfMessagesToProcess, counter); - await ListenLoopExecuted(actions, messageProcessingStrategy); - - watch.Stop(); - - await Task.Yield(); - await Task.Delay(2000); - await Task.Yield(); - - counter.Count.ShouldBe(numberOfMessagesToProcess); - } - - [Theory] - [InlineData(2, 1)] - [InlineData(3, 2)] - [InlineData(6, 5)] - [InlineData(11, 10)] - [InlineData(100, 90)] - [InlineData(30, 20)] - [InlineData(1000, 900)] - public async Task SimulatedListenLoop_WhenThrottlingOccurs_CallsMessageMonitor(int messageCount, int capacity) - { - messageCount.ShouldBeGreaterThan(capacity, "To cause throttling, message count must be over capacity"); - - var fakeMonitor = Substitute.For(); - var messageProcessingStrategy = new Throttled(capacity, fakeMonitor); - var counter = new ThreadSafeCounter(); - - var actions = BuildFakeIncomingMessages(messageCount, counter); - - await ListenLoopExecuted(actions, messageProcessingStrategy); - - fakeMonitor.Received().IncrementThrottlingStatistic(); - fakeMonitor.Received().HandleThrottlingTime(Arg.Any()); - } - - [Theory] - [InlineData(1, 1)] - [InlineData(1, 2)] - [InlineData(2, 2)] - [InlineData(5, 10)] - [InlineData(10, 50)] - [InlineData(50, 50)] - public async Task SimulatedListenLoop_WhenThrottlingDoesNotOccur_DoNotCallMessageMonitor(int messageCount, int capacity) - { - messageCount.ShouldBeLessThanOrEqualTo(capacity, - "To avoid throttling, message count must be not be over capacity"); - - var fakeMonitor = Substitute.For(); - var messageProcessingStrategy = new Throttled(capacity, fakeMonitor); - var counter = new ThreadSafeCounter(); - - var actions = BuildFakeIncomingMessages(messageCount, counter); - - await ListenLoopExecuted(actions, messageProcessingStrategy); - - fakeMonitor.DidNotReceive().IncrementThrottlingStatistic(); - } - - private static async Task ListenLoopExecuted(Queue> actions, - IMessageProcessingStrategy messageProcessingStrategy) - { - var initalActionCount = actions.Count; - var timeoutSeconds = 10 + (initalActionCount / 100); - var timeout = new TimeSpan(0, 0, timeoutSeconds); - var stopwatch = Stopwatch.StartNew(); - - while (actions.Any()) - { - var batch = GetFromFakeSnsQueue(actions, messageProcessingStrategy.AvailableWorkers); - - foreach (var action in batch) - { - messageProcessingStrategy.StartWorker(action, CancellationToken.None); - } - - if (!actions.Any()) - { - break; - } - - messageProcessingStrategy.AvailableWorkers.ShouldBeGreaterThanOrEqualTo(0); - await messageProcessingStrategy.WaitForAvailableWorkers(); - messageProcessingStrategy.AvailableWorkers.ShouldBeGreaterThan(0); - - stopwatch.Elapsed.ShouldBeLessThanOrEqualTo(timeout, - $"ListenLoopExecuted took longer than timeout of {timeoutSeconds}s, with {actions.Count} of {initalActionCount} messages remaining"); - } - } - - private static IList> GetFromFakeSnsQueue(Queue> actions, int requestedBatchSize) - { - var batchSize = Math.Min(requestedBatchSize, MaxAmazonBatchSize); - batchSize = Math.Min(batchSize, actions.Count); - - var batch = new List>(); - - for (var i = 0; i < batchSize; i++) - { - batch.Add(actions.Dequeue()); - } - return batch; - } - - private static Queue> BuildFakeIncomingMessages(int numberOfMessagesToCreate, ThreadSafeCounter counter) - { - var random = new Random(); - var actions = new Queue>(); - for (var i = 0; i != numberOfMessagesToCreate; i++) - { - var duration = MinTaskDuration + random.Next(TaskDurationVariance); - - var action = new Func(async () => - { - await Task.Delay(duration); - counter.Increment(); - }); - actions.Enqueue(action); - } - - return actions; - } - } -} +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using JustSaying.Messaging.MessageProcessingStrategies; +using JustSaying.Messaging.Monitoring; +using NSubstitute; +using Shouldly; +using Xunit; + +namespace JustSaying.UnitTests.Messaging.MessageProcessingStrategies +{ + public class ThreadSafeCounter + { + private int _count; + + public void Increment() + { + Interlocked.Increment(ref _count); + } + + public int Count => _count; + } + + public class MessageLoopTests + { + private const int MinTaskDuration = 10; + private const int TaskDurationVariance = 20; + + private const int ConcurrencyLevel = 20; + private const int MaxAmazonBatchSize = 10; + + [Theory] + [InlineData(1)] + [InlineData(2)] + [InlineData(10)] + [InlineData(20)] + public async Task SimulatedListenLoop_ProcessedAllMessages(int numberOfMessagesToProcess) + { + var fakeMonitor = Substitute.For(); + var messageProcessingStrategy = new Throttled(ConcurrencyLevel, fakeMonitor); + var counter = new ThreadSafeCounter(); + + var watch = new Stopwatch(); + watch.Start(); + + var actions = BuildFakeIncomingMessages(numberOfMessagesToProcess, counter); + await ListenLoopExecuted(actions, messageProcessingStrategy); + + watch.Stop(); + + await Task.Yield(); + await Task.Delay(2000); + await Task.Yield(); + + counter.Count.ShouldBe(numberOfMessagesToProcess); + } + + [Theory] + [InlineData(2, 1)] + [InlineData(3, 2)] + [InlineData(6, 5)] + [InlineData(11, 10)] + [InlineData(100, 90)] + [InlineData(30, 20)] + [InlineData(1000, 900)] + public async Task SimulatedListenLoop_WhenThrottlingOccurs_CallsMessageMonitor(int messageCount, int capacity) + { + messageCount.ShouldBeGreaterThan(capacity, "To cause throttling, message count must be over capacity"); + + var fakeMonitor = Substitute.For(); + var messageProcessingStrategy = new Throttled(capacity, fakeMonitor); + var counter = new ThreadSafeCounter(); + + var actions = BuildFakeIncomingMessages(messageCount, counter); + + await ListenLoopExecuted(actions, messageProcessingStrategy); + + fakeMonitor.Received().IncrementThrottlingStatistic(); + fakeMonitor.Received().HandleThrottlingTime(Arg.Any()); + } + + [Theory] + [InlineData(1, 1)] + [InlineData(1, 2)] + [InlineData(2, 2)] + [InlineData(5, 10)] + [InlineData(10, 50)] + [InlineData(50, 50)] + public async Task SimulatedListenLoop_WhenThrottlingDoesNotOccur_DoNotCallMessageMonitor(int messageCount, int capacity) + { + messageCount.ShouldBeLessThanOrEqualTo(capacity, + "To avoid throttling, message count must be not be over capacity"); + + var fakeMonitor = Substitute.For(); + var messageProcessingStrategy = new Throttled(capacity, fakeMonitor); + var counter = new ThreadSafeCounter(); + + var actions = BuildFakeIncomingMessages(messageCount, counter); + + await ListenLoopExecuted(actions, messageProcessingStrategy); + + fakeMonitor.DidNotReceive().IncrementThrottlingStatistic(); + } + + private static async Task ListenLoopExecuted(Queue> actions, + IMessageProcessingStrategy messageProcessingStrategy) + { + var initalActionCount = actions.Count; + var timeoutSeconds = 10 + (initalActionCount / 100); + var timeout = new TimeSpan(0, 0, timeoutSeconds); + var stopwatch = Stopwatch.StartNew(); + + while (actions.Any()) + { + var batch = GetFromFakeSnsQueue(actions, messageProcessingStrategy.AvailableWorkers); + + foreach (var action in batch) + { + await messageProcessingStrategy.StartWorker(action, CancellationToken.None); + } + + if (!actions.Any()) + { + break; + } + + messageProcessingStrategy.AvailableWorkers.ShouldBeGreaterThanOrEqualTo(0); + await messageProcessingStrategy.WaitForAvailableWorkers(); + messageProcessingStrategy.AvailableWorkers.ShouldBeGreaterThan(0); + + stopwatch.Elapsed.ShouldBeLessThanOrEqualTo(timeout, + $"ListenLoopExecuted took longer than timeout of {timeoutSeconds}s, with {actions.Count} of {initalActionCount} messages remaining"); + } + } + + private static IList> GetFromFakeSnsQueue(Queue> actions, int requestedBatchSize) + { + var batchSize = Math.Min(requestedBatchSize, MaxAmazonBatchSize); + batchSize = Math.Min(batchSize, actions.Count); + + var batch = new List>(); + + for (var i = 0; i < batchSize; i++) + { + batch.Add(actions.Dequeue()); + } + return batch; + } + + private static Queue> BuildFakeIncomingMessages(int numberOfMessagesToCreate, ThreadSafeCounter counter) + { + var random = new Random(); + var actions = new Queue>(); + for (var i = 0; i != numberOfMessagesToCreate; i++) + { + var duration = MinTaskDuration + random.Next(TaskDurationVariance); + + var action = new Func(async () => + { + await Task.Delay(duration); + counter.Increment(); + }); + actions.Enqueue(action); + } + + return actions; + } + } +} diff --git a/JustSaying.UnitTests/Messaging/MessageProcessingStrategies/ThrottledTests.cs b/JustSaying.UnitTests/Messaging/MessageProcessingStrategies/ThrottledTests.cs index db834f8fd..efc02a33e 100644 --- a/JustSaying.UnitTests/Messaging/MessageProcessingStrategies/ThrottledTests.cs +++ b/JustSaying.UnitTests/Messaging/MessageProcessingStrategies/ThrottledTests.cs @@ -1,137 +1,137 @@ -using System.Threading; -using System.Threading.Tasks; -using JustSaying.Messaging.MessageProcessingStrategies; -using JustSaying.Messaging.Monitoring; -using NSubstitute; -using Shouldly; -using Xunit; - -namespace JustSaying.UnitTests.Messaging.MessageProcessingStrategies -{ - public class ThrottledTests - { - private readonly IMessageMonitor _fakeMonitor; - - public ThrottledTests() - { - _fakeMonitor = Substitute.For(); - } - - [Fact] - public void MaxWorkers_StartsAtCapacity() - { - var messageProcessingStrategy = new Throttled(123, _fakeMonitor); - - messageProcessingStrategy.MaxWorkers.ShouldBe(123); - } - - [Fact] - public void AvailableWorkers_StartsAtCapacity() - { - var messageProcessingStrategy = new Throttled(123, _fakeMonitor); - - messageProcessingStrategy.AvailableWorkers.ShouldBe(123); - } - - [Fact] - public async Task WhenATasksIsAdded_MaxWorkersIsUnaffected() - { - var messageProcessingStrategy = new Throttled(123, _fakeMonitor); - var tcs = new TaskCompletionSource(); - - messageProcessingStrategy.StartWorker(() => tcs.Task, CancellationToken.None); - - messageProcessingStrategy.MaxWorkers.ShouldBe(123); - - await AllowTasksToComplete(tcs); - } - - [Fact] - public async Task WhenATasksIsAdded_AvailableWorkersIsDecremented() - { - var messageProcessingStrategy = new Throttled(123, _fakeMonitor); - var tcs = new TaskCompletionSource(); - - messageProcessingStrategy.StartWorker(() => tcs.Task, CancellationToken.None); - - messageProcessingStrategy.AvailableWorkers.ShouldBe(122); - await AllowTasksToComplete(tcs); - } - - [Fact] - public async Task WhenATaskCompletes_AvailableWorkersIsIncremented() - { - var messageProcessingStrategy = new Throttled(3, _fakeMonitor); - var tcs = new TaskCompletionSource(); - - messageProcessingStrategy.StartWorker(() => tcs.Task, CancellationToken.None); - - messageProcessingStrategy.AvailableWorkers.ShouldBe(2); - - await AllowTasksToComplete(tcs); - - messageProcessingStrategy.MaxWorkers.ShouldBe(3); - messageProcessingStrategy.AvailableWorkers.ShouldBe(3); - } - - [Fact] - public async Task AvailableWorkers_CanReachZero() - { - const int capacity = 10; - var messageProcessingStrategy = new Throttled(capacity, _fakeMonitor); - var tcs = new TaskCompletionSource(); - - for (int i = 0; i < capacity; i++) - { - messageProcessingStrategy.StartWorker(() => tcs.Task, CancellationToken.None); - } - - messageProcessingStrategy.MaxWorkers.ShouldBe(capacity); - messageProcessingStrategy.AvailableWorkers.ShouldBe(0); - await AllowTasksToComplete(tcs); - } - - [Fact] - public async Task AvailableWorkers_CanGoToZeroAndBackToFull() - { - const int capacity = 10; - var messageProcessingStrategy = new Throttled(capacity, _fakeMonitor); - var tcs = new TaskCompletionSource(); - - for (int i = 0; i < capacity; i++) - { - messageProcessingStrategy.StartWorker(() => tcs.Task, CancellationToken.None); - } - - messageProcessingStrategy.AvailableWorkers.ShouldBe(0); - - await AllowTasksToComplete(tcs); - - messageProcessingStrategy.AvailableWorkers.ShouldBe(capacity); - } - - [Fact] - public async Task AvailableWorkers_IsNeverNegative() - { - const int capacity = 10; - var messageProcessingStrategy = new Throttled(capacity, _fakeMonitor); - var tcs = new TaskCompletionSource(); - - - for (int i = 0; i < capacity; i++) - { - messageProcessingStrategy.StartWorker(() => tcs.Task, CancellationToken.None); - messageProcessingStrategy.AvailableWorkers.ShouldBeGreaterThanOrEqualTo(0); - } - - await AllowTasksToComplete(tcs); - } - - private static async Task AllowTasksToComplete(TaskCompletionSource doneSignal) - { - doneSignal.SetResult(null); - await Task.Yield(); - await Task.Delay(100); - } - } -} +using System.Threading; +using System.Threading.Tasks; +using JustSaying.Messaging.MessageProcessingStrategies; +using JustSaying.Messaging.Monitoring; +using NSubstitute; +using Shouldly; +using Xunit; + +namespace JustSaying.UnitTests.Messaging.MessageProcessingStrategies +{ + public class ThrottledTests + { + private readonly IMessageMonitor _fakeMonitor; + + public ThrottledTests() + { + _fakeMonitor = Substitute.For(); + } + + [Fact] + public void MaxWorkers_StartsAtCapacity() + { + var messageProcessingStrategy = new Throttled(123, _fakeMonitor); + + messageProcessingStrategy.MaxWorkers.ShouldBe(123); + } + + [Fact] + public void AvailableWorkers_StartsAtCapacity() + { + var messageProcessingStrategy = new Throttled(123, _fakeMonitor); + + messageProcessingStrategy.AvailableWorkers.ShouldBe(123); + } + + [Fact] + public async Task WhenATasksIsAdded_MaxWorkersIsUnaffected() + { + var messageProcessingStrategy = new Throttled(123, _fakeMonitor); + var tcs = new TaskCompletionSource(); + + await messageProcessingStrategy.StartWorker(() => tcs.Task, CancellationToken.None); + + messageProcessingStrategy.MaxWorkers.ShouldBe(123); + + await AllowTasksToComplete(tcs); + } + + [Fact] + public async Task WhenATasksIsAdded_AvailableWorkersIsDecremented() + { + var messageProcessingStrategy = new Throttled(123, _fakeMonitor); + var tcs = new TaskCompletionSource(); + + await messageProcessingStrategy.StartWorker(() => tcs.Task, CancellationToken.None); + + messageProcessingStrategy.AvailableWorkers.ShouldBe(122); + await AllowTasksToComplete(tcs); + } + + [Fact] + public async Task WhenATaskCompletes_AvailableWorkersIsIncremented() + { + var messageProcessingStrategy = new Throttled(3, _fakeMonitor); + var tcs = new TaskCompletionSource(); + + await messageProcessingStrategy.StartWorker(() => tcs.Task, CancellationToken.None); + + messageProcessingStrategy.AvailableWorkers.ShouldBe(2); + + await AllowTasksToComplete(tcs); + + messageProcessingStrategy.MaxWorkers.ShouldBe(3); + messageProcessingStrategy.AvailableWorkers.ShouldBe(3); + } + + [Fact] + public async Task AvailableWorkers_CanReachZero() + { + const int capacity = 10; + var messageProcessingStrategy = new Throttled(capacity, _fakeMonitor); + var tcs = new TaskCompletionSource(); + + for (int i = 0; i < capacity; i++) + { + await messageProcessingStrategy.StartWorker(() => tcs.Task, CancellationToken.None); + } + + messageProcessingStrategy.MaxWorkers.ShouldBe(capacity); + messageProcessingStrategy.AvailableWorkers.ShouldBe(0); + await AllowTasksToComplete(tcs); + } + + [Fact] + public async Task AvailableWorkers_CanGoToZeroAndBackToFull() + { + const int capacity = 10; + var messageProcessingStrategy = new Throttled(capacity, _fakeMonitor); + var tcs = new TaskCompletionSource(); + + for (int i = 0; i < capacity; i++) + { + await messageProcessingStrategy.StartWorker(() => tcs.Task, CancellationToken.None); + } + + messageProcessingStrategy.AvailableWorkers.ShouldBe(0); + + await AllowTasksToComplete(tcs); + + messageProcessingStrategy.AvailableWorkers.ShouldBe(capacity); + } + + [Fact] + public async Task AvailableWorkers_IsNeverNegative() + { + const int capacity = 10; + var messageProcessingStrategy = new Throttled(capacity, _fakeMonitor); + var tcs = new TaskCompletionSource(); + + + for (int i = 0; i < capacity; i++) + { + await messageProcessingStrategy.StartWorker(() => tcs.Task, CancellationToken.None); + messageProcessingStrategy.AvailableWorkers.ShouldBeGreaterThanOrEqualTo(0); + } + + await AllowTasksToComplete(tcs); + } + + private static async Task AllowTasksToComplete(TaskCompletionSource doneSignal) + { + doneSignal.SetResult(null); + await Task.Yield(); + await Task.Delay(100); + } + } +} diff --git a/JustSaying/AwsTools/MessageHandling/SqsNotificationListener.cs b/JustSaying/AwsTools/MessageHandling/SqsNotificationListener.cs index 35d2d2525..6ad1d094c 100644 --- a/JustSaying/AwsTools/MessageHandling/SqsNotificationListener.cs +++ b/JustSaying/AwsTools/MessageHandling/SqsNotificationListener.cs @@ -177,7 +177,7 @@ internal async Task ListenLoop(CancellationToken ct) { foreach (var message in sqsMessageResponse.Messages) { - HandleMessage(message, ct); + await HandleMessage(message, ct).ConfigureAwait(false); } } } @@ -251,10 +251,10 @@ private async Task GetNumberOfMessagesToReadFromSqs() return numberOfMessagesToReadFromSqs; } - private void HandleMessage(Amazon.SQS.Model.Message message, CancellationToken ct) + private Task HandleMessage(Amazon.SQS.Model.Message message, CancellationToken ct) { var action = new Func(() => _messageDispatcher.DispatchMessage(message, ct)); - _messageProcessingStrategy.StartWorker(action, ct); + return _messageProcessingStrategy.StartWorker(action, ct); } public ICollection Subscribers { get; set; } diff --git a/JustSaying/Messaging/MessageProcessingStrategies/IMessageProcessingStrategy.cs b/JustSaying/Messaging/MessageProcessingStrategies/IMessageProcessingStrategy.cs index 86a97b62e..f16aeef00 100644 --- a/JustSaying/Messaging/MessageProcessingStrategies/IMessageProcessingStrategy.cs +++ b/JustSaying/Messaging/MessageProcessingStrategies/IMessageProcessingStrategy.cs @@ -1,35 +1,35 @@ -using System; -using System.Threading; -using System.Threading.Tasks; - -namespace JustSaying.Messaging.MessageProcessingStrategies -{ - public interface IMessageProcessingStrategy - { - /// - /// The maximum number of worker tasks that will be used to run messages handlers at any one time - /// - int MaxWorkers { get; } - - /// - /// The number of worker tasks that are free to run messages handlers right now, - /// Always in the range 0 to MaxWorkers - /// the number of currently running workers will be = (MaxWorkers - AvailableWorkers) - /// - int AvailableWorkers { get; } - - /// - /// Launch a worker to start processing a message. - /// - /// - /// The cancellation token - void StartWorker(Func action, CancellationToken cancellationToken); - - /// - /// After awaiting this, you should be in a position to start another worker - /// i.e. AvailableWorkers should be above 0 - /// - /// - Task WaitForAvailableWorkers(); - } -} +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace JustSaying.Messaging.MessageProcessingStrategies +{ + public interface IMessageProcessingStrategy + { + /// + /// The maximum number of worker tasks that will be used to run messages handlers at any one time + /// + int MaxWorkers { get; } + + /// + /// The number of worker tasks that are free to run messages handlers right now, + /// Always in the range 0 to MaxWorkers + /// the number of currently running workers will be = (MaxWorkers - AvailableWorkers) + /// + int AvailableWorkers { get; } + + /// + /// Launch a worker to start processing a message. + /// + /// + /// The cancellation token + Task StartWorker(Func action, CancellationToken cancellationToken); + + /// + /// After awaiting this, you should be in a position to start another worker + /// i.e. AvailableWorkers should be above 0 + /// + /// + Task WaitForAvailableWorkers(); + } +} diff --git a/JustSaying/Messaging/MessageProcessingStrategies/Throttled.cs b/JustSaying/Messaging/MessageProcessingStrategies/Throttled.cs index 128dc257e..5eaa5b11c 100644 --- a/JustSaying/Messaging/MessageProcessingStrategies/Throttled.cs +++ b/JustSaying/Messaging/MessageProcessingStrategies/Throttled.cs @@ -1,85 +1,85 @@ -using System; -using System.Diagnostics; -using System.Threading; -using System.Threading.Tasks; -using JustSaying.Messaging.Monitoring; - -namespace JustSaying.Messaging.MessageProcessingStrategies -{ - [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable", Justification = "Known issue")] - public class Throttled : IMessageProcessingStrategy - { - private readonly IMessageMonitor _messageMonitor; - private readonly SemaphoreSlim _semaphore; - - public Throttled(int maxWorkers, IMessageMonitor messageMonitor) - { - _messageMonitor = messageMonitor; - MaxWorkers = maxWorkers; - _semaphore = new SemaphoreSlim(maxWorkers, maxWorkers); - } - - public void StartWorker(Func action, CancellationToken cancellationToken) - { - var messageProcessingTask = new Task(() => ReleaseOnCompleted(action)); - - try - { - _semaphore.Wait(cancellationToken); - } - catch (OperationCanceledException) - { - return; - } - - messageProcessingTask.Start(); - } - - private async Task ReleaseOnCompleted(Func action) - { - try - { - await action().ConfigureAwait(false); - } - finally - { - _semaphore.Release(); - } - } - - - public async Task WaitForAvailableWorkers() - { - if (_semaphore.CurrentCount != 0) - return; - - _messageMonitor.IncrementThrottlingStatistic(); - - var watch = Stopwatch.StartNew(); - await AsTask(_semaphore.AvailableWaitHandle).ConfigureAwait(false); - watch.Stop(); - - _messageMonitor.HandleThrottlingTime(watch.ElapsedMilliseconds); - } - - private static Task AsTask(WaitHandle waitHandle) - { - var tcs = new TaskCompletionSource(); - - ThreadPool.RegisterWaitForSingleObject( - waitObject: waitHandle, - callBack: (o, timeout) => { tcs.SetResult(null); }, - state: null, - timeout: TimeSpan.FromMilliseconds(int.MaxValue), - executeOnlyOnce: true); - - return tcs.Task; - } - - public int MaxWorkers { get; } - public int AvailableWorkers - { - get { return _semaphore.CurrentCount; } - } - } -} +using System; +using System.Diagnostics; +using System.Threading; +using System.Threading.Tasks; +using JustSaying.Messaging.Monitoring; + +namespace JustSaying.Messaging.MessageProcessingStrategies +{ + [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable", Justification = "Known issue")] + public class Throttled : IMessageProcessingStrategy + { + private readonly IMessageMonitor _messageMonitor; + private readonly SemaphoreSlim _semaphore; + + public Throttled(int maxWorkers, IMessageMonitor messageMonitor) + { + _messageMonitor = messageMonitor; + MaxWorkers = maxWorkers; + _semaphore = new SemaphoreSlim(maxWorkers, maxWorkers); + } + + public async Task StartWorker(Func action, CancellationToken cancellationToken) + { + var messageProcessingTask = new Task(() => ReleaseOnCompleted(action)); + + try + { + await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + return; + } + + messageProcessingTask.Start(); + } + + private async Task ReleaseOnCompleted(Func action) + { + try + { + await action().ConfigureAwait(false); + } + finally + { + _semaphore.Release(); + } + } + + + public async Task WaitForAvailableWorkers() + { + if (_semaphore.CurrentCount != 0) + return; + + _messageMonitor.IncrementThrottlingStatistic(); + + var watch = Stopwatch.StartNew(); + await AsTask(_semaphore.AvailableWaitHandle).ConfigureAwait(false); + watch.Stop(); + + _messageMonitor.HandleThrottlingTime(watch.ElapsedMilliseconds); + } + + private static Task AsTask(WaitHandle waitHandle) + { + var tcs = new TaskCompletionSource(); + + ThreadPool.RegisterWaitForSingleObject( + waitObject: waitHandle, + callBack: (o, timeout) => { tcs.SetResult(null); }, + state: null, + timeout: TimeSpan.FromMilliseconds(int.MaxValue), + executeOnlyOnce: true); + + return tcs.Task; + } + + public int MaxWorkers { get; } + public int AvailableWorkers + { + get { return _semaphore.CurrentCount; } + } + } +} From 77dba4bbc91c63cf4d424b6da3b455357f6f74fe Mon Sep 17 00:00:00 2001 From: Stuart Lang Date: Mon, 19 Nov 2018 22:54:23 +0000 Subject: [PATCH 2/6] Simplify launch of ListenLoop --- .../SqsNotificationListener.cs | 50 +++---------------- 1 file changed, 8 insertions(+), 42 deletions(-) diff --git a/JustSaying/AwsTools/MessageHandling/SqsNotificationListener.cs b/JustSaying/AwsTools/MessageHandling/SqsNotificationListener.cs index 6ad1d094c..8ebab58f0 100644 --- a/JustSaying/AwsTools/MessageHandling/SqsNotificationListener.cs +++ b/JustSaying/AwsTools/MessageHandling/SqsNotificationListener.cs @@ -94,53 +94,19 @@ public void Listen(CancellationToken cancellationToken) var region = _queue.Region.SystemName; var queueInfo = $"Queue: {queue}, Region: {region}"; - Task.Factory.StartNew(async () => { await ListenLoop(cancellationToken).ConfigureAwait(false); }) - .Unwrap() - .ContinueWith(t => LogTaskEndState(t, queueInfo, _log)); + // Run task in background + // ListenLoop will cancel gracefully, so no need to pass cancellation token to Task.Run + _ = Task.Run(async () => + { + await ListenLoop(cancellationToken).ConfigureAwait(false); + IsListening = false; + _log.LogInformation($"Stopped Listening - {queueInfo}"); + }); IsListening = true; _log.LogInformation($"Starting Listening - {queueInfo}"); } - private void LogTaskEndState(Task task, string queueInfo, ILogger log) - { - IsListening = false; - - if (task.IsFaulted) - { - log.LogWarning($"[Faulted] Stopped Listening - {queueInfo}\n{AggregateExceptionDetails(task.Exception)}"); - } - else - { - log.LogInformation($"[{task.Status}] Stopped Listening - {queueInfo}"); - } - } - - private static string AggregateExceptionDetails(AggregateException ex) - { - var flatEx = ex.Flatten(); - - if (flatEx.InnerExceptions.Count == 0) - { - return "AggregateException containing no inner exceptions\n" + ex; - } - - if (flatEx.InnerExceptions.Count == 1) - { - return ex.InnerExceptions[0].ToString(); - } - - var innerExDetails = new StringBuilder(); - innerExDetails.AppendFormat(CultureInfo.InvariantCulture, - "AggregateException containing {0} inner exceptions", flatEx.InnerExceptions.Count); - foreach (var innerEx in flatEx.InnerExceptions) - { - innerExDetails.AppendLine(innerEx.ToString()); - } - - return innerExDetails.ToString(); - } - internal async Task ListenLoop(CancellationToken ct) { var queueName = _queue.QueueName; From c6cbb021d4b1aaa15d07044da010179b7e2b3e0c Mon Sep 17 00:00:00 2001 From: Stuart Lang Date: Mon, 19 Nov 2018 23:04:30 +0000 Subject: [PATCH 3/6] Update docs --- .../MessageProcessingStrategies/IMessageProcessingStrategy.cs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/JustSaying/Messaging/MessageProcessingStrategies/IMessageProcessingStrategy.cs b/JustSaying/Messaging/MessageProcessingStrategies/IMessageProcessingStrategy.cs index f16aeef00..47e267809 100644 --- a/JustSaying/Messaging/MessageProcessingStrategies/IMessageProcessingStrategy.cs +++ b/JustSaying/Messaging/MessageProcessingStrategies/IMessageProcessingStrategy.cs @@ -23,6 +23,10 @@ public interface IMessageProcessingStrategy /// /// /// The cancellation token + /// + /// A representing the asynchronous operation of queuing , + /// including waiting for a worker to become available. + /// Task StartWorker(Func action, CancellationToken cancellationToken); /// From fc3c1eb5f6e624f8e148881fdd2084f3fa055d16 Mon Sep 17 00:00:00 2001 From: Stuart Lang Date: Mon, 19 Nov 2018 23:06:52 +0000 Subject: [PATCH 4/6] Move CancellationToken to last parameter of private method --- .../AwsTools/MessageHandling/SqsNotificationListener.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/JustSaying/AwsTools/MessageHandling/SqsNotificationListener.cs b/JustSaying/AwsTools/MessageHandling/SqsNotificationListener.cs index 8ebab58f0..3bf6e4723 100644 --- a/JustSaying/AwsTools/MessageHandling/SqsNotificationListener.cs +++ b/JustSaying/AwsTools/MessageHandling/SqsNotificationListener.cs @@ -117,7 +117,7 @@ internal async Task ListenLoop(CancellationToken ct) { try { - sqsMessageResponse = await GetMessagesFromSqsQueue(ct, queueName, region).ConfigureAwait(false); + sqsMessageResponse = await GetMessagesFromSqsQueue(queueName, region, ct).ConfigureAwait(false); var messageCount = sqsMessageResponse.Messages.Count; _log.LogTrace( @@ -155,7 +155,7 @@ internal async Task ListenLoop(CancellationToken ct) } } - private async Task GetMessagesFromSqsQueue(CancellationToken ct, string queueName, string region) + private async Task GetMessagesFromSqsQueue(string queueName, string region, CancellationToken ct) { var numberOfMessagesToReadFromSqs = await GetNumberOfMessagesToReadFromSqs() .ConfigureAwait(false); From d2ed621e564c0dcfbfb65681f1aeb7efb23df066 Mon Sep 17 00:00:00 2001 From: Stuart Lang Date: Mon, 19 Nov 2018 23:11:01 +0000 Subject: [PATCH 5/6] Use more familiar way to launch workers --- JustSaying/Messaging/MessageProcessingStrategies/Throttled.cs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/JustSaying/Messaging/MessageProcessingStrategies/Throttled.cs b/JustSaying/Messaging/MessageProcessingStrategies/Throttled.cs index 5eaa5b11c..e5f68df86 100644 --- a/JustSaying/Messaging/MessageProcessingStrategies/Throttled.cs +++ b/JustSaying/Messaging/MessageProcessingStrategies/Throttled.cs @@ -21,8 +21,6 @@ public Throttled(int maxWorkers, IMessageMonitor messageMonitor) public async Task StartWorker(Func action, CancellationToken cancellationToken) { - var messageProcessingTask = new Task(() => ReleaseOnCompleted(action)); - try { await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false); @@ -32,7 +30,7 @@ public async Task StartWorker(Func action, CancellationToken cancellationT return; } - messageProcessingTask.Start(); + _ = Task.Run(() => ReleaseOnCompleted(action)); } private async Task ReleaseOnCompleted(Func action) From afccf1fa03e9a0d20b9554a1cf148ac1fad953fd Mon Sep 17 00:00:00 2001 From: Stuart Lang Date: Tue, 20 Nov 2018 08:25:03 +0000 Subject: [PATCH 6/6] Check for cancellation more frequently --- .../AwsTools/MessageHandling/SqsNotificationListener.cs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/JustSaying/AwsTools/MessageHandling/SqsNotificationListener.cs b/JustSaying/AwsTools/MessageHandling/SqsNotificationListener.cs index 3bf6e4723..c7e38c1fc 100644 --- a/JustSaying/AwsTools/MessageHandling/SqsNotificationListener.cs +++ b/JustSaying/AwsTools/MessageHandling/SqsNotificationListener.cs @@ -143,6 +143,10 @@ internal async Task ListenLoop(CancellationToken ct) { foreach (var message in sqsMessageResponse.Messages) { + if (ct.IsCancellationRequested) + { + return; + } await HandleMessage(message, ct).ConfigureAwait(false); } }