Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-18655: Implement the consumer group size counter with scheduled task #18717

Merged
merged 6 commits into from
Feb 3, 2025
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -258,11 +258,11 @@ public GroupCoordinatorShard build() {
static final String GROUP_EXPIRATION_KEY = "expire-group-metadata";

/**
* The classic group size counter key to schedule a timer task.
* The classic and consumer group size counter key to schedule a timer task.
*
* Visible for testing.
*/
static final String CLASSIC_GROUP_SIZE_COUNTER_KEY = "classic-group-size-counter";
static final String GROUP_SIZE_COUNTER_KEY = "group-size-counter";

/**
* Hardcoded default value of the interval to update the classic group size counter.
Expand Down Expand Up @@ -699,27 +699,27 @@ public CoordinatorResult<Void, CoordinatorRecord> onPartitionsDeleted(
}

/**
* Schedules (or reschedules) the group size counter for the classic groups.
* Schedules (or reschedules) the group size counter for the classic/consumer groups.
*/
private void scheduleClassicGroupSizeCounter() {
private void scheduleGroupSizeCounter() {
timer.schedule(
CLASSIC_GROUP_SIZE_COUNTER_KEY,
GROUP_SIZE_COUNTER_KEY,
DEFAULT_GROUP_GAUGES_UPDATE_INTERVAL_MS,
TimeUnit.MILLISECONDS,
true,
() -> {
groupMetadataManager.updateClassicGroupSizeCounter();
scheduleClassicGroupSizeCounter();
groupMetadataManager.updateGroupSizeCounter();
scheduleGroupSizeCounter();
return GroupMetadataManager.EMPTY_RESULT;
}
);
}

/**
* Cancels the group size counter for the classic groups.
* Cancels the group size counter for the classic/consumer groups.
*/
private void cancelClassicGroupSizeCounter() {
timer.cancel(CLASSIC_GROUP_SIZE_COUNTER_KEY);
private void cancelGroupSizeCounter() {
timer.cancel(GROUP_SIZE_COUNTER_KEY);
}

/**
Expand All @@ -736,15 +736,15 @@ public void onLoaded(MetadataImage newImage) {

groupMetadataManager.onLoaded();
scheduleGroupMetadataExpiration();
scheduleClassicGroupSizeCounter();
scheduleGroupSizeCounter();
}

@Override
public void onUnloaded() {
timer.cancel(GROUP_EXPIRATION_KEY);
coordinatorMetrics.deactivateMetricsShard(metricsShard);
groupMetadataManager.onUnloaded();
cancelClassicGroupSizeCounter();
cancelGroupSizeCounter();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,6 @@
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CONSUMER_GROUP_REBALANCES_SENSOR_NAME;
import static org.apache.kafka.coordinator.group.modern.ModernGroupMember.hasAssignedPartitionsChanged;
import static org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember.hasAssignedPartitionsChanged;

/**
* The GroupMetadataManager manages the metadata of all classic and consumer groups. It holds
Expand Down Expand Up @@ -745,7 +744,6 @@ ConsumerGroup getOrMaybeCreatePersistedConsumerGroup(
if (group == null) {
ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId, metrics);
groups.put(groupId, consumerGroup);
metrics.onConsumerGroupStateTransition(null, consumerGroup.state());
return consumerGroup;
} else if (group.type() == CONSUMER) {
return (ConsumerGroup) group;
Expand All @@ -756,7 +754,6 @@ ConsumerGroup getOrMaybeCreatePersistedConsumerGroup(
// replaying consumer group records after offset commit records would not work.
ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId, metrics);
groups.put(groupId, consumerGroup);
metrics.onConsumerGroupStateTransition(null, consumerGroup.state());
return consumerGroup;
} else {
throw new IllegalStateException(String.format("Group %s is not a consumer group", groupId));
Expand Down Expand Up @@ -1134,24 +1131,7 @@ ConsumerGroup convertToConsumerGroup(ClassicGroup classicGroup, List<Coordinator
private void removeGroup(
String groupId
) {
Group group = groups.remove(groupId);
if (group != null) {
switch (group.type()) {
case CONSUMER:
ConsumerGroup consumerGroup = (ConsumerGroup) group;
metrics.onConsumerGroupStateTransition(consumerGroup.state(), null);
break;
case CLASSIC:
// The classic group size counter is implemented as scheduled task.
break;
case SHARE:
// Nothing for now, but we may want to add metrics in the future.
break;
default:
log.warn("Removed group {} with an unknown group type {}.", groupId, group.type());
break;
}
}
groups.remove(groupId);
}

/**
Expand Down Expand Up @@ -4137,16 +4117,25 @@ public void onNewMetadataImage(MetadataImage newImage, MetadataDelta delta) {
}

/**
* Counts and updates the number of classic groups in different states.
* Counts and updates the number of classic and consumer groups in different states.
*/
public void updateClassicGroupSizeCounter() {
Map<ClassicGroupState, Long> groupSizeCounter = new HashMap<>();
public void updateGroupSizeCounter() {
Map<ClassicGroupState, Long> classicGroupSizeCounter = new HashMap<>();
Map<ConsumerGroup.ConsumerGroupState, Long> consumerGroupSizeCounter = new HashMap<>();
groups.forEach((__, group) -> {
if (group.type() == CLASSIC) {
groupSizeCounter.compute(((ClassicGroup) group).currentState(), Utils::incValue);
switch (group.type()) {
case CLASSIC:
classicGroupSizeCounter.compute(((ClassicGroup) group).currentState(), Utils::incValue);
break;
case CONSUMER:
consumerGroupSizeCounter.compute(((ConsumerGroup) group).state(), Utils::incValue);
break;
default:
break;
}
});
metrics.setClassicGroupGauges(groupSizeCounter);
metrics.setClassicGroupGauges(classicGroupSizeCounter);
metrics.setConsumerGroupGauges(consumerGroupSizeCounter);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public TimelineGaugeCounter(TimelineLong timelineLong, AtomicLong atomicLong) {
/**
* Consumer group size gauge counters keyed by the metric name.
*/
private final Map<ConsumerGroupState, TimelineGaugeCounter> consumerGroupGauges;
private volatile Map<ConsumerGroupState, Long> consumerGroupGauges;
dongnuo123 marked this conversation as resolved.
Show resolved Hide resolved

/**
* Share group size gauge counters keyed by the metric name.
Expand Down Expand Up @@ -108,19 +108,7 @@ public GroupCoordinatorMetricsShard(
numClassicGroupsTimelineCounter = new TimelineGaugeCounter(new TimelineLong(snapshotRegistry), new AtomicLong(0));

this.classicGroupGauges = Collections.emptyMap();

this.consumerGroupGauges = Utils.mkMap(
Utils.mkEntry(ConsumerGroupState.EMPTY,
new TimelineGaugeCounter(new TimelineLong(snapshotRegistry), new AtomicLong(0))),
Utils.mkEntry(ConsumerGroupState.ASSIGNING,
new TimelineGaugeCounter(new TimelineLong(snapshotRegistry), new AtomicLong(0))),
Utils.mkEntry(ConsumerGroupState.RECONCILING,
new TimelineGaugeCounter(new TimelineLong(snapshotRegistry), new AtomicLong(0))),
Utils.mkEntry(ConsumerGroupState.STABLE,
new TimelineGaugeCounter(new TimelineLong(snapshotRegistry), new AtomicLong(0))),
Utils.mkEntry(ConsumerGroupState.DEAD,
new TimelineGaugeCounter(new TimelineLong(snapshotRegistry), new AtomicLong(0)))
);
this.consumerGroupGauges = Collections.emptyMap();

this.shareGroupGauges = Utils.mkMap(
Utils.mkEntry(ShareGroup.ShareGroupState.EMPTY,
Expand All @@ -145,17 +133,14 @@ public void incrementNumOffsets() {
}

/**
* Increment the number of consumer groups.
* Set the number of consumer groups. The method is the only way to update
* the map and is called by the scheduled task that updates the metrics
* in {@link org.apache.kafka.coordinator.group.GroupCoordinatorShard}.
dongnuo123 marked this conversation as resolved.
Show resolved Hide resolved
*
* @param state the consumer group state.
* @param consumerGroupGauges The map counting the number of consumer groups in each state.
dongnuo123 marked this conversation as resolved.
Show resolved Hide resolved
*/
public void incrementNumConsumerGroups(ConsumerGroupState state) {
TimelineGaugeCounter gaugeCounter = consumerGroupGauges.get(state);
if (gaugeCounter != null) {
synchronized (gaugeCounter.timelineLong) {
gaugeCounter.timelineLong.increment();
}
}
public void setConsumerGroupGauges(Map<ConsumerGroupState, Long> consumerGroupGauges) {
this.consumerGroupGauges = consumerGroupGauges;
}

/**
Expand All @@ -167,20 +152,6 @@ public void decrementNumOffsets() {
}
}

/**
* Decrement the number of consumer groups.
*
* @param state the consumer group state.
*/
public void decrementNumConsumerGroups(ConsumerGroupState state) {
TimelineGaugeCounter gaugeCounter = consumerGroupGauges.get(state);
if (gaugeCounter != null) {
synchronized (gaugeCounter.timelineLong) {
gaugeCounter.timelineLong.decrement();
}
}
}

/**
* @return The number of offsets.
*/
Expand Down Expand Up @@ -219,9 +190,9 @@ public long numClassicGroups() {
* @return The number of consumer groups in `state`.
*/
public long numConsumerGroups(ConsumerGroupState state) {
TimelineGaugeCounter gaugeCounter = consumerGroupGauges.get(state);
if (gaugeCounter != null) {
return gaugeCounter.atomicLong.get();
Long counter = consumerGroupGauges.get(state);
if (counter != null) {
return counter;
}
return 0L;
}
Expand All @@ -231,7 +202,7 @@ public long numConsumerGroups(ConsumerGroupState state) {
*/
public long numConsumerGroups() {
return consumerGroupGauges.values().stream()
.mapToLong(timelineGaugeCounter -> timelineGaugeCounter.atomicLong.get()).sum();
.mapToLong(Long::longValue).sum();
}

@Override
Expand All @@ -257,14 +228,6 @@ public TopicPartition topicPartition() {

@Override
public void commitUpTo(long offset) {
this.consumerGroupGauges.forEach((__, gaugeCounter) -> {
long value;
synchronized (gaugeCounter.timelineLong) {
value = gaugeCounter.timelineLong.get(offset);
}
gaugeCounter.atomicLong.set(value);
});

synchronized (numClassicGroupsTimelineCounter.timelineLong) {
long value = numClassicGroupsTimelineCounter.timelineLong.get(offset);
numClassicGroupsTimelineCounter.atomicLong.set(value);
Expand All @@ -285,7 +248,9 @@ public void commitUpTo(long offset) {
}

/**
* Sets the classicGroupGauges.
* Sets the classicGroupGauges. The method is the only way to update
* the map and is called by the scheduled task that updates the metrics
* in {@link org.apache.kafka.coordinator.group.GroupCoordinatorShard}.
*
* @param classicGroupGauges The new classicGroupGauges.
*/
Expand All @@ -295,56 +260,6 @@ public void setClassicGroupGauges(
this.classicGroupGauges = classicGroupGauges;
}

/**
* Called when a consumer group's state has changed. Increment/decrement
* the counter accordingly.
*
* @param oldState The previous state. null value means that it's a new group.
* @param newState The next state. null value means that the group has been removed.
*/
public void onConsumerGroupStateTransition(
ConsumerGroupState oldState,
ConsumerGroupState newState
) {
if (newState != null) {
switch (newState) {
case EMPTY:
incrementNumConsumerGroups(ConsumerGroupState.EMPTY);
break;
case ASSIGNING:
incrementNumConsumerGroups(ConsumerGroupState.ASSIGNING);
break;
case RECONCILING:
incrementNumConsumerGroups(ConsumerGroupState.RECONCILING);
break;
case STABLE:
incrementNumConsumerGroups(ConsumerGroupState.STABLE);
break;
case DEAD:
incrementNumConsumerGroups(ConsumerGroupState.DEAD);
}
}

if (oldState != null) {
switch (oldState) {
case EMPTY:
decrementNumConsumerGroups(ConsumerGroupState.EMPTY);
break;
case ASSIGNING:
decrementNumConsumerGroups(ConsumerGroupState.ASSIGNING);
break;
case RECONCILING:
decrementNumConsumerGroups(ConsumerGroupState.RECONCILING);
break;
case STABLE:
decrementNumConsumerGroups(ConsumerGroupState.STABLE);
break;
case DEAD:
decrementNumConsumerGroups(ConsumerGroupState.DEAD);
}
}
}

public void incrementNumShareGroups(ShareGroup.ShareGroupState state) {
TimelineGaugeCounter gaugeCounter = shareGroupGauges.get(state);
if (gaugeCounter != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -885,7 +885,6 @@ protected void maybeUpdateGroupSubscriptionType() {

@Override
protected void maybeUpdateGroupState() {
ConsumerGroupState previousState = state.get();
ConsumerGroupState newState = STABLE;
if (members.isEmpty()) {
newState = EMPTY;
Expand All @@ -901,7 +900,6 @@ protected void maybeUpdateGroupState() {
}

state.set(newState);
metrics.onConsumerGroupStateTransition(previousState, newState);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,9 @@
import java.util.Set;

import static org.apache.kafka.coordinator.common.runtime.TestUtil.requestContext;
import static org.apache.kafka.coordinator.group.GroupCoordinatorShard.CLASSIC_GROUP_SIZE_COUNTER_KEY;
import static org.apache.kafka.coordinator.group.GroupCoordinatorShard.DEFAULT_GROUP_GAUGES_UPDATE_INTERVAL_MS;
import static org.apache.kafka.coordinator.group.GroupCoordinatorShard.GROUP_EXPIRATION_KEY;
import static org.apache.kafka.coordinator.group.GroupCoordinatorShard.GROUP_SIZE_COUNTER_KEY;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
Expand Down Expand Up @@ -1024,7 +1024,7 @@ public void testCleanupGroupMetadata() {
}

@Test
public void testScheduleClassicGroupSizeCounter() {
public void testScheduleGroupSizeCounter() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
Expand All @@ -1046,21 +1046,21 @@ public void testScheduleClassicGroupSizeCounter() {
);
coordinator.onLoaded(MetadataImage.EMPTY);

// The classic group size counter is scheduled.
// The counter is scheduled.
assertEquals(
DEFAULT_GROUP_GAUGES_UPDATE_INTERVAL_MS,
timer.timeout(CLASSIC_GROUP_SIZE_COUNTER_KEY).deadlineMs - time.milliseconds()
timer.timeout(GROUP_SIZE_COUNTER_KEY).deadlineMs - time.milliseconds()
);

// Advance the timer to trigger the update.
time.sleep(DEFAULT_GROUP_GAUGES_UPDATE_INTERVAL_MS + 1);
timer.poll();
verify(groupMetadataManager, times(1)).updateClassicGroupSizeCounter();
verify(groupMetadataManager, times(1)).updateGroupSizeCounter();

// The classic group size counter is scheduled.
// The counter is scheduled.
assertEquals(
DEFAULT_GROUP_GAUGES_UPDATE_INTERVAL_MS,
timer.timeout(CLASSIC_GROUP_SIZE_COUNTER_KEY).deadlineMs - time.milliseconds()
timer.timeout(GROUP_SIZE_COUNTER_KEY).deadlineMs - time.milliseconds()
);
}

Expand Down
Loading