Skip to content

Commit

Permalink
Merge branch 'main' into rcs2-reload
Browse files Browse the repository at this point in the history
  • Loading branch information
elasticmachine authored Dec 7, 2023
2 parents e513798 + 763ef56 commit 5f91ae7
Show file tree
Hide file tree
Showing 34 changed files with 446 additions and 255 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/102559.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 102559
summary: "Prune unnecessary information from TransportNodesStatsAction.NodeStatsRequest"
area: Network
type: enhancement
issues: [100878]
6 changes: 6 additions & 0 deletions docs/changelog/103116.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 103116
summary: Fix `frequent_item_sets` aggregation on empty index
area: Machine Learning
type: bug
issues:
- 103067
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ void syncFlush(String syncId) throws IOException {
// make sure that background merges won't happen; otherwise, IndexWriter#hasUncommittedChanges can become true again
forceMerge(false, 1, false, UUIDs.randomBase64UUID());
assertNotNull(indexWriter);
try (ReleasableLock ignored = writeLock.acquire()) {
try (ReleasableLock ignored = readLock.acquire()) {
assertThat(getTranslogStats().getUncommittedOperations(), equalTo(0));
Map<String, String> userData = new HashMap<>(getLastCommittedSegmentInfos().userData);
SequenceNumbers.CommitInfo commitInfo = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(userData.entrySet());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ static TransportVersion def(int id) {
public static final TransportVersion MISSED_INDICES_UPDATE_EXCEPTION_ADDED = def(8_558_00_0);
public static final TransportVersion INFERENCE_SERVICE_EMBEDDING_SIZE_ADDED = def(8_559_00_0);
public static final TransportVersion ENRICH_ELASTICSEARCH_VERSION_REMOVED = def(8_560_00_0);
public static final TransportVersion NODE_STATS_REQUEST_SIMPLIFIED = def(8_561_00_0);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,12 @@ public NodesStatsRequest(StreamInput in) throws IOException {
* for all nodes will be returned.
*/
public NodesStatsRequest(String... nodesIds) {
this(new NodesStatsRequestParameters(), nodesIds);
}

public NodesStatsRequest(NodesStatsRequestParameters nodesStatsRequestParameters, String... nodesIds) {
super(nodesIds);
nodesStatsRequestParameters = new NodesStatsRequestParameters();
this.nodesStatsRequestParameters = nodesStatsRequestParameters;
}

/**
Expand Down Expand Up @@ -180,4 +184,7 @@ public void writeTo(StreamOutput out) throws IOException {
nodesStatsRequestParameters.writeTo(out);
}

public NodesStatsRequestParameters getNodesStatsRequestParameters() {
return nodesStatsRequestParameters;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand All @@ -27,10 +28,13 @@
import org.elasticsearch.transport.Transports;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.elasticsearch.TransportVersions.NODE_STATS_REQUEST_SIMPLIFIED;

public class TransportNodesStatsAction extends TransportNodesAction<
NodesStatsRequest,
NodesStatsResponse,
Expand Down Expand Up @@ -79,11 +83,11 @@ protected NodeStats newNodeResponse(StreamInput in, DiscoveryNode node) throws I
protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest, Task task) {
assert task instanceof CancellableTask;

NodesStatsRequest request = nodeStatsRequest.request;
Set<String> metrics = request.requestedMetrics();
final NodesStatsRequestParameters nodesStatsRequestParameters = nodeStatsRequest.getNodesStatsRequestParameters();
Set<String> metrics = nodesStatsRequestParameters.requestedMetrics();
return nodeService.stats(
request.indices(),
request.includeShardsStats(),
nodesStatsRequestParameters.indices(),
nodesStatsRequestParameters.includeShardsStats(),
NodesStatsRequestParameters.Metric.OS.containedIn(metrics),
NodesStatsRequestParameters.Metric.PROCESS.containedIn(metrics),
NodesStatsRequestParameters.Metric.JVM.containedIn(metrics),
Expand All @@ -104,32 +108,54 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest, Task task)

public static class NodeStatsRequest extends TransportRequest {

// TODO don't wrap the whole top-level request, it contains heavy and irrelevant DiscoveryNode things; see #100878
NodesStatsRequest request;
private NodesStatsRequestParameters nodesStatsRequestParameters;
private String[] nodesIds;

public NodeStatsRequest(StreamInput in) throws IOException {
super(in);
request = new NodesStatsRequest(in);
if (in.getTransportVersion().onOrAfter(NODE_STATS_REQUEST_SIMPLIFIED)) {
this.nodesStatsRequestParameters = new NodesStatsRequestParameters(in);
this.nodesIds = in.readStringArray();
} else {
final NodesStatsRequest nodesStatsRequest = new NodesStatsRequest(in);
this.nodesStatsRequestParameters = nodesStatsRequest.getNodesStatsRequestParameters();
this.nodesIds = nodesStatsRequest.nodesIds();
}
}

NodeStatsRequest(NodesStatsRequest request) {
this.request = request;
this.nodesStatsRequestParameters = request.getNodesStatsRequestParameters();
this.nodesIds = request.nodesIds();
}

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new CancellableTask(id, type, action, "", parentTaskId, headers) {
@Override
public String getDescription() {
return request.getDescription();
return Strings.format(
"nodes=%s, metrics=%s, flags=%s",
Arrays.toString(nodesIds),
nodesStatsRequestParameters.requestedMetrics().toString(),
Arrays.toString(nodesStatsRequestParameters.indices().getFlags())
);
}
};
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
request.writeTo(out);
if (out.getTransportVersion().onOrAfter(NODE_STATS_REQUEST_SIMPLIFIED)) {
this.nodesStatsRequestParameters.writeTo(out);
out.writeStringArrayNullable(nodesIds);
} else {
new NodesStatsRequest(nodesStatsRequestParameters, this.nodesIds).writeTo(out);
}
}

public NodesStatsRequestParameters getNodesStatsRequestParameters() {
return nodesStatsRequestParameters;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ public int restoreLocalHistoryFromTranslog(TranslogRecoveryRunner translogRecove

@Override
public int fillSeqNoGaps(long primaryTerm) throws IOException {
try (ReleasableLock ignored = writeLock.acquire()) {
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
final long localCheckpoint = localCheckpointTracker.getProcessedCheckpoint();
final long maxSeqNo = localCheckpointTracker.getMaxSeqNo();
Expand Down Expand Up @@ -1941,7 +1941,7 @@ public NoOpResult noOp(final NoOp noOp) throws IOException {
}

private NoOpResult innerNoOp(final NoOp noOp) throws IOException {
assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
assert readLock.isHeldByCurrentThread();
assert noOp.seqNo() > SequenceNumbers.NO_OPS_PERFORMED;
final long seqNo = noOp.seqNo();
try (Releasable ignored = noOpKeyedLock.acquire(seqNo)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1347,7 +1347,7 @@ public void testSyncedFlushVanishesOnReplay() throws IOException {
}

void syncFlush(IndexWriter writer, InternalEngine engine, String syncId) throws IOException {
try (ReleasableLock ignored = engine.writeLock.acquire()) {
try (ReleasableLock ignored = engine.readLock.acquire()) {
Map<String, String> userData = new HashMap<>();
writer.getLiveCommitData().forEach(e -> userData.put(e.getKey(), e.getValue()));
userData.put(Engine.SYNC_COMMIT_ID, syncId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1744,8 +1744,8 @@ protected static Map<String, Object> getIndexMapping(String index) throws IOExce

@SuppressWarnings("unchecked")
protected Map<String, Object> getIndexMappingAsMap(String index) throws IOException {
Map<String, Object> indexSettings = getIndexMapping(index);
return (Map<String, Object>) ((Map<String, Object>) indexSettings.get(index)).get("mappings");
Map<String, Object> indexMapping = getIndexMapping(index);
return (Map<String, Object>) ((Map<String, Object>) indexMapping.get(index)).get("mappings");
}

protected static boolean indexExists(String index) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,19 @@ setup:
id: test-connector
job_type: full
trigger_method: on_demand

- set: { id: sync-job-id-to-delete }

- do:
connector_sync_job.delete:
connector_sync_job_id: $sync-job-id-to-delete

- match: { acknowledged: true }

- do:
connector_sync_job.get:
connector_sync_job_id: $sync-job-id-to-delete
catch: missing

---
"Delete Connector Sync Job - Connector Sync Job does not exist":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ setup:
id: test-connector
job_type: full
trigger_method: on_demand

- set: { id: id }

- do:
connector_sync_job.error:
connector_sync_job_id: $id
Expand All @@ -29,6 +31,13 @@ setup:

- match: { acknowledged: true }

- do:
connector_sync_job.get:
connector_sync_job_id: $id

- match: { error: error }
- match: { status: error }


---
"Set an error for a Connector Sync Job - Connector Sync Job does not exist":
Expand Down
Loading

0 comments on commit 5f91ae7

Please sign in to comment.