
TCP packet capture shows that the Partitions.Offset returned by a Fetch V1 Request increases, but the MessageSet is empty #3006

Closed
memory-overflow opened this issue Oct 31, 2024 · 2 comments

Comments


memory-overflow commented Oct 31, 2024

Description

Consumption of one topic suddenly got stuck.

Using a packet capture tool, we found that for normal topics the Fetch V1 response carries an increased offset and a non-empty MessageSet.

Normal request and response packets are shown below:
[Screenshot: normal Fetch V1 request packet]

[Screenshot: normal Fetch V1 response packet]

Abnormal request and response packets:
[Screenshot: abnormal Fetch V1 request packet]

[Screenshot: abnormal Fetch V1 response packet]

Once the exception occurs, the consumer retries indefinitely and consumption of the topic stalls.

Versions

Sarama: v1.29.1
Kafka: v3.5.1
Go: 1.18
Configuration
func setupKafka() (c *cluster.Consumer, err error) {
	// Block until at least one topic is available.
	topics := []string{}
	for {
		topics, err = dao.TIMatrixTemplateDao{}.GetAllTopics(context.Background())
		if err != nil {
			log.Error("get all topics error: ", err)
		}

		if len(topics) > 0 {
			break
		}
		log.Error("topic_id not updated yet")
		time.Sleep(2 * time.Second)
	}

	clusterConfig := cluster.NewConfig()
	clusterConfig.Consumer.Return.Errors = true
	clusterConfig.Group.Return.Notifications = true
	clusterConfig.Consumer.Offsets.Initial = sarama.OffsetNewest
	clusterConfig.Consumer.Offsets.CommitInterval = time.Second
	clusterConfig.Consumer.Fetch.Default = int32(config.GetKafkaConsumer().KafkaMaxMessageBytes)
	if len(config.Get().Kafka.User) > 0 {
		clusterConfig.Net.SASL.Enable = true
		clusterConfig.Net.SASL.User = config.Get().Kafka.User
		clusterConfig.Net.SASL.Password = config.Get().Kafka.PassWord
	}
	consumer, err := cluster.NewConsumer(strings.Split(config.Get().Kafka.Host, ","),
		config.GetKafkaConsumer().KafkaGroupName, topics, clusterConfig)
	if err != nil {
		return nil, err
	}
	go dynamicConsumerTopic(consumer, topics)
	log.Infof("create kafka consumer success, topics: %v", topics)
	return consumer, nil
}
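For context, the cluster config here (presumably bsm/sarama-cluster) embeds plain sarama's Config, so the fetch-size fields behave the same as in sarama itself: Consumer.Fetch.Default (set above) is the number of message bytes requested per partition per fetch, and Consumer.Fetch.Max (left at its default here) is the ceiling the consumer may grow that request to when a message does not fit. A minimal sketch, with illustrative byte values rather than our real settings:

package example

import "github.com/Shopify/sarama"

// fetchConfigSketch only illustrates the two fetch-size knobs; it is not our production setup.
func fetchConfigSketch() *sarama.Config {
	cfg := sarama.NewConfig()

	// Bytes requested per partition per fetch; the consumer starts from this value.
	cfg.Consumer.Fetch.Default = 1 << 20 // 1 MiB, illustrative

	// Upper bound the consumer may grow the request to when a message does not fit;
	// with Fetch v1 a single message larger than this cannot be fetched at all.
	cfg.Consumer.Fetch.Max = 10 << 20 // 10 MiB, illustrative

	return cfg
}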
Additional

We have complete packet capture data, but it contains sensitive information and cannot be shared directly. If you need it, you can contact me at [email protected]

dnwe (Collaborator) commented Nov 4, 2024

@memory-overflow did you resolve this issue?

memory-overflow (Author) commented

> @memory-overflow did you resolve this issue?

Yes. We found that version 1 of the Fetch protocol strictly enforces the max_bytes limit: if a message larger than max_bytes appears, the client gets stuck and can neither fetch the message nor skip past it.
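For anyone who hits the same thing, a possible mitigation is sketched below, under the assumption that the cluster package is bsm/sarama-cluster (whose Config embeds sarama.Config) and with illustrative sizes: raise the fetch limits above the largest message the producers can write, and/or advertise Kafka 0.10.1.0 or newer so that sarama negotiates Fetch v3+, where the broker returns at least one message even if it exceeds the requested max bytes.

package example

import (
	"github.com/Shopify/sarama"
	cluster "github.com/bsm/sarama-cluster"
)

// newConsumerConfig is a hypothetical helper; only the three settings matter.
func newConsumerConfig() *cluster.Config {
	clusterConfig := cluster.NewConfig()

	// Make the fetch limits comfortably larger than the biggest message the
	// producers can write (10 MiB here is illustrative, not from this deployment).
	clusterConfig.Consumer.Fetch.Default = 10 << 20
	clusterConfig.Consumer.Fetch.Max = 10 << 20

	// Advertising Kafka 0.10.1.0+ makes sarama send Fetch v3 or newer, where the
	// broker returns at least one message even if it is larger than max_bytes,
	// so the consumer can always make progress past an oversized message.
	clusterConfig.Version = sarama.V0_10_1_0

	return clusterConfig
}

Either change alone may be enough: raising Fetch.Max works as long as no message exceeds it, while the protocol bump removes the need to guess the maximum message size.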
