Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve throttled processing strategy #546

Merged
merged 7 commits into from
Jul 3, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace JustSaying.UnitTests.AwsTools.MessageHandling.SqsNotificationListener.
{
public class ThrowingBeforeMessageProcessingStrategy : IMessageProcessingStrategy
{
public int MaxWorkers => int.MaxValue;
public int MaxConcurrency => int.MaxValue;

public int AvailableWorkers
{
Expand All @@ -31,26 +31,26 @@ public ThrowingBeforeMessageProcessingStrategy(TaskCompletionSource<object> done
_doneSignal = doneSignal;
}

public Task WaitForAvailableWorkers()
public Task<int> WaitForAvailableWorkerAsync()
{
if (_firstTime)
{
_firstTime = false;
Fail();
}
return Task.FromResult(true);

return Task.FromResult(1);
}

public Task StartWorker(Func<Task> action, CancellationToken cancellationToken)
public Task<bool> StartWorkerAsync(Func<Task> action, CancellationToken cancellationToken)
{
return Task.CompletedTask;
return Task.FromResult(true);
}

private void Fail()
{
TaskHelpers.DelaySendDone(_doneSignal);
throw new TestException("Thrown by test ProcessMessage");
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@ namespace JustSaying.UnitTests.AwsTools.MessageHandling.SqsNotificationListener.
{
public class ThrowingDuringMessageProcessingStrategy : IMessageProcessingStrategy
{
public int MaxWorkers => int.MaxValue;

public int AvailableWorkers => int.MaxValue;
public int MaxConcurrency => int.MaxValue;

private readonly TaskCompletionSource<object> _doneSignal;
private bool _firstTime = true;
Expand All @@ -20,27 +18,26 @@ public ThrowingDuringMessageProcessingStrategy(TaskCompletionSource<object> done
_doneSignal = doneSignal;
}

public Task WaitForAvailableWorkers()
public Task<int> WaitForAvailableWorkerAsync()
{
return Task.CompletedTask;
return Task.FromResult(MaxConcurrency);
}

public Task StartWorker(Func<Task> action, CancellationToken cancellationToken)
public Task<bool> StartWorkerAsync(Func<Task> action, CancellationToken cancellationToken)
{
if (_firstTime)
{
_firstTime = false;
return Fail();
}

return Task.CompletedTask;
return Task.FromResult(true);
}

private Task Fail()
private Task<bool> Fail()
{
TaskHelpers.DelaySendDone(_doneSignal);
throw new TestException("Thrown by test ProcessMessage");
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Threading.Tasks;
using JustSaying.Messaging.MessageProcessingStrategies;
using JustSaying.Messaging.Monitoring;
using Microsoft.Extensions.Logging;
using NSubstitute;
using Shouldly;
using Xunit;
Expand All @@ -26,25 +27,67 @@ public void Increment()

public class MessageLoopTests
{
private const int MinTaskDuration = 100;
private static readonly TimeSpan MinTaskDuration = TimeSpan.FromMilliseconds(100);
private const int TaskDurationVariance = 20;

private const int ConcurrencyLevel = 20;
private const int MaxAmazonBatchSize = 10;
private static readonly TimeSpan StartTimeout = Timeout.InfiniteTimeSpan;

[Theory]
[InlineData(1)]
[InlineData(2)]
[InlineData(10)]
[InlineData(20)]
public async Task SimulatedListenLoop_ProcessedAllMessages(int numberOfMessagesToProcess)
[InlineData(1, 20)]
[InlineData(2, 20)]
[InlineData(10, 20)]
[InlineData(20, 20)]
public async Task SimulatedListenLoop_ProcessedAllMessages_In_Parallel(
int numberOfMessagesToProcess,
int concurrency)
{
var fakeMonitor = Substitute.For<IMessageMonitor>();
var messageProcessingStrategy = new Throttled(ConcurrencyLevel, fakeMonitor);
var options = new ThrottledOptions()
{
MaxConcurrency = concurrency,
Logger = Substitute.For<ILogger>(),
MessageMonitor = Substitute.For<IMessageMonitor>(),
StartTimeout = StartTimeout,
ProcessMessagesSequentially = false,
};

var messageProcessingStrategy = new Throttled(options);
var counter = new ThreadSafeCounter();

var stopwatch = Stopwatch.StartNew();

var actions = BuildFakeIncomingMessages(numberOfMessagesToProcess, counter);
await ListenLoopExecuted(actions, messageProcessingStrategy);

stopwatch.Stop();

await Task.Delay(2000);

counter.Count.ShouldBe(numberOfMessagesToProcess);
}

[Theory]
[InlineData(1, 20)]
[InlineData(2, 20)]
[InlineData(10, 20)]
[InlineData(20, 20)]
public async Task SimulatedListenLoop_ProcessedAllMessages_Sequentially(
int numberOfMessagesToProcess,
int concurrency)
{
var options = new ThrottledOptions()
{
MaxConcurrency = concurrency,
Logger = Substitute.For<ILogger>(),
MessageMonitor = Substitute.For<IMessageMonitor>(),
StartTimeout = StartTimeout,
ProcessMessagesSequentially = true,
};

var messageProcessingStrategy = new Throttled(options);
var counter = new ThreadSafeCounter();

var watch = new Stopwatch();
watch.Start();
var watch = Stopwatch.StartNew();

var actions = BuildFakeIncomingMessages(numberOfMessagesToProcess, counter);
await ListenLoopExecuted(actions, messageProcessingStrategy);
Expand All @@ -64,36 +107,70 @@ public async Task SimulatedListenLoop_ProcessedAllMessages(int numberOfMessagesT
[InlineData(100, 90)]
[InlineData(30, 20)]
[InlineData(1000, 900)]
public async Task SimulatedListenLoop_WhenThrottlingOccurs_CallsMessageMonitor(int messageCount, int capacity)
public async Task SimulatedListenLoop_WhenThrottlingOccurs_CallsMessageMonitor(int messageCount, int concurrency)
{
messageCount.ShouldBeGreaterThan(capacity, "To cause throttling, message count must be over capacity");
messageCount.ShouldBeGreaterThan(concurrency, "To cause throttling, message count must be greater than concurrency.");

var fakeMonitor = Substitute.For<IMessageMonitor>();
var messageProcessingStrategy = new Throttled(capacity, fakeMonitor);

var options = new ThrottledOptions()
{
MaxConcurrency = concurrency,
Logger = Substitute.For<ILogger>(),
MessageMonitor = fakeMonitor,
StartTimeout = TimeSpan.FromTicks(1),
};

var messageProcessingStrategy = new Throttled(options);
var counter = new ThreadSafeCounter();
var tcs = new TaskCompletionSource<bool>();

var actions = BuildFakeIncomingMessages(messageCount, counter);
for (int i = 0; i < concurrency; i++)
{
(await messageProcessingStrategy.StartWorkerAsync(
async () => await tcs.Task,
CancellationToken.None)).ShouldBeTrue();
}

await ListenLoopExecuted(actions, messageProcessingStrategy);
messageProcessingStrategy.AvailableWorkers.ShouldBe(0);

for (int i = 0; i < messageCount - concurrency; i++)
{
(await messageProcessingStrategy.StartWorkerAsync(() => Task.CompletedTask, CancellationToken.None)).ShouldBeFalse();
}

messageProcessingStrategy.AvailableWorkers.ShouldBe(0);

tcs.SetResult(true);

(await messageProcessingStrategy.WaitForAvailableWorkerAsync()).ShouldBeGreaterThan(0);

fakeMonitor.Received().IncrementThrottlingStatistic();
fakeMonitor.Received().HandleThrottlingTime(Arg.Any<TimeSpan>());
}

[Theory]
[InlineData(1, 1)]
[InlineData(1, 2)]
[InlineData(2, 2)]
[InlineData(5, 10)]
[InlineData(9, 10)]
[InlineData(10, 50)]
[InlineData(50, 50)]
public async Task SimulatedListenLoop_WhenThrottlingDoesNotOccur_DoNotCallMessageMonitor(int messageCount, int capacity)
[InlineData(49, 50)]
public async Task SimulatedListenLoop_WhenThrottlingDoesNotOccur_DoNotCallMessageMonitor(int messageCount, int concurrency)
{
messageCount.ShouldBeLessThanOrEqualTo(capacity,
"To avoid throttling, message count must be not be over capacity");
messageCount.ShouldBeLessThanOrEqualTo(concurrency,
"To avoid throttling, message count must be not be greater than capacity.");

var fakeMonitor = Substitute.For<IMessageMonitor>();
var messageProcessingStrategy = new Throttled(capacity, fakeMonitor);

var options = new ThrottledOptions()
{
MaxConcurrency = concurrency,
Logger = Substitute.For<ILogger>(),
MessageMonitor = fakeMonitor,
StartTimeout = Timeout.InfiniteTimeSpan,
};

var messageProcessingStrategy = new Throttled(options);
var counter = new ThreadSafeCounter();

var actions = BuildFakeIncomingMessages(messageCount, counter);
Expand All @@ -103,34 +180,58 @@ public async Task SimulatedListenLoop_WhenThrottlingDoesNotOccur_DoNotCallMessag
fakeMonitor.DidNotReceive().IncrementThrottlingStatistic();
}

private static async Task ListenLoopExecuted(Queue<Func<Task>> actions,
[Theory]
[InlineData(2, 2)]
[InlineData(10, 10)]
[InlineData(50, 50)]
public async Task SimulatedListenLoop_WhenThrottlingDoesNotOccur_CallsMessageMonitor_Once(int messageCount, int concurrency)
{
var fakeMonitor = Substitute.For<IMessageMonitor>();

var options = new ThrottledOptions()
{
MaxConcurrency = concurrency,
Logger = Substitute.For<ILogger>(),
MessageMonitor = fakeMonitor,
StartTimeout = Timeout.InfiniteTimeSpan,
};

var messageProcessingStrategy = new Throttled(options);
var counter = new ThreadSafeCounter();

var actions = BuildFakeIncomingMessages(messageCount, counter);

await ListenLoopExecuted(actions, messageProcessingStrategy);

fakeMonitor.Received(1).IncrementThrottlingStatistic();
}

private static async Task ListenLoopExecuted(
Queue<Func<Task>> actions,
IMessageProcessingStrategy messageProcessingStrategy)
{
var initalActionCount = (double)actions.Count;
var timeoutSeconds = MinTaskDuration + (initalActionCount / 100);
var timeout = TimeSpan.FromSeconds(timeoutSeconds);
var timeout = MinTaskDuration + TimeSpan.FromMilliseconds(initalActionCount / 100);
var stopwatch = Stopwatch.StartNew();

while (actions.Any())
{
var batch = GetFromFakeSnsQueue(actions, messageProcessingStrategy.AvailableWorkers);
var batch = GetFromFakeSnsQueue(actions, messageProcessingStrategy.MaxConcurrency);

foreach (var action in batch)
if (batch.Count < 1)
{
await messageProcessingStrategy.StartWorker(action, CancellationToken.None);
break;
}

if (!actions.Any())
foreach (var action in batch)
{
break;
(await messageProcessingStrategy.StartWorkerAsync(action, CancellationToken.None)).ShouldBeTrue();
}

messageProcessingStrategy.AvailableWorkers.ShouldBeGreaterThanOrEqualTo(0);
await messageProcessingStrategy.WaitForAvailableWorkers();
messageProcessingStrategy.AvailableWorkers.ShouldBeGreaterThan(0);
messageProcessingStrategy.MaxConcurrency.ShouldBeGreaterThanOrEqualTo(0);

stopwatch.Elapsed.ShouldBeLessThanOrEqualTo(timeout,
$"ListenLoopExecuted took longer than timeout of {timeoutSeconds}s, with {actions.Count} of {initalActionCount} messages remaining");
(await messageProcessingStrategy.WaitForAvailableWorkerAsync()).ShouldBeGreaterThan(0);
messageProcessingStrategy.MaxConcurrency.ShouldBeGreaterThan(0);
}
}

Expand All @@ -154,7 +255,7 @@ private static Queue<Func<Task>> BuildFakeIncomingMessages(int numberOfMessagesT
var actions = new Queue<Func<Task>>();
for (var i = 0; i != numberOfMessagesToCreate; i++)
{
var duration = MinTaskDuration + random.Next(TaskDurationVariance);
var duration = MinTaskDuration + TimeSpan.FromMilliseconds(random.Next(TaskDurationVariance));

var action = new Func<Task>(async () =>
{
Expand Down
Loading