Skip to content

Commit

Permalink
[fix][client] Initializing client-authentication using configured aut…
Browse files Browse the repository at this point in the history
…h params
  • Loading branch information
rdhabalia committed Nov 19, 2024
1 parent 387a96d commit a75a7c9
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarServiceNameResolver;
Expand Down Expand Up @@ -1916,5 +1917,49 @@ public void close() {
}
}
}

@Test
public void testTlsWithAuthParams() throws Exception {
final String topicName = "persistent://prop/ns-abc/newTopic";
final String subName = "newSub";
Authentication auth;

Set<String> providers = new HashSet<>();
providers.add("org.apache.pulsar.broker.authentication.AuthenticationProviderTls");

conf.setAuthenticationEnabled(true);
conf.setAuthenticationProviders(providers);
conf.setBrokerServicePortTls(Optional.of(0));
conf.setWebServicePortTls(Optional.of(0));
conf.setTlsCertificateFilePath(BROKER_CERT_FILE_PATH);
conf.setTlsKeyFilePath(BROKER_KEY_FILE_PATH);
conf.setTlsAllowInsecureConnection(false);
conf.setTlsTrustCertsFilePath(CA_CERT_FILE_PATH);
conf.setNumExecutorThreadPoolSize(5);
restartBroker();

String authParam = String.format("tlsCertFile:%s,tlsKeyFile:%s", getTlsFileForClient("admin.cert"),
getTlsFileForClient("admin.key-pk8"));
String authClassName = "org.apache.pulsar.client.impl.auth.AuthenticationTls";
ClientConfigurationData conf = new ClientConfigurationData();
conf.setServiceUrl(brokerUrlTls.toString());
conf.setAuthParams(authParam);
conf.setAuthPluginClassName(authClassName);
conf.setTlsAllowInsecureConnection(true);

PulsarClient pulsarClient = null;
try {
pulsarClient = (new ClientBuilderImpl(conf)).build();

@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
.subscribe();
} catch (Exception e) {
fail("should not fail");
} finally {
pulsarClient.close();
}
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
import org.apache.pulsar.client.api.ServiceUrlProvider;
import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
import org.apache.pulsar.common.tls.InetAddressUtils;
Expand Down Expand Up @@ -64,6 +65,9 @@ public PulsarClient build() throws PulsarClientException {
"Cannot get service url from service url provider.");
conf.setServiceUrl(conf.getServiceUrlProvider().getServiceUrl());
}
if (conf.getAuthentication() == null || conf.getAuthentication() == AuthenticationDisabled.INSTANCE) {
setAuthenticationFromPropsIfAvailable(conf);
}
PulsarClient client = new PulsarClientImpl(conf);
if (conf.getServiceUrlProvider() != null) {
conf.getServiceUrlProvider().initialize(client);
Expand Down

0 comments on commit a75a7c9

Please sign in to comment.