Skip to content

Commit

Permalink
Merge pull request #684 from Shopify/fix-timestamp-parsing
Browse files Browse the repository at this point in the history
Fix parsing of producer timestamp
  • Loading branch information
eapache authored Jun 20, 2016
2 parents 1deca4b + 5246fed commit 7e6290f
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 4 deletions.
3 changes: 1 addition & 2 deletions async_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -728,9 +728,8 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo
// Success
case ErrNoError:
if bp.parent.conf.Version.IsAtLeast(V0_10_0_0) {
timestamp := time.Unix(block.Timestamp, 0)
for _, msg := range msgs {
msg.Timestamp = timestamp
msg.Timestamp = block.Timestamp
}
}
for i, msg := range msgs {
Expand Down
8 changes: 6 additions & 2 deletions produce_response.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package sarama

import "time"

type ProduceResponseBlock struct {
Err KError
Offset int64
Timestamp int64 // only provided if Version >= 2
Timestamp time.Time // only provided if Version >= 2
}

func (pr *ProduceResponseBlock) decode(pd packetDecoder, version int16) (err error) {
Expand All @@ -19,8 +21,10 @@ func (pr *ProduceResponseBlock) decode(pd packetDecoder, version int16) (err err
}

if version >= 2 {
if pr.Timestamp, err = pd.getInt64(); err != nil {
if millis, err := pd.getInt64(); err != nil {
return err
} else {
pr.Timestamp = time.Unix(millis/1000, millis%1000)
}
}

Expand Down

0 comments on commit 7e6290f

Please sign in to comment.