Skip to content

Commit

Permalink
[8.x] Add INDEX_REFRESH_BLOCK (#117753)
Browse files Browse the repository at this point in the history
This is the backport of #117543 for 8.18. It contains the 
cluster block and cluster level block, the transport 
version and serialization changes.

It does NOT contain the MetadataCreateIndexService 
logic to apply the block.
  • Loading branch information
tlrx authored Nov 29, 2024
1 parent 8fcf280 commit 36f886f
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ static TransportVersion def(int id) {
public static final TransportVersion ESQL_REMOVE_NODE_LEVEL_PLAN = def(8_800_00_0);
public static final TransportVersion LOGSDB_TELEMETRY_CUSTOM_CUTOFF_DATE = def(8_801_00_0);
public static final TransportVersion SOURCE_MODE_TELEMETRY = def(8_802_00_0);
public static final TransportVersion NEW_REFRESH_CLUSTER_BLOCK = def(8_803_00_0);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

package org.elasticsearch.cluster.block;

import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
Expand All @@ -21,6 +22,7 @@
import java.util.EnumSet;
import java.util.Locale;
import java.util.Objects;
import java.util.function.Predicate;

public class ClusterBlock implements Writeable, ToXContentFragment {

Expand Down Expand Up @@ -142,7 +144,12 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(id);
out.writeOptionalString(uuid);
out.writeString(description);
out.writeEnumSet(levels);
if (out.getTransportVersion().onOrAfter(TransportVersions.NEW_REFRESH_CLUSTER_BLOCK)) {
out.writeEnumSet(levels);
} else {
// do not send ClusterBlockLevel.REFRESH to old nodes
out.writeEnumSet(filterLevels(levels, level -> ClusterBlockLevel.REFRESH.equals(level) == false));
}
out.writeBoolean(retryable);
out.writeBoolean(disableStatePersistence);
RestStatus.writeTo(out, status);
Expand Down Expand Up @@ -185,4 +192,19 @@ public int hashCode() {
public boolean isAllowReleaseResources() {
return allowReleaseResources;
}

static EnumSet<ClusterBlockLevel> filterLevels(EnumSet<ClusterBlockLevel> levels, Predicate<ClusterBlockLevel> predicate) {
assert levels != null;
int size = levels.size();
if (size == 0 || (size == 1 && predicate.test(levels.iterator().next()))) {
return levels;
}
var filteredLevels = EnumSet.noneOf(ClusterBlockLevel.class);
for (ClusterBlockLevel level : levels) {
if (predicate.test(level)) {
filteredLevels.add(level);
}
}
return filteredLevels;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ public enum ClusterBlockLevel {
READ,
WRITE,
METADATA_READ,
METADATA_WRITE;
METADATA_WRITE,
REFRESH;

public static final EnumSet<ClusterBlockLevel> ALL = EnumSet.allOf(ClusterBlockLevel.class);
public static final EnumSet<ClusterBlockLevel> READ_WRITE = EnumSet.of(READ, WRITE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,15 @@ public class IndexMetadata implements Diffable<IndexMetadata>, ToXContentFragmen
RestStatus.TOO_MANY_REQUESTS,
EnumSet.of(ClusterBlockLevel.WRITE)
);
public static final ClusterBlock INDEX_REFRESH_BLOCK = new ClusterBlock(
14,
"index refresh blocked, waiting for shard(s) to be started",
true,
false,
false,
RestStatus.REQUEST_TIMEOUT,
EnumSet.of(ClusterBlockLevel.REFRESH)
);

// 'event.ingested' (part of Elastic Common Schema) range is tracked in cluster state, along with @timestamp
public static final String EVENT_INGESTED_FIELD_NAME = "event.ingested";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,8 @@ public void testToXContent() throws IOException {
"read",
"write",
"metadata_read",
"metadata_write"
"metadata_write",
"refresh"
]
}
},
Expand All @@ -180,7 +181,8 @@ public void testToXContent() throws IOException {
"read",
"write",
"metadata_read",
"metadata_write"
"metadata_write",
"refresh"
]
}
}
Expand Down Expand Up @@ -440,7 +442,8 @@ public void testToXContent_FlatSettingTrue_ReduceMappingFalse() throws IOExcepti
"read",
"write",
"metadata_read",
"metadata_write"
"metadata_write",
"refresh"
]
}
},
Expand All @@ -453,7 +456,8 @@ public void testToXContent_FlatSettingTrue_ReduceMappingFalse() throws IOExcepti
"read",
"write",
"metadata_read",
"metadata_write"
"metadata_write",
"refresh"
]
}
}
Expand Down Expand Up @@ -712,7 +716,8 @@ public void testToXContent_FlatSettingFalse_ReduceMappingTrue() throws IOExcepti
"read",
"write",
"metadata_read",
"metadata_write"
"metadata_write",
"refresh"
]
}
},
Expand All @@ -725,7 +730,8 @@ public void testToXContent_FlatSettingFalse_ReduceMappingTrue() throws IOExcepti
"read",
"write",
"metadata_read",
"metadata_write"
"metadata_write",
"refresh"
]
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,22 @@
package org.elasticsearch.cluster.block;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESTestCase;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.EnumSet;
import java.util.Map;

import static java.util.EnumSet.copyOf;
import static org.elasticsearch.test.TransportVersionUtils.getFirstVersion;
import static org.elasticsearch.test.TransportVersionUtils.getPreviousVersion;
import static org.elasticsearch.test.TransportVersionUtils.randomVersion;
import static org.elasticsearch.test.TransportVersionUtils.randomVersionBetween;
import static org.hamcrest.CoreMatchers.endsWith;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.not;
Expand All @@ -36,7 +39,7 @@ public void testSerialization() throws Exception {
int iterations = randomIntBetween(5, 20);
for (int i = 0; i < iterations; i++) {
TransportVersion version = randomVersion(random());
ClusterBlock clusterBlock = randomClusterBlock();
ClusterBlock clusterBlock = randomClusterBlock(version);

BytesStreamOutput out = new BytesStreamOutput();
out.setTransportVersion(version);
Expand All @@ -50,13 +53,41 @@ public void testSerialization() throws Exception {
}
}

public void testSerializationBwc() throws Exception {
var out = new BytesStreamOutput();
out.setTransportVersion(
randomVersionBetween(random(), getFirstVersion(), getPreviousVersion(TransportVersions.NEW_REFRESH_CLUSTER_BLOCK))
);

var clusterBlock = randomClusterBlock(TransportVersions.NEW_REFRESH_CLUSTER_BLOCK);
clusterBlock.writeTo(out);

var in = out.bytes().streamInput();
in.setTransportVersion(randomVersion());

assertClusterBlockEquals(
new ClusterBlock(
clusterBlock.id(),
clusterBlock.uuid(),
clusterBlock.description(),
clusterBlock.retryable(),
clusterBlock.disableStatePersistence(),
clusterBlock.isAllowReleaseResources(),
clusterBlock.status(),
// ClusterBlockLevel.REFRESH should not be sent over the wire to nodes with version < NEW_REFRESH_CLUSTER_BLOCK
ClusterBlock.filterLevels(clusterBlock.levels(), level -> ClusterBlockLevel.REFRESH.equals(level) == false)
),
new ClusterBlock(in)
);
}

public void testToStringDanglingComma() {
final ClusterBlock clusterBlock = randomClusterBlock();
final ClusterBlock clusterBlock = randomClusterBlock(randomVersion(random()));
assertThat(clusterBlock.toString(), not(endsWith(",")));
}

public void testGlobalBlocksCheckedIfNoIndicesSpecified() {
ClusterBlock globalBlock = randomClusterBlock();
ClusterBlock globalBlock = randomClusterBlock(randomVersion(random()));
ClusterBlocks clusterBlocks = new ClusterBlocks(Collections.singleton(globalBlock), Map.of());
ClusterBlockException exception = clusterBlocks.indicesBlockedException(randomFrom(globalBlock.levels()), new String[0]);
assertNotNull(exception);
Expand Down Expand Up @@ -113,9 +144,13 @@ public void testGetIndexBlockWithId() {
assertThat(builder.build().getIndexBlockWithId("index", randomValueOtherThan(blockId, ESTestCase::randomInt)), nullValue());
}

private static ClusterBlock randomClusterBlock() {
private static ClusterBlock randomClusterBlock(TransportVersion version) {
final String uuid = randomBoolean() ? UUIDs.randomBase64UUID() : null;
final List<ClusterBlockLevel> levels = Arrays.asList(ClusterBlockLevel.values());
final EnumSet<ClusterBlockLevel> levels = ClusterBlock.filterLevels(
EnumSet.allOf(ClusterBlockLevel.class),
// Filter out ClusterBlockLevel.REFRESH for versions < TransportVersions.NEW_REFRESH_CLUSTER_BLOCK
level -> ClusterBlockLevel.REFRESH.equals(level) == false || version.onOrAfter(TransportVersions.NEW_REFRESH_CLUSTER_BLOCK)
);
return new ClusterBlock(
randomInt(),
uuid,
Expand Down

0 comments on commit 36f886f

Please sign in to comment.