From fce917aa039cec6d776252cb501c1057f208b239 Mon Sep 17 00:00:00 2001 From: Celrenheit Date: Thu, 29 Apr 2021 18:54:17 +0200 Subject: [PATCH] This commit enables isolation level handling when fetching offsets. It enables list available offset (OffsetRequest and OffsetResponse) to require read committed isolation level. It adds RequireStable field to FetchOffsetRequest. --- broker.go | 1 + errors.go | 18 +++++ offset_fetch_request.go | 137 +++++++++++++++++++++++++++++----- offset_fetch_request_test.go | 60 +++++++++++++++ offset_fetch_response.go | 111 +++++++++++++++++++++++---- offset_fetch_response_test.go | 19 ++++- offset_request.go | 19 +++++ offset_request_test.go | 29 ++++++- offset_response.go | 18 ++++- offset_response_test.go | 52 ++++++++++++- request.go | 2 +- 11 files changed, 422 insertions(+), 44 deletions(-) diff --git a/broker.go b/broker.go index a466689cd..4ecca6c52 100644 --- a/broker.go +++ b/broker.go @@ -376,6 +376,7 @@ func (b *Broker) CommitOffset(request *OffsetCommitRequest) (*OffsetCommitRespon //FetchOffset returns an offset fetch response or error func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse, error) { response := new(OffsetFetchResponse) + response.Version = request.Version // needed to handle the two header versions err := b.sendAndReceive(request, response) if err != nil { diff --git a/errors.go b/errors.go index 5781c1c0c..da3353654 100644 --- a/errors.go +++ b/errors.go @@ -208,6 +208,12 @@ const ( ErrPreferredLeaderNotAvailable KError = 80 ErrGroupMaxSizeReached KError = 81 ErrFencedInstancedId KError = 82 + ErrEligibleLeadersNotAvailable KError = 83 + ErrElectionNotNeeded KError = 84 + ErrNoReassignmentInProgress KError = 85 + ErrGroupSubscribedToTopic KError = 86 + ErrInvalidRecord KError = 87 + ErrUnstableOffsetCommit KError = 88 ) func (err KError) Error() string { @@ -382,6 +388,18 @@ func (err KError) Error() string { return "kafka server: Consumer group The consumer group has reached its max size. already has the configured maximum number of members." case ErrFencedInstancedId: return "kafka server: The broker rejected this static consumer since another consumer with the same group.instance.id has registered with a different member.id." + case ErrEligibleLeadersNotAvailable: + return "kafka server: Eligible topic partition leaders are not available." + case ErrElectionNotNeeded: + return "kafka server: Leader election not needed for topic partition." + case ErrNoReassignmentInProgress: + return "kafka server: No partition reassignment is in progress." + case ErrGroupSubscribedToTopic: + return "kafka server: Deleting offsets of a topic is forbidden while the consumer group is actively subscribed to it." + case ErrInvalidRecord: + return "kafka server: This record has failed the validation on broker and hence will be rejected." + case ErrUnstableOffsetCommit: + return "kafka server: There are unstable offsets that need to be cleared." } return fmt.Sprintf("Unknown error, how did this happen? Error code = %d", err) diff --git a/offset_fetch_request.go b/offset_fetch_request.go index 51e9faa3f..7e147eb60 100644 --- a/offset_fetch_request.go +++ b/offset_fetch_request.go @@ -3,60 +3,155 @@ package sarama type OffsetFetchRequest struct { Version int16 ConsumerGroup string + RequireStable bool // requires v7+ partitions map[string][]int32 } func (r *OffsetFetchRequest) encode(pe packetEncoder) (err error) { - if r.Version < 0 || r.Version > 5 { + if r.Version < 0 || r.Version > 7 { return PacketEncodingError{"invalid or unsupported OffsetFetchRequest version field"} } - if err = pe.putString(r.ConsumerGroup); err != nil { + isFlexible := r.Version >= 6 + + if isFlexible { + err = pe.putCompactString(r.ConsumerGroup) + } else { + err = pe.putString(r.ConsumerGroup) + } + if err != nil { return err } - if r.Version >= 2 && r.partitions == nil { - pe.putInt32(-1) - } else { - if err = pe.putArrayLength(len(r.partitions)); err != nil { - return err + if isFlexible { + if r.partitions == nil { + pe.putUVarint(0) + } else { + pe.putCompactArrayLength(len(r.partitions)) } - for topic, partitions := range r.partitions { - if err = pe.putString(topic); err != nil { - return err - } - if err = pe.putInt32Array(partitions); err != nil { + } else { + if r.partitions == nil && r.Version >= 2 { + pe.putInt32(-1) + } else { + if err = pe.putArrayLength(len(r.partitions)); err != nil { return err } } } + + for topic, partitions := range r.partitions { + if isFlexible { + err = pe.putCompactString(topic) + } else { + err = pe.putString(topic) + } + if err != nil { + return err + } + + // + + if isFlexible { + err = pe.putCompactInt32Array(partitions) + } else { + err = pe.putInt32Array(partitions) + } + if err != nil { + return err + } + + if isFlexible { + pe.putEmptyTaggedFieldArray() + } + } + + if r.RequireStable && r.Version < 7 { + return PacketEncodingError{"requireStable is not supported. use version 7 or later"} + } + + if r.Version >= 7 { + pe.putBool(r.RequireStable) + } + + if isFlexible { + pe.putEmptyTaggedFieldArray() + } + return nil } func (r *OffsetFetchRequest) decode(pd packetDecoder, version int16) (err error) { r.Version = version - if r.ConsumerGroup, err = pd.getString(); err != nil { + isFlexible := r.Version >= 6 + if isFlexible { + r.ConsumerGroup, err = pd.getCompactString() + } else { + r.ConsumerGroup, err = pd.getString() + } + if err != nil { return err } - partitionCount, err := pd.getArrayLength() + + var partitionCount int + + if isFlexible { + partitionCount, err = pd.getCompactArrayLength() + } else { + partitionCount, err = pd.getArrayLength() + } if err != nil { return err } + if (partitionCount == 0 && version < 2) || partitionCount < 0 { return nil } - r.partitions = make(map[string][]int32) + + r.partitions = make(map[string][]int32, partitionCount) for i := 0; i < partitionCount; i++ { - topic, err := pd.getString() + var topic string + if isFlexible { + topic, err = pd.getCompactString() + } else { + topic, err = pd.getString() + } if err != nil { return err } - partitions, err := pd.getInt32Array() + + var partitions []int32 + if isFlexible { + partitions, err = pd.getCompactInt32Array() + } else { + partitions, err = pd.getInt32Array() + } if err != nil { return err } + if isFlexible { + _, err = pd.getEmptyTaggedFieldArray() + if err != nil { + return err + } + } + r.partitions[topic] = partitions } + + if r.Version >= 7 { + r.RequireStable, err = pd.getBool() + if err != nil { + return err + } + } + + if isFlexible { + _, err = pd.getEmptyTaggedFieldArray() + if err != nil { + return err + } + } + return nil } @@ -69,6 +164,10 @@ func (r *OffsetFetchRequest) version() int16 { } func (r *OffsetFetchRequest) headerVersion() int16 { + if r.Version >= 6 { + return 2 + } + return 1 } @@ -84,6 +183,10 @@ func (r *OffsetFetchRequest) requiredVersion() KafkaVersion { return V2_0_0_0 case 5: return V2_1_0_0 + case 6: + return V2_4_0_0 + case 7: + return V2_5_0_0 default: return MinVersion } diff --git a/offset_fetch_request_test.go b/offset_fetch_request_test.go index 55b46eea7..a5270dbeb 100644 --- a/offset_fetch_request_test.go +++ b/offset_fetch_request_test.go @@ -10,6 +10,12 @@ var ( 0x00, 0x00, 0x00, 0x00, 0x00, 0x00} + offsetFetchRequestNoPartitionsV6 = []byte{ + 0x05, 'b', 'l', 'a', 'h', 0x01, 0x00} + + offsetFetchRequestNoPartitionsV7 = []byte{ + 0x05, 'b', 'l', 'a', 'h', 0x01, 0x01, 0x00} + offsetFetchRequestNoPartitions = []byte{ 0x00, 0x04, 'b', 'l', 'a', 'h', 0x00, 0x00, 0x00, 0x00} @@ -21,6 +27,20 @@ var ( 0x00, 0x00, 0x00, 0x01, 0x4F, 0x4F, 0x4F, 0x4F} + offsetFetchRequestOnePartitionV6 = []byte{ + 0x05, 'b', 'l', 'a', 'h', + 0x02, 0x0E, 't', 'o', 'p', 'i', 'c', 'T', 'h', 'e', 'F', 'i', 'r', 's', 't', + 0x02, + 0x4F, 0x4F, 0x4F, 0x4F, + 0x00, 0x00} + + offsetFetchRequestOnePartitionV7 = []byte{ + 0x05, 'b', 'l', 'a', 'h', + 0x02, 0x0E, 't', 'o', 'p', 'i', 'c', 'T', 'h', 'e', 'F', 'i', 'r', 's', 't', + 0x02, + 0x4F, 0x4F, 0x4F, 0x4F, + 0x00, 0x00, 0x00} + offsetFetchRequestAllPartitions = []byte{ 0x00, 0x04, 'b', 'l', 'a', 'h', 0xff, 0xff, 0xff, 0xff} @@ -36,7 +56,29 @@ func TestOffsetFetchRequestNoPartitions(t *testing.T) { request.ConsumerGroup = "blah" testRequest(t, fmt.Sprintf("no partitions %d", version), request, offsetFetchRequestNoPartitions) } + + { // v6 + version := 6 + request := new(OffsetFetchRequest) + request.Version = int16(version) + request.ConsumerGroup = "blah" + request.ZeroPartitions() + + testRequest(t, fmt.Sprintf("no partitions %d", version), request, offsetFetchRequestNoPartitionsV6) + } + + { // v7 + version := 7 + request := new(OffsetFetchRequest) + request.Version = int16(version) + request.ConsumerGroup = "blah" + request.RequireStable = true + request.ZeroPartitions() + + testRequest(t, fmt.Sprintf("no partitions %d", version), request, offsetFetchRequestNoPartitionsV7) + } } + func TestOffsetFetchRequest(t *testing.T) { for version := 0; version <= 5; version++ { request := new(OffsetFetchRequest) @@ -45,6 +87,24 @@ func TestOffsetFetchRequest(t *testing.T) { request.AddPartition("topicTheFirst", 0x4F4F4F4F) testRequest(t, fmt.Sprintf("one partition %d", version), request, offsetFetchRequestOnePartition) } + + { //v6 + version := 6 + request := new(OffsetFetchRequest) + request.Version = int16(version) + request.ConsumerGroup = "blah" + request.AddPartition("topicTheFirst", 0x4F4F4F4F) + testRequest(t, fmt.Sprintf("one partition %d", version), request, offsetFetchRequestOnePartitionV6) + } + + { //v7 + version := 7 + request := new(OffsetFetchRequest) + request.Version = int16(version) + request.ConsumerGroup = "blah" + request.AddPartition("topicTheFirst", 0x4F4F4F4F) + testRequest(t, fmt.Sprintf("one partition %d", version), request, offsetFetchRequestOnePartitionV7) + } } func TestOffsetFetchRequestAllPartitions(t *testing.T) { diff --git a/offset_fetch_response.go b/offset_fetch_response.go index 9c64e0708..19449220f 100644 --- a/offset_fetch_response.go +++ b/offset_fetch_response.go @@ -8,6 +8,8 @@ type OffsetFetchResponseBlock struct { } func (b *OffsetFetchResponseBlock) decode(pd packetDecoder, version int16) (err error) { + isFlexible := version >= 6 + b.Offset, err = pd.getInt64() if err != nil { return err @@ -20,7 +22,11 @@ func (b *OffsetFetchResponseBlock) decode(pd packetDecoder, version int16) (err } } - b.Metadata, err = pd.getString() + if isFlexible { + b.Metadata, err = pd.getCompactString() + } else { + b.Metadata, err = pd.getString() + } if err != nil { return err } @@ -31,23 +37,37 @@ func (b *OffsetFetchResponseBlock) decode(pd packetDecoder, version int16) (err } b.Err = KError(tmp) + if isFlexible { + if _, err := pd.getEmptyTaggedFieldArray(); err != nil { + return err + } + } + return nil } func (b *OffsetFetchResponseBlock) encode(pe packetEncoder, version int16) (err error) { + isFlexible := version >= 6 pe.putInt64(b.Offset) if version >= 5 { pe.putInt32(b.LeaderEpoch) } - - err = pe.putString(b.Metadata) + if isFlexible { + err = pe.putCompactString(b.Metadata) + } else { + err = pe.putString(b.Metadata) + } if err != nil { return err } pe.putInt16(int16(b.Err)) + if isFlexible { + pe.putEmptyTaggedFieldArray() + } + return nil } @@ -58,19 +78,37 @@ type OffsetFetchResponse struct { Err KError } -func (r *OffsetFetchResponse) encode(pe packetEncoder) error { +func (r *OffsetFetchResponse) encode(pe packetEncoder) (err error) { + isFlexible := r.Version >= 6 + if r.Version >= 3 { pe.putInt32(r.ThrottleTimeMs) } - - if err := pe.putArrayLength(len(r.Blocks)); err != nil { + if isFlexible { + pe.putCompactArrayLength(len(r.Blocks)) + } else { + err = pe.putArrayLength(len(r.Blocks)) + } + if err != nil { return err } + for topic, partitions := range r.Blocks { - if err := pe.putString(topic); err != nil { + if isFlexible { + err = pe.putCompactString(topic) + } else { + err = pe.putString(topic) + } + if err != nil { return err } - if err := pe.putArrayLength(len(partitions)); err != nil { + + if isFlexible { + pe.putCompactArrayLength(len(partitions)) + } else { + err = pe.putArrayLength(len(partitions)) + } + if err != nil { return err } for partition, block := range partitions { @@ -79,15 +117,22 @@ func (r *OffsetFetchResponse) encode(pe packetEncoder) error { return err } } + if isFlexible { + pe.putEmptyTaggedFieldArray() + } } if r.Version >= 2 { pe.putInt16(int16(r.Err)) } + if isFlexible { + pe.putEmptyTaggedFieldArray() + } return nil } func (r *OffsetFetchResponse) decode(pd packetDecoder, version int16) (err error) { r.Version = version + isFlexible := version >= 6 if version >= 3 { r.ThrottleTimeMs, err = pd.getInt32() @@ -96,7 +141,12 @@ func (r *OffsetFetchResponse) decode(pd packetDecoder, version int16) (err error } } - numTopics, err := pd.getArrayLength() + var numTopics int + if isFlexible { + numTopics, err = pd.getCompactArrayLength() + } else { + numTopics, err = pd.getArrayLength() + } if err != nil { return err } @@ -104,22 +154,30 @@ func (r *OffsetFetchResponse) decode(pd packetDecoder, version int16) (err error if numTopics > 0 { r.Blocks = make(map[string]map[int32]*OffsetFetchResponseBlock, numTopics) for i := 0; i < numTopics; i++ { - name, err := pd.getString() + var name string + if isFlexible { + name, err = pd.getCompactString() + } else { + name, err = pd.getString() + } if err != nil { return err } - numBlocks, err := pd.getArrayLength() + var numBlocks int + if isFlexible { + numBlocks, err = pd.getCompactArrayLength() + } else { + numBlocks, err = pd.getArrayLength() + } if err != nil { return err } - if numBlocks == 0 { - r.Blocks[name] = nil - continue + r.Blocks[name] = nil + if numBlocks > 0 { + r.Blocks[name] = make(map[int32]*OffsetFetchResponseBlock, numBlocks) } - r.Blocks[name] = make(map[int32]*OffsetFetchResponseBlock, numBlocks) - for j := 0; j < numBlocks; j++ { id, err := pd.getInt32() if err != nil { @@ -131,8 +189,15 @@ func (r *OffsetFetchResponse) decode(pd packetDecoder, version int16) (err error if err != nil { return err } + r.Blocks[name][id] = block } + + if isFlexible { + if _, err := pd.getEmptyTaggedFieldArray(); err != nil { + return err + } + } } } @@ -144,6 +209,12 @@ func (r *OffsetFetchResponse) decode(pd packetDecoder, version int16) (err error r.Err = KError(kerr) } + if isFlexible { + if _, err := pd.getEmptyTaggedFieldArray(); err != nil { + return err + } + } + return nil } @@ -156,6 +227,10 @@ func (r *OffsetFetchResponse) version() int16 { } func (r *OffsetFetchResponse) headerVersion() int16 { + if r.Version >= 6 { + return 1 + } + return 0 } @@ -171,6 +246,10 @@ func (r *OffsetFetchResponse) requiredVersion() KafkaVersion { return V2_0_0_0 case 5: return V2_1_0_0 + case 6: + return V2_4_0_0 + case 7: + return V2_5_0_0 default: return MinVersion } diff --git a/offset_fetch_response_test.go b/offset_fetch_response_test.go index b564f70f9..b1a6c3543 100644 --- a/offset_fetch_response_test.go +++ b/offset_fetch_response_test.go @@ -17,6 +17,10 @@ var ( 0x00, 0x00, 0x00, 0x09, 0x00, 0x00, 0x00, 0x00, 0x00, 0x2A} + + emptyOffsetFetchResponseV6 = []byte{ + 0x00, 0x00, 0x00, 0x09, + 0x01, 0x00, 0x2A, 0x00} ) func TestEmptyOffsetFetchResponse(t *testing.T) { @@ -32,6 +36,11 @@ func TestEmptyOffsetFetchResponse(t *testing.T) { responseV3 := OffsetFetchResponse{Version: int16(version), Err: ErrInvalidRequest, ThrottleTimeMs: 9} testResponse(t, fmt.Sprintf("empty v%d", version), &responseV3, emptyOffsetFetchResponseV3) } + + for version := 6; version <= 7; version++ { + responseV6 := OffsetFetchResponse{Version: int16(version), Err: ErrInvalidRequest, ThrottleTimeMs: 9} + testResponse(t, fmt.Sprintf("empty v%d", version), &responseV6, emptyOffsetFetchResponseV6) + } } func TestNormalOffsetFetchResponse(t *testing.T) { @@ -58,8 +67,10 @@ func TestNormalOffsetFetchResponse(t *testing.T) { testResponse(t, fmt.Sprintf("Normal v%d", version), &responseV3, nil) } - responseV5 := OffsetFetchResponse{Version: 5, Err: ErrInvalidRequest, ThrottleTimeMs: 9} - responseV5.AddBlock("t", 0, &OffsetFetchResponseBlock{Offset: 10, LeaderEpoch: 100, Metadata: "md", Err: ErrRequestTimedOut}) - responseV5.Blocks["m"] = nil - testResponse(t, "normal V5", &responseV5, nil) + for version := 5; version <= 7; version++ { + res := OffsetFetchResponse{Version: int16(version), Err: ErrInvalidRequest, ThrottleTimeMs: 9} + res.AddBlock("t", 0, &OffsetFetchResponseBlock{Offset: 10, LeaderEpoch: 100, Metadata: "md", Err: ErrRequestTimedOut}) + res.Blocks["m"] = nil + testResponse(t, fmt.Sprintf("normal V%d", version), &res, nil) + } } diff --git a/offset_request.go b/offset_request.go index c0b3305f6..4c9ce4df5 100644 --- a/offset_request.go +++ b/offset_request.go @@ -28,6 +28,7 @@ func (b *offsetRequestBlock) decode(pd packetDecoder, version int16) (err error) type OffsetRequest struct { Version int16 + IsolationLevel IsolationLevel replicaID int32 isReplicaIDSet bool blocks map[string]map[int32]*offsetRequestBlock @@ -41,6 +42,10 @@ func (r *OffsetRequest) encode(pe packetEncoder) error { pe.putInt32(-1) } + if r.Version >= 2 { + pe.putBool(r.IsolationLevel == ReadCommitted) + } + err := pe.putArrayLength(len(r.blocks)) if err != nil { return err @@ -75,6 +80,18 @@ func (r *OffsetRequest) decode(pd packetDecoder, version int16) error { r.SetReplicaID(replicaID) } + if r.Version >= 2 { + tmp, err := pd.getBool() + if err != nil { + return err + } + + r.IsolationLevel = ReadUncommitted + if tmp { + r.IsolationLevel = ReadCommitted + } + } + blockCount, err := pd.getArrayLength() if err != nil { return err @@ -124,6 +141,8 @@ func (r *OffsetRequest) requiredVersion() KafkaVersion { switch r.Version { case 1: return V0_10_1_0 + case 2: + return V0_11_0_0 default: return MinVersion } diff --git a/offset_request_test.go b/offset_request_test.go index 8ca818e49..0e6951a00 100644 --- a/offset_request_test.go +++ b/offset_request_test.go @@ -3,10 +3,15 @@ package sarama import "testing" var ( - offsetRequestNoBlocks = []byte{ + offsetRequestNoBlocksV1 = []byte{ 0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x00, 0x00, 0x00} + offsetRequestNoBlocksV2 = []byte{ + 0xFF, 0xFF, 0xFF, 0xFF, + 0x00, 0x00, 0x00, 0x00, + 0x00} + offsetRequestOneBlock = []byte{ 0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x00, 0x00, 0x01, @@ -24,6 +29,14 @@ var ( 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01} + offsetRequestOneBlockReadCommittedV2 = []byte{ + 0xFF, 0xFF, 0xFF, 0xFF, + 0x01, 0x00, 0x00, 0x00, 0x01, + 0x00, 0x03, 'b', 'a', 'r', + 0x00, 0x00, 0x00, 0x01, + 0x00, 0x00, 0x00, 0x04, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01} + offsetRequestReplicaID = []byte{ 0x00, 0x00, 0x00, 0x2a, 0x00, 0x00, 0x00, 0x00} @@ -31,7 +44,7 @@ var ( func TestOffsetRequest(t *testing.T) { request := new(OffsetRequest) - testRequest(t, "no blocks", request, offsetRequestNoBlocks) + testRequest(t, "no blocks", request, offsetRequestNoBlocksV1) request.AddBlock("foo", 4, 1, 2) testRequest(t, "one block", request, offsetRequestOneBlock) @@ -40,12 +53,22 @@ func TestOffsetRequest(t *testing.T) { func TestOffsetRequestV1(t *testing.T) { request := new(OffsetRequest) request.Version = 1 - testRequest(t, "no blocks", request, offsetRequestNoBlocks) + testRequest(t, "no blocks", request, offsetRequestNoBlocksV1) request.AddBlock("bar", 4, 1, 2) // Last argument is ignored for V1 testRequest(t, "one block", request, offsetRequestOneBlockV1) } +func TestOffsetRequestV2(t *testing.T) { + request := new(OffsetRequest) + request.Version = 2 + testRequest(t, "no blocks", request, offsetRequestNoBlocksV2) + + request.IsolationLevel = ReadCommitted + request.AddBlock("bar", 4, 1, 2) // Last argument is ignored for V1 + testRequest(t, "one block", request, offsetRequestOneBlockReadCommittedV2) +} + func TestOffsetRequestReplicaID(t *testing.T) { request := new(OffsetRequest) replicaID := int32(42) diff --git a/offset_response.go b/offset_response.go index ead3ebbcc..69349efe2 100644 --- a/offset_response.go +++ b/offset_response.go @@ -50,11 +50,19 @@ func (b *OffsetResponseBlock) encode(pe packetEncoder, version int16) (err error } type OffsetResponse struct { - Version int16 - Blocks map[string]map[int32]*OffsetResponseBlock + Version int16 + ThrottleTimeMs int32 + Blocks map[string]map[int32]*OffsetResponseBlock } func (r *OffsetResponse) decode(pd packetDecoder, version int16) (err error) { + if version >= 2 { + r.ThrottleTimeMs, err = pd.getInt32() + if err != nil { + return err + } + } + numTopics, err := pd.getArrayLength() if err != nil { return err @@ -120,6 +128,10 @@ func (r *OffsetResponse) GetBlock(topic string, partition int32) *OffsetResponse */ func (r *OffsetResponse) encode(pe packetEncoder) (err error) { + if r.Version >= 2 { + pe.putInt32(r.ThrottleTimeMs) + } + if err = pe.putArrayLength(len(r.Blocks)); err != nil { return err } @@ -158,6 +170,8 @@ func (r *OffsetResponse) requiredVersion() KafkaVersion { switch r.Version { case 1: return V0_10_1_0 + case 2: + return V0_11_0_0 default: return MinVersion } diff --git a/offset_response_test.go b/offset_response_test.go index 0df6c9f3e..a711a9864 100644 --- a/offset_response_test.go +++ b/offset_response_test.go @@ -1,6 +1,8 @@ package sarama -import "testing" +import ( + "testing" +) var ( emptyOffsetResponse = []byte{ @@ -26,6 +28,20 @@ var ( 0x00, 0x01, 'a', 0x00, 0x00, 0x00, 0x00, + 0x00, 0x01, 'z', + 0x00, 0x00, 0x00, 0x01, + 0x00, 0x00, 0x00, 0x02, + 0x00, 0x00, + 0x00, 0x00, 0x01, 0x58, 0x1A, 0xE6, 0x48, 0x86, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06} + + normalOffsetResponseV2 = []byte{ + 0x00, 0x00, 0x00, 0x09, + 0x00, 0x00, 0x00, 0x02, + + 0x00, 0x01, 'a', + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x01, 'z', 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, @@ -109,3 +125,37 @@ func TestNormalOffsetResponseV1(t *testing.T) { t.Fatal("Decoding produced invalid offsets for topic z partition 2.") } } + +func TestNormalOffsetResponseV2(t *testing.T) { + response := OffsetResponse{} + + testVersionDecodable(t, "normal", &response, normalOffsetResponseV2, 2) // response should not change + + if response.ThrottleTimeMs != 9 { + t.Fatal("Decoding produced", response.ThrottleTimeMs, "throttle milliseconds where there were nine milliseconds.") + } + + if len(response.Blocks) != 2 { + t.Fatal("Decoding produced", len(response.Blocks), "topics where there were two.") + } + + if len(response.Blocks["a"]) != 0 { + t.Fatal("Decoding produced", len(response.Blocks["a"]), "partitions for topic 'a' where there were none.") + } + + if len(response.Blocks["z"]) != 1 { + t.Fatal("Decoding produced", len(response.Blocks["z"]), "partitions for topic 'z' where there was one.") + } + + if response.Blocks["z"][2].Err != ErrNoError { + t.Fatal("Decoding produced invalid error for topic z partition 2.") + } + + if response.Blocks["z"][2].Timestamp != 1477920049286 { + t.Fatal("Decoding produced invalid timestamp for topic z partition 2.", response.Blocks["z"][2].Timestamp) + } + + if response.Blocks["z"][2].Offset != 6 { + t.Fatal("Decoding produced invalid offsets for topic z partition 2.") + } +} diff --git a/request.go b/request.go index dcfd3946c..0f76ae534 100644 --- a/request.go +++ b/request.go @@ -129,7 +129,7 @@ func allocateBody(key, version int16) protocolBody { case 8: return &OffsetCommitRequest{Version: version} case 9: - return &OffsetFetchRequest{} + return &OffsetFetchRequest{Version: version} case 10: return &FindCoordinatorRequest{} case 11: