diff --git a/alerting/build.gradle b/alerting/build.gradle index b74964b3..cb0860f3 100644 --- a/alerting/build.gradle +++ b/alerting/build.gradle @@ -49,6 +49,10 @@ configurations.all { force "commons-logging:commons-logging:${versions.commonslogging}" force "org.apache.httpcomponents:httpcore:${versions.httpcore}" force "commons-codec:commons-codec:${versions.commonscodec}" + + // This is required because kotlin coroutines core-1.1.1 still requires kotlin stdlib 1.3.20 and we're using 1.3.21 + force "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}" + force "org.jetbrains.kotlin:kotlin-stdlib-common:${kotlin_version}" } } diff --git a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/AlertingPlugin.kt index 66aca096..5bd85ec0 100644 --- a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/AlertingPlugin.kt +++ b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/AlertingPlugin.kt @@ -14,11 +14,16 @@ */ package com.amazon.opendistroforelasticsearch.alerting +import com.amazon.opendistroforelasticsearch.alerting.alerts.AlertIndices import com.amazon.opendistroforelasticsearch.alerting.core.JobSweeper import com.amazon.opendistroforelasticsearch.alerting.core.ScheduledJobIndices import com.amazon.opendistroforelasticsearch.alerting.core.action.node.ScheduledJobsStatsAction import com.amazon.opendistroforelasticsearch.alerting.core.action.node.ScheduledJobsStatsTransportAction -import com.amazon.opendistroforelasticsearch.alerting.alerts.AlertIndices +import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob +import com.amazon.opendistroforelasticsearch.alerting.core.model.SearchInput +import com.amazon.opendistroforelasticsearch.alerting.core.resthandler.RestScheduledJobStatsHandler +import com.amazon.opendistroforelasticsearch.alerting.core.schedule.JobScheduler +import 
com.amazon.opendistroforelasticsearch.alerting.core.settings.ScheduledJobSettings import com.amazon.opendistroforelasticsearch.alerting.model.Monitor import com.amazon.opendistroforelasticsearch.alerting.resthandler.RestAcknowledgeAlertAction import com.amazon.opendistroforelasticsearch.alerting.resthandler.RestDeleteDestinationAction @@ -30,11 +35,6 @@ import com.amazon.opendistroforelasticsearch.alerting.resthandler.RestIndexMonit import com.amazon.opendistroforelasticsearch.alerting.resthandler.RestSearchMonitorAction import com.amazon.opendistroforelasticsearch.alerting.script.TriggerScript import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings -import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob -import com.amazon.opendistroforelasticsearch.alerting.core.model.SearchInput -import com.amazon.opendistroforelasticsearch.alerting.core.resthandler.RestScheduledJobStatsHandler -import com.amazon.opendistroforelasticsearch.alerting.core.schedule.JobScheduler -import com.amazon.opendistroforelasticsearch.alerting.core.settings.ScheduledJobSettings import org.elasticsearch.action.ActionRequest import org.elasticsearch.action.ActionResponse import org.elasticsearch.client.Client @@ -61,10 +61,10 @@ import org.elasticsearch.rest.RestController import org.elasticsearch.rest.RestHandler import org.elasticsearch.script.ScriptContext import org.elasticsearch.script.ScriptService -import org.elasticsearch.threadpool.ExecutorBuilder import org.elasticsearch.threadpool.ThreadPool import org.elasticsearch.watcher.ResourceWatcherService import java.util.function.Supplier + /** * Entry point of the OpenDistro for Elasticsearch alerting plugin * This class initializes the [RestGetMonitorAction], [RestDeleteMonitorAction], [RestIndexMonitorAction] rest handlers. 
@@ -175,8 +175,4 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, P override fun getContexts(): List> { return listOf(TriggerScript.CONTEXT) } - - override fun getExecutorBuilders(settings: Settings): List> { - return listOf(MonitorRunner.executorBuilder(settings)) - } } diff --git a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/MonitorRunner.kt b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/MonitorRunner.kt index 3e2a44fd..221d0029 100644 --- a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/MonitorRunner.kt +++ b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/MonitorRunner.kt @@ -15,10 +15,18 @@ package com.amazon.opendistroforelasticsearch.alerting -import com.amazon.opendistroforelasticsearch.alerting.core.JobRunner import com.amazon.opendistroforelasticsearch.alerting.alerts.AlertError import com.amazon.opendistroforelasticsearch.alerting.alerts.AlertIndices -import com.amazon.opendistroforelasticsearch.alerting.alerts.AlertMover +import com.amazon.opendistroforelasticsearch.alerting.alerts.moveAlerts +import com.amazon.opendistroforelasticsearch.alerting.core.JobRunner +import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob +import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX +import com.amazon.opendistroforelasticsearch.alerting.core.model.SearchInput +import com.amazon.opendistroforelasticsearch.alerting.elasticapi.convertToMap +import com.amazon.opendistroforelasticsearch.alerting.elasticapi.firstFailureOrNull +import com.amazon.opendistroforelasticsearch.alerting.elasticapi.retry +import com.amazon.opendistroforelasticsearch.alerting.elasticapi.suspendUntil +import com.amazon.opendistroforelasticsearch.alerting.model.ActionExecutionResult import com.amazon.opendistroforelasticsearch.alerting.model.ActionRunResult import 
com.amazon.opendistroforelasticsearch.alerting.model.Alert import com.amazon.opendistroforelasticsearch.alerting.model.Alert.State.ACKNOWLEDGED @@ -38,37 +46,34 @@ import com.amazon.opendistroforelasticsearch.alerting.model.action.Action.Compan import com.amazon.opendistroforelasticsearch.alerting.model.destination.Destination import com.amazon.opendistroforelasticsearch.alerting.script.TriggerExecutionContext import com.amazon.opendistroforelasticsearch.alerting.script.TriggerScript -import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_COUNT import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_MILLIS -import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.BULK_TIMEOUT -import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.INPUT_TIMEOUT import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.MOVE_ALERTS_BACKOFF_COUNT import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.MOVE_ALERTS_BACKOFF_MILLIS -import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX -import com.amazon.opendistroforelasticsearch.alerting.core.model.SearchInput -import com.amazon.opendistroforelasticsearch.alerting.elasticapi.convertToMap -import com.amazon.opendistroforelasticsearch.alerting.elasticapi.firstFailureOrNull -import com.amazon.opendistroforelasticsearch.alerting.elasticapi.retry -import com.amazon.opendistroforelasticsearch.alerting.model.ActionExecutionResult import org.apache.logging.log4j.LogManager +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.Job +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.launch +import 
kotlinx.coroutines.withContext import org.elasticsearch.ExceptionsHelper import org.elasticsearch.action.DocWriteRequest import org.elasticsearch.action.bulk.BackoffPolicy -import org.elasticsearch.action.bulk.BulkItemResponse import org.elasticsearch.action.bulk.BulkRequest +import org.elasticsearch.action.bulk.BulkResponse import org.elasticsearch.action.delete.DeleteRequest import org.elasticsearch.action.get.GetRequest +import org.elasticsearch.action.get.GetResponse import org.elasticsearch.action.index.IndexRequest import org.elasticsearch.action.search.SearchRequest +import org.elasticsearch.action.search.SearchResponse import org.elasticsearch.client.Client -import org.elasticsearch.common.Strings import org.elasticsearch.cluster.service.ClusterService +import org.elasticsearch.common.Strings import org.elasticsearch.common.bytes.BytesReference +import org.elasticsearch.common.component.AbstractLifecycleComponent import org.elasticsearch.common.settings.Settings -import org.elasticsearch.common.unit.TimeValue -import org.elasticsearch.common.util.concurrent.EsExecutors -import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException import org.elasticsearch.common.xcontent.LoggingDeprecationHandler import org.elasticsearch.common.xcontent.NamedXContentRegistry import org.elasticsearch.common.xcontent.ToXContent @@ -84,9 +89,9 @@ import org.elasticsearch.script.ScriptService import org.elasticsearch.script.ScriptType import org.elasticsearch.script.TemplateScript import org.elasticsearch.search.builder.SearchSourceBuilder -import org.elasticsearch.threadpool.ScalingExecutorBuilder import org.elasticsearch.threadpool.ThreadPool import java.time.Instant +import kotlin.coroutines.CoroutineContext class MonitorRunner( settings: Settings, @@ -96,74 +101,79 @@ class MonitorRunner( private val xContentRegistry: NamedXContentRegistry, private val alertIndices: AlertIndices, clusterService: ClusterService -) : JobRunner { +) : JobRunner, CoroutineScope, 
AbstractLifecycleComponent() { private val logger = LogManager.getLogger(MonitorRunner::class.java) - @Volatile private var searchTimeout = INPUT_TIMEOUT.get(settings) - @Volatile private var bulkTimeout = BULK_TIMEOUT.get(settings) - @Volatile private var alertBackoffMillis = ALERT_BACKOFF_MILLIS.get(settings) - @Volatile private var alertBackoffCount = ALERT_BACKOFF_COUNT.get(settings) - @Volatile private var moveAlertsBackoffMillis = MOVE_ALERTS_BACKOFF_MILLIS.get(settings) - @Volatile private var moveAlertsBackoffCount = MOVE_ALERTS_BACKOFF_COUNT.get(settings) - @Volatile private var retryPolicy = BackoffPolicy.constantBackoff(alertBackoffMillis, alertBackoffCount) - @Volatile private var moveAlertsRetryPolicy = BackoffPolicy.exponentialBackoff(moveAlertsBackoffMillis, moveAlertsBackoffCount) + private lateinit var runnerSupervisor: Job + override val coroutineContext: CoroutineContext + get() = Dispatchers.Default + runnerSupervisor + + @Volatile private var retryPolicy = + BackoffPolicy.constantBackoff(ALERT_BACKOFF_MILLIS.get(settings), ALERT_BACKOFF_COUNT.get(settings)) + @Volatile private var moveAlertsRetryPolicy = + BackoffPolicy.exponentialBackoff(MOVE_ALERTS_BACKOFF_MILLIS.get(settings), MOVE_ALERTS_BACKOFF_COUNT.get(settings)) init { - clusterService.clusterSettings.addSettingsUpdateConsumer(INPUT_TIMEOUT) { searchTimeout = it } - clusterService.clusterSettings.addSettingsUpdateConsumer(BULK_TIMEOUT) { bulkTimeout = it } - clusterService.clusterSettings.addSettingsUpdateConsumer(ALERT_BACKOFF_MILLIS) { - retryPolicy = BackoffPolicy.constantBackoff(it, alertBackoffCount) - } - clusterService.clusterSettings.addSettingsUpdateConsumer(ALERT_BACKOFF_COUNT) { - retryPolicy = BackoffPolicy.constantBackoff(alertBackoffMillis, it) - } - clusterService.clusterSettings.addSettingsUpdateConsumer(MOVE_ALERTS_BACKOFF_MILLIS) { - moveAlertsRetryPolicy = BackoffPolicy.exponentialBackoff(it, moveAlertsBackoffCount) + 
clusterService.clusterSettings.addSettingsUpdateConsumer(ALERT_BACKOFF_MILLIS, ALERT_BACKOFF_COUNT) { + millis, count -> retryPolicy = BackoffPolicy.constantBackoff(millis, count) } - clusterService.clusterSettings.addSettingsUpdateConsumer(MOVE_ALERTS_BACKOFF_COUNT) { - moveAlertsRetryPolicy = BackoffPolicy.exponentialBackoff(alertBackoffMillis, it) + clusterService.clusterSettings.addSettingsUpdateConsumer(MOVE_ALERTS_BACKOFF_MILLIS, MOVE_ALERTS_BACKOFF_COUNT) { + millis, count -> moveAlertsRetryPolicy = BackoffPolicy.exponentialBackoff(millis, count) } } - companion object { - private const val THREAD_POOL_NAME = "opendistro_monitor_runner" + override fun doStart() { + runnerSupervisor = SupervisorJob() + } - fun executorBuilder(settings: Settings): ScalingExecutorBuilder { - val availableProcessors = EsExecutors.numberOfProcessors(settings) - // Use the same setting as ES GENERIC Executor builder. - val genericThreadPoolMax = Math.min(512, Math.max(128, 4 * availableProcessors)) - return ScalingExecutorBuilder(THREAD_POOL_NAME, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30L)) - } + override fun doStop() { + runnerSupervisor.cancel() } - fun executor() = threadPool.executor(THREAD_POOL_NAME)!! 
+ override fun doClose() { } override fun postIndex(job: ScheduledJob) { - if (job is Monitor) { - executor().submit { - AlertMover(client, threadPool, this, alertIndices, moveAlertsRetryPolicy.iterator(), logger, job.id, job).run() - } - } else { + if (job !is Monitor) { throw IllegalArgumentException("Invalid job type") } + + launch { + try { + moveAlertsRetryPolicy.retry(logger) { + if (alertIndices.isInitialized()) { + moveAlerts(client, job.id, job) + } + } + } catch (e: Exception) { + logger.error("Failed to move active alerts for monitor [${job.id}].", e) + } + } } override fun postDelete(jobId: String) { - executor().submit { - AlertMover(client, threadPool, this, alertIndices, moveAlertsRetryPolicy.iterator(), logger, jobId).run() + launch { + try { + moveAlertsRetryPolicy.retry(logger) { + if (alertIndices.isInitialized()) { + moveAlerts(client, jobId, null) + } + } + } catch (e: Exception) { + logger.error("Failed to move active alerts for monitor [$jobId].", e) + } } } override fun runJob(job: ScheduledJob, periodStart: Instant, periodEnd: Instant) { - if (job is Monitor) { - executor().submit { runMonitor(job, periodStart, periodEnd) } - } else { + if (job !is Monitor) { throw IllegalArgumentException("Invalid job type") } + + launch { runMonitor(job, periodStart, periodEnd) } } - fun runMonitor(monitor: Monitor, periodStart: Instant, periodEnd: Instant, dryrun: Boolean = false): MonitorRunResult { + suspend fun runMonitor(monitor: Monitor, periodStart: Instant, periodEnd: Instant, dryrun: Boolean = false): MonitorRunResult { if (periodStart == periodEnd) { logger.warn("Start and end time are the same: $periodStart. 
This monitor will probably only run once.") } @@ -208,12 +218,6 @@ class MonitorRunner( return monitorResult.copy(triggerResults = triggerResults) } - fun rescheduleAlertMover(monitorId: String, monitor: Monitor?, backoff: Iterator) { - executor().submit { - AlertMover(client, threadPool, this, alertIndices, backoff, logger, monitorId, monitor).run() - } - } - private fun currentTime() = Instant.ofEpochMilli(threadPool.absoluteTimeInMillis()) private fun composeAlert(ctx: TriggerExecutionContext, result: TriggerRunResult, alertError: AlertError?): Alert? { @@ -261,7 +265,7 @@ class MonitorRunner( } } - private fun collectInputResults(monitor: Monitor, periodStart: Instant, periodEnd: Instant): InputRunResults { + private suspend fun collectInputResults(monitor: Monitor, periodStart: Instant, periodEnd: Instant): InputRunResults { return try { val results = mutableListOf>() monitor.inputs.forEach { input -> @@ -279,7 +283,8 @@ class MonitorRunner( XContentType.JSON.xContent().createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, searchSource).use { searchRequest.source(SearchSourceBuilder.fromXContent(it)) } - results += client.search(searchRequest).actionGet(searchTimeout).convertToMap() + val searchResponse: SearchResponse = client.suspendUntil { client.search(searchRequest, it) } + results += searchResponse.convertToMap() } else -> { throw IllegalArgumentException("Unsupported input type: ${input.name()}.") @@ -306,11 +311,11 @@ class MonitorRunner( } } - private fun loadCurrentAlerts(monitor: Monitor): Map { + private suspend fun loadCurrentAlerts(monitor: Monitor): Map { val request = SearchRequest(AlertIndices.ALERT_INDEX) .routing(monitor.id) .source(alertQuery(monitor)) - val response = client.search(request).actionGet(searchTimeout) + val response: SearchResponse = client.suspendUntil { client.search(request, it) } if (response.status() != RestStatus.OK) { throw (response.firstFailureOrNull()?.cause ?: RuntimeException("Unknown error loading 
alerts")) } @@ -341,7 +346,7 @@ class MonitorRunner( .query(QueryBuilders.termQuery(Alert.MONITOR_ID_FIELD, monitor.id)) } - private fun saveAlerts(alerts: List) { + private suspend fun saveAlerts(alerts: List) { var requestsToRetry = alerts.flatMap { alert -> // we don't want to set the version when saving alerts because the Runner has first priority when writing alerts. // In the rare event that a user acknowledges an alert between when it's read and when it's written @@ -370,31 +375,19 @@ class MonitorRunner( } if (requestsToRetry.isEmpty()) return - var bulkRequest = BulkRequest().add(requestsToRetry) - val successfulResponses = mutableListOf() - var failedResponses = listOf() - retryPolicy.retry { // Handles 502, 503, 504 responses for the bulk request. - retryPolicy.iterator().forEach { delay -> // Handles partial failures - val responses = client.bulk(bulkRequest).actionGet(bulkTimeout).items ?: arrayOf() - successfulResponses += responses.filterNot { it.isFailed } - failedResponses = responses.filter { it.isFailed } - // retry only if this is a EsRejectedExecutionException (i.e. 
429 TOO MANY REQUESTs) - requestsToRetry = failedResponses - .filter { ExceptionsHelper.unwrapCause(it.failure.cause) is EsRejectedExecutionException } - .map { bulkRequest.requests()[it.itemId] as IndexRequest } - - bulkRequest = BulkRequest().add(requestsToRetry) - if (requestsToRetry.isEmpty()) { - return@retry - } else { - Thread.sleep(delay.millis) - } + // Retry Bulk requests if there was any 429 response + retryPolicy.retry(logger, listOf(RestStatus.TOO_MANY_REQUESTS)) { + val bulkRequest = BulkRequest().add(requestsToRetry) + val bulkResponse: BulkResponse = client.suspendUntil { client.bulk(bulkRequest, it) } + val failedResponses = (bulkResponse.items ?: arrayOf()).filter { it.isFailed } + requestsToRetry = failedResponses.filter { it.status() == RestStatus.TOO_MANY_REQUESTS } + .map { bulkRequest.requests()[it.itemId] as IndexRequest } + + if (requestsToRetry.isNotEmpty()) { + val retryCause = failedResponses.first { it.status() == RestStatus.TOO_MANY_REQUESTS }.failure.cause + throw ExceptionsHelper.convertToElastic(retryCause) } } - - for (it in failedResponses) { - logger.error("Failed to write alert: ${it.id}", it.failure.cause) - } } private fun isTriggerActionable(ctx: TriggerExecutionContext, result: TriggerRunResult): Boolean { @@ -416,7 +409,7 @@ class MonitorRunner( return true } - private fun runAction(action: Action, ctx: TriggerExecutionContext, dryrun: Boolean): ActionRunResult { + private suspend fun runAction(action: Action, ctx: TriggerExecutionContext, dryrun: Boolean): ActionRunResult { return try { if (!isActionActionable(action, ctx.alert)) { return ActionRunResult(action.id, action.name, mapOf(), true, null, null) @@ -428,8 +421,10 @@ class MonitorRunner( throw IllegalStateException("Message content missing in the Destination with id: ${action.destinationId}") } if (!dryrun) { - var destination = getDestinationInfo(action.destinationId) - actionOutput[MESSAGE_ID] = destination.publish(actionOutput[SUBJECT], actionOutput[MESSAGE]!!) 
+ withContext(Dispatchers.IO) { + val destination = getDestinationInfo(action.destinationId) + actionOutput[MESSAGE_ID] = destination.publish(actionOutput[SUBJECT], actionOutput[MESSAGE]!!) + } } ActionRunResult(action.id, action.name, actionOutput, false, currentTime(), null) } catch (e: Exception) { @@ -443,22 +438,24 @@ class MonitorRunner( .execute() } - private fun getDestinationInfo(destinationId: String): Destination { - var destination: Destination + private suspend fun getDestinationInfo(destinationId: String): Destination { val getRequest = GetRequest(SCHEDULED_JOBS_INDEX, destinationId).routing(destinationId) - val getResponse = client.get(getRequest).actionGet() + val getResponse: GetResponse = client.suspendUntil { client.get(getRequest, it) } if (!getResponse.isExists || getResponse.isSourceEmpty) { throw IllegalStateException("Destination document with id $destinationId not found or source is empty") } val jobSource = getResponse.sourceAsBytesRef - val xcp = XContentHelper.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, jobSource, XContentType.JSON) - ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp::getTokenLocation) - ensureExpectedToken(XContentParser.Token.FIELD_NAME, xcp.nextToken(), xcp::getTokenLocation) - ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp::getTokenLocation) - destination = Destination.parse(xcp) - ensureExpectedToken(XContentParser.Token.END_OBJECT, xcp.nextToken(), xcp::getTokenLocation) - return destination + return withContext(Dispatchers.IO) { + val xcp = XContentHelper.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, + jobSource, XContentType.JSON) + ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp::getTokenLocation) + ensureExpectedToken(XContentParser.Token.FIELD_NAME, xcp.nextToken(), xcp::getTokenLocation) + ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp::getTokenLocation) + 
val destination = Destination.parse(xcp) + ensureExpectedToken(XContentParser.Token.END_OBJECT, xcp.nextToken(), xcp::getTokenLocation) + destination + } } private fun List?.update(alertError: AlertError?): List { diff --git a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/alerts/AlertIndices.kt b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/alerts/AlertIndices.kt index 762d6c73..398c1356 100644 --- a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/alerts/AlertIndices.kt +++ b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/alerts/AlertIndices.kt @@ -23,10 +23,13 @@ import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings. import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.ALERT_HISTORY_ROLLOVER_PERIOD import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.REQUEST_TIMEOUT import org.apache.logging.log4j.LogManager +import com.amazon.opendistroforelasticsearch.alerting.elasticapi.suspendUntil import org.elasticsearch.ResourceAlreadyExistsException import org.elasticsearch.action.admin.indices.alias.Alias import org.elasticsearch.action.admin.indices.create.CreateIndexRequest +import org.elasticsearch.action.admin.indices.create.CreateIndexResponse import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest +import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse import org.elasticsearch.action.admin.indices.rollover.RolloverRequest import org.elasticsearch.client.IndicesAdminClient import org.elasticsearch.cluster.ClusterChangedEvent @@ -148,31 +151,34 @@ class AlertIndices( return alertIndexInitialized && historyIndexInitialized } - fun createAlertIndex() { + suspend fun createAlertIndex() { if (!alertIndexInitialized) { alertIndexInitialized = createIndex(ALERT_INDEX) } alertIndexInitialized } - fun 
createInitialHistoryIndex() { + suspend fun createInitialHistoryIndex() { if (!historyIndexInitialized) { historyIndexInitialized = createIndex(HISTORY_INDEX_PATTERN, HISTORY_WRITE_INDEX) } historyIndexInitialized } - private fun createIndex(index: String, alias: String? = null): Boolean { + private suspend fun createIndex(index: String, alias: String? = null): Boolean { // This should be a fast check of local cluster state. Should be exceedingly rare that the local cluster // state does not contain the index and multiple nodes concurrently try to create the index. // If it does happen that error is handled we catch the ResourceAlreadyExistsException - val exists = client.exists(IndicesExistsRequest(index).local(true)).actionGet(requestTimeout).isExists - if (exists) return true + val existsResponse: IndicesExistsResponse = client.suspendUntil { + client.exists(IndicesExistsRequest(index).local(true), it) + } + if (existsResponse.isExists) return true val request = CreateIndexRequest(index).mapping(MAPPING_TYPE, alertMapping(), XContentType.JSON) if (alias != null) request.alias(Alias(alias)) return try { - client.create(request).actionGet(requestTimeout).isAcknowledged + val createIndexResponse: CreateIndexResponse = client.suspendUntil { client.create(request, it) } + createIndexResponse.isAcknowledged } catch (e: ResourceAlreadyExistsException) { true } diff --git a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/alerts/AlertMover.kt b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/alerts/AlertMover.kt index 057a9e08..a1af6949 100644 --- a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/alerts/AlertMover.kt +++ b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/alerts/AlertMover.kt @@ -15,13 +15,11 @@ package com.amazon.opendistroforelasticsearch.alerting.alerts -import com.amazon.opendistroforelasticsearch.alerting.MonitorRunner import 
com.amazon.opendistroforelasticsearch.alerting.alerts.AlertIndices.Companion.ALERT_INDEX import com.amazon.opendistroforelasticsearch.alerting.alerts.AlertIndices.Companion.HISTORY_WRITE_INDEX import com.amazon.opendistroforelasticsearch.alerting.model.Alert import com.amazon.opendistroforelasticsearch.alerting.model.Monitor -import org.apache.logging.log4j.Logger -import org.elasticsearch.action.ActionListener +import com.amazon.opendistroforelasticsearch.alerting.elasticapi.suspendUntil import org.elasticsearch.action.bulk.BulkRequest import org.elasticsearch.action.bulk.BulkResponse import org.elasticsearch.action.delete.DeleteRequest @@ -30,7 +28,6 @@ import org.elasticsearch.action.search.SearchRequest import org.elasticsearch.action.search.SearchResponse import org.elasticsearch.client.Client import org.elasticsearch.common.bytes.BytesReference -import org.elasticsearch.common.unit.TimeValue import org.elasticsearch.common.xcontent.LoggingDeprecationHandler import org.elasticsearch.common.xcontent.NamedXContentRegistry import org.elasticsearch.common.xcontent.ToXContent @@ -39,12 +36,13 @@ import org.elasticsearch.common.xcontent.XContentHelper import org.elasticsearch.common.xcontent.XContentParser import org.elasticsearch.common.xcontent.XContentParserUtils import org.elasticsearch.common.xcontent.XContentType +import org.elasticsearch.index.VersionType import org.elasticsearch.index.query.QueryBuilders +import org.elasticsearch.rest.RestStatus import org.elasticsearch.search.builder.SearchSourceBuilder -import org.elasticsearch.threadpool.ThreadPool /** - * Class to manage the moving of active alerts when a monitor or trigger is deleted. + * Moves defunct active alerts to the alert history index when the corresponding monitor or trigger is deleted. * * The logic for moving alerts consists of: * 1. Find active alerts: @@ -54,114 +52,65 @@ import org.elasticsearch.threadpool.ThreadPool * 3. Delete alerts from [ALERT_INDEX] * 4. 
Schedule a retry if there were any failures */ -class AlertMover( - private val client: Client, - private val threadPool: ThreadPool, - private val monitorRunner: MonitorRunner, - private val alertIndices: AlertIndices, - private val backoff: Iterator, - private val logger: Logger, - private val monitorId: String, - private val monitor: Monitor? = null -) { +suspend fun moveAlerts(client: Client, monitorId: String, monitor: Monitor? = null) { + val boolQuery = QueryBuilders.boolQuery() + .filter(QueryBuilders.termQuery(Alert.MONITOR_ID_FIELD, monitorId)) - private var hasFailures: Boolean = false - - fun run() { - if (alertIndices.isInitialized()) { - findActiveAlerts() - } - } - - private fun findActiveAlerts() { - val boolQuery = QueryBuilders.boolQuery() - .filter(QueryBuilders.termQuery(Alert.MONITOR_ID_FIELD, monitorId)) - - if (monitor != null) { - boolQuery.mustNot(QueryBuilders.termsQuery(Alert.TRIGGER_ID_FIELD, monitor.triggers.map { it.id })) - } - - val activeAlertsQuery = SearchSourceBuilder.searchSource() - .query(boolQuery) - .version(true) - - val activeAlertsRequest = SearchRequest(AlertIndices.ALERT_INDEX) - .routing(monitorId) - .source(activeAlertsQuery) - client.search(activeAlertsRequest, ActionListener.wrap(::onSearchResponse, ::onFailure)) - } - - private fun onSearchResponse(response: SearchResponse) { - // If no alerts are found, simply return - if (response.hits.totalHits.value == 0L) return - val indexRequests = response.hits.map { hit -> - IndexRequest(AlertIndices.HISTORY_WRITE_INDEX) - .routing(monitorId) - .source(Alert.parse(alertContentParser(hit.sourceRef), hit.id, hit.version) - .copy(state = Alert.State.DELETED) - .toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) - .setIfSeqNo(hit.seqNo) - .setIfPrimaryTerm(hit.primaryTerm) - .id(hit.id) - } - val copyRequest = BulkRequest().add(indexRequests) - client.bulk(copyRequest, ActionListener.wrap(::onCopyResponse, ::onFailure)) + if (monitor != null) { + 
boolQuery.mustNot(QueryBuilders.termsQuery(Alert.TRIGGER_ID_FIELD, monitor.triggers.map { it.id })) } - private fun onCopyResponse(response: BulkResponse) { - val deleteRequests = response.items.filterNot { it.isFailed }.map { - DeleteRequest(AlertIndices.ALERT_INDEX, it.id) - .routing(monitorId) - } - if (response.hasFailures()) { - hasFailures = true - for (it in response.items) { - logger.error("Failed to move deleted alert to alert history index: ${it.id}", - it.failure.cause) - } - } - - val bulkRequest = BulkRequest().add(deleteRequests) - client.bulk(bulkRequest, ActionListener.wrap(::onDeleteResponse, ::onFailure)) + val activeAlertsQuery = SearchSourceBuilder.searchSource() + .query(boolQuery) + .version(true) + + val activeAlertsRequest = SearchRequest(AlertIndices.ALERT_INDEX) + .routing(monitorId) + .source(activeAlertsQuery) + val response: SearchResponse = client.suspendUntil { search(activeAlertsRequest, it) } + + // If no alerts are found, simply return + if (response.hits.totalHits.value == 0L) return + val indexRequests = response.hits.map { hit -> + IndexRequest(AlertIndices.HISTORY_WRITE_INDEX) + .routing(monitorId) + .source(Alert.parse(alertContentParser(hit.sourceRef), hit.id, hit.version) + .copy(state = Alert.State.DELETED) + .toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) + .version(hit.version) + .versionType(VersionType.EXTERNAL_GTE) + .id(hit.id) } - - private fun onDeleteResponse(response: BulkResponse) { - if (response.hasFailures()) { - hasFailures = true - for (it in response.items) { - logger.error("Failed to delete active alert from alert index: ${it.id}", - it.failure.cause) - } - } - if (hasFailures) reschedule() + val copyRequest = BulkRequest().add(indexRequests) + val copyResponse: BulkResponse = client.suspendUntil { bulk(copyRequest, it) } + + val deleteRequests = copyResponse.items.filterNot { it.isFailed }.map { + DeleteRequest(AlertIndices.ALERT_INDEX, it.id) + .routing(monitorId) + 
.version(it.version) + .versionType(VersionType.EXTERNAL_GTE) } - - private fun onFailure(e: Exception) { - logger.error("Failed to move alerts for ${monitorIdTriggerIdsTuple()}", e) - reschedule() + val deleteResponse: BulkResponse = client.suspendUntil { bulk(BulkRequest().add(deleteRequests), it) } + + if (copyResponse.hasFailures()) { + val retryCause = copyResponse.items.filter { it.isFailed } + .firstOrNull { it.status() == RestStatus.TOO_MANY_REQUESTS } + ?.failure?.cause + throw RuntimeException("Failed to copy alerts for [$monitorId, ${monitor?.triggers?.map { it.id }}]: " + + copyResponse.buildFailureMessage(), retryCause) } - - private fun reschedule() { - if (backoff.hasNext()) { - logger.warn("Rescheduling AlertMover due to failure for ${monitorIdTriggerIdsTuple()}") - val wait = backoff.next() - val runnable = Runnable { - monitorRunner.rescheduleAlertMover(monitorId, monitor, backoff) - } - threadPool.schedule(runnable, wait, ThreadPool.Names.SAME) - } else { - logger.warn("Retries exhausted for ${monitorIdTriggerIdsTuple()}") - } + if (deleteResponse.hasFailures()) { + val retryCause = deleteResponse.items.filter { it.isFailed } + .firstOrNull { it.status() == RestStatus.TOO_MANY_REQUESTS } + ?.failure?.cause + throw RuntimeException("Failed to delete alerts for [$monitorId, ${monitor?.triggers?.map { it.id }}]: " + + deleteResponse.buildFailureMessage(), retryCause) } +} - private fun alertContentParser(bytesReference: BytesReference): XContentParser { - val xcp = XContentHelper.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, +private fun alertContentParser(bytesReference: BytesReference): XContentParser { + val xcp = XContentHelper.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, bytesReference, XContentType.JSON) - XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp::getTokenLocation) - return xcp - } - - private fun monitorIdTriggerIdsTuple(): 
String { - return "[$monitorId, ${monitor?.triggers?.map { it.id }}]" - } + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp::getTokenLocation) + return xcp } diff --git a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/model/action/Action.kt b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/model/action/Action.kt index aafdd074..1d642de5 100644 --- a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/model/action/Action.kt +++ b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/model/action/Action.kt @@ -46,15 +46,19 @@ data class Action( } override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { - return builder.startObject() + val xContentBuilder = builder.startObject() .field(ID_FIELD, id) .field(NAME_FIELD, name) .field(DESTINATION_ID_FIELD, destinationId) - .field(SUBJECT_TEMPLATE_FIELD, subjectTemplate) .field(MESSAGE_TEMPLATE_FIELD, messageTemplate) .field(THROTTLE_ENABLED_FIELD, throttleEnabled) - .field(THROTTLE_FIELD, throttle) - .endObject() + if (subjectTemplate != null) { + xContentBuilder.field(SUBJECT_TEMPLATE_FIELD, subjectTemplate) + } + if (throttle != null) { + xContentBuilder.field(THROTTLE_FIELD, throttle) + } + return xContentBuilder.endObject() } fun asTemplateArg(): Map { @@ -93,9 +97,14 @@ data class Action( ID_FIELD -> id = xcp.text() NAME_FIELD -> name = xcp.textOrNull() DESTINATION_ID_FIELD -> destinationId = xcp.textOrNull() - SUBJECT_TEMPLATE_FIELD -> subjectTemplate = Script.parse(xcp, Script.DEFAULT_TEMPLATE_LANG) + SUBJECT_TEMPLATE_FIELD -> { + subjectTemplate = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else + Script.parse(xcp, Script.DEFAULT_TEMPLATE_LANG) + } MESSAGE_TEMPLATE_FIELD -> messageTemplate = Script.parse(xcp, Script.DEFAULT_TEMPLATE_LANG) - THROTTLE_FIELD -> throttle = Throttle.parse(xcp) + THROTTLE_FIELD -> { + throttle = 
if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else Throttle.parse(xcp) + } THROTTLE_ENABLED_FIELD -> { throttleEnabled = xcp.booleanValue() } @@ -106,6 +115,10 @@ data class Action( } } + if (throttleEnabled) { + requireNotNull(throttle, { "Action throttle enabled but not set throttle value" }) + } + return Action(requireNotNull(name) { "Action name is null" }, requireNotNull(destinationId) { "Destination id is null" }, subjectTemplate, diff --git a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/model/action/Throttle.kt b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/model/action/Throttle.kt index b2f3367f..6a2656d3 100644 --- a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/model/action/Throttle.kt +++ b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/model/action/Throttle.kt @@ -46,7 +46,7 @@ data class Throttle( @Throws(IOException::class) fun parse(xcp: XContentParser): Throttle { var value: Int = 0 - var unit: ChronoUnit? 
= null + var unit: ChronoUnit = ChronoUnit.MINUTES // only support MINUTES throttle unit currently XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp::getTokenLocation) while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { diff --git a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/model/destination/Destination.kt b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/model/destination/Destination.kt index c5e53d6f..fb0dbe53 100644 --- a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/model/destination/Destination.kt +++ b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/model/destination/Destination.kt @@ -132,6 +132,7 @@ data class Destination( } } + @Throws(IOException::class) fun publish(compiledSubject: String?, compiledMessage: String): String { val destinationMessage: BaseMessage when (type) { diff --git a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestExecuteMonitorAction.kt b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestExecuteMonitorAction.kt index 9bc19d03..3349de8d 100644 --- a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestExecuteMonitorAction.kt +++ b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestExecuteMonitorAction.kt @@ -20,6 +20,9 @@ import com.amazon.opendistroforelasticsearch.alerting.MonitorRunner import com.amazon.opendistroforelasticsearch.alerting.model.Monitor import com.amazon.opendistroforelasticsearch.alerting.AlertingPlugin import org.apache.logging.log4j.LogManager +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.launch +import kotlinx.coroutines.withContext import org.elasticsearch.action.get.GetRequest import org.elasticsearch.action.get.GetResponse import org.elasticsearch.client.node.NodeClient @@ -61,15 
+64,17 @@ class RestExecuteMonitorAction( val requestEnd = request.paramAsTime("period_end", TimeValue(Instant.now().toEpochMilli())) val executeMonitor = fun(monitor: Monitor) { - runner.executor().submit { + runner.launch { val (periodStart, periodEnd) = monitor.schedule.getPeriodEndingAt(Instant.ofEpochMilli(requestEnd.millis)) try { val response = runner.runMonitor(monitor, periodStart, periodEnd, dryrun) - channel.sendResponse(BytesRestResponse(RestStatus.OK, channel.newBuilder().value(response))) + withContext(Dispatchers.IO) { + channel.sendResponse(BytesRestResponse(RestStatus.OK, channel.newBuilder().value(response))) + } } catch (e: Exception) { log.error("Unexpected error running monitor", e) - channel.sendResponse(BytesRestResponse(channel, e)) + withContext(Dispatchers.IO) { channel.sendResponse(BytesRestResponse(channel, e)) } } } } diff --git a/alerting/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/model/XContentTests.kt b/alerting/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/model/XContentTests.kt index 9adcec10..fcf7d189 100644 --- a/alerting/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/model/XContentTests.kt +++ b/alerting/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/model/XContentTests.kt @@ -46,6 +46,27 @@ class XContentTests : ESTestCase() { assertEquals("Round tripping Monitor doesn't work", action, parsedAction) } + fun `test action parsing with null subject template`() { + val action = randomAction().copy(subjectTemplate = null) + val actionString = action.toXContent(builder(), ToXContent.EMPTY_PARAMS).string() + val parsedAction = Action.parse(parser(actionString)) + assertEquals("Round tripping Monitor doesn't work", action, parsedAction) + } + + fun `test action parsing with null throttle`() { + val action = randomAction().copy(throttle = null) + val actionString = action.toXContent(builder(), ToXContent.EMPTY_PARAMS).string() + val parsedAction = 
Action.parse(parser(actionString)) + assertEquals("Round tripping Monitor doesn't work", action, parsedAction) + } + + fun `test action parsing with throttled enabled and null throttle`() { + val action = randomAction().copy(throttle = null).copy(throttleEnabled = true) + val actionString = action.toXContent(builder(), ToXContent.EMPTY_PARAMS).string() + assertFailsWith<IllegalArgumentException>("Action throttle enabled but not set throttle value") { + Action.parse(parser(actionString)) } + } + fun `test throttle parsing`() { val throttle = randomThrottle() val throttleString = throttle.toXContent(builder(), ToXContent.EMPTY_PARAMS).string() diff --git a/core/build.gradle b/core/build.gradle index 415c49db..4e6e3aeb 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -20,6 +20,7 @@ apply plugin: 'jacoco' dependencies { compileOnly "org.elasticsearch:elasticsearch:${es_version}" compile "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}" + compile 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.1.1' compile "com.cronutils:cron-utils:7.0.5" testImplementation "org.elasticsearch.test:framework:${es_version}" diff --git a/core/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/elasticapi/ElasticExtensions.kt b/core/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/elasticapi/ElasticExtensions.kt index cca71a73..4a504b0e 100644 --- a/core/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/elasticapi/ElasticExtensions.kt +++ b/core/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/elasticapi/ElasticExtensions.kt @@ -15,10 +15,14 @@ package com.amazon.opendistroforelasticsearch.alerting.elasticapi +import kotlinx.coroutines.delay +import org.apache.logging.log4j.Logger import org.elasticsearch.ElasticsearchException +import org.elasticsearch.action.ActionListener import org.elasticsearch.action.bulk.BackoffPolicy import org.elasticsearch.action.search.SearchResponse import org.elasticsearch.action.search.ShardSearchFailure +import
org.elasticsearch.client.ElasticsearchClient import org.elasticsearch.common.bytes.BytesReference import org.elasticsearch.common.xcontent.ToXContent import org.elasticsearch.common.xcontent.XContentBuilder @@ -26,10 +30,14 @@ import org.elasticsearch.common.xcontent.XContentHelper import org.elasticsearch.common.xcontent.XContentParser import org.elasticsearch.common.xcontent.XContentParserUtils import org.elasticsearch.common.xcontent.XContentType +import org.elasticsearch.rest.RestStatus import org.elasticsearch.rest.RestStatus.BAD_GATEWAY import org.elasticsearch.rest.RestStatus.GATEWAY_TIMEOUT import org.elasticsearch.rest.RestStatus.SERVICE_UNAVAILABLE import java.time.Instant +import kotlin.coroutines.resume +import kotlin.coroutines.resumeWithException +import kotlin.coroutines.suspendCoroutine /** Convert an object to maps and lists representation */ fun ToXContent.convertToMap(): Map { @@ -56,6 +64,39 @@ fun BackoffPolicy.retry(block: () -> T): T { } while (true) } +/** + * Retries the given [block] of code as specified by the receiver [BackoffPolicy], if [block] throws an [ElasticsearchException] + * that is retriable (502, 503, 504). + * + * If all retries fail the final exception will be rethrown. Exceptions caught during intermediate retries are + * logged as warnings to [logger]. Similar to [org.elasticsearch.action.bulk.Retry], except this retries on + * 502, 503, 504 error codes as well as 429. + * + * @param logger - logger used to log intermediate failures + * @param retryOn - any additional [RestStatus] values that should be retried + * @param block - the block of code to retry. This should be a suspend function. 
+ */ +suspend fun <T> BackoffPolicy.retry( + logger: Logger, + retryOn: List<RestStatus> = emptyList(), + block: suspend () -> T +): T { + val iter = iterator() + do { + try { + return block() + } catch (e: ElasticsearchException) { + if (iter.hasNext() && (e.isRetriable() || retryOn.contains(e.status()))) { + val backoff = iter.next() + logger.warn("Operation failed. Retrying in $backoff.", e) + delay(backoff.millis) + } else { + throw e + } + } + } while (true) +} + /** * Retries on 502, 503 and 504 per elastic client's behavior: https://github.com/elastic/elasticsearch-net/issues/2061 * 429 must be retried manually as it's not clear if it's ok to retry for requests other than Bulk requests. @@ -90,3 +131,17 @@ fun XContentBuilder.optionalTimeField(name: String, instant: Instant?): XContent * Extension function for ES 6.3 and above that duplicates the ES 6.2 XContentBuilder.string() method. */ fun XContentBuilder.string(): String = BytesReference.bytes(this).utf8ToString() + +/** + * Converts [ElasticsearchClient] methods that take a callback into a kotlin suspending function. + * + * @param block - a block of code that is passed an [ActionListener] that should be passed to the ES client API.
+ */ +suspend fun <C : ElasticsearchClient, T> C.suspendUntil(block: C.(ActionListener<T>) -> Unit): T = + suspendCoroutine { cont -> + block(object : ActionListener<T> { + override fun onResponse(response: T) = cont.resume(response) + + override fun onFailure(e: Exception) = cont.resumeWithException(e) + }) + } diff --git a/notification/src/main/java/com/amazon/opendistroforelasticsearch/alerting/destination/Notification.java b/notification/src/main/java/com/amazon/opendistroforelasticsearch/alerting/destination/Notification.java index 7eb8f04a..1b3d64e2 100644 --- a/notification/src/main/java/com/amazon/opendistroforelasticsearch/alerting/destination/Notification.java +++ b/notification/src/main/java/com/amazon/opendistroforelasticsearch/alerting/destination/Notification.java @@ -20,6 +20,7 @@ import com.amazon.opendistroforelasticsearch.alerting.destination.message.BaseMessage; import com.amazon.opendistroforelasticsearch.alerting.destination.response.BaseResponse; +import java.io.IOException; import java.security.AccessController; import java.security.PrivilegedAction; @@ -37,7 +38,7 @@ public class Notification { * @param notificationMessage * @return BaseResponse */ - public static BaseResponse publish(BaseMessage notificationMessage) { + public static BaseResponse publish(BaseMessage notificationMessage) throws IOException { return AccessController.doPrivileged((PrivilegedAction<BaseResponse>) () -> { DestinationFactory destinationFactory = DestinationFactoryProvider.getFactory(notificationMessage.getChannelType()); return destinationFactory.publish(notificationMessage);