
Commit

Merge branch 'main' into fix-license-version-check
elasticmachine authored Nov 20, 2023
2 parents 2b0ccb4 + 36a2f9b commit 951ed0e
Showing 184 changed files with 5,432 additions and 1,694 deletions.
@@ -8,8 +8,11 @@

package org.elasticsearch.benchmark.compute.operator;

import org.apache.lucene.document.FieldType;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.StoredField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.DocValuesType;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
@@ -19,6 +22,7 @@
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.NumericUtils;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.BytesRefBlock;
import org.elasticsearch.compute.data.BytesRefVector;
@@ -30,14 +34,16 @@
import org.elasticsearch.compute.data.LongBlock;
import org.elasticsearch.compute.data.LongVector;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.lucene.BlockReaderFactories;
import org.elasticsearch.compute.lucene.LuceneSourceOperator;
import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator;
import org.elasticsearch.compute.operator.topn.TopNOperator;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.mapper.BlockLoader;
import org.elasticsearch.index.mapper.KeywordFieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.NumberFieldMapper;
import org.elasticsearch.search.lookup.SearchLookup;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
@@ -56,7 +62,9 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PrimitiveIterator;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

@@ -93,18 +101,113 @@ public class ValuesSourceReaderBenchmark {
}
}

private static BlockLoader blockLoader(String name) {
private static List<ValuesSourceReaderOperator.FieldInfo> fields(String name) {
return switch (name) {
case "long" -> numericBlockLoader(name, NumberFieldMapper.NumberType.LONG);
case "int" -> numericBlockLoader(name, NumberFieldMapper.NumberType.INTEGER);
case "double" -> numericBlockLoader(name, NumberFieldMapper.NumberType.DOUBLE);
case "keyword" -> new KeywordFieldMapper.KeywordFieldType(name).blockLoader(null);
default -> throw new IllegalArgumentException("can't read [" + name + "]");
case "3_stored_keywords" -> List.of(
new ValuesSourceReaderOperator.FieldInfo("keyword_1", List.of(blockLoader("stored_keyword_1"))),
new ValuesSourceReaderOperator.FieldInfo("keyword_2", List.of(blockLoader("stored_keyword_2"))),
new ValuesSourceReaderOperator.FieldInfo("keyword_3", List.of(blockLoader("stored_keyword_3")))
);
default -> List.of(new ValuesSourceReaderOperator.FieldInfo(name, List.of(blockLoader(name))));
};
}

private static BlockLoader numericBlockLoader(String name, NumberFieldMapper.NumberType numberType) {
return new NumberFieldMapper.NumberFieldType(name, numberType).blockLoader(null);
enum Where {
DOC_VALUES,
SOURCE,
STORED;
}

private static BlockLoader blockLoader(String name) {
Where where = Where.DOC_VALUES;
if (name.startsWith("stored_")) {
name = name.substring("stored_".length());
where = Where.STORED;
} else if (name.startsWith("source_")) {
name = name.substring("source_".length());
where = Where.SOURCE;
}
switch (name) {
case "long":
return numericBlockLoader(name, where, NumberFieldMapper.NumberType.LONG);
case "int":
return numericBlockLoader(name, where, NumberFieldMapper.NumberType.INTEGER);
case "double":
return numericBlockLoader(name, where, NumberFieldMapper.NumberType.DOUBLE);
case "keyword":
name = "keyword_1";
}
if (name.startsWith("keyword")) {
boolean syntheticSource = false;
FieldType ft = new FieldType(KeywordFieldMapper.Defaults.FIELD_TYPE);
switch (where) {
case DOC_VALUES:
break;
case SOURCE:
ft.setDocValuesType(DocValuesType.NONE);
break;
case STORED:
ft.setStored(true);
ft.setDocValuesType(DocValuesType.NONE);
syntheticSource = true;
break;
}
ft.freeze();
return new KeywordFieldMapper.KeywordFieldType(
name,
ft,
Lucene.KEYWORD_ANALYZER,
Lucene.KEYWORD_ANALYZER,
Lucene.KEYWORD_ANALYZER,
new KeywordFieldMapper.Builder(name, IndexVersion.current()).docValues(ft.docValuesType() != DocValuesType.NONE),
syntheticSource
).blockLoader(new MappedFieldType.BlockLoaderContext() {
@Override
public String indexName() {
return "benchmark";
}

@Override
public SearchLookup lookup() {
throw new UnsupportedOperationException();
}

@Override
public Set<String> sourcePaths(String name) {
return Set.of(name);
}
});
}
throw new IllegalArgumentException("can't read [" + name + "]");
}

private static BlockLoader numericBlockLoader(String name, Where where, NumberFieldMapper.NumberType numberType) {
boolean stored = false;
boolean docValues = true;
switch (where) {
case DOC_VALUES:
break;
case SOURCE:
stored = true;
docValues = false;
break;
case STORED:
throw new UnsupportedOperationException();
}
return new NumberFieldMapper.NumberFieldType(
name,
numberType,
true,
stored,
docValues,
true,
null,
Map.of(),
null,
false,
null,
null
).blockLoader(null);
}

/**
@@ -122,7 +225,7 @@ private static BlockLoader numericBlockLoader(String name, NumberFieldMapper.Num
@Param({ "in_order", "shuffled", "shuffled_singles" })
public String layout;

@Param({ "long", "int", "double", "keyword" })
@Param({ "long", "int", "double", "keyword", "stored_keyword", "3_stored_keywords" })
public String name;

private Directory directory;
@@ -134,9 +237,9 @@ private static BlockLoader numericBlockLoader(String name, NumberFieldMapper.Num
public void benchmark() {
ValuesSourceReaderOperator op = new ValuesSourceReaderOperator(
BlockFactory.getNonBreakingInstance(),
List.of(BlockReaderFactories.loaderToFactory(reader, blockLoader(name))),
0,
name
fields(name),
List.of(reader),
0
);
long sum = 0;
for (Page page : pages) {
@@ -160,7 +263,7 @@ public void benchmark() {
sum += (long) values.getDouble(p);
}
}
case "keyword" -> {
case "keyword", "stored_keyword" -> {
BytesRef scratch = new BytesRef();
BytesRefVector values = op.getOutput().<BytesRefBlock>getBlock(1).asVector();
for (int p = 0; p < values.getPositionCount(); p++) {
@@ -170,21 +273,59 @@ public void benchmark() {
sum += Integer.parseInt(r.utf8ToString());
}
}
case "3_stored_keywords" -> {
BytesRef scratch = new BytesRef();
Page out = op.getOutput();
for (BytesRefVector values : new BytesRefVector[] {
out.<BytesRefBlock>getBlock(1).asVector(),
out.<BytesRefBlock>getBlock(2).asVector(),
out.<BytesRefBlock>getBlock(3).asVector() }) {

for (int p = 0; p < values.getPositionCount(); p++) {
BytesRef r = values.getBytesRef(p, scratch);
r.offset++;
r.length--;
sum += Integer.parseInt(r.utf8ToString());
}
}
}
}
}
long expected;
if (name.equals("keyword")) {
expected = 0;
for (int i = 0; i < INDEX_SIZE; i++) {
expected += i % 1000;
}
} else {
expected = INDEX_SIZE;
expected = expected * (expected - 1) / 2;
long expected = 0;
switch (name) {
case "keyword", "stored_keyword":
for (int i = 0; i < INDEX_SIZE; i++) {
expected += i % 1000;
}
break;
case "3_stored_keywords":
for (int i = 0; i < INDEX_SIZE; i++) {
expected += 3 * (i % 1000);
}
break;
default:
expected = INDEX_SIZE;
expected = expected * (expected - 1) / 2;
}
if (expected != sum) {
throw new AssertionError("[" + layout + "][" + name + "] expected [" + expected + "] but was [" + sum + "]");
}
boolean foundStoredFieldLoader = false;
ValuesSourceReaderOperator.Status status = (ValuesSourceReaderOperator.Status) op.status();
for (Map.Entry<String, Integer> e : status.readersBuilt().entrySet()) {
if (e.getKey().indexOf("stored_fields") >= 0) {
foundStoredFieldLoader = true;
}
}
if (name.indexOf("stored") >= 0) {
if (foundStoredFieldLoader == false) {
throw new AssertionError("expected to use a stored field loader but only had: " + status.readersBuilt());
}
} else {
if (foundStoredFieldLoader) {
throw new AssertionError("expected not to use a stored field loader but only had: " + status.readersBuilt());
}
}
}

@Setup
@@ -195,15 +336,23 @@ public void setup() throws IOException {

private void setupIndex() throws IOException {
directory = new ByteBuffersDirectory();
FieldType keywordFieldType = new FieldType(KeywordFieldMapper.Defaults.FIELD_TYPE);
keywordFieldType.setStored(true);
keywordFieldType.freeze();
try (IndexWriter iw = new IndexWriter(directory, new IndexWriterConfig().setMergePolicy(NoMergePolicy.INSTANCE))) {
for (int i = 0; i < INDEX_SIZE; i++) {
String c = Character.toString('a' - ((i % 1000) % 26) + 26);
iw.addDocument(
List.of(
new NumericDocValuesField("long", i),
new StoredField("long", i),
new NumericDocValuesField("int", i),
new StoredField("int", i),
new NumericDocValuesField("double", NumericUtils.doubleToSortableLong(i)),
new KeywordFieldMapper.KeywordField("keyword", new BytesRef(c + i % 1000), KeywordFieldMapper.Defaults.FIELD_TYPE)
new StoredField("double", (double) i),
new KeywordFieldMapper.KeywordField("keyword_1", new BytesRef(c + i % 1000), keywordFieldType),
new KeywordFieldMapper.KeywordField("keyword_2", new BytesRef(c + i % 1000), keywordFieldType),
new KeywordFieldMapper.KeywordField("keyword_3", new BytesRef(c + i % 1000), keywordFieldType)
)
);
if (i % COMMIT_INTERVAL == 0) {
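The core of the ESQL change exercised above (presumably the one tracked by docs/changelog/102192.yaml, "ESQL: Load more than one field at once") is that a single ValuesSourceReaderOperator now takes a list of FieldInfo entries plus the shard readers, instead of one operator per BlockLoader. A minimal sketch of that wiring, assuming the benchmark's own `reader`, `pages`, and `blockLoader(...)` fixtures and the usual addInput/getOutput operator cycle (the middle of the benchmark loop is collapsed above):

// Sketch only: reads two stored keyword fields in one pass, mirroring the
// constructor shape and FieldInfo usage shown in the benchmark diff above.
// `reader` and `pages` are the benchmark's fixtures; addInput/getOutput is an
// assumption about the hidden part of the loop, not quoted from this commit.
ValuesSourceReaderOperator op = new ValuesSourceReaderOperator(
    BlockFactory.getNonBreakingInstance(),
    List.of(
        new ValuesSourceReaderOperator.FieldInfo("keyword_1", List.of(blockLoader("stored_keyword_1"))),
        new ValuesSourceReaderOperator.FieldInfo("keyword_2", List.of(blockLoader("stored_keyword_2")))
    ),
    List.of(reader),  // one reader per shard
    0                 // channel holding the incoming doc block
);
for (Page page : pages) {
    op.addInput(page);
    Page out = op.getOutput();
    // Each FieldInfo appends one block after the input block: here blocks 1 and 2.
    BytesRefVector first = out.<BytesRefBlock>getBlock(1).asVector();
    BytesRefVector second = out.<BytesRefBlock>getBlock(2).asVector();
}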
@@ -87,6 +87,7 @@

import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Collections.singletonList;
import static org.elasticsearch.client.RestClient.IGNORE_RESPONSE_CODES_PARAM;

/**
* Client that connects to an Elasticsearch cluster through HTTP.
@@ -106,6 +107,9 @@
* Requests can be traced by enabling trace logging for "tracer". The trace logger outputs requests and responses in curl format.
*/
public class RestClient implements Closeable {

public static final String IGNORE_RESPONSE_CODES_PARAM = "ignore";

private static final Log logger = LogFactory.getLog(RestClient.class);

private final CloseableHttpAsyncClient client;
@@ -780,8 +784,8 @@ private class InternalRequest {
this.request = request;
Map<String, String> params = new HashMap<>(request.getParameters());
params.putAll(request.getOptions().getParameters());
// ignore is a special parameter supported by the clients, shouldn't be sent to es
String ignoreString = params.remove("ignore");
// IGNORE_RESPONSE_CODES_PARAM is a special parameter supported by the clients, shouldn't be sent to es
String ignoreString = params.remove(IGNORE_RESPONSE_CODES_PARAM);
this.ignoreErrorCodes = getIgnoreErrorCodes(ignoreString, request.getMethod());
URI uri = buildUri(pathPrefix, request.getEndpoint(), params);
this.httpRequest = createHttpRequest(request.getMethod(), uri, request.getEntity(), compressionEnabled);
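The new public constant just names the long-standing client-only "ignore" parameter. A minimal sketch of how a caller might use it; the host, index, and class name below are illustrative and not part of this change:

import org.apache.http.HttpHost;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;

public class IgnoreParamExample {
    public static void main(String[] args) throws Exception {
        // Placeholder host; RestClient is Closeable, so use try-with-resources.
        try (RestClient client = RestClient.builder(new HttpHost("localhost", 9200, "http")).build()) {
            Request request = new Request("GET", "/my-index/_doc/1");
            // Equivalent to the literal "ignore" parameter: a 404 now comes back as a
            // normal Response instead of a thrown ResponseException.
            request.addParameter(RestClient.IGNORE_RESPONSE_CODES_PARAM, "404");
            Response response = client.performRequest(request);
            System.out.println(response.getStatusLine());
        }
    }
}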
@@ -275,6 +275,7 @@ public void testErrorStatusCodes() throws Exception {
try {
Request request = new Request(method, "/" + errorStatusCode);
if (false == ignoreParam.isEmpty()) {
// literal "ignore" rather than IGNORE_RESPONSE_CODES_PARAM since this is something on which callers might rely
request.addParameter("ignore", ignoreParam);
}
Response response = restClient.performRequest(request);
@@ -568,6 +569,7 @@ private HttpUriRequest performRandomRequest(String method) throws Exception {
if (randomBoolean()) {
ignore += "," + randomFrom(RestClientTestUtil.getAllErrorStatusCodes());
}
// literal "ignore" rather than IGNORE_RESPONSE_CODES_PARAM since this is something on which callers might rely
request.addParameter("ignore", ignore);
}
URI uri = uriBuilder.build();
5 changes: 5 additions & 0 deletions docs/changelog/100408.yaml
@@ -0,0 +1,5 @@
pr: 100408
summary: "ESQL: Make blocks ref counted"
area: ES|QL
type: enhancement
issues: []
5 changes: 5 additions & 0 deletions docs/changelog/101845.yaml
@@ -0,0 +1,5 @@
pr: 101845
summary: Introduce new endpoint to expose data stream lifecycle stats
area: Data streams
type: enhancement
issues: []
5 changes: 5 additions & 0 deletions docs/changelog/102138.yaml
@@ -0,0 +1,5 @@
pr: 102138
summary: Skip shards that don't match the source query during checkpointing
area: Transform
type: enhancement
issues: []
5 changes: 5 additions & 0 deletions docs/changelog/102192.yaml
@@ -0,0 +1,5 @@
pr: 102192
summary: "ESQL: Load more than one field at once"
area: ES|QL
type: enhancement
issues: []
6 changes: 6 additions & 0 deletions docs/changelog/102282.yaml
@@ -0,0 +1,6 @@
pr: 102282
summary: "ES|QL: Fix drop of renamed grouping"
area: ES|QL
type: bug
issues:
- 102121
5 changes: 5 additions & 0 deletions docs/changelog/102292.yaml
@@ -0,0 +1,5 @@
pr: 102292
summary: Consider duplicate stacktraces in custom index
area: Application
type: enhancement
issues: []
6 changes: 6 additions & 0 deletions docs/changelog/102350.yaml
@@ -0,0 +1,6 @@
pr: 102350
summary: "ESQL: Fix rare bug with empty string"
area: ES|QL
type: bug
issues:
- 101969
4 changes: 4 additions & 0 deletions docs/reference/data-streams/data-stream-apis.asciidoc
@@ -25,6 +25,8 @@ preview:[]
preview:[]
* <<data-streams-explain-lifecycle,Explain data stream lifecycle>>
preview:[]
* <<data-streams-get-lifecycle-stats, Get data stream lifecycle stats>>
preview:[]

The following API is available for <<tsds,time series data streams>>:

@@ -55,4 +57,6 @@ include::{es-repo-dir}/data-streams/lifecycle/apis/delete-lifecycle.asciidoc[]

include::{es-repo-dir}/data-streams/lifecycle/apis/explain-lifecycle.asciidoc[]

include::{es-repo-dir}/data-streams/lifecycle/apis/get-lifecycle-stats.asciidoc[]

include::{es-repo-dir}/indices/downsample-data-stream.asciidoc[]
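The docs change above only registers the new "Get data stream lifecycle stats" page in the API index. A hedged sketch of calling that endpoint through the low-level REST client; the "_lifecycle/stats" path is assumed from the API name referenced by get-lifecycle-stats.asciidoc and is not spelled out in this hunk:

import org.apache.http.HttpHost;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;

public class LifecycleStatsExample {
    public static void main(String[] args) throws Exception {
        try (RestClient client = RestClient.builder(new HttpHost("localhost", 9200, "http")).build()) {
            // Assumed endpoint path; see the linked asciidoc for the documented URL.
            Request request = new Request("GET", "/_lifecycle/stats");
            Response response = client.performRequest(request);
            System.out.println(EntityUtils.toString(response.getEntity()));
        }
    }
}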
(The remaining changed files in this commit are not shown here.)
