diff --git a/async_producer.go b/async_producer.go index a6fa3d4a2..c4b29ad38 100644 --- a/async_producer.go +++ b/async_producer.go @@ -13,6 +13,14 @@ import ( "github.com/rcrowley/go-metrics" ) +// ErrProducerRetryBufferOverflow is returned when the bridging retry buffer is full and OOM prevention needs to be applied. +var ErrProducerRetryBufferOverflow = errors.New("retry buffer is full: message discarded to " + + "prevent buffer overflow. Check other errors and logs to determine the cause of excessive retries") + +// minFunctionalRetryBufferLength is the lower limit of Producer.Retry.MaxBufferLength for it to function. +// Any non-zero maxBufferLength but less than this lower limit is pushed to the lower limit. +const minFunctionalRetryBufferLength = 5000 + // AsyncProducer publishes Kafka messages using a non-blocking API. It routes messages // to the correct broker for the provided topic-partition, refreshing metadata as appropriate, // and parses responses for errors. You must read from the Errors() channel or the @@ -1207,6 +1215,11 @@ func (bp *brokerProducer) handleError(sent *produceSet, err error) { // effectively a "bridge" between the flushers and the dispatcher in order to avoid deadlock // based on https://godoc.org/github.com/eapache/channels#InfiniteChannel func (p *asyncProducer) retryHandler() { + maxBufferSize := p.conf.Producer.Retry.MaxBufferLength + if 0 < maxBufferSize && maxBufferSize < minFunctionalRetryBufferLength { + maxBufferSize = minFunctionalRetryBufferLength + } + var msg *ProducerMessage buf := queue.New() @@ -1227,6 +1240,19 @@ func (p *asyncProducer) retryHandler() { } buf.Add(msg) + + if maxBufferSize > 0 && buf.Length() >= maxBufferSize { + msgToHandle := buf.Peek().(*ProducerMessage) + if msgToHandle.flags == 0 { + select { + case p.input <- msgToHandle: + buf.Remove() + default: + buf.Remove() + p.returnError(msgToHandle, ErrProducerRetryBufferOverflow) + } + } + } } } diff --git a/config.go b/config.go index f2f197887..78d4758af 100644 --- a/config.go +++ b/config.go @@ -269,6 +269,13 @@ type Config struct { // more sophisticated backoff strategies. This takes precedence over // `Backoff` if set. BackoffFunc func(retries, maxRetries int) time.Duration + // The maximum length of the bridging buffer between `input` and `retries` channels + // in AsyncProducer#retryHandler. + // The limit is to prevent this buffer from overflowing or causing OOM. + // Defaults to 0 for unlimited. + // Any value between 0 and 5000 is pushed to 5000. + // A zero or negative value indicates unlimited. + MaxBufferLength int } // Interceptors to be called when the producer dispatcher reads the