Skip to content

Commit

Permalink
Fixing repository verification logic
Browse files Browse the repository at this point in the history
Signed-off-by: Dharmesh 💤 <[email protected]>
  • Loading branch information
psychbot authored and Sachin Kale committed Aug 31, 2023
1 parent f1409b1 commit a7316a8
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,25 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.admin.cluster.repositories.verify.VerifyRepositoryResponse;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.metadata.RepositoriesMetadata;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.settings.Setting;
import org.opensearch.core.action.ActionListener;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.RepositoryVerificationException;
import org.opensearch.threadpool.ThreadPool;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/**
Expand All @@ -34,6 +38,7 @@ public class RemoteStoreService {

private static final Logger logger = LogManager.getLogger(RemoteStoreService.class);
private final Supplier<RepositoriesService> repositoriesService;
private final ThreadPool threadPool;
public static final Setting<String> REMOTE_STORE_COMPATIBILITY_MODE_SETTING = Setting.simpleString(
"remote_store.compatibility_mode",
CompatibilityMode.ALLOW_ONLY_REMOTE_STORE_NODES.value,
Expand All @@ -44,8 +49,7 @@ public class RemoteStoreService {

public enum CompatibilityMode {
ALLOW_ONLY_REMOTE_STORE_NODES("allow_only_remote_store_nodes"),
ALLOW_ALL_NODES("allow_all_nodes"),
MIGRATING_TO_HOT("migrating_to_hot");
ALLOW_ALL_NODES("allow_all_nodes");

public static CompatibilityMode validate(String compatibilityMode) {
try {
Expand All @@ -69,35 +73,35 @@ public static CompatibilityMode validate(String compatibilityMode) {
}
}

public RemoteStoreService(Supplier<RepositoriesService> repositoriesService) {
public RemoteStoreService(Supplier<RepositoriesService> repositoriesService, ThreadPool threadPool) {
this.repositoriesService = repositoriesService;
this.threadPool = threadPool;
}

public void verifyRepository(List<Repository> repositories) {
ActionListener<VerifyRepositoryResponse> listener = new ActionListener<>() {

@Override
public void onResponse(VerifyRepositoryResponse verifyRepositoryResponse) {
logger.info("Successfully verified repository : " + verifyRepositoryResponse.toString());
}
public void verifyRepository(List<Repository> repositories, DiscoveryNode localNode) {
for (Repository repository : repositories) {
String verificationToken = repository.startVerification();
ExecutorService executor = threadPool.executor(ThreadPool.Names.GENERIC);
executor.execute(() -> {
try {
repository.verify(verificationToken, localNode);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("[{}] failed to verify repository", repository), e);
throw new RepositoryVerificationException(repository.getMetadata().name(), e.getMessage());
}
});

@Override
public void onFailure(Exception e) {
throw new IllegalStateException("Failed to finish remote store repository verification" + e.getMessage());
// TODO: See if using listener here which is async makes sense, made this sync as
// we need the repository registration for remote store node to be completed before the bootstrap
// completes.
try {
if(executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
throw new RepositoryVerificationException(repository.getMetadata().name(), "could not complete " +
"repository verification within timeout.");
}
} catch (InterruptedException e) {
throw new RepositoryVerificationException(repository.getMetadata().name(), e.getMessage());
}
};

for (Repository repository : repositories) {
repositoriesService.get()
.verifyRepository(
repository.getMetadata().name(),
ActionListener.delegateFailure(
listener,
(delegatedListener, verifyResponse) -> delegatedListener.onResponse(
new VerifyRepositoryResponse(verifyResponse.toArray(new DiscoveryNode[0]))
)
)
);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ public static DiscoveryNode createLocal(
);
RemoteStoreNode remoteStoreNode = new RemoteStoreNode(discoveryNode);
List<Repository> repositories = remoteStoreService.createRepositories(remoteStoreNode);
remoteStoreService.verifyRepository(repositories);
remoteStoreService.verifyRepository(repositories, discoveryNode);
return discoveryNode;
}

Expand Down
12 changes: 6 additions & 6 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -519,14 +519,14 @@ protected Node(
);
resourcesToClose.add(nodeEnvironment);

final SetOnce<RepositoriesService> repositoriesServiceReference = new SetOnce<>();
final RemoteStoreService remoteStoreService = new RemoteStoreService(repositoriesServiceReference::get);
localNodeFactory = new LocalNodeFactory(settings, nodeEnvironment.nodeId(), remoteStoreService);

final List<ExecutorBuilder<?>> executorBuilders = pluginsService.getExecutorBuilders(settings);

runnableTaskListener = new AtomicReference<>();
final ThreadPool threadPool = new ThreadPool(settings, runnableTaskListener, executorBuilders.toArray(new ExecutorBuilder[0]));

final SetOnce<RepositoriesService> repositoriesServiceReference = new SetOnce<>();
final RemoteStoreService remoteStoreService = new RemoteStoreService(repositoriesServiceReference::get, threadPool);
localNodeFactory = new LocalNodeFactory(settings, nodeEnvironment.nodeId(), remoteStoreService);
resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));
final ResourceWatcherService resourceWatcherService = new ResourceWatcherService(settings, threadPool);
resourcesToClose.add(resourceWatcherService);
Expand Down Expand Up @@ -1730,11 +1730,11 @@ public DiscoveryNode apply(BoundTransportAddress boundTransportAddress) {
.keySet()
.stream()
.anyMatch(key -> key.startsWith(RemoteStoreNode.REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX))) {
localNode.set(DiscoveryNode.createLocal(settings, boundTransportAddress.publishAddress(), persistentNodeId));
} else {
localNode.set(
DiscoveryNode.createLocal(settings, boundTransportAddress.publishAddress(), persistentNodeId, remoteStoreService)
);
} else {
localNode.set(DiscoveryNode.createLocal(settings, boundTransportAddress.publishAddress(), persistentNodeId));
}
return localNode.get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ public IndexMetadata upgradeIndexMetadata(IndexMetadata indexMetadata, Version m
threadPool
);

remoteStoreService = new RemoteStoreService(new SetOnce<>(repositoriesService)::get);
remoteStoreService = new RemoteStoreService(new SetOnce<>(repositoriesService)::get, threadPool);

nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, logger);
joinTaskExecutor = new JoinTaskExecutor(Settings.EMPTY, allocationService, logger, (s, p, r) -> {}, remoteStoreService);
Expand Down

0 comments on commit a7316a8

Please sign in to comment.