Unexpected user-specified time limit error #1562

Closed
kimtree opened this issue Dec 17, 2019 · 18 comments

Comments

@kimtree

kimtree commented Dec 17, 2019

Versions

Please specify real version numbers or git SHAs, not just "Latest" since that changes fairly regularly.

Sarama    Kafka    Go
v1.24.1   v1.1.1   v1.13.1
Configuration

What configuration values are you using for Sarama and Kafka?

config := sarama.NewConfig()
config.Version = sarama.V1_1_1_0
config.Consumer.MaxWaitTime = 500 * time.Millisecond
config.Consumer.Return.Errors = true
Logs

When filing an issue please provide logs from Sarama and Kafka if at all
possible. You can set sarama.Logger to a log.Logger to capture Sarama debug
output.


time="2019-12-13 15:38:23.746" level=info msg="client/metadata fetching metadata for [kafka_topic_name] from broker BROKER_SERVER3.local:9092"
time="2019-12-13 15:38:23.746" level=info msg="client/metadata fetching metadata for [kafka_topic_name] from broker BROKER_SERVER2.local:9092"
time="2019-12-13 15:38:23.890" level=info msg="client/coordinator requesting coordinator for consumergroup consumer_group_name from BROKER_SERVER4.local:9092"
time="2019-12-13 15:38:23.891" level=error msg="kafka: error while consuming kafka_topic_name/9: kafka server: Request exceeded the user-specified time limit in the request." logLevel=ERROR
time="2019-12-13 15:38:23.891" level=error msg="kafka: error while consuming kafka_topic_name/11: kafka server: Request exceeded the user-specified time limit in the request." logLevel=ERROR
time="2019-12-13 15:38:23.891" level=error msg="kafka: error while consuming kafka_topic_name/10: kafka server: Request exceeded the user-specified time limit in the request." logLevel=ERROR
time="2019-12-13 15:38:23.891" level=error msg="kafka: error while consuming kafka_topic_name/67: kafka server: Request exceeded the user-specified time limit in the request." logLevel=ERROR
time="2019-12-13 15:38:23.890" level=info msg="client/coordinator requesting coordinator for consumergroup consumer_group_name from BROKER_SERVER3.local:9092"
time="2019-12-13 15:38:23.891" level=error msg="kafka: error while consuming kafka_topic_name/12: kafka server: Request exceeded the user-specified time limit in the request." logLevel=ERROR
time="2019-12-13 15:38:23.891" level=error msg="kafka: error while consuming kafka_topic_name/7: kafka server: Request exceeded the user-specified time limit in the request." logLevel=ERROR
time="2019-12-13 15:38:23.891" level=error msg="kafka: error while consuming kafka_topic_name/68: kafka server: Request exceeded the user-specified time limit in the request." logLevel=ERROR
time="2019-12-13 15:38:23.891" level=error msg="kafka: error while consuming kafka_topic_name/8: kafka server: Request exceeded the user-specified time limit in the request." logLevel=ERROR
time="2019-12-13 15:38:23.891" level=error msg="kafka: error while consuming kafka_topic_name/6: kafka server: Request exceeded the user-specified time limit in the request." logLevel=ERROR
time="2019-12-13 15:38:23.891" level=error msg="kafka: error while consuming kafka_topic_name/65: kafka server: Request exceeded the user-specified time limit in the request." logLevel=ERROR
time="2019-12-13 15:38:23.891" level=error msg="kafka: error while consuming kafka_topic_name/50: kafka server: Request exceeded the user-specified time limit in the request." logLevel=ERROR
time="2019-12-13 15:38:23.891" level=error msg="kafka: error while consuming kafka_topic_name/54: kafka server: Request exceeded the user-specified time limit in the request." logLevel=ERROR
time="2019-12-13 15:38:23.891" level=error msg="kafka: error while consuming kafka_topic_name/66: kafka server: Request exceeded the user-specified time limit in the request." logLevel=ERROR
time="2019-12-13 15:38:23.891" level=error msg="kafka: error while consuming kafka_topic_name/55: kafka server: Request exceeded the user-specified time limit in the request." logLevel=ERROR
time="2019-12-13 15:38:23.891" level=error msg="kafka: error while consuming kafka_topic_name/63: kafka server: Request exceeded the user-specified time limit in the request." logLevel=ERROR
time="2019-12-13 15:38:23.891" level=error msg="kafka: error while consuming kafka_topic_name/64: kafka server: Request exceeded the user-specified time limit in the request." logLevel=ERROR
time="2019-12-13 15:38:23.891" level=info msg="client/coordinator coordinator for consumergroup consumer_group_name is #3 (BROKER_SERVER3.local:9092)"
time="2019-12-13 15:38:23.891" level=info msg="client/coordinator requesting coordinator for consumergroup consumer_group_name from BROKER_SERVER2.local:9092"
time="2019-12-13 15:38:23.891" level=info msg="client/coordinator requesting coordinator for consumergroup consumer_group_name from BROKER_SERVER2.local:9092"
time="2019-12-13 15:38:23.891" level=error msg="kafka: error while consuming kafka_topic_name/51: kafka server: Request exceeded the user-specified time limit in the request." logLevel=ERROR
time="2019-12-13 15:38:23.891" level=error msg="kafka: error while consuming kafka_topic_name/52: kafka server: Request exceeded the user-specified time limit in the request." logLevel=ERROR
time="2019-12-13 15:38:23.891" level=error msg="kafka: error while consuming kafka_topic_name/53: kafka server: Request exceeded the user-specified time limit in the request." logLevel=ERROR
time="2019-12-13 15:38:23.891" level=error msg="kafka: error while consuming kafka_topic_name/1: kafka server: Request exceeded the user-specified time limit in the request." logLevel=ERROR
time="2019-12-13 15:38:23.891" level=error msg="kafka: error while consuming kafka_topic_name/4: kafka server: Request exceeded the user-specified time limit in the request." logLevel=ERROR
time="2019-12-13 15:38:23.891" level=error msg="kafka: error while consuming kafka_topic_name/2: kafka server: Request exceeded the user-specified time limit in the request." logLevel=ERROR
time="2019-12-13 15:38:23.891" level=error msg="kafka: error while consuming kafka_topic_name/3: kafka server: Request exceeded the user-specified time limit in the request." logLevel=ERROR
time="2019-12-13 15:38:23.891" level=error msg="kafka: error while consuming kafka_topic_name/5: kafka server: Request exceeded the user-specified time limit in the request." logLevel=ERROR
time="2019-12-13 15:38:23.891" level=error msg="kafka: error while consuming kafka_topic_name/0: kafka server: Request exceeded the user-specified time limit in the request." logLevel=ERROR
time="2019-12-13 15:38:23.891" level=info msg="client/coordinator coordinator for consumergroup consumer_group_name is #3 (BROKER_SERVER3.local:9092)"
time="2019-12-13 15:38:23.891" level=info msg="client/coordinator coordinator for consumergroup consumer_group_name is #3 (BROKER_SERVER3.local:9092)"
time="2019-12-13 15:38:23.892" level=info msg="client/coordinator coordinator for consumergroup consumer_group_name is #3 (BROKER_SERVER3.local:9092)"
time="2019-12-13 15:38:26.726" level=info msg="client/metadata fetching metadata for [kafka_topic_name] from broker BROKER_SERVER4.local:9092"
time="2019-12-13 15:38:26.726" level=info msg="client/metadata fetching metadata for [kafka_topic_name] from broker BROKER_SERVER3.local:9092"
time="2019-12-13 15:38:26.726" level=info msg="client/metadata fetching metadata for [kafka_topic_name] from broker BROKER_SERVER2.local:9092"
time="2019-12-13 15:38:26.725" level=info msg="client/metadata fetching metadata for [kafka_topic_name] from broker BROKER_SERVER2.local:9092"
time="2019-12-13 15:38:26.891" level=info msg="client/metadata fetching metadata for [kafka_topic_name] from broker BROKER_SERVER2.local:9092"
time="2019-12-13 15:38:26.891" level=info msg="client/metadata fetching metadata for [kafka_topic_name] from broker BROKER_SERVER2.local:9092"
time="2019-12-13 15:38:26.891" level=info msg="client/metadata fetching metadata for [kafka_topic_name] from broker BROKER_SERVER3.local:9092"

Problem Description

I'm getting unexpected "user-specified time limit" errors from the Kafka server while consuming. I'm not sure which Sarama config value this "user-specified time limit" refers to. Do I need to look at our Kafka server's configuration?

Below is the code that surfaces the error.

go func() {
	for err := range group.Errors() {
		log.Error(err)
	}
}()
@dnwe
Collaborator

dnwe commented Jan 15, 2020

Your configured MaxWaitTime of 500 milliseconds is passed in the FetchRequests that your client sends to the broker(s), and the brokers are responding with Kafka protocol error code 7, REQUEST_TIMED_OUT (https://kafka.apache.org/protocol#protocol_error_codes). So it seems that your brokers are unable to process the requests within your configured limit for some reason?
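For readers who want to tell this specific error apart from other consumer errors, here is a minimal sketch. To stay self-contained it uses local stand-ins for sarama.KError and sarama.ConsumerError rather than importing Sarama; the Unwrap method and the isRequestTimedOut helper are assumptions for illustration, and with the Sarama version discussed in this thread you may need to inspect the Err field of the consumer error directly.

```go
package main

import (
	"errors"
	"fmt"
)

// KError is a local stand-in for sarama.KError (a Kafka protocol error code).
type KError int16

// ErrRequestTimedOut mirrors Kafka protocol error code 7 (REQUEST_TIMED_OUT).
const ErrRequestTimedOut KError = 7

func (e KError) Error() string {
	if e == ErrRequestTimedOut {
		return "kafka server: Request exceeded the user-specified time limit in the request."
	}
	return fmt.Sprintf("kafka server: error code %d", int16(e))
}

// ConsumerError is a local stand-in for sarama.ConsumerError.
type ConsumerError struct {
	Topic     string
	Partition int32
	Err       error
}

func (ce ConsumerError) Error() string {
	return fmt.Sprintf("kafka: error while consuming %s/%d: %s", ce.Topic, ce.Partition, ce.Err)
}

// Unwrap exposes the wrapped error so errors.As can reach the KError.
// (Assumption: this stand-in wraps its cause this way.)
func (ce ConsumerError) Unwrap() error { return ce.Err }

// isRequestTimedOut is a hypothetical helper: it reports whether err is,
// or wraps, protocol error code 7.
func isRequestTimedOut(err error) bool {
	var kerr KError
	return errors.As(err, &kerr) && kerr == ErrRequestTimedOut
}

func main() {
	err := &ConsumerError{Topic: "kafka_topic_name", Partition: 9, Err: ErrRequestTimedOut}
	fmt.Println(err)
	fmt.Println("timed out:", isRequestTimedOut(err))
}
```

In the error-logging goroutine from the issue description, a check like this could be used to count or rate-limit timeout errors separately from the rest.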

@kimtree
Author

kimtree commented Jan 16, 2020

@dnwe In fact, I have increased MaxWaitTime, one of the time-related values, for now, since I couldn't find any clue about this timeout issue. Is it true that MaxWaitTime is the only value related to the REQUEST_TIMED_OUT error, and that this is a server-side problem?

@antsbean
Contributor

I have the same problem in my live environment. @dnwe, modifying MaxWaitTime doesn't solve it.

@kimtree you can try modifying the following parameters:

	cfg := sarama.NewConfig()
	....
	
	cfg.Consumer.Group.Session.Timeout = 30 * time.Second
	cfg.Consumer.Group.Heartbeat.Interval = 10 * time.Second

This problem is caused by frequent metadata refreshes; see #1544.

@kimtree if you merge the code from #1544, you can leave Consumer.Group.Session.Timeout and Consumer.Group.Heartbeat.Interval at their defaults.

Sorry again for your troubles.

@kimtree
Author

kimtree commented Jan 17, 2020

@antsbean Thanks for the clarification. Increasing the Session.Timeout and Heartbeat.Interval values seems risky at the moment, since it weakens how we handle unexpected behavior from the consumer group. So I'll wait for PR #1544 to be merged. Thanks.

@d1egoaz
Contributor

d1egoaz commented Jan 22, 2020

fixed by #1544

@kimtree
Author

kimtree commented Feb 6, 2020

@dnwe @antsbean
I updated to v1.26.0 and removed the config.Consumer.MaxWaitTime line, but I still get the unexpected user-specified time limit error. Is there any way to handle this error?

@antsbean
Contributor

antsbean commented Feb 11, 2020

This problem is caused by the offset manager committing offsets: https://github.com/Shopify/sarama/blob/82c97b2bcec5c4f3fca4dc0aa9b2013d8976cc47/offset_manager.go#L237

I suggest trying the following configuration:

	cfg := sarama.NewConfig()
	
	cfg.Net.DialTimeout = 2 * time.Second
	cfg.Net.MaxOpenRequests = 255
	cfg.Consumer.MaxWaitTime = 100 * time.Millisecond
	cfg.Consumer.MaxProcessingTime = 200 * time.Millisecond
	cfg.Consumer.Group.Session.Timeout = 15 * time.Second
	cfg.Consumer.Group.Heartbeat.Interval = 5 * time.Second


The offset manager's flushToBroker function should have a retry feature added.
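As a rough illustration of what such a retry could look like, here is a minimal, standalone sketch. It does not touch Sarama's internals: retryCommit, errTimedOut, and the commit callback are hypothetical names standing in for the offset commit call and the REQUEST_TIMED_OUT error, and the fixed backoff is an assumption rather than anything Sarama provides.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTimedOut is a hypothetical sentinel standing in for the broker's
// REQUEST_TIMED_OUT response to an offset commit.
var errTimedOut = errors.New("request timed out")

// retryCommit calls commit until it succeeds, fails with a non-timeout
// error, or has been retried maxRetries times. It returns the number of
// attempts made and the last error (nil on success).
func retryCommit(commit func() error, maxRetries int, backoff time.Duration) (int, error) {
	var err error
	for attempt := 0; attempt <= maxRetries; attempt++ {
		if err = commit(); err == nil {
			return attempt + 1, nil
		}
		if !errors.Is(err, errTimedOut) {
			// Only timeouts are treated as retriable in this sketch.
			return attempt + 1, err
		}
		if attempt < maxRetries {
			// Wait before the next attempt so the broker is not hammered.
			time.Sleep(backoff)
		}
	}
	return maxRetries + 1, err
}

func main() {
	calls := 0
	// Simulated commit: times out twice, then succeeds.
	commit := func() error {
		calls++
		if calls < 3 {
			return errTimedOut
		}
		return nil
	}
	attempts, err := retryCommit(commit, 3, 10*time.Millisecond)
	fmt.Println("attempts:", attempts, "err:", err)
}
```

Pausing between attempts matters here: retrying immediately against a broker that is already slow to acknowledge commits would add to the load that caused the timeout.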

@kimtree
Author

kimtree commented Feb 13, 2020

@antsbean Thanks for the information, but the configuration seems quite complicated. Do you have a simpler version containing only the settings that are essential for resolving the time limit error?

@antsbean
Contributor

cfg.Consumer.Group.Session.Timeout = 15 * time.Second
cfg.Consumer.Group.Heartbeat.Interval = 5 * time.Second

@kimtree
Author

kimtree commented Feb 13, 2020

@antsbean Thanks for pointing that out. Let me ask for more detail. As you mentioned before, you think the OffsetManager's CommitOffset call triggers the time limit error, right? But I'm not sure how Session.Timeout and Heartbeat.Interval are related to the time limit error.

As I understand it, Session.Timeout and Heartbeat.Interval control the timeout for membership in a consumer group and how often heartbeats are sent. They don't seem related to offsets or to any other kind of request.
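On how the two settings relate to each other: Kafka's consumer documentation says the heartbeat interval must be lower than the session timeout, and typically no higher than one third of it. A minimal sketch of that sanity check follows; the groupTimings struct and checkTimings helper are hypothetical names for illustration and are not part of Sarama.

```go
package main

import (
	"fmt"
	"time"
)

// groupTimings holds the two consumer group settings discussed above.
// (Hypothetical type; in Sarama these live under Consumer.Group.)
type groupTimings struct {
	SessionTimeout    time.Duration
	HeartbeatInterval time.Duration
}

// checkTimings returns an error when the values are invalid, and a
// non-empty warning when the heartbeat interval exceeds one third of
// the session timeout.
func checkTimings(t groupTimings) (warning string, err error) {
	if t.SessionTimeout <= 0 || t.HeartbeatInterval <= 0 {
		return "", fmt.Errorf("session timeout and heartbeat interval must be positive")
	}
	if t.HeartbeatInterval >= t.SessionTimeout {
		return "", fmt.Errorf("heartbeat interval %v must be lower than session timeout %v",
			t.HeartbeatInterval, t.SessionTimeout)
	}
	if t.HeartbeatInterval > t.SessionTimeout/3 {
		return fmt.Sprintf("heartbeat interval %v is above one third of session timeout %v",
			t.HeartbeatInterval, t.SessionTimeout), nil
	}
	return "", nil
}

func main() {
	// The values suggested earlier in this thread.
	t := groupTimings{SessionTimeout: 15 * time.Second, HeartbeatInterval: 5 * time.Second}
	warning, err := checkTimings(t)
	fmt.Printf("warning=%q err=%v\n", warning, err)
}
```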

@antsbean
Contributor

@kimtree The offset manager and the consumer group use the same broker. Sending requests to the Kafka server too frequently causes the processing timeout.

@kimtree
Author

kimtree commented Feb 14, 2020

@antsbean Okay, I see. That means #1544 doesn't resolve all errors of this kind yet. I'll try the configuration that you gave me.

@kimtree
Author

kimtree commented Feb 26, 2020

@antsbean The time limit error still occurs even after I applied the config values that you gave me. Is there a way to avoid these errors?

@antsbean
Contributor

The offset manager's flushToBroker function should have a retry feature added.

@kimtree
Author

kimtree commented Mar 10, 2020

@antsbean Do you plan to implement the retry feature? I'm not sure whether this approach works: #1637

@antsbean
Contributor

Sorry, I've been busy recently.
I think whether a retry is required should be determined from the returned error code.
The following code is provided for your reference:

func (om *offsetManager) mainLoop() {
	defer om.ticker.Stop()
	defer close(om.closed)
	hasErr := func(err error) bool {
		if err == nil {
			return false
		}
		if kErr, ok := err.(KError); ok && kErr == ErrNoError {
			return false
		}
		return true
	}
	for {
		select {
		case <-om.ticker.C:
			// will retry
			var err error
			var hasErrFlag bool
			for attempt := 0; attempt <= om.conf.Consumer.Offsets.Retry.Max; attempt++ {
				err = om.flushToBroker()
				if hasErrFlag = hasErr(err); hasErrFlag {
					continue
				}
				om.releasePOMs(false)
				break
			}
			// avoid second call hasErr function
			if hasErrFlag {
				Logger.Printf("offset manager commit offset to kafka server failed err %s", err.Error())
				om.handleError(err)
			}

		case <-om.closing:
			return
		}
	}
}
....
// flushToBroker is ignored if auto-commit offsets is disabled
// if err is not nil, will retry
func (om *offsetManager) flushToBroker() error {
	if !om.conf.Consumer.Offsets.AutoCommit.Enable {
		return nil
	}

	req := om.constructRequest()
	if req == nil {
		return nil
	}

	broker, err := om.coordinator()
	if err != nil {
		Logger.Printf("offset manager get coordinator %s ", err.Error())
		return err // will retry
	}

	resp, err := broker.CommitOffset(req)
	if err != nil {
		Logger.Printf("offset manager commit offset error %s", err.Error())
		om.releaseCoordinator(broker)
		_ = broker.Close()
		return err // will retry
	}
	return om.handleResponse(broker, req, resp)
}

func (om *offsetManager) handleResponse(broker *Broker, req *OffsetCommitRequest, resp *OffsetCommitResponse) error {
	om.pomsLock.RLock()
	defer om.pomsLock.RUnlock()
	// lastErr just only record last error
	var lastErr KError
	for _, topicManagers := range om.poms {
		for _, pom := range topicManagers {
			if req.blocks[pom.topic] == nil || req.blocks[pom.topic][pom.partition] == nil {
				continue
			}
			if resp.Errors[pom.topic] == nil {
				pom.handleError(ErrIncompleteResponse)
				continue
			}
			if err, ok := resp.Errors[pom.topic][pom.partition]; !ok {
				pom.handleError(ErrIncompleteResponse)
				continue
			} else {
				switch err {
				case ErrNoError:
					block := req.blocks[pom.topic][pom.partition]
					pom.updateCommitted(block.offset, block.metadata)
				case ErrNotLeaderForPartition, ErrLeaderNotAvailable,
					ErrConsumerCoordinatorNotAvailable, ErrNotCoordinatorForConsumer:
					// not a critical error, we just need to redispatch
					om.releaseCoordinator(broker)
				case ErrOffsetMetadataTooLarge, ErrInvalidCommitOffsetSize:
					// nothing we can do about this, just tell the user and carry on
					pom.handleError(err)
				case ErrOffsetsLoadInProgress:
					// nothing wrong but we didn't commit, we'll get it next time round
				case ErrUnknownTopicOrPartition:
					// let the user know *and* try redispatching - if topic-auto-create is
					// enabled, redispatching should trigger a metadata req and create the
					// topic; if not then re-dispatching won't help, but we've let the user
					// know and it shouldn't hurt either (see https://github.com/Shopify/sarama/issues/706)
					Logger.Printf("offset manager commit to offset error %s", err.Error())
					pom.handleError(err)
					om.releaseCoordinator(broker)
				case ErrRequestTimedOut:
					// request timed out (https://github.com/Shopify/sarama/issues/1562), will retry.
					// Why does this happen? See the Kafka broker configs
					// (https://kafka.apache.org/22/documentation.html#brokerconfigs):
					// offsets.commit.timeout.ms defaults to 5s, so the broker timed out
					// waiting for the offsets topic replicas
					Logger.Printf("offset manager commit offset to kafka server timeout error %s topic %s partition %d",
						err.Error(), pom.topic, pom.partition)
					lastErr = err
					om.releaseCoordinator(broker)
				default:
					// dunno, tell the user and try redispatching
					Logger.Printf("offset manager receive unknown error %s, will retry", err.Error())
					om.releaseCoordinator(broker)
					lastErr = err
				}
			}
		}
	}
	return lastErr
}

@kimtree
Author

kimtree commented Mar 11, 2020

@antsbean No problem!
I understand what you mean from the code. Are you going to submit this patch? It would be great if it could be submitted and released soon, since we want to use a released version of Sarama rather than editing the code in our local environment.

@kimtree
Author

kimtree commented Mar 20, 2020

@antsbean ping?
