Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve] [broker] high CPU usage caused by list topics under namespace #23049

Merged
merged 10 commits into from
Jul 22, 2024

Conversation

poorbarcode
Copy link
Contributor

@poorbarcode poorbarcode commented Jul 18, 2024

Motivation

NamespaceService. getListOfTopics cost too many CPU resources

broker_cpu.html.txt

pulsar-io-4-20" #135 prio=5 os_prio=0 cpu=79326083.91ms elapsed=1465985.25s tid=0x00007f5bc40740b0 nid=0xe9 runnable  [0x00007f5ba51fd000]
   java.lang.Thread.State: RUNNABLE
	at java.util.concurrent.ConcurrentLinkedQueue.offer([email protected]/ConcurrentLinkedQueue.java:380)
	at java.util.concurrent.ConcurrentLinkedQueue.add([email protected]/ConcurrentLinkedQueue.java:283)
	at com.google.common.cache.LocalCache$Segment.recordRead(LocalCache.java:2546)
	at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2068)
	at com.google.common.cache.LocalCache.get(LocalCache.java:4012)
	at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4035)
	at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:5011)
	at org.apache.pulsar.common.naming.TopicName.get(TopicName.java:81)
	at org.apache.pulsar.common.naming.SystemTopicNames.isEventSystemTopic(SystemTopicNames.java:62)
	at org.apache.pulsar.common.naming.SystemTopicNames.isSystemTopic(SystemTopicNames.java:86)
	at org.apache.pulsar.common.topics.TopicList.lambda$filterSystemTopic$1(TopicList.java:65)
	at org.apache.pulsar.common.topics.TopicList$$Lambda$2132/0x00007f5d049a4800.test(Unknown Source)
	at java.util.stream.ReferencePipeline$2$1.accept([email protected]/ReferencePipeline.java:178)
	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining([email protected]/ArrayList.java:1625)
	at java.util.stream.AbstractPipeline.copyInto([email protected]/AbstractPipeline.java:509)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto([email protected]/AbstractPipeline.java:499)
	at java.util.stream.ReduceOps$ReduceOp.evaluateSequential([email protected]/ReduceOps.java:921)
	at java.util.stream.AbstractPipeline.evaluate([email protected]/AbstractPipeline.java:234)
	at java.util.stream.ReferencePipeline.collect([email protected]/ReferencePipeline.java:682)
	at org.apache.pulsar.common.topics.TopicList.filterSystemTopic(TopicList.java:66)
	at org.apache.pulsar.broker.service.ServerCnx.lambda$handleGetTopicsOfNamespace$47(ServerCnx.java:2126)
	at org.apache.pulsar.broker.service.ServerCnx$$Lambda$2129/0x00007f5d049b1c80.accept(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniAcceptNow([email protected]/CompletableFuture.java:757)
	at java.util.concurrent.CompletableFuture.uniAcceptStage([email protected]/CompletableFuture.java:735)
	at java.util.concurrent.CompletableFuture.thenAccept([email protected]/CompletableFuture.java:2182)
	at org.apache.pulsar.broker.service.ServerCnx.lambda$handleGetTopicsOfNamespace$49(ServerCnx.java:2123)
	at org.apache.pulsar.broker.service.ServerCnx$$Lambda$2128/0x00007f5d049b1a38.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniApplyNow([email protected]/CompletableFuture.java:684)
	at java.util.concurrent.CompletableFuture.uniApplyStage([email protected]/CompletableFuture.java:662)
	at java.util.concurrent.CompletableFuture.thenApply([email protected]/CompletableFuture.java:2168)
	at org.apache.pulsar.broker.service.ServerCnx.handleGetTopicsOfNamespace(ServerCnx.java:2120)
	at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:315)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:202)
	at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:164)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:333)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:454)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.handler.flush.FlushConsolidationHandler.channelRead(FlushConsolidationHandler.java:152)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:800)
	at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:509)
	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:407)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run([email protected]/Thread.java:840)

Modifications

  • Merge requests

Performance review

See details in #23052

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: x

@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Jul 18, 2024
@poorbarcode poorbarcode self-assigned this Jul 18, 2024
@poorbarcode poorbarcode added this to the 3.4.0 milestone Jul 18, 2024
@poorbarcode
Copy link
Contributor Author

/pulsarbot rerun-failure-checks

@BewareMyPower
Copy link
Contributor

Besides, from the flame graph, the most CPU cost behavior is TopicName.get, which accesses the LoadingCache, which has at least a map get operation and a lock acquirement.

Actually, in this case, a simple string operation on a topic name string is enough.

@poorbarcode
Copy link
Contributor Author

Besides, from the flame graph, the most CPU cost behavior is TopicName.get, which accesses the LoadingCache, which has at least a map get operation and a lock acquirement.
Actually, in this case, a simple string operation on a topic name string is enough.

Improved

@BewareMyPower
Copy link
Contributor

BewareMyPower commented Jul 18, 2024

+0 to this PR. I didn't left comments or request changes just to avoid blocking the merge.

Generally, an API that returns CompletableFuture<T> should not be time costed, so we should not switch to another thread to call it.

            CompletableFuture<List<String>> res = new CompletableFuture<>();
            // Switch thread to avoid blocking other threads who are calling the current method.
            pulsar.getExecutor().execute(() -> {
                getListOfTopics(namespaceName, mode).thenApply(list -> {
                    return TopicList.filterSystemTopic(list);
                }).whenComplete((topics, ex) -> {
                    if (ex != null) {
                        res.completeExceptionally(ex);
                    } else {
                        res.complete(topics);
                    }
                });
            });
            return res;

The code above is an example of bad coding style. The key method that costs CPU is filterSystemTopic, not getListOfTopics. A proper change should be

return getListOfTopics(namespaceName, mode).thenApplyAsync(TopicList::filterSystemTopic, pulsar.getExecutor());

The code is much more simple and clear:

  1. Call getListOfTopics in the current thread since it's an asynchronous method that should not cost much time
  2. Call filterSystemTopic in a separated thread because it's a pure computing method that might cost time and might be called in the same thread when the asynchronous method completes immediately.

After a private chat with @poorbarcode, he think getListOfTopics could also block the current thread. Because it could call getListOfNonPersistentTopics:

        return PulsarWebResource.checkLocalOrGetPeerReplicationCluster(pulsar, namespaceName, true)
                .thenCompose(peerClusterData -> {
                    /* callback... */

The callback could cost much time. If it's called in the same thread, the current thread could also be occupied for some time. For this case, I suggested changing thenCompose to thenComposeAsync. But @poorbarcode thought we have to change many places for the sake if we need to look into the asynchronous method internally.

From my perspective, we should keep the same coding style:

  • When writing a method that returns CompletableFuture, ensure it's not blocked. Never switch it to another thread. If the callback could be called in the same thread and take much time, switch the callback instead of the method itself
  • When registering a method, you can choose to switch to another thread if it costs much time.

In short, switch the thread only when necessary.

If we started to calling an asynchronous method in another method, then the coding style will be confusing. There are many other asynchronous methods, how should we determine whether to move to another thread.

Eventually, we agree to disagree so I'm +0 to this PR and I won't block merging it.

@poorbarcode
Copy link
Contributor Author

@BewareMyPower

+0 to this PR. I didn't left comments or request changes just to avoid blocking the merge.

I think you are right, improved the code ❤️

@poorbarcode poorbarcode requested review from BewareMyPower and removed request for BewareMyPower July 18, 2024 14:31
@AuroraTwinkle
Copy link
Contributor

Good job

@codecov-commenter
Copy link

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 73.43%. Comparing base (bbc6224) to head (aedefd3).
Report is 459 commits behind head on master.

Additional details and impacted files

Impacted file tree graph

@@             Coverage Diff              @@
##             master   #23049      +/-   ##
============================================
- Coverage     73.57%   73.43%   -0.14%     
- Complexity    32624    33176     +552     
============================================
  Files          1877     1915      +38     
  Lines        139502   143943    +4441     
  Branches      15299    15725     +426     
============================================
+ Hits         102638   105708    +3070     
- Misses        28908    30128    +1220     
- Partials       7956     8107     +151     
Flag Coverage Δ
inttests 27.53% <6.66%> (+2.94%) ⬆️
systests 24.76% <100.00%> (+0.44%) ⬆️
unittests 72.49% <100.00%> (-0.36%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files Coverage Δ
...ache/pulsar/broker/namespace/NamespaceService.java 74.12% <100.00%> (+1.88%) ⬆️
...va/org/apache/pulsar/broker/service/ServerCnx.java 72.13% <100.00%> (-0.02%) ⬇️

... and 498 files with indirect coverage changes

@poorbarcode poorbarcode merged commit 3e4f338 into apache:master Jul 22, 2024
51 checks passed
@poorbarcode poorbarcode deleted the improve/getUserTopics branch July 22, 2024 01:28
poorbarcode added a commit that referenced this pull request Jul 22, 2024
poorbarcode added a commit that referenced this pull request Jul 22, 2024
poorbarcode added a commit that referenced this pull request Jul 22, 2024
nikhil-ctds pushed a commit to datastax/pulsar that referenced this pull request Jul 25, 2024
…ce (apache#23049)

(cherry picked from commit 3e4f338)
(cherry picked from commit 3f7206c)
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Jul 26, 2024
…ce (apache#23049)

(cherry picked from commit 3e4f338)
(cherry picked from commit 3f7206c)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

10 participants