Skip to content

Commit

Permalink
enable checkStyle_plugin in pulsar-broker-common module (#13732)
Browse files Browse the repository at this point in the history
  • Loading branch information
liudezhi2098 authored Jan 13, 2022
1 parent 0648f6b commit adcbe0f
Show file tree
Hide file tree
Showing 58 changed files with 625 additions and 339 deletions.
13 changes: 13 additions & 0 deletions pulsar-broker-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,19 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<executions>
<execution>
<id>checkstyle</id>
<phase>verify</phase>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ public void setConf(Configuration conf) {
super.setConf(conf);
Object storeProperty = conf.getProperty(METADATA_STORE_INSTANCE);
if (storeProperty == null) {
throw new RuntimeException(METADATA_STORE_INSTANCE + " configuration was not set in the BK client configuration");
throw new RuntimeException(METADATA_STORE_INSTANCE + " configuration was not set in the BK client "
+ "configuration");
}

if (!(storeProperty instanceof MetadataStore)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,13 +178,15 @@ private static Optional<EnsemblePlacementPolicyConfig> getEnsemblePlacementPolic
return Optional.empty();
}

private static Pair<Set<String>, Set<String>> getIsolationGroup(EnsemblePlacementPolicyConfig ensemblePlacementPolicyConfig) {
private static Pair<Set<String>, Set<String>> getIsolationGroup(
EnsemblePlacementPolicyConfig ensemblePlacementPolicyConfig) {
MutablePair<Set<String>, Set<String>> pair = new MutablePair<>();
String className = IsolatedBookieEnsemblePlacementPolicy.class.getName();
if (ensemblePlacementPolicyConfig.getPolicyClass().getName().equals(className)) {
Map<String, Object> properties = ensemblePlacementPolicyConfig.getProperties();
String primaryIsolationGroupString = castToString(properties.getOrDefault(ISOLATION_BOOKIE_GROUPS, ""));
String secondaryIsolationGroupString = castToString(properties.getOrDefault(SECONDARY_ISOLATION_BOOKIE_GROUPS, ""));
String secondaryIsolationGroupString =
castToString(properties.getOrDefault(SECONDARY_ISOLATION_BOOKIE_GROUPS, ""));
if (!primaryIsolationGroupString.isEmpty()) {
pair.setLeft(new HashSet(Arrays.asList(primaryIsolationGroupString.split(","))));
}
Expand Down Expand Up @@ -247,8 +249,8 @@ private Set<BookieId> getBlacklistedBookiesWithIsolationGroups(int ensembleSize,
// if primary-isolated-bookies are not enough then add consider secondary isolated bookie group as well.
if (totalAvailableBookiesInPrimaryGroup < ensembleSize) {
log.info(
"Not found enough available-bookies from primary isolation group [{}], checking secondary group [{}]",
primaryIsolationGroup, secondaryIsolationGroup);
"Not found enough available-bookies from primary isolation group [{}], checking secondary "
+ "group [{}]", primaryIsolationGroup, secondaryIsolationGroup);
for (String group : secondaryIsolationGroup) {
Map<String, BookieInfo> bookieGroup = allGroupsBookieMapping.get(group);
if (bookieGroup != null && !bookieGroup.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/**
* Pulsar Client API.
*/
package org.apache.pulsar.bookie.rackawareness;

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import java.net.SocketAddress;
import java.security.cert.Certificate;

import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSession;
import lombok.extern.slf4j.Slf4j;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import java.net.InetSocketAddress;
import java.net.SocketAddress;

import javax.servlet.http.HttpServletRequest;

public class AuthenticationDataHttp implements AuthenticationDataSource {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.pulsar.broker.authentication;

import java.security.cert.X509Certificate;

import javax.servlet.http.HttpServletRequest;

public class AuthenticationDataHttps extends AuthenticationDataHttp {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.pulsar.common.api.AuthData;

/**
* Interface for accessing data which are used in variety of authentication schemes on server side
* Interface for accessing data which are used in variety of authentication schemes on server side.
*/
public interface AuthenticationDataSource {
/*
Expand Down Expand Up @@ -63,7 +63,7 @@ default boolean hasDataFromHttp() {

/**
*
* @return a authentication scheme, or <code>null<c/ode> if the request is not be authenticated
* @return a authentication scheme, or <code>null</code> if the request is not be authenticated.
*/
default String getHttpAuthType() {
return null;
Expand Down Expand Up @@ -141,16 +141,16 @@ default boolean hasSubscription() {
}

/**
* Subscription name can be necessary for consumption
* Subscription name can be necessary for consumption.
*
* @return a <code>String</code> containing the subscription name
*/
default String getSubscription() { return null; }
default String getSubscription() {
return null;
}

/**
* Subscription name can be necessary for consumption
*
* @return a <code>String</code> containing the subscription name
* Subscription name can be necessary for consumption.
*/
default void setSubscription(String subscription) { };
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,27 +20,24 @@

import java.io.Closeable;
import java.io.IOException;

import java.net.SocketAddress;
import java.util.concurrent.CompletableFuture;
import javax.naming.AuthenticationException;

import javax.net.ssl.SSLSession;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.classification.InterfaceStability;
import org.apache.pulsar.common.util.FutureUtil;

/**
* Provider of authentication mechanism
* Provider of authentication mechanism.
*/
public interface AuthenticationProvider extends Closeable {

/**
* Perform initialization for the authentication provider
* Perform initialization for the authentication provider.
*
* @param config
* broker config object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,23 @@

package org.apache.pulsar.broker.authentication;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.naming.AuthenticationException;
import lombok.Cleanup;
import org.apache.commons.codec.digest.Crypt;
import org.apache.commons.codec.digest.Md5Crypt;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.ServiceConfiguration;

import lombok.Cleanup;
import org.apache.pulsar.broker.authentication.metrics.AuthenticationMetrics;

import javax.naming.AuthenticationException;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;

public class AuthenticationProviderBasic implements AuthenticationProvider {
private static final String HTTP_HEADER_NAME = "Authorization";
private static final String CONF_SYSTEM_PROPERTY_KEY = "pulsar.auth.basic.conf";
Expand Down Expand Up @@ -99,7 +97,8 @@ public String authenticate(AuthenticationDataSource authData) throws Authenticat
throw new AuthenticationException(msg);
}
} catch (AuthenticationException exception) {
AuthenticationMetrics.authenticateFailure(getClass().getSimpleName(), getAuthMethodName(), exception.getMessage());
AuthenticationMetrics.authenticateFailure(getClass().getSimpleName(), getAuthMethodName(),
exception.getMessage());
throw exception;
}
AuthenticationMetrics.authenticateSuccess(getClass().getSimpleName(), getAuthMethodName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,16 @@
@Slf4j
public class AuthenticationProviderList implements AuthenticationProvider {

private interface AuthProcessor<T, P> {
private interface AuthProcessor<T, W> {

T apply(P process) throws AuthenticationException;
T apply(W process) throws AuthenticationException;

}

static <T, P> T applyAuthProcessor(List<P> processors, AuthProcessor<T, P> authFunc)
static <T, W> T applyAuthProcessor(List<W> processors, AuthProcessor<T, W> authFunc)
throws AuthenticationException {
AuthenticationException authenticationException = null;
for (P ap : processors) {
for (W ap : processors) {
try {
return authFunc.apply(ap);
} catch (AuthenticationException ae) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@
import java.io.IOException;
import java.security.cert.Certificate;
import java.security.cert.X509Certificate;

import javax.naming.AuthenticationException;

import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.metrics.AuthenticationMetrics;

Expand All @@ -50,8 +48,8 @@ public String authenticate(AuthenticationDataSource authData) throws Authenticat
try {
if (authData.hasDataFromTls()) {
/**
* Maybe authentication type should be checked if it is an HTTPS session. However this check fails actually
* because authType is null.
* Maybe authentication type should be checked if it is an HTTPS session. However this check fails
* actually because authType is null.
*
* This check is not necessarily needed, because an untrusted certificate is not passed to
* HttpServletRequest.
Expand Down Expand Up @@ -91,7 +89,8 @@ public String authenticate(AuthenticationDataSource authData) throws Authenticat
}
AuthenticationMetrics.authenticateSuccess(getClass().getSimpleName(), getAuthMethodName());
} catch (AuthenticationException exception) {
AuthenticationMetrics.authenticateFailure(getClass().getSimpleName(), getAuthMethodName(), exception.getMessage());
AuthenticationMetrics.authenticateFailure(getClass().getSimpleName(), getAuthMethodName(),
exception.getMessage());
throw exception;
}
return commonName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,35 +19,31 @@
package org.apache.pulsar.broker.authentication;

import static java.nio.charset.StandardCharsets.UTF_8;

import com.google.common.annotations.VisibleForTesting;
import io.jsonwebtoken.Claims;
import io.jsonwebtoken.ExpiredJwtException;
import io.jsonwebtoken.Jwt;
import io.jsonwebtoken.JwtException;
import io.jsonwebtoken.JwtParser;
import io.jsonwebtoken.Jwts;
import io.jsonwebtoken.RequiredTypeException;
import io.jsonwebtoken.SignatureAlgorithm;
import io.jsonwebtoken.security.SignatureException;
import io.prometheus.client.Counter;
import io.prometheus.client.Histogram;
import java.io.IOException;
import java.net.SocketAddress;
import java.security.Key;

import java.util.Date;
import java.util.List;
import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;

import com.google.common.annotations.VisibleForTesting;
import io.jsonwebtoken.ExpiredJwtException;
import io.jsonwebtoken.RequiredTypeException;
import io.jsonwebtoken.JwtParser;
import io.prometheus.client.Counter;
import io.prometheus.client.Histogram;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.metrics.AuthenticationMetrics;
import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
import org.apache.pulsar.common.api.AuthData;

import io.jsonwebtoken.Claims;
import io.jsonwebtoken.Jwt;
import io.jsonwebtoken.JwtException;
import io.jsonwebtoken.Jwts;
import io.jsonwebtoken.SignatureAlgorithm;
import io.jsonwebtoken.security.SignatureException;

public class AuthenticationProviderToken implements AuthenticationProvider {

static final String HTTP_HEADER_NAME = "Authorization";
Expand Down Expand Up @@ -135,9 +131,9 @@ public void initialize(ServiceConfiguration config) throws IOException, IllegalA

this.parser = Jwts.parserBuilder().setSigningKey(this.validationKey).build();

if (audienceClaim != null && audience == null ) {
if (audienceClaim != null && audience == null) {
throw new IllegalArgumentException("Token Audience Claim [" + audienceClaim
+ "] configured, but Audience stands for this broker not.");
+ "] configured, but Audience stands for this broker not.");
}
}

Expand All @@ -157,7 +153,8 @@ public String authenticate(AuthenticationDataSource authData) throws Authenticat
AuthenticationMetrics.authenticateSuccess(getClass().getSimpleName(), getAuthMethodName());
return role;
} catch (AuthenticationException exception) {
AuthenticationMetrics.authenticateFailure(getClass().getSimpleName(), getAuthMethodName(), exception.getMessage());
AuthenticationMetrics.authenticateFailure(getClass().getSimpleName(), getAuthMethodName(),
exception.getMessage());
throw exception;
}
}
Expand Down Expand Up @@ -226,7 +223,8 @@ private Jwt<?, Claims> authenticateToken(final String token) throws Authenticati
}

if (jwt.getBody().getExpiration() != null) {
expiringTokenMinutesMetrics.observe((double) (jwt.getBody().getExpiration().getTime() - new Date().getTime()) / (60 * 1000));
expiringTokenMinutesMetrics.observe(
(double) (jwt.getBody().getExpiration().getTime() - new Date().getTime()) / (60 * 1000));
}
return jwt;
} catch (JwtException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,14 @@
import java.util.stream.Collectors;
import javax.naming.AuthenticationException;
import javax.servlet.http.HttpServletRequest;

import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Authentication service
* Authentication service.
*
*/
public class AuthenticationService implements Closeable {
Expand Down Expand Up @@ -100,7 +99,8 @@ public String authenticateHttpRequest(HttpServletRequest request) throws Authent
return providerToUse.authenticate(authData);
} catch (AuthenticationException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Authentication failed for provider " + providerToUse.getAuthMethodName() + ": " + e.getMessage(), e);
LOG.debug("Authentication failed for provider " + providerToUse.getAuthMethodName() + " : "
+ e.getMessage(), e);
}
// Store the exception so we can throw it later instead of a generic one
authenticationException = e;
Expand All @@ -112,7 +112,8 @@ public String authenticateHttpRequest(HttpServletRequest request) throws Authent
return provider.authenticate(authData);
} catch (AuthenticationException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Authentication failed for provider " + provider.getAuthMethodName() + ": " + e.getMessage(), e);
LOG.debug("Authentication failed for provider " + provider.getAuthMethodName() + ": "
+ e.getMessage(), e);
}
// Ignore the exception because we don't know which authentication method is expected here.
}
Expand Down
Loading

0 comments on commit adcbe0f

Please sign in to comment.