diff --git a/async_producer_test.go b/async_producer_test.go index 403456839..7febbb7de 100644 --- a/async_producer_test.go +++ b/async_producer_test.go @@ -580,20 +580,23 @@ func TestAsyncProducerFlusherRetryCondition(t *testing.T) { prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition) leader.Returns(prodNotLeader) + time.Sleep(50 * time.Millisecond) + + leader.SetHandlerByMap(map[string]MockResponse{ + "ProduceRequest": newMockProduceResponse(t). + SetError("my_topic", 0, ErrNoError), + }) + // tell partition 0 to go to that broker again seedBroker.Returns(metadataResponse) // succeed this time - prodSuccess := new(ProduceResponse) - prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) - leader.Returns(prodSuccess) expectResults(t, producer, 5, 0) // put five more through for i := 0; i < 5; i++ { producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0} } - leader.Returns(prodSuccess) expectResults(t, producer, 5, 0) // shutdown