diff --git a/docs/changelog/102798.yaml b/docs/changelog/102798.yaml new file mode 100644 index 0000000000000..986ad99f96a19 --- /dev/null +++ b/docs/changelog/102798.yaml @@ -0,0 +1,5 @@ +pr: 102798 +summary: Hot-reloadable remote cluster credentials +area: Security +type: enhancement +issues: [] diff --git a/server/src/main/java/org/elasticsearch/transport/ProxyConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/ProxyConnectionStrategy.java index 320b9cfdbf7e6..cfb6f872ce748 100644 --- a/server/src/main/java/org/elasticsearch/transport/ProxyConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/ProxyConnectionStrategy.java @@ -179,7 +179,7 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy { RemoteConnectionManager.wrapConnectionWithRemoteClusterInfo( newConnection, clusterAlias, - actualProfile.getTransportProfile() + connectionManager.getCredentialsManager() ), actualProfile.getHandshakeTimeout(), cn -> true, diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java index a055e4122257f..3c74e46851504 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java @@ -57,15 +57,28 @@ final class RemoteClusterConnection implements Closeable { * @param settings the nodes settings object * @param clusterAlias the configured alias of the cluster to connect to * @param transportService the local nodes transport service - * @param credentialsProtected Whether the remote cluster is protected by a credentials, i.e. it has a credentials configured - * via secure setting. This means the remote cluster uses the new configurable access RCS model - * (as opposed to the basic model). + * @param credentialsManager object to lookup remote cluster credentials by cluster alias. If a cluster is protected by a credential, + * i.e. it has a credential configured via secure setting. + * This means the remote cluster uses the advances RCS model (as opposed to the basic model). */ - RemoteClusterConnection(Settings settings, String clusterAlias, TransportService transportService, boolean credentialsProtected) { + RemoteClusterConnection( + Settings settings, + String clusterAlias, + TransportService transportService, + RemoteClusterCredentialsManager credentialsManager + ) { this.transportService = transportService; this.clusterAlias = clusterAlias; - ConnectionProfile profile = RemoteConnectionStrategy.buildConnectionProfile(clusterAlias, settings, credentialsProtected); - this.remoteConnectionManager = new RemoteConnectionManager(clusterAlias, createConnectionManager(profile, transportService)); + ConnectionProfile profile = RemoteConnectionStrategy.buildConnectionProfile( + clusterAlias, + settings, + credentialsManager.hasCredentials(clusterAlias) + ); + this.remoteConnectionManager = new RemoteConnectionManager( + clusterAlias, + credentialsManager, + createConnectionManager(profile, transportService) + ); this.connectionStrategy = RemoteConnectionStrategy.buildStrategy(clusterAlias, transportService, remoteConnectionManager, settings); // we register the transport service here as a listener to make sure we notify handlers on disconnect etc. this.remoteConnectionManager.addListener(transportService); diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterCredentialsManager.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterCredentialsManager.java new file mode 100644 index 0000000000000..32a5e196c3a0b --- /dev/null +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterCredentialsManager.java @@ -0,0 +1,52 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.transport; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.SecureString; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.Nullable; + +import java.util.Map; + +import static org.elasticsearch.transport.RemoteClusterService.REMOTE_CLUSTER_CREDENTIALS; + +public class RemoteClusterCredentialsManager { + + private static final Logger logger = LogManager.getLogger(RemoteClusterCredentialsManager.class); + + private volatile Map clusterCredentials; + + public RemoteClusterCredentialsManager(Settings settings) { + updateClusterCredentials(settings); + } + + public void updateClusterCredentials(Settings settings) { + clusterCredentials = REMOTE_CLUSTER_CREDENTIALS.getAsMap(settings); + logger.debug( + () -> Strings.format( + "Updated remote cluster credentials for clusters: [%s]", + Strings.collectionToCommaDelimitedString(clusterCredentials.keySet()) + ) + ); + } + + @Nullable + public SecureString resolveCredentials(String clusterAlias) { + return clusterCredentials.get(clusterAlias); + } + + public boolean hasCredentials(String clusterAlias) { + return clusterCredentials.containsKey(clusterAlias); + } + + public static final RemoteClusterCredentialsManager EMPTY = new RemoteClusterCredentialsManager(Settings.EMPTY); +} diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index c38f4b26c665f..6bfbb95cbcfe9 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -147,15 +147,14 @@ public boolean isRemoteClusterServerEnabled() { private final TransportService transportService; private final Map remoteClusters = ConcurrentCollections.newConcurrentMap(); - private final Set credentialsProtectedRemoteClusters; + private final RemoteClusterCredentialsManager remoteClusterCredentialsManager; RemoteClusterService(Settings settings, TransportService transportService) { super(settings); this.enabled = DiscoveryNode.isRemoteClusterClient(settings); this.remoteClusterServerEnabled = REMOTE_CLUSTER_SERVER_ENABLED.get(settings); this.transportService = transportService; - this.credentialsProtectedRemoteClusters = REMOTE_CLUSTER_CREDENTIALS.getAsMap(settings).keySet(); - + this.remoteClusterCredentialsManager = new RemoteClusterCredentialsManager(settings); if (remoteClusterServerEnabled) { registerRemoteClusterHandshakeRequestHandler(transportService); } @@ -305,6 +304,14 @@ private synchronized void updateSkipUnavailable(String clusterAlias, Boolean ski } } + public void updateRemoteClusterCredentials(Settings settings) { + remoteClusterCredentialsManager.updateClusterCredentials(settings); + } + + public RemoteClusterCredentialsManager getRemoteClusterCredentialsManager() { + return remoteClusterCredentialsManager; + } + @Override protected void updateRemoteCluster(String clusterAlias, Settings settings) { CountDownLatch latch = new CountDownLatch(1); @@ -363,12 +370,7 @@ synchronized void updateRemoteCluster( if (remote == null) { // this is a new cluster we have to add a new representation Settings finalSettings = Settings.builder().put(this.settings, false).put(newSettings, false).build(); - remote = new RemoteClusterConnection( - finalSettings, - clusterAlias, - transportService, - credentialsProtectedRemoteClusters.contains(clusterAlias) - ); + remote = new RemoteClusterConnection(finalSettings, clusterAlias, transportService, remoteClusterCredentialsManager); remoteClusters.put(clusterAlias, remote); remote.ensureConnected(listener.map(ignored -> RemoteClusterConnectionStatus.CONNECTED)); } else if (remote.shouldRebuildConnection(newSettings)) { @@ -380,12 +382,7 @@ synchronized void updateRemoteCluster( } remoteClusters.remove(clusterAlias); Settings finalSettings = Settings.builder().put(this.settings, false).put(newSettings, false).build(); - remote = new RemoteClusterConnection( - finalSettings, - clusterAlias, - transportService, - credentialsProtectedRemoteClusters.contains(clusterAlias) - ); + remote = new RemoteClusterConnection(finalSettings, clusterAlias, transportService, remoteClusterCredentialsManager); remoteClusters.put(clusterAlias, remote); remote.ensureConnected(listener.map(ignored -> RemoteClusterConnectionStatus.RECONNECTED)); } else { diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionManager.java b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionManager.java index b16734b273376..3b531d54fb033 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionManager.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionManager.java @@ -12,6 +12,7 @@ import org.elasticsearch.TransportVersion; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.settings.SecureString; import org.elasticsearch.common.util.CollectionUtils; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Releasable; @@ -25,18 +26,19 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicLong; -import static org.elasticsearch.transport.RemoteClusterPortSettings.REMOTE_CLUSTER_PROFILE; import static org.elasticsearch.transport.RemoteClusterService.REMOTE_CLUSTER_HANDSHAKE_ACTION_NAME; public class RemoteConnectionManager implements ConnectionManager { private final String clusterAlias; + private final RemoteClusterCredentialsManager credentialsManager; private final ConnectionManager delegate; private final AtomicLong counter = new AtomicLong(); private volatile List connectedNodes = Collections.emptyList(); - RemoteConnectionManager(String clusterAlias, ConnectionManager delegate) { + RemoteConnectionManager(String clusterAlias, RemoteClusterCredentialsManager credentialsManager, ConnectionManager delegate) { this.clusterAlias = clusterAlias; + this.credentialsManager = credentialsManager; this.delegate = delegate; this.delegate.addListener(new TransportConnectionListener() { @Override @@ -51,6 +53,10 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti }); } + public RemoteClusterCredentialsManager getCredentialsManager() { + return credentialsManager; + } + /** * Remote cluster connections have a different lifecycle from intra-cluster connections. Use {@link #connectToRemoteClusterNode} * instead of this method. @@ -95,13 +101,7 @@ public void openConnection(DiscoveryNode node, @Nullable ConnectionProfile profi node, profile, listener.delegateFailureAndWrap( - (l, connection) -> l.onResponse( - new InternalRemoteConnection( - connection, - clusterAlias, - profile != null ? profile.getTransportProfile() : getConnectionProfile().getTransportProfile() - ) - ) + (l, connection) -> l.onResponse(wrapConnectionWithRemoteClusterInfo(connection, clusterAlias, credentialsManager)) ) ); } @@ -182,16 +182,35 @@ public void closeNoBlock() { * @return a cluster alias if the connection target a node in the remote cluster, otherwise an empty result */ public static Optional resolveRemoteClusterAlias(Transport.Connection connection) { + return resolveRemoteClusterAliasWithCredentials(connection).map(RemoteClusterAliasWithCredentials::clusterAlias); + } + + public record RemoteClusterAliasWithCredentials(String clusterAlias, @Nullable SecureString credentials) { + @Override + public String toString() { + return "RemoteClusterAliasWithCredentials{clusterAlias='" + clusterAlias + "', credentials='::es_redacted::'}"; + } + } + + /** + * This method returns information (alias and credentials) for remote cluster for the given transport connection. + * Either or both of alias and credentials can be null depending on the connection. + * + * @param connection the transport connection for which to resolve a remote cluster alias + */ + public static Optional resolveRemoteClusterAliasWithCredentials(Transport.Connection connection) { Transport.Connection unwrapped = TransportService.unwrapConnection(connection); if (unwrapped instanceof InternalRemoteConnection remoteConnection) { - return Optional.of(remoteConnection.getClusterAlias()); + return Optional.of( + new RemoteClusterAliasWithCredentials(remoteConnection.getClusterAlias(), remoteConnection.getClusterCredentials()) + ); } return Optional.empty(); } private Transport.Connection getConnectionInternal(DiscoveryNode node) throws NodeNotConnectedException { Transport.Connection connection = delegate.getConnection(node); - return new InternalRemoteConnection(connection, clusterAlias, getConnectionProfile().getTransportProfile()); + return wrapConnectionWithRemoteClusterInfo(connection, clusterAlias, credentialsManager); } private synchronized void addConnectedNode(DiscoveryNode addedNode) { @@ -297,21 +316,27 @@ private static final class InternalRemoteConnection implements Transport.Connect private static final Logger logger = LogManager.getLogger(InternalRemoteConnection.class); private final Transport.Connection connection; private final String clusterAlias; - private final boolean isRemoteClusterProfile; + @Nullable + private final SecureString clusterCredentials; - InternalRemoteConnection(Transport.Connection connection, String clusterAlias, String transportProfile) { + private InternalRemoteConnection(Transport.Connection connection, String clusterAlias, @Nullable SecureString clusterCredentials) { assert false == connection instanceof InternalRemoteConnection : "should not double wrap"; assert false == connection instanceof ProxyConnection : "proxy connection should wrap internal remote connection, not the other way around"; - this.clusterAlias = Objects.requireNonNull(clusterAlias); this.connection = Objects.requireNonNull(connection); - this.isRemoteClusterProfile = REMOTE_CLUSTER_PROFILE.equals(Objects.requireNonNull(transportProfile)); + this.clusterAlias = Objects.requireNonNull(clusterAlias); + this.clusterCredentials = clusterCredentials; } public String getClusterAlias() { return clusterAlias; } + @Nullable + public SecureString getClusterCredentials() { + return clusterCredentials; + } + @Override public DiscoveryNode getNode() { return connection.getNode(); @@ -321,7 +346,7 @@ public DiscoveryNode getNode() { public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException { final String effectiveAction; - if (isRemoteClusterProfile && TransportService.HANDSHAKE_ACTION_NAME.equals(action)) { + if (clusterCredentials != null && TransportService.HANDSHAKE_ACTION_NAME.equals(action)) { logger.trace("sending remote cluster specific handshake to node [{}] of remote cluster [{}]", getNode(), clusterAlias); effectiveAction = REMOTE_CLUSTER_HANDSHAKE_ACTION_NAME; } else { @@ -389,8 +414,8 @@ public boolean hasReferences() { static InternalRemoteConnection wrapConnectionWithRemoteClusterInfo( Transport.Connection connection, String clusterAlias, - String transportProfile + RemoteClusterCredentialsManager credentialsManager ) { - return new InternalRemoteConnection(connection, clusterAlias, transportProfile); + return new InternalRemoteConnection(connection, clusterAlias, credentialsManager.resolveCredentials(clusterAlias)); } } diff --git a/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java index 0dcad9cf6864c..0f68a58faf463 100644 --- a/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java @@ -357,7 +357,11 @@ private ConnectionManager.ConnectionValidator getConnectionValidator(DiscoveryNo : "transport profile must be consistent between the connection manager and the actual profile"; transportService.connectionValidator(node) .validate( - RemoteConnectionManager.wrapConnectionWithRemoteClusterInfo(connection, clusterAlias, profile.getTransportProfile()), + RemoteConnectionManager.wrapConnectionWithRemoteClusterInfo( + connection, + clusterAlias, + connectionManager.getCredentialsManager() + ), profile, listener ); diff --git a/server/src/test/java/org/elasticsearch/transport/ProxyConnectionStrategyTests.java b/server/src/test/java/org/elasticsearch/transport/ProxyConnectionStrategyTests.java index ead43d0bac05e..b3c7c5adac95d 100644 --- a/server/src/test/java/org/elasticsearch/transport/ProxyConnectionStrategyTests.java +++ b/server/src/test/java/org/elasticsearch/transport/ProxyConnectionStrategyTests.java @@ -130,7 +130,11 @@ public void testProxyStrategyWillOpenExpectedNumberOfConnectionsToAddress() { ); int numOfConnections = randomIntBetween(4, 8); try ( - RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); + RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager( + clusterAlias, + RemoteClusterCredentialsManager.EMPTY, + connectionManager + ); ProxyConnectionStrategy strategy = new ProxyConnectionStrategy( clusterAlias, localService, @@ -188,7 +192,11 @@ public void testProxyStrategyWillOpenNewConnectionsOnDisconnect() throws Excepti AtomicBoolean useAddress1 = new AtomicBoolean(true); try ( - RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); + RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager( + clusterAlias, + RemoteClusterCredentialsManager.EMPTY, + connectionManager + ); ProxyConnectionStrategy strategy = new ProxyConnectionStrategy( clusterAlias, localService, @@ -263,7 +271,11 @@ public void testConnectFailsWithIncompatibleNodes() { ); int numOfConnections = randomIntBetween(4, 8); try ( - RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); + RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager( + clusterAlias, + RemoteClusterCredentialsManager.EMPTY, + connectionManager + ); ProxyConnectionStrategy strategy = new ProxyConnectionStrategy( clusterAlias, localService, @@ -328,7 +340,11 @@ public void testConnectFailsWithNonRetryableException() { ); int numOfConnections = randomIntBetween(4, 8); try ( - RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); + RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager( + clusterAlias, + RemoteClusterCredentialsManager.EMPTY, + connectionManager + ); ProxyConnectionStrategy strategy = new ProxyConnectionStrategy( clusterAlias, localService, @@ -388,7 +404,11 @@ public void testClusterNameValidationPreventConnectingToDifferentClusters() thro AtomicBoolean useAddress1 = new AtomicBoolean(true); try ( - RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); + RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager( + clusterAlias, + RemoteClusterCredentialsManager.EMPTY, + connectionManager + ); ProxyConnectionStrategy strategy = new ProxyConnectionStrategy( clusterAlias, localService, @@ -459,7 +479,11 @@ public void testProxyStrategyWillResolveAddressesEachConnect() throws Exception ); int numOfConnections = randomIntBetween(4, 8); try ( - RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); + RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager( + clusterAlias, + RemoteClusterCredentialsManager.EMPTY, + connectionManager + ); ProxyConnectionStrategy strategy = new ProxyConnectionStrategy( clusterAlias, localService, @@ -511,7 +535,11 @@ public void onNodeConnected(DiscoveryNode node, Transport.Connection connection) }); try ( - var remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); + var remoteConnectionManager = new RemoteConnectionManager( + clusterAlias, + RemoteClusterCredentialsManager.EMPTY, + connectionManager + ); var strategy = new ProxyConnectionStrategy( clusterAlias, localService, @@ -554,7 +582,11 @@ public void testProxyStrategyWillNeedToBeRebuiltIfNumOfSocketsOrAddressesOrServe ); int numOfConnections = randomIntBetween(4, 8); try ( - RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); + RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager( + clusterAlias, + RemoteClusterCredentialsManager.EMPTY, + connectionManager + ); ProxyConnectionStrategy strategy = new ProxyConnectionStrategy( clusterAlias, localService, @@ -672,7 +704,11 @@ public void testServerNameAttributes() { ); int numOfConnections = randomIntBetween(4, 8); try ( - RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); + RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager( + clusterAlias, + RemoteClusterCredentialsManager.EMPTY, + connectionManager + ); ProxyConnectionStrategy strategy = new ProxyConnectionStrategy( clusterAlias, localService, diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java index d4f03f1027838..cbe15cc9664f4 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java @@ -62,6 +62,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Objects; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; @@ -252,7 +253,14 @@ public void run() { AtomicReference exceptionReference = new AtomicReference<>(); String clusterAlias = "test-cluster"; Settings settings = buildRandomSettings(clusterAlias, addresses(seedNode)); - try (RemoteClusterConnection connection = new RemoteClusterConnection(settings, clusterAlias, service, randomBoolean())) { + try ( + RemoteClusterConnection connection = new RemoteClusterConnection( + settings, + clusterAlias, + service, + randomFrom(RemoteClusterCredentialsManager.EMPTY, buildCredentialsManager(clusterAlias)) + ) + ) { ActionListener listener = ActionListener.wrap(x -> { listenerCalled.countDown(); fail("expected exception"); @@ -322,7 +330,14 @@ public void testCloseWhileConcurrentlyConnecting() throws IOException, Interrupt service.acceptIncomingRequests(); String clusterAlias = "test-cluster"; Settings settings = buildRandomSettings(clusterAlias, seedNodes); - try (RemoteClusterConnection connection = new RemoteClusterConnection(settings, clusterAlias, service, false)) { + try ( + RemoteClusterConnection connection = new RemoteClusterConnection( + settings, + clusterAlias, + service, + RemoteClusterCredentialsManager.EMPTY + ) + ) { int numThreads = randomIntBetween(4, 10); Thread[] threads = new Thread[numThreads]; CyclicBarrier barrier = new CyclicBarrier(numThreads + 1); @@ -470,7 +485,12 @@ private void doTestGetConnectionInfo(boolean hasClusterCredentials) throws Excep settings = Settings.builder().put(settings).setSecureSettings(secureSettings).build(); } try ( - RemoteClusterConnection connection = new RemoteClusterConnection(settings, clusterAlias, service, hasClusterCredentials) + RemoteClusterConnection connection = new RemoteClusterConnection( + settings, + clusterAlias, + service, + hasClusterCredentials ? buildCredentialsManager(clusterAlias) : RemoteClusterCredentialsManager.EMPTY + ) ) { // test no nodes connected RemoteConnectionInfo remoteConnectionInfo = assertSerialization(connection.getConnectionInfo()); @@ -662,7 +682,12 @@ private void doTestCollectNodes(boolean hasClusterCredentials) throws Exception } try ( - RemoteClusterConnection connection = new RemoteClusterConnection(settings, clusterAlias, service, hasClusterCredentials) + RemoteClusterConnection connection = new RemoteClusterConnection( + settings, + clusterAlias, + service, + hasClusterCredentials ? buildCredentialsManager(clusterAlias) : RemoteClusterCredentialsManager.EMPTY + ) ) { CountDownLatch responseLatch = new CountDownLatch(1); AtomicReference> reference = new AtomicReference<>(); @@ -713,7 +738,14 @@ public void testNoChannelsExceptREG() throws Exception { String clusterAlias = "test-cluster"; Settings settings = buildRandomSettings(clusterAlias, addresses(seedNode)); - try (RemoteClusterConnection connection = new RemoteClusterConnection(settings, clusterAlias, service, false)) { + try ( + RemoteClusterConnection connection = new RemoteClusterConnection( + settings, + clusterAlias, + service, + RemoteClusterCredentialsManager.EMPTY + ) + ) { PlainActionFuture plainActionFuture = new PlainActionFuture<>(); connection.ensureConnected(plainActionFuture); plainActionFuture.get(10, TimeUnit.SECONDS); @@ -779,7 +811,14 @@ public void testConnectedNodesConcurrentAccess() throws IOException, Interrupted String clusterAlias = "test-cluster"; Settings settings = buildRandomSettings(clusterAlias, seedNodes); - try (RemoteClusterConnection connection = new RemoteClusterConnection(settings, clusterAlias, service, randomBoolean())) { + try ( + RemoteClusterConnection connection = new RemoteClusterConnection( + settings, + clusterAlias, + service, + randomFrom(RemoteClusterCredentialsManager.EMPTY, buildCredentialsManager(clusterAlias)) + ) + ) { final int numGetThreads = randomIntBetween(4, 10); final Thread[] getThreads = new Thread[numGetThreads]; final int numModifyingThreads = randomIntBetween(4, 10); @@ -873,7 +912,14 @@ public void testGetConnection() throws Exception { service.acceptIncomingRequests(); String clusterAlias = "test-cluster"; Settings settings = buildRandomSettings(clusterAlias, addresses(seedNode)); - try (RemoteClusterConnection connection = new RemoteClusterConnection(settings, clusterAlias, service, false)) { + try ( + RemoteClusterConnection connection = new RemoteClusterConnection( + settings, + clusterAlias, + service, + RemoteClusterCredentialsManager.EMPTY + ) + ) { PlainActionFuture.get(fut -> connection.ensureConnected(fut.map(x -> null))); for (int i = 0; i < 10; i++) { // always a direct connection as the remote node is already connected @@ -921,4 +967,13 @@ private static Settings buildSniffSettings(String clusterAlias, List see ); return builder.build(); } + + private static RemoteClusterCredentialsManager buildCredentialsManager(String clusterAlias) { + Objects.requireNonNull(clusterAlias); + final Settings.Builder builder = Settings.builder(); + final MockSecureSettings secureSettings = new MockSecureSettings(); + secureSettings.setString("cluster.remote." + clusterAlias + ".credentials", randomAlphaOfLength(20)); + builder.setSecureSettings(secureSettings); + return new RemoteClusterCredentialsManager(builder.build()); + } } diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterCredentialsManagerTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterCredentialsManagerTests.java new file mode 100644 index 0000000000000..f02148a40e47e --- /dev/null +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterCredentialsManagerTests.java @@ -0,0 +1,43 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.transport; + +import org.elasticsearch.common.settings.MockSecureSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ESTestCase; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +public class RemoteClusterCredentialsManagerTests extends ESTestCase { + public void testResolveRemoteClusterCredentials() { + final String clusterAlias = randomAlphaOfLength(9); + final String otherClusterAlias = randomAlphaOfLength(10); + + final String secret = randomAlphaOfLength(20); + final Settings settings = buildSettingsWithCredentials(clusterAlias, secret); + RemoteClusterCredentialsManager credentialsManager = new RemoteClusterCredentialsManager(settings); + assertThat(credentialsManager.resolveCredentials(clusterAlias).toString(), equalTo(secret)); + assertThat(credentialsManager.hasCredentials(otherClusterAlias), is(false)); + + final String updatedSecret = randomAlphaOfLength(21); + credentialsManager.updateClusterCredentials(buildSettingsWithCredentials(clusterAlias, updatedSecret)); + assertThat(credentialsManager.resolveCredentials(clusterAlias).toString(), equalTo(updatedSecret)); + + credentialsManager.updateClusterCredentials(Settings.EMPTY); + assertThat(credentialsManager.hasCredentials(clusterAlias), is(false)); + } + + private Settings buildSettingsWithCredentials(String clusterAlias, String secret) { + final Settings.Builder builder = Settings.builder(); + final MockSecureSettings secureSettings = new MockSecureSettings(); + secureSettings.setString("cluster.remote." + clusterAlias + ".credentials", secret); + return builder.setSecureSettings(secureSettings).build(); + } +} diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteConnectionManagerTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteConnectionManagerTests.java index 839138d3c7c34..b1ffda669e6a1 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteConnectionManagerTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteConnectionManagerTests.java @@ -13,6 +13,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeUtils; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.SecureString; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.util.concurrent.ThreadContext; @@ -23,17 +24,20 @@ import java.io.IOException; import java.net.InetAddress; import java.util.HashSet; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutionException; import static org.elasticsearch.transport.RemoteClusterService.REMOTE_CLUSTER_HANDSHAKE_ACTION_NAME; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; import static org.hamcrest.core.IsInstanceOf.instanceOf; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class RemoteConnectionManagerTests extends ESTestCase { @@ -49,6 +53,7 @@ public void setUp() throws Exception { transport = mock(Transport.class); remoteConnectionManager = new RemoteConnectionManager( "remote-cluster", + RemoteClusterCredentialsManager.EMPTY, new ClusterConnectionManager(Settings.EMPTY, transport, new ThreadContext(Settings.EMPTY)) ); @@ -120,10 +125,13 @@ public void testResolveRemoteClusterAlias() throws ExecutionException, Interrupt public void testRewriteHandshakeAction() throws IOException { final Transport.Connection connection = mock(Transport.Connection.class); + final String clusterAlias = randomAlphaOfLengthBetween(3, 8); + final RemoteClusterCredentialsManager credentialsResolver = mock(RemoteClusterCredentialsManager.class); + when(credentialsResolver.resolveCredentials(clusterAlias)).thenReturn(new SecureString(randomAlphaOfLength(42))); final Transport.Connection wrappedConnection = RemoteConnectionManager.wrapConnectionWithRemoteClusterInfo( connection, - randomAlphaOfLengthBetween(3, 8), - RemoteClusterPortSettings.REMOTE_CLUSTER_PROFILE + clusterAlias, + credentialsResolver ); final long requestId = randomLong(); final TransportRequest request = mock(TransportRequest.class); @@ -142,6 +150,26 @@ public void testRewriteHandshakeAction() throws IOException { verify(connection).sendRequest(requestId, anotherAction, request, options); } + public void testWrapAndResolveConnectionRoundTrip() { + final Transport.Connection connection = mock(Transport.Connection.class); + final String clusterAlias = randomAlphaOfLengthBetween(3, 8); + final RemoteClusterCredentialsManager credentialsResolver = mock(RemoteClusterCredentialsManager.class); + final SecureString credentials = new SecureString(randomAlphaOfLength(42)); + // second credential will never be resolved + when(credentialsResolver.resolveCredentials(clusterAlias)).thenReturn(credentials, (SecureString) null); + final Transport.Connection wrappedConnection = RemoteConnectionManager.wrapConnectionWithRemoteClusterInfo( + connection, + clusterAlias, + credentialsResolver + ); + + final Optional actual = RemoteConnectionManager + .resolveRemoteClusterAliasWithCredentials(wrappedConnection); + + assertThat(actual.isPresent(), is(true)); + assertThat(actual.get(), equalTo(new RemoteConnectionManager.RemoteClusterAliasWithCredentials(clusterAlias, credentials))); + } + private static class TestRemoteConnection extends CloseableConnection { private final DiscoveryNode node; diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java index 5d461e906a266..ca9986ba5eb1f 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java @@ -26,7 +26,11 @@ public void testStrategyChangeMeansThatStrategyMustBeRebuilt() { mock(Transport.class), threadContext ); - RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager("cluster-alias", connectionManager); + RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager( + "cluster-alias", + RemoteClusterCredentialsManager.EMPTY, + connectionManager + ); FakeConnectionStrategy first = new FakeConnectionStrategy( "cluster-alias", mock(TransportService.class), @@ -46,7 +50,11 @@ public void testSameStrategyChangeMeansThatStrategyDoesNotNeedToBeRebuilt() { mock(Transport.class), threadContext ); - RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager("cluster-alias", connectionManager); + RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager( + "cluster-alias", + RemoteClusterCredentialsManager.EMPTY, + connectionManager + ); FakeConnectionStrategy first = new FakeConnectionStrategy( "cluster-alias", mock(TransportService.class), @@ -69,7 +77,11 @@ public void testChangeInConnectionProfileMeansTheStrategyMustBeRebuilt() { assertEquals(TimeValue.MINUS_ONE, connectionManager.getConnectionProfile().getPingInterval()); assertEquals(Compression.Enabled.INDEXING_DATA, connectionManager.getConnectionProfile().getCompressionEnabled()); assertEquals(Compression.Scheme.LZ4, connectionManager.getConnectionProfile().getCompressionScheme()); - RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager("cluster-alias", connectionManager); + RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager( + "cluster-alias", + RemoteClusterCredentialsManager.EMPTY, + connectionManager + ); FakeConnectionStrategy first = new FakeConnectionStrategy( "cluster-alias", mock(TransportService.class), diff --git a/server/src/test/java/org/elasticsearch/transport/SniffConnectionStrategyTests.java b/server/src/test/java/org/elasticsearch/transport/SniffConnectionStrategyTests.java index 3c955258d45c8..ddee1ff4d690a 100644 --- a/server/src/test/java/org/elasticsearch/transport/SniffConnectionStrategyTests.java +++ b/server/src/test/java/org/elasticsearch/transport/SniffConnectionStrategyTests.java @@ -192,7 +192,11 @@ public void testSniffStrategyWillConnectToAndDiscoverNodes() { threadPool.getThreadContext() ); try ( - RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); + RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager( + clusterAlias, + hasClusterCredentials ? new RemoteClusterCredentialsManager(clientSettings) : RemoteClusterCredentialsManager.EMPTY, + connectionManager + ); SniffConnectionStrategy strategy = new SniffConnectionStrategy( clusterAlias, localService, @@ -262,7 +266,11 @@ public void testSniffStrategyWillResolveDiscoveryNodesEachConnect() throws Excep threadPool.getThreadContext() ); try ( - RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); + RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager( + clusterAlias, + RemoteClusterCredentialsManager.EMPTY, + connectionManager + ); SniffConnectionStrategy strategy = new SniffConnectionStrategy( clusterAlias, localService, @@ -336,7 +344,11 @@ public void testSniffStrategyWillConnectToMaxAllowedNodesAndOpenNewConnectionsOn threadPool.getThreadContext() ); try ( - RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); + RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager( + clusterAlias, + RemoteClusterCredentialsManager.EMPTY, + connectionManager + ); SniffConnectionStrategy strategy = new SniffConnectionStrategy( clusterAlias, localService, @@ -424,7 +436,11 @@ public void testDiscoverWithSingleIncompatibleSeedNode() { threadPool.getThreadContext() ); try ( - RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); + RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager( + clusterAlias, + RemoteClusterCredentialsManager.EMPTY, + connectionManager + ); SniffConnectionStrategy strategy = new SniffConnectionStrategy( clusterAlias, localService, @@ -486,7 +502,11 @@ public void testConnectFailsWithIncompatibleNodes() { threadPool.getThreadContext() ); try ( - RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); + RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager( + clusterAlias, + RemoteClusterCredentialsManager.EMPTY, + connectionManager + ); SniffConnectionStrategy strategy = new SniffConnectionStrategy( clusterAlias, localService, @@ -549,7 +569,11 @@ public void testFilterNodesWithNodePredicate() { threadPool.getThreadContext() ); try ( - RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); + RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager( + clusterAlias, + RemoteClusterCredentialsManager.EMPTY, + connectionManager + ); SniffConnectionStrategy strategy = new SniffConnectionStrategy( clusterAlias, localService, @@ -617,7 +641,11 @@ public void testConnectFailsIfNoConnectionsOpened() { threadPool.getThreadContext() ); try ( - RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); + RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager( + clusterAlias, + RemoteClusterCredentialsManager.EMPTY, + connectionManager + ); SniffConnectionStrategy strategy = new SniffConnectionStrategy( clusterAlias, localService, @@ -694,7 +722,11 @@ public void testClusterNameValidationPreventConnectingToDifferentClusters() thro threadPool.getThreadContext() ); try ( - RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); + RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager( + clusterAlias, + RemoteClusterCredentialsManager.EMPTY, + connectionManager + ); SniffConnectionStrategy strategy = new SniffConnectionStrategy( clusterAlias, localService, @@ -783,7 +815,11 @@ public void testMultipleCallsToConnectEnsuresConnection() { threadPool.getThreadContext() ); try ( - RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); + RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager( + clusterAlias, + RemoteClusterCredentialsManager.EMPTY, + connectionManager + ); SniffConnectionStrategy strategy = new SniffConnectionStrategy( clusterAlias, localService, @@ -895,7 +931,11 @@ public void testConfiguredProxyAddressModeWillReplaceNodeAddress() { threadPool.getThreadContext() ); try ( - RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); + RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager( + clusterAlias, + RemoteClusterCredentialsManager.EMPTY, + connectionManager + ); SniffConnectionStrategy strategy = new SniffConnectionStrategy( clusterAlias, localService, @@ -964,7 +1004,11 @@ public void testSniffStrategyWillNeedToBeRebuiltIfNumOfConnectionsOrSeedsOrProxy threadPool.getThreadContext() ); try ( - RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); + RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager( + clusterAlias, + RemoteClusterCredentialsManager.EMPTY, + connectionManager + ); SniffConnectionStrategy strategy = new SniffConnectionStrategy( clusterAlias, localService, diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/ActionTypes.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/ActionTypes.java new file mode 100644 index 0000000000000..fbc08a0dee8aa --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/ActionTypes.java @@ -0,0 +1,23 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.core.security.action; + +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; + +/** + * A collection of actions types for the Security plugin that need to be available in xpack.core.security and thus cannot be stored + * directly with their transport action implementation. + */ +public final class ActionTypes { + private ActionTypes() {}; + + public static final ActionType RELOAD_REMOTE_CLUSTER_CREDENTIALS_ACTION = ActionType.localOnly( + "cluster:admin/xpack/security/remote_cluster_credentials/reload" + ); +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/SystemPrivilege.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/SystemPrivilege.java index 2616b63df7c01..013d7cc21a54a 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/SystemPrivilege.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/SystemPrivilege.java @@ -12,6 +12,7 @@ import org.elasticsearch.index.seqno.RetentionLeaseSyncAction; import org.elasticsearch.persistent.CompletionPersistentTaskAction; import org.elasticsearch.transport.TransportActionProxy; +import org.elasticsearch.xpack.core.security.action.ActionTypes; import org.elasticsearch.xpack.core.security.support.StringMatcher; import java.util.Collections; @@ -43,7 +44,8 @@ public final class SystemPrivilege extends Privilege { "indices:data/read/*", // needed for SystemIndexMigrator "indices:admin/refresh", // needed for SystemIndexMigrator "indices:admin/aliases", // needed for SystemIndexMigrator - TransportSearchShardsAction.TYPE.name() // added so this API can be called with the system user by other APIs + TransportSearchShardsAction.TYPE.name(), // added so this API can be called with the system user by other APIs + ActionTypes.RELOAD_REMOTE_CLUSTER_CREDENTIALS_ACTION.name() // needed for Security plugin reload of remote cluster credentials ); private static final Predicate PREDICATE = (action) -> { diff --git a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java index 6e78eb2fb5b83..ea27eb9406d09 100644 --- a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java +++ b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java @@ -255,6 +255,7 @@ public class Constants { "cluster:admin/xpack/security/profile/suggest", "cluster:admin/xpack/security/profile/set_enabled", "cluster:admin/xpack/security/realm/cache/clear", + "cluster:admin/xpack/security/remote_cluster_credentials/reload", "cluster:admin/xpack/security/role/delete", "cluster:admin/xpack/security/role/get", "cluster:admin/xpack/security/role/put", diff --git a/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/xpack/security/ReloadRemoteClusterCredentialsIT.java b/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/xpack/security/ReloadRemoteClusterCredentialsIT.java new file mode 100644 index 0000000000000..6042d0072270d --- /dev/null +++ b/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/xpack/security/ReloadRemoteClusterCredentialsIT.java @@ -0,0 +1,314 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.security; + +import org.apache.lucene.search.TotalHits; +import org.elasticsearch.TransportVersion; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.node.reload.NodesReloadSecureSettingsResponse; +import org.elasticsearch.action.admin.cluster.remote.RemoteClusterNodesAction; +import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; +import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; +import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchShardsRequest; +import org.elasticsearch.action.search.SearchShardsResponse; +import org.elasticsearch.action.search.ShardSearchFailure; +import org.elasticsearch.action.search.TransportSearchAction; +import org.elasticsearch.action.search.TransportSearchShardsAction; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.node.VersionInformation; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.KeyStoreWrapper; +import org.elasticsearch.common.settings.SecureString; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.env.Environment; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHits; +import org.elasticsearch.search.aggregations.InternalAggregations; +import org.elasticsearch.search.internal.InternalSearchResponse; +import org.elasticsearch.test.SecuritySingleNodeTestCase; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.RemoteClusterCredentialsManager; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.security.authc.ApiKeyService; +import org.elasticsearch.xpack.security.authc.CrossClusterAccessHeaders; +import org.junit.BeforeClass; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasKey; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; + +public class ReloadRemoteClusterCredentialsIT extends SecuritySingleNodeTestCase { + private static final String CLUSTER_ALIAS = "my_remote_cluster"; + + @BeforeClass + public static void disableInFips() { + assumeFalse( + "Cannot run in FIPS mode since the keystore will be password protected and sending a password in the reload" + + "settings api call, require TLS to be configured for the transport layer", + inFipsJvm() + ); + } + + @Override + public String configRoles() { + return org.elasticsearch.core.Strings.format(""" + user: + cluster: [ "ALL" ] + indices: + - names: '*' + privileges: [ "ALL" ] + remote_indices: + - names: '*' + privileges: [ "ALL" ] + clusters: ["*"] + """); + } + + @Override + public void tearDown() throws Exception { + try { + clearRemoteCluster(); + super.tearDown(); + } finally { + ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); + } + } + + private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); + + public void testReloadRemoteClusterCredentials() throws Exception { + final String credentials = randomAlphaOfLength(42); + writeCredentialsToKeyStore(credentials); + final RemoteClusterCredentialsManager clusterCredentialsManager = getInstanceFromNode(TransportService.class) + .getRemoteClusterService() + .getRemoteClusterCredentialsManager(); + // Until we reload, credentials written to keystore are not loaded into the credentials manager + assertThat(clusterCredentialsManager.hasCredentials(CLUSTER_ALIAS), is(false)); + reloadSecureSettings(); + assertThat(clusterCredentialsManager.resolveCredentials(CLUSTER_ALIAS), equalTo(credentials)); + + // Check that credentials get used for a remote connection, once we configure it + final BlockingQueue> capturedHeaders = ConcurrentCollections.newBlockingQueue(); + try (MockTransportService remoteTransport = startTransport("remoteNodeA", threadPool, capturedHeaders)) { + final TransportAddress remoteAddress = remoteTransport.getOriginalTransport() + .profileBoundAddresses() + .get("_remote_cluster") + .publishAddress(); + + configureRemoteCluster(remoteAddress); + + // Run search to trigger header capturing on the receiving side + client().search(new SearchRequest(CLUSTER_ALIAS + ":index-a")).get(); + + assertHeadersContainCredentialsThenClear(credentials, capturedHeaders); + + // Update credentials and ensure they are used + final String updatedCredentials = randomAlphaOfLength(41); + writeCredentialsToKeyStore(updatedCredentials); + reloadSecureSettings(); + + client().search(new SearchRequest(CLUSTER_ALIAS + ":index-a")).get(); + + assertHeadersContainCredentialsThenClear(updatedCredentials, capturedHeaders); + } + } + + private void assertHeadersContainCredentialsThenClear(String credentials, BlockingQueue> capturedHeaders) { + assertThat(capturedHeaders, is(not(empty()))); + for (Map actualHeaders : capturedHeaders) { + assertThat(actualHeaders, hasKey(CrossClusterAccessHeaders.CROSS_CLUSTER_ACCESS_CREDENTIALS_HEADER_KEY)); + assertThat( + actualHeaders.get(CrossClusterAccessHeaders.CROSS_CLUSTER_ACCESS_CREDENTIALS_HEADER_KEY), + equalTo(ApiKeyService.withApiKeyPrefix(credentials)) + ); + } + capturedHeaders.clear(); + assertThat(capturedHeaders, is(empty())); + } + + private void clearRemoteCluster() throws InterruptedException, ExecutionException { + final var builder = Settings.builder() + .putNull("cluster.remote." + CLUSTER_ALIAS + ".mode") + .putNull("cluster.remote." + CLUSTER_ALIAS + ".seeds") + .putNull("cluster.remote." + CLUSTER_ALIAS + ".proxy_address"); + clusterAdmin().updateSettings(new ClusterUpdateSettingsRequest().persistentSettings(builder)).get(); + } + + @Override + protected Settings nodeSettings() { + return Settings.builder().put(super.nodeSettings()).put("xpack.security.remote_cluster_client.ssl.enabled", false).build(); + } + + private void configureRemoteCluster(TransportAddress remoteAddress) throws InterruptedException, ExecutionException { + final Settings.Builder builder = Settings.builder(); + if (randomBoolean()) { + builder.put("cluster.remote." + CLUSTER_ALIAS + ".mode", "sniff") + .put("cluster.remote." + CLUSTER_ALIAS + ".seeds", remoteAddress.toString()) + .putNull("cluster.remote." + CLUSTER_ALIAS + ".proxy_address"); + } else { + builder.put("cluster.remote." + CLUSTER_ALIAS + ".mode", "proxy") + .put("cluster.remote." + CLUSTER_ALIAS + ".proxy_address", remoteAddress.toString()) + .putNull("cluster.remote." + CLUSTER_ALIAS + ".seeds"); + } + clusterAdmin().updateSettings(new ClusterUpdateSettingsRequest().persistentSettings(builder)).get(); + } + + private void writeCredentialsToKeyStore(String credentials) throws Exception { + final Environment environment = getInstanceFromNode(Environment.class); + final KeyStoreWrapper keyStoreWrapper = KeyStoreWrapper.create(); + keyStoreWrapper.setString("cluster.remote." + CLUSTER_ALIAS + ".credentials", credentials.toCharArray()); + keyStoreWrapper.save(environment.configFile(), new char[0], false); + } + + public static MockTransportService startTransport( + final String nodeName, + final ThreadPool threadPool, + final BlockingQueue> capturedHeaders + ) { + boolean success = false; + final Settings settings = Settings.builder() + .put("node.name", nodeName) + .put("remote_cluster_server.enabled", "true") + .put("remote_cluster.port", "0") + .put("xpack.security.remote_cluster_server.ssl.enabled", "false") + .build(); + final MockTransportService service = MockTransportService.createNewService( + settings, + VersionInformation.CURRENT, + TransportVersion.current(), + threadPool, + null + ); + try { + service.registerRequestHandler( + ClusterStateAction.NAME, + EsExecutors.DIRECT_EXECUTOR_SERVICE, + ClusterStateRequest::new, + (request, channel, task) -> { + capturedHeaders.add(Map.copyOf(threadPool.getThreadContext().getHeaders())); + channel.sendResponse( + new ClusterStateResponse(ClusterName.DEFAULT, ClusterState.builder(ClusterName.DEFAULT).build(), false) + ); + } + ); + service.registerRequestHandler( + RemoteClusterNodesAction.TYPE.name(), + EsExecutors.DIRECT_EXECUTOR_SERVICE, + RemoteClusterNodesAction.Request::new, + (request, channel, task) -> { + capturedHeaders.add(Map.copyOf(threadPool.getThreadContext().getHeaders())); + channel.sendResponse(new RemoteClusterNodesAction.Response(List.of())); + } + ); + service.registerRequestHandler( + TransportSearchShardsAction.TYPE.name(), + EsExecutors.DIRECT_EXECUTOR_SERVICE, + SearchShardsRequest::new, + (request, channel, task) -> { + capturedHeaders.add(Map.copyOf(threadPool.getThreadContext().getHeaders())); + channel.sendResponse(new SearchShardsResponse(List.of(), List.of(), Collections.emptyMap())); + } + ); + service.registerRequestHandler( + TransportSearchAction.TYPE.name(), + EsExecutors.DIRECT_EXECUTOR_SERVICE, + SearchRequest::new, + (request, channel, task) -> { + capturedHeaders.add(Map.copyOf(threadPool.getThreadContext().getHeaders())); + channel.sendResponse( + new SearchResponse( + new InternalSearchResponse( + new SearchHits(new SearchHit[0], new TotalHits(0, TotalHits.Relation.EQUAL_TO), Float.NaN), + InternalAggregations.EMPTY, + null, + null, + false, + null, + 1 + ), + null, + 1, + 1, + 0, + 100, + ShardSearchFailure.EMPTY_ARRAY, + SearchResponse.Clusters.EMPTY + ) + ); + } + ); + service.start(); + service.acceptIncomingRequests(); + success = true; + return service; + } finally { + if (success == false) { + service.close(); + } + } + } + + private void reloadSecureSettings() throws InterruptedException { + final AtomicReference reloadSettingsError = new AtomicReference<>(); + final CountDownLatch latch = new CountDownLatch(1); + final SecureString emptyPassword = randomBoolean() ? new SecureString(new char[0]) : null; + clusterAdmin().prepareReloadSecureSettings() + .setSecureStorePassword(emptyPassword) + .setNodesIds(Strings.EMPTY_ARRAY) + .execute(new ActionListener<>() { + @Override + public void onResponse(NodesReloadSecureSettingsResponse nodesReloadResponse) { + try { + assertThat(nodesReloadResponse, notNullValue()); + final Map nodesMap = nodesReloadResponse.getNodesMap(); + assertThat(nodesMap.size(), equalTo(1)); + for (final NodesReloadSecureSettingsResponse.NodeResponse nodeResponse : nodesReloadResponse.getNodes()) { + assertThat(nodeResponse.reloadException(), nullValue()); + } + } catch (final AssertionError e) { + reloadSettingsError.set(e); + } finally { + latch.countDown(); + } + } + + @Override + public void onFailure(Exception e) { + reloadSettingsError.set(new AssertionError("Nodes request failed", e)); + latch.countDown(); + } + }); + latch.await(); + if (reloadSettingsError.get() != null) { + throw reloadSettingsError.get(); + } + } +} diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java index 51a902d7e12c0..461e044e732d1 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java @@ -13,6 +13,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.lucene.util.SetOnce; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchSecurityException; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.TransportVersion; @@ -21,6 +22,7 @@ import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.support.ActionFilter; import org.elasticsearch.action.support.DestructiveOperations; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.bootstrap.BootstrapCheck; import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterState; @@ -110,6 +112,7 @@ import org.elasticsearch.xpack.core.security.SecurityExtension; import org.elasticsearch.xpack.core.security.SecurityField; import org.elasticsearch.xpack.core.security.SecuritySettings; +import org.elasticsearch.xpack.core.security.action.ActionTypes; import org.elasticsearch.xpack.core.security.action.ClearSecurityCacheAction; import org.elasticsearch.xpack.core.security.action.DelegatePkiAuthenticationAction; import org.elasticsearch.xpack.core.security.action.apikey.BulkUpdateApiKeyAction; @@ -244,6 +247,7 @@ import org.elasticsearch.xpack.security.action.service.TransportGetServiceAccountCredentialsAction; import org.elasticsearch.xpack.security.action.service.TransportGetServiceAccountNodesCredentialsAction; import org.elasticsearch.xpack.security.action.settings.TransportGetSecuritySettingsAction; +import org.elasticsearch.xpack.security.action.settings.TransportReloadRemoteClusterCredentialsAction; import org.elasticsearch.xpack.security.action.settings.TransportUpdateSecuritySettingsAction; import org.elasticsearch.xpack.security.action.token.TransportCreateTokenAction; import org.elasticsearch.xpack.security.action.token.TransportInvalidateTokenAction; @@ -364,7 +368,6 @@ import org.elasticsearch.xpack.security.support.CacheInvalidatorRegistry; import org.elasticsearch.xpack.security.support.ExtensionComponents; import org.elasticsearch.xpack.security.support.SecuritySystemIndices; -import org.elasticsearch.xpack.security.transport.RemoteClusterCredentialsResolver; import org.elasticsearch.xpack.security.transport.SecurityHttpSettings; import org.elasticsearch.xpack.security.transport.SecurityServerTransportInterceptor; import org.elasticsearch.xpack.security.transport.filter.IPFilter; @@ -386,6 +389,7 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.function.Function; import java.util.function.Predicate; @@ -554,6 +558,7 @@ public class Security extends Plugin private final SetOnce reservedRoleMappingAction = new SetOnce<>(); private final SetOnce workflowService = new SetOnce<>(); private final SetOnce realms = new SetOnce<>(); + private final SetOnce client = new SetOnce<>(); public Security(Settings settings) { this(settings, Collections.emptyList()); @@ -573,25 +578,30 @@ public Security(Settings settings) { runStartupChecks(settings); Automatons.updateConfiguration(settings); } else { - final List remoteClusterCredentialsSettingKeys = RemoteClusterService.REMOTE_CLUSTER_CREDENTIALS.getAllConcreteSettings( - settings - ).map(Setting::getKey).sorted().toList(); - if (false == remoteClusterCredentialsSettingKeys.isEmpty()) { - throw new IllegalArgumentException( - format( - "Found [%s] remote clusters with credentials [%s]. Security [%s] must be enabled to connect to them. " - + "Please either enable security or remove these settings from the keystore.", - remoteClusterCredentialsSettingKeys.size(), - Strings.collectionToCommaDelimitedString(remoteClusterCredentialsSettingKeys), - XPackSettings.SECURITY_ENABLED.getKey() - ) - ); - } + ensureNoRemoteClusterCredentialsOnDisabledSecurity(settings); this.bootstrapChecks.set(Collections.emptyList()); } this.securityExtensions.addAll(extensions); } + private void ensureNoRemoteClusterCredentialsOnDisabledSecurity(Settings settings) { + assert false == enabled; + final List remoteClusterCredentialsSettingKeys = RemoteClusterService.REMOTE_CLUSTER_CREDENTIALS.getAllConcreteSettings( + settings + ).map(Setting::getKey).sorted().toList(); + if (false == remoteClusterCredentialsSettingKeys.isEmpty()) { + throw new IllegalArgumentException( + format( + "Found [%s] remote clusters with credentials [%s]. Security [%s] must be enabled to connect to them. " + + "Please either enable security or remove these settings from the keystore.", + remoteClusterCredentialsSettingKeys.size(), + Strings.collectionToCommaDelimitedString(remoteClusterCredentialsSettingKeys), + XPackSettings.SECURITY_ENABLED.getKey() + ) + ); + } + } + private static void runStartupChecks(Settings settings) { validateRealmSettings(settings); if (XPackSettings.FIPS_MODE_ENABLED.get(settings)) { @@ -616,6 +626,14 @@ protected XPackLicenseState getLicenseState() { return XPackPlugin.getSharedLicenseState(); } + protected Client getClient() { + return client.get(); + } + + protected Realms getRealms() { + return realms.get(); + } + @Override public Collection createComponents(PluginServices services) { try { @@ -654,6 +672,8 @@ Collection createComponents( return Collections.singletonList(new SecurityUsageServices(null, null, null, null, null, null)); } + this.client.set(client); + // The settings in `environment` may have additional values over what was provided during construction // See Plugin#additionalSettings() this.settings = environment.settings(); @@ -980,8 +1000,6 @@ Collection createComponents( ipFilter.set(new IPFilter(settings, auditTrailService, clusterService.getClusterSettings(), getLicenseState())); components.add(ipFilter.get()); - final RemoteClusterCredentialsResolver remoteClusterCredentialsResolver = new RemoteClusterCredentialsResolver(settings); - DestructiveOperations destructiveOperations = new DestructiveOperations(settings, clusterService.getClusterSettings()); crossClusterAccessAuthcService.set(new CrossClusterAccessAuthenticationService(clusterService, apiKeyService, authcService.get())); components.add(crossClusterAccessAuthcService.get()); @@ -995,7 +1013,6 @@ Collection createComponents( securityContext.get(), destructiveOperations, crossClusterAccessAuthcService.get(), - remoteClusterCredentialsResolver, getLicenseState() ) ); @@ -1348,6 +1365,7 @@ public void onIndexModule(IndexModule module) { new ActionHandler<>(SetProfileEnabledAction.INSTANCE, TransportSetProfileEnabledAction.class), new ActionHandler<>(GetSecuritySettingsAction.INSTANCE, TransportGetSecuritySettingsAction.class), new ActionHandler<>(UpdateSecuritySettingsAction.INSTANCE, TransportUpdateSecuritySettingsAction.class), + new ActionHandler<>(ActionTypes.RELOAD_REMOTE_CLUSTER_CREDENTIALS_ACTION, TransportReloadRemoteClusterCredentialsAction.class), usageAction, infoAction ).filter(Objects::nonNull).toList(); @@ -1887,16 +1905,56 @@ public BiConsumer getJoinValidator() { @Override public void reload(Settings settings) throws Exception { if (enabled) { - realms.get().stream().filter(r -> JwtRealmSettings.TYPE.equals(r.realmRef().getType())).forEach(realm -> { - if (realm instanceof JwtRealm jwtRealm) { - jwtRealm.rotateClientSecret( - CLIENT_AUTHENTICATION_SHARED_SECRET.getConcreteSettingForNamespace(realm.realmRef().getName()).get(settings) - ); - } - }); + final List reloadExceptions = new ArrayList<>(); + try { + reloadRemoteClusterCredentials(settings); + } catch (Exception ex) { + reloadExceptions.add(ex); + } + + try { + reloadSharedSecretsForJwtRealms(settings); + } catch (Exception ex) { + reloadExceptions.add(ex); + } + + if (false == reloadExceptions.isEmpty()) { + final var combinedException = new ElasticsearchException( + "secure settings reload failed for one or more security components" + ); + reloadExceptions.forEach(combinedException::addSuppressed); + throw combinedException; + } + } else { + ensureNoRemoteClusterCredentialsOnDisabledSecurity(settings); } } + private void reloadSharedSecretsForJwtRealms(Settings settingsWithKeystore) { + getRealms().stream().filter(r -> JwtRealmSettings.TYPE.equals(r.realmRef().getType())).forEach(realm -> { + if (realm instanceof JwtRealm jwtRealm) { + jwtRealm.rotateClientSecret( + CLIENT_AUTHENTICATION_SHARED_SECRET.getConcreteSettingForNamespace(realm.realmRef().getName()).get(settingsWithKeystore) + ); + } + }); + } + + /** + * This method uses a transport action internally to access classes that are injectable but not part of the plugin contract. + * See {@link TransportReloadRemoteClusterCredentialsAction} for more context. + */ + private void reloadRemoteClusterCredentials(Settings settingsWithKeystore) { + final PlainActionFuture future = new PlainActionFuture<>(); + getClient().execute( + ActionTypes.RELOAD_REMOTE_CLUSTER_CREDENTIALS_ACTION, + new TransportReloadRemoteClusterCredentialsAction.Request(settingsWithKeystore), + future + ); + assert future.isDone() : "expecting local-only action call to return immediately on invocation"; + future.actionGet(0, TimeUnit.NANOSECONDS); + } + static final class ValidateLicenseForFIPS implements BiConsumer { private final boolean inFipsMode; private final LicenseService licenseService; diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/settings/TransportReloadRemoteClusterCredentialsAction.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/settings/TransportReloadRemoteClusterCredentialsAction.java new file mode 100644 index 0000000000000..d6f54e9d3e9e1 --- /dev/null +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/settings/TransportReloadRemoteClusterCredentialsAction.java @@ -0,0 +1,77 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.security.action.settings; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.TransportAction; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.transport.RemoteClusterService; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.security.action.ActionTypes; +import org.elasticsearch.xpack.security.Security; + +import java.io.IOException; + +/** + * This is a local-only action which updates remote cluster credentials for remote cluster connections, from keystore settings reloaded via + * a call to {@link org.elasticsearch.rest.action.admin.cluster.RestReloadSecureSettingsAction}. + * + * It's invoked as part of the {@link Security#reload(Settings)} call. + * + * This action is largely an implementation detail to work around the fact that Security is a plugin without direct access to many core + * classes, including the {@link RemoteClusterService} which is required for a credentials reload. A transport action gives us access to + * the {@link RemoteClusterService} which is injectable but not part of the plugin contract. + */ +public class TransportReloadRemoteClusterCredentialsAction extends TransportAction< + TransportReloadRemoteClusterCredentialsAction.Request, + ActionResponse.Empty> { + + private final RemoteClusterService remoteClusterService; + + @Inject + public TransportReloadRemoteClusterCredentialsAction(TransportService transportService, ActionFilters actionFilters) { + super(ActionTypes.RELOAD_REMOTE_CLUSTER_CREDENTIALS_ACTION.name(), actionFilters, transportService.getTaskManager()); + this.remoteClusterService = transportService.getRemoteClusterService(); + } + + @Override + protected void doExecute(Task task, Request request, ActionListener listener) { + // We avoid stashing and marking context as system to keep the action as minimal as possible (i.e., avoid copying context) + remoteClusterService.updateRemoteClusterCredentials(request.getSettings()); + listener.onResponse(ActionResponse.Empty.INSTANCE); + } + + public static class Request extends ActionRequest { + private final Settings settings; + + public Request(Settings settings) { + this.settings = settings; + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + public Settings getSettings() { + return settings; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + localOnly(); + } + } +} diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/RemoteClusterCredentialsResolver.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/RemoteClusterCredentialsResolver.java deleted file mode 100644 index 93735a700bf92..0000000000000 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/RemoteClusterCredentialsResolver.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -package org.elasticsearch.xpack.security.transport; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.elasticsearch.common.Strings; -import org.elasticsearch.common.settings.SecureString; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.xpack.security.authc.ApiKeyService; - -import java.util.Map; -import java.util.Optional; - -import static org.elasticsearch.transport.RemoteClusterService.REMOTE_CLUSTER_CREDENTIALS; - -public class RemoteClusterCredentialsResolver { - - private static final Logger logger = LogManager.getLogger(RemoteClusterCredentialsResolver.class); - - private final Map clusterCredentials; - - public RemoteClusterCredentialsResolver(final Settings settings) { - this.clusterCredentials = REMOTE_CLUSTER_CREDENTIALS.getAsMap(settings); - logger.debug( - "Read cluster credentials for remote clusters [{}]", - Strings.collectionToCommaDelimitedString(clusterCredentials.keySet()) - ); - } - - public Optional resolve(final String clusterAlias) { - final SecureString apiKey = clusterCredentials.get(clusterAlias); - if (apiKey == null) { - return Optional.empty(); - } else { - return Optional.of(new RemoteClusterCredentials(clusterAlias, ApiKeyService.withApiKeyPrefix(apiKey.toString()))); - } - } - - record RemoteClusterCredentials(String clusterAlias, String credentials) { - @Override - public String toString() { - return "RemoteClusterCredentials{clusterAlias='" + clusterAlias + "', credentials='::es_redacted::'}"; - } - } -} diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptor.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptor.java index 53dd31fe46793..162cabf5297ce 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptor.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptor.java @@ -12,6 +12,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; import org.elasticsearch.action.support.DestructiveOperations; +import org.elasticsearch.common.settings.SecureString; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.ssl.SslConfiguration; import org.elasticsearch.common.util.Maps; @@ -24,6 +25,7 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.RemoteConnectionManager; +import org.elasticsearch.transport.RemoteConnectionManager.RemoteClusterAliasWithCredentials; import org.elasticsearch.transport.SendRequestTransportException; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportChannel; @@ -46,6 +48,7 @@ import org.elasticsearch.xpack.core.ssl.SSLService; import org.elasticsearch.xpack.security.Security; import org.elasticsearch.xpack.security.audit.AuditUtil; +import org.elasticsearch.xpack.security.authc.ApiKeyService; import org.elasticsearch.xpack.security.authc.AuthenticationService; import org.elasticsearch.xpack.security.authc.CrossClusterAccessAuthenticationService; import org.elasticsearch.xpack.security.authc.CrossClusterAccessHeaders; @@ -63,7 +66,6 @@ import static org.elasticsearch.transport.RemoteClusterPortSettings.REMOTE_CLUSTER_PROFILE; import static org.elasticsearch.transport.RemoteClusterPortSettings.REMOTE_CLUSTER_SERVER_ENABLED; import static org.elasticsearch.transport.RemoteClusterPortSettings.TRANSPORT_VERSION_ADVANCED_REMOTE_CLUSTER_SECURITY; -import static org.elasticsearch.xpack.security.transport.RemoteClusterCredentialsResolver.RemoteClusterCredentials; public class SecurityServerTransportInterceptor implements TransportInterceptor { @@ -85,8 +87,7 @@ public class SecurityServerTransportInterceptor implements TransportInterceptor private final Settings settings; private final SecurityContext securityContext; private final CrossClusterAccessAuthenticationService crossClusterAccessAuthcService; - private final RemoteClusterCredentialsResolver remoteClusterCredentialsResolver; - private final Function> remoteClusterAliasResolver; + private final Function> remoteClusterCredentialsResolver; private final XPackLicenseState licenseState; public SecurityServerTransportInterceptor( @@ -98,7 +99,6 @@ public SecurityServerTransportInterceptor( SecurityContext securityContext, DestructiveOperations destructiveOperations, CrossClusterAccessAuthenticationService crossClusterAccessAuthcService, - RemoteClusterCredentialsResolver remoteClusterCredentialsResolver, XPackLicenseState licenseState ) { this( @@ -110,9 +110,8 @@ public SecurityServerTransportInterceptor( securityContext, destructiveOperations, crossClusterAccessAuthcService, - remoteClusterCredentialsResolver, licenseState, - RemoteConnectionManager::resolveRemoteClusterAlias + RemoteConnectionManager::resolveRemoteClusterAliasWithCredentials ); } @@ -125,10 +124,9 @@ public SecurityServerTransportInterceptor( SecurityContext securityContext, DestructiveOperations destructiveOperations, CrossClusterAccessAuthenticationService crossClusterAccessAuthcService, - RemoteClusterCredentialsResolver remoteClusterCredentialsResolver, XPackLicenseState licenseState, // Inject for simplified testing - Function> remoteClusterAliasResolver + Function> remoteClusterCredentialsResolver ) { this.settings = settings; this.threadPool = threadPool; @@ -139,7 +137,6 @@ public SecurityServerTransportInterceptor( this.crossClusterAccessAuthcService = crossClusterAccessAuthcService; this.licenseState = licenseState; this.remoteClusterCredentialsResolver = remoteClusterCredentialsResolver; - this.remoteClusterAliasResolver = remoteClusterAliasResolver; this.profileFilters = initializeProfileFilters(destructiveOperations); } @@ -159,7 +156,8 @@ public void sendRequest( TransportResponseHandler handler ) { assertNoCrossClusterAccessHeadersInContext(); - final Optional remoteClusterAlias = remoteClusterAliasResolver.apply(connection); + final Optional remoteClusterAlias = remoteClusterCredentialsResolver.apply(connection) + .map(RemoteClusterAliasWithCredentials::clusterAlias); if (PreAuthorizationUtils.shouldRemoveParentAuthorizationFromThreadContext(remoteClusterAlias, action, securityContext)) { securityContext.executeAfterRemovingParentAuthorization(original -> { sendRequestInner( @@ -278,22 +276,23 @@ public void sendRequest( * Returns cluster credentials if the connection is remote, and cluster credentials are set up for the target cluster. */ private Optional getRemoteClusterCredentials(Transport.Connection connection) { - final Optional optionalRemoteClusterAlias = remoteClusterAliasResolver.apply(connection); - if (optionalRemoteClusterAlias.isEmpty()) { + final Optional remoteClusterAliasWithCredentials = remoteClusterCredentialsResolver + .apply(connection); + if (remoteClusterAliasWithCredentials.isEmpty()) { logger.trace("Connection is not remote"); return Optional.empty(); } - final String remoteClusterAlias = optionalRemoteClusterAlias.get(); - final Optional remoteClusterCredentials = remoteClusterCredentialsResolver.resolve( - remoteClusterAlias - ); - if (remoteClusterCredentials.isEmpty()) { + final String remoteClusterAlias = remoteClusterAliasWithCredentials.get().clusterAlias(); + final SecureString remoteClusterCredentials = remoteClusterAliasWithCredentials.get().credentials(); + if (remoteClusterCredentials == null) { logger.trace("No cluster credentials are configured for remote cluster [{}]", remoteClusterAlias); return Optional.empty(); } - return remoteClusterCredentials; + return Optional.of( + new RemoteClusterCredentials(remoteClusterAlias, ApiKeyService.withApiKeyPrefix(remoteClusterCredentials.toString())) + ); } private void sendWithCrossClusterAccessHeaders( @@ -442,7 +441,7 @@ private void sendWithUser( throw new IllegalStateException("there should always be a user when sending a message for action [" + action + "]"); } - assert securityContext.getParentAuthorization() == null || remoteClusterAliasResolver.apply(connection).isPresent() == false + assert securityContext.getParentAuthorization() == null || remoteClusterCredentialsResolver.apply(connection).isEmpty() : "parent authorization header should not be set for remote cluster requests"; try { @@ -663,4 +662,12 @@ public void onFailure(Exception e) { } } } + + record RemoteClusterCredentials(String clusterAlias, String credentials) { + + @Override + public String toString() { + return "RemoteClusterCredentials{clusterAlias='" + clusterAlias + "', credentials='::es_redacted::'}"; + } + } } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/LocalStateSecurity.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/LocalStateSecurity.java index d44e7c27d760e..a2aa04e0f56c3 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/LocalStateSecurity.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/LocalStateSecurity.java @@ -16,6 +16,7 @@ import org.elasticsearch.license.LicenseService; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.ReloadablePlugin; import org.elasticsearch.protocol.xpack.XPackInfoRequest; import org.elasticsearch.protocol.xpack.XPackInfoResponse; import org.elasticsearch.protocol.xpack.XPackUsageRequest; @@ -36,7 +37,7 @@ import java.util.Collections; import java.util.List; -public class LocalStateSecurity extends LocalStateCompositeXPackPlugin { +public class LocalStateSecurity extends LocalStateCompositeXPackPlugin implements ReloadablePlugin { public static class SecurityTransportXPackUsageAction extends TransportXPackUsageAction { @Inject @@ -130,4 +131,15 @@ protected Class> public List plugins() { return plugins; } + + @Override + public void reload(Settings settings) throws Exception { + plugins.stream().filter(p -> p instanceof ReloadablePlugin).forEach(p -> { + try { + ((ReloadablePlugin) p).reload(settings); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java index 6773da137ac96..ed5ed53894b6c 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java @@ -9,10 +9,13 @@ import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchSecurityException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionModule; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; @@ -72,6 +75,7 @@ import org.elasticsearch.xpack.core.security.SecurityContext; import org.elasticsearch.xpack.core.security.SecurityExtension; import org.elasticsearch.xpack.core.security.SecurityField; +import org.elasticsearch.xpack.core.security.action.ActionTypes; import org.elasticsearch.xpack.core.security.authc.Authentication; import org.elasticsearch.xpack.core.security.authc.AuthenticationTestHelper; import org.elasticsearch.xpack.core.security.authc.Realm; @@ -116,6 +120,7 @@ import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Collectors; +import java.util.stream.Stream; import static java.util.Collections.emptyMap; import static org.elasticsearch.xpack.core.security.authc.RealmSettings.getFullSettingKey; @@ -133,7 +138,10 @@ import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class SecurityTests extends ESTestCase { @@ -877,6 +885,23 @@ public void testSecurityMustBeEnableToConnectRemoteClusterWithCredentials() { + "Please either enable security or remove these settings from the keystore." ) ); + + // Security off, remote cluster with credentials on reload call + final MockSecureSettings secureSettings5 = new MockSecureSettings(); + secureSettings5.setString("cluster.remote.my1.credentials", randomAlphaOfLength(20)); + secureSettings5.setString("cluster.remote.my2.credentials", randomAlphaOfLength(20)); + final Settings.Builder builder5 = Settings.builder().setSecureSettings(secureSettings5); + // Use builder with security disabled to construct valid Security instance + final var security = new Security(builder2.build()); + final IllegalArgumentException e5 = expectThrows(IllegalArgumentException.class, () -> security.reload(builder5.build())); + assertThat( + e5.getMessage(), + containsString( + "Found [2] remote clusters with credentials [cluster.remote.my1.credentials,cluster.remote.my2.credentials]. " + + "Security [xpack.security.enabled] must be enabled to connect to them. " + + "Please either enable security or remove these settings from the keystore." + ) + ); } public void testLoadExtensions() throws Exception { @@ -905,6 +930,98 @@ public List loadExtensions(Class extensionPointType) { assertThat(registry, instanceOf(DummyOperatorOnlyRegistry.class)); } + public void testReload() throws Exception { + final Settings settings = Settings.builder().put("xpack.security.enabled", true).put("path.home", createTempDir()).build(); + + final PlainActionFuture value = new PlainActionFuture<>(); + final Client mockedClient = mock(Client.class); + + final Realms mockedRealms = mock(Realms.class); + when(mockedRealms.stream()).thenReturn(Stream.of()); + + doAnswer((inv) -> { + @SuppressWarnings("unchecked") + ActionListener listener = (ActionListener) inv.getArguments()[2]; + listener.onResponse(ActionResponse.Empty.INSTANCE); + return null; + }).when(mockedClient).execute(eq(ActionTypes.RELOAD_REMOTE_CLUSTER_CREDENTIALS_ACTION), any(), any()); + + security = new Security(settings, Collections.emptyList()) { + @Override + protected Client getClient() { + return mockedClient; + } + + @Override + protected Realms getRealms() { + return mockedRealms; + } + }; + + final Settings inputSettings = Settings.EMPTY; + security.reload(inputSettings); + + verify(mockedClient).execute(eq(ActionTypes.RELOAD_REMOTE_CLUSTER_CREDENTIALS_ACTION), any(), any()); + verify(mockedRealms).stream(); + } + + public void testReloadWithFailures() { + final Settings settings = Settings.builder().put("xpack.security.enabled", true).put("path.home", createTempDir()).build(); + + final boolean failRemoteClusterCredentialsReload = randomBoolean(); + final Client mockedClient = mock(Client.class); + if (failRemoteClusterCredentialsReload) { + doAnswer((inv) -> { + @SuppressWarnings("unchecked") + ActionListener listener = (ActionListener) inv.getArguments()[2]; + listener.onFailure(new RuntimeException("failed remote cluster credentials reload")); + return null; + }).when(mockedClient).execute(eq(ActionTypes.RELOAD_REMOTE_CLUSTER_CREDENTIALS_ACTION), any(), any()); + } else { + doAnswer((inv) -> { + @SuppressWarnings("unchecked") + ActionListener listener = (ActionListener) inv.getArguments()[2]; + listener.onResponse(ActionResponse.Empty.INSTANCE); + return null; + }).when(mockedClient).execute(eq(ActionTypes.RELOAD_REMOTE_CLUSTER_CREDENTIALS_ACTION), any(), any()); + } + + final Realms mockedRealms = mock(Realms.class); + final boolean failRealmsReload = (false == failRemoteClusterCredentialsReload) || randomBoolean(); + if (failRealmsReload) { + when(mockedRealms.stream()).thenThrow(new RuntimeException("failed jwt realms reload")); + } else { + when(mockedRealms.stream()).thenReturn(Stream.of()); + } + security = new Security(settings, Collections.emptyList()) { + @Override + protected Client getClient() { + return mockedClient; + } + + @Override + protected Realms getRealms() { + return mockedRealms; + } + }; + + final Settings inputSettings = Settings.EMPTY; + final var exception = expectThrows(ElasticsearchException.class, () -> security.reload(inputSettings)); + + assertThat(exception.getMessage(), containsString("secure settings reload failed for one or more security component")); + if (failRemoteClusterCredentialsReload) { + assertThat(exception.getSuppressed()[0].getMessage(), containsString("failed remote cluster credentials reload")); + if (failRealmsReload) { + assertThat(exception.getSuppressed()[1].getMessage(), containsString("failed jwt realms reload")); + } + } else { + assertThat(exception.getSuppressed()[0].getMessage(), containsString("failed jwt realms reload")); + } + // Verify both called despite failure + verify(mockedClient).execute(eq(ActionTypes.RELOAD_REMOTE_CLUSTER_CREDENTIALS_ACTION), any(), any()); + verify(mockedRealms).stream(); + } + public void testLoadNoExtensions() throws Exception { Settings settings = Settings.builder() .put("xpack.security.enabled", true) diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/RemoteClusterCredentialsResolverTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/RemoteClusterCredentialsResolverTests.java deleted file mode 100644 index debb50384e217..0000000000000 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/RemoteClusterCredentialsResolverTests.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ -package org.elasticsearch.xpack.security.transport; - -import org.elasticsearch.common.settings.MockSecureSettings; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.xpack.security.authc.ApiKeyService; - -import java.util.Optional; - -import static org.elasticsearch.xpack.security.transport.RemoteClusterCredentialsResolver.RemoteClusterCredentials; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.is; - -public class RemoteClusterCredentialsResolverTests extends ESTestCase { - - public void testResolveRemoteClusterCredentials() { - final String clusterNameA = "clusterA"; - final String clusterDoesNotExist = randomAlphaOfLength(10); - final Settings.Builder builder = Settings.builder(); - - final String secret = randomAlphaOfLength(20); - final MockSecureSettings secureSettings = new MockSecureSettings(); - secureSettings.setString("cluster.remote." + clusterNameA + ".credentials", secret); - final Settings settings = builder.setSecureSettings(secureSettings).build(); - RemoteClusterCredentialsResolver remoteClusterAuthorizationResolver = new RemoteClusterCredentialsResolver(settings); - final Optional remoteClusterCredentials = remoteClusterAuthorizationResolver.resolve(clusterNameA); - assertThat(remoteClusterCredentials.isPresent(), is(true)); - assertThat(remoteClusterCredentials.get().clusterAlias(), equalTo(clusterNameA)); - assertThat(remoteClusterCredentials.get().credentials(), equalTo(ApiKeyService.withApiKeyPrefix(secret))); - assertThat(remoteClusterAuthorizationResolver.resolve(clusterDoesNotExist), is(Optional.empty())); - } -} diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptorTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptorTests.java index 57e48581d159c..46b0fac78ad8e 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptorTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptorTests.java @@ -18,6 +18,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.SecureString; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.ssl.SslClientAuthenticationMode; import org.elasticsearch.common.ssl.SslConfiguration; @@ -33,6 +34,7 @@ import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.RemoteClusterPortSettings; +import org.elasticsearch.transport.RemoteConnectionManager.RemoteClusterAliasWithCredentials; import org.elasticsearch.transport.SendRequestTransportException; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.Transport.Connection; @@ -77,6 +79,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import java.util.function.Function; import static org.elasticsearch.cluster.metadata.DataStreamLifecycle.DATA_STREAM_LIFECYCLE_ORIGIN; import static org.elasticsearch.test.ActionListenerUtils.anyActionListener; @@ -87,7 +90,6 @@ import static org.elasticsearch.xpack.core.security.authc.CrossClusterAccessSubjectInfo.CROSS_CLUSTER_ACCESS_SUBJECT_INFO_HEADER_KEY; import static org.elasticsearch.xpack.core.security.authz.RoleDescriptorTests.randomUniquelyNamedRoleDescriptors; import static org.elasticsearch.xpack.security.authc.CrossClusterAccessHeaders.CROSS_CLUSTER_ACCESS_CREDENTIALS_HEADER_KEY; -import static org.elasticsearch.xpack.security.transport.RemoteClusterCredentialsResolver.RemoteClusterCredentials; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; @@ -153,7 +155,6 @@ public void testSendAsync() throws Exception { new ClusterSettings(Settings.EMPTY, Collections.singleton(DestructiveOperations.REQUIRES_NAME_SETTING)) ), mock(CrossClusterAccessAuthenticationService.class), - new RemoteClusterCredentialsResolver(settings), mockLicenseState ); ClusterServiceUtils.setState(clusterService, clusterService.state()); // force state update to trigger listener @@ -205,7 +206,6 @@ public void testSendAsyncSwitchToSystem() throws Exception { new ClusterSettings(Settings.EMPTY, Collections.singleton(DestructiveOperations.REQUIRES_NAME_SETTING)) ), mock(CrossClusterAccessAuthenticationService.class), - new RemoteClusterCredentialsResolver(settings), mockLicenseState ); ClusterServiceUtils.setState(clusterService, clusterService.state()); // force state update to trigger listener @@ -250,7 +250,6 @@ public void testSendWithoutUser() throws Exception { new ClusterSettings(Settings.EMPTY, Collections.singleton(DestructiveOperations.REQUIRES_NAME_SETTING)) ), mock(CrossClusterAccessAuthenticationService.class), - new RemoteClusterCredentialsResolver(settings), mockLicenseState ) { @Override @@ -313,7 +312,6 @@ public void testSendToNewerVersionSetsCorrectVersion() throws Exception { new ClusterSettings(Settings.EMPTY, Collections.singleton(DestructiveOperations.REQUIRES_NAME_SETTING)) ), mock(CrossClusterAccessAuthenticationService.class), - new RemoteClusterCredentialsResolver(settings), mockLicenseState ); ClusterServiceUtils.setState(clusterService, clusterService.state()); // force state update to trigger listener @@ -382,7 +380,6 @@ public void testSendToOlderVersionSetsCorrectVersion() throws Exception { new ClusterSettings(Settings.EMPTY, Collections.singleton(DestructiveOperations.REQUIRES_NAME_SETTING)) ), mock(CrossClusterAccessAuthenticationService.class), - new RemoteClusterCredentialsResolver(settings), mockLicenseState ); ClusterServiceUtils.setState(clusterService, clusterService.state()); // force state update to trigger listener @@ -449,7 +446,6 @@ public void testSetUserBasedOnActionOrigin() { new ClusterSettings(Settings.EMPTY, Collections.singleton(DestructiveOperations.REQUIRES_NAME_SETTING)) ), mock(CrossClusterAccessAuthenticationService.class), - new RemoteClusterCredentialsResolver(settings), mockLicenseState ); @@ -604,7 +600,6 @@ public void testSendWithCrossClusterAccessHeadersWithUnsupportedLicense() throws AuthenticationTestHelper.builder().build().writeToContext(threadContext); final String remoteClusterAlias = randomAlphaOfLengthBetween(5, 10); - final RemoteClusterCredentialsResolver remoteClusterCredentialsResolver = mockRemoteClusterCredentialsResolver(remoteClusterAlias); final SecurityServerTransportInterceptor interceptor = new SecurityServerTransportInterceptor( settings, @@ -618,9 +613,8 @@ public void testSendWithCrossClusterAccessHeadersWithUnsupportedLicense() throws new ClusterSettings(Settings.EMPTY, Collections.singleton(DestructiveOperations.REQUIRES_NAME_SETTING)) ), mock(CrossClusterAccessAuthenticationService.class), - remoteClusterCredentialsResolver, unsupportedLicenseState, - ignored -> Optional.of(remoteClusterAlias) + mockRemoteClusterCredentialsResolver(remoteClusterAlias) ); final AsyncSender sender = interceptor.interceptSender(mock(AsyncSender.class, ignored -> { @@ -661,18 +655,16 @@ public TransportResponse read(StreamInput in) { actualException.get().getCause().getMessage(), equalTo("current license is non-compliant for [" + Security.ADVANCED_REMOTE_CLUSTER_SECURITY_FEATURE.getName() + "]") ); - verify(remoteClusterCredentialsResolver, times(1)).resolve(eq(remoteClusterAlias)); assertThat(securityContext.getThreadContext().getHeader(CROSS_CLUSTER_ACCESS_SUBJECT_INFO_HEADER_KEY), nullValue()); assertThat(securityContext.getThreadContext().getHeader(CROSS_CLUSTER_ACCESS_CREDENTIALS_HEADER_KEY), nullValue()); } - private RemoteClusterCredentialsResolver mockRemoteClusterCredentialsResolver(String remoteClusterAlias) { - final RemoteClusterCredentialsResolver remoteClusterCredentialsResolver = mock(RemoteClusterCredentialsResolver.class); - final String remoteClusterCredential = ApiKeyService.withApiKeyPrefix(randomAlphaOfLengthBetween(10, 42)); - when(remoteClusterCredentialsResolver.resolve(any())).thenReturn( - Optional.of(new RemoteClusterCredentials(remoteClusterAlias, remoteClusterCredential)) + private Function> mockRemoteClusterCredentialsResolver( + String remoteClusterAlias + ) { + return connection -> Optional.of( + new RemoteClusterAliasWithCredentials(remoteClusterAlias, new SecureString(randomAlphaOfLengthBetween(10, 42).toCharArray())) ); - return remoteClusterCredentialsResolver; } public void testSendWithCrossClusterAccessHeadersForSystemUserRegularAction() throws Exception { @@ -736,12 +728,9 @@ private void doTestSendWithCrossClusterAccessHeaders( ) throws IOException { authentication.writeToContext(threadContext); final String expectedRequestId = AuditUtil.getOrGenerateRequestId(threadContext); - final RemoteClusterCredentialsResolver remoteClusterCredentialsResolver = mock(RemoteClusterCredentialsResolver.class); final String remoteClusterAlias = randomAlphaOfLengthBetween(5, 10); - final String remoteClusterCredential = ApiKeyService.withApiKeyPrefix(randomAlphaOfLengthBetween(10, 42)); - when(remoteClusterCredentialsResolver.resolve(any())).thenReturn( - Optional.of(new RemoteClusterCredentials(remoteClusterAlias, remoteClusterCredential)) - ); + final String encodedApiKey = randomAlphaOfLengthBetween(10, 42); + final String remoteClusterCredential = ApiKeyService.withApiKeyPrefix(encodedApiKey); final AuthorizationService authzService = mock(AuthorizationService.class); // We capture the listener so that we can complete the full flow, by calling onResponse further down @SuppressWarnings("unchecked") @@ -760,9 +749,8 @@ private void doTestSendWithCrossClusterAccessHeaders( new ClusterSettings(Settings.EMPTY, Collections.singleton(DestructiveOperations.REQUIRES_NAME_SETTING)) ), mock(CrossClusterAccessAuthenticationService.class), - remoteClusterCredentialsResolver, mockLicenseState, - ignored -> Optional.of(remoteClusterAlias) + ignored -> Optional.of(new RemoteClusterAliasWithCredentials(remoteClusterAlias, new SecureString(encodedApiKey.toCharArray()))) ); final AtomicBoolean calledWrappedSender = new AtomicBoolean(false); @@ -861,7 +849,6 @@ public TransportResponse read(StreamInput in) { } assertThat(sentCredential.get(), equalTo(remoteClusterCredential)); verify(securityContext, never()).executeAsInternalUser(any(), any(), anyConsumer()); - verify(remoteClusterCredentialsResolver, times(1)).resolve(eq(remoteClusterAlias)); assertThat(securityContext.getThreadContext().getHeader(CROSS_CLUSTER_ACCESS_SUBJECT_INFO_HEADER_KEY), nullValue()); assertThat(securityContext.getThreadContext().getHeader(CROSS_CLUSTER_ACCESS_CREDENTIALS_HEADER_KEY), nullValue()); assertThat(AuditUtil.extractRequestId(securityContext.getThreadContext()), equalTo(expectedRequestId)); @@ -874,15 +861,9 @@ public void testSendWithUserIfCrossClusterAccessHeadersConditionNotMet() throws if (false == (notRemoteConnection || noCredential)) { noCredential = true; } + final boolean finalNoCredential = noCredential; final String remoteClusterAlias = randomAlphaOfLengthBetween(5, 10); - final RemoteClusterCredentialsResolver remoteClusterCredentialsResolver = mock(RemoteClusterCredentialsResolver.class); - when(remoteClusterCredentialsResolver.resolve(any())).thenReturn( - noCredential - ? Optional.empty() - : Optional.of( - new RemoteClusterCredentials(remoteClusterAlias, ApiKeyService.withApiKeyPrefix(randomAlphaOfLengthBetween(10, 42))) - ) - ); + final String encodedApiKey = randomAlphaOfLengthBetween(10, 42); final AuthenticationTestHelper.AuthenticationTestBuilder builder = AuthenticationTestHelper.builder(); final Authentication authentication = randomFrom( builder.apiKey().build(), @@ -904,9 +885,12 @@ public void testSendWithUserIfCrossClusterAccessHeadersConditionNotMet() throws new ClusterSettings(Settings.EMPTY, Collections.singleton(DestructiveOperations.REQUIRES_NAME_SETTING)) ), mock(CrossClusterAccessAuthenticationService.class), - remoteClusterCredentialsResolver, mockLicenseState, - ignored -> notRemoteConnection ? Optional.empty() : Optional.of(remoteClusterAlias) + ignored -> notRemoteConnection + ? Optional.empty() + : (finalNoCredential + ? Optional.of(new RemoteClusterAliasWithCredentials(remoteClusterAlias, null)) + : Optional.of(new RemoteClusterAliasWithCredentials(remoteClusterAlias, new SecureString(encodedApiKey.toCharArray())))) ); final AtomicBoolean calledWrappedSender = new AtomicBoolean(false); @@ -944,12 +928,9 @@ public void testSendWithCrossClusterAccessHeadersThrowsOnOldConnection() throws .realm() .build(); authentication.writeToContext(threadContext); - final RemoteClusterCredentialsResolver remoteClusterCredentialsResolver = mock(RemoteClusterCredentialsResolver.class); final String remoteClusterAlias = randomAlphaOfLengthBetween(5, 10); - final String remoteClusterCredential = ApiKeyService.withApiKeyPrefix(randomAlphaOfLengthBetween(10, 42)); - when(remoteClusterCredentialsResolver.resolve(any())).thenReturn( - Optional.of(new RemoteClusterCredentials(remoteClusterAlias, remoteClusterCredential)) - ); + final String encodedApiKey = randomAlphaOfLengthBetween(10, 42); + final String remoteClusterCredential = ApiKeyService.withApiKeyPrefix(encodedApiKey); final SecurityServerTransportInterceptor interceptor = new SecurityServerTransportInterceptor( settings, @@ -963,9 +944,8 @@ public void testSendWithCrossClusterAccessHeadersThrowsOnOldConnection() throws new ClusterSettings(Settings.EMPTY, Collections.singleton(DestructiveOperations.REQUIRES_NAME_SETTING)) ), mock(CrossClusterAccessAuthenticationService.class), - remoteClusterCredentialsResolver, mockLicenseState, - ignored -> Optional.of(remoteClusterAlias) + ignored -> Optional.of(new RemoteClusterAliasWithCredentials(remoteClusterAlias, new SecureString(encodedApiKey.toCharArray()))) ); final AsyncSender sender = interceptor.interceptSender(new AsyncSender() { @@ -1029,7 +1009,6 @@ public TransportResponse read(StreamInput in) { + "] does not support receiving them" ) ); - verify(remoteClusterCredentialsResolver, times(1)).resolve(eq(remoteClusterAlias)); assertThat(securityContext.getThreadContext().getHeader(CROSS_CLUSTER_ACCESS_SUBJECT_INFO_HEADER_KEY), nullValue()); assertThat(securityContext.getThreadContext().getHeader(CROSS_CLUSTER_ACCESS_CREDENTIALS_HEADER_KEY), nullValue()); } @@ -1040,12 +1019,9 @@ public void testSendRemoteRequestFailsIfUserHasNoRemoteIndicesPrivileges() throw .realm() .build(); authentication.writeToContext(threadContext); - final RemoteClusterCredentialsResolver remoteClusterCredentialsResolver = mock(RemoteClusterCredentialsResolver.class); final String remoteClusterAlias = randomAlphaOfLengthBetween(5, 10); - final String remoteClusterCredential = ApiKeyService.withApiKeyPrefix(randomAlphaOfLengthBetween(10, 42)); - when(remoteClusterCredentialsResolver.resolve(any())).thenReturn( - Optional.of(new RemoteClusterCredentials(remoteClusterAlias, remoteClusterCredential)) - ); + final String encodedApiKey = randomAlphaOfLengthBetween(10, 42); + final String remoteClusterCredential = ApiKeyService.withApiKeyPrefix(encodedApiKey); final AuthorizationService authzService = mock(AuthorizationService.class); doAnswer(invocation -> { @@ -1067,9 +1043,8 @@ public void testSendRemoteRequestFailsIfUserHasNoRemoteIndicesPrivileges() throw new ClusterSettings(Settings.EMPTY, Collections.singleton(DestructiveOperations.REQUIRES_NAME_SETTING)) ), mock(CrossClusterAccessAuthenticationService.class), - remoteClusterCredentialsResolver, mockLicenseState, - ignored -> Optional.of(remoteClusterAlias) + ignored -> Optional.of(new RemoteClusterAliasWithCredentials(remoteClusterAlias, new SecureString(encodedApiKey.toCharArray()))) ); final AsyncSender sender = interceptor.interceptSender(new AsyncSender() { @@ -1171,7 +1146,6 @@ public void testProfileFiltersCreatedDifferentlyForDifferentTransportAndRemoteCl new ClusterSettings(Settings.EMPTY, Collections.singleton(DestructiveOperations.REQUIRES_NAME_SETTING)) ), mock(CrossClusterAccessAuthenticationService.class), - new RemoteClusterCredentialsResolver(settings), mockLicenseState ); @@ -1225,7 +1199,6 @@ public void testNoProfileFilterForRemoteClusterWhenTheFeatureIsDisabled() { new ClusterSettings(Settings.EMPTY, Collections.singleton(DestructiveOperations.REQUIRES_NAME_SETTING)) ), mock(CrossClusterAccessAuthenticationService.class), - new RemoteClusterCredentialsResolver(settings), mockLicenseState );