diff --git a/admin.go b/admin.go index dcf1d7659..6549c7e6f 100644 --- a/admin.go +++ b/admin.go @@ -99,6 +99,9 @@ type ClusterAdmin interface { // This operation is supported by brokers with version 0.11.0.0 or higher. DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error) + // ElectLeaders allows to trigger the election of preferred leaders for a set of partitions. + ElectLeaders(ElectionType, map[string][]int32) (map[string]map[int32]*PartitionResult, error) + // List the consumer groups available in the cluster. ListConsumerGroups() (map[string]string, error) @@ -907,6 +910,39 @@ func (ca *clusterAdmin) DeleteACL(filter AclFilter, validateOnly bool) ([]Matchi return mAcls, nil } +func (ca *clusterAdmin) ElectLeaders(electionType ElectionType, partitions map[string][]int32) (map[string]map[int32]*PartitionResult, error) { + request := &ElectLeadersRequest{ + Type: electionType, + TopicPartitions: partitions, + TimeoutMs: int32(60000), + } + + if ca.conf.Version.IsAtLeast(V2_4_0_0) { + request.Version = 2 + } else if ca.conf.Version.IsAtLeast(V0_11_0_0) { + request.Version = 1 + } + + var res *ElectLeadersResponse + err := ca.retryOnError(isErrNotController, func() error { + b, err := ca.Controller() + if err != nil { + return err + } + _ = b.Open(ca.client.Config()) + + res, err = b.ElectLeaders(request) + if isErrNotController(err) { + _, _ = ca.refreshController() + } + return err + }) + if err != nil { + return nil, err + } + return res.ReplicaElectionResults, nil +} + func (ca *clusterAdmin) DescribeConsumerGroups(groups []string) (result []*GroupDescription, err error) { groupsPerBroker := make(map[*Broker][]string) diff --git a/admin_test.go b/admin_test.go index 2b70aa9bb..5b42a5a6a 100644 --- a/admin_test.go +++ b/admin_test.go @@ -1325,6 +1325,45 @@ func TestClusterAdminDeleteAcl(t *testing.T) { } } +func TestElectLeaders(t *testing.T) { + broker := NewMockBroker(t, 1) + defer broker.Close() + + broker.SetHandlerByMap(map[string]MockResponse{ + "ApiVersionsRequest": NewMockApiVersionsResponse(t), + "MetadataRequest": NewMockMetadataResponse(t). + SetController(broker.BrokerID()). + SetBroker(broker.Addr(), broker.BrokerID()), + "ElectLeadersRequest": NewMockElectLeadersResponse(t), + }) + + config := NewTestConfig() + config.Version = V2_4_0_0 + admin, err := NewClusterAdmin([]string{broker.Addr()}, config) + if err != nil { + t.Fatal(err) + } + + response, err := admin.ElectLeaders(PreferredElection, map[string][]int32{"my_topic": {0, 1}}) + if err != nil { + t.Fatal(err) + } + + partitionResult, ok := response["my_topic"] + if !ok { + t.Fatalf("topic missing in response") + } + + if len(partitionResult) != 1 { + t.Fatalf("partition missing in response") + } + + err = admin.Close() + if err != nil { + t.Fatal(err) + } +} + func TestDescribeTopic(t *testing.T) { seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() diff --git a/broker.go b/broker.go index d5f7c4ab9..c4f1005f5 100644 --- a/broker.go +++ b/broker.go @@ -689,6 +689,18 @@ func (b *Broker) ListPartitionReassignments(request *ListPartitionReassignmentsR return response, nil } +// ElectLeaders sends aa elect leaders request and returns list partitions elect result +func (b *Broker) ElectLeaders(request *ElectLeadersRequest) (*ElectLeadersResponse, error) { + response := new(ElectLeadersResponse) + + err := b.sendAndReceive(request, response) + if err != nil { + return nil, err + } + + return response, nil +} + // DeleteRecords send a request to delete records and return delete record // response or error func (b *Broker) DeleteRecords(request *DeleteRecordsRequest) (*DeleteRecordsResponse, error) { diff --git a/elect_leaders_request.go b/elect_leaders_request.go new file mode 100644 index 000000000..cd8d6a7f0 --- /dev/null +++ b/elect_leaders_request.go @@ -0,0 +1,134 @@ +package sarama + +type ElectLeadersRequest struct { + Version int16 + Type ElectionType + TopicPartitions map[string][]int32 + TimeoutMs int32 +} + +func (r *ElectLeadersRequest) encode(pe packetEncoder) error { + if r.Version > 0 { + pe.putInt8(int8(r.Type)) + } + + pe.putCompactArrayLength(len(r.TopicPartitions)) + + for topic, partitions := range r.TopicPartitions { + if r.Version < 2 { + if err := pe.putString(topic); err != nil { + return err + } + } else { + if err := pe.putCompactString(topic); err != nil { + return err + } + } + + if err := pe.putCompactInt32Array(partitions); err != nil { + return err + } + + if r.Version >= 2 { + pe.putEmptyTaggedFieldArray() + } + } + + pe.putInt32(r.TimeoutMs) + + if r.Version >= 2 { + pe.putEmptyTaggedFieldArray() + } + + return nil +} + +func (r *ElectLeadersRequest) decode(pd packetDecoder, version int16) (err error) { + r.Version = version + if r.Version > 0 { + t, err := pd.getInt8() + if err != nil { + return err + } + r.Type = ElectionType(t) + } + + topicCount, err := pd.getCompactArrayLength() + if err != nil { + return err + } + if topicCount > 0 { + r.TopicPartitions = make(map[string][]int32) + for i := 0; i < topicCount; i++ { + var topic string + if r.Version < 2 { + topic, err = pd.getString() + } else { + topic, err = pd.getCompactString() + } + if err != nil { + return err + } + partitionCount, err := pd.getCompactArrayLength() + if err != nil { + return err + } + partitions := make([]int32, partitionCount) + for j := 0; j < partitionCount; j++ { + partition, err := pd.getInt32() + if err != nil { + return err + } + partitions[j] = partition + } + r.TopicPartitions[topic] = partitions + if r.Version >= 2 { + if _, err := pd.getEmptyTaggedFieldArray(); err != nil { + return err + } + } + } + } + + r.TimeoutMs, err = pd.getInt32() + if err != nil { + return err + } + + if r.Version >= 2 { + if _, err := pd.getEmptyTaggedFieldArray(); err != nil { + return err + } + } + + return nil +} + +func (r *ElectLeadersRequest) key() int16 { + return 43 +} + +func (r *ElectLeadersRequest) version() int16 { + return r.Version +} + +func (r *ElectLeadersRequest) headerVersion() int16 { + return 2 +} + +func (r *ElectLeadersRequest) isValidVersion() bool { + return r.Version >= 0 && r.Version <= 2 +} + +func (r *ElectLeadersRequest) requiredVersion() KafkaVersion { + switch r.Version { + case 2: + return V2_4_0_0 + case 1: + return V0_11_0_0 + case 0: + return V0_10_0_0 + default: + return V2_4_0_0 + } +} diff --git a/elect_leaders_request_test.go b/elect_leaders_request_test.go new file mode 100644 index 000000000..0133c0cad --- /dev/null +++ b/elect_leaders_request_test.go @@ -0,0 +1,26 @@ +package sarama + +import "testing" + +var electLeadersRequestOneTopic = []byte{ + 0, // preferred election type + 2, // 2-1=1 topic + 6, 116, 111, 112, 105, 99, // topic name "topic" as compact string + 2, // 2-1=1 partition + 0, 0, 0, 0, // partition 0 + 0, 0, // empty tagged fields + 0, 39, 16, 0, // timeout 10000 +} + +func TestElectLeadersRequest(t *testing.T) { + var request = &ElectLeadersRequest{ + TimeoutMs: int32(10000), + Version: int16(2), + TopicPartitions: map[string][]int32{ + "topic": {0}, + }, + Type: PreferredElection, + } + + testRequest(t, "one topic", request, electLeadersRequestOneTopic) +} diff --git a/elect_leaders_response.go b/elect_leaders_response.go new file mode 100644 index 000000000..8c85249ac --- /dev/null +++ b/elect_leaders_response.go @@ -0,0 +1,173 @@ +package sarama + +import "time" + +type PartitionResult struct { + ErrorCode KError + ErrorMessage *string +} + +func (b *PartitionResult) encode(pe packetEncoder, version int16) error { + pe.putInt16(int16(b.ErrorCode)) + if version < 2 { + if err := pe.putNullableString(b.ErrorMessage); err != nil { + return err + } + } else { + if err := pe.putNullableCompactString(b.ErrorMessage); err != nil { + return err + } + } + if version >= 2 { + pe.putEmptyTaggedFieldArray() + } + return nil +} + +func (b *PartitionResult) decode(pd packetDecoder, version int16) (err error) { + kerr, err := pd.getInt16() + if err != nil { + return err + } + b.ErrorCode = KError(kerr) + if version < 2 { + b.ErrorMessage, err = pd.getNullableString() + } else { + b.ErrorMessage, err = pd.getCompactNullableString() + } + if version >= 2 { + _, err = pd.getEmptyTaggedFieldArray() + } + return err +} + +type ElectLeadersResponse struct { + Version int16 + ThrottleTimeMs int32 + ErrorCode KError + ReplicaElectionResults map[string]map[int32]*PartitionResult +} + +func (r *ElectLeadersResponse) encode(pe packetEncoder) error { + pe.putInt32(r.ThrottleTimeMs) + + if r.Version > 0 { + pe.putInt16(int16(r.ErrorCode)) + } + + pe.putCompactArrayLength(len(r.ReplicaElectionResults)) + for topic, partitions := range r.ReplicaElectionResults { + if r.Version < 2 { + if err := pe.putString(topic); err != nil { + return err + } + } else { + if err := pe.putCompactString(topic); err != nil { + return err + } + } + pe.putCompactArrayLength(len(partitions)) + for partition, result := range partitions { + pe.putInt32(partition) + if err := result.encode(pe, r.Version); err != nil { + return err + } + } + pe.putEmptyTaggedFieldArray() + } + + pe.putEmptyTaggedFieldArray() + + return nil +} + +func (r *ElectLeadersResponse) decode(pd packetDecoder, version int16) (err error) { + r.Version = version + if r.ThrottleTimeMs, err = pd.getInt32(); err != nil { + return err + } + if r.Version > 0 { + kerr, err := pd.getInt16() + if err != nil { + return err + } + r.ErrorCode = KError(kerr) + } + + numTopics, err := pd.getCompactArrayLength() + if err != nil { + return err + } + + r.ReplicaElectionResults = make(map[string]map[int32]*PartitionResult, numTopics) + for i := 0; i < numTopics; i++ { + var topic string + if r.Version < 2 { + topic, err = pd.getString() + } else { + topic, err = pd.getCompactString() + } + if err != nil { + return err + } + + numPartitions, err := pd.getCompactArrayLength() + if err != nil { + return err + } + r.ReplicaElectionResults[topic] = make(map[int32]*PartitionResult, numPartitions) + for j := 0; j < numPartitions; j++ { + partition, err := pd.getInt32() + if err != nil { + return err + } + result := new(PartitionResult) + if err := result.decode(pd, r.Version); err != nil { + return err + } + r.ReplicaElectionResults[topic][partition] = result + } + if _, err := pd.getEmptyTaggedFieldArray(); err != nil { + return err + } + } + + if _, err := pd.getEmptyTaggedFieldArray(); err != nil { + return err + } + + return nil +} + +func (r *ElectLeadersResponse) key() int16 { + return 43 +} + +func (r *ElectLeadersResponse) version() int16 { + return r.Version +} + +func (r *ElectLeadersResponse) headerVersion() int16 { + return 1 +} + +func (r *ElectLeadersResponse) isValidVersion() bool { + return r.Version >= 0 && r.Version <= 2 +} + +func (r *ElectLeadersResponse) requiredVersion() KafkaVersion { + switch r.Version { + case 2: + return V2_4_0_0 + case 1: + return V0_11_0_0 + case 0: + return V0_10_0_0 + default: + return V2_4_0_0 + } +} + +func (r *ElectLeadersResponse) throttleTime() time.Duration { + return time.Duration(r.ThrottleTimeMs) * time.Millisecond +} diff --git a/elect_leaders_response_test.go b/elect_leaders_response_test.go new file mode 100644 index 000000000..8768dfbb8 --- /dev/null +++ b/elect_leaders_response_test.go @@ -0,0 +1,29 @@ +package sarama + +import "testing" + +var electLeadersResponseOneTopic = []byte{ + 0, 0, 3, 232, // ThrottleTimeMs 1000 + 0, 0, // errorCode + 2, // number of topics + 6, 116, 111, 112, 105, 99, // topic name "topic" + 2, // number of partitions + 0, 0, 0, 0, // partition 0 + 0, 0, // empty tagged fields + 0, 0, // empty tagged fields + 0, 0, // empty tagged fields +} + +func TestElectLeadersResponse(t *testing.T) { + var response = &ElectLeadersResponse{ + Version: int16(2), + ThrottleTimeMs: int32(1000), + ReplicaElectionResults: map[string]map[int32]*PartitionResult{ + "topic": { + 0: {}, + }, + }, + } + + testResponse(t, "one topic", response, electLeadersResponseOneTopic) +} diff --git a/election_type.go b/election_type.go new file mode 100644 index 000000000..01f3b65b3 --- /dev/null +++ b/election_type.go @@ -0,0 +1,10 @@ +package sarama + +type ElectionType int8 + +const ( + // PreferredElection constant type + PreferredElection ElectionType = 0 + // UncleanElection constant type + UncleanElection ElectionType = 1 +) diff --git a/mockresponses.go b/mockresponses.go index d09415b49..2c352797f 100644 --- a/mockresponses.go +++ b/mockresponses.go @@ -778,6 +778,28 @@ func (mr *MockListPartitionReassignmentsResponse) For(reqBody versionedDecoder) return res } +type MockElectLeadersResponse struct { + t TestReporter +} + +func NewMockElectLeadersResponse(t TestReporter) *MockElectLeadersResponse { + return &MockElectLeadersResponse{t: t} +} + +func (mr *MockElectLeadersResponse) For(reqBody versionedDecoder) encoderWithHeader { + req := reqBody.(*ElectLeadersRequest) + res := &ElectLeadersResponse{Version: req.version(), ReplicaElectionResults: map[string]map[int32]*PartitionResult{}} + + for topic, partitions := range req.TopicPartitions { + for _, partition := range partitions { + res.ReplicaElectionResults[topic] = map[int32]*PartitionResult{ + partition: {ErrorCode: ErrNoError}, + } + } + } + return res +} + type MockDeleteRecordsResponse struct { t TestReporter } diff --git a/request.go b/request.go index e8e74ca34..8f0c2b579 100644 --- a/request.go +++ b/request.go @@ -194,7 +194,8 @@ func allocateBody(key, version int16) protocolBody { // 41: DescribeDelegationTokenRequest case 42: return &DeleteGroupsRequest{Version: version} - // 43: ElectLeadersRequest + case 43: + return &ElectLeadersRequest{Version: version} case 44: return &IncrementalAlterConfigsRequest{Version: version} case 45: