Skip to content

Commit

Permalink
[fix][proxy] Refresh auth data if ProxyLookupRequests (apache#20067)
Browse files Browse the repository at this point in the history
Fixes: apache#10816
PIP: apache#19771
Supersedes: apache#19026
Depends on: apache#20062

The Pulsar Proxy does not properly handle authentication data refresh when in state `ProxyLookupRequests`. The consequence is described in apache#10816. Essentially, the problem is that the proxy caches stale authentication data and sends it to the broker leading to connection failures.

apache#17831 attempted to fix the underlying problem, but it missed an important edge cases. Specifically, it missed the case that the `ConnectionPool` will have multiple connections when a lookup gets redirected. As such, the following problem exists (and is fixed by this PR):

1. Client opens connection to perform lookups.
2. Proxy connects to broker 1 to get the topic ownership info.
3. Time passes.
4. Client does an additional lookup, and this topic is on a newly created broker 2. In this case, the proxy opens a new connection with the stale client auth data.
5. Broker 2 rejects the connection because it fails with expired authentication.

* Remove some of the implementation from apache#17831. This new implementation still allows a broker to challenge the client through the proxy, but notably, it limits the number of challenges sent to the client. Further, the proxy does not challenge the client when the auth data is not expired.
* Introduce authentication refresh in the proxy so that the proxy challenges the client any time the auth data is expired.
* Update the `ProxyClientCnx` to get the `clientAuthData` from the `ProxyConnection` to ensure that it gets new authentication data.
* Add clock skew to the `AuthenticationProviderToken`. This is necessary to make some of the testing not flaky and it will also be necessary for users to configure in their clusters.

The `ProxyRefreshAuthTest` covers the existing behavior and I expanded it to cover the edge case described above.

Additionally, testing this part of the code will be much easier to test once we implement apache#19624.

- [x] `doc-not-needed`

PR in forked repository: the relevant tests pass locally, so I am going to skip the forked tests.

(cherry picked from commit 075b625)
  • Loading branch information
michaeljmarshall committed Apr 20, 2023
1 parent 6332aa4 commit 139209a
Show file tree
Hide file tree
Showing 5 changed files with 220 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ public class AuthenticationProviderToken implements AuthenticationProvider {

// The token audience stands for this broker. The field `tokenAudienceClaim` of a valid token, need contains this.
static final String CONF_TOKEN_AUDIENCE = "tokenAudience";
// The amount of time in seconds that a token is allowed to be out of sync with the server's time when performing
// token validation.
static final String CONF_TOKEN_ALLOWED_CLOCK_SKEW_SECONDS = "tokenAllowedClockSkewSeconds";

static final String TOKEN = "token";

Expand Down Expand Up @@ -101,6 +104,7 @@ public class AuthenticationProviderToken implements AuthenticationProvider {
private String confTokenPublicAlgSettingName;
private String confTokenAudienceClaimSettingName;
private String confTokenAudienceSettingName;
private String confTokenAllowedClockSkewSecondsSettingName;

@Override
public void close() throws IOException {
Expand All @@ -125,6 +129,7 @@ public void initialize(ServiceConfiguration config) throws IOException, IllegalA
this.confTokenPublicAlgSettingName = prefix + CONF_TOKEN_PUBLIC_ALG;
this.confTokenAudienceClaimSettingName = prefix + CONF_TOKEN_AUDIENCE_CLAIM;
this.confTokenAudienceSettingName = prefix + CONF_TOKEN_AUDIENCE;
this.confTokenAllowedClockSkewSecondsSettingName = prefix + CONF_TOKEN_ALLOWED_CLOCK_SKEW_SECONDS;

// we need to fetch the algorithm before we fetch the key
this.publicKeyAlg = getPublicKeyAlgType(config);
Expand All @@ -133,7 +138,12 @@ public void initialize(ServiceConfiguration config) throws IOException, IllegalA
this.audienceClaim = getTokenAudienceClaim(config);
this.audience = getTokenAudience(config);

this.parser = Jwts.parserBuilder().setSigningKey(this.validationKey).build();
long allowedSkew = getConfTokenAllowedClockSkewSeconds(config);

this.parser = Jwts.parserBuilder()
.setAllowedClockSkewSeconds(allowedSkew)
.setSigningKey(this.validationKey)
.build();

if (audienceClaim != null && audience == null) {
throw new IllegalArgumentException("Token Audience Claim [" + audienceClaim
Expand Down Expand Up @@ -329,6 +339,16 @@ private String getTokenAudience(ServiceConfiguration conf) throws IllegalArgumen
}
}

// get Token's allowed clock skew in seconds. If not configured, defaults to 0.
private long getConfTokenAllowedClockSkewSeconds(ServiceConfiguration conf) throws IllegalArgumentException {
String allowedSkewStr = (String) conf.getProperty(confTokenAllowedClockSkewSecondsSettingName);
if (StringUtils.isNotBlank(allowedSkewStr)) {
return Long.parseLong(allowedSkewStr);
} else {
return 0;
}
}

private static final class TokenAuthenticationState implements AuthenticationState {
private final AuthenticationProviderToken provider;
private final SocketAddress remoteAddress;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,26 +23,31 @@
import io.netty.channel.EventLoopGroup;
import java.util.Arrays;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.api.proto.CommandAuthChallenge;
import org.apache.pulsar.common.protocol.Commands;

/**
* Channel handler for Pulsar proxy's Pulsar broker client connections for lookup requests.
* <p>
* Please see {@link org.apache.pulsar.common.protocol.PulsarDecoder} javadoc for important details about handle*
* method parameter instance lifecycle.
*/
@Slf4j
public class ProxyClientCnx extends ClientCnx {
private final boolean forwardClientAuthData;
private final String clientAuthMethod;
private final String clientAuthRole;
private final AuthData clientAuthData;
private final ProxyConnection proxyConnection;

public ProxyClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, String clientAuthRole,
AuthData clientAuthData, String clientAuthMethod, int protocolVersion,
String clientAuthMethod, int protocolVersion,
boolean forwardClientAuthData, ProxyConnection proxyConnection) {
super(conf, eventLoopGroup, protocolVersion);
this.clientAuthRole = clientAuthRole;
this.clientAuthData = clientAuthData;
this.clientAuthMethod = clientAuthMethod;
this.forwardClientAuthData = forwardClientAuthData;
this.proxyConnection = proxyConnection;
Expand All @@ -53,9 +58,15 @@ protected ByteBuf newConnectCommand() throws Exception {
if (log.isDebugEnabled()) {
log.debug("New Connection opened via ProxyClientCnx with params clientAuthRole = {},"
+ " clientAuthData = {}, clientAuthMethod = {}",
clientAuthRole, clientAuthData, clientAuthMethod);
clientAuthRole, proxyConnection.getClientAuthData(), clientAuthMethod);
}
AuthData clientAuthData = null;
if (forwardClientAuthData) {
// There is a chance this auth data is expired because the ProxyConnection does not do early token refresh.
// Based on the current design, the best option is to configure the broker to accept slightly stale
// authentication data.
clientAuthData = proxyConnection.getClientAuthData();
}

authenticationDataProvider = authentication.getAuthData(remoteHostName);
AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
return Commands.newConnect(authentication.getAuthMethodName(), authData, protocolVersion,
Expand All @@ -69,43 +80,20 @@ protected void handleAuthChallenge(CommandAuthChallenge authChallenge) {
checkArgument(authChallenge.getChallenge().hasAuthData());

boolean isRefresh = Arrays.equals(AuthData.REFRESH_AUTH_DATA_BYTES, authChallenge.getChallenge().getAuthData());
if (!forwardClientAuthData || !isRefresh) {
super.handleAuthChallenge(authChallenge);
return;
}

try {
if (log.isDebugEnabled()) {
log.debug("Proxy {} request to refresh the original client authentication data for "
+ "the proxy client {}", proxyConnection.ctx().channel(), ctx.channel());
}

proxyConnection.ctx().writeAndFlush(Commands.newAuthChallenge(clientAuthMethod, AuthData.REFRESH_AUTH_DATA,
protocolVersion))
.addListener(writeFuture -> {
if (writeFuture.isSuccess()) {
if (log.isDebugEnabled()) {
log.debug("Proxy {} sent the auth challenge to original client to refresh credentials "
+ "with method {} for the proxy client {}",
proxyConnection.ctx().channel(), clientAuthMethod, ctx.channel());
}
} else {
log.error("Failed to send the auth challenge to original client by the proxy {} "
+ "for the proxy client {}",
proxyConnection.ctx().channel(),
ctx.channel(),
writeFuture.cause());
closeWithException(writeFuture.cause());
}
if (forwardClientAuthData && isRefresh) {
proxyConnection.getValidClientAuthData()
.thenApplyAsync(authData -> {
ctx.writeAndFlush(Commands.newAuthResponse(clientAuthMethod, authData, this.protocolVersion,
String.format("Pulsar-Java-v%s", PulsarVersion.getVersion())));
return null;
}, ctx.executor())
.exceptionally(ex -> {
log.warn("Failed to get valid client auth data. Closing connection.", ex);
ctx.close();
return null;
});

if (state == State.SentConnectFrame) {
state = State.Connecting;
}
} catch (Exception e) {
log.error("Failed to send the auth challenge to origin client by the proxy {} for the proxy client {}",
proxyConnection.ctx().channel(), ctx.channel(), e);
closeWithException(e);
} else {
super.handleAuthChallenge(authChallenge);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,13 @@ public class ProxyConfiguration implements PulsarConfiguration {
+ "to take effect"
)
private boolean forwardAuthorizationCredentials = false;

@FieldContext(
category = CATEGORY_AUTHENTICATION,
doc = "Interval of time for checking for expired authentication credentials. Disable by setting to 0."
)
private int authenticationRefreshCheckSeconds = 60;

@FieldContext(
category = CATEGORY_AUTHENTICATION,
doc = "Whether the '/metrics' endpoint requires authentication. Defaults to true."
Expand Down
Loading

0 comments on commit 139209a

Please sign in to comment.