Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Execute monitor upon indexing & backport ElasticThreadContextElement. (
Browse files Browse the repository at this point in the history
…#93)

* Update execute API to keep thread context. (#90)

* Use the ElasticThreadContextElement when executing a monitor to preserve the context variables needed.
  • Loading branch information
lucaswin-amzn authored Aug 20, 2019
1 parent 478a58b commit bf2f6a1
Show file tree
Hide file tree
Showing 9 changed files with 168 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, P
): List<RestHandler> {
return listOf(RestGetMonitorAction(settings, restController),
RestDeleteMonitorAction(settings, restController),
RestIndexMonitorAction(settings, restController, scheduledJobIndices, clusterService),
RestIndexMonitorAction(settings, restController, scheduledJobIndices, clusterService, runner),
RestSearchMonitorAction(settings, restController),
RestExecuteMonitorAction(settings, restController, runner),
RestAcknowledgeAlertAction(settings, restController),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob
import com.amazon.opendistroforelasticsearch.alerting.MonitorRunner
import com.amazon.opendistroforelasticsearch.alerting.model.Monitor
import com.amazon.opendistroforelasticsearch.alerting.AlertingPlugin
import com.amazon.opendistroforelasticsearch.alerting.elasticapi.ElasticThreadContextElement
import org.apache.logging.log4j.LogManager
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
Expand Down Expand Up @@ -64,7 +65,9 @@ class RestExecuteMonitorAction(
val requestEnd = request.paramAsTime("period_end", TimeValue(Instant.now().toEpochMilli()))

val executeMonitor = fun(monitor: Monitor) {
runner.launch {
// Launch the coroutine with the clients threadContext. This is needed to preserve authentication information
// stored on the threadContext set by the security plugin when using the Alerting plugin with the Security plugin.
runner.launch(ElasticThreadContextElement(client.threadPool().threadContext)) {
val (periodStart, periodEnd) =
monitor.schedule.getPeriodEndingAt(Instant.ofEpochMilli(requestEnd.millis))
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.
import com.amazon.opendistroforelasticsearch.alerting.util.REFRESH
import com.amazon.opendistroforelasticsearch.alerting.util._ID
import com.amazon.opendistroforelasticsearch.alerting.util._VERSION
import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob
import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX
import com.amazon.opendistroforelasticsearch.alerting.AlertingPlugin
import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob.Companion.SCHEDULED_JOB_TYPE
import com.amazon.opendistroforelasticsearch.alerting.util.IF_PRIMARY_TERM
import com.amazon.opendistroforelasticsearch.alerting.util.IF_SEQ_NO
import com.amazon.opendistroforelasticsearch.alerting.util.IndexUtils
Expand Down Expand Up @@ -53,6 +53,7 @@ import org.elasticsearch.rest.RestController
import org.elasticsearch.rest.RestRequest
import org.elasticsearch.rest.RestResponse
import org.elasticsearch.rest.RestStatus
import org.elasticsearch.rest.action.RestActionListener
import org.elasticsearch.rest.action.RestResponseListener
import java.io.IOException

Expand Down Expand Up @@ -119,12 +120,12 @@ class RestIndexDestinationAction(

fun start() {
if (!scheduledJobIndices.scheduledJobIndexExists()) {
scheduledJobIndices.initScheduledJobIndex(ActionListener.wrap(::onCreateMappingsResponse, ::onFailure))
scheduledJobIndices.initScheduledJobIndex(onCreateMappingsResponse(channel))
} else {
if (!IndexUtils.scheduledJobIndexUpdated) {
IndexUtils.updateIndexMapping(ScheduledJob.SCHEDULED_JOBS_INDEX, ScheduledJob.SCHEDULED_JOB_TYPE,
IndexUtils.updateIndexMapping(SCHEDULED_JOBS_INDEX, SCHEDULED_JOB_TYPE,
ScheduledJobIndices.scheduledJobMappings(), clusterService.state(), client.admin().indices(),
ActionListener.wrap(::onUpdateMappingsResponse, ::onFailure))
onUpdateMappingsResponse(channel))
} else {
prepareDestinationIndexing()
}
Expand All @@ -145,29 +146,37 @@ class RestIndexDestinationAction(
}
}

private fun onCreateMappingsResponse(response: CreateIndexResponse) {
if (response.isAcknowledged) {
log.info("Created ${ScheduledJob.SCHEDULED_JOBS_INDEX} with mappings.")
prepareDestinationIndexing()
IndexUtils.scheduledJobIndexUpdated()
} else {
log.error("Create ${ScheduledJob.SCHEDULED_JOBS_INDEX} mappings call not acknowledged.")
channel.sendResponse(BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR,
response.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS)))
private fun onCreateMappingsResponse(channel: RestChannel): RestActionListener<CreateIndexResponse> {
return object : RestActionListener<CreateIndexResponse>(channel) {
override fun processResponse(response: CreateIndexResponse) {
if (response.isAcknowledged) {
log.info("Created $SCHEDULED_JOBS_INDEX with mappings.")
prepareDestinationIndexing()
IndexUtils.scheduledJobIndexUpdated()
} else {
log.error("Create $SCHEDULED_JOBS_INDEX mappings call not acknowledged.")
channel.sendResponse(BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR,
response.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS)))
}
}
}
}

private fun onUpdateMappingsResponse(response: AcknowledgedResponse) {
if (response.isAcknowledged) {
log.info("Updated ${ScheduledJob.SCHEDULED_JOBS_INDEX} with mappings.")
IndexUtils.scheduledJobIndexUpdated()
prepareDestinationIndexing()
} else {
log.error("Update ${ScheduledJob.SCHEDULED_JOBS_INDEX} mappings call not acknowledged.")
channel.sendResponse(BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR,
response.toXContent(channel.newErrorBuilder().startObject()
.field("message", "Updated ${ScheduledJob.SCHEDULED_JOBS_INDEX} mappings call not acknowledged.")
.endObject(), ToXContent.EMPTY_PARAMS)))
private fun onUpdateMappingsResponse(channel: RestChannel): RestActionListener<AcknowledgedResponse> {
return object : RestActionListener<AcknowledgedResponse>(channel) {
override fun processResponse(response: AcknowledgedResponse) {
if (response.isAcknowledged) {
log.info("Updated $SCHEDULED_JOBS_INDEX with mappings.")
IndexUtils.scheduledJobIndexUpdated()
prepareDestinationIndexing()
} else {
log.error("Update $SCHEDULED_JOBS_INDEX mappings call not acknowledged.")
channel.sendResponse(BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR,
response.toXContent(channel.newErrorBuilder().startObject()
.field("message", "Updated $SCHEDULED_JOBS_INDEX mappings call not acknowledged.")
.endObject(), ToXContent.EMPTY_PARAMS)))
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,28 @@
*/
package com.amazon.opendistroforelasticsearch.alerting.resthandler

import com.amazon.opendistroforelasticsearch.alerting.AlertingPlugin
import com.amazon.opendistroforelasticsearch.alerting.MonitorRunner
import com.amazon.opendistroforelasticsearch.alerting.core.ScheduledJobIndices
import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob
import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX
import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob.Companion.SCHEDULED_JOB_TYPE
import com.amazon.opendistroforelasticsearch.alerting.elasticapi.ElasticThreadContextElement
import com.amazon.opendistroforelasticsearch.alerting.model.Monitor
import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.ALERTING_MAX_MONITORS
import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.INDEX_TIMEOUT
import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.REQUEST_TIMEOUT
import com.amazon.opendistroforelasticsearch.alerting.util.REFRESH
import com.amazon.opendistroforelasticsearch.alerting.util._ID
import com.amazon.opendistroforelasticsearch.alerting.util._VERSION
import com.amazon.opendistroforelasticsearch.alerting.AlertingPlugin
import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.MAX_ACTION_THROTTLE_VALUE
import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.REQUEST_TIMEOUT
import com.amazon.opendistroforelasticsearch.alerting.util.IF_PRIMARY_TERM
import com.amazon.opendistroforelasticsearch.alerting.util.IF_SEQ_NO
import com.amazon.opendistroforelasticsearch.alerting.util.IndexUtils
import com.amazon.opendistroforelasticsearch.alerting.util.REFRESH
import com.amazon.opendistroforelasticsearch.alerting.util._ID
import com.amazon.opendistroforelasticsearch.alerting.util._PRIMARY_TERM
import com.amazon.opendistroforelasticsearch.alerting.util._SEQ_NO
import com.amazon.opendistroforelasticsearch.alerting.util._VERSION
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import org.apache.logging.log4j.LogManager
import org.elasticsearch.action.ActionListener
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse
Expand Down Expand Up @@ -64,6 +69,7 @@ import org.elasticsearch.rest.RestRequest.Method.POST
import org.elasticsearch.rest.RestRequest.Method.PUT
import org.elasticsearch.rest.RestResponse
import org.elasticsearch.rest.RestStatus
import org.elasticsearch.rest.action.RestActionListener
import org.elasticsearch.rest.action.RestResponseListener
import org.elasticsearch.search.builder.SearchSourceBuilder
import java.io.IOException
Expand All @@ -79,7 +85,8 @@ class RestIndexMonitorAction(
settings: Settings,
controller: RestController,
jobIndices: ScheduledJobIndices,
clusterService: ClusterService
clusterService: ClusterService,
private val runner: MonitorRunner
) : BaseRestHandler(settings) {

private var scheduledJobIndices: ScheduledJobIndices
Expand All @@ -106,7 +113,7 @@ class RestIndexMonitorAction(
}

@Throws(IOException::class)
override fun prepareRequest(request: RestRequest, client: NodeClient): BaseRestHandler.RestChannelConsumer {
override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer {
val id = request.param("monitorID", Monitor.NO_ID)
if (request.method() == PUT && Monitor.NO_ID == id) {
throw IllegalArgumentException("Missing monitor ID")
Expand All @@ -123,8 +130,11 @@ class RestIndexMonitorAction(
} else {
WriteRequest.RefreshPolicy.IMMEDIATE
}

return RestChannelConsumer { channel ->
IndexMonitorHandler(client, channel, id, seqNo, primaryTerm, refreshPolicy, monitor).start()
GlobalScope.launch(ElasticThreadContextElement(client.threadPool().threadContext)) {
IndexMonitorHandler(client, channel, id, seqNo, primaryTerm, refreshPolicy, monitor).start()
}
}
}

Expand All @@ -138,14 +148,32 @@ class RestIndexMonitorAction(
private var newMonitor: Monitor
) : AsyncActionHandler(client, channel) {

fun start() {
suspend fun start() {
val (periodStart, periodEnd) = newMonitor.schedule.getPeriodEndingAt(Instant.now())
val result = runner.runMonitor(newMonitor, periodStart, periodEnd, true)
if (result.inputResults.error != null) {
log.error("Failed to index monitor due to an error during input execution: ${result.inputResults.error}")
channel.sendResponse(BytesRestResponse(channel, result.inputResults.error))
return
} else if (result.error != null) {
log.error("Failed to index monitor due to an error during execution: ${result.error}")
channel.sendResponse(BytesRestResponse(channel, result.error))
return
}
for ((_, triggerResult) in result.triggerResults) {
if (triggerResult.error != null) {
log.error("Failed to index monitor due to an error during trigger evaluation: ${triggerResult.error}")
channel.sendResponse(BytesRestResponse(channel, triggerResult.error))
return
}
}
if (!scheduledJobIndices.scheduledJobIndexExists()) {
scheduledJobIndices.initScheduledJobIndex(ActionListener.wrap(::onCreateMappingsResponse, ::onFailure))
scheduledJobIndices.initScheduledJobIndex(onCreateMappingsResponse(channel))
} else {
if (!IndexUtils.scheduledJobIndexUpdated) {
IndexUtils.updateIndexMapping(ScheduledJob.SCHEDULED_JOBS_INDEX, ScheduledJob.SCHEDULED_JOB_TYPE,
IndexUtils.updateIndexMapping(SCHEDULED_JOBS_INDEX, SCHEDULED_JOB_TYPE,
ScheduledJobIndices.scheduledJobMappings(), clusterService.state(), client.admin().indices(),
ActionListener.wrap(::onUpdateMappingsResponse, ::onFailure))
onUpdateMappingsResponse(channel))
} else {
prepareMonitorIndexing()
}
Expand All @@ -154,6 +182,7 @@ class RestIndexMonitorAction(

/**
* This function prepares for indexing a new monitor.
* Prior to indexing a new monitor we execute it to ensure the monitor will not violate the security plugin authentication.
* If this is an update request we can simply update the monitor. Otherwise we first check to see how many monitors already exist,
* and compare this to the [maxMonitorCount]. Requests that breach this threshold will be rejected.
*/
Expand All @@ -162,7 +191,7 @@ class RestIndexMonitorAction(
if (channel.request().method() == PUT) return updateMonitor()
val query = QueryBuilders.boolQuery().filter(QueryBuilders.termQuery("${Monitor.MONITOR_TYPE}.type", Monitor.MONITOR_TYPE))
val searchSource = SearchSourceBuilder().query(query).timeout(requestTimeout)
val searchRequest = SearchRequest(ScheduledJob.SCHEDULED_JOBS_INDEX)
val searchRequest = SearchRequest(SCHEDULED_JOBS_INDEX)
.source(searchSource)
client.search(searchRequest, ActionListener.wrap(::onSearchResponse, ::onFailure))
}
Expand Down Expand Up @@ -192,29 +221,37 @@ class RestIndexMonitorAction(
}
}

private fun onCreateMappingsResponse(response: CreateIndexResponse) {
if (response.isAcknowledged) {
log.info("Created ${ScheduledJob.SCHEDULED_JOBS_INDEX} with mappings.")
prepareMonitorIndexing()
IndexUtils.scheduledJobIndexUpdated()
} else {
log.error("Create ${ScheduledJob.SCHEDULED_JOBS_INDEX} mappings call not acknowledged.")
channel.sendResponse(BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR,
response.toXContent(channel.newErrorBuilder(), EMPTY_PARAMS)))
private fun onCreateMappingsResponse(channel: RestChannel): RestActionListener<CreateIndexResponse> {
return object : RestActionListener<CreateIndexResponse>(channel) {
override fun processResponse(response: CreateIndexResponse) {
if (response.isAcknowledged) {
log.info("Created $SCHEDULED_JOBS_INDEX with mappings.")
prepareMonitorIndexing()
IndexUtils.scheduledJobIndexUpdated()
} else {
log.error("Create $SCHEDULED_JOBS_INDEX mappings call not acknowledged.")
channel.sendResponse(BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR,
response.toXContent(channel.newErrorBuilder(), EMPTY_PARAMS)))
}
}
}
}

private fun onUpdateMappingsResponse(response: AcknowledgedResponse) {
if (response.isAcknowledged) {
log.info("Updated ${ScheduledJob.SCHEDULED_JOBS_INDEX} with mappings.")
IndexUtils.scheduledJobIndexUpdated()
prepareMonitorIndexing()
} else {
log.error("Updated ${ScheduledJob.SCHEDULED_JOBS_INDEX} mappings call not acknowledged.")
channel.sendResponse(BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR,
response.toXContent(channel.newErrorBuilder().startObject()
.field("message", "Updated ${ScheduledJob.SCHEDULED_JOBS_INDEX} mappings call not acknowledged.")
.endObject(), EMPTY_PARAMS)))
private fun onUpdateMappingsResponse(channel: RestChannel): RestActionListener<AcknowledgedResponse> {
return object : RestActionListener<AcknowledgedResponse>(channel) {
override fun processResponse(response: AcknowledgedResponse) {
if (response.isAcknowledged) {
log.info("Updated $SCHEDULED_JOBS_INDEX with mappings.")
IndexUtils.scheduledJobIndexUpdated()
prepareMonitorIndexing()
} else {
log.error("Updated $SCHEDULED_JOBS_INDEX mappings call not acknowledged.")
channel.sendResponse(BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR,
response.toXContent(channel.newErrorBuilder().startObject()
.field("message", "Updated ${ScheduledJob.SCHEDULED_JOBS_INDEX} mappings call not acknowledged.")
.endObject(), EMPTY_PARAMS)))
}
}
}
}

Expand Down
Loading

0 comments on commit bf2f6a1

Please sign in to comment.