Skip to content

Commit

Permalink
HDDS-11543. Track OzoneClient object leaks via LeakDetector framework. (
Browse files Browse the repository at this point in the history
  • Loading branch information
sadanand48 authored Oct 9, 2024
1 parent e00f7ae commit 4846e97
Show file tree
Hide file tree
Showing 16 changed files with 189 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -878,4 +878,17 @@ public static HddsProtos.UUID toProtobuf(UUID uuid) {
? Thread.currentThread().getStackTrace()
: null;
}

/**
* Logs a warning to report that the class is not closed properly.
*/
public static void reportLeak(Class<?> clazz, String stackTrace, Logger log) {
String warning = String.format("%s is not closed properly", clazz.getSimpleName());
if (stackTrace != null && LOG.isDebugEnabled()) {
String debugMessage = String.format("%nStackTrace for unclosed instance: %s",
stackTrace);
warning = warning.concat(debugMessage);
}
log.warn(warning);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,7 @@ static UncheckedAutoCloseable track(AutoCloseable object) {

static void reportLeak(Class<?> clazz, String stackTrace) {
ManagedRocksObjectMetrics.INSTANCE.increaseLeakObject();
String warning = String.format("%s is not closed properly", clazz.getSimpleName());
if (stackTrace != null && LOG.isDebugEnabled()) {
String debugMessage = String.format("%nStackTrace for unclosed instance: %s", stackTrace);
warning = warning.concat(debugMessage);
}
LOG.warn(warning);
HddsUtils.reportLeak(clazz, stackTrace, LOG);
}

private static @Nullable StackTraceElement[] getStackTrace() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeoutException;

import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -205,6 +206,20 @@ public static void waitFor(BooleanSupplier check, int checkEveryMillis,
}
}

public static <T extends Throwable> T assertThrows(
Class<T> expectedType,
Callable<? extends AutoCloseable> func) {
return Assertions.assertThrows(expectedType, () -> {
final AutoCloseable closeable = func.call();
try {
if (closeable != null) {
closeable.close();
}
} catch (Exception ignored) {
}
});
}

/**
* @deprecated use sl4fj based version
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.io.IOException;

import com.google.common.annotations.VisibleForTesting;
import org.apache.ratis.util.UncheckedAutoCloseable;

/**
* OzoneClient connects to Ozone Cluster and
Expand Down Expand Up @@ -76,6 +77,7 @@ public class OzoneClient implements Closeable {
private final ClientProtocol proxy;
private final ObjectStore objectStore;
private ConfigurationSource conf;
private final UncheckedAutoCloseable leakTracker = OzoneClientFactory.track(this);

/**
* Creates a new OzoneClient object, generally constructed
Expand Down Expand Up @@ -119,7 +121,11 @@ public ConfigurationSource getConfiguration() {
*/
@Override
public void close() throws IOException {
proxy.close();
try {
proxy.close();
} finally {
leakTracker.close();
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.MutableConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.utils.LeakDetector;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ozone.OmUtils;
import org.apache.hadoop.ozone.OzoneConsts;
Expand All @@ -34,13 +36,17 @@
import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
import org.apache.hadoop.security.token.Token;

import com.google.common.base.Preconditions;
import org.apache.commons.lang3.StringUtils;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY;
import org.apache.ratis.util.UncheckedAutoCloseable;

import com.google.common.base.Preconditions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY;

/**
* Factory class to create OzoneClients.
*/
Expand All @@ -54,6 +60,21 @@ public final class OzoneClientFactory {
*/
private OzoneClientFactory() { }

private static final LeakDetector OZONE_CLIENT_LEAK_DETECTOR =
new LeakDetector("OzoneClientObject");

public static UncheckedAutoCloseable track(AutoCloseable object) {
final Class<?> clazz = object.getClass();
final StackTraceElement[] stackTrace = HddsUtils.getStackTrace(LOG);
return OZONE_CLIENT_LEAK_DETECTOR.track(object,
() -> HddsUtils.reportLeak(clazz,
HddsUtils.formatStackTrace(stackTrace, 4), LOG));
}

public static Logger getLogger() {
return LOG;
}


/**
* Constructs and return an OzoneClient with default configuration.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ public void testCreateWithInvalidPaths() throws Exception {
}

private void checkInvalidPath(Path path) {
InvalidPathException pathException = assertThrows(
InvalidPathException pathException = GenericTestUtils.assertThrows(
InvalidPathException.class, () -> fs.create(path, false)
);
assertThat(pathException.getMessage()).contains("Invalid path Name");
Expand Down Expand Up @@ -1831,12 +1831,14 @@ public void testLoopInLinkBuckets() throws Exception {
String rootPath = String.format("%s://%s.%s/",
OzoneConsts.OZONE_URI_SCHEME, linkBucket1Name, linksVolume);

try {
FileSystem.get(URI.create(rootPath), cluster.getConf());
fail("Should throw Exception due to loop in Link Buckets");
try (FileSystem fileSystem = FileSystem.get(URI.create(rootPath),
cluster.getConf())) {
fail("Should throw Exception due to loop in Link Buckets" +
" while initialising fs with URI " + fileSystem.getUri());
} catch (OMException oe) {
// Expected exception
assertEquals(OMException.ResultCodes.DETECTED_LOOP_IN_BUCKET_LINKS, oe.getResult());
assertEquals(OMException.ResultCodes.DETECTED_LOOP_IN_BUCKET_LINKS,
oe.getResult());
} finally {
volume.deleteBucket(linkBucket1Name);
volume.deleteBucket(linkBucket2Name);
Expand All @@ -1854,13 +1856,17 @@ public void testLoopInLinkBuckets() throws Exception {
String rootPath2 = String.format("%s://%s.%s/",
OzoneConsts.OZONE_URI_SCHEME, danglingLinkBucketName, linksVolume);

FileSystem fileSystem = null;
try {
FileSystem.get(URI.create(rootPath2), cluster.getConf());
fileSystem = FileSystem.get(URI.create(rootPath2), cluster.getConf());
} catch (OMException oe) {
// Expected exception
fail("Should not throw Exception and show orphan buckets");
} finally {
volume.deleteBucket(danglingLinkBucketName);
if (fileSystem != null) {
fileSystem.close();
}
}
}

Expand Down Expand Up @@ -2230,7 +2236,8 @@ void testFileSystemWithObjectStoreLayout() throws IOException {
OzoneConfiguration config = new OzoneConfiguration(fs.getConf());
config.set(FS_DEFAULT_NAME_KEY, obsRootPath);

IllegalArgumentException e = assertThrows(IllegalArgumentException.class, () -> FileSystem.get(config));
IllegalArgumentException e = GenericTestUtils.assertThrows(IllegalArgumentException.class,
() -> FileSystem.get(config));
assertThat(e.getMessage()).contains("OBJECT_STORE, which does not support file system semantics");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,17 @@ public static void listStatusIteratorOnPageSize(OzoneConfiguration conf,
URI uri = FileSystem.getDefaultUri(config);
config.setBoolean(
String.format("fs.%s.impl.disable.cache", uri.getScheme()), true);
FileSystem subject = FileSystem.get(uri, config);
Path dir = new Path(Objects.requireNonNull(rootPath), "listStatusIterator");
try {
Set<String> paths = new TreeSet<>();
for (int dirCount : dirCounts) {
listStatusIterator(subject, dir, paths, dirCount);
try (FileSystem subject = FileSystem.get(uri, config)) {
Path dir = new Path(Objects.requireNonNull(rootPath),
"listStatusIterator");
try {
Set<String> paths = new TreeSet<>();
for (int dirCount : dirCounts) {
listStatusIterator(subject, dir, paths, dirCount);
}
} finally {
subject.delete(dir, true);
}
} finally {
subject.delete(dir, true);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,16 @@ void teardown() throws IOException {
void fileSystemWithUnsupportedDefaultBucketLayout(String layout) {
OzoneConfiguration conf = configWithDefaultBucketLayout(layout);

OMException e = assertThrows(OMException.class,
() -> FileSystem.newInstance(conf));
OMException e = assertThrows(OMException.class, () -> {
FileSystem fileSystem = null;
try {
fileSystem = FileSystem.newInstance(conf);
} finally {
if (fileSystem != null) {
fileSystem.close();
}
}
});
assertThat(e.getMessage())
.contains(ERROR_MAP.get(layout));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.net.StaticMapping;

import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.om.KeyManagerImpl;
import org.apache.hadoop.ozone.om.OmTestManagers;
import org.apache.hadoop.ozone.om.OzoneManager;
Expand Down Expand Up @@ -74,6 +75,8 @@ public class TestOMSortDatanodes {
"edge1", "/rack1"
);

private static OzoneClient ozoneClient;

@BeforeAll
public static void setup() throws Exception {
config = new OzoneConfiguration();
Expand Down Expand Up @@ -109,11 +112,15 @@ public static void setup() throws Exception {
= new OmTestManagers(config, scm.getBlockProtocolServer(),
mockScmContainerClient);
om = omTestManagers.getOzoneManager();
ozoneClient = omTestManagers.getRpcClient();
keyManager = (KeyManagerImpl)omTestManagers.getKeyManager();
}

@AfterAll
public static void cleanup() throws Exception {
if (ozoneClient != null) {
ozoneClient.close();
}
if (scm != null) {
scm.stop();
scm.join();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,14 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -188,6 +190,7 @@
import static org.junit.jupiter.api.Assertions.fail;
import static org.slf4j.event.Level.DEBUG;

import org.apache.ozone.test.tag.Unhealthy;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -221,6 +224,7 @@ abstract class OzoneRpcClientTests extends OzoneTestBase {
private static OzoneAcl inheritedGroupAcl = new OzoneAcl(GROUP,
remoteGroupName, ACCESS, READ);
private static MessageDigest eTagProvider;
private static Set<OzoneClient> ozoneClients = new HashSet<>();

@BeforeAll
public static void initialize() throws NoSuchAlgorithmException {
Expand Down Expand Up @@ -250,6 +254,7 @@ static void startCluster(OzoneConfiguration conf, MiniOzoneCluster.Builder build
.build();
cluster.waitForClusterToBeReady();
ozClient = OzoneClientFactory.getRpcClient(conf);
ozoneClients.add(ozClient);
store = ozClient.getObjectStore();
storageContainerLocationClient =
cluster.getStorageContainerLocationClient();
Expand All @@ -259,10 +264,9 @@ static void startCluster(OzoneConfiguration conf, MiniOzoneCluster.Builder build
/**
* Close OzoneClient and shutdown MiniOzoneCluster.
*/
static void shutdownCluster() throws IOException {
if (ozClient != null) {
ozClient.close();
}
static void shutdownCluster() {
org.apache.hadoop.hdds.utils.IOUtils.closeQuietly(ozoneClients);
ozoneClients.clear();

if (storageContainerLocationClient != null) {
storageContainerLocationClient.close();
Expand All @@ -274,6 +278,7 @@ static void shutdownCluster() throws IOException {
}

private static void setOzClient(OzoneClient ozClient) {
ozoneClients.add(ozClient);
OzoneRpcClientTests.ozClient = ozClient;
}

Expand Down Expand Up @@ -3140,6 +3145,37 @@ void testMultipartUploadOverride(ReplicationConfig replication)
doMultipartUpload(bucket, keyName, (byte)97, replication);

}

/**
* This test prints out that there is a memory leak in the test logs
* which during post-processing is caught by the CI thereby failing the
* CI run. Hence, disabling this for CI.
*/
@Unhealthy
public void testClientLeakDetector() throws Exception {
OzoneClient client = OzoneClientFactory.getRpcClient(cluster.getConf());
String volumeName = UUID.randomUUID().toString();
String bucketName = UUID.randomUUID().toString();
String keyName = UUID.randomUUID().toString();
GenericTestUtils.LogCapturer ozoneClientFactoryLogCapturer =
GenericTestUtils.LogCapturer.captureLogs(
OzoneClientFactory.getLogger());

client.getObjectStore().createVolume(volumeName);
OzoneVolume volume = client.getObjectStore().getVolume(volumeName);
volume.createBucket(bucketName);
OzoneBucket bucket = volume.getBucket(bucketName);
byte[] data = new byte[10];
Arrays.fill(data, (byte) 1);
try (OzoneOutputStream out = bucket.createKey(keyName, 10,
ReplicationConfig.fromTypeAndFactor(RATIS, ONE), new HashMap<>())) {
out.write(data);
}
client = null;
System.gc();
GenericTestUtils.waitFor(() -> ozoneClientFactoryLogCapturer.getOutput()
.contains("is not closed properly"), 100, 2000);
}
@Test
public void testMultipartUploadOwner() throws Exception {
// Save the old user, and switch to the old user after test
Expand Down
Loading

0 comments on commit 4846e97

Please sign in to comment.