From 3aea989f608e0e59178376e874c27ba448cdd9ab Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Wed, 7 Aug 2024 13:26:41 -0500 Subject: [PATCH] fix(producer): treat ErrKafkaStorageError as retriable (#2939) This is retriable according to the spec: https://kafka.apache.org/protocol.html Signed-off-by: Richard Artoul --- async_producer.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/async_producer.go b/async_producer.go index f629a6a2e..a6fa3d4a2 100644 --- a/async_producer.go +++ b/async_producer.go @@ -1101,7 +1101,7 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo bp.parent.returnSuccesses(pSet.msgs) // Retriable errors case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition, - ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend: + ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend, ErrKafkaStorageError: if bp.parent.conf.Producer.Retry.Max <= 0 { bp.parent.abandonBrokerConnection(bp.broker) bp.parent.returnErrors(pSet.msgs, block.Err) @@ -1134,7 +1134,7 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo switch block.Err { case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition, - ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend: + ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend, ErrKafkaStorageError: Logger.Printf("producer/broker/%d state change to [retrying] on %s/%d because %v\n", bp.broker.ID(), topic, partition, block.Err) if bp.currentRetries[topic] == nil {