diff --git a/acl_create_request.go b/acl_create_request.go index da1cdefc3..6d8a70e1a 100644 --- a/acl_create_request.go +++ b/acl_create_request.go @@ -47,6 +47,10 @@ func (c *CreateAclsRequest) version() int16 { return c.Version } +func (c *CreateAclsRequest) headerVersion() int16 { + return 1 +} + func (c *CreateAclsRequest) requiredVersion() KafkaVersion { switch c.Version { case 1: diff --git a/acl_create_response.go b/acl_create_response.go index f5a5e9a64..bc018ed00 100644 --- a/acl_create_response.go +++ b/acl_create_response.go @@ -55,6 +55,10 @@ func (c *CreateAclsResponse) version() int16 { return 0 } +func (c *CreateAclsResponse) headerVersion() int16 { + return 0 +} + func (c *CreateAclsResponse) requiredVersion() KafkaVersion { return V0_11_0_0 } diff --git a/acl_delete_request.go b/acl_delete_request.go index 15908eac9..415252259 100644 --- a/acl_delete_request.go +++ b/acl_delete_request.go @@ -48,6 +48,10 @@ func (d *DeleteAclsRequest) version() int16 { return int16(d.Version) } +func (c *DeleteAclsRequest) headerVersion() int16 { + return 1 +} + func (d *DeleteAclsRequest) requiredVersion() KafkaVersion { switch d.Version { case 1: diff --git a/acl_delete_response.go b/acl_delete_response.go index 3754faeba..cb6308826 100644 --- a/acl_delete_response.go +++ b/acl_delete_response.go @@ -56,6 +56,10 @@ func (d *DeleteAclsResponse) version() int16 { return d.Version } +func (d *DeleteAclsResponse) headerVersion() int16 { + return 0 +} + func (d *DeleteAclsResponse) requiredVersion() KafkaVersion { return V0_11_0_0 } diff --git a/acl_describe_request.go b/acl_describe_request.go index 5222d46ee..29841a5ce 100644 --- a/acl_describe_request.go +++ b/acl_describe_request.go @@ -25,6 +25,10 @@ func (d *DescribeAclsRequest) version() int16 { return int16(d.Version) } +func (d *DescribeAclsRequest) headerVersion() int16 { + return 1 +} + func (d *DescribeAclsRequest) requiredVersion() KafkaVersion { switch d.Version { case 1: diff --git a/acl_describe_response.go b/acl_describe_response.go index a60d39f35..c43408b24 100644 --- a/acl_describe_response.go +++ b/acl_describe_response.go @@ -77,6 +77,10 @@ func (d *DescribeAclsResponse) version() int16 { return d.Version } +func (d *DescribeAclsResponse) headerVersion() int16 { + return 0 +} + func (d *DescribeAclsResponse) requiredVersion() KafkaVersion { switch d.Version { case 1: diff --git a/add_offsets_to_txn_request.go b/add_offsets_to_txn_request.go index fc227ab86..95586f9a1 100644 --- a/add_offsets_to_txn_request.go +++ b/add_offsets_to_txn_request.go @@ -48,6 +48,10 @@ func (a *AddOffsetsToTxnRequest) version() int16 { return 0 } +func (a *AddOffsetsToTxnRequest) headerVersion() int16 { + return 1 +} + func (a *AddOffsetsToTxnRequest) requiredVersion() KafkaVersion { return V0_11_0_0 } diff --git a/add_offsets_to_txn_response.go b/add_offsets_to_txn_response.go index c88c1f89f..bdb184419 100644 --- a/add_offsets_to_txn_response.go +++ b/add_offsets_to_txn_response.go @@ -40,6 +40,10 @@ func (a *AddOffsetsToTxnResponse) version() int16 { return 0 } +func (a *AddOffsetsToTxnResponse) headerVersion() int16 { + return 0 +} + func (a *AddOffsetsToTxnResponse) requiredVersion() KafkaVersion { return V0_11_0_0 } diff --git a/add_partitions_to_txn_request.go b/add_partitions_to_txn_request.go index 8d4b42e34..6289f4514 100644 --- a/add_partitions_to_txn_request.go +++ b/add_partitions_to_txn_request.go @@ -72,6 +72,10 @@ func (a *AddPartitionsToTxnRequest) version() int16 { return 0 } +func (a *AddPartitionsToTxnRequest) headerVersion() int16 { + return 1 +} + func (a *AddPartitionsToTxnRequest) requiredVersion() KafkaVersion { return V0_11_0_0 } diff --git a/add_partitions_to_txn_response.go b/add_partitions_to_txn_response.go index eb4f23eca..73b73b07f 100644 --- a/add_partitions_to_txn_response.go +++ b/add_partitions_to_txn_response.go @@ -79,6 +79,10 @@ func (a *AddPartitionsToTxnResponse) version() int16 { return 0 } +func (a *AddPartitionsToTxnResponse) headerVersion() int16 { + return 0 +} + func (a *AddPartitionsToTxnResponse) requiredVersion() KafkaVersion { return V0_11_0_0 } diff --git a/admin.go b/admin.go index 7dd725064..a7d733da1 100644 --- a/admin.go +++ b/admin.go @@ -42,6 +42,14 @@ type ClusterAdmin interface { // new partitions. This operation is supported by brokers with version 1.0.0 or higher. CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error + // Alter the replica assignment for partitions. + // This operation is supported by brokers with version 2.4.0.0 or higher. + AlterPartitionReassignments(topic string, assignment [][]int32) error + + // Provides info on ongoing partitions replica reassignments. + // This operation is supported by brokers with version 2.4.0.0 or higher. + ListPartitionReassignments(topics string, partitions []int32) (topicStatus map[string]map[int32]*PartitionReplicaReassignmentsStatus, err error) + // Delete records whose offset is smaller than the given offset of the corresponding partition. // This operation is supported by brokers with version 0.11.0.0 or higher. DeleteRecords(topic string, partitionOffsets map[int32]int64) error @@ -452,6 +460,82 @@ func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [ }) } +func (ca *clusterAdmin) AlterPartitionReassignments(topic string, assignment [][]int32) error { + if topic == "" { + return ErrInvalidTopic + } + + request := &AlterPartitionReassignmentsRequest{ + TimeoutMs: int32(60000), + Version: int16(0), + } + + for i := 0; i < len(assignment); i++ { + request.AddBlock(topic, int32(i), assignment[i]) + } + + return ca.retryOnError(isErrNoController, func() error { + b, err := ca.Controller() + if err != nil { + return err + } + + errs := make([]error, 0) + + rsp, err := b.AlterPartitionReassignments(request) + + if err != nil { + errs = append(errs, err) + } else { + if rsp.ErrorCode > 0 { + errs = append(errs, errors.New(rsp.ErrorCode.Error())) + } + + for topic, topicErrors := range rsp.Errors { + for partition, partitionError := range topicErrors { + if partitionError.errorCode != ErrNoError { + errStr := fmt.Sprintf("[%s-%d]: %s", topic, partition, partitionError.errorCode.Error()) + errs = append(errs, errors.New(errStr)) + } + } + } + } + + if len(errs) > 0 { + return ErrReassignPartitions{MultiError{&errs}} + } + + return nil + }) +} + +func (ca *clusterAdmin) ListPartitionReassignments(topic string, partitions []int32) (topicStatus map[string]map[int32]*PartitionReplicaReassignmentsStatus, err error) { + if topic == "" { + return nil, ErrInvalidTopic + } + + request := &ListPartitionReassignmentsRequest{ + TimeoutMs: int32(60000), + Version: int16(0), + } + + request.AddBlock(topic, partitions) + + b, err := ca.Controller() + if err != nil { + return nil, err + } + _ = b.Open(ca.client.Config()) + + rsp, err := b.ListPartitionReassignments(request) + + if err == nil && rsp != nil { + return rsp.TopicStatus, nil + } else { + return nil, err + } +} + func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]int64) error { if topic == "" { return ErrInvalidTopic diff --git a/admin_test.go b/admin_test.go index fcbe539b5..bcec47f61 100644 --- a/admin_test.go +++ b/admin_test.go @@ -332,6 +332,167 @@ func TestClusterAdminCreatePartitionsWithoutAuthorization(t *testing.T) { } } +func TestClusterAdminAlterPartitionReassignments(t *testing.T) { + seedBroker := NewMockBroker(t, 1) + defer seedBroker.Close() + + secondBroker := NewMockBroker(t, 2) + defer secondBroker.Close() + + seedBroker.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": NewMockMetadataResponse(t). + SetController(secondBroker.BrokerID()). + SetBroker(seedBroker.Addr(), seedBroker.BrokerID()). + SetBroker(secondBroker.Addr(), secondBroker.BrokerID()), + }) + + secondBroker.SetHandlerByMap(map[string]MockResponse{ + "AlterPartitionReassignmentsRequest": NewMockAlterPartitionReassignmentsResponse(t), + }) + + config := NewConfig() + config.Version = V2_4_0_0 + admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) + if err != nil { + t.Fatal(err) + } + + var topicAssignment = make([][]int32, 0, 3) + + err = admin.AlterPartitionReassignments("my_topic", topicAssignment) + if err != nil { + t.Fatal(err) + } + + err = admin.Close() + if err != nil { + t.Fatal(err) + } +} + +func TestClusterAdminAlterPartitionReassignmentsWithDiffVersion(t *testing.T) { + seedBroker := NewMockBroker(t, 1) + defer seedBroker.Close() + + secondBroker := NewMockBroker(t, 2) + defer secondBroker.Close() + + seedBroker.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": NewMockMetadataResponse(t). + SetController(secondBroker.BrokerID()). + SetBroker(seedBroker.Addr(), seedBroker.BrokerID()). + SetBroker(secondBroker.Addr(), secondBroker.BrokerID()), + }) + + secondBroker.SetHandlerByMap(map[string]MockResponse{ + "AlterPartitionReassignmentsRequest": NewMockAlterPartitionReassignmentsResponse(t), + }) + + config := NewConfig() + config.Version = V2_3_0_0 + admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) + if err != nil { + t.Fatal(err) + } + + var topicAssignment = make([][]int32, 0, 3) + + err = admin.AlterPartitionReassignments("my_topic", topicAssignment) + + if !strings.ContainsAny(err.Error(), ErrUnsupportedVersion.Error()) { + t.Fatal(err) + } + + err = admin.Close() + if err != nil { + t.Fatal(err) + } +} + +func TestClusterAdminListPartitionReassignments(t *testing.T) { + seedBroker := NewMockBroker(t, 1) + defer seedBroker.Close() + + secondBroker := NewMockBroker(t, 2) + defer secondBroker.Close() + + seedBroker.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": NewMockMetadataResponse(t). + SetController(secondBroker.BrokerID()). + SetBroker(seedBroker.Addr(), seedBroker.BrokerID()). + SetBroker(secondBroker.Addr(), secondBroker.BrokerID()), + }) + + secondBroker.SetHandlerByMap(map[string]MockResponse{ + "ListPartitionReassignmentsRequest": NewMockListPartitionReassignmentsResponse(t), + }) + + config := NewConfig() + config.Version = V2_4_0_0 + admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) + if err != nil { + t.Fatal(err) + } + + response, err := admin.ListPartitionReassignments("my_topic", []int32{0, 1}) + if err != nil { + t.Fatal(err) + } + + partitionStatus, ok := response["my_topic"] + if !ok { + t.Fatalf("topic missing in response") + } else { + if len(partitionStatus) != 2 { + t.Fatalf("partition missing in response") + } + } + + err = admin.Close() + if err != nil { + t.Fatal(err) + } +} + +func TestClusterAdminListPartitionReassignmentsWithDiffVersion(t *testing.T) { + seedBroker := NewMockBroker(t, 1) + defer seedBroker.Close() + + secondBroker := NewMockBroker(t, 2) + defer secondBroker.Close() + + seedBroker.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": NewMockMetadataResponse(t). + SetController(secondBroker.BrokerID()). + SetBroker(seedBroker.Addr(), seedBroker.BrokerID()). + SetBroker(secondBroker.Addr(), secondBroker.BrokerID()), + }) + + secondBroker.SetHandlerByMap(map[string]MockResponse{ + "ListPartitionReassignmentsRequest": NewMockListPartitionReassignmentsResponse(t), + }) + + config := NewConfig() + config.Version = V2_3_0_0 + admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) + if err != nil { + t.Fatal(err) + } + + var partitions = make([]int32, 0) + + _, err = admin.ListPartitionReassignments("my_topic", partitions) + + if !strings.ContainsAny(err.Error(), ErrUnsupportedVersion.Error()) { + t.Fatal(err) + } + + err = admin.Close() + if err != nil { + t.Fatal(err) + } +} + func TestClusterAdminDeleteRecords(t *testing.T) { topicName := "my_topic" seedBroker := NewMockBroker(t, 1) diff --git a/alter_configs_request.go b/alter_configs_request.go index 26c275b83..c88bb604a 100644 --- a/alter_configs_request.go +++ b/alter_configs_request.go @@ -117,6 +117,10 @@ func (a *AlterConfigsRequest) version() int16 { return 0 } +func (a *AlterConfigsRequest) headerVersion() int16 { + return 1 +} + func (a *AlterConfigsRequest) requiredVersion() KafkaVersion { return V0_11_0_0 } diff --git a/alter_configs_response.go b/alter_configs_response.go index 3893663cf..99a526005 100644 --- a/alter_configs_response.go +++ b/alter_configs_response.go @@ -92,6 +92,10 @@ func (a *AlterConfigsResponse) version() int16 { return 0 } +func (a *AlterConfigsResponse) headerVersion() int16 { + return 0 +} + func (a *AlterConfigsResponse) requiredVersion() KafkaVersion { return V0_11_0_0 } diff --git a/alter_partition_reassignments_request.go b/alter_partition_reassignments_request.go new file mode 100644 index 000000000..f0a2f9dd5 --- /dev/null +++ b/alter_partition_reassignments_request.go @@ -0,0 +1,130 @@ +package sarama + +type alterPartitionReassignmentsBlock struct { + replicas []int32 +} + +func (b *alterPartitionReassignmentsBlock) encode(pe packetEncoder) error { + if err := pe.putNullableCompactInt32Array(b.replicas); err != nil { + return err + } + + pe.putEmptyTaggedFieldArray() + return nil +} + +func (b *alterPartitionReassignmentsBlock) decode(pd packetDecoder) (err error) { + if b.replicas, err = pd.getCompactInt32Array(); err != nil { + return err + } + return nil +} + +type AlterPartitionReassignmentsRequest struct { + TimeoutMs int32 + blocks map[string]map[int32]*alterPartitionReassignmentsBlock + Version int16 +} + +func (r *AlterPartitionReassignmentsRequest) encode(pe packetEncoder) error { + pe.putInt32(r.TimeoutMs) + + pe.putCompactArrayLength(len(r.blocks)) + + for topic, partitions := range r.blocks { + if err := pe.putCompactString(topic); err != nil { + return err + } + pe.putCompactArrayLength(len(partitions)) + for partition, block := range partitions { + pe.putInt32(partition) + if err := block.encode(pe); err != nil { + return err + } + } + pe.putEmptyTaggedFieldArray() + } + + pe.putEmptyTaggedFieldArray() + + return nil +} + +func (r *AlterPartitionReassignmentsRequest) decode(pd packetDecoder, version int16) (err error) { + r.Version = version + + if r.TimeoutMs, err = pd.getInt32(); err != nil { + return err + } + + topicCount, err := pd.getCompactArrayLength() + if err != nil { + return err + } + if topicCount > 0 { + r.blocks = make(map[string]map[int32]*alterPartitionReassignmentsBlock) + for i := 0; i < topicCount; i++ { + topic, err := pd.getCompactString() + if err != nil { + return err + } + partitionCount, err := pd.getCompactArrayLength() + if err != nil { + return err + } + r.blocks[topic] = make(map[int32]*alterPartitionReassignmentsBlock) + for j := 0; j < partitionCount; j++ { + partition, err := pd.getInt32() + if err != nil { + return err + } + block := &alterPartitionReassignmentsBlock{} + if err := block.decode(pd); err != nil { + return err + } + r.blocks[topic][partition] = block + + if _, err := pd.getEmptyTaggedFieldArray(); err != nil { + return err + } + } + if _, err := pd.getEmptyTaggedFieldArray(); err != nil { + return err + } + } + } + + if _, err := pd.getEmptyTaggedFieldArray(); err != nil { + return err + } + + return +} + +func (r *AlterPartitionReassignmentsRequest) key() int16 { + return 45 +} + +func (r *AlterPartitionReassignmentsRequest) version() int16 { + return r.Version +} + +func (r *AlterPartitionReassignmentsRequest) headerVersion() int16 { + return 2 +} + +func (r *AlterPartitionReassignmentsRequest) requiredVersion() KafkaVersion { + return V2_4_0_0 +} + +func (r *AlterPartitionReassignmentsRequest) AddBlock(topic string, partitionID int32, replicas []int32) { + if r.blocks == nil { + r.blocks = make(map[string]map[int32]*alterPartitionReassignmentsBlock) + } + + if r.blocks[topic] == nil { + r.blocks[topic] = make(map[int32]*alterPartitionReassignmentsBlock) + } + + r.blocks[topic][partitionID] = &alterPartitionReassignmentsBlock{replicas} +} diff --git a/alter_partition_reassignments_request_test.go b/alter_partition_reassignments_request_test.go new file mode 100644 index 000000000..8d282729d --- /dev/null +++ b/alter_partition_reassignments_request_test.go @@ -0,0 +1,56 @@ +package sarama + +import "testing" + +var ( + alterPartitionReassignmentsRequestNoBlock = []byte{ + 0, 0, 39, 16, // timout 10000 + 1, // 1-1=0 blocks + 0, // empty tagged fields + } + + alterPartitionReassignmentsRequestOneBlock = []byte{ + 0, 0, 39, 16, // timout 10000 + 2, // 2-1=1 block + 6, 116, 111, 112, 105, 99, // topic name "topic" as compact string + 2, // 2-1=1 partitions + 0, 0, 0, 0, // partitionId + 3, // 3-1=2 replica array size + 0, 0, 3, 232, // replica 1000 + 0, 0, 3, 233, // replica 1001 + 0, 0, 0, // empty tagged fields + } + + alterPartitionReassignmentsAbortRequest = []byte{ + 0, 0, 39, 16, // timout 10000 + 2, // 2-1=1 block + 6, 116, 111, 112, 105, 99, // topic name "topic" as compact string + 2, // 2-1=1 partitions + 0, 0, 0, 0, // partitionId + 0, // replica array is null (indicates that a pending reassignment should be aborted) + 0, 0, 0, // empty tagged fields + } +) + +func TestAlterPartitionReassignmentRequest(t *testing.T) { + var request *AlterPartitionReassignmentsRequest + + request = &AlterPartitionReassignmentsRequest{ + TimeoutMs: int32(10000), + Version: int16(0), + } + + testRequest(t, "no block", request, alterPartitionReassignmentsRequestNoBlock) + + request.AddBlock("topic", 0, []int32{1000, 1001}) + + testRequest(t, "one block", request, alterPartitionReassignmentsRequestOneBlock) + + request = &AlterPartitionReassignmentsRequest{ + TimeoutMs: int32(10000), + Version: int16(0), + } + request.AddBlock("topic", 0, nil) + + testRequest(t, "abort assignment", request, alterPartitionReassignmentsAbortRequest) +} diff --git a/alter_partition_reassignments_response.go b/alter_partition_reassignments_response.go new file mode 100644 index 000000000..b3f9a15fe --- /dev/null +++ b/alter_partition_reassignments_response.go @@ -0,0 +1,157 @@ +package sarama + +type alterPartitionReassignmentsErrorBlock struct { + errorCode KError + errorMessage *string +} + +func (b *alterPartitionReassignmentsErrorBlock) encode(pe packetEncoder) error { + pe.putInt16(int16(b.errorCode)) + if err := pe.putNullableCompactString(b.errorMessage); err != nil { + return err + } + pe.putEmptyTaggedFieldArray() + + return nil +} + +func (b *alterPartitionReassignmentsErrorBlock) decode(pd packetDecoder) (err error) { + errorCode, err := pd.getInt16() + if err != nil { + return err + } + b.errorCode = KError(errorCode) + b.errorMessage, err = pd.getCompactNullableString() + + if _, err := pd.getEmptyTaggedFieldArray(); err != nil { + return err + } + return err +} + +type AlterPartitionReassignmentsResponse struct { + Version int16 + ThrottleTimeMs int32 + ErrorCode KError + ErrorMessage *string + Errors map[string]map[int32]*alterPartitionReassignmentsErrorBlock +} + +func (r *AlterPartitionReassignmentsResponse) AddError(topic string, partition int32, kerror KError, message *string) { + if r.Errors == nil { + r.Errors = make(map[string]map[int32]*alterPartitionReassignmentsErrorBlock) + } + partitions := r.Errors[topic] + if partitions == nil { + partitions = make(map[int32]*alterPartitionReassignmentsErrorBlock) + r.Errors[topic] = partitions + } + + partitions[partition] = &alterPartitionReassignmentsErrorBlock{errorCode: kerror, errorMessage: message} +} + +func (r *AlterPartitionReassignmentsResponse) encode(pe packetEncoder) error { + pe.putInt32(r.ThrottleTimeMs) + pe.putInt16(int16(r.ErrorCode)) + if err := pe.putNullableCompactString(r.ErrorMessage); err != nil { + return err + } + + pe.putCompactArrayLength(len(r.Errors)) + for topic, partitions := range r.Errors { + if err := pe.putCompactString(topic); err != nil { + return err + } + pe.putCompactArrayLength(len(partitions)) + for partition, block := range partitions { + pe.putInt32(partition) + + if err := block.encode(pe); err != nil { + return err + } + } + pe.putEmptyTaggedFieldArray() + } + + pe.putEmptyTaggedFieldArray() + return nil +} + +func (r *AlterPartitionReassignmentsResponse) decode(pd packetDecoder, version int16) (err error) { + r.Version = version + + if r.ThrottleTimeMs, err = pd.getInt32(); err != nil { + return err + } + + kerr, err := pd.getInt16() + if err != nil { + return err + } + + r.ErrorCode = KError(kerr) + + if r.ErrorMessage, err = pd.getCompactNullableString(); err != nil { + return err + } + + numTopics, err := pd.getCompactArrayLength() + if err != nil { + return err + } + + if numTopics > 0 { + r.Errors = make(map[string]map[int32]*alterPartitionReassignmentsErrorBlock, numTopics) + for i := 0; i < numTopics; i++ { + topic, err := pd.getCompactString() + if err != nil { + return err + } + + ongoingPartitionReassignments, err := pd.getCompactArrayLength() + if err != nil { + return err + } + + r.Errors[topic] = make(map[int32]*alterPartitionReassignmentsErrorBlock, ongoingPartitionReassignments) + + for j := 0; j < ongoingPartitionReassignments; j++ { + partition, err := pd.getInt32() + if err != nil { + return err + } + block := &alterPartitionReassignmentsErrorBlock{} + if err := block.decode(pd); err != nil { + return err + } + + r.Errors[topic][partition] = block + } + if _, err = pd.getEmptyTaggedFieldArray(); err != nil { + return err + } + } + } + + if _, err = pd.getEmptyTaggedFieldArray(); err != nil { + return err + } + + return nil +} + +func (r *AlterPartitionReassignmentsResponse) key() int16 { + return 45 +} + +func (r *AlterPartitionReassignmentsResponse) version() int16 { + return r.Version +} + +func (r *AlterPartitionReassignmentsResponse) headerVersion() int16 { + return 1 +} + +func (r *AlterPartitionReassignmentsResponse) requiredVersion() KafkaVersion { + return V2_4_0_0 +} diff --git a/alter_partition_reassignments_response_test.go b/alter_partition_reassignments_response_test.go new file mode 100644 index 000000000..614b571b3 --- /dev/null +++ b/alter_partition_reassignments_response_test.go @@ -0,0 +1,45 @@ +package sarama + +import "testing" + +var ( + alterPartitionReassignmentsResponseNoError = []byte{ + 0, 0, 39, 16, // ThrottleTimeMs 10000 + 0, 0, // errorcode + 0, // null string + 1, // empty errors array + 0, // empty tagged fields + } + + alterPartitionReassignmentsResponseWithError = []byte{ + 0, 0, 39, 16, // ThrottleTimeMs 10000 + 0, 12, // errorcode + 6, 101, 114, 114, 111, 114, // error string "error" + 2, // errors array length 1 + 6, 116, 111, 112, 105, 99, // topic name "topic" + 2, // partition array length 1 + 0, 0, 0, 1, // partitionId + 0, 3, //kerror + 7, 101, 114, 114, 111, 114, 50, // error string "error2" + 0, 0, 0, // empty tagged fields + } +) + +func TestAlterPartitionReassignmentResponse(t *testing.T) { + var response *AlterPartitionReassignmentsResponse + + response = &AlterPartitionReassignmentsResponse{ + ThrottleTimeMs: int32(10000), + Version: int16(0), + } + + testResponse(t, "no error", response, alterPartitionReassignmentsResponseNoError) + + errorMessage := "error" + partitionError := "error2" + response.ErrorCode = 12 + response.ErrorMessage = &errorMessage + response.AddError("topic", 1, 3, &partitionError) + + testResponse(t, "with error", response, alterPartitionReassignmentsResponseWithError) +} diff --git a/api_versions_request.go b/api_versions_request.go index b33167c0b..d67c5e1e5 100644 --- a/api_versions_request.go +++ b/api_versions_request.go @@ -20,6 +20,10 @@ func (a *ApiVersionsRequest) version() int16 { return 0 } +func (a *ApiVersionsRequest) headerVersion() int16 { + return 1 +} + func (a *ApiVersionsRequest) requiredVersion() KafkaVersion { return V0_10_0_0 } diff --git a/api_versions_response.go b/api_versions_response.go index bb1f0b31a..582e29db4 100644 --- a/api_versions_response.go +++ b/api_versions_response.go @@ -84,6 +84,10 @@ func (r *ApiVersionsResponse) version() int16 { return 0 } +func (a *ApiVersionsResponse) headerVersion() int16 { + return 0 +} + func (r *ApiVersionsResponse) requiredVersion() KafkaVersion { return V0_10_0_0 } diff --git a/async_producer_test.go b/async_producer_test.go index 3de308680..d0f012d24 100644 --- a/async_producer_test.go +++ b/async_producer_test.go @@ -988,7 +988,7 @@ func TestAsyncProducerIdempotentRetryCheckBatch(t *testing.T) { lastBatchFirstSeq := -1 lastBatchSize := -1 lastSequenceWrittenToDisk := -1 - handlerFailBeforeWrite := func(req *request) (res encoder) { + handlerFailBeforeWrite := func(req *request) (res encoderWithHeader) { switch req.body.key() { case 3: return metadataResponse diff --git a/broker.go b/broker.go index 9ca41c91e..4f3991af7 100644 --- a/broker.go +++ b/broker.go @@ -119,6 +119,7 @@ type SCRAMClient interface { type responsePromise struct { requestTime time.Time correlationID int32 + headerVersion int16 packets chan []byte errors chan error } @@ -513,6 +514,32 @@ func (b *Broker) CreatePartitions(request *CreatePartitionsRequest) (*CreatePart return response, nil } +//AlterPartitionReassignments sends a alter partition reassignments request and +//returns alter partition reassignments response +func (b *Broker) AlterPartitionReassignments(request *AlterPartitionReassignmentsRequest) (*AlterPartitionReassignmentsResponse, error) { + response := new(AlterPartitionReassignmentsResponse) + + err := b.sendAndReceive(request, response) + if err != nil { + return nil, err + } + + return response, nil +} + +//ListPartitionReassignments sends a list partition reassignments request and +//returns list partition reassignments response +func (b *Broker) ListPartitionReassignments(request *ListPartitionReassignmentsRequest) (*ListPartitionReassignmentsResponse, error) { + response := new(ListPartitionReassignmentsResponse) + + err := b.sendAndReceive(request, response) + if err != nil { + return nil, err + } + + return response, nil +} + //DeleteRecords send a request to delete records and return delete record //response or error func (b *Broker) DeleteRecords(request *DeleteRecordsRequest) (*DeleteRecordsResponse, error) { @@ -693,7 +720,7 @@ func (b *Broker) write(buf []byte) (n int, err error) { return b.conn.Write(buf) } -func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise, error) { +func (b *Broker) send(rb protocolBody, promiseResponse bool, responseHeaderVersion int16) (*responsePromise, error) { b.lock.Lock() defer b.lock.Unlock() @@ -731,14 +758,19 @@ func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise, return nil, nil } - promise := responsePromise{requestTime, req.correlationID, make(chan []byte), make(chan error)} + promise := responsePromise{requestTime, req.correlationID, responseHeaderVersion, make(chan []byte), make(chan error)} b.responses <- promise return &promise, nil } -func (b *Broker) sendAndReceive(req protocolBody, res versionedDecoder) error { - promise, err := b.send(req, res != nil) +func (b *Broker) sendAndReceive(req protocolBody, res protocolBody) error { + responseHeaderVersion := int16(-1) + if res != nil { + responseHeaderVersion = res.headerVersion() + } + + promise, err := b.send(req, res != nil, responseHeaderVersion) if err != nil { return err } @@ -818,7 +850,6 @@ func (b *Broker) encode(pe packetEncoder, version int16) (err error) { func (b *Broker) responseReceiver() { var dead error - header := make([]byte, 8) for response := range b.responses { if dead != nil { @@ -829,6 +860,9 @@ func (b *Broker) responseReceiver() { continue } + var headerLength = getHeaderLength(response.headerVersion) + header := make([]byte, headerLength) + bytesReadHeader, err := b.readFull(header) requestLatency := time.Since(response.requestTime) if err != nil { @@ -839,7 +873,7 @@ func (b *Broker) responseReceiver() { } decodedHeader := responseHeader{} - err = decode(header, &decodedHeader) + err = versionedDecode(header, &decodedHeader, response.headerVersion) if err != nil { b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency) dead = err @@ -855,7 +889,7 @@ func (b *Broker) responseReceiver() { continue } - buf := make([]byte, decodedHeader.length-4) + buf := make([]byte, decodedHeader.length-int32(headerLength)+4) bytesReadBody, err := b.readFull(buf) b.updateIncomingCommunicationMetrics(bytesReadHeader+bytesReadBody, requestLatency) if err != nil { @@ -869,6 +903,15 @@ func (b *Broker) responseReceiver() { close(b.done) } +func getHeaderLength(headerVersion int16) int8 { + if headerVersion < 1 { + return 8 + } else { + // header contains additional tagged field length (0), we don't support actual tags yet. + return 9 + } +} + func (b *Broker) authenticateViaSASL() error { switch b.conf.Net.SASL.Mechanism { case SASLTypeOAuth: @@ -1180,7 +1223,7 @@ func (b *Broker) receiveSaslAuthenticateResponse(correlationID int32) ([]byte, e } header := responseHeader{} - err = decode(buf, &header) + err = versionedDecode(buf, &header, 0) if err != nil { return nil, err } @@ -1269,7 +1312,7 @@ func (b *Broker) receiveSASLServerResponse(res *SaslAuthenticateResponse, correl } header := responseHeader{} - err = decode(buf, &header) + err = versionedDecode(buf, &header, 0) if err != nil { return bytesRead, err } diff --git a/broker_test.go b/broker_test.go index 387dae952..e2b17462c 100644 --- a/broker_test.go +++ b/broker_test.go @@ -42,6 +42,10 @@ func (m mockEncoder) encode(pe packetEncoder) error { return pe.putRawBytes(m.bytes) } +func (m mockEncoder) headerVersion() int16 { + return 0 +} + type brokerMetrics struct { bytesRead int bytesWritten int diff --git a/consumer_metadata_request.go b/consumer_metadata_request.go index a8dcaefe8..e5ebdaef5 100644 --- a/consumer_metadata_request.go +++ b/consumer_metadata_request.go @@ -29,6 +29,10 @@ func (r *ConsumerMetadataRequest) version() int16 { return 0 } +func (r *ConsumerMetadataRequest) headerVersion() int16 { + return 1 +} + func (r *ConsumerMetadataRequest) requiredVersion() KafkaVersion { return V0_8_2_0 } diff --git a/consumer_metadata_response.go b/consumer_metadata_response.go index f39a8711c..1b5d00d22 100644 --- a/consumer_metadata_response.go +++ b/consumer_metadata_response.go @@ -73,6 +73,10 @@ func (r *ConsumerMetadataResponse) version() int16 { return 0 } +func (r *ConsumerMetadataResponse) headerVersion() int16 { + return 0 +} + func (r *ConsumerMetadataResponse) requiredVersion() KafkaVersion { return V0_8_2_0 } diff --git a/create_partitions_request.go b/create_partitions_request.go index af321e994..46fb04402 100644 --- a/create_partitions_request.go +++ b/create_partitions_request.go @@ -67,6 +67,10 @@ func (r *CreatePartitionsRequest) version() int16 { return 0 } +func (r *CreatePartitionsRequest) headerVersion() int16 { + return 1 +} + func (r *CreatePartitionsRequest) requiredVersion() KafkaVersion { return V1_0_0_0 } diff --git a/create_partitions_response.go b/create_partitions_response.go index bb18204a7..12ce78857 100644 --- a/create_partitions_response.go +++ b/create_partitions_response.go @@ -63,6 +63,10 @@ func (r *CreatePartitionsResponse) version() int16 { return 0 } +func (r *CreatePartitionsResponse) headerVersion() int16 { + return 0 +} + func (r *CreatePartitionsResponse) requiredVersion() KafkaVersion { return V1_0_0_0 } diff --git a/create_topics_request.go b/create_topics_request.go index 709c0a44e..287acd069 100644 --- a/create_topics_request.go +++ b/create_topics_request.go @@ -79,6 +79,10 @@ func (c *CreateTopicsRequest) version() int16 { return c.Version } +func (r *CreateTopicsRequest) headerVersion() int16 { + return 1 +} + func (c *CreateTopicsRequest) requiredVersion() KafkaVersion { switch c.Version { case 2: diff --git a/create_topics_response.go b/create_topics_response.go index a493e02ac..7e1448a66 100644 --- a/create_topics_response.go +++ b/create_topics_response.go @@ -70,6 +70,10 @@ func (c *CreateTopicsResponse) version() int16 { return c.Version } +func (c *CreateTopicsResponse) headerVersion() int16 { + return 0 +} + func (c *CreateTopicsResponse) requiredVersion() KafkaVersion { switch c.Version { case 2: diff --git a/delete_groups_request.go b/delete_groups_request.go index 305a324ac..4ac8bbee4 100644 --- a/delete_groups_request.go +++ b/delete_groups_request.go @@ -21,6 +21,10 @@ func (r *DeleteGroupsRequest) version() int16 { return 0 } +func (r *DeleteGroupsRequest) headerVersion() int16 { + return 1 +} + func (r *DeleteGroupsRequest) requiredVersion() KafkaVersion { return V1_1_0_0 } diff --git a/delete_groups_response.go b/delete_groups_response.go index c067ebb42..5e7b1ed36 100644 --- a/delete_groups_response.go +++ b/delete_groups_response.go @@ -65,6 +65,10 @@ func (r *DeleteGroupsResponse) version() int16 { return 0 } +func (r *DeleteGroupsResponse) headerVersion() int16 { + return 0 +} + func (r *DeleteGroupsResponse) requiredVersion() KafkaVersion { return V1_1_0_0 } diff --git a/delete_records_request.go b/delete_records_request.go index 93efafd4d..dc106b17d 100644 --- a/delete_records_request.go +++ b/delete_records_request.go @@ -77,6 +77,10 @@ func (d *DeleteRecordsRequest) version() int16 { return 0 } +func (d *DeleteRecordsRequest) headerVersion() int16 { + return 1 +} + func (d *DeleteRecordsRequest) requiredVersion() KafkaVersion { return V0_11_0_0 } diff --git a/delete_records_response.go b/delete_records_response.go index 733a58b6b..d530b4c7e 100644 --- a/delete_records_response.go +++ b/delete_records_response.go @@ -80,6 +80,10 @@ func (d *DeleteRecordsResponse) version() int16 { return 0 } +func (d *DeleteRecordsResponse) headerVersion() int16 { + return 0 +} + func (d *DeleteRecordsResponse) requiredVersion() KafkaVersion { return V0_11_0_0 } diff --git a/delete_topics_request.go b/delete_topics_request.go index 911f67d31..ba6780a8e 100644 --- a/delete_topics_request.go +++ b/delete_topics_request.go @@ -38,6 +38,10 @@ func (d *DeleteTopicsRequest) version() int16 { return d.Version } +func (d *DeleteTopicsRequest) headerVersion() int16 { + return 1 +} + func (d *DeleteTopicsRequest) requiredVersion() KafkaVersion { switch d.Version { case 1: diff --git a/delete_topics_response.go b/delete_topics_response.go index 34225460a..733961a89 100644 --- a/delete_topics_response.go +++ b/delete_topics_response.go @@ -68,6 +68,10 @@ func (d *DeleteTopicsResponse) version() int16 { return d.Version } +func (d *DeleteTopicsResponse) headerVersion() int16 { + return 0 +} + func (d *DeleteTopicsResponse) requiredVersion() KafkaVersion { switch d.Version { case 1: diff --git a/describe_configs_request.go b/describe_configs_request.go index ccb587b35..d0c735280 100644 --- a/describe_configs_request.go +++ b/describe_configs_request.go @@ -100,6 +100,10 @@ func (r *DescribeConfigsRequest) version() int16 { return r.Version } +func (r *DescribeConfigsRequest) headerVersion() int16 { + return 1 +} + func (r *DescribeConfigsRequest) requiredVersion() KafkaVersion { switch r.Version { case 1: diff --git a/describe_configs_response.go b/describe_configs_response.go index dd919f127..063ae9112 100644 --- a/describe_configs_response.go +++ b/describe_configs_response.go @@ -112,6 +112,10 @@ func (r *DescribeConfigsResponse) version() int16 { return r.Version } +func (r *DescribeConfigsResponse) headerVersion() int16 { + return 0 +} + func (r *DescribeConfigsResponse) requiredVersion() KafkaVersion { switch r.Version { case 1: diff --git a/describe_groups_request.go b/describe_groups_request.go index 1fb356777..f8962da58 100644 --- a/describe_groups_request.go +++ b/describe_groups_request.go @@ -21,6 +21,10 @@ func (r *DescribeGroupsRequest) version() int16 { return 0 } +func (r *DescribeGroupsRequest) headerVersion() int16 { + return 1 +} + func (r *DescribeGroupsRequest) requiredVersion() KafkaVersion { return V0_9_0_0 } diff --git a/describe_groups_response.go b/describe_groups_response.go index 542b3a971..bc242e421 100644 --- a/describe_groups_response.go +++ b/describe_groups_response.go @@ -43,6 +43,10 @@ func (r *DescribeGroupsResponse) version() int16 { return 0 } +func (r *DescribeGroupsResponse) headerVersion() int16 { + return 0 +} + func (r *DescribeGroupsResponse) requiredVersion() KafkaVersion { return V0_9_0_0 } diff --git a/describe_log_dirs_request.go b/describe_log_dirs_request.go index cb1e78152..c0bf04e04 100644 --- a/describe_log_dirs_request.go +++ b/describe_log_dirs_request.go @@ -78,6 +78,10 @@ func (r *DescribeLogDirsRequest) version() int16 { return r.Version } +func (r *DescribeLogDirsRequest) headerVersion() int16 { + return 1 +} + func (r *DescribeLogDirsRequest) requiredVersion() KafkaVersion { return V1_0_0_0 } diff --git a/describe_log_dirs_response.go b/describe_log_dirs_response.go index d207312ef..a9a747615 100644 --- a/describe_log_dirs_response.go +++ b/describe_log_dirs_response.go @@ -61,6 +61,10 @@ func (r *DescribeLogDirsResponse) version() int16 { return r.Version } +func (r *DescribeLogDirsResponse) headerVersion() int16 { + return 0 +} + func (r *DescribeLogDirsResponse) requiredVersion() KafkaVersion { return V1_0_0_0 } diff --git a/encoder_decoder.go b/encoder_decoder.go index 7ce3bc0f6..025bad61f 100644 --- a/encoder_decoder.go +++ b/encoder_decoder.go @@ -12,6 +12,11 @@ type encoder interface { encode(pe packetEncoder) error } +type encoderWithHeader interface { + encoder + headerVersion() int16 +} + // Encode takes an Encoder and turns it into bytes while potentially recording metrics. func encode(e encoder, metricRegistry metrics.Registry) ([]byte, error) { if e == nil { diff --git a/end_txn_request.go b/end_txn_request.go index 2cd9b506d..6635425dd 100644 --- a/end_txn_request.go +++ b/end_txn_request.go @@ -45,6 +45,10 @@ func (a *EndTxnRequest) version() int16 { return 0 } +func (r *EndTxnRequest) headerVersion() int16 { + return 1 +} + func (a *EndTxnRequest) requiredVersion() KafkaVersion { return V0_11_0_0 } diff --git a/end_txn_response.go b/end_txn_response.go index 33b27e33d..763976726 100644 --- a/end_txn_response.go +++ b/end_txn_response.go @@ -39,6 +39,10 @@ func (e *EndTxnResponse) version() int16 { return 0 } +func (r *EndTxnResponse) headerVersion() int16 { + return 0 +} + func (e *EndTxnResponse) requiredVersion() KafkaVersion { return V0_11_0_0 } diff --git a/errors.go b/errors.go index 97be3c0f1..ca621b092 100644 --- a/errors.go +++ b/errors.go @@ -94,6 +94,14 @@ func (mErr MultiError) Error() string { return errString } +func (mErr MultiError) PrettyError() string { + var errString = "" + for _, err := range *mErr.Errors { + errString += err.Error() + "\n" + } + return errString +} + // ErrDeleteRecords is the type of error returned when fail to delete the required records type ErrDeleteRecords struct { MultiError @@ -103,6 +111,14 @@ func (err ErrDeleteRecords) Error() string { return "kafka server: failed to delete records " + err.MultiError.Error() } +type ErrReassignPartitions struct { + MultiError +} + +func (err ErrReassignPartitions) Error() string { + return fmt.Sprintf("failed to reassign partitions for topic: \n%s", err.MultiError.PrettyError()) +} + // Numeric error codes returned by the Kafka server. const ( ErrNoError KError = 0 diff --git a/fetch_request.go b/fetch_request.go index 9a3e8dd79..f893aeff7 100644 --- a/fetch_request.go +++ b/fetch_request.go @@ -239,6 +239,10 @@ func (r *FetchRequest) version() int16 { return r.Version } +func (r *FetchRequest) headerVersion() int16 { + return 1 +} + func (r *FetchRequest) requiredVersion() KafkaVersion { switch r.Version { case 0: diff --git a/fetch_response.go b/fetch_response.go index dc0aeed2e..ca6d78832 100644 --- a/fetch_response.go +++ b/fetch_response.go @@ -335,6 +335,10 @@ func (r *FetchResponse) version() int16 { return r.Version } +func (r *FetchResponse) headerVersion() int16 { + return 0 +} + func (r *FetchResponse) requiredVersion() KafkaVersion { switch r.Version { case 0: diff --git a/find_coordinator_request.go b/find_coordinator_request.go index ff2ad206c..597bcbf78 100644 --- a/find_coordinator_request.go +++ b/find_coordinator_request.go @@ -51,6 +51,10 @@ func (f *FindCoordinatorRequest) version() int16 { return f.Version } +func (r *FindCoordinatorRequest) headerVersion() int16 { + return 1 +} + func (f *FindCoordinatorRequest) requiredVersion() KafkaVersion { switch f.Version { case 1: diff --git a/find_coordinator_response.go b/find_coordinator_response.go index 9c900e8b7..83a648ad4 100644 --- a/find_coordinator_response.go +++ b/find_coordinator_response.go @@ -82,6 +82,10 @@ func (f *FindCoordinatorResponse) version() int16 { return f.Version } +func (r *FindCoordinatorResponse) headerVersion() int16 { + return 0 +} + func (f *FindCoordinatorResponse) requiredVersion() KafkaVersion { switch f.Version { case 1: diff --git a/heartbeat_request.go b/heartbeat_request.go index ce49c4739..e9d9af191 100644 --- a/heartbeat_request.go +++ b/heartbeat_request.go @@ -42,6 +42,10 @@ func (r *HeartbeatRequest) version() int16 { return 0 } +func (r *HeartbeatRequest) headerVersion() int16 { + return 1 +} + func (r *HeartbeatRequest) requiredVersion() KafkaVersion { return V0_9_0_0 } diff --git a/heartbeat_response.go b/heartbeat_response.go index 766f5fdec..577ab72e5 100644 --- a/heartbeat_response.go +++ b/heartbeat_response.go @@ -27,6 +27,10 @@ func (r *HeartbeatResponse) version() int16 { return 0 } +func (r *HeartbeatResponse) headerVersion() int16 { + return 0 +} + func (r *HeartbeatResponse) requiredVersion() KafkaVersion { return V0_9_0_0 } diff --git a/init_producer_id_request.go b/init_producer_id_request.go index 8ceb6c232..689444397 100644 --- a/init_producer_id_request.go +++ b/init_producer_id_request.go @@ -38,6 +38,10 @@ func (i *InitProducerIDRequest) version() int16 { return 0 } +func (i *InitProducerIDRequest) headerVersion() int16 { + return 1 +} + func (i *InitProducerIDRequest) requiredVersion() KafkaVersion { return V0_11_0_0 } diff --git a/init_producer_id_response.go b/init_producer_id_response.go index 1b32eb085..3e1242bf6 100644 --- a/init_producer_id_response.go +++ b/init_producer_id_response.go @@ -50,6 +50,10 @@ func (i *InitProducerIDResponse) version() int16 { return 0 } +func (i *InitProducerIDResponse) headerVersion() int16 { + return 0 +} + func (i *InitProducerIDResponse) requiredVersion() KafkaVersion { return V0_11_0_0 } diff --git a/join_group_request.go b/join_group_request.go index 97e9299ea..3734e82e4 100644 --- a/join_group_request.go +++ b/join_group_request.go @@ -134,6 +134,10 @@ func (r *JoinGroupRequest) version() int16 { return r.Version } +func (r *JoinGroupRequest) headerVersion() int16 { + return 1 +} + func (r *JoinGroupRequest) requiredVersion() KafkaVersion { switch r.Version { case 2: diff --git a/join_group_response.go b/join_group_response.go index 5752acc8a..54b0a45c2 100644 --- a/join_group_response.go +++ b/join_group_response.go @@ -123,6 +123,10 @@ func (r *JoinGroupResponse) version() int16 { return r.Version } +func (r *JoinGroupResponse) headerVersion() int16 { + return 0 +} + func (r *JoinGroupResponse) requiredVersion() KafkaVersion { switch r.Version { case 2: diff --git a/leave_group_request.go b/leave_group_request.go index e17742748..d7789b68d 100644 --- a/leave_group_request.go +++ b/leave_group_request.go @@ -35,6 +35,10 @@ func (r *LeaveGroupRequest) version() int16 { return 0 } +func (r *LeaveGroupRequest) headerVersion() int16 { + return 1 +} + func (r *LeaveGroupRequest) requiredVersion() KafkaVersion { return V0_9_0_0 } diff --git a/leave_group_response.go b/leave_group_response.go index d60c626da..25f8d5eb3 100644 --- a/leave_group_response.go +++ b/leave_group_response.go @@ -27,6 +27,10 @@ func (r *LeaveGroupResponse) version() int16 { return 0 } +func (r *LeaveGroupResponse) headerVersion() int16 { + return 0 +} + func (r *LeaveGroupResponse) requiredVersion() KafkaVersion { return V0_9_0_0 } diff --git a/list_groups_request.go b/list_groups_request.go index 3b16abf7f..ed44cc27e 100644 --- a/list_groups_request.go +++ b/list_groups_request.go @@ -19,6 +19,10 @@ func (r *ListGroupsRequest) version() int16 { return 0 } +func (r *ListGroupsRequest) headerVersion() int16 { + return 1 +} + func (r *ListGroupsRequest) requiredVersion() KafkaVersion { return V0_9_0_0 } diff --git a/list_groups_response.go b/list_groups_response.go index 56115d4c7..777bae7e6 100644 --- a/list_groups_response.go +++ b/list_groups_response.go @@ -64,6 +64,10 @@ func (r *ListGroupsResponse) version() int16 { return 0 } +func (r *ListGroupsResponse) headerVersion() int16 { + return 0 +} + func (r *ListGroupsResponse) requiredVersion() KafkaVersion { return V0_9_0_0 } diff --git a/list_partition_reassignments_request.go b/list_partition_reassignments_request.go new file mode 100644 index 000000000..c1ffa9ba0 --- /dev/null +++ b/list_partition_reassignments_request.go @@ -0,0 +1,98 @@ +package sarama + +type ListPartitionReassignmentsRequest struct { + TimeoutMs int32 + blocks map[string][]int32 + Version int16 +} + +func (r *ListPartitionReassignmentsRequest) encode(pe packetEncoder) error { + pe.putInt32(r.TimeoutMs) + + pe.putCompactArrayLength(len(r.blocks)) + + for topic, partitions := range r.blocks { + if err := pe.putCompactString(topic); err != nil { + return err + } + + if err := pe.putCompactInt32Array(partitions); err != nil { + return err + } + + pe.putEmptyTaggedFieldArray() + } + + pe.putEmptyTaggedFieldArray() + + return nil +} + +func (r *ListPartitionReassignmentsRequest) decode(pd packetDecoder, version int16) (err error) { + r.Version = version + + if r.TimeoutMs, err = pd.getInt32(); err != nil { + return err + } + + topicCount, err := pd.getCompactArrayLength() + if err != nil { + return err + } + if topicCount > 0 { + r.blocks = make(map[string][]int32) + for i := 0; i < topicCount; i++ { + topic, err := pd.getCompactString() + if err != nil { + return err + } + partitionCount, err := pd.getCompactArrayLength() + if err != nil { + return err + } + r.blocks[topic] = make([]int32, partitionCount) + for j := 0; j < partitionCount; j++ { + partition, err := pd.getInt32() + if err != nil { + return err + } + r.blocks[topic][j] = partition + } + if _, err := pd.getEmptyTaggedFieldArray(); err != nil { + return err + } + } + } + + if _, err := pd.getEmptyTaggedFieldArray(); err != nil { + return err + } + + return +} + +func (r *ListPartitionReassignmentsRequest) key() int16 { + return 46 +} + +func (r *ListPartitionReassignmentsRequest) version() int16 { + return r.Version +} + +func (r *ListPartitionReassignmentsRequest) headerVersion() int16 { + return 2 +} + +func (r *ListPartitionReassignmentsRequest) requiredVersion() KafkaVersion { + return V2_4_0_0 +} + +func (r *ListPartitionReassignmentsRequest) AddBlock(topic string, partitionIDs []int32) { + if r.blocks == nil { + r.blocks = make(map[string][]int32) + } + + if r.blocks[topic] == nil { + r.blocks[topic] = partitionIDs + } +} diff --git a/list_partition_reassignments_request_test.go b/list_partition_reassignments_request_test.go new file mode 100644 index 000000000..d9f9f92ca --- /dev/null +++ b/list_partition_reassignments_request_test.go @@ -0,0 +1,31 @@ +package sarama + +import "testing" + +var ( + listPartitionReassignmentsRequestOneBlock = []byte{ + 0, 0, 39, 16, // timout 10000 + 2, // 2-1=1 block + 6, 116, 111, 112, 105, 99, // topic name "topic" as compact string + 2, // 2-1=1 partitions + 0, 0, 0, 0, // partitionId + 0, 0, // empty tagged fields + } +) + +func TestListPartitionReassignmentRequest(t *testing.T) { + var request *ListPartitionReassignmentsRequest + + request = &ListPartitionReassignmentsRequest{ + TimeoutMs: int32(10000), + Version: int16(0), + } + + request.AddBlock("topic", []int32{0}) + + testRequest(t, "one block", request, listPartitionReassignmentsRequestOneBlock) + + request.AddBlock("topic2", []int32{1, 2}) + + testRequestWithoutByteComparison(t, "two blocks", request) +} diff --git a/list_partition_reassignments_response.go b/list_partition_reassignments_response.go new file mode 100644 index 000000000..a5786ee7f --- /dev/null +++ b/list_partition_reassignments_response.go @@ -0,0 +1,169 @@ +package sarama + +type PartitionReplicaReassignmentsStatus struct { + replicas []int32 + addingReplicas []int32 + removingReplicas []int32 +} + +func (b *PartitionReplicaReassignmentsStatus) encode(pe packetEncoder) error { + if err := pe.putCompactInt32Array(b.replicas); err != nil { + return err + } + if err := pe.putCompactInt32Array(b.addingReplicas); err != nil { + return err + } + if err := pe.putCompactInt32Array(b.removingReplicas); err != nil { + return err + } + + pe.putEmptyTaggedFieldArray() + + return nil +} + +func (b *PartitionReplicaReassignmentsStatus) decode(pd packetDecoder) (err error) { + if b.replicas, err = pd.getCompactInt32Array(); err != nil { + return err + } + + if b.addingReplicas, err = pd.getCompactInt32Array(); err != nil { + return err + } + + if b.removingReplicas, err = pd.getCompactInt32Array(); err != nil { + return err + } + + if _, err := pd.getEmptyTaggedFieldArray(); err != nil { + return err + } + + return err +} + +type ListPartitionReassignmentsResponse struct { + Version int16 + ThrottleTimeMs int32 + ErrorCode KError + ErrorMessage *string + TopicStatus map[string]map[int32]*PartitionReplicaReassignmentsStatus +} + +func (r *ListPartitionReassignmentsResponse) AddBlock(topic string, partition int32, replicas, addingReplicas, removingReplicas []int32) { + if r.TopicStatus == nil { + r.TopicStatus = make(map[string]map[int32]*PartitionReplicaReassignmentsStatus) + } + partitions := r.TopicStatus[topic] + if partitions == nil { + partitions = make(map[int32]*PartitionReplicaReassignmentsStatus) + r.TopicStatus[topic] = partitions + } + + partitions[partition] = &PartitionReplicaReassignmentsStatus{replicas: replicas, addingReplicas: addingReplicas, removingReplicas: removingReplicas} +} + +func (r *ListPartitionReassignmentsResponse) encode(pe packetEncoder) error { + pe.putInt32(r.ThrottleTimeMs) + pe.putInt16(int16(r.ErrorCode)) + if err := pe.putNullableCompactString(r.ErrorMessage); err != nil { + return err + } + + pe.putCompactArrayLength(len(r.TopicStatus)) + for topic, partitions := range r.TopicStatus { + if err := pe.putCompactString(topic); err != nil { + return err + } + pe.putCompactArrayLength(len(partitions)) + for partition, block := range partitions { + pe.putInt32(partition) + + if err := block.encode(pe); err != nil { + return err + } + } + pe.putEmptyTaggedFieldArray() + } + + pe.putEmptyTaggedFieldArray() + + return nil +} + +func (r *ListPartitionReassignmentsResponse) decode(pd packetDecoder, version int16) (err error) { + r.Version = version + + if r.ThrottleTimeMs, err = pd.getInt32(); err != nil { + return err + } + + kerr, err := pd.getInt16() + if err != nil { + return err + } + + r.ErrorCode = KError(kerr) + + if r.ErrorMessage, err = pd.getCompactNullableString(); err != nil { + return err + } + + numTopics, err := pd.getCompactArrayLength() + if err != nil || numTopics == 0 { + return err + } + + r.TopicStatus = make(map[string]map[int32]*PartitionReplicaReassignmentsStatus, numTopics) + for i := 0; i < numTopics; i++ { + topic, err := pd.getCompactString() + if err != nil { + return err + } + + ongoingPartitionReassignments, err := pd.getCompactArrayLength() + if err != nil { + return err + } + + r.TopicStatus[topic] = make(map[int32]*PartitionReplicaReassignmentsStatus, ongoingPartitionReassignments) + + for j := 0; j < ongoingPartitionReassignments; j++ { + partition, err := pd.getInt32() + if err != nil { + return err + } + + block := &PartitionReplicaReassignmentsStatus{} + if err := block.decode(pd); err != nil { + return err + } + r.TopicStatus[topic][partition] = block + } + + if _, err := pd.getEmptyTaggedFieldArray(); err != nil { + return err + } + } + if _, err := pd.getEmptyTaggedFieldArray(); err != nil { + return err + } + + return nil +} + +func (r *ListPartitionReassignmentsResponse) key() int16 { + return 46 +} + +func (r *ListPartitionReassignmentsResponse) version() int16 { + return r.Version +} + +func (r *ListPartitionReassignmentsResponse) headerVersion() int16 { + return 1 +} + +func (r *ListPartitionReassignmentsResponse) requiredVersion() KafkaVersion { + return V2_4_0_0 +} diff --git a/list_partition_reassignments_response_test.go b/list_partition_reassignments_response_test.go new file mode 100644 index 000000000..ba6ca5c5b --- /dev/null +++ b/list_partition_reassignments_response_test.go @@ -0,0 +1,32 @@ +package sarama + +import "testing" + +var ( + listPartitionReassignmentsResponse = []byte{ + 0, 0, 39, 16, // ThrottleTimeMs 10000 + 0, 0, // errorcode + 0, // null string + 2, // block array length 1 + 6, 116, 111, 112, 105, 99, // topic name "topic" + 2, // partition array length 1 + 0, 0, 0, 1, // partitionId + 3, 0, 0, 3, 232, 0, 0, 3, 233, // replicas [1000, 1001] + 3, 0, 0, 3, 234, 0, 0, 3, 235, // addingReplicas [1002, 1003] + 3, 0, 0, 3, 236, 0, 0, 3, 237, // addingReplicas [1004, 1005] + 0, 0, 0, // empty tagged fields + } +) + +func TestListPartitionReassignmentResponse(t *testing.T) { + var response *ListPartitionReassignmentsResponse + + response = &ListPartitionReassignmentsResponse{ + ThrottleTimeMs: int32(10000), + Version: int16(0), + } + + response.AddBlock("topic", 1, []int32{1000, 1001}, []int32{1002, 1003}, []int32{1004, 1005}) + + testResponse(t, "one topic", response, listPartitionReassignmentsResponse) +} diff --git a/metadata_request.go b/metadata_request.go index 1b590d368..e835f5a9c 100644 --- a/metadata_request.go +++ b/metadata_request.go @@ -65,6 +65,10 @@ func (r *MetadataRequest) version() int16 { return r.Version } +func (r *MetadataRequest) headerVersion() int16 { + return 1 +} + func (r *MetadataRequest) requiredVersion() KafkaVersion { switch r.Version { case 1: diff --git a/metadata_response.go b/metadata_response.go index 916992d24..0bb8702cc 100644 --- a/metadata_response.go +++ b/metadata_response.go @@ -255,6 +255,10 @@ func (r *MetadataResponse) version() int16 { return r.Version } +func (r *MetadataResponse) headerVersion() int16 { + return 0 +} + func (r *MetadataResponse) requiredVersion() KafkaVersion { switch r.Version { case 1: diff --git a/mockbroker.go b/mockbroker.go index 56e3436ef..ff5a68ae7 100644 --- a/mockbroker.go +++ b/mockbroker.go @@ -20,7 +20,7 @@ const ( type GSSApiHandlerFunc func([]byte) []byte -type requestHandlerFunc func(req *request) (res encoder) +type requestHandlerFunc func(req *request) (res encoderWithHeader) // RequestNotifierFunc is invoked when a mock broker processes a request successfully // and will provides the number of bytes read and written. @@ -55,7 +55,7 @@ type MockBroker struct { port int32 closing chan none stopper chan none - expectations chan encoder + expectations chan encoderWithHeader listener net.Listener t TestReporter latency time.Duration @@ -83,7 +83,7 @@ func (b *MockBroker) SetLatency(latency time.Duration) { // and uses the found MockResponse instance to generate an appropriate reply. // If the request type is not found in the map then nothing is sent. func (b *MockBroker) SetHandlerByMap(handlerMap map[string]MockResponse) { - b.setHandler(func(req *request) (res encoder) { + b.setHandler(func(req *request) (res encoderWithHeader) { reqTypeName := reflect.TypeOf(req.body).Elem().Name() mockResponse := handlerMap[reqTypeName] if mockResponse == nil { @@ -231,7 +231,6 @@ func (b *MockBroker) handleRequests(conn io.ReadWriteCloser, idx int, wg *sync.W } }() - resHeader := make([]byte, 8) var bytesWritten int var bytesRead int for { @@ -281,8 +280,7 @@ func (b *MockBroker) handleRequests(conn io.ReadWriteCloser, idx int, wg *sync.W continue } - binary.BigEndian.PutUint32(resHeader, uint32(len(encodedRes)+4)) - binary.BigEndian.PutUint32(resHeader[4:], uint32(req.correlationID)) + resHeader := b.encodeHeader(res.headerVersion(), req.correlationID, uint32(len(encodedRes))) if _, err = conn.Write(resHeader); err != nil { b.serverError(err) break @@ -318,7 +316,25 @@ func (b *MockBroker) handleRequests(conn io.ReadWriteCloser, idx int, wg *sync.W Logger.Printf("*** mockbroker/%d/%d: connection closed, err=%v", b.BrokerID(), idx, err) } -func (b *MockBroker) defaultRequestHandler(req *request) (res encoder) { +func (b *MockBroker) encodeHeader(headerVersion int16, correlationId int32, payloadLength uint32) []byte { + headerLength := uint32(8) + + if headerVersion >= 1 { + headerLength = 9 + } + + resHeader := make([]byte, headerLength) + binary.BigEndian.PutUint32(resHeader, payloadLength+headerLength-4) + binary.BigEndian.PutUint32(resHeader[4:], uint32(correlationId)) + + if headerVersion >= 1 { + binary.PutUvarint(resHeader[8:], 0) + } + + return resHeader +} + +func (b *MockBroker) defaultRequestHandler(req *request) (res encoderWithHeader) { select { case res, ok := <-b.expectations: if !ok { @@ -373,7 +389,7 @@ func NewMockBrokerListener(t TestReporter, brokerID int32, listener net.Listener stopper: make(chan none), t: t, brokerID: brokerID, - expectations: make(chan encoder, 512), + expectations: make(chan encoderWithHeader, 512), listener: listener, } broker.handler = broker.defaultRequestHandler @@ -394,6 +410,6 @@ func NewMockBrokerListener(t TestReporter, brokerID int32, listener net.Listener return broker } -func (b *MockBroker) Returns(e encoder) { +func (b *MockBroker) Returns(e encoderWithHeader) { b.expectations <- e } diff --git a/mockresponses.go b/mockresponses.go index 6fa72ebb0..e77463a58 100644 --- a/mockresponses.go +++ b/mockresponses.go @@ -18,20 +18,20 @@ type TestReporter interface { // allows generating a response based on a request body. MockResponses are used // to program behavior of MockBroker in tests. type MockResponse interface { - For(reqBody versionedDecoder) (res encoder) + For(reqBody versionedDecoder) (res encoderWithHeader) } // MockWrapper is a mock response builder that returns a particular concrete // response regardless of the actual request passed to the `For` method. type MockWrapper struct { - res encoder + res encoderWithHeader } -func (mw *MockWrapper) For(reqBody versionedDecoder) (res encoder) { +func (mw *MockWrapper) For(reqBody versionedDecoder) (res encoderWithHeader) { return mw.res } -func NewMockWrapper(res encoder) *MockWrapper { +func NewMockWrapper(res encoderWithHeader) *MockWrapper { return &MockWrapper{res: res} } @@ -50,7 +50,7 @@ func NewMockSequence(responses ...interface{}) *MockSequence { switch res := res.(type) { case MockResponse: ms.responses[i] = res - case encoder: + case encoderWithHeader: ms.responses[i] = NewMockWrapper(res) default: panic(fmt.Sprintf("Unexpected response type: %T", res)) @@ -59,7 +59,7 @@ func NewMockSequence(responses ...interface{}) *MockSequence { return ms } -func (mc *MockSequence) For(reqBody versionedDecoder) (res encoder) { +func (mc *MockSequence) For(reqBody versionedDecoder) (res encoderWithHeader) { res = mc.responses[0].For(reqBody) if len(mc.responses) > 1 { mc.responses = mc.responses[1:] @@ -79,7 +79,7 @@ func NewMockListGroupsResponse(t TestReporter) *MockListGroupsResponse { } } -func (m *MockListGroupsResponse) For(reqBody versionedDecoder) encoder { +func (m *MockListGroupsResponse) For(reqBody versionedDecoder) encoderWithHeader { request := reqBody.(*ListGroupsRequest) _ = request response := &ListGroupsResponse{ @@ -110,7 +110,7 @@ func (m *MockDescribeGroupsResponse) AddGroupDescription(groupID string, descrip return m } -func (m *MockDescribeGroupsResponse) For(reqBody versionedDecoder) encoder { +func (m *MockDescribeGroupsResponse) For(reqBody versionedDecoder) encoderWithHeader { request := reqBody.(*DescribeGroupsRequest) response := &DescribeGroupsResponse{} @@ -166,7 +166,7 @@ func (mmr *MockMetadataResponse) SetController(brokerID int32) *MockMetadataResp return mmr } -func (mmr *MockMetadataResponse) For(reqBody versionedDecoder) encoder { +func (mmr *MockMetadataResponse) For(reqBody versionedDecoder) encoderWithHeader { metadataRequest := reqBody.(*MetadataRequest) metadataResponse := &MetadataResponse{ Version: metadataRequest.version(), @@ -233,7 +233,7 @@ func (mor *MockOffsetResponse) SetOffset(topic string, partition int32, time, of return mor } -func (mor *MockOffsetResponse) For(reqBody versionedDecoder) encoder { +func (mor *MockOffsetResponse) For(reqBody versionedDecoder) encoderWithHeader { offsetRequest := reqBody.(*OffsetRequest) offsetResponse := &OffsetResponse{Version: mor.version} for topic, partitions := range offsetRequest.blocks { @@ -309,7 +309,7 @@ func (mfr *MockFetchResponse) SetHighWaterMark(topic string, partition int32, of return mfr } -func (mfr *MockFetchResponse) For(reqBody versionedDecoder) encoder { +func (mfr *MockFetchResponse) For(reqBody versionedDecoder) encoderWithHeader { fetchRequest := reqBody.(*FetchRequest) res := &FetchResponse{ Version: mfr.version, @@ -393,7 +393,7 @@ func (mr *MockConsumerMetadataResponse) SetError(group string, kerror KError) *M return mr } -func (mr *MockConsumerMetadataResponse) For(reqBody versionedDecoder) encoder { +func (mr *MockConsumerMetadataResponse) For(reqBody versionedDecoder) encoderWithHeader { req := reqBody.(*ConsumerMetadataRequest) group := req.ConsumerGroup res := &ConsumerMetadataResponse{} @@ -442,7 +442,7 @@ func (mr *MockFindCoordinatorResponse) SetError(coordinatorType CoordinatorType, return mr } -func (mr *MockFindCoordinatorResponse) For(reqBody versionedDecoder) encoder { +func (mr *MockFindCoordinatorResponse) For(reqBody versionedDecoder) encoderWithHeader { req := reqBody.(*FindCoordinatorRequest) res := &FindCoordinatorResponse{} var v interface{} @@ -489,7 +489,7 @@ func (mr *MockOffsetCommitResponse) SetError(group, topic string, partition int3 return mr } -func (mr *MockOffsetCommitResponse) For(reqBody versionedDecoder) encoder { +func (mr *MockOffsetCommitResponse) For(reqBody versionedDecoder) encoderWithHeader { req := reqBody.(*OffsetCommitRequest) group := req.ConsumerGroup res := &OffsetCommitResponse{} @@ -546,7 +546,7 @@ func (mr *MockProduceResponse) SetError(topic string, partition int32, kerror KE return mr } -func (mr *MockProduceResponse) For(reqBody versionedDecoder) encoder { +func (mr *MockProduceResponse) For(reqBody versionedDecoder) encoderWithHeader { req := reqBody.(*ProduceRequest) res := &ProduceResponse{ Version: mr.version, @@ -605,7 +605,7 @@ func (mr *MockOffsetFetchResponse) SetError(kerror KError) *MockOffsetFetchRespo return mr } -func (mr *MockOffsetFetchResponse) For(reqBody versionedDecoder) encoder { +func (mr *MockOffsetFetchResponse) For(reqBody versionedDecoder) encoderWithHeader { req := reqBody.(*OffsetFetchRequest) group := req.ConsumerGroup res := &OffsetFetchResponse{Version: req.Version} @@ -630,7 +630,7 @@ func NewMockCreateTopicsResponse(t TestReporter) *MockCreateTopicsResponse { return &MockCreateTopicsResponse{t: t} } -func (mr *MockCreateTopicsResponse) For(reqBody versionedDecoder) encoder { +func (mr *MockCreateTopicsResponse) For(reqBody versionedDecoder) encoderWithHeader { req := reqBody.(*CreateTopicsRequest) res := &CreateTopicsResponse{ Version: req.Version, @@ -659,7 +659,7 @@ func NewMockDeleteTopicsResponse(t TestReporter) *MockDeleteTopicsResponse { return &MockDeleteTopicsResponse{t: t} } -func (mr *MockDeleteTopicsResponse) For(reqBody versionedDecoder) encoder { +func (mr *MockDeleteTopicsResponse) For(reqBody versionedDecoder) encoderWithHeader { req := reqBody.(*DeleteTopicsRequest) res := &DeleteTopicsResponse{} res.TopicErrorCodes = make(map[string]KError) @@ -679,7 +679,7 @@ func NewMockCreatePartitionsResponse(t TestReporter) *MockCreatePartitionsRespon return &MockCreatePartitionsResponse{t: t} } -func (mr *MockCreatePartitionsResponse) For(reqBody versionedDecoder) encoder { +func (mr *MockCreatePartitionsResponse) For(reqBody versionedDecoder) encoderWithHeader { req := reqBody.(*CreatePartitionsRequest) res := &CreatePartitionsResponse{} res.TopicPartitionErrors = make(map[string]*TopicPartitionError) @@ -698,6 +698,43 @@ func (mr *MockCreatePartitionsResponse) For(reqBody versionedDecoder) encoder { return res } +type MockAlterPartitionReassignmentsResponse struct { + t TestReporter +} + +func NewMockAlterPartitionReassignmentsResponse(t TestReporter) *MockAlterPartitionReassignmentsResponse { + return &MockAlterPartitionReassignmentsResponse{t: t} +} + +func (mr *MockAlterPartitionReassignmentsResponse) For(reqBody versionedDecoder) encoderWithHeader { + req := reqBody.(*AlterPartitionReassignmentsRequest) + _ = req + res := &AlterPartitionReassignmentsResponse{} + return res +} + +type MockListPartitionReassignmentsResponse struct { + t TestReporter +} + +func NewMockListPartitionReassignmentsResponse(t TestReporter) *MockListPartitionReassignmentsResponse { + return &MockListPartitionReassignmentsResponse{t: t} +} + +func (mr *MockListPartitionReassignmentsResponse) For(reqBody versionedDecoder) encoderWithHeader { + req := reqBody.(*ListPartitionReassignmentsRequest) + _ = req + res := &ListPartitionReassignmentsResponse{} + + for topic, partitions := range req.blocks { + for _, partition := range partitions { + res.AddBlock(topic, partition, []int32{0}, []int32{1}, []int32{2}) + } + } + + return res +} + type MockDeleteRecordsResponse struct { t TestReporter } @@ -706,7 +743,7 @@ func NewMockDeleteRecordsResponse(t TestReporter) *MockDeleteRecordsResponse { return &MockDeleteRecordsResponse{t: t} } -func (mr *MockDeleteRecordsResponse) For(reqBody versionedDecoder) encoder { +func (mr *MockDeleteRecordsResponse) For(reqBody versionedDecoder) encoderWithHeader { req := reqBody.(*DeleteRecordsRequest) res := &DeleteRecordsResponse{} res.Topics = make(map[string]*DeleteRecordsResponseTopic) @@ -729,7 +766,7 @@ func NewMockDescribeConfigsResponse(t TestReporter) *MockDescribeConfigsResponse return &MockDescribeConfigsResponse{t: t} } -func (mr *MockDescribeConfigsResponse) For(reqBody versionedDecoder) encoder { +func (mr *MockDescribeConfigsResponse) For(reqBody versionedDecoder) encoderWithHeader { req := reqBody.(*DescribeConfigsRequest) res := &DescribeConfigsResponse{ Version: req.Version, @@ -824,7 +861,7 @@ func NewMockAlterConfigsResponse(t TestReporter) *MockAlterConfigsResponse { return &MockAlterConfigsResponse{t: t} } -func (mr *MockAlterConfigsResponse) For(reqBody versionedDecoder) encoder { +func (mr *MockAlterConfigsResponse) For(reqBody versionedDecoder) encoderWithHeader { req := reqBody.(*AlterConfigsRequest) res := &AlterConfigsResponse{} @@ -845,7 +882,7 @@ func NewMockCreateAclsResponse(t TestReporter) *MockCreateAclsResponse { return &MockCreateAclsResponse{t: t} } -func (mr *MockCreateAclsResponse) For(reqBody versionedDecoder) encoder { +func (mr *MockCreateAclsResponse) For(reqBody versionedDecoder) encoderWithHeader { req := reqBody.(*CreateAclsRequest) res := &CreateAclsResponse{} @@ -863,7 +900,7 @@ func NewMockListAclsResponse(t TestReporter) *MockListAclsResponse { return &MockListAclsResponse{t: t} } -func (mr *MockListAclsResponse) For(reqBody versionedDecoder) encoder { +func (mr *MockListAclsResponse) For(reqBody versionedDecoder) encoderWithHeader { req := reqBody.(*DescribeAclsRequest) res := &DescribeAclsResponse{} res.Err = ErrNoError @@ -905,7 +942,7 @@ func NewMockSaslAuthenticateResponse(t TestReporter) *MockSaslAuthenticateRespon return &MockSaslAuthenticateResponse{t: t} } -func (msar *MockSaslAuthenticateResponse) For(reqBody versionedDecoder) encoder { +func (msar *MockSaslAuthenticateResponse) For(reqBody versionedDecoder) encoderWithHeader { res := &SaslAuthenticateResponse{} res.Err = msar.kerror res.SaslAuthBytes = msar.saslAuthBytes @@ -936,7 +973,7 @@ func NewMockSaslHandshakeResponse(t TestReporter) *MockSaslHandshakeResponse { return &MockSaslHandshakeResponse{t: t} } -func (mshr *MockSaslHandshakeResponse) For(reqBody versionedDecoder) encoder { +func (mshr *MockSaslHandshakeResponse) For(reqBody versionedDecoder) encoderWithHeader { res := &SaslHandshakeResponse{} res.Err = mshr.kerror res.EnabledMechanisms = mshr.enabledMechanisms @@ -957,7 +994,7 @@ func NewMockDeleteAclsResponse(t TestReporter) *MockDeleteAclsResponse { return &MockDeleteAclsResponse{t: t} } -func (mr *MockDeleteAclsResponse) For(reqBody versionedDecoder) encoder { +func (mr *MockDeleteAclsResponse) For(reqBody versionedDecoder) encoderWithHeader { req := reqBody.(*DeleteAclsRequest) res := &DeleteAclsResponse{} @@ -983,7 +1020,7 @@ func (m *MockDeleteGroupsResponse) SetDeletedGroups(groups []string) *MockDelete return m } -func (m *MockDeleteGroupsResponse) For(reqBody versionedDecoder) encoder { +func (m *MockDeleteGroupsResponse) For(reqBody versionedDecoder) encoderWithHeader { resp := &DeleteGroupsResponse{ GroupErrorCodes: map[string]KError{}, } diff --git a/offset_commit_request.go b/offset_commit_request.go index 5732ed95c..9931cade5 100644 --- a/offset_commit_request.go +++ b/offset_commit_request.go @@ -170,6 +170,10 @@ func (r *OffsetCommitRequest) version() int16 { return r.Version } +func (r *OffsetCommitRequest) headerVersion() int16 { + return 1 +} + func (r *OffsetCommitRequest) requiredVersion() KafkaVersion { switch r.Version { case 1: diff --git a/offset_commit_response.go b/offset_commit_response.go index e842298db..342260ef5 100644 --- a/offset_commit_response.go +++ b/offset_commit_response.go @@ -94,6 +94,10 @@ func (r *OffsetCommitResponse) version() int16 { return r.Version } +func (r *OffsetCommitResponse) headerVersion() int16 { + return 0 +} + func (r *OffsetCommitResponse) requiredVersion() KafkaVersion { switch r.Version { case 1: diff --git a/offset_fetch_request.go b/offset_fetch_request.go index 68608241f..51e9faa3f 100644 --- a/offset_fetch_request.go +++ b/offset_fetch_request.go @@ -68,6 +68,10 @@ func (r *OffsetFetchRequest) version() int16 { return r.Version } +func (r *OffsetFetchRequest) headerVersion() int16 { + return 1 +} + func (r *OffsetFetchRequest) requiredVersion() KafkaVersion { switch r.Version { case 1: diff --git a/offset_fetch_response.go b/offset_fetch_response.go index 9e2570280..9c64e0708 100644 --- a/offset_fetch_response.go +++ b/offset_fetch_response.go @@ -155,6 +155,10 @@ func (r *OffsetFetchResponse) version() int16 { return r.Version } +func (r *OffsetFetchResponse) headerVersion() int16 { + return 0 +} + func (r *OffsetFetchResponse) requiredVersion() KafkaVersion { switch r.Version { case 1: diff --git a/offset_manager_test.go b/offset_manager_test.go index eca926d52..f1baa9cdb 100644 --- a/offset_manager_test.go +++ b/offset_manager_test.go @@ -134,7 +134,7 @@ func TestNewOffsetManagerOffsetsAutoCommit(t *testing.T) { ocResponse := new(OffsetCommitResponse) ocResponse.AddError("my_topic", 0, ErrNoError) - handler := func(req *request) (res encoder) { + handler := func(req *request) (res encoderWithHeader) { close(called) return ocResponse } @@ -329,7 +329,7 @@ func TestPartitionOffsetManagerResetOffsetWithRetention(t *testing.T) { ocResponse := new(OffsetCommitResponse) ocResponse.AddError("my_topic", 0, ErrNoError) - handler := func(req *request) (res encoder) { + handler := func(req *request) (res encoderWithHeader) { if req.body.version() != 2 { t.Errorf("Expected to be using version 2. Actual: %v", req.body.version()) } @@ -390,7 +390,7 @@ func TestPartitionOffsetManagerMarkOffsetWithRetention(t *testing.T) { ocResponse := new(OffsetCommitResponse) ocResponse.AddError("my_topic", 0, ErrNoError) - handler := func(req *request) (res encoder) { + handler := func(req *request) (res encoderWithHeader) { if req.body.version() != 2 { t.Errorf("Expected to be using version 2. Actual: %v", req.body.version()) } diff --git a/offset_request.go b/offset_request.go index 58e223762..c0b3305f6 100644 --- a/offset_request.go +++ b/offset_request.go @@ -116,6 +116,10 @@ func (r *OffsetRequest) version() int16 { return r.Version } +func (r *OffsetRequest) headerVersion() int16 { + return 1 +} + func (r *OffsetRequest) requiredVersion() KafkaVersion { switch r.Version { case 1: diff --git a/offset_response.go b/offset_response.go index 8b2193f9a..ead3ebbcc 100644 --- a/offset_response.go +++ b/offset_response.go @@ -150,6 +150,10 @@ func (r *OffsetResponse) version() int16 { return r.Version } +func (r *OffsetResponse) headerVersion() int16 { + return 0 +} + func (r *OffsetResponse) requiredVersion() KafkaVersion { switch r.Version { case 1: diff --git a/packet_decoder.go b/packet_decoder.go index 9be854c07..ed00ba350 100644 --- a/packet_decoder.go +++ b/packet_decoder.go @@ -10,8 +10,11 @@ type packetDecoder interface { getInt32() (int32, error) getInt64() (int64, error) getVarint() (int64, error) + getUVarint() (uint64, error) getArrayLength() (int, error) + getCompactArrayLength() (int, error) getBool() (bool, error) + getEmptyTaggedFieldArray() (int, error) // Collections getBytes() ([]byte, error) @@ -19,6 +22,9 @@ type packetDecoder interface { getRawBytes(length int) ([]byte, error) getString() (string, error) getNullableString() (*string, error) + getCompactString() (string, error) + getCompactNullableString() (*string, error) + getCompactInt32Array() ([]int32, error) getInt32Array() ([]int32, error) getInt64Array() ([]int64, error) getStringArray() ([]string, error) diff --git a/packet_encoder.go b/packet_encoder.go index 67b8daed8..50c735c04 100644 --- a/packet_encoder.go +++ b/packet_encoder.go @@ -12,6 +12,8 @@ type packetEncoder interface { putInt32(in int32) putInt64(in int64) putVarint(in int64) + putUVarint(in uint64) + putCompactArrayLength(in int) putArrayLength(in int) error putBool(in bool) @@ -19,11 +21,16 @@ type packetEncoder interface { putBytes(in []byte) error putVarintBytes(in []byte) error putRawBytes(in []byte) error + putCompactString(in string) error + putNullableCompactString(in *string) error putString(in string) error putNullableString(in *string) error putStringArray(in []string) error + putCompactInt32Array(in []int32) error + putNullableCompactInt32Array(in []int32) error putInt32Array(in []int32) error putInt64Array(in []int64) error + putEmptyTaggedFieldArray() // Provide the current offset to record the batch size metric offset() int diff --git a/prep_encoder.go b/prep_encoder.go index b633cd151..827542c50 100644 --- a/prep_encoder.go +++ b/prep_encoder.go @@ -2,6 +2,7 @@ package sarama import ( "encoding/binary" + "errors" "fmt" "math" @@ -36,6 +37,11 @@ func (pe *prepEncoder) putVarint(in int64) { pe.length += binary.PutVarint(buf[:], in) } +func (pe *prepEncoder) putUVarint(in uint64) { + var buf [binary.MaxVarintLen64]byte + pe.length += binary.PutUvarint(buf[:], in) +} + func (pe *prepEncoder) putArrayLength(in int) error { if in > math.MaxInt32 { return PacketEncodingError{fmt.Sprintf("array too long (%d)", in)} @@ -44,6 +50,10 @@ func (pe *prepEncoder) putArrayLength(in int) error { return nil } +func (pe *prepEncoder) putCompactArrayLength(in int) { + pe.putUVarint(uint64(in + 1)) +} + func (pe *prepEncoder) putBool(in bool) { pe.length++ } @@ -67,6 +77,20 @@ func (pe *prepEncoder) putVarintBytes(in []byte) error { return pe.putRawBytes(in) } +func (pe *prepEncoder) putCompactString(in string) error { + pe.putCompactArrayLength(len(in)) + return pe.putRawBytes([]byte(in)) +} + +func (pe *prepEncoder) putNullableCompactString(in *string) error { + if in == nil { + pe.putUVarint(0) + return nil + } else { + return pe.putCompactString(*in) + } +} + func (pe *prepEncoder) putRawBytes(in []byte) error { if len(in) > math.MaxInt32 { return PacketEncodingError{fmt.Sprintf("byteslice too long (%d)", len(in))} @@ -107,6 +131,27 @@ func (pe *prepEncoder) putStringArray(in []string) error { return nil } +func (pe *prepEncoder) putCompactInt32Array(in []int32) error { + if in == nil { + return errors.New("expected int32 array to be non null") + } + + pe.putUVarint(uint64(len(in)) + 1) + pe.length += 4 * len(in) + return nil +} + +func (pe *prepEncoder) putNullableCompactInt32Array(in []int32) error { + if in == nil { + pe.putUVarint(0) + return nil + } + + pe.putUVarint(uint64(len(in)) + 1) + pe.length += 4 * len(in) + return nil +} + func (pe *prepEncoder) putInt32Array(in []int32) error { err := pe.putArrayLength(len(in)) if err != nil { @@ -125,6 +170,10 @@ func (pe *prepEncoder) putInt64Array(in []int64) error { return nil } +func (pe *prepEncoder) putEmptyTaggedFieldArray() { + pe.putUVarint(0) +} + func (pe *prepEncoder) offset() int { return pe.length } diff --git a/produce_request.go b/produce_request.go index 178972a0f..0034651e2 100644 --- a/produce_request.go +++ b/produce_request.go @@ -206,6 +206,10 @@ func (r *ProduceRequest) version() int16 { return r.Version } +func (r *ProduceRequest) headerVersion() int16 { + return 1 +} + func (r *ProduceRequest) requiredVersion() KafkaVersion { switch r.Version { case 1: diff --git a/produce_response.go b/produce_response.go index 7fc2fffb3..edf978790 100644 --- a/produce_response.go +++ b/produce_response.go @@ -163,6 +163,22 @@ func (r *ProduceResponse) encode(pe packetEncoder) error { return nil } +func (r *ProduceResponse) key() int16 { + return 0 +} + +func (r *ProduceResponse) version() int16 { + return r.Version +} + +func (r *ProduceResponse) headerVersion() int16 { + return 0 +} + +func (r *ProduceResponse) requiredVersion() KafkaVersion { + return MinVersion +} + func (r *ProduceResponse) GetBlock(topic string, partition int32) *ProduceResponseBlock { if r.Blocks == nil { return nil diff --git a/real_decoder.go b/real_decoder.go index 6c5a1b9b0..8ac576db2 100644 --- a/real_decoder.go +++ b/real_decoder.go @@ -9,7 +9,9 @@ var errInvalidArrayLength = PacketDecodingError{"invalid array length"} var errInvalidByteSliceLength = PacketDecodingError{"invalid byteslice length"} var errInvalidStringLength = PacketDecodingError{"invalid string length"} var errVarintOverflow = PacketDecodingError{"varint overflow"} +var errUVarintOverflow = PacketDecodingError{"uvarint overflow"} var errInvalidBool = PacketDecodingError{"invalid bool"} +var errUnsupportedTaggedFields = PacketDecodingError{"non-empty tagged fields are not supported yet"} type realDecoder struct { raw []byte @@ -73,6 +75,22 @@ func (rd *realDecoder) getVarint() (int64, error) { return tmp, nil } +func (rd *realDecoder) getUVarint() (uint64, error) { + tmp, n := binary.Uvarint(rd.raw[rd.off:]) + if n == 0 { + rd.off = len(rd.raw) + return 0, ErrInsufficientData + } + + if n < 0 { + rd.off -= n + return 0, errUVarintOverflow + } + + rd.off += n + return tmp, nil +} + func (rd *realDecoder) getArrayLength() (int, error) { if rd.remaining() < 4 { rd.off = len(rd.raw) @@ -89,6 +107,19 @@ func (rd *realDecoder) getArrayLength() (int, error) { return tmp, nil } +func (rd *realDecoder) getCompactArrayLength() (int, error) { + n, err := rd.getUVarint() + if err != nil { + return 0, err + } + + if n == 0 { + return 0, nil + } + + return int(n) - 1, nil +} + func (rd *realDecoder) getBool() (bool, error) { b, err := rd.getInt8() if err != nil || b == 0 { @@ -100,6 +131,19 @@ func (rd *realDecoder) getBool() (bool, error) { return true, nil } +func (rd *realDecoder) getEmptyTaggedFieldArray() (int, error) { + tagCount, err := rd.getUVarint() + if err != nil { + return 0, err + } + + if tagCount != 0 { + return 0, errUnsupportedTaggedFields + } + + return 0, nil +} + // collections func (rd *realDecoder) getBytes() ([]byte, error) { @@ -167,6 +211,58 @@ func (rd *realDecoder) getNullableString() (*string, error) { return &tmpStr, err } +func (rd *realDecoder) getCompactString() (string, error) { + n, err := rd.getUVarint() + if err != nil { + return "", err + } + + var length = int(n - 1) + + tmpStr := string(rd.raw[rd.off : rd.off+length]) + rd.off += length + return tmpStr, nil +} + +func (rd *realDecoder) getCompactNullableString() (*string, error) { + n, err := rd.getUVarint() + + if err != nil { + return nil, err + } + + var length = int(n - 1) + + if length < 0 { + return nil, err + } + + tmpStr := string(rd.raw[rd.off : rd.off+length]) + rd.off += length + return &tmpStr, err +} + +func (rd *realDecoder) getCompactInt32Array() ([]int32, error) { + n, err := rd.getUVarint() + if err != nil { + return nil, err + } + + if n == 0 { + return nil, nil + } + + arrayLength := int(n) - 1 + + ret := make([]int32, arrayLength) + + for i := range ret { + ret[i] = int32(binary.BigEndian.Uint32(rd.raw[rd.off:])) + rd.off += 4 + } + return ret, nil +} + func (rd *realDecoder) getInt32Array() ([]int32, error) { if rd.remaining() < 4 { rd.off = len(rd.raw) diff --git a/real_encoder.go b/real_encoder.go index 3c75387f7..ba073f7d3 100644 --- a/real_encoder.go +++ b/real_encoder.go @@ -2,6 +2,7 @@ package sarama import ( "encoding/binary" + "errors" "github.com/rcrowley/go-metrics" ) @@ -39,11 +40,20 @@ func (re *realEncoder) putVarint(in int64) { re.off += binary.PutVarint(re.raw[re.off:], in) } +func (re *realEncoder) putUVarint(in uint64) { + re.off += binary.PutUvarint(re.raw[re.off:], in) +} + func (re *realEncoder) putArrayLength(in int) error { re.putInt32(int32(in)) return nil } +func (re *realEncoder) putCompactArrayLength(in int) { + // 0 represents a null array, so +1 has to be added + re.putUVarint(uint64(in + 1)) +} + func (re *realEncoder) putBool(in bool) { if in { re.putInt8(1) @@ -78,6 +88,19 @@ func (re *realEncoder) putVarintBytes(in []byte) error { return re.putRawBytes(in) } +func (re *realEncoder) putCompactString(in string) error { + re.putCompactArrayLength(len(in)) + return re.putRawBytes([]byte(in)) +} + +func (re *realEncoder) putNullableCompactString(in *string) error { + if in == nil { + re.putInt8(0) + return nil + } + return re.putCompactString(*in) +} + func (re *realEncoder) putString(in string) error { re.putInt16(int16(len(in))) copy(re.raw[re.off:], in) @@ -108,6 +131,31 @@ func (re *realEncoder) putStringArray(in []string) error { return nil } +func (re *realEncoder) putCompactInt32Array(in []int32) error { + if in == nil { + return errors.New("expected int32 array to be non null") + } + // 0 represents a null array, so +1 has to be added + re.putUVarint(uint64(len(in)) + 1) + for _, val := range in { + re.putInt32(val) + } + return nil +} + +func (re *realEncoder) putNullableCompactInt32Array(in []int32) error { + if in == nil { + re.putUVarint(0) + return nil + } + // 0 represents a null array, so +1 has to be added + re.putUVarint(uint64(len(in)) + 1) + for _, val := range in { + re.putInt32(val) + } + return nil +} + func (re *realEncoder) putInt32Array(in []int32) error { err := re.putArrayLength(len(in)) if err != nil { @@ -130,6 +178,10 @@ func (re *realEncoder) putInt64Array(in []int64) error { return nil } +func (re *realEncoder) putEmptyTaggedFieldArray() { + re.putUVarint(0) +} + func (re *realEncoder) offset() int { return re.off } diff --git a/request.go b/request.go index 6e4ad8731..dcfd3946c 100644 --- a/request.go +++ b/request.go @@ -11,6 +11,7 @@ type protocolBody interface { versionedDecoder key() int16 version() int16 + headerVersion() int16 requiredVersion() KafkaVersion } @@ -26,12 +27,19 @@ func (r *request) encode(pe packetEncoder) error { pe.putInt16(r.body.version()) pe.putInt32(r.correlationID) - err := pe.putString(r.clientID) - if err != nil { - return err + if r.body.headerVersion() >= 1 { + err := pe.putString(r.clientID) + if err != nil { + return err + } + } + + if r.body.headerVersion() >= 2 { + // we don't use tag headers at the moment so we just put an array length of 0 + pe.putUVarint(0) } - err = r.body.encode(pe) + err := r.body.encode(pe) if err != nil { return err } @@ -65,6 +73,14 @@ func (r *request) decode(pd packetDecoder) (err error) { return PacketDecodingError{fmt.Sprintf("unknown request key (%d)", key)} } + if r.body.headerVersion() >= 2 { + // tagged field + _, err = pd.getUVarint() + if err != nil { + return err + } + } + return r.body.decode(pd, version) } @@ -166,6 +182,10 @@ func allocateBody(key, version int16) protocolBody { return &CreatePartitionsRequest{} case 42: return &DeleteGroupsRequest{} + case 45: + return &AlterPartitionReassignmentsRequest{} + case 46: + return &ListPartitionReassignmentsRequest{} } return nil } diff --git a/request_test.go b/request_test.go index fec190795..95cd6bb32 100644 --- a/request_test.go +++ b/request_test.go @@ -42,13 +42,32 @@ func testRequest(t *testing.T, name string, rb protocolBody, expected []byte) { testRequestDecode(t, name, rb, packet) } +func testRequestWithoutByteComparison(t *testing.T, name string, rb protocolBody) { + if !rb.requiredVersion().IsAtLeast(MinVersion) { + t.Errorf("Request %s has invalid required version", name) + } + packet := testRequestEncode(t, name, rb, nil) + testRequestDecode(t, name, rb, packet) +} + func testRequestEncode(t *testing.T, name string, rb protocolBody, expected []byte) []byte { req := &request{correlationID: 123, clientID: "foo", body: rb} packet, err := encode(req, nil) - headerSize := 14 + len("foo") + + headerSize := 0 + + switch rb.headerVersion() { + case 1: + headerSize = 14 + len("foo") + case 2: + headerSize = 14 + len("foo") + 1 + default: + t.Error("Encoding", name, "failed\nheaderVersion", rb.headerVersion(), "not implemented") + } + if err != nil { t.Error(err) - } else if !bytes.Equal(packet[headerSize:], expected) { + } else if expected != nil && !bytes.Equal(packet[headerSize:], expected) { t.Error("Encoding", name, "failed\ngot ", packet[headerSize:], "\nwant", expected) } return packet diff --git a/response_header.go b/response_header.go index 7a7591851..5dffb75be 100644 --- a/response_header.go +++ b/response_header.go @@ -10,7 +10,7 @@ type responseHeader struct { correlationID int32 } -func (r *responseHeader) decode(pd packetDecoder) (err error) { +func (r *responseHeader) decode(pd packetDecoder, version int16) (err error) { r.length, err = pd.getInt32() if err != nil { return err @@ -20,5 +20,12 @@ func (r *responseHeader) decode(pd packetDecoder) (err error) { } r.correlationID, err = pd.getInt32() + + if version >= 1 { + if _, err := pd.getEmptyTaggedFieldArray(); err != nil { + return err + } + } + return err } diff --git a/response_header_test.go b/response_header_test.go index 8f9fdb80c..31c35ae6a 100644 --- a/response_header_test.go +++ b/response_header_test.go @@ -3,15 +3,31 @@ package sarama import "testing" var ( - responseHeaderBytes = []byte{ + responseHeaderBytesV0 = []byte{ 0x00, 0x00, 0x0f, 0x00, 0x0a, 0xbb, 0xcc, 0xff} + + responseHeaderBytesV1 = []byte{ + 0x00, 0x00, 0x0f, 0x00, + 0x0a, 0xbb, 0xcc, 0xff, 0x00} ) -func TestResponseHeader(t *testing.T) { +func TestResponseHeaderV0(t *testing.T) { + header := responseHeader{} + + testVersionDecodable(t, "response header", &header, responseHeaderBytesV0, 0) + if header.length != 0xf00 { + t.Error("Decoding header length failed, got", header.length) + } + if header.correlationID != 0x0abbccff { + t.Error("Decoding header correlation id failed, got", header.correlationID) + } +} + +func TestResponseHeaderV1(t *testing.T) { header := responseHeader{} - testDecodable(t, "response header", &header, responseHeaderBytes) + testVersionDecodable(t, "response header", &header, responseHeaderBytesV1, 1) if header.length != 0xf00 { t.Error("Decoding header length failed, got", header.length) } diff --git a/sasl_authenticate_request.go b/sasl_authenticate_request.go index 54c8b0992..90504df6f 100644 --- a/sasl_authenticate_request.go +++ b/sasl_authenticate_request.go @@ -24,6 +24,10 @@ func (r *SaslAuthenticateRequest) version() int16 { return 0 } +func (r *SaslAuthenticateRequest) headerVersion() int16 { + return 1 +} + func (r *SaslAuthenticateRequest) requiredVersion() KafkaVersion { return V1_0_0_0 } diff --git a/sasl_authenticate_response.go b/sasl_authenticate_response.go index 0038c3f36..3ef57b5af 100644 --- a/sasl_authenticate_response.go +++ b/sasl_authenticate_response.go @@ -39,6 +39,10 @@ func (r *SaslAuthenticateResponse) version() int16 { return 0 } +func (r *SaslAuthenticateResponse) headerVersion() int16 { + return 0 +} + func (r *SaslAuthenticateResponse) requiredVersion() KafkaVersion { return V1_0_0_0 } diff --git a/sasl_handshake_request.go b/sasl_handshake_request.go index fe5ba0504..74dc3072f 100644 --- a/sasl_handshake_request.go +++ b/sasl_handshake_request.go @@ -29,6 +29,10 @@ func (r *SaslHandshakeRequest) version() int16 { return r.Version } +func (r *SaslHandshakeRequest) headerVersion() int16 { + return 1 +} + func (r *SaslHandshakeRequest) requiredVersion() KafkaVersion { return V0_10_0_0 } diff --git a/sasl_handshake_response.go b/sasl_handshake_response.go index ef290d4bc..69dfc3178 100644 --- a/sasl_handshake_response.go +++ b/sasl_handshake_response.go @@ -33,6 +33,10 @@ func (r *SaslHandshakeResponse) version() int16 { return 0 } +func (r *SaslHandshakeResponse) headerVersion() int16 { + return 0 +} + func (r *SaslHandshakeResponse) requiredVersion() KafkaVersion { return V0_10_0_0 } diff --git a/sync_group_request.go b/sync_group_request.go index fe207080e..ac6ecb13e 100644 --- a/sync_group_request.go +++ b/sync_group_request.go @@ -77,6 +77,10 @@ func (r *SyncGroupRequest) version() int16 { return 0 } +func (r *SyncGroupRequest) headerVersion() int16 { + return 1 +} + func (r *SyncGroupRequest) requiredVersion() KafkaVersion { return V0_9_0_0 } diff --git a/sync_group_response.go b/sync_group_response.go index 194b382b4..af019c42f 100644 --- a/sync_group_response.go +++ b/sync_group_response.go @@ -36,6 +36,10 @@ func (r *SyncGroupResponse) version() int16 { return 0 } +func (r *SyncGroupResponse) headerVersion() int16 { + return 0 +} + func (r *SyncGroupResponse) requiredVersion() KafkaVersion { return V0_9_0_0 } diff --git a/txn_offset_commit_request.go b/txn_offset_commit_request.go index 71e95b814..c4043a335 100644 --- a/txn_offset_commit_request.go +++ b/txn_offset_commit_request.go @@ -91,6 +91,10 @@ func (a *TxnOffsetCommitRequest) version() int16 { return 0 } +func (a *TxnOffsetCommitRequest) headerVersion() int16 { + return 1 +} + func (a *TxnOffsetCommitRequest) requiredVersion() KafkaVersion { return V0_11_0_0 } diff --git a/txn_offset_commit_response.go b/txn_offset_commit_response.go index 6c980f406..94d8029da 100644 --- a/txn_offset_commit_response.go +++ b/txn_offset_commit_response.go @@ -78,6 +78,10 @@ func (a *TxnOffsetCommitResponse) version() int16 { return 0 } +func (a *TxnOffsetCommitResponse) headerVersion() int16 { + return 0 +} + func (a *TxnOffsetCommitResponse) requiredVersion() KafkaVersion { return V0_11_0_0 }