Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Support ES 7.0 Issue #39 #41

Merged
merged 11 commits into from
May 15, 2019
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ Please see our [documentation](https://opendistro.github.io/for-elasticsearch-do

1. Check out this package from version control.
1. Launch Intellij IDEA, choose **Import Project**, and select the `settings.gradle` file in the root of this package.
1. To build from the command line, set `JAVA_HOME` to point to a JDK >=11 before running `./gradlew`.
1. To build from the command line, set `JAVA_HOME` to point to a JDK >= 12 before running `./gradlew`.


## Build

This package is organized into subprojects, most of which contribute JARs to the top-level plugin in the `alerting` subproject.

All subprojects in this package use the [Gradle](https://docs.gradle.org/4.10.2/userguide/userguide.html) build system. Gradle comes with excellent documentation that should be your first stop when trying to figure out how to operate or modify the build.
All subprojects in this package use the [Gradle](https://docs.gradle.org/current/userguide/userguide.html) build system. Gradle comes with excellent documentation that should be your first stop when trying to figure out how to operate or modify the build.

However, to build the `alerting` plugin subproject, we also use the Elastic build tools for Gradle. These tools are idiosyncratic and don't always follow the conventions and instructions for building regular Java code using Gradle. Not everything in `alerting` will work the way it's described in the Gradle documentation. If you encounter such a situation, the Elastic build tools [source code](https://github.com/elastic/elasticsearch/tree/master/buildSrc/src/main/groovy/org/elasticsearch/gradle) is your best bet for figuring out what's going on.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.
import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.MOVE_ALERTS_BACKOFF_COUNT
import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.MOVE_ALERTS_BACKOFF_MILLIS
import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX
import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob.Companion.SCHEDULED_JOB_TYPE
import com.amazon.opendistroforelasticsearch.alerting.core.model.SearchInput
import com.amazon.opendistroforelasticsearch.alerting.elasticapi.convertToMap
import com.amazon.opendistroforelasticsearch.alerting.elasticapi.firstFailureOrNull
Expand Down Expand Up @@ -326,7 +325,7 @@ class MonitorRunner(
// spend time reloading the alert and writing it back.
when (alert.state) {
ACTIVE, ERROR -> {
listOf<DocWriteRequest<*>>(IndexRequest(AlertIndices.ALERT_INDEX, AlertIndices.MAPPING_TYPE)
listOf<DocWriteRequest<*>>(IndexRequest(AlertIndices.ALERT_INDEX)
.routing(alert.monitorId)
.source(alert.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
.id(if (alert.id != Alert.NO_ID) alert.id else null))
Expand All @@ -336,9 +335,9 @@ class MonitorRunner(
}
COMPLETED -> {
listOf<DocWriteRequest<*>>(
DeleteRequest(AlertIndices.ALERT_INDEX, AlertIndices.MAPPING_TYPE, alert.id)
DeleteRequest(AlertIndices.ALERT_INDEX, alert.id)
.routing(alert.monitorId),
IndexRequest(AlertIndices.HISTORY_WRITE_INDEX, AlertIndices.MAPPING_TYPE)
IndexRequest(AlertIndices.HISTORY_WRITE_INDEX)
.routing(alert.monitorId)
.source(alert.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
.id(alert.id))
Expand Down Expand Up @@ -406,7 +405,7 @@ class MonitorRunner(

private fun getDestinationInfo(destinationId: String): Destination {
var destination: Destination
val getRequest = GetRequest(SCHEDULED_JOBS_INDEX, SCHEDULED_JOB_TYPE, destinationId).routing(destinationId)
val getRequest = GetRequest(SCHEDULED_JOBS_INDEX, destinationId).routing(destinationId)
val getResponse = client.get(getRequest).actionGet()
if (!getResponse.isExists || getResponse.isSourceEmpty) {
throw IllegalStateException("Destination document with id $destinationId not found or source is empty")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import org.elasticsearch.common.xcontent.XContentHelper
import org.elasticsearch.common.xcontent.XContentParser
import org.elasticsearch.common.xcontent.XContentParserUtils
import org.elasticsearch.common.xcontent.XContentType
import org.elasticsearch.index.VersionType
import org.elasticsearch.index.query.QueryBuilders
import org.elasticsearch.search.builder.SearchSourceBuilder
import org.elasticsearch.threadpool.ThreadPool
Expand Down Expand Up @@ -94,15 +93,15 @@ class AlertMover(

private fun onSearchResponse(response: SearchResponse) {
// If no alerts are found, simply return
if (response.hits.totalHits == 0L) return
if (response.hits.totalHits.value == 0L) return
val indexRequests = response.hits.map { hit ->
IndexRequest(AlertIndices.HISTORY_WRITE_INDEX, AlertIndices.MAPPING_TYPE)
IndexRequest(AlertIndices.HISTORY_WRITE_INDEX)
.routing(monitorId)
.source(Alert.parse(alertContentParser(hit.sourceRef), hit.id, hit.version)
.copy(state = Alert.State.DELETED)
.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
.version(hit.version)
.versionType(VersionType.EXTERNAL_GTE)
.setIfSeqNo(hit.seqNo)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IndexRequest(AlertIndices.HISTORY_WRITE_INDEX)
.routing(monitorId)
.source(Alert.parse(alertContentParser(hit.sourceRef), hit.id, hit.version)
.copy(state = Alert.State.DELETED)
.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
.setIfSeqNo(hit.seqNo)
.setIfPrimaryTerm(hit.primaryTerm)

This doesn't seem correct - how will the seqNo and primaryTerm be retained when moving from the active index to the history index?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure what you are asking. We were keeping the version prior, which is still supported since we are using an external versioning system, but since versions are going away, we might want to keep the document the same as a direct copy from the ACTIVE state to DELETED state between the indexes.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sequence numbers and primary terms are specific to an index. They cannot be carried over from the hit in the active index to the copy in the history index. To avoid stale writes on the alert in the history index we need to continue to use external versioning which is still supported in ES 7: https://www.elastic.co/guide/en/elasticsearch/reference/current/breaking-changes-7.0.html#_internal_versioning_is_no_longer_supported_for_optimistic_concurrency_control

.setIfPrimaryTerm(hit.primaryTerm)
lucaswin-amzn marked this conversation as resolved.
Show resolved Hide resolved
.id(hit.id)
}
val copyRequest = BulkRequest().add(indexRequests)
Expand All @@ -111,9 +110,8 @@ class AlertMover(

private fun onCopyResponse(response: BulkResponse) {
val deleteRequests = response.items.filterNot { it.isFailed }.map {
DeleteRequest(AlertIndices.ALERT_INDEX, AlertIndices.MAPPING_TYPE, it.id)
DeleteRequest(AlertIndices.ALERT_INDEX, it.id)
.routing(monitorId)
.version(it.version)
This conversation was marked as resolved.
Show resolved Hide resolved
}
if (response.hasFailures()) {
hasFailures = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import com.amazon.opendistroforelasticsearch.alerting.model.Alert.State.ERROR
import com.amazon.opendistroforelasticsearch.alerting.util.REFRESH
import com.amazon.opendistroforelasticsearch.alerting.AlertingPlugin
import com.amazon.opendistroforelasticsearch.alerting.elasticapi.optionalTimeField
import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.Logger
import org.elasticsearch.action.ActionListener
import org.elasticsearch.action.bulk.BulkRequest
import org.elasticsearch.action.bulk.BulkResponse
Expand Down Expand Up @@ -61,6 +63,8 @@ import java.time.Instant
*/
class RestAcknowledgeAlertAction(settings: Settings, controller: RestController) : BaseRestHandler(settings) {

private val log: Logger = LogManager.getLogger(javaClass)
lucaswin-amzn marked this conversation as resolved.
Show resolved Hide resolved

init {
// Acknowledge alerts
controller.registerHandler(POST, "${AlertingPlugin.MONITOR_BASE_URI}/{monitorID}/_acknowledge/alerts", this)
Expand Down Expand Up @@ -100,7 +104,6 @@ class RestAcknowledgeAlertAction(settings: Settings, controller: RestController)
.filter(QueryBuilders.termsQuery("_id", alertIds))
val searchRequest = SearchRequest()
.indices(AlertIndices.ALERT_INDEX)
.types(Alert.ALERT_TYPE)
.routing(monitorId)
.source(SearchSourceBuilder().query(queryBuilder).version(true))
lucaswin-amzn marked this conversation as resolved.
Show resolved Hide resolved

Expand All @@ -115,9 +118,10 @@ class RestAcknowledgeAlertAction(settings: Settings, controller: RestController)
val alert = Alert.parse(xcp, hit.id, hit.version)
alerts[alert.id] = alert
if (alert.state == ACTIVE) {
listOf(UpdateRequest(AlertIndices.ALERT_INDEX, AlertIndices.MAPPING_TYPE, hit.id)
listOf(UpdateRequest(AlertIndices.ALERT_INDEX, hit.id)
.routing(monitorId)
.version(hit.version)
.setIfSeqNo(hit.seqNo)
.setIfPrimaryTerm(hit.primaryTerm)
.doc(XContentFactory.jsonBuilder().startObject()
.field(Alert.STATE_FIELD, ACKNOWLEDGED.toString())
.optionalTimeField(Alert.ACKNOWLEDGED_TIME_FIELD, Instant.now())
Expand All @@ -127,7 +131,7 @@ class RestAcknowledgeAlertAction(settings: Settings, controller: RestController)
}
}

logger.info("Acknowledging monitor: $monitorId, alerts: ${updateRequests.map { it.id() }}")
log.info("Acknowledging monitor: $monitorId, alerts: ${updateRequests.map { it.id() }}")
val request = BulkRequest().add(updateRequests).setRefreshPolicy(refreshPolicy)
client.bulk(request, ActionListener.wrap(::onBulkResponse, ::onFailure))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class RestDeleteDestinationAction(settings: Settings, controller: RestController

return RestChannelConsumer { channel ->
val deleteDestinationRequest =
DeleteRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, ScheduledJob.SCHEDULED_JOB_TYPE, destinationId)
DeleteRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, destinationId)
.setRefreshPolicy(refreshPolicy)
client.delete(deleteDestinationRequest, RestStatusToXContentListener(channel))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class RestDeleteMonitorAction(settings: Settings, controller: RestController) :
val refreshPolicy = RefreshPolicy.parse(request.param(REFRESH, RefreshPolicy.IMMEDIATE.value))

return RestChannelConsumer { channel ->
val deleteRequest = DeleteRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, ScheduledJob.SCHEDULED_JOB_TYPE, monitorId)
val deleteRequest = DeleteRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, monitorId)
.setRefreshPolicy(refreshPolicy)
client.delete(deleteRequest, RestStatusToXContentListener(channel))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob
import com.amazon.opendistroforelasticsearch.alerting.MonitorRunner
import com.amazon.opendistroforelasticsearch.alerting.model.Monitor
import com.amazon.opendistroforelasticsearch.alerting.AlertingPlugin
import org.apache.logging.log4j.LogManager
import org.elasticsearch.action.get.GetRequest
import org.elasticsearch.action.get.GetResponse
import org.elasticsearch.client.node.NodeClient
Expand All @@ -45,6 +46,8 @@ class RestExecuteMonitorAction(
private val runner: MonitorRunner
) : BaseRestHandler(settings) {

private val log = LogManager.getLogger(javaClass)
lucaswin-amzn marked this conversation as resolved.
Show resolved Hide resolved

init {
restController.registerHandler(POST, "${AlertingPlugin.MONITOR_BASE_URI}/{monitorID}/_execute", this)
restController.registerHandler(POST, "${AlertingPlugin.MONITOR_BASE_URI}/_execute", this)
Expand All @@ -65,7 +68,7 @@ class RestExecuteMonitorAction(
val response = runner.runMonitor(monitor, periodStart, periodEnd, dryrun)
channel.sendResponse(BytesRestResponse(RestStatus.OK, channel.newBuilder().value(response)))
} catch (e: Exception) {
logger.error("Unexpected error running monitor", e)
log.error("Unexpected error running monitor", e)
channel.sendResponse(BytesRestResponse(channel, e))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package com.amazon.opendistroforelasticsearch.alerting.resthandler

import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob
import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX
import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob.Companion.SCHEDULED_JOB_TYPE
import com.amazon.opendistroforelasticsearch.alerting.util._ID
import com.amazon.opendistroforelasticsearch.alerting.util._VERSION
import com.amazon.opendistroforelasticsearch.alerting.util.context
Expand Down Expand Up @@ -62,7 +61,7 @@ class RestGetMonitorAction(settings: Settings, controller: RestController) : Bas
if (monitorId == null || monitorId.isEmpty()) {
throw IllegalArgumentException("missing id")
}
val getRequest = GetRequest(SCHEDULED_JOBS_INDEX, SCHEDULED_JOB_TYPE, monitorId)
val getRequest = GetRequest(SCHEDULED_JOBS_INDEX, monitorId)
lucaswin-amzn marked this conversation as resolved.
Show resolved Hide resolved
.version(RestActions.parseVersion(request))
.fetchSourceContext(context(request))
if (request.method() == HEAD) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import com.amazon.opendistroforelasticsearch.alerting.util._ID
import com.amazon.opendistroforelasticsearch.alerting.util._VERSION
lucaswin-amzn marked this conversation as resolved.
Show resolved Hide resolved
import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob
import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX
import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob.Companion.SCHEDULED_JOB_TYPE
import com.amazon.opendistroforelasticsearch.alerting.AlertingPlugin
import org.apache.logging.log4j.LogManager
import org.elasticsearch.action.ActionListener
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse
import org.elasticsearch.action.get.GetRequest
Expand Down Expand Up @@ -61,6 +61,7 @@ class RestIndexDestinationAction(
) : BaseRestHandler(settings) {
private var scheduledJobIndices: ScheduledJobIndices
@Volatile private var indexTimeout = INDEX_TIMEOUT.get(settings)
private val log = LogManager.getLogger(javaClass)

init {
controller.registerHandler(RestRequest.Method.POST, AlertingPlugin.DESTINATION_BASE_URI, this) // Creates new destination
Expand Down Expand Up @@ -116,7 +117,7 @@ class RestIndexDestinationAction(
private fun prepareDestinationIndexing() {
if (channel.request().method() == RestRequest.Method.PUT) updateDestination()
else {
val indexRequest = IndexRequest(SCHEDULED_JOBS_INDEX, SCHEDULED_JOB_TYPE)
val indexRequest = IndexRequest(SCHEDULED_JOBS_INDEX)
.setRefreshPolicy(refreshPolicy)
.source(newDestination.toXContent(channel.newBuilder(), ToXContent.MapParams(mapOf("with_type" to "true"))))
.version(destinationVersion)
Expand All @@ -127,17 +128,17 @@ class RestIndexDestinationAction(

private fun onCreateMappingsResponse(response: CreateIndexResponse) {
if (response.isAcknowledged) {
logger.info("Created ${ScheduledJob.SCHEDULED_JOBS_INDEX} with mappings.")
log.info("Created ${ScheduledJob.SCHEDULED_JOBS_INDEX} with mappings.")
prepareDestinationIndexing()
} else {
logger.error("Create ${ScheduledJob.SCHEDULED_JOBS_INDEX} mappings call not acknowledged.")
log.error("Create ${ScheduledJob.SCHEDULED_JOBS_INDEX} mappings call not acknowledged.")
channel.sendResponse(BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR,
response.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS)))
}
}

private fun updateDestination() {
val getRequest = GetRequest(SCHEDULED_JOBS_INDEX, SCHEDULED_JOB_TYPE, destinationId)
val getRequest = GetRequest(SCHEDULED_JOBS_INDEX, destinationId)
client.get(getRequest, ActionListener.wrap(::onGetResponse, ::onFailure))
}

Expand All @@ -149,7 +150,7 @@ class RestIndexDestinationAction(
.endObject()
return channel.sendResponse(BytesRestResponse(RestStatus.NOT_FOUND, response.toXContent(builder, ToXContent.EMPTY_PARAMS)))
}
val indexRequest = IndexRequest(SCHEDULED_JOBS_INDEX, SCHEDULED_JOB_TYPE)
val indexRequest = IndexRequest(SCHEDULED_JOBS_INDEX)
.setRefreshPolicy(refreshPolicy)
.source(newDestination.toXContent(channel.newBuilder(), ToXContent.MapParams(mapOf("with_type" to "true"))))
.id(destinationId)
Expand Down
Loading