diff --git a/CHANGELOG.md b/CHANGELOG.md index 844f481b49..700d83858f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,12 @@ # Changelog + +#### Unreleased + +Improvements: +- Enabled zstd compression + ([1574](https://github.com/Shopify/sarama/pull/1574)) + #### Version 1.25.0 (2020-01-13) New Features: diff --git a/config.go b/config.go index 69c716173d..e0e02f2b62 100644 --- a/config.go +++ b/config.go @@ -629,6 +629,10 @@ func (c *Config) Validate() error { } } + if c.Producer.Compression == CompressionZSTD && !c.Version.IsAtLeast(V2_1_0_0) { + return ConfigurationError("zstd compression requires Version >= V2_1_0_0") + } + if c.Producer.Idempotent { if !c.Version.IsAtLeast(V0_11_0_0) { return ConfigurationError("Idempotent producer requires Version >= V0_11_0_0") diff --git a/config_test.go b/config_test.go index 3f7a7a7a50..df41d5d029 100644 --- a/config_test.go +++ b/config_test.go @@ -405,6 +405,18 @@ func TestLZ4ConfigValidation(t *testing.T) { } } +func TestZstdConfigValidation(t *testing.T) { + config := NewConfig() + config.Producer.Compression = CompressionZSTD + if err := config.Validate(); string(err.(ConfigurationError)) != "zstd compression requires Version >= V2_1_0_0" { + t.Error("Expected invalid zstd/kafka version error, got ", err) + } + config.Version = V2_1_0_0 + if err := config.Validate(); err != nil { + t.Error("Expected zstd to work, got ", err) + } +} + // This example shows how to integrate with an existing registry as well as publishing metrics // on the standard output func ExampleConfig_metrics() { diff --git a/consumer.go b/consumer.go index 72c4d7cd8f..f6e753851b 100644 --- a/consumer.go +++ b/consumer.go @@ -888,6 +888,16 @@ func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) { request.Isolation = bc.consumer.conf.Consumer.IsolationLevel } + // from https://github.com/Shopify/sarama/pull/1443 + // TODO: verify that versions 5-9 didn't introduce breaking change. + // v9 introduces FETCH_REQUEST_TOPIC_V9, + // v7 introduces FORGOTTEN_TOPIC_DATA_V7, SESSION_ID, & SESSION_EPOCH, and + // v5 introduces TOPICS_V5 (subsumed by FETCH_REQUEST_TOPIC_V9). + // There also may be response level changes. + if bc.consumer.conf.Version.IsAtLeast(V2_1_0_0) { + request.Version = 10 + } + for child := range bc.subscriptions { request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize) } diff --git a/functional_consumer_test.go b/functional_consumer_test.go index 135ee791b2..8b31b45c5e 100644 --- a/functional_consumer_test.go +++ b/functional_consumer_test.go @@ -106,6 +106,21 @@ func TestVersionMatrixLZ4(t *testing.T) { consumeMsgs(t, testVersions, producedMessages) } +// Support for zstd codec was introduced in v2.1.0.0 +func TestVersionMatrixZstd(t *testing.T) { + setupFunctionalTest(t) + defer teardownFunctionalTest(t) + + // Produce lot's of message with all possible combinations of supported + // protocol versions starting with v2.1.0.0 (first where zstd was supported) + testVersions := versionRange(V2_1_0_0) + allCodecs := []CompressionCodec{CompressionZSTD} + producedMessages := produceMsgs(t, testVersions, allCodecs, 17, 100, false) + + // When/Then + consumeMsgs(t, testVersions, producedMessages) +} + func TestVersionMatrixIdempotent(t *testing.T) { setupFunctionalTest(t) defer teardownFunctionalTest(t) diff --git a/produce_request.go b/produce_request.go index 0c755d02b6..178972a0f2 100644 --- a/produce_request.go +++ b/produce_request.go @@ -214,6 +214,8 @@ func (r *ProduceRequest) requiredVersion() KafkaVersion { return V0_10_0_0 case 3: return V0_11_0_0 + case 7: + return V2_1_0_0 default: return MinVersion } diff --git a/produce_response.go b/produce_response.go index 4c5cd3569c..e4f19a7220 100644 --- a/produce_response.go +++ b/produce_response.go @@ -5,11 +5,27 @@ import ( "time" ) +// Protocol, http://kafka.apache.org/protocol.html +// v1 +// v2 = v3 = v4 +// v5 = v6 = v7 +// Produce Response (Version: 7) => [responses] throttle_time_ms +// responses => topic [partition_responses] +// topic => STRING +// partition_responses => partition error_code base_offset log_append_time log_start_offset +// partition => INT32 +// error_code => INT16 +// base_offset => INT64 +// log_append_time => INT64 +// log_start_offset => INT64 +// throttle_time_ms => INT32 + +// partition_responses in protocol type ProduceResponseBlock struct { - Err KError - Offset int64 - // only provided if Version >= 2 and the broker is configured with `LogAppendTime` - Timestamp time.Time + Err KError // v0, error_code + Offset int64 // v0, base_offset + Timestamp time.Time // v2, log_append_time, and the broker is configured with `LogAppendTime` + StartOffset int64 // v5, log_start_offset } func (b *ProduceResponseBlock) decode(pd packetDecoder, version int16) (err error) { @@ -32,6 +48,13 @@ func (b *ProduceResponseBlock) decode(pd packetDecoder, version int16) (err erro } } + if version >= 5 { + b.StartOffset, err = pd.getInt64() + if err != nil { + return err + } + } + return nil } @@ -49,13 +72,17 @@ func (b *ProduceResponseBlock) encode(pe packetEncoder, version int16) (err erro pe.putInt64(timestamp) } + if version >= 5 { + pe.putInt64(b.StartOffset) + } + return nil } type ProduceResponse struct { - Blocks map[string]map[int32]*ProduceResponseBlock + Blocks map[string]map[int32]*ProduceResponseBlock // v0, responses Version int16 - ThrottleTime time.Duration // only provided if Version >= 1 + ThrottleTime time.Duration // v1, throttle_time_ms } func (r *ProduceResponse) decode(pd packetDecoder, version int16) (err error) { @@ -129,6 +156,7 @@ func (r *ProduceResponse) encode(pe packetEncoder) error { } } } + if r.Version >= 1 { pe.putInt32(int32(r.ThrottleTime / time.Millisecond)) } @@ -143,19 +171,6 @@ func (r *ProduceResponse) version() int16 { return r.Version } -func (r *ProduceResponse) requiredVersion() KafkaVersion { - switch r.Version { - case 1: - return V0_9_0_0 - case 2: - return V0_10_0_0 - case 3: - return V0_11_0_0 - default: - return MinVersion - } -} - func (r *ProduceResponse) GetBlock(topic string, partition int32) *ProduceResponseBlock { if r.Blocks == nil { return nil diff --git a/produce_response_test.go b/produce_response_test.go index 197c7fb50d..0dfac79474 100644 --- a/produce_response_test.go +++ b/produce_response_test.go @@ -10,8 +10,8 @@ var ( produceResponseNoBlocksV0 = []byte{ 0x00, 0x00, 0x00, 0x00} - produceResponseManyBlocksVersions = [][]byte{ - { + produceResponseManyBlocksVersions = map[int][]byte{ + 0: { 0x00, 0x00, 0x00, 0x01, 0x00, 0x03, 'f', 'o', 'o', @@ -20,7 +20,9 @@ var ( 0x00, 0x00, 0x00, 0x01, // Partition 1 0x00, 0x02, // ErrInvalidMessage 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xFF, // Offset 255 - }, { + }, + + 1: { 0x00, 0x00, 0x00, 0x01, 0x00, 0x03, 'f', 'o', 'o', @@ -31,7 +33,8 @@ var ( 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xFF, // Offset 255 0x00, 0x00, 0x00, 0x64, // 100 ms throttle time - }, { + }, + 2: { 0x00, 0x00, 0x00, 0x01, 0x00, 0x03, 'f', 'o', 'o', @@ -42,6 +45,20 @@ var ( 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xFF, // Offset 255 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, 0xE8, // Timestamp January 1st 0001 at 00:00:01,000 UTC (LogAppendTime was used) + 0x00, 0x00, 0x00, 0x64, // 100 ms throttle time + }, + 7: { // version 7 adds StartOffset + 0x00, 0x00, 0x00, 0x01, + + 0x00, 0x03, 'f', 'o', 'o', + 0x00, 0x00, 0x00, 0x01, + + 0x00, 0x00, 0x00, 0x01, // Partition 1 + 0x00, 0x02, // ErrInvalidMessage + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xFF, // Offset 255 + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, 0xE8, // Timestamp January 1st 0001 at 00:00:01,000 UTC (LogAppendTime was used) + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x32, // StartOffset 50 + 0x00, 0x00, 0x00, 0x64, // 100 ms throttle time }, } @@ -69,14 +86,19 @@ func TestProduceResponseDecode(t *testing.T) { t.Error("Decoding did not produce a block for foo/1") } else { if block.Err != ErrInvalidMessage { - t.Error("Decoding failed for foo/2/Err, got:", int16(block.Err)) + t.Error("Decoding failed for foo/1/Err, got:", int16(block.Err)) } if block.Offset != 255 { t.Error("Decoding failed for foo/1/Offset, got:", block.Offset) } if v >= 2 { if block.Timestamp != time.Unix(1, 0) { - t.Error("Decoding failed for foo/2/Timestamp, got:", block.Timestamp) + t.Error("Decoding failed for foo/1/Timestamp, got:", block.Timestamp) + } + } + if v >= 7 { + if block.StartOffset != 50 { + t.Error("Decoding failed for foo/1/StartOffset, got:", block.StartOffset) } } } @@ -95,9 +117,10 @@ func TestProduceResponseEncode(t *testing.T) { response.Blocks["foo"] = make(map[int32]*ProduceResponseBlock) response.Blocks["foo"][1] = &ProduceResponseBlock{ - Err: ErrInvalidMessage, - Offset: 255, - Timestamp: time.Unix(1, 0), + Err: ErrInvalidMessage, + Offset: 255, + Timestamp: time.Unix(1, 0), + StartOffset: 50, } response.ThrottleTime = 100 * time.Millisecond for v, produceResponseManyBlocks := range produceResponseManyBlocksVersions { diff --git a/produce_set.go b/produce_set.go index b684aa4dca..36c43c6a61 100644 --- a/produce_set.go +++ b/produce_set.go @@ -129,6 +129,10 @@ func (ps *produceSet) buildRequest() *ProduceRequest { req.Version = 3 } + if ps.parent.conf.Producer.Compression == CompressionZSTD && ps.parent.conf.Version.IsAtLeast(V2_1_0_0) { + req.Version = 7 + } + for topic, partitionSets := range ps.msgs { for partition, set := range partitionSets { if req.Version >= 3 {