From d9ef2bef2e2d07ba76b7719f0a38e1b0eefb000e Mon Sep 17 00:00:00 2001 From: Sami Hiltunen Date: Sat, 23 Dec 2017 16:44:58 +0100 Subject: [PATCH] advance partition consumer offset after compacted messages * Increases a partition consumer's offset to match a RecordBatch's LastOffsetDelta. Fixes an issue where consuming a log-compacted topic would get stuck on a compacted offset. (closes #1005) --- consumer.go | 9 ++++++++- consumer_test.go | 18 ++++++++++++++++-- errors.go | 4 ++++ fetch_response.go | 10 ++++++++++ 4 files changed, 38 insertions(+), 3 deletions(-) diff --git a/consumer.go b/consumer.go index 1a0728945..2d29e4bb4 100644 --- a/consumer.go +++ b/consumer.go @@ -523,6 +523,7 @@ func (child *partitionConsumer) parseRecords(batch *RecordBatch) ([]*ConsumerMes var messages []*ConsumerMessage var incomplete bool prelude := true + originalOffset := child.offset for _, rec := range batch.Records { offset := batch.FirstOffset + rec.OffsetDelta @@ -547,9 +548,15 @@ func (child *partitionConsumer) parseRecords(batch *RecordBatch) ([]*ConsumerMes } } - if incomplete || len(messages) == 0 { + if incomplete { return nil, ErrIncompleteResponse } + + child.offset = batch.FirstOffset + int64(batch.LastOffsetDelta) + 1 + if child.offset <= originalOffset { + return nil, ErrConsumerOffsetNotAdvanced + } + return messages, nil } diff --git a/consumer_test.go b/consumer_test.go index babd03913..881e0fe82 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -389,10 +389,12 @@ func TestConsumerExtraOffsets(t *testing.T) { newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 2) newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 3) newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 4) + newFetchResponse.SetLastOffsetDelta("my_topic", 0, 4) newFetchResponse.SetLastStableOffset("my_topic", 0, 4) for _, fetchResponse1 := range []*FetchResponse{legacyFetchResponse, newFetchResponse} { var offsetResponseVersion int16 cfg := NewConfig() + cfg.Consumer.Return.Errors = 
true if fetchResponse1.Version >= 4 { cfg.Version = V0_11_0_0 offsetResponseVersion = 1 @@ -426,8 +428,19 @@ func TestConsumerExtraOffsets(t *testing.T) { // Then: messages with offsets 1 and 2 are not returned even though they // are present in the response. - assertMessageOffset(t, <-consumer.Messages(), 3) - assertMessageOffset(t, <-consumer.Messages(), 4) + select { + case msg := <-consumer.Messages(): + assertMessageOffset(t, msg, 3) + case err := <-consumer.Errors(): + t.Fatal(err) + } + + select { + case msg := <-consumer.Messages(): + assertMessageOffset(t, msg, 4) + case err := <-consumer.Errors(): + t.Fatal(err) + } safeClose(t, consumer) safeClose(t, master) @@ -490,6 +503,7 @@ func TestConsumerNonSequentialOffsets(t *testing.T) { newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 5) newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 7) newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 11) + newFetchResponse.SetLastOffsetDelta("my_topic", 0, 11) newFetchResponse.SetLastStableOffset("my_topic", 0, 11) for _, fetchResponse1 := range []*FetchResponse{legacyFetchResponse, newFetchResponse} { var offsetResponseVersion int16 diff --git a/errors.go b/errors.go index b6242cd8e..54f431a4a 100644 --- a/errors.go +++ b/errors.go @@ -37,6 +37,10 @@ var ErrShuttingDown = errors.New("kafka: message received by producer in process // ErrMessageTooLarge is returned when the next message to consume is larger than the configured Consumer.Fetch.Max var ErrMessageTooLarge = errors.New("kafka: message is larger than Consumer.Fetch.Max") +// ErrConsumerOffsetNotAdvanced is returned when a partition consumer didn't advance its offset after parsing +// a RecordBatch. +var ErrConsumerOffsetNotAdvanced = errors.New("kafka: consumer offset was not advanced after a RecordBatch") + // PacketEncodingError is returned from a failure while encoding a Kafka packet. 
This can happen, for example, // if you try to encode a string over 2^15 characters in length, since Kafka's encoding rules do not permit that. type PacketEncodingError struct { diff --git a/fetch_response.go b/fetch_response.go index 3433bcfdb..86fa549f6 100644 --- a/fetch_response.go +++ b/fetch_response.go @@ -309,6 +309,16 @@ func (r *FetchResponse) AddRecord(topic string, partition int32, key, value Enco batch.addRecord(rec) } +func (r *FetchResponse) SetLastOffsetDelta(topic string, partition int32, offset int32) { + frb := r.getOrCreateBlock(topic, partition) + batch := frb.Records.recordBatch + if batch == nil { + batch = &RecordBatch{Version: 2} + frb.Records = newDefaultRecords(batch) + } + batch.LastOffsetDelta = offset +} + func (r *FetchResponse) SetLastStableOffset(topic string, partition int32, offset int64) { frb := r.getOrCreateBlock(topic, partition) frb.LastStableOffset = offset