Skip to content

Commit

Permalink
Add a test for retries during shutdow
Browse files Browse the repository at this point in the history
  • Loading branch information
eapache committed Apr 27, 2015
1 parent 42123dd commit 08ccf5e
Showing 1 changed file with 50 additions and 0 deletions.
50 changes: 50 additions & 0 deletions async_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"os/signal"
"sync"
"testing"
"time"
)

const TestMessage = "ABC THE MESSAGE"
Expand Down Expand Up @@ -525,6 +526,55 @@ func TestAsyncProducerFlusherRetryCondition(t *testing.T) {
leader.Close()
}

func TestAsyncProducerRetryShutdown(t *testing.T) {
seedBroker := newMockBroker(t, 1)
leader := newMockBroker(t, 2)

metadataLeader := new(MetadataResponse)
metadataLeader.AddBroker(leader.Addr(), leader.BrokerID())
metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
seedBroker.Returns(metadataLeader)

config := NewConfig()
config.Producer.Flush.Messages = 10
config.Producer.Return.Successes = true
config.Producer.Retry.Backoff = 0
producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

for i := 0; i < 10; i++ {
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
}
producer.AsyncClose()
time.Sleep(5 * time.Millisecond) // let the shutdown goroutine kick in

producer.Input() <- &ProducerMessage{Topic: "FOO"}
if err := <-producer.Errors(); err.Err != ErrShuttingDown {
t.Error(err)
}

prodNotLeader := new(ProduceResponse)
prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
leader.Returns(prodNotLeader)

seedBroker.Returns(metadataLeader)

prodSuccess := new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
leader.Returns(prodSuccess)
expectSuccesses(t, producer, 10)

seedBroker.Close()
leader.Close()

// wait for the async-closed producer to shut down fully
for err := range producer.Errors() {
t.Error(err)
}
}

// This example shows how to use the producer while simultaneously
// reading the Errors channel to know about any failures.
func ExampleAsyncProducer_select() {
Expand Down

0 comments on commit 08ccf5e

Please sign in to comment.