Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Support ES 7.0 Issue #39 (#41)
Browse files Browse the repository at this point in the history
* Elasticsearch 7.0 support.

closes #39
  • Loading branch information
lucaswin-amzn authored May 15, 2019
1 parent 3eea987 commit 157f996
Show file tree
Hide file tree
Showing 25 changed files with 388 additions and 370 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ Please see our [documentation](https://opendistro.github.io/for-elasticsearch-do

1. Check out this package from version control.
1. Launch Intellij IDEA, choose **Import Project**, and select the `settings.gradle` file in the root of this package.
1. To build from the command line, set `JAVA_HOME` to point to a JDK >=11 before running `./gradlew`.
1. To build from the command line, set `JAVA_HOME` to point to a JDK >= 12 before running `./gradlew`.


## Build

This package is organized into subprojects, most of which contribute JARs to the top-level plugin in the `alerting` subproject.

All subprojects in this package use the [Gradle](https://docs.gradle.org/4.10.2/userguide/userguide.html) build system. Gradle comes with excellent documentation that should be your first stop when trying to figure out how to operate or modify the build.
All subprojects in this package use the [Gradle](https://docs.gradle.org/current/userguide/userguide.html) build system. Gradle comes with excellent documentation that should be your first stop when trying to figure out how to operate or modify the build.

However, to build the `alerting` plugin subproject, we also use the Elastic build tools for Gradle. These tools are idiosyncratic and don't always follow the conventions and instructions for building regular Java code using Gradle. Not everything in `alerting` will work the way it's described in the Gradle documentation. If you encounter such a situation, the Elastic build tools [source code](https://github.com/elastic/elasticsearch/tree/master/buildSrc/src/main/groovy/org/elasticsearch/gradle) is your best bet for figuring out what's going on.

Expand Down
2 changes: 1 addition & 1 deletion alerting/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ task startMultiNode(dependsOn: startMultiNode1)
def es_tmp_dir = rootProject.file('build/private/es_tmp').absoluteFile
es_tmp_dir.mkdirs()

unitTest {
test {
systemProperty 'tests.security.manager', 'false'
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import com.amazon.opendistroforelasticsearch.alerting.alerts.moveAlerts
import com.amazon.opendistroforelasticsearch.alerting.core.JobRunner
import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob
import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX
import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob.Companion.SCHEDULED_JOB_TYPE
import com.amazon.opendistroforelasticsearch.alerting.core.model.SearchInput
import com.amazon.opendistroforelasticsearch.alerting.elasticapi.convertToMap
import com.amazon.opendistroforelasticsearch.alerting.elasticapi.firstFailureOrNull
Expand Down Expand Up @@ -355,7 +354,7 @@ class MonitorRunner(
// spend time reloading the alert and writing it back.
when (alert.state) {
ACTIVE, ERROR -> {
listOf<DocWriteRequest<*>>(IndexRequest(AlertIndices.ALERT_INDEX, AlertIndices.MAPPING_TYPE)
listOf<DocWriteRequest<*>>(IndexRequest(AlertIndices.ALERT_INDEX)
.routing(alert.monitorId)
.source(alert.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
.id(if (alert.id != Alert.NO_ID) alert.id else null))
Expand All @@ -365,9 +364,9 @@ class MonitorRunner(
}
COMPLETED -> {
listOf<DocWriteRequest<*>>(
DeleteRequest(AlertIndices.ALERT_INDEX, AlertIndices.MAPPING_TYPE, alert.id)
DeleteRequest(AlertIndices.ALERT_INDEX, alert.id)
.routing(alert.monitorId),
IndexRequest(AlertIndices.HISTORY_WRITE_INDEX, AlertIndices.MAPPING_TYPE)
IndexRequest(AlertIndices.HISTORY_WRITE_INDEX)
.routing(alert.monitorId)
.source(alert.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
.id(alert.id))
Expand Down Expand Up @@ -440,7 +439,7 @@ class MonitorRunner(
}

private suspend fun getDestinationInfo(destinationId: String): Destination {
val getRequest = GetRequest(SCHEDULED_JOBS_INDEX, SCHEDULED_JOB_TYPE, destinationId).routing(destinationId)
val getRequest = GetRequest(SCHEDULED_JOBS_INDEX, destinationId).routing(destinationId)
val getResponse: GetResponse = client.suspendUntil { client.get(getRequest, it) }
if (!getResponse.isExists || getResponse.isSourceEmpty) {
throw IllegalStateException("Destination document with id $destinationId not found or source is empty")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ suspend fun moveAlerts(client: Client, monitorId: String, monitor: Monitor? = nu
val response: SearchResponse = client.suspendUntil { search(activeAlertsRequest, it) }

// If no alerts are found, simply return
if (response.hits.totalHits == 0L) return
if (response.hits.totalHits.value == 0L) return
val indexRequests = response.hits.map { hit ->
IndexRequest(AlertIndices.HISTORY_WRITE_INDEX, AlertIndices.MAPPING_TYPE)
IndexRequest(AlertIndices.HISTORY_WRITE_INDEX)
.routing(monitorId)
.source(Alert.parse(alertContentParser(hit.sourceRef), hit.id, hit.version)
.copy(state = Alert.State.DELETED)
Expand All @@ -85,9 +85,10 @@ suspend fun moveAlerts(client: Client, monitorId: String, monitor: Monitor? = nu
val copyResponse: BulkResponse = client.suspendUntil { bulk(copyRequest, it) }

val deleteRequests = copyResponse.items.filterNot { it.isFailed }.map {
DeleteRequest(AlertIndices.ALERT_INDEX, AlertIndices.MAPPING_TYPE, it.id)
DeleteRequest(AlertIndices.ALERT_INDEX, it.id)
.routing(monitorId)
.version(it.version)
.versionType(VersionType.EXTERNAL_GTE)
}
val deleteResponse: BulkResponse = client.suspendUntil { bulk(BulkRequest().add(deleteRequests), it) }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,6 @@ data class Alert(
const val NO_ID = ""
const val NO_VERSION = Versions.NOT_FOUND

/**
* The mapping type of [Alert]s in the ES index.
*
* This should go away starting ES 7. We use "_doc" for future compatibility as described here:
* https://www.elastic.co/guide/en/elasticsearch/reference/6.x/removal-of-types.html#_schedule_for_removal_of_mapping_types
*/
const val ALERT_TYPE = "_doc"

@JvmStatic @JvmOverloads
@Throws(IOException::class)
fun parse(xcp: XContentParser, id: String = NO_ID, version: Long = NO_VERSION): Alert {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import com.amazon.opendistroforelasticsearch.alerting.model.Alert.State.ERROR
import com.amazon.opendistroforelasticsearch.alerting.util.REFRESH
import com.amazon.opendistroforelasticsearch.alerting.AlertingPlugin
import com.amazon.opendistroforelasticsearch.alerting.elasticapi.optionalTimeField
import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.Logger
import org.elasticsearch.action.ActionListener
import org.elasticsearch.action.bulk.BulkRequest
import org.elasticsearch.action.bulk.BulkResponse
Expand Down Expand Up @@ -54,6 +56,8 @@ import org.elasticsearch.search.builder.SearchSourceBuilder
import java.io.IOException
import java.time.Instant

private val log: Logger = LogManager.getLogger(RestAcknowledgeAlertAction::class.java)

/**
* This class consists of the REST handler to acknowledge alerts.
* The user provides the monitorID to which these alerts pertain and in the content of the request provides
Expand Down Expand Up @@ -100,9 +104,8 @@ class RestAcknowledgeAlertAction(settings: Settings, controller: RestController)
.filter(QueryBuilders.termsQuery("_id", alertIds))
val searchRequest = SearchRequest()
.indices(AlertIndices.ALERT_INDEX)
.types(Alert.ALERT_TYPE)
.routing(monitorId)
.source(SearchSourceBuilder().query(queryBuilder).version(true))
.source(SearchSourceBuilder().query(queryBuilder).version(true).seqNoAndPrimaryTerm(true))

client.search(searchRequest, ActionListener.wrap(::onSearchResponse, ::onFailure))
}
Expand All @@ -115,9 +118,10 @@ class RestAcknowledgeAlertAction(settings: Settings, controller: RestController)
val alert = Alert.parse(xcp, hit.id, hit.version)
alerts[alert.id] = alert
if (alert.state == ACTIVE) {
listOf(UpdateRequest(AlertIndices.ALERT_INDEX, AlertIndices.MAPPING_TYPE, hit.id)
listOf(UpdateRequest(AlertIndices.ALERT_INDEX, hit.id)
.routing(monitorId)
.version(hit.version)
.setIfSeqNo(hit.seqNo)
.setIfPrimaryTerm(hit.primaryTerm)
.doc(XContentFactory.jsonBuilder().startObject()
.field(Alert.STATE_FIELD, ACKNOWLEDGED.toString())
.optionalTimeField(Alert.ACKNOWLEDGED_TIME_FIELD, Instant.now())
Expand All @@ -127,7 +131,7 @@ class RestAcknowledgeAlertAction(settings: Settings, controller: RestController)
}
}

logger.info("Acknowledging monitor: $monitorId, alerts: ${updateRequests.map { it.id() }}")
log.info("Acknowledging monitor: $monitorId, alerts: ${updateRequests.map { it.id() }}")
val request = BulkRequest().add(updateRequests).setRefreshPolicy(refreshPolicy)
client.bulk(request, ActionListener.wrap(::onBulkResponse, ::onFailure))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class RestDeleteDestinationAction(settings: Settings, controller: RestController

return RestChannelConsumer { channel ->
val deleteDestinationRequest =
DeleteRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, ScheduledJob.SCHEDULED_JOB_TYPE, destinationId)
DeleteRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, destinationId)
.setRefreshPolicy(refreshPolicy)
client.delete(deleteDestinationRequest, RestStatusToXContentListener(channel))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class RestDeleteMonitorAction(settings: Settings, controller: RestController) :
val refreshPolicy = RefreshPolicy.parse(request.param(REFRESH, RefreshPolicy.IMMEDIATE.value))

return RestChannelConsumer { channel ->
val deleteRequest = DeleteRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, ScheduledJob.SCHEDULED_JOB_TYPE, monitorId)
val deleteRequest = DeleteRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, monitorId)
.setRefreshPolicy(refreshPolicy)
client.delete(deleteRequest, RestStatusToXContentListener(channel))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob
import com.amazon.opendistroforelasticsearch.alerting.MonitorRunner
import com.amazon.opendistroforelasticsearch.alerting.model.Monitor
import com.amazon.opendistroforelasticsearch.alerting.AlertingPlugin
import org.apache.logging.log4j.LogManager
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
Expand All @@ -42,6 +43,8 @@ import org.elasticsearch.rest.RestStatus
import org.elasticsearch.rest.action.RestActionListener
import java.time.Instant

private val log = LogManager.getLogger(RestExecuteMonitorAction::class.java)

class RestExecuteMonitorAction(
val settings: Settings,
restController: RestController,
Expand Down Expand Up @@ -70,7 +73,7 @@ class RestExecuteMonitorAction(
channel.sendResponse(BytesRestResponse(RestStatus.OK, channel.newBuilder().value(response)))
}
} catch (e: Exception) {
logger.error("Unexpected error running monitor", e)
log.error("Unexpected error running monitor", e)
withContext(Dispatchers.IO) { channel.sendResponse(BytesRestResponse(channel, e)) }
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ package com.amazon.opendistroforelasticsearch.alerting.resthandler

import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob
import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX
import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob.Companion.SCHEDULED_JOB_TYPE
import com.amazon.opendistroforelasticsearch.alerting.util._ID
import com.amazon.opendistroforelasticsearch.alerting.util._VERSION
import com.amazon.opendistroforelasticsearch.alerting.util.context
import com.amazon.opendistroforelasticsearch.alerting.AlertingPlugin
import com.amazon.opendistroforelasticsearch.alerting.util._PRIMARY_TERM
import com.amazon.opendistroforelasticsearch.alerting.util._SEQ_NO
import org.elasticsearch.action.get.GetRequest
import org.elasticsearch.action.get.GetResponse
import org.elasticsearch.client.node.NodeClient
Expand Down Expand Up @@ -62,9 +63,10 @@ class RestGetMonitorAction(settings: Settings, controller: RestController) : Bas
if (monitorId == null || monitorId.isEmpty()) {
throw IllegalArgumentException("missing id")
}
val getRequest = GetRequest(SCHEDULED_JOBS_INDEX, SCHEDULED_JOB_TYPE, monitorId)
val getRequest = GetRequest(SCHEDULED_JOBS_INDEX, monitorId)
.version(RestActions.parseVersion(request))
.fetchSourceContext(context(request))

if (request.method() == HEAD) {
getRequest.fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE)
}
Expand All @@ -83,6 +85,8 @@ class RestGetMonitorAction(settings: Settings, controller: RestController) : Bas
.startObject()
.field(_ID, response.id)
.field(_VERSION, response.version)
.field(_SEQ_NO, response.seqNo)
.field(_PRIMARY_TERM, response.primaryTerm)
if (!response.isSourceEmpty) {
XContentHelper.createParser(channel.request().xContentRegistry, LoggingDeprecationHandler.INSTANCE,
response.sourceAsBytesRef, XContentType.JSON).use { xcp ->
Expand Down
Loading

0 comments on commit 157f996

Please sign in to comment.