
feat(producer): add retry buffer tuning option to prevent OOM #3026

Merged · 1 commit · Dec 19, 2024
25 changes: 25 additions & 0 deletions async_producer.go
@@ -13,6 +13,13 @@ import (
"github.com/rcrowley/go-metrics"
)

// ErrProducerRetryBufferOverflow is returned when the bridging retry buffer is full and OOM prevention needs to be applied.
var ErrProducerRetryBufferOverflow = errors.New("retry buffer full: message discarded to prevent buffer overflow")

// minFunctionalRetryBufferLength is the lower limit of Producer.Retry.MaxBufferLength for the cap to take effect.
// Any non-zero maxBufferLength below this lower limit is raised to the lower limit.
const minFunctionalRetryBufferLength = 4 * 1024

// AsyncProducer publishes Kafka messages using a non-blocking API. It routes messages
// to the correct broker for the provided topic-partition, refreshing metadata as appropriate,
// and parses responses for errors. You must read from the Errors() channel or the
@@ -1207,6 +1214,11 @@ func (bp *brokerProducer) handleError(sent *produceSet, err error) {
// effectively a "bridge" between the flushers and the dispatcher in order to avoid deadlock
// based on https://godoc.org/github.com/eapache/channels#InfiniteChannel
func (p *asyncProducer) retryHandler() {
	maxBufferSize := p.conf.Producer.Retry.MaxBufferLength
	if 0 < maxBufferSize && maxBufferSize < minFunctionalRetryBufferLength {
		maxBufferSize = minFunctionalRetryBufferLength
	}

	var msg *ProducerMessage
	buf := queue.New()

@@ -1227,6 +1239,19 @@
		}

		buf.Add(msg)

		if maxBufferSize > 0 && buf.Length() >= maxBufferSize {
			msgToHandle := buf.Peek().(*ProducerMessage)
			if msgToHandle.flags == 0 {
				select {
				case p.input <- msgToHandle:
					buf.Remove()
				default:
					buf.Remove()
					p.returnError(msgToHandle, ErrProducerRetryBufferOverflow)
				}
			}
		}
	}
}

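In effect, once the bridging buffer reaches the configured cap, the oldest retry message that carries no internal flags is pushed back to the input channel if that send would not block; otherwise it is dropped and reported with ErrProducerRetryBufferOverflow. Below is a minimal sketch (not part of this diff) of how a caller might watch for that error on the Errors() channel; it assumes the github.com/IBM/sarama import path and that Producer.Return.Errors is enabled.

package example

import (
	"errors"
	"log"

	"github.com/IBM/sarama"
)

// consumeProducerErrors drains the async producer's error channel and singles
// out messages that were discarded because the retry buffer hit its cap.
func consumeProducerErrors(producer sarama.AsyncProducer) {
	for pErr := range producer.Errors() {
		if errors.Is(pErr.Err, sarama.ErrProducerRetryBufferOverflow) {
			// The message was dropped to bound the retry buffer's memory use;
			// decide whether to re-enqueue it, persist it elsewhere, or just log it.
			log.Printf("dropped during retry: topic=%s", pErr.Msg.Topic)
			continue
		}
		log.Printf("produce failed: %v", pErr.Err)
	}
}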
76 changes: 76 additions & 0 deletions async_producer_test.go
@@ -2244,6 +2244,82 @@ func TestTxnCanAbort(t *testing.T) {
	require.NoError(t, err)
}

func TestPreventRetryBufferOverflow(t *testing.T) {
	broker := NewMockBroker(t, 1)
	defer broker.Close()
	topic := "test-topic"

	metadataRequestHandlerFunc := func(req *request) (res encoderWithHeader) {
		r := new(MetadataResponse)
		r.AddBroker(broker.Addr(), broker.BrokerID())
		r.AddTopicPartition(topic, 0, broker.BrokerID(), nil, nil, nil, ErrNoError)
		return r
	}

	produceRequestHandlerFunc := func(req *request) (res encoderWithHeader) {
		r := new(ProduceResponse)
		r.AddTopicPartition(topic, 0, ErrNotLeaderForPartition)
		return r
	}

	broker.SetHandlerFuncByMap(map[string]requestHandlerFunc{
		"ProduceRequest":  produceRequestHandlerFunc,
		"MetadataRequest": metadataRequestHandlerFunc,
	})

	config := NewTestConfig()
	config.Producer.Flush.MaxMessages = 1
	config.Producer.Retry.MaxBufferLength = minFunctionalRetryBufferLength
	config.Producer.Return.Successes = true

	producer, err := NewAsyncProducer([]string{broker.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}

	var (
		wg                        sync.WaitGroup
		successes, producerErrors int
		errorFound                bool
	)

	wg.Add(1)
	go func() {
		defer wg.Done()
		for range producer.Successes() {
			successes++
		}
	}()

	wg.Add(1)
	go func() {
		defer wg.Done()
		for errMsg := range producer.Errors() {
			if errors.Is(errMsg.Err, ErrProducerRetryBufferOverflow) {
				errorFound = true
			}
			producerErrors++
		}
	}()

	numMessages := 100000
	for i := 0; i < numMessages; i++ {
		kv := StringEncoder(strconv.Itoa(i))
		producer.Input() <- &ProducerMessage{
			Topic:    topic,
			Key:      kv,
			Value:    kv,
			Metadata: i,
		}
	}

	producer.AsyncClose()
	wg.Wait()

	require.Equal(t, successes+producerErrors, numMessages, "Expected all messages to be processed")
	require.True(t, errorFound, "Expected at least one error matching ErrProducerRetryBufferOverflow")
}

// This example shows how to use the producer while simultaneously
// reading the Errors channel to know about any failures.
func ExampleAsyncProducer_select() {
7 changes: 7 additions & 0 deletions config.go
@@ -269,6 +269,13 @@ type Config struct {
			// more sophisticated backoff strategies. This takes precedence over
			// `Backoff` if set.
			BackoffFunc func(retries, maxRetries int) time.Duration
			// The maximum length of the bridging buffer between `input` and `retries` channels
			// in AsyncProducer#retryHandler.
			// The limit is to prevent this buffer from overflowing or causing OOM.
			// Defaults to 0 for unlimited.
			// Any value between 0 and 4096 is pushed to 4096.
			// A zero or negative value indicates unlimited.
			MaxBufferLength int
		}

// Interceptors to be called when the producer dispatcher reads the
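For completeness, a minimal sketch of how an application could opt in to the new setting; it assumes the github.com/IBM/sarama import path and a broker at localhost:9092, and the 32 * 1024 cap is an arbitrary illustration rather than a recommendation.

package main

import (
	"log"

	"github.com/IBM/sarama"
)

func main() {
	cfg := sarama.NewConfig()
	// Cap the retry bridging buffer so a prolonged broker outage cannot grow it
	// without bound; values between 1 and 4095 are raised to 4096, and 0 (the
	// default) keeps the previous unbounded behaviour.
	cfg.Producer.Retry.MaxBufferLength = 32 * 1024

	producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer producer.AsyncClose()

	// Send on producer.Input() and read producer.Errors()/Successes() as usual.
}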