MCE failing when writing to the FailedMetadataChangeEventTopic #1829
Comments
If I read the log correctly, the failure actually came from
But even in that case, it shouldn't give an error while writing to FMCE. Am I correct? Caused by: io.confluent.common.config.ConfigException: Missing required configuration "schema.registry.url" which has no default value.
Argh, my bad. I misread the log. Yes, it's failing to produce the FMCE because of the missing schema registry URL config. Is this on Docker or k8s?
It happens in both Docker and k8s.
There's a default value set for the schema registry here: https://github.com/linkedin/datahub/blob/master/metadata-jobs/mce-consumer-job/src/main/java/com/linkedin/metadata/kafka/config/KafkaConfig.java#L30 I wonder if it got overridden somewhere...
I can reproduce this locally. It seems this line somehow isn't working correctly during Spring DI: https://github.com/linkedin/datahub/blob/master/metadata-jobs/mce-consumer-job/src/main/java/com/linkedin/metadata/kafka/config/KafkaConfig.java#L74
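For readers following along, here is a minimal sketch of the failure mode discussed above, assuming the producer properties simply end up without the "schema.registry.url" key. It uses plain Java maps rather than the real Kafka or Confluent classes. The class `ProducerConfigSketch`, its methods, and the `http://localhost:8081` URL are hypothetical illustrations, not code from the DataHub repository.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: a plain map stands in for the real producer properties.
final class ProducerConfigSketch {
    static final String SCHEMA_REGISTRY_URL = "schema.registry.url";

    // Builds producer properties; the registry URL is only set when one is supplied.
    static Map<String, Object> producerProps(String bootstrapServers, String schemaRegistryUrl) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        if (schemaRegistryUrl != null && !schemaRegistryUrl.isEmpty()) {
            props.put(SCHEMA_REGISTRY_URL, schemaRegistryUrl);
        }
        return props;
    }

    // Mimics the kind of check that failed in the trace: a required key with no default.
    static void requireSchemaRegistryUrl(Map<String, Object> props) {
        if (!props.containsKey(SCHEMA_REGISTRY_URL)) {
            throw new IllegalStateException(
                "Missing required configuration \"" + SCHEMA_REGISTRY_URL
                    + "\" which has no default value.");
        }
    }

    public static void main(String[] args) {
        // The injected value never reaches the producer properties.
        Map<String, Object> broken = producerProps("broker:29092", null);
        try {
            requireSchemaRegistryUrl(broken);
        } catch (IllegalStateException e) {
            System.out.println("broken config: " + e.getMessage());
        }

        // The value is passed through explicitly (assumed URL, for illustration only).
        Map<String, Object> fixed = producerProps("broker:29092", "http://localhost:8081");
        requireSchemaRegistryUrl(fixed);
        System.out.println("fixed config passes the check");
    }
}
```

If the consumer side works while the producer side fails, one thing worth checking is whether both sets of properties receive the registry URL, since the serializer reads it from the producer configuration.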
Thanks @mars-lan |
Describe the bug
I set the fabricType to "TEST", which resulted in a message parsing error: IllegalArgumentException("Unsupported Fabric Type: TEST"). As per the expected behaviour, the message should have been written to the FailedMCE topic.
Instead, it throws the error below:
01:49:45.955 [mce-consumer-job-client-0-C-1] ERROR o.s.k.listener.LoggingErrorHandler - Error while processing: ConsumerRecord(topic = MetadataChangeEvent_v4, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1598579382866, serialized key size = 30, serialized value size = 135507, headers = RecordHeaders(headers = [], isReadOnly = false), key = kafka,db-market-quote-raw,TEST, value = {"auditHeader": null, "proposedSnapshot": {______}, "proposedDelta": null})
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.linkedin.metadata.kafka.MetadataChangeEventsProcessor.consume(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, org.apache.avro.generic.GenericRecord>)' threw exception; nested exception is org.apache.kafka.common.KafkaException: Failed to construct kafka producer; nested exception is org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:1376)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:1365)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1277)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1248)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1162)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:971)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:775)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:708)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:431)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:288)
at org.springframework.kafka.core.DefaultKafkaProducerFactory.createKafkaProducer(DefaultKafkaProducerFactory.java:336)
at org.springframework.kafka.core.DefaultKafkaProducerFactory.createProducer(DefaultKafkaProducerFactory.java:322)
at org.springframework.kafka.core.KafkaTemplate.getTheProducer(KafkaTemplate.java:490)
at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:406)
at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:221)
at com.linkedin.metadata.kafka.MetadataChangeEventsProcessor.sendFailedMCE(MetadataChangeEventsProcessor.java:78)
at com.linkedin.metadata.kafka.MetadataChangeEventsProcessor.consume(MetadataChangeEventsProcessor.java:62)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48)
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:283)
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:79)
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1327)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1307)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1267)
... 8 common frames omitted
Caused by: io.confluent.common.config.ConfigException: Missing required configuration "schema.registry.url" which has no default value.
at io.confluent.common.config.ConfigDef.parse(ConfigDef.java:251)
at io.confluent.common.config.AbstractConfig.<init>(AbstractConfig.java:78)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig.<init>(AbstractKafkaAvroSerDeConfig.java:105)
at io.confluent.kafka.serializers.KafkaAvroSerializerConfig.<init>(KafkaAvroSerializerConfig.java:32)
at io.confluent.kafka.serializers.KafkaAvroSerializer.configure(KafkaAvroSerializer.java:48)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:371)
... 29 common frames omitted
02:22:54.115 [kafka-coordinator-heartbeat-thread | mce-consumer-job-client] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-2, groupId=mce-consumer-job-client] Group coordinator broker:29092 (id: 2147483646 rack: null) is unavailable or invalid, will attempt rediscovery
02:22:54.164 [mce-consumer-job-client-0-C-1] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consum
To Reproduce
Steps to reproduce the behavior:
Expected behavior
No exception when writing the message to the FailedMCE topic.
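The expected behaviour can be sketched roughly as follows: a parsing failure is caught and the event is routed to a failed-events sink instead of surfacing as a listener exception. This is an illustrative assumption about the intended flow, not the actual `MetadataChangeEventsProcessor` code. The class `FailedEventRoutingSketch`, its methods, and the choice of "PROD" as the only accepted value are hypothetical; only the error message is taken from this report.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of routing unparseable events to a failed-events sink.
final class FailedEventRoutingSketch {

    // Stand-in for fabric type parsing; accepting only "PROD" is an assumption for the demo.
    static String parseFabricType(String raw) {
        if (!"PROD".equals(raw)) {
            throw new IllegalArgumentException("Unsupported Fabric Type: " + raw);
        }
        return raw;
    }

    // Returns true when the event was processed, false when it was routed to the failed sink.
    static boolean consume(String fabricType, Consumer<String> failedTopic) {
        try {
            parseFabricType(fabricType);
            return true;
        } catch (IllegalArgumentException e) {
            // The sink stands in for producing to the FailedMCE topic.
            failedTopic.accept(e.getMessage());
            return false;
        }
    }

    public static void main(String[] args) {
        List<String> failed = new ArrayList<>();
        boolean processed = consume("TEST", failed::add);
        System.out.println("processed=" + processed + ", failed=" + failed);
    }
}
```

In the reported trace the equivalent of `failedTopic.accept(...)` is where things break, because the producer behind it cannot be constructed.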