Cancellable handler #571
Conversation
Add a new concept of a handler that can be cancelled if the message visibility timeout has expired.
Codecov Report
```
@@            Coverage Diff             @@
##           master     #571      +/-   ##
==========================================
- Coverage   38.98%   38.94%    -0.05%
==========================================
  Files          78       78
  Lines        2783     2876       +93
  Branches      488      508       +20
==========================================
+ Hits         1085     1120       +35
- Misses       1541     1585       +44
- Partials      157      171       +14
```
Continue to review full report at Codecov.
Just to add another option, it could renew the message visibility timeout: #291
To be honest I thought we already had that capability, so I've re-opened it. It's also something we'd probably want as opt-in? However, that's not what I was trying to achieve here. This is more of a reaction to a backed-up local queue (see #546).

If messages are sitting on scheduled Tasks, waiting to progress and not being processed, the chances are AWS has already considered the message "lost" and redelivered it somewhere else. By that point, if the task does eventually run, it's probably dealing with what is now a duplicate, which is a waste of processing. By observing that the timeout has probably elapsed, we can simply not process the message at all, end the Task, and release resources back to the worker to try to make some headway through the queue.

In that scenario, renewing the message visibility timeout would probably make things worse, as it would leave messages on a backed-up local queue even longer?
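The pre-handler check being described could be sketched roughly like this. This is a minimal illustration, not JustSaying's actual internals; `ReceivedMessage` and `StaleMessageFilter` are hypothetical names, and the only idea shown is comparing the time spent on the local queue against the visibility timeout before dispatching a handler.

```csharp
using System;

public sealed class ReceivedMessage
{
    public ReceivedMessage(string id, DateTimeOffset receivedAt)
    {
        Id = id;
        ReceivedAt = receivedAt;
    }

    public string Id { get; }

    // When the message was pulled from SQS, not when the handler started.
    public DateTimeOffset ReceivedAt { get; }
}

public static class StaleMessageFilter
{
    // Returns true if the message has sat on the local queue for longer than
    // the visibility timeout, i.e. SQS has probably already redelivered it
    // to another consumer and processing it now would duplicate work.
    public static bool IsProbablyStale(
        ReceivedMessage message,
        TimeSpan visibilityTimeout,
        DateTimeOffset now)
    {
        return now - message.ReceivedAt >= visibilityTimeout;
    }
}
```

A message loop would call `IsProbablyStale` just before invoking the handler and, if it returns true, skip the handler entirely and free the Task.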
I would expect an "auto-renew" to be on by default, but you'd certainly want control. This is something that the PE team needed a long time back for their deployment tool, so we exposed the

I happen to have the opposite view to you on this 😄, I think it more directly solves the problem of local queues holding stale messages. Cancelling sounds more haphazard as a general pattern here. If one worker has the message, it has a duty to process it, and if it doesn't, it will eventually become visible to others. I can see that cancelling before a handler gets the message would solve your problem, but adding cancellation to handlers feels wrong from an event-driven architecture perspective. If a handler is still working as (or soon before) a message visibility times out, it should let others know it's still working on it. It's "at least once" message delivery after all; cancellation doesn't belong here.
But here, the cancellation is pre-handler. Once control is passed to the handler, it's just as it is today, except you have to opt in to implementing the new interface, and then decide to observe the token to do something different in your handler implementation. This is just trying to improve the situation where the internals get stuck in a never-progressing backlog (which is what we had in the issue that led to #546 as the

If we wanted to do something to allow renewing the message visibility, I think that's something else that would need to be a first-class concept somewhere in the handlers, to give you the hook to decide to renew/extend the visibility after the handler has already seen the message and can make a decision. Maybe that could be morphed into a more detailed handler concept that did both separately, so for more advanced scenarios you'd get something like this very-off-the-top-of-my-head thing:

```csharp
public interface ISuperDuperHandler<in T>
{
    Task<bool> HandleAsync(HandlerContext<T> context);
}

public abstract class HandlerContext<T>
    where T : Message
{
    public CancellationToken CancellationToken { get; protected set; }

    public T Message { get; protected set; }

    public MessageContext MessageContext { get; protected set; }

    public abstract Task ExtendVisibilityAsync(TimeSpan value);
}

internal sealed class SuperDuperHandlerContext<T> : HandlerContext<T>
    where T : Message
{
    private readonly SomeClient _client;

    public SuperDuperHandlerContext(
        T message,
        MessageContext messageContext,
        SomeClient client)
    {
        Message = message;
        MessageContext = messageContext;
        _client = client;
    }

    public override async Task ExtendVisibilityAsync(TimeSpan value)
    {
        await _client.ExtendTheVisibilityAsync(Message.Id, value, CancellationToken);
    }
}
```

Then you could do stuff like this:

```csharp
public class MyHandler : ISuperDuperHandler<MyMessage>
{
    public async Task<bool> HandleAsync(HandlerContext<MyMessage> context)
    {
        if (context.CancellationToken.IsCancellationRequested)
        {
            // Too old or shutting down; junk it and let someone else handle it
            return false;
        }

        if (context.Message.IsMessageHuge)
        {
            await context.ExtendVisibilityAsync(TimeSpan.FromHours(1));
        }

        await DoExpensiveStuffAsync(context.Message);
        return true;
    }
}
```
Also want to add: I'm not saying that by default messages would be continually renewed in terms of visibility; there would be a configurable policy for how long / how many renewals before giving up.

Edit: this is not a response, just reading your reply now.
Ah, I was (or maybe still am) confused 😄 So if I understand you, if we want to let the user optionally know whether the message had timed out, the cancellation token could lead to a little confusion, as the signature would imply that the token is live and you'd be expected to constantly check it and flow it for cooperative cancellation.

Edit: No, I am still confused; they will get a live cancellation token, which may make people think cancellable handlers are a reasonable thing to do. I think this is a bad pattern vs renewing visibility locks, though I get that pre-handler cancellation works as a short-term workaround.
No, the interface would be to allow a developer to opt in to doing something different in the handler if the message is considered "stale" or if the message loop is shutting down. To me, visibility locks are a different feature, for people who need to process messages that may take a long time to handle in the handler. I don't think they're a solution to a backed-up internal queue, operating with no context, which could potentially have messages lying around in memory that are dozens of minutes old.

Effectively this makes this PR two different, related, things in one:
The way I see it, JustSaying grabs a handful of messages and uses the processing strategy to decide when to call the handlers; it's its duty to try to guarantee correctness (within reason), in this case either by dropping the messages (FIFO, as this change is) or by re-acquiring visibility locks. That wouldn't need much more complexity than what is done in this PR; there is still the same timer involved. (I sense I won't sell you in the short term.)

I see, though, that dropping the messages is a good solution and is the minimal change. I'd add a suggestion of being able to add (or rather subtract) a buffer to the timeout: if I'm 0.1 seconds away from someone else picking up the message, and the timeout is 30 seconds, I'd probably still want to drop it.

On the opt-in behaviour, the only thing I don't like is the way it's presented, when I look at the
For some more reading in favour of extending timeouts, here's the guidance on the matter:

The thing with dropping messages is that the messages being dropped will be mostly random, so it's very easy for messages to be dropped multiple times, exceeding the max receive count and ending up in a dead-letter queue. If you extend locks, you generally get much more ordered processing of messages, which leaves messages in queues where they are nice and safe.
I think it's worth comparing a prototype of that guidance: when a message is received, at the earliest opportunity we create a heartbeat that renews the lock. It would be configurable, defaulting to renewing at half the timeout and extending by the full timeout. So if it's 30 seconds, schedule a task in 15 secs to renew by 30 (45s cumulative), then again 15 secs later, forever pushing out the lock until some overall timeout. Once the handler has completed, any heartbeat task is cancelled via a CancellationTokenSource. Waddaya say? I can have a bash and we can compare notes?
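A rough sketch of that heartbeat, under stated assumptions: `IVisibilityClient`, `DelegatingVisibilityClient`, and `VisibilityHeartbeat` are hypothetical names, not JustSaying or AWS SDK types, and the overall-timeout policy is left to the caller's token.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical abstraction over the SQS client; the real client type differs.
public interface IVisibilityClient
{
    Task ChangeMessageVisibilityAsync(string receiptHandle, TimeSpan timeout, CancellationToken ct);
}

// Simple delegating implementation, handy for exercising the heartbeat in isolation.
public sealed class DelegatingVisibilityClient : IVisibilityClient
{
    private readonly Action _onRenew;

    public DelegatingVisibilityClient(Action onRenew) => _onRenew = onRenew;

    public Task ChangeMessageVisibilityAsync(string receiptHandle, TimeSpan timeout, CancellationToken ct)
    {
        _onRenew();
        return Task.CompletedTask;
    }
}

public static class VisibilityHeartbeat
{
    // Renews the visibility lock every (timeout / 2), extending it by the full
    // timeout each time, until the token is cancelled (i.e. the handler
    // completed, or an overall limit decided by the caller was reached).
    public static async Task RunAsync(
        IVisibilityClient client,
        string receiptHandle,
        TimeSpan visibilityTimeout,
        CancellationToken ct)
    {
        var interval = TimeSpan.FromTicks(visibilityTimeout.Ticks / 2);

        try
        {
            while (!ct.IsCancellationRequested)
            {
                await Task.Delay(interval, ct);
                await client.ChangeMessageVisibilityAsync(receiptHandle, visibilityTimeout, ct);
            }
        }
        catch (OperationCanceledException)
        {
            // Handler finished (or gave up); stop renewing.
        }
    }
}
```

The message loop would start `RunAsync` alongside the handler's task and cancel the shared `CancellationTokenSource` as soon as the handler completes.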
I'd be happy to drop the

If you want to try a different tack and raise a PR, I'll take a look.
Thanks, I'll start work in a different PR (likely not today, but soon).

Sorry if this sounds negative (you know I love your work ❤️), but now that I've considered it some more, I think this PR should be closed, as it's trading off one problem for another and not really solving the issue. I hope my points in the previous comments have persuaded you too.

Edit: "Closing" is a bit strong, sorry. This PR doesn't worsen the situation, as the messages would be taken from the queue by multiple consumers regardless of whether the processing is skipped; it just seems like a workaround rather than a fix.
I'll re-review when I get the chance and see what could be removed to make it simpler but still provide a mitigation for a runaway Task queue, as ultimately that issue would have brought down the worker due to a memory leak. Also waiting for anyone else to chip in with their thoughts 😄 |
Oh god, I've just encountered the same issue, what gives? The fork upstream link is gone from GH repos for JustSaying 😞 Edit: #573 👍 |
```
@@ -177,6 +179,13 @@ internal async Task ListenLoopAsync(CancellationToken ct)
                continue;
            }

            if (sqsMessageResponse == null || sqsMessageResponse.Messages.Count < 1)
```

This block is duplicated due to a bad merge and needs removing.
I had an idea this morning based on the changes in #546, so I thought I'd see how far I could get with it and submit a PR for discussion.
If the issue that led to #546 occurs and tasks to process messages back up, it's possible that a task may be trying to process a message that was received so long ago that the message visibility timeout has expired, making it a potential duplicate waiting to be processed.
If the handler had a `CancellationToken` that cancels after the approximate message visibility timeout expires, JustSaying could enable two behaviours: the handler could return `false` to send the message back to the queue (where it could have since been handled elsewhere anyway), or it could return `true` on the basis of "if this has expired, I'll just ignore it and pretend I've dealt with it".

This is implemented here by adding the `ICancellableHandlerAsync<T>` interface that derives from `IHandlerAsync<T>`, then plumbing it in appropriately so it's used if implemented by the handler for a particular message, and extending things like the exactly-once handler and stopwatch wrappers.

Replaces #549.