Skip to content

Commit

Permalink
feat(admin): implement leader election api
Browse files Browse the repository at this point in the history
Signed-off-by: joey <[email protected]>
  • Loading branch information
chengjoey authored and joeyczheng committed Dec 11, 2024
1 parent 4178837 commit b7da05f
Show file tree
Hide file tree
Showing 10 changed files with 483 additions and 1 deletion.
36 changes: 36 additions & 0 deletions admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ type ClusterAdmin interface {
// This operation is supported by brokers with version 0.11.0.0 or higher.
DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error)

// ElectLeaders allows to trigger the election of preferred leaders for a set of partitions.
ElectLeaders(ElectionType, map[string][]int32) (map[string]map[int32]*PartitionResult, error)

// List the consumer groups available in the cluster.
ListConsumerGroups() (map[string]string, error)

Expand Down Expand Up @@ -907,6 +910,39 @@ func (ca *clusterAdmin) DeleteACL(filter AclFilter, validateOnly bool) ([]Matchi
return mAcls, nil
}

func (ca *clusterAdmin) ElectLeaders(electionType ElectionType, partitions map[string][]int32) (map[string]map[int32]*PartitionResult, error) {
request := &ElectLeadersRequest{
Type: electionType,
TopicPartitions: partitions,
TimeoutMs: int32(60000),
}

if ca.conf.Version.IsAtLeast(V2_4_0_0) {
request.Version = 2
} else if ca.conf.Version.IsAtLeast(V0_11_0_0) {
request.Version = 1
}

var res *ElectLeadersResponse
err := ca.retryOnError(isErrNotController, func() error {
b, err := ca.Controller()
if err != nil {
return err
}
_ = b.Open(ca.client.Config())

res, err = b.ElectLeaders(request)
if isErrNotController(err) {
_, _ = ca.refreshController()
}
return err
})
if err != nil {
return nil, err
}
return res.ReplicaElectionResults, nil
}

func (ca *clusterAdmin) DescribeConsumerGroups(groups []string) (result []*GroupDescription, err error) {
groupsPerBroker := make(map[*Broker][]string)

Expand Down
39 changes: 39 additions & 0 deletions admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1325,6 +1325,45 @@ func TestClusterAdminDeleteAcl(t *testing.T) {
}
}

func TestElectLeaders(t *testing.T) {
broker := NewMockBroker(t, 1)
defer broker.Close()

broker.SetHandlerByMap(map[string]MockResponse{
"ApiVersionsRequest": NewMockApiVersionsResponse(t),
"MetadataRequest": NewMockMetadataResponse(t).
SetController(broker.BrokerID()).
SetBroker(broker.Addr(), broker.BrokerID()),
"ElectLeadersRequest": NewMockElectLeadersResponse(t),
})

config := NewTestConfig()
config.Version = V2_4_0_0
admin, err := NewClusterAdmin([]string{broker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

response, err := admin.ElectLeaders(PreferredElection, map[string][]int32{"my_topic": {0, 1}})
if err != nil {
t.Fatal(err)
}

partitionResult, ok := response["my_topic"]
if !ok {
t.Fatalf("topic missing in response")
}

if len(partitionResult) != 1 {
t.Fatalf("partition missing in response")
}

err = admin.Close()
if err != nil {
t.Fatal(err)
}
}

func TestDescribeTopic(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
defer seedBroker.Close()
Expand Down
12 changes: 12 additions & 0 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,18 @@ func (b *Broker) ListPartitionReassignments(request *ListPartitionReassignmentsR
return response, nil
}

// ElectLeaders sends aa elect leaders request and returns list partitions elect result
func (b *Broker) ElectLeaders(request *ElectLeadersRequest) (*ElectLeadersResponse, error) {
response := new(ElectLeadersResponse)

err := b.sendAndReceive(request, response)
if err != nil {
return nil, err
}

return response, nil
}

// DeleteRecords send a request to delete records and return delete record
// response or error
func (b *Broker) DeleteRecords(request *DeleteRecordsRequest) (*DeleteRecordsResponse, error) {
Expand Down
134 changes: 134 additions & 0 deletions elect_leaders_request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package sarama

type ElectLeadersRequest struct {
Version int16
Type ElectionType
TopicPartitions map[string][]int32
TimeoutMs int32
}

func (r *ElectLeadersRequest) encode(pe packetEncoder) error {
if r.Version > 0 {
pe.putInt8(int8(r.Type))
}

pe.putCompactArrayLength(len(r.TopicPartitions))

for topic, partitions := range r.TopicPartitions {
if r.Version < 2 {
if err := pe.putString(topic); err != nil {
return err
}
} else {
if err := pe.putCompactString(topic); err != nil {
return err
}
}

if err := pe.putCompactInt32Array(partitions); err != nil {
return err
}

if r.Version >= 2 {
pe.putEmptyTaggedFieldArray()
}
}

pe.putInt32(r.TimeoutMs)

if r.Version >= 2 {
pe.putEmptyTaggedFieldArray()
}

return nil
}

func (r *ElectLeadersRequest) decode(pd packetDecoder, version int16) (err error) {
r.Version = version
if r.Version > 0 {
t, err := pd.getInt8()
if err != nil {
return err
}
r.Type = ElectionType(t)
}

topicCount, err := pd.getCompactArrayLength()
if err != nil {
return err
}
if topicCount > 0 {
r.TopicPartitions = make(map[string][]int32)
for i := 0; i < topicCount; i++ {
var topic string
if r.Version < 2 {
topic, err = pd.getString()
} else {
topic, err = pd.getCompactString()
}
if err != nil {
return err
}
partitionCount, err := pd.getCompactArrayLength()
if err != nil {
return err
}
partitions := make([]int32, partitionCount)
for j := 0; j < partitionCount; j++ {
partition, err := pd.getInt32()
if err != nil {
return err
}
partitions[j] = partition
}
r.TopicPartitions[topic] = partitions
if r.Version >= 2 {
if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
return err
}
}
}
}

r.TimeoutMs, err = pd.getInt32()
if err != nil {
return err
}

if r.Version >= 2 {
if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
return err
}
}

return nil
}

func (r *ElectLeadersRequest) key() int16 {
return 43
}

func (r *ElectLeadersRequest) version() int16 {
return r.Version
}

func (r *ElectLeadersRequest) headerVersion() int16 {
return 2
}

func (r *ElectLeadersRequest) isValidVersion() bool {
return r.Version >= 0 && r.Version <= 2
}

func (r *ElectLeadersRequest) requiredVersion() KafkaVersion {
switch r.Version {
case 2:
return V2_4_0_0
case 1:
return V0_11_0_0
case 0:
return V0_10_0_0
default:
return V2_4_0_0
}
}
26 changes: 26 additions & 0 deletions elect_leaders_request_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package sarama

import "testing"

var electLeadersRequestOneTopic = []byte{
0, // preferred election type
2, // 2-1=1 topic
6, 116, 111, 112, 105, 99, // topic name "topic" as compact string
2, // 2-1=1 partition
0, 0, 0, 0, // partition 0
0, 0, // empty tagged fields
0, 39, 16, 0, // timeout 10000
}

func TestElectLeadersRequest(t *testing.T) {
var request = &ElectLeadersRequest{
TimeoutMs: int32(10000),
Version: int16(2),
TopicPartitions: map[string][]int32{
"topic": {0},
},
Type: PreferredElection,
}

testRequest(t, "one topic", request, electLeadersRequestOneTopic)
}
Loading

0 comments on commit b7da05f

Please sign in to comment.