diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java index 85fb54761b63b..86c7e8abba7b0 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java @@ -258,11 +258,11 @@ public GroupCoordinatorShard build() { static final String GROUP_EXPIRATION_KEY = "expire-group-metadata"; /** - * The classic group size counter key to schedule a timer task. + * The classic and consumer group size counter key to schedule a timer task. * * Visible for testing. */ - static final String CLASSIC_GROUP_SIZE_COUNTER_KEY = "classic-group-size-counter"; + static final String GROUP_SIZE_COUNTER_KEY = "group-size-counter"; /** * Hardcoded default value of the interval to update the classic group size counter. @@ -699,27 +699,27 @@ public CoordinatorResult onPartitionsDeleted( } /** - * Schedules (or reschedules) the group size counter for the classic groups. + * Schedules (or reschedules) the group size counter for the classic/consumer groups. */ - private void scheduleClassicGroupSizeCounter() { + private void scheduleGroupSizeCounter() { timer.schedule( - CLASSIC_GROUP_SIZE_COUNTER_KEY, + GROUP_SIZE_COUNTER_KEY, DEFAULT_GROUP_GAUGES_UPDATE_INTERVAL_MS, TimeUnit.MILLISECONDS, true, () -> { - groupMetadataManager.updateClassicGroupSizeCounter(); - scheduleClassicGroupSizeCounter(); + groupMetadataManager.updateGroupSizeCounter(); + scheduleGroupSizeCounter(); return GroupMetadataManager.EMPTY_RESULT; } ); } /** - * Cancels the group size counter for the classic groups. + * Cancels the group size counter for the classic/consumer groups. */ - private void cancelClassicGroupSizeCounter() { - timer.cancel(CLASSIC_GROUP_SIZE_COUNTER_KEY); + private void cancelGroupSizeCounter() { + timer.cancel(GROUP_SIZE_COUNTER_KEY); } /** @@ -736,7 +736,7 @@ public void onLoaded(MetadataImage newImage) { groupMetadataManager.onLoaded(); scheduleGroupMetadataExpiration(); - scheduleClassicGroupSizeCounter(); + scheduleGroupSizeCounter(); } @Override @@ -744,7 +744,7 @@ public void onUnloaded() { timer.cancel(GROUP_EXPIRATION_KEY); coordinatorMetrics.deactivateMetricsShard(metricsShard); groupMetadataManager.onUnloaded(); - cancelClassicGroupSizeCounter(); + cancelGroupSizeCounter(); } /** diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index e1e151d53d3f9..ab2720680d801 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -188,7 +188,6 @@ import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME; import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CONSUMER_GROUP_REBALANCES_SENSOR_NAME; import static org.apache.kafka.coordinator.group.modern.ModernGroupMember.hasAssignedPartitionsChanged; -import static org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember.hasAssignedPartitionsChanged; /** * The GroupMetadataManager manages the metadata of all classic and consumer groups. 
It holds @@ -745,7 +744,6 @@ ConsumerGroup getOrMaybeCreatePersistedConsumerGroup( if (group == null) { ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId, metrics); groups.put(groupId, consumerGroup); - metrics.onConsumerGroupStateTransition(null, consumerGroup.state()); return consumerGroup; } else if (group.type() == CONSUMER) { return (ConsumerGroup) group; @@ -756,7 +754,6 @@ ConsumerGroup getOrMaybeCreatePersistedConsumerGroup( // replaying consumer group records after offset commit records would not work. ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId, metrics); groups.put(groupId, consumerGroup); - metrics.onConsumerGroupStateTransition(null, consumerGroup.state()); return consumerGroup; } else { throw new IllegalStateException(String.format("Group %s is not a consumer group", groupId)); @@ -1134,24 +1131,7 @@ ConsumerGroup convertToConsumerGroup(ClassicGroup classicGroup, List groupSizeCounter = new HashMap<>(); + public void updateGroupSizeCounter() { + Map classicGroupSizeCounter = new HashMap<>(); + Map consumerGroupSizeCounter = new HashMap<>(); groups.forEach((__, group) -> { - if (group.type() == CLASSIC) { - groupSizeCounter.compute(((ClassicGroup) group).currentState(), Utils::incValue); + switch (group.type()) { + case CLASSIC: + classicGroupSizeCounter.compute(((ClassicGroup) group).currentState(), Utils::incValue); + break; + case CONSUMER: + consumerGroupSizeCounter.compute(((ConsumerGroup) group).state(), Utils::incValue); + break; + default: + break; } }); - metrics.setClassicGroupGauges(groupSizeCounter); + metrics.setClassicGroupGauges(classicGroupSizeCounter); + metrics.setConsumerGroupGauges(consumerGroupSizeCounter); } /** diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShard.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShard.java index 219a4f0a22cac..1ed75229f58f5 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShard.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShard.java @@ -71,7 +71,7 @@ public TimelineGaugeCounter(TimelineLong timelineLong, AtomicLong atomicLong) { /** * Consumer group size gauge counters keyed by the metric name. */ - private final Map consumerGroupGauges; + private volatile Map consumerGroupGauges; /** * Share group size gauge counters keyed by the metric name. 
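The hunks around this point replace per-transition metric updates with a periodic recount: GroupMetadataManager.updateGroupSizeCounter() rebuilds a fresh state-to-count map for both classic and consumer groups, and GroupCoordinatorMetricsShard now holds that map in a volatile field that is swapped wholesale by setClassicGroupGauges()/setConsumerGroupGauges(). A minimal standalone sketch of that recount-and-swap pattern follows; GroupGauges, Group, GroupState, recount and numGroups are illustrative placeholders, not the actual Kafka classes.

    import java.util.EnumMap;
    import java.util.List;
    import java.util.Map;

    public class GroupGauges {
        // Illustrative stand-in for ConsumerGroupState / ClassicGroupState.
        enum GroupState { EMPTY, ASSIGNING, RECONCILING, STABLE, DEAD }

        // Illustrative stand-in for a group exposing its current state.
        record Group(String id, GroupState state) { }

        // Published snapshot; volatile so gauge readers always see one complete map.
        private volatile Map<GroupState, Long> gaugesByState = Map.of();

        // Writer side: invoked only by the scheduled task, like setConsumerGroupGauges().
        void recount(List<Group> groups) {
            Map<GroupState, Long> counts = new EnumMap<>(GroupState.class);
            for (Group group : groups) {
                counts.merge(group.state(), 1L, Long::sum); // same role as Utils::incValue
            }
            gaugesByState = counts; // single reference swap, no per-transition bookkeeping
        }

        // Reader side: what a gauge callback queries, like numConsumerGroups(state).
        long numGroups(GroupState state) {
            return gaugesByState.getOrDefault(state, 0L);
        }

        public static void main(String[] args) {
            GroupGauges gauges = new GroupGauges();
            gauges.recount(List.of(
                new Group("g0", GroupState.EMPTY),
                new Group("g1", GroupState.STABLE),
                new Group("g2", GroupState.STABLE)));
            System.out.println(gauges.numGroups(GroupState.STABLE)); // prints 2
        }
    }

Because the map is replaced atomically, the reported gauges can lag the true counts by at most one refresh interval; that is the trade-off this change accepts in exchange for dropping the increment/decrement calls on every state transition.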
@@ -108,19 +108,7 @@ public GroupCoordinatorMetricsShard( numClassicGroupsTimelineCounter = new TimelineGaugeCounter(new TimelineLong(snapshotRegistry), new AtomicLong(0)); this.classicGroupGauges = Collections.emptyMap(); - - this.consumerGroupGauges = Utils.mkMap( - Utils.mkEntry(ConsumerGroupState.EMPTY, - new TimelineGaugeCounter(new TimelineLong(snapshotRegistry), new AtomicLong(0))), - Utils.mkEntry(ConsumerGroupState.ASSIGNING, - new TimelineGaugeCounter(new TimelineLong(snapshotRegistry), new AtomicLong(0))), - Utils.mkEntry(ConsumerGroupState.RECONCILING, - new TimelineGaugeCounter(new TimelineLong(snapshotRegistry), new AtomicLong(0))), - Utils.mkEntry(ConsumerGroupState.STABLE, - new TimelineGaugeCounter(new TimelineLong(snapshotRegistry), new AtomicLong(0))), - Utils.mkEntry(ConsumerGroupState.DEAD, - new TimelineGaugeCounter(new TimelineLong(snapshotRegistry), new AtomicLong(0))) - ); + this.consumerGroupGauges = Collections.emptyMap(); this.shareGroupGauges = Utils.mkMap( Utils.mkEntry(ShareGroup.ShareGroupState.EMPTY, @@ -145,17 +133,15 @@ public void incrementNumOffsets() { } /** - * Increment the number of consumer groups. + * Set the number of consumer groups. + * This method should be the only way to update the map and is called by the scheduled task + * that updates the metrics in {@link org.apache.kafka.coordinator.group.GroupCoordinatorShard}. + * Breaking this will result in inconsistent behavior. * - * @param state the consumer group state. + * @param consumerGroupGauges The map counting the number of consumer groups in each state. */ - public void incrementNumConsumerGroups(ConsumerGroupState state) { - TimelineGaugeCounter gaugeCounter = consumerGroupGauges.get(state); - if (gaugeCounter != null) { - synchronized (gaugeCounter.timelineLong) { - gaugeCounter.timelineLong.increment(); - } - } + public void setConsumerGroupGauges(Map consumerGroupGauges) { + this.consumerGroupGauges = consumerGroupGauges; } /** @@ -167,20 +153,6 @@ public void decrementNumOffsets() { } } - /** - * Decrement the number of consumer groups. - * - * @param state the consumer group state. - */ - public void decrementNumConsumerGroups(ConsumerGroupState state) { - TimelineGaugeCounter gaugeCounter = consumerGroupGauges.get(state); - if (gaugeCounter != null) { - synchronized (gaugeCounter.timelineLong) { - gaugeCounter.timelineLong.decrement(); - } - } - } - /** * @return The number of offsets. */ @@ -219,9 +191,9 @@ public long numClassicGroups() { * @return The number of consumer groups in `state`. 
*/ public long numConsumerGroups(ConsumerGroupState state) { - TimelineGaugeCounter gaugeCounter = consumerGroupGauges.get(state); - if (gaugeCounter != null) { - return gaugeCounter.atomicLong.get(); + Long counter = consumerGroupGauges.get(state); + if (counter != null) { + return counter; } return 0L; } @@ -231,7 +203,7 @@ public long numConsumerGroups(ConsumerGroupState state) { */ public long numConsumerGroups() { return consumerGroupGauges.values().stream() - .mapToLong(timelineGaugeCounter -> timelineGaugeCounter.atomicLong.get()).sum(); + .mapToLong(Long::longValue).sum(); } @Override @@ -257,14 +229,6 @@ public TopicPartition topicPartition() { @Override public void commitUpTo(long offset) { - this.consumerGroupGauges.forEach((__, gaugeCounter) -> { - long value; - synchronized (gaugeCounter.timelineLong) { - value = gaugeCounter.timelineLong.get(offset); - } - gaugeCounter.atomicLong.set(value); - }); - synchronized (numClassicGroupsTimelineCounter.timelineLong) { long value = numClassicGroupsTimelineCounter.timelineLong.get(offset); numClassicGroupsTimelineCounter.atomicLong.set(value); @@ -286,8 +250,11 @@ public void commitUpTo(long offset) { /** * Sets the classicGroupGauges. + * This method should be the only way to update the map and is called by the scheduled task + * that updates the metrics in {@link org.apache.kafka.coordinator.group.GroupCoordinatorShard}. + * Breaking this will result in inconsistent behavior. * - * @param classicGroupGauges The new classicGroupGauges. + * @param classicGroupGauges The map counting the number of classic groups in each state. */ public void setClassicGroupGauges( Map classicGroupGauges @@ -295,56 +262,6 @@ public void setClassicGroupGauges( this.classicGroupGauges = classicGroupGauges; } - /** - * Called when a consumer group's state has changed. Increment/decrement - * the counter accordingly. - * - * @param oldState The previous state. null value means that it's a new group. - * @param newState The next state. null value means that the group has been removed. 
- */ - public void onConsumerGroupStateTransition( - ConsumerGroupState oldState, - ConsumerGroupState newState - ) { - if (newState != null) { - switch (newState) { - case EMPTY: - incrementNumConsumerGroups(ConsumerGroupState.EMPTY); - break; - case ASSIGNING: - incrementNumConsumerGroups(ConsumerGroupState.ASSIGNING); - break; - case RECONCILING: - incrementNumConsumerGroups(ConsumerGroupState.RECONCILING); - break; - case STABLE: - incrementNumConsumerGroups(ConsumerGroupState.STABLE); - break; - case DEAD: - incrementNumConsumerGroups(ConsumerGroupState.DEAD); - } - } - - if (oldState != null) { - switch (oldState) { - case EMPTY: - decrementNumConsumerGroups(ConsumerGroupState.EMPTY); - break; - case ASSIGNING: - decrementNumConsumerGroups(ConsumerGroupState.ASSIGNING); - break; - case RECONCILING: - decrementNumConsumerGroups(ConsumerGroupState.RECONCILING); - break; - case STABLE: - decrementNumConsumerGroups(ConsumerGroupState.STABLE); - break; - case DEAD: - decrementNumConsumerGroups(ConsumerGroupState.DEAD); - } - } - } - public void incrementNumShareGroups(ShareGroup.ShareGroupState state) { TimelineGaugeCounter gaugeCounter = shareGroupGauges.get(state); if (gaugeCounter != null) { diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java index 58d70ffe99aff..16e0d0fae4a18 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java @@ -885,7 +885,6 @@ protected void maybeUpdateGroupSubscriptionType() { @Override protected void maybeUpdateGroupState() { - ConsumerGroupState previousState = state.get(); ConsumerGroupState newState = STABLE; if (members.isEmpty()) { newState = EMPTY; @@ -901,7 +900,6 @@ protected void maybeUpdateGroupState() { } state.set(newState); - metrics.onConsumerGroupStateTransition(previousState, newState); } /** diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java index 4648f35670b19..ea0c2b02e4d45 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java @@ -79,9 +79,9 @@ import java.util.Set; import static org.apache.kafka.coordinator.common.runtime.TestUtil.requestContext; -import static org.apache.kafka.coordinator.group.GroupCoordinatorShard.CLASSIC_GROUP_SIZE_COUNTER_KEY; import static org.apache.kafka.coordinator.group.GroupCoordinatorShard.DEFAULT_GROUP_GAUGES_UPDATE_INTERVAL_MS; import static org.apache.kafka.coordinator.group.GroupCoordinatorShard.GROUP_EXPIRATION_KEY; +import static org.apache.kafka.coordinator.group.GroupCoordinatorShard.GROUP_SIZE_COUNTER_KEY; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; @@ -1024,7 +1024,7 @@ public void testCleanupGroupMetadata() { } @Test - public void testScheduleClassicGroupSizeCounter() { + public void testScheduleGroupSizeCounter() { GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); OffsetMetadataManager 
offsetMetadataManager = mock(OffsetMetadataManager.class); CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class); @@ -1046,21 +1046,21 @@ public void testScheduleClassicGroupSizeCounter() { ); coordinator.onLoaded(MetadataImage.EMPTY); - // The classic group size counter is scheduled. + // The counter is scheduled. assertEquals( DEFAULT_GROUP_GAUGES_UPDATE_INTERVAL_MS, - timer.timeout(CLASSIC_GROUP_SIZE_COUNTER_KEY).deadlineMs - time.milliseconds() + timer.timeout(GROUP_SIZE_COUNTER_KEY).deadlineMs - time.milliseconds() ); // Advance the timer to trigger the update. time.sleep(DEFAULT_GROUP_GAUGES_UPDATE_INTERVAL_MS + 1); timer.poll(); - verify(groupMetadataManager, times(1)).updateClassicGroupSizeCounter(); + verify(groupMetadataManager, times(1)).updateGroupSizeCounter(); - // The classic group size counter is scheduled. + // The counter is scheduled. assertEquals( DEFAULT_GROUP_GAUGES_UPDATE_INTERVAL_MS, - timer.timeout(CLASSIC_GROUP_SIZE_COUNTER_KEY).deadlineMs - time.milliseconds() + timer.timeout(GROUP_SIZE_COUNTER_KEY).deadlineMs - time.milliseconds() ); } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index 4e4e7cf7645cb..1287f121327d8 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -3589,42 +3589,71 @@ public void testOnLoaded() { } @Test - public void testUpdateClassicGroupSizeCounter() { - String groupId0 = "group-0"; - String groupId1 = "group-1"; - String groupId2 = "group-2"; - String groupId3 = "group-3"; - String groupId4 = "group-4"; + public void testUpdateGroupSizeCounter() { + List groupIds = new ArrayList<>(); + IntStream.range(0, 8).forEach(i -> groupIds.add("group-" + i)); + List consumerMemberIds = List.of("consumer-member-id-0", "consumer-member-id-1", "consumer-member-id-2"); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withConsumerGroup(new ConsumerGroupBuilder(groupId0, 10)) + .withConsumerGroup(new ConsumerGroupBuilder(groupIds.get(0), 10)) // Empty group + .withConsumerGroup(new ConsumerGroupBuilder(groupIds.get(1), 10) // Stable group + .withAssignmentEpoch(10) + .withMember(new ConsumerGroupMember.Builder(consumerMemberIds.get(0)) + .setMemberEpoch(10) + .build())) + .withConsumerGroup(new ConsumerGroupBuilder(groupIds.get(2), 10) // Assigning group + .withAssignmentEpoch(9) + .withMember(new ConsumerGroupMember.Builder(consumerMemberIds.get(1)) + .setMemberEpoch(9) + .build())) + .withConsumerGroup(new ConsumerGroupBuilder(groupIds.get(3), 10) // Reconciling group + .withAssignmentEpoch(10) + .withMember(new ConsumerGroupMember.Builder(consumerMemberIds.get(2)) + .setMemberEpoch(9) + .build())) .build(); - ClassicGroup group1 = context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupId1, true); - ClassicGroup group2 = context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupId2, true); - ClassicGroup group3 = context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupId3, true); - ClassicGroup group4 = context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupId4, true); + ClassicGroup group4 = context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupIds.get(4), true); + ClassicGroup group5 = 
context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupIds.get(5), true); + ClassicGroup group6 = context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupIds.get(6), true); + ClassicGroup group7 = context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupIds.get(7), true); - context.groupMetadataManager.updateClassicGroupSizeCounter(); + context.groupMetadataManager.updateGroupSizeCounter(); verify(context.metrics, times(1)).setClassicGroupGauges(eq(Utils.mkMap( Utils.mkEntry(ClassicGroupState.EMPTY, 4L) ))); + verify(context.metrics, times(1)).setConsumerGroupGauges(eq(Utils.mkMap( + Utils.mkEntry(ConsumerGroup.ConsumerGroupState.EMPTY, 1L), + Utils.mkEntry(ConsumerGroup.ConsumerGroupState.ASSIGNING, 1L), + Utils.mkEntry(ConsumerGroup.ConsumerGroupState.RECONCILING, 1L), + Utils.mkEntry(ConsumerGroup.ConsumerGroupState.STABLE, 1L) + ))); - group1.transitionTo(PREPARING_REBALANCE); - group2.transitionTo(PREPARING_REBALANCE); - group2.transitionTo(COMPLETING_REBALANCE); - group3.transitionTo(PREPARING_REBALANCE); - group3.transitionTo(COMPLETING_REBALANCE); - group3.transitionTo(STABLE); - group4.transitionTo(DEAD); + group4.transitionTo(PREPARING_REBALANCE); + group5.transitionTo(PREPARING_REBALANCE); + group5.transitionTo(COMPLETING_REBALANCE); + group6.transitionTo(PREPARING_REBALANCE); + group6.transitionTo(COMPLETING_REBALANCE); + group6.transitionTo(STABLE); + group7.transitionTo(DEAD); - context.groupMetadataManager.updateClassicGroupSizeCounter(); + context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupIds.get(1), false, Collections.emptyList()) + .removeMember(consumerMemberIds.get(0)); + context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupIds.get(3), false, Collections.emptyList()) + .updateMember(new ConsumerGroupMember.Builder(consumerMemberIds.get(2)).setMemberEpoch(10).build()); + + context.groupMetadataManager.updateGroupSizeCounter(); verify(context.metrics, times(1)).setClassicGroupGauges(eq(Utils.mkMap( Utils.mkEntry(ClassicGroupState.PREPARING_REBALANCE, 1L), Utils.mkEntry(ClassicGroupState.COMPLETING_REBALANCE, 1L), Utils.mkEntry(ClassicGroupState.STABLE, 1L), Utils.mkEntry(ClassicGroupState.DEAD, 1L) ))); + verify(context.metrics, times(1)).setConsumerGroupGauges(eq(Utils.mkMap( + Utils.mkEntry(ConsumerGroup.ConsumerGroupState.EMPTY, 2L), + Utils.mkEntry(ConsumerGroup.ConsumerGroupState.ASSIGNING, 1L), + Utils.mkEntry(ConsumerGroup.ConsumerGroupState.STABLE, 1L) + ))); } @Test @@ -9548,75 +9577,6 @@ public void testConsumerGroupRebalanceSensor() { verify(context.metrics).record(CONSUMER_GROUP_REBALANCES_SENSOR_NAME); } - @Test - public void testOnClassicGroupStateTransitionOnLoading() { - GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .build(); - - ClassicGroup group = new ClassicGroup( - new LogContext(), - "group-id", - EMPTY, - context.time - ); - - // Even if there are more group metadata records loaded than tombstone records, the last replayed record - // (tombstone in this test) is the latest state of the group. Hence, the overall metric count should be 0. 
- IntStream.range(0, 5).forEach(__ -> - context.replay(GroupCoordinatorRecordHelpers.newGroupMetadataRecord(group, Collections.emptyMap())) - ); - IntStream.range(0, 4).forEach(__ -> - context.replay(GroupCoordinatorRecordHelpers.newGroupMetadataTombstoneRecord("group-id")) - ); - } - - @Test - public void testOnConsumerGroupStateTransition() { - GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .build(); - - // Replaying a consumer group epoch record should increment metric. - context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord("group-id", 1)); - verify(context.metrics, times(1)).onConsumerGroupStateTransition(null, ConsumerGroup.ConsumerGroupState.EMPTY); - - // Replaying a consumer group epoch record for a group that has already been created should not increment metric. - context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord("group-id", 1)); - verify(context.metrics, times(1)).onConsumerGroupStateTransition(null, ConsumerGroup.ConsumerGroupState.EMPTY); - - // Creating and replaying tombstones for a group should remove group and decrement metric. - List tombstones = new ArrayList<>(); - Group group = context.groupMetadataManager.group("group-id"); - group.createGroupTombstoneRecords(tombstones); - tombstones.forEach(context::replay); - assertThrows(GroupIdNotFoundException.class, () -> context.groupMetadataManager.group("group-id")); - verify(context.metrics, times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.EMPTY, null); - - // Replaying a tombstone for a group that has already been removed should not decrement metric. - tombstones.forEach(context::replay); - verify(context.metrics, times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.EMPTY, null); - } - - @Test - public void testOnConsumerGroupStateTransitionOnLoading() { - GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .build(); - - // Even if there are more group epoch records loaded than tombstone records, the last replayed record - // (tombstone in this test) is the latest state of the group. Hence, the overall metric count should be 0. - IntStream.range(0, 5).forEach(__ -> - context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord("group-id", 0)) - ); - context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochTombstoneRecord("group-id")); - context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord("group-id")); - IntStream.range(0, 3).forEach(__ -> { - context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochTombstoneRecord("group-id")); - context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord("group-id")); - }); - - verify(context.metrics, times(1)).onConsumerGroupStateTransition(null, ConsumerGroup.ConsumerGroupState.EMPTY); - verify(context.metrics, times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.EMPTY, null); - } - @Test public void testConsumerGroupHeartbeatWithNonEmptyClassicGroup() { String classicGroupId = "classic-group-id"; @@ -11153,8 +11113,6 @@ barTopicName, new TopicMetadata(barTopicId, barTopicName, 3) result.records() ); - verify(context.metrics, times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.STABLE, null); - // The new classic member 1 has a heartbeat timeout. 
ScheduledTimeout heartbeatTimeout = context.timer.timeout( classicGroupHeartbeatKey(groupId, memberId1) @@ -11340,8 +11298,6 @@ barTopicName, new TopicMetadata(barTopicId, barTopicName, 3) timeout.result.records() ); - verify(context.metrics, times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.STABLE, null); - // The new classic member 1 has a heartbeat timeout. ScheduledTimeout heartbeatTimeout = context.timer.timeout( classicGroupHeartbeatKey(groupId, memberId1) @@ -11545,8 +11501,6 @@ memberId2, new MemberAssignmentImpl(mkAssignment( timeout.result.records() ); - verify(context.metrics, times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.RECONCILING, null); - // The new classic member 1 has a heartbeat timeout. ScheduledTimeout heartbeatTimeout = context.timer.timeout( classicGroupHeartbeatKey(groupId, memberId1) @@ -11780,8 +11734,6 @@ barTopicName, new TopicMetadata(barTopicId, barTopicName, 2) result.records ); - verify(context.metrics, times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.STABLE, null); - // The new classic member 1 has a heartbeat timeout. ScheduledTimeout heartbeatTimeout = context.timer.timeout( classicGroupHeartbeatKey(groupId, memberId1) @@ -14298,8 +14250,6 @@ barTopicName, new TopicMetadata(barTopicId, barTopicName, 2) leaveResult.records() ); - verify(context.metrics, times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.STABLE, null); - // The new classic member 1 has a heartbeat timeout. ScheduledTimeout heartbeatTimeout = context.timer.timeout( classicGroupHeartbeatKey(groupId, memberId1) diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShardTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShardTest.java index 06ffeaa84d139..7dc3a4d17f637 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShardTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShardTest.java @@ -17,22 +17,15 @@ package org.apache.kafka.coordinator.group.metrics; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.Uuid; import org.apache.kafka.common.internals.Topic; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.utils.LogContext; -import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup; -import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember; import org.apache.kafka.timeline.SnapshotRegistry; import com.yammer.metrics.core.MetricsRegistry; import org.junit.jupiter.api.Test; -import java.util.Collections; -import java.util.stream.IntStream; - -import static org.apache.kafka.coordinator.group.metrics.MetricsTestUtils.assertGaugeValue; import static org.junit.jupiter.api.Assertions.assertEquals; public class GroupCoordinatorMetricsShardTest { @@ -47,135 +40,18 @@ public void testTimelineGaugeCounters() { GroupCoordinatorMetricsShard shard = coordinatorMetrics.newMetricsShard(snapshotRegistry, tp); shard.incrementNumOffsets(); - shard.incrementNumConsumerGroups(ConsumerGroup.ConsumerGroupState.EMPTY); - shard.incrementNumConsumerGroups(ConsumerGroup.ConsumerGroupState.ASSIGNING); - shard.incrementNumConsumerGroups(ConsumerGroup.ConsumerGroupState.RECONCILING); - shard.incrementNumConsumerGroups(ConsumerGroup.ConsumerGroupState.STABLE); - 
shard.incrementNumConsumerGroups(ConsumerGroup.ConsumerGroupState.DEAD); snapshotRegistry.idempotentCreateSnapshot(1000); // The value should not be updated until the offset has been committed. assertEquals(0, shard.numOffsets()); - assertEquals(0, shard.numConsumerGroups()); - assertEquals(0, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.EMPTY)); - assertEquals(0, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.ASSIGNING)); - assertEquals(0, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.RECONCILING)); - assertEquals(0, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.STABLE)); - assertEquals(0, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.DEAD)); shard.commitUpTo(1000); assertEquals(1, shard.numOffsets()); - assertEquals(5, shard.numConsumerGroups()); - assertEquals(1, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.EMPTY)); - assertEquals(1, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.ASSIGNING)); - assertEquals(1, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.RECONCILING)); - assertEquals(1, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.STABLE)); - assertEquals(1, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.DEAD)); shard.decrementNumOffsets(); - shard.decrementNumConsumerGroups(ConsumerGroup.ConsumerGroupState.EMPTY); - shard.decrementNumConsumerGroups(ConsumerGroup.ConsumerGroupState.ASSIGNING); - shard.decrementNumConsumerGroups(ConsumerGroup.ConsumerGroupState.RECONCILING); - shard.decrementNumConsumerGroups(ConsumerGroup.ConsumerGroupState.STABLE); - shard.decrementNumConsumerGroups(ConsumerGroup.ConsumerGroupState.DEAD); snapshotRegistry.idempotentCreateSnapshot(2000); shard.commitUpTo(2000); assertEquals(0, shard.numOffsets()); - assertEquals(0, shard.numConsumerGroups()); - assertEquals(0, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.EMPTY)); - assertEquals(0, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.ASSIGNING)); - assertEquals(0, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.RECONCILING)); - assertEquals(0, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.STABLE)); - assertEquals(0, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.DEAD)); - } - - @Test - public void testConsumerGroupStateTransitionMetrics() { - MetricsRegistry registry = new MetricsRegistry(); - Metrics metrics = new Metrics(); - SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext()); - TopicPartition tp = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0); - GroupCoordinatorMetrics coordinatorMetrics = new GroupCoordinatorMetrics(registry, metrics); - GroupCoordinatorMetricsShard shard = coordinatorMetrics.newMetricsShard(snapshotRegistry, tp); - coordinatorMetrics.activateMetricsShard(shard); - - ConsumerGroup group0 = new ConsumerGroup( - snapshotRegistry, - "group-0", - shard - ); - ConsumerGroup group1 = new ConsumerGroup( - snapshotRegistry, - "group-1", - shard - ); - ConsumerGroup group2 = new ConsumerGroup( - snapshotRegistry, - "group-2", - shard - ); - ConsumerGroup group3 = new ConsumerGroup( - snapshotRegistry, - "group-3", - shard - ); - - IntStream.range(0, 4).forEach(__ -> shard.incrementNumConsumerGroups(ConsumerGroup.ConsumerGroupState.EMPTY)); - - snapshotRegistry.idempotentCreateSnapshot(1000); - shard.commitUpTo(1000); - assertEquals(4, shard.numConsumerGroups()); - assertEquals(4, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.EMPTY)); - - ConsumerGroupMember member0 = 
group0.getOrMaybeCreateMember("member-id", true); - ConsumerGroupMember member1 = group1.getOrMaybeCreateMember("member-id", true); - ConsumerGroupMember member2 = group2.getOrMaybeCreateMember("member-id", true); - ConsumerGroupMember member3 = group3.getOrMaybeCreateMember("member-id", true); - group0.updateMember(member0); - group1.updateMember(member1); - group2.updateMember(member2); - group3.updateMember(member3); - - snapshotRegistry.idempotentCreateSnapshot(2000); - shard.commitUpTo(2000); - assertEquals(0, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.EMPTY)); - assertEquals(4, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.STABLE)); - - group2.setGroupEpoch(1); - group3.setGroupEpoch(1); - - snapshotRegistry.idempotentCreateSnapshot(3000); - shard.commitUpTo(3000); - assertEquals(0, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.EMPTY)); - assertEquals(2, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.ASSIGNING)); - assertEquals(2, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.STABLE)); - - group2.setTargetAssignmentEpoch(1); - - // Set member2 to ASSIGNING state. - new ConsumerGroupMember.Builder(member2) - .setPartitionsPendingRevocation(Collections.singletonMap(Uuid.ZERO_UUID, Collections.singleton(0))) - .build(); - - snapshotRegistry.idempotentCreateSnapshot(4000); - shard.commitUpTo(4000); - assertEquals(0, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.EMPTY)); - assertEquals(1, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.ASSIGNING)); - assertEquals(1, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.RECONCILING)); - assertEquals(2, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.STABLE)); - - assertGaugeValue(metrics, metrics.metricName("group-count", "group-coordinator-metrics", - Collections.singletonMap("protocol", "consumer")), 4); - assertGaugeValue(metrics, metrics.metricName("consumer-group-count", "group-coordinator-metrics", - Collections.singletonMap("state", ConsumerGroup.ConsumerGroupState.EMPTY.toString())), 0); - assertGaugeValue(metrics, metrics.metricName("consumer-group-count", "group-coordinator-metrics", - Collections.singletonMap("state", ConsumerGroup.ConsumerGroupState.ASSIGNING.toString())), 1); - assertGaugeValue(metrics, metrics.metricName("consumer-group-count", "group-coordinator-metrics", - Collections.singletonMap("state", ConsumerGroup.ConsumerGroupState.RECONCILING.toString())), 1); - assertGaugeValue(metrics, metrics.metricName("consumer-group-count", "group-coordinator-metrics", - Collections.singletonMap("state", ConsumerGroup.ConsumerGroupState.STABLE.toString())), 2); - assertGaugeValue(metrics, metrics.metricName("consumer-group-count", "group-coordinator-metrics", - Collections.singletonMap("state", ConsumerGroup.ConsumerGroupState.DEAD.toString())), 0); } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsTest.java index d04aa5338736d..05da88f9f7a78 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsTest.java @@ -38,6 +38,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashSet; +import java.util.Map; import java.util.stream.IntStream; import static 
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME; @@ -181,9 +182,11 @@ public void aggregateShards() { Utils.mkEntry(ClassicGroupState.DEAD, 1L) )); - IntStream.range(0, 5).forEach(__ -> shard0.incrementNumConsumerGroups(ConsumerGroupState.ASSIGNING)); - IntStream.range(0, 5).forEach(__ -> shard1.incrementNumConsumerGroups(ConsumerGroupState.RECONCILING)); - IntStream.range(0, 3).forEach(__ -> shard1.decrementNumConsumerGroups(ConsumerGroupState.DEAD)); + shard0.setConsumerGroupGauges(Collections.singletonMap(ConsumerGroupState.ASSIGNING, 5L)); + shard1.setConsumerGroupGauges(Map.of( + ConsumerGroupState.RECONCILING, 1L, + ConsumerGroupState.DEAD, 1L + )); IntStream.range(0, 6).forEach(__ -> shard0.incrementNumOffsets()); IntStream.range(0, 2).forEach(__ -> shard1.incrementNumOffsets()); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java index c2e091aa3548b..886820e55138a 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java @@ -81,8 +81,6 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; public class ConsumerGroupTest { @@ -426,7 +424,6 @@ public void testWaitingOnUnreleasedPartition() { @Test public void testGroupState() { - Uuid fooTopicId = Uuid.randomUuid(); ConsumerGroup consumerGroup = createConsumerGroup("foo"); assertEquals(ConsumerGroup.ConsumerGroupState.EMPTY, consumerGroup.state()); @@ -1295,46 +1292,6 @@ public void testAsDescribedGroup() { assertEquals(expected, actual); } - @Test - public void testStateTransitionMetrics() { - // Confirm metrics is not updated when a new ConsumerGroup is created but only when the group transitions - // its state. 
- GroupCoordinatorMetricsShard metrics = mock(GroupCoordinatorMetricsShard.class); - ConsumerGroup consumerGroup = new ConsumerGroup( - new SnapshotRegistry(new LogContext()), - "group-id", - metrics - ); - - assertEquals(ConsumerGroup.ConsumerGroupState.EMPTY, consumerGroup.state()); - verify(metrics, times(0)).onConsumerGroupStateTransition(null, ConsumerGroup.ConsumerGroupState.EMPTY); - - ConsumerGroupMember member = new ConsumerGroupMember.Builder("member") - .setMemberEpoch(1) - .setPreviousMemberEpoch(0) - .build(); - - consumerGroup.updateMember(member); - - assertEquals(ConsumerGroup.ConsumerGroupState.RECONCILING, consumerGroup.state()); - verify(metrics, times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.EMPTY, ConsumerGroup.ConsumerGroupState.RECONCILING); - - consumerGroup.setGroupEpoch(1); - - assertEquals(ConsumerGroup.ConsumerGroupState.ASSIGNING, consumerGroup.state()); - verify(metrics, times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.RECONCILING, ConsumerGroup.ConsumerGroupState.ASSIGNING); - - consumerGroup.setTargetAssignmentEpoch(1); - - assertEquals(ConsumerGroup.ConsumerGroupState.STABLE, consumerGroup.state()); - verify(metrics, times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.ASSIGNING, ConsumerGroup.ConsumerGroupState.STABLE); - - consumerGroup.removeMember("member"); - - assertEquals(ConsumerGroup.ConsumerGroupState.EMPTY, consumerGroup.state()); - verify(metrics, times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.STABLE, ConsumerGroup.ConsumerGroupState.EMPTY); - } - @Test public void testIsInStatesCaseInsensitive() { SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
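The scheduling half of this change (scheduleGroupSizeCounter() and cancelGroupSizeCounter() in GroupCoordinatorShard at the top of the diff) uses a task that re-arms itself after each run and is cancelled by key on unload. A rough, self-contained sketch of that re-arm structure, assuming a plain ScheduledExecutorService in place of the coordinator timer and an arbitrary interval constant; PeriodicGaugeUpdater and UPDATE_INTERVAL_MS are illustrative names only (the real code uses DEFAULT_GROUP_GAUGES_UPDATE_INTERVAL_MS and GROUP_SIZE_COUNTER_KEY):

    import java.util.Map;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    import java.util.function.Consumer;
    import java.util.function.Supplier;

    public class PeriodicGaugeUpdater<S> {
        // Illustrative interval; the coordinator uses DEFAULT_GROUP_GAUGES_UPDATE_INTERVAL_MS.
        private static final long UPDATE_INTERVAL_MS = 60_000L;

        private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
        private final Supplier<Map<S, Long>> recount;   // counting step, as in updateGroupSizeCounter()
        private final Consumer<Map<S, Long>> publish;   // publish step, as in setConsumerGroupGauges()

        public PeriodicGaugeUpdater(Supplier<Map<S, Long>> recount, Consumer<Map<S, Long>> publish) {
            this.recount = recount;
            this.publish = publish;
        }

        // Mirrors scheduleGroupSizeCounter(): run once, then re-arm from inside the task.
        public void schedule() {
            scheduler.schedule(() -> {
                publish.accept(recount.get());
                schedule();
            }, UPDATE_INTERVAL_MS, TimeUnit.MILLISECONDS);
        }

        // Mirrors cancelGroupSizeCounter() on unload.
        public void cancel() {
            scheduler.shutdownNow();
        }
    }

In the coordinator the same structure appears in the timer callback, which reschedules the GROUP_SIZE_COUNTER_KEY task after each update and returns GroupMetadataManager.EMPTY_RESULT.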