KAFKA-18655: Implement the consumer group size counter with scheduled task #18717

Merged · 6 commits · Feb 3, 2025
@@ -258,11 +258,11 @@ public GroupCoordinatorShard build() {
static final String GROUP_EXPIRATION_KEY = "expire-group-metadata";

/**
* The classic group size counter key to schedule a timer task.
* The classic and consumer group size counter key to schedule a timer task.
*
* Visible for testing.
*/
static final String CLASSIC_GROUP_SIZE_COUNTER_KEY = "classic-group-size-counter";
static final String GROUP_SIZE_COUNTER_KEY = "group-size-counter";

/**
* Hardcoded default value of the interval to update the classic group size counter.
@@ -699,27 +699,27 @@ public CoordinatorResult<Void, CoordinatorRecord> onPartitionsDeleted(
}

/**
* Schedules (or reschedules) the group size counter for the classic groups.
* Schedules (or reschedules) the group size counter for the classic/consumer groups.
*/
private void scheduleClassicGroupSizeCounter() {
private void scheduleGroupSizeCounter() {
timer.schedule(
CLASSIC_GROUP_SIZE_COUNTER_KEY,
GROUP_SIZE_COUNTER_KEY,
DEFAULT_GROUP_GAUGES_UPDATE_INTERVAL_MS,
TimeUnit.MILLISECONDS,
true,
() -> {
groupMetadataManager.updateClassicGroupSizeCounter();
scheduleClassicGroupSizeCounter();
groupMetadataManager.updateGroupSizeCounter();
scheduleGroupSizeCounter();
return GroupMetadataManager.EMPTY_RESULT;
}
);
}

/**
* Cancels the group size counter for the classic groups.
* Cancels the group size counter for the classic/consumer groups.
*/
private void cancelClassicGroupSizeCounter() {
timer.cancel(CLASSIC_GROUP_SIZE_COUNTER_KEY);
private void cancelGroupSizeCounter() {
timer.cancel(GROUP_SIZE_COUNTER_KEY);
}

/**
@@ -736,15 +736,15 @@ public void onLoaded(MetadataImage newImage) {

groupMetadataManager.onLoaded();
scheduleGroupMetadataExpiration();
scheduleClassicGroupSizeCounter();
scheduleGroupSizeCounter();
}

@Override
public void onUnloaded() {
timer.cancel(GROUP_EXPIRATION_KEY);
coordinatorMetrics.deactivateMetricsShard(metricsShard);
groupMetadataManager.onUnloaded();
cancelClassicGroupSizeCounter();
cancelGroupSizeCounter();
}

/**
@@ -188,7 +188,6 @@
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CONSUMER_GROUP_REBALANCES_SENSOR_NAME;
import static org.apache.kafka.coordinator.group.modern.ModernGroupMember.hasAssignedPartitionsChanged;
import static org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember.hasAssignedPartitionsChanged;

/**
* The GroupMetadataManager manages the metadata of all classic and consumer groups. It holds
@@ -745,7 +744,6 @@ ConsumerGroup getOrMaybeCreatePersistedConsumerGroup(
if (group == null) {
ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId, metrics);
groups.put(groupId, consumerGroup);
metrics.onConsumerGroupStateTransition(null, consumerGroup.state());
return consumerGroup;
} else if (group.type() == CONSUMER) {
return (ConsumerGroup) group;
@@ -756,7 +754,6 @@ ConsumerGroup getOrMaybeCreatePersistedConsumerGroup(
// replaying consumer group records after offset commit records would not work.
ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId, metrics);
groups.put(groupId, consumerGroup);
metrics.onConsumerGroupStateTransition(null, consumerGroup.state());
return consumerGroup;
} else {
throw new IllegalStateException(String.format("Group %s is not a consumer group", groupId));
@@ -1134,24 +1131,7 @@ ConsumerGroup convertToConsumerGroup(ClassicGroup classicGroup, List<Coordinator
private void removeGroup(
String groupId
) {
Group group = groups.remove(groupId);
if (group != null) {
switch (group.type()) {
case CONSUMER:
ConsumerGroup consumerGroup = (ConsumerGroup) group;
metrics.onConsumerGroupStateTransition(consumerGroup.state(), null);
break;
case CLASSIC:
// The classic group size counter is implemented as scheduled task.
break;
case SHARE:
// Nothing for now, but we may want to add metrics in the future.
break;
default:
log.warn("Removed group {} with an unknown group type {}.", groupId, group.type());
break;
}
}
groups.remove(groupId);
}

/**
@@ -4137,16 +4117,25 @@ public void onNewMetadataImage(MetadataImage newImage, MetadataDelta delta) {
}

/**
* Counts and updates the number of classic groups in different states.
* Counts and updates the number of classic and consumer groups in different states.
*/
public void updateClassicGroupSizeCounter() {
Map<ClassicGroupState, Long> groupSizeCounter = new HashMap<>();
public void updateGroupSizeCounter() {
Map<ClassicGroupState, Long> classicGroupSizeCounter = new HashMap<>();
Map<ConsumerGroup.ConsumerGroupState, Long> consumerGroupSizeCounter = new HashMap<>();
groups.forEach((__, group) -> {
if (group.type() == CLASSIC) {
groupSizeCounter.compute(((ClassicGroup) group).currentState(), Utils::incValue);
switch (group.type()) {
case CLASSIC:
classicGroupSizeCounter.compute(((ClassicGroup) group).currentState(), Utils::incValue);
break;
case CONSUMER:
consumerGroupSizeCounter.compute(((ConsumerGroup) group).state(), Utils::incValue);
break;
default:
break;
}
});
metrics.setClassicGroupGauges(groupSizeCounter);
metrics.setClassicGroupGauges(classicGroupSizeCounter);
metrics.setConsumerGroupGauges(consumerGroupSizeCounter);
}

/**
@@ -71,7 +71,7 @@ public TimelineGaugeCounter(TimelineLong timelineLong, AtomicLong atomicLong) {
/**
* Consumer group size gauge counters keyed by the metric name.
*/
private final Map<ConsumerGroupState, TimelineGaugeCounter> consumerGroupGauges;
private volatile Map<ConsumerGroupState, Long> consumerGroupGauges;
dongnuo123 (Collaborator, Author), Jan 27, 2025:
Removed the timeline gauge counter because we recompute the metric from scratch every 60 seconds, so the result will eventually be consistent despite any rollbacks.

Contributor:
Confirming the concurrency pattern here:
  • there is only one reader thread (the metrics collector) and one writer thread (the runtime) for a shard at any given time.
  • the map is updated atomically by swapping in a new map.
Does this sound right?

dongnuo123 (Collaborator, Author):
Yes, correct.

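For readers following the thread above, here is a minimal sketch of the agreed pattern: a single writer periodically recomputes the counts and publishes them as an immutable snapshot through a volatile field. The class and method names are illustrative only and are not the actual GroupCoordinatorMetricsShard API.

```java
import java.util.Map;

// Hypothetical sketch of the single-writer / volatile-snapshot pattern discussed above.
public class GaugeSnapshotSketch {

    enum GroupState { EMPTY, ASSIGNING, RECONCILING, STABLE, DEAD }

    // Written only by the runtime thread, read only by the metrics collector thread.
    private volatile Map<GroupState, Long> gauges = Map.of();

    // Called by the scheduled task (e.g. every 60 seconds) with freshly computed counts;
    // publishing is a single volatile reference swap of an immutable map.
    public void publish(Map<GroupState, Long> freshCounts) {
        this.gauges = Map.copyOf(freshCounts);
    }

    // Called by the metrics collector; states absent from the snapshot read as zero.
    public long numGroups(GroupState state) {
        return gauges.getOrDefault(state, 0L);
    }
}
```

Because the whole map is rebuilt and swapped on every tick, a reader can never observe a partially updated count, and any transient drift (for example from a snapshot-registry rollback) is corrected at the next recomputation.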

/**
* Share group size gauge counters keyed by the metric name.
@@ -108,19 +108,7 @@ public GroupCoordinatorMetricsShard(
numClassicGroupsTimelineCounter = new TimelineGaugeCounter(new TimelineLong(snapshotRegistry), new AtomicLong(0));

this.classicGroupGauges = Collections.emptyMap();

this.consumerGroupGauges = Utils.mkMap(
Utils.mkEntry(ConsumerGroupState.EMPTY,
new TimelineGaugeCounter(new TimelineLong(snapshotRegistry), new AtomicLong(0))),
Utils.mkEntry(ConsumerGroupState.ASSIGNING,
new TimelineGaugeCounter(new TimelineLong(snapshotRegistry), new AtomicLong(0))),
Utils.mkEntry(ConsumerGroupState.RECONCILING,
new TimelineGaugeCounter(new TimelineLong(snapshotRegistry), new AtomicLong(0))),
Utils.mkEntry(ConsumerGroupState.STABLE,
new TimelineGaugeCounter(new TimelineLong(snapshotRegistry), new AtomicLong(0))),
Utils.mkEntry(ConsumerGroupState.DEAD,
new TimelineGaugeCounter(new TimelineLong(snapshotRegistry), new AtomicLong(0)))
);
this.consumerGroupGauges = Collections.emptyMap();

this.shareGroupGauges = Utils.mkMap(
Utils.mkEntry(ShareGroup.ShareGroupState.EMPTY,
@@ -145,17 +133,15 @@ public void incrementNumOffsets() {
}

/**
* Increment the number of consumer groups.
* Set the number of consumer groups.
* This method should be the only way to update the map and is called by the scheduled task
* that updates the metrics in {@link org.apache.kafka.coordinator.group.GroupCoordinatorShard}.
* Breaking this will result in inconsistent behavior.
*
* @param state the consumer group state.
* @param consumerGroupGauges The map counting the number of consumer groups in each state.
*/
public void incrementNumConsumerGroups(ConsumerGroupState state) {
TimelineGaugeCounter gaugeCounter = consumerGroupGauges.get(state);
if (gaugeCounter != null) {
synchronized (gaugeCounter.timelineLong) {
gaugeCounter.timelineLong.increment();
}
}
public void setConsumerGroupGauges(Map<ConsumerGroupState, Long> consumerGroupGauges) {
this.consumerGroupGauges = consumerGroupGauges;
}

/**
Expand All @@ -167,20 +153,6 @@ public void decrementNumOffsets() {
}
}

/**
* Decrement the number of consumer groups.
*
* @param state the consumer group state.
*/
public void decrementNumConsumerGroups(ConsumerGroupState state) {
TimelineGaugeCounter gaugeCounter = consumerGroupGauges.get(state);
if (gaugeCounter != null) {
synchronized (gaugeCounter.timelineLong) {
gaugeCounter.timelineLong.decrement();
}
}
}

/**
* @return The number of offsets.
*/
@@ -219,9 +191,9 @@ public long numClassicGroups() {
* @return The number of consumer groups in `state`.
*/
public long numConsumerGroups(ConsumerGroupState state) {
TimelineGaugeCounter gaugeCounter = consumerGroupGauges.get(state);
if (gaugeCounter != null) {
return gaugeCounter.atomicLong.get();
Long counter = consumerGroupGauges.get(state);
if (counter != null) {
return counter;
}
return 0L;
}
@@ -231,7 +203,7 @@ public long numConsumerGroups(ConsumerGroupState state) {
*/
public long numConsumerGroups() {
return consumerGroupGauges.values().stream()
.mapToLong(timelineGaugeCounter -> timelineGaugeCounter.atomicLong.get()).sum();
.mapToLong(Long::longValue).sum();
}

@Override
Expand All @@ -257,14 +229,6 @@ public TopicPartition topicPartition() {

@Override
public void commitUpTo(long offset) {
this.consumerGroupGauges.forEach((__, gaugeCounter) -> {
long value;
synchronized (gaugeCounter.timelineLong) {
value = gaugeCounter.timelineLong.get(offset);
}
gaugeCounter.atomicLong.set(value);
});

synchronized (numClassicGroupsTimelineCounter.timelineLong) {
long value = numClassicGroupsTimelineCounter.timelineLong.get(offset);
numClassicGroupsTimelineCounter.atomicLong.set(value);
@@ -286,65 +250,18 @@ public void commitUpTo(long offset) {

/**
* Sets the classicGroupGauges.
* This method should be the only way to update the map and is called by the scheduled task
* that updates the metrics in {@link org.apache.kafka.coordinator.group.GroupCoordinatorShard}.
* Breaking this will result in inconsistent behavior.
*
* @param classicGroupGauges The new classicGroupGauges.
* @param classicGroupGauges The map counting the number of classic groups in each state.
*/
public void setClassicGroupGauges(
Map<ClassicGroupState, Long> classicGroupGauges
) {
this.classicGroupGauges = classicGroupGauges;
}

/**
* Called when a consumer group's state has changed. Increment/decrement
* the counter accordingly.
*
* @param oldState The previous state. null value means that it's a new group.
* @param newState The next state. null value means that the group has been removed.
*/
public void onConsumerGroupStateTransition(
ConsumerGroupState oldState,
ConsumerGroupState newState
) {
if (newState != null) {
switch (newState) {
case EMPTY:
incrementNumConsumerGroups(ConsumerGroupState.EMPTY);
break;
case ASSIGNING:
incrementNumConsumerGroups(ConsumerGroupState.ASSIGNING);
break;
case RECONCILING:
incrementNumConsumerGroups(ConsumerGroupState.RECONCILING);
break;
case STABLE:
incrementNumConsumerGroups(ConsumerGroupState.STABLE);
break;
case DEAD:
incrementNumConsumerGroups(ConsumerGroupState.DEAD);
}
}

if (oldState != null) {
switch (oldState) {
case EMPTY:
decrementNumConsumerGroups(ConsumerGroupState.EMPTY);
break;
case ASSIGNING:
decrementNumConsumerGroups(ConsumerGroupState.ASSIGNING);
break;
case RECONCILING:
decrementNumConsumerGroups(ConsumerGroupState.RECONCILING);
break;
case STABLE:
decrementNumConsumerGroups(ConsumerGroupState.STABLE);
break;
case DEAD:
decrementNumConsumerGroups(ConsumerGroupState.DEAD);
}
}
}

public void incrementNumShareGroups(ShareGroup.ShareGroupState state) {
TimelineGaugeCounter gaugeCounter = shareGroupGauges.get(state);
if (gaugeCounter != null) {
@@ -885,7 +885,6 @@ protected void maybeUpdateGroupSubscriptionType() {

@Override
protected void maybeUpdateGroupState() {
ConsumerGroupState previousState = state.get();
ConsumerGroupState newState = STABLE;
if (members.isEmpty()) {
newState = EMPTY;
Expand All @@ -901,7 +900,6 @@ protected void maybeUpdateGroupState() {
}

state.set(newState);
metrics.onConsumerGroupStateTransition(previousState, newState);
}

/**
@@ -79,9 +79,9 @@
import java.util.Set;

import static org.apache.kafka.coordinator.common.runtime.TestUtil.requestContext;
import static org.apache.kafka.coordinator.group.GroupCoordinatorShard.CLASSIC_GROUP_SIZE_COUNTER_KEY;
import static org.apache.kafka.coordinator.group.GroupCoordinatorShard.DEFAULT_GROUP_GAUGES_UPDATE_INTERVAL_MS;
import static org.apache.kafka.coordinator.group.GroupCoordinatorShard.GROUP_EXPIRATION_KEY;
import static org.apache.kafka.coordinator.group.GroupCoordinatorShard.GROUP_SIZE_COUNTER_KEY;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -1024,7 +1024,7 @@ public void testCleanupGroupMetadata() {
}

@Test
public void testScheduleClassicGroupSizeCounter() {
public void testScheduleGroupSizeCounter() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
@@ -1046,21 +1046,21 @@ public void testScheduleClassicGroupSizeCounter() {
);
coordinator.onLoaded(MetadataImage.EMPTY);

// The classic group size counter is scheduled.
// The counter is scheduled.
assertEquals(
DEFAULT_GROUP_GAUGES_UPDATE_INTERVAL_MS,
timer.timeout(CLASSIC_GROUP_SIZE_COUNTER_KEY).deadlineMs - time.milliseconds()
timer.timeout(GROUP_SIZE_COUNTER_KEY).deadlineMs - time.milliseconds()
);

// Advance the timer to trigger the update.
time.sleep(DEFAULT_GROUP_GAUGES_UPDATE_INTERVAL_MS + 1);
timer.poll();
verify(groupMetadataManager, times(1)).updateClassicGroupSizeCounter();
verify(groupMetadataManager, times(1)).updateGroupSizeCounter();

// The classic group size counter is scheduled.
// The counter is scheduled.
assertEquals(
DEFAULT_GROUP_GAUGES_UPDATE_INTERVAL_MS,
timer.timeout(CLASSIC_GROUP_SIZE_COUNTER_KEY).deadlineMs - time.milliseconds()
timer.timeout(GROUP_SIZE_COUNTER_KEY).deadlineMs - time.milliseconds()
);
}
