From 551b28c9f0ad97a7833832847701c767b1c7e8b9 Mon Sep 17 00:00:00 2001 From: KJ Tsanaktsidis Date: Thu, 13 Feb 2020 14:21:52 +1100 Subject: [PATCH] Revert "Merge pull request #1582 from dnwe/fetch-request-response-protocol" This reverts commit ab4036c9fc05d1489fd6fe356ede92ea72f01dfc, reversing changes made to ae8f056ea30a2cd5534e6151e8f8951c7d0b1fef. --- fetch_request.go | 138 +++++------------------------------------- fetch_request_test.go | 44 +++++--------- fetch_response.go | 48 +-------------- request.go | 2 +- 4 files changed, 34 insertions(+), 198 deletions(-) diff --git a/fetch_request.go b/fetch_request.go index 836e6dec1..4db9ddd3d 100644 --- a/fetch_request.go +++ b/fetch_request.go @@ -1,41 +1,20 @@ package sarama type fetchRequestBlock struct { - Version int16 - currentLeaderEpoch int32 - fetchOffset int64 - logStartOffset int64 - maxBytes int32 + fetchOffset int64 + maxBytes int32 } -func (b *fetchRequestBlock) encode(pe packetEncoder, version int16) error { - b.Version = version - if b.Version >= 9 { - pe.putInt32(b.currentLeaderEpoch) - } +func (b *fetchRequestBlock) encode(pe packetEncoder) error { pe.putInt64(b.fetchOffset) - if b.Version >= 5 { - pe.putInt64(b.logStartOffset) - } pe.putInt32(b.maxBytes) return nil } -func (b *fetchRequestBlock) decode(pd packetDecoder, version int16) (err error) { - b.Version = version - if b.Version >= 9 { - if b.currentLeaderEpoch, err = pd.getInt32(); err != nil { - return err - } - } +func (b *fetchRequestBlock) decode(pd packetDecoder) (err error) { if b.fetchOffset, err = pd.getInt64(); err != nil { return err } - if b.Version >= 5 { - if b.logStartOffset, err = pd.getInt64(); err != nil { - return err - } - } if b.maxBytes, err = pd.getInt32(); err != nil { return err } @@ -46,15 +25,12 @@ func (b *fetchRequestBlock) decode(pd packetDecoder, version int16) (err error) // https://issues.apache.org/jira/browse/KAFKA-2063 for a discussion of the issues leading up to that. The KIP is at // https://cwiki.apache.org/confluence/display/KAFKA/KIP-74%3A+Add+Fetch+Response+Size+Limit+in+Bytes type FetchRequest struct { - MaxWaitTime int32 - MinBytes int32 - MaxBytes int32 - Version int16 - Isolation IsolationLevel - SessionID int32 - SessionEpoch int32 - blocks map[string]map[int32]*fetchRequestBlock - forgotten map[string][]int32 + MaxWaitTime int32 + MinBytes int32 + MaxBytes int32 + Version int16 + Isolation IsolationLevel + blocks map[string]map[int32]*fetchRequestBlock } type IsolationLevel int8 @@ -74,10 +50,6 @@ func (r *FetchRequest) encode(pe packetEncoder) (err error) { if r.Version >= 4 { pe.putInt8(int8(r.Isolation)) } - if r.Version >= 7 { - pe.putInt32(r.SessionID) - pe.putInt32(r.SessionEpoch) - } err = pe.putArrayLength(len(r.blocks)) if err != nil { return err @@ -93,38 +65,17 @@ func (r *FetchRequest) encode(pe packetEncoder) (err error) { } for partition, block := range blocks { pe.putInt32(partition) - err = block.encode(pe, r.Version) - if err != nil { - return err - } - } - } - if r.Version >= 7 { - err = pe.putArrayLength(len(r.forgotten)) - if err != nil { - return err - } - for topic, partitions := range r.forgotten { - err = pe.putString(topic) - if err != nil { - return err - } - err = pe.putArrayLength(len(partitions)) + err = block.encode(pe) if err != nil { return err } - for _, partition := range partitions { - pe.putInt32(partition) - } } } - return nil } func (r *FetchRequest) decode(pd packetDecoder, version int16) (err error) { r.Version = version - if _, err = pd.getInt32(); err != nil { return err } @@ -146,16 +97,6 @@ func (r *FetchRequest) decode(pd packetDecoder, version int16) (err error) { } r.Isolation = IsolationLevel(isolation) } - if r.Version >= 7 { - r.SessionID, err = pd.getInt32() - if err != nil { - return err - } - r.SessionEpoch, err = pd.getInt32() - if err != nil { - return err - } - } topicCount, err := pd.getArrayLength() if err != nil { return err @@ -180,43 +121,12 @@ func (r *FetchRequest) decode(pd packetDecoder, version int16) (err error) { return err } fetchBlock := &fetchRequestBlock{} - if err = fetchBlock.decode(pd, r.Version); err != nil { + if err = fetchBlock.decode(pd); err != nil { return err } r.blocks[topic][partition] = fetchBlock } } - - if r.Version >= 7 { - forgottenCount, err := pd.getArrayLength() - if err != nil { - return err - } - if forgottenCount == 0 { - return nil - } - r.forgotten = make(map[string][]int32) - for i := 0; i < forgottenCount; i++ { - topic, err := pd.getString() - if err != nil { - return err - } - partitionCount, err := pd.getArrayLength() - if err != nil { - return err - } - r.forgotten[topic] = make([]int32, partitionCount) - - for j := 0; j < partitionCount; j++ { - partition, err := pd.getInt32() - if err != nil { - return err - } - r.forgotten[topic][j] = partition - } - } - } - return nil } @@ -230,28 +140,16 @@ func (r *FetchRequest) version() int16 { func (r *FetchRequest) requiredVersion() KafkaVersion { switch r.Version { - case 0: - return MinVersion case 1: return V0_9_0_0 case 2: return V0_10_0_0 case 3: return V0_10_1_0 - case 4, 5: + case 4: return V0_11_0_0 - case 6: - return V1_0_0_0 - case 7: - return V1_1_0_0 - case 8: - return V2_0_0_0 - case 9, 10: - return V2_1_0_0 - case 11: - return V2_3_0_0 default: - return MaxVersion + return MinVersion } } @@ -260,21 +158,13 @@ func (r *FetchRequest) AddBlock(topic string, partitionID int32, fetchOffset int r.blocks = make(map[string]map[int32]*fetchRequestBlock) } - if r.Version >= 7 && r.forgotten == nil { - r.forgotten = make(map[string][]int32) - } - if r.blocks[topic] == nil { r.blocks[topic] = make(map[int32]*fetchRequestBlock) } tmp := new(fetchRequestBlock) - tmp.Version = r.Version tmp.maxBytes = maxBytes tmp.fetchOffset = fetchOffset - if r.Version >= 9 { - tmp.currentLeaderEpoch = int32(-1) - } r.blocks[topic][partitionID] = tmp } diff --git a/fetch_request_test.go b/fetch_request_test.go index 2fdd90509..1a94c2d1f 100644 --- a/fetch_request_test.go +++ b/fetch_request_test.go @@ -29,32 +29,20 @@ var ( ) func TestFetchRequest(t *testing.T) { - t.Run("no blocks", func(t *testing.T) { - request := new(FetchRequest) - testRequest(t, "no blocks", request, fetchRequestNoBlocks) - }) - - t.Run("with properties", func(t *testing.T) { - request := new(FetchRequest) - request.MaxWaitTime = 0x20 - request.MinBytes = 0xEF - testRequest(t, "with properties", request, fetchRequestWithProperties) - }) - - t.Run("one block", func(t *testing.T) { - request := new(FetchRequest) - request.MaxWaitTime = 0 - request.MinBytes = 0 - request.AddBlock("topic", 0x12, 0x34, 0x56) - testRequest(t, "one block", request, fetchRequestOneBlock) - }) - - t.Run("one block v4", func(t *testing.T) { - request := new(FetchRequest) - request.Version = 4 - request.MaxBytes = 0xFF - request.Isolation = ReadCommitted - request.AddBlock("topic", 0x12, 0x34, 0x56) - testRequest(t, "one block v4", request, fetchRequestOneBlockV4) - }) + request := new(FetchRequest) + testRequest(t, "no blocks", request, fetchRequestNoBlocks) + + request.MaxWaitTime = 0x20 + request.MinBytes = 0xEF + testRequest(t, "with properties", request, fetchRequestWithProperties) + + request.MaxWaitTime = 0 + request.MinBytes = 0 + request.AddBlock("topic", 0x12, 0x34, 0x56) + testRequest(t, "one block", request, fetchRequestOneBlock) + + request.Version = 4 + request.MaxBytes = 0xFF + request.Isolation = ReadCommitted + testRequest(t, "one block v4", request, fetchRequestOneBlockV4) } diff --git a/fetch_response.go b/fetch_response.go index 26936d968..65841c528 100644 --- a/fetch_response.go +++ b/fetch_response.go @@ -33,7 +33,6 @@ type FetchResponseBlock struct { Err KError HighWaterMarkOffset int64 LastStableOffset int64 - LogStartOffset int64 AbortedTransactions []*AbortedTransaction Records *Records // deprecated: use FetchResponseBlock.RecordsSet RecordsSet []*Records @@ -58,13 +57,6 @@ func (b *FetchResponseBlock) decode(pd packetDecoder, version int16) (err error) return err } - if version >= 5 { - b.LogStartOffset, err = pd.getInt64() - if err != nil { - return err - } - } - numTransact, err := pd.getArrayLength() if err != nil { return err @@ -174,10 +166,6 @@ func (b *FetchResponseBlock) encode(pe packetEncoder, version int16) (err error) if version >= 4 { pe.putInt64(b.LastStableOffset) - if version >= 5 { - pe.putInt64(b.LogStartOffset) - } - if err = pe.putArrayLength(len(b.AbortedTransactions)); err != nil { return err } @@ -212,9 +200,7 @@ func (b *FetchResponseBlock) getAbortedTransactions() []*AbortedTransaction { type FetchResponse struct { Blocks map[string]map[int32]*FetchResponseBlock ThrottleTime time.Duration - ErrorCode int16 - SessionID int32 - Version int16 + Version int16 // v1 requires 0.9+, v2 requires 0.10+ LogAppendTime bool Timestamp time.Time } @@ -230,17 +216,6 @@ func (r *FetchResponse) decode(pd packetDecoder, version int16) (err error) { r.ThrottleTime = time.Duration(throttle) * time.Millisecond } - if r.Version >= 7 { - r.ErrorCode, err = pd.getInt16() - if err != nil { - return err - } - r.SessionID, err = pd.getInt32() - if err != nil { - return err - } - } - numTopics, err := pd.getArrayLength() if err != nil { return err @@ -283,11 +258,6 @@ func (r *FetchResponse) encode(pe packetEncoder) (err error) { pe.putInt32(int32(r.ThrottleTime / time.Millisecond)) } - if r.Version >= 7 { - pe.putInt16(r.ErrorCode) - pe.putInt32(r.SessionID) - } - err = pe.putArrayLength(len(r.Blocks)) if err != nil { return err @@ -325,28 +295,16 @@ func (r *FetchResponse) version() int16 { func (r *FetchResponse) requiredVersion() KafkaVersion { switch r.Version { - case 0: - return MinVersion case 1: return V0_9_0_0 case 2: return V0_10_0_0 case 3: return V0_10_1_0 - case 4, 5: + case 4: return V0_11_0_0 - case 6: - return V1_0_0_0 - case 7: - return V1_1_0_0 - case 8: - return V2_0_0_0 - case 9, 10: - return V2_1_0_0 - case 11: - return V2_3_0_0 default: - return MaxVersion + return MinVersion } } diff --git a/request.go b/request.go index 6e4ad8731..97437d67b 100644 --- a/request.go +++ b/request.go @@ -105,7 +105,7 @@ func allocateBody(key, version int16) protocolBody { case 0: return &ProduceRequest{} case 1: - return &FetchRequest{Version: version} + return &FetchRequest{} case 2: return &OffsetRequest{Version: version} case 3: