From 682468dcc75130e0813107ef154784a8b9e17c09 Mon Sep 17 00:00:00 2001 From: Vlad Hanciuta Date: Fri, 10 Aug 2018 17:21:03 +0100 Subject: [PATCH 1/4] Fix partial messages handling Kafka 2.0 introduced chunked message down conversions which might produce partial messages at the end of the set. These partial messages are not well formed beyond offset and length, so they might cause strange decoding errors down the line. This fix makes `lengthField` a `dynamicPushDecoder` so it checks that the `packetDecoder` has at least that many bytes. It fixes the problem above and I think it's a more robust check in general. --- fetch_response_test.go | 76 ++++++++++++++++++++++++++++++++++++++++++ length_field.go | 15 ++++++++- 2 files changed, 90 insertions(+), 1 deletion(-) diff --git a/fetch_response_test.go b/fetch_response_test.go index c6b6b46e4..9f4113015 100644 --- a/fetch_response_test.go +++ b/fetch_response_test.go @@ -27,6 +27,29 @@ var ( 0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x00, 0x00, 0x02, 0x00, 0xEE} + overflowMessageFetchResponse = []byte{ + 0x00, 0x00, 0x00, 0x01, + 0x00, 0x05, 't', 'o', 'p', 'i', 'c', + 0x00, 0x00, 0x00, 0x01, + 0x00, 0x00, 0x00, 0x05, + 0x00, 0x01, + 0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10, + 0x00, 0x00, 0x00, 0x30, + // messageSet + 0x00, 0x00, 0x00, 0x00, 0x00, 0x55, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x10, + // message + 0x23, 0x96, 0x4a, 0xf7, // CRC + 0x00, + 0x00, + 0xFF, 0xFF, 0xFF, 0xFF, + 0x00, 0x00, 0x00, 0x02, 0x00, 0xEE, + // overflow messageSet + 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, + 0x00, 0x00, 0x00, 0xFF, + // overflow bytes + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00} + oneRecordFetchResponse = []byte{ 0x00, 0x00, 0x00, 0x00, // ThrottleTime 0x00, 0x00, 0x00, 0x01, // Number of Topics @@ -148,6 +171,59 @@ func TestOneMessageFetchResponse(t *testing.T) { } } +func TestOverflowMessageFetchResponse(t *testing.T) { + response := FetchResponse{} + testVersionDecodable(t, "overflow message", &response, overflowMessageFetchResponse, 0) + + if len(response.Blocks) != 1 { + t.Fatal("Decoding produced incorrect number of topic blocks.") + } + + if len(response.Blocks["topic"]) != 1 { + t.Fatal("Decoding produced incorrect number of partition blocks for topic.") + } + + block := response.GetBlock("topic", 5) + if block == nil { + t.Fatal("GetBlock didn't return block.") + } + if block.Err != ErrOffsetOutOfRange { + t.Error("Decoding didn't produce correct error code.") + } + if block.HighWaterMarkOffset != 0x10101010 { + t.Error("Decoding didn't produce correct high water mark offset.") + } + partial, err := block.Records.isPartial() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if !partial { + t.Error("Overflow messages should be partial.") + } + + n, err := block.Records.numRecords() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if n != 1 { + t.Fatal("Decoding produced incorrect number of messages.") + } + msgBlock := block.Records.msgSet.Messages[0] + if msgBlock.Offset != 0x550000 { + t.Error("Decoding produced incorrect message offset.") + } + msg := msgBlock.Msg + if msg.Codec != CompressionNone { + t.Error("Decoding produced incorrect message compression.") + } + if msg.Key != nil { + t.Error("Decoding produced message key where there was none.") + } + if !bytes.Equal(msg.Value, []byte{0x00, 0xEE}) { + t.Error("Decoding produced incorrect message value.") + } +} + func TestOneRecordFetchResponse(t *testing.T) { response := FetchResponse{} testVersionDecodable(t, "one record", &response, 
oneRecordFetchResponse, 4) diff --git a/length_field.go b/length_field.go index 576b1a6f6..da199a70a 100644 --- a/length_field.go +++ b/length_field.go @@ -5,6 +5,19 @@ import "encoding/binary" // LengthField implements the PushEncoder and PushDecoder interfaces for calculating 4-byte lengths. type lengthField struct { startOffset int + length int32 +} + +func (l *lengthField) decode(pd packetDecoder) error { + var err error + l.length, err = pd.getInt32() + if err != nil { + return err + } + if l.length > int32(pd.remaining()) { + return ErrInsufficientData + } + return nil } func (l *lengthField) saveOffset(in int) { @@ -21,7 +34,7 @@ func (l *lengthField) run(curOffset int, buf []byte) error { } func (l *lengthField) check(curOffset int, buf []byte) error { - if uint32(curOffset-l.startOffset-4) != binary.BigEndian.Uint32(buf[l.startOffset:]) { + if int32(curOffset-l.startOffset-4) != l.length { return PacketDecodingError{"length field invalid"} } From ed280996b51ddb92b2cf90947a4ba12d695a1f36 Mon Sep 17 00:00:00 2001 From: Vlad Hanciuta Date: Mon, 13 Aug 2018 15:31:11 +0100 Subject: [PATCH 2/4] Fix MsgSet reference in test --- fetch_response_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fetch_response_test.go b/fetch_response_test.go index 9f4113015..4637cc89e 100644 --- a/fetch_response_test.go +++ b/fetch_response_test.go @@ -208,7 +208,7 @@ func TestOverflowMessageFetchResponse(t *testing.T) { if n != 1 { t.Fatal("Decoding produced incorrect number of messages.") } - msgBlock := block.Records.msgSet.Messages[0] + msgBlock := block.Records.MsgSet.Messages[0] if msgBlock.Offset != 0x550000 { t.Error("Decoding produced incorrect message offset.") } From 8d1bd4c0d6e7249e04399ce9dbc7501ef39c26db Mon Sep 17 00:00:00 2001 From: Vlad Hanciuta Date: Thu, 16 Aug 2018 17:18:11 +0100 Subject: [PATCH 3/4] Handle overflow-only batch Also, this fixes the case when there's more than one message in a message set with a partial trailing message. 
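For reviewers less familiar with the record-set plumbing, the sketch below shows how the two conditions this patch tells apart -- a partial trailing message versus an overflow message set -- are meant to be interpreted once a block has been decoded. classifyRecords is a hypothetical helper written purely for illustration and is not part of the change; isPartial and isOverflow are the package-internal accessors this patch touches, and the decode loop in fetch_response.go below simply stops reading further record sets as soon as either condition is seen.

    // classifyRecords is a hypothetical package-internal helper, shown only to
    // illustrate the distinction introduced by this patch; it is not part of
    // the change itself.
    func classifyRecords(r *Records) (string, error) {
        partial, err := r.isPartial()
        if err != nil {
            return "", err
        }
        overflow, err := r.isOverflow()
        if err != nil {
            return "", err
        }
        switch {
        case overflow:
            // Kafka 2.0 chunked down-conversion appended a synthetic message
            // set (offset -1); nothing useful follows it in this response.
            return "overflow", nil
        case partial:
            // Ordinary truncation: the last message was cut off by the fetch
            // size limit and the caller should retry with a larger limit.
            return "partial", nil
        default:
            return "complete", nil
        }
    }
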
--- fetch_response.go | 23 +++++++++++++++++------ fetch_response_test.go | 11 +++++++++-- message_set.go | 8 +++++++- records.go | 21 +++++++++++++++++++++ 4 files changed, 54 insertions(+), 9 deletions(-) diff --git a/fetch_response.go b/fetch_response.go index ae91bb9eb..dade1c47d 100644 --- a/fetch_response.go +++ b/fetch_response.go @@ -104,15 +104,26 @@ func (b *FetchResponseBlock) decode(pd packetDecoder, version int16) (err error) return err } - // If we have at least one full records, we skip incomplete ones - if partial && len(b.RecordsSet) > 0 { - break + n, err := records.numRecords() + if err != nil { + return err } - b.RecordsSet = append(b.RecordsSet, records) + if n > 0 || (partial && len(b.RecordsSet) == 0) { + b.RecordsSet = append(b.RecordsSet, records) + + if b.Records == nil { + b.Records = records + } + } - if b.Records == nil { - b.Records = records + overflow, err := records.isOverflow() + if err != nil { + return err + } + + if partial || overflow { + break } } diff --git a/fetch_response_test.go b/fetch_response_test.go index 4637cc89e..917027644 100644 --- a/fetch_response_test.go +++ b/fetch_response_test.go @@ -197,8 +197,15 @@ func TestOverflowMessageFetchResponse(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - if !partial { - t.Error("Overflow messages should be partial.") + if partial { + t.Error("Decoding detected a partial trailing message where there wasn't one.") + } + overflow, err := block.Records.isOverflow() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if !overflow { + t.Error("Decoding detected a partial trailing message where there wasn't one.") } n, err := block.Records.numRecords() diff --git a/message_set.go b/message_set.go index 27db52fdf..600c7c4df 100644 --- a/message_set.go +++ b/message_set.go @@ -47,6 +47,7 @@ func (msb *MessageBlock) decode(pd packetDecoder) (err error) { type MessageSet struct { PartialTrailingMessage bool // whether the set on the wire contained an incomplete trailing MessageBlock + OverflowMessage bool // whether the set on the wire contained an overflow message Messages []*MessageBlock } @@ -85,7 +86,12 @@ func (ms *MessageSet) decode(pd packetDecoder) (err error) { case ErrInsufficientData: // As an optimization the server is allowed to return a partial message at the // end of the message set. Clients should handle this case. So we just ignore such things. 
- ms.PartialTrailingMessage = true + if msb.Offset == -1 { + // This is an overflow message caused by chunked down conversion + ms.OverflowMessage = true + } else { + ms.PartialTrailingMessage = true + } return nil default: return err diff --git a/records.go b/records.go index 301055bb0..192f5927b 100644 --- a/records.go +++ b/records.go @@ -163,6 +163,27 @@ func (r *Records) isControl() (bool, error) { return false, fmt.Errorf("unknown records type: %v", r.recordsType) } +func (r *Records) isOverflow() (bool, error) { + if r.recordsType == unknownRecords { + if empty, err := r.setTypeFromFields(); err != nil || empty { + return false, err + } + } + + switch r.recordsType { + case unknownRecords: + return false, nil + case legacyRecords: + if r.MsgSet == nil { + return false, nil + } + return r.MsgSet.OverflowMessage, nil + case defaultRecords: + return false, nil + } + return false, fmt.Errorf("unknown records type: %v", r.recordsType) +} + func magicValue(pd packetDecoder) (int8, error) { dec, err := pd.peek(magicOffset, magicLength) if err != nil { From da6608c7d11a8272a6dd64ae44d38408251a82c7 Mon Sep 17 00:00:00 2001 From: Vlad Gorodetsky Date: Mon, 13 Aug 2018 22:40:43 +0300 Subject: [PATCH 4/4] Add Kafka 2.0 to CI --- .travis.yml | 2 +- dev.yml | 2 +- utils.go | 4 +++- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/.travis.yml b/.travis.yml index ea295ec5f..5e8d4979d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -12,9 +12,9 @@ env: - KAFKA_HOSTNAME=localhost - DEBUG=true matrix: - - KAFKA_VERSION=0.11.0.2 - KAFKA_VERSION=1.0.0 - KAFKA_VERSION=1.1.0 + - KAFKA_VERSION=2.0.0 before_install: - export REPOSITORY_ROOT=${TRAVIS_BUILD_DIR} diff --git a/dev.yml b/dev.yml index 294fcdb41..bc8c4e452 100644 --- a/dev.yml +++ b/dev.yml @@ -2,7 +2,7 @@ name: sarama up: - go: - version: '1.9' + version: '1.10' commands: test: diff --git a/utils.go b/utils.go index 702e22627..1bb00d761 100644 --- a/utils.go +++ b/utils.go @@ -155,6 +155,7 @@ var ( V0_11_0_2 = newKafkaVersion(0, 11, 0, 2) V1_0_0_0 = newKafkaVersion(1, 0, 0, 0) V1_1_0_0 = newKafkaVersion(1, 1, 0, 0) + V2_0_0_0 = newKafkaVersion(2, 0, 0, 0) SupportedVersions = []KafkaVersion{ V0_8_2_0, @@ -173,9 +174,10 @@ var ( V0_11_0_2, V1_0_0_0, V1_1_0_0, + V2_0_0_0, } MinVersion = V0_8_2_0 - MaxVersion = V1_1_0_0 + MaxVersion = V2_0_0_0 ) func ParseKafkaVersion(s string) (KafkaVersion, error) {
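
Once the new constant is available, opting a client into the Kafka 2.0 protocol is a one-line configuration change. The snippet below is a minimal usage sketch rather than part of the patch; it assumes the github.com/Shopify/sarama import path in use at the time, and the broker address is a placeholder.

    package main

    import (
        "log"

        "github.com/Shopify/sarama"
    )

    func main() {
        cfg := sarama.NewConfig()
        // Ask the client to speak the newest protocol version added above.
        cfg.Version = sarama.V2_0_0_0

        // "localhost:9092" stands in for a real broker address.
        client, err := sarama.NewClient([]string{"localhost:9092"}, cfg)
        if err != nil {
            log.Fatalln("failed to create client:", err)
        }
        defer client.Close()
    }

ParseKafkaVersion("2.0.0"), shown at the end of the diff above, should now resolve to the same V2_0_0_0 value for deployments that configure the version from a string.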