From 9ab735c8eda35aad387132b0fe52608ff68f4462 Mon Sep 17 00:00:00 2001 From: Sriram <59816283+skkosuri-amzn@users.noreply.github.com> Date: Thu, 1 Oct 2020 09:28:12 -0700 Subject: [PATCH] Inject roles for alerting background jobs (#259) * Inject roles for background jobs * Removed dead test --- .../alerting/AlertingPlugin.kt | 8 +- .../alerting/MonitorRunner.kt | 26 +++++- .../resthandler/RestAcknowledgeAlertAction.kt | 2 + .../RestDeleteDestinationAction.kt | 2 + .../resthandler/RestDeleteMonitorAction.kt | 2 + .../resthandler/RestExecuteMonitorAction.kt | 5 ++ .../resthandler/RestGetMonitorAction.kt | 5 ++ .../resthandler/RestIndexDestinationAction.kt | 21 ++++- .../resthandler/RestIndexMonitorAction.kt | 25 +++++- .../resthandler/RestSearchMonitorAction.kt | 9 +- .../TransportAcknowledgeAlertAction.kt | 9 +- .../TransportDeleteDestinationAction.kt | 20 +++-- .../transport/TransportDeleteMonitorAction.kt | 20 +++-- .../TransportExecuteMonitorAction.kt | 83 +++++++++--------- .../transport/TransportGetMonitorAction.kt | 50 ++++++----- .../TransportIndexDestinationAction.kt | 34 +++---- .../transport/TransportIndexMonitorAction.kt | 82 ++++++++++++----- .../transport/TransportSearchMonitorAction.kt | 20 +++-- .../alerting/util/AlertingException.kt | 83 ++++++++++++++++++ .../alerting/MonitorRunnerIT.kt | 19 ++-- .../alerting/resthandler/MonitorRestApiIT.kt | 14 +++ core/build.gradle | 6 ++ core/libs/common-utils-1.10.1.0.jar | Bin 0 -> 12663 bytes .../alerting/elasticapi/ElasticExtensions.kt | 20 +++++ 24 files changed, 414 insertions(+), 151 deletions(-) create mode 100644 alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/util/AlertingException.kt create mode 100644 core/libs/common-utils-1.10.1.0.jar diff --git a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/AlertingPlugin.kt index dfa18670..816bebea 100644 --- a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/AlertingPlugin.kt +++ b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/AlertingPlugin.kt @@ -81,10 +81,12 @@ import com.amazon.opendistroforelasticsearch.alerting.transport.TransportIndexMo import com.amazon.opendistroforelasticsearch.alerting.transport.TransportSearchEmailAccountAction import com.amazon.opendistroforelasticsearch.alerting.transport.TransportSearchEmailGroupAction import com.amazon.opendistroforelasticsearch.alerting.transport.TransportSearchMonitorAction +import com.amazon.opendistroforelasticsearch.commons.rest.SecureRestClientBuilder import com.amazon.opendistroforelasticsearch.alerting.transport.TransportGetDestinationsAction import org.elasticsearch.action.ActionRequest import org.elasticsearch.action.ActionResponse import org.elasticsearch.client.Client +import org.elasticsearch.client.RestClient import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver import org.elasticsearch.cluster.node.DiscoveryNodes import org.elasticsearch.cluster.service.ClusterService @@ -143,6 +145,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R lateinit var threadPool: ThreadPool lateinit var alertIndices: AlertIndices lateinit var clusterService: ClusterService + lateinit var restClient: RestClient override fun getRestHandlers( settings: Settings, @@ -155,12 +158,12 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R ): List { return listOf(RestGetMonitorAction(), RestDeleteMonitorAction(), - RestIndexMonitorAction(), + RestIndexMonitorAction(settings, restClient), RestSearchMonitorAction(), RestExecuteMonitorAction(), RestAcknowledgeAlertAction(), RestScheduledJobStatsHandler("_alerting"), - RestIndexDestinationAction(settings), + RestIndexDestinationAction(settings, restClient), RestDeleteDestinationAction(), RestIndexEmailAccountAction(), RestDeleteEmailAccountAction(), @@ -225,6 +228,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R sweeper = JobSweeper(environment.settings(), client, clusterService, threadPool, xContentRegistry, scheduler, ALERTING_JOB_TYPES) this.threadPool = threadPool this.clusterService = clusterService + this.restClient = SecureRestClientBuilder(settings, environment.configFile()).build() return listOf(sweeper, scheduler, runner, scheduledJobIndices) } diff --git a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/MonitorRunner.kt b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/MonitorRunner.kt index 665d904e..f94f8379 100644 --- a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/MonitorRunner.kt +++ b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/MonitorRunner.kt @@ -21,6 +21,7 @@ import com.amazon.opendistroforelasticsearch.alerting.alerts.moveAlerts import com.amazon.opendistroforelasticsearch.alerting.core.JobRunner import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob import com.amazon.opendistroforelasticsearch.alerting.core.model.SearchInput +import com.amazon.opendistroforelasticsearch.alerting.elasticapi.InjectorContextElement import com.amazon.opendistroforelasticsearch.alerting.elasticapi.convertToMap import com.amazon.opendistroforelasticsearch.alerting.elasticapi.firstFailureOrNull import com.amazon.opendistroforelasticsearch.alerting.elasticapi.retry @@ -58,6 +59,7 @@ import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Job import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking import kotlinx.coroutines.withContext import org.elasticsearch.ExceptionsHelper import org.elasticsearch.action.DocWriteRequest @@ -94,7 +96,7 @@ import java.time.Instant import kotlin.coroutines.CoroutineContext class MonitorRunner( - settings: Settings, + private val settings: Settings, private val client: Client, private val threadPool: ThreadPool, private val scriptService: ScriptService, @@ -185,6 +187,22 @@ class MonitorRunner( } suspend fun runMonitor(monitor: Monitor, periodStart: Instant, periodEnd: Instant, dryrun: Boolean = false): MonitorRunResult { + /* + * We need to handle 3 cases: + * 1. Monitors created by older versions and never updated. These monitors wont have User details in the + * monitor object. `monitor.user` will be null. Insert `all_access, AmazonES_all_access` role. + * 2. Monitors are created when security plugin is disabled, these will have empty User object. + * (`monitor.user.name`, `monitor.user.roles` are empty ) + * 3. Monitors are created when security plugin is enabled, these will have an User object. + */ + var roles = if (monitor.user == null) { + // fixme: discuss and remove hardcoded to settings? + settings.getAsList("", listOf("all_access,AmazonES_all_access")) + } else { + monitor.user.roles + } + logger.debug("Running monitor: ${monitor.name} with roles: $roles Thread: ${Thread.currentThread().name}") + if (periodStart == periodEnd) { logger.warn("Start and end time are the same: $periodStart. This monitor will probably only run once.") } @@ -200,9 +218,9 @@ class MonitorRunner( logger.error("Error loading alerts for monitor: $id", e) return monitorResult.copy(error = e) } - - monitorResult = monitorResult.copy(inputResults = collectInputResults(monitor, periodStart, periodEnd)) - + runBlocking(InjectorContextElement(monitor.id, settings, threadPool.threadContext, roles)) { + monitorResult = monitorResult.copy(inputResults = collectInputResults(monitor, periodStart, periodEnd)) + } val updatedAlerts = mutableListOf() val triggerResults = mutableMapOf() for (trigger in monitor.triggers) { diff --git a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestAcknowledgeAlertAction.kt b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestAcknowledgeAlertAction.kt index e239a603..68dc28e8 100644 --- a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestAcknowledgeAlertAction.kt +++ b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestAcknowledgeAlertAction.kt @@ -55,6 +55,8 @@ class RestAcknowledgeAlertAction : BaseRestHandler() { @Throws(IOException::class) override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer { + log.debug("${request.method()} ${AlertingPlugin.MONITOR_BASE_URI}/{monitorID}/_acknowledge/alerts") + val monitorId = request.param("monitorID") require(!monitorId.isNullOrEmpty()) { "Missing monitor id." } val alertIds = getAlertIds(request.contentParser()) diff --git a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestDeleteDestinationAction.kt b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestDeleteDestinationAction.kt index 4eee856b..49ebb4c3 100644 --- a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestDeleteDestinationAction.kt +++ b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestDeleteDestinationAction.kt @@ -49,6 +49,8 @@ class RestDeleteDestinationAction : BaseRestHandler() { @Throws(IOException::class) override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer { + log.debug("${request.method()} ${AlertingPlugin.DESTINATION_BASE_URI}/{destinationID}") + val destinationId = request.param("destinationID") log.debug("${request.method()} ${AlertingPlugin.MONITOR_BASE_URI}/$destinationId") diff --git a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestDeleteMonitorAction.kt b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestDeleteMonitorAction.kt index 4b22c2da..4be9e8aa 100644 --- a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestDeleteMonitorAction.kt +++ b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestDeleteMonitorAction.kt @@ -51,6 +51,8 @@ class RestDeleteMonitorAction : BaseRestHandler() { @Throws(IOException::class) override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer { + log.debug("${request.method()} ${AlertingPlugin.MONITOR_BASE_URI}/{monitorID}") + val monitorId = request.param("monitorID") log.debug("${request.method()} ${AlertingPlugin.MONITOR_BASE_URI}/$monitorId") diff --git a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestExecuteMonitorAction.kt b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestExecuteMonitorAction.kt index 3a942077..6e0912b5 100644 --- a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestExecuteMonitorAction.kt +++ b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestExecuteMonitorAction.kt @@ -19,6 +19,7 @@ import com.amazon.opendistroforelasticsearch.alerting.AlertingPlugin import com.amazon.opendistroforelasticsearch.alerting.action.ExecuteMonitorAction import com.amazon.opendistroforelasticsearch.alerting.action.ExecuteMonitorRequest import com.amazon.opendistroforelasticsearch.alerting.model.Monitor +import org.apache.logging.log4j.LogManager import org.elasticsearch.client.node.NodeClient import org.elasticsearch.common.unit.TimeValue import org.elasticsearch.common.xcontent.XContentParser.Token.START_OBJECT @@ -31,6 +32,8 @@ import org.elasticsearch.rest.RestRequest.Method.POST import org.elasticsearch.rest.action.RestToXContentListener import java.time.Instant +private val log = LogManager.getLogger(RestExecuteMonitorAction::class.java) + class RestExecuteMonitorAction : BaseRestHandler() { override fun getName(): String = "execute_monitor_action" @@ -43,6 +46,8 @@ class RestExecuteMonitorAction : BaseRestHandler() { } override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer { + log.debug("${request.method()} ${AlertingPlugin.MONITOR_BASE_URI}/_execute") + return RestChannelConsumer { channel -> val dryrun = request.paramAsBoolean("dryrun", false) val requestEnd = request.paramAsTime("period_end", TimeValue(Instant.now().toEpochMilli())) diff --git a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestGetMonitorAction.kt b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestGetMonitorAction.kt index b7619b81..60f2c544 100644 --- a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestGetMonitorAction.kt +++ b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestGetMonitorAction.kt @@ -18,6 +18,7 @@ import com.amazon.opendistroforelasticsearch.alerting.AlertingPlugin import com.amazon.opendistroforelasticsearch.alerting.action.GetMonitorAction import com.amazon.opendistroforelasticsearch.alerting.action.GetMonitorRequest import com.amazon.opendistroforelasticsearch.alerting.util.context +import org.apache.logging.log4j.LogManager import org.elasticsearch.client.node.NodeClient import org.elasticsearch.rest.BaseRestHandler import org.elasticsearch.rest.BaseRestHandler.RestChannelConsumer @@ -29,6 +30,8 @@ import org.elasticsearch.rest.action.RestActions import org.elasticsearch.rest.action.RestToXContentListener import org.elasticsearch.search.fetch.subphase.FetchSourceContext +private val log = LogManager.getLogger(RestGetMonitorAction::class.java) + /** * This class consists of the REST handler to retrieve a monitor . */ @@ -47,6 +50,8 @@ class RestGetMonitorAction : BaseRestHandler() { } override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer { + log.debug("${request.method()} ${AlertingPlugin.MONITOR_BASE_URI}/{monitorID}") + val monitorId = request.param("monitorID") if (monitorId == null || monitorId.isEmpty()) { throw IllegalArgumentException("missing id") diff --git a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestIndexDestinationAction.kt b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestIndexDestinationAction.kt index 913edab6..ab6412c3 100644 --- a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestIndexDestinationAction.kt +++ b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestIndexDestinationAction.kt @@ -19,13 +19,17 @@ import com.amazon.opendistroforelasticsearch.alerting.AlertingPlugin import com.amazon.opendistroforelasticsearch.alerting.action.IndexDestinationAction import com.amazon.opendistroforelasticsearch.alerting.action.IndexDestinationRequest import com.amazon.opendistroforelasticsearch.alerting.action.IndexDestinationResponse +import com.amazon.opendistroforelasticsearch.alerting.core.model.User import com.amazon.opendistroforelasticsearch.alerting.model.destination.Destination import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.INDEX_TIMEOUT import com.amazon.opendistroforelasticsearch.alerting.util.IF_PRIMARY_TERM import com.amazon.opendistroforelasticsearch.alerting.util.IF_SEQ_NO import com.amazon.opendistroforelasticsearch.alerting.util.REFRESH +import com.amazon.opendistroforelasticsearch.commons.ConfigConstants +import com.amazon.opendistroforelasticsearch.commons.authuser.AuthUser import org.apache.logging.log4j.LogManager import org.elasticsearch.action.support.WriteRequest +import org.elasticsearch.client.RestClient import org.elasticsearch.client.node.NodeClient import org.elasticsearch.common.settings.Settings import org.elasticsearch.common.xcontent.ToXContent @@ -49,9 +53,18 @@ private val log = LogManager.getLogger(RestIndexDestinationAction::class.java) * Rest handlers to create and update Destination */ class RestIndexDestinationAction( - settings: Settings + settings: Settings, + restClient: RestClient ) : BaseRestHandler() { + @Volatile private var indexTimeout = INDEX_TIMEOUT.get(settings) + private val restClient: RestClient + private val settings: Settings + + init { + this.restClient = restClient + this.settings = settings + } override fun getName(): String { return "index_destination_action" @@ -66,15 +79,21 @@ class RestIndexDestinationAction( @Throws(IOException::class) override fun prepareRequest(request: RestRequest, client: NodeClient): BaseRestHandler.RestChannelConsumer { + log.debug("${request.method()} ${AlertingPlugin.DESTINATION_BASE_URI}") + val id = request.param("destinationID", Destination.NO_ID) if (request.method() == RestRequest.Method.PUT && Destination.NO_ID == id) { throw IllegalArgumentException("Missing destination ID") } + // Get roles of the user executing this rest action + val user = AuthUser(settings, restClient, request.headers[ConfigConstants.AUTHORIZATION]).get() + // Validate request by parsing JSON to Destination val xcp = request.contentParser() XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp::getTokenLocation) val destination = Destination.parse(xcp, id) + .copy(user = User(user.userName, user.backendRoles, user.roles, user.customAttNames)) val seqNo = request.paramAsLong(IF_SEQ_NO, SequenceNumbers.UNASSIGNED_SEQ_NO) val primaryTerm = request.paramAsLong(IF_PRIMARY_TERM, SequenceNumbers.UNASSIGNED_PRIMARY_TERM) val refreshPolicy = if (request.hasParam(REFRESH)) { diff --git a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestIndexMonitorAction.kt b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestIndexMonitorAction.kt index 9fef1d6a..b5750901 100644 --- a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestIndexMonitorAction.kt +++ b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestIndexMonitorAction.kt @@ -19,12 +19,17 @@ import com.amazon.opendistroforelasticsearch.alerting.action.IndexMonitorAction import com.amazon.opendistroforelasticsearch.alerting.action.IndexMonitorRequest import com.amazon.opendistroforelasticsearch.alerting.action.IndexMonitorResponse import com.amazon.opendistroforelasticsearch.alerting.model.Monitor +import com.amazon.opendistroforelasticsearch.alerting.core.model.User import com.amazon.opendistroforelasticsearch.alerting.util.IF_PRIMARY_TERM import com.amazon.opendistroforelasticsearch.alerting.util.IF_SEQ_NO import com.amazon.opendistroforelasticsearch.alerting.util.REFRESH +import com.amazon.opendistroforelasticsearch.commons.ConfigConstants +import com.amazon.opendistroforelasticsearch.commons.authuser.AuthUser import org.apache.logging.log4j.LogManager import org.elasticsearch.action.support.WriteRequest +import org.elasticsearch.client.RestClient import org.elasticsearch.client.node.NodeClient +import org.elasticsearch.common.settings.Settings import org.elasticsearch.common.xcontent.ToXContent import org.elasticsearch.common.xcontent.XContentParser.Token import org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken @@ -48,7 +53,18 @@ private val log = LogManager.getLogger(RestIndexMonitorAction::class.java) /** * Rest handlers to create and update monitors. */ -class RestIndexMonitorAction : BaseRestHandler() { +class RestIndexMonitorAction( + settings: Settings, + restClient: RestClient +) : BaseRestHandler() { + + private val restClient: RestClient + private val settings: Settings + + init { + this.restClient = restClient + this.settings = settings + } override fun getName(): String { return "index_monitor_action" @@ -63,15 +79,22 @@ class RestIndexMonitorAction : BaseRestHandler() { @Throws(IOException::class) override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer { + log.debug("${request.method()} ${AlertingPlugin.MONITOR_BASE_URI}") + val id = request.param("monitorID", Monitor.NO_ID) if (request.method() == PUT && Monitor.NO_ID == id) { throw IllegalArgumentException("Missing monitor ID") } + // Get roles of the user executing this rest action + val user = AuthUser(settings, restClient, request.headers[ConfigConstants.AUTHORIZATION]).get() + // Validate request by parsing JSON to Monitor val xcp = request.contentParser() ensureExpectedToken(Token.START_OBJECT, xcp.nextToken(), xcp::getTokenLocation) val monitor = Monitor.parse(xcp, id).copy(lastUpdateTime = Instant.now()) + .copy(user = User(user.userName, user.backendRoles, user.roles, user.customAttNames)) + val seqNo = request.paramAsLong(IF_SEQ_NO, SequenceNumbers.UNASSIGNED_SEQ_NO) val primaryTerm = request.paramAsLong(IF_PRIMARY_TERM, SequenceNumbers.UNASSIGNED_PRIMARY_TERM) val refreshPolicy = if (request.hasParam(REFRESH)) { diff --git a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestSearchMonitorAction.kt b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestSearchMonitorAction.kt index a97da56a..a48eb879 100644 --- a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestSearchMonitorAction.kt +++ b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestSearchMonitorAction.kt @@ -20,6 +20,7 @@ import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX import com.amazon.opendistroforelasticsearch.alerting.model.Monitor import com.amazon.opendistroforelasticsearch.alerting.util.context +import org.apache.logging.log4j.LogManager import org.elasticsearch.action.search.SearchRequest import org.elasticsearch.action.search.SearchResponse import org.elasticsearch.client.node.NodeClient @@ -43,6 +44,8 @@ import org.elasticsearch.rest.action.RestResponseListener import org.elasticsearch.search.builder.SearchSourceBuilder import java.io.IOException +private val log = LogManager.getLogger(RestSearchMonitorAction::class.java) + /** * Rest handlers to search for monitors. */ @@ -62,6 +65,10 @@ class RestSearchMonitorAction : BaseRestHandler() { @Throws(IOException::class) override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer { + log.debug("${request.method()} ${AlertingPlugin.MONITOR_BASE_URI}/_search") + + val index = request.param("index", SCHEDULED_JOBS_INDEX) + val searchSourceBuilder = SearchSourceBuilder() searchSourceBuilder.parseXContent(request.contentOrSourceParamParser()) searchSourceBuilder.fetchSource(context(request)) @@ -73,7 +80,7 @@ class RestSearchMonitorAction : BaseRestHandler() { .version(true) val searchRequest = SearchRequest() .source(searchSourceBuilder) - .indices(SCHEDULED_JOBS_INDEX) + .indices(index) return RestChannelConsumer { channel -> client.execute(SearchMonitorAction.INSTANCE, searchRequest, searchMonitorResponse(channel)) } diff --git a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/transport/TransportAcknowledgeAlertAction.kt b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/transport/TransportAcknowledgeAlertAction.kt index 83340925..7eeb85b9 100644 --- a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/transport/TransportAcknowledgeAlertAction.kt +++ b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/transport/TransportAcknowledgeAlertAction.kt @@ -6,6 +6,7 @@ import com.amazon.opendistroforelasticsearch.alerting.action.AcknowledgeAlertRes import com.amazon.opendistroforelasticsearch.alerting.alerts.AlertIndices import com.amazon.opendistroforelasticsearch.alerting.elasticapi.optionalTimeField import com.amazon.opendistroforelasticsearch.alerting.model.Alert +import com.amazon.opendistroforelasticsearch.alerting.util.AlertingException import org.apache.logging.log4j.LogManager import org.elasticsearch.action.ActionListener import org.elasticsearch.action.bulk.BulkRequest @@ -42,7 +43,9 @@ class TransportAcknowledgeAlertAction @Inject constructor( ) { override fun doExecute(task: Task, request: AcknowledgeAlertRequest, actionListener: ActionListener) { - AcknowledgeHandler(client, actionListener, request).start() + client.threadPool().threadContext.stashContext().use { + AcknowledgeHandler(client, actionListener, request).start() + } } inner class AcknowledgeHandler( @@ -69,7 +72,7 @@ class TransportAcknowledgeAlertAction @Inject constructor( } override fun onFailure(t: Exception) { - actionListener.onFailure(t) + actionListener.onFailure(AlertingException.wrap(t)) } }) } @@ -103,7 +106,7 @@ class TransportAcknowledgeAlertAction @Inject constructor( } override fun onFailure(t: Exception) { - actionListener.onFailure(t) + actionListener.onFailure(AlertingException.wrap(t)) } }) } diff --git a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/transport/TransportDeleteDestinationAction.kt b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/transport/TransportDeleteDestinationAction.kt index dd883638..cf11349a 100644 --- a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/transport/TransportDeleteDestinationAction.kt +++ b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/transport/TransportDeleteDestinationAction.kt @@ -18,6 +18,7 @@ package com.amazon.opendistroforelasticsearch.alerting.transport import com.amazon.opendistroforelasticsearch.alerting.action.DeleteDestinationAction import com.amazon.opendistroforelasticsearch.alerting.action.DeleteDestinationRequest import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob +import com.amazon.opendistroforelasticsearch.alerting.util.AlertingException import org.elasticsearch.action.ActionListener import org.elasticsearch.action.delete.DeleteRequest import org.elasticsearch.action.delete.DeleteResponse @@ -39,15 +40,16 @@ class TransportDeleteDestinationAction @Inject constructor( override fun doExecute(task: Task, request: DeleteDestinationRequest, actionListener: ActionListener) { val deleteRequest = DeleteRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, request.destinationId) .setRefreshPolicy(request.refreshPolicy) + client.threadPool().threadContext.stashContext().use { + client.delete(deleteRequest, object : ActionListener { + override fun onResponse(response: DeleteResponse) { + actionListener.onResponse(response) + } - client.delete(deleteRequest, object : ActionListener { - override fun onResponse(response: DeleteResponse) { - actionListener.onResponse(response) - } - - override fun onFailure(t: Exception) { - actionListener.onFailure(t) - } - }) + override fun onFailure(t: Exception) { + actionListener.onFailure(AlertingException.wrap(t)) + } + }) + } } } diff --git a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/transport/TransportDeleteMonitorAction.kt b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/transport/TransportDeleteMonitorAction.kt index b701350b..abaec54e 100644 --- a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/transport/TransportDeleteMonitorAction.kt +++ b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/transport/TransportDeleteMonitorAction.kt @@ -18,6 +18,7 @@ package com.amazon.opendistroforelasticsearch.alerting.transport import com.amazon.opendistroforelasticsearch.alerting.action.DeleteMonitorAction import com.amazon.opendistroforelasticsearch.alerting.action.DeleteMonitorRequest import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob +import com.amazon.opendistroforelasticsearch.alerting.util.AlertingException import org.elasticsearch.action.ActionListener import org.elasticsearch.action.delete.DeleteRequest import org.elasticsearch.action.delete.DeleteResponse @@ -39,15 +40,16 @@ class TransportDeleteMonitorAction @Inject constructor( override fun doExecute(task: Task, request: DeleteMonitorRequest, actionListener: ActionListener) { val deleteRequest = DeleteRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, request.monitorId) .setRefreshPolicy(request.refreshPolicy) + client.threadPool().threadContext.stashContext().use { + client.delete(deleteRequest, object : ActionListener { + override fun onResponse(response: DeleteResponse) { + actionListener.onResponse(response) + } - client.delete(deleteRequest, object : ActionListener { - override fun onResponse(response: DeleteResponse) { - actionListener.onResponse(response) - } - - override fun onFailure(t: Exception) { - actionListener.onFailure(t) - } - }) + override fun onFailure(t: Exception) { + actionListener.onFailure(AlertingException.wrap(t)) + } + }) + } } } diff --git a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/transport/TransportExecuteMonitorAction.kt b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/transport/TransportExecuteMonitorAction.kt index d7a4c27a..eecdad52 100644 --- a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/transport/TransportExecuteMonitorAction.kt +++ b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/transport/TransportExecuteMonitorAction.kt @@ -5,8 +5,8 @@ import com.amazon.opendistroforelasticsearch.alerting.action.ExecuteMonitorActio import com.amazon.opendistroforelasticsearch.alerting.action.ExecuteMonitorRequest import com.amazon.opendistroforelasticsearch.alerting.action.ExecuteMonitorResponse import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob -import com.amazon.opendistroforelasticsearch.alerting.elasticapi.ElasticThreadContextElement import com.amazon.opendistroforelasticsearch.alerting.model.Monitor +import com.amazon.opendistroforelasticsearch.alerting.util.AlertingException import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.launch import kotlinx.coroutines.withContext @@ -41,52 +41,55 @@ class TransportExecuteMonitorAction @Inject constructor( override fun doExecute(task: Task, execMonitorRequest: ExecuteMonitorRequest, actionListener: ActionListener) { - val executeMonitor = fun(monitor: Monitor) { - // Launch the coroutine with the clients threadContext. This is needed to preserve authentication information - // stored on the threadContext set by the security plugin when using the Alerting plugin with the Security plugin. - runner.launch(ElasticThreadContextElement(client.threadPool().threadContext)) { - val (periodStart, periodEnd) = - monitor.schedule.getPeriodEndingAt(Instant.ofEpochMilli(execMonitorRequest.requestEnd.millis)) - try { - val monitorRunResult = runner.runMonitor(monitor, periodStart, periodEnd, execMonitorRequest.dryrun) - withContext(Dispatchers.IO) { - actionListener.onResponse(ExecuteMonitorResponse(monitorRunResult)) - } - } catch (e: Exception) { - log.error("Unexpected error running monitor", e) - withContext(Dispatchers.IO) { - actionListener.onFailure(e) + client.threadPool().threadContext.stashContext().use { + val executeMonitor = fun(monitor: Monitor) { + // Launch the coroutine with the clients threadContext. This is needed to preserve authentication information + // stored on the threadContext set by the security plugin when using the Alerting plugin with the Security plugin. + // runner.launch(ElasticThreadContextElement(client.threadPool().threadContext)) { + runner.launch { + val (periodStart, periodEnd) = + monitor.schedule.getPeriodEndingAt(Instant.ofEpochMilli(execMonitorRequest.requestEnd.millis)) + try { + val monitorRunResult = runner.runMonitor(monitor, periodStart, periodEnd, execMonitorRequest.dryrun) + withContext(Dispatchers.IO) { + actionListener.onResponse(ExecuteMonitorResponse(monitorRunResult)) + } + } catch (e: Exception) { + log.error("Unexpected error running monitor", e) + withContext(Dispatchers.IO) { + actionListener.onFailure(AlertingException.wrap(e)) + } } } } - } - if (execMonitorRequest.monitorId != null) { - val getRequest = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX).id(execMonitorRequest.monitorId) - client.get(getRequest, object : ActionListener { - override fun onResponse(response: GetResponse) { - if (!response.isExists) { - actionListener.onFailure( - ElasticsearchStatusException("Can't find monitor with id: ${response.id}", RestStatus.NOT_FOUND) - ) - return - } - if (!response.isSourceEmpty) { - XContentHelper.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, - response.sourceAsBytesRef, XContentType.JSON).use { xcp -> - val monitor = ScheduledJob.parse(xcp, response.id, response.version) as Monitor - executeMonitor(monitor) + if (execMonitorRequest.monitorId != null) { + val getRequest = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX).id(execMonitorRequest.monitorId) + client.get(getRequest, object : ActionListener { + override fun onResponse(response: GetResponse) { + if (!response.isExists) { + actionListener.onFailure(AlertingException.wrap( + ElasticsearchStatusException("Can't find monitor with id: ${response.id}", RestStatus.NOT_FOUND) + )) + return + } + if (!response.isSourceEmpty) { + XContentHelper.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, + response.sourceAsBytesRef, XContentType.JSON).use { xcp -> + val monitor = ScheduledJob.parse(xcp, response.id, response.version) as Monitor + executeMonitor(monitor) + } } } - } - override fun onFailure(t: Exception) { - actionListener.onFailure(t) - } - }) - } else { - val monitor = execMonitorRequest.monitor as Monitor - executeMonitor(monitor) + override fun onFailure(t: Exception) { + actionListener.onFailure(AlertingException.wrap(t)) + } + }) + } else { + val monitor = execMonitorRequest.monitor as Monitor + executeMonitor(monitor) + } } } } diff --git a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/transport/TransportGetMonitorAction.kt b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/transport/TransportGetMonitorAction.kt index d1580dd1..6dbd2b71 100644 --- a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/transport/TransportGetMonitorAction.kt +++ b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/transport/TransportGetMonitorAction.kt @@ -20,6 +20,7 @@ import com.amazon.opendistroforelasticsearch.alerting.action.GetMonitorRequest import com.amazon.opendistroforelasticsearch.alerting.action.GetMonitorResponse import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob import com.amazon.opendistroforelasticsearch.alerting.model.Monitor +import com.amazon.opendistroforelasticsearch.alerting.util.AlertingException import org.apache.logging.log4j.LogManager import org.elasticsearch.ElasticsearchStatusException import org.elasticsearch.action.ActionListener @@ -54,29 +55,38 @@ class TransportGetMonitorAction @Inject constructor( .version(getMonitorRequest.version) .fetchSourceContext(getMonitorRequest.srcContext) - client.get(getRequest, object : ActionListener { - override fun onResponse(response: GetResponse) { - if (!response.isExists) { - actionListener.onFailure(ElasticsearchStatusException("Monitor not found.", RestStatus.NOT_FOUND)) - return - } + /* + * Remove security context before you call elasticsearch api's. By this time, permissions required + * to call this api are validated. + * Once system-indices [https://github.com/opendistro-for-elasticsearch/security/issues/666] is done, we + * might further improve this logic. Also change try to kotlin-use for auto-closable. + */ + client.threadPool().threadContext.stashContext().use { + client.get(getRequest, object : ActionListener { + override fun onResponse(response: GetResponse) { + if (!response.isExists) { + actionListener.onFailure( + AlertingException.wrap(ElasticsearchStatusException("Monitor not found.", RestStatus.NOT_FOUND))) + return + } - var monitor: Monitor? = null - if (!response.isSourceEmpty) { - XContentHelper.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, - response.sourceAsBytesRef, XContentType.JSON).use { xcp -> - monitor = ScheduledJob.parse(xcp, response.id, response.version) as Monitor + var monitor: Monitor? = null + if (!response.isSourceEmpty) { + XContentHelper.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, + response.sourceAsBytesRef, XContentType.JSON).use { xcp -> + monitor = ScheduledJob.parse(xcp, response.id, response.version) as Monitor + } } - } - actionListener.onResponse( - GetMonitorResponse(response.id, response.version, response.seqNo, response.primaryTerm, RestStatus.OK, monitor) - ) - } + actionListener.onResponse( + GetMonitorResponse(response.id, response.version, response.seqNo, response.primaryTerm, RestStatus.OK, monitor) + ) + } - override fun onFailure(t: Exception) { - actionListener.onFailure(t) - } - }) + override fun onFailure(t: Exception) { + actionListener.onFailure(AlertingException.wrap(t)) + } + }) + } } } diff --git a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/transport/TransportIndexDestinationAction.kt b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/transport/TransportIndexDestinationAction.kt index b06795e4..846a0a43 100644 --- a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/transport/TransportIndexDestinationAction.kt +++ b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/transport/TransportIndexDestinationAction.kt @@ -6,6 +6,7 @@ import com.amazon.opendistroforelasticsearch.alerting.action.IndexDestinationRes import com.amazon.opendistroforelasticsearch.alerting.core.ScheduledJobIndices import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings +import com.amazon.opendistroforelasticsearch.alerting.util.AlertingException import com.amazon.opendistroforelasticsearch.alerting.util.IndexUtils import org.apache.logging.log4j.LogManager import org.elasticsearch.ElasticsearchStatusException @@ -49,7 +50,9 @@ class TransportIndexDestinationAction @Inject constructor( } override fun doExecute(task: Task, request: IndexDestinationRequest, actionListener: ActionListener) { - IndexDestinationHandler(client, actionListener, request).start() + client.threadPool().threadContext.stashContext().use { + IndexDestinationHandler(client, actionListener, request).start() + } } inner class IndexDestinationHandler( @@ -65,7 +68,7 @@ class TransportIndexDestinationAction @Inject constructor( onCreateMappingsResponse(response) } override fun onFailure(t: Exception) { - actionListener.onFailure(t) + actionListener.onFailure(AlertingException.wrap(t)) } }) } else if (!IndexUtils.scheduledJobIndexUpdated) { @@ -76,7 +79,7 @@ class TransportIndexDestinationAction @Inject constructor( onUpdateMappingsResponse(response) } override fun onFailure(t: Exception) { - actionListener.onFailure(t) + actionListener.onFailure(AlertingException.wrap(t)) } }) } else { @@ -99,14 +102,15 @@ class TransportIndexDestinationAction @Inject constructor( override fun onResponse(response: IndexResponse) { val failureReasons = checkShardsFailure(response) if (failureReasons != null) { - actionListener.onFailure(ElasticsearchStatusException(failureReasons.toString(), response.status())) + actionListener.onFailure( + AlertingException.wrap(ElasticsearchStatusException(failureReasons.toString(), response.status()))) return } actionListener.onResponse(IndexDestinationResponse(response.id, response.version, response.seqNo, response.primaryTerm, RestStatus.CREATED, destination)) } override fun onFailure(t: Exception) { - actionListener.onFailure(t) + actionListener.onFailure(AlertingException.wrap(t)) } }) } @@ -118,11 +122,11 @@ class TransportIndexDestinationAction @Inject constructor( IndexUtils.scheduledJobIndexUpdated() } else { log.error("Create ${ScheduledJob.SCHEDULED_JOBS_INDEX} mappings call not acknowledged.") - actionListener.onFailure( + actionListener.onFailure(AlertingException.wrap( ElasticsearchStatusException( "Create ${ScheduledJob.SCHEDULED_JOBS_INDEX} mappings call not acknowledged", RestStatus.INTERNAL_SERVER_ERROR - ) + )) ) } } @@ -134,11 +138,10 @@ class TransportIndexDestinationAction @Inject constructor( prepareDestinationIndexing() } else { log.error("Update ${ScheduledJob.SCHEDULED_JOBS_INDEX} mappings call not acknowledged.") - actionListener.onFailure( - ElasticsearchStatusException( + actionListener.onFailure(AlertingException.wrap(ElasticsearchStatusException( "Updated ${ScheduledJob.SCHEDULED_JOBS_INDEX} mappings call not acknowledged.", RestStatus.INTERNAL_SERVER_ERROR - ) + )) ) } } @@ -150,15 +153,15 @@ class TransportIndexDestinationAction @Inject constructor( onGetResponse(response) } override fun onFailure(t: Exception) { - actionListener.onFailure(t) + actionListener.onFailure(AlertingException.wrap(t)) } }) } private fun onGetResponse(response: GetResponse) { if (!response.isExists) { - actionListener.onFailure( - ElasticsearchStatusException("Destination with ${request.destinationId} is not found", RestStatus.NOT_FOUND)) + actionListener.onFailure(AlertingException.wrap( + ElasticsearchStatusException("Destination with ${request.destinationId} is not found", RestStatus.NOT_FOUND))) return } @@ -175,14 +178,15 @@ class TransportIndexDestinationAction @Inject constructor( override fun onResponse(response: IndexResponse) { val failureReasons = checkShardsFailure(response) if (failureReasons != null) { - actionListener.onFailure(ElasticsearchStatusException(failureReasons.toString(), response.status())) + actionListener.onFailure( + AlertingException.wrap(ElasticsearchStatusException(failureReasons.toString(), response.status()))) return } actionListener.onResponse(IndexDestinationResponse(response.id, response.version, response.seqNo, response.primaryTerm, RestStatus.CREATED, destination)) } override fun onFailure(t: Exception) { - actionListener.onFailure(t) + actionListener.onFailure(AlertingException.wrap(t)) } }) } diff --git a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/transport/TransportIndexMonitorAction.kt b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/transport/TransportIndexMonitorAction.kt index 96842307..aed16aa1 100644 --- a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/transport/TransportIndexMonitorAction.kt +++ b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/transport/TransportIndexMonitorAction.kt @@ -21,11 +21,13 @@ import com.amazon.opendistroforelasticsearch.alerting.action.IndexMonitorRespons import com.amazon.opendistroforelasticsearch.alerting.core.ScheduledJobIndices import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX +import com.amazon.opendistroforelasticsearch.alerting.core.model.SearchInput import com.amazon.opendistroforelasticsearch.alerting.model.Monitor import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.ALERTING_MAX_MONITORS import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.INDEX_TIMEOUT import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.MAX_ACTION_THROTTLE_VALUE import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.REQUEST_TIMEOUT +import com.amazon.opendistroforelasticsearch.alerting.util.AlertingException import com.amazon.opendistroforelasticsearch.alerting.util.IndexUtils import org.apache.logging.log4j.LogManager import org.elasticsearch.ElasticsearchStatusException @@ -57,7 +59,6 @@ import org.elasticsearch.rest.RestStatus import org.elasticsearch.search.builder.SearchSourceBuilder import org.elasticsearch.tasks.Task import org.elasticsearch.transport.TransportService -import java.lang.RuntimeException import java.time.Duration private val log = LogManager.getLogger(TransportIndexMonitorAction::class.java) @@ -87,7 +88,13 @@ class TransportIndexMonitorAction @Inject constructor( } override fun doExecute(task: Task, request: IndexMonitorRequest, actionListener: ActionListener) { - IndexMonitorHandler(client, actionListener, request).start() + + if (!isValidIndex(request, actionListener)) + return + + client.threadPool().threadContext.stashContext().use { + IndexMonitorHandler(client, actionListener, request).start() + } } inner class IndexMonitorHandler( @@ -103,7 +110,7 @@ class TransportIndexMonitorAction @Inject constructor( onCreateMappingsResponse(response) } override fun onFailure(t: Exception) { - actionListener.onFailure(t) + actionListener.onFailure(AlertingException.wrap(t)) } }) } else if (!IndexUtils.scheduledJobIndexUpdated) { @@ -114,7 +121,7 @@ class TransportIndexMonitorAction @Inject constructor( onUpdateMappingsResponse(response) } override fun onFailure(t: Exception) { - actionListener.onFailure(t) + actionListener.onFailure(AlertingException.wrap(t)) } }) } else { @@ -132,7 +139,7 @@ class TransportIndexMonitorAction @Inject constructor( try { validateActionThrottle(request.monitor, maxActionThrottle, TimeValue.timeValueMinutes(1)) } catch (e: RuntimeException) { - actionListener.onFailure(e) + actionListener.onFailure(AlertingException.wrap(e)) return } @@ -147,7 +154,7 @@ class TransportIndexMonitorAction @Inject constructor( } override fun onFailure(t: Exception) { - actionListener.onFailure(t) + actionListener.onFailure(AlertingException.wrap(t)) } }) } @@ -171,11 +178,13 @@ class TransportIndexMonitorAction @Inject constructor( private fun onSearchResponse(response: SearchResponse) { val totalHits = response.hits.totalHits?.value if (totalHits != null && totalHits >= maxMonitors) { - log.error("This request would create more than the allowed monitors [$maxMonitors].") + log.error("This request would wrap more than the allowed monitors [$maxMonitors].") actionListener.onFailure( - IllegalArgumentException("This request would create more than the allowed monitors [$maxMonitors].") + AlertingException.wrap(IllegalArgumentException( + "This request would create more than the allowed monitors [$maxMonitors].")) ) } else { + indexMonitor() } } @@ -187,11 +196,8 @@ class TransportIndexMonitorAction @Inject constructor( IndexUtils.scheduledJobIndexUpdated() } else { log.error("Create ${ScheduledJob.SCHEDULED_JOBS_INDEX} mappings call not acknowledged.") - actionListener.onFailure( - ElasticsearchStatusException( - "Create ${ScheduledJob.SCHEDULED_JOBS_INDEX} mappings call not acknowledged", - RestStatus.INTERNAL_SERVER_ERROR - ) + actionListener.onFailure(AlertingException.wrap(ElasticsearchStatusException( + "Create ${ScheduledJob.SCHEDULED_JOBS_INDEX} mappings call not acknowledged", RestStatus.INTERNAL_SERVER_ERROR)) ) } } @@ -203,11 +209,9 @@ class TransportIndexMonitorAction @Inject constructor( prepareMonitorIndexing() } else { log.error("Update ${ScheduledJob.SCHEDULED_JOBS_INDEX} mappings call not acknowledged.") - actionListener.onFailure( - ElasticsearchStatusException( + actionListener.onFailure(AlertingException.wrap(ElasticsearchStatusException( "Updated ${ScheduledJob.SCHEDULED_JOBS_INDEX} mappings call not acknowledged.", - RestStatus.INTERNAL_SERVER_ERROR - ) + RestStatus.INTERNAL_SERVER_ERROR)) ) } } @@ -224,14 +228,15 @@ class TransportIndexMonitorAction @Inject constructor( override fun onResponse(response: IndexResponse) { val failureReasons = checkShardsFailure(response) if (failureReasons != null) { - actionListener.onFailure(ElasticsearchStatusException(failureReasons.toString(), response.status())) + actionListener.onFailure( + AlertingException.wrap(ElasticsearchStatusException(failureReasons.toString(), response.status()))) return } actionListener.onResponse(IndexMonitorResponse(response.id, response.version, response.seqNo, response.primaryTerm, RestStatus.CREATED, request.monitor)) } override fun onFailure(t: Exception) { - actionListener.onFailure(t) + actionListener.onFailure(AlertingException.wrap(t)) } }) } @@ -243,15 +248,15 @@ class TransportIndexMonitorAction @Inject constructor( onGetResponse(response) } override fun onFailure(t: Exception) { - actionListener.onFailure(t) + actionListener.onFailure(AlertingException.wrap(t)) } }) } private fun onGetResponse(response: GetResponse) { if (!response.isExists) { - actionListener.onFailure( - ElasticsearchStatusException("Monitor with ${request.monitorId} is not found", RestStatus.NOT_FOUND)) + actionListener.onFailure(AlertingException.wrap( + ElasticsearchStatusException("Monitor with ${request.monitorId} is not found", RestStatus.NOT_FOUND))) return } @@ -276,7 +281,8 @@ class TransportIndexMonitorAction @Inject constructor( override fun onResponse(response: IndexResponse) { val failureReasons = checkShardsFailure(response) if (failureReasons != null) { - actionListener.onFailure(ElasticsearchStatusException(failureReasons.toString(), response.status())) + actionListener.onFailure( + AlertingException.wrap(ElasticsearchStatusException(failureReasons.toString(), response.status()))) return } actionListener.onResponse( @@ -285,7 +291,7 @@ class TransportIndexMonitorAction @Inject constructor( ) } override fun onFailure(t: Exception) { - actionListener.onFailure(t) + actionListener.onFailure(AlertingException.wrap(t)) } }) } @@ -301,4 +307,32 @@ class TransportIndexMonitorAction @Inject constructor( return null } } + + /** + * Check if user has permissions to read the configured indices on the monitor. + * Due to below issue with security plugin, we get security_exception when invalid index name is mentioned. + * https://github.com/opendistro-for-elasticsearch/security/issues/718 + */ + private fun isValidIndex(request: IndexMonitorRequest, actionListener: ActionListener): Boolean { + var ret = true + val searchInputs = request.monitor.inputs.filter { it.name() == SearchInput.SEARCH_FIELD } + searchInputs.forEach { + val searchInput = it as SearchInput + val searchRequest = SearchRequest().indices(*searchInput.indices.toTypedArray()) + .source(SearchSourceBuilder.searchSource().size(1).query(QueryBuilders.matchAllQuery())) + client.search(searchRequest, object : ActionListener { + override fun onResponse(searchResponse: SearchResponse) { + // ignore + } + + override fun onFailure(t: Exception) { + val ex = ElasticsearchStatusException("User doesn't have read permissions for the configured index " + + "${searchInput.indices}", RestStatus.FORBIDDEN) + actionListener.onFailure(AlertingException.wrap(ex)) + ret = false + } + }) + } + return ret + } } diff --git a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/transport/TransportSearchMonitorAction.kt b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/transport/TransportSearchMonitorAction.kt index 920edb31..b0b7c3eb 100644 --- a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/transport/TransportSearchMonitorAction.kt +++ b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/transport/TransportSearchMonitorAction.kt @@ -16,6 +16,7 @@ package com.amazon.opendistroforelasticsearch.alerting.transport import com.amazon.opendistroforelasticsearch.alerting.action.SearchMonitorAction +import com.amazon.opendistroforelasticsearch.alerting.util.AlertingException import org.elasticsearch.action.ActionListener import org.elasticsearch.action.search.SearchRequest import org.elasticsearch.action.search.SearchResponse @@ -35,14 +36,17 @@ class TransportSearchMonitorAction @Inject constructor( ) { override fun doExecute(task: Task, searchRequest: SearchRequest, actionListener: ActionListener) { - client.search(searchRequest, object : ActionListener { - override fun onResponse(response: SearchResponse) { - actionListener.onResponse(response) - } - override fun onFailure(t: Exception) { - actionListener.onFailure(t) - } - }) + client.threadPool().threadContext.stashContext().use { + client.search(searchRequest, object : ActionListener { + override fun onResponse(response: SearchResponse) { + actionListener.onResponse(response) + } + + override fun onFailure(t: Exception) { + actionListener.onFailure(AlertingException.wrap(t)) + } + }) + } } } diff --git a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/util/AlertingException.kt b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/util/AlertingException.kt new file mode 100644 index 00000000..a53b9f94 --- /dev/null +++ b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/util/AlertingException.kt @@ -0,0 +1,83 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.alerting.util + +import org.apache.logging.log4j.LogManager +import org.elasticsearch.ElasticsearchException +import org.elasticsearch.ElasticsearchSecurityException +import org.elasticsearch.ElasticsearchStatusException +import org.elasticsearch.common.Strings +import org.elasticsearch.index.IndexNotFoundException +import org.elasticsearch.index.engine.VersionConflictEngineException +import org.elasticsearch.indices.InvalidIndexNameException +import org.elasticsearch.rest.RestStatus + +private val log = LogManager.getLogger(AlertingException::class.java) + +/** + * Converts into a user friendly message. + */ +class AlertingException(message: String, val status: RestStatus, ex: Exception) : ElasticsearchException(message, ex) { + + override fun status(): RestStatus { + return status + } + + companion object { + @JvmStatic + fun wrap(ex: Exception): ElasticsearchException { + log.error("Alerting error: $ex") + + var friendlyMsg = "Unknown error" + var status = RestStatus.INTERNAL_SERVER_ERROR + when (ex) { + is IndexNotFoundException -> { + status = ex.status() + friendlyMsg = "Configured indices are not found: ${ex.index}" + } + is ElasticsearchSecurityException -> { + status = ex.status() + friendlyMsg = "User doesn't have permissions to execute this action. Contact administrator." + } + is ElasticsearchStatusException -> { + status = ex.status() + friendlyMsg = ex.message as String + } + is IllegalArgumentException -> { + status = RestStatus.BAD_REQUEST + friendlyMsg = ex.message as String + } + is VersionConflictEngineException -> { + status = ex.status() + friendlyMsg = ex.message as String + } + is InvalidIndexNameException -> { + status = RestStatus.BAD_REQUEST + friendlyMsg = ex.message as String + } + else -> { + if (!Strings.isNullOrEmpty(ex.message)) { + friendlyMsg = ex.message as String + } + } + } + // Wrapping the origin exception as runtime to avoid it being formatted. + // Currently, alerting-kibana is using `error.root_cause.reason` as text in the toast message. + // Below logic is to set friendly message to error.root_cause.reason. + return AlertingException(friendlyMsg, status, Exception("${ex.javaClass.name}: ${ex.message}")) + } + } +} diff --git a/alerting/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/MonitorRunnerIT.kt b/alerting/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/MonitorRunnerIT.kt index eb93caa2..84f2a79e 100644 --- a/alerting/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/MonitorRunnerIT.kt +++ b/alerting/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/MonitorRunnerIT.kt @@ -71,6 +71,7 @@ class MonitorRunnerIT : AlertingRestTestCase() { fun `test execute monitor returns search result`() { val uniqueName = "unique name" val query = QueryBuilders.termQuery("monitor.name.keyword", uniqueName) + val input = SearchInput(indices = listOf(".*"), query = SearchSourceBuilder().query(query)) val monitor = createMonitor(randomMonitor(name = uniqueName, inputs = listOf(input), triggers = listOf(randomTrigger(condition = ALWAYS_RUN)))) @@ -78,6 +79,7 @@ class MonitorRunnerIT : AlertingRestTestCase() { val response = executeMonitor(monitor, params = DRYRUN_MONITOR) val output = entityAsMap(response) + assertEquals(monitor.name, output["monitor_name"]) @Suppress("UNCHECKED_CAST") val searchResult = (output.objectMap("input_results")["results"] as List>).first() @@ -123,10 +125,12 @@ class MonitorRunnerIT : AlertingRestTestCase() { fun `test execute monitor input error`() { // use a non-existent index to trigger an input error + createIndex("foo", Settings.EMPTY) val input = SearchInput(indices = listOf("foo"), query = SearchSourceBuilder().query(QueryBuilders.matchAllQuery())) val monitor = createMonitor(randomMonitor(inputs = listOf(input), triggers = listOf(randomTrigger(condition = NEVER_RUN)))) + deleteIndex("foo") val response = executeMonitor(monitor.id) val output = entityAsMap(response) @@ -142,6 +146,7 @@ class MonitorRunnerIT : AlertingRestTestCase() { fun `test execute monitor wrong monitorid`() { // use a non-existent monitoid to trigger a 404. + createIndex("foo", Settings.EMPTY) val input = SearchInput(indices = listOf("foo"), query = SearchSourceBuilder().query(QueryBuilders.matchAllQuery())) val monitor = createMonitor(randomMonitor(inputs = listOf(input), triggers = listOf(randomTrigger(condition = NEVER_RUN)))) @@ -445,20 +450,6 @@ class MonitorRunnerIT : AlertingRestTestCase() { verifyAlert(alerts.single(), monitor, ACTIVE) } - fun `test execute monitor with bad search`() { - val query = QueryBuilders.matchAllQuery() - val input = SearchInput(indices = listOf("_#*IllegalIndexCharacters"), query = SearchSourceBuilder().query(query)) - val monitor = createMonitor(randomMonitor(inputs = listOf(input), triggers = listOf(randomTrigger(condition = ALWAYS_RUN)))) - - val response = executeMonitor(monitor.id) - - val output = entityAsMap(response) - assertEquals(monitor.name, output["monitor_name"]) - @Suppress("UNCHECKED_CAST") - val inputResults = output.stringMap("input_results") - assertTrue("Missing error message from a bad query", (inputResults?.get("error") as String).isNotEmpty()) - } - fun `test execute monitor non-dryrun`() { val monitor = createMonitor( randomMonitor(triggers = listOf(randomTrigger( diff --git a/alerting/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/MonitorRestApiIT.kt b/alerting/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/MonitorRestApiIT.kt index ee548d3b..2668b36b 100644 --- a/alerting/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/MonitorRestApiIT.kt +++ b/alerting/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/MonitorRestApiIT.kt @@ -143,6 +143,20 @@ class MonitorRestApiIT : AlertingRestTestCase() { } } + fun `test creating a monitor with illegal index name`() { + try { + val si = SearchInput(listOf("_#*IllegalIndexCharacters"), SearchSourceBuilder().query(QueryBuilders.matchAllQuery())) + val monitor = randomMonitor() + client().makeRequest("POST", ALERTING_BASE_URI, emptyMap(), monitor.copy(inputs = listOf(si)).toHttpEntity()) + } catch (e: ResponseException) { + // When an index with invalid name is mentioned, instead of returning invalid_index_name_exception security plugin throws security_exception. + // Refer: https://github.com/opendistro-for-elasticsearch/security/issues/718 + // Without security plugin we get BAD_REQUEST correctly. With security_plugin we get INTERNAL_SERVER_ERROR, till above issue is fixed. + assertTrue("Unexpected status", + listOf(RestStatus.BAD_REQUEST, RestStatus.FORBIDDEN).contains(e.response.restStatus())) + } + } + @Throws(Exception::class) fun `test updating search for a monitor`() { val monitor = createRandomMonitor() diff --git a/core/build.gradle b/core/build.gradle index a4229964..e6867baf 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -23,8 +23,14 @@ dependencies { compile 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.1.1' compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8:${kotlin_version}" compile "com.cronutils:cron-utils:7.0.5" + compile "org.elasticsearch.client:elasticsearch-rest-client:${es_version}" + compile 'com.google.googlejavaformat:google-java-format:1.3' testImplementation "org.elasticsearch.test:framework:${es_version}" testImplementation "org.jetbrains.kotlin:kotlin-test:${kotlin_version}" testImplementation "org.jetbrains.kotlin:kotlin-test-junit:${kotlin_version}" + + + //fixme: pick it from maven, pending on https://github.com/opendistro-for-elasticsearch/common-utils/pull/3 + compile files('libs/common-utils-1.10.1.0.jar') } diff --git a/core/libs/common-utils-1.10.1.0.jar b/core/libs/common-utils-1.10.1.0.jar new file mode 100644 index 0000000000000000000000000000000000000000..da9c620e8735732ca7d92ece5139b71ebf81355c GIT binary patch literal 12663 zcmb_?1#nzTlC7B8Vziiaf zj$2(Fk$Fzvd$XddvW~nII0PIB2n-BJS;PkgkT(PS)w~((+a)8SEJQCME6NBCBL6Rr zbv|HccyA6kZx{MMJIV;jN{EUmDKp53UdxP)N=wl*Oe0Fs(@cy_)+;j2v2N|}?1TLV z_7?wlR*1L0k*&>tBm8F{_`mrW*ciCm0{+sK;$K~D?Mwj17LGs%TT@#H6KexUpoNj6 ziGhQW`CpStyf*hM*7fPY(qHt zXs)EvDK@%6ILN!AO#5DZPDZY#?Y+RvTHZ}3S)Qt))GXUVK7+PH1^?m!f>qWrKLO2) z04pF0m#mgneUKXL0eC-;&nAU|}8rv?@IC~vsf@HCal*vO{K^y_y`5m{}E!~$a->ogt(U(az_6=iR0EYdX&>)o&RK74j*9NSJ6 zr|%0?_GMpXKl$9fq)n-nnNagu`Bst9*zh;t1X`2cJ}L%* zfqFSMR4}5%62@o6R0hPS>B{$K6L}cck&4A>Y3}&SQ)I_w%PuJsj&t8l(vn!TC>j%7 zvxCr)l=A4tg(SqG*$QZon;3)w?Q-Xujt|oD=H$eshRNbw7xa@L2&bxE-XYrI^X$tBy9Kw$T&y9uUL!F`StJa?xdh)z_ZDMj?w1PZ#P9#+p>K8Kx(;F5xX(W|g(Dgnt;acCQRt-2PEPbsM%k zR4=7h?GT#Fb-NvjyAi-y54`R%i5TeZg+pSPus4Wyb(4HIR^7Z~L$gt}P2fs-?khqa zwkX$pS|5R4VO!c_BkCn!dvl?*MtSG5%SW?OzU}4equ#ME=8vhw>yh!E592yBj}ha< zS|h~C5CGM;JjGwKEnK%uH=JHYd31@Yt$e9~NYp#1!FJ7Y>u4YeJ#2-)bKqM=5_HI~ zTHE3EdTij$Fn1b%pAaTzb&tR=wTgMbEWASM;HCjtcQgUFNCZSmr|Th&g=IpH4sqa4 zr8>Ua8?wCTHKGl!Fi2M#W8sZtEW3}R3u3I7ra_`@;ogahDv8JL96^IiX=BY_tsxo% zFneaY>EN#fjql81_L7IeLDy~TPlOqGA#5tw6mc0Lrt|+GqK+A0n9z*;9&MoLJA*kG zpR!;FIeD3e>qzI^?uD^CU2=#dPJhxsbtle9iR(y4ndcD3fKyY~bcm%*{N&tc*J8#n zuRCK?+9qIzHY0<&u{i|7Nh8-{Asvr>OC}u;g_O@*=B8kkpd$y zUTK_KKDzV}J=wXa+Wcu@6pj#XLy^_XQ$}3nhx6_WwOo?GN(rq(CdN7Q7jlKn$+{iUCLxgcge@9feK)moFWb|vv`SU*aBB#xO7=yMmV+`| z;*q;#*8;uQB7>)p!ZE#*(TDQ}?g=}QAz~V|Vc%PL$bd*3TU~`M0Lt45pDlXSEL+~;! z;ri$%q3i(}y*O1y8JgN>6>?;q*@uTjxk_=FWYJ*-M=)IlQDI32KVzcP)@MzNQpbl#yJae+JQ<&& z2JON9{TT;g>Zx>;-8+aQ?v<*3|0PlXv#wsYOLRm$QQYW+)Z|n|YpAb=)EBp8<5CEU zAgM^7&>amVs6KX|n}tS0jD4m+t4{B$ z$3mT*FU~!DhsGRPs(2rUP;Y{e{xN9gg~n3#wgjIP{h3&?F>$^uGgSFBI=0W9z4o2N zGvzb)*r~%#<9wekgng~x>&G=c($tD_)2k&v=~v8QV`29AK`ebm|F6(f0vD4tz7J!O zKDnEHDt9|b2pY|NwnjQQ zsGuthXE!9)5urI@m6hEnHMgtIrpYRp6s+P>uz3SYPMZzjvz@yuFT&K`Y0}RU2I@63`f98LLNh> zu}uXE?wk0M`fQSq9$Qgfqbb}Rm2OurSe;<2jNrI3!mV3s93Ol9EmeDi_LM_A-}nLS zv~N%2l=F_Udu*>ds7I@J#$|KrQpPhp={mft&8n?lb&y^hzF|qkS2=U*J14LmULhc0Hz@rPYuPBZ2l1NlJ|0LGGo7n8K) zqI((f0+`VYG>e|x3YmR+*W)fF)NJH(Du+qveXPV;<>4;n2RrDqxf+eZ=Gwwsc|+i1 zB?aKMqm204@TexBRcdg3i6)0Re;~51F{Kf6QP#Sb4_6ep7XAbQc!<+RHHY`=C0kj4 zk?Ot*hi``^XN~Pc@?B+AyE2x)9xz5%k8g5_QAvRIiBsX%uqQd{rl@;&XSGzg3(8An zw`;OTkaeIQB&8LNEo*$rO*KU?+#KK^^}s+l6SG*0w+psxC8rCT8MQ`8H?-pJ^mIVZ zv1mxyS-fj-LVlzyoe`5(e(RFsZ&D{W=q(Kc>^oZ`5?`zakg%{4FB8pQVk5x-2TnX- zOWPF3s#)0O^k~nBw(Jq*zNhrXgB{YkzaOBKOeTmBi>e>|`c*sjrJ)oPIxZ!y;zxIU zqQ;!Tw*EMES7I*C+1t~$ z=W2GIKV)+G6|>>bGlv_2jp7-R#lV~x+0#UjUp9QkKIxL+&EM7_nB0B{YC_SP;9<=g zFX*V=9!}wS_H*K@)1EE68&@}f7)UTeq3Va%ZqO;$Izm*JJKkboRbB$OZWP z2?>+uYn16YQ)(2Jy2P_`PO-*hlAek+dxpiu!Y$Q}=)T&auJ5ub_anx}=&=_Xt2KV8 zLbW&sU}&|9oiMSO&PT68ndUq-h}Gb}iFYw1)NaMJGJE18NtJs3 z4}#Dc()o>*=N6kv^|4+wecNMzvS$-Z^>dKO^J&6%HO!R41<7WE^Ri-jVkG|;l?{VyJV%{THP)OPc{Sl4 zLxMeZnhVy3T#+by{57q;lM2-+EzAAy1MK8t!c{hVi*`w0&o6;dcki^%$G!a)DpG_! zIJEi^v1%QVtm?oWzYil#80a{gBY7?7e!{>RzH+IM*k{A6@s3>XFk&%+6?ZjSG`DK%o$o?|a%C8OXbYe@2t%vG9}UDFueFyo8xH#64O) z2cehVXk_h=>(bOQG=BL(B3h1JZa67Ei32o+E~Cm2X5mrqCsnKsP^?H#0!N~}XHo4H zL8eiU?sW>gVW64`N45;R*%5wDGUUFSP1#hq;8y$*X#4e``Ficzr*Lwb6z$&ss!z;# zi&fY03OXRUX%CuV=gBOlMexg$qA13>))4t_fT)f-8^^=Q70 zG6XHD?JkAa3BO4wK5xH<_$5EHOOz}%DCfZ|EQGN|1m0@2Ja+DeFlZz+c4!~f(Xgwo z;whYM@u`DQ_|jdrt-dp#oxnlAl62 z5nC4R;+SqN{K!pH{)_=q*I-}IW(MFsEKECP&6@O7AeMjp%pS>!6pb19X((ArB zaQmyPe6Pf7Wtqr@-1A6rA*PU2A`W|g(`j}cVT*RpDn3~u`T2qti#Abu!ps?+(KVrJ z%#0Y{rrwmHk<9URnBI-TNp|AJSE>*smdqkwADx9)$bX$YRX#1h&wQ)qEWg!Mg#Ycy zQ`i<@YGL-)Isy#%XkI1T9*NR;6$J0dMO1Xl+xN1=djQ%b~nd-oJ|MoUw<47%7M7TRTpjH zM@V;lXN&0O@?G}2O-x0Nx0zjCKggwEdKDGV-9*LK(Avxb@$Xudcam8!36@Nw`Bq1+ zm|4P?co9;l89;3&vBaz}ufoDUDtQ*lQB`WHSPU^>DI|XfJH|t~?3oF(>@AaHN4I9G z5RE4<=p}ZHy(?PJ1#WstU5}Dd<>oC&tJ*jR1OsP4ONme(%8Qb$y%?63J=O)+Nk*+t zF|0rXjw5OhPD6z7fz@2$xzKw6^jkp@osW`0Ij7KQN?YgBAK!n2z`XK{ziF=ZqKhI$ zWudQOe=3c0%~E77aYKBPqORdv{A|Kjn{`(53IEm!g5yiuzN)o=x$9i>$aoOJiTtL^ zcZI}-gSx;#Rhz-GWWO{}7v{Z!bGCg0M%2fglp=-8?=c;CSD$A2P%l#sk$m?JsIBkf zRgfy+Y+R%E@oIwFh@TXR>m|16(@H{iJ*W`C(mUswwm{RM(sW1Ofob(#<)&r96{=JV z9m`(lpBg@r1R+XA^UNr%D&e!^ihXH{Q6E1_U9Vr8fgUWwt_d7rO&=g%jdX<%F0atM zPcp+NZe2Y-yCNInUG6aLZ5&67y%g9jIfJR#`Esq73d^i4-ufgV>2~wyQw=$@i1{u7qyHgUK<~UGt!fJLj4cJ_HY@QMSaJ!fr=HXn!5$s7a_kbVLX5c&TM^1l|N zCW>$9!qyfh0N_U_3v1&)3vZR`@UH0d7_aBwnA67t1}Qp-=}-*n(~U7m-wUz5mo%23 z+BWz|FIy*E*RmYR`VOL0`MCYZ@`9$hRAXgcqsk|7zf!}-{)Hx0tIf(r1 z*~A7VRl1jx$9yHriqMjRbnx59gWD4E1v96UDbuoUSuF9392oJ1Pr^xdm-~R)f zrYP^SDya)WzG1~0%U31VaazMVs}8uYCnGj;sYxeP3e2GImZwu$N?J?eGvigHyOn*J zPm|`Cm#}qW=QPpWBYMXjT-(BM6vcf_fC}X6HRR*Us9kiRdR8>8xnjn)k$&7@2N6iY`%UuvGE!2SMOel5KR=bJNQp~zqzq;&uF_ZpBk#jd!<3f@ z7IxhW0gvL6HYkXdnk+&Yz8Hje_37)l0sA6wiAcC~x^ZZ}?MnN~kC=xWE7BVtC5iOO zxDQ*vxUK@r7YfLH`@3U1hikM<5l8Hk!XVYfLoW*&g)FTd%O=9>AxEGLR;?w2`YH)j zic(AJ?57B_ALRYX;U;3`=?K93>y&A7tt4Ot9UpH2UlF_9Xe?``1sf4@)$PIZAIXiA zH2tqtD!L#OR(60$xC=UipCPg>XhGCi!?!e%Unx}n3e&<7|d*C*;pW)Z$5-I3q>Ap`Q>ML%?<}A^| z5LlEWB0^hY{NAdDJBPI|Y{%3~jbvKC;`{doXBg?Bt*R?iNQ8@Ji`+!3rw3+Bg1 zIbgI+%}1H4yoon{t(FctaT3IY@IwWX7+}sT(l`;s1X_hA$tKIevWHvlrHu=vjZMd> z!*fHD{HiQG5-bJEz#WGoEc}lGfO)9s z-Q8zWbWib|rL57AGAnHC3j?gGSpIt^U)j6vHfK_&KxrqyoKEqM`?VB0!&50Ra(43#}`h$fB`JmpNV1`>Mf8 z8bI$U*sN=dL=P8Ft65Ewc!POm1f(OIf^L$`#P^Wz{h89HGjIzaM9F~&>HK7N$q+x{ z!Mdg0cpS&PCAS*3rw{@s*5`xUV@Zd9SYTv#o8cJVe4?q95!g`RIuXa_mgzna)qz?@ zP^TQN5)F6sTnewex^%

%!7F;;z6-ITGE|AF7;jS+cS$V81J03WiokCR5ZLOKMIV zMNkxak2%)l01->3S;Yah*nC=ogQJUs+L(AB$2;Uqb6+R;<3i;!YT}ase3FG}2Rpci zmn#f6q$|V|25;F82;n;_0tWHpx)st!PqrpBy(E~%AG|!^q;vQmq9?e??A!+=j@@|4 z8j-vrJ$MT&zV!;-h?Cro#NQT_#UKW{;(Nh$mhHgu9%da;VpZ_UU}sx^gVFj?vd~?- zrlIga@&;B@yK45c*4^WCYFk~=5C|0bch$cBet(P`{=) za(1uX-HL2P&$;aZRj#>FwqX#Ary5S3`eAORfvcLt8gqy8zSJ+f^q$+BnCThNKXw_#U7zFRIWsRd&T@mZ-~>mCbMbJI>pm~ExT-m#&|8>9TW zblQ5<{9^UK+3#nGEA)Ak{1X^|iL=2PuK-|bBB9&C*_$ceJ-zOnxLWq7QOQCoC_TzC zRs+}8ej&lTx54UC7mTog@AIL`Q_EF3_N};U5uI-3L{D){rAWH_ZB1woK0?Pc-- zBRf=JUN0w&@(zPp0wca3z4>SYbEdqpz%6UvsztT!;_H3=m&YC z;f?2(m9syJqD^MHoGdMGHM44qY$@$$(eb-VDc}c|EiDe>*y_qku;(!(GN!X@Dhc7LT9>pMNxc_jwm0`iB;T9=_swVsaeBa1Dt=eh2XV)3OmLpEco9cCod#Y$c(?YsY*Hccq1ek@0<|mQR{~6iQ#!Fsz7u@;iHAT zBUa#_C?5!HTq7UcFd^}UswyI_&V5&1_^!IwT01*PY8iyVgyT!VdK|=cNKR#_nKUS~ zX1M5LRdQ)NaIa^LGRAx)*JOA=7h8d&sd^(g?~(t+vh1XLUTn$H#;XOHlOvF3$=;C@ zu#CN7d)*8XZytBOu}{O3UJJ4|wxPA;!y^VLoN1mGafwNF*`eDQdP-I6Vc#eDq!F+1 zz9+AT!!*}wOdX~46>u4x^$sh#OkyJU~) zSlCyoeFK5&Qrh7g=37qYTpIMDaUjRAo7$8TCV!mwYH2TpZqJWGbNuKjx9ekpd|PqW zyFmdp%(vds%&$Eh(`swff`^P$cF;HGwYKN&*N8lO2}10Fg34=Lj=w`Vt3X#qj?9%8 zQr`!Lw_}NA$VdZo`LX9+gHNans)AiJq8|U^NuCGa6?`WA7NRp_O~?ltj==CGnmUaP z%G6p9P*`U)Vc7b6%3ziZdck ziS#UGOGbr?FdRp>QZM*7_bs0|=GoF+ZT5pHuBy48@u+JINw=v7MT{qC+NO31x$S#OZGEf$mzHnN2w z_H}b_LpMpO{y19$A_sFKzT&UoB~MgaS(3?Ik#Oi9wQj~Es_DYVdWhQ=j~_B)In}gQ zV$`DBFgV<%+mjzD7&yHsX7|z}m{ZC~itHix(x?@m->gq02lTN!3z4uo!}El40#? znk2`u$J%o!B;p|ks2a^qvtVg1m@nvsS^<`U#5b(c3wHg4$Ovf70+8dVYSn(hgtwwl zP7kB-?eDC#5F>Z7r;XnaA`m8`6NaEyG&zPS3+FyeysTclVB>&M{(+aYjnd9U3P%4B z7{K^B3xVl4_0YUMQ&x(4S#{MD&a*y@9=FL5k2xGi0k(?U@ct;OJOsAF-4luI!WogL zO9_>Q52xf5W{wRV3T-l)HNvb$tDg|@bV*8Qk_}ZJ8rZywef7*GJBmSod%FwLTmQUW zCUN3dFVTYxN9O@_AUoH=MH-$Z5HGS>l($#cp9ZhV0v+tk(-WUTwW>w(I3#Y`<*fY6ee^bUqtl|=fkB(I=c#R*R@_v7pt8yD5iyZVyAFjeGrQg*F z>)@rDv@IVQcQ3)9l#8>pA1vCV&h&!`mkC!m+VXc?hWEp`#b(Cy?(b_z6ea=;_;v>R z+(PFVW1%39aChogrFz408Hi=pU7Z0}+iTyxNW#$}ad3k<$T-f-pT(-&*7A;muGl%hoqwTJz z__Vb^0eY8?m#YUWdfdg^qQO&ZN+B#VUOu> zqoONbjhmOnF4V#AtikuPDa?If|W8?=N^gu5;?XmgMM7c-shl4fE=hc~1POn_x|FtC$h* z>J)1qB3}3*Y)0JVfr^GCQ6;}5lr56vIX`-(?P)?m!xNTnD`!Lz@WmSNVCkYy1nCw3?LjeA+Hpqi3C|I3`Nu~ zF^91RYi}*wHI-gZlN)R2gHD12py1F2qHlbpOiSJCI}%Cp_oCMJRuhEj6f{+UQGOSr zc_gG{6NE`JY-Fjy4-RKo?Lmi!M!7A*omzP=d4dj1^zA9W1`2@mGX_$h@(wU)KAl;_ zd+jqOd_ba|K7v@!NOeR3?t%Wr!m*i5(|z^5NBDz8NL_PQt?lQa$cD8~WgORWt33p^ z{X*8gunjwGm6N%V{Q{x|k^ESLEl>W^FNz%_)gNXaaCIp)!SxUpha^A4Fn9QJhU$*1 z*lyd@2{9DKvBb%R4U}wjw@2LYMOoqx=YElEK<0`QWa9G03{Xxey_XmH5*89BL&J|r zLxkEA{RsO_k~#yrxQ!B9L;tZ1;6rjFKXUIpUcO@sMcp{RoFS$0Kz{>l4q?u)d_~lf zYNDE#yj~}on$fPb4uRA>+d3(=&#=Yqbxfq@m>*;`6i8l!6YdrGm}GPO&glb)FS ziwApNgbFU(@SYOcabF;*U|;>=Fb4a*irgPNzg`u-vWrmkGh85^@1FY={yvc<-nT++ zzSYS+pg}+s|G$>?$e90j=&WKR_f81KJ1HrP9y$RF6cxg1eg-T!EToiqx=cuEN*yda z`M61^V}spA%kLTfUFd*bZm9kaw7_FGkx2#*O-Z8!vCT~8^769ThVwV?=jTVnE|N3R z>A;wvnkc|0?5iRrcGtvn?xyXz0G)&^hs?NLxNr~2w5+Wv4QBE3qou5EI5`|hg9V+- zxTOs8z)He}LKSqt6_oHNm4amL_iIRCW?tEohan^bQ_a1V1qT}3gQ|vV%n=984@jHr z*0~6Ac!3`~?J?J8oo@1?4d%X@wdIMo#A$87FD}PIOfaRptkrc}tyRF}=P8dffgg)j zY4G3)hl@9}pk8#Ly(gB$6E0>Y7>}!VCoFhbfmoycCa&LJ4yn zphF7#=&l%6>aL@meb}rMXGw992H4oCJw637op56JYKlE^jH1vF)s16hvLd89$#JW+ zQp(m1Z*iz~jFV)wJ|ci88X-q{7(ATVeSs9QHAq~!WXtN5VoY|Ca7@F+Rv!t0a zT5Z6WTiI91rIA$tA8$9yr1g)|QZL{ZVGS$)W)$==r$DlL3=yX4W4YQh9@|FAi|B$q zbGm7UZ8a~IQq+nI-HZP5vPB>XG>^>`TyhiRaSk}`1Cty|5u!zmOYtFN0{?TN8FEhH z0n$|btKX}f?=6NVX#i}8P~1W+hyk{mJt9oY+19gr4xh9(9BaelclNmZmCzeA^UYi{ zPxp}P^iLkRPg%m+Yh#{dmGb&l0T-W}r)uPCJ9?=$l(dOnYcjc-G6ivQ>e!_`v2`xJ zds5(^Q-V>4FzwzohvHeg?))sO3D_6E!IGB(1w#k_-&MqKW9rRNKu(2zeg9ok{Cn8% zn&My1zpTKQ`mOzY*gq+Ze@FP`{il5Smyq}m`$g>h*8L?b{;P;zvf@ANS47%d_gBQ< zM8=^Imy3@lYgiDT_N-j%9yu={J*`2KcCtEq!;>~@ApI1fAEc?{=0nt z;}Prcgue*>%;{g}Sbx~Bq!@h@egAtm_+Oi@-wA#{)%ug7z?Tr?KO*?cGp^s+f8Ry^ zgPo4#KVtuVPx(9H@3Z|s2!BxhM}+^&?fyCU%S%DNZIVGiklsE7kRTwR=zjhBe+N95 A(*OVf literal 0 HcmV?d00001 diff --git a/core/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/elasticapi/ElasticExtensions.kt b/core/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/elasticapi/ElasticExtensions.kt index 9cf3312c..ed389123 100644 --- a/core/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/elasticapi/ElasticExtensions.kt +++ b/core/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/elasticapi/ElasticExtensions.kt @@ -16,6 +16,7 @@ package com.amazon.opendistroforelasticsearch.alerting.elasticapi import com.amazon.opendistroforelasticsearch.alerting.core.model.User +import com.amazon.opendistroforelasticsearch.commons.InjectSecurity import kotlinx.coroutines.delay import org.apache.logging.log4j.Logger import org.elasticsearch.ElasticsearchException @@ -40,6 +41,7 @@ import kotlin.coroutines.resume import kotlin.coroutines.resumeWithException import kotlin.coroutines.suspendCoroutine import kotlinx.coroutines.ThreadContextElement +import org.elasticsearch.common.settings.Settings import org.elasticsearch.common.util.concurrent.ThreadContext import org.elasticsearch.common.util.concurrent.ThreadContext.StoredContext import kotlin.coroutines.CoroutineContext @@ -177,3 +179,21 @@ class ElasticThreadContextElement(private val threadContext: ThreadContext) : Th override fun updateThreadContext(context: CoroutineContext) = this.context.close() } + +class InjectorContextElement(id: String, settings: Settings, threadContext: ThreadContext, private val roles: List?) + : ThreadContextElement { + + companion object Key : CoroutineContext.Key + override val key: CoroutineContext.Key<*> + get() = Key + + var rolesInjectorHelper = InjectSecurity(id, settings, threadContext) + + override fun updateThreadContext(context: CoroutineContext) { + rolesInjectorHelper.injectRoles(roles) + } + + override fun restoreThreadContext(context: CoroutineContext, oldState: Unit) { + rolesInjectorHelper.close() + } +}