diff --git a/async_producer.go b/async_producer.go index a6fa3d4a2..5f257524b 100644 --- a/async_producer.go +++ b/async_producer.go @@ -13,6 +13,13 @@ import ( "github.com/rcrowley/go-metrics" ) +// ErrProducerRetryBufferOverflow is returned when the bridging retry buffer is full and OOM prevention needs to be applied. +var ErrProducerRetryBufferOverflow = errors.New("retry buffer full: message discarded to prevent buffer overflow") + +// minFunctionalRetryBufferLength is the lower limit of Producer.Retry.MaxBufferLength for it to function. +// Any non-zero maxBufferLength but less than this lower limit is pushed to the lower limit. +const minFunctionalRetryBufferLength = 4 * 1024 + // AsyncProducer publishes Kafka messages using a non-blocking API. It routes messages // to the correct broker for the provided topic-partition, refreshing metadata as appropriate, // and parses responses for errors. You must read from the Errors() channel or the @@ -1207,6 +1214,11 @@ func (bp *brokerProducer) handleError(sent *produceSet, err error) { // effectively a "bridge" between the flushers and the dispatcher in order to avoid deadlock // based on https://godoc.org/github.com/eapache/channels#InfiniteChannel func (p *asyncProducer) retryHandler() { + maxBufferSize := p.conf.Producer.Retry.MaxBufferLength + if 0 < maxBufferSize && maxBufferSize < minFunctionalRetryBufferLength { + maxBufferSize = minFunctionalRetryBufferLength + } + var msg *ProducerMessage buf := queue.New() @@ -1227,6 +1239,19 @@ func (p *asyncProducer) retryHandler() { } buf.Add(msg) + + if maxBufferSize > 0 && buf.Length() >= maxBufferSize { + msgToHandle := buf.Peek().(*ProducerMessage) + if msgToHandle.flags == 0 { + select { + case p.input <- msgToHandle: + buf.Remove() + default: + buf.Remove() + p.returnError(msgToHandle, ErrProducerRetryBufferOverflow) + } + } + } } } diff --git a/async_producer_test.go b/async_producer_test.go index c192235cc..099441d70 100644 --- a/async_producer_test.go +++ b/async_producer_test.go @@ -2244,6 +2244,82 @@ func TestTxnCanAbort(t *testing.T) { require.NoError(t, err) } +func TestPreventRetryBufferOverflow(t *testing.T) { + broker := NewMockBroker(t, 1) + defer broker.Close() + topic := "test-topic" + + metadataRequestHandlerFunc := func(req *request) (res encoderWithHeader) { + r := new(MetadataResponse) + r.AddBroker(broker.Addr(), broker.BrokerID()) + r.AddTopicPartition(topic, 0, broker.BrokerID(), nil, nil, nil, ErrNoError) + return r + } + + produceRequestHandlerFunc := func(req *request) (res encoderWithHeader) { + r := new(ProduceResponse) + r.AddTopicPartition(topic, 0, ErrNotLeaderForPartition) + return r + } + + broker.SetHandlerFuncByMap(map[string]requestHandlerFunc{ + "ProduceRequest": produceRequestHandlerFunc, + "MetadataRequest": metadataRequestHandlerFunc, + }) + + config := NewTestConfig() + config.Producer.Flush.MaxMessages = 1 + config.Producer.Retry.MaxBufferLength = minFunctionalRetryBufferLength + config.Producer.Return.Successes = true + + producer, err := NewAsyncProducer([]string{broker.Addr()}, config) + if err != nil { + t.Fatal(err) + } + + var ( + wg sync.WaitGroup + successes, producerErrors int + errorFound bool + ) + + wg.Add(1) + go func() { + defer wg.Done() + for range producer.Successes() { + successes++ + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + for errMsg := range producer.Errors() { + if errors.Is(errMsg.Err, ErrProducerRetryBufferOverflow) { + errorFound = true + } + producerErrors++ + } + }() + + numMessages := 100000 + for i := 0; i < numMessages; i++ { + kv := StringEncoder(strconv.Itoa(i)) + producer.Input() <- &ProducerMessage{ + Topic: topic, + Key: kv, + Value: kv, + Metadata: i, + } + } + + producer.AsyncClose() + wg.Wait() + + require.Equal(t, successes+producerErrors, numMessages, "Expected all messages to be processed") + require.True(t, errorFound, "Expected at least one error matching ErrProducerRetryBufferOverflow") +} + // This example shows how to use the producer while simultaneously // reading the Errors channel to know about any failures. func ExampleAsyncProducer_select() { diff --git a/config.go b/config.go index f2f197887..78d4758af 100644 --- a/config.go +++ b/config.go @@ -269,6 +269,13 @@ type Config struct { // more sophisticated backoff strategies. This takes precedence over // `Backoff` if set. BackoffFunc func(retries, maxRetries int) time.Duration + // The maximum length of the bridging buffer between `input` and `retries` channels + // in AsyncProducer#retryHandler. + // The limit is to prevent this buffer from overflowing or causing OOM. + // Defaults to 0 for unlimited. + // Any value between 0 and 5000 is pushed to 5000. + // A zero or negative value indicates unlimited. + MaxBufferLength int } // Interceptors to be called when the producer dispatcher reads the