From 4ca24a7dbf84b83b6693442d416a111ba8e53caf Mon Sep 17 00:00:00 2001 From: Lucas Brutschy Date: Mon, 3 Feb 2025 17:35:28 +0100 Subject: [PATCH] KAFKA-18325: Add TargetAssignmentBuilder (#18676) A class to build a new target assignment based on the provided parameters. As a result, it yields the records that must be persisted to the log and the new member assignments as a map. Compared to the feature branch, I extended the unit tests (testing also standby and warm-up task logic) and adopted simplifications due to the TasksTuple class. Reviewers: Bruno Cadonna , Bill Bejeck --- .../streams/TargetAssignmentBuilder.java | 351 ++++++++ .../group/streams/TopologyMetadata.java | 105 +++ .../group/streams/assignor/MockAssignor.java | 2 +- .../streams/assignor/StickyTaskAssignor.java | 4 +- .../streams/assignor/TopologyDescriber.java | 18 +- .../streams/topics/ConfiguredTopology.java | 3 +- .../streams/topics/InternalTopicManager.java | 12 +- .../streams/TargetAssignmentBuilderTest.java | 852 ++++++++++++++++++ .../group/streams/TopologyMetadataTest.java | 126 +++ .../streams/assignor/MockAssignorTest.java | 2 +- .../assignor/StickyTaskAssignorTest.java | 4 +- .../topics/ConfiguredTopologyTest.java | 10 +- 12 files changed, 1468 insertions(+), 21 deletions(-) create mode 100644 group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java create mode 100644 group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopologyMetadata.java create mode 100644 group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java create mode 100644 group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TopologyMetadataTest.java diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java new file mode 100644 index 0000000000000..4c1adeec839fd --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java @@ -0,0 +1,351 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.streams; + +import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; +import org.apache.kafka.coordinator.group.streams.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.streams.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.streams.assignor.GroupSpecImpl; +import org.apache.kafka.coordinator.group.streams.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.streams.assignor.TaskAssignor; +import org.apache.kafka.coordinator.group.streams.assignor.TaskAssignorException; +import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +/** + * Build the new target member assignments based on the provided parameters by calling the task assignor. + * As a result, + * it yields the records that must be persisted to the log and the new member assignments as a map from member ID to tasks tuple. + *
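+ * A minimal usage sketch (illustrative only; the builder and setter names are the ones defined in this class, while
+ * the variable names are placeholders):
+ * <pre>
+ * TargetAssignmentResult result = new TargetAssignmentBuilder(groupId, groupEpoch, assignor, assignmentConfigs)
+ *     .withMembers(members)
+ *     .withStaticMembers(staticMembers)
+ *     .withTopology(configuredTopology)
+ *     .withPartitionMetadata(partitionMetadata)
+ *     .withTargetAssignment(existingTargetAssignment)
+ *     .addOrUpdateMember(updatedMemberId, updatedMember)
+ *     .build();
+ * </pre>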

+ * Records are only created for members which have a new target assignment. If their assignment did not change, no new record is needed. + *

+ * When a member is deleted, it is assumed that its target assignment record is deleted as part of the member deletion process. In other + * words, this class does not yield a tombstone for removed members. + */ +public class TargetAssignmentBuilder { + + /** + * The group ID. + */ + private final String groupId; + /** + * The group epoch. + */ + private final int groupEpoch; + + /** + * The partition assignor used to compute the assignment. + */ + private final TaskAssignor assignor; + + /** + * The assignment configs. + */ + private final Map assignmentConfigs; + + /** + * The members which have been updated or deleted. A null value signals deleted members. + */ + private final Map updatedMembers = new HashMap<>(); + + /** + * The members in the group. + */ + private Map members = Map.of(); + + /** + * The partition metadata. + */ + private Map partitionMetadata = Map.of(); + + /** + * The existing target assignment. + */ + private Map targetAssignment = Map.of(); + + /** + * The topology. + */ + private ConfiguredTopology topology; + + /** + * The static members in the group. + */ + private Map staticMembers = Map.of(); + + /** + * Constructs the object. + * + * @param groupId The group ID. + * @param groupEpoch The group epoch to compute a target assignment for. + * @param assignor The assignor to use to compute the target assignment. + */ + public TargetAssignmentBuilder( + String groupId, + int groupEpoch, + TaskAssignor assignor, + Map assignmentConfigs + ) { + this.groupId = Objects.requireNonNull(groupId); + this.groupEpoch = groupEpoch; + this.assignor = Objects.requireNonNull(assignor); + this.assignmentConfigs = Objects.requireNonNull(assignmentConfigs); + } + + static AssignmentMemberSpec createAssignmentMemberSpec( + StreamsGroupMember member, + TasksTuple targetAssignment + ) { + return new AssignmentMemberSpec( + member.instanceId(), + member.rackId(), + targetAssignment.activeTasks(), + targetAssignment.standbyTasks(), + targetAssignment.warmupTasks(), + member.processId(), + member.clientTags(), + Map.of(), + Map.of() + ); + } + + /** + * Adds all the existing members. + * + * @param members The existing members in the streams group. + * @return This object. + */ + public TargetAssignmentBuilder withMembers( + Map members + ) { + this.members = members; + return this; + } + + /** + * Adds all the existing static members. + * + * @param staticMembers The existing static members in the streams group. + * @return This object. + */ + public TargetAssignmentBuilder withStaticMembers( + Map staticMembers + ) { + this.staticMembers = staticMembers; + return this; + } + + /** + * Adds the partition metadata to use. + * + * @param partitionMetadata The partition metadata. + * @return This object. + */ + public TargetAssignmentBuilder withPartitionMetadata( + Map partitionMetadata + ) { + this.partitionMetadata = partitionMetadata; + return this; + } + + /** + * Adds the existing target assignment. + * + * @param targetAssignment The existing target assignment. + * @return This object. + */ + public TargetAssignmentBuilder withTargetAssignment( + Map targetAssignment + ) { + this.targetAssignment = targetAssignment; + return this; + } + + /** + * Adds the topology image. + * + * @param topology The topology. + * @return This object. + */ + public TargetAssignmentBuilder withTopology( + ConfiguredTopology topology + ) { + this.topology = topology; + return this; + } + + + /** + * Adds or updates a member. This is useful when the updated member is not yet materialized in memory. 
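+ * Passing {@code null} as the member has the same effect as {@link #removeMember(String)}: the member is treated
+ * as deleted when the target assignment is computed.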
+ * + * @param memberId The member ID. + * @param member The member to add or update. + * @return This object. + */ + public TargetAssignmentBuilder addOrUpdateMember( + String memberId, + StreamsGroupMember member + ) { + this.updatedMembers.put(memberId, member); + return this; + } + + /** + * Removes a member. This is useful when the removed member is not yet materialized in memory. + * + * @param memberId The member ID. + * @return This object. + */ + public TargetAssignmentBuilder removeMember( + String memberId + ) { + return addOrUpdateMember(memberId, null); + } + + /** + * Builds the new target assignment. + * + * @return A TargetAssignmentResult which contains the records to update the existing target assignment. + * @throws TaskAssignorException if the target assignment cannot be computed. + */ + public TargetAssignmentResult build() throws TaskAssignorException { + Map memberSpecs = new HashMap<>(); + + // Prepare the member spec for all members. + members.forEach((memberId, member) -> memberSpecs.put(memberId, createAssignmentMemberSpec( + member, + targetAssignment.getOrDefault(memberId, org.apache.kafka.coordinator.group.streams.TasksTuple.EMPTY) + ))); + + // Update the member spec if updated or deleted members. + updatedMembers.forEach((memberId, updatedMemberOrNull) -> { + if (updatedMemberOrNull == null) { + memberSpecs.remove(memberId); + } else { + org.apache.kafka.coordinator.group.streams.TasksTuple assignment = targetAssignment.getOrDefault(memberId, + org.apache.kafka.coordinator.group.streams.TasksTuple.EMPTY); + + // A new static member joins and needs to replace an existing departed one. + if (updatedMemberOrNull.instanceId().isPresent()) { + String previousMemberId = staticMembers.get(updatedMemberOrNull.instanceId().get()); + if (previousMemberId != null && !previousMemberId.equals(memberId)) { + assignment = targetAssignment.getOrDefault(previousMemberId, + org.apache.kafka.coordinator.group.streams.TasksTuple.EMPTY); + } + } + + memberSpecs.put(memberId, createAssignmentMemberSpec( + updatedMemberOrNull, + assignment + )); + } + }); + + // Compute the assignment. + GroupAssignment newGroupAssignment; + if (topology.isReady()) { + if (topology.subtopologies().isEmpty()) { + throw new IllegalStateException("Subtopologies must be present if topology is ready."); + } + newGroupAssignment = assignor.assign( + new GroupSpecImpl( + Collections.unmodifiableMap(memberSpecs), + assignmentConfigs + ), + new TopologyMetadata(partitionMetadata, topology.subtopologies().get()) + ); + } else { + newGroupAssignment = new GroupAssignment( + memberSpecs.keySet().stream().collect(Collectors.toMap(x -> x, x -> MemberAssignment.empty()))); + } + + // Compute delta from previous to new target assignment and create the + // relevant records. + List records = new ArrayList<>(); + Map newTargetAssignment = new HashMap<>(); + + memberSpecs.keySet().forEach(memberId -> { + org.apache.kafka.coordinator.group.streams.TasksTuple oldMemberAssignment = targetAssignment.get(memberId); + org.apache.kafka.coordinator.group.streams.TasksTuple newMemberAssignment = newMemberAssignment(newGroupAssignment, memberId); + + newTargetAssignment.put(memberId, newMemberAssignment); + + if (oldMemberAssignment == null) { + // If the member had no assignment, we always create a record for it. 
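+ // A newly added member has no entry in the previous target assignment, so its (possibly empty) new
+ // assignment must always be persisted.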
+ records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord( + groupId, + memberId, + newMemberAssignment + )); + } else { + // If the member had an assignment, we only create a record if the + // new assignment is different. + if (!newMemberAssignment.equals(oldMemberAssignment)) { + records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord( + groupId, + memberId, + newMemberAssignment + )); + } + } + }); + + // Bump the target assignment epoch. + records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, groupEpoch)); + + return new TargetAssignmentResult(records, newTargetAssignment); + } + + private TasksTuple newMemberAssignment( + GroupAssignment newGroupAssignment, + String memberId + ) { + MemberAssignment newMemberAssignment = newGroupAssignment.members().get(memberId); + if (newMemberAssignment != null) { + return new TasksTuple( + newMemberAssignment.activeTasks(), + newMemberAssignment.standbyTasks(), + newMemberAssignment.warmupTasks() + ); + } else { + return TasksTuple.EMPTY; + } + } + + /** + * The assignment result returned by {{@link TargetAssignmentBuilder#build()}}. + * + * @param records The records that must be applied to the __consumer_offsets topics to persist the new target assignment. + * @param targetAssignment The new target assignment for the group. + */ + public record TargetAssignmentResult( + List records, + Map targetAssignment + ) { + public TargetAssignmentResult { + Objects.requireNonNull(records); + Objects.requireNonNull(targetAssignment); + } + } +} diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopologyMetadata.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopologyMetadata.java new file mode 100644 index 0000000000000..d1119cfe0112d --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopologyMetadata.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams; + +import org.apache.kafka.coordinator.group.streams.assignor.TopologyDescriber; +import org.apache.kafka.coordinator.group.streams.topics.ConfiguredSubtopology; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.SortedMap; +import java.util.stream.Stream; + +/** + * The topology metadata class is used by the {@link org.apache.kafka.coordinator.group.streams.assignor.TaskAssignor} to get topic and + * partition metadata for the topology that the streams group using. 
+ * + * @param topicMetadata The topic Ids mapped to their corresponding {@link TopicMetadata} object, which contains topic and partition + * metadata. + * @param subtopologyMap The configured subtopologies + */ +public record TopologyMetadata(Map topicMetadata, SortedMap subtopologyMap) implements TopologyDescriber { + + public TopologyMetadata { + topicMetadata = Objects.requireNonNull(Collections.unmodifiableMap(topicMetadata)); + subtopologyMap = Objects.requireNonNull(Collections.unmodifiableSortedMap(subtopologyMap)); + } + + /** + * Map of topic names to topic metadata. + * + * @return The map of topic Ids to topic metadata. + */ + @Override + public Map topicMetadata() { + return this.topicMetadata; + } + + /** + * Checks whether the given subtopology is associated with a changelog topic. + * + * @param subtopologyId String identifying the subtopology. + * @throws NoSuchElementException if the subtopology ID does not exist. + * @return true if the subtopology is associated with a changelog topic, false otherwise. + */ + @Override + public boolean isStateful(String subtopologyId) { + final ConfiguredSubtopology subtopology = getSubtopologyOrFail(subtopologyId); + return !subtopology.stateChangelogTopics().isEmpty(); + } + + /** + * The list of subtopologies in the topology. + * + * @return a list of subtopology IDs. + */ + @Override + public List subtopologies() { + return subtopologyMap.keySet().stream().toList(); + } + + /** + * The maximal number of input partitions among all source topics for the given subtopology. + * + * @param subtopologyId String identifying the subtopology. + * + * @throws NoSuchElementException if the subtopology ID does not exist. + * @throws IllegalStateException if the subtopology contains no source topics. + * @return The maximal number of input partitions among all source topics for the given subtopology. 
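+ * For example (hypothetical partition counts): a subtopology reading a source topic with 3 partitions and a
+ * repartition source topic with 4 partitions has a maximum of 4 input partitions, and therefore 4 tasks.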
+ */ + @Override + public int maxNumInputPartitions(String subtopologyId) { + final ConfiguredSubtopology subtopology = getSubtopologyOrFail(subtopologyId); + return Stream.concat( + subtopology.sourceTopics().stream(), + subtopology.repartitionSourceTopics().keySet().stream() + ).map(topic -> this.topicMetadata.get(topic).numPartitions()).max(Integer::compareTo).orElseThrow( + () -> new IllegalStateException("Subtopology does not contain any source topics") + ); + } + + private ConfiguredSubtopology getSubtopologyOrFail(String subtopologyId) { + if (!subtopologyMap.containsKey(subtopologyId)) { + throw new NoSuchElementException(String.format("Topology does not contain subtopology %s", subtopologyId)); + } + return subtopologyMap.get(subtopologyId); + } + +} diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/MockAssignor.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/MockAssignor.java index ce0bc101101ec..3e980bad93ea4 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/MockAssignor.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/MockAssignor.java @@ -46,7 +46,7 @@ public GroupAssignment assign( Map subtopologyToActiveMember = new HashMap<>(); for (String subtopology : topologyDescriber.subtopologies()) { - int numberOfPartitions = topologyDescriber.numTasks(subtopology); + int numberOfPartitions = topologyDescriber.maxNumInputPartitions(subtopology); subtopologyToActiveMember.put(subtopology, new String[numberOfPartitions]); } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java index 00ba701d3e200..c8d3c97d50402 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java @@ -72,7 +72,7 @@ private Set taskIds(final TopologyDescriber topologyDescriber, final boo Set ret = new HashSet<>(); for (String subtopology : topologyDescriber.subtopologies()) { if (isActive || topologyDescriber.isStateful(subtopology)) { - int numberOfPartitions = topologyDescriber.numTasks(subtopology); + int numberOfPartitions = topologyDescriber.maxNumInputPartitions(subtopology); for (int i = 0; i < numberOfPartitions; i++) { ret.add(new TaskId(subtopology, i)); } @@ -85,7 +85,7 @@ private void initialize(final GroupSpec groupSpec, final TopologyDescriber topol localState = new LocalState(); localState.allTasks = 0; for (String subtopology : topologyDescriber.subtopologies()) { - int numberOfPartitions = topologyDescriber.numTasks(subtopology); + int numberOfPartitions = topologyDescriber.maxNumInputPartitions(subtopology); localState.allTasks += numberOfPartitions; } localState.totalCapacity = groupSpec.members().size(); diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/TopologyDescriber.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/TopologyDescriber.java index 2f913bf5514e8..d6f7a3ab579c6 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/TopologyDescriber.java +++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/TopologyDescriber.java @@ -20,30 +20,34 @@ import java.util.NoSuchElementException; /** - * The subscribed topic describer is used by the {@link TaskAssignor} to obtain topic and task metadata of the groups topology. + * The topology describer is used by the {@link TaskAssignor} to get topic and task metadata of the group's topology. */ public interface TopologyDescriber { /** + * The list of subtopologies in the topology. + * * @return The list of subtopologies IDs. */ List subtopologies(); /** - * The number of tasks for the given subtopology. + * The maximal number of input partitions among all source topics for the given subtopology. * * @param subtopologyId String identifying the subtopology. * - * @return The number of tasks corresponding to the given subtopology ID. - * @throws NoSuchElementException if subtopology does not exist in the topology. + * @throws NoSuchElementException if the subtopology ID does not exist. + * @throws IllegalStateException if the subtopology contains no source topics. + * @return The maximal number of input partitions among all source topics for the given subtopology. */ - int numTasks(String subtopologyId) throws NoSuchElementException; + int maxNumInputPartitions(String subtopologyId) throws NoSuchElementException; /** - * Whether the given subtopology is stateful. + * Checks whether the given subtopology is associated with a changelog topic. * * @param subtopologyId String identifying the subtopology. * - * @return true if the subtopology is stateful, false otherwise. + * @throws NoSuchElementException if the subtopology ID does not exist. + * @return true if the subtopology is associated with a changelog topic, false otherwise. */ boolean isStateful(String subtopologyId); diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopology.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopology.java index b6ccb87f7a224..6c7eede16fc40 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopology.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopology.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.SortedMap; import java.util.stream.Collectors; /** @@ -41,7 +42,7 @@ * reported back to the client.
*/ public record ConfiguredTopology(int topologyEpoch, - Optional> subtopologies, + Optional> subtopologies, Map internalTopicsToBeCreated, Optional topicConfigurationException) { diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java index 31029d9fd9e39..33d9f2c48740f 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java @@ -34,6 +34,8 @@ import java.util.Optional; import java.util.OptionalInt; import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; import java.util.TreeSet; import java.util.function.Function; import java.util.stream.Collectors; @@ -74,12 +76,16 @@ public static ConfiguredTopology configureTopics(LogContext logContext, Map decidedPartitionCountsForInternalTopics = decidePartitionCounts(logContext, topology, topicMetadata, copartitionGroupsBySubtopology, log); - final Map configuredSubtopologies = + final SortedMap configuredSubtopologies = subtopologies.stream() .collect(Collectors.toMap( StreamsGroupTopologyValue.Subtopology::subtopologyId, - x -> fromPersistedSubtopology(x, decidedPartitionCountsForInternalTopics)) - ); + x -> fromPersistedSubtopology(x, decidedPartitionCountsForInternalTopics), + (v1, v2) -> { + throw new RuntimeException(String.format("Duplicate key for values %s and %s", v1, v2)); + }, + TreeMap::new + )); Map internalTopicsToCreate = missingInternalTopics(configuredSubtopologies, topicMetadata); if (!internalTopicsToCreate.isEmpty()) { diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java new file mode 100644 index 0000000000000..114974558b83d --- /dev/null +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java @@ -0,0 +1,852 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.streams; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; +import org.apache.kafka.coordinator.group.MetadataImageBuilder; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.TaskRole; +import org.apache.kafka.coordinator.group.streams.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.streams.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.streams.assignor.GroupSpecImpl; +import org.apache.kafka.coordinator.group.streams.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.streams.assignor.TaskAssignor; +import org.apache.kafka.coordinator.group.streams.topics.ConfiguredSubtopology; +import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.coordinator.group.Assertions.assertUnorderedRecordsEquals; +import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpersTest.mkMapOfPartitionRacks; +import static org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord; +import static org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord; +import static org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.createAssignmentMemberSpec; +import static org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasks; +import static org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasksTuple; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class TargetAssignmentBuilderTest { + + @Test + public void testBuildEmptyAssignmentWhenTopologyNotReady() { + String groupId = "test-group"; + int groupEpoch = 1; + TaskAssignor assignor = mock(TaskAssignor.class); + ConfiguredTopology topology = mock(ConfiguredTopology.class); + Map assignmentConfigs = new HashMap<>(); + + when(topology.isReady()).thenReturn(false); + + TargetAssignmentBuilder builder = new TargetAssignmentBuilder(groupId, groupEpoch, assignor, assignmentConfigs) + .withTopology(topology); + + TargetAssignmentBuilder.TargetAssignmentResult result = builder.build(); + + List expectedRecords = Collections.singletonList( + StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, groupEpoch) + ); + + assertEquals(expectedRecords, result.records()); + assertEquals(Collections.emptyMap(), result.targetAssignment()); + } + + @ParameterizedTest + @EnumSource(TaskRole.class) + public void testCreateAssignmentMemberSpec(TaskRole taskRole) { + String fooSubtopologyId = Uuid.randomUuid().toString(); + String barSubtopologyId = 
Uuid.randomUuid().toString(); + + final Map clientTags = mkMap(mkEntry("tag1", "value1"), mkEntry("tag2", "value2")); + StreamsGroupMember member = new StreamsGroupMember.Builder("member-id") + .setRackId("rackId") + .setInstanceId("instanceId") + .setProcessId("processId") + .setClientTags(clientTags) + .build(); + + TasksTuple assignment = mkTasksTuple(taskRole, + mkTasks(fooSubtopologyId, 1, 2, 3), + mkTasks(barSubtopologyId, 1, 2, 3) + ); + + AssignmentMemberSpec assignmentMemberSpec = createAssignmentMemberSpec( + member, + assignment + ); + + assertEquals(new AssignmentMemberSpec( + Optional.of("instanceId"), + Optional.of("rackId"), + assignment.activeTasks(), + assignment.standbyTasks(), + assignment.warmupTasks(), + "processId", + clientTags, + Map.of(), + Map.of() + ), assignmentMemberSpec); + } + + @Test + public void testEmpty() { + TargetAssignmentBuilderTestContext context = new TargetAssignmentBuilderTestContext( + "my-group", + 20 + ); + + org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.TargetAssignmentResult result = context.build(); + assertEquals(List.of(newStreamsGroupTargetAssignmentEpochRecord( + "my-group", + 20 + )), result.records()); + assertEquals(Map.of(), result.targetAssignment()); + } + + + @ParameterizedTest + @EnumSource(TaskRole.class) + public void testAssignmentHasNotChanged(TaskRole taskRole) { + TargetAssignmentBuilderTestContext context = new TargetAssignmentBuilderTestContext( + "my-group", + 20 + ); + + String fooSubtopologyId = context.addSubtopologyWithSingleSourceTopic("foo", 6, Map.of()); + String barSubtopologyId = context.addSubtopologyWithSingleSourceTopic("bar", 6, Map.of()); + + context.addGroupMember("member-1", mkTasksTuple(taskRole, + mkTasks(fooSubtopologyId, 1, 2, 3), + mkTasks(barSubtopologyId, 1, 2, 3) + )); + + context.addGroupMember("member-2", mkTasksTuple(taskRole, + mkTasks(fooSubtopologyId, 4, 5, 6), + mkTasks(barSubtopologyId, 4, 5, 6) + )); + + context.prepareMemberAssignment("member-1", mkTasksTuple(taskRole, + mkTasks(fooSubtopologyId, 1, 2, 3), + mkTasks(barSubtopologyId, 1, 2, 3) + )); + + context.prepareMemberAssignment("member-2", mkTasksTuple(taskRole, + mkTasks(fooSubtopologyId, 4, 5, 6), + mkTasks(barSubtopologyId, 4, 5, 6) + )); + + org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.TargetAssignmentResult result = context.build(); + + assertEquals(List.of(newStreamsGroupTargetAssignmentEpochRecord( + "my-group", + 20 + )), result.records()); + + Map expectedAssignment = new HashMap<>(); + expectedAssignment.put("member-1", mkTasksTuple(taskRole, + mkTasks(fooSubtopologyId, 1, 2, 3), + mkTasks(barSubtopologyId, 1, 2, 3) + )); + expectedAssignment.put("member-2", mkTasksTuple(taskRole, + mkTasks(fooSubtopologyId, 4, 5, 6), + mkTasks(barSubtopologyId, 4, 5, 6) + )); + + assertEquals(expectedAssignment, result.targetAssignment()); + } + + + @ParameterizedTest + @EnumSource(TaskRole.class) + public void testAssignmentSwapped(TaskRole taskRole) { + TargetAssignmentBuilderTestContext context = new TargetAssignmentBuilderTestContext( + "my-group", + 20 + ); + + String fooSubtopologyId = context.addSubtopologyWithSingleSourceTopic("foo", 6, Map.of()); + String barSubtopologyId = context.addSubtopologyWithSingleSourceTopic("bar", 6, Map.of()); + + context.addGroupMember("member-1", mkTasksTuple(taskRole, + mkTasks(fooSubtopologyId, 1, 2, 3), + mkTasks(barSubtopologyId, 1, 2, 3) + )); + + context.addGroupMember("member-2", mkTasksTuple(taskRole, + mkTasks(fooSubtopologyId, 4, 5, 6), + 
mkTasks(barSubtopologyId, 4, 5, 6) + )); + + context.prepareMemberAssignment("member-2", mkTasksTuple(taskRole, + mkTasks(fooSubtopologyId, 1, 2, 3), + mkTasks(barSubtopologyId, 1, 2, 3) + )); + + context.prepareMemberAssignment("member-1", mkTasksTuple(taskRole, + mkTasks(fooSubtopologyId, 4, 5, 6), + mkTasks(barSubtopologyId, 4, 5, 6) + )); + + org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.TargetAssignmentResult result = context.build(); + + assertEquals(3, result.records().size()); + + assertUnorderedRecordsEquals(List.of(List.of( + newStreamsGroupTargetAssignmentRecord("my-group", "member-1", mkTasksTuple(taskRole, + mkTasks(fooSubtopologyId, 4, 5, 6), + mkTasks(barSubtopologyId, 4, 5, 6) + )), + newStreamsGroupTargetAssignmentRecord("my-group", "member-2", mkTasksTuple(taskRole, + mkTasks(fooSubtopologyId, 1, 2, 3), + mkTasks(barSubtopologyId, 1, 2, 3) + )) + )), result.records().subList(0, 2)); + + assertEquals(newStreamsGroupTargetAssignmentEpochRecord( + "my-group", + 20 + ), result.records().get(2)); + + Map expectedAssignment = new HashMap<>(); + expectedAssignment.put("member-2", mkTasksTuple(taskRole, + mkTasks(fooSubtopologyId, 1, 2, 3), + mkTasks(barSubtopologyId, 1, 2, 3) + )); + expectedAssignment.put("member-1", mkTasksTuple(taskRole, + mkTasks(fooSubtopologyId, 4, 5, 6), + mkTasks(barSubtopologyId, 4, 5, 6) + )); + + assertEquals(expectedAssignment, result.targetAssignment()); + } + + + @ParameterizedTest + @EnumSource(TaskRole.class) + public void testNewMember(TaskRole taskRole) { + TargetAssignmentBuilderTestContext context = new TargetAssignmentBuilderTestContext( + "my-group", + 20 + ); + + String fooSubtopologyId = context.addSubtopologyWithSingleSourceTopic("foo", 6, Map.of()); + String barSubtopologyId = context.addSubtopologyWithSingleSourceTopic("bar", 6, Map.of()); + + context.addGroupMember("member-1", mkTasksTuple(taskRole, + mkTasks(fooSubtopologyId, 1, 2, 3), + mkTasks(barSubtopologyId, 1, 2, 3) + )); + + context.addGroupMember("member-2", mkTasksTuple(taskRole, + mkTasks(fooSubtopologyId, 4, 5, 6), + mkTasks(barSubtopologyId, 4, 5, 6) + )); + + context.updateMemberMetadata("member-3"); + + context.prepareMemberAssignment("member-1", mkTasksTuple(taskRole, + mkTasks(fooSubtopologyId, 1, 2), + mkTasks(barSubtopologyId, 1, 2) + )); + + context.prepareMemberAssignment("member-2", mkTasksTuple(taskRole, + mkTasks(fooSubtopologyId, 3, 4), + mkTasks(barSubtopologyId, 3, 4) + )); + + context.prepareMemberAssignment("member-3", mkTasksTuple(taskRole, + mkTasks(fooSubtopologyId, 5, 6), + mkTasks(barSubtopologyId, 5, 6) + )); + + org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.TargetAssignmentResult result = context.build(); + + assertEquals(4, result.records().size()); + + assertUnorderedRecordsEquals(List.of(List.of( + newStreamsGroupTargetAssignmentRecord("my-group", "member-1", mkTasksTuple(taskRole, + mkTasks(fooSubtopologyId, 1, 2), + mkTasks(barSubtopologyId, 1, 2) + )), + newStreamsGroupTargetAssignmentRecord("my-group", "member-2", mkTasksTuple(taskRole, + mkTasks(fooSubtopologyId, 3, 4), + mkTasks(barSubtopologyId, 3, 4) + )), + newStreamsGroupTargetAssignmentRecord("my-group", "member-3", mkTasksTuple(taskRole, + mkTasks(fooSubtopologyId, 5, 6), + mkTasks(barSubtopologyId, 5, 6) + )) + )), result.records().subList(0, 3)); + + assertEquals(newStreamsGroupTargetAssignmentEpochRecord( + "my-group", + 20 + ), result.records().get(3)); + + Map expectedAssignment = new HashMap<>(); + expectedAssignment.put("member-1", 
mkTasksTuple(taskRole, + mkTasks(fooSubtopologyId, 1, 2), + mkTasks(barSubtopologyId, 1, 2) + )); + expectedAssignment.put("member-2", mkTasksTuple(taskRole, + mkTasks(fooSubtopologyId, 3, 4), + mkTasks(barSubtopologyId, 3, 4) + )); + expectedAssignment.put("member-3", mkTasksTuple(taskRole, + mkTasks(fooSubtopologyId, 5, 6), + mkTasks(barSubtopologyId, 5, 6) + )); + + assertEquals(expectedAssignment, result.targetAssignment()); + } + + + @ParameterizedTest + @EnumSource(TaskRole.class) + public void testUpdateMember(TaskRole taskRole) { + TargetAssignmentBuilderTestContext context = new TargetAssignmentBuilderTestContext( + "my-group", + 20 + ); + + String fooSubtopologyId = context.addSubtopologyWithSingleSourceTopic("foo", 6, Map.of()); + String barSubtopologyId = context.addSubtopologyWithSingleSourceTopic("bar", 6, Map.of()); + + context.addGroupMember("member-1", mkTasksTuple(taskRole, + mkTasks(fooSubtopologyId, 1, 2, 3), + mkTasks(barSubtopologyId, 1, 2) + )); + + context.addGroupMember("member-2", mkTasksTuple(taskRole, + mkTasks(fooSubtopologyId, 4, 5, 6), + mkTasks(barSubtopologyId, 3, 4) + )); + + context.addGroupMember("member-3", mkTasksTuple(taskRole, + mkTasks(barSubtopologyId, 5, 6) + )); + + context.updateMemberMetadata( + "member-3", + Optional.of("instance-id-3"), + Optional.of("rack-0") + ); + + context.prepareMemberAssignment("member-1", mkTasksTuple(taskRole, + mkTasks(fooSubtopologyId, 1, 2), + mkTasks(barSubtopologyId, 1, 2) + )); + + context.prepareMemberAssignment("member-2", mkTasksTuple(taskRole, + mkTasks(fooSubtopologyId, 3, 4), + mkTasks(barSubtopologyId, 3, 4) + )); + + context.prepareMemberAssignment("member-3", mkTasksTuple(taskRole, + mkTasks(fooSubtopologyId, 5, 6), + mkTasks(barSubtopologyId, 5, 6) + )); + + org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.TargetAssignmentResult result = context.build(); + + assertEquals(4, result.records().size()); + + assertUnorderedRecordsEquals(List.of(List.of( + newStreamsGroupTargetAssignmentRecord("my-group", "member-1", mkTasksTuple(taskRole, + mkTasks(fooSubtopologyId, 1, 2), + mkTasks(barSubtopologyId, 1, 2) + )), + newStreamsGroupTargetAssignmentRecord("my-group", "member-2", mkTasksTuple(taskRole, + mkTasks(fooSubtopologyId, 3, 4), + mkTasks(barSubtopologyId, 3, 4) + )), + newStreamsGroupTargetAssignmentRecord("my-group", "member-3", mkTasksTuple(taskRole, + mkTasks(fooSubtopologyId, 5, 6), + mkTasks(barSubtopologyId, 5, 6) + )) + )), result.records().subList(0, 3)); + + assertEquals(newStreamsGroupTargetAssignmentEpochRecord( + "my-group", + 20 + ), result.records().get(3)); + + Map expectedAssignment = new HashMap<>(); + expectedAssignment.put("member-1", mkTasksTuple(taskRole, + mkTasks(fooSubtopologyId, 1, 2), + mkTasks(barSubtopologyId, 1, 2) + )); + expectedAssignment.put("member-2", mkTasksTuple(taskRole, + mkTasks(fooSubtopologyId, 3, 4), + mkTasks(barSubtopologyId, 3, 4) + )); + expectedAssignment.put("member-3", mkTasksTuple(taskRole, + mkTasks(fooSubtopologyId, 5, 6), + mkTasks(barSubtopologyId, 5, 6) + )); + + assertEquals(expectedAssignment, result.targetAssignment()); + } + + + @ParameterizedTest + @EnumSource(TaskRole.class) + public void testPartialAssignmentUpdate(TaskRole taskRole) { + TargetAssignmentBuilderTestContext context = new TargetAssignmentBuilderTestContext( + "my-group", + 20 + ); + + String fooSubtopologyId = context.addSubtopologyWithSingleSourceTopic("foo", 6, mkMapOfPartitionRacks(6)); + String barSubtopologyId = 
context.addSubtopologyWithSingleSourceTopic("bar", 6, mkMapOfPartitionRacks(6)); + + context.addGroupMember("member-1", mkTasksTuple(taskRole, + mkTasks(fooSubtopologyId, 1, 2), + mkTasks(barSubtopologyId, 1, 2) + )); + + context.addGroupMember("member-2", mkTasksTuple(taskRole, + mkTasks(fooSubtopologyId, 3, 4), + mkTasks(barSubtopologyId, 3, 4) + )); + + context.addGroupMember("member-3", mkTasksTuple(taskRole, + mkTasks(fooSubtopologyId, 5, 6), + mkTasks(barSubtopologyId, 5, 6) + )); + + context.prepareMemberAssignment("member-1", mkTasksTuple(taskRole, + mkTasks(fooSubtopologyId, 1, 2), + mkTasks(barSubtopologyId, 1, 2) + )); + + context.prepareMemberAssignment("member-2", mkTasksTuple(taskRole, + mkTasks(fooSubtopologyId, 3, 4, 5), + mkTasks(barSubtopologyId, 3, 4, 5) + )); + + context.prepareMemberAssignment("member-3", mkTasksTuple(taskRole, + mkTasks(fooSubtopologyId, 6), + mkTasks(barSubtopologyId, 6) + )); + + org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.TargetAssignmentResult result = context.build(); + + assertEquals(3, result.records().size()); + + // Member 1 has no record because its assignment did not change. + assertUnorderedRecordsEquals(List.of(List.of( + newStreamsGroupTargetAssignmentRecord("my-group", "member-2", mkTasksTuple(taskRole, + mkTasks(fooSubtopologyId, 3, 4, 5), + mkTasks(barSubtopologyId, 3, 4, 5) + )), + newStreamsGroupTargetAssignmentRecord("my-group", "member-3", mkTasksTuple(taskRole, + mkTasks(fooSubtopologyId, 6), + mkTasks(barSubtopologyId, 6) + )) + )), result.records().subList(0, 2)); + + assertEquals(newStreamsGroupTargetAssignmentEpochRecord( + "my-group", + 20 + ), result.records().get(2)); + + Map expectedAssignment = new HashMap<>(); + expectedAssignment.put("member-1", mkTasksTuple(taskRole, + mkTasks(fooSubtopologyId, 1, 2), + mkTasks(barSubtopologyId, 1, 2) + )); + expectedAssignment.put("member-2", mkTasksTuple(taskRole, + mkTasks(fooSubtopologyId, 3, 4, 5), + mkTasks(barSubtopologyId, 3, 4, 5) + )); + expectedAssignment.put("member-3", mkTasksTuple(taskRole, + mkTasks(fooSubtopologyId, 6), + mkTasks(barSubtopologyId, 6) + )); + + assertEquals(expectedAssignment, result.targetAssignment()); + } + + + @ParameterizedTest + @EnumSource(TaskRole.class) + public void testDeleteMember(TaskRole taskRole) { + TargetAssignmentBuilderTestContext context = new TargetAssignmentBuilderTestContext( + "my-group", + 20 + ); + + String fooSubtopologyId = context.addSubtopologyWithSingleSourceTopic("foo", 6, Map.of()); + String barSubtopologyId = context.addSubtopologyWithSingleSourceTopic("bar", 6, Map.of()); + + context.addGroupMember("member-1", mkTasksTuple(taskRole, + mkTasks(fooSubtopologyId, 1, 2), + mkTasks(barSubtopologyId, 1, 2) + )); + + context.addGroupMember("member-2", mkTasksTuple(taskRole, + mkTasks(fooSubtopologyId, 3, 4), + mkTasks(barSubtopologyId, 3, 4) + )); + + context.addGroupMember("member-3", mkTasksTuple(taskRole, + mkTasks(fooSubtopologyId, 5, 6), + mkTasks(barSubtopologyId, 5, 6) + )); + + context.removeMember("member-3"); + + context.prepareMemberAssignment("member-1", mkTasksTuple(taskRole, + mkTasks(fooSubtopologyId, 1, 2, 3), + mkTasks(barSubtopologyId, 1, 2, 3) + )); + + context.prepareMemberAssignment("member-2", mkTasksTuple(taskRole, + mkTasks(fooSubtopologyId, 4, 5, 6), + mkTasks(barSubtopologyId, 4, 5, 6) + )); + + org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.TargetAssignmentResult result = context.build(); + + assertEquals(3, result.records().size()); + + 
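+ // member-3 was removed, so only member-1 and member-2 (whose assignments changed) get target assignment
+ // records, followed by the bumped target assignment epoch record.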
assertUnorderedRecordsEquals(List.of(List.of( + newStreamsGroupTargetAssignmentRecord("my-group", "member-1", mkTasksTuple(taskRole, + mkTasks(fooSubtopologyId, 1, 2, 3), + mkTasks(barSubtopologyId, 1, 2, 3) + )), + newStreamsGroupTargetAssignmentRecord("my-group", "member-2", mkTasksTuple(taskRole, + mkTasks(fooSubtopologyId, 4, 5, 6), + mkTasks(barSubtopologyId, 4, 5, 6) + )) + )), result.records().subList(0, 2)); + + assertEquals(newStreamsGroupTargetAssignmentEpochRecord( + "my-group", + 20 + ), result.records().get(2)); + + Map expectedAssignment = new HashMap<>(); + expectedAssignment.put("member-1", mkTasksTuple(taskRole, + mkTasks(fooSubtopologyId, 1, 2, 3), + mkTasks(barSubtopologyId, 1, 2, 3) + )); + expectedAssignment.put("member-2", mkTasksTuple(taskRole, + mkTasks(fooSubtopologyId, 4, 5, 6), + mkTasks(barSubtopologyId, 4, 5, 6) + )); + + assertEquals(expectedAssignment, result.targetAssignment()); + } + + + @ParameterizedTest + @EnumSource(TaskRole.class) + public void testReplaceStaticMember(TaskRole taskRole) { + TargetAssignmentBuilderTestContext context = new TargetAssignmentBuilderTestContext( + "my-group", + 20 + ); + + String fooSubtopologyId = context.addSubtopologyWithSingleSourceTopic("foo", 6, Map.of()); + String barSubtopologyId = context.addSubtopologyWithSingleSourceTopic("bar", 6, Map.of()); + + context.addGroupMember("member-1", "instance-member-1", mkTasksTuple(taskRole, + mkTasks(fooSubtopologyId, 1, 2), + mkTasks(barSubtopologyId, 1, 2) + )); + + context.addGroupMember("member-2", "instance-member-2", mkTasksTuple(taskRole, + mkTasks(fooSubtopologyId, 3, 4), + mkTasks(barSubtopologyId, 3, 4) + )); + + context.addGroupMember("member-3", "instance-member-3", mkTasksTuple(taskRole, + mkTasks(fooSubtopologyId, 5, 6), + mkTasks(barSubtopologyId, 5, 6) + )); + + // Static member 3 leaves + context.removeMember("member-3"); + + // Another static member joins with the same instance id as the departed one + context.updateMemberMetadata("member-3-a", Optional.of("instance-member-3"), + Optional.empty()); + + context.prepareMemberAssignment("member-1", mkTasksTuple(taskRole, + mkTasks(fooSubtopologyId, 1, 2), + mkTasks(barSubtopologyId, 1, 2) + )); + + context.prepareMemberAssignment("member-2", mkTasksTuple(taskRole, + mkTasks(fooSubtopologyId, 3, 4), + mkTasks(barSubtopologyId, 3, 4) + )); + + context.prepareMemberAssignment("member-3-a", mkTasksTuple(taskRole, + mkTasks(fooSubtopologyId, 5, 6), + mkTasks(barSubtopologyId, 5, 6) + )); + + TargetAssignmentBuilder.TargetAssignmentResult result = context.build(); + + assertEquals(2, result.records().size()); + + assertUnorderedRecordsEquals(List.of(List.of( + newStreamsGroupTargetAssignmentRecord("my-group", "member-3-a", mkTasksTuple(taskRole, + mkTasks(fooSubtopologyId, 5, 6), + mkTasks(barSubtopologyId, 5, 6) + )) + )), result.records().subList(0, 1)); + + assertEquals(newStreamsGroupTargetAssignmentEpochRecord( + "my-group", + 20 + ), result.records().get(1)); + + Map expectedAssignment = new HashMap<>(); + expectedAssignment.put("member-1", mkTasksTuple(taskRole, + mkTasks(fooSubtopologyId, 1, 2), + mkTasks(barSubtopologyId, 1, 2) + )); + expectedAssignment.put("member-2", mkTasksTuple(taskRole, + mkTasks(fooSubtopologyId, 3, 4), + mkTasks(barSubtopologyId, 3, 4) + )); + + expectedAssignment.put("member-3-a", mkTasksTuple(taskRole, + mkTasks(fooSubtopologyId, 5, 6), + mkTasks(barSubtopologyId, 5, 6) + )); + + assertEquals(expectedAssignment, result.targetAssignment()); + } + + public static class 
TargetAssignmentBuilderTestContext { + + private final String groupId; + private final int groupEpoch; + private final TaskAssignor assignor = mock(TaskAssignor.class); + private final SortedMap subtopologies = new TreeMap<>(); + private final ConfiguredTopology topology = new ConfiguredTopology(0, Optional.of(subtopologies), new HashMap<>(), + Optional.empty()); + private final Map members = new HashMap<>(); + private final Map subscriptionMetadata = new HashMap<>(); + private final Map updatedMembers = new HashMap<>(); + private final Map targetAssignment = new HashMap<>(); + private final Map memberAssignments = new HashMap<>(); + private final Map staticMembers = new HashMap<>(); + private MetadataImageBuilder topicsImageBuilder = new MetadataImageBuilder(); + + public TargetAssignmentBuilderTestContext( + String groupId, + int groupEpoch + ) { + this.groupId = groupId; + this.groupEpoch = groupEpoch; + } + + public void addGroupMember( + String memberId, + TasksTuple targetTasks + ) { + addGroupMember(memberId, null, targetTasks); + } + + private void addGroupMember( + String memberId, + String instanceId, + TasksTuple targetTasks + ) { + StreamsGroupMember.Builder memberBuilder = new StreamsGroupMember.Builder(memberId); + memberBuilder.setProcessId("processId"); + memberBuilder.setClientTags(Map.of()); + memberBuilder.setUserEndpoint(new StreamsGroupMemberMetadataValue.Endpoint().setHost("host").setPort(9090)); + + if (instanceId != null) { + memberBuilder.setInstanceId(instanceId); + staticMembers.put(instanceId, memberId); + } else { + memberBuilder.setInstanceId(null); + } + memberBuilder.setRackId(null); + members.put(memberId, memberBuilder.build()); + targetAssignment.put(memberId, targetTasks); + } + + public String addSubtopologyWithSingleSourceTopic( + String topicName, + int numTasks, + Map> partitionRacks + ) { + String subtopologyId = Uuid.randomUuid().toString(); + Uuid topicId = Uuid.randomUuid(); + subscriptionMetadata.put(topicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata( + topicId, + topicName, + numTasks, + partitionRacks + )); + topicsImageBuilder = topicsImageBuilder.addTopic(topicId, topicName, numTasks); + subtopologies.put(subtopologyId, new ConfiguredSubtopology(Set.of(topicId.toString()), Map.of(), Set.of(), Map.of())); + + return subtopologyId; + } + + public void updateMemberMetadata( + String memberId + ) { + updateMemberMetadata( + memberId, + Optional.empty(), + Optional.empty() + ); + } + + public void updateMemberMetadata( + String memberId, + Optional instanceId, + Optional rackId + ) { + StreamsGroupMember existingMember = members.get(memberId); + StreamsGroupMember.Builder builder; + if (existingMember != null) { + builder = new StreamsGroupMember.Builder(existingMember); + } else { + builder = new StreamsGroupMember.Builder(memberId); + builder.setProcessId("processId"); + builder.setRackId(null); + builder.setInstanceId(null); + builder.setClientTags(Map.of()); + builder.setUserEndpoint(new StreamsGroupMemberMetadataValue.Endpoint().setHost("host").setPort(9090)); + } + updatedMembers.put(memberId, builder + .maybeUpdateInstanceId(instanceId) + .maybeUpdateRackId(rackId) + .build()); + } + + public void removeMember( + String memberId + ) { + this.updatedMembers.put(memberId, null); + } + + public void prepareMemberAssignment( + String memberId, + TasksTuple assignment + ) { + memberAssignments.put(memberId, new MemberAssignment(assignment.activeTasks(), assignment.standbyTasks(), assignment.warmupTasks())); + } + + public 
org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.TargetAssignmentResult build() { + // Prepare expected member specs. + Map memberSpecs = new HashMap<>(); + + // All the existing members are prepared. + members.forEach((memberId, member) -> + memberSpecs.put(memberId, createAssignmentMemberSpec( + member, + targetAssignment.getOrDefault(memberId, TasksTuple.EMPTY) + ) + )); + + // All the updated are added and all the deleted + // members are removed. + updatedMembers.forEach((memberId, updatedMemberOrNull) -> { + if (updatedMemberOrNull == null) { + memberSpecs.remove(memberId); + } else { + TasksTuple assignment = targetAssignment.getOrDefault(memberId, + TasksTuple.EMPTY); + + // A new static member joins and needs to replace an existing departed one. + if (updatedMemberOrNull.instanceId().isPresent()) { + String previousMemberId = staticMembers.get(updatedMemberOrNull.instanceId().get()); + if (previousMemberId != null && !previousMemberId.equals(memberId)) { + assignment = targetAssignment.getOrDefault(previousMemberId, + TasksTuple.EMPTY); + } + } + + memberSpecs.put(memberId, createAssignmentMemberSpec( + updatedMemberOrNull, + assignment + )); + } + }); + + // Prepare the expected topology metadata. + TopologyMetadata topologyMetadata = new TopologyMetadata(subscriptionMetadata, subtopologies); + + // Prepare the expected assignment spec. + GroupSpecImpl groupSpec = new GroupSpecImpl(memberSpecs, new HashMap<>()); + + // We use `any` here to always return an assignment but use `verify` later on + // to ensure that the input was correct. + when(assignor.assign(any(), any())) + .thenReturn(new GroupAssignment(memberAssignments)); + + // Create and populate the assignment builder. + org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder builder = new org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder( + groupId, groupEpoch, assignor, Map.of()) + .withMembers(members) + .withTopology(topology) + .withStaticMembers(staticMembers) + .withPartitionMetadata(subscriptionMetadata) + .withTargetAssignment(targetAssignment); + + // Add the updated members or delete the deleted members. + updatedMembers.forEach((memberId, updatedMemberOrNull) -> { + if (updatedMemberOrNull != null) { + builder.addOrUpdateMember(memberId, updatedMemberOrNull); + } else { + builder.removeMember(memberId); + } + }); + + // Execute the builder. + org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.TargetAssignmentResult result = builder.build(); + + // Verify that the assignor was called once with the expected + // assignment spec. + verify(assignor, times(1)) + .assign(groupSpec, topologyMetadata); + + return result; + } + } +} diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TopologyMetadataTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TopologyMetadataTest.java new file mode 100644 index 0000000000000..a5c18a6f0f2f8 --- /dev/null +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TopologyMetadataTest.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams; + +import org.apache.kafka.coordinator.group.streams.topics.ConfiguredInternalTopic; +import org.apache.kafka.coordinator.group.streams.topics.ConfiguredSubtopology; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + +class TopologyMetadataTest { + + private Map topicMetadata; + private SortedMap subtopologyMap; + private TopologyMetadata topologyMetadata; + + @BeforeEach + void setUp() { + topicMetadata = new HashMap<>(); + subtopologyMap = new TreeMap<>(); + topologyMetadata = new TopologyMetadata(topicMetadata, subtopologyMap); + } + + @Test + void testTopicMetadata() { + assertEquals(topicMetadata, topologyMetadata.topicMetadata()); + } + + @Test + void testTopology() { + assertEquals(subtopologyMap, topologyMetadata.subtopologyMap()); + } + + @Test + void testIsStateful() { + ConfiguredInternalTopic internalTopic = mock(ConfiguredInternalTopic.class); + ConfiguredSubtopology subtopology1 = mock(ConfiguredSubtopology.class); + ConfiguredSubtopology subtopology2 = mock(ConfiguredSubtopology.class); + subtopologyMap.put("subtopology1", subtopology1); + subtopologyMap.put("subtopology2", subtopology2); + when(subtopology1.stateChangelogTopics()).thenReturn(Map.of("state_changelog_topic", internalTopic)); + when(subtopology2.stateChangelogTopics()).thenReturn(Map.of()); + + assertTrue(topologyMetadata.isStateful("subtopology1")); + assertFalse(topologyMetadata.isStateful("subtopology2")); + } + + @Test + void testMaxNumInputPartitions() { + ConfiguredInternalTopic internalTopic = mock(ConfiguredInternalTopic.class); + ConfiguredSubtopology subtopology = mock(ConfiguredSubtopology.class); + subtopologyMap.put("subtopology1", subtopology); + when(subtopology.sourceTopics()).thenReturn(Set.of("source_topic")); + when(subtopology.repartitionSourceTopics()).thenReturn(Map.of("repartition_source_topic", internalTopic)); + + TopicMetadata topicMeta1 = mock(TopicMetadata.class); + TopicMetadata topicMeta2 = mock(TopicMetadata.class); + topicMetadata.put("source_topic", topicMeta1); + topicMetadata.put("repartition_source_topic", topicMeta2); + when(topicMeta1.numPartitions()).thenReturn(3); + when(topicMeta2.numPartitions()).thenReturn(4); + + assertEquals(4, topologyMetadata.maxNumInputPartitions("subtopology1")); + } + + @Test + void testSubtopologies() { + ConfiguredSubtopology subtopology1 = mock(ConfiguredSubtopology.class); + ConfiguredSubtopology subtopology2 = mock(ConfiguredSubtopology.class); + subtopologyMap.put("subtopology1", subtopology1); + 
subtopologyMap.put("subtopology2", subtopology2); + + List expectedSubtopologies = List.of("subtopology1", "subtopology2"); + assertEquals(expectedSubtopologies, topologyMetadata.subtopologies()); + } + + @Test + void testIsStatefulThrowsExceptionWhenSubtopologyIdDoesNotExist() { + assertThrows(NoSuchElementException.class, () -> topologyMetadata.isStateful("non_existent_subtopology")); + } + + @Test + void testMaxNumInputPartitionsThrowsExceptionWhenSubtopologyIdDoesNotExist() { + assertThrows(NoSuchElementException.class, () -> topologyMetadata.maxNumInputPartitions("non_existent_subtopology")); + } + + @Test + void testMaxNumInputPartitionsThrowsExceptionWhenSubtopologyContainsNoSourceTopics() { + ConfiguredSubtopology subtopology = mock(ConfiguredSubtopology.class); + when(subtopology.sourceTopics()).thenReturn(Set.of()); + when(subtopology.repartitionSourceTopics()).thenReturn(Map.of()); + subtopologyMap.put("subtopology1", subtopology); + + assertThrows(IllegalStateException.class, () -> topologyMetadata.maxNumInputPartitions("subtopology1")); + } +} \ No newline at end of file diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/MockAssignorTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/MockAssignorTest.java index 25dada072df13..d44b24549e0f0 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/MockAssignorTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/MockAssignorTest.java @@ -253,7 +253,7 @@ public List subtopologies() { } @Override - public int numTasks(String subtopologyId) { + public int maxNumInputPartitions(String subtopologyId) { return numPartitions; } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignorTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignorTest.java index 112dd2281d809..542aad73deef3 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignorTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignorTest.java @@ -1094,7 +1094,7 @@ public List subtopologies() { } @Override - public int numTasks(String subtopologyId) throws NoSuchElementException { + public int maxNumInputPartitions(String subtopologyId) throws NoSuchElementException { return numTasks; } @@ -1112,7 +1112,7 @@ public List subtopologies() { } @Override - public int numTasks(String subtopologyId) throws NoSuchElementException { + public int maxNumInputPartitions(String subtopologyId) throws NoSuchElementException { if (subtopologyId.equals("test-subtopology1")) return 6; return 1; diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopologyTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopologyTest.java index 2d6d096235a62..a909629fa2057 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopologyTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopologyTest.java @@ -26,6 +26,8 @@ import java.util.HashMap; import java.util.Map; import java.util.Optional; +import java.util.SortedMap; +import java.util.TreeMap; import static 
org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -53,7 +55,7 @@ public void testConstructorWithNullInternalTopicsToBeCreated() { assertThrows(NullPointerException.class, () -> new ConfiguredTopology( 0, - Optional.of(Map.of()), + Optional.of(new TreeMap<>()), null, Optional.empty() ) @@ -77,7 +79,7 @@ public void testConstructorWithInvalidTopologyEpoch() { assertThrows(IllegalArgumentException.class, () -> new ConfiguredTopology( -1, - Optional.of(Map.of()), + Optional.of(new TreeMap<>()), Collections.emptyMap(), Optional.empty() ) @@ -100,7 +102,7 @@ public void testNoExceptionButNoSubtopologies() { @Test public void testIsReady() { ConfiguredTopology readyTopology = new ConfiguredTopology( - 1, Optional.of(Map.of()), new HashMap<>(), Optional.empty()); + 1, Optional.of(new TreeMap<>()), new HashMap<>(), Optional.empty()); assertTrue(readyTopology.isReady()); ConfiguredTopology notReadyTopology = new ConfiguredTopology( @@ -114,7 +116,7 @@ public void testAsStreamsGroupDescribeTopology() { ConfiguredSubtopology subtopologyMock = mock(ConfiguredSubtopology.class); StreamsGroupDescribeResponseData.Subtopology subtopologyResponse = new StreamsGroupDescribeResponseData.Subtopology(); when(subtopologyMock.asStreamsGroupDescribeSubtopology(Mockito.anyString())).thenReturn(subtopologyResponse); - Map subtopologies = new HashMap<>(); + SortedMap subtopologies = new TreeMap<>(); subtopologies.put("subtopology1", subtopologyMock); Map internalTopicsToBeCreated = new HashMap<>(); Optional topicConfigurationException = Optional.empty();