Skip to content

Commit

Permalink
Refactor brokerProducer again
Browse files Browse the repository at this point in the history
Turn the "if wouldOverflow then flush" logic into a more complete `waitForSpace`
method which handles all of that logic including re-checking error states and
buffer sizes as necessary.

Move the "when should we be setting `output`" logic to the end of every
iteration of the `run` loop. This requires tracking an additional `timerFired`
field on the `brokerProducer`, but is much easier to follow and allows us to be
more precise when, e.g., a response flushes part but not all of our buffer.

Extract the "if this is a chaser, change state" logic out of `retryReason` and
rename that method to `needsRetry`. This makes `needsRetry` a pure function and
avoids unnecessary work in `waitForSpace`.
  • Loading branch information
eapache committed Sep 27, 2015
1 parent 6516b87 commit 5fc13db
Showing 1 changed file with 55 additions and 36 deletions.
91 changes: 55 additions & 36 deletions async_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,8 +553,9 @@ type brokerProducer struct {
output chan<- *produceSet
responses <-chan *brokerProducerResponse

buffer *produceSet
timer <-chan time.Time
buffer *produceSet
timer <-chan time.Time
timerFired bool

closing error
currentRetries map[string]map[int32]error
Expand All @@ -571,45 +572,57 @@ func (bp *brokerProducer) run() {
goto shutdown
}

if bp.buffer.wouldOverflow(msg) {
Logger.Printf("producer/broker/%d maximum request accumulated, forcing blocking flush\n", bp.broker.ID())
bp.flush()
output = nil
}

if reason := bp.retryReason(msg); reason != nil {
if reason := bp.needsRetry(msg); reason != nil {
bp.parent.retryMessage(msg, reason)

if bp.closing == nil && msg.flags&chaser == chaser {
// we were retrying this partition but we can start processing again
delete(bp.currentRetries[msg.Topic], msg.Partition)
Logger.Printf("producer/broker/%d state change to [normal] on %s/%d\n",
bp.broker.ID(), msg.Topic, msg.Partition)
}

continue
}

if bp.buffer.wouldOverflow(msg) {
if err := bp.waitForSpace(msg); err != nil {
bp.parent.retryMessage(msg, err)
continue
}
}

if err := bp.buffer.add(msg); err != nil {
bp.parent.returnError(msg, err)
continue
}

if bp.buffer.readyToFlush() {
output = bp.output
} else if bp.parent.conf.Producer.Flush.Frequency > 0 && bp.timer == nil {
if bp.parent.conf.Producer.Flush.Frequency > 0 && bp.timer == nil {
bp.timer = time.After(bp.parent.conf.Producer.Flush.Frequency)
}
case <-bp.timer:
output = bp.output
bp.timerFired = true
case output <- bp.buffer:
bp.rollOver()
output = nil
case response := <-bp.responses:
bp.handleResponse(response)
if bp.buffer.empty() {
// this can happen if the response was an error
output = nil
bp.timer = nil
}
}

if bp.timerFired || bp.buffer.readyToFlush() {
output = bp.output
} else {
output = nil
}
}

shutdown:
if !bp.buffer.empty() {
bp.flush()
for !bp.buffer.empty() {
select {
case response := <-bp.responses:
bp.handleResponse(response)
case bp.output <- bp.buffer:
bp.rollOver()
}
}
close(bp.output)
for response := range bp.responses {
Expand All @@ -619,42 +632,41 @@ shutdown:
Logger.Printf("producer/broker/%d shut down\n", bp.broker.ID())
}

func (bp *brokerProducer) retryReason(msg *ProducerMessage) error {
func (bp *brokerProducer) needsRetry(msg *ProducerMessage) error {
if bp.closing != nil {
return bp.closing
}

if bp.currentRetries[msg.Topic] != nil {
err := bp.currentRetries[msg.Topic][msg.Partition]
if err != nil && msg.flags&chaser == chaser {
// we were retrying this partition but we can start processing again
Logger.Printf("producer/broker/%d state change to [normal] on %s/%d\n",
bp.broker.ID(), msg.Topic, msg.Partition)
delete(bp.currentRetries[msg.Topic], msg.Partition)
}
return err
if bp.currentRetries[msg.Topic] == nil {
return nil
}

return nil
return bp.currentRetries[msg.Topic][msg.Partition]
}

func (bp *brokerProducer) flush() {
func (bp *brokerProducer) waitForSpace(msg *ProducerMessage) error {
Logger.Printf("producer/broker/%d maximum request accumulated, waiting for space\n", bp.broker.ID())

for {
select {
case response := <-bp.responses:
bp.handleResponse(response)
if bp.buffer.empty() {
return // this can happen if the response was an error
// handling a response can change our state, so re-check some things
if reason := bp.needsRetry(msg); reason != nil {
return reason
} else if !bp.buffer.wouldOverflow(msg) {
return nil
}
case bp.output <- bp.buffer:
bp.rollOver()
return
return nil
}
}
}

// rollOver resets the producer's per-buffer state: it clears the flush timer,
// resets the timerFired flag, and replaces the current buffer with a fresh
// produceSet created from bp.parent.
func (bp *brokerProducer) rollOver() {
bp.timer = nil
bp.timerFired = false
bp.buffer = newProduceSet(bp.parent)
}

Expand All @@ -664,6 +676,10 @@ func (bp *brokerProducer) handleResponse(response *brokerProducerResponse) {
} else {
bp.handleSuccess(response.set, response.res)
}

if bp.buffer.empty() {
bp.rollOver() // this can happen if the response invalidated our buffer
}
}

func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceResponse) {
Expand Down Expand Up @@ -914,6 +930,9 @@ func (ps *produceSet) wouldOverflow(msg *ProducerMessage) bool {

func (ps *produceSet) readyToFlush() bool {
switch {
// If we don't have any messages, nothing else matters
case ps.empty():
return false
// If all three config values are 0, we always flush as-fast-as-possible
case ps.parent.conf.Producer.Flush.Frequency == 0 && ps.parent.conf.Producer.Flush.Bytes == 0 && ps.parent.conf.Producer.Flush.Messages == 0:
return true
Expand Down

0 comments on commit 5fc13db

Please sign in to comment.