From 95daaad3f4cc2aa066e1e41104dc7a333eb0cade Mon Sep 17 00:00:00 2001 From: Nikolaj Volgushev Date: Mon, 8 Jan 2024 14:49:46 +0100 Subject: [PATCH] Use ref counting runnable --- .../transport/RemoteClusterService.java | 21 +++++++------------ 1 file changed, 7 insertions(+), 14 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index 31024367b8b85..56fcddd0b4f61 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -16,7 +16,7 @@ import org.elasticsearch.action.support.CountDownActionListener; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.PlainActionFuture; -import org.elasticsearch.action.support.RefCountingListener; +import org.elasticsearch.action.support.RefCountingRunnable; import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeRole; @@ -323,21 +323,17 @@ synchronized void updateRemoteClusterCredentials(Settings settings, ActionListen return; } logger.info("rebuilding [{}] connections after credentials update", totalConnectionsToRebuild); - try (var refCountingListener = new RefCountingListener(totalConnectionsToRebuild, listener)) { + try (var connectionRefs = new RefCountingRunnable(() -> listener.onResponse(null))) { for (var clusterAlias : result.addedClusterAliases()) { - maybeRebuildConnectionOnCredentialsChange(clusterAlias, settings, refCountingListener); + maybeRebuildConnectionOnCredentialsChange(clusterAlias, settings, connectionRefs); } for (var clusterAlias : result.removedClusterAliases()) { - maybeRebuildConnectionOnCredentialsChange(clusterAlias, settings, refCountingListener); + maybeRebuildConnectionOnCredentialsChange(clusterAlias, settings, connectionRefs); } } } - private void maybeRebuildConnectionOnCredentialsChange( - String clusterAlias, - Settings settings, - RefCountingListener refCountingListener - ) { + private void maybeRebuildConnectionOnCredentialsChange(String clusterAlias, Settings settings, RefCountingRunnable connectionRefs) { if (false == remoteClusters.containsKey(clusterAlias)) { // A credential was added or removed before a remote connection was configured. // Without an existing connection, there is nothing to rebuild. @@ -345,12 +341,10 @@ private void maybeRebuildConnectionOnCredentialsChange( return; } - final var listener = refCountingListener.acquire(); - updateRemoteCluster(clusterAlias, settings, true, new ActionListener<>() { + updateRemoteCluster(clusterAlias, settings, true, ActionListener.releaseAfter(new ActionListener<>() { @Override public void onResponse(RemoteClusterConnectionStatus status) { logger.info("remote cluster connection [{}] updated after credentials change: [{}]", clusterAlias, status); - listener.onResponse(null); } @Override @@ -359,9 +353,8 @@ public void onFailure(Exception e) { // a failed credential reload; same as for remote cluster settings updates, we "handle" the failure by logging a WARN // message logger.warn(() -> "failed to update remote cluster connection [" + clusterAlias + "] after credentials change", e); - listener.onResponse(null); } - }); + }, connectionRefs.acquire())); } @Override