[feat] Support consumer client memory limit (#991)
* feat: support consumer memory limit

* fix: avoid data race

* fix: shrinking triggered by memlimit

* fix: fix flaky test

* fix: fix flaky test

* fix: modify trigger threshold

* fix: fix memory limit controller unit test

* fix: fix setRunning

* fix: fix TestRegisterTrigger and TestMultiConsumerMemoryLimit
Gleiphir2769 authored Mar 23, 2023
1 parent 6c3ee77 commit 20291f5
Showing 6 changed files with 350 additions and 19 deletions.
15 changes: 8 additions & 7 deletions pulsar/client_impl.go
@@ -30,12 +30,13 @@ import (
 )
 
 const (
-	defaultConnectionTimeout = 10 * time.Second
-	defaultOperationTimeout  = 30 * time.Second
-	defaultKeepAliveInterval = 30 * time.Second
-	defaultMemoryLimitBytes  = 64 * 1024 * 1024
-	defaultConnMaxIdleTime   = 180 * time.Second
-	minConnMaxIdleTime       = 60 * time.Second
+	defaultConnectionTimeout           = 10 * time.Second
+	defaultOperationTimeout            = 30 * time.Second
+	defaultKeepAliveInterval           = 30 * time.Second
+	defaultMemoryLimitBytes            = 64 * 1024 * 1024
+	defaultMemoryLimitTriggerThreshold = 0.95
+	defaultConnMaxIdleTime             = 180 * time.Second
+	minConnMaxIdleTime                 = 60 * time.Second
 )
 
 type client struct {
@@ -158,7 +159,7 @@ func newClient(options ClientOptions) (Client, error) {
 			maxConnectionsPerHost, logger, metrics, connectionMaxIdleTime),
 		log:      logger,
 		metrics:  metrics,
-		memLimit: internal.NewMemoryLimitController(memLimitBytes),
+		memLimit: internal.NewMemoryLimitController(memLimitBytes, defaultMemoryLimitTriggerThreshold),
 	}
 	serviceNameResolver := internal.NewPulsarServiceNameResolver(url)
 
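The controller constructed here lives in internal/memory_limit_controller.go, one of the two changed files not rendered on this page (the commit log's "fix memory limit controller unit test" entry suggests the other is its matching _test.go). The sketch below is inferred purely from the call sites in this commit; treat the interface shape, and any wording beyond the method names actually used in the diff, as assumptions rather than the library's confirmed API.

// Sketch only: surface inferred from the call sites in this commit.
// The constructor, per client_impl.go above, takes a byte limit plus a
// trigger threshold (0.95 of the limit by default).
type memoryLimitController interface {
	// Reserves size bytes unconditionally, letting usage overshoot the
	// limit; the consumer receive path uses it because the bytes have
	// already arrived from the broker and cannot be refused.
	ForceReserveMemory(size int64)

	// Returns previously reserved bytes.
	ReleaseMemory(size int64)

	// Reports current usage as a fraction of the limit.
	CurrentUsagePercent() float64

	// Adds a callback fired once usage crosses the trigger threshold,
	// e.g. partitionConsumer.shrinkReceiverQueueSize below.
	RegisterTrigger(trigger func())
}

With the defaults above, triggers fire once reserved memory reaches 64 MiB * 0.95, roughly 60.8 MiB.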
28 changes: 26 additions & 2 deletions pulsar/consumer_partition.go
@@ -77,7 +77,8 @@
 )
 
 const (
-	initialReceiverQueueSize = 1
+	initialReceiverQueueSize           = 1
+	receiverQueueExpansionMemThreshold = 0.75
 )
 
 const (
@@ -333,6 +334,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
 	}
 	if pc.options.autoReceiverQueueSize {
 		pc.currentQueueSize.Store(initialReceiverQueueSize)
+		pc.client.memLimit.RegisterTrigger(pc.shrinkReceiverQueueSize)
 	} else {
 		pc.currentQueueSize.Store(int32(pc.options.receiverQueueSize))
 	}
@@ -1002,6 +1004,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
 	pc.metrics.MessagesReceived.Add(float64(numMsgs))
 	pc.metrics.PrefetchedMessages.Add(float64(numMsgs))
 
+	var bytesReceived int
 	for i := 0; i < numMsgs; i++ {
 		smm, payload, err := reader.ReadMessage()
 		if err != nil || payload == nil {
@@ -1116,9 +1119,11 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
 		})
 
 		messages = append(messages, msg)
+		bytesReceived += msg.size()
 	}
 
 	if pc.options.autoReceiverQueueSize {
+		pc.client.memLimit.ForceReserveMemory(int64(bytesReceived))
 		pc.incomingMessages.Add(int32(len(messages)))
 		pc.markScaleIfNeed()
 	}
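The receive path sums the payload bytes of a decoded batch and reserves them with ForceReserveMemory rather than a blocking reserve: the data has already arrived from the broker, so the reservation must succeed even when it pushes the client past its limit, and the overshoot is handled by the registered triggers instead. The matching ReleaseMemory call sits in the dispatcher below, where a message is handed off to the application. Here is a runnable toy model of the trigger mechanism, for illustration only; the real controller is not rendered in this commit and also guards against triggers running concurrently (see the "fix setRunning" entry in the commit log), which this toy omits.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// toyLimiter models the mechanism: an atomic usage counter plus callbacks
// fired when usage crosses limit*threshold. It is not the library's code.
type toyLimiter struct {
	limit     int64
	threshold float64
	usage     int64
	mu        sync.Mutex
	triggers  []func()
}

func (l *toyLimiter) forceReserve(n int64) {
	// Reserve unconditionally; overshooting the limit is allowed.
	if atomic.AddInt64(&l.usage, n) >= int64(float64(l.limit)*l.threshold) {
		l.mu.Lock()
		ts := append([]func(){}, l.triggers...) // copy so triggers run outside the lock
		l.mu.Unlock()
		for _, trigger := range ts {
			trigger() // e.g. each partitionConsumer's shrinkReceiverQueueSize
		}
	}
}

func (l *toyLimiter) release(n int64) { atomic.AddInt64(&l.usage, -n) }

func main() {
	// Same numbers as the tests below: a 10 KiB limit.
	l := &toyLimiter{limit: 10 * 1024, threshold: 0.95}
	l.triggers = append(l.triggers, func() { fmt.Println("trigger: shrink receiver queues") })

	l.forceReserve(9 * 1024) // 90% of the limit: below 0.95, no trigger
	l.forceReserve(1024)     // now 100%: crosses the threshold, trigger fires
	l.release(10 * 1024)
}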
@@ -1270,13 +1275,15 @@ func (pc *partitionConsumer) dispatcher() {
 		var queueCh chan []*message
 		var messageCh chan ConsumerMessage
 		var nextMessage ConsumerMessage
+		var nextMessageSize int
 
 		// are there more messages to send?
 		if len(messages) > 0 {
 			nextMessage = ConsumerMessage{
 				Consumer: pc.parentConsumer,
 				Message:  messages[0],
 			}
+			nextMessageSize = messages[0].size()
 
 			if pc.dlq.shouldSendToDlq(&nextMessage) {
 				// pass the message to the DLQ router
@@ -1339,6 +1346,7 @@

 			if pc.options.autoReceiverQueueSize {
 				pc.incomingMessages.Dec()
+				pc.client.memLimit.ReleaseMemory(int64(nextMessageSize))
 				pc.expectMoreIncomingMessages()
 			}

@@ -1742,7 +1750,8 @@ func (pc *partitionConsumer) expectMoreIncomingMessages() {
 	oldSize := pc.currentQueueSize.Load()
 	maxSize := int32(pc.options.receiverQueueSize)
 	newSize := int32(math.Min(float64(maxSize), float64(oldSize*2)))
-	if newSize > oldSize {
+	usagePercent := pc.client.memLimit.CurrentUsagePercent()
+	if usagePercent < receiverQueueExpansionMemThreshold && newSize > oldSize {
 		pc.currentQueueSize.CAS(oldSize, newSize)
 		pc.availablePermits.add(newSize - oldSize)
 		pc.log.Debugf("update currentQueueSize from %d -> %d", oldSize, newSize)
@@ -1760,6 +1769,21 @@ func (pc *partitionConsumer) markScaleIfNeed() {
 	}
 }
 
+func (pc *partitionConsumer) shrinkReceiverQueueSize() {
+	if !pc.options.autoReceiverQueueSize {
+		return
+	}
+
+	oldSize := pc.currentQueueSize.Load()
+	minSize := int32(math.Min(float64(initialReceiverQueueSize), float64(pc.options.receiverQueueSize)))
+	newSize := int32(math.Max(float64(minSize), float64(oldSize/2)))
+	if newSize < oldSize {
+		pc.currentQueueSize.CAS(oldSize, newSize)
+		pc.availablePermits.add(newSize - oldSize)
+		pc.log.Debugf("update currentQueueSize from %d -> %d", oldSize, newSize)
+	}
+}
+
 func (pc *partitionConsumer) Decompress(msgMeta *pb.MessageMetadata, payload internal.Buffer) (internal.Buffer, error) {
 	providerEntry, ok := pc.compressionProviders.Load(msgMeta.GetCompression())
 	if !ok {
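Taken together, expectMoreIncomingMessages and shrinkReceiverQueueSize give the receiver queue multiplicative scaling: it doubles as consumption keeps up, but only while client-wide memory usage stays under the 0.75 expansion threshold, and the memory-limit trigger halves it, floored at initialReceiverQueueSize. Note that the shrink path passes a negative delta to availablePermits.add, pulling outstanding permits back in line with the smaller queue. Below is a standalone sketch of just the sizing arithmetic, with the bounds and constants taken from this diff.

package main

import (
	"fmt"
	"math"
)

const (
	initialReceiverQueueSize           = 1    // floor, from the diff above
	receiverQueueExpansionMemThreshold = 0.75 // expansion gate, from the diff above
)

// grow doubles the queue up to maxSize, but only while client memory usage
// is below the expansion threshold (mirrors expectMoreIncomingMessages).
func grow(size, maxSize int32, usagePercent float64) int32 {
	newSize := int32(math.Min(float64(maxSize), float64(size*2)))
	if usagePercent < receiverQueueExpansionMemThreshold && newSize > size {
		return newSize
	}
	return size
}

// shrink halves the queue, floored at the initial size (mirrors
// shrinkReceiverQueueSize).
func shrink(size int32) int32 {
	return int32(math.Max(initialReceiverQueueSize, float64(size/2)))
}

func main() {
	size, maxSize := int32(initialReceiverQueueSize), int32(1000)
	for i := 0; i < 12; i++ {
		size = grow(size, maxSize, 0.10) // memory is plentiful, every doubling succeeds
	}
	fmt.Println("after growth:", size)              // 1000, capped at receiverQueueSize
	fmt.Println("after one trigger:", shrink(size)) // 500
}

Starting from 1, growth reaches the 1000-entry cap after ten doublings; a single trigger firing then halves it to 500.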
201 changes: 201 additions & 0 deletions pulsar/consumer_test.go
@@ -4113,3 +4113,204 @@ func TestConsumerBatchIndexAckDisabled(t *testing.T) {
 	assert.Nil(t, err)
 	assert.Equal(t, []byte("done"), message.Payload())
 }
+
+func TestConsumerMemoryLimit(t *testing.T) {
+	// Create client 1 without memory limit
+	cli1, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+
+	assert.Nil(t, err)
+	defer cli1.Close()
+
+	// Create client 2 with memory limit
+	cli2, err := NewClient(ClientOptions{
+		URL:              lookupURL,
+		MemoryLimitBytes: 10 * 1024,
+	})
+
+	assert.Nil(t, err)
+	defer cli2.Close()
+
+	topic := newTopicName()
+
+	// Use client 1 to create producer p1
+	p1, err := cli1.CreateProducer(ProducerOptions{
+		Topic:           topic,
+		DisableBatching: false,
+	})
+	assert.Nil(t, err)
+	defer p1.Close()
+
+	// Use the memory-limited client 2 to create consumer c1
+	c1, err := cli2.Subscribe(ConsumerOptions{
+		Topic:                             topic,
+		SubscriptionName:                  "my-sub-1",
+		Type:                              Exclusive,
+		EnableAutoScaledReceiverQueueSize: true,
+	})
+	assert.Nil(t, err)
+	defer c1.Close()
+	pc1 := c1.(*consumer).consumers[0]
+
+	// Fill up the messageCh of c1
+	for i := 0; i < 10; i++ {
+		p1.SendAsync(
+			context.Background(),
+			&ProducerMessage{Payload: createTestMessagePayload(1)},
+			func(id MessageID, producerMessage *ProducerMessage, err error) {
+			},
+		)
+	}
+
+	retryAssert(t, 5, 200, func() {}, func(t assert.TestingT) bool {
+		return assert.Equal(t, 10, len(pc1.messageCh))
+	})
+
+	// Get the current receiver queue size of c1
+	prevQueueSize := pc1.currentQueueSize.Load()
+
+	// Push client 2 over its memory limit
+	_, err = p1.Send(context.Background(), &ProducerMessage{
+		Payload: createTestMessagePayload(10*1024 + 1),
+	})
+	assert.NoError(t, err)
+
+	// c1 should shrink its receiver queue size
+	retryAssert(t, 5, 200, func() {}, func(t assert.TestingT) bool {
+		return assert.Equal(t, prevQueueSize/2, pc1.currentQueueSize.Load())
+	})
+
+	// Use the memory-limited client 2 to create consumer c2
+	c2, err := cli2.Subscribe(ConsumerOptions{
+		Topic:                             topic,
+		SubscriptionName:                  "my-sub-2",
+		Type:                              Exclusive,
+		SubscriptionInitialPosition:       SubscriptionPositionEarliest,
+		EnableAutoScaledReceiverQueueSize: true,
+	})
+	assert.Nil(t, err)
+	defer c2.Close()
+	pc2 := c2.(*consumer).consumers[0]
+
+	// Try to induce c2 receiver queue size expansion
+	for i := 0; i < 10; i++ {
+		p1.SendAsync(
+			context.Background(),
+			&ProducerMessage{Payload: createTestMessagePayload(1)},
+			func(id MessageID, producerMessage *ProducerMessage, err error) {
+			},
+		)
+	}
+
+	retryAssert(t, 5, 200, func() {}, func(t assert.TestingT) bool {
+		return assert.Equal(t, 10, len(pc1.messageCh))
+	})
+
+	// c2's receiver queue size should not expand because client 2 has exceeded its memory limit
+	assert.Equal(t, 1, int(pc2.currentQueueSize.Load()))
+
+	// Use the memory-limited client 2 to create producer p2
+	p2, err := cli2.CreateProducer(ProducerOptions{
+		Topic:                   topic,
+		DisableBatching:         false,
+		DisableBlockIfQueueFull: true,
+	})
+	assert.Nil(t, err)
+	defer p2.Close()
+
+	_, err = p2.Send(context.Background(), &ProducerMessage{
+		Payload: createTestMessagePayload(1),
+	})
+	// p2 can't send: client 2's memory buffer is full
+	assert.Equal(t, true, errors.Is(err, errMemoryBufferIsFull))
+}
+
+func TestMultiConsumerMemoryLimit(t *testing.T) {
+	// Create client 1 without memory limit
+	cli1, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+
+	assert.Nil(t, err)
+	defer cli1.Close()
+
+	// Create client 2 with memory limit
+	cli2, err := NewClient(ClientOptions{
+		URL:              lookupURL,
+		MemoryLimitBytes: 10 * 1024,
+	})
+
+	assert.Nil(t, err)
+	defer cli2.Close()
+
+	topic := newTopicName()
+
+	// Use client 1 to create producer p1
+	p1, err := cli1.CreateProducer(ProducerOptions{
+		Topic:           topic,
+		DisableBatching: false,
+	})
+	assert.Nil(t, err)
+	defer p1.Close()
+
+	// Use the memory-limited client 2 to create consumer c1
+	c1, err := cli2.Subscribe(ConsumerOptions{
+		Topic:                             topic,
+		SubscriptionName:                  "my-sub-1",
+		Type:                              Exclusive,
+		EnableAutoScaledReceiverQueueSize: true,
+	})
+	assert.Nil(t, err)
+	defer c1.Close()
+	pc1 := c1.(*consumer).consumers[0]
+
+	// Use the memory-limited client 2 to create consumer c2
+	c2, err := cli2.Subscribe(ConsumerOptions{
+		Topic:                             topic,
+		SubscriptionName:                  "my-sub-2",
+		Type:                              Exclusive,
+		EnableAutoScaledReceiverQueueSize: true,
+	})
+	assert.Nil(t, err)
+	defer c2.Close()
+	pc2 := c2.(*consumer).consumers[0]
+
+	// Fill up the messageCh of c1 and c2
+	for i := 0; i < 10; i++ {
+		p1.SendAsync(
+			context.Background(),
+			&ProducerMessage{Payload: createTestMessagePayload(1)},
+			func(id MessageID, producerMessage *ProducerMessage, err error) {
+			},
+		)
+	}
+
+	retryAssert(t, 5, 200, func() {}, func(t assert.TestingT) bool {
+		return assert.Equal(t, 10, len(pc1.messageCh))
+	})
+
+	retryAssert(t, 5, 200, func() {}, func(t assert.TestingT) bool {
+		return assert.Equal(t, 10, len(pc2.messageCh))
+	})
+
+	// Get the current receiver queue sizes of c1 and c2
+	pc1PrevQueueSize := pc1.currentQueueSize.Load()
+	pc2PrevQueueSize := pc2.currentQueueSize.Load()
+
+	// Push client 2 over its memory limit
+	_, err = p1.Send(context.Background(), &ProducerMessage{
+		Payload: createTestMessagePayload(10*1024 + 1),
+	})
+	assert.NoError(t, err)
+
+	// c1 should shrink its receiver queue size
+	retryAssert(t, 5, 200, func() {}, func(t assert.TestingT) bool {
+		return assert.Equal(t, pc1PrevQueueSize/2, pc1.currentQueueSize.Load())
+	})
+
+	// c2 should shrink its receiver queue size too
+	retryAssert(t, 5, 200, func() {}, func(t assert.TestingT) bool {
+		return assert.Equal(t, pc2PrevQueueSize/2, pc2.currentQueueSize.Load())
+	})
+}
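The tests above double as usage documentation: from application code, the feature is driven entirely by MemoryLimitBytes on the client and EnableAutoScaledReceiverQueueSize on the consumer, both exercised in this diff. A minimal sketch, assuming a local broker at pulsar://localhost:6650 and topic/subscription names chosen purely for illustration:

package main

import (
	"context"
	"log"

	"github.com/apache/pulsar-client-go/pulsar"
)

func main() {
	// MemoryLimitBytes caps the memory shared by all producers and
	// consumers of this client; when unset, the 64 MiB default from
	// client_impl.go above applies.
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL:              "pulsar://localhost:6650", // assumed broker address
		MemoryLimitBytes: 16 * 1024 * 1024,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// With auto-scaling enabled, the receiver queue starts at 1 entry,
	// doubles while memory is plentiful, and is halved by the
	// memory-limit trigger under pressure.
	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
		Topic:                             "my-topic",
		SubscriptionName:                  "my-sub",
		EnableAutoScaledReceiverQueueSize: true,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	msg, err := consumer.Receive(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	consumer.Ack(msg)
}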
4 changes: 4 additions & 0 deletions pulsar/impl_message.go
@@ -382,6 +382,10 @@ func (msg *message) BrokerPublishTime() *time.Time {
 	return msg.brokerPublishTime
 }
 
+func (msg *message) size() int {
+	return len(msg.payLoad)
+}
+
 func newAckTracker(size uint) *ackTracker {
 	batchIDs := bitset.New(size)
 	for i := uint(0); i < size; i++ {
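One caveat worth noting: size() counts payload bytes only, so the reservations made in MessageReceived track payload footprint; metadata, properties, and per-message struct overhead are not charged against the limit. A hypothetical, in-package illustration:

// Hypothetical illustration: only payload length is counted.
m := &message{payLoad: make([]byte, 1024)}
_ = m.size() // 1024, regardless of any metadata or properties attached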