Skip to content

Commit

Permalink
Merge pull request #420 from Shopify/retry-on-shutdown
Browse files Browse the repository at this point in the history
Retry messages on shutdown
  • Loading branch information
eapache committed Apr 27, 2015
2 parents 02c41d7 + 08ccf5e commit f948bc2
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 63 deletions.
111 changes: 48 additions & 63 deletions async_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type asyncProducer struct {

errors chan *ProducerError
input, successes, retries chan *ProducerMessage
inFlight sync.WaitGroup

brokers map[*Broker]chan *ProducerMessage
brokerRefs map[chan *ProducerMessage]int
Expand Down Expand Up @@ -105,8 +106,6 @@ type flagSet int8

const (
chaser flagSet = 1 << iota // message is last in a group that failed
ref // add a reference to a singleton channel
unref // remove a reference from a singleton channel
shutdown // start the shutdown process
)

Expand Down Expand Up @@ -193,9 +192,7 @@ func (p *asyncProducer) Close() error {
}

func (p *asyncProducer) AsyncClose() {
go withRecover(func() {
p.input <- &ProducerMessage{flags: shutdown}
})
go withRecover(p.shutdown)
}

///////////////////////////////////////////
Expand All @@ -209,6 +206,7 @@ func (p *asyncProducer) AsyncClose() {
// dispatches messages by topic
func (p *asyncProducer) topicDispatcher() {
handlers := make(map[string]chan *ProducerMessage)
shuttingDown := false

for msg := range p.input {
if msg == nil {
Expand All @@ -217,8 +215,14 @@ func (p *asyncProducer) topicDispatcher() {
}

if msg.flags&shutdown != 0 {
Logger.Println("Producer shutting down.")
break
shuttingDown = true
continue
} else if msg.retries == 0 {
p.inFlight.Add(1)
if shuttingDown {
p.returnError(msg, ErrShuttingDown)
continue
}
}

if (p.conf.Producer.Compression == CompressionNone && msg.Value != nil && msg.Value.Length() > p.conf.Producer.MaxMessageBytes) ||
Expand All @@ -230,7 +234,6 @@ func (p *asyncProducer) topicDispatcher() {

handler := handlers[msg.Topic]
if handler == nil {
p.retries <- &ProducerMessage{flags: ref}
newHandler := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
topic := msg.Topic // block local because go's closure semantics suck
go withRecover(func() { p.partitionDispatcher(topic, newHandler) })
Expand All @@ -244,21 +247,6 @@ func (p *asyncProducer) topicDispatcher() {
for _, handler := range handlers {
close(handler)
}

p.retries <- &ProducerMessage{flags: shutdown}

for msg := range p.input {
p.returnError(msg, ErrShuttingDown)
}

if p.ownClient {
err := p.client.Close()
if err != nil {
Logger.Println("producer/shutdown failed to close the embedded client:", err)
}
}
close(p.errors)
close(p.successes)
}

// one per topic
Expand All @@ -281,7 +269,6 @@ func (p *asyncProducer) partitionDispatcher(topic string, input chan *ProducerMe

handler := handlers[msg.Partition]
if handler == nil {
p.retries <- &ProducerMessage{flags: ref}
newHandler := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
topic := msg.Topic // block local because go's closure semantics suck
partition := msg.Partition // block local because go's closure semantics suck
Expand All @@ -296,7 +283,6 @@ func (p *asyncProducer) partitionDispatcher(topic string, input chan *ProducerMe
for _, handler := range handlers {
close(handler)
}
p.retries <- &ProducerMessage{flags: unref}
}

// one per partition per topic
Expand Down Expand Up @@ -344,6 +330,7 @@ func (p *asyncProducer) leaderDispatcher(topic string, partition int32, input ch
highWatermark = msg.retries
Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", topic, partition, highWatermark)
retryState[msg.retries].expectChaser = true
p.inFlight.Add(1) // we're generating a chaser message; track it so we don't shut down while it's still inflight
output <- &ProducerMessage{Topic: topic, Partition: partition, flags: chaser, retries: msg.retries - 1}
Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", topic, partition, leader.ID())
p.unrefBrokerProducer(leader, output)
Expand All @@ -355,6 +342,7 @@ func (p *asyncProducer) leaderDispatcher(topic string, partition int32, input ch
// in fact this message is not even the current retry level, so buffer it for now (unless it's a just a chaser)
if msg.flags&chaser == chaser {
retryState[msg.retries].expectChaser = false
p.inFlight.Done() // this chaser is now handled and will be garbage collected
} else {
retryState[msg.retries].buf = append(retryState[msg.retries].buf, msg)
}
Expand Down Expand Up @@ -392,6 +380,7 @@ func (p *asyncProducer) leaderDispatcher(topic string, partition int32, input ch
}

}
p.inFlight.Done() // this chaser is now handled and will be garbage collected
continue
}
}
Expand All @@ -414,7 +403,6 @@ func (p *asyncProducer) leaderDispatcher(topic string, partition int32, input ch
if output != nil {
p.unrefBrokerProducer(leader, output)
}
p.retries <- &ProducerMessage{flags: unref}
}

// one per broker
Expand Down Expand Up @@ -543,9 +531,7 @@ func (p *asyncProducer) flusher(broker *Broker, input chan []*ProducerMessage) {

if response == nil {
// this only happens when RequiredAcks is NoResponse, so we have to assume success
if p.conf.Producer.Return.Successes {
p.returnSuccesses(batch)
}
p.returnSuccesses(batch)
continue
}

Expand All @@ -563,12 +549,10 @@ func (p *asyncProducer) flusher(broker *Broker, input chan []*ProducerMessage) {
switch block.Err {
case ErrNoError:
// All the messages for this topic-partition were delivered successfully!
if p.conf.Producer.Return.Successes {
for i := range msgs {
msgs[i].Offset = block.Offset + int64(i)
}
p.returnSuccesses(msgs)
for i := range msgs {
msgs[i].Offset = block.Offset + int64(i)
}
p.returnSuccesses(msgs)
case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable,
ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
Logger.Printf("producer/flusher/%d state change to [retrying] on %s/%d because %v\n",
Expand All @@ -585,19 +569,14 @@ func (p *asyncProducer) flusher(broker *Broker, input chan []*ProducerMessage) {
}
}
Logger.Printf("producer/flusher/%d shut down\n", broker.ID())
p.retries <- &ProducerMessage{flags: unref}
}

// singleton
// effectively a "bridge" between the flushers and the topicDispatcher in order to avoid deadlock
// based on https://godoc.org/github.com/eapache/channels#InfiniteChannel
func (p *asyncProducer) retryHandler() {
var (
msg *ProducerMessage
buf = queue.New()
refs = 0
shuttingDown = false
)
var msg *ProducerMessage
buf := queue.New()

for {
if buf.Length() == 0 {
Expand All @@ -611,36 +590,38 @@ func (p *asyncProducer) retryHandler() {
}
}

if msg.flags&ref != 0 {
refs++
} else if msg.flags&unref != 0 {
refs--
if refs == 0 && shuttingDown {
break
}
} else if msg.flags&shutdown != 0 {
shuttingDown = true
if refs == 0 {
break
}
} else {
buf.Add(msg)
if msg == nil {
return
}
}

close(p.retries)
for buf.Length() != 0 {
p.input <- buf.Peek().(*ProducerMessage)
buf.Remove()
buf.Add(msg)
}
close(p.input)
}

///////////////////////////////////////////
///////////////////////////////////////////

// utility functions

// shutdown performs the orderly shutdown sequence for the producer: it
// injects a shutdown-flagged message into the input stream, waits for all
// tracked in-flight messages (including retry "chaser" messages — see the
// p.inFlight.Add calls in the dispatchers) to be resolved, closes the
// embedded client if this producer owns it, and finally closes all of the
// producer's channels. Run via go withRecover(p.shutdown) from AsyncClose.
func (p *asyncProducer) shutdown() {
	Logger.Println("Producer shutting down.")
	p.input <- &ProducerMessage{flags: shutdown}

	// Block until every message counted via p.inFlight.Add has been
	// returned as either a success or an error.
	p.inFlight.Wait()

	if p.ownClient {
		// Tightly scope err to the check (idiomatic Go); a failure to
		// close the embedded client is logged but does not abort shutdown.
		if err := p.client.Close(); err != nil {
			Logger.Println("producer/shutdown failed to close the embedded client:", err)
		}
	}

	// NOTE(review): channels are closed only after inFlight.Wait returns,
	// presumably so no goroutine can still send on them — confirm against
	// the dispatcher/flusher shutdown order.
	close(p.input)
	close(p.retries)
	close(p.errors)
	close(p.successes)
}

func (p *asyncProducer) assignPartition(partitioner Partitioner, msg *ProducerMessage) error {
var partitions []int32
var err error
Expand Down Expand Up @@ -745,6 +726,7 @@ func (p *asyncProducer) returnError(msg *ProducerMessage, err error) {
} else {
Logger.Println(pErr)
}
p.inFlight.Done()
}

func (p *asyncProducer) returnErrors(batch []*ProducerMessage, err error) {
Expand All @@ -757,10 +739,14 @@ func (p *asyncProducer) returnErrors(batch []*ProducerMessage, err error) {

func (p *asyncProducer) returnSuccesses(batch []*ProducerMessage) {
for _, msg := range batch {
if msg != nil {
if msg == nil {
continue
}
if p.conf.Producer.Return.Successes {
msg.flags = 0
p.successes <- msg
}
p.inFlight.Done()
}
}

Expand All @@ -785,7 +771,6 @@ func (p *asyncProducer) getBrokerProducer(broker *Broker) chan *ProducerMessage
bp := p.brokers[broker]

if bp == nil {
p.retries <- &ProducerMessage{flags: ref}
bp = make(chan *ProducerMessage)
p.brokers[broker] = bp
p.brokerRefs[bp] = 0
Expand Down
50 changes: 50 additions & 0 deletions async_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"os/signal"
"sync"
"testing"
"time"
)

const TestMessage = "ABC THE MESSAGE"
Expand Down Expand Up @@ -525,6 +526,55 @@ func TestAsyncProducerFlusherRetryCondition(t *testing.T) {
leader.Close()
}

// TestAsyncProducerRetryShutdown verifies that messages already in the
// retry pipeline are still delivered (not dropped) after AsyncClose is
// called, and that input submitted after shutdown has begun is rejected
// with ErrShuttingDown.
func TestAsyncProducerRetryShutdown(t *testing.T) {
	seedBroker := newMockBroker(t, 1)
	leader := newMockBroker(t, 2)

	// Metadata pointing "my_topic"/0 at the leader broker; queued on the
	// seed broker for the producer's initial metadata fetch.
	metadataLeader := new(MetadataResponse)
	metadataLeader.AddBroker(leader.Addr(), leader.BrokerID())
	metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
	seedBroker.Returns(metadataLeader)

	// Flush.Messages = 10 so all ten inputs go out in a single produce
	// request; Retry.Backoff = 0 keeps the retry path fast in tests.
	config := NewConfig()
	config.Producer.Flush.Messages = 10
	config.Producer.Return.Successes = true
	config.Producer.Retry.Backoff = 0
	producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}

	for i := 0; i < 10; i++ {
		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
	}
	producer.AsyncClose()
	time.Sleep(5 * time.Millisecond) // let the shutdown goroutine kick in

	// New input after shutdown has started must be rejected.
	producer.Input() <- &ProducerMessage{Topic: "FOO"}
	if err := <-producer.Errors(); err.Err != ErrShuttingDown {
		t.Error(err)
	}

	// First produce attempt fails with a retriable error...
	prodNotLeader := new(ProduceResponse)
	prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
	leader.Returns(prodNotLeader)

	// ...forcing a metadata refresh from the seed broker...
	seedBroker.Returns(metadataLeader)

	// ...and the retry then succeeds: all ten original messages must
	// still come back as successes despite the in-progress shutdown.
	prodSuccess := new(ProduceResponse)
	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
	leader.Returns(prodSuccess)
	expectSuccesses(t, producer, 10)

	seedBroker.Close()
	leader.Close()

	// wait for the async-closed producer to shut down fully; any late
	// error is a test failure
	for err := range producer.Errors() {
		t.Error(err)
	}
}

// This example shows how to use the producer while simultaneously
// reading the Errors channel to know about any failures.
func ExampleAsyncProducer_select() {
Expand Down

0 comments on commit f948bc2

Please sign in to comment.