Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Inject roles for alerting background jobs (#259)
Browse files Browse the repository at this point in the history
* Inject roles for background jobs
* Removed dead test
  • Loading branch information
skkosuri-amzn authored Oct 1, 2020
1 parent 01269ec commit 9ab735c
Show file tree
Hide file tree
Showing 24 changed files with 414 additions and 151 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,12 @@ import com.amazon.opendistroforelasticsearch.alerting.transport.TransportIndexMo
import com.amazon.opendistroforelasticsearch.alerting.transport.TransportSearchEmailAccountAction
import com.amazon.opendistroforelasticsearch.alerting.transport.TransportSearchEmailGroupAction
import com.amazon.opendistroforelasticsearch.alerting.transport.TransportSearchMonitorAction
import com.amazon.opendistroforelasticsearch.commons.rest.SecureRestClientBuilder
import com.amazon.opendistroforelasticsearch.alerting.transport.TransportGetDestinationsAction
import org.elasticsearch.action.ActionRequest
import org.elasticsearch.action.ActionResponse
import org.elasticsearch.client.Client
import org.elasticsearch.client.RestClient
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver
import org.elasticsearch.cluster.node.DiscoveryNodes
import org.elasticsearch.cluster.service.ClusterService
Expand Down Expand Up @@ -143,6 +145,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
lateinit var threadPool: ThreadPool
lateinit var alertIndices: AlertIndices
lateinit var clusterService: ClusterService
lateinit var restClient: RestClient

override fun getRestHandlers(
settings: Settings,
Expand All @@ -155,12 +158,12 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
): List<RestHandler> {
return listOf(RestGetMonitorAction(),
RestDeleteMonitorAction(),
RestIndexMonitorAction(),
RestIndexMonitorAction(settings, restClient),
RestSearchMonitorAction(),
RestExecuteMonitorAction(),
RestAcknowledgeAlertAction(),
RestScheduledJobStatsHandler("_alerting"),
RestIndexDestinationAction(settings),
RestIndexDestinationAction(settings, restClient),
RestDeleteDestinationAction(),
RestIndexEmailAccountAction(),
RestDeleteEmailAccountAction(),
Expand Down Expand Up @@ -225,6 +228,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
sweeper = JobSweeper(environment.settings(), client, clusterService, threadPool, xContentRegistry, scheduler, ALERTING_JOB_TYPES)
this.threadPool = threadPool
this.clusterService = clusterService
this.restClient = SecureRestClientBuilder(settings, environment.configFile()).build()
return listOf(sweeper, scheduler, runner, scheduledJobIndices)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import com.amazon.opendistroforelasticsearch.alerting.alerts.moveAlerts
import com.amazon.opendistroforelasticsearch.alerting.core.JobRunner
import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob
import com.amazon.opendistroforelasticsearch.alerting.core.model.SearchInput
import com.amazon.opendistroforelasticsearch.alerting.elasticapi.InjectorContextElement
import com.amazon.opendistroforelasticsearch.alerting.elasticapi.convertToMap
import com.amazon.opendistroforelasticsearch.alerting.elasticapi.firstFailureOrNull
import com.amazon.opendistroforelasticsearch.alerting.elasticapi.retry
Expand Down Expand Up @@ -58,6 +59,7 @@ import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import org.elasticsearch.ExceptionsHelper
import org.elasticsearch.action.DocWriteRequest
Expand Down Expand Up @@ -94,7 +96,7 @@ import java.time.Instant
import kotlin.coroutines.CoroutineContext

class MonitorRunner(
settings: Settings,
private val settings: Settings,
private val client: Client,
private val threadPool: ThreadPool,
private val scriptService: ScriptService,
Expand Down Expand Up @@ -185,6 +187,22 @@ class MonitorRunner(
}

suspend fun runMonitor(monitor: Monitor, periodStart: Instant, periodEnd: Instant, dryrun: Boolean = false): MonitorRunResult {
/*
* We need to handle 3 cases:
* 1. Monitors created by older versions and never updated. These monitors wont have User details in the
* monitor object. `monitor.user` will be null. Insert `all_access, AmazonES_all_access` role.
* 2. Monitors are created when security plugin is disabled, these will have empty User object.
* (`monitor.user.name`, `monitor.user.roles` are empty )
* 3. Monitors are created when security plugin is enabled, these will have an User object.
*/
var roles = if (monitor.user == null) {
// fixme: discuss and remove hardcoded to settings?
settings.getAsList("", listOf("all_access,AmazonES_all_access"))
} else {
monitor.user.roles
}
logger.debug("Running monitor: ${monitor.name} with roles: $roles Thread: ${Thread.currentThread().name}")

if (periodStart == periodEnd) {
logger.warn("Start and end time are the same: $periodStart. This monitor will probably only run once.")
}
Expand All @@ -200,9 +218,9 @@ class MonitorRunner(
logger.error("Error loading alerts for monitor: $id", e)
return monitorResult.copy(error = e)
}

monitorResult = monitorResult.copy(inputResults = collectInputResults(monitor, periodStart, periodEnd))

runBlocking(InjectorContextElement(monitor.id, settings, threadPool.threadContext, roles)) {
monitorResult = monitorResult.copy(inputResults = collectInputResults(monitor, periodStart, periodEnd))
}
val updatedAlerts = mutableListOf<Alert>()
val triggerResults = mutableMapOf<String, TriggerRunResult>()
for (trigger in monitor.triggers) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ class RestAcknowledgeAlertAction : BaseRestHandler() {

@Throws(IOException::class)
override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer {
log.debug("${request.method()} ${AlertingPlugin.MONITOR_BASE_URI}/{monitorID}/_acknowledge/alerts")

val monitorId = request.param("monitorID")
require(!monitorId.isNullOrEmpty()) { "Missing monitor id." }
val alertIds = getAlertIds(request.contentParser())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ class RestDeleteDestinationAction : BaseRestHandler() {

@Throws(IOException::class)
override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer {
log.debug("${request.method()} ${AlertingPlugin.DESTINATION_BASE_URI}/{destinationID}")

val destinationId = request.param("destinationID")
log.debug("${request.method()} ${AlertingPlugin.MONITOR_BASE_URI}/$destinationId")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ class RestDeleteMonitorAction : BaseRestHandler() {

@Throws(IOException::class)
override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer {
log.debug("${request.method()} ${AlertingPlugin.MONITOR_BASE_URI}/{monitorID}")

val monitorId = request.param("monitorID")
log.debug("${request.method()} ${AlertingPlugin.MONITOR_BASE_URI}/$monitorId")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import com.amazon.opendistroforelasticsearch.alerting.AlertingPlugin
import com.amazon.opendistroforelasticsearch.alerting.action.ExecuteMonitorAction
import com.amazon.opendistroforelasticsearch.alerting.action.ExecuteMonitorRequest
import com.amazon.opendistroforelasticsearch.alerting.model.Monitor
import org.apache.logging.log4j.LogManager
import org.elasticsearch.client.node.NodeClient
import org.elasticsearch.common.unit.TimeValue
import org.elasticsearch.common.xcontent.XContentParser.Token.START_OBJECT
Expand All @@ -31,6 +32,8 @@ import org.elasticsearch.rest.RestRequest.Method.POST
import org.elasticsearch.rest.action.RestToXContentListener
import java.time.Instant

private val log = LogManager.getLogger(RestExecuteMonitorAction::class.java)

class RestExecuteMonitorAction : BaseRestHandler() {

override fun getName(): String = "execute_monitor_action"
Expand All @@ -43,6 +46,8 @@ class RestExecuteMonitorAction : BaseRestHandler() {
}

override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer {
log.debug("${request.method()} ${AlertingPlugin.MONITOR_BASE_URI}/_execute")

return RestChannelConsumer { channel ->
val dryrun = request.paramAsBoolean("dryrun", false)
val requestEnd = request.paramAsTime("period_end", TimeValue(Instant.now().toEpochMilli()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import com.amazon.opendistroforelasticsearch.alerting.AlertingPlugin
import com.amazon.opendistroforelasticsearch.alerting.action.GetMonitorAction
import com.amazon.opendistroforelasticsearch.alerting.action.GetMonitorRequest
import com.amazon.opendistroforelasticsearch.alerting.util.context
import org.apache.logging.log4j.LogManager
import org.elasticsearch.client.node.NodeClient
import org.elasticsearch.rest.BaseRestHandler
import org.elasticsearch.rest.BaseRestHandler.RestChannelConsumer
Expand All @@ -29,6 +30,8 @@ import org.elasticsearch.rest.action.RestActions
import org.elasticsearch.rest.action.RestToXContentListener
import org.elasticsearch.search.fetch.subphase.FetchSourceContext

private val log = LogManager.getLogger(RestGetMonitorAction::class.java)

/**
* This class consists of the REST handler to retrieve a monitor .
*/
Expand All @@ -47,6 +50,8 @@ class RestGetMonitorAction : BaseRestHandler() {
}

override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer {
log.debug("${request.method()} ${AlertingPlugin.MONITOR_BASE_URI}/{monitorID}")

val monitorId = request.param("monitorID")
if (monitorId == null || monitorId.isEmpty()) {
throw IllegalArgumentException("missing id")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,17 @@ import com.amazon.opendistroforelasticsearch.alerting.AlertingPlugin
import com.amazon.opendistroforelasticsearch.alerting.action.IndexDestinationAction
import com.amazon.opendistroforelasticsearch.alerting.action.IndexDestinationRequest
import com.amazon.opendistroforelasticsearch.alerting.action.IndexDestinationResponse
import com.amazon.opendistroforelasticsearch.alerting.core.model.User
import com.amazon.opendistroforelasticsearch.alerting.model.destination.Destination
import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.INDEX_TIMEOUT
import com.amazon.opendistroforelasticsearch.alerting.util.IF_PRIMARY_TERM
import com.amazon.opendistroforelasticsearch.alerting.util.IF_SEQ_NO
import com.amazon.opendistroforelasticsearch.alerting.util.REFRESH
import com.amazon.opendistroforelasticsearch.commons.ConfigConstants
import com.amazon.opendistroforelasticsearch.commons.authuser.AuthUser
import org.apache.logging.log4j.LogManager
import org.elasticsearch.action.support.WriteRequest
import org.elasticsearch.client.RestClient
import org.elasticsearch.client.node.NodeClient
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.common.xcontent.ToXContent
Expand All @@ -49,9 +53,18 @@ private val log = LogManager.getLogger(RestIndexDestinationAction::class.java)
* Rest handlers to create and update Destination
*/
class RestIndexDestinationAction(
settings: Settings
settings: Settings,
restClient: RestClient
) : BaseRestHandler() {

@Volatile private var indexTimeout = INDEX_TIMEOUT.get(settings)
private val restClient: RestClient
private val settings: Settings

init {
this.restClient = restClient
this.settings = settings
}

override fun getName(): String {
return "index_destination_action"
Expand All @@ -66,15 +79,21 @@ class RestIndexDestinationAction(

@Throws(IOException::class)
override fun prepareRequest(request: RestRequest, client: NodeClient): BaseRestHandler.RestChannelConsumer {
log.debug("${request.method()} ${AlertingPlugin.DESTINATION_BASE_URI}")

val id = request.param("destinationID", Destination.NO_ID)
if (request.method() == RestRequest.Method.PUT && Destination.NO_ID == id) {
throw IllegalArgumentException("Missing destination ID")
}

// Get roles of the user executing this rest action
val user = AuthUser(settings, restClient, request.headers[ConfigConstants.AUTHORIZATION]).get()

// Validate request by parsing JSON to Destination
val xcp = request.contentParser()
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp::getTokenLocation)
val destination = Destination.parse(xcp, id)
.copy(user = User(user.userName, user.backendRoles, user.roles, user.customAttNames))
val seqNo = request.paramAsLong(IF_SEQ_NO, SequenceNumbers.UNASSIGNED_SEQ_NO)
val primaryTerm = request.paramAsLong(IF_PRIMARY_TERM, SequenceNumbers.UNASSIGNED_PRIMARY_TERM)
val refreshPolicy = if (request.hasParam(REFRESH)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,17 @@ import com.amazon.opendistroforelasticsearch.alerting.action.IndexMonitorAction
import com.amazon.opendistroforelasticsearch.alerting.action.IndexMonitorRequest
import com.amazon.opendistroforelasticsearch.alerting.action.IndexMonitorResponse
import com.amazon.opendistroforelasticsearch.alerting.model.Monitor
import com.amazon.opendistroforelasticsearch.alerting.core.model.User
import com.amazon.opendistroforelasticsearch.alerting.util.IF_PRIMARY_TERM
import com.amazon.opendistroforelasticsearch.alerting.util.IF_SEQ_NO
import com.amazon.opendistroforelasticsearch.alerting.util.REFRESH
import com.amazon.opendistroforelasticsearch.commons.ConfigConstants
import com.amazon.opendistroforelasticsearch.commons.authuser.AuthUser
import org.apache.logging.log4j.LogManager
import org.elasticsearch.action.support.WriteRequest
import org.elasticsearch.client.RestClient
import org.elasticsearch.client.node.NodeClient
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.common.xcontent.ToXContent
import org.elasticsearch.common.xcontent.XContentParser.Token
import org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken
Expand All @@ -48,7 +53,18 @@ private val log = LogManager.getLogger(RestIndexMonitorAction::class.java)
/**
* Rest handlers to create and update monitors.
*/
class RestIndexMonitorAction : BaseRestHandler() {
class RestIndexMonitorAction(
settings: Settings,
restClient: RestClient
) : BaseRestHandler() {

private val restClient: RestClient
private val settings: Settings

init {
this.restClient = restClient
this.settings = settings
}

override fun getName(): String {
return "index_monitor_action"
Expand All @@ -63,15 +79,22 @@ class RestIndexMonitorAction : BaseRestHandler() {

@Throws(IOException::class)
override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer {
log.debug("${request.method()} ${AlertingPlugin.MONITOR_BASE_URI}")

val id = request.param("monitorID", Monitor.NO_ID)
if (request.method() == PUT && Monitor.NO_ID == id) {
throw IllegalArgumentException("Missing monitor ID")
}

// Get roles of the user executing this rest action
val user = AuthUser(settings, restClient, request.headers[ConfigConstants.AUTHORIZATION]).get()

// Validate request by parsing JSON to Monitor
val xcp = request.contentParser()
ensureExpectedToken(Token.START_OBJECT, xcp.nextToken(), xcp::getTokenLocation)
val monitor = Monitor.parse(xcp, id).copy(lastUpdateTime = Instant.now())
.copy(user = User(user.userName, user.backendRoles, user.roles, user.customAttNames))

val seqNo = request.paramAsLong(IF_SEQ_NO, SequenceNumbers.UNASSIGNED_SEQ_NO)
val primaryTerm = request.paramAsLong(IF_PRIMARY_TERM, SequenceNumbers.UNASSIGNED_PRIMARY_TERM)
val refreshPolicy = if (request.hasParam(REFRESH)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob
import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX
import com.amazon.opendistroforelasticsearch.alerting.model.Monitor
import com.amazon.opendistroforelasticsearch.alerting.util.context
import org.apache.logging.log4j.LogManager
import org.elasticsearch.action.search.SearchRequest
import org.elasticsearch.action.search.SearchResponse
import org.elasticsearch.client.node.NodeClient
Expand All @@ -43,6 +44,8 @@ import org.elasticsearch.rest.action.RestResponseListener
import org.elasticsearch.search.builder.SearchSourceBuilder
import java.io.IOException

private val log = LogManager.getLogger(RestSearchMonitorAction::class.java)

/**
* Rest handlers to search for monitors.
*/
Expand All @@ -62,6 +65,10 @@ class RestSearchMonitorAction : BaseRestHandler() {

@Throws(IOException::class)
override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer {
log.debug("${request.method()} ${AlertingPlugin.MONITOR_BASE_URI}/_search")

val index = request.param("index", SCHEDULED_JOBS_INDEX)

val searchSourceBuilder = SearchSourceBuilder()
searchSourceBuilder.parseXContent(request.contentOrSourceParamParser())
searchSourceBuilder.fetchSource(context(request))
Expand All @@ -73,7 +80,7 @@ class RestSearchMonitorAction : BaseRestHandler() {
.version(true)
val searchRequest = SearchRequest()
.source(searchSourceBuilder)
.indices(SCHEDULED_JOBS_INDEX)
.indices(index)
return RestChannelConsumer { channel ->
client.execute(SearchMonitorAction.INSTANCE, searchRequest, searchMonitorResponse(channel))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import com.amazon.opendistroforelasticsearch.alerting.action.AcknowledgeAlertRes
import com.amazon.opendistroforelasticsearch.alerting.alerts.AlertIndices
import com.amazon.opendistroforelasticsearch.alerting.elasticapi.optionalTimeField
import com.amazon.opendistroforelasticsearch.alerting.model.Alert
import com.amazon.opendistroforelasticsearch.alerting.util.AlertingException
import org.apache.logging.log4j.LogManager
import org.elasticsearch.action.ActionListener
import org.elasticsearch.action.bulk.BulkRequest
Expand Down Expand Up @@ -42,7 +43,9 @@ class TransportAcknowledgeAlertAction @Inject constructor(
) {

override fun doExecute(task: Task, request: AcknowledgeAlertRequest, actionListener: ActionListener<AcknowledgeAlertResponse>) {
AcknowledgeHandler(client, actionListener, request).start()
client.threadPool().threadContext.stashContext().use {
AcknowledgeHandler(client, actionListener, request).start()
}
}

inner class AcknowledgeHandler(
Expand All @@ -69,7 +72,7 @@ class TransportAcknowledgeAlertAction @Inject constructor(
}

override fun onFailure(t: Exception) {
actionListener.onFailure(t)
actionListener.onFailure(AlertingException.wrap(t))
}
})
}
Expand Down Expand Up @@ -103,7 +106,7 @@ class TransportAcknowledgeAlertAction @Inject constructor(
}

override fun onFailure(t: Exception) {
actionListener.onFailure(t)
actionListener.onFailure(AlertingException.wrap(t))
}
})
}
Expand Down
Loading

0 comments on commit 9ab735c

Please sign in to comment.