diff --git a/README.md b/README.md index a940e17c..71fafa0d 100644 --- a/README.md +++ b/README.md @@ -26,14 +26,14 @@ Please see our [documentation](https://opendistro.github.io/for-elasticsearch-do 1. Check out this package from version control. 1. Launch Intellij IDEA, choose **Import Project**, and select the `settings.gradle` file in the root of this package. -1. To build from the command line, set `JAVA_HOME` to point to a JDK >=11 before running `./gradlew`. +1. To build from the command line, set `JAVA_HOME` to point to a JDK >= 12 before running `./gradlew`. ## Build This package is organized into subprojects, most of which contribute JARs to the top-level plugin in the `alerting` subproject. -All subprojects in this package use the [Gradle](https://docs.gradle.org/4.10.2/userguide/userguide.html) build system. Gradle comes with excellent documentation that should be your first stop when trying to figure out how to operate or modify the build. +All subprojects in this package use the [Gradle](https://docs.gradle.org/current/userguide/userguide.html) build system. Gradle comes with excellent documentation that should be your first stop when trying to figure out how to operate or modify the build. However, to build the `alerting` plugin subproject, we also use the Elastic build tools for Gradle. These tools are idiosyncratic and don't always follow the conventions and instructions for building regular Java code using Gradle. Not everything in `alerting` will work the way it's described in the Gradle documentation. If you encounter such a situation, the Elastic build tools [source code](https://github.com/elastic/elasticsearch/tree/master/buildSrc/src/main/groovy/org/elasticsearch/gradle) is your best bet for figuring out what's going on. diff --git a/alerting/build.gradle b/alerting/build.gradle index bb3d146e..cb0860f3 100644 --- a/alerting/build.gradle +++ b/alerting/build.gradle @@ -102,7 +102,7 @@ task startMultiNode(dependsOn: startMultiNode1) def es_tmp_dir = rootProject.file('build/private/es_tmp').absoluteFile es_tmp_dir.mkdirs() -unitTest { +test { systemProperty 'tests.security.manager', 'false' } diff --git a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/MonitorRunner.kt b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/MonitorRunner.kt index 4c6ca141..221d0029 100644 --- a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/MonitorRunner.kt +++ b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/MonitorRunner.kt @@ -21,7 +21,6 @@ import com.amazon.opendistroforelasticsearch.alerting.alerts.moveAlerts import com.amazon.opendistroforelasticsearch.alerting.core.JobRunner import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX -import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob.Companion.SCHEDULED_JOB_TYPE import com.amazon.opendistroforelasticsearch.alerting.core.model.SearchInput import com.amazon.opendistroforelasticsearch.alerting.elasticapi.convertToMap import com.amazon.opendistroforelasticsearch.alerting.elasticapi.firstFailureOrNull @@ -355,7 +354,7 @@ class MonitorRunner( // spend time reloading the alert and writing it back. when (alert.state) { ACTIVE, ERROR -> { - listOf>(IndexRequest(AlertIndices.ALERT_INDEX, AlertIndices.MAPPING_TYPE) + listOf>(IndexRequest(AlertIndices.ALERT_INDEX) .routing(alert.monitorId) .source(alert.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) .id(if (alert.id != Alert.NO_ID) alert.id else null)) @@ -365,9 +364,9 @@ class MonitorRunner( } COMPLETED -> { listOf>( - DeleteRequest(AlertIndices.ALERT_INDEX, AlertIndices.MAPPING_TYPE, alert.id) + DeleteRequest(AlertIndices.ALERT_INDEX, alert.id) .routing(alert.monitorId), - IndexRequest(AlertIndices.HISTORY_WRITE_INDEX, AlertIndices.MAPPING_TYPE) + IndexRequest(AlertIndices.HISTORY_WRITE_INDEX) .routing(alert.monitorId) .source(alert.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) .id(alert.id)) @@ -440,7 +439,7 @@ class MonitorRunner( } private suspend fun getDestinationInfo(destinationId: String): Destination { - val getRequest = GetRequest(SCHEDULED_JOBS_INDEX, SCHEDULED_JOB_TYPE, destinationId).routing(destinationId) + val getRequest = GetRequest(SCHEDULED_JOBS_INDEX, destinationId).routing(destinationId) val getResponse: GetResponse = client.suspendUntil { client.get(getRequest, it) } if (!getResponse.isExists || getResponse.isSourceEmpty) { throw IllegalStateException("Destination document with id $destinationId not found or source is empty") diff --git a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/alerts/AlertMover.kt b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/alerts/AlertMover.kt index 37955b89..a1af6949 100644 --- a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/alerts/AlertMover.kt +++ b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/alerts/AlertMover.kt @@ -70,9 +70,9 @@ suspend fun moveAlerts(client: Client, monitorId: String, monitor: Monitor? = nu val response: SearchResponse = client.suspendUntil { search(activeAlertsRequest, it) } // If no alerts are found, simply return - if (response.hits.totalHits == 0L) return + if (response.hits.totalHits.value == 0L) return val indexRequests = response.hits.map { hit -> - IndexRequest(AlertIndices.HISTORY_WRITE_INDEX, AlertIndices.MAPPING_TYPE) + IndexRequest(AlertIndices.HISTORY_WRITE_INDEX) .routing(monitorId) .source(Alert.parse(alertContentParser(hit.sourceRef), hit.id, hit.version) .copy(state = Alert.State.DELETED) @@ -85,9 +85,10 @@ suspend fun moveAlerts(client: Client, monitorId: String, monitor: Monitor? = nu val copyResponse: BulkResponse = client.suspendUntil { bulk(copyRequest, it) } val deleteRequests = copyResponse.items.filterNot { it.isFailed }.map { - DeleteRequest(AlertIndices.ALERT_INDEX, AlertIndices.MAPPING_TYPE, it.id) + DeleteRequest(AlertIndices.ALERT_INDEX, it.id) .routing(monitorId) .version(it.version) + .versionType(VersionType.EXTERNAL_GTE) } val deleteResponse: BulkResponse = client.suspendUntil { bulk(BulkRequest().add(deleteRequests), it) } diff --git a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/model/Alert.kt b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/model/Alert.kt index 57dd40a0..9b48ac20 100644 --- a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/model/Alert.kt +++ b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/model/Alert.kt @@ -93,14 +93,6 @@ data class Alert( const val NO_ID = "" const val NO_VERSION = Versions.NOT_FOUND - /** - * The mapping type of [Alert]s in the ES index. - * - * This should go away starting ES 7. We use "_doc" for future compatibility as described here: - * https://www.elastic.co/guide/en/elasticsearch/reference/6.x/removal-of-types.html#_schedule_for_removal_of_mapping_types - */ - const val ALERT_TYPE = "_doc" - @JvmStatic @JvmOverloads @Throws(IOException::class) fun parse(xcp: XContentParser, id: String = NO_ID, version: Long = NO_VERSION): Alert { diff --git a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestAcknowledgeAlertAction.kt b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestAcknowledgeAlertAction.kt index c432aea4..b26c3b83 100644 --- a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestAcknowledgeAlertAction.kt +++ b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestAcknowledgeAlertAction.kt @@ -24,6 +24,8 @@ import com.amazon.opendistroforelasticsearch.alerting.model.Alert.State.ERROR import com.amazon.opendistroforelasticsearch.alerting.util.REFRESH import com.amazon.opendistroforelasticsearch.alerting.AlertingPlugin import com.amazon.opendistroforelasticsearch.alerting.elasticapi.optionalTimeField +import org.apache.logging.log4j.LogManager +import org.apache.logging.log4j.Logger import org.elasticsearch.action.ActionListener import org.elasticsearch.action.bulk.BulkRequest import org.elasticsearch.action.bulk.BulkResponse @@ -54,6 +56,8 @@ import org.elasticsearch.search.builder.SearchSourceBuilder import java.io.IOException import java.time.Instant +private val log: Logger = LogManager.getLogger(RestAcknowledgeAlertAction::class.java) + /** * This class consists of the REST handler to acknowledge alerts. * The user provides the monitorID to which these alerts pertain and in the content of the request provides @@ -100,9 +104,8 @@ class RestAcknowledgeAlertAction(settings: Settings, controller: RestController) .filter(QueryBuilders.termsQuery("_id", alertIds)) val searchRequest = SearchRequest() .indices(AlertIndices.ALERT_INDEX) - .types(Alert.ALERT_TYPE) .routing(monitorId) - .source(SearchSourceBuilder().query(queryBuilder).version(true)) + .source(SearchSourceBuilder().query(queryBuilder).version(true).seqNoAndPrimaryTerm(true)) client.search(searchRequest, ActionListener.wrap(::onSearchResponse, ::onFailure)) } @@ -115,9 +118,10 @@ class RestAcknowledgeAlertAction(settings: Settings, controller: RestController) val alert = Alert.parse(xcp, hit.id, hit.version) alerts[alert.id] = alert if (alert.state == ACTIVE) { - listOf(UpdateRequest(AlertIndices.ALERT_INDEX, AlertIndices.MAPPING_TYPE, hit.id) + listOf(UpdateRequest(AlertIndices.ALERT_INDEX, hit.id) .routing(monitorId) - .version(hit.version) + .setIfSeqNo(hit.seqNo) + .setIfPrimaryTerm(hit.primaryTerm) .doc(XContentFactory.jsonBuilder().startObject() .field(Alert.STATE_FIELD, ACKNOWLEDGED.toString()) .optionalTimeField(Alert.ACKNOWLEDGED_TIME_FIELD, Instant.now()) @@ -127,7 +131,7 @@ class RestAcknowledgeAlertAction(settings: Settings, controller: RestController) } } - logger.info("Acknowledging monitor: $monitorId, alerts: ${updateRequests.map { it.id() }}") + log.info("Acknowledging monitor: $monitorId, alerts: ${updateRequests.map { it.id() }}") val request = BulkRequest().add(updateRequests).setRefreshPolicy(refreshPolicy) client.bulk(request, ActionListener.wrap(::onBulkResponse, ::onFailure)) } diff --git a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestDeleteDestinationAction.kt b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestDeleteDestinationAction.kt index 7447469d..af2bd453 100644 --- a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestDeleteDestinationAction.kt +++ b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestDeleteDestinationAction.kt @@ -49,7 +49,7 @@ class RestDeleteDestinationAction(settings: Settings, controller: RestController return RestChannelConsumer { channel -> val deleteDestinationRequest = - DeleteRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, ScheduledJob.SCHEDULED_JOB_TYPE, destinationId) + DeleteRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, destinationId) .setRefreshPolicy(refreshPolicy) client.delete(deleteDestinationRequest, RestStatusToXContentListener(channel)) } diff --git a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestDeleteMonitorAction.kt b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestDeleteMonitorAction.kt index 0250d50f..4eb66b21 100644 --- a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestDeleteMonitorAction.kt +++ b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestDeleteMonitorAction.kt @@ -52,7 +52,7 @@ class RestDeleteMonitorAction(settings: Settings, controller: RestController) : val refreshPolicy = RefreshPolicy.parse(request.param(REFRESH, RefreshPolicy.IMMEDIATE.value)) return RestChannelConsumer { channel -> - val deleteRequest = DeleteRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, ScheduledJob.SCHEDULED_JOB_TYPE, monitorId) + val deleteRequest = DeleteRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, monitorId) .setRefreshPolicy(refreshPolicy) client.delete(deleteRequest, RestStatusToXContentListener(channel)) } diff --git a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestExecuteMonitorAction.kt b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestExecuteMonitorAction.kt index 470a641a..3349de8d 100644 --- a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestExecuteMonitorAction.kt +++ b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestExecuteMonitorAction.kt @@ -19,6 +19,7 @@ import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob import com.amazon.opendistroforelasticsearch.alerting.MonitorRunner import com.amazon.opendistroforelasticsearch.alerting.model.Monitor import com.amazon.opendistroforelasticsearch.alerting.AlertingPlugin +import org.apache.logging.log4j.LogManager import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.launch import kotlinx.coroutines.withContext @@ -42,6 +43,8 @@ import org.elasticsearch.rest.RestStatus import org.elasticsearch.rest.action.RestActionListener import java.time.Instant +private val log = LogManager.getLogger(RestExecuteMonitorAction::class.java) + class RestExecuteMonitorAction( val settings: Settings, restController: RestController, @@ -70,7 +73,7 @@ class RestExecuteMonitorAction( channel.sendResponse(BytesRestResponse(RestStatus.OK, channel.newBuilder().value(response))) } } catch (e: Exception) { - logger.error("Unexpected error running monitor", e) + log.error("Unexpected error running monitor", e) withContext(Dispatchers.IO) { channel.sendResponse(BytesRestResponse(channel, e)) } } } diff --git a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestGetMonitorAction.kt b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestGetMonitorAction.kt index 4ae06083..93faf888 100644 --- a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestGetMonitorAction.kt +++ b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestGetMonitorAction.kt @@ -16,11 +16,12 @@ package com.amazon.opendistroforelasticsearch.alerting.resthandler import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX -import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob.Companion.SCHEDULED_JOB_TYPE import com.amazon.opendistroforelasticsearch.alerting.util._ID import com.amazon.opendistroforelasticsearch.alerting.util._VERSION import com.amazon.opendistroforelasticsearch.alerting.util.context import com.amazon.opendistroforelasticsearch.alerting.AlertingPlugin +import com.amazon.opendistroforelasticsearch.alerting.util._PRIMARY_TERM +import com.amazon.opendistroforelasticsearch.alerting.util._SEQ_NO import org.elasticsearch.action.get.GetRequest import org.elasticsearch.action.get.GetResponse import org.elasticsearch.client.node.NodeClient @@ -62,9 +63,10 @@ class RestGetMonitorAction(settings: Settings, controller: RestController) : Bas if (monitorId == null || monitorId.isEmpty()) { throw IllegalArgumentException("missing id") } - val getRequest = GetRequest(SCHEDULED_JOBS_INDEX, SCHEDULED_JOB_TYPE, monitorId) + val getRequest = GetRequest(SCHEDULED_JOBS_INDEX, monitorId) .version(RestActions.parseVersion(request)) .fetchSourceContext(context(request)) + if (request.method() == HEAD) { getRequest.fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE) } @@ -83,6 +85,8 @@ class RestGetMonitorAction(settings: Settings, controller: RestController) : Bas .startObject() .field(_ID, response.id) .field(_VERSION, response.version) + .field(_SEQ_NO, response.seqNo) + .field(_PRIMARY_TERM, response.primaryTerm) if (!response.isSourceEmpty) { XContentHelper.createParser(channel.request().xContentRegistry, LoggingDeprecationHandler.INSTANCE, response.sourceAsBytesRef, XContentType.JSON).use { xcp -> diff --git a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestIndexDestinationAction.kt b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestIndexDestinationAction.kt index 417bec94..0183b681 100644 --- a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestIndexDestinationAction.kt +++ b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestIndexDestinationAction.kt @@ -23,8 +23,12 @@ import com.amazon.opendistroforelasticsearch.alerting.util._ID import com.amazon.opendistroforelasticsearch.alerting.util._VERSION import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX -import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob.Companion.SCHEDULED_JOB_TYPE import com.amazon.opendistroforelasticsearch.alerting.AlertingPlugin +import com.amazon.opendistroforelasticsearch.alerting.util.IF_PRIMARY_TERM +import com.amazon.opendistroforelasticsearch.alerting.util.IF_SEQ_NO +import com.amazon.opendistroforelasticsearch.alerting.util._PRIMARY_TERM +import com.amazon.opendistroforelasticsearch.alerting.util._SEQ_NO +import org.apache.logging.log4j.LogManager import org.elasticsearch.action.ActionListener import org.elasticsearch.action.admin.indices.create.CreateIndexResponse import org.elasticsearch.action.get.GetRequest @@ -38,6 +42,7 @@ import org.elasticsearch.common.settings.Settings import org.elasticsearch.common.xcontent.ToXContent import org.elasticsearch.common.xcontent.XContentParser import org.elasticsearch.common.xcontent.XContentParserUtils +import org.elasticsearch.index.seqno.SequenceNumbers import org.elasticsearch.rest.BaseRestHandler import org.elasticsearch.rest.BaseRestHandler.RestChannelConsumer import org.elasticsearch.rest.BytesRestResponse @@ -46,10 +51,11 @@ import org.elasticsearch.rest.RestController import org.elasticsearch.rest.RestRequest import org.elasticsearch.rest.RestResponse import org.elasticsearch.rest.RestStatus -import org.elasticsearch.rest.action.RestActions import org.elasticsearch.rest.action.RestResponseListener import java.io.IOException +private val log = LogManager.getLogger(RestIndexDestinationAction::class.java) + /** * Rest handlers to create and update Destination */ @@ -85,14 +91,15 @@ class RestIndexDestinationAction( val xcp = request.contentParser() XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp::getTokenLocation) val destination = Destination.parse(xcp, id) - val destintaionVersion = RestActions.parseVersion(request) + val seqNo = request.paramAsLong(IF_SEQ_NO, SequenceNumbers.UNASSIGNED_SEQ_NO) + val primaryTerm = request.paramAsLong(IF_PRIMARY_TERM, SequenceNumbers.UNASSIGNED_PRIMARY_TERM) val refreshPolicy = if (request.hasParam(REFRESH)) { WriteRequest.RefreshPolicy.parse(request.param(REFRESH)) } else { WriteRequest.RefreshPolicy.IMMEDIATE } return RestChannelConsumer { channel -> - IndexDestinationHandler(client, channel, id, destintaionVersion, refreshPolicy, destination).start() + IndexDestinationHandler(client, channel, id, seqNo, primaryTerm, refreshPolicy, destination).start() } } @@ -100,7 +107,8 @@ class RestIndexDestinationAction( client: NodeClient, channel: RestChannel, private val destinationId: String, - private val destinationVersion: Long, + private val seqNo: Long, + private val primaryTerm: Long, private val refreshPolicy: WriteRequest.RefreshPolicy, private var newDestination: Destination ) : AsyncActionHandler(client, channel) { @@ -116,10 +124,11 @@ class RestIndexDestinationAction( private fun prepareDestinationIndexing() { if (channel.request().method() == RestRequest.Method.PUT) updateDestination() else { - val indexRequest = IndexRequest(SCHEDULED_JOBS_INDEX, SCHEDULED_JOB_TYPE) + val indexRequest = IndexRequest(SCHEDULED_JOBS_INDEX) .setRefreshPolicy(refreshPolicy) .source(newDestination.toXContent(channel.newBuilder(), ToXContent.MapParams(mapOf("with_type" to "true")))) - .version(destinationVersion) + .setIfSeqNo(seqNo) + .setIfPrimaryTerm(primaryTerm) .timeout(indexTimeout) client.index(indexRequest, indexDestinationResponse()) } @@ -127,17 +136,17 @@ class RestIndexDestinationAction( private fun onCreateMappingsResponse(response: CreateIndexResponse) { if (response.isAcknowledged) { - logger.info("Created ${ScheduledJob.SCHEDULED_JOBS_INDEX} with mappings.") + log.info("Created ${ScheduledJob.SCHEDULED_JOBS_INDEX} with mappings.") prepareDestinationIndexing() } else { - logger.error("Create ${ScheduledJob.SCHEDULED_JOBS_INDEX} mappings call not acknowledged.") + log.error("Create ${ScheduledJob.SCHEDULED_JOBS_INDEX} mappings call not acknowledged.") channel.sendResponse(BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, response.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS))) } } private fun updateDestination() { - val getRequest = GetRequest(SCHEDULED_JOBS_INDEX, SCHEDULED_JOB_TYPE, destinationId) + val getRequest = GetRequest(SCHEDULED_JOBS_INDEX, destinationId) client.get(getRequest, ActionListener.wrap(::onGetResponse, ::onFailure)) } @@ -149,11 +158,12 @@ class RestIndexDestinationAction( .endObject() return channel.sendResponse(BytesRestResponse(RestStatus.NOT_FOUND, response.toXContent(builder, ToXContent.EMPTY_PARAMS))) } - val indexRequest = IndexRequest(SCHEDULED_JOBS_INDEX, SCHEDULED_JOB_TYPE) + val indexRequest = IndexRequest(SCHEDULED_JOBS_INDEX) .setRefreshPolicy(refreshPolicy) .source(newDestination.toXContent(channel.newBuilder(), ToXContent.MapParams(mapOf("with_type" to "true")))) .id(destinationId) - .version(destinationVersion) + .setIfSeqNo(seqNo) + .setIfPrimaryTerm(primaryTerm) .timeout(indexTimeout) return client.index(indexRequest, indexDestinationResponse()) } @@ -174,6 +184,8 @@ class RestIndexDestinationAction( .startObject() .field(_ID, response.id) .field(_VERSION, response.version) + .field(_SEQ_NO, response.seqNo) + .field(_PRIMARY_TERM, response.primaryTerm) .field("destination", newDestination) .endObject() diff --git a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestIndexMonitorAction.kt b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestIndexMonitorAction.kt index 9698f2f3..29a04c8f 100644 --- a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestIndexMonitorAction.kt +++ b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestIndexMonitorAction.kt @@ -17,7 +17,6 @@ package com.amazon.opendistroforelasticsearch.alerting.resthandler import com.amazon.opendistroforelasticsearch.alerting.core.ScheduledJobIndices import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX -import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob.Companion.SCHEDULED_JOB_TYPE import com.amazon.opendistroforelasticsearch.alerting.model.Monitor import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.ALERTING_MAX_MONITORS import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.INDEX_TIMEOUT @@ -26,6 +25,11 @@ import com.amazon.opendistroforelasticsearch.alerting.util.REFRESH import com.amazon.opendistroforelasticsearch.alerting.util._ID import com.amazon.opendistroforelasticsearch.alerting.util._VERSION import com.amazon.opendistroforelasticsearch.alerting.AlertingPlugin +import com.amazon.opendistroforelasticsearch.alerting.util.IF_PRIMARY_TERM +import com.amazon.opendistroforelasticsearch.alerting.util.IF_SEQ_NO +import com.amazon.opendistroforelasticsearch.alerting.util._PRIMARY_TERM +import com.amazon.opendistroforelasticsearch.alerting.util._SEQ_NO +import org.apache.logging.log4j.LogManager import org.elasticsearch.action.ActionListener import org.elasticsearch.action.admin.indices.create.CreateIndexResponse import org.elasticsearch.action.get.GetRequest @@ -45,6 +49,7 @@ import org.elasticsearch.common.xcontent.XContentParser.Token import org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken import org.elasticsearch.common.xcontent.XContentType import org.elasticsearch.index.query.QueryBuilders +import org.elasticsearch.index.seqno.SequenceNumbers import org.elasticsearch.rest.BaseRestHandler import org.elasticsearch.rest.BaseRestHandler.RestChannelConsumer import org.elasticsearch.rest.BytesRestResponse @@ -55,12 +60,13 @@ import org.elasticsearch.rest.RestRequest.Method.POST import org.elasticsearch.rest.RestRequest.Method.PUT import org.elasticsearch.rest.RestResponse import org.elasticsearch.rest.RestStatus -import org.elasticsearch.rest.action.RestActions import org.elasticsearch.rest.action.RestResponseListener import org.elasticsearch.search.builder.SearchSourceBuilder import java.io.IOException import java.time.Instant +private val log = LogManager.getLogger(RestIndexMonitorAction::class.java) + /** * Rest handlers to create and update monitors. */ @@ -101,14 +107,15 @@ class RestIndexMonitorAction( val xcp = request.contentParser() ensureExpectedToken(Token.START_OBJECT, xcp.nextToken(), xcp::getTokenLocation) val monitor = Monitor.parse(xcp, id).copy(lastUpdateTime = Instant.now()) - val monitorVersion = RestActions.parseVersion(request) + val seqNo = request.paramAsLong(IF_SEQ_NO, SequenceNumbers.UNASSIGNED_SEQ_NO) + val primaryTerm = request.paramAsLong(IF_PRIMARY_TERM, SequenceNumbers.UNASSIGNED_PRIMARY_TERM) val refreshPolicy = if (request.hasParam(REFRESH)) { WriteRequest.RefreshPolicy.parse(request.param(REFRESH)) } else { WriteRequest.RefreshPolicy.IMMEDIATE } return RestChannelConsumer { channel -> - IndexMonitorHandler(client, channel, id, monitorVersion, refreshPolicy, monitor).start() + IndexMonitorHandler(client, channel, id, seqNo, primaryTerm, refreshPolicy, monitor).start() } } @@ -116,7 +123,8 @@ class RestIndexMonitorAction( client: NodeClient, channel: RestChannel, private val monitorId: String, - private val monitorVersion: Long, + private val seqNo: Long, + private val primaryTerm: Long, private val refreshPolicy: WriteRequest.RefreshPolicy, private var newMonitor: Monitor ) : AsyncActionHandler(client, channel) { @@ -139,7 +147,6 @@ class RestIndexMonitorAction( val query = QueryBuilders.boolQuery().filter(QueryBuilders.termQuery("${Monitor.MONITOR_TYPE}.type", Monitor.MONITOR_TYPE)) val searchSource = SearchSourceBuilder().query(query).timeout(requestTimeout) val searchRequest = SearchRequest(ScheduledJob.SCHEDULED_JOBS_INDEX) - .types(ScheduledJob.SCHEDULED_JOB_TYPE) .source(searchSource) client.search(searchRequest, ActionListener.wrap(::onSearchResponse, ::onFailure)) } @@ -148,8 +155,8 @@ class RestIndexMonitorAction( * After searching for all existing monitors we validate the system can support another monitor to be created. */ private fun onSearchResponse(response: SearchResponse) { - if (response.hits.totalHits >= maxMonitors) { - logger.error("This request would create more than the allowed monitors [$maxMonitors].") + if (response.hits.totalHits.value >= maxMonitors) { + log.error("This request would create more than the allowed monitors [$maxMonitors].") onFailure(IllegalArgumentException("This request would create more than the allowed monitors [$maxMonitors].")) } else { indexMonitor() @@ -158,26 +165,27 @@ class RestIndexMonitorAction( private fun onCreateMappingsResponse(response: CreateIndexResponse) { if (response.isAcknowledged) { - logger.info("Created ${ScheduledJob.SCHEDULED_JOBS_INDEX} with mappings.") + log.info("Created ${ScheduledJob.SCHEDULED_JOBS_INDEX} with mappings.") prepareMonitorIndexing() } else { - logger.error("Create ${ScheduledJob.SCHEDULED_JOBS_INDEX} mappings call not acknowledged.") + log.error("Create ${ScheduledJob.SCHEDULED_JOBS_INDEX} mappings call not acknowledged.") channel.sendResponse(BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, response.toXContent(channel.newErrorBuilder(), EMPTY_PARAMS))) } } private fun indexMonitor() { - val indexRequest = IndexRequest(SCHEDULED_JOBS_INDEX, SCHEDULED_JOB_TYPE) + val indexRequest = IndexRequest(SCHEDULED_JOBS_INDEX) .setRefreshPolicy(refreshPolicy) .source(newMonitor.toXContentWithType(channel.newBuilder())) - .version(monitorVersion) + .setIfSeqNo(seqNo) + .setIfPrimaryTerm(primaryTerm) .timeout(indexTimeout) client.index(indexRequest, indexMonitorResponse()) } private fun updateMonitor() { - val getRequest = GetRequest(SCHEDULED_JOBS_INDEX, SCHEDULED_JOB_TYPE, monitorId) + val getRequest = GetRequest(SCHEDULED_JOBS_INDEX, monitorId) client.get(getRequest, ActionListener.wrap(::onGetResponse, ::onFailure)) } @@ -195,11 +203,12 @@ class RestIndexMonitorAction( // If both are enabled, use the current existing monitor enabled time, otherwise the next execution will be // incorrect. if (newMonitor.enabled && currentMonitor.enabled) newMonitor = newMonitor.copy(enabledTime = currentMonitor.enabledTime) - val indexRequest = IndexRequest(SCHEDULED_JOBS_INDEX, SCHEDULED_JOB_TYPE) + val indexRequest = IndexRequest(SCHEDULED_JOBS_INDEX) .setRefreshPolicy(refreshPolicy) .source(newMonitor.toXContentWithType(channel.newBuilder())) .id(monitorId) - .version(monitorVersion) + .setIfSeqNo(seqNo) + .setIfPrimaryTerm(primaryTerm) .timeout(indexTimeout) return client.index(indexRequest, indexMonitorResponse()) } @@ -216,6 +225,8 @@ class RestIndexMonitorAction( .startObject() .field(_ID, response.id) .field(_VERSION, response.version) + .field(_SEQ_NO, response.seqNo) + .field(_PRIMARY_TERM, response.primaryTerm) .field("monitor", newMonitor) .endObject() diff --git a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestSearchMonitorAction.kt b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestSearchMonitorAction.kt index b7bf5ae3..1b364bdf 100644 --- a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestSearchMonitorAction.kt +++ b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestSearchMonitorAction.kt @@ -16,7 +16,6 @@ package com.amazon.opendistroforelasticsearch.alerting.resthandler import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX -import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob.Companion.SCHEDULED_JOB_TYPE import com.amazon.opendistroforelasticsearch.alerting.model.Monitor import com.amazon.opendistroforelasticsearch.alerting.util.context import com.amazon.opendistroforelasticsearch.alerting.AlertingPlugin @@ -67,10 +66,11 @@ class RestSearchMonitorAction(settings: Settings, controller: RestController) : // searched. searchSourceBuilder.query(QueryBuilders.boolQuery().must(searchSourceBuilder.query()) .filter(QueryBuilders.termQuery(Monitor.MONITOR_TYPE + ".type", Monitor.MONITOR_TYPE))) + .seqNoAndPrimaryTerm(true) + .version(true) val searchRequest = SearchRequest() .source(searchSourceBuilder) .indices(SCHEDULED_JOBS_INDEX) - .types(SCHEDULED_JOB_TYPE) return RestChannelConsumer { channel -> client.search(searchRequest, searchMonitorResponse(channel)) } } diff --git a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/util/RestHandlerUtils.kt b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/util/RestHandlerUtils.kt index 9297958f..167e36f9 100644 --- a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/util/RestHandlerUtils.kt +++ b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/util/RestHandlerUtils.kt @@ -36,4 +36,8 @@ fun context(request: RestRequest): FetchSourceContext? { const val _ID = "_id" const val _VERSION = "_version" +const val _SEQ_NO = "_seq_no" +const val IF_SEQ_NO = "if_seq_no" +const val _PRIMARY_TERM = "_primary_term" +const val IF_PRIMARY_TERM = "if_primary_term" const val REFRESH = "refresh" diff --git a/alerting/src/main/resources/com/amazon/opendistroforelasticsearch/alerting/alerts/alert_mapping.json b/alerting/src/main/resources/com/amazon/opendistroforelasticsearch/alerting/alerts/alert_mapping.json index 844c37b7..0666f511 100644 --- a/alerting/src/main/resources/com/amazon/opendistroforelasticsearch/alerting/alerts/alert_mapping.json +++ b/alerting/src/main/resources/com/amazon/opendistroforelasticsearch/alerting/alerts/alert_mapping.json @@ -1,81 +1,79 @@ { - "_doc": { - "dynamic": "strict", - "_routing": { - "required" : true + "dynamic": "strict", + "_routing": { + "required": true + }, + "properties": { + "monitor_id": { + "type": "keyword" }, - "properties" : { - "monitor_id" : { - "type" : "keyword" - }, - "monitor_version" : { - "type" : "long" - }, - "severity": { - "type": "keyword" - }, - "monitor_name" : { - "type" : "text", - "fields": { - "keyword": { - "type": "keyword", - "ignore_above" : 256 - } + "monitor_version": { + "type": "long" + }, + "severity": { + "type": "keyword" + }, + "monitor_name": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 } - }, - "trigger_id" : { - "type" : "keyword" - }, - "trigger_name" : { - "type": "text", - "fields": { - "keyword": { - "type": "keyword", - "ignore_above" : 256 - } + } + }, + "trigger_id": { + "type": "keyword" + }, + "trigger_name": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 } - }, - "state" : { - "type": "keyword" - }, - "start_time" : { - "type" : "date" - }, - "last_notification_time" : { - "type" : "date" - }, - "acknowledged_time" : { - "type" : "date" - }, - "end_time" : { - "type" : "date" - }, - "error_message" : { - "type" : "text" - }, - "alert_history": { - "type": "nested", - "properties" : { - "timestamp": { - "type": "date" - }, - "message": { - "type": "text" - } + } + }, + "state": { + "type": "keyword" + }, + "start_time": { + "type": "date" + }, + "last_notification_time": { + "type": "date" + }, + "acknowledged_time": { + "type": "date" + }, + "end_time": { + "type": "date" + }, + "error_message": { + "type": "text" + }, + "alert_history": { + "type": "nested", + "properties": { + "timestamp": { + "type": "date" + }, + "message": { + "type": "text" } - }, - "action_execution_results": { - "type": "nested", - "properties": { - "action_id": { - "type": "keyword" - }, - "last_execution_time": { - "type": "date" - }, - "throttled_count": { - "type": "integer" - } + } + }, + "action_execution_results": { + "type": "nested", + "properties": { + "action_id": { + "type": "keyword" + }, + "last_execution_time": { + "type": "date" + }, + "throttled_count": { + "type": "integer" } } } diff --git a/alerting/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/AlertingRestTestCase.kt b/alerting/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/AlertingRestTestCase.kt index 383ade1b..6ae1cda4 100644 --- a/alerting/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/AlertingRestTestCase.kt +++ b/alerting/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/AlertingRestTestCase.kt @@ -224,11 +224,9 @@ abstract class AlertingRestTestCase : ESRestTestCase() { /** A test index that can be used across tests. Feel free to add new fields but don't remove any. */ protected fun createTestIndex(index: String = randomAlphaOfLength(10).toLowerCase(Locale.ROOT)): String { createIndex(index, Settings.EMPTY, """ - "_doc" : { - "properties" : { - "test_strict_date_time" : { "type" : "date", "format" : "strict_date_time" } - } - } + "properties" : { + "test_strict_date_time" : { "type" : "date", "format" : "strict_date_time" } + } """.trimIndent()) return index } diff --git a/alerting/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/MonitorRunnerIT.kt b/alerting/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/MonitorRunnerIT.kt index 15104302..3fac537d 100644 --- a/alerting/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/MonitorRunnerIT.kt +++ b/alerting/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/MonitorRunnerIT.kt @@ -71,8 +71,8 @@ class MonitorRunnerIT : AlertingRestTestCase() { @Suppress("UNCHECKED_CAST") val searchResult = (output.objectMap("input_results")["results"] as List>).first() @Suppress("UNCHECKED_CAST") - val total = searchResult.stringMap("hits")?.get("total") as Int - assertEquals("Incorrect search result", 1, total) + val total = searchResult.stringMap("hits")?.get("total") as Map + assertEquals("Incorrect search result", 1, total["value"]) } fun `test execute monitor not triggered`() { diff --git a/alerting/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/TestHelpers.kt b/alerting/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/TestHelpers.kt index e8179907..5646e2e3 100644 --- a/alerting/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/TestHelpers.kt +++ b/alerting/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/TestHelpers.kt @@ -27,6 +27,8 @@ import com.amazon.opendistroforelasticsearch.alerting.model.ActionExecutionResul import com.amazon.opendistroforelasticsearch.alerting.model.action.Throttle import org.apache.http.Header import org.apache.http.HttpEntity +import org.elasticsearch.client.Request +import org.elasticsearch.client.RequestOptions import org.elasticsearch.client.Response import org.elasticsearch.client.RestClient import org.elasticsearch.common.UUIDs @@ -130,11 +132,15 @@ fun RestClient.makeRequest( entity: HttpEntity? = null, vararg headers: Header ): Response { - return if (entity != null) { - performRequest(method, endpoint, params, entity, *headers) - } else { - performRequest(method, endpoint, params, *headers) + val request = Request(method, endpoint) + val options = RequestOptions.DEFAULT.toBuilder() + headers.forEach { options.addHeader(it.name, it.value) } + request.options = options.build() + params.forEach { request.addParameter(it.key, it.value) } + if (entity != null) { + request.entity = entity } + return performRequest(request) } /** @@ -149,9 +155,12 @@ fun RestClient.makeRequest( entity: HttpEntity? = null, vararg headers: Header ): Response { - return if (entity != null) { - performRequest(method, endpoint, emptyMap(), entity, *headers) - } else { - performRequest(method, endpoint, emptyMap(), *headers) + val request = Request(method, endpoint) + val options = RequestOptions.DEFAULT.toBuilder() + headers.forEach { options.addHeader(it.name, it.value) } + request.options = options.build() + if (entity != null) { + request.entity = entity } + return performRequest(request) } diff --git a/alerting/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/MonitorRestApiIT.kt b/alerting/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/MonitorRestApiIT.kt index 94781e17..54e3f938 100644 --- a/alerting/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/MonitorRestApiIT.kt +++ b/alerting/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/MonitorRestApiIT.kt @@ -231,8 +231,8 @@ class MonitorRestApiIT : AlertingRestTestCase() { NStringEntity(search, ContentType.APPLICATION_JSON)) assertEquals("Search monitor failed", RestStatus.OK, searchResponse.restStatus()) val xcp = createParser(XContentType.JSON.xContent(), searchResponse.entity.content) - val hits = xcp.map()["hits"]!! as Map - val numberDocsFound = hits["total"] + val hits = xcp.map()["hits"]!! as Map> + val numberDocsFound = hits["total"]?.get("value") assertEquals("Monitor not found during search", 1, numberDocsFound) } @@ -245,8 +245,8 @@ class MonitorRestApiIT : AlertingRestTestCase() { NStringEntity(search, ContentType.APPLICATION_JSON)) assertEquals("Search monitor failed", RestStatus.OK, searchResponse.restStatus()) val xcp = createParser(XContentType.JSON.xContent(), searchResponse.entity.content) - val hits = xcp.map()["hits"]!! as Map - val numberDocsFound = hits["total"] + val hits = xcp.map()["hits"]!! as Map> + val numberDocsFound = hits["total"]?.get("value") assertEquals("Monitor not found during search", 1, numberDocsFound) } @@ -263,8 +263,8 @@ class MonitorRestApiIT : AlertingRestTestCase() { NStringEntity(search, ContentType.APPLICATION_JSON)) assertEquals("Search monitor failed", RestStatus.OK, searchResponse.restStatus()) val xcp = createParser(XContentType.JSON.xContent(), searchResponse.entity.content) - val hits = xcp.map()["hits"]!! as Map - val numberDocsFound = hits["total"] + val hits = xcp.map()["hits"]!! as Map> + val numberDocsFound = hits["total"]?.get("value") assertEquals("Monitor found during search when no document present.", 0, numberDocsFound) } @@ -281,8 +281,8 @@ class MonitorRestApiIT : AlertingRestTestCase() { assertEquals("Search monitor failed", RestStatus.OK, searchResponse.restStatus()) val xcp = createParser(XContentType.JSON.xContent(), searchResponse.entity.content) - val hits = xcp.map()["hits"] as Map - val numberDocsFound = hits["total"] + val hits = xcp.map()["hits"] as Map> + val numberDocsFound = hits["total"]?.get("value") assertEquals("Monitor not found during search", 1, numberDocsFound) val searchHits = hits["hits"] as List @@ -302,8 +302,8 @@ class MonitorRestApiIT : AlertingRestTestCase() { assertEquals("Search monitor failed", RestStatus.OK, searchResponse.restStatus()) val xcp = createParser(XContentType.JSON.xContent(), searchResponse.entity.content) - val hits = xcp.map()["hits"] as Map - val numberDocsFound = hits["total"] + val hits = xcp.map()["hits"] as Map> + val numberDocsFound = hits["total"]?.get("value") assertEquals("Monitor not found during search", 1, numberDocsFound) val searchHits = hits["hits"] as List @@ -341,7 +341,7 @@ class MonitorRestApiIT : AlertingRestTestCase() { fun `test mappings after monitor creation`() { createRandomMonitor(refresh = true) - val response = client().makeRequest("GET", "/${ScheduledJob.SCHEDULED_JOBS_INDEX}/_mapping/_doc") + val response = client().makeRequest("GET", "/${ScheduledJob.SCHEDULED_JOBS_INDEX}/_mapping") val parserMap = createParser(XContentType.JSON.xContent(), response.entity.content).map() as Map> val mappingsMap = parserMap[ScheduledJob.SCHEDULED_JOBS_INDEX]!!["mappings"] as Map val expected = createParser( @@ -426,7 +426,7 @@ class MonitorRestApiIT : AlertingRestTestCase() { fun `test update monitor with wrong version`() { val monitor = createRandomMonitor(refresh = true) try { - client().makeRequest("PUT", "${monitor.relativeUrl()}?refresh=true&version=1234", + client().makeRequest("PUT", "${monitor.relativeUrl()}?refresh=true&if_seq_no=1234&if_primary_term=1234", emptyMap(), monitor.toHttpEntity()) fail("expected 409 ResponseException") } catch (e: ResponseException) { diff --git a/build-tools/esplugin-coverage.gradle b/build-tools/esplugin-coverage.gradle index 4aba2d80..ae6793d1 100644 --- a/build-tools/esplugin-coverage.gradle +++ b/build-tools/esplugin-coverage.gradle @@ -53,10 +53,6 @@ task dummyIntegTest(type: Test) { } } -unitTest { - jvmArg dummyTest.jacoco.getAsJvmArg() -} - integTestCluster { jvmArgs += " ${dummyIntegTest.jacoco.getAsJvmArg()}" systemProperty 'com.sun.management.jmxremote', "true" diff --git a/build.gradle b/build.gradle index 8cd05370..e10b8111 100644 --- a/build.gradle +++ b/build.gradle @@ -17,7 +17,7 @@ buildscript { apply from: 'build-tools/repositories.gradle' ext { - es_version = '6.7.1' + es_version = '7.0.1' kotlin_version = '1.3.21' } @@ -41,7 +41,7 @@ apply plugin: 'jacoco' apply from: 'build-tools/merged-coverage.gradle' ext { - opendistroVersion = '0.9.0' + opendistroVersion = '1.0.0' isSnapshot = "true" == System.getProperty("build.snapshot", "true") } diff --git a/core/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/core/action/node/ScheduledJobsStatsAction.kt b/core/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/core/action/node/ScheduledJobsStatsAction.kt index 140e03e8..a31df753 100644 --- a/core/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/core/action/node/ScheduledJobsStatsAction.kt +++ b/core/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/core/action/node/ScheduledJobsStatsAction.kt @@ -16,18 +16,13 @@ package com.amazon.opendistroforelasticsearch.alerting.core.action.node import org.elasticsearch.action.Action -import org.elasticsearch.client.ElasticsearchClient -class ScheduledJobsStatsAction : Action(NAME) { +class ScheduledJobsStatsAction : Action(NAME) { companion object { val INSTANCE = ScheduledJobsStatsAction() const val NAME = "cluster:admin/opendistro/_scheduled_jobs/stats" } - override fun newRequestBuilder(client: ElasticsearchClient): ScheduledJobsStatsRequestBuilder { - return ScheduledJobsStatsRequestBuilder(client, this) - } - override fun newResponse(): ScheduledJobsStatsResponse { return ScheduledJobsStatsResponse() } diff --git a/core/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/core/action/node/ScheduledJobsStatsTransportAction.kt b/core/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/core/action/node/ScheduledJobsStatsTransportAction.kt index a47a92b8..e77ef20a 100644 --- a/core/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/core/action/node/ScheduledJobsStatsTransportAction.kt +++ b/core/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/core/action/node/ScheduledJobsStatsTransportAction.kt @@ -26,12 +26,10 @@ import org.elasticsearch.action.support.ActionFilters import org.elasticsearch.action.support.nodes.BaseNodeRequest import org.elasticsearch.action.support.nodes.TransportNodesAction import org.elasticsearch.cluster.health.ClusterIndexHealth -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver import org.elasticsearch.cluster.service.ClusterService import org.elasticsearch.common.inject.Inject import org.elasticsearch.common.io.stream.StreamInput import org.elasticsearch.common.io.stream.StreamOutput -import org.elasticsearch.common.settings.Settings import org.elasticsearch.threadpool.ThreadPool import org.elasticsearch.transport.TransportService import java.io.IOException @@ -47,23 +45,19 @@ class ScheduledJobsStatsTransportAction : TransportNodesAction