
Threading review of SqsNotificationListener and Throttled Message Processing Strategy #422

Closed
slang25 opened this issue Nov 7, 2018 · 6 comments

@slang25 (Member) commented Nov 7, 2018

This has come up in recent discussions, so here is a braindump so that we don't lose this.

Looking at IMessageProcessingStrategy, it has the following members:

public interface IMessageProcessingStrategy
{
    int MaxWorkers { get; }
    int AvailableWorkers { get; }
    void StartWorker(Func<Task> action);
    Task WaitForAvailableWorkers();
}

Imagine we have the default Throttled implementation, with 10 max workers, and 6 workers in-flight (invoking handlers).

When SqsNotificationListener wants to get messages, it goes through the following process (sketched in code after the list):

  1. Is AvailableWorkers > 0? (If not, await WaitForAvailableWorkers.)
    Here, yes, we have 4 available. We can continue.
  2. Long poll SQS for a maximum of 4 messages. We will either receive 4 messages, or wait a maximum of 20 seconds to receive up to 4 messages.
  3. For each message, call StartWorker with the handler (wrapped in a few bits and bobs).
  4. Go to step 1 immediately (while the handlers are in flight).
  5. (Step 1 again) Is AvailableWorkers > 0? No, so now await WaitForAvailableWorkers.
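
A minimal sketch of that loop, for clarity. ReceiveMessagesAsync and HandleMessageAsync are hypothetical stand-ins for the real SQS and handler plumbing, which wraps handlers with logging and so on:

// Rough sketch of the listen loop described above, not the real
// SqsNotificationListener code.
private async Task ListenLoopAsync(IMessageProcessingStrategy strategy)
{
    while (true)
    {
        // Step 1: wait until at least one worker slot is free
        if (strategy.AvailableWorkers == 0)
        {
            await strategy.WaitForAvailableWorkers();
        }

        // Step 2: long poll for up to AvailableWorkers messages
        var messages = await ReceiveMessagesAsync(
            maxMessages: strategy.AvailableWorkers, waitTimeSeconds: 20);

        // Step 3: hand each message to a worker; StartWorker returns quickly
        foreach (var message in messages)
        {
            strategy.StartWorker(() => HandleMessageAsync(message));
        }

        // Step 4: loop back to step 1 while those handlers are still in flight
    }
}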

In isolation, this process is alright, and seems sensible. Where it gets tricky is when you share the IMessageProcessingStrategy and have multiple SqsNotificationListener instances going through this same workflow.
However, it is quite likely you will want to share an instance of IMessageProcessingStrategy, as you might want a single concurrency level across multiple queues, or for your whole application.

There are a few problems with the IMessageProcessingStrategy interface as it stands when you have multiple listeners interacting with it:

  • When all workers are busy, you will likely have multiple listeners all waiting at the WaitForAvailableWorkers step.
    They will all be released simultaneously and race (see the sketch after this list).
    They will all read the AvailableWorkers value, and likely see the same number (let's say 1).
    Once they receive their messages, they will all try to call StartWorker, with only the first succeeding.
    In an old version of JustSaying (early 4.x), this race condition would cause Throttled to lose count, which wouldn't end well.
    In recent versions, subsequent attempts to call StartWorker will block until there are available workers. (Blocking thread-pool threads is bad.)
  • There is a related race condition: listeners are released from awaiting WaitForAvailableWorkers, then they check the AvailableWorkers count to know how many messages to request from SQS.
    By the time AvailableWorkers is read, the value could have changed and might now be 0, in which case an exception is thrown and logged, and the loop continues.
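
To make the first race concrete, here is a deterministic toy version of the interleaving. A bare SemaphoreSlim stands in for Throttled, with CurrentCount playing the role of AvailableWorkers:

using System;
using System.Threading;

var semaphore = new SemaphoreSlim(1, 1); // one worker slot left

// Both listeners are released from WaitForAvailableWorkers and snapshot the count:
int seenByA = semaphore.CurrentCount; // 1
int seenByB = semaphore.CurrentCount; // also 1: the snapshot is already stale

// ...both long poll SQS, and each receives a message...

// Both now call StartWorker, but only one can actually claim the slot:
bool aStarted = semaphore.Wait(0); // true: A wins the slot
bool bStarted = semaphore.Wait(0); // false: B lost; the old code blocked here

Console.WriteLine($"A saw {seenByA}, started: {aStarted}");
Console.WriteLine($"B saw {seenByB}, started: {bStarted}");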

The Throttled implementation uses a SemaphoreSlim internally; this lets us guarantee the count is maintained in a thread-safe way, and we can wait on it both synchronously and asynchronously.
StartWorker uses a Task constructor to wrap the async work, and the task is then started. This runs the task on the thread pool. It behaves the same as Task.Run, just less familiar.
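
For illustration, the two spellings below queue the same work to the thread pool; DoWorkAsync is a hypothetical async method:

Func<Task> action = DoWorkAsync; // any async method matching Func<Task>

// What Throttled does today: a cold Task<Task>, started manually...
var cold = new Task<Task>(() => action());
cold.Start(); // schedules on TaskScheduler.Current (normally the thread pool)

// ...which is nearly equivalent to the more familiar:
Task hot = Task.Run(() => action());
// (One nuance: Task.Run unwraps the inner Task; the cold version
// would need cold.Unwrap() to await the handler's completion.)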

My view is that the StartWorker implementation behaves as we desire (with the exception of the blocking scenario); it may just allocate more than it needs to.
The throttling logic is a bit broken in its design and could do with some drastic rethinking and simplifying.

@shaynevanasperen (Contributor) commented:

Another concern is that if you have scaled out your application to run on multiple machines, each instance runs in its own "world" and doesn't know about the other instances, so you could have one instance being too greedy and fetching all the messages from a queue, leaving the other instances "starved".

@slang25 (Member, Author) commented Nov 7, 2018

Yeah, we pretty much have to trust that SQS distributes the messages fairly. A very similar issue is being able to throttle across multiple machines to prevent some downstream service from getting too much load. Consul has the concept of a distributed semaphore that could be used for this, and you could build your own IMessageProcessingStrategy on top of it (there are still problems to solve there, though).
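
For the record, such a strategy might be shaped roughly like this. This is a hypothetical sketch only: IDistributedSemaphore is an imagined abstraction over something like Consul's semaphore recipe, not a real JustSaying or Consul API.

// Hypothetical sketch; IDistributedSemaphore is an imagined wrapper around
// a distributed semaphore (e.g. Consul's), not a real type.
public class DistributedThrottle : IMessageProcessingStrategy
{
    private readonly IDistributedSemaphore _semaphore;

    public DistributedThrottle(IDistributedSemaphore semaphore, int maxWorkers)
    {
        _semaphore = semaphore;
        MaxWorkers = maxWorkers;
    }

    public int MaxWorkers { get; }

    // Cluster-wide count: stale by definition, one of the problems to solve.
    public int AvailableWorkers => _semaphore.AvailablePermits;

    public void StartWorker(Func<Task> action)
    {
        Task.Run(async () =>
        {
            await _semaphore.AcquireAsync(); // cross-machine acquire
            try { await action(); }
            finally { await _semaphore.ReleaseAsync(); }
        });
    }

    public Task WaitForAvailableWorkers() => _semaphore.WhenPermitAvailableAsync();
}

public interface IDistributedSemaphore
{
    int AvailablePermits { get; }
    Task AcquireAsync();
    Task ReleaseAsync();
    Task WhenPermitAvailableAsync();
}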

@AnthonySteele (Contributor) commented Nov 7, 2018

IMHO, it is not worth trying to achieve accurate distributed consensus on which machine has the most resources to process the next message; in the time taken, you could probably have processed the messages instead.

What we do have to ensure is that, statistically, over time, the instances distribute the load fairly evenly. This is especially important under high load.

@slang25 (Member, Author) commented Nov 13, 2018

I'd like to address the issue where we block under throttling and contention; this will only happen when you share an instance of IMessageProcessingStrategy across multiple subscriptions (I've seen this used a lot).

See this code:
https://github.com/justeat/JustSaying/blob/1aab3228bea66035cda5b22ac3acba8a8dbd1bc7/JustSaying/Messaging/MessageProcessingStrategies/Throttled.cs#L21-L38

We basically want line 24 to be awaited. This could be done by making the method async and returning a Task, which is a breaking change but retains the current behaviour.

Option 1

public async Task StartWorker(Func<Task> action) // breaking change: now returns Task
{
    var messageProcessingTask = new Task<Task>(() => ReleaseOnCompleted(action));
    await _semaphore.WaitAsync(); // wait for a free worker slot without blocking a thread
    messageProcessingTask.Start();
}

private async Task ReleaseOnCompleted(Func<Task> action)
{
    try
    {
        await action();
    }
    finally
    {
        _semaphore.Release();
    }
}

One easy non-breaking alternative would be to move this wait into the started task, like this:

Option 2

public void StartWorker(Func<Task> action)
{
    var messageProcessingTask = new Task<Task>(() => ReleaseOnCompleted(action));
    messageProcessingTask.Start(); // Could now be Task.Run
}

private async Task ReleaseOnCompleted(Func<Task> action)
{
    await _semaphore.WaitAsync(); // the wait has moved inside the started task
    try
    {
        await action();
    }
    finally
    {
        _semaphore.Release();
    }
}

However, this would now let the listen loop progress to the WaitForAvailableWorkers await inside step 1 of the original post. To retain the current behaviour, you'd want the await inside ReleaseOnCompleted to always take priority over the await in step 1, which would require a ManualResetEventSlim or maybe a larger rework of the internals of this class, so it's maybe not ideal; let's call this option 3.

Going back to option 1, we could add a new interface so as not to break the old one, but what would we call it? Maintaining two versions gets messy. I also feel like we would be building on top of a shaky foundation, because the interface is fundamentally flawed. It also seems wrong when you look at the signature: why should StartWorker need to be asynchronous? Here it's just a hack to fix the bad design.

I'd welcome some thoughts on this, and other suggestions.

Update: I've just noticed that the StartWorker signature is in the process of being changed as part of #403, so maybe this is the perfect opportunity to make this change in the coming release.

@shaynevanasperen (Contributor) commented:

@stuart-lang We've already decided that the next version of JustSaying will have some breaking changes, so this should be fine.

@slang25 (Member, Author) commented Nov 25, 2018

The blocking in IMessageProcessingStrategy has been removed, so closing.
