Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Schedule reroute after allocator timed out #15565

Merged
merged 25 commits into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.opensearch.cluster.metadata.ViewMetadata;
import org.opensearch.cluster.metadata.WeightedRoutingMetadata;
import org.opensearch.cluster.routing.DelayedAllocationService;
import org.opensearch.cluster.routing.RerouteService;
import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.cluster.routing.allocation.ExistingShardsAllocator;
import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
Expand Down Expand Up @@ -474,4 +475,9 @@ public void setExistingShardsAllocators(GatewayAllocator gatewayAllocator, Shard
allocationService.setExistingShardsAllocators(existingShardsAllocators);
}

public void setRerouteServiceForAllocator(RerouteService rerouteService) {
if (shardsAllocator instanceof BalancedShardsAllocator) {
((BalancedShardsAllocator) shardsAllocator).setRerouteService(rerouteService);
}
}
imRishN marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.IntroSorter;
import org.opensearch.cluster.routing.RerouteService;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.RoutingNodes;
import org.opensearch.cluster.routing.ShardMovementStrategy;
Expand All @@ -49,12 +50,14 @@
import org.opensearch.cluster.routing.allocation.RebalanceParameter;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.cluster.routing.allocation.ShardAllocationDecision;
import org.opensearch.common.Priority;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;

import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -202,6 +205,7 @@ public class BalancedShardsAllocator implements ShardsAllocator {
private volatile boolean ignoreThrottleInRestore;
private volatile TimeValue allocatorTimeout;
private long startTime;
private RerouteService rerouteService;

public BalancedShardsAllocator(Settings settings) {
this(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
Expand Down Expand Up @@ -231,6 +235,11 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting
clusterSettings.addSettingsUpdateConsumer(ALLOCATOR_TIMEOUT_SETTING, this::setAllocatorTimeout);
}

public void setRerouteService(RerouteService rerouteService) {
assert this.rerouteService == null : "RerouteService is already set";
this.rerouteService = rerouteService;
}

/**
* Changes in deprecated setting SHARD_MOVE_PRIMARY_FIRST_SETTING affect value of its replacement setting SHARD_MOVEMENT_STRATEGY_SETTING.
*/
Expand Down Expand Up @@ -342,6 +351,7 @@ public void allocate(RoutingAllocation allocation) {
localShardsBalancer.allocateUnassigned();
localShardsBalancer.moveShards();
localShardsBalancer.balance();
scheduleRerouteIfAllocatorTimedOut();

final ShardsBalancer remoteShardsBalancer = new RemoteShardsBalancer(logger, allocation);
remoteShardsBalancer.allocateUnassigned();
Expand Down Expand Up @@ -404,6 +414,23 @@ private void failAllocationOfNewPrimaries(RoutingAllocation allocation) {
}
}

private void scheduleRerouteIfAllocatorTimedOut() {
if(allocatorTimedOut()) {
if (rerouteService == null) {
logger.info("RerouteService not set to schedule reroute after allocator time out");
imRishN marked this conversation as resolved.
Show resolved Hide resolved
return;
}
rerouteService.reroute(
"reroute after balanced shards allocator timed out",
Priority.HIGH,
ActionListener.wrap(
r -> logger.trace("reroute after balanced shards allocator timed out completed"),
e -> logger.debug("reroute after balanced shards allocator timed out failed", e)
)
);
}
}

/**
* Returns the currently configured delta threshold
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,18 @@ public void onTimeout() {
@Override
public void run() {
primaryBatchShardAllocator.allocateUnassignedBatch(shardsBatch.getBatchedShardRoutings(), allocation);
if (timedOutPrimaryShardIds.isEmpty() == false) {
logger.trace("scheduling reroute after existing shards allocator timed out for primary shards");
assert rerouteService != null;
rerouteService.reroute(
"reroute after existing shards allocator timed out",
Priority.HIGH,
ActionListener.wrap(
r -> logger.trace("reroute after existing shards allocator timed out completed"),
e -> logger.debug("reroute after existing shards allocator timed out failed", e)
)
);
}
}
}));
return new BatchRunnableExecutor(runnables, () -> primaryShardsBatchGatewayAllocatorTimeout) {
Expand Down Expand Up @@ -320,6 +332,18 @@ public void run() {
public void onComplete() {
logger.trace("Triggering oncomplete after timeout for [{}] replica shards", timedOutReplicaShardIds.size());
replicaBatchShardAllocator.allocateUnassignedBatchOnTimeout(timedOutReplicaShardIds, allocation, false);
if (timedOutReplicaShardIds.isEmpty() == false) {
logger.trace("scheduling reroute after existing shards allocator timed out for replica shards");
assert rerouteService != null;
rerouteService.reroute(
"reroute after existing shards allocator timed out",
Priority.HIGH,
ActionListener.wrap(
r -> logger.trace("reroute after existing shards allocator timed out completed"),
e -> logger.debug("reroute after existing shards allocator timed out failed", e)
)
);
}
}
};
}
Expand Down
1 change: 1 addition & 0 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -867,6 +867,7 @@ protected Node(
final RerouteService rerouteService = new BatchedRerouteService(clusterService, clusterModule.getAllocationService()::reroute);
rerouteServiceReference.set(rerouteService);
clusterService.setRerouteService(rerouteService);
clusterModule.setRerouteServiceForAllocator(rerouteService);

final RecoverySettings recoverySettings = new RecoverySettings(settings, settingsModule.getClusterSettings());

Expand Down
Loading