Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-18325: Add TargetAssignmentBuilder #18676

Merged

Conversation

lucasbru
Copy link
Member

@lucasbru lucasbru commented Jan 23, 2025

A class to build a new target assignment based on the provided parameters. As a result, it yields the records that must be persisted to the log and the new member assignments as a map.

Compared to the feature branch, I extended the unit tests (testing also standby and warm-up task logic) and adopted simplifications due to the TasksTuple class.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@lucasbru lucasbru force-pushed the kip1071merge/target_assignment_builder branch from 727f1a1 to 177ca42 Compare January 24, 2025 09:48
@lucasbru
Copy link
Member Author

Rebased on latest version of streams coordinator record helpers

@lucasbru lucasbru force-pushed the kip1071merge/target_assignment_builder branch from 177ca42 to 24eea87 Compare January 28, 2025 16:11
@lucasbru
Copy link
Member Author

Rebased on latest trunk (and minor test fix)

@lucasbru lucasbru force-pushed the kip1071merge/target_assignment_builder branch from 24eea87 to e211eb6 Compare January 30, 2025 08:58
@lucasbru
Copy link
Member Author

Rebased on latest trunk

Copy link
Member

@bbejeck bbejeck left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR @lucasbru overall this looks great, I have a few comments.

public TargetAssignmentBuilder removeMember(
String memberId
) {
return addOrUpdateMember(memberId, null);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just outright remove the member at this point? I get that in the build() method will filter it out, but does that have any advantage other than a removal here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe you cannot remove the member right away, because due to the builder pattern there is not guarantee that the call to withMembers() occurs before the call to removeMembers().

* @return The number of partitions corresponding to the given subtopology ID, or -1 if the subtopology ID does not exist.
*/
@Override
public int numTasks(String subtopologyId) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given the javadoc says number of partitions should the method be named numPartitions? It's a minor point, so I'll leave it to you to decide.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems we go back and forth with the question when to use partitions and when to use tasks. I believe the confusion comes from the fact that the max number of input partitions per topic of a subtopology determines the number of tasks.
I propose to call this method maxNumInputPartitions(). My rationale is that a topology does not have a notion of tasks. The task assignor uses maxNumInputPartitions() to know how many tasks to assign.
I am sorry, in case I am contradicting myself regarding an earlier comment.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Works for me.

}

@Override
public boolean isStateful(String subtopologyId) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the methods that use getXOrFail would it be worth adding a brief javadoc about the possibility of an IllegalStateException being thrown?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

new TopologyMetadata(subscriptionMetadata, topology)
);
} else {
newGroupAssignment = new GroupAssignment(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This branch isn't covered by the unit test.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

private ConfiguredSubtopology getSubtopologyOrFail(String subtopologyId) {
final Map<String, ConfiguredSubtopology> subtopologies = getSubtopologiesOrFail();
if (!subtopologies.containsKey(subtopologyId)) {
throw new IllegalStateException(String.format("Topology does not contain subtopology %s", subtopologyId));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This branch isn't tested. Same for below in getSubtopologiesOrFail()

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Copy link
Member

@cadonna cadonna left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR, @lucasbru !

I made a pass over the production code.

Comment on lines 39 to 40
Objects.requireNonNull(topicMetadata);
Objects.requireNonNull(topology);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume the topicMetadata is mutable, because topic configs may change at any time. Since topic metadata can change at any time also topology can change at any time, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not mutable, since TopicMetadata is short-lived and passed only once to the assingor. I added unmodifiableMap, if that is what you were going for.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't topology then also be immutable?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made it an immutable collection of subtopologies.

*
* @param topicMetadata The topic Ids mapped to their corresponding {@link TopicMetadata} object, which contains topic and partition
* metadata.
*/
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The java doc for topology is missing.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

* @return The number of partitions corresponding to the given subtopology ID, or -1 if the subtopology ID does not exist.
*/
@Override
public int numTasks(String subtopologyId) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems we go back and forth with the question when to use partitions and when to use tasks. I believe the confusion comes from the fact that the max number of input partitions per topic of a subtopology determines the number of tasks.
I propose to call this method maxNumInputPartitions(). My rationale is that a topology does not have a notion of tasks. The task assignor uses maxNumInputPartitions() to know how many tasks to assign.
I am sorry, in case I am contradicting myself regarding an earlier comment.

public static class TargetAssignmentResult {

/**
* The records that must be applied to the __streams_offsets topics to persist the new target assignment.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You probably mean __consumer_offsets? Replace error?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


/**
* Build a new Target TasksTuple based on the provided parameters. As a result, it yields the records that must be persisted to the log and
* the new member assignments as a map.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please explicitly add that the task assignor is called that computes the new assigment?
Something like
"Build the new target member assignments based on the provided parameters by calling the task assignor. As a result, it yields the records that must be persisted to the log and the new member assignments as a map from member ID to tasks tuple."

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

/**
* Adds the subscription metadata to use.
*
* @param partitionMetadata The subscription metadata.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"subscription" or "partition"? This is a bit confusing.
Maybe topicMetadata?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

public TargetAssignmentBuilder removeMember(
String memberId
) {
return addOrUpdateMember(memberId, null);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe you cannot remove the member right away, because due to the builder pattern there is not guarantee that the call to withMembers() occurs before the call to removeMembers().

if (updatedMemberOrNull == null) {
memberSpecs.remove(memberId);
} else {
org.apache.kafka.coordinator.group.streams.TasksTuple assignment = targetAssignment.getOrDefault(memberId,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe, we do not need the full-qualified name of the TasksTuple here, right? TasksTuple is specific to the streams package.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

/**
* The assignment result returned by {{@link TargetAssignmentBuilder#build()}}.
*/
public static class TargetAssignmentResult {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this be implemented as a record class?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@lucasbru
Copy link
Member Author

@cadonna @bbejeck Thanks for the comments! All comments addressed, ready for re-review.

Comment on lines 110 to 112
if (subtopologies.isEmpty()) {
throw new IllegalStateException("Topology is not configured");
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason, you cannot make this check at construction time?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After some offline discussion, we agreed to unpack the optional inside TopologyMetadata to avoid the isPresent check altogether.

@lucasbru lucasbru force-pushed the kip1071merge/target_assignment_builder branch from debeeb3 to ea8723f Compare February 3, 2025 14:13
@@ -265,12 +265,15 @@ public TargetAssignmentResult build() throws TaskAssignorException {
// Compute the assignment.
GroupAssignment newGroupAssignment;
if (topology.isReady()) {
if (topology.subtopologies().isEmpty()) {
throw new IllegalStateException("Subtopologies must be present if topology is ready.");
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Central check. This is implied by isReady, but I added the IllegalStateException just in case some future updates breaks the internal invariants of ConfiguredTopology (and to silence IDE warnings).

@lucasbru
Copy link
Member Author

lucasbru commented Feb 3, 2025

@cadonna Ready for re-review.

Copy link
Member

@cadonna cadonna left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the updates, @lucasbru !

LGTM!

@lucasbru lucasbru merged commit 4ca24a7 into apache:trunk Feb 3, 2025
9 checks passed
@lucasbru lucasbru added core Kafka Broker KIP-1071 PRs related to KIP-1071 streams labels Feb 5, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Kafka Broker KIP-1071 PRs related to KIP-1071 streams
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants