Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for alter/list partition reassignements APIs #1617

Closed
Closed
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions acl_create_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ func (c *CreateAclsRequest) version() int16 {
return c.Version
}

func (c *CreateAclsRequest) headerVersion() int16 {
return 1
}

func (c *CreateAclsRequest) requiredVersion() KafkaVersion {
switch c.Version {
case 1:
Expand Down
4 changes: 4 additions & 0 deletions acl_create_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ func (c *CreateAclsResponse) version() int16 {
return 0
}

func (c *CreateAclsResponse) headerVersion() int16 {
return 0
}

func (c *CreateAclsResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
}
Expand Down
4 changes: 4 additions & 0 deletions acl_delete_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ func (d *DeleteAclsRequest) version() int16 {
return int16(d.Version)
}

func (c *DeleteAclsRequest) headerVersion() int16 {
return 1
}

func (d *DeleteAclsRequest) requiredVersion() KafkaVersion {
switch d.Version {
case 1:
Expand Down
4 changes: 4 additions & 0 deletions acl_delete_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ func (d *DeleteAclsResponse) version() int16 {
return d.Version
}

func (d *DeleteAclsResponse) headerVersion() int16 {
return 0
}

func (d *DeleteAclsResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
}
Expand Down
4 changes: 4 additions & 0 deletions acl_describe_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ func (d *DescribeAclsRequest) version() int16 {
return int16(d.Version)
}

func (d *DescribeAclsRequest) headerVersion() int16 {
return 1
}

func (d *DescribeAclsRequest) requiredVersion() KafkaVersion {
switch d.Version {
case 1:
Expand Down
4 changes: 4 additions & 0 deletions acl_describe_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ func (d *DescribeAclsResponse) version() int16 {
return d.Version
}

func (d *DescribeAclsResponse) headerVersion() int16 {
return 0
}

func (d *DescribeAclsResponse) requiredVersion() KafkaVersion {
switch d.Version {
case 1:
Expand Down
4 changes: 4 additions & 0 deletions add_offsets_to_txn_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ func (a *AddOffsetsToTxnRequest) version() int16 {
return 0
}

func (a *AddOffsetsToTxnRequest) headerVersion() int16 {
return 1
}

func (a *AddOffsetsToTxnRequest) requiredVersion() KafkaVersion {
return V0_11_0_0
}
4 changes: 4 additions & 0 deletions add_offsets_to_txn_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ func (a *AddOffsetsToTxnResponse) version() int16 {
return 0
}

func (a *AddOffsetsToTxnResponse) headerVersion() int16 {
return 0
}

func (a *AddOffsetsToTxnResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
}
4 changes: 4 additions & 0 deletions add_partitions_to_txn_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ func (a *AddPartitionsToTxnRequest) version() int16 {
return 0
}

func (a *AddPartitionsToTxnRequest) headerVersion() int16 {
return 1
}

func (a *AddPartitionsToTxnRequest) requiredVersion() KafkaVersion {
return V0_11_0_0
}
4 changes: 4 additions & 0 deletions add_partitions_to_txn_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ func (a *AddPartitionsToTxnResponse) version() int16 {
return 0
}

func (a *AddPartitionsToTxnResponse) headerVersion() int16 {
return 0
}

func (a *AddPartitionsToTxnResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
}
Expand Down
84 changes: 84 additions & 0 deletions admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,14 @@ type ClusterAdmin interface {
// new partitions. This operation is supported by brokers with version 1.0.0 or higher.
CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error

// Alter the replica assignment for partitions.
// This operation is supported by brokers with version 2.4.0.0 or higher.
AlterPartitionReassignments(topic string, assignment [][]int32) error

// Provides info on ongoing partitions replica reassignments.
// This operation is supported by brokers with version 2.4.0.0 or higher.
ListPartitionReassignments(topics string, partitions []int32) (topicStatus map[string]map[int32]*PartitionReplicaReassignmentsStatus, err error)

// Delete records whose offset is smaller than the given offset of the corresponding partition.
// This operation is supported by brokers with version 0.11.0.0 or higher.
DeleteRecords(topic string, partitionOffsets map[int32]int64) error
Expand Down Expand Up @@ -443,6 +451,82 @@ func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [
})
}

func (ca *clusterAdmin) AlterPartitionReassignments(topic string, assignment [][]int32) error {
if topic == "" {
return ErrInvalidTopic
}

request := &AlterPartitionReassignmentsRequest{
TimeoutMs: int32(10000),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where did this 10_000 millisecond default timeout come from? The protocol description suggests 60_000 should be the default?

https://github.com/apache/kafka/blob/2.4.0/clients/src/main/resources/common/message/AlterPartitionReassignmentsRequest.json#L23

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we probably just put something in there :) Using the suggested default makes sense. I added this.

Version: int16(0),
}

for i := 0; i < len(assignment); i++ {
request.AddBlock(topic, int32(i), assignment[i])
}

return ca.retryOnError(isErrNoController, func() error {
b, err := ca.Controller()
if err != nil {
return err
}

errs := make([]error, 0)

rsp, err := b.AlterPartitionReassignments(request)

if err != nil {
errs = append(errs, err)
} else {
if rsp.ErrorCode > 0 {
errs = append(errs, errors.New(rsp.ErrorCode.Error()))
}

for topic, topicErrors := range rsp.Errors {
for partition, partitionError := range topicErrors {
if partitionError.errorCode != ErrNoError {
errStr := fmt.Sprintf("[%s-%d]: %s", topic, partition, partitionError.errorCode.Error())
errs = append(errs, errors.New(errStr))
}
}
}
}

if len(errs) > 0 {
return ErrReassignPartitions{MultiError{&errs}}
}

return nil
})
}

func (ca *clusterAdmin) ListPartitionReassignments(topic string, partitions []int32) (topicStatus map[string]map[int32]*PartitionReplicaReassignmentsStatus, err error) {
if topic == "" {
return nil, ErrInvalidTopic
}

request := &ListPartitionReassignmentsRequest{
TimeoutMs: int32(10000),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As above, shouldn't this be 60_000 ?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

Version: int16(0),
}

request.AddBlock(topic, partitions)

b, err := ca.findAnyBroker()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought that ListPartitionReassignments also needed to be sent to the Controller (not just any broker) like the Alter request? The Java Admin client seems to do so here

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch. I fixed that

if err != nil {
return nil, err
}
_ = b.Open(ca.client.Config())

rsp, err := b.ListPartitionReassignments(request)

if err == nil && rsp != nil {
return rsp.TopicStatus, nil
} else {
return nil, err
}
}

func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]int64) error {
if topic == "" {
return ErrInvalidTopic
Expand Down
133 changes: 133 additions & 0 deletions admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,139 @@ func TestClusterAdminCreatePartitionsWithoutAuthorization(t *testing.T) {
}
}

func TestClusterAdminAlterPartitionReassignments(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
defer seedBroker.Close()

seedBroker.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetController(seedBroker.BrokerID()).
SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please could you add a second mockBroker to the test and set that as the controller in the mock metadata response and not the seed broker? That should exercise that the request must be sent to the Controller and not just any broker (or the broker currently connected to)

"AlterPartitionReassignmentsRequest": NewMockAlterPartitionReassignmentsResponse(t),
})

config := NewConfig()
config.Version = V2_4_0_0
admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

var topicAssignment = make([][]int32, 0, 3)

err = admin.AlterPartitionReassignments("my_topic", topicAssignment)
if err != nil {
t.Fatal(err)
}

err = admin.Close()
if err != nil {
t.Fatal(err)
}
}

func TestClusterAdminAlterPartitionReassignmentsWithDiffVersion(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
defer seedBroker.Close()

seedBroker.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetController(seedBroker.BrokerID()).
SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
"AlterPartitionReassignmentsRequest": NewMockAlterPartitionReassignmentsResponse(t),
})

config := NewConfig()
config.Version = V2_3_0_0
admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

var topicAssignment = make([][]int32, 0, 3)

err = admin.AlterPartitionReassignments("my_topic", topicAssignment)

if !strings.ContainsAny(err.Error(), ErrUnsupportedVersion.Error()) {
t.Fatal(err)
}

err = admin.Close()
if err != nil {
t.Fatal(err)
}
}

func TestClusterAdminListPartitionReassignments(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
defer seedBroker.Close()

seedBroker.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetController(seedBroker.BrokerID()).
SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As above, please use different MockBroker for the Controller

"ListPartitionReassignmentsRequest": NewMockListPartitionReassignmentsResponse(t),
})

config := NewConfig()
config.Version = V2_4_0_0
admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

response, err := admin.ListPartitionReassignments("my_topic", []int32{0, 1})
if err != nil {
t.Fatal(err)
}

partitionStatus, ok := response["my_topic"]
if !ok {
t.Fatalf("topic missing in response")
} else {
if len(partitionStatus) != 2 {
t.Fatalf("partition missing in response")
}
}

err = admin.Close()
if err != nil {
t.Fatal(err)
}
}

func TestClusterAdminListPartitionReassignmentsWithDiffVersion(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
defer seedBroker.Close()

seedBroker.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetController(seedBroker.BrokerID()).
SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
"ListPartitionReassignmentsRequest": NewMockListPartitionReassignmentsResponse(t),
})

config := NewConfig()
config.Version = V2_3_0_0
admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

var partitions = make([]int32, 0)

_, err = admin.ListPartitionReassignments("my_topic", partitions)

if !strings.ContainsAny(err.Error(), ErrUnsupportedVersion.Error()) {
t.Fatal(err)
}

err = admin.Close()
if err != nil {
t.Fatal(err)
}
}

func TestClusterAdminDeleteRecords(t *testing.T) {
topicName := "my_topic"
seedBroker := NewMockBroker(t, 1)
Expand Down
4 changes: 4 additions & 0 deletions alter_configs_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ func (a *AlterConfigsRequest) version() int16 {
return 0
}

func (a *AlterConfigsRequest) headerVersion() int16 {
return 1
}

func (a *AlterConfigsRequest) requiredVersion() KafkaVersion {
return V0_11_0_0
}
4 changes: 4 additions & 0 deletions alter_configs_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ func (a *AlterConfigsResponse) version() int16 {
return 0
}

func (a *AlterConfigsResponse) headerVersion() int16 {
return 0
}

func (a *AlterConfigsResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
}
Loading