
Commit

fix: fix some bugs
Gleiphir2769 committed Mar 14, 2023
1 parent 4a242b7 commit 57a1340
Showing 7 changed files with 224 additions and 105 deletions.
perf/perf-consumer.go (18 changes: 11 additions & 7 deletions)
@@ -31,10 +31,11 @@ import (

// ConsumeArgs defines the parameters required by consume
type ConsumeArgs struct {
Topic string
SubscriptionName string
ReceiverQueueSize int
EnableBatchIndexAck bool
Topic string
SubscriptionName string
ReceiverQueueSize int
EnableBatchIndexAck bool
EnableAutoScaledReceiverQueueSize bool
}

func newConsumerCommand() *cobra.Command {
@@ -57,6 +58,8 @@ func newConsumerCommand() *cobra.Command {
flags.StringVarP(&consumeArgs.SubscriptionName, "subscription", "s", "sub", "Subscription name")
flags.IntVarP(&consumeArgs.ReceiverQueueSize, "receiver-queue-size", "r", 1000, "Receiver queue size")
flags.BoolVar(&consumeArgs.EnableBatchIndexAck, "enable-batch-index-ack", false, "Whether to enable batch index ACK")
flags.BoolVar(&consumeArgs.EnableAutoScaledReceiverQueueSize, "enable-auto-scaled-queue-size", false,
"Whether to enable auto scaled receiver queue size")

return cmd
}
@@ -76,9 +79,10 @@ func consume(consumeArgs *ConsumeArgs, stop <-chan struct{}) {
defer client.Close()

consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: consumeArgs.Topic,
SubscriptionName: consumeArgs.SubscriptionName,
EnableBatchIndexAcknowledgment: consumeArgs.EnableBatchIndexAck,
Topic: consumeArgs.Topic,
SubscriptionName: consumeArgs.SubscriptionName,
EnableBatchIndexAcknowledgment: consumeArgs.EnableBatchIndexAck,
EnableAutoScaledReceiverQueueSize: consumeArgs.EnableAutoScaledReceiverQueueSize,
})

if err != nil {
pulsar/consumer.go (6 changes: 6 additions & 0 deletions)
@@ -160,6 +160,12 @@ type ConsumerOptions struct {
// Default value is `1000` messages and should be good for most use cases.
ReceiverQueueSize int

// EnableAutoScaledReceiverQueueSize, if enabled, the consumer receive queue will be auto-scaled
// based on the consumer's actual throughput. ReceiverQueueSize then acts as the maximum size
// to which the receive queue can grow.
// Default is false.
EnableAutoScaledReceiverQueueSize bool

// NackRedeliveryDelay specifies the delay after which to redeliver the messages that failed to be
// processed. Default is 1 min. (See `Consumer.Nack()`)
NackRedeliveryDelay time.Duration
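For reference, a minimal sketch of how a consumer could opt into the new option once this change is released; the broker URL, topic, and subscription names are placeholders, error handling is trimmed, and only ConsumerOptions fields that appear in this diff are relied on.

```go
package main

import (
	"log"

	"github.com/apache/pulsar-client-go/pulsar"
)

func main() {
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL: "pulsar://localhost:6650", // placeholder broker address
	})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
		Topic:            "my-topic", // placeholder
		SubscriptionName: "my-sub",   // placeholder
		// ReceiverQueueSize caps how far the auto-scaled receive queue can grow.
		ReceiverQueueSize:                 1000,
		EnableAutoScaledReceiverQueueSize: true,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()
}
```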
pulsar/consumer_impl.go (1 change: 1 addition & 0 deletions)
@@ -399,6 +399,7 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
consumerEventListener: c.options.EventListener,
enableBatchIndexAck: c.options.EnableBatchIndexAcknowledgment,
ackGroupingOptions: c.options.AckGroupingOptions,
autoReceiverQueueSize: c.options.EnableAutoScaledReceiverQueueSize,
}
cons, err := newPartitionConsumer(c, c.client, opts, c.messageCh, c.dlq, c.metrics)
ch <- ConsumerError{
pulsar/consumer_partition.go (208 changes: 117 additions & 91 deletions)
@@ -19,14 +19,12 @@ package pulsar

import (
"container/list"
"context"
"encoding/hex"
"errors"
"fmt"
"math"
"strings"
"sync"
"sync/atomic"
"time"

"google.golang.org/protobuf/proto"
@@ -149,12 +147,13 @@ type partitionConsumer struct {

// the size of the queue channel for buffering messages
maxQueueSize int32
queueCh chan *message
queueCh chan []*message
startMessageID atomicMessageID
lastDequeuedMsg *trackingMessageID

currentQueueSize atomic.Int32
scaleReceiverQueueHint atomic.Bool
currentQueueSize uAtomic.Int32
scaleReceiverQueueHint uAtomic.Bool
incomingMessages uAtomic.Int32

eventsCh chan interface{}
connectedCh chan struct{}
@@ -190,7 +189,7 @@ func (pc *partitionConsumer) ActiveConsumerChanged(isActive bool) {
}

type availablePermits struct {
permits int32
permits uAtomic.Int32
pc *partitionConsumer
}

@@ -200,28 +199,34 @@ func (p *availablePermits) inc() {
}

func (p *availablePermits) add(delta int32) {
atomic.AddInt32(&p.permits, delta)
p.permits.Add(delta)
p.flowIfNeed()
}

func (p *availablePermits) reset() {
atomic.StoreInt32(&p.permits, 0)
p.permits.Store(0)
}

func (p *availablePermits) get() int32 {
return atomic.LoadInt32(&p.permits)
return p.permits.Load()
}

func (p *availablePermits) flowIfNeed() {
// TODO implement a better flow controller
// send more permits if needed
var flowThreshold int32
if p.pc.options.autoReceiverQueueSize {
flowThreshold = int32(math.Max(float64(p.pc.currentQueueSize.Load()/2), 1))
} else {
flowThreshold = int32(math.Max(float64(p.pc.maxQueueSize/2), 1))
}

current := p.get()
flowThreshold := int32(math.Max(float64(p.pc.currentQueueSize.Load()/2), 1))
if current >= flowThreshold {
availablePermits := current
requestedPermits := current
// check if permits changed
if !atomic.CompareAndSwapInt32(&p.permits, current, 0) {
if !p.permits.CAS(current, 0) {
return
}

@@ -314,7 +319,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
partitionIdx: int32(options.partitionIdx),
eventsCh: make(chan interface{}, 10),
maxQueueSize: int32(options.receiverQueueSize),
queueCh: make(chan *message, options.receiverQueueSize),
queueCh: make(chan []*message, options.receiverQueueSize),
startMessageID: atomicMessageID{msgID: options.startMessageID},
connectedCh: make(chan struct{}),
messageCh: messageCh,
@@ -878,29 +883,37 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
return fmt.Errorf("discarding message on decryption error :%v", err)
case crypto.ConsumerCryptoFailureActionConsume:
pc.log.Warnf("consuming encrypted message due to error in decryption :%v", err)
msg := &message{
publishTime: timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
eventTime: timeFromUnixTimestampMillis(msgMeta.GetEventTime()),
key: msgMeta.GetPartitionKey(),
producerName: msgMeta.GetProducerName(),
properties: internal.ConvertToStringMap(msgMeta.GetProperties()),
topic: pc.topic,
msgID: newMessageID(
int64(pbMsgID.GetLedgerId()),
int64(pbMsgID.GetEntryId()),
pbMsgID.GetBatchIndex(),
pc.partitionIdx,
pbMsgID.GetBatchSize(),
),
payLoad: headersAndPayload.ReadableSlice(),
schema: pc.options.schema,
replicationClusters: msgMeta.GetReplicateTo(),
replicatedFrom: msgMeta.GetReplicatedFrom(),
redeliveryCount: response.GetRedeliveryCount(),
encryptionContext: createEncryptionContext(msgMeta),
orderingKey: string(msgMeta.OrderingKey),
messages := []*message{
{
publishTime: timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
eventTime: timeFromUnixTimestampMillis(msgMeta.GetEventTime()),
key: msgMeta.GetPartitionKey(),
producerName: msgMeta.GetProducerName(),
properties: internal.ConvertToStringMap(msgMeta.GetProperties()),
topic: pc.topic,
msgID: newMessageID(
int64(pbMsgID.GetLedgerId()),
int64(pbMsgID.GetEntryId()),
pbMsgID.GetBatchIndex(),
pc.partitionIdx,
pbMsgID.GetBatchSize(),
),
payLoad: headersAndPayload.ReadableSlice(),
schema: pc.options.schema,
replicationClusters: msgMeta.GetReplicateTo(),
replicatedFrom: msgMeta.GetReplicatedFrom(),
redeliveryCount: response.GetRedeliveryCount(),
encryptionContext: createEncryptionContext(msgMeta),
orderingKey: string(msgMeta.OrderingKey),
},
}

if pc.options.autoReceiverQueueSize {
pc.incomingMessages.Inc()
pc.markScaleIfNeed()
}
pc.queueCh <- msg

pc.queueCh <- messages
return nil
}
}
Expand Down Expand Up @@ -1069,10 +1082,13 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
messages = append(messages, msg)
}

// send messages to the dispatcher
for _, msg := range messages {
pc.queueCh <- msg
if pc.options.autoReceiverQueueSize {
pc.incomingMessages.Add(int32(len(messages)))
pc.markScaleIfNeed()
}

// send messages to the dispatcher
pc.queueCh <- messages
return nil
}

Expand Down Expand Up @@ -1214,54 +1230,33 @@ func (pc *partitionConsumer) dispatcher() {
pc.log.Debug("exiting dispatch loop")
}()
var messages []*message

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
for {
var queueCh chan []*message
var messageCh chan ConsumerMessage
for {
if len(pc.queueCh) == 0 {
pc.expectMoreIncomingMessages()
}

select {
case <-ctx.Done():
return

case msg, ok := <-pc.queueCh:
if !ok {
return
}
nextMessage := ConsumerMessage{
Consumer: pc.parentConsumer,
Message: msg,
}
if pc.dlq.shouldSendToDlq(&nextMessage) {
// pass the message to the DLQ router
pc.metrics.DlqCounter.Inc()
messageCh = pc.dlq.Chan()
} else {
// pass the message to application channel
messageCh = pc.messageCh
}
var nextMessage ConsumerMessage

select {
case messageCh <- nextMessage:
// messageCh still can be pushed message. Does not need scale
default:
pc.markScaleIfNeed()
messageCh <- nextMessage
}

pc.availablePermits.inc()
// are there more messages to send?
if len(messages) > 0 {
nextMessage = ConsumerMessage{
Consumer: pc.parentConsumer,
Message: messages[0],
}

pc.metrics.PrefetchedMessages.Dec()
pc.metrics.PrefetchedBytes.Sub(float64(len(messages[0].payLoad)))
if pc.dlq.shouldSendToDlq(&nextMessage) {
// pass the message to the DLQ router
pc.metrics.DlqCounter.Inc()
messageCh = pc.dlq.Chan()
} else {
// pass the message to application channel
messageCh = pc.messageCh
}

pc.metrics.PrefetchedMessages.Dec()
pc.metrics.PrefetchedBytes.Sub(float64(len(messages[0].payLoad)))
} else {
queueCh = pc.queueCh
}
}()

for {
select {
case <-pc.closeCh:
return
@@ -1276,14 +1271,41 @@

// reset available permits
pc.availablePermits.reset()
initialPermits := uint32(pc.maxQueueSize)

var initialPermits uint32
if pc.options.autoReceiverQueueSize {
initialPermits = uint32(pc.currentQueueSize.Load())
} else {
initialPermits = uint32(pc.maxQueueSize)
}

pc.log.Debugf("dispatcher requesting initial permits=%d", initialPermits)
// send initial permits
if err := pc.internalFlow(initialPermits); err != nil {
pc.log.WithError(err).Error("unable to send initial permits to broker")
}

case msgs, ok := <-queueCh:
if !ok {
return
}
// we only read messages here after the consumer has processed all messages
// in the previous batch
messages = msgs

// if messageCh is nil, or is full, this case will not be selected
case messageCh <- nextMessage:
// allow this message to be garbage collected
messages[0] = nil
messages = messages[1:]

pc.availablePermits.inc()

if pc.options.autoReceiverQueueSize {
pc.incomingMessages.Dec()
pc.expectMoreIncomingMessages()
}

case clearQueueCb := <-pc.clearQueueCh:
// drain the message queue on any new connection by sending a
// special nil message to the channel so we know when to stop dropping messages
@@ -1297,12 +1319,16 @@
if m == nil {
break
} else if nextMessageInQueue == nil {
nextMessageInQueue = toTrackingMessageID(m.msgID)
nextMessageInQueue = toTrackingMessageID(m[0].msgID)
}
}

messages = nil

if pc.options.autoReceiverQueueSize {
pc.incomingMessages.Store(0)
}

clearQueueCb(nextMessageInQueue)
}
}
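An aside on the reworked dispatcher above: it relies on the Go idiom that a select case whose channel is nil is never chosen, so the receive from queueCh and the send on messageCh are switched on and off by assigning either nil or the real channel to local variables. Below is a standalone sketch of that idiom; the channel names and int payloads are illustrative stand-ins, not the consumer's actual types.

```go
package main

import "fmt"

// dispatch forwards batches from in to out one item at a time. A select case
// whose channel is nil is never selected, so which cases are active is chosen
// each iteration by assigning nil or a real channel to recvCh and sendCh.
func dispatch(in <-chan []int, out chan<- int) {
	defer close(out)
	var pending []int
	for {
		var recvCh <-chan []int
		var sendCh chan<- int
		var next int
		if len(pending) > 0 {
			// something to deliver: enable the send, disable the receive
			sendCh = out
			next = pending[0]
		} else {
			// batch drained: enable the receive for the next batch
			recvCh = in
		}

		select {
		case batch, ok := <-recvCh:
			if !ok {
				return // input closed and nothing pending
			}
			pending = batch
		case sendCh <- next:
			pending = pending[1:]
		}
	}
}

func main() {
	in := make(chan []int, 1)
	out := make(chan int)
	go dispatch(in, out)

	in <- []int{1, 2, 3}
	close(in)
	for v := range out {
		fmt.Println(v) // prints 1, 2, 3
	}
}
```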
Expand Down Expand Up @@ -1675,25 +1701,25 @@ func (pc *partitionConsumer) expectMoreIncomingMessages() {
if !pc.options.autoReceiverQueueSize {
return
}
if pc.scaleReceiverQueueHint.CompareAndSwap(true, false) {
if pc.scaleReceiverQueueHint.CAS(true, false) {
oldSize := pc.currentQueueSize.Load()
maxSize := int32(pc.options.receiverQueueSize)
// todo: replace internal.MinUInt64 with math.min when go version dump to 1.18
newSize := internal.MinInt32(maxSize, oldSize*2)
newSize := int32(math.Min(float64(maxSize), float64(oldSize*2)))
if newSize > oldSize {
pc.currentQueueSize.CompareAndSwap(oldSize, newSize)
pc.currentQueueSize.CAS(oldSize, newSize)
pc.availablePermits.add(newSize - oldSize)
pc.log.Debugf("update currentQueueSize from %d -> %d", oldSize, newSize)
}
}
}

func (pc *partitionConsumer) markScaleIfNeed() {
needScale := pc.availablePermits.get()+int32(len(pc.queueCh)) >=
pc.currentQueueSize.Load()
if needScale {
if pc.scaleReceiverQueueHint.CompareAndSwap(false, true) {
pc.log.Debug("update scaleReceiverQueueHint from false -> false")
}
// availablePermits + incomingMessages (messages buffered in queueCh) is the number of prefetched messages.
// The expected result of auto-scaling is that currentQueueSize stays slightly larger than that prefetch count.
prev := pc.scaleReceiverQueueHint.Swap(pc.availablePermits.get()+pc.incomingMessages.Load() >=
pc.currentQueueSize.Load())
if prev != pc.scaleReceiverQueueHint.Load() {
pc.log.Debugf("update scaleReceiverQueueHint from %t -> %t", prev, pc.scaleReceiverQueueHint.Load())
}
}

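Taken together, markScaleIfNeed, expectMoreIncomingMessages and flowIfNeed implement a simple heuristic: once the prefetched messages (available permits plus messages buffered in queueCh) catch up with currentQueueSize, the size is doubled, capped at ReceiverQueueSize, and permits are flowed back to the broker whenever at least half of the current size has been consumed. A toy model of that arithmetic, independent of the client code; the starting size of 1 is an assumption made purely for illustration.

```go
package main

import "fmt"

func main() {
	const maxSize = 1000 // plays the role of ReceiverQueueSize
	size := int32(1)     // assumed starting point, for illustration only

	for {
		// flowIfNeed: permits are sent to the broker once at least half of
		// the current queue size (but no less than 1) is available again.
		flowThreshold := size / 2
		if flowThreshold < 1 {
			flowThreshold = 1
		}
		fmt.Printf("queue size %4d  flow threshold %4d\n", size, flowThreshold)

		// expectMoreIncomingMessages: when markScaleIfNeed reports that the
		// prefetched messages reached the current size, double it, capped at
		// the maximum.
		next := size * 2
		if next > maxSize {
			next = maxSize
		}
		if next == size {
			break // already at the cap, no further growth
		}
		size = next
	}
}
```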

0 comments on commit 57a1340
