-
Notifications
You must be signed in to change notification settings - Fork 800
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changed AlterPartitionReassignments request to support multiple topics #1204
Changed AlterPartitionReassignments request to support multiple topics #1204
Conversation
bgranvea
commented
Oct 9, 2023
•
edited by petedannemann
Loading
edited by petedannemann
- Allowed assigning to multiple topics with AlterPartitionReassignments
- Make BrokerIds in AlterPartitionReassignments nullable to support canceling ongoing partition reassignments
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for this PR @bgranvea please see my comment above and let me know what you think
alterpartitionreassignments.go
Outdated
// Assignments is the list of partition reassignments to submit to the API. | ||
Assignments []AlterPartitionReassignmentsRequestAssignment | ||
// Mappings of topic names to list of partitions we want to reassign. | ||
Topics map[string][]AlterPartitionReassignmentsRequestAssignment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This PR contains several breaking API changes that we cannot support such as this line. Can you please refactor this PR to not introduce breaking changes?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, sure. I see 2 options to keep compatibility, which one do you prefer?
option 1: modify existing request and support 2 modes of operation
type AlterPartitionReassignmentsRequest struct {
// Address of the kafka broker to send the request to.
Addr net.Addr
// Topic is the name of the topic to alter partitions in.
Topic string
// Assignments is the list of partition reassignments to submit to the API.
Assignments []AlterPartitionReassignmentsRequestAssignment
// Mappings of topic names to list of partitions we want to reassign. Use either Topic/Assignments or Topics.
Topics map[string][]AlterPartitionReassignmentsRequestAssignment
// Timeout is the amount of time to wait for the request to complete.
Timeout time.Duration
}
option 2: create a new request (more code but seems cleaner)
// AlterPartitionReassignmentsMultiRequest is a request to the AlterPartitionReassignments API for multiple topics.
type AlterPartitionReassignmentsMultiRequest struct {
// Address of the kafka broker to send the request to.
Addr net.Addr
// Mappings of topic names to list of partitions we want to reassign.
Topics map[string][]AlterPartitionReassignmentsRequestAssignment
// Timeout is the amount of time to wait for the request to complete.
Timeout time.Duration
}
func (c *Client) AlterPartitionReassignmentsMulti(
ctx context.Context,
req *AlterPartitionReassignmentsMultiRequest,
) (*AlterPartitionReassignmentsResponse, error) {
...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I prefer Option 1 as we will want to mark Topic
as deprecated and remove it when we do a major version bump. It's a little ugly now but will be the better long term option.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it makes sense.
I've changed a little bit the approach for a better integration with existing code: I just add Topic
in AlterPartitionReassignmentsRequestAssignment
and in AlterPartitionReassignmentsResponsePartitionResult
, what do you think?
here is what it looks like:
// AlterPartitionReassignmentsRequest is a request to the AlterPartitionReassignments API.
type AlterPartitionReassignmentsRequest struct {
// Address of the kafka broker to send the request to.
Addr net.Addr
// Topic is the name of the topic to alter partitions in. Keep this field empty and use Topic in AlterPartitionReassignmentsRequestAssignment to
// reassign to multiple topics.
Topic string
// Assignments is the list of partition reassignments to submit to the API.
Assignments []AlterPartitionReassignmentsRequestAssignment
// Timeout is the amount of time to wait for the request to complete.
Timeout time.Duration
}
// AlterPartitionReassignmentsRequestAssignment contains the requested reassignments for a single
// partition.
type AlterPartitionReassignmentsRequestAssignment struct {
// Topic is the name of the topic to alter partitions in. If empty, the value of Topic in AlterPartitionReassignmentsRequest is used.
Topic string
// PartitionID is the ID of the partition to make the reassignments in.
PartitionID int
// BrokerIDs is a slice of brokers to set the partition replicas to, or null to cancel a pending reassignment for this partition.
BrokerIDs []int
}
// AlterPartitionReassignmentsResponse is a response from the AlterPartitionReassignments API.
type AlterPartitionReassignmentsResponse struct {
// Error is set to a non-nil value including the code and message if a top-level
// error was encountered when doing the update.
Error error
// PartitionResults contains the specific results for each partition.
PartitionResults []AlterPartitionReassignmentsResponsePartitionResult
}
// AlterPartitionReassignmentsResponsePartitionResult contains the detailed result of
// doing reassignments for a single partition.
type AlterPartitionReassignmentsResponsePartitionResult struct {
// Topic is the topic name.
Topic string
// PartitionID is the ID of the partition that was altered.
PartitionID int
// Error is set to a non-nil value including the code and message if an error was encountered
// during the update for this partition.
Error error
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@petedannemann any feedback on my last comment?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@bgranvea I agree your new suggestion is great. Good work!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your contribution @bgranvea !