Skip to content

Commit

Permalink
reduce duplicate rdy update requests
Browse files Browse the repository at this point in the history
  • Loading branch information
andyxning committed Jan 16, 2020
1 parent 5d545f2 commit 5a5352c
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 11 deletions.
21 changes: 11 additions & 10 deletions nsq/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,16 +346,17 @@ def upgrade_to_deflate(self):
self.encoder = DeflateEncoder(level=self.deflate_level)

def send_rdy(self, value):
try:
self.send(protocol.ready(value))
except Exception as e:
self.close()
self.trigger(
event.ERROR,
conn=self,
error=protocol.SendError('failed to send RDY %d' % value, e),
)
return False
if self.last_rdy != value:
try:
self.send(protocol.ready(value))
except Exception as e:
self.close()
self.trigger(
event.ERROR,
conn=self,
error=protocol.SendError('failed to send RDY %d' % value, e),
)
return False
self.last_rdy = value
self.rdy = value
return True
Expand Down
47 changes: 46 additions & 1 deletion tests/test_backoff.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,28 @@ def test_backoff_many_conns(mock_ioloop_current):
assert r.backoff_timer.get_interval() == 0

for c in conns:
filtered_expected_args = []
i = 0
while i < len(c.expected_args):
if i == 1 and c.expected_args[i] == b'RDY 0\n':
i += 1
continue
j = i
while c.expected_args[i].startswith(b'RDY') and j+1 < len(c.expected_args) and c.expected_args[j+1] == c.expected_args[i]:
j = j + 1
continue
rdy_found = False
for index in range(len(filtered_expected_args)-1, -1, -1):
arg = filtered_expected_args[index]
if arg.startswith(b'RDY'):
rdy_found = True
if arg != c.expected_args[i]:
filtered_expected_args.append(c.expected_args[i])
break
if not rdy_found:
filtered_expected_args.append(c.expected_args[i])
i = j + 1
c.expected_args = filtered_expected_args
for i, f in enumerate(c.stream.write.call_args_list):
print("%d: %s" % (i, f))
assert c.stream.write.call_args_list == [call(arg) for arg in c.expected_args]
Expand Down Expand Up @@ -454,6 +476,29 @@ def test_backoff_conns_disconnect(mock_ioloop_current):
assert r.backoff_timer.get_interval() == 0

for c in conns:
filtered_expected_args = []
i = 0
while i < len(c.expected_args):
if i == 1 and c.expected_args[i] == b'RDY 0\n':
i += 1
continue
j = i
while c.expected_args[i].startswith(b'RDY') and j+1 < len(c.expected_args) and c.expected_args[j+1] == c.expected_args[i]:
j = j + 1
continue
rdy_found = False
for index in range(len(filtered_expected_args)-1, -1, -1):
arg = filtered_expected_args[index]
if arg.startswith(b'RDY'):
rdy_found = True
if arg != c.expected_args[i]:
filtered_expected_args.append(c.expected_args[i])
break
if not rdy_found:
filtered_expected_args.append(c.expected_args[i])
print('###%s' % filtered_expected_args)
i = j + 1
c.expected_args = filtered_expected_args
for i, f in enumerate(c.stream.write.call_args_list):
print("%d: %s" % (i, f))
print("%d: %s, %s, %s" % (i, f, call(c.expected_args[i]), f==call(c.expected_args[i])))
assert c.stream.write.call_args_list == [call(arg) for arg in c.expected_args]

0 comments on commit 5a5352c

Please sign in to comment.