Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AsyncProducer Close blocked for infinity #2121

Closed
3AceShowHand opened this issue Jan 25, 2022 · 16 comments
Closed

AsyncProducer Close blocked for infinity #2121

3AceShowHand opened this issue Jan 25, 2022 · 16 comments

Comments

@3AceShowHand
Copy link

3AceShowHand commented Jan 25, 2022

Versions

Please specify real version numbers or git SHAs, not just "Latest" since that changes fairly regularly.

Sarama Kafka Go
1.27.2 3.0.0 1.7.5
Configuration

What configuration values are you using for Sarama and Kafka?

	config.Metadata.Retry.Max = 3
	config.Metadata.Retry.Backoff = 250 * time.Millisecond
	config.Metadata.Timeout = 1 * time.Minute

	// Admin.Retry take effect on `ClusterAdmin` related operations,
	// only `CreateTopic` for cdc now. Just use default values.
	config.Admin.Retry.Max = 5
	config.Admin.Retry.Backoff = 100 * time.Millisecond
	config.Admin.Timeout = 3 * time.Second

	config.Producer.Retry.Max = 3
	config.Producer.Retry.Backoff = 100 * time.Millisecond

	config.Producer.Partitioner = sarama.NewManualPartitioner
	config.Producer.MaxMessageBytes = c.MaxMessageBytes
	config.Producer.Return.Successes = true
	config.Producer.Return.Errors = true
	config.Producer.RequiredAcks = sarama.WaitForAll
Logs

We are testing sarama in an extremely rare scenario:

  • producer can send request to the 1 machine Kafka cluster, but cannot get a response
  • Kill -s STOP the kafka broker process, this will make the TCP connection remain, but will not send a response.
  • Close the producer by call asyncProducer.Close()

we do not have a log from sarama, but have grabbed some goroutine stack, like the following one, it looks blocked on trying to receive a response from the broker.
image

Our purpose is that when try to close the producer, it should not be blocked for a long time, instead of return as soon as possible.

WechatIMG3202

But the reality as shown in the picture above, 33 messages failed to deliver after 38minutes, and it was after the process resume by kill -s CONT

Problem Description
@3AceShowHand
Copy link
Author

3AceShowHand commented Jan 25, 2022

expected feature, when try to close the asyncProducer, just drop all buffered message, and response immediately.

@samuelhewitt
Copy link

samuelhewitt commented Jan 25, 2022

We are facing an issue along similar lines with 1.31.0, though we haven't seen it in any prior version.

Sarama Kafka Go
1.31.0 2.3.1 1.17.6
	config := sarama.NewConfig()
	config.Producer.Compression = sarama.CompressionSnappy
	config.ChannelBufferSize = 32

We are not directly calling asyncProducer.Close(), but the producer is Closing itself as part of error handling - leading to the infinite block. I am working to get more specifics and an example.

WIP details:
Close() - https://github.com/Shopify/sarama/blob/v1.31.0/broker.go#L269
which is holding the b.lock, waiting for channel b.done
Close() is called when an error response is returned here - https://github.com/Shopify/sarama/blob/v1.31.0/async_producer.go#L1022-L1031
b.done is closed when responseReceiver is no longer processing an event
At least 1 goroutine is waiting to handle a dead error, rather than on the select (haven't pinpointed exactly what its waiting for, in relation to processing the dead error)
Potentially a race condition between Close() and the handling of the dead error that initiated the close? TBD

97 @ 0x4bf0c 0x5eee8 0x5eed9 0x7b5c0 0x88b44 0x6df4e4 0x6df475 0x6df3f0 0x6df9a8 0x6dd394 0x6ea68c 0x6e7504 0x6d6fcc 0x5f9964 0x5f9864 0x6d6f0c 0x6d6ead 0x6d6198 0x720d34 0x7f2d4
#	0x7b5bf		sync.runtime_SemacquireMutex+0x3f					/usr/local/go/src/runtime/sema.go:71
#	0x88b43		sync.(*Mutex).lockSlow+0x193						/usr/local/go/src/sync/mutex.go:138
#	0x6df4e3	sync.(*Mutex).Lock+0xa3							/usr/local/go/src/sync/mutex.go:81
#	0x6df474	github.com/Shopify/sarama.(*Broker).sendWithPromise+0x34		/go/pkg/mod/github.com/!shopify/[email protected]/broker.go:884
#	0x6df3ef	github.com/Shopify/sarama.(*Broker).send+0xdf				/go/pkg/mod/github.com/!shopify/[email protected]/broker.go:876
#	0x6df9a7	github.com/Shopify/sarama.(*Broker).sendAndReceive+0x77			/go/pkg/mod/github.com/!shopify/[email protected]/broker.go:934
#	0x6dd393	github.com/Shopify/sarama.(*Broker).GetMetadata+0x63			/go/pkg/mod/github.com/!shopify/[email protected]/broker.go:316
#	0x6ea68b	github.com/Shopify/sarama.(*client).tryRefreshMetadata+0x46b		/go/pkg/mod/github.com/!shopify/[email protected]/client.go:895
#	0x6e7503	github.com/Shopify/sarama.(*client).RefreshMetadata+0x103		/go/pkg/mod/github.com/!shopify/[email protected]/client.go:489
#	0x6d6fcb	github.com/Shopify/sarama.(*partitionProducer).updateLeader.func1+0x8b	/go/pkg/mod/github.com/!shopify/[email protected]/async_producer.go:657
#	0x5f9963	github.com/eapache/go-resiliency/breaker.(*Breaker).doWork.func1+0x53	/go/pkg/mod/github.com/eapache/[email protected]/breaker/breaker.go:85
#	0x5f9863	github.com/eapache/go-resiliency/breaker.(*Breaker).doWork+0x33		/go/pkg/mod/github.com/eapache/[email protected]/breaker/breaker.go:86
#	0x6d6f0b	github.com/eapache/go-resiliency/breaker.(*Breaker).Run+0x7b		/go/pkg/mod/github.com/eapache/[email protected]/breaker/breaker.go:55
#	0x6d6eac	github.com/Shopify/sarama.(*partitionProducer).updateLeader+0x1c	/go/pkg/mod/github.com/!shopify/[email protected]/async_producer.go:656
#	0x6d6197	github.com/Shopify/sarama.(*partitionProducer).dispatch+0x537		/go/pkg/mod/github.com/!shopify/[email protected]/async_producer.go:589
#	0x720d33	github.com/Shopify/sarama.withRecover+0x43				/go/pkg/mod/github.com/!shopify/[email protected]/utils.go:43

4 @ 0x4bf0c 0x17f94 0x17978 0x6d5df0 0x720d34 0x7f2d4
#	0x6d5def	github.com/Shopify/sarama.(*partitionProducer).dispatch+0x18f	/go/pkg/mod/github.com/!shopify/[email protected]/async_producer.go:546
#	0x720d33	github.com/Shopify/sarama.withRecover+0x43			/go/pkg/mod/github.com/!shopify/[email protected]/utils.go:43

4 @ 0x4bf0c 0x17f94 0x17978 0x6d7538 0x720d34 0x7f2d4
#	0x6d7537	github.com/Shopify/sarama.(*asyncProducer).newBrokerProducer.func1+0x57	/go/pkg/mod/github.com/!shopify/[email protected]/async_producer.go:695
#	0x720d33	github.com/Shopify/sarama.withRecover+0x43				/go/pkg/mod/github.com/!shopify/[email protected]/utils.go:43

4 @ 0x4bf0c 0x5de98 0x6d798c 0x720d34 0x7f2d4
#	0x6d798b	github.com/Shopify/sarama.(*brokerProducer).run+0x17b	/go/pkg/mod/github.com/!shopify/[email protected]/async_producer.go:765
#	0x720d33	github.com/Shopify/sarama.withRecover+0x43		/go/pkg/mod/github.com/!shopify/[email protected]/utils.go:43

2 @ 0x4bf0c 0x16fc4 0x16b10 0x6d63b8 0x720d34 0x7f2d4
#	0x6d63b7	github.com/Shopify/sarama.(*partitionProducer).dispatch+0x757	/go/pkg/mod/github.com/!shopify/[email protected]/async_producer.go:606
#	0x720d33	github.com/Shopify/sarama.withRecover+0x43			/go/pkg/mod/github.com/!shopify/[email protected]/utils.go:43

2 @ 0x4bf0c 0x17f94 0x17938 0x6da1bc 0x720d34 0x7f2d4
#	0x6da1bb	github.com/Shopify/sarama.(*asyncProducer).retryHandler+0x14b	/go/pkg/mod/github.com/!shopify/[email protected]/async_producer.go:1052
#	0x720d33	github.com/Shopify/sarama.withRecover+0x43			/go/pkg/mod/github.com/!shopify/[email protected]/utils.go:43

2 @ 0x4bf0c 0x17f94 0x17978 0x6d4d40 0x720d34 0x7f2d4
#	0x6d4d3f	github.com/Shopify/sarama.(*asyncProducer).dispatcher+0x8f	/go/pkg/mod/github.com/!shopify/[email protected]/async_producer.go:331
#	0x720d33	github.com/Shopify/sarama.withRecover+0x43			/go/pkg/mod/github.com/!shopify/[email protected]/utils.go:43

2 @ 0x4bf0c 0x17f94 0x17978 0x6d5494 0x720d34 0x7f2d4
#	0x6d5493	github.com/Shopify/sarama.(*topicProducer).dispatch+0x43	/go/pkg/mod/github.com/!shopify/[email protected]/async_producer.go:413
#	0x720d33	github.com/Shopify/sarama.withRecover+0x43			/go/pkg/mod/github.com/!shopify/[email protected]/utils.go:43

1 @ 0x4bf0c 0x16fc4 0x16b10 0x6d77ec 0x6dd84c 0x6db894 0x6e0078 0x720d34 0x7f2d4
#	0x6d77eb	github.com/Shopify/sarama.(*asyncProducer).newBrokerProducer.func1.2+0xcb	/go/pkg/mod/github.com/!shopify/[email protected]/async_producer.go:701
#	0x6dd84b	github.com/Shopify/sarama.(*Broker).AsyncProduce.func1+0xfb			/go/pkg/mod/github.com/!shopify/[email protected]/broker.go:388
#	0x6db893	github.com/Shopify/sarama.(*responsePromise).handle+0xc3			/go/pkg/mod/github.com/!shopify/[email protected]/broker.go:132
#	0x6e0077	github.com/Shopify/sarama.(*Broker).responseReceiver+0xb7			/go/pkg/mod/github.com/!shopify/[email protected]/broker.go:1020
#	0x720d33	github.com/Shopify/sarama.withRecover+0x43					/go/pkg/mod/github.com/!shopify/[email protected]/utils.go:43

1 @ 0x4bf0c 0x17f94 0x17938 0x6dcfd8 0x6d9e44 0x6d8988 0x6d79c0 0x720d34 0x7f2d4
#	0x6dcfd7	github.com/Shopify/sarama.(*Broker).Close+0xe7			/go/pkg/mod/github.com/!shopify/[email protected]/broker.go:269
#	0x6d9e43	github.com/Shopify/sarama.(*brokerProducer).handleError+0x1a3	/go/pkg/mod/github.com/!shopify/[email protected]/async_producer.go:1031
#	0x6d8987	github.com/Shopify/sarama.(*brokerProducer).handleResponse+0x47	/go/pkg/mod/github.com/!shopify/[email protected]/async_producer.go:898
#	0x6d79bf	github.com/Shopify/sarama.(*brokerProducer).run+0x1af		/go/pkg/mod/github.com/!shopify/[email protected]/async_producer.go:831
#	0x720d33	github.com/Shopify/sarama.withRecover+0x43			/go/pkg/mod/github.com/!shopify/[email protected]/utils.go:43

1 @ 0x4bf0c 0x17f94 0x17978 0x6e0024 0x720d34 0x7f2d4
#	0x6e0023	github.com/Shopify/sarama.(*Broker).responseReceiver+0x63	/go/pkg/mod/github.com/!shopify/[email protected]/broker.go:1015
#	0x720d33	github.com/Shopify/sarama.withRecover+0x43			/go/pkg/mod/github.com/!shopify/[email protected]/utils.go:43

1 @ 0x4bf0c 0x5de98 0x6e9ff0 0x720d34 0x7f2d4
#	0x6e9fef	github.com/Shopify/sarama.(*client).backgroundMetadataUpdater+0xff	/go/pkg/mod/github.com/!shopify/[email protected]/client.go:824
#	0x720d33	github.com/Shopify/sarama.withRecover+0x43				/go/pkg/mod/github.com/!shopify/[email protected]/utils.go:43

1 @ 0x4bf0c 0x5eee8 0x5eed9 0x7b5c0 0x88b44 0x6df4e4 0x6df475 0x6dd724 0x6d75d0 0x720d34 0x7f2d4
#	0x7b5bf		sync.runtime_SemacquireMutex+0x3f					/usr/local/go/src/runtime/sema.go:71
#	0x88b43		sync.(*Mutex).lockSlow+0x193						/usr/local/go/src/sync/mutex.go:138
#	0x6df4e3	sync.(*Mutex).Lock+0xa3							/usr/local/go/src/sync/mutex.go:81
#	0x6df474	github.com/Shopify/sarama.(*Broker).sendWithPromise+0x34		/go/pkg/mod/github.com/!shopify/[email protected]/broker.go:884
#	0x6dd723	github.com/Shopify/sarama.(*Broker).AsyncProduce+0x133			/go/pkg/mod/github.com/!shopify/[email protected]/broker.go:405
#	0x6d75cf	github.com/Shopify/sarama.(*asyncProducer).newBrokerProducer.func1+0xef	/go/pkg/mod/github.com/!shopify/[email protected]/async_producer.go:712
#	0x720d33	github.com/Shopify/sarama.withRecover+0x43				/go/pkg/mod/github.com/!shopify/[email protected]/utils.go:43

1 @ 0x4bf0c 0x5eee8 0x5eed9 0x7b5c0 0x88b44 0x6df4e4 0x6df475 0x6df3f0 0x6df9a8 0x6dd394 0x6ea68c 0x6e7504 0x6ea19c 0x6ea00c 0x720d34 0x7f2d4
#	0x7b5bf		sync.runtime_SemacquireMutex+0x3f					/usr/local/go/src/runtime/sema.go:71
#	0x88b43		sync.(*Mutex).lockSlow+0x193						/usr/local/go/src/sync/mutex.go:138
#	0x6df4e3	sync.(*Mutex).Lock+0xa3							/usr/local/go/src/sync/mutex.go:81
#	0x6df474	github.com/Shopify/sarama.(*Broker).sendWithPromise+0x34		/go/pkg/mod/github.com/!shopify/[email protected]/broker.go:884
#	0x6df3ef	github.com/Shopify/sarama.(*Broker).send+0xdf				/go/pkg/mod/github.com/!shopify/[email protected]/broker.go:876
#	0x6df9a7	github.com/Shopify/sarama.(*Broker).sendAndReceive+0x77			/go/pkg/mod/github.com/!shopify/[email protected]/broker.go:934
#	0x6dd393	github.com/Shopify/sarama.(*Broker).GetMetadata+0x63			/go/pkg/mod/github.com/!shopify/[email protected]/broker.go:316
#	0x6ea68b	github.com/Shopify/sarama.(*client).tryRefreshMetadata+0x46b		/go/pkg/mod/github.com/!shopify/[email protected]/client.go:895
#	0x6e7503	github.com/Shopify/sarama.(*client).RefreshMetadata+0x103		/go/pkg/mod/github.com/!shopify/[email protected]/client.go:489
#	0x6ea19b	github.com/Shopify/sarama.(*client).refreshMetadata+0x7b		/go/pkg/mod/github.com/!shopify/[email protected]/client.go:848
#	0x6ea00b	github.com/Shopify/sarama.(*client).backgroundMetadataUpdater+0x11b	/go/pkg/mod/github.com/!shopify/[email protected]/client.go:826
#	0x720d33	github.com/Shopify/sarama.withRecover+0x43				/go/pkg/mod/github.com/!shopify/[email protected]/utils.go:43

@3AceShowHand
Copy link
Author

github.com/Shopify/sarama.(*asyncProducer).dispatcher(0x1400012c310)
	github.com/Shopify/[email protected]/async_producer.go:330 +0x8c
github.com/Shopify/sarama.withRecover(0x140019c8360)
	github.com/Shopify/[email protected]/utils.go:43 +0x44
created by github.com/Shopify/sarama.newAsyncProducer
	github.com/Shopify/[email protected]/async_producer.go:166 +0x270

goroutine 404 [chan receive, 2 minutes]:
github.com/Shopify/sarama.(*asyncProducer).Close(0x1400012c310)
	github.com/Shopify/[email protected]/async_producer.go:307 +0x154
github.com/pingcap/tiflow/cdc/sink/producer/kafka.(*kafkaSaramaProducer).Close(0x14000e86dc0)
	github.com/pingcap/tiflow/cdc/sink/producer/kafka/kafka.go:217 +0x11c
github.com/pingcap/tiflow/cdc/sink.(*mqSink).Close(0x140008a4600, {0x103c77048, 0x14005410d00})
	github.com/pingcap/tiflow/cdc/sink/mq.go:309 +0x34
github.com/pingcap/tiflow/cdc/sink.(*Manager).Close(0x140027a7800, {0x103c77048, 0x14005410d00})
	github.com/pingcap/tiflow/cdc/sink/manager.go:92 +0x278
github.com/pingcap/tiflow/cdc/processor.(*processor).Close(0x140023ce000)
	github.com/pingcap/tiflow/cdc/processor/processor.go:1068 +0x774
github.com/pingcap/tiflow/cdc/processor.(*Manager).closeProcessor(0x14000f5e020, {0x14000fda2da, 0x24})
	github.com/pingcap/tiflow/cdc/processor/manager.go:132 +0x68
github.com/pingcap/tiflow/cdc/processor.(*Manager).Tick(0x14000f5e020, {0x12ec19bb0, 0x14000ec0e80}, {0x103c31300, 0x14000a78c80})
	github.com/pingcap/tiflow/cdc/processor/manager.go:112 +0x540
github.com/pingcap/tiflow/pkg/orchestrator.(*EtcdWorker).Run(0x140009fbc00, {0x12ec19bb0, 0x14000ec0e80}, 0x14000effcb0, 0x5f5e100, {0x14001b30020, 0xe}, {0x102db3ae6, 0x9})
	github.com/pingcap/tiflow/pkg/orchestrator/etcd_worker.go:239 +0xa04
github.com/pingcap/tiflow/cdc/capture.(*Capture).runEtcdWorker(0x1400090d2b0, {0x103caae20, 0x14000ec0e80}, {0x103c17400, 0x14000f5e020}, {0x103c31300, 0x14000a78c80}, 0x5f5e100, {0x102db3ae6, 0x9})
	github.com/pingcap/tiflow/cdc/capture/capture.go:456 +0x120
github.com/pingcap/tiflow/cdc/capture.(*Capture).run.func3(0x14000994210, 0x1400090d2b0, {0x103caae20, 0x14000ec0e80}, 0x140006e61c0)
	github.com/pingcap/tiflow/cdc/capture/capture.go:316 +0x2d4
created by github.com/pingcap/tiflow/cdc/capture.(*Capture).run
	github.com/pingcap/tiflow/cdc/capture/capture.go:296 +0x478

goroutine 1785 [chan receive, 12 minutes]:
github.com/Shopify/sarama.(*asyncProducer).Close.func1()
	github.com/Shopify/[email protected]/async_producer.go:300 +0x48
github.com/Shopify/sarama.withRecover(0x140073d4ab0)
	github.com/Shopify/[email protected]/utils.go:43 +0x44
created by github.com/Shopify/sarama.(*asyncProducer).Close
	github.com/Shopify/[email protected]/async_producer.go:299 +0x9c

goroutine 929 [chan receive, 12 minutes]:
github.com/Shopify/sarama.(*asyncProducer).retryHandler(0x1400012c310)
	github.com/Shopify/[email protected]/async_producer.go:1034 +0x148
github.com/Shopify/sarama.withRecover(0x140019c8370)
	github.com/Shopify/[email protected]/utils.go:43 +0x44
created by github.com/Shopify/sarama.newAsyncProducer
	github.com/Shopify/[email protected]/async_producer.go:167 +0x2d4

goroutine 670 [chan receive]:
github.com/rcrowley/go-metrics.(*meterArbiter).tick(0x10679ea20)
	github.com/rcrowley/[email protected]/meter.go:239 +0x34
created by github.com/rcrowley/go-metrics.NewMeter
	github.com/rcrowley/[email protected]/meter.go:46 +0xe4

goroutine 303 [select, 4 minutes]:
github.com/Shopify/sarama.(*client).backgroundMetadataUpdater(0x14000c2a090)
	github.com/Shopify/[email protected]/client.go:809 +0x104
github.com/Shopify/sarama.withRecover(0x140019c8340)
	github.com/Shopify/[email protected]/utils.go:43 +0x44
created by github.com/Shopify/sarama.NewClient
	github.com/Shopify/[email protected]/client.go:180 +0x3f8

goroutine 1784 [semacquire, 12 minutes]:
sync.runtime_Semacquire(0x1400012c350)
	runtime/sema.go:56 +0x38
sync.(*WaitGroup).Wait(0x1400012c348)
	sync/waitgroup.go:130 +0xa4
github.com/Shopify/sarama.(*asyncProducer).shutdown(0x1400012c310)
	github.com/Shopify/[email protected]/async_producer.go:1059 +0xd8
github.com/Shopify/sarama.withRecover(0x140073d4aa0)
	github.com/Shopify/[email protected]/utils.go:43 +0x44
created by github.com/Shopify/sarama.(*asyncProducer).AsyncClose
	github.com/Shopify/[email protected]/async_producer.go:321 +0x80

goroutine 948 [chan receive, 13 minutes]:
github.com/Shopify/sarama.(*syncProducer).handleSuccesses(0x140026ae4c8)
	github.com/Shopify/[email protected]/sync_producer.go:131 +0x94
github.com/Shopify/sarama.withRecover(0x14001b871c0)
	github.com/Shopify/[email protected]/utils.go:43 +0x44
created by github.com/Shopify/sarama.newSyncProducerFromAsyncProducer
	github.com/Shopify/[email protected]/sync_producer.go:76 +0xd0

goroutine 947 [chan receive, 13 minutes]:
github.com/Shopify/sarama.(*asyncProducer).retryHandler(0x14000555500)
	github.com/Shopify/[email protected]/async_producer.go:1034 +0x148
github.com/Shopify/sarama.withRecover(0x14001b871b0)
	github.com/Shopify/[email protected]/utils.go:43 +0x44
created by github.com/Shopify/sarama.newAsyncProducer
	github.com/Shopify/[email protected]/async_producer.go:167 +0x2d4

goroutine 946 [chan receive, 13 minutes]:
github.com/Shopify/sarama.(*asyncProducer).dispatcher(0x14000555500)
	github.com/Shopify/[email protected]/async_producer.go:330 +0x8c
github.com/Shopify/sarama.withRecover(0x14001b871a0)
	github.com/Shopify/[email protected]/utils.go:43 +0x44
created by github.com/Shopify/sarama.newAsyncProducer
	github.com/Shopify/[email protected]/async_producer.go:166 +0x270

goroutine 945 [select, 4 minutes]:
github.com/Shopify/sarama.(*client).backgroundMetadataUpdater(0x14000c2a2d0)
	github.com/Shopify/[email protected]/client.go:809 +0x104
github.com/Shopify/sarama.withRecover(0x14001b87180)
	github.com/Shopify/[email protected]/utils.go:43 +0x44
created by github.com/Shopify/sarama.NewClient
	github.com/Shopify/[email protected]/client.go:180 +0x3f8

goroutine 949 [chan receive, 13 minutes]:
github.com/Shopify/sarama.(*syncProducer).handleErrors(0x140026ae4c8)
	github.com/Shopify/[email protected]/sync_producer.go:139 +0x98
github.com/Shopify/sarama.withRecover(0x14001b871d0)
	github.com/Shopify/[email protected]/utils.go:43 +0x44
created by github.com/Shopify/sarama.newSyncProducerFromAsyncProducer
	github.com/Shopify/[email protected]/sync_producer.go:77 +0x134

goroutine 1125 [select, 2 minutes]:
github.com/Shopify/sarama.(*Broker).sendAndReceive(0x14000d5d180, {0x103c8c220, 0x140021afe30}, {0x103c8c268, 0x140059cbd10})
	github.com/Shopify/[email protected]/broker.go:774 +0xf4
github.com/Shopify/sarama.(*Broker).GetMetadata(0x14000d5d180, 0x140021afe30)
	github.com/Shopify/[email protected]/broker.go:283 +0x64
github.com/Shopify/sarama.(*client).tryRefreshMetadata(0x14000c2a090, {0x140393807b0, 0x1, 0x1}, 0x2, {0xc0740bc096c208d8, 0xce70638466, 0x10679f9c0})
	github.com/Shopify/[email protected]/client.go:880 +0x450
github.com/Shopify/sarama.(*client).tryRefreshMetadata.func2({0x103c15140, 0x1400087ece0})
	github.com/Shopify/[email protected]/client.go:859 +0x1f0
github.com/Shopify/sarama.(*client).tryRefreshMetadata(0x14000c2a090, {0x140393807b0, 0x1, 0x1}, 0x3, {0xc0740bc096c208d8, 0xce70638466, 0x10679f9c0})
	github.com/Shopify/[email protected]/client.go:927 +0xac0
github.com/Shopify/sarama.(*client).RefreshMetadata(0x14000c2a090, {0x140393807b0, 0x1, 0x1})
	github.com/Shopify/[email protected]/client.go:473 +0xfc
github.com/Shopify/sarama.(*partitionProducer).updateLeader.func1()
	github.com/Shopify/[email protected]/async_producer.go:657 +0x8c
github.com/eapache/go-resiliency/breaker.(*Breaker).doWork.func1(0x14002c39e68, 0x14002c39eb8)
	github.com/eapache/[email protected]/breaker/breaker.go:85 +0x54
github.com/eapache/go-resiliency/breaker.(*Breaker).doWork(0x14002518050, 0x0, 0x14002c39eb8)
	github.com/eapache/[email protected]/breaker/breaker.go:86 +0x34
github.com/eapache/go-resiliency/breaker.(*Breaker).Run(...)
	github.com/eapache/[email protected]/breaker/breaker.go:55
github.com/Shopify/sarama.(*partitionProducer).updateLeader(0x1400408e060)
	github.com/Shopify/[email protected]/async_producer.go:656 +0x74
github.com/Shopify/sarama.(*partitionProducer).dispatch(0x1400408e060)
	github.com/Shopify/[email protected]/async_producer.go:589 +0x55c
github.com/Shopify/sarama.withRecover(0x1400557c7c0)
	github.com/Shopify/[email protected]/utils.go:43 +0x44
created by github.com/Shopify/sarama.(*asyncProducer).newPartitionProducer
	github.com/Shopify/[email protected]/async_producer.go:513 +0x200

goroutine 3458 [IO wait, 2 minutes]:
internal/poll.runtime_pollWait(0x12ea13d18, 0x72)
	runtime/netpoll.go:234 +0xa4
internal/poll.(*pollDesc).wait(0x14003396c18, 0x72, 0x0)
	internal/poll/fd_poll_runtime.go:84 +0x38
internal/poll.(*pollDesc).waitRead(...)
	internal/poll/fd_poll_runtime.go:89
internal/poll.(*FD).Read(0x14003396c00, {0x140022ed000, 0x1000, 0x1000})
	internal/poll/fd_unix.go:167 +0x1dc
net.(*netFD).Read(0x14003396c00, {0x140022ed000, 0x1000, 0x1000})
	net/fd_posix.go:56 +0x44
net.(*conn).Read(0x140027d8c30, {0x140022ed000, 0x1000, 0x1000})
	net/net.go:183 +0x4c
bufio.(*Reader).Read(0x1400387fa40, {0x14000b08128, 0x8, 0x8})
	bufio/bufio.go:227 +0x20c
github.com/Shopify/sarama.(*bufConn).Read(0x14005305d10, {0x14000b08128, 0x8, 0x8})
	github.com/Shopify/[email protected]/utils.go:107 +0x44
io.ReadAtLeast({0x12e9cde50, 0x14005305d10}, {0x14000b08128, 0x8, 0x8}, 0x8)
	io/io.go:328 +0xa0
io.ReadFull(...)
	io/io.go:347
github.com/Shopify/sarama.(*Broker).readFull(0x14000d5d180, {0x14000b08128, 0x8, 0x8})
	github.com/Shopify/[email protected]/broker.go:702 +0xe0
github.com/Shopify/sarama.(*Broker).responseReceiver(0x14000d5d180)
	github.com/Shopify/[email protected]/broker.go:858 +0x148
github.com/Shopify/sarama.withRecover(0x14000fac070)
	github.com/Shopify/[email protected]/utils.go:43 +0x44
created by github.com/Shopify/sarama.(*Broker).Open.func1
	github.com/Shopify/[email protected]/broker.go:211 +0xbac

goroutine 3277 [IO wait, 2 minutes]:
internal/poll.runtime_pollWait(0x12ea141a0, 0x72)
	runtime/netpoll.go:234 +0xa4
internal/poll.(*pollDesc).wait(0x14000742018, 0x72, 0x0)
	internal/poll/fd_poll_runtime.go:84 +0x38
internal/poll.(*pollDesc).waitRead(...)
	internal/poll/fd_poll_runtime.go:89
internal/poll.(*FD).Read(0x14000742000, {0x1400248f000, 0x1000, 0x1000})
	internal/poll/fd_unix.go:167 +0x1dc
net.(*netFD).Read(0x14000742000, {0x1400248f000, 0x1000, 0x1000})
	net/fd_posix.go:56 +0x44
net.(*conn).Read(0x140506829f0, {0x1400248f000, 0x1000, 0x1000})
	net/net.go:183 +0x4c
bufio.(*Reader).Read(0x1400408f1a0, {0x14002d0daf0, 0x8, 0x8})
	bufio/bufio.go:227 +0x20c
github.com/Shopify/sarama.(*bufConn).Read(0x14006ec6528, {0x14002d0daf0, 0x8, 0x8})
	github.com/Shopify/[email protected]/utils.go:107 +0x44
io.ReadAtLeast({0x12e9cde50, 0x14006ec6528}, {0x14002d0daf0, 0x8, 0x8}, 0x8)
	io/io.go:328 +0xa0
io.ReadFull(...)
	io/io.go:347
github.com/Shopify/sarama.(*Broker).readFull(0x14000d5c000, {0x14002d0daf0, 0x8, 0x8})
	github.com/Shopify/[email protected]/broker.go:702 +0xe0
github.com/Shopify/sarama.(*Broker).responseReceiver(0x14000d5c000)
	github.com/Shopify/[email protected]/broker.go:858 +0x148
github.com/Shopify/sarama.withRecover(0x140078cd9e0)
	github.com/Shopify/[email protected]/utils.go:43 +0x44
created by github.com/Shopify/sarama.(*Broker).Open.func1
	github.com/Shopify/[email protected]/broker.go:211 +0xbac

goroutine 1278 [chan send, 2 minutes]:
github.com/Shopify/sarama.(*topicProducer).dispatch(0x14000d2ea80)
	github.com/Shopify/[email protected]/async_producer.go:426 +0x154
github.com/Shopify/sarama.withRecover(0x14039381830)
	github.com/Shopify/[email protected]/utils.go:43 +0x44
created by github.com/Shopify/sarama.(*asyncProducer).newTopicProducer
	github.com/Shopify/[email protected]/async_producer.go:407 +0x1fc

goroutine 1279 [chan receive, 12 minutes]:
github.com/Shopify/sarama.(*partitionProducer).dispatch(0x140025bcd80)
	github.com/Shopify/[email protected]/async_producer.go:546 +0x198
github.com/Shopify/sarama.withRecover(0x14039381840)
	github.com/Shopify/[email protected]/utils.go:43 +0x44
created by github.com/Shopify/sarama.(*asyncProducer).newPartitionProducer
	github.com/Shopify/[email protected]/async_producer.go:513 +0x200

goroutine 1409 [select, 12 minutes]:
github.com/Shopify/sarama.(*brokerProducer).run(0x14003b530a0)
	github.com/Shopify/[email protected]/async_producer.go:747 +0x17c
github.com/Shopify/sarama.withRecover(0x14039381860)
	github.com/Shopify/[email protected]/utils.go:43 +0x44
created by github.com/Shopify/sarama.(*asyncProducer).newBrokerProducer
	github.com/Shopify/[email protected]/async_producer.go:691 +0x25c

goroutine 1410 [chan receive, 12 minutes]:
github.com/Shopify/sarama.(*asyncProducer).newBrokerProducer.func1()
	github.com/Shopify/[email protected]/async_producer.go:695 +0x68
github.com/Shopify/sarama.withRecover(0x14000e72e00)
	github.com/Shopify/[email protected]/utils.go:43 +0x44
created by github.com/Shopify/sarama.(*asyncProducer).newBrokerProducer
	github.com/Shopify/[email protected]/async_producer.go:694 +0x2e8

@3AceShowHand
Copy link
Author

Our service need Success and Errors enabled, since we have to track message flushed offset and errors.

@hxiaodon
Copy link

hxiaodon commented Jan 28, 2022

I think I came across the same issue with Sarama 1.31.0 kafka 1.1.0 with golang 1.17.5
And below is my extracted goroutine stack

goroutine profile: total 334


45 @ 0x4383b6 0x40640c 0x405e78 0xd38906 0xd906de 0x468c21
#	0xd38905	github.com/Shopify/sarama.(*partitionProducer).dispatch+0x1a5	/root/go/pkg/mod/github.com/!shopify/[email protected]/async_producer.go:546
#	0xd906dd	github.com/Shopify/sarama.withRecover+0x3d			/root/go/pkg/mod/github.com/!shopify/[email protected]/utils.go:43


12 @ 0x4383b6 0x448052 0xd3a331 0xd906de 0x468c21
#	0xd3a330	github.com/Shopify/sarama.(*brokerProducer).run+0x190	/root/go/pkg/mod/github.com/!shopify/[email protected]/async_producer.go:765
#	0xd906dd	github.com/Shopify/sarama.withRecover+0x3d		/root/go/pkg/mod/github.com/!shopify/[email protected]/utils.go:43


9 @ 0x4383b6 0x40640c 0x405e38 0xd8ecb0 0xd9f542 0xf0f2d1 0xf152a6 0xf152b4 0xf27a05 0x468c21
#	0xd8ecaf	github.com/Shopify/sarama.(*syncProducer).SendMessage+0x8f						/root/go/pkg/mod/github.com/!shopify/[email protected]/sync_producer.go:96
#   ..........my project code.......
#   ..........my project code.......
#   ..........my project code.......

9 @ 0x4383b6 0x40640c 0x405e78 0xd42a14 0xd906de 0x468c21
#	0xd42a13	github.com/Shopify/sarama.(*Broker).responseReceiver+0x73	/root/go/pkg/mod/github.com/!shopify/[email protected]/broker.go:1015
#	0xd906dd	github.com/Shopify/sarama.withRecover+0x3d			/root/go/pkg/mod/github.com/!shopify/[email protected]/utils.go:43


3 @ 0x4383b6 0x405565 0x40511d 0xd38e58 0xd906de 0x468c21
#	0xd38e57	github.com/Shopify/sarama.(*partitionProducer).dispatch+0x6f7	/root/go/pkg/mod/github.com/!shopify/[email protected]/async_producer.go:606
#	0xd906dd	github.com/Shopify/sarama.withRecover+0x3d			/root/go/pkg/mod/github.com/!shopify/[email protected]/utils.go:43

3 @ 0x4383b6 0x40640c 0x405e38 0xd3caff 0xd906de 0x468c21
#	0xd3cafe	github.com/Shopify/sarama.(*asyncProducer).retryHandler+0x19e	/root/go/pkg/mod/github.com/!shopify/[email protected]/async_producer.go:1052
#	0xd906dd	github.com/Shopify/sarama.withRecover+0x3d			/root/go/pkg/mod/github.com/!shopify/[email protected]/utils.go:43


3 @ 0x4383b6 0x40640c 0x405e78 0xd37852 0xd906de 0x468c21
#	0xd37851	github.com/Shopify/sarama.(*asyncProducer).dispatcher+0xd1	/root/go/pkg/mod/github.com/!shopify/[email protected]/async_producer.go:331
#	0xd906dd	github.com/Shopify/sarama.withRecover+0x3d			/root/go/pkg/mod/github.com/!shopify/[email protected]/utils.go:43

3 @ 0x4383b6 0x40640c 0x405e78 0xd8f13c 0xd906de 0x468c21
#	0xd8f13b	github.com/Shopify/sarama.(*syncProducer).handleSuccesses+0x9b	/root/go/pkg/mod/github.com/!shopify/[email protected]/sync_producer.go:130
#	0xd906dd	github.com/Shopify/sarama.withRecover+0x3d			/root/go/pkg/mod/github.com/!shopify/[email protected]/utils.go:43

3 @ 0x4383b6 0x40640c 0x405e78 0xd8f2a5 0xd906de 0x468c21
#	0xd8f2a4	github.com/Shopify/sarama.(*syncProducer).handleErrors+0xa4	/root/go/pkg/mod/github.com/!shopify/[email protected]/sync_producer.go:138
#	0xd906dd	github.com/Shopify/sarama.withRecover+0x3d			/root/go/pkg/mod/github.com/!shopify/[email protected]/utils.go:43


1 @ 0x4383b6 0x405565 0x40511d 0xd3a105 0xd402c3 0xd3e419 0xd42dae 0xd906de 0x468c21
#	0xd3a104	github.com/Shopify/sarama.(*asyncProducer).newBrokerProducer.func1.2+0xc4	/root/go/pkg/mod/github.com/!shopify/[email protected]/async_producer.go:701
#	0xd402c2	github.com/Shopify/sarama.(*Broker).AsyncProduce.func1+0xc2			/root/go/pkg/mod/github.com/!shopify/[email protected]/broker.go:388
#	0xd3e418	github.com/Shopify/sarama.(*responsePromise).handle+0x98			/root/go/pkg/mod/github.com/!shopify/[email protected]/broker.go:132
#	0xd42dad	github.com/Shopify/sarama.(*Broker).responseReceiver+0x40d			/root/go/pkg/mod/github.com/!shopify/[email protected]/broker.go:1020
#	0xd906dd	github.com/Shopify/sarama.withRecover+0x3d					/root/go/pkg/mod/github.com/!shopify/[email protected]/utils.go:43


1 @ 0x4383b6 0x40640c 0x405e38 0xd3faab 0xd3c6c5 0xd3b20e 0xd3a369 0xd906de 0x468c21
#	0xd3faaa	github.com/Shopify/sarama.(*Broker).Close+0xca			/root/go/pkg/mod/github.com/!shopify/[email protected]/broker.go:269
#	0xd3c6c4	github.com/Shopify/sarama.(*brokerProducer).handleError+0x184	/root/go/pkg/mod/github.com/!shopify/[email protected]/async_producer.go:1031
#	0xd3b20d	github.com/Shopify/sarama.(*brokerProducer).handleResponse+0x2d	/root/go/pkg/mod/github.com/!shopify/[email protected]/async_producer.go:898
#	0xd3a368	github.com/Shopify/sarama.(*brokerProducer).run+0x1c8		/root/go/pkg/mod/github.com/!shopify/[email protected]/async_producer.go:831
#	0xd906dd	github.com/Shopify/sarama.withRecover+0x3d			/root/go/pkg/mod/github.com/!shopify/[email protected]/utils.go:43



1 @ 0x4383b6 0x40640c 0x405e78 0xd37fd4 0xd906de 0x468c21
#	0xd37fd3	github.com/Shopify/sarama.(*topicProducer).dispatch+0x53	/root/go/pkg/mod/github.com/!shopify/[email protected]/async_producer.go:413
#	0xd906dd	github.com/Shopify/sarama.withRecover+0x3d			/root/go/pkg/mod/github.com/!shopify/[email protected]/utils.go:43


1 @ 0x4383b6 0x44914c 0x449126 0x464cc5 0x472b05 0xd41d37 0xd41d12 0xd401cb 0xd39f1c 0xd906de 0x468c21
#	0x464cc4	sync.runtime_SemacquireMutex+0x24					/usr/local/go/src/runtime/sema.go:71
#	0x472b04	sync.(*Mutex).lockSlow+0x164						/usr/local/go/src/sync/mutex.go:138
#	0xd41d36	sync.(*Mutex).Lock+0x96							/usr/local/go/src/sync/mutex.go:81
#	0xd41d11	github.com/Shopify/sarama.(*Broker).sendWithPromise+0x71		/root/go/pkg/mod/github.com/!shopify/[email protected]/broker.go:884
#	0xd401ca	github.com/Shopify/sarama.(*Broker).AsyncProduce+0x10a			/root/go/pkg/mod/github.com/!shopify/[email protected]/broker.go:405
#	0xd39f1b	github.com/Shopify/sarama.(*asyncProducer).newBrokerProducer.func1+0xdb	/root/go/pkg/mod/github.com/!shopify/[email protected]/async_producer.go:712
#	0xd906dd	github.com/Shopify/sarama.withRecover+0x3d				/root/go/pkg/mod/github.com/!shopify/[email protected]/utils.go:43


goroutine A  (*Broker).Close+0xca
brokerProducer run() loop
    -> handleError 
          -> bp.broker.Close
               -> bp.broker.lock.Locked, close broker's "responses" channel(buffered channel) ,blocked by broker's "done" channel, and asyncProducer's "responses" channel(zero buffered) could never be closed

goroutine B (*Broker).responseReceiver+0x40d
try pushing broker's dead info into asyncProducer's "responses" channel(zero buffered), blocked, and broker's done channel could never be closed, goroutine A will be blocked forever 



then other application code's goroutine(producers) will not work since bp.broker's lock will never be released


how about the below fix

func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
	var (
		input     = make(chan *ProducerMessage)
		bridge    = make(chan *produceSet)
		responses = make(chan *brokerProducerResponse,1) //buffered channel should work?
	)
       //.......
       //.......
}

@slaunay
Copy link
Contributor

slaunay commented Feb 7, 2022

I believe there are two issues discussed here with similar outcomes.

Default configuration of the AsyncProducer can lead to long delays

By default the AsyncProducer has retries and backoff enabled that can lead to long delays before failing.
Such long delays are often a combination of multiple records being buffered and the current in flight request being "stuck" reading from a broker.
The default timeout when reading from a TCP socket is 30 seconds (config.Net.ReadTimeout), so it might take up to 30 seconds to notice that the connection to a given broker is broken (assuming the connection was not properly closed on both ends).
Then you might see a 100ms backoff per record (if you have 600 pending records going to a given partition, you might be waiting up to 1 minute) before you can replay all those records.
Also if the target broker is not reachable anymore, you might also hit another 30 seconds delay (config.Net.DialTimeout) before triggering another retry (up to 3 by default).

So depending on how the AsyncProducer is configured and the type of network error, it might look the producer is stuck but it is actually mostly idle because of the retry logic.

See #1359 for yet another example on how it can take up to 4 minutes to fail trying to connect to a 2 brokers cluster.

expected feature, when try to close the asyncProducer, just drop all buffered message, and response immediately.

Unfortunately because of how the pipeline logic works, I don't think the shutdown message (created when closing the AsyncProducer) is handled till all the queued records are processed.
That is existing retries need to finish (which is often what is taking time) but "new" retries will be cancelled.
To fail faster, you could disable retries and handle them yourself:

config.Net.DialTimeout = <short-enough-for-your-need>
config.Net.ReadTimeout = <short-enough-for-your-need>
config.Metadata.Timeout = <short-enough-for-your-need>
config.Producer.Retry.Max = 0
config.Producer.Retry.Backoff = 0

Deadlock on retries (specific Sarama 1.31.0)

The other issue is indeed a deadlock when a brokerProducer is trying to call Close on its broker inside the callback of broker.AsyncProduce, this happens when dealing when receiving failed Produce response while sending concurrently another Produce request.
Such callback is called from the responseReceiver goroutine but the Close receiver blocks:

  • if there is an extra Produce request that reaches the maximum number of in flights requests (config.Net.MaxOpenRequests) by trying to acquire the b.lock.
  • till responseReceiver goroutine is done (by reading from b.done).

This is a regression from #2094 and I should have a fix with a simple unit test for that soon.
I believe #2129 describes that regression as well and I don't think it is specific to the SyncProducer.

@3AceShowHand
Copy link
Author

3AceShowHand commented Feb 11, 2022

I added some logs to customize the sarama, and there is some of the output:

[2022/02/07 11:05:33.083 +08:00] [DEBUG] [async_producer.go:574] ["add new message to retryState, retries=0, len=1"] [name=sarama]
[2022/02/07 11:05:33.083 +08:00] [DEBUG] [async_producer.go:574] ["add new message to retryState, retries=0, len=2"] [name=sarama]
[2022/02/07 11:05:33.083 +08:00] [DEBUG] [async_producer.go:574] ["add new message to retryState, retries=0, len=3"] [name=sarama]
[2022/02/07 11:05:33.083 +08:00] [DEBUG] [async_producer.go:574] ["add new message to retryState, retries=0, len=4"] [name=sarama]
[2022/02/07 11:05:33.083 +08:00] [DEBUG] [async_producer.go:574] ["add new message to retryState, retries=0, len=5"] [name=sarama]
[2022/02/07 11:05:33.083 +08:00] [DEBUG] [async_producer.go:574] ["add new message to retryState, retries=0, len=6"] [name=sarama]
[2022/02/07 11:05:33.083 +08:00] [DEBUG] [async_producer.go:574] ["add new message to retryState, retries=0, len=7"] [name=sarama]
[2022/02/07 11:05:33.083 +08:00] [DEBUG] [async_producer.go:574] ["add new message to retryState, retries=0, len=8"] [name=sarama]
[2022/02/07 11:05:33.083 +08:00] [DEBUG] [async_producer.go:574] ["add new message to retryState, retries=0, len=9"] [name=sarama]
[2022/02/07 11:05:33.083 +08:00] [DEBUG] [async_producer.go:574] ["add new message to retryState, retries=0, len=10"] [name=sarama]
[2022/02/07 11:05:33.083 +08:00] [DEBUG] [async_producer.go:574] ["add new message to retryState, retries=0, len=11"] [name=sarama]
[2022/02/07 11:05:33.083 +08:00] [DEBUG] [async_producer.go:574] ["add new message to retryState, retries=0, len=12"] [name=sarama]
....

more than 100 messages like this, I think the mount of this log has relations with the mount of messages.

Later happens this kind of messages.

[2022/02/07 11:06:48.427 +08:00] [DEBUG] [async_producer.go:592] ["try to updateLeader"] [name=sarama]
[2022/02/07 11:06:48.427 +08:00] [DEBUG] [async_producer.go:594] ["update leader meet error, err=circuit breaker is open"] [name=sarama]
[2022/02/07 11:06:48.528 +08:00] [DEBUG] [async_producer.go:592] ["try to updateLeader"] [name=sarama]
[2022/02/07 11:06:48.528 +08:00] [DEBUG] [async_producer.go:594] ["update leader meet error, err=circuit breaker is open"] [name=sarama]
[2022/02/07 11:06:48.628 +08:00] [DEBUG] [async_producer.go:592] ["try to updateLeader"] [name=sarama]
[2022/02/07 11:06:48.628 +08:00] [DEBUG] [async_producer.go:594] ["update leader meet error, err=circuit breaker is open"] [name=sarama]
....

And finally

[2022/02/07 11:06:48.831 +08:00] [DEBUG] [async_producer.go:592] ["try to updateLeader"] [name=sarama]
[2022/02/07 11:06:48.831 +08:00] [DEBUG] [async_producer.go:594] ["update leader meet error, err=circuit breaker is open"] [name=sarama]
[2022/02/07 11:06:48.932 +08:00] [DEBUG] [async_producer.go:580] ["fin received, try flushRetryBuffers, highWatermark=1"] [name=sarama]
[2022/02/07 11:06:48.932 +08:00] [DEBUG] [async_producer.go:632] ["producer/leader/fuck-sarama/0 state change to [flushing-1]"] [name=sarama]
[2022/02/07 11:06:48.932 +08:00] [DEBUG] [async_producer.go:654] ["producer/leader/fuck-sarama/0 state change to [normal]"] [name=sarama]
[2022/02/07 11:06:48.932 +08:00] [DEBUG] [async_producer.go:1065] ["asyncProducer shutdown, inflight wait, elapsed=65.626644333"] [name=sarama]
[2022/02/07 11:06:48.932 +08:00] [DEBUG] [async_producer.go:1072] ["asyncProducer shutdown, client close elapsed = 4.58e-07"] [name=sarama]
[2022/02/07 11:06:48.932 +08:00] [ERROR] [kafka.go:236] ["close async client with error"] [error="kafka: Failed to deliver 802 messages."] [duration=1m15.637246541s] [changefeed=747ff99b-56d4-4c22-b20f-d3b515417baf] [role=processor]
[2022/02/07 11:06:48.932 +08:00] [DEBUG] [sync_producer.go:167] ["syncProducer close, elapsed = 7.92e-07"] [name=sarama]
[2022/02/07 11:06:48.932 +08:00] [DEBUG] [async_producer.go:1058] ["Producer shutting down."] [name=sarama]
[2022/02/07 11:06:48.932 +08:00] [DEBUG] [async_producer.go:1061] ["asyncProducer send `shutdown`, input size=0"] [name=sarama]
[2022/02/07 11:06:48.932 +08:00] [DEBUG] [async_producer.go:1065] ["asyncProducer shutdown, inflight wait, elapsed=9.17e-07"] [name=sarama]
[2022/02/07 11:06:48.932 +08:00] [DEBUG] [async_producer.go:1072] ["asyncProducer shutdown, client close elapsed = 4.1e-08"] [name=sarama]

I think the steps like this:

  1. send a message to brokerProducer, but failed, so the brokerProducer will be closed.
  2. try to updateLeader, by using the breaker
  3. updateLeader will not succeed, since the broker is already dead. For each message in the retryState, it will try to do that, and finally cost a lot of time.
  4. receive a fin message, finally closed.

@3AceShowHand
Copy link
Author

3AceShowHand commented Feb 11, 2022

How about add a timeout for the close, like https://kafka.apache.org/11/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#close--

or a method like ForceClose()

@3AceShowHand
Copy link
Author

So depending on how the AsyncProducer is configured and the type of network error, it might look the producer is stuck but it is actually mostly idle because of the retry logic.

From my observation, it's trying to refreshMetadata from dead brokers.

@tsuna
Copy link

tsuna commented Feb 11, 2022

We've seen this deadlock repeatedly on several Kafka clusters as soon as we picked up v1.31.x, and we haven't seen it since we rolled out back our vendored deps to v1.30.1, so there is definitely a bug in the code, it's not a problem due to a dead broker (our brokers are fine).

@colinwm
Copy link

colinwm commented Feb 22, 2022

FWIW this is affecting us too, an upgrade to 1.31 caused our producers to lock up. Downgrading to 1.30.1 resolved the issue

@lavoiesl
Copy link
Contributor

This seems like a dupe of #2129, which is fixed in the latest main: #2133

@suvorovis
Copy link

It still exists in 1.32 (which includes fix #2133).
Downgrading to 1.30.1 resolved the issue.
Thanks, @tsuna

@github-actions

This comment was marked as outdated.

@github-actions github-actions bot added the stale Issues and pull requests without any recent activity label Aug 18, 2023
@dnwe dnwe removed the stale Issues and pull requests without any recent activity label Aug 18, 2023
@dnwe
Copy link
Collaborator

dnwe commented Aug 18, 2023

We believe this issue is fixed in recent versions of Sarama. Feel free to re-open if you're still seeing a problem on the latest

@dnwe dnwe closed this as completed Aug 18, 2023
@yitian108
Copy link

yitian108 commented Mar 15, 2024

@dnwe, we faced the same issue with version 1.37, should I update the latest sarama version (My kafka broker version is 3.6.1)?
And I reviewed the comments by @slaunay

Unfortunately because of how the pipeline logic works, I don't think the shutdown message (created when closing the AsyncProducer) is handled till all the queued records are processed.
That is existing retries need to finish (which is often what is taking time) but "new" retries will be cancelled.
To fail faster, you could disable retries and handle them yourself:

config.Net.DialTimeout = <short-enough-for-your-need>
config.Net.ReadTimeout = <short-enough-for-your-need>
config.Metadata.Timeout = <short-enough-for-your-need>
config.Producer.Retry.Max = 0
config.Producer.Retry.Backoff = 0

If I set the configuration like above, will the issue be solved?
Another configuration, Metadata.Timeout. If I set it to a small value like 1 second, I'm not sure if it will cause other issues.

Could you give me some advice on this?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

10 participants