diff --git a/async_producer_test.go b/async_producer_test.go index 0242dda80..8f271f6fd 100644 --- a/async_producer_test.go +++ b/async_producer_test.go @@ -6,6 +6,7 @@ import ( "os/signal" "sync" "testing" + "time" ) const TestMessage = "ABC THE MESSAGE" @@ -525,6 +526,55 @@ func TestAsyncProducerFlusherRetryCondition(t *testing.T) { leader.Close() } +func TestAsyncProducerRetryShutdown(t *testing.T) { + seedBroker := newMockBroker(t, 1) + leader := newMockBroker(t, 2) + + metadataLeader := new(MetadataResponse) + metadataLeader.AddBroker(leader.Addr(), leader.BrokerID()) + metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError) + seedBroker.Returns(metadataLeader) + + config := NewConfig() + config.Producer.Flush.Messages = 10 + config.Producer.Return.Successes = true + config.Producer.Retry.Backoff = 0 + producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config) + if err != nil { + t.Fatal(err) + } + + for i := 0; i < 10; i++ { + producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} + } + producer.AsyncClose() + time.Sleep(5 * time.Millisecond) // let the shutdown goroutine kick in + + producer.Input() <- &ProducerMessage{Topic: "FOO"} + if err := <-producer.Errors(); err.Err != ErrShuttingDown { + t.Error(err) + } + + prodNotLeader := new(ProduceResponse) + prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition) + leader.Returns(prodNotLeader) + + seedBroker.Returns(metadataLeader) + + prodSuccess := new(ProduceResponse) + prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) + leader.Returns(prodSuccess) + expectSuccesses(t, producer, 10) + + seedBroker.Close() + leader.Close() + + // wait for the async-closed producer to shut down fully + for err := range producer.Errors() { + t.Error(err) + } +} + // This example shows how to use the producer while simultaneously // reading the Errors channel to know about any failures. func ExampleAsyncProducer_select() {