Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][metadata] Don't execute Bookkeeper metadata callbacks on Zookeeper event thread #17620

Merged

Conversation

lhotari
Copy link
Member

@lhotari lhotari commented Sep 13, 2022

Motivation

In a Pulsar 2.10.x test environment we ran into an issue where there were 12.3 million ZK WatchEvents in the heap dump.
The ZK Event thread was blocked and the ZK session had also been expired.

Modifications

Run callbacks on a separate thread.

Additional context

PIP-45 changes #12770 and #13296

Documentation

  • doc-required
    (Your PR needs to update docs and you will update later)

  • doc-not-needed
    (Please explain why)

  • doc
    (Your PR contains doc changes)

  • doc-complete
    (Docs have been already added)

@lhotari lhotari added this to the 2.12.0 milestone Sep 13, 2022
@lhotari lhotari self-assigned this Sep 13, 2022
@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Sep 13, 2022
@merlimat
Copy link
Contributor

@lhotari Do you have the stack trace for the deadlock to attach here for context?

@lhotari
Copy link
Member Author

lhotari commented Sep 14, 2022

@lhotari Do you have the stack trace for the deadlock to attach here for context?

@merlimat I only had a heap dump of the problem so I'm not sure about the deadlock details. The heap dump includes a thread dump, but without lock info. Here's the full thread dump:
https://gist.github.com/lhotari/b9513c1eeb7582b25f60f1c89119789d
(line numbers refer to DS fork of BK 4.14.5 and fork of Pulsar 2.10.x)

Here's a subset of threads related to metadata store and ZK. There might be several issues that are causing the issue.
I noticed that LeaderElectionImpl.close is also blocked. I guess the ZK session has expired and the recovery for this doesn't work since it's blocked.

Here are the metadata & ZK related threads from the full thread dump:

AuditorBookie-bookie9.testcluster.example.com:3181  Waiting Thread ID: 85
  jdk.internal.misc.Unsafe.park(Unsafe.java)
  java.util.concurrent.locks.LockSupport.park(LockSupport.java:194)
  java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1796)
  java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3128)
  java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1823)
  java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1998)
  org.apache.pulsar.metadata.bookkeeper.PulsarLedgerUnderreplicationManager.getLostBookieRecoveryDelay(PulsarLedgerUnderreplicationManager.java:785)
  org.apache.bookkeeper.replication.Auditor$9.run(Auditor.java:545)
  org.apache.bookkeeper.util.SafeRunnable$1.safeRun(SafeRunnable.java:43)
  org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
  java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
  java.util.concurrent.FutureTask.run(FutureTask.java:264)
  java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
  java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
  java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
  java.lang.Thread.run(Thread.java:834)

AuditorElector-bookie9.testcluster.example.com:3181  Waiting Thread ID: 83
  jdk.internal.misc.Unsafe.park(Unsafe.java)
  java.util.concurrent.locks.LockSupport.park(LockSupport.java:194)
  java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1796)
  java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3128)
  java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1823)
  java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1998)
  org.apache.pulsar.metadata.bookkeeper.PulsarLedgerUnderreplicationManager.notifyLostBookieRecoveryDelayChanged(PulsarLedgerUnderreplicationManager.java:804)
  org.apache.bookkeeper.replication.Auditor.start(Auditor.java:699)
  org.apache.bookkeeper.replication.AuditorElector$3.run(AuditorElector.java:187)
  java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
  java.util.concurrent.FutureTask.run(FutureTask.java:264)
  java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
  java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
  java.lang.Thread.run(Thread.java:834)
 
main-EventThread  Blocked Thread ID: 59
  org.apache.bookkeeper.replication.Auditor.submitAuditTask(Auditor.java:533)
  org.apache.bookkeeper.replication.Auditor.lambda$watchBookieChanges$0(Auditor.java:1051)
  org.apache.bookkeeper.replication.Auditor$$Lambda$340.onBookiesChanged()
  org.apache.pulsar.metadata.bookkeeper.PulsarRegistrationClient$$Lambda$216.accept()
  java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714)
  java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
  java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
  org.apache.pulsar.metadata.impl.ZKMetadataStore.handleGetChildrenResult(ZKMetadataStore.java:260)
  org.apache.pulsar.metadata.impl.ZKMetadataStore.lambda$batchOperation$6(ZKMetadataStore.java:191)
  org.apache.pulsar.metadata.impl.ZKMetadataStore$$Lambda$156.processResult()
  org.apache.pulsar.metadata.impl.PulsarZooKeeperClient$3$1.processResult(PulsarZooKeeperClient.java:490)
  org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:712)
  org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:553)

metadata-store-20-1  Waiting Thread ID: 57
  jdk.internal.misc.Unsafe.park(Unsafe.java)
  java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:234)
  java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2123)
  java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1182)
  java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:899)
  java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1054)
  java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1114)
  java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
  io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
  java.lang.Thread.run(Thread.java:834)

metadata-store-3-1  Waiting Thread ID: 17
  jdk.internal.misc.Unsafe.park(Unsafe.java)
  java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:234)
  java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2123)
  java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1182)
  java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:899)
  java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1054)
  java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1114)
  java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
  io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
  java.lang.Thread.run(Thread.java:834)

metadata-store-coordination-service-22-1  Waiting Thread ID: 86
  jdk.internal.misc.Unsafe.park(Unsafe.java)
  java.util.concurrent.locks.LockSupport.park(LockSupport.java:194)
  java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2081)
  java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1170)
  java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:899)
  java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1054)
  java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1114)
  java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
  io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
  java.lang.Thread.run(Thread.java:834)

metadata-store-coordination-service-5-1  Waiting Thread ID: 90
  jdk.internal.misc.Unsafe.park(Unsafe.java)
  java.util.concurrent.locks.LockSupport.park(LockSupport.java:194)
  java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1796)
  java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3128)
  java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1823)
  java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1998)
  org.apache.pulsar.metadata.coordination.impl.LockManagerImpl.lambda$handleSessionEvent$3(LockManagerImpl.java:139)
  org.apache.pulsar.metadata.coordination.impl.LockManagerImpl$$Lambda$343.run()
  org.apache.bookkeeper.common.util.SafeRunnable$1.safeRun(SafeRunnable.java:60)
  org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
  java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
  java.util.concurrent.FutureTask.run(FutureTask.java:264)
  java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
  java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
  java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
  io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
  java.lang.Thread.run(Thread.java:834)

metadata-store-zk-session-watcher-21-1  Waiting Thread ID: 60
  jdk.internal.misc.Unsafe.park(Unsafe.java)
  java.util.concurrent.locks.LockSupport.park(LockSupport.java:194)
  java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1796)
  java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3128)
  java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1823)
  java.util.concurrent.CompletableFuture.join(CompletableFuture.java:2043)
  org.apache.pulsar.metadata.coordination.impl.LeaderElectionImpl.close(LeaderElectionImpl.java:233)
  org.apache.pulsar.metadata.bookkeeper.PulsarLedgerAuditorManager.close(PulsarLedgerAuditorManager.java:96)
  org.apache.bookkeeper.replication.AuditorElector.shutdown(AuditorElector.java:234)
  org.apache.bookkeeper.replication.AutoRecoveryMain.shutdown(AutoRecoveryMain.java:154)
  org.apache.bookkeeper.replication.AutoRecoveryMain.lambda$new$0(AutoRecoveryMain.java:99)
  org.apache.bookkeeper.replication.AutoRecoveryMain$$Lambda$222.onSessionExpired()
  org.apache.pulsar.metadata.bookkeeper.PulsarMetadataClientDriver.lambda$setSessionStateListener$0(PulsarMetadataClientDriver.java:68)
  org.apache.pulsar.metadata.bookkeeper.PulsarMetadataClientDriver$$Lambda$223.accept()
  org.apache.pulsar.metadata.impl.AbstractMetadataStore.lambda$receivedSessionEvent$11(AbstractMetadataStore.java:297)
  org.apache.pulsar.metadata.impl.AbstractMetadataStore$$Lambda$342.accept()
  java.util.concurrent.CopyOnWriteArrayList.forEach(CopyOnWriteArrayList.java:807)
  org.apache.pulsar.metadata.impl.AbstractMetadataStore.receivedSessionEvent(AbstractMetadataStore.java:295)
  org.apache.pulsar.metadata.impl.ZKMetadataStore.receivedSessionEvent(ZKMetadataStore.java:152)
  org.apache.pulsar.metadata.impl.ZKMetadataStore$$Lambda$148.accept()
  org.apache.pulsar.metadata.impl.ZKSessionWatcher.checkState(ZKSessionWatcher.java:150)
  org.apache.pulsar.metadata.impl.ZKSessionWatcher.checkConnectionStatus(ZKSessionWatcher.java:110)
  org.apache.pulsar.metadata.impl.ZKSessionWatcher$$Lambda$149.run()
  org.apache.pulsar.common.util.Runnables$CatchingAndLoggingRunnable.run(Runnables.java:54)
  java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
  java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
  java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
  java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
  java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
  io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
  java.lang.Thread.run(Thread.java:834)

metadata-store-zk-session-watcher-4-1  Waiting Thread ID: 20
  jdk.internal.misc.Unsafe.park(Unsafe.java)
  java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:234)
  java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1798)
  java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3128)
  java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1868)
  java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2021)
  org.apache.pulsar.metadata.impl.ZKSessionWatcher.checkConnectionStatus(ZKSessionWatcher.java:104)
  org.apache.pulsar.metadata.impl.ZKSessionWatcher$$Lambda$149.run()
  org.apache.pulsar.common.util.Runnables$CatchingAndLoggingRunnable.run(Runnables.java:54)
  java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
  java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
  java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
  java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
  java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
  io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
  java.lang.Thread.run(Thread.java:834)
 
ReplicationWorker  Waiting Thread ID: 63
  java.lang.Object.wait(Object.java)
  org.apache.pulsar.metadata.bookkeeper.PulsarLedgerUnderreplicationManager.getLedgerToRereplicate(PulsarLedgerUnderreplicationManager.java:572)
  org.apache.bookkeeper.replication.ReplicationWorker.rereplicate(ReplicationWorker.java:264)
  org.apache.bookkeeper.replication.ReplicationWorker.run(ReplicationWorker.java:230)
  io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
  java.lang.Thread.run(Thread.java:834)
  
ZKC-connect-executor-0  Waiting Thread ID: 169
  jdk.internal.misc.Unsafe.park(Unsafe.java)
  java.util.concurrent.locks.LockSupport.park(LockSupport.java:194)
  java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2081)
  java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:433)
  java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1054)
  java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1114)
  java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
  java.lang.Thread.run(Thread.java:834)

ZKC-connect-executor-0-EventThread  Blocked Thread ID: 823
  org.apache.pulsar.metadata.impl.ZKSessionWatcher.process(ZKSessionWatcher.java:120)
  org.apache.pulsar.metadata.impl.ZKMetadataStore.lambda$new$0(ZKMetadataStore.java:101)
  org.apache.pulsar.metadata.impl.ZKMetadataStore$$Lambda$361.accept()
  java.util.Optional.ifPresent(Optional.java:183)
  org.apache.pulsar.metadata.impl.ZKMetadataStore.lambda$new$1(ZKMetadataStore.java:101)
  org.apache.pulsar.metadata.impl.ZKMetadataStore$$Lambda$139.process()
  org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase.notifyEvent(ZooKeeperWatcherBase.java:180)
  org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase.process(ZooKeeperWatcherBase.java:146)
  org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:578)
  org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:553)

ZKC-connect-executor-0-SendThread(zk01.testcluster.example.com:2181)  Runnable Thread ID: 822
  sun.nio.ch.EPoll.wait(EPoll.java)
  sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:120)
  sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:124)
  sun.nio.ch.SelectorImpl.select(SelectorImpl.java:136)
  org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:332)
  org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1282)    

@lhotari
Copy link
Member Author

lhotari commented Sep 14, 2022

I created a separate PR about handling session events in a separate thread: #17638

@eolivelli eolivelli merged commit 8e0ae80 into apache:master Sep 14, 2022
eolivelli pushed a commit to datastax/pulsar that referenced this pull request Sep 14, 2022
Jason918 pushed a commit that referenced this pull request Sep 25, 2022
@Technoboy- Technoboy- modified the milestones: 2.12.0, 2.11.0 Sep 26, 2022
Technoboy- pushed a commit that referenced this pull request Sep 26, 2022
nicoloboschi pushed a commit to datastax/pulsar that referenced this pull request Sep 28, 2022
…per event thread (apache#17620)

(cherry picked from commit 8e0ae80)
(cherry picked from commit f764350)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants