Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update the consumer close function with the latest rd_kafka_consumer_close_queue API #800

Merged
merged 1 commit into from
Jun 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ This is a feature release:

### Fixes

* Fix Rebalance events behavior for static membership (@jliunyu, #757).
* Fix Rebalance events behavior for static membership (@jliunyu, #757,
#798).
* Fix consumer close taking 10 seconds when there's no rebalance
needed (@jliunyu, #757).

Expand Down
23 changes: 3 additions & 20 deletions kafka/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,29 +424,12 @@ func (c *Consumer) Close() (err error) {
close(c.events)
}

doneChan := make(chan bool)

go func() {
C.rd_kafka_consumer_close(c.handle.rk)
// wake up Poll()
C.rd_kafka_queue_yield(c.handle.rkq)
doneChan <- true
}()

// wait for consumer_close() to finish while serving c.Poll() for rebalance callbacks/events
run := true
for run {
select {
case <-doneChan:
run = false
C.rd_kafka_consumer_close_queue(c.handle.rk, c.handle.rkq)

default:
c.Poll(100)
}
for C.rd_kafka_consumer_closed(c.handle.rk) != 1 {
c.Poll(100)
}

close(doneChan)

// Destroy our queue
C.rd_kafka_queue_destroy(c.handle.rkq)
c.handle.rkq = nil
Expand Down
34 changes: 20 additions & 14 deletions kafka/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,27 +520,33 @@ func testPoll(c *Consumer, doneChan chan bool, t *testing.T, wg *sync.WaitGroup)
// then AssignedPartitions happens to both consumers.
// 3) Third consumer joins, RevokedPartitions happens from the previous two
// consumers, then AssignedPartitions happens to all the three consumers.
// 4) Close the second consumer, RevokedPartitions should not happen.
// 4) Close the second consumer, revoke its assignments will happen, but it
// should not notice other consumers.
// 5) Rejoin the second consumer, rebalance should not happen to all the other
// consumers since it's not the leader, AssignedPartitions only happened
// to this consumer to assign the partitions.
// 6) Close the third consumer, revoke its assignments will happen, but it
// should not notice other consumers.
// 7) Close the rejoined consumer, revoke its assignments will happen,
// but it should not notice other consumers.
// 8) Close the first consumer, revoke its assignments will happen.
//
// The total number of AssignedPartitions for the first consumer is 3,
// and the total number of RevokedPartitions for the first consumer is 2.
// and the total number of RevokedPartitions for the first consumer is 3.
// The total number of AssignedPartitions for the second consumer is 2,
// and the total number of RevokedPartitions for the second consumer is 1.
// and the total number of RevokedPartitions for the second consumer is 2.
// The total number of AssignedPartitions for the third consumer is 1,
// and the total number of RevokedPartitions for the second consumer is 0.
// and the total number of RevokedPartitions for the third consumer is 1.
// The total number of AssignedPartitions for the rejoined consumer
// (originally second consumer) is 1,
// and the total number of RevokedPartitions for the rejoined consumer
// (originally second consumer) is 0.
// (originally second consumer) is 1.
func TestConsumerCloseForStaticMember(t *testing.T) {
if !testconfRead() {
t.Skipf("Missing testconf.json")
}
broker := testconf.Brokers
topic := testconf.Topic
topic := createTestTopic(t, "staticMembership", 3, 1)

var assignedEvents1 int32
var revokedEvents1 int32
Expand Down Expand Up @@ -655,35 +661,35 @@ func TestConsumerCloseForStaticMember(t *testing.T) {
atomic.LoadInt32(&assignedEvents1))
}

if atomic.LoadInt32(&revokedEvents1) != 2 {
t.Fatalf("2 revokedEvents are Expected to happen for the first consumer, but %d happened\n",
if atomic.LoadInt32(&revokedEvents1) != 3 {
t.Fatalf("3 revokedEvents are Expected to happen for the first consumer, but %d happened\n",
atomic.LoadInt32(&revokedEvents1))
}

if atomic.LoadInt32(&assignedEvents2) != 2 {
t.Fatalf("2 assignedEvents are Expected to happen for the second consumer, but %d happened\n",
atomic.LoadInt32(&assignedEvents2))
}
if atomic.LoadInt32(&revokedEvents2) != 1 {
t.Fatalf("1 revokedEvents is Expected to happen for the second consumer, but %d happened\n",
if atomic.LoadInt32(&revokedEvents2) != 2 {
t.Fatalf("2 revokedEvents is Expected to happen for the second consumer, but %d happened\n",
atomic.LoadInt32(&revokedEvents2))
}

if atomic.LoadInt32(&assignedEvents3) != 1 {
t.Fatalf("1 assignedEvents is Expected to happen for the third consumer, but %d happened\n",
atomic.LoadInt32(&assignedEvents3))
}
if atomic.LoadInt32(&revokedEvents3) != 0 {
t.Fatalf("0 revokedEvents is Expected to happen for the third consumer, but %d happened\n",
if atomic.LoadInt32(&revokedEvents3) != 1 {
t.Fatalf("1 revokedEvents is Expected to happen for the third consumer, but %d happened\n",
atomic.LoadInt32(&revokedEvents3))
}

if atomic.LoadInt32(&assignedEvents4) != 1 {
t.Fatalf("1 assignedEvents is Expected to happen for the rejoined consumer(originally second consumer), but %d happened\n",
atomic.LoadInt32(&assignedEvents4))
}
if atomic.LoadInt32(&revokedEvents4) != 0 {
t.Fatalf("0 revokedEvents is Expected to happen for the rejoined consumer(originally second consumer), but %d happened\n",
if atomic.LoadInt32(&revokedEvents4) != 1 {
t.Fatalf("1 revokedEvents is Expected to happen for the rejoined consumer(originally second consumer), but %d happened\n",
atomic.LoadInt32(&revokedEvents4))
}
}
Expand Down