diff --git a/async_producer.go b/async_producer.go index 3fbe4265b8..93ff0f5e8e 100644 --- a/async_producer.go +++ b/async_producer.go @@ -505,246 +505,251 @@ func (pp *partitionProducer) updateLeader() error { }) } -// one per broker, constructs both an aggregator and a flusher +// one per broker; also constructs an associated flusher func (p *asyncProducer) newBrokerProducer(broker *Broker) chan<- *ProducerMessage { - input := make(chan *ProducerMessage) - bridge := make(chan []*ProducerMessage) + var ( + input = make(chan *ProducerMessage) + bridge = make(chan *produceSet) + responses = make(chan *brokerProducerResponse) + ) - a := &aggregator{ - parent: p, - broker: broker, - input: input, - output: bridge, - } - go withRecover(a.run) - - f := &flusher{ + a := &brokerProducer{ parent: p, broker: broker, - input: bridge, + input: input, + output: bridge, + responses: responses, + buffer: newProduceSet(p), currentRetries: make(map[string]map[int32]error), } - go withRecover(f.run) + go withRecover(a.run) + + // minimal bridge to make the network response `select`able + go withRecover(func() { + for set := range bridge { + request := set.buildRequest() + + response, err := broker.Produce(request) + + responses <- &brokerProducerResponse{ + set: set, + err: err, + res: response, + } + } + close(responses) + }) return input } +type brokerProducerResponse struct { + set *produceSet + err error + res *ProduceResponse +} + // groups messages together into appropriately-sized batches for sending to the broker -// based on https://godoc.org/github.com/eapache/channels#BatchingChannel -type aggregator struct { +// handles state related to retries etc +type brokerProducer struct { parent *asyncProducer broker *Broker - input <-chan *ProducerMessage - output chan<- []*ProducerMessage - buffer []*ProducerMessage - bufferBytes int - timer <-chan time.Time + input <-chan *ProducerMessage + output chan<- *produceSet + responses <-chan *brokerProducerResponse + + buffer *produceSet + timer <-chan time.Time + timerFired bool + + closing error + currentRetries map[string]map[int32]error } -func (a *aggregator) run() { - var output chan<- []*ProducerMessage +func (bp *brokerProducer) run() { + var output chan<- *produceSet + Logger.Printf("producer/broker/%d starting up\n", bp.broker.ID()) for { select { - case msg := <-a.input: + case msg := <-bp.input: if msg == nil { goto shutdown } - if a.wouldOverflow(msg) { - Logger.Printf("producer/aggregator/%d maximum request accumulated, forcing blocking flush\n", a.broker.ID()) - a.output <- a.buffer - a.reset() - output = nil + if reason := bp.needsRetry(msg); reason != nil { + bp.parent.retryMessage(msg, reason) + + if bp.closing == nil && msg.flags&chaser == chaser { + // we were retrying this partition but we can start processing again + delete(bp.currentRetries[msg.Topic], msg.Partition) + Logger.Printf("producer/broker/%d state change to [normal] on %s/%d\n", + bp.broker.ID(), msg.Topic, msg.Partition) + } + + continue } - a.buffer = append(a.buffer, msg) - a.bufferBytes += msg.byteSize() + if bp.buffer.wouldOverflow(msg) { + if err := bp.waitForSpace(msg); err != nil { + bp.parent.retryMessage(msg, err) + continue + } + } - if a.readyToFlush(msg) { - output = a.output - } else if a.parent.conf.Producer.Flush.Frequency > 0 && a.timer == nil { - a.timer = time.After(a.parent.conf.Producer.Flush.Frequency) + if err := bp.buffer.add(msg); err != nil { + bp.parent.returnError(msg, err) + continue } - case <-a.timer: - output = a.output - case output <- a.buffer: - a.reset() + + if bp.parent.conf.Producer.Flush.Frequency > 0 && bp.timer == nil { + bp.timer = time.After(bp.parent.conf.Producer.Flush.Frequency) + } + case <-bp.timer: + bp.timerFired = true + case output <- bp.buffer: + bp.rollOver() + case response := <-bp.responses: + bp.handleResponse(response) + } + + if bp.timerFired || bp.buffer.readyToFlush() { + output = bp.output + } else { output = nil } } shutdown: - if len(a.buffer) > 0 { - a.output <- a.buffer + for !bp.buffer.empty() { + select { + case response := <-bp.responses: + bp.handleResponse(response) + case bp.output <- bp.buffer: + bp.rollOver() + } } - close(a.output) -} - -func (a *aggregator) wouldOverflow(msg *ProducerMessage) bool { - switch { - // Would we overflow our maximum possible size-on-the-wire? 10KiB is arbitrary overhead for safety. - case a.bufferBytes+msg.byteSize() >= int(MaxRequestSize-(10*1024)): - return true - // Would we overflow the size-limit of a compressed message-batch? - case a.parent.conf.Producer.Compression != CompressionNone && a.bufferBytes+msg.byteSize() >= a.parent.conf.Producer.MaxMessageBytes: - return true - // Would we overflow simply in number of messages? - case a.parent.conf.Producer.Flush.MaxMessages > 0 && len(a.buffer) >= a.parent.conf.Producer.Flush.MaxMessages: - return true - default: - return false + close(bp.output) + for response := range bp.responses { + bp.handleResponse(response) } -} -func (a *aggregator) readyToFlush(msg *ProducerMessage) bool { - switch { - // If all three config values are 0, we always flush as-fast-as-possible - case a.parent.conf.Producer.Flush.Frequency == 0 && a.parent.conf.Producer.Flush.Bytes == 0 && a.parent.conf.Producer.Flush.Messages == 0: - return true - // If the messages is a chaser we must flush to maintain the state-machine - case msg.flags&chaser == chaser: - return true - // If we've passed the message trigger-point - case a.parent.conf.Producer.Flush.Messages > 0 && len(a.buffer) >= a.parent.conf.Producer.Flush.Messages: - return true - // If we've passed the byte trigger-point - case a.parent.conf.Producer.Flush.Bytes > 0 && a.bufferBytes >= a.parent.conf.Producer.Flush.Bytes: - return true - default: - return false - } + Logger.Printf("producer/broker/%d shut down\n", bp.broker.ID()) } -func (a *aggregator) reset() { - a.timer = nil - a.buffer = nil - a.bufferBytes = 0 -} +func (bp *brokerProducer) needsRetry(msg *ProducerMessage) error { + if bp.closing != nil { + return bp.closing + } -// takes a batch at a time from the aggregator and sends to the broker -type flusher struct { - parent *asyncProducer - broker *Broker - input <-chan []*ProducerMessage + if bp.currentRetries[msg.Topic] == nil { + return nil + } - currentRetries map[string]map[int32]error + return bp.currentRetries[msg.Topic][msg.Partition] } -func (f *flusher) run() { - var closing error - - Logger.Printf("producer/flusher/%d starting up\n", f.broker.ID()) - - for batch := range f.input { - if closing != nil { - f.parent.retryMessages(batch, closing) - continue - } - - set := f.groupAndFilter(batch) - if set.empty() { - continue - } - - request := set.buildRequest() - response, err := f.broker.Produce(request) +func (bp *brokerProducer) waitForSpace(msg *ProducerMessage) error { + Logger.Printf("producer/broker/%d maximum request accumulated, waiting for space\n", bp.broker.ID()) - switch err.(type) { - case nil: - break - case PacketEncodingError: - set.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) { - f.parent.returnErrors(msgs, err) - }) - continue - default: - Logger.Printf("producer/flusher/%d state change to [closing] because %s\n", f.broker.ID(), err) - f.parent.abandonBrokerConnection(f.broker) - _ = f.broker.Close() - closing = err - set.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) { - f.parent.retryMessages(msgs, err) - }) - continue - } - - if response == nil { - // this only happens when RequiredAcks is NoResponse, so we have to assume success - set.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) { - f.parent.returnSuccesses(msgs) - }) - continue + for { + select { + case response := <-bp.responses: + bp.handleResponse(response) + // handling a response can change our state, so re-check some things + if reason := bp.needsRetry(msg); reason != nil { + return reason + } else if !bp.buffer.wouldOverflow(msg) { + return nil + } + case bp.output <- bp.buffer: + bp.rollOver() + return nil } - - f.parseResponse(set, response) } - Logger.Printf("producer/flusher/%d shut down\n", f.broker.ID()) } -func (f *flusher) groupAndFilter(batch []*ProducerMessage) *produceSet { - set := newProduceSet(f.parent) - - for _, msg := range batch { - - if f.currentRetries[msg.Topic] != nil && f.currentRetries[msg.Topic][msg.Partition] != nil { - // we're currently retrying this partition so we need to filter out this message - f.parent.retryMessages([]*ProducerMessage{msg}, f.currentRetries[msg.Topic][msg.Partition]) - - if msg.flags&chaser == chaser { - // ...but now we can start processing future messages again - Logger.Printf("producer/flusher/%d state change to [normal] on %s/%d\n", - f.broker.ID(), msg.Topic, msg.Partition) - delete(f.currentRetries[msg.Topic], msg.Partition) - } - - continue - } +func (bp *brokerProducer) rollOver() { + bp.timer = nil + bp.timerFired = false + bp.buffer = newProduceSet(bp.parent) +} - if err := set.add(msg); err != nil { - f.parent.returnError(msg, err) - continue - } +func (bp *brokerProducer) handleResponse(response *brokerProducerResponse) { + if response.err != nil { + bp.handleError(response.set, response.err) + } else { + bp.handleSuccess(response.set, response.res) } - return set + if bp.buffer.empty() { + bp.rollOver() // this can happen if the response invalidated our buffer + } } -func (f *flusher) parseResponse(set *produceSet, response *ProduceResponse) { +func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceResponse) { // we iterate through the blocks in the request set, not the response, so that we notice // if the response is missing a block completely - set.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) { + sent.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) { + if response == nil { + // this only happens when RequiredAcks is NoResponse, so we have to assume success + bp.parent.returnSuccesses(msgs) + return + } + block := response.GetBlock(topic, partition) if block == nil { - f.parent.returnErrors(msgs, ErrIncompleteResponse) + bp.parent.returnErrors(msgs, ErrIncompleteResponse) return } switch block.Err { // Success case ErrNoError: - for i := range msgs { - msgs[i].Offset = block.Offset + int64(i) + for i, msg := range msgs { + msg.Offset = block.Offset + int64(i) } - f.parent.returnSuccesses(msgs) + bp.parent.returnSuccesses(msgs) // Retriable errors case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable, ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend: - Logger.Printf("producer/flusher/%d state change to [retrying] on %s/%d because %v\n", - f.broker.ID(), topic, partition, block.Err) - if f.currentRetries[topic] == nil { - f.currentRetries[topic] = make(map[int32]error) + Logger.Printf("producer/broker/%d state change to [retrying] on %s/%d because %v\n", + bp.broker.ID(), topic, partition, block.Err) + if bp.currentRetries[topic] == nil { + bp.currentRetries[topic] = make(map[int32]error) } - f.currentRetries[topic][partition] = block.Err - f.parent.retryMessages(msgs, block.Err) + bp.currentRetries[topic][partition] = block.Err + bp.parent.retryMessages(msgs, block.Err) + bp.parent.retryMessages(bp.buffer.dropPartition(topic, partition), block.Err) // Other non-retriable errors default: - f.parent.returnErrors(msgs, block.Err) + bp.parent.returnErrors(msgs, block.Err) } }) } +func (bp *brokerProducer) handleError(sent *produceSet, err error) { + switch err.(type) { + case PacketEncodingError: + sent.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) { + bp.parent.returnErrors(msgs, err) + }) + default: + Logger.Printf("producer/broker/%d state change to [closing] because %s\n", bp.broker.ID(), err) + bp.parent.abandonBrokerConnection(bp.broker) + _ = bp.broker.Close() + bp.closing = err + sent.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) { + bp.parent.retryMessages(msgs, err) + }) + bp.buffer.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) { + bp.parent.retryMessages(msgs, err) + }) + bp.rollOver() + } +} + // singleton // effectively a "bridge" between the flushers and the dispatcher in order to avoid deadlock // based on https://godoc.org/github.com/eapache/channels#InfiniteChannel @@ -874,6 +879,57 @@ func (ps *produceSet) eachPartition(cb func(topic string, partition int32, msgs } } +func (ps *produceSet) dropPartition(topic string, partition int32) []*ProducerMessage { + if ps.msgs[topic] == nil { + return nil + } + set := ps.msgs[topic][partition] + if set == nil { + return nil + } + ps.bufferBytes -= set.bufferBytes + ps.bufferCount -= len(set.msgs) + delete(ps.msgs[topic], partition) + return set.msgs +} + +func (ps *produceSet) wouldOverflow(msg *ProducerMessage) bool { + switch { + // Would we overflow our maximum possible size-on-the-wire? 10KiB is arbitrary overhead for safety. + case ps.bufferBytes+msg.byteSize() >= int(MaxRequestSize-(10*1024)): + return true + // Would we overflow the size-limit of a compressed message-batch for this partition? + case ps.parent.conf.Producer.Compression != CompressionNone && + ps.msgs[msg.Topic] != nil && ps.msgs[msg.Topic][msg.Partition] != nil && + ps.msgs[msg.Topic][msg.Partition].bufferBytes+msg.byteSize() >= ps.parent.conf.Producer.MaxMessageBytes: + return true + // Would we overflow simply in number of messages? + case ps.parent.conf.Producer.Flush.MaxMessages > 0 && ps.bufferCount >= ps.parent.conf.Producer.Flush.MaxMessages: + return true + default: + return false + } +} + +func (ps *produceSet) readyToFlush() bool { + switch { + // If we don't have any messages, nothing else matters + case ps.empty(): + return false + // If all three config values are 0, we always flush as-fast-as-possible + case ps.parent.conf.Producer.Flush.Frequency == 0 && ps.parent.conf.Producer.Flush.Bytes == 0 && ps.parent.conf.Producer.Flush.Messages == 0: + return true + // If we've passed the message trigger-point + case ps.parent.conf.Producer.Flush.Messages > 0 && ps.bufferCount >= ps.parent.conf.Producer.Flush.Messages: + return true + // If we've passed the byte trigger-point + case ps.parent.conf.Producer.Flush.Bytes > 0 && ps.bufferBytes >= ps.parent.conf.Producer.Flush.Bytes: + return true + default: + return false + } +} + func (ps *produceSet) empty() bool { return ps.bufferCount == 0 }