Skip to content

Commit

Permalink
Merge pull request #243 from andyxning/reduce_duplicate_rdy_update_re…
Browse files Browse the repository at this point in the history
…quests

reduce duplicate rdy update requests
  • Loading branch information
ploxiln authored Mar 4, 2020
2 parents 5d545f2 + 86cbb1b commit 3f61477
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 27 deletions.
21 changes: 11 additions & 10 deletions nsq/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,16 +346,17 @@ def upgrade_to_deflate(self):
self.encoder = DeflateEncoder(level=self.deflate_level)

def send_rdy(self, value):
try:
self.send(protocol.ready(value))
except Exception as e:
self.close()
self.trigger(
event.ERROR,
conn=self,
error=protocol.SendError('failed to send RDY %d' % value, e),
)
return False
if self.last_rdy != value:
try:
self.send(protocol.ready(value))
except Exception as e:
self.close()
self.trigger(
event.ERROR,
conn=self,
error=protocol.SendError('failed to send RDY %d' % value, e),
)
return False
self.last_rdy = value
self.rdy = value
return True
Expand Down
43 changes: 26 additions & 17 deletions tests/test_backoff.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ def test_backoff_many_conns(mock_ioloop_current):
for i in range(num_conns):
conn = get_conn(r)
conn.expected_args = [b'SUB test test\n', b'RDY 1\n']
conn.last_exp_rdy = b'RDY 1\n'
conn.fails = 0
conns.append(conn)

Expand All @@ -271,23 +272,23 @@ def test_backoff_many_conns(mock_ioloop_current):
msg = send_message(conn)

if r.backoff_timer.get_interval() == 0:
conn.expected_args.append(b'RDY 1\n')
add_exp_rdy(conn, b'RDY 1\n')

if fail or not conn.fails:
msg.trigger(event.REQUEUE, message=msg)
total_fails += 1
conn.fails += 1

for c in conns:
c.expected_args.append(b'RDY 0\n')
add_exp_rdy(c, b'RDY 0\n')
conn.expected_args.append(b'REQ 1234 0\n')
else:
msg.trigger(event.FINISH, message=msg)
total_fails -= 1
conn.fails -= 1

for c in conns:
c.expected_args.append(b'RDY 0\n')
add_exp_rdy(c, b'RDY 0\n')
conn.expected_args.append(b'FIN 1234\n')

assert r.backoff_block is True
Expand All @@ -299,7 +300,7 @@ def test_backoff_many_conns(mock_ioloop_current):
conn = timeout_args[1]()
last_timeout_time = timeout_args[0]
assert r.backoff_block is False
conn.expected_args.append(b'RDY 1\n')
add_exp_rdy(conn, b'RDY 1\n')

fail = True
if random.random() < 0.3 and total_fails > 1:
Expand All @@ -313,9 +314,9 @@ def test_backoff_many_conns(mock_ioloop_current):
for c in conns:
if c.rdy > 0:
c.last_msg_timestamp = time.time() - 60
c.expected_args.append(b'RDY 0\n')
add_exp_rdy(c, b'RDY 0\n')
conn = r._redistribute_rdy_state()
conn.expected_args.append(b'RDY 1\n')
add_exp_rdy(conn, b'RDY 1\n')
continue

msg = send_message(conn)
Expand All @@ -326,10 +327,10 @@ def test_backoff_many_conns(mock_ioloop_current):

if total_fails > 0:
for c in conns:
c.expected_args.append(b'RDY 0\n')
add_exp_rdy(c, b'RDY 0\n')
else:
for c in conns:
c.expected_args.append(b'RDY 1\n')
add_exp_rdy(c, b'RDY 1\n')

conn.expected_args.append(b'FIN 1234\n')

Expand All @@ -339,7 +340,7 @@ def test_backoff_many_conns(mock_ioloop_current):
last_timeout_time = timeout_args[0]

if total_fails > 0:
conn.expected_args.append(b'RDY 1\n')
add_exp_rdy(conn, b'RDY 1\n')

assert r.backoff_block is False
assert r.backoff_timer.get_interval() == 0
Expand All @@ -361,6 +362,7 @@ def test_backoff_conns_disconnect(mock_ioloop_current):
for i in range(num_conns):
conn = get_conn(r)
conn.expected_args = [b'SUB test test\n', b'RDY 1\n']
conn.last_exp_rdy = b'RDY 1\n'
conn.fails = 0
conns.append(conn)

Expand All @@ -379,34 +381,35 @@ def test_backoff_conns_disconnect(mock_ioloop_current):
if not conn:
conn = random.choice(conns)
else:
conn.expected_args.append(b'RDY 1\n')
add_exp_rdy(conn, b'RDY 1\n')
continue
else:
c = get_conn(r)
c.expected_args = [b'SUB test test\n']
c.last_exp_rdy = b'RDY 0\n'
c.fails = 0
conns.append(c)

msg = send_message(conn)

if r.backoff_timer.get_interval() == 0:
conn.expected_args.append(b'RDY 1\n')
add_exp_rdy(conn, b'RDY 1\n')

if fail or not conn.fails:
msg.trigger(event.REQUEUE, message=msg)
total_fails += 1
conn.fails += 1

for c in conns:
c.expected_args.append(b'RDY 0\n')
add_exp_rdy(c, b'RDY 0\n')
conn.expected_args.append(b'REQ 1234 0\n')
else:
msg.trigger(event.FINISH, message=msg)
total_fails -= 1
conn.fails -= 1

for c in conns:
c.expected_args.append(b'RDY 0\n')
add_exp_rdy(c, b'RDY 0\n')
conn.expected_args.append(b'FIN 1234\n')

assert r.backoff_block is True
Expand All @@ -418,7 +421,7 @@ def test_backoff_conns_disconnect(mock_ioloop_current):
conn = timeout_args[1]()
last_timeout_time = timeout_args[0]
assert r.backoff_block is False
conn.expected_args.append(b'RDY 1\n')
add_exp_rdy(conn, b'RDY 1\n')

fail = True
if random.random() < 0.3 and total_fails > 1:
Expand All @@ -435,10 +438,10 @@ def test_backoff_conns_disconnect(mock_ioloop_current):

if total_fails > 0:
for c in conns:
c.expected_args.append(b'RDY 0\n')
add_exp_rdy(c, b'RDY 0\n')
else:
for c in conns:
c.expected_args.append(b'RDY 1\n')
add_exp_rdy(c, b'RDY 1\n')

conn.expected_args.append(b'FIN 1234\n')

Expand All @@ -448,7 +451,7 @@ def test_backoff_conns_disconnect(mock_ioloop_current):
last_timeout_time = timeout_args[0]

if total_fails > 0:
conn.expected_args.append(b'RDY 1\n')
add_exp_rdy(conn, b'RDY 1\n')

assert r.backoff_block is False
assert r.backoff_timer.get_interval() == 0
Expand All @@ -457,3 +460,9 @@ def test_backoff_conns_disconnect(mock_ioloop_current):
for i, f in enumerate(c.stream.write.call_args_list):
print("%d: %s" % (i, f))
assert c.stream.write.call_args_list == [call(arg) for arg in c.expected_args]


def add_exp_rdy(conn, rdy):
if conn.last_exp_rdy != rdy:
conn.expected_args.append(rdy)
conn.last_exp_rdy = rdy

0 comments on commit 3f61477

Please sign in to comment.