From 20291f5cf5d5ea706e6a913c5d9eb303fedaef0d Mon Sep 17 00:00:00 2001
From: Jiaqi Shen <18863662628@163.com>
Date: Thu, 23 Mar 2023 16:54:58 +0800
Subject: [PATCH] [feat] Support consumer client memory limit (#991)

* feat: support consumer memory limit

* fix: avoid data race

* fix: shrinking triggered by memlimit

* fix: fix flaky test

* fix: fix flaky test

* fix: modify trigger threshold

* fix: fix memory limit controller unit test

* fix: fix setRunning

* fix: fix TestRegisterTrigger and TestMultiConsumerMemoryLimit
---
 pulsar/client_impl.go                        |  15 +-
 pulsar/consumer_partition.go                 |  28 ++-
 pulsar/consumer_test.go                      | 201 ++++++++++++++++++
 pulsar/impl_message.go                       |   4 +
 pulsar/internal/memory_limit_controller.go   |  58 ++++-
 .../internal/memory_limit_controller_test.go |  63 +++++-
 6 files changed, 350 insertions(+), 19 deletions(-)

diff --git a/pulsar/client_impl.go b/pulsar/client_impl.go
index 7c8fcc9cf4..5322597569 100644
--- a/pulsar/client_impl.go
+++ b/pulsar/client_impl.go
@@ -30,12 +30,13 @@ import (
 )
 
 const (
-	defaultConnectionTimeout = 10 * time.Second
-	defaultOperationTimeout  = 30 * time.Second
-	defaultKeepAliveInterval = 30 * time.Second
-	defaultMemoryLimitBytes  = 64 * 1024 * 1024
-	defaultConnMaxIdleTime   = 180 * time.Second
-	minConnMaxIdleTime       = 60 * time.Second
+	defaultConnectionTimeout           = 10 * time.Second
+	defaultOperationTimeout            = 30 * time.Second
+	defaultKeepAliveInterval           = 30 * time.Second
+	defaultMemoryLimitBytes            = 64 * 1024 * 1024
+	defaultMemoryLimitTriggerThreshold = 0.95
+	defaultConnMaxIdleTime             = 180 * time.Second
+	minConnMaxIdleTime                 = 60 * time.Second
 )
 
 type client struct {
@@ -158,7 +159,7 @@ func newClient(options ClientOptions) (Client, error) {
 			maxConnectionsPerHost, logger, metrics, connectionMaxIdleTime),
 		log:     logger,
 		metrics: metrics,
-		memLimit: internal.NewMemoryLimitController(memLimitBytes),
+		memLimit: internal.NewMemoryLimitController(memLimitBytes, defaultMemoryLimitTriggerThreshold),
 	}
 
 	serviceNameResolver := internal.NewPulsarServiceNameResolver(url)
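Context for reviewers: the client-wide MemoryLimitBytes budget (64 MB by default, with the 0.95 trigger threshold added above) is now shared by the producers and consumers of a client. A minimal usage sketch, not part of the patch itself; the broker URL and topic name are placeholders:

package main

import (
	"log"

	"github.com/apache/pulsar-client-go/pulsar"
)

func main() {
	// A deliberately small limit (mirroring the tests below); production
	// values are normally much larger.
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL:              "pulsar://localhost:6650", // placeholder broker URL
		MemoryLimitBytes: 10 * 1024,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// A consumer with an auto-scaled receiver queue now registers a shrink
	// trigger on the client's memory limit controller (see consumer_partition.go).
	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
		Topic:                             "my-topic", // placeholder topic
		SubscriptionName:                  "my-sub",
		Type:                              pulsar.Exclusive,
		EnableAutoScaledReceiverQueueSize: true,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()
}

With a limit this small, the shrink trigger wired up in consumer_partition.go below is easy to exercise.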
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 087cec550e..fb77d0dc6a 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -77,7 +77,8 @@
 )
 
 const (
-	initialReceiverQueueSize = 1
+	initialReceiverQueueSize           = 1
+	receiverQueueExpansionMemThreshold = 0.75
 )
 
 const (
@@ -333,6 +334,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
 	}
 	if pc.options.autoReceiverQueueSize {
 		pc.currentQueueSize.Store(initialReceiverQueueSize)
+		pc.client.memLimit.RegisterTrigger(pc.shrinkReceiverQueueSize)
 	} else {
 		pc.currentQueueSize.Store(int32(pc.options.receiverQueueSize))
 	}
@@ -1002,6 +1004,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
 	pc.metrics.MessagesReceived.Add(float64(numMsgs))
 	pc.metrics.PrefetchedMessages.Add(float64(numMsgs))
 
+	var bytesReceived int
 	for i := 0; i < numMsgs; i++ {
 		smm, payload, err := reader.ReadMessage()
 		if err != nil || payload == nil {
@@ -1116,9 +1119,11 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
 		})
 
 		messages = append(messages, msg)
+		bytesReceived += msg.size()
 	}
 
 	if pc.options.autoReceiverQueueSize {
+		pc.client.memLimit.ForceReserveMemory(int64(bytesReceived))
 		pc.incomingMessages.Add(int32(len(messages)))
 		pc.markScaleIfNeed()
 	}
@@ -1270,6 +1275,7 @@ func (pc *partitionConsumer) dispatcher() {
 		var queueCh chan []*message
 		var messageCh chan ConsumerMessage
 		var nextMessage ConsumerMessage
+		var nextMessageSize int
 
 		// are there more messages to send?
 		if len(messages) > 0 {
@@ -1277,6 +1283,7 @@
 				Consumer: pc.parentConsumer,
 				Message:  messages[0],
 			}
+			nextMessageSize = messages[0].size()
 
 			if pc.dlq.shouldSendToDlq(&nextMessage) {
 				// pass the message to the DLQ router
@@ -1339,6 +1346,7 @@
 
 			if pc.options.autoReceiverQueueSize {
 				pc.incomingMessages.Dec()
+				pc.client.memLimit.ReleaseMemory(int64(nextMessageSize))
 				pc.expectMoreIncomingMessages()
 			}
 
@@ -1742,7 +1750,8 @@ func (pc *partitionConsumer) expectMoreIncomingMessages() {
 	oldSize := pc.currentQueueSize.Load()
 	maxSize := int32(pc.options.receiverQueueSize)
 	newSize := int32(math.Min(float64(maxSize), float64(oldSize*2)))
-	if newSize > oldSize {
+	usagePercent := pc.client.memLimit.CurrentUsagePercent()
+	if usagePercent < receiverQueueExpansionMemThreshold && newSize > oldSize {
 		pc.currentQueueSize.CAS(oldSize, newSize)
 		pc.availablePermits.add(newSize - oldSize)
 		pc.log.Debugf("update currentQueueSize from %d -> %d", oldSize, newSize)
@@ -1760,6 +1769,21 @@ func (pc *partitionConsumer) markScaleIfNeed() {
 	}
 }
 
+func (pc *partitionConsumer) shrinkReceiverQueueSize() {
+	if !pc.options.autoReceiverQueueSize {
+		return
+	}
+
+	oldSize := pc.currentQueueSize.Load()
+	minSize := int32(math.Min(float64(initialReceiverQueueSize), float64(pc.options.receiverQueueSize)))
+	newSize := int32(math.Max(float64(minSize), float64(oldSize/2)))
+	if newSize < oldSize {
+		pc.currentQueueSize.CAS(oldSize, newSize)
+		pc.availablePermits.add(newSize - oldSize)
+		pc.log.Debugf("update currentQueueSize from %d -> %d", oldSize, newSize)
+	}
+}
+
 func (pc *partitionConsumer) Decompress(msgMeta *pb.MessageMetadata, payload internal.Buffer) (internal.Buffer, error) {
 	providerEntry, ok := pc.compressionProviders.Load(msgMeta.GetCompression())
 	if !ok {
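The net effect of the two sizing paths above (expectMoreIncomingMessages and the new shrinkReceiverQueueSize) reduces to the arithmetic below. This is a simplified sketch with illustrative names only; it leaves out the atomics, permit accounting and logging of the real code:

package main

import (
	"fmt"
	"math"
)

const (
	initialQueueSize      = 1
	expansionMemThreshold = 0.75 // receiverQueueExpansionMemThreshold
)

// grow doubles the queue up to maxSize, but only while the client-wide
// memory usage is below the expansion threshold.
func grow(current, maxSize int32, memUsagePercent float64) int32 {
	newSize := int32(math.Min(float64(maxSize), float64(current*2)))
	if memUsagePercent < expansionMemThreshold && newSize > current {
		return newSize
	}
	return current
}

// shrink halves the queue, never going below the initial size; it is the
// action taken when the memory limit controller's trigger fires.
func shrink(current, maxSize int32) int32 {
	minSize := int32(math.Min(initialQueueSize, float64(maxSize)))
	newSize := int32(math.Max(float64(minSize), float64(current/2)))
	if newSize < current {
		return newSize
	}
	return current
}

func main() {
	fmt.Println(grow(8, 1000, 0.50)) // 16: plenty of headroom, queue doubles
	fmt.Println(grow(8, 1000, 0.80)) // 8:  above 75% usage, no expansion
	fmt.Println(shrink(16, 1000))    // 8:  trigger fired, queue halves
}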
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 1477ce6c20..521e576767 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -4113,3 +4113,204 @@ func TestConsumerBatchIndexAckDisabled(t *testing.T) {
 	assert.Nil(t, err)
 	assert.Equal(t, []byte("done"), message.Payload())
 }
+
+func TestConsumerMemoryLimit(t *testing.T) {
+	// Create client 1 without memory limit
+	cli1, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+
+	assert.Nil(t, err)
+	defer cli1.Close()
+
+	// Create client 2 with memory limit
+	cli2, err := NewClient(ClientOptions{
+		URL:              lookupURL,
+		MemoryLimitBytes: 10 * 1024,
+	})
+
+	assert.Nil(t, err)
+	defer cli2.Close()
+
+	topic := newTopicName()
+
+	// Use client 1 to create producer p1
+	p1, err := cli1.CreateProducer(ProducerOptions{
+		Topic:           topic,
+		DisableBatching: false,
+	})
+	assert.Nil(t, err)
+	defer p1.Close()
+
+	// Use mem-limited client 2 to create consumer c1
+	c1, err := cli2.Subscribe(ConsumerOptions{
+		Topic:                             topic,
+		SubscriptionName:                  "my-sub-1",
+		Type:                              Exclusive,
+		EnableAutoScaledReceiverQueueSize: true,
+	})
+	assert.Nil(t, err)
+	defer c1.Close()
+	pc1 := c1.(*consumer).consumers[0]
+
+	// Fill up the messageCh of c1
+	for i := 0; i < 10; i++ {
+		p1.SendAsync(
+			context.Background(),
+			&ProducerMessage{Payload: createTestMessagePayload(1)},
+			func(id MessageID, producerMessage *ProducerMessage, err error) {
+			},
+		)
+	}
+
+	retryAssert(t, 5, 200, func() {}, func(t assert.TestingT) bool {
+		return assert.Equal(t, 10, len(pc1.messageCh))
+	})
+
+	// Get current receiver queue size of c1
+	prevQueueSize := pc1.currentQueueSize.Load()
+
+	// Make mem-limited client 2 exceed its memory limit
+	_, err = p1.Send(context.Background(), &ProducerMessage{
+		Payload: createTestMessagePayload(10*1024 + 1),
+	})
+	assert.NoError(t, err)
+
+	// c1 should shrink its receiver queue size
+	retryAssert(t, 5, 200, func() {}, func(t assert.TestingT) bool {
+		return assert.Equal(t, prevQueueSize/2, pc1.currentQueueSize.Load())
+	})
+
+	// Use mem-limited client 2 to create consumer c2
+	c2, err := cli2.Subscribe(ConsumerOptions{
+		Topic:                             topic,
+		SubscriptionName:                  "my-sub-2",
+		Type:                              Exclusive,
+		SubscriptionInitialPosition:       SubscriptionPositionEarliest,
+		EnableAutoScaledReceiverQueueSize: true,
+	})
+	assert.Nil(t, err)
+	defer c2.Close()
+	pc2 := c2.(*consumer).consumers[0]
+
+	// Try to induce c2 receiver queue size expansion
+	for i := 0; i < 10; i++ {
+		p1.SendAsync(
+			context.Background(),
+			&ProducerMessage{Payload: createTestMessagePayload(1)},
+			func(id MessageID, producerMessage *ProducerMessage, err error) {
+			},
+		)
+	}
+
+	retryAssert(t, 5, 200, func() {}, func(t assert.TestingT) bool {
+		return assert.Equal(t, 10, len(pc1.messageCh))
+	})
+
+	// c2 receiver queue size should not expand because client 2 has exceeded the memory limit
+	assert.Equal(t, 1, int(pc2.currentQueueSize.Load()))
+
+	// Use mem-limited client 2 to create producer p2
+	p2, err := cli2.CreateProducer(ProducerOptions{
+		Topic:                   topic,
+		DisableBatching:         false,
+		DisableBlockIfQueueFull: true,
+	})
+	assert.Nil(t, err)
+	defer p2.Close()
+
+	_, err = p2.Send(context.Background(), &ProducerMessage{
+		Payload: createTestMessagePayload(1),
+	})
+	// Producer can't send message
+	assert.Equal(t, true, errors.Is(err, errMemoryBufferIsFull))
+}
+
+func TestMultiConsumerMemoryLimit(t *testing.T) {
+	// Create client 1 without memory limit
+	cli1, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+
+	assert.Nil(t, err)
+	defer cli1.Close()
+
+	// Create client 2 with memory limit
+	cli2, err := NewClient(ClientOptions{
+		URL:              lookupURL,
+		MemoryLimitBytes: 10 * 1024,
+	})
+
+	assert.Nil(t, err)
+	defer cli2.Close()
+
+	topic := newTopicName()
+
+	// Use client 1 to create producer p1
+	p1, err := cli1.CreateProducer(ProducerOptions{
+		Topic:           topic,
+		DisableBatching: false,
+	})
+	assert.Nil(t, err)
+	defer p1.Close()
+
+	// Use mem-limited client 2 to create consumer c1
+	c1, err := cli2.Subscribe(ConsumerOptions{
+		Topic:                             topic,
+		SubscriptionName:                  "my-sub-1",
+		Type:                              Exclusive,
+		EnableAutoScaledReceiverQueueSize: true,
+	})
+	assert.Nil(t, err)
+	defer c1.Close()
+	pc1 := c1.(*consumer).consumers[0]
+
+	// Use mem-limited client 2 to create consumer c2
+	c2, err := cli2.Subscribe(ConsumerOptions{
+		Topic:                             topic,
+		SubscriptionName:                  "my-sub-2",
+		Type:                              Exclusive,
+		EnableAutoScaledReceiverQueueSize: true,
+	})
+	assert.Nil(t, err)
+	defer c2.Close()
+	pc2 := c2.(*consumer).consumers[0]
+
+	// Fill up the messageCh of c1 and c2
+	for i := 0; i < 10; i++ {
+		p1.SendAsync(
+			context.Background(),
+			&ProducerMessage{Payload: createTestMessagePayload(1)},
+			func(id MessageID, producerMessage *ProducerMessage, err error) {
+			},
+		)
+	}
+
+	retryAssert(t, 5, 200, func() {}, func(t assert.TestingT) bool {
+		return assert.Equal(t, 10, len(pc1.messageCh))
+	})
+
+	retryAssert(t, 5, 200, func() {}, func(t assert.TestingT) bool {
+		return assert.Equal(t, 10, len(pc2.messageCh))
+	})
+
+	// Get current receiver queue size of c1 and c2
+	pc1PrevQueueSize := pc1.currentQueueSize.Load()
+	pc2PrevQueueSize := pc2.currentQueueSize.Load()
+
+	// Make mem-limited client 2 exceed its memory limit
+	_, err = p1.Send(context.Background(), &ProducerMessage{
+		Payload: createTestMessagePayload(10*1024 + 1),
+	})
+	assert.NoError(t, err)
+
+	// c1 should shrink its receiver queue size
+	retryAssert(t, 5, 200, func() {}, func(t assert.TestingT) bool {
+		return assert.Equal(t, pc1PrevQueueSize/2, pc1.currentQueueSize.Load())
+	})
+
+	// c2 should shrink its receiver queue size too
+	retryAssert(t, 5, 200, func() {}, func(t assert.TestingT) bool {
+		return assert.Equal(t, pc2PrevQueueSize/2, pc2.currentQueueSize.Load())
+	})
+}
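A practical consequence the tests above illustrate: prefetched messages count against the client's memory limit until they are dispatched to the application, so a consumer that stops calling Receive can pin the budget, force other consumers on the same client to shrink their queues, and make non-blocking producers fail fast. A minimal receive-loop sketch using only the public API (the wiring and error policy here are assumptions, not part of the patch):

package main

import (
	"context"

	"github.com/apache/pulsar-client-go/pulsar"
)

// drain keeps pulling messages so that prefetched data is handed to the
// application promptly; the bytes reserved for each message are released
// back to the client-wide budget once it is dispatched.
func drain(ctx context.Context, consumer pulsar.Consumer) error {
	for {
		msg, err := consumer.Receive(ctx)
		if err != nil {
			return err
		}
		// ... process msg.Payload() ...
		consumer.Ack(msg)
	}
}

func main() {} // client/consumer wiring omitted; see the sketch after client_impl.go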
diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go
index 9c56070295..89a709f14a 100644
--- a/pulsar/impl_message.go
+++ b/pulsar/impl_message.go
@@ -382,6 +382,10 @@ func (msg *message) BrokerPublishTime() *time.Time {
 	return msg.brokerPublishTime
 }
 
+func (msg *message) size() int {
+	return len(msg.payLoad)
+}
+
 func newAckTracker(size uint) *ackTracker {
 	batchIDs := bitset.New(size)
 	for i := uint(0); i < size; i++ {
diff --git a/pulsar/internal/memory_limit_controller.go b/pulsar/internal/memory_limit_controller.go
index 5bf8d59a58..5ead9f5b9a 100644
--- a/pulsar/internal/memory_limit_controller.go
+++ b/pulsar/internal/memory_limit_controller.go
@@ -31,18 +31,42 @@ type MemoryLimitController interface {
 	CurrentUsage() int64
 	CurrentUsagePercent() float64
 	IsMemoryLimited() bool
+	RegisterTrigger(trigger func())
 }
 
 type memoryLimitController struct {
 	limit        int64
 	chCond       *chCond
 	currentUsage int64
+
+	triggers []*thresholdTrigger
+	// valid range is (0, 1.0)
+	triggerThreshold float64
+}
+
+type thresholdTrigger struct {
+	triggerFunc    func()
+	triggerRunning int32
+}
+
+func (t *thresholdTrigger) canTryRunning() bool {
+	return atomic.CompareAndSwapInt32(&t.triggerRunning, 0, 1)
+}
+
+func (t *thresholdTrigger) setRunning(isRunning bool) {
+	if isRunning {
+		atomic.StoreInt32(&t.triggerRunning, 1)
+	} else {
+		atomic.StoreInt32(&t.triggerRunning, 0)
+	}
 }
 
-func NewMemoryLimitController(limit int64) MemoryLimitController {
+// NewMemoryLimitController threshold valid range is (0, 1.0)
+func NewMemoryLimitController(limit int64, threshold float64) MemoryLimitController {
 	mlc := &memoryLimitController{
-		limit:  limit,
-		chCond: newCond(&sync.Mutex{}),
+		limit:            limit,
+		chCond:           newCond(&sync.Mutex{}),
+		triggerThreshold: threshold,
 	}
 	return mlc
 }
@@ -72,13 +96,16 @@ func (m *memoryLimitController) TryReserveMemory(size int64) bool {
 		}
 
 		if atomic.CompareAndSwapInt64(&m.currentUsage, current, newUsage) {
+			m.checkTrigger(current, newUsage)
 			return true
 		}
 	}
 }
 
 func (m *memoryLimitController) ForceReserveMemory(size int64) {
-	atomic.AddInt64(&m.currentUsage, size)
+	nextUsage := atomic.AddInt64(&m.currentUsage, size)
+	prevUsage := nextUsage - size
+	m.checkTrigger(prevUsage, nextUsage)
 }
 
 func (m *memoryLimitController) ReleaseMemory(size int64) {
@@ -99,3 +126,26 @@ func (m *memoryLimitController) CurrentUsagePercent() float64 {
 func (m *memoryLimitController) IsMemoryLimited() bool {
 	return m.limit > 0
 }
+
+func (m *memoryLimitController) RegisterTrigger(trigger func()) {
+	m.chCond.L.Lock()
+	defer m.chCond.L.Unlock()
+	m.triggers = append(m.triggers, &thresholdTrigger{
+		triggerFunc: trigger,
+	})
+}
+
+func (m *memoryLimitController) checkTrigger(prevUsage int64, nextUsage int64) {
+	nextUsagePercent := float64(nextUsage) / float64(m.limit)
+	prevUsagePercent := float64(prevUsage) / float64(m.limit)
+	if nextUsagePercent >= m.triggerThreshold && prevUsagePercent < m.triggerThreshold {
+		for _, trigger := range m.triggers {
+			if trigger.canTryRunning() {
+				go func(trigger *thresholdTrigger) {
+					trigger.triggerFunc()
+					trigger.setRunning(false)
+				}(trigger)
+			}
+		}
+	}
+}
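For reference, the trigger semantics of the controller above in one small sketch: each registered trigger fires on its own goroutine when usage crosses the threshold from below, and it can fire again only after it has returned and the threshold is crossed again. Names and placement are illustrative; the code would have to sit inside the internal package since none of these types are exported:

// Illustration only (assumes it lives inside the internal package,
// next to the controller, e.g. alongside the tests below).
package internal

import (
	"fmt"
	"time"
)

func memoryLimitTriggerSketch() {
	// 100-byte budget; registered triggers fire when usage crosses 95%.
	mlc := NewMemoryLimitController(100, 0.95)

	mlc.RegisterTrigger(func() {
		// Runs on its own goroutine, at most once per threshold crossing,
		// and is not re-armed until the previous run has returned.
		fmt.Println("usage crossed the trigger threshold")
	})

	mlc.TryReserveMemory(90) // 90%: below the threshold, nothing fires
	mlc.TryReserveMemory(6)  // 96%: crossing 95% schedules the trigger
	time.Sleep(100 * time.Millisecond)

	mlc.ReleaseMemory(6)      // drop back under the threshold ...
	mlc.ForceReserveMemory(6) // ... so the next crossing fires the trigger again
	time.Sleep(100 * time.Millisecond)
}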
diff --git a/pulsar/internal/memory_limit_controller_test.go b/pulsar/internal/memory_limit_controller_test.go
index a62c6e6dda..81623eca10 100644
--- a/pulsar/internal/memory_limit_controller_test.go
+++ b/pulsar/internal/memory_limit_controller_test.go
@@ -28,7 +28,7 @@ import (
 )
 
 func TestLimit(t *testing.T) {
-	mlc := NewMemoryLimitController(100)
+	mlc := NewMemoryLimitController(100, 1.0)
 
 	for i := 0; i < 101; i++ {
 		assert.True(t, mlc.TryReserveMemory(1))
@@ -57,7 +57,7 @@
 }
 
 func TestDisableLimit(t *testing.T) {
-	mlc := NewMemoryLimitController(-1)
+	mlc := NewMemoryLimitController(-1, 1.0)
 	assert.True(t, mlc.TryReserveMemory(1000000))
 	assert.True(t, mlc.ReserveMemory(context.Background(), 1000000))
 	mlc.ReleaseMemory(1000000)
@@ -65,7 +65,7 @@
 }
 
 func TestMultiGoroutineTryReserveMem(t *testing.T) {
-	mlc := NewMemoryLimitController(10000)
+	mlc := NewMemoryLimitController(10000, 1.0)
 
 	// Multi goroutine try reserve memory.
 	wg := sync.WaitGroup{}
@@ -87,7 +87,7 @@
 }
 
 func TestReserveWithContext(t *testing.T) {
-	mlc := NewMemoryLimitController(100)
+	mlc := NewMemoryLimitController(100, 1.0)
 	assert.True(t, mlc.TryReserveMemory(101))
 
 	gorNum := 10
@@ -120,7 +120,7 @@
 }
 
 func TestBlocking(t *testing.T) {
-	mlc := NewMemoryLimitController(100)
+	mlc := NewMemoryLimitController(100, 1.0)
 	assert.True(t, mlc.TryReserveMemory(101))
 	assert.Equal(t, int64(101), mlc.CurrentUsage())
 	assert.InDelta(t, 1.01, mlc.CurrentUsagePercent(), 0.000001)
@@ -146,7 +146,7 @@
 }
 
 func TestStepRelease(t *testing.T) {
-	mlc := NewMemoryLimitController(100)
+	mlc := NewMemoryLimitController(100, 1.0)
 	assert.True(t, mlc.TryReserveMemory(101))
 	assert.Equal(t, int64(101), mlc.CurrentUsage())
 	assert.InDelta(t, 1.01, mlc.CurrentUsagePercent(), 0.000001)
@@ -169,6 +169,57 @@
 	assert.Equal(t, int64(101), mlc.CurrentUsage())
 }
 
+func TestRegisterTrigger(t *testing.T) {
+	mlc := NewMemoryLimitController(100, 0.95)
+	triggeredResult1 := false
+	triggeredResult2 := false
+	finishCh := make(chan struct{}, 2)
+
+	mlc.RegisterTrigger(func() {
+		triggeredResult1 = true
+		finishCh <- struct{}{}
+	})
+
+	mlc.RegisterTrigger(func() {
+		triggeredResult2 = true
+		finishCh <- struct{}{}
+	})
+
+	mlc.TryReserveMemory(50)
+	timer := time.NewTimer(time.Millisecond * 500)
+	select {
+	case <-finishCh:
+		assert.Fail(t, "should not be triggered")
+	case <-timer.C:
+	}
+
+	mlc.TryReserveMemory(45)
+	timer.Reset(time.Millisecond * 500)
+	for i := 0; i < 2; i++ {
+		select {
+		case <-finishCh:
+		case <-timer.C:
+			assert.Fail(t, "trigger timeout")
+		}
+	}
+
+	assert.True(t, triggeredResult1)
+	assert.True(t, triggeredResult2)
+
+	triggeredResult2 = false
+	mlc.ReleaseMemory(1)
+	mlc.ForceReserveMemory(1)
+	timer.Reset(time.Millisecond * 500)
+	for i := 0; i < 2; i++ {
+		select {
+		case <-finishCh:
+		case <-timer.C:
+			assert.Fail(t, "trigger timeout")
+		}
+	}
+	assert.True(t, triggeredResult2)
+}
+
 func reserveMemory(mlc MemoryLimitController, ch chan int) {
 	mlc.ReserveMemory(context.Background(), 1)
 	ch <- 1