fix(kafka): fix infinite deserialization logging (#9494)
RyanHolstien authored Dec 21, 2023
1 parent a49a435 commit 55cb568
Showing 14 changed files with 47 additions and 4 deletions.
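
In short: every GMS and consumer container gains a KAFKA_CONSUMER_STOP_ON_DESERIALIZATION_ERROR environment variable (defaulting to true), which is surfaced to Spring as kafka.consumer.stopOnDeserializationError. The consumer factory then wraps the key and value deserializers in Spring Kafka's ErrorHandlingDeserializer and, when the flag is set, registers an error handler that stops the listener container on a DeserializationException instead of logging the same undeserializable record forever.
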
2 changes: 2 additions & 0 deletions docker/docker-compose-without-neo4j.yml
@@ -43,6 +43,8 @@ services:
       context: ../
       dockerfile: docker/datahub-gms/Dockerfile
     env_file: datahub-gms/env/docker-without-neo4j.env
+    environment:
+      - KAFKA_CONSUMER_STOP_ON_DESERIALIZATION_ERROR=${KAFKA_CONSUMER_STOP_ON_DESERIALIZATION_ERROR:-true}
     healthcheck:
       test: curl -sS --fail http://datahub-gms:${DATAHUB_GMS_PORT:-8080}/health
       start_period: 90s
3 changes: 3 additions & 0 deletions docker/docker-compose.consumers-without-neo4j.yml
@@ -15,6 +15,8 @@ services:
       context: ../
       dockerfile: docker/datahub-mae-consumer/Dockerfile
     env_file: datahub-mae-consumer/env/docker-without-neo4j.env
+    environment:
+      - KAFKA_CONSUMER_STOP_ON_DESERIALIZATION_ERROR=${KAFKA_CONSUMER_STOP_ON_DESERIALIZATION_ERROR:-true}
   datahub-mce-consumer:
     container_name: datahub-mce-consumer
     hostname: datahub-mce-consumer
@@ -28,3 +30,4 @@ services:
     environment:
       - DATAHUB_SERVER_TYPE=${DATAHUB_SERVER_TYPE:-quickstart}
       - DATAHUB_TELEMETRY_ENABLED=${DATAHUB_TELEMETRY_ENABLED:-true}
+      - KAFKA_CONSUMER_STOP_ON_DESERIALIZATION_ERROR=${KAFKA_CONSUMER_STOP_ON_DESERIALIZATION_ERROR:-true}
3 changes: 3 additions & 0 deletions docker/docker-compose.consumers.yml
@@ -15,6 +15,8 @@ services:
       context: ../
       dockerfile: docker/datahub-mae-consumer/Dockerfile
     env_file: datahub-mae-consumer/env/docker.env
+    environment:
+      - KAFKA_CONSUMER_STOP_ON_DESERIALIZATION_ERROR=${KAFKA_CONSUMER_STOP_ON_DESERIALIZATION_ERROR:-true}
     depends_on:
       neo4j:
         condition: service_healthy
@@ -36,6 +38,7 @@ services:
       - NEO4J_USERNAME=neo4j
       - NEO4J_PASSWORD=datahub
       - GRAPH_SERVICE_IMPL=neo4j
+      - KAFKA_CONSUMER_STOP_ON_DESERIALIZATION_ERROR=${KAFKA_CONSUMER_STOP_ON_DESERIALIZATION_ERROR:-true}
     depends_on:
       neo4j:
         condition: service_healthy
1 change: 1 addition & 0 deletions docker/docker-compose.dev.yml
@@ -45,6 +45,7 @@ services:
       - SEARCH_SERVICE_ENABLE_CACHE=false
       - LINEAGE_SEARCH_CACHE_ENABLED=false
       - SHOW_BROWSE_V2=true
+      - KAFKA_CONSUMER_STOP_ON_DESERIALIZATION_ERROR=${KAFKA_CONSUMER_STOP_ON_DESERIALIZATION_ERROR:-true}
     volumes:
       - ./datahub-gms/start.sh:/datahub/datahub-gms/scripts/start.sh
       - ./datahub-gms/jetty.xml:/datahub/datahub-gms/scripts/jetty.xml
2 changes: 2 additions & 0 deletions docker/docker-compose.yml
@@ -36,6 +36,8 @@ services:
     container_name: datahub-gms
     hostname: datahub-gms
     image: ${DATAHUB_GMS_IMAGE:-linkedin/datahub-gms}:${DATAHUB_VERSION:-head}
+    environment:
+      - KAFKA_CONSUMER_STOP_ON_DESERIALIZATION_ERROR=${KAFKA_CONSUMER_STOP_ON_DESERIALIZATION_ERROR:-true}
     ports:
       - ${DATAHUB_MAPPED_GMS_PORT:-8080}:8080
     build:
1 change: 1 addition & 0 deletions docker/quickstart/docker-compose-m1.quickstart.yml
@@ -97,6 +97,7 @@ services:
       - GRAPH_SERVICE_IMPL=${GRAPH_SERVICE_IMPL:-elasticsearch}
      - JAVA_OPTS=-Xms1g -Xmx1g
       - KAFKA_BOOTSTRAP_SERVER=broker:29092
+      - KAFKA_CONSUMER_STOP_ON_DESERIALIZATION_ERROR=${KAFKA_CONSUMER_STOP_ON_DESERIALIZATION_ERROR:-true}
       - KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
       - MAE_CONSUMER_ENABLED=true
       - MCE_CONSUMER_ENABLED=true
1 change: 1 addition & 0 deletions docker/quickstart/docker-compose-without-neo4j-m1.quickstart.yml
@@ -97,6 +97,7 @@ services:
       - GRAPH_SERVICE_IMPL=elasticsearch
       - JAVA_OPTS=-Xms1g -Xmx1g
       - KAFKA_BOOTSTRAP_SERVER=broker:29092
+      - KAFKA_CONSUMER_STOP_ON_DESERIALIZATION_ERROR=${KAFKA_CONSUMER_STOP_ON_DESERIALIZATION_ERROR:-true}
       - KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
       - MAE_CONSUMER_ENABLED=true
       - MCE_CONSUMER_ENABLED=true
1 change: 1 addition & 0 deletions docker/quickstart/docker-compose-without-neo4j.quickstart.yml
@@ -97,6 +97,7 @@ services:
       - GRAPH_SERVICE_IMPL=elasticsearch
       - JAVA_OPTS=-Xms1g -Xmx1g
       - KAFKA_BOOTSTRAP_SERVER=broker:29092
+      - KAFKA_CONSUMER_STOP_ON_DESERIALIZATION_ERROR=${KAFKA_CONSUMER_STOP_ON_DESERIALIZATION_ERROR:-true}
       - KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
       - MAE_CONSUMER_ENABLED=true
       - MCE_CONSUMER_ENABLED=true
2 changes: 2 additions & 0 deletions docker/quickstart/docker-compose.consumers-without-neo4j.quickstart.yml
@@ -6,6 +6,7 @@ services:
   datahub-mae-consumer:
     container_name: datahub-mae-consumer
     environment:
+      - KAFKA_CONSUMER_STOP_ON_DESERIALIZATION_ERROR=${KAFKA_CONSUMER_STOP_ON_DESERIALIZATION_ERROR:-true}
       - DATAHUB_UPGRADE_HISTORY_KAFKA_CONSUMER_GROUP_ID=generic-duhe-consumer-job-client-mcl
       - DATAHUB_GMS_HOST=datahub-gms
       - DATAHUB_GMS_PORT=8080
@@ -44,6 +45,7 @@ services:
       - GRAPH_SERVICE_IMPL=elasticsearch
       - JAVA_OPTS=-Xms1g -Xmx1g
       - KAFKA_BOOTSTRAP_SERVER=broker:29092
+      - KAFKA_CONSUMER_STOP_ON_DESERIALIZATION_ERROR=${KAFKA_CONSUMER_STOP_ON_DESERIALIZATION_ERROR:-true}
       - KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
       - MAE_CONSUMER_ENABLED=false
       - MCE_CONSUMER_ENABLED=true
2 changes: 2 additions & 0 deletions docker/quickstart/docker-compose.consumers.quickstart.yml
@@ -9,6 +9,7 @@ services:
       neo4j:
         condition: service_healthy
     environment:
+      - KAFKA_CONSUMER_STOP_ON_DESERIALIZATION_ERROR=${KAFKA_CONSUMER_STOP_ON_DESERIALIZATION_ERROR:-true}
       - DATAHUB_UPGRADE_HISTORY_KAFKA_CONSUMER_GROUP_ID=generic-duhe-consumer-job-client-mcl
       - DATAHUB_GMS_HOST=datahub-gms
       - DATAHUB_GMS_PORT=8080
@@ -54,6 +55,7 @@ services:
       - GRAPH_SERVICE_IMPL=neo4j
       - JAVA_OPTS=-Xms1g -Xmx1g
       - KAFKA_BOOTSTRAP_SERVER=broker:29092
+      - KAFKA_CONSUMER_STOP_ON_DESERIALIZATION_ERROR=${KAFKA_CONSUMER_STOP_ON_DESERIALIZATION_ERROR:-true}
       - KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
       - MAE_CONSUMER_ENABLED=false
       - MCE_CONSUMER_ENABLED=true
1 change: 1 addition & 0 deletions docker/quickstart/docker-compose.quickstart.yml
@@ -97,6 +97,7 @@ services:
       - GRAPH_SERVICE_IMPL=${GRAPH_SERVICE_IMPL:-elasticsearch}
       - JAVA_OPTS=-Xms1g -Xmx1g
       - KAFKA_BOOTSTRAP_SERVER=broker:29092
+      - KAFKA_CONSUMER_STOP_ON_DESERIALIZATION_ERROR=${KAFKA_CONSUMER_STOP_ON_DESERIALIZATION_ERROR:-true}
       - KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
       - MAE_CONSUMER_ENABLED=true
       - MCE_CONSUMER_ENABLED=true
1 change: 1 addition & 0 deletions metadata-service/configuration/src/main/java/com/linkedin/metadata/config/kafka/ConsumerConfiguration.java
@@ -6,4 +6,5 @@
 public class ConsumerConfiguration {
 
   private int maxPartitionFetchBytes;
+  private boolean stopOnDeserializationError;
 }
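
For orientation, here is a minimal sketch of how a field like stopOnDeserializationError is typically bound from application.yml in a Spring Boot service. The class name and binding annotations below are illustrative assumptions, not DataHub's actual wiring; only the two field names come from the diff above.

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

// Sketch only: binds kafka.consumer.* properties (and therefore the
// KAFKA_CONSUMER_STOP_ON_DESERIALIZATION_ERROR placeholder in application.yml)
// onto a POJO. DataHub's real ConfigurationProvider chain may differ.
@Data
@Configuration
@ConfigurationProperties(prefix = "kafka")
public class KafkaConfigSketch {

  private Consumer consumer = new Consumer();

  @Data
  public static class Consumer {
    private int maxPartitionFetchBytes; // field from the diff
    private boolean stopOnDeserializationError; // field from the diff
  }
}
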
1 change: 1 addition & 0 deletions metadata-service/configuration/src/main/resources/application.yml
@@ -236,6 +236,7 @@ kafka:
     maxRequestSize: ${KAFKA_PRODUCER_MAX_REQUEST_SIZE:5242880} # the max bytes sent by the producer, also see kafka-setup MAX_MESSAGE_BYTES for matching value
   consumer:
     maxPartitionFetchBytes: ${KAFKA_CONSUMER_MAX_PARTITION_FETCH_BYTES:5242880} # the max bytes consumed per partition
+    stopOnDeserializationError: ${KAFKA_CONSUMER_STOP_ON_DESERIALIZATION_ERROR:true} # Stops the Kafka listener container on a deserialization error so the problem can be fixed before moving past the offending offset. If false, the error is logged and the consumer moves on past the offset.
   schemaRegistry:
     type: ${SCHEMA_REGISTRY_TYPE:KAFKA} # INTERNAL or KAFKA or AWS_GLUE
     url: ${KAFKA_SCHEMAREGISTRY_URL:http://localhost:8081}
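
Operationally: with the default of true, the listener container stops and stays parked at the failing offset until someone intervenes, so no messages are skipped. Setting KAFKA_CONSUMER_STOP_ON_DESERIALIZATION_ERROR=false in any of the compose files above makes the consumer log the failure and advance past the offset instead, keeping the pipeline moving at the cost of losing the bad record.
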
30 changes: 26 additions & 4 deletions metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/KafkaEventConsumerFactory.java
@@ -21,6 +21,11 @@
 import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
 import org.springframework.kafka.config.KafkaListenerContainerFactory;
 import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+import org.springframework.kafka.listener.CommonContainerStoppingErrorHandler;
+import org.springframework.kafka.listener.CommonDelegatingErrorHandler;
+import org.springframework.kafka.listener.DefaultErrorHandler;
+import org.springframework.kafka.support.serializer.DeserializationException;
+import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
 
 @Slf4j
 @Configuration
@@ -66,8 +71,6 @@ private static Map<String, Object> buildCustomizedProperties(
       SchemaRegistryConfig schemaRegistryConfig) {
     KafkaProperties.Consumer consumerProps = baseKafkaProperties.getConsumer();
 
-    // Specify (de)serializers for record keys and for record values.
-    consumerProps.setKeyDeserializer(StringDeserializer.class);
     // Records will be flushed every 10 seconds.
     consumerProps.setEnableAutoCommit(true);
     consumerProps.setAutoCommitInterval(Duration.ofSeconds(10));
@@ -81,7 +84,13 @@ private static Map<String, Object> buildCustomizedProperties(
 
     Map<String, Object> customizedProperties = baseKafkaProperties.buildConsumerProperties();
     customizedProperties.put(
-        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, schemaRegistryConfig.getDeserializer());
+        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
+    customizedProperties.put(
+        ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
+    customizedProperties.put(
+        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
+    customizedProperties.put(
+        ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, schemaRegistryConfig.getDeserializer());
 
     // Override KafkaProperties with SchemaRegistryConfig only for non-empty values
     schemaRegistryConfig.getProperties().entrySet().stream()
@@ -98,14 +107,27 @@ private static Map<String, Object> buildCustomizedProperties(
   @Bean(name = "kafkaEventConsumer")
   protected KafkaListenerContainerFactory<?> createInstance(
       @Qualifier("kafkaConsumerFactory")
-          DefaultKafkaConsumerFactory<String, GenericRecord> kafkaConsumerFactory) {
+          DefaultKafkaConsumerFactory<String, GenericRecord> kafkaConsumerFactory,
+      @Qualifier("configurationProvider") ConfigurationProvider configurationProvider) {
 
     ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory =
         new ConcurrentKafkaListenerContainerFactory<>();
     factory.setConsumerFactory(kafkaConsumerFactory);
     factory.setContainerCustomizer(new ThreadPoolContainerCustomizer());
     factory.setConcurrency(kafkaEventConsumerConcurrency);
 
+    /* Sets up a delegating error handler for deserialization errors. If disabled, the
+     * DefaultErrorHandler (back-off retry, then log) is used instead of stopping the container.
+     * Stopping the container prevents lost messages until the error can be examined;
+     * disabling this allows progress, but may lose data. */
+    if (configurationProvider.getKafka().getConsumer().isStopOnDeserializationError()) {
+      CommonDelegatingErrorHandler delegatingErrorHandler =
+          new CommonDelegatingErrorHandler(new DefaultErrorHandler());
+      delegatingErrorHandler.addDelegate(
+          DeserializationException.class, new CommonContainerStoppingErrorHandler());
+      factory.setCommonErrorHandler(delegatingErrorHandler);
+    }
+
     log.info(
         String.format(
             "Event-based KafkaListenerContainerFactory built successfully. Consumer concurrency = %s",
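
Putting the two halves together, here is a self-contained sketch of the pattern the factory now uses: ErrorHandlingDeserializer turns a poison record into a DeserializationException instead of an endless poll-loop failure, and a CommonDelegatingErrorHandler routes exactly that exception to a container-stopping handler. The broker address, group id, bean name, and the String value deserializer are placeholder assumptions; DataHub's real factory plugs in its schema-registry deserializer and its own consumer properties.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.CommonContainerStoppingErrorHandler;
import org.springframework.kafka.listener.CommonDelegatingErrorHandler;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.support.serializer.DeserializationException;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;

@Configuration
public class StopOnBadRecordSketch {

  @Bean
  public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "sketch-group"); // placeholder group id

    // Wrap the real deserializers so a bad record surfaces as a
    // DeserializationException rather than failing inside poll() forever.
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
    props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
    props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, StringDeserializer.class);

    ConcurrentKafkaListenerContainerFactory<String, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props));

    // Everything else keeps the default back-off-and-log treatment, but a
    // DeserializationException stops the container at the bad offset.
    CommonDelegatingErrorHandler errorHandler =
        new CommonDelegatingErrorHandler(new DefaultErrorHandler());
    errorHandler.addDelegate(
        DeserializationException.class, new CommonContainerStoppingErrorHandler());
    factory.setCommonErrorHandler(errorHandler);
    return factory;
  }
}

Once the container has stopped, fixing or removing the offending message and restarting the consumer resumes processing from the same offset, which is the trade-off described in the application.yml comment above.
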
