Commit

Merge branch 'main' into rcs2-reload

elasticmachine authored Dec 6, 2023
2 parents 4c589dc + be1277a commit 6dcd745
Showing 17 changed files with 572 additions and 77 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/102741.yaml
@@ -0,0 +1,6 @@
pr: 102741
summary: "[ILM] More resilient when a policy is added to searchable snapshot"
area: ILM+SLM
type: bug
issues:
- 101958
5 changes: 5 additions & 0 deletions docs/changelog/102902.yaml
@@ -0,0 +1,5 @@
pr: 102902
summary: Fast path for reading single doc with ordinals
area: ES|QL
type: enhancement
issues: []
9 changes: 9 additions & 0 deletions docs/changelog/103031.yaml
@@ -0,0 +1,9 @@
pr: 103031
summary: Collect warnings in compute service
area: ES|QL
type: bug
issues:
- 100163
- 103028
- 102871
- 102982
5 changes: 5 additions & 0 deletions docs/reference/ilm/actions/ilm-delete.asciidoc
@@ -16,6 +16,11 @@ Defaults to `true`.
This option is applicable when the <<ilm-searchable-snapshot,searchable
snapshot>> action is used in any previous phase.

WARNING: If a policy with a searchable snapshot action is applied to an existing searchable snapshot index,
the snapshot backing that index will NOT be deleted, because it was not created by this policy. If you want
to clean up this snapshot, delete it manually with the <<delete-snapshot-api, delete snapshot API>> after the
index is deleted. You can find the repository and snapshot name using the <<indices-get-index, get index API>>.
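
For illustration, a minimal sketch of this manual cleanup using the low-level Java REST client is shown
below. The index, repository, and snapshot names are placeholders; substitute the values reported by the
get index API.

[source,java]
----
import org.apache.http.HttpHost;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;

public class CleanUpSearchableSnapshot {
    public static void main(String[] args) throws Exception {
        try (RestClient client = RestClient.builder(new HttpHost("localhost", 9200, "http")).build()) {
            // Look up the backing repository and snapshot in the mounted index's settings
            // (index.store.snapshot.repository_name / index.store.snapshot.snapshot_name).
            Response settings = client.performRequest(new Request("GET", "/my-mounted-index/_settings"));
            System.out.println(EntityUtils.toString(settings.getEntity()));

            // Once the index itself has been deleted, remove the snapshot manually.
            client.performRequest(new Request("DELETE", "/_snapshot/my-repository/my-snapshot"));
        }
    }
}
----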

[[ilm-delete-action-ex]]
==== Example

@@ -555,8 +555,20 @@ private static class SingletonOrdinals extends BlockDocValuesReader {
this.ordinals = ordinals;
}

private BlockLoader.Block readSingleDoc(BlockFactory factory, int docId) throws IOException {
if (ordinals.advanceExact(docId)) {
BytesRef v = ordinals.lookupOrd(ordinals.ordValue());
return factory.constantBytes(v);
} else {
return factory.constantNulls();
}
}

@Override
public BlockLoader.Block read(BlockFactory factory, Docs docs) throws IOException {
if (docs.count() == 1) {
return readSingleDoc(factory, docs.get(0));
}
try (BlockLoader.SingletonOrdinalsBuilder builder = factory.singletonOrdinalsBuilder(ordinals, docs.count())) {
for (int i = 0; i < docs.count(); i++) {
int doc = docs.get(i);
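
The new readSingleDoc method gives the single-document case a fast path: rather than building a SingletonOrdinalsBuilder, it looks up the one ordinal and returns a constant block (or constant nulls when the document has no value). A rough, standalone sketch of the same pattern, using simplified stand-in types rather than the real ES|QL block classes:

import java.util.Arrays;
import java.util.List;

// Simplified stand-ins for the block-loading abstractions touched in this commit.
interface Block {}
record ConstantBlock(Object value) implements Block {}
record ArrayBlock(List<Object> values) implements Block {}

class ColumnReader {
    private final List<Object> docValues; // stand-in for doc values addressed by docId

    ColumnReader(List<Object> docValues) {
        this.docValues = docValues;
    }

    Block read(int[] docs) {
        if (docs.length == 1) {
            // Fast path: one document needs no builder; wrap the single value in a constant block.
            return new ConstantBlock(docValues.get(docs[0]));
        }
        // General path: materialize one value per requested document.
        return new ArrayBlock(Arrays.stream(docs).mapToObj(docValues::get).toList());
    }
}
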
@@ -364,9 +364,8 @@ private void verifyLoggerUsage(
&& argumentTypes[markerOffset + 1].equals(OBJECT_CLASS)) {
// MULTI-PARAM METHOD: debug(Marker?, String, Object p0, ...)
checkFixedArityArgs(methodNode, logMessageFrames[i], lineNumber, methodInsn, markerOffset + 0, lengthWithoutMarker - 1);
} else if ((lengthWithoutMarker == 1 || lengthWithoutMarker == 2) && lengthWithoutMarker == 2
? argumentTypes[markerOffset + 1].equals(THROWABLE_CLASS)
: true) {
} else if (lengthWithoutMarker == 1
|| (lengthWithoutMarker == 2 && argumentTypes[markerOffset + 1].equals(THROWABLE_CLASS))) {
// all the rest: debug(Marker?, (Message|MessageSupplier|CharSequence|Object|String|Supplier), Throwable?)
checkFixedArityArgs(methodNode, logMessageFrames[i], lineNumber, methodInsn, markerOffset + 0, 0);
} else {
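
In the old condition, `&&` binds tighter than `?:`, so it reduced to `lengthWithoutMarker == 2 ? isThrowable : true`, which passed for any arity other than 2; the rewritten condition accepts only one argument, or two when the second is a Throwable. Whether that changes observable behavior depends on the surrounding branches, but the following standalone demo (not plugin code) shows the precedence difference in isolation:

public class TernaryPrecedenceDemo {
    public static void main(String[] args) {
        for (int len = 0; len <= 3; len++) {
            boolean secondArgIsThrowable = false; // assume the worst case for the check

            // Old condition: `&&` binds tighter than `?:`, so this parses as
            // ((len == 1 || len == 2) && len == 2) ? secondArgIsThrowable : true
            boolean oldCheck = (len == 1 || len == 2) && len == 2 ? secondArgIsThrowable : true;

            // New condition: explicit about the two accepted shapes.
            boolean newCheck = len == 1 || (len == 2 && secondArgIsThrowable);

            System.out.printf("len=%d old=%b new=%b%n", len, oldCheck, newCheck);
        }
    }
}
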
@@ -68,24 +68,32 @@ void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentCl
String indexName = indexMetadata.getIndex().getName();

LifecycleExecutionState lifecycleState = indexMetadata.getLifecycleExecutionState();
SearchableSnapshotAction.SearchableSnapshotMetadata searchableSnapshotMetadata = SearchableSnapshotAction
.extractSearchableSnapshotFromSettings(indexMetadata);

String policyName = indexMetadata.getLifecyclePolicyName();
final String snapshotRepository = lifecycleState.snapshotRepository();
String snapshotRepository = lifecycleState.snapshotRepository();
if (Strings.hasText(snapshotRepository) == false) {
listener.onFailure(
new IllegalStateException(
"snapshot repository is not present for policy [" + policyName + "] and index [" + indexName + "]"
)
);
return;
if (searchableSnapshotMetadata == null) {
listener.onFailure(
new IllegalStateException(
"snapshot repository is not present for policy [" + policyName + "] and index [" + indexName + "]"
)
);
return;
} else {
snapshotRepository = searchableSnapshotMetadata.repositoryName();
}
}

final String snapshotName = lifecycleState.snapshotName();
if (Strings.hasText(snapshotName) == false) {
String snapshotName = lifecycleState.snapshotName();
if (Strings.hasText(snapshotName) == false && searchableSnapshotMetadata == null) {
listener.onFailure(
new IllegalStateException("snapshot name was not generated for policy [" + policyName + "] and index [" + indexName + "]")
);
return;
} else if (searchableSnapshotMetadata != null) {
snapshotName = searchableSnapshotMetadata.snapshotName();
}

String mountedIndexName = restoredIndexPrefix + indexName;
@@ -102,16 +110,20 @@ void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentCl

final String snapshotIndexName = lifecycleState.snapshotIndexName();
if (snapshotIndexName == null) {
// This index had its searchable snapshot created prior to a version where we captured
// the original index name, so make our best guess at the name
indexName = bestEffortIndexNameResolution(indexName);
logger.debug(
"index [{}] using policy [{}] does not have a stored snapshot index name, "
+ "using our best effort guess of [{}] for the original snapshotted index name",
indexMetadata.getIndex().getName(),
policyName,
indexName
);
if (searchableSnapshotMetadata == null) {
// This index had its searchable snapshot created prior to a version where we captured
// the original index name, so make our best guess at the name
indexName = bestEffortIndexNameResolution(indexName);
logger.debug(
"index [{}] using policy [{}] does not have a stored snapshot index name, "
+ "using our best effort guess of [{}] for the original snapshotted index name",
indexMetadata.getIndex().getName(),
policyName,
indexName
);
} else {
indexName = searchableSnapshotMetadata.sourceIndex();
}
} else {
// Use the name of the snapshot as specified in the metadata, because the current index
// name might not reflect the name of the index actually in the snapshot
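
The step above now falls back to the searchable-snapshot metadata extracted from the index settings whenever the lifecycle execution state has no snapshot repository or name (for example, when a policy is attached to an index that is already a mounted searchable snapshot). A minimal, self-contained sketch of that selection order; the types are simplified stand-ins, not the actual ILM classes:

import java.util.Optional;

// Stand-ins for the two sources of snapshot information used above.
record ExecutionState(String snapshotRepository, String snapshotName, String snapshotIndexName) {}
record SettingsMetadata(String sourceIndex, String repositoryName, String snapshotName) {}
record SnapshotInfo(String repository, String snapshot, String sourceIndex) {}

class SnapshotInfoResolver {
    /**
     * Prefer what ILM recorded in the lifecycle execution state; if that is missing,
     * fall back to the metadata extracted from the index settings.
     */
    static Optional<SnapshotInfo> resolve(ExecutionState state, SettingsMetadata fromSettings) {
        if (state != null && state.snapshotName() != null) {
            return Optional.of(new SnapshotInfo(state.snapshotRepository(), state.snapshotName(), state.snapshotIndexName()));
        }
        if (fromSettings != null) {
            return Optional.of(new SnapshotInfo(fromSettings.repositoryName(), fromSettings.snapshotName(), fromSettings.sourceIndex()));
        }
        return Optional.empty(); // neither source has the information: the step fails its listener
    }
}
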
@@ -16,6 +16,7 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.xcontent.ConstructingObjectParser;
@@ -32,6 +33,7 @@
import java.util.Objects;

import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOTS_REPOSITORY_NAME_SETTING_KEY;
import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOTS_SNAPSHOT_NAME_SETTING_KEY;
import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOT_PARTIAL_SETTING_KEY;
import static org.elasticsearch.xpack.core.searchablesnapshots.SearchableSnapshotsConstants.SEARCHABLE_SNAPSHOT_FEATURE;

@@ -141,10 +143,12 @@ public List<Step> toSteps(Client client, String phase, StepKey nextStepKey, XPac
IndexMetadata indexMetadata = clusterState.getMetadata().index(index);
assert indexMetadata != null : "index " + index.getName() + " must exist in the cluster state";
String policyName = indexMetadata.getLifecyclePolicyName();
if (indexMetadata.getSettings().get(LifecycleSettings.SNAPSHOT_INDEX_NAME) != null) {
SearchableSnapshotMetadata searchableSnapshotMetadata = extractSearchableSnapshotFromSettings(indexMetadata);
if (searchableSnapshotMetadata != null) {
// TODO: allow this behavior instead of returning false; in this case the index is already a searchable snapshot
// so the most graceful way of recovery might be to use this repo
// The index is already a searchable snapshot, let's see if the repository matches
String repo = indexMetadata.getSettings().get(SEARCHABLE_SNAPSHOTS_REPOSITORY_NAME_SETTING_KEY);
if (this.snapshotRepository.equals(repo) == false) {
if (this.snapshotRepository.equals(searchableSnapshotMetadata.repositoryName) == false) {
// Okay, different repo, we need to go ahead with the searchable snapshot
logger.debug(
"[{}] action is configured for index [{}] in policy [{}] which is already mounted as a searchable "
@@ -153,15 +157,14 @@ public List<Step> toSteps(Client client, String phase, StepKey nextStepKey, XPac
SearchableSnapshotAction.NAME,
index.getName(),
policyName,
repo,
searchableSnapshotMetadata.repositoryName,
this.snapshotRepository
);
return false;
}

// Check the storage type to see if we need to convert between full <-> partial
final boolean partial = indexMetadata.getSettings().getAsBoolean(SEARCHABLE_SNAPSHOT_PARTIAL_SETTING_KEY, false);
MountSearchableSnapshotRequest.Storage existingType = partial
MountSearchableSnapshotRequest.Storage existingType = searchableSnapshotMetadata.partial
? MountSearchableSnapshotRequest.Storage.SHARED_CACHE
: MountSearchableSnapshotRequest.Storage.FULL_COPY;
MountSearchableSnapshotRequest.Storage type = getConcreteStorageType(preActionBranchingKey);
@@ -172,7 +175,7 @@ public List<Step> toSteps(Client client, String phase, StepKey nextStepKey, XPac
SearchableSnapshotAction.NAME,
index.getName(),
policyName,
repo,
searchableSnapshotMetadata.repositoryName,
type
);
return true;
@@ -215,7 +218,7 @@ public List<Step> toSteps(Client client, String phase, StepKey nextStepKey, XPac
// When generating a snapshot, we either jump to the force merge step, or we skip the
// forcemerge and go straight to steps for creating the snapshot
StepKey keyForSnapshotGeneration = forceMergeIndex ? forceMergeStepKey : generateSnapshotNameKey;
// Branch, deciding whether there is an existing searchable snapshot snapshot that can be used for mounting the index
// Branch, deciding whether there is an existing searchable snapshot that can be used for mounting the index
// (in which case, skip generating a new name and the snapshot cleanup), or if we need to generate a new snapshot
BranchingStep skipGeneratingSnapshotStep = new BranchingStep(
skipGeneratingSnapshotKey,
@@ -225,7 +228,8 @@ public List<Step> toSteps(Client client, String phase, StepKey nextStepKey, XPac
IndexMetadata indexMetadata = clusterState.getMetadata().index(index);
String policyName = indexMetadata.getLifecyclePolicyName();
LifecycleExecutionState lifecycleExecutionState = indexMetadata.getLifecycleExecutionState();
if (lifecycleExecutionState.snapshotName() == null) {
SearchableSnapshotMetadata searchableSnapshotMetadata = extractSearchableSnapshotFromSettings(indexMetadata);
if (lifecycleExecutionState.snapshotName() == null && searchableSnapshotMetadata == null) {
// No name exists, so it must be generated
logger.trace(
"no snapshot name for index [{}] in policy [{}] exists, so one will be generated",
@@ -234,8 +238,20 @@ public List<Step> toSteps(Client client, String phase, StepKey nextStepKey, XPac
);
return false;
}
String snapshotIndexName;
String snapshotName;
String repoName;
if (lifecycleExecutionState.snapshotName() != null) {
snapshotIndexName = lifecycleExecutionState.snapshotIndexName();
snapshotName = lifecycleExecutionState.snapshotName();
repoName = lifecycleExecutionState.snapshotRepository();
} else {
snapshotIndexName = searchableSnapshotMetadata.sourceIndex;
snapshotName = searchableSnapshotMetadata.snapshotName;
repoName = searchableSnapshotMetadata.repositoryName;
}

if (this.snapshotRepository.equals(lifecycleExecutionState.snapshotRepository()) == false) {
if (this.snapshotRepository.equals(repoName) == false) {
// A different repository is being used
// TODO: allow this behavior instead of throwing an exception
throw new IllegalArgumentException("searchable snapshot indices may be converted only within the same repository");
@@ -244,12 +260,14 @@ public List<Step> toSteps(Client client, String phase, StepKey nextStepKey, XPac
// We can skip the generate, initial cleanup, and snapshot taking for this index, as we already have a generated snapshot.
// This will jump ahead directly to the "mount snapshot" step
logger.debug(
"an existing snapshot [{}] in repository [{}] (index name: [{}]) "
+ "will be used for mounting [{}] as a searchable snapshot",
lifecycleExecutionState.snapshotName(),
lifecycleExecutionState.snapshotRepository(),
lifecycleExecutionState.snapshotIndexName(),
index.getName()
"Policy [{}] will use an existing snapshot [{}] in repository [{}] (index name: [{}]) "
+ "to mount [{}] as a searchable snapshot. This snapshot was found in the {}.",
policyName,
snapshotName,
snapshotRepository,
snapshotIndexName,
index.getName(),
lifecycleExecutionState.snapshotName() != null ? "lifecycle execution state" : "metadata of " + index.getName()
);
return true;
}
@@ -411,4 +429,18 @@ public boolean equals(Object o) {
public int hashCode() {
return Objects.hash(snapshotRepository, forceMergeIndex);
}

@Nullable
static SearchableSnapshotMetadata extractSearchableSnapshotFromSettings(IndexMetadata indexMetadata) {
String indexName = indexMetadata.getSettings().get(LifecycleSettings.SNAPSHOT_INDEX_NAME);
if (indexName == null) {
return null;
}
String snapshotName = indexMetadata.getSettings().get(SEARCHABLE_SNAPSHOTS_SNAPSHOT_NAME_SETTING_KEY);
String repo = indexMetadata.getSettings().get(SEARCHABLE_SNAPSHOTS_REPOSITORY_NAME_SETTING_KEY);
final boolean partial = indexMetadata.getSettings().getAsBoolean(SEARCHABLE_SNAPSHOT_PARTIAL_SETTING_KEY, false);
return new SearchableSnapshotMetadata(indexName, repo, snapshotName, partial);
}

record SearchableSnapshotMetadata(String sourceIndex, String repositoryName, String snapshotName, boolean partial) {};
}
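
The skipGeneratingSnapshotStep branching logic above decides whether an existing searchable snapshot can be reused for mounting. A compact, standalone sketch of that decision, modeling only the rules visible in this diff (the types and method are illustrative stand-ins, not the actual ILM classes):

// Self-contained sketch of the "skip generating a new snapshot" branch.
class SkipGenerationSketch {
    enum Decision { GENERATE_NEW_SNAPSHOT, REUSE_EXISTING_SNAPSHOT }

    static Decision decide(String existingSnapshotName, String existingRepository, String configuredRepository) {
        if (existingSnapshotName == null) {
            // Nothing recorded in the execution state or the index settings: generate a name,
            // clean up any stale snapshot, and take a new one.
            return Decision.GENERATE_NEW_SNAPSHOT;
        }
        if (configuredRepository.equals(existingRepository) == false) {
            // TODO in the original code: allow this instead of failing.
            throw new IllegalArgumentException("searchable snapshot indices may be converted only within the same repository");
        }
        // An existing snapshot in the same repository can be mounted directly, skipping the
        // generate-name, cleanup, and create-snapshot steps.
        return Decision.REUSE_EXISTING_SNAPSHOT;
    }
}
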
@@ -20,13 +20,21 @@ setup:
id: test-connector
job_type: full
trigger_method: on_demand

- set: { id: sync-job-id-to-cancel }

- do:
connector_sync_job.cancel:
connector_sync_job_id: $sync-job-id-to-cancel

- match: { acknowledged: true }

- do:
connector_sync_job.get:
connector_sync_job_id: $sync-job-id-to-cancel

- match: { status: "canceling"}


---
"Cancel a Connector Sync Job - Connector Sync Job does not exist":
@@ -9,16 +9,11 @@

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.tasks.TaskCancelledException;

import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

/**
@@ -41,11 +36,10 @@ public DriverRunner(ThreadContext threadContext) {
*/
public void runToCompletion(List<Driver> drivers, ActionListener<Void> listener) {
AtomicReference<Exception> failure = new AtomicReference<>();
AtomicArray<Map<String, List<String>>> responseHeaders = new AtomicArray<>(drivers.size());
var responseHeadersCollector = new ResponseHeadersCollector(threadContext);
CountDown counter = new CountDown(drivers.size());
for (int i = 0; i < drivers.size(); i++) {
Driver driver = drivers.get(i);
int driverIndex = i;
ActionListener<Void> driverListener = new ActionListener<>() {
@Override
public void onResponse(Void unused) {
@@ -80,9 +74,9 @@ public void onFailure(Exception e) {
}

private void done() {
responseHeaders.setOnce(driverIndex, threadContext.getResponseHeaders());
responseHeadersCollector.collect();
if (counter.countDown()) {
mergeResponseHeaders(responseHeaders);
responseHeadersCollector.finish();
Exception error = failure.get();
if (error != null) {
listener.onFailure(error);
@@ -96,23 +90,4 @@ private void done() {
start(driver, driverListener);
}
}

private void mergeResponseHeaders(AtomicArray<Map<String, List<String>>> responseHeaders) {
final Map<String, Set<String>> merged = new HashMap<>();
for (int i = 0; i < responseHeaders.length(); i++) {
final Map<String, List<String>> resp = responseHeaders.get(i);
if (resp == null || resp.isEmpty()) {
continue;
}
for (Map.Entry<String, List<String>> e : resp.entrySet()) {
// Use LinkedHashSet to retain the order of the values
merged.computeIfAbsent(e.getKey(), k -> new LinkedHashSet<>(e.getValue().size())).addAll(e.getValue());
}
}
for (Map.Entry<String, Set<String>> e : merged.entrySet()) {
for (String v : e.getValue()) {
threadContext.addResponseHeader(e.getKey(), v);
}
}
}
}
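
The per-driver AtomicArray of response headers and the mergeResponseHeaders helper are replaced by a shared ResponseHeadersCollector, whose implementation is not part of this diff. A rough, self-contained sketch of what such a collector does, modeled on the removed merge logic (a plain map API stands in for ThreadContext, so this is an assumption about its shape, not the actual class):

import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;

// Rough sketch only: the real ResponseHeadersCollector works against ThreadContext; here the
// headers are passed in and returned as plain maps instead.
class ResponseHeadersCollectorSketch {
    private final Queue<Map<String, List<String>>> collected = new ConcurrentLinkedQueue<>();

    /** Called by each driver as it completes, capturing that driver's response headers. */
    void collect(Map<String, List<String>> responseHeaders) {
        if (responseHeaders != null && responseHeaders.isEmpty() == false) {
            collected.add(responseHeaders);
        }
    }

    /** Called once, after the last driver completes: merge and deduplicate, keeping value order. */
    Map<String, Set<String>> finish() {
        Map<String, Set<String>> merged = new HashMap<>();
        for (Map<String, List<String>> headers : collected) {
            for (Map.Entry<String, List<String>> e : headers.entrySet()) {
                // LinkedHashSet retains the order of the values, as the removed merge code did.
                merged.computeIfAbsent(e.getKey(), k -> new LinkedHashSet<>()).addAll(e.getValue());
            }
        }
        return merged;
    }
}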
