diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java
new file mode 100644
index 0000000000000..4c1adeec839fd
--- /dev/null
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import org.apache.kafka.coordinator.group.streams.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.streams.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.streams.assignor.GroupSpecImpl;
+import org.apache.kafka.coordinator.group.streams.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.streams.assignor.TaskAssignor;
+import org.apache.kafka.coordinator.group.streams.assignor.TaskAssignorException;
+import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * Build the new target member assignments based on the provided parameters by calling the task assignor.
+ * As a result,
+ * it yields the records that must be persisted to the log and the new member assignments as a map from member ID to tasks tuple.
+ *
+ * Records are only created for members which have a new target assignment. If their assignment did not change, no new record is needed.
+ *
+ * When a member is deleted, it is assumed that its target assignment record is deleted as part of the member deletion process. In other
+ * words, this class does not yield a tombstone for removed members.
+ */
+public class TargetAssignmentBuilder {
+
+ /**
+ * The group ID.
+ */
+ private final String groupId;
+ /**
+ * The group epoch.
+ */
+ private final int groupEpoch;
+
+ /**
+ * The partition assignor used to compute the assignment.
+ */
+ private final TaskAssignor assignor;
+
+ /**
+ * The assignment configs.
+ */
+ private final Map assignmentConfigs;
+
+ /**
+ * The members which have been updated or deleted. A null value signals deleted members.
+ */
+ private final Map updatedMembers = new HashMap<>();
+
+ /**
+ * The members in the group.
+ */
+ private Map members = Map.of();
+
+ /**
+ * The partition metadata.
+ */
+ private Map partitionMetadata = Map.of();
+
+ /**
+ * The existing target assignment.
+ */
+ private Map targetAssignment = Map.of();
+
+ /**
+ * The topology.
+ */
+ private ConfiguredTopology topology;
+
+ /**
+ * The static members in the group.
+ */
+ private Map staticMembers = Map.of();
+
+ /**
+ * Constructs the object.
+ *
+ * @param groupId The group ID.
+ * @param groupEpoch The group epoch to compute a target assignment for.
+ * @param assignor The assignor to use to compute the target assignment.
+ */
+ public TargetAssignmentBuilder(
+ String groupId,
+ int groupEpoch,
+ TaskAssignor assignor,
+ Map assignmentConfigs
+ ) {
+ this.groupId = Objects.requireNonNull(groupId);
+ this.groupEpoch = groupEpoch;
+ this.assignor = Objects.requireNonNull(assignor);
+ this.assignmentConfigs = Objects.requireNonNull(assignmentConfigs);
+ }
+
+ static AssignmentMemberSpec createAssignmentMemberSpec(
+ StreamsGroupMember member,
+ TasksTuple targetAssignment
+ ) {
+ return new AssignmentMemberSpec(
+ member.instanceId(),
+ member.rackId(),
+ targetAssignment.activeTasks(),
+ targetAssignment.standbyTasks(),
+ targetAssignment.warmupTasks(),
+ member.processId(),
+ member.clientTags(),
+ Map.of(),
+ Map.of()
+ );
+ }
+
+ /**
+ * Adds all the existing members.
+ *
+ * @param members The existing members in the streams group.
+ * @return This object.
+ */
+ public TargetAssignmentBuilder withMembers(
+ Map members
+ ) {
+ this.members = members;
+ return this;
+ }
+
+ /**
+ * Adds all the existing static members.
+ *
+ * @param staticMembers The existing static members in the streams group.
+ * @return This object.
+ */
+ public TargetAssignmentBuilder withStaticMembers(
+ Map staticMembers
+ ) {
+ this.staticMembers = staticMembers;
+ return this;
+ }
+
+ /**
+ * Adds the partition metadata to use.
+ *
+ * @param partitionMetadata The partition metadata.
+ * @return This object.
+ */
+ public TargetAssignmentBuilder withPartitionMetadata(
+ Map partitionMetadata
+ ) {
+ this.partitionMetadata = partitionMetadata;
+ return this;
+ }
+
+ /**
+ * Adds the existing target assignment.
+ *
+ * @param targetAssignment The existing target assignment.
+ * @return This object.
+ */
+ public TargetAssignmentBuilder withTargetAssignment(
+ Map targetAssignment
+ ) {
+ this.targetAssignment = targetAssignment;
+ return this;
+ }
+
+ /**
+ * Adds the topology image.
+ *
+ * @param topology The topology.
+ * @return This object.
+ */
+ public TargetAssignmentBuilder withTopology(
+ ConfiguredTopology topology
+ ) {
+ this.topology = topology;
+ return this;
+ }
+
+
+ /**
+ * Adds or updates a member. This is useful when the updated member is not yet materialized in memory.
+ *
+ * @param memberId The member ID.
+ * @param member The member to add or update.
+ * @return This object.
+ */
+ public TargetAssignmentBuilder addOrUpdateMember(
+ String memberId,
+ StreamsGroupMember member
+ ) {
+ this.updatedMembers.put(memberId, member);
+ return this;
+ }
+
+ /**
+ * Removes a member. This is useful when the removed member is not yet materialized in memory.
+ *
+ * @param memberId The member ID.
+ * @return This object.
+ */
+ public TargetAssignmentBuilder removeMember(
+ String memberId
+ ) {
+ return addOrUpdateMember(memberId, null);
+ }
+
+ /**
+ * Builds the new target assignment.
+ *
+ * @return A TargetAssignmentResult which contains the records to update the existing target assignment.
+ * @throws TaskAssignorException if the target assignment cannot be computed.
+ */
+ public TargetAssignmentResult build() throws TaskAssignorException {
+ Map memberSpecs = new HashMap<>();
+
+ // Prepare the member spec for all members.
+ members.forEach((memberId, member) -> memberSpecs.put(memberId, createAssignmentMemberSpec(
+ member,
+ targetAssignment.getOrDefault(memberId, org.apache.kafka.coordinator.group.streams.TasksTuple.EMPTY)
+ )));
+
+ // Update the member spec if updated or deleted members.
+ updatedMembers.forEach((memberId, updatedMemberOrNull) -> {
+ if (updatedMemberOrNull == null) {
+ memberSpecs.remove(memberId);
+ } else {
+ org.apache.kafka.coordinator.group.streams.TasksTuple assignment = targetAssignment.getOrDefault(memberId,
+ org.apache.kafka.coordinator.group.streams.TasksTuple.EMPTY);
+
+ // A new static member joins and needs to replace an existing departed one.
+ if (updatedMemberOrNull.instanceId().isPresent()) {
+ String previousMemberId = staticMembers.get(updatedMemberOrNull.instanceId().get());
+ if (previousMemberId != null && !previousMemberId.equals(memberId)) {
+ assignment = targetAssignment.getOrDefault(previousMemberId,
+ org.apache.kafka.coordinator.group.streams.TasksTuple.EMPTY);
+ }
+ }
+
+ memberSpecs.put(memberId, createAssignmentMemberSpec(
+ updatedMemberOrNull,
+ assignment
+ ));
+ }
+ });
+
+ // Compute the assignment.
+ GroupAssignment newGroupAssignment;
+ if (topology.isReady()) {
+ if (topology.subtopologies().isEmpty()) {
+ throw new IllegalStateException("Subtopologies must be present if topology is ready.");
+ }
+ newGroupAssignment = assignor.assign(
+ new GroupSpecImpl(
+ Collections.unmodifiableMap(memberSpecs),
+ assignmentConfigs
+ ),
+ new TopologyMetadata(partitionMetadata, topology.subtopologies().get())
+ );
+ } else {
+ newGroupAssignment = new GroupAssignment(
+ memberSpecs.keySet().stream().collect(Collectors.toMap(x -> x, x -> MemberAssignment.empty())));
+ }
+
+ // Compute delta from previous to new target assignment and create the
+ // relevant records.
+ List records = new ArrayList<>();
+ Map newTargetAssignment = new HashMap<>();
+
+ memberSpecs.keySet().forEach(memberId -> {
+ org.apache.kafka.coordinator.group.streams.TasksTuple oldMemberAssignment = targetAssignment.get(memberId);
+ org.apache.kafka.coordinator.group.streams.TasksTuple newMemberAssignment = newMemberAssignment(newGroupAssignment, memberId);
+
+ newTargetAssignment.put(memberId, newMemberAssignment);
+
+ if (oldMemberAssignment == null) {
+ // If the member had no assignment, we always create a record for it.
+ records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(
+ groupId,
+ memberId,
+ newMemberAssignment
+ ));
+ } else {
+ // If the member had an assignment, we only create a record if the
+ // new assignment is different.
+ if (!newMemberAssignment.equals(oldMemberAssignment)) {
+ records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(
+ groupId,
+ memberId,
+ newMemberAssignment
+ ));
+ }
+ }
+ });
+
+ // Bump the target assignment epoch.
+ records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, groupEpoch));
+
+ return new TargetAssignmentResult(records, newTargetAssignment);
+ }
+
+ private TasksTuple newMemberAssignment(
+ GroupAssignment newGroupAssignment,
+ String memberId
+ ) {
+ MemberAssignment newMemberAssignment = newGroupAssignment.members().get(memberId);
+ if (newMemberAssignment != null) {
+ return new TasksTuple(
+ newMemberAssignment.activeTasks(),
+ newMemberAssignment.standbyTasks(),
+ newMemberAssignment.warmupTasks()
+ );
+ } else {
+ return TasksTuple.EMPTY;
+ }
+ }
+
+ /**
+ * The assignment result returned by {{@link TargetAssignmentBuilder#build()}}.
+ *
+ * @param records The records that must be applied to the __consumer_offsets topics to persist the new target assignment.
+ * @param targetAssignment The new target assignment for the group.
+ */
+ public record TargetAssignmentResult(
+ List records,
+ Map targetAssignment
+ ) {
+ public TargetAssignmentResult {
+ Objects.requireNonNull(records);
+ Objects.requireNonNull(targetAssignment);
+ }
+ }
+}
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopologyMetadata.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopologyMetadata.java
new file mode 100644
index 0000000000000..d1119cfe0112d
--- /dev/null
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopologyMetadata.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.coordinator.group.streams.assignor.TopologyDescriber;
+import org.apache.kafka.coordinator.group.streams.topics.ConfiguredSubtopology;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.SortedMap;
+import java.util.stream.Stream;
+
+/**
+ * The topology metadata class is used by the {@link org.apache.kafka.coordinator.group.streams.assignor.TaskAssignor} to get topic and
+ * partition metadata for the topology that the streams group using.
+ *
+ * @param topicMetadata The topic Ids mapped to their corresponding {@link TopicMetadata} object, which contains topic and partition
+ * metadata.
+ * @param subtopologyMap The configured subtopologies
+ */
+public record TopologyMetadata(Map topicMetadata, SortedMap subtopologyMap) implements TopologyDescriber {
+
+ public TopologyMetadata {
+ topicMetadata = Objects.requireNonNull(Collections.unmodifiableMap(topicMetadata));
+ subtopologyMap = Objects.requireNonNull(Collections.unmodifiableSortedMap(subtopologyMap));
+ }
+
+ /**
+ * Map of topic names to topic metadata.
+ *
+ * @return The map of topic Ids to topic metadata.
+ */
+ @Override
+ public Map topicMetadata() {
+ return this.topicMetadata;
+ }
+
+ /**
+ * Checks whether the given subtopology is associated with a changelog topic.
+ *
+ * @param subtopologyId String identifying the subtopology.
+ * @throws NoSuchElementException if the subtopology ID does not exist.
+ * @return true if the subtopology is associated with a changelog topic, false otherwise.
+ */
+ @Override
+ public boolean isStateful(String subtopologyId) {
+ final ConfiguredSubtopology subtopology = getSubtopologyOrFail(subtopologyId);
+ return !subtopology.stateChangelogTopics().isEmpty();
+ }
+
+ /**
+ * The list of subtopologies in the topology.
+ *
+ * @return a list of subtopology IDs.
+ */
+ @Override
+ public List subtopologies() {
+ return subtopologyMap.keySet().stream().toList();
+ }
+
+ /**
+ * The maximal number of input partitions among all source topics for the given subtopology.
+ *
+ * @param subtopologyId String identifying the subtopology.
+ *
+ * @throws NoSuchElementException if the subtopology ID does not exist.
+ * @throws IllegalStateException if the subtopology contains no source topics.
+ * @return The maximal number of input partitions among all source topics for the given subtopology.
+ */
+ @Override
+ public int maxNumInputPartitions(String subtopologyId) {
+ final ConfiguredSubtopology subtopology = getSubtopologyOrFail(subtopologyId);
+ return Stream.concat(
+ subtopology.sourceTopics().stream(),
+ subtopology.repartitionSourceTopics().keySet().stream()
+ ).map(topic -> this.topicMetadata.get(topic).numPartitions()).max(Integer::compareTo).orElseThrow(
+ () -> new IllegalStateException("Subtopology does not contain any source topics")
+ );
+ }
+
+ private ConfiguredSubtopology getSubtopologyOrFail(String subtopologyId) {
+ if (!subtopologyMap.containsKey(subtopologyId)) {
+ throw new NoSuchElementException(String.format("Topology does not contain subtopology %s", subtopologyId));
+ }
+ return subtopologyMap.get(subtopologyId);
+ }
+
+}
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/MockAssignor.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/MockAssignor.java
index ce0bc101101ec..3e980bad93ea4 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/MockAssignor.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/MockAssignor.java
@@ -46,7 +46,7 @@ public GroupAssignment assign(
Map subtopologyToActiveMember = new HashMap<>();
for (String subtopology : topologyDescriber.subtopologies()) {
- int numberOfPartitions = topologyDescriber.numTasks(subtopology);
+ int numberOfPartitions = topologyDescriber.maxNumInputPartitions(subtopology);
subtopologyToActiveMember.put(subtopology, new String[numberOfPartitions]);
}
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java
index 00ba701d3e200..c8d3c97d50402 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java
@@ -72,7 +72,7 @@ private Set taskIds(final TopologyDescriber topologyDescriber, final boo
Set ret = new HashSet<>();
for (String subtopology : topologyDescriber.subtopologies()) {
if (isActive || topologyDescriber.isStateful(subtopology)) {
- int numberOfPartitions = topologyDescriber.numTasks(subtopology);
+ int numberOfPartitions = topologyDescriber.maxNumInputPartitions(subtopology);
for (int i = 0; i < numberOfPartitions; i++) {
ret.add(new TaskId(subtopology, i));
}
@@ -85,7 +85,7 @@ private void initialize(final GroupSpec groupSpec, final TopologyDescriber topol
localState = new LocalState();
localState.allTasks = 0;
for (String subtopology : topologyDescriber.subtopologies()) {
- int numberOfPartitions = topologyDescriber.numTasks(subtopology);
+ int numberOfPartitions = topologyDescriber.maxNumInputPartitions(subtopology);
localState.allTasks += numberOfPartitions;
}
localState.totalCapacity = groupSpec.members().size();
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/TopologyDescriber.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/TopologyDescriber.java
index 2f913bf5514e8..d6f7a3ab579c6 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/TopologyDescriber.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/TopologyDescriber.java
@@ -20,30 +20,34 @@
import java.util.NoSuchElementException;
/**
- * The subscribed topic describer is used by the {@link TaskAssignor} to obtain topic and task metadata of the groups topology.
+ * The topology describer is used by the {@link TaskAssignor} to get topic and task metadata of the group's topology.
*/
public interface TopologyDescriber {
/**
+ * Map of topic names to topic metadata.
+ *
* @return The list of subtopologies IDs.
*/
List subtopologies();
/**
- * The number of tasks for the given subtopology.
+ * The maximal number of input partitions among all source topics for the given subtopology.
*
* @param subtopologyId String identifying the subtopology.
*
- * @return The number of tasks corresponding to the given subtopology ID.
- * @throws NoSuchElementException if subtopology does not exist in the topology.
+ * @throws NoSuchElementException if the subtopology ID does not exist.
+ * @throws IllegalStateException if the subtopology contains no source topics.
+ * @return The maximal number of input partitions among all source topics for the given subtopology.
*/
- int numTasks(String subtopologyId) throws NoSuchElementException;
+ int maxNumInputPartitions(String subtopologyId) throws NoSuchElementException;
/**
- * Whether the given subtopology is stateful.
+ * Checks whether the given subtopology is associated with a changelog topic.
*
* @param subtopologyId String identifying the subtopology.
- * @return true if the subtopology is stateful, false otherwise.
+ * @throws NoSuchElementException if the subtopology ID does not exist.
+ * @return true if the subtopology is associated with a changelog topic, false otherwise.
*/
boolean isStateful(String subtopologyId);
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopology.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopology.java
index b6ccb87f7a224..6c7eede16fc40 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopology.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopology.java
@@ -23,6 +23,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.SortedMap;
import java.util.stream.Collectors;
/**
@@ -41,7 +42,7 @@
* reported back to the client.
*/
public record ConfiguredTopology(int topologyEpoch,
- Optional