From 10d06c2fe7285fb809eea826ef8c9e2ad2fe23e5 Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Thu, 16 Jun 2016 10:50:12 -0400 Subject: [PATCH] Plumb through v0.10 support for consumer/fetch --- consumer.go | 5 +++++ fetch_request.go | 13 +++++++++++-- fetch_response.go | 31 ++++++++++++++++++++++++++++--- message.go | 34 +++++++++++++++++++++------------- produce_response.go | 11 ++++++++--- 5 files changed, 73 insertions(+), 21 deletions(-) diff --git a/consumer.go b/consumer.go index 869452754..f6c979766 100644 --- a/consumer.go +++ b/consumer.go @@ -14,6 +14,7 @@ type ConsumerMessage struct { Topic string Partition int32 Offset int64 + Timestamp time.Time // only set if kafka is version 0.10+ } // ConsumerError is what is provided to the user when an error occurs. @@ -489,6 +490,7 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu Key: msg.Msg.Key, Value: msg.Msg.Value, Offset: msg.Offset, + Timestamp: msg.Msg.Timestamp, }) child.offset = msg.Offset + 1 } else { @@ -682,6 +684,9 @@ func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) { MinBytes: bc.consumer.conf.Consumer.Fetch.Min, MaxWaitTime: int32(bc.consumer.conf.Consumer.MaxWaitTime / time.Millisecond), } + if bc.consumer.conf.Version.IsAtLeast(V0_10_0_0) { + request.Version = 2 + } for child := range bc.subscriptions { request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize) diff --git a/fetch_request.go b/fetch_request.go index 01cbad1bf..193e45c94 100644 --- a/fetch_request.go +++ b/fetch_request.go @@ -24,6 +24,7 @@ func (f *fetchRequestBlock) decode(pd packetDecoder) (err error) { type FetchRequest struct { MaxWaitTime int32 MinBytes int32 + Version int16 blocks map[string]map[int32]*fetchRequestBlock } @@ -56,6 +57,7 @@ func (f *FetchRequest) encode(pe packetEncoder) (err error) { } func (f *FetchRequest) decode(pd packetDecoder, version int16) (err error) { + f.Version = version if _, err = pd.getInt32(); err != nil { return err } @@ -103,11 +105,18 @@ func (f *FetchRequest) key() int16 { } func (f *FetchRequest) version() int16 { - return 0 + return f.Version } func (r *FetchRequest) requiredVersion() KafkaVersion { - return minVersion + switch r.Version { + case 1: + return V0_9_0_0 + case 2: + return V0_10_0_0 + default: + return minVersion + } } func (f *FetchRequest) AddBlock(topic string, partitionID int32, fetchOffset int64, maxBytes int32) { diff --git a/fetch_response.go b/fetch_response.go index 8d9942d1f..e5881cf9c 100644 --- a/fetch_response.go +++ b/fetch_response.go @@ -1,5 +1,7 @@ package sarama +import "time" + type FetchResponseBlock struct { Err KError HighWaterMarkOffset int64 @@ -33,7 +35,9 @@ func (pr *FetchResponseBlock) decode(pd packetDecoder) (err error) { } type FetchResponse struct { - Blocks map[string]map[int32]*FetchResponseBlock + Blocks map[string]map[int32]*FetchResponseBlock + ThrottleTime time.Duration + Version int16 // v1 requires 0.9+, v2 requires 0.10+ } func (pr *FetchResponseBlock) encode(pe packetEncoder) (err error) { @@ -50,6 +54,16 @@ func (pr *FetchResponseBlock) encode(pe packetEncoder) (err error) { } func (fr *FetchResponse) decode(pd packetDecoder, version int16) (err error) { + fr.Version = version + + if fr.Version >= 1 { + throttle, err := pd.getInt32() + if err != nil { + return err + } + fr.ThrottleTime = time.Duration(throttle) * time.Millisecond + } + numTopics, err := pd.getArrayLength() if err != nil { return err @@ -88,6 +102,10 @@ func (fr *FetchResponse) decode(pd packetDecoder, version int16) (err error) { } func (fr *FetchResponse) encode(pe packetEncoder) (err error) { + if fr.Version >= 1 { + pe.putInt32(int32(fr.ThrottleTime / time.Millisecond)) + } + err = pe.putArrayLength(len(fr.Blocks)) if err != nil { return err @@ -121,11 +139,18 @@ func (r *FetchResponse) key() int16 { } func (r *FetchResponse) version() int16 { - return 0 + return r.Version } func (r *FetchResponse) requiredVersion() KafkaVersion { - return minVersion + switch r.Version { + case 1: + return V0_9_0_0 + case 2: + return V0_10_0_0 + default: + return minVersion + } } func (fr *FetchResponse) GetBlock(topic string, partition int32) *FetchResponseBlock { diff --git a/message.go b/message.go index 55edc9d52..0f0ca5b6d 100644 --- a/message.go +++ b/message.go @@ -5,6 +5,7 @@ import ( "compress/gzip" "fmt" "io/ioutil" + "time" "github.com/eapache/go-xerial-snappy" ) @@ -21,15 +22,13 @@ const ( CompressionSnappy CompressionCodec = 2 ) -// The spec just says: "This is a version id used to allow backwards compatible evolution of the message -// binary format." but it doesn't say what the current value is, so presumably 0... -const messageFormat int8 = 0 - type Message struct { - Codec CompressionCodec // codec used to compress the message contents - Key []byte // the message key, may be nil - Value []byte // the message contents - Set *MessageSet // the message set a message might wrap + Codec CompressionCodec // codec used to compress the message contents + Key []byte // the message key, may be nil + Value []byte // the message contents + Set *MessageSet // the message set a message might wrap + Version int8 // v1 requires Kafka 0.10 + Timestamp time.Time // the timestamp of the message (version 1+ only) compressedCache []byte } @@ -37,11 +36,15 @@ type Message struct { func (m *Message) encode(pe packetEncoder) error { pe.push(&crc32Field{}) - pe.putInt8(messageFormat) + pe.putInt8(m.Version) attributes := int8(m.Codec) & compressionCodecMask pe.putInt8(attributes) + if m.Version >= 1 { + pe.putInt64(m.Timestamp.UnixNano() / int64(time.Millisecond)) + } + err := pe.putBytes(m.Key) if err != nil { return err @@ -89,13 +92,10 @@ func (m *Message) decode(pd packetDecoder) (err error) { return err } - format, err := pd.getInt8() + m.Version, err = pd.getInt8() if err != nil { return err } - if format != messageFormat { - return PacketDecodingError{"unexpected messageFormat"} - } attribute, err := pd.getInt8() if err != nil { @@ -103,6 +103,14 @@ func (m *Message) decode(pd packetDecoder) (err error) { } m.Codec = CompressionCodec(attribute & compressionCodecMask) + if m.Version >= 1 { + millis, err := pd.getInt64() + if err != nil { + return err + } + m.Timestamp = time.Unix(millis/1000, (millis%1000)*int64(time.Millisecond)) + } + m.Key, err = pd.getBytes() if err != nil { return err diff --git a/produce_response.go b/produce_response.go index 7a140eff0..62023f765 100644 --- a/produce_response.go +++ b/produce_response.go @@ -24,7 +24,7 @@ func (pr *ProduceResponseBlock) decode(pd packetDecoder, version int16) (err err if millis, err := pd.getInt64(); err != nil { return err } else { - pr.Timestamp = time.Unix(millis/1000, millis%1000) + pr.Timestamp = time.Unix(millis/1000, (millis%1000)*int64(time.Millisecond)) } } @@ -34,7 +34,7 @@ func (pr *ProduceResponseBlock) decode(pd packetDecoder, version int16) (err err type ProduceResponse struct { Blocks map[string]map[int32]*ProduceResponseBlock Version int16 - ThrottleTime int32 // only provided if Version >= 1 + ThrottleTime time.Duration // only provided if Version >= 1 } func (pr *ProduceResponse) decode(pd packetDecoder, version int16) (err error) { @@ -75,8 +75,10 @@ func (pr *ProduceResponse) decode(pd packetDecoder, version int16) (err error) { } if pr.Version >= 1 { - if pr.ThrottleTime, err = pd.getInt32(); err != nil { + if millis, err := pd.getInt32(); err != nil { return err + } else { + pr.ThrottleTime = time.Duration(millis) * time.Millisecond } } @@ -103,6 +105,9 @@ func (pr *ProduceResponse) encode(pe packetEncoder) error { pe.putInt64(prb.Offset) } } + if pr.Version >= 1 { + pe.putInt32(int32(pr.ThrottleTime / time.Millisecond)) + } return nil }