Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for alter/list partition reassignements APIs #1617

Closed
Closed
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions acl_create_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ func (c *CreateAclsRequest) version() int16 {
return c.Version
}

func (c *CreateAclsRequest) headerVersion() int16 {
return 1
}

func (c *CreateAclsRequest) requiredVersion() KafkaVersion {
switch c.Version {
case 1:
Expand Down
4 changes: 4 additions & 0 deletions acl_create_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ func (c *CreateAclsResponse) version() int16 {
return 0
}

func (c *CreateAclsResponse) headerVersion() int16 {
return 0
}

func (c *CreateAclsResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
}
Expand Down
4 changes: 4 additions & 0 deletions acl_delete_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ func (d *DeleteAclsRequest) version() int16 {
return int16(d.Version)
}

func (c *DeleteAclsRequest) headerVersion() int16 {
return 1
}

func (d *DeleteAclsRequest) requiredVersion() KafkaVersion {
switch d.Version {
case 1:
Expand Down
4 changes: 4 additions & 0 deletions acl_delete_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ func (d *DeleteAclsResponse) version() int16 {
return d.Version
}

func (d *DeleteAclsResponse) headerVersion() int16 {
return 0
}

func (d *DeleteAclsResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
}
Expand Down
4 changes: 4 additions & 0 deletions acl_describe_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ func (d *DescribeAclsRequest) version() int16 {
return int16(d.Version)
}

func (d *DescribeAclsRequest) headerVersion() int16 {
return 1
}

func (d *DescribeAclsRequest) requiredVersion() KafkaVersion {
switch d.Version {
case 1:
Expand Down
4 changes: 4 additions & 0 deletions acl_describe_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ func (d *DescribeAclsResponse) version() int16 {
return d.Version
}

func (d *DescribeAclsResponse) headerVersion() int16 {
return 0
}

func (d *DescribeAclsResponse) requiredVersion() KafkaVersion {
switch d.Version {
case 1:
Expand Down
4 changes: 4 additions & 0 deletions add_offsets_to_txn_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ func (a *AddOffsetsToTxnRequest) version() int16 {
return 0
}

func (a *AddOffsetsToTxnRequest) headerVersion() int16 {
return 1
}

func (a *AddOffsetsToTxnRequest) requiredVersion() KafkaVersion {
return V0_11_0_0
}
4 changes: 4 additions & 0 deletions add_offsets_to_txn_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ func (a *AddOffsetsToTxnResponse) version() int16 {
return 0
}

func (a *AddOffsetsToTxnResponse) headerVersion() int16 {
return 0
}

func (a *AddOffsetsToTxnResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
}
4 changes: 4 additions & 0 deletions add_partitions_to_txn_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ func (a *AddPartitionsToTxnRequest) version() int16 {
return 0
}

func (a *AddPartitionsToTxnRequest) headerVersion() int16 {
return 1
}

func (a *AddPartitionsToTxnRequest) requiredVersion() KafkaVersion {
return V0_11_0_0
}
4 changes: 4 additions & 0 deletions add_partitions_to_txn_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ func (a *AddPartitionsToTxnResponse) version() int16 {
return 0
}

func (a *AddPartitionsToTxnResponse) headerVersion() int16 {
return 0
}

func (a *AddPartitionsToTxnResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
}
Expand Down
84 changes: 84 additions & 0 deletions admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,14 @@ type ClusterAdmin interface {
// new partitions. This operation is supported by brokers with version 1.0.0 or higher.
CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error

// Alter the replica assignment for partitions.
// This operation is supported by brokers with version 2.4.0.0 or higher.
AlterPartitionReassignments(topic string, assignment [][]int32) error

// Provides info on ongoing partitions replica reassignments.
// This operation is supported by brokers with version 2.4.0.0 or higher.
ListPartitionReassignments(topics string, partitions []int32) (topicStatus map[string]map[int32]*PartitionReplicaReassignmentsStatus, err error)

// Delete records whose offset is smaller than the given offset of the corresponding partition.
// This operation is supported by brokers with version 0.11.0.0 or higher.
DeleteRecords(topic string, partitionOffsets map[int32]int64) error
Expand Down Expand Up @@ -443,6 +451,82 @@ func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [
})
}

func (ca *clusterAdmin) AlterPartitionReassignments(topic string, assignment [][]int32) error {
if topic == "" {
return ErrInvalidTopic
}

request := &AlterPartitionReassignmentsRequest{
TimeoutMs: int32(10000),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where did this 10_000 millisecond default timeout come from? The protocol description suggests 60_000 should be the default?

https://github.com/apache/kafka/blob/2.4.0/clients/src/main/resources/common/message/AlterPartitionReassignmentsRequest.json#L23

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we probably just put something in there :) Using the suggested default makes sense. I added this.

Version: int16(0),
}

for i := 0; i < len(assignment); i++ {
request.AddBlock(topic, int32(i), assignment[i])
}

return ca.retryOnError(isErrNoController, func() error {
b, err := ca.Controller()
if err != nil {
return err
}

errs := make([]error, 0)

rsp, err := b.AlterPartitionReassignments(request)

if err != nil {
errs = append(errs, err)
} else {
if rsp.ErrorCode > 0 {
errs = append(errs, errors.New(rsp.ErrorCode.Error()))
}

for topic, topicErrors := range rsp.Errors {
for partition, partitionError := range topicErrors {
if partitionError.errorCode != ErrNoError {
errStr := fmt.Sprintf("[%s-%d]: %s", topic, partition, partitionError.errorCode.Error())
errs = append(errs, errors.New(errStr))
}
}
}
}

if len(errs) > 0 {
return ErrReassignPartitions{MultiError{&errs}}
}

return nil
})
}

func (ca *clusterAdmin) ListPartitionReassignments(topic string, partitions []int32) (topicStatus map[string]map[int32]*PartitionReplicaReassignmentsStatus, err error) {
if topic == "" {
return nil, ErrInvalidTopic
}

request := &ListPartitionReassignmentsRequest{
TimeoutMs: int32(10000),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As above, shouldn't this be 60_000 ?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

Version: int16(0),
}

request.AddBlock(topic, partitions)

b, err := ca.Controller()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now this is using the controller you probably want to wrapper the body in a return ca.retryOnError(isErrNoController, ...) call like you did for AlterPartitionReassignments so it refreshes the cached controller if it is stale

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the retryOnError function cannot be used when a value needs to be returned. other functions in admin.go where a controller is used and a return type is needed also don't use retrying. How should this be handled? should we implement a retry function that returns something like interface{}?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dwi-di ah of course — let's just leave this for now and we can always follow-up in another PR

if err != nil {
return nil, err
}
_ = b.Open(ca.client.Config())

rsp, err := b.ListPartitionReassignments(request)

if err == nil && rsp != nil {
return rsp.TopicStatus, nil
} else {
return nil, err
}
}

func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]int64) error {
if topic == "" {
return ErrInvalidTopic
Expand Down
161 changes: 161 additions & 0 deletions admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,167 @@ func TestClusterAdminCreatePartitionsWithoutAuthorization(t *testing.T) {
}
}

func TestClusterAdminAlterPartitionReassignments(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
defer seedBroker.Close()

secondBroker := NewMockBroker(t, 2)
defer secondBroker.Close()

seedBroker.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetController(secondBroker.BrokerID()).
SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
SetBroker(secondBroker.Addr(), secondBroker.BrokerID()),
})

secondBroker.SetHandlerByMap(map[string]MockResponse{
"AlterPartitionReassignmentsRequest": NewMockAlterPartitionReassignmentsResponse(t),
})

config := NewConfig()
config.Version = V2_4_0_0
admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

var topicAssignment = make([][]int32, 0, 3)

err = admin.AlterPartitionReassignments("my_topic", topicAssignment)
if err != nil {
t.Fatal(err)
}

err = admin.Close()
if err != nil {
t.Fatal(err)
}
}

func TestClusterAdminAlterPartitionReassignmentsWithDiffVersion(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
defer seedBroker.Close()

secondBroker := NewMockBroker(t, 2)
defer secondBroker.Close()

seedBroker.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetController(secondBroker.BrokerID()).
SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
SetBroker(secondBroker.Addr(), secondBroker.BrokerID()),
})

secondBroker.SetHandlerByMap(map[string]MockResponse{
"AlterPartitionReassignmentsRequest": NewMockAlterPartitionReassignmentsResponse(t),
})

config := NewConfig()
config.Version = V2_3_0_0
admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

var topicAssignment = make([][]int32, 0, 3)

err = admin.AlterPartitionReassignments("my_topic", topicAssignment)

if !strings.ContainsAny(err.Error(), ErrUnsupportedVersion.Error()) {
t.Fatal(err)
}

err = admin.Close()
if err != nil {
t.Fatal(err)
}
}

func TestClusterAdminListPartitionReassignments(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
defer seedBroker.Close()

secondBroker := NewMockBroker(t, 2)
defer secondBroker.Close()

seedBroker.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetController(secondBroker.BrokerID()).
SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
SetBroker(secondBroker.Addr(), secondBroker.BrokerID()),
})

secondBroker.SetHandlerByMap(map[string]MockResponse{
"ListPartitionReassignmentsRequest": NewMockListPartitionReassignmentsResponse(t),
})

config := NewConfig()
config.Version = V2_4_0_0
admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

response, err := admin.ListPartitionReassignments("my_topic", []int32{0, 1})
if err != nil {
t.Fatal(err)
}

partitionStatus, ok := response["my_topic"]
if !ok {
t.Fatalf("topic missing in response")
} else {
if len(partitionStatus) != 2 {
t.Fatalf("partition missing in response")
}
}

err = admin.Close()
if err != nil {
t.Fatal(err)
}
}

func TestClusterAdminListPartitionReassignmentsWithDiffVersion(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
defer seedBroker.Close()

secondBroker := NewMockBroker(t, 2)
defer secondBroker.Close()

seedBroker.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetController(secondBroker.BrokerID()).
SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
SetBroker(secondBroker.Addr(), secondBroker.BrokerID()),
})

secondBroker.SetHandlerByMap(map[string]MockResponse{
"ListPartitionReassignmentsRequest": NewMockListPartitionReassignmentsResponse(t),
})

config := NewConfig()
config.Version = V2_3_0_0
admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

var partitions = make([]int32, 0)

_, err = admin.ListPartitionReassignments("my_topic", partitions)

if !strings.ContainsAny(err.Error(), ErrUnsupportedVersion.Error()) {
t.Fatal(err)
}

err = admin.Close()
if err != nil {
t.Fatal(err)
}
}

func TestClusterAdminDeleteRecords(t *testing.T) {
topicName := "my_topic"
seedBroker := NewMockBroker(t, 1)
Expand Down
4 changes: 4 additions & 0 deletions alter_configs_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ func (a *AlterConfigsRequest) version() int16 {
return 0
}

func (a *AlterConfigsRequest) headerVersion() int16 {
return 1
}

func (a *AlterConfigsRequest) requiredVersion() KafkaVersion {
return V0_11_0_0
}
4 changes: 4 additions & 0 deletions alter_configs_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ func (a *AlterConfigsResponse) version() int16 {
return 0
}

func (a *AlterConfigsResponse) headerVersion() int16 {
return 0
}

func (a *AlterConfigsResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
}
Loading