diff --git a/async_producer.go b/async_producer.go index a6fa3d4a2..e330a1073 100644 --- a/async_producer.go +++ b/async_producer.go @@ -13,6 +13,9 @@ import ( "github.com/rcrowley/go-metrics" ) +// ErrProducerRetryBufferOverflow is returned when the bridging retry buffer is full and OOM prevention needs to be applied. +var ErrProducerRetryBufferOverflow = errors.New("discard a message and surface it as an error to prevent retry buffer to overflow") + // AsyncProducer publishes Kafka messages using a non-blocking API. It routes messages // to the correct broker for the provided topic-partition, refreshing metadata as appropriate, // and parses responses for errors. You must read from the Errors() channel or the @@ -1207,6 +1210,7 @@ func (bp *brokerProducer) handleError(sent *produceSet, err error) { // effectively a "bridge" between the flushers and the dispatcher in order to avoid deadlock // based on https://godoc.org/github.com/eapache/channels#InfiniteChannel func (p *asyncProducer) retryHandler() { + maxBufferSize := p.conf.Producer.Retry.MaxBufferLength var msg *ProducerMessage buf := queue.New() @@ -1227,6 +1231,19 @@ func (p *asyncProducer) retryHandler() { } buf.Add(msg) + + if maxBufferSize > 0 && buf.Length() >= maxBufferSize { + msgToHandle := buf.Peek().(*ProducerMessage) + if msgToHandle.flags == 0 { + select { + case p.input <- msgToHandle: + buf.Remove() + default: + buf.Remove() + p.returnError(msgToHandle, ErrProducerRetryBufferOverflow) + } + } + } } } diff --git a/config.go b/config.go index f2f197887..91bd61857 100644 --- a/config.go +++ b/config.go @@ -269,6 +269,12 @@ type Config struct { // more sophisticated backoff strategies. This takes precedence over // `Backoff` if set. BackoffFunc func(retries, maxRetries int) time.Duration + // The maximum length of the bridging buffer between `input` and `retries` channels + // in AsyncProducer#retryHandler. + // Defaults to 0 for unlimited. + // The limit is to prevent this buffer from overflowing. + // A zero or negative number indicates unlimited. + MaxBufferLength int } // Interceptors to be called when the producer dispatcher reads the