getting i/o timeout error when using sarama consumer group #1192

Closed
varun06 opened this issue Oct 15, 2018 · 3 comments

Comments

@varun06
Contributor

varun06 commented Oct 15, 2018

Versions

Please specify real version numbers or git SHAs, not just "Latest" since that changes fairly regularly.
Sarama Version: v1.19.0
Kafka Version: v2.0
Go Version: v1.11

Problem Description

Getting an i/o timeout error from the broker when using a consumer group.

Error:          Received unexpected error read tcp <ip>:63133-><ip>:9092: i/o timeout

Error is returned from https://github.com/Shopify/sarama/blob/master/broker.go#L592

Note: the same Kafka cluster works fine when not using a consumer group.

@robertPiro

We're experiencing the same problem and cranked up the timeouts, but that did not resolve the issue. It looks like a connectivity issue, though we see the same problem on several deployments, which rules out a hardware-related cause. We are getting stuck in a loop of rebalances.

Sarama v1.19.0
Sarama Cluster v2.1.15
Kafka v2.0.
Go v1.11

Reverting to Sarama v1.17 did not help.

@roxit

roxit commented Nov 5, 2018

I experienced perhaps the same problem. One consumer instance using the ConsumerGroup API is fine, but when I tried to start another instance, the other instance got the same error and could not join the group.

Sarama v1.19.0
Kafka 1.1.0
Go v1.10.3

Solved the problem by watching ConsumerGroupSession.Context().Done() as well in ConsumeClaim.
The comment at https://github.com/Shopify/sarama/blob/v1.19.0/consumer_group.go#L18 is clear enough, but it seems that the example at https://godoc.org/github.com/Shopify/sarama#example-ConsumerGroup does not take rebalancing into consideration.

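func (h MyHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for {
		select {
		case cMsg, ok := <-claim.Messages():
			if !ok {
				// Messages channel closed: the claim has ended.
				return nil
			}
			// Fresh value per message so no fields leak from the previous one.
			var msg SyncMessage
			if err := json.Unmarshal(cMsg.Value, &msg); err != nil {
				return err
			}
			// do something
			sess.MarkMessage(cMsg, "")
		case <-sess.Context().Done():
			// Session ended (e.g. a rebalance started): return promptly.
			return nil
		}
	}
}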

@yitian108

How can I create several consumers in the same consumer group via the same Kafka client to implement point-to-point consumption?
