diff --git a/server/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java b/server/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java index 546586a9ff3c3..2fbe3c1fc1532 100644 --- a/server/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java +++ b/server/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java @@ -195,13 +195,10 @@ protected SearchHit nextDoc(int doc) throws IOException { context.shardTarget(), context.searcher().getIndexReader(), docIdsToLoad, - context.request().allowPartialSearchResults() + context.request().allowPartialSearchResults(), + context.queryResult() ); - if (docsIterator.isTimedOut()) { - context.queryResult().searchTimedOut(true); - } - if (context.isCancelled()) { for (SearchHit hit : hits) { // release all hits that would otherwise become owned and eventually released by SearchHits below diff --git a/server/src/main/java/org/elasticsearch/search/fetch/FetchPhaseDocsIterator.java b/server/src/main/java/org/elasticsearch/search/fetch/FetchPhaseDocsIterator.java index df4e7649ffd3b..4a242f70e8d02 100644 --- a/server/src/main/java/org/elasticsearch/search/fetch/FetchPhaseDocsIterator.java +++ b/server/src/main/java/org/elasticsearch/search/fetch/FetchPhaseDocsIterator.java @@ -16,6 +16,7 @@ import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.internal.ContextIndexSearcher; +import org.elasticsearch.search.query.QuerySearchResult; import org.elasticsearch.search.query.SearchTimeoutException; import java.io.IOException; @@ -30,12 +31,6 @@ */ abstract class FetchPhaseDocsIterator { - private boolean timedOut = false; - - public boolean isTimedOut() { - return timedOut; - } - /** * Called when a new leaf reader is reached * @param ctx the leaf reader for this set of doc ids @@ -53,7 +48,13 @@ public boolean isTimedOut() { /** * Iterate over a set of docsIds within a particular shard and index reader */ - public final SearchHit[] iterate(SearchShardTarget shardTarget, IndexReader indexReader, int[] docIds, boolean allowPartialResults) { + public final SearchHit[] iterate( + SearchShardTarget shardTarget, + IndexReader indexReader, + int[] docIds, + boolean allowPartialResults, + QuerySearchResult querySearchResult + ) { SearchHit[] searchHits = new SearchHit[docIds.length]; DocIdToIndex[] docs = new DocIdToIndex[docIds.length]; for (int index = 0; index < docIds.length; index++) { @@ -69,12 +70,10 @@ public final SearchHit[] iterate(SearchShardTarget shardTarget, IndexReader inde int[] docsInLeaf = docIdsInLeaf(0, endReaderIdx, docs, ctx.docBase); try { setNextReader(ctx, docsInLeaf); - } catch (ContextIndexSearcher.TimeExceededException timeExceededException) { - if (allowPartialResults) { - timedOut = true; - return SearchHits.EMPTY; - } - throw new SearchTimeoutException(shardTarget, "Time exceeded"); + } catch (ContextIndexSearcher.TimeExceededException e) { + SearchTimeoutException.handleTimeout(allowPartialResults, shardTarget, querySearchResult); + assert allowPartialResults; + return SearchHits.EMPTY; } for (int i = 0; i < docs.length; i++) { try { @@ -88,15 +87,15 @@ public final SearchHit[] iterate(SearchShardTarget shardTarget, IndexReader inde currentDoc = docs[i].docId; assert searchHits[docs[i].index] == null; searchHits[docs[i].index] = nextDoc(docs[i].docId); - } catch (ContextIndexSearcher.TimeExceededException timeExceededException) { - if (allowPartialResults) { - timedOut = true; - SearchHit[] partialSearchHits = new SearchHit[i]; - System.arraycopy(searchHits, 0, partialSearchHits, 0, i); - return partialSearchHits; + } catch (ContextIndexSearcher.TimeExceededException e) { + if (allowPartialResults == false) { + purgeSearchHits(searchHits); } - purgeSearchHits(searchHits); - throw new SearchTimeoutException(shardTarget, "Time exceeded"); + SearchTimeoutException.handleTimeout(allowPartialResults, shardTarget, querySearchResult); + assert allowPartialResults; + SearchHit[] partialSearchHits = new SearchHit[i]; + System.arraycopy(searchHits, 0, partialSearchHits, 0, i); + return partialSearchHits; } } } catch (SearchTimeoutException e) { diff --git a/server/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java b/server/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java index 78d90377cdc3f..9f990fbd97cdf 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java +++ b/server/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java @@ -169,8 +169,8 @@ public void setProfiler(QueryProfiler profiler) { * Add a {@link Runnable} that will be run on a regular basis while accessing documents in the * DirectoryReader but also while collecting them and check for query cancellation or timeout. */ - public Runnable addQueryCancellation(Runnable action) { - return this.cancellable.add(action); + public void addQueryCancellation(Runnable action) { + this.cancellable.add(action); } /** @@ -425,8 +425,16 @@ public void throwTimeExceededException() { } } - public static class TimeExceededException extends RuntimeException { + /** + * Exception thrown whenever a search timeout occurs. May be thrown by {@link ContextIndexSearcher} or {@link ExitableDirectoryReader}. + */ + public static final class TimeExceededException extends RuntimeException { // This exception should never be re-thrown, but we fill in the stacktrace to be able to trace where it does not get properly caught + + /** + * Created via {@link #throwTimeExceededException()} + */ + private TimeExceededException() {} } @Override @@ -570,14 +578,12 @@ public DirectoryReader getDirectoryReader() { } private static class MutableQueryTimeout implements ExitableDirectoryReader.QueryCancellation { - private final List runnables = new ArrayList<>(); - private Runnable add(Runnable action) { + private void add(Runnable action) { Objects.requireNonNull(action, "cancellation runnable should not be null"); assert runnables.contains(action) == false : "Cancellation runnable already added"; runnables.add(action); - return action; } private void remove(Runnable action) { diff --git a/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java b/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java index af65c30b49dcf..3036a295d459a 100644 --- a/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java +++ b/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java @@ -217,10 +217,11 @@ static void addCollectorsAndSearch(SearchContext searchContext) throws QueryPhas queryResult.topDocs(queryPhaseResult.topDocsAndMaxScore(), queryPhaseResult.sortValueFormats()); if (searcher.timeExceeded()) { assert timeoutRunnable != null : "TimeExceededException thrown even though timeout wasn't set"; - if (searchContext.request().allowPartialSearchResults() == false) { - throw new SearchTimeoutException(searchContext.shardTarget(), "Time exceeded"); - } - queryResult.searchTimedOut(true); + SearchTimeoutException.handleTimeout( + searchContext.request().allowPartialSearchResults(), + searchContext.shardTarget(), + searchContext.queryResult() + ); } if (searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER) { queryResult.terminatedEarly(queryPhaseResult.terminatedAfter()); diff --git a/server/src/main/java/org/elasticsearch/search/query/SearchTimeoutException.java b/server/src/main/java/org/elasticsearch/search/query/SearchTimeoutException.java index 0ed64811fee28..e006f176ff91a 100644 --- a/server/src/main/java/org/elasticsearch/search/query/SearchTimeoutException.java +++ b/server/src/main/java/org/elasticsearch/search/query/SearchTimeoutException.java @@ -33,4 +33,17 @@ public SearchTimeoutException(StreamInput in) throws IOException { public RestStatus status() { return RestStatus.GATEWAY_TIMEOUT; } + + /** + * Propagate a timeout according to whether partial search results are allowed or not. + * In case partial results are allowed, a flag will be set to the provided {@link QuerySearchResult} to indicate that there was a + * timeout, but the execution will continue and partial results will be returned to the user. + * When partial results are disallowed, a {@link SearchTimeoutException} will be thrown and returned to the user. + */ + public static void handleTimeout(boolean allowPartialSearchResults, SearchShardTarget target, QuerySearchResult querySearchResult) { + if (allowPartialSearchResults == false) { + throw new SearchTimeoutException(target, "Time exceeded"); + } + querySearchResult.searchTimedOut(true); + } } diff --git a/server/src/main/java/org/elasticsearch/search/rescore/RescorePhase.java b/server/src/main/java/org/elasticsearch/search/rescore/RescorePhase.java index 1227db5d8e1db..7e3646e7689cc 100644 --- a/server/src/main/java/org/elasticsearch/search/rescore/RescorePhase.java +++ b/server/src/main/java/org/elasticsearch/search/rescore/RescorePhase.java @@ -73,10 +73,11 @@ public static void execute(SearchContext context) { } catch (IOException e) { throw new ElasticsearchException("Rescore Phase Failed", e); } catch (ContextIndexSearcher.TimeExceededException e) { - if (context.request().allowPartialSearchResults() == false) { - throw new SearchTimeoutException(context.shardTarget(), "Time exceeded"); - } - context.queryResult().searchTimedOut(true); + SearchTimeoutException.handleTimeout( + context.request().allowPartialSearchResults(), + context.shardTarget(), + context.queryResult() + ); } } diff --git a/server/src/test/java/org/elasticsearch/search/fetch/FetchPhaseDocsIteratorTests.java b/server/src/test/java/org/elasticsearch/search/fetch/FetchPhaseDocsIteratorTests.java index d5e930321db95..c8d1b6721c64b 100644 --- a/server/src/test/java/org/elasticsearch/search/fetch/FetchPhaseDocsIteratorTests.java +++ b/server/src/test/java/org/elasticsearch/search/fetch/FetchPhaseDocsIteratorTests.java @@ -17,6 +17,7 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.tests.index.RandomIndexWriter; import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.query.QuerySearchResult; import org.elasticsearch.test.ESTestCase; import java.io.IOException; @@ -77,7 +78,7 @@ protected SearchHit nextDoc(int doc) { } }; - SearchHit[] hits = it.iterate(null, reader, docs, randomBoolean()); + SearchHit[] hits = it.iterate(null, reader, docs, randomBoolean(), new QuerySearchResult()); assertThat(hits.length, equalTo(docs.length)); for (int i = 0; i < hits.length; i++) { @@ -125,7 +126,10 @@ protected SearchHit nextDoc(int doc) { } }; - Exception e = expectThrows(FetchPhaseExecutionException.class, () -> it.iterate(null, reader, docs, randomBoolean())); + Exception e = expectThrows( + FetchPhaseExecutionException.class, + () -> it.iterate(null, reader, docs, randomBoolean(), new QuerySearchResult()) + ); assertThat(e.getMessage(), containsString("Error running fetch phase for doc [" + badDoc + "]")); assertThat(e.getCause(), instanceOf(IllegalArgumentException.class)); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java index a93590d7a5bc2..646c4f8240c3e 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java @@ -407,7 +407,7 @@ public enum Cap { /** * Supported the text categorization function "CATEGORIZE". */ - CATEGORIZE_V4(Build.current().isSnapshot()), + CATEGORIZE_V4, /** * QSTR function diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AbstractPhysicalOperationProviders.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AbstractPhysicalOperationProviders.java index 69e2d1c45aa3c..35aba7665ec87 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AbstractPhysicalOperationProviders.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AbstractPhysicalOperationProviders.java @@ -120,10 +120,14 @@ public final PhysicalOperation groupingPhysicalOperation( * - before stats (keep x = a | stats by x) which requires the partial input to use a's channel * - after stats (stats by a | keep x = a) which causes the output layout to refer to the follow-up alias */ + // TODO: This is likely required only for pre-8.14 node compatibility; confirm and remove if possible. + // Since https://github.com/elastic/elasticsearch/pull/104958, it shouldn't be possible to have aliases in the aggregates + // which the groupings refer to. Except for `BY CATEGORIZE(field)`, which remains as alias in the grouping, all aliases + // should've become EVALs before or after the STATS. for (NamedExpression agg : aggregates) { if (agg instanceof Alias a) { if (a.child() instanceof Attribute attr) { - if (groupAttribute.id().equals(attr.id())) { + if (sourceGroupAttribute.id().equals(attr.id())) { groupAttributeLayout.nameIds().add(a.id()); // TODO: investigate whether a break could be used since it shouldn't be possible to have multiple // attributes pointing to the same attribute @@ -133,8 +137,8 @@ public final PhysicalOperation groupingPhysicalOperation( // is in the output form // if the group points to an alias declared in the aggregate, use the alias child as source else if (aggregatorMode.isOutputPartial()) { - if (groupAttribute.semanticEquals(a.toAttribute())) { - groupAttribute = attr; + if (sourceGroupAttribute.semanticEquals(a.toAttribute())) { + sourceGroupAttribute = attr; break; } }