Skip to content

Commit

Permalink
[fix][cli] Pulsar shell: ensure admin client is recycled or disposed (#…
Browse files Browse the repository at this point in the history
…17619)

* [fix][cli] Pulsar shell: ensure admin client is recycled or disposed
  • Loading branch information
nicoloboschi authored Sep 14, 2022
1 parent 869339d commit 1ff9fb6
Show file tree
Hide file tree
Showing 6 changed files with 238 additions and 122 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import org.apache.pulsar.client.admin.Transactions;
import org.apache.pulsar.client.admin.internal.OffloadProcessStatusImpl;
import org.apache.pulsar.client.admin.internal.PulsarAdminBuilderImpl;
import org.apache.pulsar.client.admin.internal.PulsarAdminImpl;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
Expand Down Expand Up @@ -2148,18 +2149,17 @@ public void requestTimeout() throws Exception {
//Ok
}

Field adminBuilderField = PulsarAdminTool.class.getDeclaredField("adminBuilder");
adminBuilderField.setAccessible(true);
PulsarAdminBuilderImpl builder = (PulsarAdminBuilderImpl) adminBuilderField.get(tool);

final PulsarAdmin admin = tool.getPulsarAdminSupplier().get();
Field requestTimeoutField =
PulsarAdminBuilderImpl.class.getDeclaredField("requestTimeout");
PulsarAdminImpl.class.getDeclaredField("requestTimeout");
requestTimeoutField.setAccessible(true);
int requestTimeout = (int) requestTimeoutField.get(builder);
int requestTimeout = (int) requestTimeoutField.get(admin);

Field requestTimeoutUnitField =
PulsarAdminBuilderImpl.class.getDeclaredField("requestTimeoutUnit");
PulsarAdminImpl.class.getDeclaredField("requestTimeoutUnit");
requestTimeoutUnitField.setAccessible(true);
TimeUnit requestTimeoutUnit = (TimeUnit) requestTimeoutUnitField.get(builder);
TimeUnit requestTimeoutUnit = (TimeUnit) requestTimeoutUnitField.get(admin);
assertEquals(1, requestTimeout);
assertEquals(TimeUnit.SECONDS, requestTimeoutUnit);
}
Expand All @@ -2185,12 +2185,8 @@ public void testAuthTlsWithJsonParam() throws Exception {
}

// validate Authentication-tls has been configured
Field adminBuilderField = PulsarAdminTool.class.getDeclaredField("adminBuilder");
adminBuilderField.setAccessible(true);
PulsarAdminBuilderImpl builder = (PulsarAdminBuilderImpl) adminBuilderField.get(tool);
Field confField = PulsarAdminBuilderImpl.class.getDeclaredField("conf");
confField.setAccessible(true);
ClientConfigurationData conf = (ClientConfigurationData) confField.get(builder);
ClientConfigurationData conf = ((PulsarAdminImpl)tool.getPulsarAdminSupplier().get())
.getClientConfigData();
AuthenticationTls atuh = (AuthenticationTls) conf.getAuthentication();
assertEquals(atuh.getCertFilePath(), certFilePath);
assertEquals(atuh.getKeyFilePath(), keyFilePath);
Expand All @@ -2203,8 +2199,8 @@ public void testAuthTlsWithJsonParam() throws Exception {
// Ok
}

builder = (PulsarAdminBuilderImpl) adminBuilderField.get(tool);
conf = (ClientConfigurationData) confField.get(builder);
conf = conf = ((PulsarAdminImpl)tool.getPulsarAdminSupplier().get())
.getClientConfigData();
atuh = (AuthenticationTls) conf.getAuthentication();
assertNull(atuh.getCertFilePath());
assertNull(atuh.getKeyFilePath());
Expand Down Expand Up @@ -2421,12 +2417,8 @@ private static String runCustomCommand(String[] args) throws Exception {
properties.put("webServiceUrl", "http://localhost:2181");
properties.put("cliExtensionsDirectory", narFile.getParentFile().getAbsolutePath());
properties.put("customCommandFactories", "dummy");
PulsarAdminTool tool = new PulsarAdminTool(properties) {
@Override
protected PulsarAdminBuilder createAdminBuilder(Properties properties) {
return builder;
}
};
PulsarAdminTool tool = new PulsarAdminTool(properties);
tool.setPulsarAdminSupplier(new PulsarAdminSupplier(builder, tool.getRootParams()));

// see the custom command help in the main help
StringBuilder logs = new StringBuilder();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.admin.cli;

import static org.apache.commons.lang3.StringUtils.isNotBlank;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import lombok.Data;
import lombok.SneakyThrows;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;

public class PulsarAdminSupplier implements Supplier<PulsarAdmin> {

@Data
private static final class RootParamsKey {
String serviceUrl;
String authPluginClassName;
int requestTimeout;
String authParams;
Boolean tlsAllowInsecureConnection;
String tlsTrustCertsFilePath;
Boolean tlsEnableHostnameVerification;
String tlsProvider;

static RootParamsKey fromRootParams(PulsarAdminTool.RootParams params) {
final RootParamsKey key = new RootParamsKey();
key.setServiceUrl(params.getServiceUrl());
key.setAuthParams(params.getAuthParams());
key.setAuthPluginClassName(params.getAuthPluginClassName());
key.setRequestTimeout(params.getRequestTimeout());
key.setTlsAllowInsecureConnection(params.getTlsAllowInsecureConnection());
key.setTlsTrustCertsFilePath(params.getTlsTrustCertsFilePath());
key.setTlsEnableHostnameVerification(params.getTlsEnableHostnameVerification());
key.setTlsProvider(params.getTlsProvider());
return key;
}
}

private final PulsarAdminBuilder adminBuilder;
private RootParamsKey currentParamsKey;
private PulsarAdmin admin;

public PulsarAdminSupplier(PulsarAdminBuilder baseAdminBuilder, PulsarAdminTool.RootParams rootParams) {
this.adminBuilder = baseAdminBuilder;
rootParamsUpdated(rootParams);
}

@Override
public PulsarAdmin get() {
if (admin == null) {
try {
admin = adminBuilder.build();
} catch (Exception ex) {
System.err.println(ex.getClass() + ": " + ex.getMessage());
throw new RuntimeException("Not able to create pulsar admin: " + ex.getMessage(), ex);
}
}
return admin;
}

void rootParamsUpdated(PulsarAdminTool.RootParams newParams) {
final RootParamsKey newParamsKey = RootParamsKey.fromRootParams(newParams);
if (newParamsKey.equals(currentParamsKey)) {
return;
}
applyRootParamsToAdminBuilder(adminBuilder, newParams);
currentParamsKey = newParamsKey;
if (admin != null) {
admin.close();
}
this.admin = null;
}

@SneakyThrows
private static void applyRootParamsToAdminBuilder(PulsarAdminBuilder adminBuilder,
PulsarAdminTool.RootParams rootParams) {
adminBuilder.serviceHttpUrl(rootParams.serviceUrl);
adminBuilder.authentication(rootParams.authPluginClassName, rootParams.authParams);
adminBuilder.requestTimeout(rootParams.requestTimeout, TimeUnit.SECONDS);
if (rootParams.tlsAllowInsecureConnection != null) {
adminBuilder.allowTlsInsecureConnection(rootParams.tlsAllowInsecureConnection);
}
if (rootParams.tlsEnableHostnameVerification != null) {
adminBuilder.enableTlsHostnameVerification(rootParams.tlsEnableHostnameVerification);
}
if (isNotBlank(rootParams.tlsProvider)) {
adminBuilder.sslProvider(rootParams.tlsProvider);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import lombok.Getter;
import org.apache.pulsar.PulsarVersion;
Expand All @@ -53,9 +51,9 @@ public class PulsarAdminTool {
protected List<CustomCommandFactory> customCommandFactories = new ArrayList();
protected Map<String, Class<?>> commandMap;
protected JCommander jcommander;
protected final PulsarAdminBuilder adminBuilder;
protected RootParams rootParams;
private final Properties properties;
private PulsarAdminSupplier pulsarAdminSupplier;

@Getter
public static class RootParams {
Expand Down Expand Up @@ -105,11 +103,13 @@ public PulsarAdminTool(Properties properties) throws Exception {
rootParams = new RootParams();
// fallback to previous-version serviceUrl property to maintain backward-compatibility
initRootParamsFromProperties(properties);
adminBuilder = createAdminBuilder(properties);
final PulsarAdminBuilder baseAdminBuilder = createAdminBuilderFromProperties(properties);
pulsarAdminSupplier = new PulsarAdminSupplier(baseAdminBuilder, rootParams);
initJCommander();
}

protected PulsarAdminBuilder createAdminBuilder(Properties properties) {

private static PulsarAdminBuilder createAdminBuilderFromProperties(Properties properties) {
boolean useKeyStoreTls = Boolean
.parseBoolean(properties.getProperty("useKeyStoreTls", "false"));
String tlsTrustStoreType = properties.getProperty("tlsTrustStoreType", "JKS");
Expand All @@ -121,16 +121,12 @@ protected PulsarAdminBuilder createAdminBuilder(Properties properties) {
String tlsKeyFilePath = properties.getProperty("tlsKeyFilePath");
String tlsCertificateFilePath = properties.getProperty("tlsCertificateFilePath");

boolean tlsAllowInsecureConnection = this.rootParams.tlsAllowInsecureConnection != null
? this.rootParams.tlsAllowInsecureConnection
: Boolean.parseBoolean(properties.getProperty("tlsAllowInsecureConnection", "false"));
boolean tlsAllowInsecureConnection = Boolean.parseBoolean(properties
.getProperty("tlsAllowInsecureConnection", "false"));

boolean tlsEnableHostnameVerification = this.rootParams.tlsEnableHostnameVerification != null
? this.rootParams.tlsEnableHostnameVerification
: Boolean.parseBoolean(properties.getProperty("tlsEnableHostnameVerification", "false"));
final String tlsTrustCertsFilePath = isNotBlank(this.rootParams.tlsTrustCertsFilePath)
? this.rootParams.tlsTrustCertsFilePath
: properties.getProperty("tlsTrustCertsFilePath");
boolean tlsEnableHostnameVerification = Boolean.parseBoolean(properties
.getProperty("tlsEnableHostnameVerification", "false"));
final String tlsTrustCertsFilePath = properties.getProperty("tlsTrustCertsFilePath");

return PulsarAdmin.builder().allowTlsInsecureConnection(tlsAllowInsecureConnection)
.enableTlsHostnameVerification(tlsEnableHostnameVerification)
Expand All @@ -152,39 +148,19 @@ protected void initRootParamsFromProperties(Properties properties) {
: properties.getProperty("serviceUrl");
rootParams.authPluginClassName = properties.getProperty("authPlugin");
rootParams.authParams = properties.getProperty("authParams");
rootParams.tlsProvider = properties.getProperty("webserviceTlsProvider");
}

private static class PulsarAdminSupplier implements Supplier<PulsarAdmin> {

private final PulsarAdminBuilder pulsarAdminBuilder;
private final Function<PulsarAdminBuilder, ? extends PulsarAdmin> adminFactory;
private PulsarAdmin admin;
private PulsarAdminSupplier(PulsarAdminBuilder pulsarAdminBuilder,
Function<PulsarAdminBuilder, ? extends PulsarAdmin> adminFactory) {
this.pulsarAdminBuilder = pulsarAdminBuilder;
this.adminFactory = adminFactory;
}

@Override
public PulsarAdmin get() {
if (admin == null) {
admin = adminFactory.apply(pulsarAdminBuilder);
}
return admin;
}
}

public void setupCommands(Function<PulsarAdminBuilder, ? extends PulsarAdmin> adminFactory) {
public void setupCommands() {
try {
Supplier<PulsarAdmin> admin = new PulsarAdminSupplier(adminBuilder, adminFactory);
for (Map.Entry<String, Class<?>> c : commandMap.entrySet()) {
addCommand(c, admin);
addCommand(c, pulsarAdminSupplier);
}

CommandExecutionContext context = new CommandExecutionContext() {
@Override
public PulsarAdmin getPulsarAdmin() {
return admin.get();
return pulsarAdminSupplier.get();
}

@Override
Expand All @@ -197,7 +173,7 @@ public Properties getConfiguration() {
for (CustomCommandFactory factory : customCommandFactories) {
List<CustomCommandGroup> customCommandGroups = factory.commandGroups(context);
for (CustomCommandGroup group : customCommandGroups) {
Object generated = CustomCommandsUtils.generateCliCommand(group, context, admin);
Object generated = CustomCommandsUtils.generateCliCommand(group, context, pulsarAdminSupplier);
jcommander.addCommand(group.name(), generated);
commandMap.put(group.name(), null);
}
Expand All @@ -215,7 +191,7 @@ public Properties getConfiguration() {
}

private void loadCustomCommandFactories() throws Exception {
customCommandFactories.addAll(CustomCommandFactoryProvider.createCustomCommandFactories(properties));
customCommandFactories = CustomCommandFactoryProvider.createCustomCommandFactories(properties);
}


Expand All @@ -237,12 +213,8 @@ private void addCommand(Map.Entry<String, Class<?>> c, Supplier<PulsarAdmin> adm
}

protected boolean run(String[] args) {
final Function<PulsarAdminBuilder, ? extends PulsarAdmin> adminFactory = createAdminFactory(args);
return run(args, adminFactory);
}
setupCommands();

boolean run(String[] args, Function<PulsarAdminBuilder, ? extends PulsarAdmin> adminFactory) {
setupCommands(adminFactory);
if (args.length == 0) {
jcommander.usage();
return false;
Expand All @@ -257,17 +229,8 @@ boolean run(String[] args, Function<PulsarAdminBuilder, ? extends PulsarAdmin> a

try {
jcommander.parse(Arrays.copyOfRange(args, 0, Math.min(cmdPos, args.length)));

//rootParams are populated by jcommander.parse
adminBuilder.serviceHttpUrl(rootParams.serviceUrl);
adminBuilder.authentication(rootParams.authPluginClassName, rootParams.authParams);
adminBuilder.requestTimeout(rootParams.requestTimeout, TimeUnit.SECONDS);
if (isBlank(rootParams.tlsProvider)) {
rootParams.tlsProvider = properties.getProperty("webserviceTlsProvider");
}
if (isNotBlank(rootParams.tlsProvider)) {
adminBuilder.sslProvider(rootParams.tlsProvider);
}
pulsarAdminSupplier.rootParamsUpdated(rootParams);
} catch (Exception e) {
System.err.println(e.getMessage());
System.err.println();
Expand Down Expand Up @@ -348,35 +311,6 @@ private static void exit(int code) {
}
}

private Function<PulsarAdminBuilder, ? extends PulsarAdmin> createAdminFactory(String[] args) {
int cmdPos;
for (cmdPos = 0; cmdPos < args.length; cmdPos++) {
if (commandMap.containsKey(args[cmdPos])) {
break;
}
}

++cmdPos;
boolean isLocalRun = cmdPos < args.length && "localrun".equalsIgnoreCase(args[cmdPos]);

Function<PulsarAdminBuilder, ? extends PulsarAdmin> adminFactory;
if (isLocalRun) {
// bypass constructing admin client
adminFactory = (adminBuilder) -> null;
} else {
adminFactory = (adminBuilder) -> {
try {
return adminBuilder.build();
} catch (Exception ex) {
System.err.println(ex.getClass() + ": " + ex.getMessage());
exit(1);
return null;
}
};
}
return adminFactory;
}

static void setAllowSystemExit(boolean allowSystemExit) {
PulsarAdminTool.allowSystemExit = allowSystemExit;
}
Expand Down Expand Up @@ -434,4 +368,18 @@ protected void initJCommander() {
commandMap.put("transactions", CmdTransactions.class);
}

@VisibleForTesting
public void setPulsarAdminSupplier(PulsarAdminSupplier pulsarAdminSupplier) {
this.pulsarAdminSupplier = pulsarAdminSupplier;
}

@VisibleForTesting
public PulsarAdminSupplier getPulsarAdminSupplier() {
return pulsarAdminSupplier;
}

@VisibleForTesting
public RootParams getRootParams() {
return rootParams;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public String getAdminUrl() {
@Override
public void setupState(Properties properties) {
getJCommander().setProgramName(getName());
setupCommands(b -> null);
setupCommands();
}

@Override
Expand Down
Loading

0 comments on commit 1ff9fb6

Please sign in to comment.