From d65b3fd46b9f35c68df31b9884aa72dfefe31e04 Mon Sep 17 00:00:00 2001 From: Jurriaan Pruis Date: Mon, 31 Oct 2016 14:37:38 +0100 Subject: [PATCH] Support ListOffsetRequest v1 [KIP-79] Makes it possible to lookup offsets based on a Timestamp. https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65868090 --- .travis.yml | 1 + client.go | 3 +++ errors.go | 3 +++ offset_request.go | 39 ++++++++++++++++++--------- offset_request_test.go | 17 ++++++++++++ offset_response.go | 58 ++++++++++++++++++++++++++++++++--------- offset_response_test.go | 49 ++++++++++++++++++++++++++++++++++ request.go | 2 +- utils.go | 1 + 9 files changed, 147 insertions(+), 26 deletions(-) diff --git a/.travis.yml b/.travis.yml index 29b0c6d27..59503b72d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -15,6 +15,7 @@ env: - KAFKA_VERSION=0.8.2.2 - KAFKA_VERSION=0.9.0.1 - KAFKA_VERSION=0.10.0.1 + - KAFKA_VERSION=0.10.1.0 before_install: - export REPOSITORY_ROOT=${TRAVIS_BUILD_DIR} diff --git a/client.go b/client.go index e9a9ea77e..f869a1434 100644 --- a/client.go +++ b/client.go @@ -521,6 +521,9 @@ func (client *client) getOffset(topic string, partitionID int32, time int64) (in } request := &OffsetRequest{} + if client.conf.Version.IsAtLeast(V0_10_1_0) { + request.Version = 1 + } request.AddBlock(topic, partitionID, time, 1) response, err := broker.GetAvailableOffsets(request) diff --git a/errors.go b/errors.go index aa8f213d3..cc3f623d0 100644 --- a/errors.go +++ b/errors.go @@ -108,6 +108,7 @@ const ( ErrUnsupportedSASLMechanism KError = 33 ErrIllegalSASLState KError = 34 ErrUnsupportedVersion KError = 35 + ErrUnsupportedForMessageFormat KError = 43 ) func (err KError) Error() string { @@ -188,6 +189,8 @@ func (err KError) Error() string { return "kafka server: Request is not valid given the current SASL state." case ErrUnsupportedVersion: return "kafka server: The version of API is not supported." + case ErrUnsupportedForMessageFormat: + return "kafka server: The requested operation is not supported by the message format version." } return fmt.Sprintf("Unknown error, how did this happen? Error code = %d", err) diff --git a/offset_request.go b/offset_request.go index c66d8f709..6c2696016 100644 --- a/offset_request.go +++ b/offset_request.go @@ -2,27 +2,33 @@ package sarama type offsetRequestBlock struct { time int64 - maxOffsets int32 + maxOffsets int32 // Only used in version 0 } -func (b *offsetRequestBlock) encode(pe packetEncoder) error { +func (b *offsetRequestBlock) encode(pe packetEncoder, version int16) error { pe.putInt64(int64(b.time)) - pe.putInt32(b.maxOffsets) + if version == 0 { + pe.putInt32(b.maxOffsets) + } + return nil } -func (b *offsetRequestBlock) decode(pd packetDecoder) (err error) { +func (b *offsetRequestBlock) decode(pd packetDecoder, version int16) (err error) { if b.time, err = pd.getInt64(); err != nil { return err } - if b.maxOffsets, err = pd.getInt32(); err != nil { - return err + if version == 0 { + if b.maxOffsets, err = pd.getInt32(); err != nil { + return err + } } return nil } type OffsetRequest struct { - blocks map[string]map[int32]*offsetRequestBlock + Version int16 + blocks map[string]map[int32]*offsetRequestBlock } func (r *OffsetRequest) encode(pe packetEncoder) error { @@ -42,7 +48,7 @@ func (r *OffsetRequest) encode(pe packetEncoder) error { } for partition, block := range partitions { pe.putInt32(partition) - if err = block.encode(pe); err != nil { + if err = block.encode(pe, r.Version); err != nil { return err } } @@ -51,6 +57,8 @@ func (r *OffsetRequest) encode(pe packetEncoder) error { } func (r *OffsetRequest) decode(pd packetDecoder, version int16) error { + r.Version = version + // Ignore replica ID if _, err := pd.getInt32(); err != nil { return err @@ -79,7 +87,7 @@ func (r *OffsetRequest) decode(pd packetDecoder, version int16) error { return err } block := &offsetRequestBlock{} - if err := block.decode(pd); err != nil { + if err := block.decode(pd, version); err != nil { return err } r.blocks[topic][partition] = block @@ -93,11 +101,16 @@ func (r *OffsetRequest) key() int16 { } func (r *OffsetRequest) version() int16 { - return 0 + return r.Version } func (r *OffsetRequest) requiredVersion() KafkaVersion { - return minVersion + switch r.Version { + case 1: + return V0_10_1_0 + default: + return minVersion + } } func (r *OffsetRequest) AddBlock(topic string, partitionID int32, time int64, maxOffsets int32) { @@ -111,7 +124,9 @@ func (r *OffsetRequest) AddBlock(topic string, partitionID int32, time int64, ma tmp := new(offsetRequestBlock) tmp.time = time - tmp.maxOffsets = maxOffsets + if r.Version == 0 { + tmp.maxOffsets = maxOffsets + } r.blocks[topic][partitionID] = tmp } diff --git a/offset_request_test.go b/offset_request_test.go index f3b3046bb..9ce562c99 100644 --- a/offset_request_test.go +++ b/offset_request_test.go @@ -15,6 +15,14 @@ var ( 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02} + + offsetRequestOneBlockV1 = []byte{ + 0xFF, 0xFF, 0xFF, 0xFF, + 0x00, 0x00, 0x00, 0x01, + 0x00, 0x03, 'b', 'a', 'r', + 0x00, 0x00, 0x00, 0x01, + 0x00, 0x00, 0x00, 0x04, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01} ) func TestOffsetRequest(t *testing.T) { @@ -24,3 +32,12 @@ func TestOffsetRequest(t *testing.T) { request.AddBlock("foo", 4, 1, 2) testRequest(t, "one block", request, offsetRequestOneBlock) } + +func TestOffsetRequestV1(t *testing.T) { + request := new(OffsetRequest) + request.Version = 1 + testRequest(t, "no blocks", request, offsetRequestNoBlocks) + + request.AddBlock("bar", 4, 1, 2) // Last argument is ignored for V1 + testRequest(t, "one block", request, offsetRequestOneBlockV1) +} diff --git a/offset_response.go b/offset_response.go index ad1a66974..9a9cfe96f 100644 --- a/offset_response.go +++ b/offset_response.go @@ -1,30 +1,57 @@ package sarama type OffsetResponseBlock struct { - Err KError - Offsets []int64 + Err KError + Offsets []int64 // Version 0 + Offset int64 // Version 1 + Timestamp int64 // Version 1 } -func (b *OffsetResponseBlock) decode(pd packetDecoder) (err error) { +func (b *OffsetResponseBlock) decode(pd packetDecoder, version int16) (err error) { tmp, err := pd.getInt16() if err != nil { return err } b.Err = KError(tmp) - b.Offsets, err = pd.getInt64Array() + if version == 0 { + b.Offsets, err = pd.getInt64Array() - return err + return err + } + + b.Timestamp, err = pd.getInt64() + if err != nil { + return err + } + + b.Offset, err = pd.getInt64() + if err != nil { + return err + } + + // For backwards compatibility put the offset in the offsets array too + b.Offsets = []int64{b.Offset} + + return nil } -func (b *OffsetResponseBlock) encode(pe packetEncoder) (err error) { +func (b *OffsetResponseBlock) encode(pe packetEncoder, version int16) (err error) { pe.putInt16(int16(b.Err)) - return pe.putInt64Array(b.Offsets) + if version == 0 { + return pe.putInt64Array(b.Offsets) + } + + pe.putInt64(b.Timestamp) + pe.putInt64(b.Offset) + + return nil } type OffsetResponse struct { - Blocks map[string]map[int32]*OffsetResponseBlock + Version int16 + Blocks map[string]map[int32]*OffsetResponseBlock } func (r *OffsetResponse) decode(pd packetDecoder, version int16) (err error) { @@ -54,7 +81,7 @@ func (r *OffsetResponse) decode(pd packetDecoder, version int16) (err error) { } block := new(OffsetResponseBlock) - err = block.decode(pd) + err = block.decode(pd, version) if err != nil { return err } @@ -106,7 +133,7 @@ func (r *OffsetResponse) encode(pe packetEncoder) (err error) { } for partition, block := range partitions { pe.putInt32(partition) - if err = block.encode(pe); err != nil { + if err = block.encode(pe, r.version()); err != nil { return err } } @@ -120,11 +147,16 @@ func (r *OffsetResponse) key() int16 { } func (r *OffsetResponse) version() int16 { - return 0 + return r.Version } func (r *OffsetResponse) requiredVersion() KafkaVersion { - return minVersion + switch r.Version { + case 1: + return V0_10_1_0 + default: + return minVersion + } } // testing API @@ -138,5 +170,5 @@ func (r *OffsetResponse) AddTopicPartition(topic string, partition int32, offset byTopic = make(map[int32]*OffsetResponseBlock) r.Blocks[topic] = byTopic } - byTopic[partition] = &OffsetResponseBlock{Offsets: []int64{offset}} + byTopic[partition] = &OffsetResponseBlock{Offsets: []int64{offset}, Offset: offset} } diff --git a/offset_response_test.go b/offset_response_test.go index fc00f4b60..0df6c9f3e 100644 --- a/offset_response_test.go +++ b/offset_response_test.go @@ -19,6 +19,19 @@ var ( 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06} + + normalOffsetResponseV1 = []byte{ + 0x00, 0x00, 0x00, 0x02, + + 0x00, 0x01, 'a', + 0x00, 0x00, 0x00, 0x00, + + 0x00, 0x01, 'z', + 0x00, 0x00, 0x00, 0x01, + 0x00, 0x00, 0x00, 0x02, + 0x00, 0x00, + 0x00, 0x00, 0x01, 0x58, 0x1A, 0xE6, 0x48, 0x86, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06} ) func TestEmptyOffsetResponse(t *testing.T) { @@ -28,6 +41,13 @@ func TestEmptyOffsetResponse(t *testing.T) { if len(response.Blocks) != 0 { t.Error("Decoding produced", len(response.Blocks), "topics where there were none.") } + + response = OffsetResponse{} + + testVersionDecodable(t, "empty", &response, emptyOffsetResponse, 1) + if len(response.Blocks) != 0 { + t.Error("Decoding produced", len(response.Blocks), "topics where there were none.") + } } func TestNormalOffsetResponse(t *testing.T) { @@ -58,5 +78,34 @@ func TestNormalOffsetResponse(t *testing.T) { if response.Blocks["z"][2].Offsets[0] != 5 || response.Blocks["z"][2].Offsets[1] != 6 { t.Fatal("Decoding produced invalid offsets for topic z partition 2.") } +} + +func TestNormalOffsetResponseV1(t *testing.T) { + response := OffsetResponse{} + + testVersionDecodable(t, "normal", &response, normalOffsetResponseV1, 1) + + if len(response.Blocks) != 2 { + t.Fatal("Decoding produced", len(response.Blocks), "topics where there were two.") + } + + if len(response.Blocks["a"]) != 0 { + t.Fatal("Decoding produced", len(response.Blocks["a"]), "partitions for topic 'a' where there were none.") + } + + if len(response.Blocks["z"]) != 1 { + t.Fatal("Decoding produced", len(response.Blocks["z"]), "partitions for topic 'z' where there was one.") + } + + if response.Blocks["z"][2].Err != ErrNoError { + t.Fatal("Decoding produced invalid error for topic z partition 2.") + } + + if response.Blocks["z"][2].Timestamp != 1477920049286 { + t.Fatal("Decoding produced invalid timestamp for topic z partition 2.", response.Blocks["z"][2].Timestamp) + } + if response.Blocks["z"][2].Offset != 6 { + t.Fatal("Decoding produced invalid offsets for topic z partition 2.") + } } diff --git a/request.go b/request.go index 3cca8bd20..73310ca87 100644 --- a/request.go +++ b/request.go @@ -89,7 +89,7 @@ func allocateBody(key, version int16) protocolBody { case 1: return &FetchRequest{} case 2: - return &OffsetRequest{} + return &OffsetRequest{Version: version} case 3: return &MetadataRequest{} case 8: diff --git a/utils.go b/utils.go index c2da38666..3cbab2d92 100644 --- a/utils.go +++ b/utils.go @@ -147,5 +147,6 @@ var ( V0_9_0_1 = newKafkaVersion(0, 9, 0, 1) V0_10_0_0 = newKafkaVersion(0, 10, 0, 0) V0_10_0_1 = newKafkaVersion(0, 10, 0, 1) + V0_10_1_0 = newKafkaVersion(0, 10, 1, 0) minVersion = V0_8_2_0 )