Skip to content

Commit

Permalink
Store creds with connection
Browse files Browse the repository at this point in the history
  • Loading branch information
n1v0lg committed Nov 30, 2023
1 parent f477953 commit 324f84e
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,9 @@ public void openConnection(DiscoveryNode node, @Nullable ConnectionProfile profi
node,
profile,
listener.delegateFailureAndWrap(
(l, connection) -> l.onResponse(new InternalRemoteConnection(connection, clusterAlias, credentialsManager))
(l, connection) -> l.onResponse(
new InternalRemoteConnection(connection, clusterAlias, credentialsManager.resolveCredentials(clusterAlias))
)
)
);
}
Expand Down Expand Up @@ -212,7 +214,7 @@ public static RemoteClusterInfoTuple resolveRemoteClusterInfoTuple(Transport.Con

private Transport.Connection getConnectionInternal(DiscoveryNode node) throws NodeNotConnectedException {
Transport.Connection connection = delegate.getConnection(node);
return new InternalRemoteConnection(connection, clusterAlias, credentialsManager);
return new InternalRemoteConnection(connection, clusterAlias, credentialsManager.resolveCredentials(clusterAlias));
}

private synchronized void addConnectedNode(DiscoveryNode addedNode) {
Expand Down Expand Up @@ -318,23 +320,25 @@ private static final class InternalRemoteConnection implements Transport.Connect
private static final Logger logger = LogManager.getLogger(InternalRemoteConnection.class);
private final Transport.Connection connection;
private final String clusterAlias;
private final RemoteClusterCredentialsManager clusterCredentialsManager;
@Nullable
private final SecureString clusterCredentials;

InternalRemoteConnection(Transport.Connection connection, String clusterAlias, RemoteClusterCredentialsManager credentialsManager) {
InternalRemoteConnection(Transport.Connection connection, String clusterAlias, @Nullable SecureString clusterCredentials) {
assert false == connection instanceof InternalRemoteConnection : "should not double wrap";
assert false == connection instanceof ProxyConnection
: "proxy connection should wrap internal remote connection, not the other way around";
this.clusterAlias = Objects.requireNonNull(clusterAlias);
this.connection = Objects.requireNonNull(connection);
this.clusterCredentialsManager = Objects.requireNonNull(credentialsManager);
this.clusterCredentials = clusterCredentials;
}

public String getClusterAlias() {
return clusterAlias;
}

@Nullable
public SecureString getClusterCredentials() {
return clusterCredentialsManager.resolveCredentials(clusterAlias);
return clusterCredentials;
}

@Override
Expand All @@ -346,7 +350,7 @@ public DiscoveryNode getNode() {
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws IOException, TransportException {
final String effectiveAction;
if (clusterCredentialsManager.hasCredentials(clusterAlias) && TransportService.HANDSHAKE_ACTION_NAME.equals(action)) {
if (clusterCredentials != null && TransportService.HANDSHAKE_ACTION_NAME.equals(action)) {
logger.trace("sending remote cluster specific handshake to node [{}] of remote cluster [{}]", getNode(), clusterAlias);
effectiveAction = REMOTE_CLUSTER_HANDSHAKE_ACTION_NAME;
} else {
Expand Down Expand Up @@ -416,6 +420,6 @@ static InternalRemoteConnection wrapConnectionWithRemoteClusterInfo(
String clusterAlias,
RemoteClusterCredentialsManager credentialsManager
) {
return new InternalRemoteConnection(connection, clusterAlias, credentialsManager);
return new InternalRemoteConnection(connection, clusterAlias, credentialsManager.resolveCredentials(clusterAlias));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public void testReloadRemoteClusterCredentials() throws Exception {
successfulReloadCall();

assertThat(remoteClusterService.getRemoteClusterCredentialsManager().resolveCredentials("my_remote_cluster"), equalTo(credentials));

}

private void successfulReloadCall() throws InterruptedException {
Expand Down

0 comments on commit 324f84e

Please sign in to comment.