-
Notifications
You must be signed in to change notification settings - Fork 16
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Remove blocking in IMessageProcessingStrategy #440
Remove blocking in IMessageProcessingStrategy #440
Conversation
Following this I'll rebase #426 |
ec0c695
to
afccf1f
Compare
return; | ||
} | ||
|
||
_ = Task.Run(() => ReleaseOnCompleted(action)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just to confirm:
It should still be the case that StartWorker
and HandleMessage
returning doesn't mean that the message processing has been completed, just that it has been started (that's the Task.Run
here ) and so when we loop over a list of messages in ListenLoop
, we're usually launching them in parallel. I think this is still true.
Although the semaphore can introduce latency between retrieving a message and processing it, due to throttling. That's not new in this PR, and should not stop it being merged.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Correct, we are just starting them, but in the case where we are maxed out (which can only happen when we have multiple queues sharing a processing strategy), they will wait at the semaphore. Today that is blocking, this change makes it async.
That latency is not ideal, an alternative is to always immediately process messages we've received. In this case the throttling concurrency can easily be processing many times more messages at once, but we could treat the concurrency level as an aspirational target, not a hard limit.
await ListenLoop(cancellationToken).ConfigureAwait(false); | ||
IsListening = false; | ||
_log.LogInformation($"Stopped Listening - {queueInfo}"); | ||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💯 Nice!
... and tidied up the background launch of the ListenLoop.
This PR addresses the concern in #422