Skip to content

Commit

Permalink
feat: Prevent retry buffer in AsyncProducer to go OOM
Browse files Browse the repository at this point in the history
This commit adds an optional configuration to Sarama's retry mechanism to limit the size of the retry buffer.
The change addresses issues IBM#1358 and IBM#1372 by preventing unbounded memory growth when retries are backlogged or brokers are unresponsive.

Key updates:
- Added `Producer.Retry.MaxBufferLength` configuration to control the maximum number of messages stored in the retry buffer.
- Implemented logic to handle overflow scenarios, ensuring non-flagged messages are either retried or sent to the errors channel, while flagged messages are re-queued.

This enhancement provides a safeguard against OOM errors in high-throughput or unstable environments while maintaining backward compatibility (unlimited buffer by default).

Signed-off-by: Wenli Wan <[email protected]>
  • Loading branch information
wanwenli committed Dec 6, 2024
1 parent 4178837 commit fc4fc68
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 0 deletions.
26 changes: 26 additions & 0 deletions async_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,14 @@ import (
"github.com/rcrowley/go-metrics"
)

// ErrProducerRetryBufferOverflow is returned when the bridging retry buffer is full and OOM prevention needs to be applied.
var ErrProducerRetryBufferOverflow = errors.New("retry buffer is full: message discarded to " +
"prevent buffer overflow. Check other errors and logs to determine the cause of excessive retries")

// minFunctionalRetryBufferLength is the lower limit of Producer.Retry.MaxBufferLength for it to function.
// Any non-zero maxBufferLength but less than this lower limit is pushed to the lower limit.
const minFunctionalRetryBufferLength = 5000

// AsyncProducer publishes Kafka messages using a non-blocking API. It routes messages
// to the correct broker for the provided topic-partition, refreshing metadata as appropriate,
// and parses responses for errors. You must read from the Errors() channel or the
Expand Down Expand Up @@ -1207,6 +1215,11 @@ func (bp *brokerProducer) handleError(sent *produceSet, err error) {
// effectively a "bridge" between the flushers and the dispatcher in order to avoid deadlock
// based on https://godoc.org/github.com/eapache/channels#InfiniteChannel
func (p *asyncProducer) retryHandler() {
maxBufferSize := p.conf.Producer.Retry.MaxBufferLength
if 0 < maxBufferSize && maxBufferSize < minFunctionalRetryBufferLength {
maxBufferSize = minFunctionalRetryBufferLength
}

var msg *ProducerMessage
buf := queue.New()

Expand All @@ -1227,6 +1240,19 @@ func (p *asyncProducer) retryHandler() {
}

buf.Add(msg)

if maxBufferSize > 0 && buf.Length() >= maxBufferSize {
msgToHandle := buf.Peek().(*ProducerMessage)
if msgToHandle.flags == 0 {
select {
case p.input <- msgToHandle:
buf.Remove()
default:
buf.Remove()
p.returnError(msgToHandle, ErrProducerRetryBufferOverflow)
}
}
}
}
}

Expand Down
82 changes: 82 additions & 0 deletions async_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sarama

import (
"errors"
"fmt"
"log"
"math"
"os"
Expand Down Expand Up @@ -2244,6 +2245,87 @@ func TestTxnCanAbort(t *testing.T) {
require.NoError(t, err)
}

func TestPreventRetryBufferOverflow(t *testing.T) {
leader := NewMockBroker(t, 1)
defer leader.Close()
topic := "test-topic"

metadataRequestHandlerFunc := func(req *request) (res encoderWithHeader) {
r := new(MetadataResponse)
r.AddBroker(leader.Addr(), leader.BrokerID())
r.AddTopicPartition(topic, 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
return r
}

produceRequestHandlerFunc := func(req *request) (res encoderWithHeader) {
r := new(ProduceResponse)
r.AddTopicPartition(topic, 0, ErrNotLeaderForPartition)
return r
}

leader.SetHandlerFuncByMap(map[string]requestHandlerFunc{
"ProduceRequest": produceRequestHandlerFunc,
"MetadataRequest": metadataRequestHandlerFunc,
})

config := NewTestConfig()
config.Producer.Flush.MaxMessages = 1
config.Producer.Retry.MaxBufferLength = minFunctionalRetryBufferLength
config.Producer.Return.Successes = true

producer, err := NewAsyncProducer([]string{leader.Addr()}, config)
if err != nil {
t.Fatal(err)
}

var (
wg sync.WaitGroup
successes, producerErrors int
errorFound bool
)

wg.Add(1)
go func() {
defer wg.Done()
for range producer.Successes() {
successes++
}
}()

wg.Add(1)
go func() {
defer wg.Done()
for errMsg := range producer.Errors() {
if errors.Is(errMsg.Err, ErrProducerRetryBufferOverflow) {
errorFound = true
}
producerErrors++
}
}()

numMessages := 100000
for i := 0; i < numMessages; i++ {
kv := StringEncoder(fmt.Sprintf("%d", i))
producer.Input() <- &ProducerMessage{
Topic: topic,
Key: kv,
Value: kv,
Metadata: i,
}
}

producer.AsyncClose()
wg.Wait()

if successes+producerErrors < numMessages {
t.Error("Expected all messages to be processed")
}

if !errorFound {
t.Error("Expected at least one error matching ErrProducerRetryBufferOverflow")
}
}

// This example shows how to use the producer while simultaneously
// reading the Errors channel to know about any failures.
func ExampleAsyncProducer_select() {
Expand Down
7 changes: 7 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,13 @@ type Config struct {
// more sophisticated backoff strategies. This takes precedence over
// `Backoff` if set.
BackoffFunc func(retries, maxRetries int) time.Duration
// The maximum length of the bridging buffer between `input` and `retries` channels
// in AsyncProducer#retryHandler.
// The limit is to prevent this buffer from overflowing or causing OOM.
// Defaults to 0 for unlimited.
// Any value between 0 and 5000 is pushed to 5000.
// A zero or negative value indicates unlimited.
MaxBufferLength int
}

// Interceptors to be called when the producer dispatcher reads the
Expand Down

0 comments on commit fc4fc68

Please sign in to comment.