diff --git a/consumer.go b/consumer.go index 2d76e8a1e8..f68eb1d2b9 100644 --- a/consumer.go +++ b/consumer.go @@ -629,6 +629,10 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu child.fetchSize = child.conf.Consumer.Fetch.Max } } + } else if block.LastRecordsBatchOffset != nil && *block.LastRecordsBatchOffset < block.HighWaterMarkOffset { + // check last record offset to avoid stuck if high watermark was not reached + Logger.Printf("consumer/broker/%d received batch with zero records but high watermark was not reached, topic %s, partition %d, offset %d\n", child.broker.broker.ID(), child.topic, child.partition, *block.LastRecordsBatchOffset) + child.offset = *block.LastRecordsBatchOffset + 1 } return nil, nil diff --git a/consumer_test.go b/consumer_test.go index 811fb9082e..7f9ff2c2e5 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -1734,6 +1734,39 @@ func Test_partitionConsumer_parseResponse(t *testing.T) { } } +func Test_partitionConsumer_parseResponseEmptyBatch(t *testing.T) { + lrbOffset := int64(5) + block := &FetchResponseBlock{ + HighWaterMarkOffset: 10, + LastStableOffset: 10, + LastRecordsBatchOffset: &lrbOffset, + LogStartOffset: 0, + } + response := &FetchResponse{ + Blocks: map[string]map[int32]*FetchResponseBlock{"my_topic": {0: block}}, + Version: 2, + } + child := &partitionConsumer{ + broker: &brokerConsumer{ + broker: &Broker{}, + }, + conf: NewConfig(), + topic: "my_topic", + partition: 0, + } + got, err := child.parseResponse(response) + if err != nil { + t.Errorf("partitionConsumer.parseResponse() error = %v", err) + return + } + if got != nil { + t.Errorf("partitionConsumer.parseResponse() should be nil, got %v", got) + } + if child.offset != 6 { + t.Errorf("child.offset should be LastRecordsBatchOffset + 1: %d, got %d", lrbOffset+1, child.offset) + } +} + func testConsumerInterceptor( t *testing.T, interceptors []ConsumerInterceptor, diff --git a/fetch_response.go b/fetch_response.go index 54b88284ad..19040c8270 100644 --- a/fetch_response.go +++ b/fetch_response.go @@ -30,15 +30,16 @@ func (t *AbortedTransaction) encode(pe packetEncoder) (err error) { } type FetchResponseBlock struct { - Err KError - HighWaterMarkOffset int64 - LastStableOffset int64 - LogStartOffset int64 - AbortedTransactions []*AbortedTransaction - PreferredReadReplica int32 - Records *Records // deprecated: use FetchResponseBlock.RecordsSet - RecordsSet []*Records - Partial bool + Err KError + HighWaterMarkOffset int64 + LastStableOffset int64 + LastRecordsBatchOffset *int64 + LogStartOffset int64 + AbortedTransactions []*AbortedTransaction + PreferredReadReplica int32 + Records *Records // deprecated: use FetchResponseBlock.RecordsSet + RecordsSet []*Records + Partial bool } func (b *FetchResponseBlock) decode(pd packetDecoder, version int16) (err error) { @@ -118,6 +119,11 @@ func (b *FetchResponseBlock) decode(pd packetDecoder, version int16) (err error) return err } + b.LastRecordsBatchOffset, err = records.recordsOffset() + if err != nil { + return err + } + partial, err := records.isPartial() if err != nil { return err diff --git a/fetch_response_test.go b/fetch_response_test.go index 45ad8013b8..3ba3eb5d17 100644 --- a/fetch_response_test.go +++ b/fetch_response_test.go @@ -121,6 +121,37 @@ var ( 0x00, } + emptyRecordsFetchResponsev11 = []byte{ + 0x00, 0x00, 0x00, 0x00, // ThrottleTime + 0x00, 0x00, // Error + 0x00, 0x00, 0x00, 0x00, // Fetch session + 0x00, 0x00, 0x00, 0x01, // Num topic + 0x00, 0x05, 't', 'o', 'p', 'i', 'c', // Topic + 0x00, 0x00, 0x00, 0x01, // Num partition + 0x00, 0x00, 0x00, 0x05, // Partition + 0x00, 0x00, // Error + 0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10, // High Watermark Offset + 0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10, // Last Stable Offset + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // Log start offset + 0x00, 0x00, 0x00, 0x00, // Number of Aborted Transactions + 0xff, 0xff, 0xff, 0xff, // Replica id + 0x00, 0x00, 0x00, 0x3D, // Batch size + // recordBatch + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // Offset + 0x00, 0x00, 0x00, 0x31, // Message size + 0x00, 0x00, 0x00, 0x00, // Leader epoch + 0x02, // Magic byte + 0x14, 0xE0, 0x7A, 0x62, // CRC + 0x00, 0x00, // Flags + 0x00, 0x00, 0x00, 0x00, // Last offset delta + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0A, // First timestamp + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0B, // Last timestamp + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, // Producer id + 0x00, 0x00, // Producer epoch + 0x00, 0x00, 0x00, 0x3d, // Base sequence + 0x00, 0x00, 0x00, 0x00, // Records size + } + oneMessageFetchResponseV4 = []byte{ 0x00, 0x00, 0x00, 0x00, // ThrottleTime 0x00, 0x00, 0x00, 0x01, // Number of Topics @@ -386,6 +417,51 @@ func TestPartailFetchResponse(t *testing.T) { } } +func TestEmptyRecordsFetchResponse(t *testing.T) { + response := FetchResponse{} + testVersionDecodable(t, "empty record", &response, emptyRecordsFetchResponsev11, 11) + + if len(response.Blocks) != 1 { + t.Fatal("Decoding produced incorrect number of topic blocks.") + } + + if len(response.Blocks["topic"]) != 1 { + t.Fatal("Decoding produced incorrect number of partition blocks for topic.") + } + + block := response.GetBlock("topic", 5) + if block == nil { + t.Fatal("GetBlock didn't return block.") + } + if block.Err != ErrNoError { + t.Error("Decoding didn't produce correct error code.") + } + if block.HighWaterMarkOffset != 0x10101010 { + t.Error("Decoding didn't produce correct high water mark offset.") + } + if block.PreferredReadReplica != -1 { + t.Error("Decoding didn't produce correct preferred read replica.") + } + partial, err := block.isPartial() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if partial { + t.Error("Decoding a partial trailing record") + } + + n, err := block.numRecords() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if n != 0 { + t.Fatal("Decoding produced incorrect number of records.") + } + if *block.LastRecordsBatchOffset != 0 { + t.Fatal("Last records batch offset is incorrect.") + } +} + func TestOneMessageFetchResponseV4(t *testing.T) { response := FetchResponse{} testVersionDecodable(t, "one message v4", &response, oneMessageFetchResponseV4, 4) diff --git a/records.go b/records.go index f4c5e95f1d..5685fb8d33 100644 --- a/records.go +++ b/records.go @@ -183,6 +183,21 @@ func (r *Records) isOverflow() (bool, error) { return false, fmt.Errorf("unknown records type: %v", r.recordsType) } +func (r *Records) recordsOffset() (*int64, error) { + switch r.recordsType { + case unknownRecords: + return nil, nil + case legacyRecords: + return nil, nil + case defaultRecords: + if r.RecordBatch == nil { + return nil, nil + } + return &r.RecordBatch.FirstOffset, nil + } + return nil, fmt.Errorf("unknown records type: %v", r.recordsType) +} + func magicValue(pd packetDecoder) (int8, error) { return pd.peekInt8(magicOffset) } diff --git a/records_test.go b/records_test.go index 7a7214f739..34f1b4a6ee 100644 --- a/records_test.go +++ b/records_test.go @@ -72,12 +72,20 @@ func TestLegacyRecords(t *testing.T) { if c { t.Errorf("MessageSet can't be a control batch") } + f, err := r.recordsOffset() + if err != nil { + t.Fatal(err) + } + if f != nil { + t.Errorf("RecordBatch record offset is invalid") + } } func TestDefaultRecords(t *testing.T) { batch := &RecordBatch{ IsTransactional: true, Version: 2, + FirstOffset: 1, Records: []*Record{ { Value: []byte{1}, @@ -141,4 +149,11 @@ func TestDefaultRecords(t *testing.T) { if c { t.Errorf("RecordBatch shouldn't be a control batch") } + f, err := r.recordsOffset() + if err != nil { + t.Fatal(err) + } + if f == nil || *f != 1 { + t.Errorf("RecordBatch record offset is invalid") + } }