Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Plumb through v0.10 support for consumer/fetch #681

Merged
merged 1 commit into from
Jun 20, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type ConsumerMessage struct {
Topic string
Partition int32
Offset int64
Timestamp time.Time // only set if kafka is version 0.10+
}

// ConsumerError is what is provided to the user when an error occurs.
Expand Down Expand Up @@ -489,6 +490,7 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu
Key: msg.Msg.Key,
Value: msg.Msg.Value,
Offset: msg.Offset,
Timestamp: msg.Msg.Timestamp,
})
child.offset = msg.Offset + 1
} else {
Expand Down Expand Up @@ -682,6 +684,9 @@ func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
MinBytes: bc.consumer.conf.Consumer.Fetch.Min,
MaxWaitTime: int32(bc.consumer.conf.Consumer.MaxWaitTime / time.Millisecond),
}
if bc.consumer.conf.Version.IsAtLeast(V0_10_0_0) {
request.Version = 2
}

for child := range bc.subscriptions {
request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize)
Expand Down
13 changes: 11 additions & 2 deletions fetch_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ func (f *fetchRequestBlock) decode(pd packetDecoder) (err error) {
type FetchRequest struct {
MaxWaitTime int32
MinBytes int32
Version int16
blocks map[string]map[int32]*fetchRequestBlock
}

Expand Down Expand Up @@ -56,6 +57,7 @@ func (f *FetchRequest) encode(pe packetEncoder) (err error) {
}

func (f *FetchRequest) decode(pd packetDecoder, version int16) (err error) {
f.Version = version
if _, err = pd.getInt32(); err != nil {
return err
}
Expand Down Expand Up @@ -103,11 +105,18 @@ func (f *FetchRequest) key() int16 {
}

func (f *FetchRequest) version() int16 {
return 0
return f.Version
}

func (r *FetchRequest) requiredVersion() KafkaVersion {
return minVersion
switch r.Version {
case 1:
return V0_9_0_0
case 2:
return V0_10_0_0
default:
return minVersion
}
}

func (f *FetchRequest) AddBlock(topic string, partitionID int32, fetchOffset int64, maxBytes int32) {
Expand Down
31 changes: 28 additions & 3 deletions fetch_response.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package sarama

import "time"

type FetchResponseBlock struct {
Err KError
HighWaterMarkOffset int64
Expand Down Expand Up @@ -33,7 +35,9 @@ func (pr *FetchResponseBlock) decode(pd packetDecoder) (err error) {
}

type FetchResponse struct {
Blocks map[string]map[int32]*FetchResponseBlock
Blocks map[string]map[int32]*FetchResponseBlock
ThrottleTime time.Duration
Version int16 // v1 requires 0.9+, v2 requires 0.10+
}

func (pr *FetchResponseBlock) encode(pe packetEncoder) (err error) {
Expand All @@ -50,6 +54,16 @@ func (pr *FetchResponseBlock) encode(pe packetEncoder) (err error) {
}

func (fr *FetchResponse) decode(pd packetDecoder, version int16) (err error) {
fr.Version = version

if fr.Version >= 1 {
throttle, err := pd.getInt32()
if err != nil {
return err
}
fr.ThrottleTime = time.Duration(throttle) * time.Millisecond
}

numTopics, err := pd.getArrayLength()
if err != nil {
return err
Expand Down Expand Up @@ -88,6 +102,10 @@ func (fr *FetchResponse) decode(pd packetDecoder, version int16) (err error) {
}

func (fr *FetchResponse) encode(pe packetEncoder) (err error) {
if fr.Version >= 1 {
pe.putInt32(int32(fr.ThrottleTime / time.Millisecond))
}

err = pe.putArrayLength(len(fr.Blocks))
if err != nil {
return err
Expand Down Expand Up @@ -121,11 +139,18 @@ func (r *FetchResponse) key() int16 {
}

func (r *FetchResponse) version() int16 {
return 0
return r.Version
}

func (r *FetchResponse) requiredVersion() KafkaVersion {
return minVersion
switch r.Version {
case 1:
return V0_9_0_0
case 2:
return V0_10_0_0
default:
return minVersion
}
}

func (fr *FetchResponse) GetBlock(topic string, partition int32) *FetchResponseBlock {
Expand Down
34 changes: 21 additions & 13 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"compress/gzip"
"fmt"
"io/ioutil"
"time"

"github.com/eapache/go-xerial-snappy"
)
Expand All @@ -21,27 +22,29 @@ const (
CompressionSnappy CompressionCodec = 2
)

// The spec just says: "This is a version id used to allow backwards compatible evolution of the message
// binary format." but it doesn't say what the current value is, so presumably 0...
const messageFormat int8 = 0

type Message struct {
Codec CompressionCodec // codec used to compress the message contents
Key []byte // the message key, may be nil
Value []byte // the message contents
Set *MessageSet // the message set a message might wrap
Codec CompressionCodec // codec used to compress the message contents
Key []byte // the message key, may be nil
Value []byte // the message contents
Set *MessageSet // the message set a message might wrap
Version int8 // v1 requires Kafka 0.10
Timestamp time.Time // the timestamp of the message (version 1+ only)

compressedCache []byte
}

func (m *Message) encode(pe packetEncoder) error {
pe.push(&crc32Field{})

pe.putInt8(messageFormat)
pe.putInt8(m.Version)

attributes := int8(m.Codec) & compressionCodecMask
pe.putInt8(attributes)

if m.Version >= 1 {
pe.putInt64(m.Timestamp.UnixNano() / int64(time.Millisecond))
}

err := pe.putBytes(m.Key)
if err != nil {
return err
Expand Down Expand Up @@ -89,20 +92,25 @@ func (m *Message) decode(pd packetDecoder) (err error) {
return err
}

format, err := pd.getInt8()
m.Version, err = pd.getInt8()
if err != nil {
return err
}
if format != messageFormat {
return PacketDecodingError{"unexpected messageFormat"}
}

attribute, err := pd.getInt8()
if err != nil {
return err
}
m.Codec = CompressionCodec(attribute & compressionCodecMask)

if m.Version >= 1 {
millis, err := pd.getInt64()
if err != nil {
return err
}
m.Timestamp = time.Unix(millis/1000, (millis%1000)*int64(time.Millisecond))
}

m.Key, err = pd.getBytes()
if err != nil {
return err
Expand Down
11 changes: 8 additions & 3 deletions produce_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func (pr *ProduceResponseBlock) decode(pd packetDecoder, version int16) (err err
if millis, err := pd.getInt64(); err != nil {
return err
} else {
pr.Timestamp = time.Unix(millis/1000, millis%1000)
pr.Timestamp = time.Unix(millis/1000, (millis%1000)*int64(time.Millisecond))
}
}

Expand All @@ -34,7 +34,7 @@ func (pr *ProduceResponseBlock) decode(pd packetDecoder, version int16) (err err
type ProduceResponse struct {
Blocks map[string]map[int32]*ProduceResponseBlock
Version int16
ThrottleTime int32 // only provided if Version >= 1
ThrottleTime time.Duration // only provided if Version >= 1
}

func (pr *ProduceResponse) decode(pd packetDecoder, version int16) (err error) {
Expand Down Expand Up @@ -75,8 +75,10 @@ func (pr *ProduceResponse) decode(pd packetDecoder, version int16) (err error) {
}

if pr.Version >= 1 {
if pr.ThrottleTime, err = pd.getInt32(); err != nil {
if millis, err := pd.getInt32(); err != nil {
return err
} else {
pr.ThrottleTime = time.Duration(millis) * time.Millisecond
}
}

Expand All @@ -103,6 +105,9 @@ func (pr *ProduceResponse) encode(pe packetEncoder) error {
pe.putInt64(prb.Offset)
}
}
if pr.Version >= 1 {
pe.putInt32(int32(pr.ThrottleTime / time.Millisecond))
}
return nil
}

Expand Down