diff --git a/consumer.go b/consumer.go index 0d0410d8..1db43565 100644 --- a/consumer.go +++ b/consumer.go @@ -981,7 +981,13 @@ func (r *Consumer) sendRDY(c *Conn, count int64) error { } atomic.AddInt64(&r.totalRdyCount, count-c.RDY()) + + lastRDY := c.LastRDY() c.SetRDY(count) + if count == lastRDY { + return nil + } + err := c.WriteCommand(Ready(int(count))) if err != nil { r.log(LogLevelError, "(%s) error sending RDY %d - %s", c.String(), count, err) diff --git a/mock_test.go b/mock_test.go index 8d9dbf91..af328845 100644 --- a/mock_test.go +++ b/mock_test.go @@ -343,10 +343,10 @@ func TestConsumerRequeueNoBackoff(t *testing.T) { "RDY 0", fmt.Sprintf("REQ %s 0", msgIDRequeueNoBackoff), "RDY 1", - "RDY 1", + "RDY 1", // Duplicate rdy update will be ignored. fmt.Sprintf("FIN %s", msgIDGood), } - if len(n.got) != len(expected) { + if len(n.got) != len(expected) - 1 { t.Fatalf("we got %d commands != %d expected", len(n.got), len(expected)) } for i, r := range n.got {