Skip to content

Commit

Permalink
feat: Add alter and list partition reassignments
Browse files Browse the repository at this point in the history
Implementation of KIP-455. Also includes work to make Sarama protocol
support the new optional tagged fields functionality from KIP-482

- add headerVersion for all requests (Ref: KIP-482)
- implement AlterPartitionReassignmentsRequest/Reponse protocol
- add tests for alter_partition_reassignments
- pretty print partition reassignment errors
- add ListPartitionReassignmentsRequest/Response protocol
- decode empty tagged fields in response header v1
- make sure mockbroker can handle different reponse header versions
- make sure partition reassignment can be aborted
- add Alter/ListPartitionReassignments to admin client api

https://cwiki.apache.org/confluence/display/KAFKA/KIP-455%3A+Create+an+Administrative+API+for+Replica+Reassignment
https://cwiki.apache.org/confluence/display/KAFKA/KIP-482%3A+The+Kafka+Protocol+should+Support+Optional+Tagged+Fields

Co-authored-by: Dirk Wilden <[email protected]>
Co-authored-by: Leonid Koftun <[email protected]>
Co-authored-by: iyacontrol <[email protected]>
  • Loading branch information
3 people authored and dnwe committed Mar 6, 2020
1 parent 5812345 commit 2d2326e
Show file tree
Hide file tree
Showing 93 changed files with 1,692 additions and 60 deletions.
4 changes: 4 additions & 0 deletions acl_create_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ func (c *CreateAclsRequest) version() int16 {
return c.Version
}

func (c *CreateAclsRequest) headerVersion() int16 {
return 1
}

func (c *CreateAclsRequest) requiredVersion() KafkaVersion {
switch c.Version {
case 1:
Expand Down
4 changes: 4 additions & 0 deletions acl_create_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ func (c *CreateAclsResponse) version() int16 {
return 0
}

func (c *CreateAclsResponse) headerVersion() int16 {
return 0
}

func (c *CreateAclsResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
}
Expand Down
4 changes: 4 additions & 0 deletions acl_delete_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ func (d *DeleteAclsRequest) version() int16 {
return int16(d.Version)
}

func (c *DeleteAclsRequest) headerVersion() int16 {
return 1
}

func (d *DeleteAclsRequest) requiredVersion() KafkaVersion {
switch d.Version {
case 1:
Expand Down
4 changes: 4 additions & 0 deletions acl_delete_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ func (d *DeleteAclsResponse) version() int16 {
return d.Version
}

func (d *DeleteAclsResponse) headerVersion() int16 {
return 0
}

func (d *DeleteAclsResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
}
Expand Down
4 changes: 4 additions & 0 deletions acl_describe_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ func (d *DescribeAclsRequest) version() int16 {
return int16(d.Version)
}

func (d *DescribeAclsRequest) headerVersion() int16 {
return 1
}

func (d *DescribeAclsRequest) requiredVersion() KafkaVersion {
switch d.Version {
case 1:
Expand Down
4 changes: 4 additions & 0 deletions acl_describe_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ func (d *DescribeAclsResponse) version() int16 {
return d.Version
}

func (d *DescribeAclsResponse) headerVersion() int16 {
return 0
}

func (d *DescribeAclsResponse) requiredVersion() KafkaVersion {
switch d.Version {
case 1:
Expand Down
4 changes: 4 additions & 0 deletions add_offsets_to_txn_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ func (a *AddOffsetsToTxnRequest) version() int16 {
return 0
}

func (a *AddOffsetsToTxnRequest) headerVersion() int16 {
return 1
}

func (a *AddOffsetsToTxnRequest) requiredVersion() KafkaVersion {
return V0_11_0_0
}
4 changes: 4 additions & 0 deletions add_offsets_to_txn_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ func (a *AddOffsetsToTxnResponse) version() int16 {
return 0
}

func (a *AddOffsetsToTxnResponse) headerVersion() int16 {
return 0
}

func (a *AddOffsetsToTxnResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
}
4 changes: 4 additions & 0 deletions add_partitions_to_txn_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ func (a *AddPartitionsToTxnRequest) version() int16 {
return 0
}

func (a *AddPartitionsToTxnRequest) headerVersion() int16 {
return 1
}

func (a *AddPartitionsToTxnRequest) requiredVersion() KafkaVersion {
return V0_11_0_0
}
4 changes: 4 additions & 0 deletions add_partitions_to_txn_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ func (a *AddPartitionsToTxnResponse) version() int16 {
return 0
}

func (a *AddPartitionsToTxnResponse) headerVersion() int16 {
return 0
}

func (a *AddPartitionsToTxnResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
}
Expand Down
84 changes: 84 additions & 0 deletions admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,14 @@ type ClusterAdmin interface {
// new partitions. This operation is supported by brokers with version 1.0.0 or higher.
CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error

// Alter the replica assignment for partitions.
// This operation is supported by brokers with version 2.4.0.0 or higher.
AlterPartitionReassignments(topic string, assignment [][]int32) error

// Provides info on ongoing partitions replica reassignments.
// This operation is supported by brokers with version 2.4.0.0 or higher.
ListPartitionReassignments(topics string, partitions []int32) (topicStatus map[string]map[int32]*PartitionReplicaReassignmentsStatus, err error)

// Delete records whose offset is smaller than the given offset of the corresponding partition.
// This operation is supported by brokers with version 0.11.0.0 or higher.
DeleteRecords(topic string, partitionOffsets map[int32]int64) error
Expand Down Expand Up @@ -452,6 +460,82 @@ func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [
})
}

func (ca *clusterAdmin) AlterPartitionReassignments(topic string, assignment [][]int32) error {
if topic == "" {
return ErrInvalidTopic
}

request := &AlterPartitionReassignmentsRequest{
TimeoutMs: int32(60000),
Version: int16(0),
}

for i := 0; i < len(assignment); i++ {
request.AddBlock(topic, int32(i), assignment[i])
}

return ca.retryOnError(isErrNoController, func() error {
b, err := ca.Controller()
if err != nil {
return err
}

errs := make([]error, 0)

rsp, err := b.AlterPartitionReassignments(request)

if err != nil {
errs = append(errs, err)
} else {
if rsp.ErrorCode > 0 {
errs = append(errs, errors.New(rsp.ErrorCode.Error()))
}

for topic, topicErrors := range rsp.Errors {
for partition, partitionError := range topicErrors {
if partitionError.errorCode != ErrNoError {
errStr := fmt.Sprintf("[%s-%d]: %s", topic, partition, partitionError.errorCode.Error())
errs = append(errs, errors.New(errStr))
}
}
}
}

if len(errs) > 0 {
return ErrReassignPartitions{MultiError{&errs}}
}

return nil
})
}

func (ca *clusterAdmin) ListPartitionReassignments(topic string, partitions []int32) (topicStatus map[string]map[int32]*PartitionReplicaReassignmentsStatus, err error) {
if topic == "" {
return nil, ErrInvalidTopic
}

request := &ListPartitionReassignmentsRequest{
TimeoutMs: int32(60000),
Version: int16(0),
}

request.AddBlock(topic, partitions)

b, err := ca.Controller()
if err != nil {
return nil, err
}
_ = b.Open(ca.client.Config())

rsp, err := b.ListPartitionReassignments(request)

if err == nil && rsp != nil {
return rsp.TopicStatus, nil
} else {
return nil, err
}
}

func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]int64) error {
if topic == "" {
return ErrInvalidTopic
Expand Down
161 changes: 161 additions & 0 deletions admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,167 @@ func TestClusterAdminCreatePartitionsWithoutAuthorization(t *testing.T) {
}
}

func TestClusterAdminAlterPartitionReassignments(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
defer seedBroker.Close()

secondBroker := NewMockBroker(t, 2)
defer secondBroker.Close()

seedBroker.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetController(secondBroker.BrokerID()).
SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
SetBroker(secondBroker.Addr(), secondBroker.BrokerID()),
})

secondBroker.SetHandlerByMap(map[string]MockResponse{
"AlterPartitionReassignmentsRequest": NewMockAlterPartitionReassignmentsResponse(t),
})

config := NewConfig()
config.Version = V2_4_0_0
admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

var topicAssignment = make([][]int32, 0, 3)

err = admin.AlterPartitionReassignments("my_topic", topicAssignment)
if err != nil {
t.Fatal(err)
}

err = admin.Close()
if err != nil {
t.Fatal(err)
}
}

func TestClusterAdminAlterPartitionReassignmentsWithDiffVersion(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
defer seedBroker.Close()

secondBroker := NewMockBroker(t, 2)
defer secondBroker.Close()

seedBroker.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetController(secondBroker.BrokerID()).
SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
SetBroker(secondBroker.Addr(), secondBroker.BrokerID()),
})

secondBroker.SetHandlerByMap(map[string]MockResponse{
"AlterPartitionReassignmentsRequest": NewMockAlterPartitionReassignmentsResponse(t),
})

config := NewConfig()
config.Version = V2_3_0_0
admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

var topicAssignment = make([][]int32, 0, 3)

err = admin.AlterPartitionReassignments("my_topic", topicAssignment)

if !strings.ContainsAny(err.Error(), ErrUnsupportedVersion.Error()) {
t.Fatal(err)
}

err = admin.Close()
if err != nil {
t.Fatal(err)
}
}

func TestClusterAdminListPartitionReassignments(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
defer seedBroker.Close()

secondBroker := NewMockBroker(t, 2)
defer secondBroker.Close()

seedBroker.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetController(secondBroker.BrokerID()).
SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
SetBroker(secondBroker.Addr(), secondBroker.BrokerID()),
})

secondBroker.SetHandlerByMap(map[string]MockResponse{
"ListPartitionReassignmentsRequest": NewMockListPartitionReassignmentsResponse(t),
})

config := NewConfig()
config.Version = V2_4_0_0
admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

response, err := admin.ListPartitionReassignments("my_topic", []int32{0, 1})
if err != nil {
t.Fatal(err)
}

partitionStatus, ok := response["my_topic"]
if !ok {
t.Fatalf("topic missing in response")
} else {
if len(partitionStatus) != 2 {
t.Fatalf("partition missing in response")
}
}

err = admin.Close()
if err != nil {
t.Fatal(err)
}
}

func TestClusterAdminListPartitionReassignmentsWithDiffVersion(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
defer seedBroker.Close()

secondBroker := NewMockBroker(t, 2)
defer secondBroker.Close()

seedBroker.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetController(secondBroker.BrokerID()).
SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
SetBroker(secondBroker.Addr(), secondBroker.BrokerID()),
})

secondBroker.SetHandlerByMap(map[string]MockResponse{
"ListPartitionReassignmentsRequest": NewMockListPartitionReassignmentsResponse(t),
})

config := NewConfig()
config.Version = V2_3_0_0
admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

var partitions = make([]int32, 0)

_, err = admin.ListPartitionReassignments("my_topic", partitions)

if !strings.ContainsAny(err.Error(), ErrUnsupportedVersion.Error()) {
t.Fatal(err)
}

err = admin.Close()
if err != nil {
t.Fatal(err)
}
}

func TestClusterAdminDeleteRecords(t *testing.T) {
topicName := "my_topic"
seedBroker := NewMockBroker(t, 1)
Expand Down
4 changes: 4 additions & 0 deletions alter_configs_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ func (a *AlterConfigsRequest) version() int16 {
return 0
}

func (a *AlterConfigsRequest) headerVersion() int16 {
return 1
}

func (a *AlterConfigsRequest) requiredVersion() KafkaVersion {
return V0_11_0_0
}
4 changes: 4 additions & 0 deletions alter_configs_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ func (a *AlterConfigsResponse) version() int16 {
return 0
}

func (a *AlterConfigsResponse) headerVersion() int16 {
return 0
}

func (a *AlterConfigsResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
}
Loading

0 comments on commit 2d2326e

Please sign in to comment.