diff --git a/async_producer.go b/async_producer.go index 5c57a81375..d0ce01b66e 100644 --- a/async_producer.go +++ b/async_producer.go @@ -60,13 +60,28 @@ const ( noProducerEpoch = -1 ) -func (t *transactionManager) getAndIncrementSequenceNumber(topic string, partition int32) int32 { +func (t *transactionManager) getAndIncrementSequenceNumber(topic string, partition int32) (int32, int16) { key := fmt.Sprintf("%s-%d", topic, partition) t.mutex.Lock() defer t.mutex.Unlock() sequence := t.sequenceNumbers[key] t.sequenceNumbers[key] = sequence + 1 - return sequence + return sequence, t.producerEpoch +} + +func (t *transactionManager) bumpEpoch() { + t.mutex.Lock() + defer t.mutex.Unlock() + t.producerEpoch++ + for k := range t.sequenceNumbers { + t.sequenceNumbers[k] = 0 + } +} + +func (t *transactionManager) getProducerID() (int64, int16) { + t.mutex.Lock() + defer t.mutex.Unlock() + return t.producerID, t.producerEpoch } func newTransactionManager(conf *Config, client Client) (*transactionManager, error) { @@ -208,6 +223,8 @@ type ProducerMessage struct { flags flagSet expectation chan *ProducerError sequenceNumber int32 + producerEpoch int16 + hasSequence bool } const producerMessageOverhead = 26 // the metadata overhead of CRC, flags, etc. @@ -234,6 +251,9 @@ func (m *ProducerMessage) byteSize(version int) int { func (m *ProducerMessage) clear() { m.flags = 0 m.retries = 0 + m.sequenceNumber = 0 + m.producerEpoch = 0 + m.hasSequence = false } // ProducerError is the type of error generated when the producer fails to deliver a message. @@ -388,10 +408,6 @@ func (tp *topicProducer) dispatch() { continue } } - // All messages being retried (sent or not) have already had their retry count updated - if tp.parent.conf.Producer.Idempotent && msg.retries == 0 { - msg.sequenceNumber = tp.parent.txnmgr.getAndIncrementSequenceNumber(msg.Topic, msg.Partition) - } handler := tp.handlers[msg.Partition] if handler == nil { @@ -570,6 +586,15 @@ func (pp *partitionProducer) dispatch() { Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID()) } + // Now that we know we have a broker to actually try and send this message to, generate the sequence + // number for it. + // All messages being retried (sent or not) have already had their retry count updated + // Also, ignore "special" syn/fin messages used to sync the brokerProducer and the topicProducer. + if pp.parent.conf.Producer.Idempotent && msg.retries == 0 && msg.flags == 0 { + msg.sequenceNumber, msg.producerEpoch = pp.parent.txnmgr.getAndIncrementSequenceNumber(msg.Topic, msg.Partition) + msg.hasSequence = true + } + pp.brokerProducer.input <- msg } } @@ -748,12 +773,21 @@ func (bp *brokerProducer) run() { } if bp.buffer.wouldOverflow(msg) { - if err := bp.waitForSpace(msg); err != nil { + Logger.Printf("producer/broker/%d maximum request accumulated, waiting for space\n", bp.broker.ID()) + if err := bp.waitForSpace(msg, false); err != nil { bp.parent.retryMessage(msg, err) continue } } + if bp.parent.txnmgr.producerID != noProducerID && bp.buffer.producerEpoch != msg.producerEpoch { + // The epoch was reset, need to roll the buffer over + Logger.Printf("producer/broker/%d detected epoch rollover, waiting for new buffer\n", bp.broker.ID()) + if err := bp.waitForSpace(msg, true); err != nil { + bp.parent.retryMessage(msg, err) + continue + } + } if err := bp.buffer.add(msg); err != nil { bp.parent.returnError(msg, err) continue @@ -809,9 +843,7 @@ func (bp *brokerProducer) needsRetry(msg *ProducerMessage) error { return bp.currentRetries[msg.Topic][msg.Partition] } -func (bp *brokerProducer) waitForSpace(msg *ProducerMessage) error { - Logger.Printf("producer/broker/%d maximum request accumulated, waiting for space\n", bp.broker.ID()) - +func (bp *brokerProducer) waitForSpace(msg *ProducerMessage, forceRollover bool) error { for { select { case response := <-bp.responses: @@ -819,7 +851,7 @@ func (bp *brokerProducer) waitForSpace(msg *ProducerMessage) error { // handling a response can change our state, so re-check some things if reason := bp.needsRetry(msg); reason != nil { return reason - } else if !bp.buffer.wouldOverflow(msg) { + } else if !bp.buffer.wouldOverflow(msg) && !forceRollover { return nil } case bp.output <- bp.buffer: @@ -1030,6 +1062,12 @@ func (p *asyncProducer) shutdown() { } func (p *asyncProducer) returnError(msg *ProducerMessage, err error) { + // We need to reset the producer ID epoch if we set a sequence number on it, because the broker + // will never see a message with this number, so we can never continue the sequence. + if msg.hasSequence { + Logger.Printf("producer/txnmanager rolling over epoch due to publish failure on %s/%d", msg.Topic, msg.Partition) + p.txnmgr.bumpEpoch() + } msg.clear() pErr := &ProducerError{Msg: msg, Err: err} if p.conf.Producer.Return.Errors { diff --git a/async_producer_test.go b/async_producer_test.go index d0f012d249..7bc0794dd2 100644 --- a/async_producer_test.go +++ b/async_producer_test.go @@ -1130,6 +1130,75 @@ func TestAsyncProducerIdempotentErrorOnOutOfSeq(t *testing.T) { closeProducer(t, producer) } +func TestAsyncProducerIdempotentEpochRollover(t *testing.T) { + broker := NewMockBroker(t, 1) + defer broker.Close() + + metadataResponse := &MetadataResponse{ + Version: 1, + ControllerID: 1, + } + metadataResponse.AddBroker(broker.Addr(), broker.BrokerID()) + metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, nil, ErrNoError) + broker.Returns(metadataResponse) + + initProducerID := &InitProducerIDResponse{ + ThrottleTime: 0, + ProducerID: 1000, + ProducerEpoch: 1, + } + broker.Returns(initProducerID) + + config := NewConfig() + config.Producer.Flush.Messages = 10 + config.Producer.Flush.Frequency = 10 * time.Millisecond + config.Producer.Return.Successes = true + config.Producer.Retry.Max = 1 // This test needs to exercise what happens when retries exhaust + config.Producer.RequiredAcks = WaitForAll + config.Producer.Retry.Backoff = 0 + config.Producer.Idempotent = true + config.Net.MaxOpenRequests = 1 + config.Version = V0_11_0_0 + + producer, err := NewAsyncProducer([]string{broker.Addr()}, config) + if err != nil { + t.Fatal(err) + } + defer closeProducer(t, producer) + + producer.Input() <- &ProducerMessage{Topic: "my_topic", Value: StringEncoder("hello")} + prodError := &ProduceResponse{ + Version: 3, + ThrottleTime: 0, + } + prodError.AddTopicPartition("my_topic", 0, ErrBrokerNotAvailable) + broker.Returns(prodError) + <- producer.Errors() + + lastReqRes := broker.history[len(broker.history)-1] + lastProduceBatch := lastReqRes.Request.(*ProduceRequest).records["my_topic"][0].RecordBatch + if lastProduceBatch.FirstSequence != 0 { + t.Error("first sequence not zero") + } + if lastProduceBatch.ProducerEpoch != 1 { + t.Error("first epoch was not one") + } + + // Now if we produce again, the epoch should have rolled over. + producer.Input() <- &ProducerMessage{Topic: "my_topic", Value: StringEncoder("hello")} + broker.Returns(prodError) + <- producer.Errors() + + lastReqRes = broker.history[len(broker.history)-1] + lastProduceBatch = lastReqRes.Request.(*ProduceRequest).records["my_topic"][0].RecordBatch + if lastProduceBatch.FirstSequence != 0 { + t.Error("second sequence not zero") + } + if lastProduceBatch.ProducerEpoch <= 1 { + t.Error("second epoch was not > 1") + } +} + // TestBrokerProducerShutdown ensures that a call to shutdown stops the // brokerProducer run() loop and doesn't leak any goroutines func TestBrokerProducerShutdown(t *testing.T) { diff --git a/produce_set.go b/produce_set.go index 36c43c6a61..9c70f81800 100644 --- a/produce_set.go +++ b/produce_set.go @@ -13,17 +13,22 @@ type partitionSet struct { } type produceSet struct { - parent *asyncProducer - msgs map[string]map[int32]*partitionSet + parent *asyncProducer + msgs map[string]map[int32]*partitionSet + producerID int64 + producerEpoch int16 bufferBytes int bufferCount int } func newProduceSet(parent *asyncProducer) *produceSet { + pid, epoch := parent.txnmgr.getProducerID() return &produceSet{ - msgs: make(map[string]map[int32]*partitionSet), - parent: parent, + msgs: make(map[string]map[int32]*partitionSet), + parent: parent, + producerID: pid, + producerEpoch: epoch, } } @@ -65,8 +70,8 @@ func (ps *produceSet) add(msg *ProducerMessage) error { Version: 2, Codec: ps.parent.conf.Producer.Compression, CompressionLevel: ps.parent.conf.Producer.CompressionLevel, - ProducerID: ps.parent.txnmgr.producerID, - ProducerEpoch: ps.parent.txnmgr.producerEpoch, + ProducerID: ps.producerID, + ProducerEpoch: ps.producerEpoch, } if ps.parent.conf.Producer.Idempotent { batch.FirstSequence = msg.sequenceNumber @@ -78,12 +83,17 @@ func (ps *produceSet) add(msg *ProducerMessage) error { } partitions[msg.Partition] = set } - set.msgs = append(set.msgs, msg) if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) { if ps.parent.conf.Producer.Idempotent && msg.sequenceNumber < set.recordsToSend.RecordBatch.FirstSequence { return errors.New("assertion failed: message out of sequence added to a batch") } + } + + // Past this point we can't return an error, because we've already added the message to the set. + set.msgs = append(set.msgs, msg) + + if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) { // We are being conservative here to avoid having to prep encode the record size += maximumRecordOverhead rec := &Record{