From b80a17bbb74de18546af5bde6bd11b662253034a Mon Sep 17 00:00:00 2001
From: david-leifker <114954101+david-leifker@users.noreply.github.com>
Date: Fri, 31 May 2024 15:53:02 -0500
Subject: [PATCH] feat(consumers): mce-consumer throttling based on mae-consumer lag (#10626)

---
 build.gradle                                  |   1 +
 metadata-dao-impl/kafka-producer/build.gradle |   1 +
 .../dao/producer/KafkaProducerThrottle.java   | 246 ++++++++++++
 .../producer/KafkaProducerThrottleTest.java   | 363 ++++++++++++++++++
 metadata-jobs/mce-consumer/build.gradle       |   1 +
 .../config/MetadataChangeProposalConfig.java  |  30 ++
 .../src/main/resources/application.yaml       |  23 ++
 .../factory/config/ConfigurationProvider.java |   4 +
 .../kafka/DataHubKafkaProducerFactory.java    |   2 +-
 .../kafka/KafkaEventConsumerFactory.java      |   2 +-
 .../kafka/SimpleKafkaConsumerFactory.java     |   2 +-
 .../KafkaProducerThrottleFactory.java         |  93 +++++
 ...maRegistryControllerTestConfiguration.java |   3 +
 13 files changed, 768 insertions(+), 3 deletions(-)
 create mode 100644 metadata-dao-impl/kafka-producer/src/main/java/com/datahub/metadata/dao/producer/KafkaProducerThrottle.java
 create mode 100644 metadata-dao-impl/kafka-producer/src/test/java/com/datahub/metadata/dao/producer/KafkaProducerThrottleTest.java
 create mode 100644 metadata-service/configuration/src/main/java/com/linkedin/metadata/config/MetadataChangeProposalConfig.java
 create mode 100644 metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/throttle/KafkaProducerThrottleFactory.java

diff --git a/build.gradle b/build.gradle
index 437ac4fbf6f32d..c6e14081c6147e 100644
--- a/build.gradle
+++ b/build.gradle
@@ -250,6 +250,7 @@ project.ext.externalDependency = [
     'springBootStarterValidation': "org.springframework.boot:spring-boot-starter-validation:$springBootVersion",
     'springKafka': "org.springframework.kafka:spring-kafka:$springKafkaVersion",
     'springActuator': "org.springframework.boot:spring-boot-starter-actuator:$springBootVersion",
+    'springRetry': "org.springframework.retry:spring-retry:2.0.6",
     'swaggerAnnotations': 'io.swagger.core.v3:swagger-annotations:2.2.15',
     'swaggerCli': 'io.swagger.codegen.v3:swagger-codegen-cli:3.0.46',
     'swaggerCore': 'io.swagger.core.v3:swagger-core:2.2.7',
diff --git a/metadata-dao-impl/kafka-producer/build.gradle b/metadata-dao-impl/kafka-producer/build.gradle
index bc3415b2ccc8c1..2df15309810dba 100644
--- a/metadata-dao-impl/kafka-producer/build.gradle
+++ b/metadata-dao-impl/kafka-producer/build.gradle
@@ -18,6 +18,7 @@ dependencies {
   annotationProcessor externalDependency.lombok
 
   testImplementation externalDependency.mockito
+  testImplementation externalDependency.testng
 
   constraints {
     implementation(externalDependency.log4jCore) {
diff --git a/metadata-dao-impl/kafka-producer/src/main/java/com/datahub/metadata/dao/producer/KafkaProducerThrottle.java b/metadata-dao-impl/kafka-producer/src/main/java/com/datahub/metadata/dao/producer/KafkaProducerThrottle.java
new file mode 100644
index 00000000000000..8fbb34b1eacd6f
--- /dev/null
+++ b/metadata-dao-impl/kafka-producer/src/main/java/com/datahub/metadata/dao/producer/KafkaProducerThrottle.java
@@ -0,0 +1,246 @@
+package com.datahub.metadata.dao.producer;
+
+import com.codahale.metrics.Gauge;
+import com.google.common.annotations.VisibleForTesting;
+import com.linkedin.metadata.config.MetadataChangeProposalConfig;
+import com.linkedin.metadata.models.registry.EntityRegistry;
+import com.linkedin.metadata.utils.metrics.MetricUtils;
+import com.linkedin.util.Pair;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import javax.annotation.Nonnull;
+import lombok.Builder;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.springframework.util.backoff.BackOffExecution;
+import org.springframework.util.backoff.ExponentialBackOff;
+
+@Slf4j
+@Builder(toBuilder = true)
+public class KafkaProducerThrottle {
+  @Nonnull private final EntityRegistry entityRegistry;
+  @Nonnull private final Admin kafkaAdmin;
+  @Nonnull private final MetadataChangeProposalConfig.ThrottlesConfig config;
+  @Nonnull private final String mclConsumerGroupId;
+  @Nonnull private final String versionedTopicName;
+  @Nonnull private final String timeseriesTopicName;
+  @Nonnull private final Consumer<Boolean> pauseConsumer;
+
+  private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
+  private final Map<MclType, Long> medianLag = new ConcurrentHashMap<>();
+  private final Map<MclType, BackOffExecution> backoffMap = new ConcurrentHashMap<>();
+
+  /** Update lag information at a given rate */
+  public KafkaProducerThrottle start() {
+    if ((config.getVersioned().isEnabled() || config.getTimeseries().isEnabled())
+        && config.getUpdateIntervalMs() > 0) {
+      scheduler.scheduleAtFixedRate(
+          () -> {
+            refresh();
+            try {
+              throttle();
+            } catch (InterruptedException e) {
+              throw new RuntimeException(e);
+            }
+          },
+          config.getUpdateIntervalMs(),
+          config.getUpdateIntervalMs(),
+          TimeUnit.MILLISECONDS);
+    }
+    return this;
+  }
+
+  @VisibleForTesting
+  public void refresh() {
+    medianLag.putAll(getMedianLag());
+    log.info("MCL medianLag: {}", medianLag);
+  }
+
+  @VisibleForTesting
+  public void stop() {
+    scheduler.shutdown();
+  }
+
+  /**
+   * Get copy of the lag info
+   *
+   * @return median lag per mcl topic
+   */
+  @VisibleForTesting
+  public Map<MclType, Long> getLag() {
+    return medianLag.entrySet().stream()
+        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+  }
+
+  @VisibleForTesting
+  public boolean isThrottled(MclType mclType) {
+    if (getThrottleConfig(mclType).isEnabled() && medianLag.containsKey(mclType)) {
+      return medianLag.get(mclType) > getThrottleConfig(mclType).getThreshold();
+    }
+    return false;
+  }
+
+  @VisibleForTesting
+  public long computeNextBackOff(MclType mclType) {
+    if (isThrottled(mclType)) {
+      BackOffExecution backOffExecution =
+          backoffMap.computeIfAbsent(
+              mclType,
+              k -> {
+                MetadataChangeProposalConfig.ThrottleConfig throttleConfig =
+                    getThrottleConfig(mclType);
+                ExponentialBackOff backoff =
+                    new ExponentialBackOff(
+                        throttleConfig.getInitialIntervalMs(), throttleConfig.getMultiplier());
+                backoff.setMaxAttempts(throttleConfig.getMaxAttempts());
+                backoff.setMaxInterval(throttleConfig.getMaxIntervalMs());
+                return backoff.start();
+              });
+      return backOffExecution.nextBackOff();
+    }
+    return 0;
+  }
+
+  @VisibleForTesting
+  public void throttle() throws InterruptedException {
+    for (MclType mclType : MclType.values()) {
+      if (isThrottled(mclType)) {
+        long backoffWaitMs = computeNextBackOff(mclType);
+
+        if (backoffWaitMs > 0) {
+          log.warn(
+              "Throttled producer Topic: {} Duration: {} ms MedianLag: {}",
+              getTopicName(mclType),
+              backoffWaitMs,
+              medianLag.get(mclType));
+          MetricUtils.gauge(
+              this.getClass(),
+              String.format("%s_throttled", getTopicName(mclType)),
+              () -> (Gauge<?>) () -> 1);
+          MetricUtils.counter(
+                  this.getClass(), String.format("%s_throttledCount", getTopicName(mclType)))
+              .inc();
+
+          log.info("Pausing MCE consumer for {} ms.", backoffWaitMs);
+          pauseConsumer.accept(true);
+          Thread.sleep(backoffWaitMs);
+          log.info("Resuming MCE consumer.");
+          pauseConsumer.accept(false);
+
+          // if throttled for one topic, skip remaining
+          return;
+        } else {
+          // no throttle or exceeded configuration limits
+          log.info("MCE consumer throttle exponential backoff reset.");
+          backoffMap.remove(mclType);
+          MetricUtils.gauge(
+              this.getClass(),
+              String.format("%s_throttled", getTopicName(mclType)),
+              () -> (Gauge<?>) () -> 0);
+        }
+      } else {
+        // not throttled, remove backoff tracking
+        log.info("MCE consumer throttle exponential backoff reset.");
+        backoffMap.remove(mclType);
+        MetricUtils.gauge(
+            this.getClass(),
+            String.format("%s_throttled", getTopicName(mclType)),
+            () -> (Gauge<?>) () -> 0);
+      }
+    }
+  }
+
+  private Map<MclType, Long> getMedianLag() {
+    try {
+      Map<TopicPartition, OffsetAndMetadata> mclConsumerOffsets =
+          kafkaAdmin
+              .listConsumerGroupOffsets(mclConsumerGroupId)
+              .partitionsToOffsetAndMetadata()
+              .get()
+              .entrySet()
+              .stream()
+              .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+      Map<TopicPartition, OffsetSpec> latestOffsetRequest =
+          mclConsumerOffsets.keySet().stream()
+              .map(topicPartition -> Map.entry(topicPartition, OffsetSpec.latest()))
+              .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+      Map<TopicPartition, Long> endOffsetValues =
+          kafkaAdmin.listOffsets(latestOffsetRequest).all().get().entrySet().stream()
+              .map(entry -> Map.entry(entry.getKey(), entry.getValue().offset()))
+              .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+      return Stream.of(
+              Pair.of(MclType.VERSIONED, versionedTopicName),
+              Pair.of(MclType.TIMESERIES, timeseriesTopicName))
+          .map(
+              topic -> {
+                MclType mclType = topic.getFirst();
+                String topicName = topic.getSecond();
+
+                Map<TopicPartition, OffsetAndMetadata> topicOffsets =
+                    mclConsumerOffsets.entrySet().stream()
+                        .filter(entry -> entry.getKey().topic().equals(topicName))
+                        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+                List<Double> offsetValues =
+                    topicOffsets.values().stream()
+                        .map(OffsetAndMetadata::offset)
+                        .map(Long::doubleValue)
+                        .collect(Collectors.toList());
+                long offsetMedian = getMedian(offsetValues).longValue();
+
+                List<Double> topicEndOffsetValues =
+                    topicOffsets.keySet().stream()
+                        .map(topicPart -> endOffsetValues.getOrDefault(topicPart, 0L))
+                        .map(Long::doubleValue)
+                        .collect(Collectors.toList());
+                long endOffsetMedian = getMedian(topicEndOffsetValues).longValue();
+                return Map.entry(mclType, Math.max(0, endOffsetMedian - offsetMedian));
+              })
+          .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    } catch (ExecutionException | InterruptedException e) {
+      log.error("Error fetching consumer group offsets.", e);
+      return Map.of(MclType.VERSIONED, 0L, MclType.TIMESERIES, 0L);
+    }
+  }
+
+  private MetadataChangeProposalConfig.ThrottleConfig getThrottleConfig(MclType mclType) {
+    MetadataChangeProposalConfig.ThrottleConfig throttleConfig;
+    switch (mclType) {
+      case VERSIONED -> throttleConfig = config.getVersioned();
+      case TIMESERIES -> throttleConfig = config.getTimeseries();
+      default -> throw new IllegalStateException();
+    }
+    return throttleConfig;
+  }
+
+  private String getTopicName(MclType mclType) {
+    return MclType.TIMESERIES.equals(mclType) ? timeseriesTopicName : versionedTopicName;
+  }
+
+  private static Double getMedian(Collection<Double> listValues) {
+    double[] values = listValues.stream().mapToDouble(d -> d).sorted().toArray();
+    double median;
+    if (values.length % 2 == 0)
+      median = (values[values.length / 2] + values[values.length / 2 - 1]) / 2;
+    else median = values[values.length / 2];
+    return median;
+  }
+
+  public enum MclType {
+    TIMESERIES,
+    VERSIONED
+  }
+}
diff --git a/metadata-dao-impl/kafka-producer/src/test/java/com/datahub/metadata/dao/producer/KafkaProducerThrottleTest.java b/metadata-dao-impl/kafka-producer/src/test/java/com/datahub/metadata/dao/producer/KafkaProducerThrottleTest.java
new file mode 100644
index 00000000000000..ce6104ee2ca7dc
--- /dev/null
+++ b/metadata-dao-impl/kafka-producer/src/test/java/com/datahub/metadata/dao/producer/KafkaProducerThrottleTest.java
@@ -0,0 +1,363 @@
+package com.datahub.metadata.dao.producer;
+
+import static org.mockito.ArgumentMatchers.anyMap;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+import com.linkedin.metadata.config.MetadataChangeProposalConfig;
+import com.linkedin.metadata.models.registry.EntityRegistry;
+import com.linkedin.mxe.Topics;
+import com.linkedin.util.Pair;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.TopicPartition;
+import org.testng.annotations.Test;
+
+public class KafkaProducerThrottleTest {
+  private static final List<String> STANDARD_TOPICS =
+      List.of(Topics.METADATA_CHANGE_LOG_VERSIONED, Topics.METADATA_CHANGE_LOG_TIMESERIES);
+  private static final String STANDARD_MCL_CONSUMER_GROUP_ID = "generic-mae-consumer-job-client";
+
+  @Test
+  public void testLagCalculation() throws ExecutionException, InterruptedException {
+    // 3 partitions
+    // Consumer offsets: 1, 2, 3
+    // End offsets: 2, 4, 6
+    // Lag: 1, 2, 3
+    // MedianLag: 2
+    AdminClient mockAdmin =
+        mockKafka(
+            generateLag(
+                STANDARD_TOPICS,
+                topicPart -> (long) topicPart.partition() + 1,
+                topicPart -> ((long) topicPart.partition() + 1) * 2,
+                3));
+
+    KafkaProducerThrottle test =
+        KafkaProducerThrottle.builder()
+            .config(noSchedulerConfig().getThrottle())
+            .kafkaAdmin(mockAdmin)
+            .versionedTopicName(STANDARD_TOPICS.get(0))
+            .timeseriesTopicName(STANDARD_TOPICS.get(1))
+            .entityRegistry(mock(EntityRegistry.class))
+            .mclConsumerGroupId(STANDARD_MCL_CONSUMER_GROUP_ID)
+            .pauseConsumer(mock(Consumer.class))
+            .build();
+
+    // Refresh calculations
+    test.refresh();
+
+    assertEquals(
+        test.getLag(),
+        Map.of(
+            KafkaProducerThrottle.MclType.VERSIONED, 2L,
+            KafkaProducerThrottle.MclType.TIMESERIES, 2L));
+  }
+
+  @Test
+  public void testThrottle() throws ExecutionException, InterruptedException {
+    MetadataChangeProposalConfig.ThrottlesConfig noThrottleConfig =
+        noSchedulerConfig().getThrottle();
+    noThrottleConfig
+        .getVersioned()
+        .setThreshold(10)
+        .setInitialIntervalMs(1)
+        .setMultiplier(1)
+        .setMaxAttempts(1)
+        .setMaxIntervalMs(1);
+
+    MetadataChangeProposalConfig.ThrottlesConfig throttleConfig = noSchedulerConfig().getThrottle();
+    throttleConfig
+        .getVersioned()
+        .setThreshold(1)
+        .setInitialIntervalMs(1)
+        .setMultiplier(1)
+        .setMaxAttempts(1)
+        .setMaxIntervalMs(1);
+
+    // 3 partitions
+    // Consumer offsets: 1, 2, 3
+    // End offsets: 2, 4, 6
+    // Lag: 1, 2, 3
+    // MedianLag: 2
+    AdminClient mockAdmin =
+        mockKafka(
+            generateLag(
+                STANDARD_TOPICS,
+                topicPart -> (long) topicPart.partition() + 1,
+                topicPart -> ((long) topicPart.partition() + 1) * 2,
+                3));
+
+    Consumer<Boolean> pauseFunction = mock(Consumer.class);
+
+    KafkaProducerThrottle test =
+        KafkaProducerThrottle.builder()
+            .config(noThrottleConfig)
+            .kafkaAdmin(mockAdmin)
+            .versionedTopicName(STANDARD_TOPICS.get(0))
+            .timeseriesTopicName(STANDARD_TOPICS.get(1))
+            .entityRegistry(mock(EntityRegistry.class))
+            .mclConsumerGroupId(STANDARD_MCL_CONSUMER_GROUP_ID)
+            .pauseConsumer(pauseFunction)
+            .build();
+
+    // Refresh calculations
+    test.refresh();
+    assertEquals(
+        test.getLag(),
+        Map.of(
+            KafkaProducerThrottle.MclType.VERSIONED, 2L,
+            KafkaProducerThrottle.MclType.TIMESERIES, 2L));
+    assertFalse(
+        test.isThrottled(KafkaProducerThrottle.MclType.VERSIONED),
+        "Expected not throttling, lag is below threshold");
+    assertFalse(test.isThrottled(KafkaProducerThrottle.MclType.TIMESERIES));
+    test.throttle();
+    verifyNoInteractions(pauseFunction);
+    reset(pauseFunction);
+
+    KafkaProducerThrottle test2 = test.toBuilder().config(throttleConfig).build();
+    // Refresh calculations
+    test2.refresh();
+    assertEquals(
+        test2.getLag(),
+        Map.of(
+            KafkaProducerThrottle.MclType.VERSIONED, 2L,
+            KafkaProducerThrottle.MclType.TIMESERIES, 2L));
+    assertTrue(
+        test2.isThrottled(KafkaProducerThrottle.MclType.VERSIONED),
+        "Expected throttling, lag is above threshold.");
+    assertFalse(
+        test2.isThrottled(KafkaProducerThrottle.MclType.TIMESERIES),
+        "Expected not throttling. Timeseries is disabled");
Timeseries is disabled"); + test2.throttle(); + + // verify 1ms pause and resume + verify(pauseFunction).accept(eq(true)); + verify(pauseFunction).accept(eq(false)); + verifyNoMoreInteractions(pauseFunction); + } + + @Test + public void testBackOff() throws ExecutionException, InterruptedException { + MetadataChangeProposalConfig.ThrottlesConfig throttleConfig = noSchedulerConfig().getThrottle(); + throttleConfig + .getVersioned() + .setThreshold(1) + .setInitialIntervalMs(1) + .setMultiplier(2) + .setMaxAttempts(5) + .setMaxIntervalMs(8); + + // 3 partitions + // Consumer offsets: 1, 2, 3 + // End offsets: 2, 4, 6 + // Lag: 1, 2, 3 + // MedianLag: 2 + AdminClient mockAdmin = + mockKafka( + generateLag( + STANDARD_TOPICS, + topicPart -> (long) topicPart.partition() + 1, + topicPart -> ((long) topicPart.partition() + 1) * 2, + 3)); + + KafkaProducerThrottle test = + KafkaProducerThrottle.builder() + .config(throttleConfig) + .kafkaAdmin(mockAdmin) + .versionedTopicName(STANDARD_TOPICS.get(0)) + .timeseriesTopicName(STANDARD_TOPICS.get(1)) + .entityRegistry(mock(EntityRegistry.class)) + .mclConsumerGroupId(STANDARD_MCL_CONSUMER_GROUP_ID) + .pauseConsumer(mock(Consumer.class)) + .build(); + + // Refresh calculations + test.refresh(); + assertEquals( + test.getLag(), + Map.of( + KafkaProducerThrottle.MclType.VERSIONED, 2L, + KafkaProducerThrottle.MclType.TIMESERIES, 2L)); + assertTrue( + test.isThrottled(KafkaProducerThrottle.MclType.VERSIONED), + "Expected throttling, lag is above threshold."); + assertFalse( + test.isThrottled(KafkaProducerThrottle.MclType.TIMESERIES), + "Expected no throttling. Timeseries is disabled"); + + assertEquals( + test.computeNextBackOff(KafkaProducerThrottle.MclType.TIMESERIES), + 0L, + "Expected no backoff. Timeseries is disabled."); + + assertEquals( + test.computeNextBackOff(KafkaProducerThrottle.MclType.VERSIONED), 1L, "Expected initial 1"); + assertEquals( + test.computeNextBackOff(KafkaProducerThrottle.MclType.VERSIONED), + 2L, + "Expected second 2^1"); + assertEquals( + test.computeNextBackOff(KafkaProducerThrottle.MclType.VERSIONED), 4L, "Expected third 2^2"); + assertEquals( + test.computeNextBackOff(KafkaProducerThrottle.MclType.VERSIONED), + 8L, + "Expected fourth 2^3"); + assertEquals( + test.computeNextBackOff(KafkaProducerThrottle.MclType.VERSIONED), + 8L, + "Expected fifth max interval at 8"); + assertEquals( + test.computeNextBackOff(KafkaProducerThrottle.MclType.VERSIONED), + -1L, + "Expected max attempts"); + } + + @Test + public void testScheduler() throws ExecutionException, InterruptedException { + MetadataChangeProposalConfig config = new MetadataChangeProposalConfig(); + MetadataChangeProposalConfig.ThrottlesConfig throttlesConfig = + new MetadataChangeProposalConfig.ThrottlesConfig() + .setUpdateIntervalMs(10); // configure fast update for test + throttlesConfig.setVersioned( + new MetadataChangeProposalConfig.ThrottleConfig() + .setEnabled(true) // enable 1 throttle config to activate + ); + throttlesConfig.setTimeseries( + new MetadataChangeProposalConfig.ThrottleConfig().setEnabled(false)); + config.setThrottle(throttlesConfig); + + // 1 lag, 1 partition + AdminClient mockAdmin = + mockKafka(generateLag(STANDARD_TOPICS, topicPart -> 1L, topicPart -> 2L, 1)); + + KafkaProducerThrottle test = + KafkaProducerThrottle.builder() + .config(throttlesConfig) + .kafkaAdmin(mockAdmin) + .versionedTopicName(STANDARD_TOPICS.get(0)) + .timeseriesTopicName(STANDARD_TOPICS.get(1)) + .entityRegistry(mock(EntityRegistry.class)) + 
+            .mclConsumerGroupId(STANDARD_MCL_CONSUMER_GROUP_ID)
+            .pauseConsumer(mock(Consumer.class))
+            .build();
+
+    try {
+      test.start();
+      Thread.sleep(50);
+      assertEquals(
+          test.getLag(),
+          Map.of(
+              KafkaProducerThrottle.MclType.VERSIONED, 1L,
+              KafkaProducerThrottle.MclType.TIMESERIES, 1L),
+          "Expected lag updated");
+    } finally {
+      test.stop();
+    }
+  }
+
+  private static MetadataChangeProposalConfig noSchedulerConfig() {
+    MetadataChangeProposalConfig config = new MetadataChangeProposalConfig();
+    MetadataChangeProposalConfig.ThrottlesConfig throttlesConfig =
+        new MetadataChangeProposalConfig.ThrottlesConfig()
+            .setUpdateIntervalMs(0); // no scheduler, manual update
+    throttlesConfig.setVersioned(
+        new MetadataChangeProposalConfig.ThrottleConfig()
+            .setEnabled(true) // enable 1 throttle config to activate
+        );
+    throttlesConfig.setTimeseries(
+        new MetadataChangeProposalConfig.ThrottleConfig().setEnabled(false));
+    config.setThrottle(throttlesConfig);
+    return config;
+  }
+
+  private static Pair<Map<TopicPartition, OffsetAndMetadata>, Map<TopicPartition, Long>>
+      generateLag(
+          Collection<String> topicNames,
+          Function<TopicPartition, Long> consumerOffset,
+          Function<TopicPartition, Long> endOffset,
+          int partitions) {
+
+    Set<TopicPartition> topicPartitions =
+        topicNames.stream()
+            .flatMap(
+                topicName ->
+                    IntStream.range(0, partitions)
+                        .mapToObj(partitionNum -> new TopicPartition(topicName, partitionNum)))
+            .collect(Collectors.toSet());
+
+    Map<TopicPartition, OffsetAndMetadata> consumerOffsetMap =
+        topicPartitions.stream()
+            .map(
+                topicPartition ->
+                    Map.entry(
+                        topicPartition,
+                        new OffsetAndMetadata(consumerOffset.apply(topicPartition))))
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    Map<TopicPartition, Long> endOffsetMap =
+        topicPartitions.stream()
+            .map(topicPartition -> Map.entry(topicPartition, endOffset.apply(topicPartition)))
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+    return Pair.of(consumerOffsetMap, endOffsetMap);
+  }
+
+  private static AdminClient mockKafka(
+      Pair<Map<TopicPartition, OffsetAndMetadata>, Map<TopicPartition, Long>> offsetPair)
+      throws ExecutionException, InterruptedException {
+
+    AdminClient mockKafkaAdmin = mock(AdminClient.class);
+
+    // consumer offsets
+    ListConsumerGroupOffsetsResult mockConsumerOffsetsResult =
+        mock(ListConsumerGroupOffsetsResult.class);
+    KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> mockConsumerFuture =
+        mock(KafkaFuture.class);
+    when(mockConsumerOffsetsResult.partitionsToOffsetAndMetadata()).thenReturn(mockConsumerFuture);
+    when(mockConsumerFuture.get()).thenReturn(offsetPair.getFirst());
+    when(mockKafkaAdmin.listConsumerGroupOffsets(anyString()))
+        .thenReturn(mockConsumerOffsetsResult);
+
+    // end offsets
+    ListOffsetsResult mockOffsetsResult = mock(ListOffsetsResult.class);
+    KafkaFuture<Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo>> mockOffsetFuture =
+        mock(KafkaFuture.class);
+    Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> resultMap =
+        offsetPair.getSecond().entrySet().stream()
+            .map(
+                entry -> {
+                  ListOffsetsResult.ListOffsetsResultInfo mockInfo =
+                      mock(ListOffsetsResult.ListOffsetsResultInfo.class);
+                  when(mockInfo.offset()).thenReturn(entry.getValue());
+                  return Map.entry(entry.getKey(), mockInfo);
+                })
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    when(mockOffsetFuture.get()).thenReturn(resultMap);
+    when(mockOffsetsResult.all()).thenReturn(mockOffsetFuture);
+    when(mockKafkaAdmin.listOffsets(anyMap())).thenReturn(mockOffsetsResult);
+
+    return mockKafkaAdmin;
+  }
+}
diff --git a/metadata-jobs/mce-consumer/build.gradle b/metadata-jobs/mce-consumer/build.gradle
index b062547724138f..5ea24059a3ee33 100644
--- a/metadata-jobs/mce-consumer/build.gradle
+++ b/metadata-jobs/mce-consumer/build.gradle
@@ -22,6 +22,7 @@ dependencies {
   implementation project(':metadata-events:mxe-utils-avro')
   implementation project(':metadata-io')
   implementation project(':metadata-service:restli-client-api')
+  implementation project(':metadata-dao-impl:kafka-producer')
   implementation spec.product.pegasus.restliClient
   implementation spec.product.pegasus.restliCommon
   implementation externalDependency.elasticSearchRest
diff --git a/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/MetadataChangeProposalConfig.java b/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/MetadataChangeProposalConfig.java
new file mode 100644
index 00000000000000..3d3808bc5feb45
--- /dev/null
+++ b/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/MetadataChangeProposalConfig.java
@@ -0,0 +1,30 @@
+package com.linkedin.metadata.config;
+
+import lombok.Data;
+import lombok.experimental.Accessors;
+
+@Data
+@Accessors(chain = true)
+public class MetadataChangeProposalConfig {
+
+  ThrottlesConfig throttle;
+
+  @Data
+  @Accessors(chain = true)
+  public static class ThrottlesConfig {
+    Integer updateIntervalMs;
+    ThrottleConfig versioned;
+    ThrottleConfig timeseries;
+  }
+
+  @Data
+  @Accessors(chain = true)
+  public static class ThrottleConfig {
+    boolean enabled;
+    Integer threshold;
+    Integer maxAttempts;
+    Integer initialIntervalMs;
+    Integer multiplier;
+    Integer maxIntervalMs;
+  }
+}
diff --git a/metadata-service/configuration/src/main/resources/application.yaml b/metadata-service/configuration/src/main/resources/application.yaml
index 770be86e254b18..cae315e96e4d71 100644
--- a/metadata-service/configuration/src/main/resources/application.yaml
+++ b/metadata-service/configuration/src/main/resources/application.yaml
@@ -460,3 +460,26 @@ forms:
 businessAttribute:
   fetchRelatedEntitiesCount: ${BUSINESS_ATTRIBUTE_RELATED_ENTITIES_COUNT:20000}
   fetchRelatedEntitiesBatchSize: ${BUSINESS_ATTRIBUTE_RELATED_ENTITIES_BATCH_SIZE:1000}
+
+metadataChangeProposal:
+  throttle:
+    updateIntervalMs: ${MCP_THROTTLE_UPDATE_INTERVAL_MS:60000}
+
+    # Versioned MCL topic
+    versioned:
+      # Whether to throttle MCP processing based on MCL backlog
+      enabled: ${MCP_VERSIONED_THROTTLE_ENABLED:false}
+      threshold: ${MCP_VERSIONED_THRESHOLD:4000} # throttle threshold
+      maxAttempts: ${MCP_VERSIONED_MAX_ATTEMPTS:1000}
+      initialIntervalMs: ${MCP_VERSIONED_INITIAL_INTERVAL_MS:100}
+      multiplier: ${MCP_VERSIONED_MULTIPLIER:10}
+      maxIntervalMs: ${MCP_VERSIONED_MAX_INTERVAL_MS:30000}
+    # Timeseries MCL topic
+    timeseries:
+      # Whether to throttle MCP processing based on MCL backlog
+      enabled: ${MCP_TIMESERIES_THROTTLE_ENABLED:false}
+      threshold: ${MCP_TIMESERIES_THRESHOLD:4000} # throttle threshold
+      maxAttempts: ${MCP_TIMESERIES_MAX_ATTEMPTS:1000}
+      initialIntervalMs: ${MCP_TIMESERIES_INITIAL_INTERVAL_MS:100}
+      multiplier: ${MCP_TIMESERIES_MULTIPLIER:10}
+      maxIntervalMs: ${MCP_TIMESERIES_MAX_INTERVAL_MS:30000}
\ No newline at end of file
diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/config/ConfigurationProvider.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/config/ConfigurationProvider.java
index 9381e24fabab60..08adbd54730a74 100644
--- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/config/ConfigurationProvider.java
+++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/config/ConfigurationProvider.java
@@ -7,6 +7,7 @@
 import com.linkedin.metadata.config.EbeanConfiguration;
 import com.linkedin.metadata.config.GraphQLConfiguration;
 import com.linkedin.metadata.config.IngestionConfiguration;
+import com.linkedin.metadata.config.MetadataChangeProposalConfig;
 import com.linkedin.metadata.config.SystemUpdateConfiguration;
 import com.linkedin.metadata.config.TestsConfiguration;
 import com.linkedin.metadata.config.ViewsConfiguration;
@@ -80,4 +81,7 @@ public class ConfigurationProvider {
 
   /** GraphQL Configurations */
   private GraphQLConfiguration graphQL;
+
+  /** MCP throttling configuration */
+  private MetadataChangeProposalConfig metadataChangeProposal;
 }
diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/DataHubKafkaProducerFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/DataHubKafkaProducerFactory.java
index 5844dc4a8f72a2..6a2b9f511b79f8 100644
--- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/DataHubKafkaProducerFactory.java
+++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/DataHubKafkaProducerFactory.java
@@ -44,7 +44,7 @@ public static Map<String, Object> buildProducerProperties(
           Arrays.asList(kafkaConfiguration.getBootstrapServers().split(",")));
     } // else we rely on KafkaProperties which defaults to localhost:9092
 
-    Map<String, Object> props = properties.buildProducerProperties();
+    Map<String, Object> props = properties.buildProducerProperties(null);
 
     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, schemaRegistryConfig.getSerializer());
diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/KafkaEventConsumerFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/KafkaEventConsumerFactory.java
index d5210213185bea..9501b03482d045 100644
--- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/KafkaEventConsumerFactory.java
+++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/KafkaEventConsumerFactory.java
@@ -73,7 +73,7 @@ private static Map<String, Object> buildCustomizedProperties(
           Arrays.asList(kafkaConfiguration.getBootstrapServers().split(",")));
     } // else we rely on KafkaProperties which defaults to localhost:9092
 
-    Map<String, Object> customizedProperties = baseKafkaProperties.buildConsumerProperties();
+    Map<String, Object> customizedProperties = baseKafkaProperties.buildConsumerProperties(null);
     customizedProperties.put(
         ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
     customizedProperties.put(
diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/SimpleKafkaConsumerFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/SimpleKafkaConsumerFactory.java
index 3a6c9770fd3623..0193ded97f81b5 100644
--- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/SimpleKafkaConsumerFactory.java
+++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/SimpleKafkaConsumerFactory.java
@@ -44,7 +44,7 @@ protected KafkaListenerContainerFactory<?> createInstance(
           Arrays.asList(kafkaConfiguration.getBootstrapServers().split(",")));
     } // else we rely on KafkaProperties which defaults to localhost:9092
 
-    Map<String, Object> customizedProperties = properties.buildConsumerProperties();
+    Map<String, Object> customizedProperties = properties.buildConsumerProperties(null);
     customizedProperties.put(
         ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,
         kafkaConfiguration.getConsumer().getMaxPartitionFetchBytes());
diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/throttle/KafkaProducerThrottleFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/throttle/KafkaProducerThrottleFactory.java
new file mode 100644
index 00000000000000..1eaff82fd517f0
--- /dev/null
+++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/throttle/KafkaProducerThrottleFactory.java
@@ -0,0 +1,93 @@
+package com.linkedin.gms.factory.kafka.throttle;
+
+import com.datahub.metadata.dao.producer.KafkaProducerThrottle;
+import com.linkedin.gms.factory.config.ConfigurationProvider;
+import com.linkedin.metadata.config.MetadataChangeProposalConfig;
+import com.linkedin.metadata.config.kafka.KafkaConfiguration;
+import com.linkedin.metadata.models.registry.EntityRegistry;
+import com.linkedin.mxe.Topics;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.KafkaAdminClient;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
+import org.springframework.kafka.listener.MessageListenerContainer;
+
+@Slf4j
+@Configuration
+public class KafkaProducerThrottleFactory {
+
+  @Value("${METADATA_CHANGE_LOG_KAFKA_CONSUMER_GROUP_ID:generic-mae-consumer-job-client}")
+  private String maeConsumerGroupId;
+
+  @Value("${METADATA_CHANGE_PROPOSAL_KAFKA_CONSUMER_GROUP_ID:generic-mce-consumer-job-client}")
+  private String mceConsumerGroupId;
+
+  @Value("${METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME:" + Topics.METADATA_CHANGE_LOG_VERSIONED + "}")
+  private String versionedTopicName;
+
+  @Value(
+      "${METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME:" + Topics.METADATA_CHANGE_LOG_TIMESERIES + "}")
+  private String timeseriesTopicName;
+
+  @Bean
+  public KafkaProducerThrottle kafkaProducerThrottle(
+      @Qualifier("configurationProvider") ConfigurationProvider provider,
+      final KafkaProperties kafkaProperties,
+      final EntityRegistry entityRegistry,
+      final KafkaListenerEndpointRegistry registry) {
+
+    KafkaConfiguration kafkaConfiguration = provider.getKafka();
+    MetadataChangeProposalConfig mcpConfig = provider.getMetadataChangeProposal();
+
+    return KafkaProducerThrottle.builder()
+        .entityRegistry(entityRegistry)
+        .kafkaAdmin(kafkaAdmin(kafkaConfiguration, kafkaProperties))
+        .config(mcpConfig.getThrottle())
+        .mclConsumerGroupId(maeConsumerGroupId)
+        .timeseriesTopicName(timeseriesTopicName)
+        .versionedTopicName(versionedTopicName)
+        .pauseConsumer(
+            (pause) -> {
+              Optional<MessageListenerContainer> container =
+                  Optional.ofNullable(registry.getListenerContainer(mceConsumerGroupId));
+              if (container.isEmpty()) {
+                log.warn(
+                    "Expected container was missing: {} throttling is not possible.",
+                    mceConsumerGroupId);
+              } else {
+                if (pause) {
+                  container.ifPresent(MessageListenerContainer::pause);
+                } else {
+                  container.ifPresent(MessageListenerContainer::resume);
+                }
+              }
+            })
+        .build()
+        .start();
+  }
+
+  private static AdminClient kafkaAdmin(
+      KafkaConfiguration kafkaConfiguration, final KafkaProperties kafkaProperties) {
+    Map<String, Object> adminProperties = new HashMap<>(kafkaProperties.buildAdminProperties(null));
+
+    // KAFKA_BOOTSTRAP_SERVER has precedence over SPRING_KAFKA_BOOTSTRAP_SERVERS
+    if (kafkaConfiguration.getBootstrapServers() != null
+        && !kafkaConfiguration.getBootstrapServers().isEmpty()) {
+      adminProperties.put(
+          AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
+          Arrays.asList(kafkaConfiguration.getBootstrapServers().split(",")));
+    } // else we rely on KafkaProperties which defaults to localhost:9092 or environment variables
+
+    return KafkaAdminClient.create(adminProperties);
+  }
+}
diff --git a/metadata-service/schema-registry-servlet/src/test/java/io/datahubproject/openapi/test/SchemaRegistryControllerTestConfiguration.java b/metadata-service/schema-registry-servlet/src/test/java/io/datahubproject/openapi/test/SchemaRegistryControllerTestConfiguration.java
index 7ab673b0a46feb..6901cd665f1661 100644
--- a/metadata-service/schema-registry-servlet/src/test/java/io/datahubproject/openapi/test/SchemaRegistryControllerTestConfiguration.java
+++ b/metadata-service/schema-registry-servlet/src/test/java/io/datahubproject/openapi/test/SchemaRegistryControllerTestConfiguration.java
@@ -1,6 +1,7 @@
 package io.datahubproject.openapi.test;
 
 import com.linkedin.metadata.dao.producer.KafkaHealthChecker;
+import com.linkedin.metadata.models.registry.EntityRegistry;
 import org.springframework.boot.test.context.TestConfiguration;
 import org.springframework.boot.test.mock.mockito.MockBean;
 import org.springframework.context.annotation.ComponentScan;
@@ -11,4 +12,6 @@
 @ComponentScan(basePackages = {"com.linkedin.gms.factory.kafka", "com.linkedin.gms.factory.config"})
 public class SchemaRegistryControllerTestConfiguration {
   @MockBean KafkaHealthChecker kafkaHealthChecker;
+
+  @MockBean EntityRegistry entityRegistry;
 }
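
Reviewer note, not part of the patch: the pause schedule is driven by Spring's
org.springframework.util.backoff.ExponentialBackOff, the same class computeNextBackOff
wires up above. Below is a minimal, self-contained sketch of the pauses the shipped
MCP_VERSIONED_* defaults (initial 100 ms, multiplier 10, 30 s cap) would produce while
the median MCL lag stays over the threshold. The class name ThrottleBackoffDemo and the
reduced maxAttempts are illustrative only, and the sketch assumes a Spring Framework
version that provides ExponentialBackOff.setMaxAttempts (6.1+), which the patched code
itself calls.

  import org.springframework.util.backoff.BackOffExecution;
  import org.springframework.util.backoff.ExponentialBackOff;

  public class ThrottleBackoffDemo {
    public static void main(String[] args) {
      // Mirrors the MCP_VERSIONED_* defaults: initial 100 ms, multiplier 10, 30 s cap.
      ExponentialBackOff backoff = new ExponentialBackOff(100, 10);
      backoff.setMaxInterval(30_000);
      backoff.setMaxAttempts(6); // the defaults ship with 1000; 6 keeps the demo short

      BackOffExecution execution = backoff.start();
      long waitMs;
      while ((waitMs = execution.nextBackOff()) != BackOffExecution.STOP) {
        // Each value is how long throttle() would pause the MCE consumer
        // before re-checking the median MCL lag.
        System.out.println("pause MCE consumer for " + waitMs + " ms");
      }
      // Prints 100, 1000, 10000, 30000, 30000, 30000, then STOP (-1):
      // the same grow-then-cap-then-stop shape that testBackOff asserts.
    }
  }

Each individual pause is bounded by maxIntervalMs, so a long-lived MCL backlog degrades
MCP ingest throughput gradually rather than halting it outright; once the lag drops back
below the threshold, throttle() discards the backoff state and the next throttling
episode starts again from initialIntervalMs.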