Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hot-reloadable remote cluster credentials (with blocking call fix) #103215

Merged
merged 21 commits into from
Dec 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
d247ed4
Hot-reloadable remote cluster credentials
n1v0lg Dec 8, 2023
6007cf8
Revert "Revert "Hot-reloadable remote cluster credentials (#102798)" …
n1v0lg Dec 8, 2023
f052d9c
Update docs/changelog/103215.yaml
n1v0lg Dec 8, 2023
74ad43f
Merge branch 'rcs2-reload-fixed' into revert-103211-revert-102798-rcs…
n1v0lg Dec 8, 2023
b668d0a
Merge branch 'revert-103211-revert-102798-rcs2-reload' of github.com:…
n1v0lg Dec 8, 2023
8d13158
Remove changelog
n1v0lg Dec 8, 2023
29766d8
Update docs/changelog/103215.yaml
n1v0lg Dec 8, 2023
2e880e4
Changelog again
n1v0lg Dec 8, 2023
7c2b67e
Merge branch 'main' into revert-103211-revert-102798-rcs2-reload
n1v0lg Dec 11, 2023
4072fac
Action types refactor
n1v0lg Dec 11, 2023
140eaaf
Merge branch 'main' into revert-103211-revert-102798-rcs2-reload
elasticmachine Dec 11, 2023
2c5380e
Private constructor
n1v0lg Dec 11, 2023
ed34686
Clearer assertion message
n1v0lg Dec 11, 2023
cd63809
Merge branch 'main' into revert-103211-revert-102798-rcs2-reload
n1v0lg Dec 11, 2023
4874bb3
Align changelog with original pr
n1v0lg Dec 11, 2023
09398b4
Merge branch 'main' into revert-103211-revert-102798-rcs2-reload
n1v0lg Dec 14, 2023
4b1257b
Merge branch 'main' into revert-103211-revert-102798-rcs2-reload
n1v0lg Dec 14, 2023
85e104a
Merge branch 'main' into revert-103211-revert-102798-rcs2-reload
n1v0lg Dec 14, 2023
cee133b
Merge branch 'main' into revert-103211-revert-102798-rcs2-reload
elasticmachine Dec 14, 2023
161fe67
Merge branch 'main' into revert-103211-revert-102798-rcs2-reload
elasticmachine Dec 14, 2023
ab23b66
Merge branch 'main' into revert-103211-revert-102798-rcs2-reload
n1v0lg Dec 18, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/102798.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 102798
summary: Hot-reloadable remote cluster credentials
area: Security
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy {
RemoteConnectionManager.wrapConnectionWithRemoteClusterInfo(
newConnection,
clusterAlias,
actualProfile.getTransportProfile()
connectionManager.getCredentialsManager()
),
actualProfile.getHandshakeTimeout(),
cn -> true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,28 @@ final class RemoteClusterConnection implements Closeable {
* @param settings the nodes settings object
* @param clusterAlias the configured alias of the cluster to connect to
* @param transportService the local nodes transport service
* @param credentialsProtected Whether the remote cluster is protected by a credentials, i.e. it has a credentials configured
* via secure setting. This means the remote cluster uses the new configurable access RCS model
* (as opposed to the basic model).
* @param credentialsManager object to lookup remote cluster credentials by cluster alias. If a cluster is protected by a credential,
* i.e. it has a credential configured via secure setting.
* This means the remote cluster uses the advances RCS model (as opposed to the basic model).
*/
RemoteClusterConnection(Settings settings, String clusterAlias, TransportService transportService, boolean credentialsProtected) {
RemoteClusterConnection(
Settings settings,
String clusterAlias,
TransportService transportService,
RemoteClusterCredentialsManager credentialsManager
) {
this.transportService = transportService;
this.clusterAlias = clusterAlias;
ConnectionProfile profile = RemoteConnectionStrategy.buildConnectionProfile(clusterAlias, settings, credentialsProtected);
this.remoteConnectionManager = new RemoteConnectionManager(clusterAlias, createConnectionManager(profile, transportService));
ConnectionProfile profile = RemoteConnectionStrategy.buildConnectionProfile(
clusterAlias,
settings,
credentialsManager.hasCredentials(clusterAlias)
);
this.remoteConnectionManager = new RemoteConnectionManager(
clusterAlias,
credentialsManager,
createConnectionManager(profile, transportService)
);
this.connectionStrategy = RemoteConnectionStrategy.buildStrategy(clusterAlias, transportService, remoteConnectionManager, settings);
// we register the transport service here as a listener to make sure we notify handlers on disconnect etc.
this.remoteConnectionManager.addListener(transportService);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.transport;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.SecureString;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Nullable;

import java.util.Map;

import static org.elasticsearch.transport.RemoteClusterService.REMOTE_CLUSTER_CREDENTIALS;

public class RemoteClusterCredentialsManager {

private static final Logger logger = LogManager.getLogger(RemoteClusterCredentialsManager.class);

private volatile Map<String, SecureString> clusterCredentials;

public RemoteClusterCredentialsManager(Settings settings) {
updateClusterCredentials(settings);
}

public void updateClusterCredentials(Settings settings) {
clusterCredentials = REMOTE_CLUSTER_CREDENTIALS.getAsMap(settings);
logger.debug(
() -> Strings.format(
"Updated remote cluster credentials for clusters: [%s]",
Strings.collectionToCommaDelimitedString(clusterCredentials.keySet())
)
);
}

@Nullable
public SecureString resolveCredentials(String clusterAlias) {
return clusterCredentials.get(clusterAlias);
}

public boolean hasCredentials(String clusterAlias) {
return clusterCredentials.containsKey(clusterAlias);
}

public static final RemoteClusterCredentialsManager EMPTY = new RemoteClusterCredentialsManager(Settings.EMPTY);
}
Original file line number Diff line number Diff line change
Expand Up @@ -147,15 +147,14 @@ public boolean isRemoteClusterServerEnabled() {

private final TransportService transportService;
private final Map<String, RemoteClusterConnection> remoteClusters = ConcurrentCollections.newConcurrentMap();
private final Set<String> credentialsProtectedRemoteClusters;
private final RemoteClusterCredentialsManager remoteClusterCredentialsManager;

RemoteClusterService(Settings settings, TransportService transportService) {
super(settings);
this.enabled = DiscoveryNode.isRemoteClusterClient(settings);
this.remoteClusterServerEnabled = REMOTE_CLUSTER_SERVER_ENABLED.get(settings);
this.transportService = transportService;
this.credentialsProtectedRemoteClusters = REMOTE_CLUSTER_CREDENTIALS.getAsMap(settings).keySet();

this.remoteClusterCredentialsManager = new RemoteClusterCredentialsManager(settings);
if (remoteClusterServerEnabled) {
registerRemoteClusterHandshakeRequestHandler(transportService);
}
Expand Down Expand Up @@ -305,6 +304,14 @@ private synchronized void updateSkipUnavailable(String clusterAlias, Boolean ski
}
}

public void updateRemoteClusterCredentials(Settings settings) {
remoteClusterCredentialsManager.updateClusterCredentials(settings);
}

public RemoteClusterCredentialsManager getRemoteClusterCredentialsManager() {
return remoteClusterCredentialsManager;
}

@Override
protected void updateRemoteCluster(String clusterAlias, Settings settings) {
CountDownLatch latch = new CountDownLatch(1);
Expand Down Expand Up @@ -363,12 +370,7 @@ synchronized void updateRemoteCluster(
if (remote == null) {
// this is a new cluster we have to add a new representation
Settings finalSettings = Settings.builder().put(this.settings, false).put(newSettings, false).build();
remote = new RemoteClusterConnection(
finalSettings,
clusterAlias,
transportService,
credentialsProtectedRemoteClusters.contains(clusterAlias)
);
remote = new RemoteClusterConnection(finalSettings, clusterAlias, transportService, remoteClusterCredentialsManager);
remoteClusters.put(clusterAlias, remote);
remote.ensureConnected(listener.map(ignored -> RemoteClusterConnectionStatus.CONNECTED));
} else if (remote.shouldRebuildConnection(newSettings)) {
Expand All @@ -380,12 +382,7 @@ synchronized void updateRemoteCluster(
}
remoteClusters.remove(clusterAlias);
Settings finalSettings = Settings.builder().put(this.settings, false).put(newSettings, false).build();
remote = new RemoteClusterConnection(
finalSettings,
clusterAlias,
transportService,
credentialsProtectedRemoteClusters.contains(clusterAlias)
);
remote = new RemoteClusterConnection(finalSettings, clusterAlias, transportService, remoteClusterCredentialsManager);
remoteClusters.put(clusterAlias, remote);
remote.ensureConnected(listener.map(ignored -> RemoteClusterConnectionStatus.RECONNECTED));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.TransportVersion;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.SecureString;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
Expand All @@ -25,18 +26,19 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

import static org.elasticsearch.transport.RemoteClusterPortSettings.REMOTE_CLUSTER_PROFILE;
import static org.elasticsearch.transport.RemoteClusterService.REMOTE_CLUSTER_HANDSHAKE_ACTION_NAME;

public class RemoteConnectionManager implements ConnectionManager {

private final String clusterAlias;
private final RemoteClusterCredentialsManager credentialsManager;
private final ConnectionManager delegate;
private final AtomicLong counter = new AtomicLong();
private volatile List<DiscoveryNode> connectedNodes = Collections.emptyList();

RemoteConnectionManager(String clusterAlias, ConnectionManager delegate) {
RemoteConnectionManager(String clusterAlias, RemoteClusterCredentialsManager credentialsManager, ConnectionManager delegate) {
this.clusterAlias = clusterAlias;
this.credentialsManager = credentialsManager;
this.delegate = delegate;
this.delegate.addListener(new TransportConnectionListener() {
@Override
Expand All @@ -51,6 +53,10 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti
});
}

public RemoteClusterCredentialsManager getCredentialsManager() {
return credentialsManager;
}

/**
* Remote cluster connections have a different lifecycle from intra-cluster connections. Use {@link #connectToRemoteClusterNode}
* instead of this method.
Expand Down Expand Up @@ -95,13 +101,7 @@ public void openConnection(DiscoveryNode node, @Nullable ConnectionProfile profi
node,
profile,
listener.delegateFailureAndWrap(
(l, connection) -> l.onResponse(
new InternalRemoteConnection(
connection,
clusterAlias,
profile != null ? profile.getTransportProfile() : getConnectionProfile().getTransportProfile()
)
)
(l, connection) -> l.onResponse(wrapConnectionWithRemoteClusterInfo(connection, clusterAlias, credentialsManager))
)
);
}
Expand Down Expand Up @@ -182,16 +182,35 @@ public void closeNoBlock() {
* @return a cluster alias if the connection target a node in the remote cluster, otherwise an empty result
*/
public static Optional<String> resolveRemoteClusterAlias(Transport.Connection connection) {
return resolveRemoteClusterAliasWithCredentials(connection).map(RemoteClusterAliasWithCredentials::clusterAlias);
}

public record RemoteClusterAliasWithCredentials(String clusterAlias, @Nullable SecureString credentials) {
@Override
public String toString() {
return "RemoteClusterAliasWithCredentials{clusterAlias='" + clusterAlias + "', credentials='::es_redacted::'}";
}
}

/**
* This method returns information (alias and credentials) for remote cluster for the given transport connection.
* Either or both of alias and credentials can be null depending on the connection.
*
* @param connection the transport connection for which to resolve a remote cluster alias
*/
public static Optional<RemoteClusterAliasWithCredentials> resolveRemoteClusterAliasWithCredentials(Transport.Connection connection) {
Transport.Connection unwrapped = TransportService.unwrapConnection(connection);
if (unwrapped instanceof InternalRemoteConnection remoteConnection) {
return Optional.of(remoteConnection.getClusterAlias());
return Optional.of(
new RemoteClusterAliasWithCredentials(remoteConnection.getClusterAlias(), remoteConnection.getClusterCredentials())
);
}
return Optional.empty();
}

private Transport.Connection getConnectionInternal(DiscoveryNode node) throws NodeNotConnectedException {
Transport.Connection connection = delegate.getConnection(node);
return new InternalRemoteConnection(connection, clusterAlias, getConnectionProfile().getTransportProfile());
return wrapConnectionWithRemoteClusterInfo(connection, clusterAlias, credentialsManager);
}

private synchronized void addConnectedNode(DiscoveryNode addedNode) {
Expand Down Expand Up @@ -297,21 +316,27 @@ private static final class InternalRemoteConnection implements Transport.Connect
private static final Logger logger = LogManager.getLogger(InternalRemoteConnection.class);
private final Transport.Connection connection;
private final String clusterAlias;
private final boolean isRemoteClusterProfile;
@Nullable
private final SecureString clusterCredentials;

InternalRemoteConnection(Transport.Connection connection, String clusterAlias, String transportProfile) {
private InternalRemoteConnection(Transport.Connection connection, String clusterAlias, @Nullable SecureString clusterCredentials) {
assert false == connection instanceof InternalRemoteConnection : "should not double wrap";
assert false == connection instanceof ProxyConnection
: "proxy connection should wrap internal remote connection, not the other way around";
this.clusterAlias = Objects.requireNonNull(clusterAlias);
this.connection = Objects.requireNonNull(connection);
this.isRemoteClusterProfile = REMOTE_CLUSTER_PROFILE.equals(Objects.requireNonNull(transportProfile));
this.clusterAlias = Objects.requireNonNull(clusterAlias);
this.clusterCredentials = clusterCredentials;
}

public String getClusterAlias() {
return clusterAlias;
}

@Nullable
public SecureString getClusterCredentials() {
return clusterCredentials;
}

@Override
public DiscoveryNode getNode() {
return connection.getNode();
Expand All @@ -321,7 +346,7 @@ public DiscoveryNode getNode() {
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws IOException, TransportException {
final String effectiveAction;
if (isRemoteClusterProfile && TransportService.HANDSHAKE_ACTION_NAME.equals(action)) {
if (clusterCredentials != null && TransportService.HANDSHAKE_ACTION_NAME.equals(action)) {
logger.trace("sending remote cluster specific handshake to node [{}] of remote cluster [{}]", getNode(), clusterAlias);
effectiveAction = REMOTE_CLUSTER_HANDSHAKE_ACTION_NAME;
} else {
Expand Down Expand Up @@ -389,8 +414,8 @@ public boolean hasReferences() {
static InternalRemoteConnection wrapConnectionWithRemoteClusterInfo(
Transport.Connection connection,
String clusterAlias,
String transportProfile
RemoteClusterCredentialsManager credentialsManager
) {
return new InternalRemoteConnection(connection, clusterAlias, transportProfile);
return new InternalRemoteConnection(connection, clusterAlias, credentialsManager.resolveCredentials(clusterAlias));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,11 @@ private ConnectionManager.ConnectionValidator getConnectionValidator(DiscoveryNo
: "transport profile must be consistent between the connection manager and the actual profile";
transportService.connectionValidator(node)
.validate(
RemoteConnectionManager.wrapConnectionWithRemoteClusterInfo(connection, clusterAlias, profile.getTransportProfile()),
RemoteConnectionManager.wrapConnectionWithRemoteClusterInfo(
connection,
clusterAlias,
connectionManager.getCredentialsManager()
),
profile,
listener
);
Expand Down
Loading
Loading