Skip to content

Commit

Permalink
fix(test): mockbroker offsetResponse vers behavior
Browse files Browse the repository at this point in the history
For some reason (unlike the other mocks) offset response was using a
manually encoded version rather than using the version from the request
body which was causing protocol decode failures unless those were
manually set correctly.
  • Loading branch information
dnwe committed Apr 14, 2022
1 parent 43bd562 commit 7d8b292
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 39 deletions.
25 changes: 0 additions & 25 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,12 +573,10 @@ func TestConsumerExtraOffsets(t *testing.T) {
newFetchResponse.SetLastOffsetDelta("my_topic", 0, 4)
newFetchResponse.SetLastStableOffset("my_topic", 0, 4)
for _, fetchResponse1 := range []*FetchResponse{legacyFetchResponse, newFetchResponse} {
var offsetResponseVersion int16
cfg := NewTestConfig()
cfg.Consumer.Return.Errors = true
if fetchResponse1.Version >= 4 {
cfg.Version = V0_11_0_0
offsetResponseVersion = 1
}

broker0 := NewMockBroker(t, 0)
Expand All @@ -590,7 +588,6 @@ func TestConsumerExtraOffsets(t *testing.T) {
SetBroker(broker0.Addr(), broker0.BrokerID()).
SetLeader("my_topic", 0, broker0.BrokerID()),
"OffsetRequest": NewMockOffsetResponse(t).
SetVersion(offsetResponseVersion).
SetOffset("my_topic", 0, OffsetNewest, 1234).
SetOffset("my_topic", 0, OffsetOldest, 0),
"FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
Expand Down Expand Up @@ -651,7 +648,6 @@ func TestConsumerReceivingFetchResponseWithTooOldRecords(t *testing.T) {
SetBroker(broker0.Addr(), broker0.BrokerID()).
SetLeader("my_topic", 0, broker0.BrokerID()),
"OffsetRequest": NewMockOffsetResponse(t).
SetVersion(1).
SetOffset("my_topic", 0, OffsetNewest, 1234).
SetOffset("my_topic", 0, OffsetOldest, 0),
"FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
Expand Down Expand Up @@ -698,7 +694,6 @@ func TestConsumeMessageWithNewerFetchAPIVersion(t *testing.T) {
SetBroker(broker0.Addr(), broker0.BrokerID()).
SetLeader("my_topic", 0, broker0.BrokerID()),
"OffsetRequest": NewMockOffsetResponse(t).
SetVersion(1).
SetOffset("my_topic", 0, OffsetNewest, 1234).
SetOffset("my_topic", 0, OffsetOldest, 0),
"FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
Expand Down Expand Up @@ -742,7 +737,6 @@ func TestConsumeMessageWithSessionIDs(t *testing.T) {
SetBroker(broker0.Addr(), broker0.BrokerID()).
SetLeader("my_topic", 0, broker0.BrokerID()),
"OffsetRequest": NewMockOffsetResponse(t).
SetVersion(1).
SetOffset("my_topic", 0, OffsetNewest, 1234).
SetOffset("my_topic", 0, OffsetOldest, 0),
"FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
Expand Down Expand Up @@ -810,7 +804,6 @@ func TestConsumeMessagesFromReadReplica(t *testing.T) {
SetBroker(leader.Addr(), leader.BrokerID()).
SetLeader("my_topic", 0, leader.BrokerID()),
"OffsetRequest": NewMockOffsetResponse(t).
SetVersion(1).
SetOffset("my_topic", 0, OffsetNewest, 1234).
SetOffset("my_topic", 0, OffsetOldest, 0),
"FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
Expand All @@ -822,7 +815,6 @@ func TestConsumeMessagesFromReadReplica(t *testing.T) {
SetBroker(leader.Addr(), leader.BrokerID()).
SetLeader("my_topic", 0, leader.BrokerID()),
"OffsetRequest": NewMockOffsetResponse(t).
SetVersion(1).
SetOffset("my_topic", 0, OffsetNewest, 1234).
SetOffset("my_topic", 0, OffsetOldest, 0),
"FetchRequest": NewMockSequence(fetchResponse3, fetchResponse4),
Expand Down Expand Up @@ -875,7 +867,6 @@ func TestConsumeMessagesFromReadReplicaLeaderFallback(t *testing.T) {
SetBroker(leader.Addr(), leader.BrokerID()).
SetLeader("my_topic", 0, leader.BrokerID()),
"OffsetRequest": NewMockOffsetResponse(t).
SetVersion(1).
SetOffset("my_topic", 0, OffsetNewest, 1234).
SetOffset("my_topic", 0, OffsetOldest, 0),
"FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
Expand Down Expand Up @@ -934,7 +925,6 @@ func TestConsumeMessagesFromReadReplicaErrorReplicaNotAvailable(t *testing.T) {
SetBroker(leader.Addr(), leader.BrokerID()).
SetLeader("my_topic", 0, leader.BrokerID()),
"OffsetRequest": NewMockOffsetResponse(t).
SetVersion(1).
SetOffset("my_topic", 0, OffsetNewest, 1234).
SetOffset("my_topic", 0, OffsetOldest, 0),
"FetchRequest": NewMockSequence(fetchResponse1, fetchResponse4),
Expand All @@ -946,7 +936,6 @@ func TestConsumeMessagesFromReadReplicaErrorReplicaNotAvailable(t *testing.T) {
SetBroker(leader.Addr(), leader.BrokerID()).
SetLeader("my_topic", 0, leader.BrokerID()),
"OffsetRequest": NewMockOffsetResponse(t).
SetVersion(1).
SetOffset("my_topic", 0, OffsetNewest, 1234).
SetOffset("my_topic", 0, OffsetOldest, 0),
"FetchRequest": NewMockSequence(fetchResponse2, fetchResponse3),
Expand Down Expand Up @@ -1006,7 +995,6 @@ func TestConsumeMessagesFromReadReplicaErrorUnknown(t *testing.T) {
SetBroker(leader.Addr(), leader.BrokerID()).
SetLeader("my_topic", 0, leader.BrokerID()),
"OffsetRequest": NewMockOffsetResponse(t).
SetVersion(1).
SetOffset("my_topic", 0, OffsetNewest, 1234).
SetOffset("my_topic", 0, OffsetOldest, 0),
"FetchRequest": NewMockSequence(fetchResponse1, fetchResponse4),
Expand All @@ -1018,7 +1006,6 @@ func TestConsumeMessagesFromReadReplicaErrorUnknown(t *testing.T) {
SetBroker(leader.Addr(), leader.BrokerID()).
SetLeader("my_topic", 0, leader.BrokerID()),
"OffsetRequest": NewMockOffsetResponse(t).
SetVersion(1).
SetOffset("my_topic", 0, OffsetNewest, 1234).
SetOffset("my_topic", 0, OffsetOldest, 0),
"FetchRequest": NewMockSequence(fetchResponse2, fetchResponse3),
Expand Down Expand Up @@ -1078,11 +1065,9 @@ func TestConsumeMessagesTrackLeader(t *testing.T) {
leader1.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": mockMetadataResponse1,
"OffsetRequest": NewMockOffsetResponse(t).
SetVersion(1).
SetOffset("my_topic", 0, OffsetNewest, 1234).
SetOffset("my_topic", 0, OffsetOldest, 0),
"FetchRequest": NewMockFetchResponse(t, 1).
SetVersion(10).
SetMessage("my_topic", 0, 1, testMsg).
SetMessage("my_topic", 0, 2, testMsg),
})
Expand Down Expand Up @@ -1114,7 +1099,6 @@ func TestConsumeMessagesTrackLeader(t *testing.T) {
leader2.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": mockMetadataResponse2,
"FetchRequest": NewMockFetchResponse(t, 1).
SetVersion(10).
SetMessage("my_topic", 0, 3, testMsg).
SetMessage("my_topic", 0, 4, testMsg),
})
Expand All @@ -1134,7 +1118,6 @@ func TestConsumeMessagesTrackLeader(t *testing.T) {
leader1.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": mockMetadataResponse3,
"FetchRequest": NewMockFetchResponse(t, 1).
SetVersion(10).
SetMessage("my_topic", 0, 5, testMsg).
SetMessage("my_topic", 0, 6, testMsg),
})
Expand Down Expand Up @@ -1177,11 +1160,9 @@ func TestConsumerNonSequentialOffsets(t *testing.T) {
newFetchResponse.SetLastOffsetDelta("my_topic", 0, 11)
newFetchResponse.SetLastStableOffset("my_topic", 0, 11)
for _, fetchResponse1 := range []*FetchResponse{legacyFetchResponse, newFetchResponse} {
var offsetResponseVersion int16
cfg := NewTestConfig()
if fetchResponse1.Version >= 4 {
cfg.Version = V0_11_0_0
offsetResponseVersion = 1
}

broker0 := NewMockBroker(t, 0)
Expand All @@ -1192,7 +1173,6 @@ func TestConsumerNonSequentialOffsets(t *testing.T) {
SetBroker(broker0.Addr(), broker0.BrokerID()).
SetLeader("my_topic", 0, broker0.BrokerID()),
"OffsetRequest": NewMockOffsetResponse(t).
SetVersion(offsetResponseVersion).
SetOffset("my_topic", 0, OffsetNewest, 1234).
SetOffset("my_topic", 0, OffsetOldest, 0),
"FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
Expand Down Expand Up @@ -1684,20 +1664,17 @@ func TestConsumerTimestamps(t *testing.T) {
}, []time.Time{now, now}},
} {
var fr *FetchResponse
var offsetResponseVersion int16
cfg := NewTestConfig()
cfg.Version = d.kversion
switch {
case d.kversion.IsAtLeast(V0_11_0_0):
offsetResponseVersion = 1
fr = &FetchResponse{Version: 4, LogAppendTime: d.logAppendTime, Timestamp: now}
for _, m := range d.messages {
fr.AddRecordWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp)
}
fr.SetLastOffsetDelta("my_topic", 0, 2)
fr.SetLastStableOffset("my_topic", 0, 2)
case d.kversion.IsAtLeast(V0_10_1_0):
offsetResponseVersion = 1
fr = &FetchResponse{Version: 3, LogAppendTime: d.logAppendTime, Timestamp: now}
for _, m := range d.messages {
fr.AddMessageWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp, 1)
Expand All @@ -1722,7 +1699,6 @@ func TestConsumerTimestamps(t *testing.T) {
SetBroker(broker0.Addr(), broker0.BrokerID()).
SetLeader("my_topic", 0, broker0.BrokerID()),
"OffsetRequest": NewMockOffsetResponse(t).
SetVersion(offsetResponseVersion).
SetOffset("my_topic", 0, OffsetNewest, 1234).
SetOffset("my_topic", 0, OffsetOldest, 0),
"FetchRequest": NewMockSequence(fr),
Expand Down Expand Up @@ -1779,7 +1755,6 @@ func TestExcludeUncommitted(t *testing.T) {
SetBroker(broker0.Addr(), broker0.BrokerID()).
SetLeader("my_topic", 0, broker0.BrokerID()),
"OffsetRequest": NewMockOffsetResponse(t).
SetVersion(1).
SetOffset("my_topic", 0, OffsetOldest, 0).
SetOffset("my_topic", 0, OffsetNewest, 1237),
"FetchRequest": NewMockWrapper(fetchResponse),
Expand Down
16 changes: 2 additions & 14 deletions mockresponses.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,6 @@ func (mmr *MockMetadataResponse) For(reqBody versionedDecoder) encoderWithHeader
type MockOffsetResponse struct {
offsets map[string]map[int32]map[int64]int64
t TestReporter
version int16
}

func NewMockOffsetResponse(t TestReporter) *MockOffsetResponse {
Expand All @@ -214,11 +213,6 @@ func NewMockOffsetResponse(t TestReporter) *MockOffsetResponse {
}
}

func (mor *MockOffsetResponse) SetVersion(version int16) *MockOffsetResponse {
mor.version = version
return mor
}

func (mor *MockOffsetResponse) SetOffset(topic string, partition int32, time, offset int64) *MockOffsetResponse {
partitions := mor.offsets[topic]
if partitions == nil {
Expand All @@ -236,7 +230,7 @@ func (mor *MockOffsetResponse) SetOffset(topic string, partition int32, time, of

func (mor *MockOffsetResponse) For(reqBody versionedDecoder) encoderWithHeader {
offsetRequest := reqBody.(*OffsetRequest)
offsetResponse := &OffsetResponse{Version: mor.version}
offsetResponse := &OffsetResponse{Version: offsetRequest.Version}
for topic, partitions := range offsetRequest.blocks {
for partition, block := range partitions {
offset := mor.getOffset(topic, partition, block.time)
Expand Down Expand Up @@ -269,7 +263,6 @@ type MockFetchResponse struct {
highWaterMarks map[string]map[int32]int64
t TestReporter
batchSize int
version int16
}

func NewMockFetchResponse(t TestReporter, batchSize int) *MockFetchResponse {
Expand All @@ -282,11 +275,6 @@ func NewMockFetchResponse(t TestReporter, batchSize int) *MockFetchResponse {
}
}

func (mfr *MockFetchResponse) SetVersion(version int16) *MockFetchResponse {
mfr.version = version
return mfr
}

func (mfr *MockFetchResponse) SetMessage(topic string, partition int32, offset int64, msg Encoder) *MockFetchResponse {
mfr.messagesLock.Lock()
defer mfr.messagesLock.Unlock()
Expand Down Expand Up @@ -317,7 +305,7 @@ func (mfr *MockFetchResponse) SetHighWaterMark(topic string, partition int32, of
func (mfr *MockFetchResponse) For(reqBody versionedDecoder) encoderWithHeader {
fetchRequest := reqBody.(*FetchRequest)
res := &FetchResponse{
Version: mfr.version,
Version: fetchRequest.Version,
}
for topic, partitions := range fetchRequest.blocks {
for partition, block := range partitions {
Expand Down

0 comments on commit 7d8b292

Please sign in to comment.