Skip to content

Commit

Permalink
Merge pull request #1149 from wladh/master
Browse files Browse the repository at this point in the history
Fix partial messages handling
  • Loading branch information
bai authored Aug 20, 2018
2 parents e7238b1 + da6608c commit 647feef
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 11 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ env:
- KAFKA_HOSTNAME=localhost
- DEBUG=true
matrix:
- KAFKA_VERSION=0.11.0.2
- KAFKA_VERSION=1.0.0
- KAFKA_VERSION=1.1.0
- KAFKA_VERSION=2.0.0

before_install:
- export REPOSITORY_ROOT=${TRAVIS_BUILD_DIR}
Expand Down
2 changes: 1 addition & 1 deletion dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ name: sarama

up:
- go:
version: '1.9'
version: '1.10'

commands:
test:
Expand Down
23 changes: 17 additions & 6 deletions fetch_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,15 +104,26 @@ func (b *FetchResponseBlock) decode(pd packetDecoder, version int16) (err error)
return err
}

// If we have at least one full records, we skip incomplete ones
if partial && len(b.RecordsSet) > 0 {
break
n, err := records.numRecords()
if err != nil {
return err
}

b.RecordsSet = append(b.RecordsSet, records)
if n > 0 || (partial && len(b.RecordsSet) == 0) {
b.RecordsSet = append(b.RecordsSet, records)

if b.Records == nil {
b.Records = records
}
}

if b.Records == nil {
b.Records = records
overflow, err := records.isOverflow()
if err != nil {
return err
}

if partial || overflow {
break
}
}

Expand Down
83 changes: 83 additions & 0 deletions fetch_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,29 @@ var (
0xFF, 0xFF, 0xFF, 0xFF,
0x00, 0x00, 0x00, 0x02, 0x00, 0xEE}

overflowMessageFetchResponse = []byte{
0x00, 0x00, 0x00, 0x01,
0x00, 0x05, 't', 'o', 'p', 'i', 'c',
0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x00, 0x05,
0x00, 0x01,
0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10,
0x00, 0x00, 0x00, 0x30,
// messageSet
0x00, 0x00, 0x00, 0x00, 0x00, 0x55, 0x00, 0x00,
0x00, 0x00, 0x00, 0x10,
// message
0x23, 0x96, 0x4a, 0xf7, // CRC
0x00,
0x00,
0xFF, 0xFF, 0xFF, 0xFF,
0x00, 0x00, 0x00, 0x02, 0x00, 0xEE,
// overflow messageSet
0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
0x00, 0x00, 0x00, 0xFF,
// overflow bytes
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}

oneRecordFetchResponse = []byte{
0x00, 0x00, 0x00, 0x00, // ThrottleTime
0x00, 0x00, 0x00, 0x01, // Number of Topics
Expand Down Expand Up @@ -148,6 +171,66 @@ func TestOneMessageFetchResponse(t *testing.T) {
}
}

func TestOverflowMessageFetchResponse(t *testing.T) {
response := FetchResponse{}
testVersionDecodable(t, "overflow message", &response, overflowMessageFetchResponse, 0)

if len(response.Blocks) != 1 {
t.Fatal("Decoding produced incorrect number of topic blocks.")
}

if len(response.Blocks["topic"]) != 1 {
t.Fatal("Decoding produced incorrect number of partition blocks for topic.")
}

block := response.GetBlock("topic", 5)
if block == nil {
t.Fatal("GetBlock didn't return block.")
}
if block.Err != ErrOffsetOutOfRange {
t.Error("Decoding didn't produce correct error code.")
}
if block.HighWaterMarkOffset != 0x10101010 {
t.Error("Decoding didn't produce correct high water mark offset.")
}
partial, err := block.Records.isPartial()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if partial {
t.Error("Decoding detected a partial trailing message where there wasn't one.")
}
overflow, err := block.Records.isOverflow()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if !overflow {
t.Error("Decoding detected a partial trailing message where there wasn't one.")
}

n, err := block.Records.numRecords()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if n != 1 {
t.Fatal("Decoding produced incorrect number of messages.")
}
msgBlock := block.Records.MsgSet.Messages[0]
if msgBlock.Offset != 0x550000 {
t.Error("Decoding produced incorrect message offset.")
}
msg := msgBlock.Msg
if msg.Codec != CompressionNone {
t.Error("Decoding produced incorrect message compression.")
}
if msg.Key != nil {
t.Error("Decoding produced message key where there was none.")
}
if !bytes.Equal(msg.Value, []byte{0x00, 0xEE}) {
t.Error("Decoding produced incorrect message value.")
}
}

func TestOneRecordFetchResponse(t *testing.T) {
response := FetchResponse{}
testVersionDecodable(t, "one record", &response, oneRecordFetchResponse, 4)
Expand Down
15 changes: 14 additions & 1 deletion length_field.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,19 @@ import "encoding/binary"
// LengthField implements the PushEncoder and PushDecoder interfaces for calculating 4-byte lengths.
type lengthField struct {
startOffset int
length int32
}

func (l *lengthField) decode(pd packetDecoder) error {
var err error
l.length, err = pd.getInt32()
if err != nil {
return err
}
if l.length > int32(pd.remaining()) {
return ErrInsufficientData
}
return nil
}

func (l *lengthField) saveOffset(in int) {
Expand All @@ -21,7 +34,7 @@ func (l *lengthField) run(curOffset int, buf []byte) error {
}

func (l *lengthField) check(curOffset int, buf []byte) error {
if uint32(curOffset-l.startOffset-4) != binary.BigEndian.Uint32(buf[l.startOffset:]) {
if int32(curOffset-l.startOffset-4) != l.length {
return PacketDecodingError{"length field invalid"}
}

Expand Down
8 changes: 7 additions & 1 deletion message_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func (msb *MessageBlock) decode(pd packetDecoder) (err error) {

type MessageSet struct {
PartialTrailingMessage bool // whether the set on the wire contained an incomplete trailing MessageBlock
OverflowMessage bool // whether the set on the wire contained an overflow message
Messages []*MessageBlock
}

Expand Down Expand Up @@ -85,7 +86,12 @@ func (ms *MessageSet) decode(pd packetDecoder) (err error) {
case ErrInsufficientData:
// As an optimization the server is allowed to return a partial message at the
// end of the message set. Clients should handle this case. So we just ignore such things.
ms.PartialTrailingMessage = true
if msb.Offset == -1 {
// This is an overflow message caused by chunked down conversion
ms.OverflowMessage = true
} else {
ms.PartialTrailingMessage = true
}
return nil
default:
return err
Expand Down
21 changes: 21 additions & 0 deletions records.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,27 @@ func (r *Records) isControl() (bool, error) {
return false, fmt.Errorf("unknown records type: %v", r.recordsType)
}

func (r *Records) isOverflow() (bool, error) {
if r.recordsType == unknownRecords {
if empty, err := r.setTypeFromFields(); err != nil || empty {
return false, err
}
}

switch r.recordsType {
case unknownRecords:
return false, nil
case legacyRecords:
if r.MsgSet == nil {
return false, nil
}
return r.MsgSet.OverflowMessage, nil
case defaultRecords:
return false, nil
}
return false, fmt.Errorf("unknown records type: %v", r.recordsType)
}

func magicValue(pd packetDecoder) (int8, error) {
dec, err := pd.peek(magicOffset, magicLength)
if err != nil {
Expand Down
4 changes: 3 additions & 1 deletion utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ var (
V0_11_0_2 = newKafkaVersion(0, 11, 0, 2)
V1_0_0_0 = newKafkaVersion(1, 0, 0, 0)
V1_1_0_0 = newKafkaVersion(1, 1, 0, 0)
V2_0_0_0 = newKafkaVersion(2, 0, 0, 0)

SupportedVersions = []KafkaVersion{
V0_8_2_0,
Expand All @@ -173,9 +174,10 @@ var (
V0_11_0_2,
V1_0_0_0,
V1_1_0_0,
V2_0_0_0,
}
MinVersion = V0_8_2_0
MaxVersion = V1_1_0_0
MaxVersion = V2_0_0_0
)

func ParseKafkaVersion(s string) (KafkaVersion, error) {
Expand Down

0 comments on commit 647feef

Please sign in to comment.