This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Remove custom monitor executor in favor of default dispatcher.
Vinoo Vasudevan authored and committed on May 15, 2019
1 parent 2862074 · commit 3eea987
Showing 2 changed files with 30 additions and 37 deletions.
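
Note on the change: instead of running monitor work on a plugin-registered thread pool, MonitorRunner now launches coroutines on the coroutine default dispatcher, scoped to a SupervisorJob that is created in doStart() and cancelled in doStop(), so work no longer outlives the component the way GlobalScope launches did. A minimal sketch of that lifecycle-scoped pattern, with a stand-in for Elasticsearch's AbstractLifecycleComponent (names other than the kotlinx.coroutines ones are illustrative, not the plugin's actual API):

    import kotlin.coroutines.CoroutineContext
    import kotlinx.coroutines.CoroutineScope
    import kotlinx.coroutines.Dispatchers
    import kotlinx.coroutines.Job
    import kotlinx.coroutines.SupervisorJob
    import kotlinx.coroutines.launch

    // Hypothetical stand-in for AbstractLifecycleComponent's start/stop hooks.
    abstract class LifecycleStub {
        abstract fun doStart()
        abstract fun doStop()
    }

    class RunnerSketch : LifecycleStub(), CoroutineScope {

        // Created on start, cancelled on stop: every coroutine launched in
        // this scope dies with the component instead of running unsupervised.
        private lateinit var supervisor: Job

        // SupervisorJob so one failed monitor run does not cancel its siblings.
        override val coroutineContext: CoroutineContext
            get() = Dispatchers.Default + supervisor

        override fun doStart() {
            supervisor = SupervisorJob()
        }

        override fun doStop() {
            supervisor.cancel()
        }

        fun postIndex() {
            // Resolves against this CoroutineScope, replacing GlobalScope.launch.
            launch { /* move alerts, run monitors, etc. */ }
        }
    }
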
AlertingPlugin.kt
@@ -175,8 +175,4 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, P
     override fun getContexts(): List<ScriptContext<*>> {
         return listOf(TriggerScript.CONTEXT)
     }
-
-    // override fun getExecutorBuilders(settings: Settings): List<ExecutorBuilder<*>> {
-    //     return listOf(MonitorRunner.executorBuilder(settings))
-    // }
 }
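
The deleted comment block above referenced ActionPlugin's getExecutorBuilders hook, which is how a plugin registers its own Elasticsearch thread pool. For contrast, a rough sketch of what that abandoned path amounts to; the pool name, sizes, and the executorBuilder helper are hypothetical, reconstructed from the removed reference:

    import org.elasticsearch.common.settings.Settings
    import org.elasticsearch.threadpool.ExecutorBuilder
    import org.elasticsearch.threadpool.FixedExecutorBuilder

    object MonitorExecutorSketch {
        const val POOL = "opendistro_monitor_runner" // hypothetical pool name

        // A fixed-size pool with a bounded queue, registered with the node's
        // ThreadPool. This is the machinery the commit removes in favor of
        // the coroutine default dispatcher.
        fun executorBuilder(settings: Settings): ExecutorBuilder<*> =
            FixedExecutorBuilder(settings, POOL, 4, 1000, "thread_pool.$POOL")
    }

Dropping this means monitor coroutines share Dispatchers.Default, which is sized to the CPU count, rather than a separately tuned and configured pool.
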
MonitorRunner.kt
@@ -27,6 +27,7 @@ import com.amazon.opendistroforelasticsearch.alerting.elasticapi.convertToMap
 import com.amazon.opendistroforelasticsearch.alerting.elasticapi.firstFailureOrNull
 import com.amazon.opendistroforelasticsearch.alerting.elasticapi.retry
 import com.amazon.opendistroforelasticsearch.alerting.elasticapi.suspendUntil
+import com.amazon.opendistroforelasticsearch.alerting.model.ActionExecutionResult
 import com.amazon.opendistroforelasticsearch.alerting.model.ActionRunResult
 import com.amazon.opendistroforelasticsearch.alerting.model.Alert
 import com.amazon.opendistroforelasticsearch.alerting.model.Alert.State.ACKNOWLEDGED
@@ -48,13 +49,12 @@ import com.amazon.opendistroforelasticsearch.alerting.script.TriggerExecutionCon
 import com.amazon.opendistroforelasticsearch.alerting.script.TriggerScript
 import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_COUNT
 import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_MILLIS
-import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.BULK_TIMEOUT
 import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.INPUT_TIMEOUT
 import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.MOVE_ALERTS_BACKOFF_COUNT
 import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.MOVE_ALERTS_BACKOFF_MILLIS
 import org.apache.logging.log4j.LogManager
 import kotlinx.coroutines.CoroutineScope
 import kotlinx.coroutines.Dispatchers
-import kotlinx.coroutines.GlobalScope
+import kotlinx.coroutines.Job
 import kotlinx.coroutines.SupervisorJob
 import kotlinx.coroutines.launch
 import kotlinx.coroutines.withContext
@@ -73,6 +73,7 @@ import org.elasticsearch.client.Client
 import org.elasticsearch.cluster.service.ClusterService
 import org.elasticsearch.common.Strings
 import org.elasticsearch.common.bytes.BytesReference
+import org.elasticsearch.common.component.AbstractLifecycleComponent
 import org.elasticsearch.common.settings.Settings
 import org.elasticsearch.common.xcontent.LoggingDeprecationHandler
 import org.elasticsearch.common.xcontent.NamedXContentRegistry
@@ -91,6 +92,7 @@ import org.elasticsearch.script.TemplateScript
 import org.elasticsearch.search.builder.SearchSourceBuilder
 import org.elasticsearch.threadpool.ThreadPool
 import java.time.Instant
+import kotlin.coroutines.CoroutineContext
 
 class MonitorRunner(
     settings: Settings,
@@ -100,46 +102,44 @@ class MonitorRunner(
     private val xContentRegistry: NamedXContentRegistry,
     private val alertIndices: AlertIndices,
     clusterService: ClusterService
-) : JobRunner, CoroutineScope {
+) : JobRunner, CoroutineScope, AbstractLifecycleComponent() {
 
     private val logger = LogManager.getLogger(MonitorRunner::class.java)
 
-    private val job = SupervisorJob()
-    override val coroutineContext = Dispatchers.Default + job
+    private lateinit var runnerSupervisor: Job
+    override val coroutineContext: CoroutineContext
+        get() = Dispatchers.Default + runnerSupervisor
 
     @Volatile private var searchTimeout = INPUT_TIMEOUT.get(settings)
-    @Volatile private var bulkTimeout = BULK_TIMEOUT.get(settings)
-    @Volatile private var alertBackoffMillis = ALERT_BACKOFF_MILLIS.get(settings)
-    @Volatile private var alertBackoffCount = ALERT_BACKOFF_COUNT.get(settings)
-    @Volatile private var moveAlertsBackoffMillis = MOVE_ALERTS_BACKOFF_MILLIS.get(settings)
-    @Volatile private var moveAlertsBackoffCount = MOVE_ALERTS_BACKOFF_COUNT.get(settings)
-    @Volatile private var retryPolicy = BackoffPolicy.constantBackoff(alertBackoffMillis, alertBackoffCount)
-    @Volatile private var moveAlertsRetryPolicy = BackoffPolicy.exponentialBackoff(moveAlertsBackoffMillis, moveAlertsBackoffCount)
+    @Volatile private var retryPolicy =
+        BackoffPolicy.constantBackoff(ALERT_BACKOFF_MILLIS.get(settings), ALERT_BACKOFF_COUNT.get(settings))
+    @Volatile private var moveAlertsRetryPolicy =
+        BackoffPolicy.exponentialBackoff(MOVE_ALERTS_BACKOFF_MILLIS.get(settings), MOVE_ALERTS_BACKOFF_COUNT.get(settings))
 
     init {
         clusterService.clusterSettings.addSettingsUpdateConsumer(INPUT_TIMEOUT) { searchTimeout = it }
-        clusterService.clusterSettings.addSettingsUpdateConsumer(BULK_TIMEOUT) { bulkTimeout = it }
-        clusterService.clusterSettings.addSettingsUpdateConsumer(ALERT_BACKOFF_MILLIS) {
-            retryPolicy = BackoffPolicy.constantBackoff(it, alertBackoffCount)
+        clusterService.clusterSettings.addSettingsUpdateConsumer(ALERT_BACKOFF_MILLIS, ALERT_BACKOFF_COUNT) {
+            millis, count -> retryPolicy = BackoffPolicy.constantBackoff(millis, count)
         }
-        clusterService.clusterSettings.addSettingsUpdateConsumer(ALERT_BACKOFF_COUNT) {
-            retryPolicy = BackoffPolicy.constantBackoff(alertBackoffMillis, it)
-        }
-        clusterService.clusterSettings.addSettingsUpdateConsumer(MOVE_ALERTS_BACKOFF_MILLIS) {
-            moveAlertsRetryPolicy = BackoffPolicy.exponentialBackoff(it, moveAlertsBackoffCount)
-        }
-        clusterService.clusterSettings.addSettingsUpdateConsumer(MOVE_ALERTS_BACKOFF_COUNT) {
-            moveAlertsRetryPolicy = BackoffPolicy.exponentialBackoff(alertBackoffMillis, it)
+        clusterService.clusterSettings.addSettingsUpdateConsumer(MOVE_ALERTS_BACKOFF_MILLIS, MOVE_ALERTS_BACKOFF_COUNT) {
+            millis, count -> moveAlertsRetryPolicy = BackoffPolicy.exponentialBackoff(millis, count)
        }
     }
 
+    override fun doStart() {
+        runnerSupervisor = SupervisorJob()
+    }
+
+    override fun doStop() {
+        runnerSupervisor.cancel()
+    }
+
+    override fun doClose() { }
+
     override fun postIndex(job: ScheduledJob) {
         if (job !is Monitor) {
             throw IllegalArgumentException("Invalid job type")
         }
 
-        // Using Dispatchers.Unconfined as moveAlerts isn't CPU intensive so we can just run it on the calling thread.
-        GlobalScope.launch(Dispatchers.Unconfined) {
+        launch {
             try {
                 moveAlertsRetryPolicy.retry(logger) {
                     if (alertIndices.isInitialized()) {
@@ -153,11 +153,7 @@ }
     }
 
     override fun postDelete(jobId: String) {
-        // Using Dispatchers.Unconfined as moveAlerts isn't CPU intensive so we can just run it on the calling thread.
-        GlobalScope.launch(Dispatchers.Unconfined) {
-            // Using Unconfined dispatcher here as moveAlerts doesn't do much CPU intensive work, so we can just
-            // run it on the ES built-in thread pools.
-            launch(Dispatchers.Unconfined) {
+        launch {
             try {
                 moveAlertsRetryPolicy.retry(logger) {
                     if (alertIndices.isInitialized()) {
@@ -451,8 +447,9 @@ }
         }
 
         val jobSource = getResponse.sourceAsBytesRef
-        val xcp = XContentHelper.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, jobSource, XContentType.JSON)
+        return withContext(Dispatchers.IO) {
+            val xcp = XContentHelper.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE,
+                jobSource, XContentType.JSON)
             ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp::getTokenLocation)
             ensureExpectedToken(XContentParser.Token.FIELD_NAME, xcp.nextToken(), xcp::getTokenLocation)
             ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp::getTokenLocation)
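
Two smaller things ride along in this diff. First, the paired-setting overload of addSettingsUpdateConsumer replaces four single-setting consumers in the init block. Besides being shorter, it closes a bug visible in the removed lines: the old MOVE_ALERTS_BACKOFF_COUNT consumer rebuilt its policy from alertBackoffMillis rather than moveAlertsBackoffMillis. With the two-setting form both values arrive together, so that class of mistake cannot happen. A sketch of the pattern, assuming the plugin's AlertingSettings Setting objects are in scope; the function and initial values are illustrative:

    import org.elasticsearch.action.bulk.BackoffPolicy
    import org.elasticsearch.common.settings.ClusterSettings
    import org.elasticsearch.common.unit.TimeValue

    // One consumer receives the (millis, count) pair atomically, so the
    // rebuilt policy can never mix a fresh count with a stale millis field.
    fun registerBackoffConsumer(clusterSettings: ClusterSettings) {
        var moveAlertsRetryPolicy: BackoffPolicy =
            BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(250), 3)
        clusterSettings.addSettingsUpdateConsumer(
            MOVE_ALERTS_BACKOFF_MILLIS, MOVE_ALERTS_BACKOFF_COUNT
        ) { millis, count ->
            moveAlertsRetryPolicy = BackoffPolicy.exponentialBackoff(millis, count)
        }
    }

Second, monitor retrieval now wraps its blocking XContent parsing in withContext(Dispatchers.IO), keeping that work off the calling thread now that jobs run on the default dispatcher.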