diff --git a/admin.go b/admin.go index fe440c041..2f6c8618e 100644 --- a/admin.go +++ b/admin.go @@ -42,13 +42,14 @@ type ClusterAdmin interface { // new partitions. This operation is supported by brokers with version 1.0.0 or higher. CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error - // Update the configuration for the specified resources with the default options. - // This operation is supported by brokers with version 0.11.0.0 or higher. - // The resources with their configs (topic is the only resource type with configs - // that can be updated currently Updates are not transactional so they may succeed - // for some resources while fail for others. The configs for a particular resource are updated automatically. + // Alter the replica assignment for partitions. + // This operation is supported by brokers with version 2.4.0.0 or higher. AlterPartitionReassignments(topic string, assignment [][]int32) error + // Provides info on ongoing partitions replica reassignments. + // This operation is supported by brokers with version 2.4.0.0 or higher. + ListPartitionReassignments(topics string, partitions []int32) (topicStatus map[string]map[int32]*PartitionReplicaReassignmentsStatus, err error) + // Delete records whose offset is smaller than the given offset of the corresponding partition. // This operation is supported by brokers with version 0.11.0.0 or higher. DeleteRecords(topic string, partitionOffsets map[int32]int64) error @@ -500,6 +501,33 @@ func (ca *clusterAdmin) AlterPartitionReassignments(topic string, assignment [][ }) } +func (ca *clusterAdmin) ListPartitionReassignments(topic string, partitions []int32) (topicStatus map[string]map[int32]*PartitionReplicaReassignmentsStatus, err error) { + if topic == "" { + return nil, ErrInvalidTopic + } + + request := &ListPartitionReassignmentsRequest{ + TimeoutMs: int32(10000), + Version: int16(0), + } + + request.AddBlock(topic, partitions) + + b, err := ca.findAnyBroker() + if err != nil { + return nil, err + } + _ = b.Open(ca.client.Config()) + + rsp, err := b.ListPartitionReassignments(request) + + if err == nil && rsp != nil { + return rsp.TopicStatus, nil + } else { + return nil, err + } +} + func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]int64) error { if topic == "" { return ErrInvalidTopic diff --git a/admin_test.go b/admin_test.go index 92b76ff51..17a0fd99d 100644 --- a/admin_test.go +++ b/admin_test.go @@ -395,6 +395,76 @@ func TestClusterAdminAlterPartitionReassignmentsWithDiffVersion(t *testing.T) { } } +func TestClusterAdminListPartitionReassignments(t *testing.T) { + seedBroker := NewMockBroker(t, 1) + defer seedBroker.Close() + + seedBroker.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": NewMockMetadataResponse(t). + SetController(seedBroker.BrokerID()). + SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), + "ListPartitionReassignmentsRequest": NewMockListPartitionReassignmentsResponse(t), + }) + + config := NewConfig() + config.Version = V2_4_0_0 + admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) + if err != nil { + t.Fatal(err) + } + + response, err := admin.ListPartitionReassignments("my_topic", []int32{0, 1}) + if err != nil { + t.Fatal(err) + } + + partitionStatus, ok := response["my_topic"] + if !ok { + t.Fatalf("topic missing in response") + } else { + if len(partitionStatus) != 2 { + t.Fatalf("partition missing in response") + } + } + + err = admin.Close() + if err != nil { + t.Fatal(err) + } +} + +func TestClusterAdminListPartitionReassignmentsWithDiffVersion(t *testing.T) { + seedBroker := NewMockBroker(t, 1) + defer seedBroker.Close() + + seedBroker.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": NewMockMetadataResponse(t). + SetController(seedBroker.BrokerID()). + SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), + "ListPartitionReassignmentsRequest": NewMockListPartitionReassignmentsResponse(t), + }) + + config := NewConfig() + config.Version = V2_3_0_0 + admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) + if err != nil { + t.Fatal(err) + } + + var partitions = make([]int32, 0) + + _, err = admin.ListPartitionReassignments("my_topic", partitions) + + if !strings.ContainsAny(err.Error(), ErrUnsupportedVersion.Error()) { + t.Fatal(err) + } + + err = admin.Close() + if err != nil { + t.Fatal(err) + } +} + func TestClusterAdminDeleteRecords(t *testing.T) { topicName := "my_topic" seedBroker := NewMockBroker(t, 1) diff --git a/list_partition_reassignments_response.go b/list_partition_reassignments_response.go index bab634a26..cfc4ae5a0 100644 --- a/list_partition_reassignments_response.go +++ b/list_partition_reassignments_response.go @@ -1,12 +1,12 @@ package sarama -type listPartitionReassignmentsResponseBlock struct { +type PartitionReplicaReassignmentsStatus struct { replicas []int32 addingReplicas []int32 removingReplicas []int32 } -func (b *listPartitionReassignmentsResponseBlock) encode(pe packetEncoder) error { +func (b *PartitionReplicaReassignmentsStatus) encode(pe packetEncoder) error { if err := pe.putCompactInt32Array(b.replicas); err != nil { return err @@ -23,7 +23,7 @@ func (b *listPartitionReassignmentsResponseBlock) encode(pe packetEncoder) error return nil } -func (b *listPartitionReassignmentsResponseBlock) decode(pd packetDecoder) (err error) { +func (b *PartitionReplicaReassignmentsStatus) decode(pd packetDecoder) (err error) { if b.replicas, err = pd.getCompactInt32Array(); err != nil { return err @@ -49,20 +49,20 @@ type ListPartitionReassignmentsResponse struct { ThrottleTimeMs int32 ErrorCode KError ErrorMessage *string - blocks map[string]map[int32]*listPartitionReassignmentsResponseBlock + TopicStatus map[string]map[int32]*PartitionReplicaReassignmentsStatus } func (r *ListPartitionReassignmentsResponse) AddBlock(topic string, partition int32, replicas, addingReplicas, removingReplicas []int32) { - if r.blocks == nil { - r.blocks = make(map[string]map[int32]*listPartitionReassignmentsResponseBlock) + if r.TopicStatus == nil { + r.TopicStatus = make(map[string]map[int32]*PartitionReplicaReassignmentsStatus) } - partitions := r.blocks[topic] + partitions := r.TopicStatus[topic] if partitions == nil { - partitions = make(map[int32]*listPartitionReassignmentsResponseBlock) - r.blocks[topic] = partitions + partitions = make(map[int32]*PartitionReplicaReassignmentsStatus) + r.TopicStatus[topic] = partitions } - partitions[partition] = &listPartitionReassignmentsResponseBlock{replicas: replicas, addingReplicas: addingReplicas, removingReplicas: removingReplicas} + partitions[partition] = &PartitionReplicaReassignmentsStatus{replicas: replicas, addingReplicas: addingReplicas, removingReplicas: removingReplicas} } func (r *ListPartitionReassignmentsResponse) encode(pe packetEncoder) error { @@ -72,8 +72,8 @@ func (r *ListPartitionReassignmentsResponse) encode(pe packetEncoder) error { return err } - pe.putCompactArrayLength(len(r.blocks)) - for topic, partitions := range r.blocks { + pe.putCompactArrayLength(len(r.TopicStatus)) + for topic, partitions := range r.TopicStatus { if err := pe.putCompactString(topic); err != nil { return err } @@ -116,7 +116,7 @@ func (r *ListPartitionReassignmentsResponse) decode(pd packetDecoder, version in return err } - r.blocks = make(map[string]map[int32]*listPartitionReassignmentsResponseBlock, numTopics) + r.TopicStatus = make(map[string]map[int32]*PartitionReplicaReassignmentsStatus, numTopics) for i := 0; i < numTopics; i++ { topic, err := pd.getCompactString() if err != nil { @@ -128,7 +128,7 @@ func (r *ListPartitionReassignmentsResponse) decode(pd packetDecoder, version in return err } - r.blocks[topic] = make(map[int32]*listPartitionReassignmentsResponseBlock, ongoingPartitionReassignments) + r.TopicStatus[topic] = make(map[int32]*PartitionReplicaReassignmentsStatus, ongoingPartitionReassignments) for j := 0; j < ongoingPartitionReassignments; j++ { partition, err := pd.getInt32() @@ -136,11 +136,11 @@ func (r *ListPartitionReassignmentsResponse) decode(pd packetDecoder, version in return err } - block := &listPartitionReassignmentsResponseBlock{} + block := &PartitionReplicaReassignmentsStatus{} if err := block.decode(pd); err != nil { return err } - r.blocks[topic][partition] = block + r.TopicStatus[topic][partition] = block } if _, err := pd.getEmptyTaggedFieldArray(); err != nil { diff --git a/mockresponses.go b/mockresponses.go index 6857d237b..00e507bee 100644 --- a/mockresponses.go +++ b/mockresponses.go @@ -713,6 +713,29 @@ func (mr *MockAlterPartitionReassignmentsResponse) For(reqBody versionedDecoder) return res } +type MockListPartitionReassignmentsResponse struct { + t TestReporter +} + +func NewMockListPartitionReassignmentsResponse(t TestReporter) *MockListPartitionReassignmentsResponse { + return &MockListPartitionReassignmentsResponse{t: t} +} + +func (mr *MockListPartitionReassignmentsResponse) For(reqBody versionedDecoder) encoderWithHeader { + req := reqBody.(*ListPartitionReassignmentsRequest) + _ = req + res := &ListPartitionReassignmentsResponse{} + + for topic, partitions := range req.blocks { + + for _, partition := range partitions { + res.AddBlock(topic, partition, []int32{0}, []int32{1}, []int32{2}) + } + } + + return res +} + type MockDeleteRecordsResponse struct { t TestReporter }