diff --git a/consumer.go b/consumer.go index 0d0410d8..1db43565 100644 --- a/consumer.go +++ b/consumer.go @@ -981,7 +981,13 @@ func (r *Consumer) sendRDY(c *Conn, count int64) error { } atomic.AddInt64(&r.totalRdyCount, count-c.RDY()) + + lastRDY := c.LastRDY() c.SetRDY(count) + if count == lastRDY { + return nil + } + err := c.WriteCommand(Ready(int(count))) if err != nil { r.log(LogLevelError, "(%s) error sending RDY %d - %s", c.String(), count, err) diff --git a/mock_test.go b/mock_test.go index 8d9dbf91..9c7f2879 100644 --- a/mock_test.go +++ b/mock_test.go @@ -343,15 +343,19 @@ func TestConsumerRequeueNoBackoff(t *testing.T) { "RDY 0", fmt.Sprintf("REQ %s 0", msgIDRequeueNoBackoff), "RDY 1", - "RDY 1", + "RDY 1", // Duplicate rdy update will be ignored. fmt.Sprintf("FIN %s", msgIDGood), } - if len(n.got) != len(expected) { + if len(n.got) != len(expected) - 1 { t.Fatalf("we got %d commands != %d expected", len(n.got), len(expected)) } for i, r := range n.got { - if string(r) != expected[i] { - t.Fatalf("cmd %d bad %s != %s", i, r, expected[i]) + j := i + if i == len(n.got) - 1 { + j = i + 1 + } + if string(r) != expected[j] { + t.Fatalf("cmd %d bad %s != %s", i, r, expected[j]) } } }