Kafka consumer is constantly being terminated without an error #591

Closed
toddkazakov opened this issue Sep 28, 2020 · 3 comments · Fixed by #594

Comments

@toddkazakov

We're considering adopting CloudEvents for asynchronous message processing using Kafka. However, during our POC we ran into an issue that causes the consumer to be terminated constantly: client.StartReceive(ctx, receive) simply returns nil after roughly 3 minutes. Here are the relevant logs:

todd@Todds-MacBook-Pro marketo-service % KAFKA_BROKERS="172.20.46.141:9092" KAFKA_PREFIX="todd2-" KAFKA_TOPIC="events" KAFKA_CONSUMER_GROUP="todd2" ./dist/marketo-service
marketo-service2020/09/28 19:28:37 client.go:133: Initializing new client
marketo-service2020/09/28 19:28:37 config.go:544: ClientID is the default of 'sarama', you should consider setting it to something application-specific.
marketo-service2020/09/28 19:28:37 config.go:544: ClientID is the default of 'sarama', you should consider setting it to something application-specific.
marketo-service2020/09/28 19:28:37 client.go:841: client/metadata fetching metadata for all topics from broker 172.20.46.141:9092
marketo-service2020/09/28 19:28:38 broker.go:209: Connected to broker at 172.20.46.141:9092 (unregistered)
marketo-service2020/09/28 19:28:38 client.go:572: client/brokers registered new broker #0 at kafka-kafka-0.kafka-kafka-brokers.kafka.svc:9092
marketo-service2020/09/28 19:28:38 client.go:572: client/brokers registered new broker #2 at kafka-kafka-2.kafka-kafka-brokers.kafka.svc:9092
marketo-service2020/09/28 19:28:38 client.go:572: client/brokers registered new broker #1 at kafka-kafka-1.kafka-kafka-brokers.kafka.svc:9092
marketo-service2020/09/28 19:28:38 client.go:180: Successfully initialized new client
marketo-service2020/09/28 19:28:38 client.go:838: client/metadata fetching metadata for [todd2-events] from broker 172.20.46.141:9092
marketo-service2020/09/28 19:28:38 client.go:1005: client/coordinator requesting coordinator for consumergroup todd2 from 172.20.46.141:9092
marketo-service2020/09/28 19:28:39 client.go:1028: client/coordinator coordinator for consumergroup todd2 is #2 (kafka-kafka-2.kafka-kafka-brokers.kafka.svc:9092)
marketo-service2020/09/28 19:28:39 config.go:544: ClientID is the default of 'sarama', you should consider setting it to something application-specific.
marketo-service2020/09/28 19:28:39 broker.go:207: Connected to broker at kafka-kafka-2.kafka-kafka-brokers.kafka.svc:9092 (registered as #2)
marketo-service2020/09/28 19:28:42 client.go:1005: client/coordinator requesting coordinator for consumergroup todd2 from 172.20.46.141:9092
marketo-service2020/09/28 19:28:43 client.go:1028: client/coordinator coordinator for consumergroup todd2 is #2 (kafka-kafka-2.kafka-kafka-brokers.kafka.svc:9092)
marketo-service2020/09/28 19:28:43 config.go:544: ClientID is the default of 'sarama', you should consider setting it to something application-specific.
marketo-service2020/09/28 19:28:43 broker.go:207: Connected to broker at kafka-kafka-0.kafka-kafka-brokers.kafka.svc:9092 (registered as #0)
marketo-service2020/09/28 19:28:43 consumer.go:801: consumer/broker/0 added subscription to todd2-events/0
2020/09/28 19:28:44 Received event Validation: valid
Context Attributes,
  specversion: 1.0
  type: users:delete
  source: shoreline
  id: 9d3b2e56-55da-41ab-844a-0bffe29c5836
  time: 2020-09-28T16:27:58.238284Z
  datacontenttype: application/json
Data,
  {
    "userid": "1234567890",
    ...
  }

marketo-service2020/09/28 19:29:55 consumer_group.go:426: kafka server: Request was for a consumer group that is not coordinated by this broker.
marketo-service2020/09/28 19:29:55 consumer_group.go:466: loop check partition number coroutine will exit, topics [todd2-events]
marketo-service2020/09/28 19:29:55 consumer.go:807: consumer/broker/0 closed dead subscription to todd2-events/0
marketo-service2020/09/28 19:29:55 client.go:232: Closing Client
2020/09/28 19:29:55 Consumer stopped

The receive function just logs the received event and returns protocol.ResultACK. The behavior doesn't change even if we don't return a result from the function.

I suspect that the offsets are not being committed, because when we restart the receiver, it starts from the beginning of the topic as if there are no committed offsets (saramaConfig.Consumer.Offsets.Initial = sarama.OffsetOldest). If this is true, we might be running into IBM/sarama#1608.
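To make the reasoning above concrete, here is a minimal sketch of how the starting position is chosen when a group member is assigned a partition. It is a simplified model, not sarama's actual code: `startingOffset` and its parameters are hypothetical names, and only the two sentinel values mirror `sarama.OffsetNewest` and `sarama.OffsetOldest`.

```go
package main

// Sentinel values mirroring sarama.OffsetNewest and sarama.OffsetOldest.
const (
	offsetNewest int64 = -1
	offsetOldest int64 = -2
)

// startingOffset models where a consumer group member begins reading.
// committed is the group's committed offset for the partition, or a
// negative value when nothing has been committed yet. initial plays the
// role of Consumer.Offsets.Initial. logStart and logEnd are the oldest
// and newest offsets available in the partition.
// (Hypothetical helper for illustration only.)
func startingOffset(committed, initial, logStart, logEnd int64) int64 {
	if committed >= 0 {
		// A commit exists, so the consumer resumes from it.
		return committed
	}
	if initial == offsetOldest {
		// No commit: fall back to the beginning of the partition.
		return logStart
	}
	// No commit and initial is "newest": start at the end of the partition.
	return logEnd
}
```

Under this model, a receiver that restarts at the beginning of the topic every time is consistent with the committed offset never being stored, which is why the fallback to OffsetOldest keeps kicking in.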

Could you please advise what to do?

@slinkydeveloper
Member

slinkydeveloper commented Sep 28, 2020

The first thing that comes to mind: did you try starting the consumer group manually with sarama to check whether the same behaviour occurs? That would help us understand whether it's a sarama problem or ours. Even without using the client, you can easily access the deserialization of a consumer message into a CloudEvent using https://github.com/cloudevents/sdk-go/blob/master/protocol/kafka_sarama/v2/message_benchmark_test.go#L38

@toddkazakov
Author

toddkazakov commented Sep 29, 2020

I implemented my own consumer using sarama and saw the exact same behavior. Looking at the Kafka logs, it looks like a rebalance happens every few minutes and the offsets are not being committed. Switching to a different cluster resolved the issue.

Leaving uncommitted offsets aside, I think the cloudevents Kafka consumer doesn't handle rebalancing correctly. It should handle consumer group rebalancing events under the hood instead of returning nil from StartReceive.

@slinkydeveloper
Member

@toddkazakov can you review #594?
