diff --git a/consumer_test.go b/consumer_test.go index ee50d10eb..84aecb678 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -6,7 +6,6 @@ import ( "os/signal" "reflect" "strconv" - "sync" "sync/atomic" "testing" "time" @@ -1251,18 +1250,30 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) { // launch test goroutines config := NewTestConfig() + config.ClientID = t.Name() config.Consumer.Retry.Backoff = 50 master, err := NewConsumer([]string{seedBroker.Addr()}, config) if err != nil { t.Fatal(err) } - // we expect to end up (eventually) consuming exactly ten messages on each partition - var wg sync.WaitGroup + consumers := map[int32]PartitionConsumer{} + checkMessage := func(partition int32, offset int) { + c := consumers[partition] + message := <-c.Messages() + t.Logf("Received message my_topic-%d offset=%d", partition, message.Offset) + if message.Offset != int64(offset) { + t.Error("Incorrect message offset!", offset, partition, message.Offset) + } + if message.Partition != partition { + t.Error("Incorrect message partition!") + } + } + for i := int32(0); i < 2; i++ { consumer, err := master.ConsumePartition("my_topic", i, 0) if err != nil { - t.Error(err) + t.Fatal(err) } go func(c PartitionConsumer) { @@ -1271,27 +1282,13 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) { } }(consumer) - wg.Add(1) - go func(partition int32, c PartitionConsumer) { - for i := 0; i < 10; i++ { - message := <-consumer.Messages() - if message.Offset != int64(i) { - t.Error("Incorrect message offset!", i, partition, message.Offset) - } - if message.Partition != partition { - t.Error("Incorrect message partition!") - } - } - safeClose(t, consumer) - wg.Done() - }(i, consumer) + consumers[i] = consumer } time.Sleep(50 * time.Millisecond) - Logger.Printf(" STAGE 1") - // Stage 1: - // * my_topic/0 -> leader0 serves 4 messages - // * my_topic/1 -> leader1 serves 0 messages + t.Log(` STAGE 1: + * my_topic/0 -> leader0 will serve 4 messages + * my_topic/1 -> leader1 will serve 0 messages`) mockFetchResponse := NewMockFetchResponse(t, 1) for i := 0; i < 4; i++ { @@ -1301,11 +1298,15 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) { "FetchRequest": mockFetchResponse, }) + for i := 0; i < 4; i++ { + checkMessage(0, i) + } + time.Sleep(50 * time.Millisecond) - Logger.Printf(" STAGE 2") - // Stage 2: - // * leader0 says that it is no longer serving my_topic/0 - // * seedBroker tells that leader1 is serving my_topic/0 now + t.Log(` STAGE 2: + * my_topic/0 -> leader0 will return NotLeaderForPartition + seedBroker will give leader1 as serving my_topic/0 now + * my_topic/1 -> leader1 will serve 0 messages`) // seed broker tells that the new partition 0 leader is leader1 seedBroker.SetHandlerByMap(map[string]MockResponse{ @@ -1325,13 +1326,12 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) { }) time.Sleep(50 * time.Millisecond) - Logger.Printf(" STAGE 3") - // Stage 3: - // * my_topic/0 -> leader1 serves 3 messages - // * my_topic/1 -> leader1 server 8 messages + t.Log(` STAGE 3: + * my_topic/0 -> leader1 will serve 3 messages + * my_topic/1 -> leader1 will serve 8 messages`) // leader1 provides 3 message on partition 0, and 8 messages on partition 1 - mockFetchResponse2 := NewMockFetchResponse(t, 2) + mockFetchResponse2 := NewMockFetchResponse(t, 11) for i := 4; i < 7; i++ { mockFetchResponse2.SetMessage("my_topic", 0, int64(i), testMsg) } @@ -1342,12 +1342,25 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) { "FetchRequest": mockFetchResponse2, }) + for i := 0; i < 8; i++ { + checkMessage(1, i) + } + for i := 4; i < 7; i++ { + checkMessage(0, i) + } + time.Sleep(50 * time.Millisecond) - Logger.Printf(" STAGE 4") - // Stage 4: - // * my_topic/0 -> leader1 serves 3 messages - // * my_topic/1 -> leader1 tells that it is no longer the leader - // * seedBroker tells that leader0 is a new leader for my_topic/1 + t.Log(` STAGE 4: + * my_topic/0 -> leader1 will serve 3 messages + * my_topic/1 -> leader1 will return NotLeaderForPartition + seedBroker will give leader0 as serving my_topic/1 now`) + + leader0.SetHandlerByMap(map[string]MockResponse{ + "FetchRequest": NewMockFetchResponse(t, 1), + }) + leader1.SetHandlerByMap(map[string]MockResponse{ + "FetchRequest": NewMockFetchResponse(t, 1), + }) // metadata assigns 0 to leader1 and 1 to leader0 seedBroker.SetHandlerByMap(map[string]MockResponse{ @@ -1365,11 +1378,16 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) { SetMessage("my_topic", 0, int64(8), testMsg). SetMessage("my_topic", 0, int64(9), testMsg) fetchResponse4 := new(FetchResponse) + fetchResponse4.AddError("my_topic", 0, ErrNoError) fetchResponse4.AddError("my_topic", 1, ErrNotLeaderForPartition) leader1.SetHandlerByMap(map[string]MockResponse{ "FetchRequest": NewMockSequence(mockFetchResponse3, fetchResponse4), }) + t.Log(` STAGE 5: + * my_topic/0 -> leader1 will serve 0 messages + * my_topic/1 -> leader0 will serve 2 messages`) + // leader0 provides two messages on partition 1 mockFetchResponse4 := NewMockFetchResponse(t, 2) for i := 8; i < 10; i++ { @@ -1379,7 +1397,17 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) { "FetchRequest": mockFetchResponse4, }) - wg.Wait() + for i := 7; i < 10; i++ { + checkMessage(0, i) + } + + for i := 8; i < 10; i++ { + checkMessage(1, i) + } + + for _, pc := range consumers { + safeClose(t, pc) + } safeClose(t, master) leader1.Close() leader0.Close()