Skip to content

Commit

Permalink
Revert "Merge pull request IBM#1582 from dnwe/fetch-request-response-…
Browse files Browse the repository at this point in the history
…protocol"

This reverts commit ab4036c, reversing
changes made to ae8f056.
  • Loading branch information
KJ Tsanaktsidis committed Feb 13, 2020
1 parent b3a8121 commit 551b28c
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 198 deletions.
138 changes: 14 additions & 124 deletions fetch_request.go
Original file line number Diff line number Diff line change
@@ -1,41 +1,20 @@
package sarama

type fetchRequestBlock struct {
Version int16
currentLeaderEpoch int32
fetchOffset int64
logStartOffset int64
maxBytes int32
fetchOffset int64
maxBytes int32
}

func (b *fetchRequestBlock) encode(pe packetEncoder, version int16) error {
b.Version = version
if b.Version >= 9 {
pe.putInt32(b.currentLeaderEpoch)
}
func (b *fetchRequestBlock) encode(pe packetEncoder) error {
pe.putInt64(b.fetchOffset)
if b.Version >= 5 {
pe.putInt64(b.logStartOffset)
}
pe.putInt32(b.maxBytes)
return nil
}

func (b *fetchRequestBlock) decode(pd packetDecoder, version int16) (err error) {
b.Version = version
if b.Version >= 9 {
if b.currentLeaderEpoch, err = pd.getInt32(); err != nil {
return err
}
}
func (b *fetchRequestBlock) decode(pd packetDecoder) (err error) {
if b.fetchOffset, err = pd.getInt64(); err != nil {
return err
}
if b.Version >= 5 {
if b.logStartOffset, err = pd.getInt64(); err != nil {
return err
}
}
if b.maxBytes, err = pd.getInt32(); err != nil {
return err
}
Expand All @@ -46,15 +25,12 @@ func (b *fetchRequestBlock) decode(pd packetDecoder, version int16) (err error)
// https://issues.apache.org/jira/browse/KAFKA-2063 for a discussion of the issues leading up to that. The KIP is at
// https://cwiki.apache.org/confluence/display/KAFKA/KIP-74%3A+Add+Fetch+Response+Size+Limit+in+Bytes
type FetchRequest struct {
MaxWaitTime int32
MinBytes int32
MaxBytes int32
Version int16
Isolation IsolationLevel
SessionID int32
SessionEpoch int32
blocks map[string]map[int32]*fetchRequestBlock
forgotten map[string][]int32
MaxWaitTime int32
MinBytes int32
MaxBytes int32
Version int16
Isolation IsolationLevel
blocks map[string]map[int32]*fetchRequestBlock
}

type IsolationLevel int8
Expand All @@ -74,10 +50,6 @@ func (r *FetchRequest) encode(pe packetEncoder) (err error) {
if r.Version >= 4 {
pe.putInt8(int8(r.Isolation))
}
if r.Version >= 7 {
pe.putInt32(r.SessionID)
pe.putInt32(r.SessionEpoch)
}
err = pe.putArrayLength(len(r.blocks))
if err != nil {
return err
Expand All @@ -93,38 +65,17 @@ func (r *FetchRequest) encode(pe packetEncoder) (err error) {
}
for partition, block := range blocks {
pe.putInt32(partition)
err = block.encode(pe, r.Version)
if err != nil {
return err
}
}
}
if r.Version >= 7 {
err = pe.putArrayLength(len(r.forgotten))
if err != nil {
return err
}
for topic, partitions := range r.forgotten {
err = pe.putString(topic)
if err != nil {
return err
}
err = pe.putArrayLength(len(partitions))
err = block.encode(pe)
if err != nil {
return err
}
for _, partition := range partitions {
pe.putInt32(partition)
}
}
}

return nil
}

func (r *FetchRequest) decode(pd packetDecoder, version int16) (err error) {
r.Version = version

if _, err = pd.getInt32(); err != nil {
return err
}
Expand All @@ -146,16 +97,6 @@ func (r *FetchRequest) decode(pd packetDecoder, version int16) (err error) {
}
r.Isolation = IsolationLevel(isolation)
}
if r.Version >= 7 {
r.SessionID, err = pd.getInt32()
if err != nil {
return err
}
r.SessionEpoch, err = pd.getInt32()
if err != nil {
return err
}
}
topicCount, err := pd.getArrayLength()
if err != nil {
return err
Expand All @@ -180,43 +121,12 @@ func (r *FetchRequest) decode(pd packetDecoder, version int16) (err error) {
return err
}
fetchBlock := &fetchRequestBlock{}
if err = fetchBlock.decode(pd, r.Version); err != nil {
if err = fetchBlock.decode(pd); err != nil {
return err
}
r.blocks[topic][partition] = fetchBlock
}
}

if r.Version >= 7 {
forgottenCount, err := pd.getArrayLength()
if err != nil {
return err
}
if forgottenCount == 0 {
return nil
}
r.forgotten = make(map[string][]int32)
for i := 0; i < forgottenCount; i++ {
topic, err := pd.getString()
if err != nil {
return err
}
partitionCount, err := pd.getArrayLength()
if err != nil {
return err
}
r.forgotten[topic] = make([]int32, partitionCount)

for j := 0; j < partitionCount; j++ {
partition, err := pd.getInt32()
if err != nil {
return err
}
r.forgotten[topic][j] = partition
}
}
}

return nil
}

Expand All @@ -230,28 +140,16 @@ func (r *FetchRequest) version() int16 {

func (r *FetchRequest) requiredVersion() KafkaVersion {
switch r.Version {
case 0:
return MinVersion
case 1:
return V0_9_0_0
case 2:
return V0_10_0_0
case 3:
return V0_10_1_0
case 4, 5:
case 4:
return V0_11_0_0
case 6:
return V1_0_0_0
case 7:
return V1_1_0_0
case 8:
return V2_0_0_0
case 9, 10:
return V2_1_0_0
case 11:
return V2_3_0_0
default:
return MaxVersion
return MinVersion
}
}

Expand All @@ -260,21 +158,13 @@ func (r *FetchRequest) AddBlock(topic string, partitionID int32, fetchOffset int
r.blocks = make(map[string]map[int32]*fetchRequestBlock)
}

if r.Version >= 7 && r.forgotten == nil {
r.forgotten = make(map[string][]int32)
}

if r.blocks[topic] == nil {
r.blocks[topic] = make(map[int32]*fetchRequestBlock)
}

tmp := new(fetchRequestBlock)
tmp.Version = r.Version
tmp.maxBytes = maxBytes
tmp.fetchOffset = fetchOffset
if r.Version >= 9 {
tmp.currentLeaderEpoch = int32(-1)
}

r.blocks[topic][partitionID] = tmp
}
44 changes: 16 additions & 28 deletions fetch_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,32 +29,20 @@ var (
)

func TestFetchRequest(t *testing.T) {
t.Run("no blocks", func(t *testing.T) {
request := new(FetchRequest)
testRequest(t, "no blocks", request, fetchRequestNoBlocks)
})

t.Run("with properties", func(t *testing.T) {
request := new(FetchRequest)
request.MaxWaitTime = 0x20
request.MinBytes = 0xEF
testRequest(t, "with properties", request, fetchRequestWithProperties)
})

t.Run("one block", func(t *testing.T) {
request := new(FetchRequest)
request.MaxWaitTime = 0
request.MinBytes = 0
request.AddBlock("topic", 0x12, 0x34, 0x56)
testRequest(t, "one block", request, fetchRequestOneBlock)
})

t.Run("one block v4", func(t *testing.T) {
request := new(FetchRequest)
request.Version = 4
request.MaxBytes = 0xFF
request.Isolation = ReadCommitted
request.AddBlock("topic", 0x12, 0x34, 0x56)
testRequest(t, "one block v4", request, fetchRequestOneBlockV4)
})
request := new(FetchRequest)
testRequest(t, "no blocks", request, fetchRequestNoBlocks)

request.MaxWaitTime = 0x20
request.MinBytes = 0xEF
testRequest(t, "with properties", request, fetchRequestWithProperties)

request.MaxWaitTime = 0
request.MinBytes = 0
request.AddBlock("topic", 0x12, 0x34, 0x56)
testRequest(t, "one block", request, fetchRequestOneBlock)

request.Version = 4
request.MaxBytes = 0xFF
request.Isolation = ReadCommitted
testRequest(t, "one block v4", request, fetchRequestOneBlockV4)
}
48 changes: 3 additions & 45 deletions fetch_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ type FetchResponseBlock struct {
Err KError
HighWaterMarkOffset int64
LastStableOffset int64
LogStartOffset int64
AbortedTransactions []*AbortedTransaction
Records *Records // deprecated: use FetchResponseBlock.RecordsSet
RecordsSet []*Records
Expand All @@ -58,13 +57,6 @@ func (b *FetchResponseBlock) decode(pd packetDecoder, version int16) (err error)
return err
}

if version >= 5 {
b.LogStartOffset, err = pd.getInt64()
if err != nil {
return err
}
}

numTransact, err := pd.getArrayLength()
if err != nil {
return err
Expand Down Expand Up @@ -174,10 +166,6 @@ func (b *FetchResponseBlock) encode(pe packetEncoder, version int16) (err error)
if version >= 4 {
pe.putInt64(b.LastStableOffset)

if version >= 5 {
pe.putInt64(b.LogStartOffset)
}

if err = pe.putArrayLength(len(b.AbortedTransactions)); err != nil {
return err
}
Expand Down Expand Up @@ -212,9 +200,7 @@ func (b *FetchResponseBlock) getAbortedTransactions() []*AbortedTransaction {
type FetchResponse struct {
Blocks map[string]map[int32]*FetchResponseBlock
ThrottleTime time.Duration
ErrorCode int16
SessionID int32
Version int16
Version int16 // v1 requires 0.9+, v2 requires 0.10+
LogAppendTime bool
Timestamp time.Time
}
Expand All @@ -230,17 +216,6 @@ func (r *FetchResponse) decode(pd packetDecoder, version int16) (err error) {
r.ThrottleTime = time.Duration(throttle) * time.Millisecond
}

if r.Version >= 7 {
r.ErrorCode, err = pd.getInt16()
if err != nil {
return err
}
r.SessionID, err = pd.getInt32()
if err != nil {
return err
}
}

numTopics, err := pd.getArrayLength()
if err != nil {
return err
Expand Down Expand Up @@ -283,11 +258,6 @@ func (r *FetchResponse) encode(pe packetEncoder) (err error) {
pe.putInt32(int32(r.ThrottleTime / time.Millisecond))
}

if r.Version >= 7 {
pe.putInt16(r.ErrorCode)
pe.putInt32(r.SessionID)
}

err = pe.putArrayLength(len(r.Blocks))
if err != nil {
return err
Expand Down Expand Up @@ -325,28 +295,16 @@ func (r *FetchResponse) version() int16 {

func (r *FetchResponse) requiredVersion() KafkaVersion {
switch r.Version {
case 0:
return MinVersion
case 1:
return V0_9_0_0
case 2:
return V0_10_0_0
case 3:
return V0_10_1_0
case 4, 5:
case 4:
return V0_11_0_0
case 6:
return V1_0_0_0
case 7:
return V1_1_0_0
case 8:
return V2_0_0_0
case 9, 10:
return V2_1_0_0
case 11:
return V2_3_0_0
default:
return MaxVersion
return MinVersion
}
}

Expand Down
2 changes: 1 addition & 1 deletion request.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func allocateBody(key, version int16) protocolBody {
case 0:
return &ProduceRequest{}
case 1:
return &FetchRequest{Version: version}
return &FetchRequest{}
case 2:
return &OffsetRequest{Version: version}
case 3:
Expand Down

0 comments on commit 551b28c

Please sign in to comment.