reader: don't decrement total_rdy on message receipt #179
Thanks! I'm gonna read through the diff more carefully, but in the meantime tests are failing on Python 3.5: https://travis-ci.org/nsqio/pynsq/jobs/230827968
9d2f54c to fa348cf
D'oh! Fixed.
nsq/reader.py
Outdated
@@ -358,7 +344,7 @@ def _maybe_update_rdy(self, conn):
 if self.backoff_timer.get_interval() or self.max_in_flight == 0:
     return

-if conn.rdy <= 1 or conn.rdy < int(conn.last_rdy * 0.25):
+if conn.rdy == 1:
I think I understand this change but want to make sure: this is because we initially set RDY 1 and we want to make sure we adjust to an appropriate per-connection max-in-flight, right?
Perhaps it's worth a comment since it took me a little while to figure this out.
Right. Updating RDY here used to serve two purposes: periodically re-upping RDY when it dipped too low, and going from an initial throttled RDY 1 on startup to full-throttle connection-max-in-flight. Now only the second case is relevant.
nsq/reader.py
Outdated
@@ -665,10 +646,21 @@ def _redistribute_rdy_state(self):
logger.info('[%s:%s] idle connection, giving up RDY count', conn.id, self.name)
self._send_rdy(conn, 0)

conns = self.conns.values()

in_flight = [c for c in conns if c.in_flight]
I still don't quite understand why we need to use in_flight when total_rdy should represent the same thing? The accounting for both total_rdy and in_flight used to happen in 2 subsequent lines in the same block.
This might be further complicated because the max_in_flight variable below is terribly named. Perhaps a better name would be available_rdy?
nsq/reader.py
Outdated
@@ -677,7 +669,7 @@ def _redistribute_rdy_state(self):
 # We also don't attempt to avoid the connections who previously might have had RDY 1
 # because it would be overly complicated and not actually worth it (ie. given enough
 # redistribution rounds it doesn't matter).
-possible_conns = list(self.conns.values())
+possible_conns = [c for c in conns if not (c.in_flight or c.rdy)]
Same comment here re: use of in_flight.
Consider a case where max_in_flight is less than the connection count and a task takes longer than low_rdy_idle_timeout. Then we'll set RDY 0 on a connection even while it's still processing a message (https://github.com/nsqio/pynsq/blob/master/nsq/reader.py#L664). So we can end up in a situation where total_rdy is strictly less than the total count of in-flight messages. If we go from RDY 0 to RDY 1 on a connection that has messages available for delivery, we guarantee a max-in-flight violation.
The last sentence above should be:
If we go from RDY 0 to RDY 1 on a connection (other than the one just set to RDY 0) that has messages available for delivery, we guarantee a max-in-flight violation.
Note that this is not difficult to trigger. All you need is:
- max_in_flight less than the number of connections
- typical task duration longer than low_rdy_idle_timeout
- msgs available on each connection
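The three conditions above can be walked through with a toy model. This is only a sketch, not pynsq code: `FakeConn` and all the numbers are hypothetical, and it assumes a simplified accounting in which total_rdy is just the sum of the per-connection RDY counts.

```python
from dataclasses import dataclass


@dataclass
class FakeConn:
    """Hypothetical stand-in for a connection; not the pynsq class."""
    name: str
    rdy: int = 0
    in_flight: int = 0


max_in_flight = 1  # fewer than the number of connections (2)
a = FakeConn("a", rdy=1)
b = FakeConn("b")

# Connection a receives a message whose handler runs for a long time.
a.in_flight = 1

# Once low_rdy_idle_timeout elapses, a is treated as idle and is sent
# RDY 0 even though its message is still being processed.
a.rdy = 0

# Accounting that looks only at RDY now sees spare capacity...
total_rdy = a.rdy + b.rdy
available = max_in_flight - total_rdy

# ...so b is sent RDY 1 and, having messages available, receives one.
b.rdy = available
b.in_flight = 1

total_in_flight = a.in_flight + b.in_flight
violation = total_in_flight > max_in_flight
print(total_in_flight, violation)
```

In this model two messages end up in flight against a limit of one, which is the violation described in the comment.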
OK, I think that makes sense, but what about the section above? The sum of in_flight can also be less than total_rdy, in which case we'd enter this loop below thinking we have more than we actually do to give out?
Sorry, I think I misunderstood our earlier discussion to have meant that that was the kind of case you weren't worried about. The possibility you mention would be taken care of with:
    in_flight_or_rdy = sum(max(c.in_flight, c.rdy) for c in conns)
    if backoff_interval:
        max_in_flight = max(0, 1 - in_flight_or_rdy)
    else:
        max_in_flight = self.max_in_flight - in_flight_or_rdy
That's essentially the logic I proposed in #177 (comment).
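As a standalone illustration of that proposal, here is a minimal sketch that pulls the computation into a free function. The `Conn` tuple and the function name `compute_available` are hypothetical helpers for this example, not part of pynsq; the arithmetic mirrors the snippet in the comment.

```python
from collections import namedtuple

# Hypothetical stand-in carrying only the two fields the logic reads.
Conn = namedtuple("Conn", ["in_flight", "rdy"])


def compute_available(conns, max_in_flight, backoff_interval):
    """Return how much RDY could still be handed out (sketch only)."""
    # Count each connection by whichever is larger: the messages it is
    # already processing or the RDY it currently holds.
    in_flight_or_rdy = sum(max(c.in_flight, c.rdy) for c in conns)
    if backoff_interval:
        # While backing off, at most one message may be outstanding.
        return max(0, 1 - in_flight_or_rdy)
    return max_in_flight - in_flight_or_rdy


conns = [Conn(in_flight=1, rdy=0), Conn(in_flight=0, rdy=1), Conn(0, 0)]
print(compute_available(conns, max_in_flight=3, backoff_interval=0))
print(compute_available(conns, max_in_flight=3, backoff_interval=5.0))
```

Counting `max(c.in_flight, c.rdy)` per connection covers both directions raised in the thread: a connection at RDY 0 that is still processing a message, and a connection holding RDY with nothing in flight.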
With the proposed changes, the full diff would look like https://github.com/nsqio/pynsq/compare/master...alpaker:no-decr-total-rdy-2?diff=unified&expand=1&name=no-decr-total-rdy-2#diff-2a8bc85bf9c95f482da1eb0490a60251R654
Cool, let's update this PR with that change.
As per above, I also think we should prefix the max_in_flight variable in this method with available_.
Thanks!
d7867f4 to ba6c2b4
@mreiferson Updated as discussed. (I went with your earlier suggestion of |
nsq/reader.py
Outdated
    c = random.choice([c for c in conns if c.in_flight])
    logger.info('[%s:%s] too many msgs in flight, giving up RDY count', c.id, self.name)
    self._send_rdy(c, 0)
except IndexError:
Prefer not using exceptions for flow control here.
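One way to follow this suggestion is to check for an empty candidate list before calling `random.choice`, which raises `IndexError` on an empty sequence. The sketch below is an assumption about the shape of such a fix, not the change that landed; `Conn` and `pick_busy_conn` are hypothetical names.

```python
import random
from collections import namedtuple

# Hypothetical stand-in; the real connection objects carry much more state.
Conn = namedtuple("Conn", ["id", "in_flight"])


def pick_busy_conn(conns):
    """Return a random connection with messages in flight, or None."""
    busy = [c for c in conns if c.in_flight]
    if not busy:
        # Nothing to give up RDY on; the caller can simply skip the step.
        return None
    return random.choice(busy)


conns = [Conn("a", 0), Conn("b", 2), Conn("c", 0)]
chosen = pick_busy_conn(conns)
if chosen is not None:
    print("would send RDY 0 to", chosen.id)
```

The explicit emptiness check keeps the normal "no busy connections" case out of the exception path.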
nsq/reader.py
Outdated
 else:
-    max_in_flight = self.max_in_flight - self.total_rdy
+    available_rdy = self.max_in_flight - in_flight_or_rdy
Now that we're using in_flight and rdy, I think this needs a max(0, ...) too.
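A small worked example shows why the clamp matters. The values are hypothetical: they assume two connections were sent RDY 0 while still processing a message, and a third holds RDY 1.

```python
max_in_flight = 2

# (in_flight, rdy) per connection; hypothetical values.
conns = [(1, 0), (1, 0), (0, 1)]

in_flight_or_rdy = sum(max(in_flight, rdy) for in_flight, rdy in conns)

# Without the clamp the result goes negative once usage exceeds the limit.
unclamped = max_in_flight - in_flight_or_rdy

# With the clamp, "nothing available" is reported as zero.
available_rdy = max(0, unclamped)

print(in_flight_or_rdy, unclamped, available_rdy)
```

A negative value would be meaningless as a count of RDY to hand out, so clamping at zero keeps any later loop or comparison that uses it well defined.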
Both issues fixed.
redistribution logic accordingly. This brings reader behavior into agreement with nsqd behavior (compare nsqio/nsq#404) and removes an opportunity for max_in_flight violations (nsqio#177).
ba6c2b4 to 47d1693
LGTM
This brings reader behavior into agreement with nsqd behavior (after nsqio/nsq#404) and removes an opportunity for max_in_flight violations (#177).
This block was removed because it's redundant given _redistributed_rdy() and it's easier to maintain invariants without it.
@mreiferson