Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(admin): implement leader election api #3030

Merged
merged 1 commit into from
Dec 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ type ClusterAdmin interface {
// This operation is supported by brokers with version 0.11.0.0 or higher.
DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error)

// ElectLeaders allows to trigger the election of preferred leaders for a set of partitions.
ElectLeaders(ElectionType, map[string][]int32) (map[string]map[int32]*PartitionResult, error)

// List the consumer groups available in the cluster.
ListConsumerGroups() (map[string]string, error)

Expand Down Expand Up @@ -907,6 +910,39 @@ func (ca *clusterAdmin) DeleteACL(filter AclFilter, validateOnly bool) ([]Matchi
return mAcls, nil
}

func (ca *clusterAdmin) ElectLeaders(electionType ElectionType, partitions map[string][]int32) (map[string]map[int32]*PartitionResult, error) {
request := &ElectLeadersRequest{
Type: electionType,
TopicPartitions: partitions,
TimeoutMs: int32(60000),
}

if ca.conf.Version.IsAtLeast(V2_4_0_0) {
request.Version = 2
} else if ca.conf.Version.IsAtLeast(V0_11_0_0) {
request.Version = 1
}

var res *ElectLeadersResponse
err := ca.retryOnError(isErrNotController, func() error {
b, err := ca.Controller()
if err != nil {
return err
}
_ = b.Open(ca.client.Config())

res, err = b.ElectLeaders(request)
if isErrNotController(err) {
_, _ = ca.refreshController()
}
return err
})
if err != nil {
return nil, err
}
return res.ReplicaElectionResults, nil
}

func (ca *clusterAdmin) DescribeConsumerGroups(groups []string) (result []*GroupDescription, err error) {
groupsPerBroker := make(map[*Broker][]string)

Expand Down
39 changes: 39 additions & 0 deletions admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1325,6 +1325,45 @@ func TestClusterAdminDeleteAcl(t *testing.T) {
}
}

func TestElectLeaders(t *testing.T) {
broker := NewMockBroker(t, 1)
defer broker.Close()

broker.SetHandlerByMap(map[string]MockResponse{
"ApiVersionsRequest": NewMockApiVersionsResponse(t),
"MetadataRequest": NewMockMetadataResponse(t).
SetController(broker.BrokerID()).
SetBroker(broker.Addr(), broker.BrokerID()),
"ElectLeadersRequest": NewMockElectLeadersResponse(t),
})

config := NewTestConfig()
config.Version = V2_4_0_0
admin, err := NewClusterAdmin([]string{broker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

response, err := admin.ElectLeaders(PreferredElection, map[string][]int32{"my_topic": {0, 1}})
if err != nil {
t.Fatal(err)
}

partitionResult, ok := response["my_topic"]
if !ok {
t.Fatalf("topic missing in response")
}

if len(partitionResult) != 1 {
t.Fatalf("partition missing in response")
}

err = admin.Close()
if err != nil {
t.Fatal(err)
}
}

func TestDescribeTopic(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
defer seedBroker.Close()
Expand Down
12 changes: 12 additions & 0 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,18 @@ func (b *Broker) ListPartitionReassignments(request *ListPartitionReassignmentsR
return response, nil
}

// ElectLeaders sends aa elect leaders request and returns list partitions elect result
func (b *Broker) ElectLeaders(request *ElectLeadersRequest) (*ElectLeadersResponse, error) {
response := new(ElectLeadersResponse)

err := b.sendAndReceive(request, response)
if err != nil {
return nil, err
}

return response, nil
}

// DeleteRecords send a request to delete records and return delete record
// response or error
func (b *Broker) DeleteRecords(request *DeleteRecordsRequest) (*DeleteRecordsResponse, error) {
Expand Down
134 changes: 134 additions & 0 deletions elect_leaders_request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package sarama

type ElectLeadersRequest struct {
Version int16
Type ElectionType
TopicPartitions map[string][]int32
TimeoutMs int32
}

func (r *ElectLeadersRequest) encode(pe packetEncoder) error {
if r.Version > 0 {
pe.putInt8(int8(r.Type))
}

pe.putCompactArrayLength(len(r.TopicPartitions))

for topic, partitions := range r.TopicPartitions {
if r.Version < 2 {
if err := pe.putString(topic); err != nil {
return err
}
} else {
if err := pe.putCompactString(topic); err != nil {
return err
}
}

if err := pe.putCompactInt32Array(partitions); err != nil {
return err
}

if r.Version >= 2 {
pe.putEmptyTaggedFieldArray()
}
}

pe.putInt32(r.TimeoutMs)

if r.Version >= 2 {
pe.putEmptyTaggedFieldArray()
}

return nil
}

func (r *ElectLeadersRequest) decode(pd packetDecoder, version int16) (err error) {
r.Version = version
if r.Version > 0 {
t, err := pd.getInt8()
if err != nil {
return err
}
r.Type = ElectionType(t)
}

topicCount, err := pd.getCompactArrayLength()
if err != nil {
return err
}
if topicCount > 0 {
r.TopicPartitions = make(map[string][]int32)
for i := 0; i < topicCount; i++ {
var topic string
if r.Version < 2 {
topic, err = pd.getString()
} else {
topic, err = pd.getCompactString()
}
if err != nil {
return err
}
partitionCount, err := pd.getCompactArrayLength()
if err != nil {
return err
}
partitions := make([]int32, partitionCount)
for j := 0; j < partitionCount; j++ {
partition, err := pd.getInt32()
if err != nil {
return err
}
partitions[j] = partition
}
r.TopicPartitions[topic] = partitions
if r.Version >= 2 {
if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
return err
}
}
}
}

r.TimeoutMs, err = pd.getInt32()
if err != nil {
return err
}

if r.Version >= 2 {
if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
return err
}
}

return nil
}

func (r *ElectLeadersRequest) key() int16 {
return 43
}

func (r *ElectLeadersRequest) version() int16 {
return r.Version
}

func (r *ElectLeadersRequest) headerVersion() int16 {
return 2
}

func (r *ElectLeadersRequest) isValidVersion() bool {
return r.Version >= 0 && r.Version <= 2
}

func (r *ElectLeadersRequest) requiredVersion() KafkaVersion {
switch r.Version {
case 2:
return V2_4_0_0
case 1:
return V0_11_0_0
case 0:
return V0_10_0_0
default:
return V2_4_0_0
}
}
26 changes: 26 additions & 0 deletions elect_leaders_request_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package sarama

import "testing"

var electLeadersRequestOneTopic = []byte{
0, // preferred election type
2, // 2-1=1 topic
6, 116, 111, 112, 105, 99, // topic name "topic" as compact string
2, // 2-1=1 partition
0, 0, 0, 0, // partition 0
0, 0, // empty tagged fields
0, 39, 16, 0, // timeout 10000
}

func TestElectLeadersRequest(t *testing.T) {
var request = &ElectLeadersRequest{
TimeoutMs: int32(10000),
Version: int16(2),
TopicPartitions: map[string][]int32{
"topic": {0},
},
Type: PreferredElection,
}

testRequest(t, "one topic", request, electLeadersRequestOneTopic)
}
Loading
Loading