Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Merge branch 'master' into issue-39
Browse files Browse the repository at this point in the history
  • Loading branch information
lucaswin-amzn committed May 15, 2019
2 parents af41a34 + 56c49d9 commit b86bf66
Show file tree
Hide file tree
Showing 13 changed files with 289 additions and 240 deletions.
4 changes: 4 additions & 0 deletions alerting/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ configurations.all {
force "commons-logging:commons-logging:${versions.commonslogging}"
force "org.apache.httpcomponents:httpcore:${versions.httpcore}"
force "commons-codec:commons-codec:${versions.commonscodec}"

// This is required because kotlin coroutines core-1.1.1 still requires kotlin stdlib 1.3.20 and we're using 1.3.21
force "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}"
force "org.jetbrains.kotlin:kotlin-stdlib-common:${kotlin_version}"
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,16 @@
*/
package com.amazon.opendistroforelasticsearch.alerting

import com.amazon.opendistroforelasticsearch.alerting.alerts.AlertIndices
import com.amazon.opendistroforelasticsearch.alerting.core.JobSweeper
import com.amazon.opendistroforelasticsearch.alerting.core.ScheduledJobIndices
import com.amazon.opendistroforelasticsearch.alerting.core.action.node.ScheduledJobsStatsAction
import com.amazon.opendistroforelasticsearch.alerting.core.action.node.ScheduledJobsStatsTransportAction
import com.amazon.opendistroforelasticsearch.alerting.alerts.AlertIndices
import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob
import com.amazon.opendistroforelasticsearch.alerting.core.model.SearchInput
import com.amazon.opendistroforelasticsearch.alerting.core.resthandler.RestScheduledJobStatsHandler
import com.amazon.opendistroforelasticsearch.alerting.core.schedule.JobScheduler
import com.amazon.opendistroforelasticsearch.alerting.core.settings.ScheduledJobSettings
import com.amazon.opendistroforelasticsearch.alerting.model.Monitor
import com.amazon.opendistroforelasticsearch.alerting.resthandler.RestAcknowledgeAlertAction
import com.amazon.opendistroforelasticsearch.alerting.resthandler.RestDeleteDestinationAction
Expand All @@ -30,11 +35,6 @@ import com.amazon.opendistroforelasticsearch.alerting.resthandler.RestIndexMonit
import com.amazon.opendistroforelasticsearch.alerting.resthandler.RestSearchMonitorAction
import com.amazon.opendistroforelasticsearch.alerting.script.TriggerScript
import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings
import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob
import com.amazon.opendistroforelasticsearch.alerting.core.model.SearchInput
import com.amazon.opendistroforelasticsearch.alerting.core.resthandler.RestScheduledJobStatsHandler
import com.amazon.opendistroforelasticsearch.alerting.core.schedule.JobScheduler
import com.amazon.opendistroforelasticsearch.alerting.core.settings.ScheduledJobSettings
import org.elasticsearch.action.ActionRequest
import org.elasticsearch.action.ActionResponse
import org.elasticsearch.client.Client
Expand All @@ -61,10 +61,10 @@ import org.elasticsearch.rest.RestController
import org.elasticsearch.rest.RestHandler
import org.elasticsearch.script.ScriptContext
import org.elasticsearch.script.ScriptService
import org.elasticsearch.threadpool.ExecutorBuilder
import org.elasticsearch.threadpool.ThreadPool
import org.elasticsearch.watcher.ResourceWatcherService
import java.util.function.Supplier

/**
* Entry point of the OpenDistro for Elasticsearch alerting plugin
* This class initializes the [RestGetMonitorAction], [RestDeleteMonitorAction], [RestIndexMonitorAction] rest handlers.
Expand Down Expand Up @@ -175,8 +175,4 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, P
override fun getContexts(): List<ScriptContext<*>> {
return listOf(TriggerScript.CONTEXT)
}

override fun getExecutorBuilders(settings: Settings): List<ExecutorBuilder<*>> {
return listOf(MonitorRunner.executorBuilder(settings))
}
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@ import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.
import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.ALERT_HISTORY_ROLLOVER_PERIOD
import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.REQUEST_TIMEOUT
import org.apache.logging.log4j.LogManager
import com.amazon.opendistroforelasticsearch.alerting.elasticapi.suspendUntil
import org.elasticsearch.ResourceAlreadyExistsException
import org.elasticsearch.action.admin.indices.alias.Alias
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest
import org.elasticsearch.client.IndicesAdminClient
import org.elasticsearch.cluster.ClusterChangedEvent
Expand Down Expand Up @@ -148,31 +151,34 @@ class AlertIndices(
return alertIndexInitialized && historyIndexInitialized
}

fun createAlertIndex() {
suspend fun createAlertIndex() {
if (!alertIndexInitialized) {
alertIndexInitialized = createIndex(ALERT_INDEX)
}
alertIndexInitialized
}

fun createInitialHistoryIndex() {
suspend fun createInitialHistoryIndex() {
if (!historyIndexInitialized) {
historyIndexInitialized = createIndex(HISTORY_INDEX_PATTERN, HISTORY_WRITE_INDEX)
}
historyIndexInitialized
}

private fun createIndex(index: String, alias: String? = null): Boolean {
private suspend fun createIndex(index: String, alias: String? = null): Boolean {
// This should be a fast check of local cluster state. Should be exceedingly rare that the local cluster
// state does not contain the index and multiple nodes concurrently try to create the index.
// If it does happen, the error is handled: we catch the ResourceAlreadyExistsException
val exists = client.exists(IndicesExistsRequest(index).local(true)).actionGet(requestTimeout).isExists
if (exists) return true
val existsResponse: IndicesExistsResponse = client.suspendUntil {
client.exists(IndicesExistsRequest(index).local(true), it)
}
if (existsResponse.isExists) return true

val request = CreateIndexRequest(index).mapping(MAPPING_TYPE, alertMapping(), XContentType.JSON)
if (alias != null) request.alias(Alias(alias))
return try {
client.create(request).actionGet(requestTimeout).isAcknowledged
val createIndexResponse: CreateIndexResponse = client.suspendUntil { client.create(request, it) }
createIndexResponse.isAcknowledged
} catch (e: ResourceAlreadyExistsException) {
true
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,11 @@

package com.amazon.opendistroforelasticsearch.alerting.alerts

import com.amazon.opendistroforelasticsearch.alerting.MonitorRunner
import com.amazon.opendistroforelasticsearch.alerting.alerts.AlertIndices.Companion.ALERT_INDEX
import com.amazon.opendistroforelasticsearch.alerting.alerts.AlertIndices.Companion.HISTORY_WRITE_INDEX
import com.amazon.opendistroforelasticsearch.alerting.model.Alert
import com.amazon.opendistroforelasticsearch.alerting.model.Monitor
import org.apache.logging.log4j.Logger
import org.elasticsearch.action.ActionListener
import com.amazon.opendistroforelasticsearch.alerting.elasticapi.suspendUntil
import org.elasticsearch.action.bulk.BulkRequest
import org.elasticsearch.action.bulk.BulkResponse
import org.elasticsearch.action.delete.DeleteRequest
Expand All @@ -30,7 +28,6 @@ import org.elasticsearch.action.search.SearchRequest
import org.elasticsearch.action.search.SearchResponse
import org.elasticsearch.client.Client
import org.elasticsearch.common.bytes.BytesReference
import org.elasticsearch.common.unit.TimeValue
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler
import org.elasticsearch.common.xcontent.NamedXContentRegistry
import org.elasticsearch.common.xcontent.ToXContent
Expand All @@ -39,12 +36,13 @@ import org.elasticsearch.common.xcontent.XContentHelper
import org.elasticsearch.common.xcontent.XContentParser
import org.elasticsearch.common.xcontent.XContentParserUtils
import org.elasticsearch.common.xcontent.XContentType
import org.elasticsearch.index.VersionType
import org.elasticsearch.index.query.QueryBuilders
import org.elasticsearch.rest.RestStatus
import org.elasticsearch.search.builder.SearchSourceBuilder
import org.elasticsearch.threadpool.ThreadPool

/**
* Class to manage the moving of active alerts when a monitor or trigger is deleted.
* Moves defunct active alerts to the alert history index when the corresponding monitor or trigger is deleted.
*
* The logic for moving alerts consists of:
* 1. Find active alerts:
Expand All @@ -54,114 +52,65 @@ import org.elasticsearch.threadpool.ThreadPool
* 3. Delete alerts from [ALERT_INDEX]
* 4. Schedule a retry if there were any failures
*/
class AlertMover(
private val client: Client,
private val threadPool: ThreadPool,
private val monitorRunner: MonitorRunner,
private val alertIndices: AlertIndices,
private val backoff: Iterator<TimeValue>,
private val logger: Logger,
private val monitorId: String,
private val monitor: Monitor? = null
) {
suspend fun moveAlerts(client: Client, monitorId: String, monitor: Monitor? = null) {
val boolQuery = QueryBuilders.boolQuery()
.filter(QueryBuilders.termQuery(Alert.MONITOR_ID_FIELD, monitorId))

private var hasFailures: Boolean = false

fun run() {
if (alertIndices.isInitialized()) {
findActiveAlerts()
}
}

private fun findActiveAlerts() {
val boolQuery = QueryBuilders.boolQuery()
.filter(QueryBuilders.termQuery(Alert.MONITOR_ID_FIELD, monitorId))

if (monitor != null) {
boolQuery.mustNot(QueryBuilders.termsQuery(Alert.TRIGGER_ID_FIELD, monitor.triggers.map { it.id }))
}

val activeAlertsQuery = SearchSourceBuilder.searchSource()
.query(boolQuery)
.version(true)

val activeAlertsRequest = SearchRequest(AlertIndices.ALERT_INDEX)
.routing(monitorId)
.source(activeAlertsQuery)
client.search(activeAlertsRequest, ActionListener.wrap(::onSearchResponse, ::onFailure))
}

private fun onSearchResponse(response: SearchResponse) {
// If no alerts are found, simply return
if (response.hits.totalHits.value == 0L) return
val indexRequests = response.hits.map { hit ->
IndexRequest(AlertIndices.HISTORY_WRITE_INDEX)
.routing(monitorId)
.source(Alert.parse(alertContentParser(hit.sourceRef), hit.id, hit.version)
.copy(state = Alert.State.DELETED)
.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
.setIfSeqNo(hit.seqNo)
.setIfPrimaryTerm(hit.primaryTerm)
.id(hit.id)
}
val copyRequest = BulkRequest().add(indexRequests)
client.bulk(copyRequest, ActionListener.wrap(::onCopyResponse, ::onFailure))
if (monitor != null) {
boolQuery.mustNot(QueryBuilders.termsQuery(Alert.TRIGGER_ID_FIELD, monitor.triggers.map { it.id }))
}

private fun onCopyResponse(response: BulkResponse) {
val deleteRequests = response.items.filterNot { it.isFailed }.map {
DeleteRequest(AlertIndices.ALERT_INDEX, it.id)
.routing(monitorId)
}
if (response.hasFailures()) {
hasFailures = true
for (it in response.items) {
logger.error("Failed to move deleted alert to alert history index: ${it.id}",
it.failure.cause)
}
}

val bulkRequest = BulkRequest().add(deleteRequests)
client.bulk(bulkRequest, ActionListener.wrap(::onDeleteResponse, ::onFailure))
val activeAlertsQuery = SearchSourceBuilder.searchSource()
.query(boolQuery)
.version(true)

val activeAlertsRequest = SearchRequest(AlertIndices.ALERT_INDEX)
.routing(monitorId)
.source(activeAlertsQuery)
val response: SearchResponse = client.suspendUntil { search(activeAlertsRequest, it) }

// If no alerts are found, simply return
if (response.hits.totalHits.value == 0L) return
val indexRequests = response.hits.map { hit ->
IndexRequest(AlertIndices.HISTORY_WRITE_INDEX)
.routing(monitorId)
.source(Alert.parse(alertContentParser(hit.sourceRef), hit.id, hit.version)
.copy(state = Alert.State.DELETED)
.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
.version(hit.version)
.versionType(VersionType.EXTERNAL_GTE)
.id(hit.id)
}

private fun onDeleteResponse(response: BulkResponse) {
if (response.hasFailures()) {
hasFailures = true
for (it in response.items) {
logger.error("Failed to delete active alert from alert index: ${it.id}",
it.failure.cause)
}
}
if (hasFailures) reschedule()
val copyRequest = BulkRequest().add(indexRequests)
val copyResponse: BulkResponse = client.suspendUntil { bulk(copyRequest, it) }

val deleteRequests = copyResponse.items.filterNot { it.isFailed }.map {
DeleteRequest(AlertIndices.ALERT_INDEX, it.id)
.routing(monitorId)
.version(it.version)
.versionType(VersionType.EXTERNAL_GTE)
}

private fun onFailure(e: Exception) {
logger.error("Failed to move alerts for ${monitorIdTriggerIdsTuple()}", e)
reschedule()
val deleteResponse: BulkResponse = client.suspendUntil { bulk(BulkRequest().add(deleteRequests), it) }

if (copyResponse.hasFailures()) {
val retryCause = copyResponse.items.filter { it.isFailed }
.firstOrNull { it.status() == RestStatus.TOO_MANY_REQUESTS }
?.failure?.cause
throw RuntimeException("Failed to copy alerts for [$monitorId, ${monitor?.triggers?.map { it.id }}]: " +
copyResponse.buildFailureMessage(), retryCause)
}

private fun reschedule() {
if (backoff.hasNext()) {
logger.warn("Rescheduling AlertMover due to failure for ${monitorIdTriggerIdsTuple()}")
val wait = backoff.next()
val runnable = Runnable {
monitorRunner.rescheduleAlertMover(monitorId, monitor, backoff)
}
threadPool.schedule(runnable, wait, ThreadPool.Names.SAME)
} else {
logger.warn("Retries exhausted for ${monitorIdTriggerIdsTuple()}")
}
if (deleteResponse.hasFailures()) {
val retryCause = deleteResponse.items.filter { it.isFailed }
.firstOrNull { it.status() == RestStatus.TOO_MANY_REQUESTS }
?.failure?.cause
throw RuntimeException("Failed to delete alerts for [$monitorId, ${monitor?.triggers?.map { it.id }}]: " +
deleteResponse.buildFailureMessage(), retryCause)
}
}

private fun alertContentParser(bytesReference: BytesReference): XContentParser {
val xcp = XContentHelper.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE,
private fun alertContentParser(bytesReference: BytesReference): XContentParser {
val xcp = XContentHelper.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE,
bytesReference, XContentType.JSON)
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp::getTokenLocation)
return xcp
}

private fun monitorIdTriggerIdsTuple(): String {
return "[$monitorId, ${monitor?.triggers?.map { it.id }}]"
}
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp::getTokenLocation)
return xcp
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,19 @@ data class Action(
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
return builder.startObject()
val xContentBuilder = builder.startObject()
.field(ID_FIELD, id)
.field(NAME_FIELD, name)
.field(DESTINATION_ID_FIELD, destinationId)
.field(SUBJECT_TEMPLATE_FIELD, subjectTemplate)
.field(MESSAGE_TEMPLATE_FIELD, messageTemplate)
.field(THROTTLE_ENABLED_FIELD, throttleEnabled)
.field(THROTTLE_FIELD, throttle)
.endObject()
if (subjectTemplate != null) {
xContentBuilder.field(SUBJECT_TEMPLATE_FIELD, subjectTemplate)
}
if (throttle != null) {
xContentBuilder.field(THROTTLE_FIELD, throttle)
}
return xContentBuilder.endObject()
}

fun asTemplateArg(): Map<String, Any> {
Expand Down Expand Up @@ -93,9 +97,14 @@ data class Action(
ID_FIELD -> id = xcp.text()
NAME_FIELD -> name = xcp.textOrNull()
DESTINATION_ID_FIELD -> destinationId = xcp.textOrNull()
SUBJECT_TEMPLATE_FIELD -> subjectTemplate = Script.parse(xcp, Script.DEFAULT_TEMPLATE_LANG)
SUBJECT_TEMPLATE_FIELD -> {
subjectTemplate = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else
Script.parse(xcp, Script.DEFAULT_TEMPLATE_LANG)
}
MESSAGE_TEMPLATE_FIELD -> messageTemplate = Script.parse(xcp, Script.DEFAULT_TEMPLATE_LANG)
THROTTLE_FIELD -> throttle = Throttle.parse(xcp)
THROTTLE_FIELD -> {
throttle = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else Throttle.parse(xcp)
}
THROTTLE_ENABLED_FIELD -> {
throttleEnabled = xcp.booleanValue()
}
Expand All @@ -106,6 +115,10 @@ data class Action(
}
}

if (throttleEnabled) {
requireNotNull(throttle, { "Action throttle enabled but not set throttle value" })
}

return Action(requireNotNull(name) { "Action name is null" },
requireNotNull(destinationId) { "Destination id is null" },
subjectTemplate,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ data class Throttle(
@Throws(IOException::class)
fun parse(xcp: XContentParser): Throttle {
var value: Int = 0
var unit: ChronoUnit? = null
var unit: ChronoUnit = ChronoUnit.MINUTES // only support MINUTES throttle unit currently

XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp::getTokenLocation)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ data class Destination(
}
}

@Throws(IOException::class)
fun publish(compiledSubject: String?, compiledMessage: String): String {
val destinationMessage: BaseMessage
when (type) {
Expand Down
Loading

0 comments on commit b86bf66

Please sign in to comment.