Skip to content

Commit

Permalink
KAFKA-18655: Implement the consumer group size counter with scheduled…
Browse files Browse the repository at this point in the history
… task (#18717)

During testing we discovered that the empty group count is not updated in group conversion, but when the new group is transition to other state, the empty group count is decremented. This could result in negative empty group count.

We can have a new consumer group count implementation that follows the pattern we did for the classic group count. The timeout task periodically refreshes the metrics based on the current groups soft state.

Reviewers: Jeff Kim <[email protected]>
  • Loading branch information
dongnuo123 authored Feb 3, 2025
1 parent 7fdd112 commit 1a106e4
Show file tree
Hide file tree
Showing 9 changed files with 107 additions and 417 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -258,11 +258,11 @@ public GroupCoordinatorShard build() {
static final String GROUP_EXPIRATION_KEY = "expire-group-metadata";

/**
* The classic group size counter key to schedule a timer task.
* The classic and consumer group size counter key to schedule a timer task.
*
* Visible for testing.
*/
static final String CLASSIC_GROUP_SIZE_COUNTER_KEY = "classic-group-size-counter";
static final String GROUP_SIZE_COUNTER_KEY = "group-size-counter";

/**
* Hardcoded default value of the interval to update the classic group size counter.
Expand Down Expand Up @@ -699,27 +699,27 @@ public CoordinatorResult<Void, CoordinatorRecord> onPartitionsDeleted(
}

/**
* Schedules (or reschedules) the group size counter for the classic groups.
* Schedules (or reschedules) the group size counter for the classic/consumer groups.
*/
private void scheduleClassicGroupSizeCounter() {
private void scheduleGroupSizeCounter() {
timer.schedule(
CLASSIC_GROUP_SIZE_COUNTER_KEY,
GROUP_SIZE_COUNTER_KEY,
DEFAULT_GROUP_GAUGES_UPDATE_INTERVAL_MS,
TimeUnit.MILLISECONDS,
true,
() -> {
groupMetadataManager.updateClassicGroupSizeCounter();
scheduleClassicGroupSizeCounter();
groupMetadataManager.updateGroupSizeCounter();
scheduleGroupSizeCounter();
return GroupMetadataManager.EMPTY_RESULT;
}
);
}

/**
* Cancels the group size counter for the classic groups.
* Cancels the group size counter for the classic/consumer groups.
*/
private void cancelClassicGroupSizeCounter() {
timer.cancel(CLASSIC_GROUP_SIZE_COUNTER_KEY);
private void cancelGroupSizeCounter() {
timer.cancel(GROUP_SIZE_COUNTER_KEY);
}

/**
Expand All @@ -736,15 +736,15 @@ public void onLoaded(MetadataImage newImage) {

groupMetadataManager.onLoaded();
scheduleGroupMetadataExpiration();
scheduleClassicGroupSizeCounter();
scheduleGroupSizeCounter();
}

@Override
public void onUnloaded() {
timer.cancel(GROUP_EXPIRATION_KEY);
coordinatorMetrics.deactivateMetricsShard(metricsShard);
groupMetadataManager.onUnloaded();
cancelClassicGroupSizeCounter();
cancelGroupSizeCounter();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,6 @@
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CONSUMER_GROUP_REBALANCES_SENSOR_NAME;
import static org.apache.kafka.coordinator.group.modern.ModernGroupMember.hasAssignedPartitionsChanged;
import static org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember.hasAssignedPartitionsChanged;

/**
* The GroupMetadataManager manages the metadata of all classic and consumer groups. It holds
Expand Down Expand Up @@ -745,7 +744,6 @@ ConsumerGroup getOrMaybeCreatePersistedConsumerGroup(
if (group == null) {
ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId, metrics);
groups.put(groupId, consumerGroup);
metrics.onConsumerGroupStateTransition(null, consumerGroup.state());
return consumerGroup;
} else if (group.type() == CONSUMER) {
return (ConsumerGroup) group;
Expand All @@ -756,7 +754,6 @@ ConsumerGroup getOrMaybeCreatePersistedConsumerGroup(
// replaying consumer group records after offset commit records would not work.
ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId, metrics);
groups.put(groupId, consumerGroup);
metrics.onConsumerGroupStateTransition(null, consumerGroup.state());
return consumerGroup;
} else {
throw new IllegalStateException(String.format("Group %s is not a consumer group", groupId));
Expand Down Expand Up @@ -1134,24 +1131,7 @@ ConsumerGroup convertToConsumerGroup(ClassicGroup classicGroup, List<Coordinator
private void removeGroup(
String groupId
) {
Group group = groups.remove(groupId);
if (group != null) {
switch (group.type()) {
case CONSUMER:
ConsumerGroup consumerGroup = (ConsumerGroup) group;
metrics.onConsumerGroupStateTransition(consumerGroup.state(), null);
break;
case CLASSIC:
// The classic group size counter is implemented as scheduled task.
break;
case SHARE:
// Nothing for now, but we may want to add metrics in the future.
break;
default:
log.warn("Removed group {} with an unknown group type {}.", groupId, group.type());
break;
}
}
groups.remove(groupId);
}

/**
Expand Down Expand Up @@ -4137,16 +4117,25 @@ public void onNewMetadataImage(MetadataImage newImage, MetadataDelta delta) {
}

/**
* Counts and updates the number of classic groups in different states.
* Counts and updates the number of classic and consumer groups in different states.
*/
public void updateClassicGroupSizeCounter() {
Map<ClassicGroupState, Long> groupSizeCounter = new HashMap<>();
public void updateGroupSizeCounter() {
Map<ClassicGroupState, Long> classicGroupSizeCounter = new HashMap<>();
Map<ConsumerGroup.ConsumerGroupState, Long> consumerGroupSizeCounter = new HashMap<>();
groups.forEach((__, group) -> {
if (group.type() == CLASSIC) {
groupSizeCounter.compute(((ClassicGroup) group).currentState(), Utils::incValue);
switch (group.type()) {
case CLASSIC:
classicGroupSizeCounter.compute(((ClassicGroup) group).currentState(), Utils::incValue);
break;
case CONSUMER:
consumerGroupSizeCounter.compute(((ConsumerGroup) group).state(), Utils::incValue);
break;
default:
break;
}
});
metrics.setClassicGroupGauges(groupSizeCounter);
metrics.setClassicGroupGauges(classicGroupSizeCounter);
metrics.setConsumerGroupGauges(consumerGroupSizeCounter);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public TimelineGaugeCounter(TimelineLong timelineLong, AtomicLong atomicLong) {
/**
* Consumer group size gauge counters keyed by the metric name.
*/
private final Map<ConsumerGroupState, TimelineGaugeCounter> consumerGroupGauges;
private volatile Map<ConsumerGroupState, Long> consumerGroupGauges;

/**
* Share group size gauge counters keyed by the metric name.
Expand Down Expand Up @@ -108,19 +108,7 @@ public GroupCoordinatorMetricsShard(
numClassicGroupsTimelineCounter = new TimelineGaugeCounter(new TimelineLong(snapshotRegistry), new AtomicLong(0));

this.classicGroupGauges = Collections.emptyMap();

this.consumerGroupGauges = Utils.mkMap(
Utils.mkEntry(ConsumerGroupState.EMPTY,
new TimelineGaugeCounter(new TimelineLong(snapshotRegistry), new AtomicLong(0))),
Utils.mkEntry(ConsumerGroupState.ASSIGNING,
new TimelineGaugeCounter(new TimelineLong(snapshotRegistry), new AtomicLong(0))),
Utils.mkEntry(ConsumerGroupState.RECONCILING,
new TimelineGaugeCounter(new TimelineLong(snapshotRegistry), new AtomicLong(0))),
Utils.mkEntry(ConsumerGroupState.STABLE,
new TimelineGaugeCounter(new TimelineLong(snapshotRegistry), new AtomicLong(0))),
Utils.mkEntry(ConsumerGroupState.DEAD,
new TimelineGaugeCounter(new TimelineLong(snapshotRegistry), new AtomicLong(0)))
);
this.consumerGroupGauges = Collections.emptyMap();

this.shareGroupGauges = Utils.mkMap(
Utils.mkEntry(ShareGroup.ShareGroupState.EMPTY,
Expand All @@ -145,17 +133,15 @@ public void incrementNumOffsets() {
}

/**
* Increment the number of consumer groups.
* Set the number of consumer groups.
* This method should be the only way to update the map and is called by the scheduled task
* that updates the metrics in {@link org.apache.kafka.coordinator.group.GroupCoordinatorShard}.
* Breaking this will result in inconsistent behavior.
*
* @param state the consumer group state.
* @param consumerGroupGauges The map counting the number of consumer groups in each state.
*/
public void incrementNumConsumerGroups(ConsumerGroupState state) {
TimelineGaugeCounter gaugeCounter = consumerGroupGauges.get(state);
if (gaugeCounter != null) {
synchronized (gaugeCounter.timelineLong) {
gaugeCounter.timelineLong.increment();
}
}
public void setConsumerGroupGauges(Map<ConsumerGroupState, Long> consumerGroupGauges) {
this.consumerGroupGauges = consumerGroupGauges;
}

/**
Expand All @@ -167,20 +153,6 @@ public void decrementNumOffsets() {
}
}

/**
* Decrement the number of consumer groups.
*
* @param state the consumer group state.
*/
public void decrementNumConsumerGroups(ConsumerGroupState state) {
TimelineGaugeCounter gaugeCounter = consumerGroupGauges.get(state);
if (gaugeCounter != null) {
synchronized (gaugeCounter.timelineLong) {
gaugeCounter.timelineLong.decrement();
}
}
}

/**
* @return The number of offsets.
*/
Expand Down Expand Up @@ -219,9 +191,9 @@ public long numClassicGroups() {
* @return The number of consumer groups in `state`.
*/
public long numConsumerGroups(ConsumerGroupState state) {
TimelineGaugeCounter gaugeCounter = consumerGroupGauges.get(state);
if (gaugeCounter != null) {
return gaugeCounter.atomicLong.get();
Long counter = consumerGroupGauges.get(state);
if (counter != null) {
return counter;
}
return 0L;
}
Expand All @@ -231,7 +203,7 @@ public long numConsumerGroups(ConsumerGroupState state) {
*/
public long numConsumerGroups() {
return consumerGroupGauges.values().stream()
.mapToLong(timelineGaugeCounter -> timelineGaugeCounter.atomicLong.get()).sum();
.mapToLong(Long::longValue).sum();
}

@Override
Expand All @@ -257,14 +229,6 @@ public TopicPartition topicPartition() {

@Override
public void commitUpTo(long offset) {
this.consumerGroupGauges.forEach((__, gaugeCounter) -> {
long value;
synchronized (gaugeCounter.timelineLong) {
value = gaugeCounter.timelineLong.get(offset);
}
gaugeCounter.atomicLong.set(value);
});

synchronized (numClassicGroupsTimelineCounter.timelineLong) {
long value = numClassicGroupsTimelineCounter.timelineLong.get(offset);
numClassicGroupsTimelineCounter.atomicLong.set(value);
Expand All @@ -286,65 +250,18 @@ public void commitUpTo(long offset) {

/**
* Sets the classicGroupGauges.
* This method should be the only way to update the map and is called by the scheduled task
* that updates the metrics in {@link org.apache.kafka.coordinator.group.GroupCoordinatorShard}.
* Breaking this will result in inconsistent behavior.
*
* @param classicGroupGauges The new classicGroupGauges.
* @param classicGroupGauges The map counting the number of classic groups in each state.
*/
public void setClassicGroupGauges(
Map<ClassicGroupState, Long> classicGroupGauges
) {
this.classicGroupGauges = classicGroupGauges;
}

/**
* Called when a consumer group's state has changed. Increment/decrement
* the counter accordingly.
*
* @param oldState The previous state. null value means that it's a new group.
* @param newState The next state. null value means that the group has been removed.
*/
public void onConsumerGroupStateTransition(
ConsumerGroupState oldState,
ConsumerGroupState newState
) {
if (newState != null) {
switch (newState) {
case EMPTY:
incrementNumConsumerGroups(ConsumerGroupState.EMPTY);
break;
case ASSIGNING:
incrementNumConsumerGroups(ConsumerGroupState.ASSIGNING);
break;
case RECONCILING:
incrementNumConsumerGroups(ConsumerGroupState.RECONCILING);
break;
case STABLE:
incrementNumConsumerGroups(ConsumerGroupState.STABLE);
break;
case DEAD:
incrementNumConsumerGroups(ConsumerGroupState.DEAD);
}
}

if (oldState != null) {
switch (oldState) {
case EMPTY:
decrementNumConsumerGroups(ConsumerGroupState.EMPTY);
break;
case ASSIGNING:
decrementNumConsumerGroups(ConsumerGroupState.ASSIGNING);
break;
case RECONCILING:
decrementNumConsumerGroups(ConsumerGroupState.RECONCILING);
break;
case STABLE:
decrementNumConsumerGroups(ConsumerGroupState.STABLE);
break;
case DEAD:
decrementNumConsumerGroups(ConsumerGroupState.DEAD);
}
}
}

public void incrementNumShareGroups(ShareGroup.ShareGroupState state) {
TimelineGaugeCounter gaugeCounter = shareGroupGauges.get(state);
if (gaugeCounter != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -885,7 +885,6 @@ protected void maybeUpdateGroupSubscriptionType() {

@Override
protected void maybeUpdateGroupState() {
ConsumerGroupState previousState = state.get();
ConsumerGroupState newState = STABLE;
if (members.isEmpty()) {
newState = EMPTY;
Expand All @@ -901,7 +900,6 @@ protected void maybeUpdateGroupState() {
}

state.set(newState);
metrics.onConsumerGroupStateTransition(previousState, newState);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,9 @@
import java.util.Set;

import static org.apache.kafka.coordinator.common.runtime.TestUtil.requestContext;
import static org.apache.kafka.coordinator.group.GroupCoordinatorShard.CLASSIC_GROUP_SIZE_COUNTER_KEY;
import static org.apache.kafka.coordinator.group.GroupCoordinatorShard.DEFAULT_GROUP_GAUGES_UPDATE_INTERVAL_MS;
import static org.apache.kafka.coordinator.group.GroupCoordinatorShard.GROUP_EXPIRATION_KEY;
import static org.apache.kafka.coordinator.group.GroupCoordinatorShard.GROUP_SIZE_COUNTER_KEY;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
Expand Down Expand Up @@ -1024,7 +1024,7 @@ public void testCleanupGroupMetadata() {
}

@Test
public void testScheduleClassicGroupSizeCounter() {
public void testScheduleGroupSizeCounter() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
Expand All @@ -1046,21 +1046,21 @@ public void testScheduleClassicGroupSizeCounter() {
);
coordinator.onLoaded(MetadataImage.EMPTY);

// The classic group size counter is scheduled.
// The counter is scheduled.
assertEquals(
DEFAULT_GROUP_GAUGES_UPDATE_INTERVAL_MS,
timer.timeout(CLASSIC_GROUP_SIZE_COUNTER_KEY).deadlineMs - time.milliseconds()
timer.timeout(GROUP_SIZE_COUNTER_KEY).deadlineMs - time.milliseconds()
);

// Advance the timer to trigger the update.
time.sleep(DEFAULT_GROUP_GAUGES_UPDATE_INTERVAL_MS + 1);
timer.poll();
verify(groupMetadataManager, times(1)).updateClassicGroupSizeCounter();
verify(groupMetadataManager, times(1)).updateGroupSizeCounter();

// The classic group size counter is scheduled.
// The counter is scheduled.
assertEquals(
DEFAULT_GROUP_GAUGES_UPDATE_INTERVAL_MS,
timer.timeout(CLASSIC_GROUP_SIZE_COUNTER_KEY).deadlineMs - time.milliseconds()
timer.timeout(GROUP_SIZE_COUNTER_KEY).deadlineMs - time.milliseconds()
);
}

Expand Down
Loading

0 comments on commit 1a106e4

Please sign in to comment.