asyncProducer creates a brokerProducer to interact with each Broker. brokerProducers send records to a Broker via its AsyncProduce() method. This method takes a callback, which brokerProducer sets to a function that sends the ProduceResponse and any error to its own responses chan.
If a brokerProducer reads an error from its responses chan, it calls Close() on its Broker, which is synchronous. The broker closes its responses chan and waits for the current response to finish processing.
If the response the Broker is currently processing is an error, processing it invokes the brokerProducer's callback, which tries to send a struct to the brokerProducer's responses chan. This deadlocks: the channel is unbuffered, and the brokerProducer is blocked waiting for the Broker's Close() method to return instead of reading from the channel.
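The cycle described above can be reproduced outside sarama with a small model. This is only a sketch under stated assumptions: the `broker` type, its `inflight` queue, and the `closeReturns` helper are hypothetical stand-ins, not sarama's actual types or code. It queues two failing responses, lets the reader call a synchronous `Close()` after the first one, and uses a timeout to report whether `Close()` ever returns. The buffered case is included only as a contrast to the unbuffered channel named above, not as a proposed fix.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// response models the struct the callback forwards (hypothetical type).
type response struct{ err error }

// broker is a hypothetical stand-in for a broker with a receiver goroutine.
type broker struct {
	inflight chan func()   // callbacks for responses still being processed
	done     chan struct{} // closed once the receiver goroutine has exited
}

func newBroker() *broker {
	b := &broker{inflight: make(chan func(), 2), done: make(chan struct{})}
	go func() {
		defer close(b.done)
		for callback := range b.inflight {
			callback() // blocks if the callback's channel send has no reader
		}
	}()
	return b
}

// Close is synchronous: it waits until every in-flight callback has run.
func (b *broker) Close() {
	close(b.inflight)
	<-b.done
}

// closeReturns reports whether Close() returns within a short timeout when
// the responses channel has the given capacity.
func closeReturns(capacity int) bool {
	responses := make(chan response, capacity)
	b := newBroker()
	callback := func() { responses <- response{err: errors.New("produce failed")} }

	// Two failed responses: the first triggers Close(), the second is the
	// one still being processed while Close() waits.
	b.inflight <- callback
	b.inflight <- callback

	finished := make(chan struct{})
	go func() {
		// Models the producer side: on an error, close the broker
		// synchronously and stop reading from responses meanwhile.
		if r := <-responses; r.err != nil {
			b.Close()
		}
		close(finished)
	}()

	select {
	case <-finished:
		return true
	case <-time.After(200 * time.Millisecond):
		return false // Close() is stuck behind the blocked callback
	}
}

func main() {
	fmt.Println("unbuffered, Close returned:", closeReturns(0))
	fmt.Println("buffered,   Close returned:", closeReturns(1))
}
```

With capacity 0 the second callback blocks on its send while the reader waits inside `Close()`, so neither side can make progress. With capacity 1 the send lands in the buffer and `Close()` can return.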
In terms of raw goroutine dumps:
// Broker waiting for current response to finish processing so it can shut down
goroutine 133 [chan receive, 283 minutes]:
github.com/Shopify/sarama.(*Broker).Close(0xc0000d8700, 0x0, 0x0)
	external/com_github_shopify_sarama/broker.go:287 +0xb6
github.com/Shopify/sarama.(*brokerProducer).handleError(0xc00036b960, 0xc0004e6120, 0xbd5ae0, 0xc0000da000)
	external/com_github_shopify_sarama/async_producer.go:1031 +0x1ab
github.com/Shopify/sarama.(*brokerProducer).handleResponse(0xc00036b960, 0xc000398040)
	external/com_github_shopify_sarama/async_producer.go:898 +0x53
github.com/Shopify/sarama.(*brokerProducer).run(0xc00036b960)
	external/com_github_shopify_sarama/async_producer.go:831 +0x27b
github.com/Shopify/sarama.withRecover(0xc0002b7670)
	external/com_github_shopify_sarama/utils.go:43 +0x49
created by github.com/Shopify/sarama.(*asyncProducer).newBrokerProducer
	external/com_github_shopify_sarama/async_producer.go:691 +0x20c

// asyncProducer's callback waiting to send to its responses channel, which is not being read from
goroutine 183 [chan send, 283 minutes]:
github.com/Shopify/sarama.(*asyncProducer).newBrokerProducer.func1.1.1(0x0, 0xbd5ae0, 0xc0000da000)
	external/com_github_shopify_sarama/async_producer.go:701 +0x9c
github.com/Shopify/sarama.(*Broker).AsyncProduce.func1(0x0, 0x0, 0x0, 0xbd5ae0, 0xc0000da000)
	external/com_github_shopify_sarama/broker.go:406 +0x12c
github.com/Shopify/sarama.(*responsePromise).handle(0xc0000ba380, 0x0, 0x0, 0x0, 0xbd5ae0, 0xc0000da000)
	external/com_github_shopify_sarama/broker.go:132 +0xf3
github.com/Shopify/sarama.(*Broker).responseReceiver(0xc0000d8700)
	external/com_github_shopify_sarama/broker.go:1038 +0x618
github.com/Shopify/sarama.withRecover(0xc000410400)
	external/com_github_shopify_sarama/utils.go:43 +0x49
created by github.com/Shopify/sarama.(*Broker).Open.func1
	external/com_github_shopify_sarama/broker.go:244 +0x8b0