From 316603548d0d20dcf8a62d62b6c92d708508b4b1 Mon Sep 17 00:00:00 2001
From: Tim Grein
Date: Wed, 6 Dec 2023 16:22:06 +0100
Subject: [PATCH 1/6] [Connectors API] Add sync job status check to cancel
 connector sync job integration test. (#103057)

Extend the cancel connector sync job integration test to also verify the
job status after cancellation.
---
 .../test/entsearch/430_connector_sync_job_cancel.yml | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/x-pack/plugin/ent-search/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/entsearch/430_connector_sync_job_cancel.yml b/x-pack/plugin/ent-search/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/entsearch/430_connector_sync_job_cancel.yml
index e9c612cbf9f27..d934b7c674f25 100644
--- a/x-pack/plugin/ent-search/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/entsearch/430_connector_sync_job_cancel.yml
+++ b/x-pack/plugin/ent-search/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/entsearch/430_connector_sync_job_cancel.yml
@@ -20,13 +20,21 @@ setup:
           id: test-connector
           job_type: full
           trigger_method: on_demand
+
   - set: { id: sync-job-id-to-cancel }
+
   - do:
       connector_sync_job.cancel:
         connector_sync_job_id: $sync-job-id-to-cancel

   - match: { acknowledged: true }

+  - do:
+      connector_sync_job.get:
+        connector_sync_job_id: $sync-job-id-to-cancel
+
+  - match: { status: "canceling" }
+
 ---
 "Cancel a Connector Sync Job - Connector Sync Job does not exist":

From ed2155cc47366982ab22e0a276ef055de0b4279a Mon Sep 17 00:00:00 2001
From: sabi0 <2sabio@gmail.com>
Date: Wed, 6 Dec 2023 16:27:16 +0100
Subject: [PATCH 2/6] Fix args length == 1 case handling in ESLoggerUsageChecker
 (#102382)

* Fix args length == 1 case handling in ESLoggerUsageChecker

There was an operator precedence mistake in:

    (lengthWithoutMarker == 1 || lengthWithoutMarker == 2) && lengthWithoutMarker == 2 ? ...

Logical AND (&&) has higher precedence than the ternary operator (?:),
so the above expression is equivalent to:

    lengthWithoutMarker == 2 ? ...

which silently dropped the lengthWithoutMarker == 1 case.

---------

Co-authored-by: Elastic Machine
---
 .../elasticsearch/test/loggerusage/ESLoggerUsageChecker.java | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/test/logger-usage/src/main/java/org/elasticsearch/test/loggerusage/ESLoggerUsageChecker.java b/test/logger-usage/src/main/java/org/elasticsearch/test/loggerusage/ESLoggerUsageChecker.java
index bd51c74ee8e47..d7cde5676a27f 100644
--- a/test/logger-usage/src/main/java/org/elasticsearch/test/loggerusage/ESLoggerUsageChecker.java
+++ b/test/logger-usage/src/main/java/org/elasticsearch/test/loggerusage/ESLoggerUsageChecker.java
@@ -364,9 +364,8 @@ private void verifyLoggerUsage(
                 && argumentTypes[markerOffset + 1].equals(OBJECT_CLASS)) {
                 // MULTI-PARAM METHOD: debug(Marker?, String, Object p0, ...)
                 checkFixedArityArgs(methodNode, logMessageFrames[i], lineNumber, methodInsn, markerOffset + 0, lengthWithoutMarker - 1);
-            } else if ((lengthWithoutMarker == 1 || lengthWithoutMarker == 2) && lengthWithoutMarker == 2
-                ? argumentTypes[markerOffset + 1].equals(THROWABLE_CLASS)
-                : true) {
+            } else if (lengthWithoutMarker == 1
+                || (lengthWithoutMarker == 2 && argumentTypes[markerOffset + 1].equals(THROWABLE_CLASS))) {
                 // all the rest: debug(Marker?, (Message|MessageSupplier|CharSequence|Object|String|Supplier), Throwable?)
                checkFixedArityArgs(methodNode, logMessageFrames[i], lineNumber, methodInsn, markerOffset + 0, 0);
             } else {

From 1617a8db3643573671469b343e9c6dfb787026b2 Mon Sep 17 00:00:00 2001
From: Mary Gouseti
Date: Wed, 6 Dec 2023 17:29:39 +0200
Subject: [PATCH 3/6] [ILM] More resilient when a policy is added to
 searchable snapshot (#102741)

In this PR we enable ILM to handle the following scenarios:

- An ILM policy with a searchable snapshot action in hot or cold is
  added on a partially mounted searchable snapshot.
- An ILM policy with a searchable snapshot action in frozen is added
  on a fully mounted searchable snapshot.

The searchable snapshot could have had a previous ILM policy that has
been removed via POST /_ilm/remove, or it might never have been managed
at all.
---
 docs/changelog/102741.yaml                      |   6 +
 .../reference/ilm/actions/ilm-delete.asciidoc   |   5 +
 .../xpack/core/ilm/MountSnapshotStep.java       |  50 ++--
 .../core/ilm/SearchableSnapshotAction.java      |  64 +++--
 .../actions/SearchableSnapshotActionIT.java     | 218 +++++++++++++++++-
 5 files changed, 304 insertions(+), 39 deletions(-)
 create mode 100644 docs/changelog/102741.yaml

diff --git a/docs/changelog/102741.yaml b/docs/changelog/102741.yaml
new file mode 100644
index 0000000000000..84a4b8092632f
--- /dev/null
+++ b/docs/changelog/102741.yaml
@@ -0,0 +1,6 @@
+pr: 102741
+summary: "[ILM] More resilient when a policy is added to searchable snapshot"
+area: ILM+SLM
+type: bug
+issues:
+ - 101958

diff --git a/docs/reference/ilm/actions/ilm-delete.asciidoc b/docs/reference/ilm/actions/ilm-delete.asciidoc
index fbd7f1b0a238a..eac3b9804709a 100644
--- a/docs/reference/ilm/actions/ilm-delete.asciidoc
+++ b/docs/reference/ilm/actions/ilm-delete.asciidoc
@@ -16,6 +16,11 @@ Defaults to `true`.
 This option is applicable when the <> action is used in any previous phase.

+WARNING: If a policy with a searchable snapshot action is applied to an existing searchable snapshot index,
+the snapshot backing this index will NOT be deleted, because it was not created by this policy. If you want
+to clean up this snapshot, delete it manually after the index is deleted using the <>; you
+can find the repository and snapshot name using the <>.
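+
+For example, assuming a mounted index named `restored-my-index` backed by snapshot `my-snapshot` in
+repository `my-repository` (all three are placeholder names), a sketch of the manual cleanup: read the
+backing repository and snapshot from the index settings, delete the index, then delete the snapshot:
+
+[source,console]
+----
+GET restored-my-index/_settings?filter_path=*.settings.index.store.snapshot
+
+DELETE _snapshot/my-repository/my-snapshot
+----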
+
 [[ilm-delete-action-ex]]
 ==== Example

diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/MountSnapshotStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/MountSnapshotStep.java
index 057f0c8930e66..96f280b4e03c9 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/MountSnapshotStep.java
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/MountSnapshotStep.java
@@ -68,24 +68,32 @@ void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentCl
         String indexName = indexMetadata.getIndex().getName();

         LifecycleExecutionState lifecycleState = indexMetadata.getLifecycleExecutionState();
+        SearchableSnapshotAction.SearchableSnapshotMetadata searchableSnapshotMetadata = SearchableSnapshotAction
+            .extractSearchableSnapshotFromSettings(indexMetadata);

         String policyName = indexMetadata.getLifecyclePolicyName();
-        final String snapshotRepository = lifecycleState.snapshotRepository();
+        String snapshotRepository = lifecycleState.snapshotRepository();
         if (Strings.hasText(snapshotRepository) == false) {
-            listener.onFailure(
-                new IllegalStateException(
-                    "snapshot repository is not present for policy [" + policyName + "] and index [" + indexName + "]"
-                )
-            );
-            return;
+            if (searchableSnapshotMetadata == null) {
+                listener.onFailure(
+                    new IllegalStateException(
+                        "snapshot repository is not present for policy [" + policyName + "] and index [" + indexName + "]"
+                    )
+                );
+                return;
+            } else {
+                snapshotRepository = searchableSnapshotMetadata.repositoryName();
+            }
         }

-        final String snapshotName = lifecycleState.snapshotName();
-        if (Strings.hasText(snapshotName) == false) {
+        String snapshotName = lifecycleState.snapshotName();
+        if (Strings.hasText(snapshotName) == false && searchableSnapshotMetadata == null) {
             listener.onFailure(
                 new IllegalStateException("snapshot name was not generated for policy [" + policyName + "] and index [" + indexName + "]")
             );
             return;
+        } else if (searchableSnapshotMetadata != null) {
+            snapshotName = searchableSnapshotMetadata.snapshotName();
         }

         String mountedIndexName = restoredIndexPrefix + indexName;
@@ -102,16 +110,20 @@ void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentCl

         final String snapshotIndexName = lifecycleState.snapshotIndexName();
         if (snapshotIndexName == null) {
-            // This index had its searchable snapshot created prior to a version where we captured
-            // the original index name, so make our best guess at the name
-            indexName = bestEffortIndexNameResolution(indexName);
-            logger.debug(
-                "index [{}] using policy [{}] does not have a stored snapshot index name, "
-                    + "using our best effort guess of [{}] for the original snapshotted index name",
-                indexMetadata.getIndex().getName(),
-                policyName,
-                indexName
-            );
+            if (searchableSnapshotMetadata == null) {
+                // This index had its searchable snapshot created prior to a version where we captured
+                // the original index name, so make our best guess at the name
+                indexName = bestEffortIndexNameResolution(indexName);
+                logger.debug(
+                    "index [{}] using policy [{}] does not have a stored snapshot index name, "
+                        + "using our best effort guess of [{}] for the original snapshotted index name",
+                    indexMetadata.getIndex().getName(),
+                    policyName,
+                    indexName
+                );
+            } else {
+                indexName = searchableSnapshotMetadata.sourceIndex();
+            }
         } else {
             // Use the name of the snapshot as specified in the metadata, because the current index
             // name might not reflect the name of the index actually in the snapshot
diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotAction.java
index 9ae0024c5a573..5b9b559b4d957 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotAction.java
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotAction.java
@@ -16,6 +16,7 @@
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.core.Nullable;
 import org.elasticsearch.license.LicenseUtils;
 import org.elasticsearch.license.XPackLicenseState;
 import org.elasticsearch.xcontent.ConstructingObjectParser;
@@ -32,6 +33,7 @@
 import java.util.Objects;

 import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOTS_REPOSITORY_NAME_SETTING_KEY;
+import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOTS_SNAPSHOT_NAME_SETTING_KEY;
 import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOT_PARTIAL_SETTING_KEY;
 import static org.elasticsearch.xpack.core.searchablesnapshots.SearchableSnapshotsConstants.SEARCHABLE_SNAPSHOT_FEATURE;

@@ -141,10 +143,12 @@ public List<Step> toSteps(Client client, String phase, StepKey nextStepKey, XPac
             IndexMetadata indexMetadata = clusterState.getMetadata().index(index);
             assert indexMetadata != null : "index " + index.getName() + " must exist in the cluster state";
             String policyName = indexMetadata.getLifecyclePolicyName();
-            if (indexMetadata.getSettings().get(LifecycleSettings.SNAPSHOT_INDEX_NAME) != null) {
+            SearchableSnapshotMetadata searchableSnapshotMetadata = extractSearchableSnapshotFromSettings(indexMetadata);
+            if (searchableSnapshotMetadata != null) {
+                // TODO: allow this behavior instead of returning false; in this case the index is already a searchable snapshot,
+                // so the most graceful way of recovery might be to use this repo
                 // The index is already a searchable snapshot, let's see if the repository matches
-                String repo = indexMetadata.getSettings().get(SEARCHABLE_SNAPSHOTS_REPOSITORY_NAME_SETTING_KEY);
-                if (this.snapshotRepository.equals(repo) == false) {
+                if (this.snapshotRepository.equals(searchableSnapshotMetadata.repositoryName) == false) {
                     // Okay, different repo, we need to go ahead with the searchable snapshot
                     logger.debug(
                         "[{}] action is configured for index [{}] in policy [{}] which is already mounted as a searchable "
                             + "snapshot in a different repository [{}] vs the configured repository [{}]",
                         SearchableSnapshotAction.NAME,
                         index.getName(),
                         policyName,
-                        repo,
+                        searchableSnapshotMetadata.repositoryName,
                         this.snapshotRepository
                     );
                     return false;
                 }

                 // Check the storage type to see if we need to convert between full <-> partial
-                final boolean partial = indexMetadata.getSettings().getAsBoolean(SEARCHABLE_SNAPSHOT_PARTIAL_SETTING_KEY, false);
-                MountSearchableSnapshotRequest.Storage existingType = partial
+                MountSearchableSnapshotRequest.Storage existingType = searchableSnapshotMetadata.partial
                    ? MountSearchableSnapshotRequest.Storage.SHARED_CACHE
                     : MountSearchableSnapshotRequest.Storage.FULL_COPY;
                 MountSearchableSnapshotRequest.Storage type = getConcreteStorageType(preActionBranchingKey);
@@ -172,7 +175,7 @@ public List<Step> toSteps(Client client, String phase, StepKey nextStepKey, XPac
                         SearchableSnapshotAction.NAME,
                         index.getName(),
                         policyName,
-                        repo,
+                        searchableSnapshotMetadata.repositoryName,
                         type
                     );
                     return true;
@@ -215,7 +218,7 @@ public List<Step> toSteps(Client client, String phase, StepKey nextStepKey, XPac
         // When generating a snapshot, we either jump to the force merge step, or we skip the
         // forcemerge and go straight to steps for creating the snapshot
         StepKey keyForSnapshotGeneration = forceMergeIndex ? forceMergeStepKey : generateSnapshotNameKey;
-        // Branch, deciding whether there is an existing searchable snapshot snapshot that can be used for mounting the index
+        // Branch, deciding whether there is an existing searchable snapshot that can be used for mounting the index
         // (in which case, skip generating a new name and the snapshot cleanup), or if we need to generate a new snapshot
         BranchingStep skipGeneratingSnapshotStep = new BranchingStep(
             skipGeneratingSnapshotKey,
@@ -225,7 +228,8 @@ public List<Step> toSteps(Client client, String phase, StepKey nextStepKey, XPac
                 IndexMetadata indexMetadata = clusterState.getMetadata().index(index);
                 String policyName = indexMetadata.getLifecyclePolicyName();
                 LifecycleExecutionState lifecycleExecutionState = indexMetadata.getLifecycleExecutionState();
-                if (lifecycleExecutionState.snapshotName() == null) {
+                SearchableSnapshotMetadata searchableSnapshotMetadata = extractSearchableSnapshotFromSettings(indexMetadata);
+                if (lifecycleExecutionState.snapshotName() == null && searchableSnapshotMetadata == null) {
                     // No name exists, so it must be generated
                     logger.trace(
                         "no snapshot name for index [{}] in policy [{}] exists, so one will be generated",
                         index.getName(),
                         policyName
                     );
                     return false;
                 }
+                String snapshotIndexName;
+                String snapshotName;
+                String repoName;
+                if (lifecycleExecutionState.snapshotName() != null) {
+                    snapshotIndexName = lifecycleExecutionState.snapshotIndexName();
+                    snapshotName = lifecycleExecutionState.snapshotName();
+                    repoName = lifecycleExecutionState.snapshotRepository();
+                } else {
+                    snapshotIndexName = searchableSnapshotMetadata.sourceIndex;
+                    snapshotName = searchableSnapshotMetadata.snapshotName;
+                    repoName = searchableSnapshotMetadata.repositoryName;
+                }

-                if (this.snapshotRepository.equals(lifecycleExecutionState.snapshotRepository()) == false) {
+                if (this.snapshotRepository.equals(repoName) == false) {
                     // A different repository is being used
                     // TODO: allow this behavior instead of throwing an exception
                     throw new IllegalArgumentException("searchable snapshot indices may be converted only within the same repository");
                 }
@@ -244,12 +260,14 @@ public List<Step> toSteps(Client client, String phase, StepKey nextStepKey, XPac
                 // We can skip the generate, initial cleanup, and snapshot taking for this index, as we already have a generated snapshot.
                // This will jump ahead directly to the "mount snapshot" step
                 logger.debug(
-                    "an existing snapshot [{}] in repository [{}] (index name: [{}]) "
-                        + "will be used for mounting [{}] as a searchable snapshot",
-                    lifecycleExecutionState.snapshotName(),
-                    lifecycleExecutionState.snapshotRepository(),
-                    lifecycleExecutionState.snapshotIndexName(),
-                    index.getName()
+                    "Policy [{}] will use an existing snapshot [{}] in repository [{}] (index name: [{}]) "
+                        + "to mount [{}] as a searchable snapshot. This snapshot was found in the {}.",
+                    policyName,
+                    snapshotName,
+                    snapshotRepository,
+                    snapshotIndexName,
+                    index.getName(),
+                    lifecycleExecutionState.snapshotName() != null ? "lifecycle execution state" : "metadata of " + index.getName()
                 );
                 return true;
             }
@@ -411,4 +429,18 @@ public boolean equals(Object o) {
     public int hashCode() {
         return Objects.hash(snapshotRepository, forceMergeIndex);
     }
+
+    @Nullable
+    static SearchableSnapshotMetadata extractSearchableSnapshotFromSettings(IndexMetadata indexMetadata) {
+        String indexName = indexMetadata.getSettings().get(LifecycleSettings.SNAPSHOT_INDEX_NAME);
+        if (indexName == null) {
+            return null;
+        }
+        String snapshotName = indexMetadata.getSettings().get(SEARCHABLE_SNAPSHOTS_SNAPSHOT_NAME_SETTING_KEY);
+        String repo = indexMetadata.getSettings().get(SEARCHABLE_SNAPSHOTS_REPOSITORY_NAME_SETTING_KEY);
+        final boolean partial = indexMetadata.getSettings().getAsBoolean(SEARCHABLE_SNAPSHOT_PARTIAL_SETTING_KEY, false);
+        return new SearchableSnapshotMetadata(indexName, repo, snapshotName, partial);
+    }
+
+    record SearchableSnapshotMetadata(String sourceIndex, String repositoryName, String snapshotName, boolean partial) {};
 }

diff --git a/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/actions/SearchableSnapshotActionIT.java b/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/actions/SearchableSnapshotActionIT.java
index 9ec36d4d9b7cf..361cfd79b5e88 100644
--- a/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/actions/SearchableSnapshotActionIT.java
+++ b/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/actions/SearchableSnapshotActionIT.java
@@ -58,6 +58,7 @@
 import static org.elasticsearch.xpack.TimeSeriesRestDriver.getStepKeyForIndex;
 import static org.elasticsearch.xpack.TimeSeriesRestDriver.indexDocument;
 import static org.elasticsearch.xpack.TimeSeriesRestDriver.rolloverMaxOneDocCondition;
+import static org.elasticsearch.xpack.core.ilm.DeleteAction.WITH_SNAPSHOT_DELETE;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@@ -184,10 +185,7 @@ public void testDeleteActionDeletesSearchableSnapshot() throws Exception {
         Map<String, LifecycleAction> coldActions = Map.of(SearchableSnapshotAction.NAME, new SearchableSnapshotAction(snapshotRepo));
         Map<String, Phase> phases = new HashMap<>();
         phases.put("cold", new Phase("cold", TimeValue.ZERO, coldActions));
-        phases.put(
-            "delete",
-            new Phase("delete", TimeValue.timeValueMillis(10000), singletonMap(DeleteAction.NAME, DeleteAction.WITH_SNAPSHOT_DELETE))
-        );
+        phases.put("delete", new Phase("delete", TimeValue.timeValueMillis(10000), singletonMap(DeleteAction.NAME, WITH_SNAPSHOT_DELETE)));
         LifecyclePolicy lifecyclePolicy = new LifecyclePolicy(policy, phases);
         // PUT policy
         XContentBuilder builder = jsonBuilder();
@@ -574,6 +572,218 @@ public void testConvertingSearchableSnapshotFromFullToPartial() throws Exception
         );
    }

+    @SuppressWarnings("unchecked")
+    public void testResumingSearchableSnapshotFromFullToPartial() throws Exception {
+        String index = "myindex-" + randomAlphaOfLength(4).toLowerCase(Locale.ROOT);
+        createSnapshotRepo(client(), snapshotRepo, randomBoolean());
+        var policyCold = "policy-cold";
+        createPolicy(
+            client(),
+            policyCold,
+            null,
+            null,
+            new Phase(
+                "cold",
+                TimeValue.ZERO,
+                singletonMap(SearchableSnapshotAction.NAME, new SearchableSnapshotAction(snapshotRepo, randomBoolean()))
+            ),
+            null,
+            null
+        );
+        var policyFrozen = "policy-cold-frozen";
+        createPolicy(
+            client(),
+            policyFrozen,
+            null,
+            null,
+            new Phase(
+                "cold",
+                TimeValue.ZERO,
+                singletonMap(SearchableSnapshotAction.NAME, new SearchableSnapshotAction(snapshotRepo, randomBoolean()))
+            ),
+            new Phase(
+                "frozen",
+                TimeValue.ZERO,
+                singletonMap(SearchableSnapshotAction.NAME, new SearchableSnapshotAction(snapshotRepo, randomBoolean()))
+            ),
+            null
+        );
+
+        createIndex(index, Settings.EMPTY);
+        ensureGreen(index);
+        indexDocument(client(), index, true);
+
+        // enable ILM after we indexed a document as otherwise ILM might sometimes run so fast the indexDocument call will fail with
+        // `index_not_found_exception`
+        updateIndexSettings(index, Settings.builder().put(LifecycleSettings.LIFECYCLE_NAME, policyCold));
+
+        final String fullMountedIndexName = SearchableSnapshotAction.FULL_RESTORED_INDEX_PREFIX + index;
+
+        assertBusy(() -> {
+            logger.info("--> waiting for [{}] to exist...", fullMountedIndexName);
+            assertTrue(indexExists(fullMountedIndexName));
+        }, 30, TimeUnit.SECONDS);
+
+        assertBusy(() -> {
+            Step.StepKey stepKeyForIndex = getStepKeyForIndex(client(), fullMountedIndexName);
+            assertThat(stepKeyForIndex.phase(), is("cold"));
+            assertThat(stepKeyForIndex.name(), is(PhaseCompleteStep.NAME));
+        }, 30, TimeUnit.SECONDS);
+
+        // remove ILM
+        {
+            Request request = new Request("POST", "/" + fullMountedIndexName + "/_ilm/remove");
+            Map<String, Object> responseMap = responseAsMap(client().performRequest(request));
+            assertThat(responseMap.get("has_failures"), is(false));
+        }
+        // add cold-frozen
+        updateIndexSettings(index, Settings.builder().put(LifecycleSettings.LIFECYCLE_NAME, policyFrozen));
+        String partiallyMountedIndexName = SearchableSnapshotAction.PARTIAL_RESTORED_INDEX_PREFIX + fullMountedIndexName;
+        assertBusy(() -> {
+            logger.info("--> waiting for [{}] to exist...", partiallyMountedIndexName);
+            assertTrue(indexExists(partiallyMountedIndexName));
+        }, 30, TimeUnit.SECONDS);
+
+        assertBusy(() -> {
+            Step.StepKey stepKeyForIndex = getStepKeyForIndex(client(), partiallyMountedIndexName);
+            assertThat(stepKeyForIndex.phase(), is("frozen"));
+            assertThat(stepKeyForIndex.name(), is(PhaseCompleteStep.NAME));
+        }, 30, TimeUnit.SECONDS);
+
+        // Ensure the searchable snapshot is not deleted when the index is deleted, because it was not created by this
+        // policy. We add the delete phase now to ensure that the index will not be deleted before we verify the above
+        // assertions
+        createPolicy(
+            client(),
+            policyFrozen,
+            null,
+            null,
+            new Phase(
+                "cold",
+                TimeValue.ZERO,
+                singletonMap(SearchableSnapshotAction.NAME, new SearchableSnapshotAction(snapshotRepo, randomBoolean()))
+            ),
+            new Phase(
+                "frozen",
+                TimeValue.ZERO,
+                singletonMap(SearchableSnapshotAction.NAME, new SearchableSnapshotAction(snapshotRepo, randomBoolean()))
+            ),
+            new Phase("delete", TimeValue.ZERO, singletonMap(DeleteAction.NAME, WITH_SNAPSHOT_DELETE))
+        );
+        assertBusy(() -> {
+            logger.info("--> waiting for [{}] to be deleted...", partiallyMountedIndexName);
+            assertThat(indexExists(partiallyMountedIndexName), is(false));
+            Request getSnaps = new Request("GET", "/_snapshot/" + snapshotRepo + "/_all");
+            Map<String, Object> responseMap = responseAsMap(client().performRequest(getSnaps));
+            assertThat(((List<Map<String, Object>>) responseMap.get("snapshots")).size(), equalTo(1));
+        }, 30, TimeUnit.SECONDS);
+    }
+
+    @SuppressWarnings("unchecked")
+    public void testResumingSearchableSnapshotFromPartialToFull() throws Exception {
+        String index = "myindex-" + randomAlphaOfLength(4).toLowerCase(Locale.ROOT);
+        createSnapshotRepo(client(), snapshotRepo, randomBoolean());
+        var policyCold = "policy-cold";
+        createPolicy(
+            client(),
+            policyCold,
+            null,
+            null,
+            new Phase(
+                "cold",
+                TimeValue.ZERO,
+                singletonMap(SearchableSnapshotAction.NAME, new SearchableSnapshotAction(snapshotRepo, randomBoolean()))
+            ),
+            null,
+            null
+        );
+        var policyColdFrozen = "policy-cold-frozen";
+        createPolicy(
+            client(),
+            policyColdFrozen,
+            null,
+            null,
+            new Phase(
+                "cold",
+                TimeValue.ZERO,
+                singletonMap(SearchableSnapshotAction.NAME, new SearchableSnapshotAction(snapshotRepo, randomBoolean()))
+            ),
+            new Phase(
+                "frozen",
+                TimeValue.ZERO,
+                singletonMap(SearchableSnapshotAction.NAME, new SearchableSnapshotAction(snapshotRepo, randomBoolean()))
+            ),
+            null
+        );
+
+        createIndex(index, Settings.EMPTY);
+        ensureGreen(index);
+        indexDocument(client(), index, true);
+
+        // enable ILM after we indexed a document as otherwise ILM might sometimes run so fast the indexDocument call will fail with
+        // `index_not_found_exception`
+        updateIndexSettings(index, Settings.builder().put(LifecycleSettings.LIFECYCLE_NAME, policyColdFrozen));
+
+        final String fullMountedIndexName = SearchableSnapshotAction.FULL_RESTORED_INDEX_PREFIX + index;
+        final String partialMountedIndexName = SearchableSnapshotAction.PARTIAL_RESTORED_INDEX_PREFIX + fullMountedIndexName;
+
+        assertBusy(() -> {
+            logger.info("--> waiting for [{}] to exist...", partialMountedIndexName);
+            assertTrue(indexExists(partialMountedIndexName));
+        }, 30, TimeUnit.SECONDS);
+
+        assertBusy(() -> {
+            Step.StepKey stepKeyForIndex = getStepKeyForIndex(client(), partialMountedIndexName);
+            assertThat(stepKeyForIndex.phase(), is("frozen"));
+            assertThat(stepKeyForIndex.name(), is(PhaseCompleteStep.NAME));
+        }, 30, TimeUnit.SECONDS);
+
+        // remove ILM from the partially mounted searchable snapshot
+        {
+            Request request = new Request("POST", "/" + partialMountedIndexName + "/_ilm/remove");
+            Map<String, Object> responseMap = responseAsMap(client().performRequest(request));
+            assertThat(responseMap.get("has_failures"), is(false));
+        }
+        // add a policy that will only include the fully mounted searchable snapshot
+        updateIndexSettings(index, Settings.builder().put(LifecycleSettings.LIFECYCLE_NAME, policyCold));
+        String restoredPartiallyMountedIndexName = SearchableSnapshotAction.FULL_RESTORED_INDEX_PREFIX + partialMountedIndexName;
+        assertBusy(() -> {
+            logger.info("--> waiting for [{}] to exist...", restoredPartiallyMountedIndexName);
+            assertTrue(indexExists(restoredPartiallyMountedIndexName));
+        }, 30, TimeUnit.SECONDS);
+
+        assertBusy(() -> {
+            Step.StepKey stepKeyForIndex = getStepKeyForIndex(client(), restoredPartiallyMountedIndexName);
+            assertThat(stepKeyForIndex.phase(), is("cold"));
+            assertThat(stepKeyForIndex.name(), is(PhaseCompleteStep.NAME));
+        }, 30, TimeUnit.SECONDS);
+
+        // Ensure the searchable snapshot is not deleted when the index is deleted, because it was not created by this
+        // policy. We add the delete phase now to ensure that the index will not be deleted before we verify the above
+        // assertions
+        createPolicy(
+            client(),
+            policyCold,
+            null,
+            null,
+            new Phase(
+                "cold",
+                TimeValue.ZERO,
+                singletonMap(SearchableSnapshotAction.NAME, new SearchableSnapshotAction(snapshotRepo, randomBoolean()))
+            ),
+            null,
+            new Phase("delete", TimeValue.ZERO, singletonMap(DeleteAction.NAME, WITH_SNAPSHOT_DELETE))
+        );
+        assertBusy(() -> {
+            logger.info("--> waiting for [{}] to be deleted...", restoredPartiallyMountedIndexName);
+            assertThat(indexExists(restoredPartiallyMountedIndexName), is(false));
+            Request getSnaps = new Request("GET", "/_snapshot/" + snapshotRepo + "/_all");
+            Map<String, Object> responseMap = responseAsMap(client().performRequest(getSnaps));
+            assertThat(((List<Map<String, Object>>) responseMap.get("snapshots")).size(), equalTo(1));
+        }, 30, TimeUnit.SECONDS);
+    }
+
     public void testSecondSearchableSnapshotUsingDifferentRepoThrows() throws Exception {
         String secondRepo = randomAlphaOfLengthBetween(10, 20);
         createSnapshotRepo(client(), snapshotRepo, randomBoolean());

From 5c3d118031dea20ef2e121da6d319c63a2ddb724 Mon Sep 17 00:00:00 2001
From: Nhat Nguyen
Date: Wed, 6 Dec 2023 08:15:18 -0800
Subject: [PATCH 4/6] Unmute HeapAttack tests (#102942)

This PR re-enables two more heap attack tests. I have run more than 100
iterations with these tests without hitting any failures.
---
 .../elasticsearch/xpack/esql/qa/heap_attack/HeapAttackIT.java | 2 --
 1 file changed, 2 deletions(-)

diff --git a/x-pack/plugin/esql/qa/server/heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/heap_attack/HeapAttackIT.java b/x-pack/plugin/esql/qa/server/heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/heap_attack/HeapAttackIT.java
index 2cc13117a299f..37f2c86dbc251 100644
--- a/x-pack/plugin/esql/qa/server/heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/heap_attack/HeapAttackIT.java
+++ b/x-pack/plugin/esql/qa/server/heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/heap_attack/HeapAttackIT.java
@@ -113,7 +113,6 @@ public void testGroupOnSomeLongs() throws IOException {
     /**
      * This groups on 5000 columns which used to throw a {@link StackOverflowError}.
      */
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/100640")
     public void testGroupOnManyLongs() throws IOException {
         initManyLongs();
         Map<?, ?> map = XContentHelper.convertToMap(
@@ -182,7 +181,6 @@ private Response concat(int evals) throws IOException {
     /**
      * Returns many moderately long strings.
     */
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/100678")
     public void testManyConcat() throws IOException {
         initManyLongs();
         Map<?, ?> map = XContentHelper.convertToMap(JsonXContent.jsonXContent, EntityUtils.toString(manyConcat(300).getEntity()), false);

From c183b92585862a2d790fe6210dcf27a1fe4b30f0 Mon Sep 17 00:00:00 2001
From: Nhat Nguyen
Date: Wed, 6 Dec 2023 08:27:39 -0800
Subject: [PATCH 5/6] Fast path for reading single doc with ordinals (#102902)

This optimization is added for enrich lookups, which are likely to match
a single document. The change decreases the latency of the enrich
operation in the nyc_taxis benchmark from 100ms to 70ms. When combined
with #102901, it further reduces the latency to below 40ms, better than
the previous performance before the regression.

Relates #102625
---
 docs/changelog/102902.yaml                        |  5 +++++
 .../index/mapper/BlockDocValuesReader.java        | 12 ++++++++++++
 2 files changed, 17 insertions(+)
 create mode 100644 docs/changelog/102902.yaml

diff --git a/docs/changelog/102902.yaml b/docs/changelog/102902.yaml
new file mode 100644
index 0000000000000..b33afdd35a603
--- /dev/null
+++ b/docs/changelog/102902.yaml
@@ -0,0 +1,5 @@
+pr: 102902
+summary: Fast path for reading single doc with ordinals
+area: ES|QL
+type: enhancement
+issues: []

diff --git a/server/src/main/java/org/elasticsearch/index/mapper/BlockDocValuesReader.java b/server/src/main/java/org/elasticsearch/index/mapper/BlockDocValuesReader.java
index 11e57e030dfe7..2160f52cbec02 100644
--- a/server/src/main/java/org/elasticsearch/index/mapper/BlockDocValuesReader.java
+++ b/server/src/main/java/org/elasticsearch/index/mapper/BlockDocValuesReader.java
@@ -555,8 +555,20 @@ private static class SingletonOrdinals extends BlockDocValuesReader {
             this.ordinals = ordinals;
         }

+        private BlockLoader.Block readSingleDoc(BlockFactory factory, int docId) throws IOException {
+            if (ordinals.advanceExact(docId)) {
+                BytesRef v = ordinals.lookupOrd(ordinals.ordValue());
+                return factory.constantBytes(v);
+            } else {
+                return factory.constantNulls();
+            }
+        }
+
         @Override
         public BlockLoader.Block read(BlockFactory factory, Docs docs) throws IOException {
+            if (docs.count() == 1) {
+                return readSingleDoc(factory, docs.get(0));
+            }
             try (BlockLoader.SingletonOrdinalsBuilder builder = factory.singletonOrdinalsBuilder(ordinals, docs.count())) {
                 for (int i = 0; i < docs.count(); i++) {
                     int doc = docs.get(i);

From be1277a769c13156caa1ede1c3a1d46f536ab947 Mon Sep 17 00:00:00 2001
From: Nhat Nguyen
Date: Wed, 6 Dec 2023 09:14:39 -0800
Subject: [PATCH 6/6] Collect warnings in compute service (#103031)

We have implemented #99927 in DriverRunner. However, we also need to
implement this in ComputeService, where we spawn multiple requests, to
avoid losing response headers.
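As a rough sketch of the pattern being applied (illustrative only:
`childRequests`, `send` and `childListener` are placeholders; the real
call sites are in the ComputeService hunks below):

    var collector = new ResponseHeadersCollector(threadPool.getThreadContext());
    // Restore the merged headers exactly once, when the top-level listener completes.
    listener = ActionListener.runBefore(listener, collector::finish);
    for (var child : childRequests) {
        // Each child may complete on a different thread; capture that thread's
        // response headers (e.g. ES|QL warnings) before they are lost.
        send(child, ActionListener.runBefore(childListener, collector::collect));
    }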
Relates #99927
Closes #100163
Closes #102982
Closes #102871
Closes #103028
---
 docs/changelog/103031.yaml                      |  9 ++
 .../compute/operator/DriverRunner.java          | 31 +------
 .../operator/ResponseHeadersCollector.java      | 60 +++++++++++++
 .../ResponseHeadersCollectorTests.java          | 72 ++++++++++++++++
 .../src/main/resources/ip.csv-spec              |  3 +-
 .../xpack/esql/action/WarningsIT.java           | 85 +++++++++++++++++++
 .../xpack/esql/plugin/ComputeService.java       | 14 ++-
 7 files changed, 241 insertions(+), 33 deletions(-)
 create mode 100644 docs/changelog/103031.yaml
 create mode 100644 x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/ResponseHeadersCollector.java
 create mode 100644 x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/ResponseHeadersCollectorTests.java
 create mode 100644 x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/WarningsIT.java

diff --git a/docs/changelog/103031.yaml b/docs/changelog/103031.yaml
new file mode 100644
index 0000000000000..f63094139f5ca
--- /dev/null
+++ b/docs/changelog/103031.yaml
@@ -0,0 +1,9 @@
+pr: 103031
+summary: Collect warnings in compute service
+area: ES|QL
+type: bug
+issues:
+ - 100163
+ - 103028
+ - 102871
+ - 102982

diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverRunner.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverRunner.java
index 4f16a615572b7..5de017fbd279e 100644
--- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverRunner.java
+++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverRunner.java
@@ -9,16 +9,11 @@

 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.common.util.concurrent.AtomicArray;
 import org.elasticsearch.common.util.concurrent.CountDown;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.tasks.TaskCancelledException;

-import java.util.HashMap;
-import java.util.LinkedHashSet;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;

 /**
@@ -41,11 +36,10 @@ public DriverRunner(ThreadContext threadContext) {
      */
     public void runToCompletion(List<Driver> drivers, ActionListener<Void> listener) {
         AtomicReference<Exception> failure = new AtomicReference<>();
-        AtomicArray<Map<String, List<String>>> responseHeaders = new AtomicArray<>(drivers.size());
+        var responseHeadersCollector = new ResponseHeadersCollector(threadContext);
         CountDown counter = new CountDown(drivers.size());
         for (int i = 0; i < drivers.size(); i++) {
             Driver driver = drivers.get(i);
-            int driverIndex = i;
             ActionListener<Void> driverListener = new ActionListener<>() {
                 @Override
                 public void onResponse(Void unused) {
@@ -80,9 +74,9 @@ public void onFailure(Exception e) {
                 }

                 private void done() {
-                    responseHeaders.setOnce(driverIndex, threadContext.getResponseHeaders());
+                    responseHeadersCollector.collect();
                     if (counter.countDown()) {
-                        mergeResponseHeaders(responseHeaders);
+                        responseHeadersCollector.finish();
                         Exception error = failure.get();
                         if (error != null) {
                             listener.onFailure(error);
@@ -96,23 +90,4 @@ private void done() {
             start(driver, driverListener);
         }
     }
-
-    private void mergeResponseHeaders(AtomicArray<Map<String, List<String>>> responseHeaders) {
-        final Map<String, Set<String>> merged = new HashMap<>();
-        for (int i = 0; i < responseHeaders.length(); i++) {
-            final Map<String, List<String>> resp = responseHeaders.get(i);
-            if (resp == null || resp.isEmpty()) {
-                continue;
-            }
-            for (Map.Entry<String, List<String>> e : resp.entrySet()) {
-                // Use LinkedHashSet to retain the order of the values
-                merged.computeIfAbsent(e.getKey(), k -> new LinkedHashSet<>(e.getValue().size())).addAll(e.getValue());
-            }
-        }
-        for (Map.Entry<String, Set<String>> e : merged.entrySet()) {
-            for (String v : e.getValue()) {
-                threadContext.addResponseHeader(e.getKey(), v);
-            }
-        }
-    }
 }

diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/ResponseHeadersCollector.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/ResponseHeadersCollector.java
new file mode 100644
index 0000000000000..8f40664be74d4
--- /dev/null
+++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/ResponseHeadersCollector.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.operator;
+
+import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
+import org.elasticsearch.common.util.concurrent.ThreadContext;
+
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+/**
+ * A helper class that can be used to collect and merge response headers from multiple child requests.
+ */
+public final class ResponseHeadersCollector {
+    private final ThreadContext threadContext;
+    private final Queue<Map<String, List<String>>> collected = ConcurrentCollections.newQueue();
+
+    public ResponseHeadersCollector(ThreadContext threadContext) {
+        this.threadContext = threadContext;
+    }
+
+    /**
+     * Called when a child request is completed to collect the response headers of the responding thread
+     */
+    public void collect() {
+        Map<String, List<String>> responseHeaders = threadContext.getResponseHeaders();
+        if (responseHeaders.isEmpty() == false) {
+            collected.add(responseHeaders);
+        }
+    }
+
+    /**
+     * Called when all child requests are completed. This merges all collected response headers
+     * from the child requests and restores them to the current thread's context.
+     */
+    public void finish() {
+        final Map<String, Set<String>> merged = new HashMap<>();
+        Map<String, List<String>> resp;
+        while ((resp = collected.poll()) != null) {
+            for (Map.Entry<String, List<String>> e : resp.entrySet()) {
+                // Use LinkedHashSet to retain the order of the values
+                merged.computeIfAbsent(e.getKey(), k -> new LinkedHashSet<>(e.getValue().size())).addAll(e.getValue());
+            }
+        }
+        for (Map.Entry<String, Set<String>> e : merged.entrySet()) {
+            for (String v : e.getValue()) {
+                threadContext.addResponseHeader(e.getKey(), v);
+            }
+        }
+    }
+}

diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/ResponseHeadersCollectorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/ResponseHeadersCollectorTests.java
new file mode 100644
index 0000000000000..b09372f3a962c
--- /dev/null
+++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/ResponseHeadersCollectorTests.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.operator;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionRunnable;
+import org.elasticsearch.action.support.PlainActionFuture;
+import org.elasticsearch.action.support.RefCountingListener;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
+import org.elasticsearch.common.util.concurrent.ThreadContext;
+import org.elasticsearch.common.util.set.Sets;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.threadpool.FixedExecutorBuilder;
+import org.elasticsearch.threadpool.TestThreadPool;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.Matchers.equalTo;
+
+public class ResponseHeadersCollectorTests extends ESTestCase {
+
+    public void testCollect() {
+        int numThreads = randomIntBetween(1, 10);
+        TestThreadPool threadPool = new TestThreadPool(
+            getTestClass().getSimpleName(),
+            new FixedExecutorBuilder(Settings.EMPTY, "test", numThreads, 1024, "test", EsExecutors.TaskTrackingConfig.DEFAULT)
+        );
+        Set<String> expectedWarnings = new HashSet<>();
+        try {
+            ThreadContext threadContext = threadPool.getThreadContext();
+            var collector = new ResponseHeadersCollector(threadContext);
+            PlainActionFuture<Void> future = new PlainActionFuture<>();
+            Runnable mergeAndVerify = () -> {
+                collector.finish();
+                List<String> actualWarnings = threadContext.getResponseHeaders().getOrDefault("Warnings", List.of());
+                assertThat(Sets.newHashSet(actualWarnings), equalTo(expectedWarnings));
+            };
+            try (RefCountingListener refs = new RefCountingListener(ActionListener.runAfter(future, mergeAndVerify))) {
+                CyclicBarrier barrier = new CyclicBarrier(numThreads);
+                for (int i = 0; i < numThreads; i++) {
+                    String warning = "warning-" + i;
+                    expectedWarnings.add(warning);
+                    ActionListener<Void> listener = ActionListener.runBefore(refs.acquire(), collector::collect);
+                    threadPool.schedule(new ActionRunnable<>(listener) {
+                        @Override
+                        protected void doRun() throws Exception {
+                            barrier.await(30, TimeUnit.SECONDS);
+                            try (ThreadContext.StoredContext ignored = threadContext.stashContext()) {
+                                threadContext.addResponseHeader("Warnings", warning);
+                                listener.onResponse(null);
+                            }
+                        }
+                    }, TimeValue.timeValueNanos(between(0, 1000_000)), threadPool.executor("test"));
+                }
+            }
+            future.actionGet(TimeValue.timeValueSeconds(30));
+        } finally {
+            terminate(threadPool);
+        }
+    }
+}

diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/ip.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/ip.csv-spec
index e0167ce451e80..02e9db6ededf1 100644
--- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/ip.csv-spec
+++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/ip.csv-spec
@@ -257,8 +257,7 @@ eth1 |alpha |::1 |::1
 eth0 |beta |127.0.0.1 |::1
 ;

-// AwaitsFix: https://github.com/elastic/elasticsearch/issues/103028
-pushDownIPWithComparision#[skip:-8.11.99, reason:Lucene multivalue warning introduced in 8.12 only]-Ignore
+pushDownIPWithComparision#[skip:-8.11.99, reason:Lucene multivalue warning introduced in 8.12 only]
 from hosts | where ip1 > to_ip("127.0.0.1") | keep card, ip1;
 ignoreOrder:true
 warning:Line 1:20: evaluation of [ip1 > to_ip(\"127.0.0.1\")] failed, treating result as null. Only first 20 failures recorded.
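The csv-spec test above can be un-muted because warnings are no longer dropped
between nodes. Over HTTP, these warnings surface as standard `Warning` response
headers; a minimal sketch of reading them with the low-level REST client, where
`esqlRequest` is a placeholder for a request carrying a query like the one above:

    Response response = client().performRequest(esqlRequest);
    List<String> warnings = Arrays.stream(response.getHeaders())
        .filter(h -> h.getName().equalsIgnoreCase("Warning"))
        .map(Header::getValue)
        .toList();

The new WarningsIT below makes the equivalent assertion at the transport level,
where the merged headers live in the coordinating node's ThreadContext.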
diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/WarningsIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/WarningsIT.java
new file mode 100644
index 0000000000000..12897979a47e0
--- /dev/null
+++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/WarningsIT.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.action;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.support.PlainActionFuture;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.test.junit.annotations.TestLogging;
+import org.elasticsearch.transport.TransportService;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+
+@TestLogging(value = "org.elasticsearch.xpack.esql:TRACE", reason = "debug")
+public class WarningsIT extends AbstractEsqlIntegTestCase {
+
+    public void testCollectWarnings() {
+        final String node1, node2;
+        if (randomBoolean()) {
+            internalCluster().ensureAtLeastNumDataNodes(2);
+            node1 = randomDataNode().getName();
+            node2 = randomValueOtherThan(node1, () -> randomDataNode().getName());
+        } else {
+            node1 = randomDataNode().getName();
+            node2 = randomDataNode().getName();
+        }
+
+        int numDocs1 = randomIntBetween(1, 15);
+        assertAcked(
+            client().admin()
+                .indices()
+                .prepareCreate("index-1")
+                .setSettings(Settings.builder().put("index.routing.allocation.require._name", node1))
+                .setMapping("host", "type=keyword")
+        );
+        for (int i = 0; i < numDocs1; i++) {
+            client().prepareIndex("index-1").setSource("host", "192." + i).get();
+        }
+        int numDocs2 = randomIntBetween(1, 15);
+        assertAcked(
+            client().admin()
+                .indices()
+                .prepareCreate("index-2")
+                .setSettings(Settings.builder().put("index.routing.allocation.require._name", node2))
+                .setMapping("host", "type=keyword")
+        );
+        for (int i = 0; i < numDocs2; i++) {
+            client().prepareIndex("index-2").setSource("host", "10." + i).get();
+        }
+
+        DiscoveryNode coordinator = randomFrom(clusterService().state().nodes().stream().toList());
+        client().admin().indices().prepareRefresh("index-1", "index-2").get();
+
+        EsqlQueryRequest request = new EsqlQueryRequest();
+        request.query("FROM index-* | EVAL ip = to_ip(host) | STATS s = COUNT(*) by ip | KEEP ip | LIMIT 100");
+        request.pragmas(randomPragmas());
+        PlainActionFuture<EsqlQueryResponse> future = new PlainActionFuture<>();
+        client(coordinator.getName()).execute(EsqlQueryAction.INSTANCE, request, ActionListener.runBefore(future, () -> {
+            var threadpool = internalCluster().getInstance(TransportService.class, coordinator.getName()).getThreadPool();
+            Map<String, List<String>> responseHeaders = threadpool.getThreadContext().getResponseHeaders();
+            List<String> warnings = responseHeaders.getOrDefault("Warning", List.of())
+                .stream()
+                .filter(w -> w.contains("is not an IP string literal"))
+                .toList();
+            int expectedWarnings = Math.min(20, numDocs1 + numDocs2);
+            // we cap the number of warnings per node
+            assertThat(warnings.size(), greaterThanOrEqualTo(expectedWarnings));
+        }));
+        future.actionGet(30, TimeUnit.SECONDS).close();
+    }
+
+    private DiscoveryNode randomDataNode() {
+        return randomFrom(clusterService().state().nodes().getDataNodes().values());
+    }
+}

diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java
index dd5ae00294ed0..b7b31868d65e2 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java
@@ -32,6 +32,7 @@
 import org.elasticsearch.compute.operator.Driver;
 import org.elasticsearch.compute.operator.DriverProfile;
 import org.elasticsearch.compute.operator.DriverTaskRunner;
+import org.elasticsearch.compute.operator.ResponseHeadersCollector;
 import org.elasticsearch.compute.operator.exchange.ExchangeResponse;
 import org.elasticsearch.compute.operator.exchange.ExchangeService;
 import org.elasticsearch.compute.operator.exchange.ExchangeSinkHandler;
@@ -162,6 +163,8 @@ public void execute(
         LOGGER.debug("Sending data node plan\n{}\n with filter [{}]", dataNodePlan, requestFilter);

+        final var responseHeadersCollector = new ResponseHeadersCollector(transportService.getThreadPool().getThreadContext());
+        listener = ActionListener.runBefore(listener, responseHeadersCollector::finish);
         String[] originalIndices = PlannerUtils.planOriginalIndices(physicalPlan);
         computeTargetNodes(
             rootTask,
@@ -193,6 +196,7 @@ public void execute(
                     computeContext,
                     coordinatorPlan,
                     cancelOnFailure(rootTask, cancelled, requestRefs.acquire()).map(driverProfiles -> {
+                        responseHeadersCollector.collect();
                         if (configuration.profile()) {
                             collectedProfiles.addAll(driverProfiles);
                         }
@@ -208,6 +212,7 @@ public void execute(
                     exchangeSource,
                     targetNodes,
                     () -> cancelOnFailure(rootTask, cancelled, requestRefs.acquire()).map(response -> {
+                        responseHeadersCollector.collect();
                         if (configuration.profile()) {
                             collectedProfiles.addAll(response.profiles);
                         }
@@ -501,9 +506,12 @@ public void messageReceived(DataNodeRequest request, TransportChannel channel, T
             runCompute(parentTask, computeContext, request.plan(), ActionListener.wrap(driverProfiles -> {
                 // don't return until all pages are fetched
                 exchangeSink.addCompletionListener(
-                    ActionListener.releaseAfter(
-                        listener.map(nullValue -> new DataNodeResponse(driverProfiles)),
-                        () -> exchangeService.finishSinkHandler(sessionId, null)
+                    ContextPreservingActionListener.wrapPreservingContext(
+                        ActionListener.releaseAfter(
+                            listener.map(nullValue -> new DataNodeResponse(driverProfiles)),
+                            () -> exchangeService.finishSinkHandler(sessionId, null)
+                        ),
+                        transportService.getThreadPool().getThreadContext()
                     )
                 );
             }, e -> {