Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add methods to pause/resume consumer's consumption #2005

Merged
merged 3 commits into from
Jan 22, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 110 additions & 1 deletion consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,26 @@ type Consumer interface {
// Close shuts down the consumer. It must be called after all child
// PartitionConsumers have already been closed.
Close() error

// Pause suspends fetching from the requested partitions. Future calls to the broker will not return any
// records from these partitions until they have been resumed using Resume()/ResumeAll().
// Note that this method does not affect partition subscription.
// In particular, it does not cause a group rebalance when automatic assignment is used.
Pause(topicPartitions map[string][]int32)

// Resume resumes specified partitions which have been paused with Pause()/PauseAll().
// New calls to the broker will return records from these partitions if there are any to be fetched.
Resume(topicPartitions map[string][]int32)

// PauseAll suspends fetching from all partitions. Future calls to the broker will not return any
// records from these partitions until they have been resumed using Resume()/ResumeAll().
// Note that this method does not affect partition subscription.
// In particular, it does not cause a group rebalance when automatic assignment is used.
PauseAll()

// ResumeAll resumes all partitions which have been paused with Pause()/PauseAll().
// New calls to the broker will return records from these partitions if there are any to be fetched.
ResumeAll()
}

type consumer struct {
Expand Down Expand Up @@ -245,6 +265,62 @@ func (c *consumer) abandonBrokerConsumer(brokerWorker *brokerConsumer) {
delete(c.brokerConsumers, brokerWorker.broker)
}

// Pause implements Consumer.
//
// While holding c.lock it visits every requested topic/partition pair and
// calls Pause on the partition consumer registered for it in c.children.
// Pairs without a registered partition consumer are skipped.
func (c *consumer) Pause(topicPartitions map[string][]int32) {
	c.lock.Lock()
	defer c.lock.Unlock()

	for topic, requested := range topicPartitions {
		registered, found := c.children[topic]
		if !found {
			// Nothing is being consumed from this topic.
			continue
		}
		for _, id := range requested {
			if pc, found := registered[id]; found {
				pc.Pause()
			}
		}
	}
}

// Resume implements Consumer.
//
// While holding c.lock it visits every requested topic/partition pair and
// calls Resume on the partition consumer registered for it in c.children.
// Pairs without a registered partition consumer are skipped.
func (c *consumer) Resume(topicPartitions map[string][]int32) {
	c.lock.Lock()
	defer c.lock.Unlock()

	for topic, requested := range topicPartitions {
		registered, found := c.children[topic]
		if !found {
			// Nothing is being consumed from this topic.
			continue
		}
		for _, id := range requested {
			if pc, found := registered[id]; found {
				pc.Resume()
			}
		}
	}
}

// PauseAll implements Consumer.
//
// While holding c.lock it calls Pause on every partition consumer currently
// registered in c.children, across all topics.
func (c *consumer) PauseAll() {
	c.lock.Lock()
	defer c.lock.Unlock()

	for topic := range c.children {
		byPartition := c.children[topic]
		for id := range byPartition {
			byPartition[id].Pause()
		}
	}
}

// ResumeAll implements Consumer.
//
// While holding c.lock it calls Resume on every partition consumer currently
// registered in c.children, across all topics.
func (c *consumer) ResumeAll() {
	c.lock.Lock()
	defer c.lock.Unlock()

	for topic := range c.children {
		byPartition := c.children[topic]
		for id := range byPartition {
			byPartition[id].Resume()
		}
	}
}

// PartitionConsumer

// PartitionConsumer processes Kafka messages from a given topic and partition. You MUST call one of Close() or
Expand Down Expand Up @@ -292,6 +368,20 @@ type PartitionConsumer interface {
// i.e. the offset that will be used for the next message that will be produced.
// You can use this to determine how far behind the processing is.
HighWaterMarkOffset() int64

// Pause suspends fetching from this partition. Future calls to the broker will not return
// any records from this partition until it has been resumed using Resume().
// Note that this method does not affect partition subscription.
// In particular, it does not cause a group rebalance when automatic assignment is used.
Pause()

// Resume resumes this partition which has been paused with Pause().
// New calls to the broker will return records from this partition if there are any to be fetched.
// If the partition was not previously paused, this method is a no-op.
Resume()

// IsPaused indicates if this partition consumer is paused or not
IsPaused() bool
}

type partitionConsumer struct {
Expand All @@ -314,6 +404,8 @@ type partitionConsumer struct {
fetchSize int32
offset int64
retries int32

paused int32
}

var errTimedOut = errors.New("timed out feeding messages to the user") // not user-facing
Expand Down Expand Up @@ -737,6 +829,21 @@ func (child *partitionConsumer) interceptors(msg *ConsumerMessage) {
}
}

// Pause implements PartitionConsumer.
// It atomically stores 1 in child.paused, the flag that IsPaused reads.
func (child *partitionConsumer) Pause() {
atomic.StoreInt32(&child.paused, 1)
}

// Resume implements PartitionConsumer.
// It atomically stores 0 in child.paused, the flag that IsPaused reads.
func (child *partitionConsumer) Resume() {
atomic.StoreInt32(&child.paused, 0)
}

// IsPaused implements PartitionConsumer.
// It atomically loads child.paused and reports whether the value is 1.
func (child *partitionConsumer) IsPaused() bool {
return atomic.LoadInt32(&child.paused) == 1
}

type brokerConsumer struct {
consumer *consumer
broker *Broker
Expand Down Expand Up @@ -962,7 +1069,9 @@ func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
}

for child := range bc.subscriptions {
request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize)
if !child.IsPaused() {
request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize)
}
}

return bc.broker.Fetch(request)
Expand Down
40 changes: 40 additions & 0 deletions consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,26 @@ type ConsumerGroup interface {
// Close stops the ConsumerGroup and detaches any running sessions. It is required to call
// this function before the object passes out of scope, as it will otherwise leak memory.
Close() error

// Pause suspends fetching from the requested partitions. Future calls to the broker will not return any
// records from these partitions until they have been resumed using Resume()/ResumeAll().
// Note that this method does not affect partition subscription.
// In particular, it does not cause a group rebalance when automatic assignment is used.
Pause(partitions map[string][]int32)

// Resume resumes specified partitions which have been paused with Pause()/PauseAll().
// New calls to the broker will return records from these partitions if there are any to be fetched.
Resume(partitions map[string][]int32)

// PauseAll suspends fetching from all partitions. Future calls to the broker will not return any
// records from these partitions until they have been resumed using Resume()/ResumeAll().
// Note that this method does not affect partition subscription.
// In particular, it does not cause a group rebalance when automatic assignment is used.
PauseAll()

// ResumeAll resumes all partitions which have been paused with Pause()/PauseAll().
// New calls to the broker will return records from these partitions if there are any to be fetched.
ResumeAll()
}

type consumerGroup struct {
Expand Down Expand Up @@ -188,6 +208,26 @@ func (c *consumerGroup) Consume(ctx context.Context, topics []string, handler Co
return sess.release(true)
}

// Pause implements ConsumerGroup.
// It forwards the topic-to-partitions map unchanged to c.consumer.Pause.
func (c *consumerGroup) Pause(partitions map[string][]int32) {
c.consumer.Pause(partitions)
}

// Resume implements ConsumerGroup.
// It forwards the topic-to-partitions map unchanged to c.consumer.Resume.
func (c *consumerGroup) Resume(partitions map[string][]int32) {
c.consumer.Resume(partitions)
}

// PauseAll implements ConsumerGroup.
// It delegates to c.consumer.PauseAll.
func (c *consumerGroup) PauseAll() {
c.consumer.PauseAll()
}

// ResumeAll implements ConsumerGroup.
// It delegates to c.consumer.ResumeAll.
func (c *consumerGroup) ResumeAll() {
c.consumer.ResumeAll()
}

func (c *consumerGroup) retryNewSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int, refreshCoordinator bool) (*consumerGroupSession, error) {
select {
case <-c.closed:
Expand Down
81 changes: 81 additions & 0 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,87 @@ func TestConsumerOffsetManual(t *testing.T) {
broker0.Close()
}

// TestPauseResumeConsumption exercises Pause/Resume on a partition consumer
// against a mock broker. While the consumer is paused, any message received
// must carry offset newestOffsetBroker; after Resume, messages must arrive
// with consecutive offsets until offsetClient reaches maxOffsetBroker.
func TestPauseResumeConsumption(t *testing.T) {
// Given: a mock broker whose fetch response initially holds a single
// message at newestOffsetBroker.
broker0 := NewMockBroker(t, 0)

const newestOffsetBroker = 1233
const maxOffsetBroker = newestOffsetBroker + 10
// offsetBroker is the next offset to add to the mock fetch response;
// offsetClient is the next offset the test expects to receive.
offsetBroker := newestOffsetBroker
offsetClient := offsetBroker

mockFetchResponse := NewMockFetchResponse(t, 1)
mockFetchResponse.SetMessage("my_topic", 0, int64(newestOffsetBroker), testMsg)
offsetBroker++

brokerResponses := map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetBroker(broker0.Addr(), broker0.BrokerID()).
SetLeader("my_topic", 0, broker0.BrokerID()),
"OffsetRequest": NewMockOffsetResponse(t).
SetOffset("my_topic", 0, OffsetOldest, 0).
SetOffset("my_topic", 0, OffsetNewest, int64(newestOffsetBroker)),
"FetchRequest": mockFetchResponse,
}

broker0.SetHandlerByMap(brokerResponses)

// When: a partition consumer is started at OffsetNewest.
master, err := NewConsumer([]string{broker0.Addr()}, NewTestConfig())
if err != nil {
t.Fatal(err)
}

consumer, err := master.ConsumePartition("my_topic", 0, OffsetNewest)
if err != nil {
t.Fatal(err)
}

// Pause the consumption.
consumer.Pause()

// Add more messages to the broker's fetch response, covering the offsets
// up to (but not including) maxOffsetBroker.
for ; offsetBroker < maxOffsetBroker; offsetBroker++ {
mockFetchResponse = mockFetchResponse.SetMessage("my_topic", 0, int64(offsetBroker), testMsg)
}
brokerResponses["FetchRequest"] = mockFetchResponse
broker0.SetHandlerByMap(brokerResponses)

// Drain the consumer until a full second passes without a message.
keepConsuming := true
for keepConsuming {
select {
case message := <-consumer.Messages():
// Only the first message is expected to be consumed while paused.
offsetClient++
assertMessageOffset(t, message, int64(newestOffsetBroker))
case err := <-consumer.Errors():
t.Fatal(err)
case <-time.After(time.Second):
// Expected to time out once the consumption is paused.
keepConsuming = false
}
}

// Resume the consumption in order to consume the new messages.
consumer.Resume()

// Every remaining message must arrive in offset order; fail if none
// arrives within 10 seconds.
for offsetClient < maxOffsetBroker {
select {
case message := <-consumer.Messages():
assertMessageOffset(t, message, int64(offsetClient))
offsetClient += 1
case err := <-consumer.Errors():
t.Fatal("Error: ", err)
case <-time.After(time.Second * 10):
t.Fatal("consumer timed out . Offset: ", offsetClient)
}
}

safeClose(t, consumer)
safeClose(t, master)
broker0.Close()
}

// If `OffsetNewest` is passed as the initial offset then the first consumed
// message indeed corresponds to the offset that broker claims to be the
// newest in its metadata response.
Expand Down
4 changes: 3 additions & 1 deletion examples/consumergroup/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,6 @@ This example shows you how to use the Sarama consumer group consumer. The exampl

```bash
$ go run main.go -brokers="127.0.0.1:9092" -topics="sarama" -group="example"
```
```

You can also toggle (pause/resume) the consumption by sending SIGUSR1
35 changes: 30 additions & 5 deletions examples/consumergroup/main.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package main

// SIGUSR1 toggles the pause/resume state of the consumption.
import (
"context"
"flag"
Expand Down Expand Up @@ -48,6 +49,7 @@ func init() {
}

func main() {
keepRunning := true
log.Println("Starting a new Sarama consumer")

if verbose {
Expand Down Expand Up @@ -94,6 +96,7 @@ func main() {
log.Panicf("Error creating consumer group client: %v", err)
}

consumptionIsPaused := false
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
Expand All @@ -116,13 +119,23 @@ func main() {
<-consumer.ready // Await till the consumer has been set up
log.Println("Sarama consumer up and running!...")

sigusr1 := make(chan os.Signal, 1)
signal.Notify(sigusr1, syscall.SIGUSR1)

sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
select {
case <-ctx.Done():
log.Println("terminating: context cancelled")
case <-sigterm:
log.Println("terminating: via signal")

for keepRunning {
select {
case <-ctx.Done():
log.Println("terminating: context cancelled")
keepRunning = false
case <-sigterm:
log.Println("terminating: via signal")
keepRunning = false
case <-sigusr1:
toggleConsumptionFlow(client, &consumptionIsPaused)
}
}
cancel()
wg.Wait()
Expand All @@ -131,6 +144,18 @@ func main() {
}
}

// toggleConsumptionFlow flips the consumption state of client: when *isPaused
// is true it resumes all partitions, otherwise it pauses them. It logs the
// action taken and then stores the inverted state back into *isPaused.
func toggleConsumptionFlow(client sarama.ConsumerGroup, isPaused *bool) {
	wasPaused := *isPaused

	switch {
	case wasPaused:
		client.ResumeAll()
		log.Println("Resuming consumption")
	default:
		client.PauseAll()
		log.Println("Pausing consumption")
	}

	*isPaused = !wasPaused
}

// Consumer represents a Sarama consumer group consumer
type Consumer struct {
ready chan bool
Expand Down
Loading