diff --git a/bin/pulsar b/bin/pulsar
index 0bce9c4cd6496..9f4224b4d1eda 100755
--- a/bin/pulsar
+++ b/bin/pulsar
@@ -137,6 +137,7 @@ where command is one of:
initialize-cluster-metadata One-time metadata initialization
compact-topic Run compaction against a topic
zookeeper-shell Open a ZK shell client
+ tokens Utility to create authentication tokens
help This help message
@@ -331,6 +332,8 @@ elif [ $COMMAND == "sql" ]; then
exec $JAVA -cp "${PRESTO_HOME}/lib/*" com.facebook.presto.cli.Presto --server localhost:8081 "${@}"
elif [ $COMMAND == "sql-worker" ]; then
exec ${PRESTO_HOME}/bin/launcher --etc-dir ${PULSAR_PRESTO_CONF} "${@}"
+elif [ $COMMAND == "tokens" ]; then
+ exec $JAVA $OPTS org.apache.pulsar.utils.auth.tokens.TokensCliUtils $@
elif [ $COMMAND == "help" ]; then
pulsar_help;
else
diff --git a/conf/broker.conf b/conf/broker.conf
index 186b6d77c5018..3c1853adc521d 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -283,6 +283,22 @@ athenzDomainNames=
# When this parameter is not empty, unauthenticated users perform as anonymousUserRole
anonymousUserRole=
+### --- Token Authentication Provider --- ###
+
+## Symmetric key
+# Configure the secret key to be used to validate auth tokens
+# The key can be specified like:
+# tokenSecretKey=data:base64,xxxxxxxxx
+# tokenSecretKey=file:///my/secret.key
+tokenSecretKey=
+
+## Asymmetric public/private key pair
+# Configure the public key to be used to validate auth tokens
+# The key can be specified like:
+# tokenPublicKey=data:base64,xxxxxxxxx
+# tokenPublicKey=file:///my/public.key
+tokenPublicKey=
+
### --- BookKeeper Client --- ###
# Authentication plugin to use when connecting to bookies
diff --git a/conf/proxy.conf b/conf/proxy.conf
index dbac95aa2324b..ffa6c45fadaa6 100644
--- a/conf/proxy.conf
+++ b/conf/proxy.conf
@@ -129,6 +129,22 @@ tlsHostnameVerificationEnabled=false
# certificate isn't trusted.
tlsRequireTrustedClientCertOnConnect=false
+### --- Token Authentication Provider --- ###
+
+## Symmetric key
+# Configure the secret key to be used to validate auth tokens
+# The key can be specified like:
+# tokenSecretKey=data:base64,xxxxxxxxx
+# tokenSecretKey=file:///my/secret.key
+tokenSecretKey=
+
+## Asymmetric public/private key pair
+# Configure the public key to be used to validate auth tokens
+# The key can be specified like:
+# tokenPublicKey=data:base64,xxxxxxxxx
+# tokenPublicKey=file:///my/public.key
+tokenPublicKey=
+
### --- Deprecated config variables --- ###
diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt
index bd6e8ae5890ab..d204122ad222e 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -469,10 +469,13 @@ The Apache Software License, Version 2.0
- io.dropwizard.metrics-metrics-jvm-3.1.0.jar
* Prometheus
- io.prometheus-simpleclient_httpserver-0.5.0.jar
+ * Java JSON WebTokens
+ - io.jsonwebtoken-jjwt-api-0.10.5.jar
+ - io.jsonwebtoken-jjwt-impl-0.10.5.jar
+ - io.jsonwebtoken-jjwt-jackson-0.10.5.jar
* JavaX Injection
- javax.inject-javax.inject-1.jar
-
BSD 3-clause "New" or "Revised" License
* Google auth library
- com.google.auth-google-auth-library-credentials-0.9.0.jar -- licenses/LICENSE-google-auth-library.txt
diff --git a/pom.xml b/pom.xml
index 9f72ceb45e82e..bbb85553789b9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -182,6 +182,7 @@ flexible messaging model and an intuitive client API.
1.6.0
2.11
0.8.2
+ 0.10.5
0.12.3
@@ -750,6 +751,22 @@ flexible messaging model and an intuitive client API.
+
+
+ io.jsonwebtoken
+ jjwt-api
+ ${jsonwebtoken.version}
+
+
+ io.jsonwebtoken
+ jjwt-impl
+ ${jsonwebtoken.version}
+
+
+ io.jsonwebtoken
+ jjwt-jackson
+ ${jsonwebtoken.version}
+
org.aspectj
diff --git a/pulsar-broker-common/pom.xml b/pulsar-broker-common/pom.xml
index e1de84d57836e..e7031f33983d4 100644
--- a/pulsar-broker-common/pom.xml
+++ b/pulsar-broker-common/pom.xml
@@ -38,6 +38,12 @@
pulsar-zookeeper-utils
${project.version}
+
+
+ ${project.groupId}
+ pulsar-common
+ ${project.version}
+
com.google.guava
@@ -54,5 +60,14 @@
javax.ws.rs-api
+
+ io.jsonwebtoken
+ jjwt-impl
+
+
+
+ io.jsonwebtoken
+ jjwt-jackson
+
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderToken.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderToken.java
new file mode 100644
index 0000000000000..4ed88684ae463
--- /dev/null
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderToken.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.authentication;
+
+import io.jsonwebtoken.Claims;
+import io.jsonwebtoken.Jwt;
+import io.jsonwebtoken.JwtException;
+import io.jsonwebtoken.Jwts;
+
+import java.io.IOException;
+import java.security.Key;
+
+import javax.naming.AuthenticationException;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+
+public class AuthenticationProviderToken implements AuthenticationProvider {
+
+ public final static String HTTP_HEADER_NAME = "Authorization";
+ final static String HTTP_HEADER_VALUE_PREFIX = "Bearer ";
+
+ // When simmetric key is configured
+ final static String CONF_TOKEN_SECRET_KEY = "tokenSecretKey";
+
+ // When public/private key pair is configured
+ final static String CONF_TOKEN_PUBLIC_KEY = "tokenPublicKey";
+
+ private Key validationKey;
+
+ @Override
+ public void close() throws IOException {
+ // noop
+ }
+
+ @Override
+ public void initialize(ServiceConfiguration config) throws IOException {
+ this.validationKey = getValidationKey(config);
+ }
+
+ @Override
+ public String getAuthMethodName() {
+ return "token";
+ }
+
+ @Override
+ public String authenticate(AuthenticationDataSource authData) throws AuthenticationException {
+ String token = null;
+
+ if (authData.hasDataFromCommand()) {
+ // Authenticate Pulsar binary connection
+ token = authData.getCommandData();
+ } else if (authData.hasDataFromHttp()) {
+ // Authentication HTTP request. The format here should be compliant to RFC-6750
+ // (https://tools.ietf.org/html/rfc6750#section-2.1). Eg:
+ //
+ // Authorization: Bearer xxxxxxxxxxxxx
+ String httpHeaderValue = authData.getHttpHeader(HTTP_HEADER_NAME);
+ if (httpHeaderValue == null || !httpHeaderValue.startsWith(HTTP_HEADER_VALUE_PREFIX)) {
+ throw new AuthenticationException("Invalid HTTP Authorization header");
+ }
+
+ // Remove prefix
+ token = httpHeaderValue.substring(HTTP_HEADER_VALUE_PREFIX.length());
+ } else {
+ throw new AuthenticationException("No token credentials passed");
+ }
+
+ // Validate the token
+ try {
+ @SuppressWarnings("unchecked")
+ Jwt, Claims> jwt = Jwts.parser()
+ .setSigningKey(validationKey)
+ .parse(token);
+
+ return jwt.getBody().getSubject();
+ } catch (JwtException e) {
+ throw new AuthenticationException("Failed to authentication token: " + e.getMessage());
+ }
+ }
+
+ /**
+ * Try to get the validation key for tokens from several possible config options.
+ */
+ private static Key getValidationKey(ServiceConfiguration conf) throws IOException {
+ final boolean isPublicKey;
+ final String validationKeyConfig;
+
+ if (conf.getProperty(CONF_TOKEN_SECRET_KEY) != null
+ && !StringUtils.isBlank((String) conf.getProperty(CONF_TOKEN_SECRET_KEY))) {
+ isPublicKey = false;
+ validationKeyConfig = (String) conf.getProperty(CONF_TOKEN_SECRET_KEY);
+ } else if (conf.getProperty(CONF_TOKEN_PUBLIC_KEY) != null
+ && !StringUtils.isBlank((String) conf.getProperty(CONF_TOKEN_PUBLIC_KEY))) {
+ isPublicKey = true;
+ validationKeyConfig = (String) conf.getProperty(CONF_TOKEN_PUBLIC_KEY);
+ } else {
+ throw new IOException("No secret key was provided for token authentication");
+ }
+
+ byte[] validationKey = AuthTokenUtils.readKeyFromUrl(validationKeyConfig);
+
+ if (isPublicKey) {
+ return AuthTokenUtils.decodePublicKey(validationKey);
+ } else {
+ return AuthTokenUtils.decodeSecretKey(validationKey);
+ }
+ }
+}
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/utils/AuthTokenUtils.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/utils/AuthTokenUtils.java
new file mode 100644
index 0000000000000..08ff1c7ebd1d8
--- /dev/null
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/utils/AuthTokenUtils.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.authentication.utils;
+
+import com.google.common.io.ByteStreams;
+
+import io.jsonwebtoken.JwtBuilder;
+import io.jsonwebtoken.Jwts;
+import io.jsonwebtoken.SignatureAlgorithm;
+import io.jsonwebtoken.io.Decoders;
+import io.jsonwebtoken.io.Encoders;
+import io.jsonwebtoken.security.Keys;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.Key;
+import java.security.KeyFactory;
+import java.security.PrivateKey;
+import java.security.PublicKey;
+import java.security.spec.PKCS8EncodedKeySpec;
+import java.security.spec.X509EncodedKeySpec;
+import java.util.Date;
+import java.util.Optional;
+
+import javax.crypto.SecretKey;
+
+import lombok.experimental.UtilityClass;
+
+import org.apache.pulsar.client.api.url.URL;
+
+@UtilityClass
+public class AuthTokenUtils {
+
+ public static SecretKey createSecretKey(SignatureAlgorithm signatureAlgorithm) {
+ return Keys.secretKeyFor(signatureAlgorithm);
+ }
+
+ public static SecretKey decodeSecretKey(byte[] secretKey) {
+ return Keys.hmacShaKeyFor(secretKey);
+ }
+
+ public static PrivateKey decodePrivateKey(byte[] key) throws IOException {
+ try {
+ PKCS8EncodedKeySpec spec = new PKCS8EncodedKeySpec(key);
+ KeyFactory kf = KeyFactory.getInstance("RSA");
+ return kf.generatePrivate(spec);
+ } catch (Exception e) {
+ throw new IOException("Failed to decode private key", e);
+ }
+ }
+
+ public static PublicKey decodePublicKey(byte[] key) throws IOException {
+ try {
+ X509EncodedKeySpec spec = new X509EncodedKeySpec(key);
+ KeyFactory kf = KeyFactory.getInstance("RSA");
+ return kf.generatePublic(spec);
+ } catch (Exception e) {
+ throw new IOException("Failed to decode public key", e);
+ }
+ }
+
+ public static String encodeKeyBase64(Key key) {
+ return Encoders.BASE64.encode(key.getEncoded());
+ }
+
+ public static String createToken(Key signingKey, String subject, Optional expiryTime) {
+ JwtBuilder builder = Jwts.builder()
+ .setSubject(subject)
+ .signWith(signingKey);
+
+ if (expiryTime.isPresent()) {
+ builder.setExpiration(expiryTime.get());
+ }
+
+ return builder.compact();
+ }
+
+ public static byte[] readKeyFromUrl(String keyConfUrl) throws IOException {
+ if (keyConfUrl.startsWith("data:") || keyConfUrl.startsWith("file:")) {
+ try {
+ return ByteStreams.toByteArray((InputStream) new URL(keyConfUrl).getContent());
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ } else {
+ // Assume the key content was passed in base64
+ return Decoders.BASE64.decode(keyConfUrl);
+ }
+ }
+}
diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderTokenTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderTokenTest.java
new file mode 100644
index 0000000000000..078a7b625afe5
--- /dev/null
+++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderTokenTest.java
@@ -0,0 +1,287 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.authentication;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
+
+import io.jsonwebtoken.Claims;
+import io.jsonwebtoken.Jwt;
+import io.jsonwebtoken.Jwts;
+import io.jsonwebtoken.SignatureAlgorithm;
+import io.jsonwebtoken.io.Decoders;
+import io.jsonwebtoken.security.Keys;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.KeyPair;
+import java.security.PrivateKey;
+import java.sql.Date;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import javax.crypto.SecretKey;
+import javax.naming.AuthenticationException;
+
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import org.testng.annotations.Test;
+
+public class AuthenticationProviderTokenTest {
+
+ @Test
+ public void testInvalidInitialize() throws Exception {
+ AuthenticationProviderToken provider = new AuthenticationProviderToken();
+
+ try {
+ provider.initialize(new ServiceConfiguration());
+ fail("should have failed");
+ } catch (IOException e) {
+ // Expected, secret key was not defined
+ }
+
+ provider.close();
+ }
+
+ @Test
+ public void testSerializeSecretKey() throws Exception {
+ SecretKey secretKey = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+
+ String token = Jwts.builder()
+ .setSubject("my-test-subject")
+ .signWith(secretKey)
+ .compact();
+
+ @SuppressWarnings("unchecked")
+ Jwt, Claims> jwt = Jwts.parser()
+ .setSigningKey(AuthTokenUtils.decodeSecretKey(secretKey.getEncoded()))
+ .parse(token);
+
+ System.out.println("Subject: " + jwt.getBody().getSubject());
+ }
+
+ @Test
+ public void testSerializeKeyPair() throws Exception {
+ KeyPair keyPair = Keys.keyPairFor(SignatureAlgorithm.RS256);
+
+ String privateKey = AuthTokenUtils.encodeKeyBase64(keyPair.getPrivate());
+ String publicKey = AuthTokenUtils.encodeKeyBase64(keyPair.getPublic());
+
+ String token = AuthTokenUtils.createToken(AuthTokenUtils.decodePrivateKey(Decoders.BASE64.decode(privateKey)),
+ "my-test-subject",
+ Optional.empty());
+
+ @SuppressWarnings("unchecked")
+ Jwt, Claims> jwt = Jwts.parser()
+ .setSigningKey(AuthTokenUtils.decodePublicKey(Decoders.BASE64.decode(publicKey)))
+ .parse(token);
+
+ System.out.println("Subject: " + jwt.getBody().getSubject());
+ }
+
+ @Test
+ public void testAuthSecretKey() throws Exception {
+ SecretKey secretKey = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+
+ AuthenticationProviderToken provider = new AuthenticationProviderToken();
+ assertEquals(provider.getAuthMethodName(), "token");
+
+ Properties properties = new Properties();
+ properties.setProperty(AuthenticationProviderToken.CONF_TOKEN_SECRET_KEY,
+ AuthTokenUtils.encodeKeyBase64(secretKey));
+
+ ServiceConfiguration conf = new ServiceConfiguration();
+ conf.setProperties(properties);
+ provider.initialize(conf);
+
+ try {
+ provider.authenticate(new AuthenticationDataSource() {
+ });
+ fail("Should have failed");
+ } catch (AuthenticationException e) {
+ // expected, no credential passed
+ }
+
+ String token = AuthTokenUtils.createToken(secretKey, "my-test-subject", Optional.empty());
+
+ // Pulsar protocol auth
+ String subject = provider.authenticate(new AuthenticationDataSource() {
+ @Override
+ public boolean hasDataFromCommand() {
+ return true;
+ }
+
+ @Override
+ public String getCommandData() {
+ return token;
+ }
+ });
+ assertEquals(subject, "my-test-subject");
+
+ // HTTP protocol auth
+ provider.authenticate(new AuthenticationDataSource() {
+ @Override
+ public boolean hasDataFromHttp() {
+ return true;
+ }
+
+ @Override
+ public String getHttpHeader(String name) {
+ if (name.equals("Authorization")) {
+ return "Bearer " + token;
+ } else {
+ throw new IllegalArgumentException("Wrong HTTP header");
+ }
+ }
+ });
+ assertEquals(subject, "my-test-subject");
+
+ // Expired token. This should be rejected by the authentication provider
+ String expiredToken = AuthTokenUtils.createToken(secretKey, "my-test-subject",
+ Optional.of(new Date(System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1))));
+
+ // Pulsar protocol auth
+ try {
+ provider.authenticate(new AuthenticationDataSource() {
+ @Override
+ public boolean hasDataFromCommand() {
+ return true;
+ }
+
+ @Override
+ public String getCommandData() {
+ return expiredToken;
+ }
+ });
+ fail("Should have failed");
+ } catch (AuthenticationException e) {
+ // expected, token was expired
+ }
+
+ provider.close();
+ }
+
+ @Test
+ public void testAuthSecretKeyFromFile() throws Exception {
+ SecretKey secretKey = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+
+ File secretKeyFile = File.createTempFile("pular-test-secret-key-", ".key");
+ secretKeyFile.deleteOnExit();
+ Files.write(Paths.get(secretKeyFile.toString()), secretKey.getEncoded());
+
+ AuthenticationProviderToken provider = new AuthenticationProviderToken();
+
+ Properties properties = new Properties();
+ properties.setProperty(AuthenticationProviderToken.CONF_TOKEN_SECRET_KEY, "file://" + secretKeyFile.toString());
+
+ ServiceConfiguration conf = new ServiceConfiguration();
+ conf.setProperties(properties);
+ provider.initialize(conf);
+
+ String token = AuthTokenUtils.createToken(secretKey, "my-test-subject", Optional.empty());
+
+ // Pulsar protocol auth
+ String subject = provider.authenticate(new AuthenticationDataSource() {
+ @Override
+ public boolean hasDataFromCommand() {
+ return true;
+ }
+
+ @Override
+ public String getCommandData() {
+ return token;
+ }
+ });
+ assertEquals(subject, "my-test-subject");
+ provider.close();
+ }
+
+ @Test
+ public void testAuthSecretKeyFromDataBase64() throws Exception {
+ SecretKey secretKey = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+
+ AuthenticationProviderToken provider = new AuthenticationProviderToken();
+
+ Properties properties = new Properties();
+ properties.setProperty(AuthenticationProviderToken.CONF_TOKEN_SECRET_KEY,
+ "data:;base64," + AuthTokenUtils.encodeKeyBase64(secretKey));
+
+ ServiceConfiguration conf = new ServiceConfiguration();
+ conf.setProperties(properties);
+ provider.initialize(conf);
+
+ String token = AuthTokenUtils.createToken(secretKey, "my-test-subject", Optional.empty());
+
+ // Pulsar protocol auth
+ String subject = provider.authenticate(new AuthenticationDataSource() {
+ @Override
+ public boolean hasDataFromCommand() {
+ return true;
+ }
+
+ @Override
+ public String getCommandData() {
+ return token;
+ }
+ });
+ assertEquals(subject, "my-test-subject");
+ provider.close();
+ }
+
+ @Test
+ public void testAuthSecretKeyPair() throws Exception {
+ KeyPair keyPair = Keys.keyPairFor(SignatureAlgorithm.RS256);
+
+ String privateKeyStr = AuthTokenUtils.encodeKeyBase64(keyPair.getPrivate());
+ String publicKeyStr = AuthTokenUtils.encodeKeyBase64(keyPair.getPublic());
+
+ AuthenticationProviderToken provider = new AuthenticationProviderToken();
+
+ Properties properties = new Properties();
+ // Use public key for validation
+ properties.setProperty(AuthenticationProviderToken.CONF_TOKEN_PUBLIC_KEY, publicKeyStr);
+
+ ServiceConfiguration conf = new ServiceConfiguration();
+ conf.setProperties(properties);
+ provider.initialize(conf);
+
+ // Use private key to generate token
+ PrivateKey privateKey = AuthTokenUtils.decodePrivateKey(Decoders.BASE64.decode(privateKeyStr));
+ String token = AuthTokenUtils.createToken(privateKey, "my-test-subject", Optional.empty());
+
+ // Pulsar protocol auth
+ String subject = provider.authenticate(new AuthenticationDataSource() {
+ @Override
+ public boolean hasDataFromCommand() {
+ return true;
+ }
+
+ @Override
+ public String getCommandData() {
+ return token;
+ }
+ });
+ assertEquals(subject, "my-test-subject");
+
+ provider.close();
+ }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/utils/auth/tokens/TokensCliUtils.java b/pulsar-broker/src/main/java/org/apache/pulsar/utils/auth/tokens/TokensCliUtils.java
new file mode 100644
index 0000000000000..19e71f39b46f7
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/utils/auth/tokens/TokensCliUtils.java
@@ -0,0 +1,324 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.utils.auth.tokens;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.google.common.base.Charsets;
+
+import io.jsonwebtoken.Claims;
+import io.jsonwebtoken.Jwt;
+import io.jsonwebtoken.JwtException;
+import io.jsonwebtoken.Jwts;
+import io.jsonwebtoken.SignatureAlgorithm;
+import io.jsonwebtoken.io.Decoders;
+import io.jsonwebtoken.io.Encoders;
+import io.jsonwebtoken.security.Keys;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.Key;
+import java.security.KeyPair;
+import java.util.Date;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import javax.crypto.SecretKey;
+import javax.naming.AuthenticationException;
+
+import lombok.Cleanup;
+
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import org.apache.pulsar.common.util.RelativeTimeUtil;
+
+public class TokensCliUtils {
+
+ public static class Arguments {
+ @Parameter(names = { "-h", "--help" }, description = "Show this help message")
+ private boolean help = false;
+ }
+
+ @Parameters(commandDescription = "Create a new secret key")
+ public static class CommandCreateSecretKey {
+ @Parameter(names = { "-a",
+ "--signature-algorithm" }, description = "The signature algorithm for the new secret key.")
+ SignatureAlgorithm algorithm = SignatureAlgorithm.HS256;
+
+ @Parameter(names = { "-o",
+ "--output" }, description = "Write the secret key to a file instead of stdout")
+ String outputFile;
+
+ @Parameter(names = {
+ "-b", "--base64" }, description = "Encode the key in base64")
+ boolean base64 = false;
+
+ public void run() throws IOException {
+ SecretKey secretKey = AuthTokenUtils.createSecretKey(algorithm);
+ byte[] encoded = secretKey.getEncoded();
+
+ if (base64) {
+ encoded = Encoders.BASE64.encode(encoded).getBytes();
+ }
+
+ if (outputFile != null) {
+ Files.write(Paths.get(outputFile), encoded);
+ } else {
+ System.out.write(encoded);
+ }
+ }
+ }
+
+ @Parameters(commandDescription = "Create a new or pair of keys public/private")
+ public static class CommandCreateKeyPair {
+ @Parameter(names = { "-a",
+ "--signature-algorithm" }, description = "The signature algorithm for the new key pair.")
+ SignatureAlgorithm algorithm = SignatureAlgorithm.RS256;
+
+ @Parameter(names = {
+ "--output-private-key" }, description = "File where to write the private key", required = true)
+ String privateKeyFile;
+ @Parameter(names = {
+ "--output-public-key" }, description = "File where to write the public key", required = true)
+ String publicKeyFile;
+
+ public void run() throws IOException {
+ KeyPair pair = Keys.keyPairFor(algorithm);
+
+ Files.write(Paths.get(publicKeyFile), pair.getPublic().getEncoded());
+ Files.write(Paths.get(privateKeyFile), pair.getPrivate().getEncoded());
+ }
+ }
+
+ @Parameters(commandDescription = "Create a new token")
+ public static class CommandCreateToken {
+
+ @Parameter(names = { "-s",
+ "--subject" }, description = "Specify the 'subject' or 'principal' associate with this token", required = true)
+ private String subject;
+
+ @Parameter(names = { "-e",
+ "--expiry-time" }, description = "Relative expiry time for the token (eg: 1h, 3d, 10y). (m=minutes) Default: no expiration")
+ private String expiryTime;
+
+ @Parameter(names = { "-sk",
+ "--secret-key" }, description = "Pass the secret key for signing the token. This can either be: data:, file:, etc..")
+ private String secretKey;
+
+ @Parameter(names = { "-pk",
+ "--private-key" }, description = "Pass the private key for signing the token. This can either be: data:, file:, etc..")
+ private String privateKey;
+
+ public void run() throws Exception {
+ if (secretKey == null && privateKey == null) {
+ System.err.println(
+ "Either --secret-key or --private-key needs to be passed for signing a token");
+ System.exit(1);
+ } else if (secretKey != null && privateKey != null) {
+ System.err.println(
+ "Only one of --secret-key and --private-key needs to be passed for signing a token");
+ System.exit(1);
+ }
+
+ Key signingKey;
+
+ if (privateKey != null) {
+ byte[] encodedKey = AuthTokenUtils.readKeyFromUrl(privateKey);
+ signingKey = AuthTokenUtils.decodePrivateKey(encodedKey);
+ } else {
+ byte[] encodedKey = AuthTokenUtils.readKeyFromUrl(secretKey);
+ signingKey = AuthTokenUtils.decodeSecretKey(encodedKey);
+ }
+
+ Optional optExpiryTime = Optional.empty();
+ if (expiryTime != null) {
+ long relativeTimeMillis = TimeUnit.SECONDS
+ .toMillis(RelativeTimeUtil.parseRelativeTimeInSeconds(expiryTime));
+ optExpiryTime = Optional.of(new Date(System.currentTimeMillis() + relativeTimeMillis));
+ }
+
+ String token = AuthTokenUtils.createToken(signingKey, subject, optExpiryTime);
+ System.out.println(token);
+ }
+ }
+
+ @Parameters(commandDescription = "Show the content of token")
+ public static class CommandShowToken {
+
+ @Parameter(description = "The token string", arity = 1)
+ private java.util.List args;
+
+ @Parameter(names = { "-i",
+ "--stdin" }, description = "Read token from standard input")
+ private Boolean stdin = false;
+
+ @Parameter(names = { "-f",
+ "--token-file" }, description = "Read token from a file")
+ private String tokenFile;
+
+ public void run() throws Exception {
+ String token;
+ if (args != null) {
+ token = args.get(0);
+ } else if (stdin) {
+ @Cleanup
+ BufferedReader r = new BufferedReader(new InputStreamReader(System.in));
+ token = r.readLine();
+ } else if (tokenFile != null) {
+ token = new String(Files.readAllBytes(Paths.get(tokenFile)), Charsets.UTF_8);
+ } else if (System.getenv("TOKEN") != null) {
+ token = System.getenv("TOKEN");
+ } else {
+ System.err.println(
+ "Token needs to be either passed as an argument or through `--stdin`, `--token-file` or by the `TOKEN` environment variable");
+ System.exit(1);
+ return;
+ }
+
+ String[] parts = token.split("\\.");
+ System.out.println(new String(Decoders.BASE64URL.decode(parts[0])));
+ System.out.println("---");
+ System.out.println(new String(Decoders.BASE64URL.decode(parts[1])));
+ }
+ }
+
+ @Parameters(commandDescription = "Validate a token against a key")
+ public static class CommandValidateToken {
+
+ @Parameter(description = "The token string", arity = 1)
+ private java.util.List args;
+
+ @Parameter(names = { "-i",
+ "--stdin" }, description = "Read token from standard input")
+ private Boolean stdin = false;
+
+ @Parameter(names = { "-f",
+ "--token-file" }, description = "Read token from a file")
+ private String tokenFile;
+
+ @Parameter(names = { "-sk",
+ "--secret-key" }, description = "Pass the secret key for validating the token. This can either be: data:, file:, etc..")
+ private String secretKey;
+
+ @Parameter(names = { "-pk",
+ "--public-key" }, description = "Pass the public key for validating the token. This can either be: data:, file:, etc..")
+ private String publicKey;
+
+ public void run() throws Exception {
+ if (secretKey == null && publicKey == null) {
+ System.err.println(
+ "Either --secret-key or --public-key needs to be passed for signing a token");
+ System.exit(1);
+ } else if (secretKey != null && publicKey != null) {
+ System.err.println(
+ "Only one of --secret-key and --public-key needs to be passed for signing a token");
+ System.exit(1);
+ }
+
+ String token;
+ if (args != null) {
+ token = args.get(0);
+ } else if (stdin) {
+ @Cleanup
+ BufferedReader r = new BufferedReader(new InputStreamReader(System.in));
+ token = r.readLine();
+ } else if (tokenFile != null) {
+ token = new String(Files.readAllBytes(Paths.get(tokenFile)), Charsets.UTF_8);
+ } else if (System.getenv("TOKEN") != null) {
+ token = System.getenv("TOKEN");
+ } else {
+ System.err.println(
+ "Token needs to be either passed as an argument or through `--stdin`, `--token-file` or by the `TOKEN` environment variable");
+ System.exit(1);
+ return;
+ }
+
+ Key validationKey;
+
+ if (publicKey != null) {
+ byte[] encodedKey = AuthTokenUtils.readKeyFromUrl(publicKey);
+ validationKey = AuthTokenUtils.decodePublicKey(encodedKey);
+ } else {
+ byte[] encodedKey = AuthTokenUtils.readKeyFromUrl(secretKey);
+ validationKey = AuthTokenUtils.decodeSecretKey(encodedKey);
+ }
+
+ // Validate the token
+ @SuppressWarnings("unchecked")
+ Jwt, Claims> jwt = Jwts.parser()
+ .setSigningKey(validationKey)
+ .parse(token);
+
+ System.out.println(jwt.getBody());
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ Arguments arguments = new Arguments();
+ JCommander jcommander = new JCommander(arguments);
+
+ CommandCreateSecretKey commandCreateSecretKey = new CommandCreateSecretKey();
+ jcommander.addCommand("create-secret-key", commandCreateSecretKey);
+
+ CommandCreateKeyPair commandCreateKeyPair = new CommandCreateKeyPair();
+ jcommander.addCommand("create-key-pair", commandCreateKeyPair);
+
+ CommandCreateToken commandCreateToken = new CommandCreateToken();
+ jcommander.addCommand("create", commandCreateToken);
+
+ CommandShowToken commandShowToken = new CommandShowToken();
+ jcommander.addCommand("show", commandShowToken);
+
+ CommandValidateToken commandValidateToken = new CommandValidateToken();
+ jcommander.addCommand("validate", commandValidateToken);
+
+ try {
+ jcommander.parse(args);
+
+ if (arguments.help || jcommander.getParsedCommand() == null) {
+ jcommander.usage();
+ System.exit(1);
+ }
+ } catch (Exception e) {
+ jcommander.usage();
+ System.err.println(e);
+ System.exit(1);
+ }
+
+ String cmd = jcommander.getParsedCommand();
+
+ if (cmd.equals("create-secret-key")) {
+ commandCreateSecretKey.run();
+ } else if (cmd.equals("create-key-pair")) {
+ commandCreateKeyPair.run();
+ } else if (cmd.equals("create")) {
+ commandCreateToken.run();
+ } else if (cmd.equals("show")) {
+ commandShowToken.run();
+ } else if (cmd.equals("validate")) {
+ commandValidateToken.run();
+ } else {
+ System.err.println("Invalid command: " + cmd);
+ System.exit(1);
+ }
+ }
+}
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CliCommand.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CliCommand.java
index 7af1b9f0d4223..99d27d896295d 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CliCommand.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CliCommand.java
@@ -102,31 +102,6 @@ static long validateSizeString(String s) {
}
}
- static int validateTimeString(String s) {
- char last = s.charAt(s.length() - 1);
- String subStr = s.substring(0, s.length() - 1);
- switch (last) {
- case 'm':
- case 'M':
- return Integer.parseInt(subStr);
-
- case 'h':
- case 'H':
- return Integer.parseInt(subStr) * 60;
-
- case 'd':
- case 'D':
- return Integer.parseInt(subStr) * 24 * 60;
-
- case 'w':
- case 'W':
- return Integer.parseInt(subStr) * 7 * 24 * 60;
-
- default:
- return Integer.parseInt(s);
- }
- }
-
static MessageId validateMessageIdString(String resetMessageIdStr) throws PulsarAdminException {
String[] messageId = resetMessageIdStr.split(":");
try {
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index 3b724f2ea9a9f..665061887477b 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -44,6 +44,7 @@
import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
+import org.apache.pulsar.common.util.RelativeTimeUtil;
@Parameters(commandDescription = "Operations about namespaces")
public class CmdNamespaces extends CmdBase {
@@ -352,9 +353,16 @@ private class SetRetention extends CliCommand {
void run() throws PulsarAdminException {
String namespace = validateNamespace(params);
long sizeLimit = validateSizeString(limitStr);
- int retentionTimeInMin = validateTimeString(retentionTimeStr);
+ long retentionTimeInSec = RelativeTimeUtil.parseRelativeTimeInSeconds(retentionTimeStr);
- int retentionSizeInMB;
+ final int retentionTimeInMin;
+ if (retentionTimeInSec != -1) {
+ retentionTimeInMin = (int) TimeUnit.SECONDS.toMinutes(retentionTimeInSec);
+ } else {
+ retentionTimeInMin = -1;
+ }
+
+ final int retentionSizeInMB;
if (sizeLimit != -1) {
retentionSizeInMB = (int) (sizeLimit / (1024 * 1024));
} else {
@@ -901,7 +909,8 @@ private class SetOffloadDeletionLag extends CliCommand {
@Override
void run() throws PulsarAdminException {
String namespace = validateNamespace(params);
- admin.namespaces().setOffloadDeleteLag(namespace, validateTimeString(lag), TimeUnit.MINUTES);
+ admin.namespaces().setOffloadDeleteLag(namespace, RelativeTimeUtil.parseRelativeTimeInSeconds(lag),
+ TimeUnit.SECONDS);
}
}
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java
index bdc8761e6b18c..2314ae6aceb1a 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java
@@ -31,7 +31,7 @@
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
-
+import org.apache.pulsar.common.util.RelativeTimeUtil;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
@@ -240,7 +240,7 @@ private class DeletePartitionedCmd extends CliCommand {
@Parameter(description = "persistent://property/cluster/namespace/topic\n", required = true)
private java.util.List params;
-
+
@Parameter(names = "--force", description = "Close all producer/consumer/replicator and delete topic forcefully")
private boolean force = false;
@@ -259,7 +259,7 @@ private class DeleteCmd extends CliCommand {
@Parameter(names = "--force", description = "Close all producer/consumer/replicator and delete topic forcefully")
private boolean force = false;
-
+
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
@@ -374,7 +374,7 @@ void run() throws Exception {
print(persistentTopics.getPartitionedInternalStats(persistentTopic));
}
}
-
+
@Parameters(commandDescription = "Skip all the messages for the subscription")
private class SkipAll extends CliCommand {
@Parameter(description = "persistent://property/cluster/namespace/topic", required = true)
@@ -496,8 +496,8 @@ void run() throws PulsarAdminException {
MessageId messageId = validateMessageIdString(resetMessageIdStr);
persistentTopics.resetCursor(persistentTopic, subName, messageId);
} else if (isNotBlank(resetTimeStr)) {
- int resetBackTimeInMin = validateTimeString(resetTimeStr);
- long resetTimeInMillis = TimeUnit.MILLISECONDS.convert(resetBackTimeInMin, TimeUnit.MINUTES);
+ long resetTimeInMillis = TimeUnit.SECONDS
+ .toMillis(RelativeTimeUtil.parseRelativeTimeInSeconds(resetTimeStr));
// now - go back time
long timestamp = System.currentTimeMillis() - resetTimeInMillis;
persistentTopics.resetCursor(persistentTopic, subName, timestamp);
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index a85df44b30494..5014f14ac054e 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -46,6 +46,7 @@
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
+import org.apache.pulsar.common.util.RelativeTimeUtil;
@Parameters(commandDescription = "Operations on persistent topics")
public class CmdTopics extends CmdBase {
@@ -494,8 +495,8 @@ void run() throws PulsarAdminException {
MessageId messageId = validateMessageIdString(resetMessageIdStr);
topics.resetCursor(persistentTopic, subName, messageId);
} else if (isNotBlank(resetTimeStr)) {
- int resetBackTimeInMin = validateTimeString(resetTimeStr);
- long resetTimeInMillis = TimeUnit.MILLISECONDS.convert(resetBackTimeInMin, TimeUnit.MINUTES);
+ long resetTimeInMillis = TimeUnit.SECONDS
+ .toMillis(RelativeTimeUtil.parseRelativeTimeInSeconds(resetTimeStr));
// now - go back time
long timestamp = System.currentTimeMillis() - resetTimeInMillis;
topics.resetCursor(persistentTopic, subName, timestamp);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Authentication.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Authentication.java
index bd564f591eb35..b3a2172677310 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Authentication.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Authentication.java
@@ -22,6 +22,9 @@
import java.io.Serializable;
import java.util.Map;
+/**
+ * Interface of authentication providers.
+ */
public interface Authentication extends Closeable, Serializable {
/**
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/AuthenticationFactory.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/AuthenticationFactory.java
index f955a5695fd21..21060a1525d5c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/AuthenticationFactory.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/AuthenticationFactory.java
@@ -19,14 +19,49 @@
package org.apache.pulsar.client.api;
import java.util.Map;
+import java.util.function.Supplier;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
+import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
import org.apache.pulsar.client.api.EncodedAuthenticationParameterSupport;
public final class AuthenticationFactory {
+ /**
+ * Create an authentication provider for token based authentication.
+ *
+ * @param token
+ * the client auth token
+ */
+ public static Authentication token(String token) {
+ return new AuthenticationToken(token);
+ }
+
+ /**
+ * Create an authentication provider for token based authentication.
+ *
+ * @param tokenSupplier
+ * a supplier of the client auth token
+ */
+ public static Authentication token(Supplier tokenSupplier) {
+ return new AuthenticationToken(tokenSupplier);
+ }
+
+ /**
+ * Create an authentication provider for TLS based authentication.
+ *
+ * @param certFilePath
+ * the path to the TLS client public key
+ * @param keyFilePath
+ * the path to the TLS client private key
+ */
+ public static Authentication TLS(String certFilePath, String keyFilePath) {
+ return new AuthenticationTls(certFilePath, keyFilePath);
+ }
+
/**
* Create an instance of the Authentication-Plugin
*
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataToken.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataToken.java
new file mode 100644
index 0000000000000..f04400b9cf649
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataToken.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.client.impl.auth;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Supplier;
+
+import org.apache.pulsar.client.api.AuthenticationDataProvider;
+
+public class AuthenticationDataToken implements AuthenticationDataProvider {
+ public final static String HTTP_HEADER_NAME = "Authorization";
+
+ private final Supplier tokenSupplier;
+
+ public AuthenticationDataToken(Supplier tokenSupplier) {
+ this.tokenSupplier = tokenSupplier;
+ }
+
+ @Override
+ public boolean hasDataForHttp() {
+ return true;
+ }
+
+ @Override
+ public Set> getHttpHeaders() {
+ return Collections.singletonMap(HTTP_HEADER_NAME, "Bearer " + getToken()).entrySet();
+ }
+
+ @Override
+ public boolean hasDataFromCommand() {
+ return true;
+ }
+
+ @Override
+ public String getCommandData() {
+ return getToken();
+ }
+
+ private String getToken() {
+ try {
+ return tokenSupplier.get();
+ } catch (Throwable t) {
+ throw new RuntimeException("failed to get client token", t);
+ }
+ }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationTls.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationTls.java
index b2ad87544e5bc..859487979ee99 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationTls.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationTls.java
@@ -41,6 +41,14 @@ public class AuthenticationTls implements Authentication, EncodedAuthenticationP
private String certFilePath;
private String keyFilePath;
+ public AuthenticationTls() {
+ }
+
+ public AuthenticationTls(String certFilePath, String keyFilePath) {
+ this.certFilePath = certFilePath;
+ this.keyFilePath = keyFilePath;
+ }
+
@Override
public void close() throws IOException {
// noop
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationToken.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationToken.java
new file mode 100644
index 0000000000000..9e49016aca78d
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationToken.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.client.impl.auth;
+
+import com.google.common.base.Charsets;
+
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Map;
+import java.util.function.Supplier;
+
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.apache.pulsar.client.api.EncodedAuthenticationParameterSupport;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+/**
+ * Token based authentication provider.
+ */
+public class AuthenticationToken implements Authentication, EncodedAuthenticationParameterSupport {
+
+ private Supplier tokenSupplier;
+
+ public AuthenticationToken() {
+ }
+
+ public AuthenticationToken(String token) {
+ this(() -> token);
+ }
+
+ public AuthenticationToken(Supplier tokenSupplier) {
+ this.tokenSupplier = tokenSupplier;
+ }
+
+ @Override
+ public void close() throws IOException {
+ // noop
+ }
+
+ @Override
+ public String getAuthMethodName() {
+ return "token";
+ }
+
+ @Override
+ public AuthenticationDataProvider getAuthData() throws PulsarClientException {
+ return new AuthenticationDataToken(tokenSupplier);
+ }
+
+ @Override
+ public void configure(String encodedAuthParamString) {
+ // Interpret the whole param string as the token. If the string contains the notation `token:xxxxx` then strip
+ // the prefix
+ if (encodedAuthParamString.startsWith("token:")) {
+ this.tokenSupplier = () -> encodedAuthParamString.substring("token:".length());
+ } else if (encodedAuthParamString.startsWith("file:")) {
+ // Read token from a file
+ URI filePath = URI.create(encodedAuthParamString);
+ this.tokenSupplier = () -> {
+ try {
+ return new String(Files.readAllBytes(Paths.get(filePath)), Charsets.UTF_8);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to read token from file", e);
+ }
+ };
+ } else {
+ this.tokenSupplier = () -> encodedAuthParamString;
+ }
+ }
+
+ @Override
+ public void configure(Map authParams) {
+ // noop
+ }
+
+ @Override
+ public void start() throws PulsarClientException {
+ // noop
+ }
+
+}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/AuthenticationTokenTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/AuthenticationTokenTest.java
new file mode 100644
index 0000000000000..b2f3a7fd20515
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/AuthenticationTokenTest.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+
+import com.google.common.base.Charsets;
+
+import java.io.File;
+import java.util.Collections;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.testng.annotations.Test;
+
+public class AuthenticationTokenTest {
+
+ @Test
+ public void testAuthToken() throws Exception {
+ AuthenticationToken authToken = new AuthenticationToken("token-xyz");
+ assertEquals(authToken.getAuthMethodName(), "token");
+
+ AuthenticationDataProvider authData = authToken.getAuthData();
+ assertTrue(authData.hasDataFromCommand());
+ assertEquals(authData.getCommandData(), "token-xyz");
+
+ assertFalse(authData.hasDataForTls());
+ assertNull(authData.getTlsCertificates());
+ assertNull(authData.getTlsPrivateKey());
+
+ assertTrue(authData.hasDataForHttp());
+ assertEquals(authData.getHttpHeaders(),
+ Collections.singletonMap("Authorization", "Bearer token-xyz").entrySet());
+
+ authToken.close();
+ }
+
+ @Test
+ public void testAuthTokenConfig() throws Exception {
+ AuthenticationToken authToken = new AuthenticationToken();
+ authToken.configure("token:my-test-token-string");
+ assertEquals(authToken.getAuthMethodName(), "token");
+
+ AuthenticationDataProvider authData = authToken.getAuthData();
+ assertTrue(authData.hasDataFromCommand());
+ assertEquals(authData.getCommandData(), "my-test-token-string");
+ authToken.close();
+ }
+
+ @Test
+ public void testAuthTokenConfigFromFile() throws Exception {
+ File tokenFile = File.createTempFile("pular-test-token", ".key");
+ tokenFile.deleteOnExit();
+ FileUtils.write(tokenFile, "my-test-token-string", Charsets.UTF_8);
+
+ AuthenticationToken authToken = new AuthenticationToken();
+ authToken.configure("file://" + tokenFile);
+ assertEquals(authToken.getAuthMethodName(), "token");
+
+ AuthenticationDataProvider authData = authToken.getAuthData();
+ assertTrue(authData.hasDataFromCommand());
+ assertEquals(authData.getCommandData(), "my-test-token-string");
+
+ // Ensure if the file content changes, the token will get refreshed as well
+ FileUtils.write(tokenFile, "other-token", Charsets.UTF_8);
+
+ AuthenticationDataProvider authData2 = authToken.getAuthData();
+ assertTrue(authData2.hasDataFromCommand());
+ assertEquals(authData2.getCommandData(), "other-token");
+
+ authToken.close();
+ }
+
+ @Test
+ public void testAuthTokenConfigNoPrefix() throws Exception {
+ AuthenticationToken authToken = new AuthenticationToken();
+ authToken.configure("my-test-token-string");
+ assertEquals(authToken.getAuthMethodName(), "token");
+
+ AuthenticationDataProvider authData = authToken.getAuthData();
+ assertTrue(authData.hasDataFromCommand());
+ assertEquals(authData.getCommandData(), "my-test-token-string");
+ authToken.close();
+ }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/url/DataURLStreamHandler.java b/pulsar-common/src/main/java/org/apache/pulsar/client/api/url/DataURLStreamHandler.java
similarity index 82%
rename from pulsar-client/src/main/java/org/apache/pulsar/client/api/url/DataURLStreamHandler.java
rename to pulsar-common/src/main/java/org/apache/pulsar/client/api/url/DataURLStreamHandler.java
index f4147cd027170..e70dcecf3e1a1 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/url/DataURLStreamHandler.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/client/api/url/DataURLStreamHandler.java
@@ -33,12 +33,15 @@
public class DataURLStreamHandler extends URLStreamHandler {
- class DataURLConnection extends URLConnection {
+ static class DataURLConnection extends URLConnection {
private boolean parsed = false;
private String contentType;
- private String data;
+ private byte[] data;
private URI uri;
+ private static final Pattern pattern = Pattern.compile(
+ "(?[^;,]+)?(;(?charset=[^;,]+))?(;(?base64))?,(?.+)", Pattern.DOTALL);
+
protected DataURLConnection(URL url) {
super(url);
try {
@@ -57,20 +60,19 @@ public void connect() throws IOException {
if (this.uri == null) {
throw new IOException();
}
- Pattern pattern = Pattern.compile(
- "(?.+?)(;(?charset=.+?))?(;(?base64?))?,(?.+)", Pattern.DOTALL);
+
Matcher matcher = pattern.matcher(this.uri.getSchemeSpecificPart());
if (matcher.matches()) {
this.contentType = matcher.group("mimeType");
- String charset = matcher.group("charset");
- if (charset == null) {
- charset = "US-ASCII";
+ if (contentType == null) {
+ this.contentType = "application/data";
}
+
if (matcher.group("base64") == null) {
// Support Urlencode but not decode here because already decoded by URI class.
- this.data = new String(matcher.group("data").getBytes(), charset);
+ this.data = matcher.group("data").getBytes();
} else {
- this.data = new String(Base64.getDecoder().decode(matcher.group("data")), charset);
+ this.data = Base64.getDecoder().decode(matcher.group("data"));
}
} else {
throw new MalformedURLException();
@@ -83,7 +85,7 @@ public long getContentLengthLong() {
long length;
try {
this.connect();
- length = this.data.length();
+ length = this.data.length;
} catch (IOException e) {
length = -1;
}
@@ -109,7 +111,7 @@ public String getContentEncoding() {
public InputStream getInputStream() throws IOException {
this.connect();
- return new ByteArrayInputStream(this.data.getBytes());
+ return new ByteArrayInputStream(this.data);
}
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/url/PulsarURLStreamHandlerFactory.java b/pulsar-common/src/main/java/org/apache/pulsar/client/api/url/PulsarURLStreamHandlerFactory.java
similarity index 95%
rename from pulsar-client/src/main/java/org/apache/pulsar/client/api/url/PulsarURLStreamHandlerFactory.java
rename to pulsar-common/src/main/java/org/apache/pulsar/client/api/url/PulsarURLStreamHandlerFactory.java
index af7b668f611b9..b09d384c24248 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/url/PulsarURLStreamHandlerFactory.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/client/api/url/PulsarURLStreamHandlerFactory.java
@@ -24,7 +24,7 @@
import java.util.Map;
public class PulsarURLStreamHandlerFactory implements URLStreamHandlerFactory {
- static Map> handlers;
+ private static final Map> handlers;
static {
handlers = new HashMap<>();
handlers.put("data", DataURLStreamHandler.class);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/url/URL.java b/pulsar-common/src/main/java/org/apache/pulsar/client/api/url/URL.java
similarity index 88%
rename from pulsar-client/src/main/java/org/apache/pulsar/client/api/url/URL.java
rename to pulsar-common/src/main/java/org/apache/pulsar/client/api/url/URL.java
index 4d8c36745554e..e5246b76393dc 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/url/URL.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/client/api/url/URL.java
@@ -26,8 +26,8 @@
import java.net.URLStreamHandlerFactory;
public class URL {
- private static URLStreamHandlerFactory urlStreamHandlerFactory = new PulsarURLStreamHandlerFactory();
- private java.net.URL url;
+ private static final URLStreamHandlerFactory urlStreamHandlerFactory = new PulsarURLStreamHandlerFactory();
+ private final java.net.URL url;
public URL(String spec)
throws MalformedURLException, URISyntaxException, InstantiationException, IllegalAccessException {
@@ -47,7 +47,7 @@ public Object getContent() throws IOException {
return this.url.getContent();
}
- public Object getContent(Class[] classes) throws IOException {
+ public Object getContent(Class>[] classes) throws IOException {
return this.url.getContent(classes);
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RelativeTimeUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RelativeTimeUtil.java
new file mode 100644
index 0000000000000..3034960d8af4f
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RelativeTimeUtil.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util;
+
+import java.util.concurrent.TimeUnit;
+
+import lombok.experimental.UtilityClass;
+
+@UtilityClass
+public class RelativeTimeUtil {
+ public static long parseRelativeTimeInSeconds(String relativeTime) {
+ if (relativeTime.isEmpty()) {
+ throw new IllegalArgumentException("exipiry time cannot be empty");
+ }
+
+ int lastIndex= relativeTime.length() - 1;
+ char lastChar = relativeTime.charAt(lastIndex);
+ final char timeUnit;
+
+ if (!Character.isAlphabetic(lastChar)) {
+ // No unit specified, assume seconds
+ timeUnit = 's';
+ lastIndex = relativeTime.length();
+ } else {
+ timeUnit = Character.toLowerCase(lastChar);
+ }
+
+ long duration = Long.parseLong(relativeTime.substring(0, lastIndex));
+
+ switch (timeUnit) {
+ case 's':
+ return duration;
+ case 'm':
+ return TimeUnit.MINUTES.toSeconds(duration);
+ case 'h':
+ return TimeUnit.HOURS.toSeconds(duration);
+ case 'd':
+ return TimeUnit.DAYS.toSeconds(duration);
+ case 'w':
+ return 7 * TimeUnit.DAYS.toSeconds(duration);
+ // No unit for months
+ case 'y':
+ return 365 * TimeUnit.DAYS.toSeconds(duration);
+ default:
+ throw new IllegalArgumentException("Invalid time unit '" + lastChar + "'");
+ }
+ }
+}
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/RelativeTimeUtilTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/RelativeTimeUtilTest.java
new file mode 100644
index 0000000000000..bc44ac4e8819f
--- /dev/null
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/RelativeTimeUtilTest.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
+
+import java.util.concurrent.TimeUnit;
+
+import org.testng.annotations.Test;
+
+public class RelativeTimeUtilTest {
+ @Test
+ public void testParseRelativeTime() {
+ assertEquals(RelativeTimeUtil.parseRelativeTimeInSeconds("-1"), -1);
+ assertEquals(RelativeTimeUtil.parseRelativeTimeInSeconds("7"), 7);
+ assertEquals(RelativeTimeUtil.parseRelativeTimeInSeconds("3s"), 3);
+ assertEquals(RelativeTimeUtil.parseRelativeTimeInSeconds("3S"), 3);
+ assertEquals(RelativeTimeUtil.parseRelativeTimeInSeconds("5m"), TimeUnit.MINUTES.toSeconds(5));
+ assertEquals(RelativeTimeUtil.parseRelativeTimeInSeconds("5M"), TimeUnit.MINUTES.toSeconds(5));
+ assertEquals(RelativeTimeUtil.parseRelativeTimeInSeconds("7h"), TimeUnit.HOURS.toSeconds(7));
+ assertEquals(RelativeTimeUtil.parseRelativeTimeInSeconds("7H"), TimeUnit.HOURS.toSeconds(7));
+ assertEquals(RelativeTimeUtil.parseRelativeTimeInSeconds("9d"), TimeUnit.DAYS.toSeconds(9));
+ assertEquals(RelativeTimeUtil.parseRelativeTimeInSeconds("9D"), TimeUnit.DAYS.toSeconds(9));
+ assertEquals(RelativeTimeUtil.parseRelativeTimeInSeconds("3w"), 7 * TimeUnit.DAYS.toSeconds(3));
+ assertEquals(RelativeTimeUtil.parseRelativeTimeInSeconds("11y"), 365 * TimeUnit.DAYS.toSeconds(11));
+ assertEquals(RelativeTimeUtil.parseRelativeTimeInSeconds("11Y"), 365 * TimeUnit.DAYS.toSeconds(11));
+
+ // Negative interval
+ assertEquals(RelativeTimeUtil.parseRelativeTimeInSeconds("-5m"), -TimeUnit.MINUTES.toSeconds(5));
+
+ try {
+ RelativeTimeUtil.parseRelativeTimeInSeconds("");
+ fail("should have failed");
+ } catch (IllegalArgumentException e) {
+ // expected
+ }
+
+ try {
+ // Invalid time unit specified
+ RelativeTimeUtil.parseRelativeTimeInSeconds("1234x");
+ fail("should have failed");
+ } catch (IllegalArgumentException e) {
+ // expected
+ }
+ }
+}
diff --git a/site2/docs/reference-configuration.md b/site2/docs/reference-configuration.md
index 0d235896b88fc..2fec49e6e1bcd 100644
--- a/site2/docs/reference-configuration.md
+++ b/site2/docs/reference-configuration.md
@@ -108,7 +108,7 @@ Pulsar brokers are responsible for handling incoming messages from producers, di
|enableNonPersistentTopics| Whether non-persistent topics are enabled on the broker |true|
|functionsWorkerEnabled| Whether the Pulsar Functions worker service is enabled in the broker |false|
|zookeeperServers| Zookeeper quorum connection string ||
-|globalZookeeperServers| Global Zookeeper quorum connection string ||
+|globalZookeeperServers| Global Zookeeper quorum connection string ||
|brokerServicePort| Broker data port |6650|
|brokerServicePortTls| Broker data port for TLS |6651|
|webServicePort| Port to use to server HTTP request |8080|
@@ -142,6 +142,8 @@ Pulsar brokers are responsible for handling incoming messages from producers, di
|tlsAllowInsecureConnection| Accept untrusted TLS certificate from client |false|
|tlsProtocols|Specify the tls protocols the broker will use to negotiate during TLS Handshake. Multiple values can be specified, separated by commas. Example:- ```TLSv1.2```, ```TLSv1.1```, ```TLSv1``` ||
|tlsCiphers|Specify the tls cipher the broker will use to negotiate during TLS Handshake. Multiple values can be specified, separated by commas. Example:- ```TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256```||
+|tokenSecretKey| Configure the secret key to be used to validate auth tokens. The key can be specified like: `tokenSecretKey=data:base64,xxxxxxxxx` or `tokenSecretKey=file:///my/secret.key`||
+|tokenPublicKey| Configure the public key to be used to validate auth tokens. The key can be specified like: `tokenPublicKey=data:base64,xxxxxxxxx` or `tokenPublicKey=file:///my/secret.key`||
|maxUnackedMessagesPerConsumer| Max number of unacknowledged messages allowed to receive messages by a consumer on a shared subscription. Broker will stop sending messages to consumer once, this limit reaches until consumer starts acknowledging messages back. Using a value of 0, is disabling unackeMessage limit check and consumer can receive messages without any restriction |50000|
|maxUnackedMessagesPerSubscription| Max number of unacknowledged messages allowed per shared subscription. Broker will stop dispatching messages to all consumers of the subscription once this limit reaches until consumer starts acknowledging messages back and unack count reaches to limit/2. Using a value of 0, is disabling unackedMessage-limit check and dispatcher can dispatch messages without any restriction |200000|
|maxConcurrentLookupRequest| Max number of concurrent lookup request broker allows to throttle heavy incoming lookup traffic |50000|
@@ -386,7 +388,7 @@ The [`pulsar-client`](reference-cli-tools.md#pulsar-client) CLI tool can be used
|zooKeeperSessionTimeoutMillis| |30000|
|serviceUrl|||
|serviceUrlTls|||
-|brokerServiceUrl|||
+|brokerServiceUrl|||
|brokerServiceUrlTls|||
|webServicePort||8080|
|webServicePortTls||8443|
@@ -397,7 +399,7 @@ The [`pulsar-client`](reference-cli-tools.md#pulsar-client) CLI tool can be used
|authorizationEnabled||false|
|superUserRoles |||
|brokerClientAuthenticationPlugin|||
-|brokerClientAuthenticationParameters|||
+|brokerClientAuthenticationParameters|||
|tlsEnabled||false|
|tlsAllowInsecureConnection||false|
|tlsCertificateFilePath|||
@@ -405,7 +407,7 @@ The [`pulsar-client`](reference-cli-tools.md#pulsar-client) CLI tool can be used
|tlsTrustCertsFilePath|||
-## Pulsar proxy
+## Pulsar proxy
The [Pulsar proxy](concepts-architecture-overview.md#pulsar-proxy) can be configured in the `conf/proxy.conf` file.
@@ -438,6 +440,8 @@ The [Pulsar proxy](concepts-architecture-overview.md#pulsar-proxy) can be config
|tlsRequireTrustedClientCertOnConnect| Whether client certificates are required for TLS. Connections are rejected if the client certificate isn’t trusted. |false|
|tlsProtocols|Specify the tls protocols the broker will use to negotiate during TLS Handshake. Multiple values can be specified, separated by commas. Example:- ```TLSv1.2```, ```TLSv1.1```, ```TLSv1``` ||
|tlsCiphers|Specify the tls cipher the broker will use to negotiate during TLS Handshake. Multiple values can be specified, separated by commas. Example:- ```TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256```||
+|tokenSecretKey| Configure the secret key to be used to validate auth tokens. The key can be specified like: `tokenSecretKey=data:base64,xxxxxxxxx` or `tokenSecretKey=file:///my/secret.key`||
+|tokenPublicKey| Configure the public key to be used to validate auth tokens. The key can be specified like: `tokenPublicKey=data:base64,xxxxxxxxx` or `tokenPublicKey=file:///my/secret.key`||
## ZooKeeper
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/PulsarTokenAuthenticationBaseSuite.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/PulsarTokenAuthenticationBaseSuite.java
new file mode 100644
index 0000000000000..56068118d4cfa
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/PulsarTokenAuthenticationBaseSuite.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.auth.token;
+
+import static java.util.stream.Collectors.joining;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
+
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.stream.Stream;
+
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.tests.integration.containers.BrokerContainer;
+import org.apache.pulsar.tests.integration.containers.ProxyContainer;
+import org.apache.pulsar.tests.integration.containers.PulsarContainer;
+import org.apache.pulsar.tests.integration.containers.ZKContainer;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
+import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase;
+import org.testcontainers.containers.Network;
+import org.testng.ITest;
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeSuite;
+import org.testng.annotations.Test;
+
+@Slf4j
+public abstract class PulsarTokenAuthenticationBaseSuite extends PulsarClusterTestBase implements ITest {
+
+ protected String superUserAuthToken;
+ protected String proxyAuthToken;
+ protected String clientAuthToken;
+
+ protected abstract void createKeysAndTokens(PulsarContainer> container) throws Exception;
+ protected abstract void configureBroker(BrokerContainer brokerContainer) throws Exception;
+ protected abstract void configureProxy(ProxyContainer proxyContainer) throws Exception;
+
+ protected static final String SUPER_USER_ROLE = "super-user";
+ protected static final String PROXY_ROLE = "proxy";
+ protected static final String REGULAR_USER_ROLE = "client";
+
+ @BeforeSuite
+ @Override
+ public void setupCluster() throws Exception {
+ // Before starting the cluster, generate the secret key and the token
+ // Use Zk container to have 1 container available before starting the cluster
+ try (ZKContainer> zkContainer = new ZKContainer<>("cli-setup")) {
+ zkContainer
+ .withNetwork(Network.newNetwork())
+ .withNetworkAliases(ZKContainer.NAME)
+ .withEnv("zkServers", ZKContainer.NAME);
+ zkContainer.start();
+
+ createKeysAndTokens(zkContainer);
+ }
+
+ final String clusterName = Stream.of(this.getClass().getSimpleName(), randomName(5))
+ .filter(s -> s != null && !s.isEmpty())
+ .collect(joining("-"));
+
+ PulsarClusterSpec spec = PulsarClusterSpec.builder()
+ .numBookies(2)
+ .numBrokers(1)
+ .numProxies(1)
+ .clusterName(clusterName)
+ .build();
+
+ log.info("Setting up cluster {} with token authentication and {} bookies, {} brokers",
+ spec.clusterName(), spec.numBookies(), spec.numBrokers());
+
+ pulsarCluster = PulsarCluster.forSpec(spec);
+
+ for (BrokerContainer brokerContainer : pulsarCluster.getBrokers()) {
+ configureBroker(brokerContainer);
+ brokerContainer.withEnv("authenticationEnabled", "true");
+ brokerContainer.withEnv("authenticationProviders",
+ "org.apache.pulsar.broker.authentication.AuthenticationProviderToken");
+ brokerContainer.withEnv("authorizationEnabled", "true");
+ brokerContainer.withEnv("superUserRoles", SUPER_USER_ROLE + "," + PROXY_ROLE);
+ }
+
+ ProxyContainer proxyContainer = pulsarCluster.getProxy();
+ configureProxy(proxyContainer);
+ proxyContainer.withEnv("authenticationEnabled", "true");
+ proxyContainer.withEnv("authenticationProviders",
+ "org.apache.pulsar.broker.authentication.AuthenticationProviderToken");
+ proxyContainer.withEnv("authorizationEnabled", "true");
+ proxyContainer.withEnv("brokerClientAuthenticationPlugin", AuthenticationToken.class.getName());
+ proxyContainer.withEnv("brokerClientAuthenticationParameters", "token:" + proxyAuthToken);
+
+ pulsarCluster.start();
+
+ log.info("Cluster {} is setup", spec.clusterName());
+ }
+
+ @AfterSuite
+ @Override
+ public void tearDownCluster() {
+ super.tearDownCluster();
+ }
+
+ @Override
+ public String getTestName() {
+ return "token-auth-test-suite";
+ }
+
+ @Test
+ public void testPublishWithTokenAuth() throws Exception {
+ final String tenant = "token-test-tenant" + randomName(4);
+ final String namespace = tenant + "/ns-1";
+ final String topic = "persistent://" + namespace + "/topic-1";
+
+ @Cleanup
+ PulsarAdmin admin = PulsarAdmin.builder()
+ .serviceHttpUrl(pulsarCluster.getHttpServiceUrl())
+ .authentication(AuthenticationFactory.token(superUserAuthToken))
+ .build();
+
+ try {
+ admin.tenants().createTenant(tenant,
+ new TenantInfo(Collections.singleton(REGULAR_USER_ROLE),
+ Collections.singleton(pulsarCluster.getClusterName())));
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ admin.namespaces().createNamespace(namespace, Collections.singleton(pulsarCluster.getClusterName()));
+ admin.namespaces().grantPermissionOnNamespace(namespace, REGULAR_USER_ROLE, EnumSet.allOf(AuthAction.class));
+
+ @Cleanup
+ PulsarClient client = PulsarClient.builder()
+ .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+ .authentication(AuthenticationFactory.token(clientAuthToken))
+ .build();
+
+ @Cleanup
+ Producer producer = client.newProducer(Schema.STRING)
+ .topic(topic)
+ .create();
+
+ @Cleanup
+ Consumer consumer = client.newConsumer(Schema.STRING)
+ .topic(topic)
+ .subscriptionName("my-sub")
+ .subscribe();
+
+ final int numMessages = 10;
+
+ for (int i = 0; i < numMessages; i++) {
+ producer.send("hello-" + i);
+ }
+
+ for (int i = 0; i < numMessages; i++) {
+ Message msg = consumer.receive();
+ assertEquals(msg.getValue(), "hello-" + i);
+
+ consumer.acknowledge(msg);
+ }
+
+ // Test client with no auth and expect it to fail
+ @Cleanup
+ PulsarClient clientNoAuth = PulsarClient.builder()
+ .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+ .build();
+
+ try {
+ clientNoAuth.newProducer(Schema.STRING).topic(topic)
+ .create();
+ fail("Should have failed to create producer");
+ } catch (PulsarClientException e) {
+ // Expected
+ }
+ }
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/TokenAuthWithPublicPrivateKeys.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/TokenAuthWithPublicPrivateKeys.java
new file mode 100644
index 0000000000000..7f1a03aa9cd8b
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/TokenAuthWithPublicPrivateKeys.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.auth.token;
+
+import com.google.common.io.Files;
+
+import java.io.File;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.pulsar.tests.integration.containers.BrokerContainer;
+import org.apache.pulsar.tests.integration.containers.ProxyContainer;
+import org.apache.pulsar.tests.integration.containers.PulsarContainer;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.apache.pulsar.tests.integration.utils.DockerUtils;
+
+@Slf4j
+public class TokenAuthWithPublicPrivateKeys extends PulsarTokenAuthenticationBaseSuite {
+
+ private static final String PRIVATE_KEY_PATH_INSIDE_CONTAINER = "/tmp/private.key";
+ private static final String PUBLIC_KEY_PATH_INSIDE_CONTAINER = "/tmp/public.key";
+
+ private File publicKeyFile;
+
+ @Override
+ @SuppressWarnings("rawtypes")
+ protected void createKeysAndTokens(PulsarContainer container) throws Exception {
+ container
+ .execCmd(PulsarCluster.PULSAR_COMMAND_SCRIPT, "tokens", "create-key-pair",
+ "--output-private-key", PRIVATE_KEY_PATH_INSIDE_CONTAINER,
+ "--output-public-key", PUBLIC_KEY_PATH_INSIDE_CONTAINER);
+
+ byte[] publicKeyBytes = DockerUtils
+ .runCommandWithRawOutput(container.getDockerClient(), container.getContainerId(),
+ "/bin/cat", PUBLIC_KEY_PATH_INSIDE_CONTAINER)
+ .getStdout();
+
+ publicKeyFile = File.createTempFile("public-", ".key", new File("/tmp"));
+ Files.write(publicKeyBytes, publicKeyFile);
+
+ clientAuthToken = container
+ .execCmd(PulsarCluster.PULSAR_COMMAND_SCRIPT, "tokens", "create",
+ "--private-key", "file://" + PRIVATE_KEY_PATH_INSIDE_CONTAINER,
+ "--subject", REGULAR_USER_ROLE)
+ .getStdout().trim();
+ log.info("Created client token: {}", clientAuthToken);
+
+ superUserAuthToken = container
+ .execCmd(PulsarCluster.PULSAR_COMMAND_SCRIPT, "tokens", "create",
+ "--private-key", "file://" + PRIVATE_KEY_PATH_INSIDE_CONTAINER,
+ "--subject", SUPER_USER_ROLE)
+ .getStdout().trim();
+ log.info("Created super-user token: {}", superUserAuthToken);
+
+ proxyAuthToken = container
+ .execCmd(PulsarCluster.PULSAR_COMMAND_SCRIPT, "tokens", "create",
+ "--private-key", "file://" + PRIVATE_KEY_PATH_INSIDE_CONTAINER,
+ "--subject", PROXY_ROLE)
+ .getStdout().trim();
+ log.info("Created proxy token: {}", proxyAuthToken);
+ }
+
+ @Override
+ protected void configureBroker(BrokerContainer brokerContainer) throws Exception {
+ brokerContainer.withFileSystemBind(publicKeyFile.toString(), PUBLIC_KEY_PATH_INSIDE_CONTAINER);
+ brokerContainer.withEnv("tokenPublicKey", "file://" + PUBLIC_KEY_PATH_INSIDE_CONTAINER);
+ }
+
+ @Override
+ protected void configureProxy(ProxyContainer proxyContainer) throws Exception {
+ proxyContainer.withFileSystemBind(publicKeyFile.toString(), PUBLIC_KEY_PATH_INSIDE_CONTAINER);
+ proxyContainer.withEnv("tokenPublicKey", "file://" + PUBLIC_KEY_PATH_INSIDE_CONTAINER);
+ }
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/TokenAuthWithSymmetricKeys.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/TokenAuthWithSymmetricKeys.java
new file mode 100644
index 0000000000000..96efa6c2ec13f
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/TokenAuthWithSymmetricKeys.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.auth.token;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.pulsar.tests.integration.containers.BrokerContainer;
+import org.apache.pulsar.tests.integration.containers.ProxyContainer;
+import org.apache.pulsar.tests.integration.containers.PulsarContainer;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+
+@Slf4j
+public class TokenAuthWithSymmetricKeys extends PulsarTokenAuthenticationBaseSuite {
+
+ private String secretKey;
+
+ @Override
+ @SuppressWarnings("rawtypes")
+ protected void createKeysAndTokens(PulsarContainer container) throws Exception {
+ secretKey = container
+ .execCmd(PulsarCluster.PULSAR_COMMAND_SCRIPT, "tokens", "create-secret-key", "--base64")
+ .getStdout();
+ log.info("Created secret key: {}", secretKey);
+
+ clientAuthToken = container
+ .execCmd(PulsarCluster.PULSAR_COMMAND_SCRIPT, "tokens", "create",
+ "--secret-key", "data:base64," + secretKey,
+ "--subject", REGULAR_USER_ROLE)
+ .getStdout().trim();
+ log.info("Created client token: {}", clientAuthToken);
+
+ superUserAuthToken = container
+ .execCmd(PulsarCluster.PULSAR_COMMAND_SCRIPT, "tokens", "create",
+ "--secret-key", "data:base64," + secretKey,
+ "--subject", SUPER_USER_ROLE)
+ .getStdout().trim();
+ log.info("Created super-user token: {}", superUserAuthToken);
+
+ proxyAuthToken = container
+ .execCmd(PulsarCluster.PULSAR_COMMAND_SCRIPT, "tokens", "create",
+ "--secret-key", "data:base64," + secretKey,
+ "--subject", PROXY_ROLE)
+ .getStdout().trim();
+ log.info("Created proxy token: {}", proxyAuthToken);
+ }
+
+ @Override
+ protected void configureBroker(BrokerContainer brokerContainer) throws Exception {
+ brokerContainer.withEnv("tokenSecretKey", "data:base64," + secretKey);
+ }
+
+ @Override
+ protected void configureProxy(ProxyContainer proxyContainer) throws Exception {
+ proxyContainer.withEnv("tokenSecretKey", "data:base64," + secretKey);
+ }
+
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/docker/ContainerExecResultBytes.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/docker/ContainerExecResultBytes.java
new file mode 100644
index 0000000000000..0f29f187da601
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/docker/ContainerExecResultBytes.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.docker;
+
+import lombok.Data;
+
+/**
+ * Represents the result of executing a command.
+ */
+@Data(staticConstructor = "of")
+public class ContainerExecResultBytes {
+
+ private final int exitCode;
+ private final byte[] stdout;
+ private final byte[] stderr;
+
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
index c04b6692e6aa3..803bde7ce91dd 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
@@ -452,6 +452,10 @@ public Collection getBrokers() {
return brokerContainers.values();
}
+ public ProxyContainer getProxy() {
+ return proxyContainer;
+ }
+
public Collection getBookies() {
return bookieContainers.values();
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/utils/DockerUtils.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/utils/DockerUtils.java
index 5148a3691275a..30ab86984c982 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/utils/DockerUtils.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/utils/DockerUtils.java
@@ -24,16 +24,19 @@
import com.github.dockerjava.api.async.ResultCallback;
import com.github.dockerjava.api.command.InspectContainerResponse;
import com.github.dockerjava.api.command.InspectExecResponse;
-import com.github.dockerjava.api.model.Frame;
import com.github.dockerjava.api.model.ContainerNetwork;
-
+import com.github.dockerjava.api.model.Frame;
import com.github.dockerjava.api.model.StreamType;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
import java.io.Closeable;
import java.io.File;
import java.io.FileOutputStream;
-import java.io.OutputStream;
-import java.io.InputStream;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
@@ -42,11 +45,11 @@
import java.util.stream.Collectors;
import java.util.zip.GZIPOutputStream;
-import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
-
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.pulsar.tests.integration.docker.ContainerExecException;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResultBytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -236,6 +239,82 @@ public void onComplete() {
return result;
}
+ public static ContainerExecResultBytes runCommandWithRawOutput(DockerClient docker,
+ String containerId,
+ String... cmd)
+ throws ContainerExecException {
+ CompletableFuture future = new CompletableFuture<>();
+ String execid = docker.execCreateCmd(containerId)
+ .withCmd(cmd)
+ .withAttachStderr(true)
+ .withAttachStdout(true)
+ .exec()
+ .getId();
+ String cmdString = Arrays.stream(cmd).collect(Collectors.joining(" "));
+ ByteBuf stdout = Unpooled.buffer();
+ ByteBuf stderr = Unpooled.buffer();
+ docker.execStartCmd(execid).withDetach(false)
+ .exec(new ResultCallback() {
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public void onStart(Closeable closeable) {
+ LOG.info("DOCKER.exec({}:{}): Executing...", containerId, cmdString);
+ }
+
+ @Override
+ public void onNext(Frame object) {
+ if (StreamType.STDOUT == object.getStreamType()) {
+ stdout.writeBytes(object.getPayload());
+ } else if (StreamType.STDERR == object.getStreamType()) {
+ stderr.writeBytes(object.getPayload());
+ }
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ future.completeExceptionally(throwable);
+ }
+
+ @Override
+ public void onComplete() {
+ LOG.info("DOCKER.exec({}:{}): Done", containerId, cmdString);
+ future.complete(true);
+ }
+ });
+ future.join();
+
+ InspectExecResponse resp = docker.inspectExecCmd(execid).exec();
+ while (resp.isRunning()) {
+ try {
+ Thread.sleep(200);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(ie);
+ }
+ resp = docker.inspectExecCmd(execid).exec();
+ }
+ int retCode = resp.getExitCode();
+
+ byte[] stdoutBytes = new byte[stdout.readableBytes()];
+ stdout.readBytes(stdoutBytes);
+ byte[] stderrBytes = new byte[stderr.readableBytes()];
+ stderr.readBytes(stderrBytes);
+
+ ContainerExecResultBytes result = ContainerExecResultBytes.of(
+ retCode,
+ stdoutBytes,
+ stderrBytes);
+ LOG.info("DOCKER.exec({}:{}): completed with {}", containerId, cmdString, retCode);
+
+ if (retCode != 0) {
+ throw new ContainerExecException(cmdString, containerId, null);
+ }
+ return result;
+ }
+
public static Optional getContainerCluster(DockerClient docker, String containerId) {
return Optional.ofNullable(docker.inspectContainerCmd(containerId)
.exec().getConfig().getLabels().get("cluster"));