Skip to content

Commit

Permalink
Add test for encoding failures
Browse files Browse the repository at this point in the history
This test also demonstrates the several problems with the current fix.
  • Loading branch information
eapache committed Sep 24, 2015
1 parent 8b5d6d8 commit 34c6a26
Showing 1 changed file with 56 additions and 1 deletion.
57 changes: 56 additions & 1 deletion async_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,15 @@ func closeProducer(t *testing.T, p AsyncProducer) {
}

func expectResults(t *testing.T, p AsyncProducer, successes, errors int) {
for successes > 0 || errors > 0 {
expect := successes + errors
for expect > 0 {
select {
case msg := <-p.Errors():
if msg.Msg.flags != 0 {
t.Error("Message had flags set")
}
errors--
expect--
if errors < 0 {
t.Error(msg.Err)
}
Expand All @@ -48,11 +50,15 @@ func expectResults(t *testing.T, p AsyncProducer, successes, errors int) {
t.Error("Message had flags set")
}
successes--
expect--
if successes < 0 {
t.Error("Too many successes")
}
}
}
if successes != 0 || errors != 0 {
t.Error("Unexpected successes", successes, "or errors", errors)
}
}

type testPartitioner chan *int32
Expand All @@ -74,6 +80,19 @@ func (p testPartitioner) feed(partition int32) {
p <- &partition
}

type flakyEncoder bool

func (f flakyEncoder) Length() int {
return len(TestMessage)
}

func (f flakyEncoder) Encode() ([]byte, error) {
if !bool(f) {
return nil, errors.New("flaky encoding error")
}
return []byte(TestMessage), nil
}

func TestAsyncProducer(t *testing.T) {
seedBroker := newMockBroker(t, 1)
leader := newMockBroker(t, 2)
Expand Down Expand Up @@ -285,6 +304,42 @@ func TestAsyncProducerFailureRetry(t *testing.T) {
closeProducer(t, producer)
}

func TestAsyncProducerEncoderFailures(t *testing.T) {
seedBroker := newMockBroker(t, 1)
leader := newMockBroker(t, 2)

metadataResponse := new(MetadataResponse)
metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
seedBroker.Returns(metadataResponse)

prodSuccess := new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
leader.Returns(prodSuccess)
leader.Returns(prodSuccess)
leader.Returns(prodSuccess)

config := NewConfig()
config.Producer.Flush.Messages = 3
config.Producer.Return.Successes = true
config.Producer.Partitioner = NewManualPartitioner
producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

for flush := 0; flush < 3; flush++ {
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(true), Value: flakyEncoder(false)}
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(true), Value: flakyEncoder(true)}
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(false), Value: flakyEncoder(true)}
expectResults(t, producer, 1, 2)
}

closeProducer(t, producer)
leader.Close()
seedBroker.Close()
}

// If a Kafka broker becomes unavailable and then returns back in service, then
// producer reconnects to it and continues sending messages.
func TestAsyncProducerBrokerBounce(t *testing.T) {
Expand Down

0 comments on commit 34c6a26

Please sign in to comment.