From 7d8b29209023924b972dadda288f302ea0e994f9 Mon Sep 17 00:00:00 2001 From: Dominic Evans Date: Thu, 14 Apr 2022 01:30:54 +0100 Subject: [PATCH] fix(test): mockbroker offsetResponse vers behavior For some reason (unlike the other mocks) offset response was using a manually encoded version rather than using the version from the request body which was causing protocol decode failures unless those were manually set correctly. --- consumer_test.go | 25 ------------------------- mockresponses.go | 16 ++-------------- 2 files changed, 2 insertions(+), 39 deletions(-) diff --git a/consumer_test.go b/consumer_test.go index 756ac3207..68b6398fc 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -573,12 +573,10 @@ func TestConsumerExtraOffsets(t *testing.T) { newFetchResponse.SetLastOffsetDelta("my_topic", 0, 4) newFetchResponse.SetLastStableOffset("my_topic", 0, 4) for _, fetchResponse1 := range []*FetchResponse{legacyFetchResponse, newFetchResponse} { - var offsetResponseVersion int16 cfg := NewTestConfig() cfg.Consumer.Return.Errors = true if fetchResponse1.Version >= 4 { cfg.Version = V0_11_0_0 - offsetResponseVersion = 1 } broker0 := NewMockBroker(t, 0) @@ -590,7 +588,6 @@ func TestConsumerExtraOffsets(t *testing.T) { SetBroker(broker0.Addr(), broker0.BrokerID()). SetLeader("my_topic", 0, broker0.BrokerID()), "OffsetRequest": NewMockOffsetResponse(t). - SetVersion(offsetResponseVersion). SetOffset("my_topic", 0, OffsetNewest, 1234). SetOffset("my_topic", 0, OffsetOldest, 0), "FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2), @@ -651,7 +648,6 @@ func TestConsumerReceivingFetchResponseWithTooOldRecords(t *testing.T) { SetBroker(broker0.Addr(), broker0.BrokerID()). SetLeader("my_topic", 0, broker0.BrokerID()), "OffsetRequest": NewMockOffsetResponse(t). - SetVersion(1). SetOffset("my_topic", 0, OffsetNewest, 1234). SetOffset("my_topic", 0, OffsetOldest, 0), "FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2), @@ -698,7 +694,6 @@ func TestConsumeMessageWithNewerFetchAPIVersion(t *testing.T) { SetBroker(broker0.Addr(), broker0.BrokerID()). SetLeader("my_topic", 0, broker0.BrokerID()), "OffsetRequest": NewMockOffsetResponse(t). - SetVersion(1). SetOffset("my_topic", 0, OffsetNewest, 1234). SetOffset("my_topic", 0, OffsetOldest, 0), "FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2), @@ -742,7 +737,6 @@ func TestConsumeMessageWithSessionIDs(t *testing.T) { SetBroker(broker0.Addr(), broker0.BrokerID()). SetLeader("my_topic", 0, broker0.BrokerID()), "OffsetRequest": NewMockOffsetResponse(t). - SetVersion(1). SetOffset("my_topic", 0, OffsetNewest, 1234). SetOffset("my_topic", 0, OffsetOldest, 0), "FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2), @@ -810,7 +804,6 @@ func TestConsumeMessagesFromReadReplica(t *testing.T) { SetBroker(leader.Addr(), leader.BrokerID()). SetLeader("my_topic", 0, leader.BrokerID()), "OffsetRequest": NewMockOffsetResponse(t). - SetVersion(1). SetOffset("my_topic", 0, OffsetNewest, 1234). SetOffset("my_topic", 0, OffsetOldest, 0), "FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2), @@ -822,7 +815,6 @@ func TestConsumeMessagesFromReadReplica(t *testing.T) { SetBroker(leader.Addr(), leader.BrokerID()). SetLeader("my_topic", 0, leader.BrokerID()), "OffsetRequest": NewMockOffsetResponse(t). - SetVersion(1). SetOffset("my_topic", 0, OffsetNewest, 1234). SetOffset("my_topic", 0, OffsetOldest, 0), "FetchRequest": NewMockSequence(fetchResponse3, fetchResponse4), @@ -875,7 +867,6 @@ func TestConsumeMessagesFromReadReplicaLeaderFallback(t *testing.T) { SetBroker(leader.Addr(), leader.BrokerID()). SetLeader("my_topic", 0, leader.BrokerID()), "OffsetRequest": NewMockOffsetResponse(t). - SetVersion(1). SetOffset("my_topic", 0, OffsetNewest, 1234). SetOffset("my_topic", 0, OffsetOldest, 0), "FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2), @@ -934,7 +925,6 @@ func TestConsumeMessagesFromReadReplicaErrorReplicaNotAvailable(t *testing.T) { SetBroker(leader.Addr(), leader.BrokerID()). SetLeader("my_topic", 0, leader.BrokerID()), "OffsetRequest": NewMockOffsetResponse(t). - SetVersion(1). SetOffset("my_topic", 0, OffsetNewest, 1234). SetOffset("my_topic", 0, OffsetOldest, 0), "FetchRequest": NewMockSequence(fetchResponse1, fetchResponse4), @@ -946,7 +936,6 @@ func TestConsumeMessagesFromReadReplicaErrorReplicaNotAvailable(t *testing.T) { SetBroker(leader.Addr(), leader.BrokerID()). SetLeader("my_topic", 0, leader.BrokerID()), "OffsetRequest": NewMockOffsetResponse(t). - SetVersion(1). SetOffset("my_topic", 0, OffsetNewest, 1234). SetOffset("my_topic", 0, OffsetOldest, 0), "FetchRequest": NewMockSequence(fetchResponse2, fetchResponse3), @@ -1006,7 +995,6 @@ func TestConsumeMessagesFromReadReplicaErrorUnknown(t *testing.T) { SetBroker(leader.Addr(), leader.BrokerID()). SetLeader("my_topic", 0, leader.BrokerID()), "OffsetRequest": NewMockOffsetResponse(t). - SetVersion(1). SetOffset("my_topic", 0, OffsetNewest, 1234). SetOffset("my_topic", 0, OffsetOldest, 0), "FetchRequest": NewMockSequence(fetchResponse1, fetchResponse4), @@ -1018,7 +1006,6 @@ func TestConsumeMessagesFromReadReplicaErrorUnknown(t *testing.T) { SetBroker(leader.Addr(), leader.BrokerID()). SetLeader("my_topic", 0, leader.BrokerID()), "OffsetRequest": NewMockOffsetResponse(t). - SetVersion(1). SetOffset("my_topic", 0, OffsetNewest, 1234). SetOffset("my_topic", 0, OffsetOldest, 0), "FetchRequest": NewMockSequence(fetchResponse2, fetchResponse3), @@ -1078,11 +1065,9 @@ func TestConsumeMessagesTrackLeader(t *testing.T) { leader1.SetHandlerByMap(map[string]MockResponse{ "MetadataRequest": mockMetadataResponse1, "OffsetRequest": NewMockOffsetResponse(t). - SetVersion(1). SetOffset("my_topic", 0, OffsetNewest, 1234). SetOffset("my_topic", 0, OffsetOldest, 0), "FetchRequest": NewMockFetchResponse(t, 1). - SetVersion(10). SetMessage("my_topic", 0, 1, testMsg). SetMessage("my_topic", 0, 2, testMsg), }) @@ -1114,7 +1099,6 @@ func TestConsumeMessagesTrackLeader(t *testing.T) { leader2.SetHandlerByMap(map[string]MockResponse{ "MetadataRequest": mockMetadataResponse2, "FetchRequest": NewMockFetchResponse(t, 1). - SetVersion(10). SetMessage("my_topic", 0, 3, testMsg). SetMessage("my_topic", 0, 4, testMsg), }) @@ -1134,7 +1118,6 @@ func TestConsumeMessagesTrackLeader(t *testing.T) { leader1.SetHandlerByMap(map[string]MockResponse{ "MetadataRequest": mockMetadataResponse3, "FetchRequest": NewMockFetchResponse(t, 1). - SetVersion(10). SetMessage("my_topic", 0, 5, testMsg). SetMessage("my_topic", 0, 6, testMsg), }) @@ -1177,11 +1160,9 @@ func TestConsumerNonSequentialOffsets(t *testing.T) { newFetchResponse.SetLastOffsetDelta("my_topic", 0, 11) newFetchResponse.SetLastStableOffset("my_topic", 0, 11) for _, fetchResponse1 := range []*FetchResponse{legacyFetchResponse, newFetchResponse} { - var offsetResponseVersion int16 cfg := NewTestConfig() if fetchResponse1.Version >= 4 { cfg.Version = V0_11_0_0 - offsetResponseVersion = 1 } broker0 := NewMockBroker(t, 0) @@ -1192,7 +1173,6 @@ func TestConsumerNonSequentialOffsets(t *testing.T) { SetBroker(broker0.Addr(), broker0.BrokerID()). SetLeader("my_topic", 0, broker0.BrokerID()), "OffsetRequest": NewMockOffsetResponse(t). - SetVersion(offsetResponseVersion). SetOffset("my_topic", 0, OffsetNewest, 1234). SetOffset("my_topic", 0, OffsetOldest, 0), "FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2), @@ -1684,12 +1664,10 @@ func TestConsumerTimestamps(t *testing.T) { }, []time.Time{now, now}}, } { var fr *FetchResponse - var offsetResponseVersion int16 cfg := NewTestConfig() cfg.Version = d.kversion switch { case d.kversion.IsAtLeast(V0_11_0_0): - offsetResponseVersion = 1 fr = &FetchResponse{Version: 4, LogAppendTime: d.logAppendTime, Timestamp: now} for _, m := range d.messages { fr.AddRecordWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp) @@ -1697,7 +1675,6 @@ func TestConsumerTimestamps(t *testing.T) { fr.SetLastOffsetDelta("my_topic", 0, 2) fr.SetLastStableOffset("my_topic", 0, 2) case d.kversion.IsAtLeast(V0_10_1_0): - offsetResponseVersion = 1 fr = &FetchResponse{Version: 3, LogAppendTime: d.logAppendTime, Timestamp: now} for _, m := range d.messages { fr.AddMessageWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp, 1) @@ -1722,7 +1699,6 @@ func TestConsumerTimestamps(t *testing.T) { SetBroker(broker0.Addr(), broker0.BrokerID()). SetLeader("my_topic", 0, broker0.BrokerID()), "OffsetRequest": NewMockOffsetResponse(t). - SetVersion(offsetResponseVersion). SetOffset("my_topic", 0, OffsetNewest, 1234). SetOffset("my_topic", 0, OffsetOldest, 0), "FetchRequest": NewMockSequence(fr), @@ -1779,7 +1755,6 @@ func TestExcludeUncommitted(t *testing.T) { SetBroker(broker0.Addr(), broker0.BrokerID()). SetLeader("my_topic", 0, broker0.BrokerID()), "OffsetRequest": NewMockOffsetResponse(t). - SetVersion(1). SetOffset("my_topic", 0, OffsetOldest, 0). SetOffset("my_topic", 0, OffsetNewest, 1237), "FetchRequest": NewMockWrapper(fetchResponse), diff --git a/mockresponses.go b/mockresponses.go index fff9dd77e..dcc361738 100644 --- a/mockresponses.go +++ b/mockresponses.go @@ -204,7 +204,6 @@ func (mmr *MockMetadataResponse) For(reqBody versionedDecoder) encoderWithHeader type MockOffsetResponse struct { offsets map[string]map[int32]map[int64]int64 t TestReporter - version int16 } func NewMockOffsetResponse(t TestReporter) *MockOffsetResponse { @@ -214,11 +213,6 @@ func NewMockOffsetResponse(t TestReporter) *MockOffsetResponse { } } -func (mor *MockOffsetResponse) SetVersion(version int16) *MockOffsetResponse { - mor.version = version - return mor -} - func (mor *MockOffsetResponse) SetOffset(topic string, partition int32, time, offset int64) *MockOffsetResponse { partitions := mor.offsets[topic] if partitions == nil { @@ -236,7 +230,7 @@ func (mor *MockOffsetResponse) SetOffset(topic string, partition int32, time, of func (mor *MockOffsetResponse) For(reqBody versionedDecoder) encoderWithHeader { offsetRequest := reqBody.(*OffsetRequest) - offsetResponse := &OffsetResponse{Version: mor.version} + offsetResponse := &OffsetResponse{Version: offsetRequest.Version} for topic, partitions := range offsetRequest.blocks { for partition, block := range partitions { offset := mor.getOffset(topic, partition, block.time) @@ -269,7 +263,6 @@ type MockFetchResponse struct { highWaterMarks map[string]map[int32]int64 t TestReporter batchSize int - version int16 } func NewMockFetchResponse(t TestReporter, batchSize int) *MockFetchResponse { @@ -282,11 +275,6 @@ func NewMockFetchResponse(t TestReporter, batchSize int) *MockFetchResponse { } } -func (mfr *MockFetchResponse) SetVersion(version int16) *MockFetchResponse { - mfr.version = version - return mfr -} - func (mfr *MockFetchResponse) SetMessage(topic string, partition int32, offset int64, msg Encoder) *MockFetchResponse { mfr.messagesLock.Lock() defer mfr.messagesLock.Unlock() @@ -317,7 +305,7 @@ func (mfr *MockFetchResponse) SetHighWaterMark(topic string, partition int32, of func (mfr *MockFetchResponse) For(reqBody versionedDecoder) encoderWithHeader { fetchRequest := reqBody.(*FetchRequest) res := &FetchResponse{ - Version: mfr.version, + Version: fetchRequest.Version, } for topic, partitions := range fetchRequest.blocks { for partition, block := range partitions {