Skip to content

Commit

Permalink
Merge pull request #2331 from Jacob-bzx/fix-panic-in-errors
Browse files Browse the repository at this point in the history
fix: race condition(may panic) when closing consumer group
  • Loading branch information
dnwe authored Sep 27, 2022
2 parents 5e2c2ef + e26c683 commit 338bf4f
Showing 1 changed file with 10 additions and 4 deletions.
14 changes: 10 additions & 4 deletions consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,10 @@ type consumerGroup struct {
memberID string
errors chan error

lock sync.Mutex
closed chan none
closeOnce sync.Once
lock sync.Mutex
errorsLock sync.RWMutex
closed chan none
closeOnce sync.Once

userData []byte

Expand Down Expand Up @@ -159,10 +160,13 @@ func (c *consumerGroup) Close() (err error) {
err = e
}

// drain errors
go func() {
c.errorsLock.Lock()
defer c.errorsLock.Unlock()
close(c.errors)
}()

// drain errors
for e := range c.errors {
err = e
}
Expand Down Expand Up @@ -592,6 +596,8 @@ func (c *consumerGroup) handleError(err error, topic string, partition int32) {
return
}

c.errorsLock.RLock()
defer c.errorsLock.RUnlock()
select {
case <-c.closed:
// consumer is closed
Expand Down

0 comments on commit 338bf4f

Please sign in to comment.