Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PartitionConsumer.Close() hangs frequently #325

Closed
dim opened this issue Mar 7, 2015 · 16 comments · Fixed by #437
Closed

PartitionConsumer.Close() hangs frequently #325

dim opened this issue Mar 7, 2015 · 16 comments · Fixed by #437
Labels

Comments

@dim
Copy link
Contributor

dim commented Mar 7, 2015

I have a difficult time to recreate the exact issue, since it is intermittent, but it happens frequently enough to be a problem. Calls to PartitionConsumer.Close() tend to hang in https://github.com/Shopify/sarama/blob/6e7ef3c/consumer.go#L341 because the errors channel is never closed by the dispatcher in https://github.com/Shopify/sarama/blob/6e7ef3c/consumer.go#L260. When I dump my goroutines, I can see that it's hanging in https://github.com/Shopify/sarama/blob/6e7ef3c/consumer.go#L231. Does AsyncClose() need an additional client.trigger <- none{}?

@eapache
Copy link
Contributor

eapache commented Mar 7, 2015

Could you provide the complete goroutine trace? The client.trigger channel should be closed by updateSubscriptionCache when it detects that the dying channel has closed.

@dim
Copy link
Contributor Author

dim commented Mar 7, 2015

Here's the relevant bits:

goroutine 44 [chan receive]:
github.com/Shopify/sarama.(*Broker).responseReceiver(0xc2080b40e0)
  /home/me/go/src/github.com/Shopify/sarama/broker.go:332 +0xe3
github.com/Shopify/sarama.*Broker.(github.com/Shopify/sarama.responseReceiver)·fm()
  /home/me/go/src/github.com/Shopify/sarama/broker.go:84 +0x27
github.com/Shopify/sarama.withRecover(0xc2080d61b0)
  /home/me/go/src/github.com/Shopify/sarama/utils.go:42 +0x3a
created by github.com/Shopify/sarama.func·001
  /home/me/go/src/github.com/Shopify/sarama/broker.go:84 +0x535

goroutine 57 [chan receive]:
github.com/Shopify/sarama.(*Broker).responseReceiver(0xc2080b4540)
  /home/me/go/src/github.com/Shopify/sarama/broker.go:332 +0xe3
github.com/Shopify/sarama.*Broker.(github.com/Shopify/sarama.responseReceiver)·fm()
  /home/me/go/src/github.com/Shopify/sarama/broker.go:84 +0x27
github.com/Shopify/sarama.withRecover(0xc2080d6760)
  /home/me/go/src/github.com/Shopify/sarama/utils.go:42 +0x3a
created by github.com/Shopify/sarama.func·001
  /home/me/go/src/github.com/Shopify/sarama/broker.go:84 +0x535

goroutine 54 [select]:
github.com/Shopify/sarama.(*Client).backgroundMetadataUpdater(0xc2080b4150)
  /home/me/go/src/github.com/Shopify/sarama/client.go:455 +0x285
github.com/Shopify/sarama.*Client.(github.com/Shopify/sarama.backgroundMetadataUpdater)·fm()
  /home/me/go/src/github.com/Shopify/sarama/client.go:75 +0x27
github.com/Shopify/sarama.withRecover(0xc2080d6670)
  /home/me/go/src/github.com/Shopify/sarama/utils.go:42 +0x3a
created by github.com/Shopify/sarama.NewClient
  /home/me/go/src/github.com/Shopify/sarama/client.go:75 +0x62d

goroutine 62 [chan receive]:
github.com/Shopify/sarama.(*PartitionConsumer).Close(0xc2080de7e0, 0x0, 0x0)
  /home/me/go/src/github.com/Shopify/sarama/consumer.go:341 +0x129
github.com/bsm/sarama-cluster.(*Consumer).reset(0xc2080b8820, 0xc20803f740, 0x0, 0x0)
  /home/me/go/src/github.com/bsm/sarama-cluster/consumer.go:369 +0x269
github.com/bsm/sarama-cluster.(*Consumer).shutdown(0xc2080b8820, 0xc20803f740, 0x0, 0x0)
  /home/me/go/src/github.com/bsm/sarama-cluster/consumer.go:302 +0x44
github.com/bsm/sarama-cluster.(*Consumer).signalLoop(0xc2080b8820, 0x0, 0x0)
  /home/me/go/src/github.com/bsm/sarama-cluster/consumer.go:247 +0x371
github.com/bsm/sarama-cluster.*Consumer.(github.com/bsm/sarama-cluster.signalLoop)·fm(0x0, 0x0)
  /home/me/go/src/github.com/bsm/sarama-cluster/consumer.go:142 +0x39
gopkg.in/tomb%2ev2.(*Tomb).run(0xc2080b88c0, 0xc2080d6d70)
  /home/me/go/src/gopkg.in/tomb.v2/tomb.go:153 +0x24
created by gopkg.in/tomb%2ev2.(*Tomb).Go
  /home/me/go/src/gopkg.in/tomb.v2/tomb.go:149 +0x119

goroutine 67 [chan receive]:
github.com/Shopify/sarama.(*PartitionConsumer).dispatcher(0xc2080de7e0)
  /home/me/go/src/github.com/Shopify/sarama/consumer.go:231 +0x57
github.com/Shopify/sarama.*PartitionConsumer.(github.com/Shopify/sarama.dispatcher)·fm()
  /home/me/go/src/github.com/Shopify/sarama/consumer.go:130 +0x27
github.com/Shopify/sarama.withRecover(0xc2080d77d0)
  /home/me/go/src/github.com/Shopify/sarama/utils.go:42 +0x3a
created by github.com/Shopify/sarama.(*Consumer).ConsumePartition
  /home/me/go/src/github.com/Shopify/sarama/consumer.go:130 +0x426

goroutine 68 [select]:
github.com/Shopify/sarama.(*brokerConsumer).subscriptionManager(0xc208075040)
  /home/me/go/src/github.com/Shopify/sarama/consumer.go:384 +0x49e
github.com/Shopify/sarama.*brokerConsumer.(github.com/Shopify/sarama.subscriptionManager)·fm()
  /home/me/go/src/github.com/Shopify/sarama/consumer.go:178 +0x27
github.com/Shopify/sarama.withRecover(0xc2080d77e0)
  /home/me/go/src/github.com/Shopify/sarama/utils.go:42 +0x3a
created by github.com/Shopify/sarama.(*Consumer).refBrokerConsumer
  /home/me/go/src/github.com/Shopify/sarama/consumer.go:178 +0x2af

goroutine 69 [chan send]:
github.com/Shopify/sarama.(*brokerConsumer).handleResponse(0xc208075040, 0xc2080df9e0, 0xc2080afb00)
  /home/me/go/src/github.com/Shopify/sarama/consumer.go:540 +0x6a1
github.com/Shopify/sarama.(*brokerConsumer).subscriptionConsumer(0xc208075040)
  /home/me/go/src/github.com/Shopify/sarama/consumer.go:434 +0x57b
github.com/Shopify/sarama.*brokerConsumer.(github.com/Shopify/sarama.subscriptionConsumer)·fm()
  /home/me/go/src/github.com/Shopify/sarama/consumer.go:179 +0x27
github.com/Shopify/sarama.withRecover(0xc2080d77f0)
  /home/me/go/src/github.com/Shopify/sarama/utils.go:42 +0x3a
created by github.com/Shopify/sarama.(*Consumer).refBrokerConsumer
  /home/me/go/src/github.com/Shopify/sarama/consumer.go:179 +0x310

goroutine 73 [chan receive]:
github.com/Shopify/sarama.(*PartitionConsumer).dispatcher(0xc2080df9e0)
  /home/me/go/src/github.com/Shopify/sarama/consumer.go:231 +0x57
github.com/Shopify/sarama.*PartitionConsumer.(github.com/Shopify/sarama.dispatcher)·fm()
  /home/me/go/src/github.com/Shopify/sarama/consumer.go:130 +0x27
github.com/Shopify/sarama.withRecover(0xc208edd110)
  /home/me/go/src/github.com/Shopify/sarama/utils.go:42 +0x3a
created by github.com/Shopify/sarama.(*Consumer).ConsumePartition
  /home/me/go/src/github.com/Shopify/sarama/consumer.go:130 +0x426

@eapache
Copy link
Contributor

eapache commented Mar 7, 2015

Based on that stack trace the part that's stuck is goroutine 69, which is trying to return a message to the user. However, the Close function spawns a goroutine to drain those messages (https://github.com/Shopify/sarama/blob/d60bf3731577978392ce290a80e8dcf510e17bca/consumer.go#L334-L338) and it's not at all clear to me why that goroutine isn't showing up in your backtrace. Unless you snipped it out because you didn't think it was relevant?

@dim
Copy link
Contributor Author

dim commented Mar 7, 2015

Captured one more, including the message-consuming goroutine :

goroutine 37 [select]:
github.com/Shopify/sarama.(*Client).backgroundMetadataUpdater(0xc2080be150)
  /home/me/go/src/github.com/Shopify/sarama/client.go:455 +0x285
github.com/Shopify/sarama.*Client.(github.com/Shopify/sarama.backgroundMetadataUpdater)·fm()
  /home/me/go/src/github.com/Shopify/sarama/client.go:75 +0x27
github.com/Shopify/sarama.withRecover(0xc2080c8330)
  /home/me/go/src/github.com/Shopify/sarama/utils.go:42 +0x3a
created by github.com/Shopify/sarama.NewClient
  /home/me/go/src/github.com/Shopify/sarama/client.go:75 +0x62d

goroutine 34 [chan receive]:
github.com/Shopify/sarama.(*Broker).responseReceiver(0xc2080be0e0)
  /home/me/go/src/github.com/Shopify/sarama/broker.go:332 +0xe3
github.com/Shopify/sarama.*Broker.(github.com/Shopify/sarama.responseReceiver)·fm()
  /home/me/go/src/github.com/Shopify/sarama/broker.go:84 +0x27
github.com/Shopify/sarama.withRecover(0xc2080c8150)
  /home/me/go/src/github.com/Shopify/sarama/utils.go:42 +0x3a
created by github.com/Shopify/sarama.func·001
  /home/me/go/src/github.com/Shopify/sarama/broker.go:84 +0x535

goroutine 40 [chan receive]:
github.com/Shopify/sarama.(*Broker).responseReceiver(0xc2080be2a0)
  /home/me/go/src/github.com/Shopify/sarama/broker.go:332 +0xe3
github.com/Shopify/sarama.*Broker.(github.com/Shopify/sarama.responseReceiver)·fm()
  /home/me/go/src/github.com/Shopify/sarama/broker.go:84 +0x27
github.com/Shopify/sarama.withRecover(0xc2080c8430)
  /home/me/go/src/github.com/Shopify/sarama/utils.go:42 +0x3a
created by github.com/Shopify/sarama.func·001
  /home/me/go/src/github.com/Shopify/sarama/broker.go:84 +0x535

goroutine 50 [chan receive]:
github.com/Shopify/sarama.(*PartitionConsumer).Close(0xc2080d8cc0, 0x0, 0x0)
  /home/me/go/src/github.com/Shopify/sarama/consumer.go:341 +0x129
github.com/bsm/sarama-cluster.(*Consumer).reset(0xc2080bc4e0, 0xc2080df470, 0x0, 0x0)
  /home/me/go/src/github.com/bsm/sarama-cluster/consumer.go:369 +0x269
github.com/bsm/sarama-cluster.(*Consumer).shutdown(0xc2080bc4e0, 0xc2080df470, 0x0, 0x0)
  /home/me/go/src/github.com/bsm/sarama-cluster/consumer.go:302 +0x44
github.com/bsm/sarama-cluster.(*Consumer).signalLoop(0xc2080bc4e0, 0x0, 0x0)
  /home/me/go/src/github.com/bsm/sarama-cluster/consumer.go:247 +0x371
github.com/bsm/sarama-cluster.*Consumer.(github.com/bsm/sarama-cluster.signalLoop)·fm(0x0, 0x0)
  /home/me/go/src/github.com/bsm/sarama-cluster/consumer.go:142 +0x39
gopkg.in/tomb%2ev2.(*Tomb).run(0xc2080bc580, 0xc2080c8bd0)
  /home/me/go/src/gopkg.in/tomb.v2/tomb.go:153 +0x24
created by gopkg.in/tomb%2ev2.(*Tomb).Go
  /home/me/go/src/gopkg.in/tomb.v2/tomb.go:149 +0x119

goroutine 54 [chan receive]:
github.com/Shopify/sarama.(*PartitionConsumer).dispatcher(0xc2080d8cc0)
  /home/me/go/src/github.com/Shopify/sarama/consumer.go:231 +0x57
github.com/Shopify/sarama.*PartitionConsumer.(github.com/Shopify/sarama.dispatcher)·fm()
  /home/me/go/src/github.com/Shopify/sarama/consumer.go:130 +0x27
github.com/Shopify/sarama.withRecover(0xc2080c9400)
  /home/me/go/src/github.com/Shopify/sarama/utils.go:42 +0x3a
created by github.com/Shopify/sarama.(*Consumer).ConsumePartition
  /home/me/go/src/github.com/Shopify/sarama/consumer.go:130 +0x426

goroutine 55 [select]:
github.com/Shopify/sarama.(*brokerConsumer).subscriptionManager(0xc2080752c0)
  /home/me/go/src/github.com/Shopify/sarama/consumer.go:384 +0x49e
github.com/Shopify/sarama.*brokerConsumer.(github.com/Shopify/sarama.subscriptionManager)·fm()
  /home/me/go/src/github.com/Shopify/sarama/consumer.go:178 +0x27
github.com/Shopify/sarama.withRecover(0xc2080c9410)
  /home/me/go/src/github.com/Shopify/sarama/utils.go:42 +0x3a
created by github.com/Shopify/sarama.(*Consumer).refBrokerConsumer
  /home/me/go/src/github.com/Shopify/sarama/consumer.go:178 +0x2af

goroutine 56 [chan send]:
github.com/Shopify/sarama.(*brokerConsumer).handleResponse(0xc2080752c0, 0xc208ed6540, 0xc2080fdad0)
  /home/me/go/src/github.com/Shopify/sarama/consumer.go:540 +0x6a1
github.com/Shopify/sarama.(*brokerConsumer).subscriptionConsumer(0xc2080752c0)
  /home/me/go/src/github.com/Shopify/sarama/consumer.go:434 +0x57b
github.com/Shopify/sarama.*brokerConsumer.(github.com/Shopify/sarama.subscriptionConsumer)·fm()
  /home/me/go/src/github.com/Shopify/sarama/consumer.go:179 +0x27
github.com/Shopify/sarama.withRecover(0xc2080c9420)
  /home/me/go/src/github.com/Shopify/sarama/utils.go:42 +0x3a
created by github.com/Shopify/sarama.(*Consumer).refBrokerConsumer
  /home/me/go/src/github.com/Shopify/sarama/consumer.go:179 +0x310

goroutine 60 [chan receive]:
github.com/Shopify/sarama.(*PartitionConsumer).dispatcher(0xc208ed6540)
  /home/me/go/src/github.com/Shopify/sarama/consumer.go:231 +0x57
github.com/Shopify/sarama.*PartitionConsumer.(github.com/Shopify/sarama.dispatcher)·fm()
  /home/me/go/src/github.com/Shopify/sarama/consumer.go:130 +0x27
github.com/Shopify/sarama.withRecover(0xc208ecd4b0)
  /home/me/go/src/github.com/Shopify/sarama/utils.go:42 +0x3a
created by github.com/Shopify/sarama.(*Consumer).ConsumePartition
  /home/me/go/src/github.com/Shopify/sarama/consumer.go:130 +0x426

goroutine 66 [chan receive]:
github.com/Shopify/sarama.func·003()
  /home/me/go/src/github.com/Shopify/sarama/consumer.go:335 +0x57
github.com/Shopify/sarama.withRecover(0xc208f32060)
  /home/me/go/src/github.com/Shopify/sarama/utils.go:42 +0x3a
created by github.com/Shopify/sarama.(*PartitionConsumer).Close
  /home/me/go/src/github.com/Shopify/sarama/consumer.go:338 +0xce

@eapache
Copy link
Contributor

eapache commented Mar 7, 2015

The new stack trace on the surface looks really weird. Goroutine 56 is blocked trying to write to a channel, and goroutine 66 is apparently blocked trying to read from that same channel.

The only possible explanation is if they're not the same channel, because they belong to different PartitionConsumers. After some time digging through the sarama-cluster code, this actually makes sense. It owns multiple PartitionConsumers, but on shutdown it stops reading from all of them, then does effectively:

for c := range myPartitionConsumers {
    c.Close()
}

Since you're no longer reading from any of the PartitionConsumers, the internal consumer code blocks waiting to write a message out to e.g. partition 3, while you're trying to close partition 1. This deadlocks. This is a Sarama design bug (the partitions should not be quite so tightly locked together), but it's not immediately obvious to me what the best fix is internally.

Fortunately, there is a very easy workaround: just launch all your Close calls in goroutines, and the scheduler will order them correctly to handle the interdependencies. Something like the following should work:

wg := sync.WaitGroup{}
for c := range myPartitionConsumers {
    wg.Add(1)
    go func(c *PartitionConsumer) {
        defer wg.Done()
        c.Close()
    }(c)
}
wg.Wait()

Concurrency is hard :(

@eapache
Copy link
Contributor

eapache commented Mar 7, 2015

Here's one (sort of hacky?) theoretical fix for this on Sarama's end: if at any point the brokerConsumer spends more than 10ms waiting to write to a channel, spawn that write out into a goroutine and unsubscribe the relevant PartitionConsumer so that the rest of them can keep going...

@dim
Copy link
Contributor Author

dim commented Mar 7, 2015 via email

@eapache
Copy link
Contributor

eapache commented Mar 7, 2015

Alternatively, I could create a Consumer instance for each partition, with exactly one PartitionConsumer each.

That would work, but would defeat the entire purpose of the new consumer, which was to share the network connection and win a substantial efficiency gain in doing so.

We have a wiki page that gives an architectural overview of the new consumer: https://github.com/Shopify/sarama/wiki/Consumer. It is slightly out of date (this has now landed in master) but the architectural info is correct.

@wvanbergen
Copy link
Contributor

What I do in my very similar consumergroup library is to have a stopper channel, which I close when calling Close() on the consumergroup. All my partition consumers use this channel to break their consumer loop.

This approach seems to avoid the deadlocks you are seeing.

@dim
Copy link
Contributor Author

dim commented Mar 7, 2015

Breaking out of the loop isn't the problem, that bit works just fine. Your
library is also using the old sarama version. My problems only started when
I ported my library to master. I will push my changes to a separate branch
tomorrow, I would appreciate if you could help me with debugging.
On 7 Mar 2015 22:41, "Willem van Bergen" [email protected] wrote:

What I do in my very similar consumergroup library
https://github.com/wvanbergen/kafka is to have a stopper channel, which
I close when calling Close()
https://github.com/wvanbergen/kafka/blob/master/consumergroup/consumer_group.go#L201-L231
on the consumergroup.

All my partition consumers use this channel to break their consumer loop
https://github.com/wvanbergen/kafka/blob/master/consumergroup/consumer_group.go#L385-L404
.

This approach seems to avoid the deadlocks you are seeing.


Reply to this email directly or view it on GitHub
#325 (comment).

@dim
Copy link
Contributor Author

dim commented Mar 8, 2015

Pushed the WaitGroup workaround https://github.com/bsm/sarama-cluster/blob/ad1454f/consumer.go#L386 and it seems to work, except for this case https://travis-ci.org/bsm/sarama-cluster/jobs/53535191. I couldn't reproduce it locally though

@eapache
Copy link
Contributor

eapache commented Mar 8, 2015

That travis failure is the result of calling Close twice on the same PartitionConsumer. There's an argument that we should return a proper error instead of blowing up, but regardless it probably indicates a bug in the cluster code somewhere.

@wvanbergen
Copy link
Contributor

I updated my consumergroup code to the latest sarama and I am not seeing issues like this. Check out this branch if you're interested: wvanbergen/kafka#37

@dim
Copy link
Contributor Author

dim commented Mar 8, 2015

@eapache not sure, PartitionConsumers are closed synchronously and exclusively in reset() (https://github.com/bsm/sarama-cluster/blob/ad1454f/consumer.go#L392) which itself is called either by shutdown() when the consumer is closed or in rebalance() when the consumer is renewed. Both methods are exclusively used in the same signal loop. I simply cannot see how they could possibly run in parallel (https://github.com/bsm/sarama-cluster/blob/ad1454f/consumer.go#L238).

@wvanbergen all the problems are related to races under non-trivial conditions. I've never seen any issues in my integration tests either and that's why I specifically added a 'fuzzing' test (https://github.com/bsm/sarama-cluster/blob/master/fuzzing_test.go) which has - so far - been a pretty good at uncovering unexpected behaviour. As you can also see, the test failed only intermittently https://travis-ci.org/bsm/sarama-cluster/builds/53535189 and only when run with -race.

@eapache
Copy link
Contributor

eapache commented Mar 9, 2015

@dim the panic is "panic: close of closed channel"; the channel being closed is child.dying, which is created in ConsumePartition and is only closed in Close, so the only way for this to occur is if you are calling Close (or AsyncClose) twice on the same PartitionConsumer.

@dim
Copy link
Contributor Author

dim commented Mar 9, 2015

@eapache yep, thanks, that's what I thought too and - after more digging - I think I can see an edge case in sarama-cluster where the logic could go wrong. will fix on my end.

eapache added a commit that referenced this issue Mar 10, 2015
Skip it until the bug is fixed.
eapache added a commit that referenced this issue Mar 10, 2015
Skip it until the bug is fixed.

Also add a missing error check in the test above.
eapache added a commit that referenced this issue Mar 10, 2015
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging a pull request may close this issue.

3 participants