Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: stuck on the batch with zero records length #2057

Merged
merged 1 commit into from
Nov 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,10 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu
child.fetchSize = child.conf.Consumer.Fetch.Max
}
}
} else if block.LastRecordsBatchOffset != nil && *block.LastRecordsBatchOffset < block.HighWaterMarkOffset {
// check last record offset to avoid stuck if high watermark was not reached
Logger.Printf("consumer/broker/%d received batch with zero records but high watermark was not reached, topic %s, partition %d, offset %d\n", child.broker.broker.ID(), child.topic, child.partition, *block.LastRecordsBatchOffset)
child.offset = *block.LastRecordsBatchOffset + 1
}

return nil, nil
Expand Down
33 changes: 33 additions & 0 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1734,6 +1734,39 @@ func Test_partitionConsumer_parseResponse(t *testing.T) {
}
}

func Test_partitionConsumer_parseResponseEmptyBatch(t *testing.T) {
lrbOffset := int64(5)
block := &FetchResponseBlock{
HighWaterMarkOffset: 10,
LastStableOffset: 10,
LastRecordsBatchOffset: &lrbOffset,
LogStartOffset: 0,
}
response := &FetchResponse{
Blocks: map[string]map[int32]*FetchResponseBlock{"my_topic": {0: block}},
Version: 2,
}
child := &partitionConsumer{
broker: &brokerConsumer{
broker: &Broker{},
},
conf: NewConfig(),
topic: "my_topic",
partition: 0,
}
got, err := child.parseResponse(response)
if err != nil {
t.Errorf("partitionConsumer.parseResponse() error = %v", err)
return
}
if got != nil {
t.Errorf("partitionConsumer.parseResponse() should be nil, got %v", got)
}
if child.offset != 6 {
t.Errorf("child.offset should be LastRecordsBatchOffset + 1: %d, got %d", lrbOffset+1, child.offset)
}
}

func testConsumerInterceptor(
t *testing.T,
interceptors []ConsumerInterceptor,
Expand Down
24 changes: 15 additions & 9 deletions fetch_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,16 @@ func (t *AbortedTransaction) encode(pe packetEncoder) (err error) {
}

type FetchResponseBlock struct {
Err KError
HighWaterMarkOffset int64
LastStableOffset int64
LogStartOffset int64
AbortedTransactions []*AbortedTransaction
PreferredReadReplica int32
Records *Records // deprecated: use FetchResponseBlock.RecordsSet
RecordsSet []*Records
Partial bool
Err KError
HighWaterMarkOffset int64
LastStableOffset int64
LastRecordsBatchOffset *int64
LogStartOffset int64
AbortedTransactions []*AbortedTransaction
PreferredReadReplica int32
Records *Records // deprecated: use FetchResponseBlock.RecordsSet
RecordsSet []*Records
Partial bool
}

func (b *FetchResponseBlock) decode(pd packetDecoder, version int16) (err error) {
Expand Down Expand Up @@ -118,6 +119,11 @@ func (b *FetchResponseBlock) decode(pd packetDecoder, version int16) (err error)
return err
}

b.LastRecordsBatchOffset, err = records.recordsOffset()
if err != nil {
return err
}

partial, err := records.isPartial()
if err != nil {
return err
Expand Down
76 changes: 76 additions & 0 deletions fetch_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,37 @@ var (
0x00,
}

emptyRecordsFetchResponsev11 = []byte{
0x00, 0x00, 0x00, 0x00, // ThrottleTime
0x00, 0x00, // Error
0x00, 0x00, 0x00, 0x00, // Fetch session
0x00, 0x00, 0x00, 0x01, // Num topic
0x00, 0x05, 't', 'o', 'p', 'i', 'c', // Topic
0x00, 0x00, 0x00, 0x01, // Num partition
0x00, 0x00, 0x00, 0x05, // Partition
0x00, 0x00, // Error
0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10, // High Watermark Offset
0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10, // Last Stable Offset
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // Log start offset
0x00, 0x00, 0x00, 0x00, // Number of Aborted Transactions
0xff, 0xff, 0xff, 0xff, // Replica id
0x00, 0x00, 0x00, 0x3D, // Batch size
// recordBatch
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // Offset
0x00, 0x00, 0x00, 0x31, // Message size
0x00, 0x00, 0x00, 0x00, // Leader epoch
0x02, // Magic byte
0x14, 0xE0, 0x7A, 0x62, // CRC
0x00, 0x00, // Flags
0x00, 0x00, 0x00, 0x00, // Last offset delta
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0A, // First timestamp
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0B, // Last timestamp
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, // Producer id
0x00, 0x00, // Producer epoch
0x00, 0x00, 0x00, 0x3d, // Base sequence
0x00, 0x00, 0x00, 0x00, // Records size
}

oneMessageFetchResponseV4 = []byte{
0x00, 0x00, 0x00, 0x00, // ThrottleTime
0x00, 0x00, 0x00, 0x01, // Number of Topics
Expand Down Expand Up @@ -386,6 +417,51 @@ func TestPartailFetchResponse(t *testing.T) {
}
}

func TestEmptyRecordsFetchResponse(t *testing.T) {
response := FetchResponse{}
testVersionDecodable(t, "empty record", &response, emptyRecordsFetchResponsev11, 11)

if len(response.Blocks) != 1 {
t.Fatal("Decoding produced incorrect number of topic blocks.")
}

if len(response.Blocks["topic"]) != 1 {
t.Fatal("Decoding produced incorrect number of partition blocks for topic.")
}

block := response.GetBlock("topic", 5)
if block == nil {
t.Fatal("GetBlock didn't return block.")
}
if block.Err != ErrNoError {
t.Error("Decoding didn't produce correct error code.")
}
if block.HighWaterMarkOffset != 0x10101010 {
t.Error("Decoding didn't produce correct high water mark offset.")
}
if block.PreferredReadReplica != -1 {
t.Error("Decoding didn't produce correct preferred read replica.")
}
partial, err := block.isPartial()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if partial {
t.Error("Decoding a partial trailing record")
}

n, err := block.numRecords()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if n != 0 {
t.Fatal("Decoding produced incorrect number of records.")
}
if *block.LastRecordsBatchOffset != 0 {
t.Fatal("Last records batch offset is incorrect.")
}
}

func TestOneMessageFetchResponseV4(t *testing.T) {
response := FetchResponse{}
testVersionDecodable(t, "one message v4", &response, oneMessageFetchResponseV4, 4)
Expand Down
15 changes: 15 additions & 0 deletions records.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,21 @@ func (r *Records) isOverflow() (bool, error) {
return false, fmt.Errorf("unknown records type: %v", r.recordsType)
}

func (r *Records) recordsOffset() (*int64, error) {
switch r.recordsType {
case unknownRecords:
return nil, nil
case legacyRecords:
return nil, nil
case defaultRecords:
if r.RecordBatch == nil {
return nil, nil
}
return &r.RecordBatch.FirstOffset, nil
}
return nil, fmt.Errorf("unknown records type: %v", r.recordsType)
}

func magicValue(pd packetDecoder) (int8, error) {
return pd.peekInt8(magicOffset)
}
Expand Down
15 changes: 15 additions & 0 deletions records_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,20 @@ func TestLegacyRecords(t *testing.T) {
if c {
t.Errorf("MessageSet can't be a control batch")
}
f, err := r.recordsOffset()
if err != nil {
t.Fatal(err)
}
if f != nil {
t.Errorf("RecordBatch record offset is invalid")
}
}

func TestDefaultRecords(t *testing.T) {
batch := &RecordBatch{
IsTransactional: true,
Version: 2,
FirstOffset: 1,
Records: []*Record{
{
Value: []byte{1},
Expand Down Expand Up @@ -141,4 +149,11 @@ func TestDefaultRecords(t *testing.T) {
if c {
t.Errorf("RecordBatch shouldn't be a control batch")
}
f, err := r.recordsOffset()
if err != nil {
t.Fatal(err)
}
if f == nil || *f != 1 {
t.Errorf("RecordBatch record offset is invalid")
}
}