Skip to content

Commit

Permalink
Fix "broker received out of order sequence" when brokers die
Browse files Browse the repository at this point in the history
When the following three conditions are satisfied, the producer code can
skip message sequence numbers and cause the broker to complain that the
sequences are out of order:

* config.Producer.Idempotent is set
* The producer loses, and then regains, its connection to a broker
* The client code continues to attempt to produce messages whilst the
broker is unavailable.

For every message the client attempted to send while the broker is
unavailable, the transaction manager sequence number will be
incremented, however these messages will eventually fail and return an
error to the caller. When the broker re-appears, and another message is
published, it's sequence number is higher than the last one the broker
remembered - the values that were attempted while it was down were never
seen. Thus, from the broker's perspective, it's seeing out-of-order
sequence numbers.

The fix to this has a few parts:

* Don't obtain a sequence number from the transaction manager until
we're sure we want to try publishing the message
* Affix the producer ID and epoch to the message once the sequence is
generated
* Increment the transaction manager epoch (and reset all sequence
numbers to zero) when we permenantly fail to publish a message. That
represents a sequence that the broker will never see, so the only safe
thing to do is to roll over the epoch number.
* Ensure we don't publish message sets that contain messages from
multiple transaction manager epochs.
  • Loading branch information
KJ Tsanaktsidis committed Apr 14, 2020
1 parent 6159078 commit 9c6582c
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 18 deletions.
60 changes: 49 additions & 11 deletions async_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,28 @@ const (
noProducerEpoch = -1
)

func (t *transactionManager) getAndIncrementSequenceNumber(topic string, partition int32) int32 {
func (t *transactionManager) getAndIncrementSequenceNumber(topic string, partition int32) (int32, int16) {
key := fmt.Sprintf("%s-%d", topic, partition)
t.mutex.Lock()
defer t.mutex.Unlock()
sequence := t.sequenceNumbers[key]
t.sequenceNumbers[key] = sequence + 1
return sequence
return sequence, t.producerEpoch
}

func (t *transactionManager) bumpEpoch() {
t.mutex.Lock()
defer t.mutex.Unlock()
t.producerEpoch++
for k := range t.sequenceNumbers {
t.sequenceNumbers[k] = 0
}
}

func (t *transactionManager) getProducerID() (int64, int16) {
t.mutex.Lock()
defer t.mutex.Unlock()
return t.producerID, t.producerEpoch
}

func newTransactionManager(conf *Config, client Client) (*transactionManager, error) {
Expand Down Expand Up @@ -208,6 +223,8 @@ type ProducerMessage struct {
flags flagSet
expectation chan *ProducerError
sequenceNumber int32
producerEpoch int16
hasSequence bool
}

const producerMessageOverhead = 26 // the metadata overhead of CRC, flags, etc.
Expand All @@ -234,6 +251,9 @@ func (m *ProducerMessage) byteSize(version int) int {
func (m *ProducerMessage) clear() {
m.flags = 0
m.retries = 0
m.sequenceNumber = 0
m.producerEpoch = 0
m.hasSequence = false
}

// ProducerError is the type of error generated when the producer fails to deliver a message.
Expand Down Expand Up @@ -388,10 +408,6 @@ func (tp *topicProducer) dispatch() {
continue
}
}
// All messages being retried (sent or not) have already had their retry count updated
if tp.parent.conf.Producer.Idempotent && msg.retries == 0 {
msg.sequenceNumber = tp.parent.txnmgr.getAndIncrementSequenceNumber(msg.Topic, msg.Partition)
}

handler := tp.handlers[msg.Partition]
if handler == nil {
Expand Down Expand Up @@ -570,6 +586,15 @@ func (pp *partitionProducer) dispatch() {
Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
}

// Now that we know we have a broker to actually try and send this message to, generate the sequence
// number for it.
// All messages being retried (sent or not) have already had their retry count updated
// Also, ignore "special" syn/fin messages used to sync the brokerProducer and the topicProducer.
if pp.parent.conf.Producer.Idempotent && msg.retries == 0 && msg.flags == 0 {
msg.sequenceNumber, msg.producerEpoch = pp.parent.txnmgr.getAndIncrementSequenceNumber(msg.Topic, msg.Partition)
msg.hasSequence = true
}

pp.brokerProducer.input <- msg
}
}
Expand Down Expand Up @@ -748,12 +773,21 @@ func (bp *brokerProducer) run() {
}

if bp.buffer.wouldOverflow(msg) {
if err := bp.waitForSpace(msg); err != nil {
Logger.Printf("producer/broker/%d maximum request accumulated, waiting for space\n", bp.broker.ID())
if err := bp.waitForSpace(msg, false); err != nil {
bp.parent.retryMessage(msg, err)
continue
}
}

if bp.parent.txnmgr.producerID != noProducerID && bp.buffer.producerEpoch != msg.producerEpoch {
// The epoch was reset, need to roll the buffer over
Logger.Printf("producer/broker/%d detected epoch rollover, waiting for new buffer\n", bp.broker.ID())
if err := bp.waitForSpace(msg, true); err != nil {
bp.parent.retryMessage(msg, err)
continue
}
}
if err := bp.buffer.add(msg); err != nil {
bp.parent.returnError(msg, err)
continue
Expand Down Expand Up @@ -809,17 +843,15 @@ func (bp *brokerProducer) needsRetry(msg *ProducerMessage) error {
return bp.currentRetries[msg.Topic][msg.Partition]
}

func (bp *brokerProducer) waitForSpace(msg *ProducerMessage) error {
Logger.Printf("producer/broker/%d maximum request accumulated, waiting for space\n", bp.broker.ID())

func (bp *brokerProducer) waitForSpace(msg *ProducerMessage, forceRollover bool) error {
for {
select {
case response := <-bp.responses:
bp.handleResponse(response)
// handling a response can change our state, so re-check some things
if reason := bp.needsRetry(msg); reason != nil {
return reason
} else if !bp.buffer.wouldOverflow(msg) {
} else if !bp.buffer.wouldOverflow(msg) && !forceRollover {
return nil
}
case bp.output <- bp.buffer:
Expand Down Expand Up @@ -1030,6 +1062,12 @@ func (p *asyncProducer) shutdown() {
}

func (p *asyncProducer) returnError(msg *ProducerMessage, err error) {
// We need to reset the producer ID epoch if we set a sequence number on it, because the broker
// will never see a message with this number, so we can never continue the sequence.
if msg.hasSequence {
Logger.Printf("producer/txnmanager rolling over epoch due to publish failure on %s/%d", msg.Topic, msg.Partition)
p.txnmgr.bumpEpoch()
}
msg.clear()
pErr := &ProducerError{Msg: msg, Err: err}
if p.conf.Producer.Return.Errors {
Expand Down
69 changes: 69 additions & 0 deletions async_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1130,6 +1130,75 @@ func TestAsyncProducerIdempotentErrorOnOutOfSeq(t *testing.T) {
closeProducer(t, producer)
}

func TestAsyncProducerIdempotentEpochRollover(t *testing.T) {
broker := NewMockBroker(t, 1)
defer broker.Close()

metadataResponse := &MetadataResponse{
Version: 1,
ControllerID: 1,
}
metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, nil, ErrNoError)
broker.Returns(metadataResponse)

initProducerID := &InitProducerIDResponse{
ThrottleTime: 0,
ProducerID: 1000,
ProducerEpoch: 1,
}
broker.Returns(initProducerID)

config := NewConfig()
config.Producer.Flush.Messages = 10
config.Producer.Flush.Frequency = 10 * time.Millisecond
config.Producer.Return.Successes = true
config.Producer.Retry.Max = 1 // This test needs to exercise what happens when retries exhaust
config.Producer.RequiredAcks = WaitForAll
config.Producer.Retry.Backoff = 0
config.Producer.Idempotent = true
config.Net.MaxOpenRequests = 1
config.Version = V0_11_0_0

producer, err := NewAsyncProducer([]string{broker.Addr()}, config)
if err != nil {
t.Fatal(err)
}
defer closeProducer(t, producer)

producer.Input() <- &ProducerMessage{Topic: "my_topic", Value: StringEncoder("hello")}
prodError := &ProduceResponse{
Version: 3,
ThrottleTime: 0,
}
prodError.AddTopicPartition("my_topic", 0, ErrBrokerNotAvailable)
broker.Returns(prodError)
<- producer.Errors()

lastReqRes := broker.history[len(broker.history)-1]
lastProduceBatch := lastReqRes.Request.(*ProduceRequest).records["my_topic"][0].RecordBatch
if lastProduceBatch.FirstSequence != 0 {
t.Error("first sequence not zero")
}
if lastProduceBatch.ProducerEpoch != 1 {
t.Error("first epoch was not one")
}

// Now if we produce again, the epoch should have rolled over.
producer.Input() <- &ProducerMessage{Topic: "my_topic", Value: StringEncoder("hello")}
broker.Returns(prodError)
<- producer.Errors()

lastReqRes = broker.history[len(broker.history)-1]
lastProduceBatch = lastReqRes.Request.(*ProduceRequest).records["my_topic"][0].RecordBatch
if lastProduceBatch.FirstSequence != 0 {
t.Error("second sequence not zero")
}
if lastProduceBatch.ProducerEpoch <= 1 {
t.Error("second epoch was not > 1")
}
}

// TestBrokerProducerShutdown ensures that a call to shutdown stops the
// brokerProducer run() loop and doesn't leak any goroutines
func TestBrokerProducerShutdown(t *testing.T) {
Expand Down
24 changes: 17 additions & 7 deletions produce_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,22 @@ type partitionSet struct {
}

type produceSet struct {
parent *asyncProducer
msgs map[string]map[int32]*partitionSet
parent *asyncProducer
msgs map[string]map[int32]*partitionSet
producerID int64
producerEpoch int16

bufferBytes int
bufferCount int
}

func newProduceSet(parent *asyncProducer) *produceSet {
pid, epoch := parent.txnmgr.getProducerID()
return &produceSet{
msgs: make(map[string]map[int32]*partitionSet),
parent: parent,
msgs: make(map[string]map[int32]*partitionSet),
parent: parent,
producerID: pid,
producerEpoch: epoch,
}
}

Expand Down Expand Up @@ -65,8 +70,8 @@ func (ps *produceSet) add(msg *ProducerMessage) error {
Version: 2,
Codec: ps.parent.conf.Producer.Compression,
CompressionLevel: ps.parent.conf.Producer.CompressionLevel,
ProducerID: ps.parent.txnmgr.producerID,
ProducerEpoch: ps.parent.txnmgr.producerEpoch,
ProducerID: ps.producerID,
ProducerEpoch: ps.producerEpoch,
}
if ps.parent.conf.Producer.Idempotent {
batch.FirstSequence = msg.sequenceNumber
Expand All @@ -78,12 +83,17 @@ func (ps *produceSet) add(msg *ProducerMessage) error {
}
partitions[msg.Partition] = set
}
set.msgs = append(set.msgs, msg)

if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
if ps.parent.conf.Producer.Idempotent && msg.sequenceNumber < set.recordsToSend.RecordBatch.FirstSequence {
return errors.New("assertion failed: message out of sequence added to a batch")
}
}

// Past this point we can't return an error, because we've already added the message to the set.
set.msgs = append(set.msgs, msg)

if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
// We are being conservative here to avoid having to prep encode the record
size += maximumRecordOverhead
rec := &Record{
Expand Down

0 comments on commit 9c6582c

Please sign in to comment.