Skip to content

Commit

Permalink
[fix][flaky-test] BookKeeperClusterTestCase.setup() (#17865)
Browse files Browse the repository at this point in the history
Fixes: #15773  #16863 #16860

### Motivation
```
  Error:  Tests run: 11, Failures: 1, Errors: 0, Skipped: 3, Time elapsed: 87.06 s <<< FAILURE! - in org.apache.pulsar.packages.management.storage.bookkeeper.BookKeeperPackagesStorageTest
  Error:  setUp(org.apache.pulsar.packages.management.storage.bookkeeper.BookKeeperPackagesStorageTest)  Time elapsed: 13.089 s  <<< FAILURE!
  org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss
  	at org.apache.zookeeper.KeeperException.create(KeeperException.java:102)
  	at org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase.waitForConnection(ZooKeeperWatcherBase.java:159)
  	at org.apache.bookkeeper.zookeeper.ZooKeeperClient$Builder.build(ZooKeeperClient.java:260)
  	at org.apache.bookkeeper.test.ZooKeeperUtil.restartCluster(ZooKeeperUtil.java:133)
  	at org.apache.bookkeeper.test.ZooKeeperUtil.startCluster(ZooKeeperUtil.java:104)
  	at org.apache.pulsar.packages.management.storage.bookkeeper.bookkeeper.test.BookKeeperClusterTestCase.startZKCluster(BookKeeperClusterTestCase.java:238)
  	at org.apache.pulsar.packages.management.storage.bookkeeper.bookkeeper.test.BookKeeperClusterTestCase.setUp(BookKeeperClusterTestCase.java:178)
  	at org.apache.pulsar.packages.management.storage.bookkeeper.bookkeeper.test.BookKeeperClusterTestCase.setUp(BookKeeperClusterTestCase.java:166)
  	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
  	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
  	at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:132)
  	at org.testng.internal.MethodInvocationHelper.invokeMethodConsideringTimeout(MethodInvocationHelper.java:61)
  	at org.testng.internal.ConfigInvoker.invokeConfigurationMethod(ConfigInvoker.java:366)
  	at org.testng.internal.ConfigInvoker.invokeConfigurations(ConfigInvoker.java:320)
  	at org.testng.internal.TestInvoker.runConfigMethods(TestInvoker.java:701)
  	at org.testng.internal.TestInvoker.invokeMethod(TestInvoker.java:527)
  	at org.testng.internal.TestInvoker.invokeTestMethod(TestInvoker.java:174)
  	at org.testng.internal.MethodRunner.runInSequence(MethodRunner.java:46)
  	at org.testng.internal.TestInvoker$MethodInvocationAgent.invoke(TestInvoker.java:822)
  	at org.testng.internal.TestInvoker.invokeTestMethods(TestInvoker.java:147)
  	at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:146)
  	at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:128)
  	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
  	at org.testng.TestRunner.privateRun(TestRunner.java:764)
  	at org.testng.TestRunner.run(TestRunner.java:585)
  	at org.testng.SuiteRunner.runTest(SuiteRunner.java:384)
  	at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:378)
  	at org.testng.SuiteRunner.privateRun(SuiteRunner.java:337)
  	at org.testng.SuiteRunner.run(SuiteRunner.java:286)
  	at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:53)
  	at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:96)
  	at org.testng.TestNG.runSuitesSequentially(TestNG.java:1218)
  	at org.testng.TestNG.runSuitesLocally(TestNG.java:1140)
  	at org.testng.TestNG.runSuites(TestNG.java:1069)
  	at org.testng.TestNG.run(TestNG.java:1037)
  	at org.apache.maven.surefire.testng.TestNGExecutor.run(TestNGExecutor.java:135)
  	at org.apache.maven.surefire.testng.TestNGDirectoryTestSuite.executeSingleClass(TestNGDirectoryTestSuite.java:112)
  	at org.apache.maven.surefire.testng.TestNGDirectoryTestSuite.executeLazy(TestNGDirectoryTestSuite.java:123)
  	at org.apache.maven.surefire.testng.TestNGDirectoryTestSuite.execute(TestNGDirectoryTestSuite.java:90)
  	at org.apache.maven.surefire.testng.TestNGProvider.invoke(TestNGProvider.java:146)
  	at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
  	at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
  	at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
  	at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)
```

The root cause is that the ZooKeeper client randomly picks either the IPv4 or the IPv6 address when resolving `localhost`: the connection succeeds over IPv4 but fails over IPv6. Therefore, if the client keeps randomly picking the IPv6 address, the connection times out.

https://github.com/apache/zookeeper/blob/bc1b231c9e32667b2978c86a6a64833470973dbd/zookeeper-server/src/main/java/org/apache/zookeeper/client/StaticHostProvider.java#L140-L146
Thanks to @poorbarcode for helping me locate the problem.

### Modifications
- Add `@AfterMethod(alwaysRun = true)`.
- Use the IP address (`getAddress().getHostAddress()`) instead of the host name (`getHostName()`).

### Documentation

- [x] `doc-not-needed` 

### Matching PR in the forked repository

PR in forked repository: 

- congbobo184#1
  • Loading branch information
congbobo184 authored Sep 29, 2022
1 parent c952f3c commit 6cba1f6
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ protected String changeLedgerPath() {
return "";
}

@AfterTest
@AfterTest(alwaysRun = true)
public void tearDown() throws Exception {
boolean failed = false;
for (Throwable e : asyncExceptions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,8 @@ public void restartCluster() throws Exception {

if (0 == zooKeeperPort) {
zooKeeperPort = serverFactory.getLocalPort();
zkaddr = new InetSocketAddress(zkaddr.getHostName(), zooKeeperPort);
connectString = zkaddr.getHostName() + ":" + zooKeeperPort;
zkaddr = new InetSocketAddress(zkaddr.getAddress().getHostAddress(), zooKeeperPort);
connectString = zkaddr.getAddress().getHostAddress() + ":" + zooKeeperPort;
}

boolean b = ClientBase.waitForServerUp(getZooKeeperConnectString(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public BookKeeperPackagesStorageTest() {
}

@BeforeMethod()
public void setup() throws Exception {
public void start() throws Exception {
PackagesStorageProvider provider = PackagesStorageProvider
.newProvider(BookKeeperPackagesStorageProvider.class.getName());
DefaultPackagesStorageConfiguration configuration = new DefaultPackagesStorageConfiguration();
Expand All @@ -58,7 +58,7 @@ public void setup() throws Exception {
}

@AfterMethod(alwaysRun = true)
public void teardown() throws Exception {
public void close() throws Exception {
if (storage != null) {
storage.closeAsync().get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@
import org.apache.bookkeeper.test.TmpDirs;
import org.apache.bookkeeper.test.ZooKeeperCluster;
import org.apache.bookkeeper.test.ZooKeeperClusterUtil;
import org.apache.bookkeeper.test.ZooKeeperUtil;
import org.apache.bookkeeper.util.DiskChecker;
import org.apache.bookkeeper.util.PortManager;
import org.apache.zookeeper.KeeperException;
Expand Down Expand Up @@ -191,7 +190,7 @@ protected String getMetadataServiceUri(String ledgersRootPath) {
return zkUtil.getMetadataServiceUri(ledgersRootPath);
}

@AfterMethod
@AfterMethod(alwaysRun = true)
public void tearDown() throws Exception {
boolean failed = false;
for (Throwable e : asyncExceptions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,120 +16,142 @@
* specific language governing permissions and limitations
* under the License.
*/
/**
* This file is derived from ZooKeeperUtil from Apache BookKeeper
* http://bookkeeper.apache.org
*/

package org.apache.pulsar.packages.management.storage.bookkeeper.bookkeeper.test;

import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.bookkeeper.test.ZooKeeperCluster;
import org.apache.bookkeeper.util.IOUtils;
import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
import org.apache.commons.io.FileUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.test.ClientBase;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;

import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.CountDownLatch;

public class ZooKeeperUtil {
static final Logger LOG = LoggerFactory.getLogger(org.apache.bookkeeper.test.ZooKeeperUtil.class);

// ZooKeeper related variables
protected static final Integer zooKeeperPort = PortManager.nextFreePort();
private final InetSocketAddress zkaddr;

public class ZooKeeperUtil implements ZooKeeperCluster {
static final Logger LOG;
protected Integer zooKeeperPort = 0;
private InetSocketAddress zkaddr;
protected ZooKeeperServer zks;
protected ZooKeeper zkc; // zookeeper client
protected ZooKeeper zkc;
protected NIOServerCnxnFactory serverFactory;
protected File ZkTmpDir;
private final String connectString;
protected File zkTmpDir;
private String connectString;

public ZooKeeperUtil() {
zkaddr = new InetSocketAddress(zooKeeperPort);
connectString = "localhost:" + zooKeeperPort;
String loopbackIPAddr = InetAddress.getLoopbackAddress().getHostAddress();
this.zkaddr = new InetSocketAddress(loopbackIPAddr, 0);
this.connectString = loopbackIPAddr + ":" + this.zooKeeperPort;
}

public ZooKeeper getZooKeeperClient() {
return zkc;
return this.zkc;
}

public String getZooKeeperConnectString() {
return connectString;
return this.connectString;
}

public String getMetadataServiceUri() {
return this.getMetadataServiceUri("/ledgers");
}

public void startServer() throws Exception {
// create a ZooKeeper server(dataDir, dataLogDir, port)
public String getMetadataServiceUri(String zkLedgersRootPath) {
return "zk://" + this.connectString + zkLedgersRootPath;
}

public String getMetadataServiceUri(String zkLedgersRootPath, String type) {
return "zk+" + type + "://" + this.connectString + zkLedgersRootPath;
}

public void startCluster() throws Exception {
LOG.debug("Running ZK server");
// ServerStats.registerAsConcrete();
ClientBase.setupTestEnv();
ZkTmpDir = File.createTempFile("zookeeper", "test");
ZkTmpDir.delete();
ZkTmpDir.mkdir();
this.zkTmpDir = IOUtils.createTempDir("zookeeper", "test");
this.restartCluster();
this.createBKEnsemble("/ledgers");
}

zks = new ZooKeeperServer(ZkTmpDir, ZkTmpDir, ZooKeeperServer.DEFAULT_TICK_TIME);
serverFactory = new NIOServerCnxnFactory();
serverFactory.configure(zkaddr, 100);
serverFactory.startup(zks);
public void restartCluster() throws Exception {
this.zks = new ZooKeeperServer(this.zkTmpDir, this.zkTmpDir, 3000);
this.serverFactory = new NIOServerCnxnFactory();
this.serverFactory.configure(this.zkaddr, 100);
this.serverFactory.startup(this.zks);
if (0 == this.zooKeeperPort) {
this.zooKeeperPort = this.serverFactory.getLocalPort();
this.zkaddr = new InetSocketAddress(this.zkaddr.getAddress().getHostAddress(), this.zooKeeperPort);
this.connectString = this.zkaddr.getAddress().getHostAddress() + ":" + this.zooKeeperPort;
}

boolean b = ClientBase.waitForServerUp(getZooKeeperConnectString(), ClientBase.CONNECTION_TIMEOUT);
boolean b = ClientBase.waitForServerUp(this.getZooKeeperConnectString(), (long)ClientBase.CONNECTION_TIMEOUT);
LOG.debug("Server up: " + b);

// create a zookeeper client
LOG.debug("Instantiate ZK Client");
zkc = ZooKeeperClient.newBuilder().connectString(getZooKeeperConnectString()).build();

// initialize the zk client with values
zkc.create("/ledgers", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zkc.create("/ledgers/available", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
this.zkc = ZooKeeperClient.newBuilder().connectString(this.getZooKeeperConnectString()).sessionTimeoutMs(10000).build();
}

@SuppressWarnings("deprecation")
public void sleepServer(final int seconds, final CountDownLatch l) throws InterruptedException, IOException {
public void sleepCluster(final int time, final TimeUnit timeUnit, final CountDownLatch l) throws InterruptedException, IOException {
Thread[] allthreads = new Thread[Thread.activeCount()];
Thread.enumerate(allthreads);
for (final Thread t : allthreads) {
Thread[] var5 = allthreads;
int var6 = allthreads.length;

for(int var7 = 0; var7 < var6; ++var7) {
final Thread t = var5[var7];
if (t.getName().contains("SyncThread:0")) {
Thread sleeper = new Thread(() -> {
try {
t.suspend();
l.countDown();
Thread.sleep(seconds * 1000);
t.resume();
} catch (Exception e) {
LOG.error("Error suspending thread", e);
Thread sleeper = new Thread() {
public void run() {
try {
t.suspend();
l.countDown();
timeUnit.sleep((long)time);
t.resume();
} catch (Exception var2) {
ZooKeeperUtil.LOG.error("Error suspending thread", var2);
}

}
});
};
sleeper.start();
return;
}
}

throw new IOException("ZooKeeper thread not found");
}

public void killServer() throws Exception {
if (zkc != null) {
zkc.close();
public void stopCluster() throws Exception {
if (this.zkc != null) {
this.zkc.close();
}

// shutdown ZK server
if (serverFactory != null) {
serverFactory.shutdown();
Assert.assertTrue(ClientBase.waitForServerDown(getZooKeeperConnectString(), ClientBase.CONNECTION_TIMEOUT),
"waiting for server down");
if (this.serverFactory != null) {
this.serverFactory.shutdown();
Assert.assertTrue("waiting for server down", ClientBase.waitForServerDown(this.getZooKeeperConnectString(), (long)ClientBase.CONNECTION_TIMEOUT));
}
if (zks != null) {
zks.getTxnLogFactory().close();

if (this.zks != null) {
this.zks.getTxnLogFactory().close();
}
// ServerStats.unregister();
FileUtils.deleteDirectory(ZkTmpDir);

}

public void killCluster() throws Exception {
this.stopCluster();
FileUtils.deleteDirectory(this.zkTmpDir);
}

static {
System.setProperty("zookeeper.4lw.commands.whitelist", "*");
LOG = LoggerFactory.getLogger(ZooKeeperUtil.class);
}
}

0 comments on commit 6cba1f6

Please sign in to comment.