From edd67665873cea856b53b261fe881714d4fc31ff Mon Sep 17 00:00:00 2001 From: Lucas Winkelmann <46609415+lucaswin-amzn@users.noreply.github.com> Date: Mon, 18 Nov 2019 16:09:43 -0800 Subject: [PATCH] Opendistro 1.3 * Revert "Adding new type of input for Monitors - HttpInput (#82)" This reverts commit c2004f7c53cbfbebc72f79aa87b99dba3547b93b. --- README.md | 8 - alerting/build.gradle | 6 +- .../alerting/AlertingPlugin.kt | 3 +- .../alerting/MonitorRunner.kt | 36 +---- .../alerting/client/HttpInputClient.kt | 57 ------- .../resthandler/RestIndexMonitorAction.kt | 32 +--- .../alerting/MonitorRunnerIT.kt | 114 +------------ .../alerting/TestHelpers.kt | 22 --- core/build.gradle | 4 +- .../alerting/core/httpapi/HttpExtensions.kt | 71 -------- .../alerting/core/model/HttpInput.kt | 153 ------------------ .../alerting/core/model/HttpInputTest.kt | 108 ------------- 12 files changed, 13 insertions(+), 601 deletions(-) delete mode 100644 alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/client/HttpInputClient.kt delete mode 100644 core/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/core/httpapi/HttpExtensions.kt delete mode 100644 core/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/core/model/HttpInput.kt delete mode 100644 core/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/core/model/HttpInputTest.kt diff --git a/README.md b/README.md index 7bd1aa3f..71fafa0d 100644 --- a/README.md +++ b/README.md @@ -27,14 +27,6 @@ Please see our [documentation](https://opendistro.github.io/for-elasticsearch-do 1. Check out this package from version control. 1. Launch Intellij IDEA, choose **Import Project**, and select the `settings.gradle` file in the root of this package. 1. To build from the command line, set `JAVA_HOME` to point to a JDK >= 12 before running `./gradlew`. - - Unix System - 1. `export JAVA_HOME=jdk-install-dir`: Replace `jdk-install-dir` by the JAVA_HOME directory of your system. - 1. `export PATH=$JAVA_HOME/bin:$PATH` - - - Windows System - 1. Find **My Computers** from file directory, right click and select **properties**. - 1. Select the **Advanced** tab, select **Environment variables**. - 1. Edit **JAVA_HOME** to path of where JDK software is installed. ## Build diff --git a/alerting/build.gradle b/alerting/build.gradle index 4c764ee0..1c71420d 100644 --- a/alerting/build.gradle +++ b/alerting/build.gradle @@ -49,10 +49,7 @@ configurations.all { force "commons-logging:commons-logging:${versions.commonslogging}" force "org.apache.httpcomponents:httpcore:${versions.httpcore}" force "commons-codec:commons-codec:${versions.commonscodec}" - force "commons-collections:commons-collections:3.2.2" - force "org.apache.httpcomponents:httpcore-nio:4.4.11" - force "org.apache.httpcomponents:httpclient:4.5.7" - + // This is required because kotlin coroutines core-1.1.1 still requires kotin stdlib 1.3.20 and we're using 1.3.21 force "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}" force "org.jetbrains.kotlin:kotlin-stdlib-common:${kotlin_version}" @@ -61,7 +58,6 @@ configurations.all { dependencies { compileOnly "org.elasticsearch.plugin:elasticsearch-scripting-painless-spi:${versions.elasticsearch}" - compile "org.apache.httpcomponents:httpasyncclient:4.1.4" // Elasticsearch Nanny state compile "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}" diff --git a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/AlertingPlugin.kt index 2d08994c..e1e0af02 100644 --- a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/AlertingPlugin.kt +++ b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/AlertingPlugin.kt @@ -19,7 +19,6 @@ import com.amazon.opendistroforelasticsearch.alerting.core.JobSweeper import com.amazon.opendistroforelasticsearch.alerting.core.ScheduledJobIndices import com.amazon.opendistroforelasticsearch.alerting.core.action.node.ScheduledJobsStatsAction import com.amazon.opendistroforelasticsearch.alerting.core.action.node.ScheduledJobsStatsTransportAction -import com.amazon.opendistroforelasticsearch.alerting.core.model.HttpInput import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob import com.amazon.opendistroforelasticsearch.alerting.core.model.SearchInput import com.amazon.opendistroforelasticsearch.alerting.core.resthandler.RestScheduledJobStatsHandler @@ -119,7 +118,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, P } override fun getNamedXContent(): List { - return listOf(Monitor.XCONTENT_REGISTRY, SearchInput.XCONTENT_REGISTRY, HttpInput.XCONTENT_REGISTRY) + return listOf(Monitor.XCONTENT_REGISTRY, SearchInput.XCONTENT_REGISTRY) } override fun createComponents( diff --git a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/MonitorRunner.kt b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/MonitorRunner.kt index 32f61725..01155e83 100644 --- a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/MonitorRunner.kt +++ b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/MonitorRunner.kt @@ -18,12 +18,7 @@ package com.amazon.opendistroforelasticsearch.alerting import com.amazon.opendistroforelasticsearch.alerting.alerts.AlertError import com.amazon.opendistroforelasticsearch.alerting.alerts.AlertIndices import com.amazon.opendistroforelasticsearch.alerting.alerts.moveAlerts -import com.amazon.opendistroforelasticsearch.alerting.client.HttpInputClient import com.amazon.opendistroforelasticsearch.alerting.core.JobRunner -import com.amazon.opendistroforelasticsearch.alerting.core.httpapi.suspendUntil -import com.amazon.opendistroforelasticsearch.alerting.core.httpapi.toGetRequest -import com.amazon.opendistroforelasticsearch.alerting.core.httpapi.toMap -import com.amazon.opendistroforelasticsearch.alerting.core.model.HttpInput import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX import com.amazon.opendistroforelasticsearch.alerting.core.model.SearchInput @@ -56,14 +51,13 @@ import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings. import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.MOVE_ALERTS_BACKOFF_COUNT import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.MOVE_ALERTS_BACKOFF_MILLIS import com.amazon.opendistroforelasticsearch.alerting.util.IndexUtils +import org.apache.logging.log4j.LogManager import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Job import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.launch import kotlinx.coroutines.withContext -import org.apache.http.HttpResponse -import org.apache.logging.log4j.LogManager import org.elasticsearch.ExceptionsHelper import org.elasticsearch.action.DocWriteRequest import org.elasticsearch.action.bulk.BackoffPolicy @@ -111,7 +105,7 @@ class MonitorRunner( ) : JobRunner, CoroutineScope, AbstractLifecycleComponent() { private val logger = LogManager.getLogger(MonitorRunner::class.java) - private var httpClient: HttpInputClient + private lateinit var runnerSupervisor: Job override val coroutineContext: CoroutineContext get() = Dispatchers.Default + runnerSupervisor @@ -128,17 +122,14 @@ class MonitorRunner( clusterService.clusterSettings.addSettingsUpdateConsumer(MOVE_ALERTS_BACKOFF_MILLIS, MOVE_ALERTS_BACKOFF_COUNT) { millis, count -> moveAlertsRetryPolicy = BackoffPolicy.exponentialBackoff(millis, count) } - httpClient = HttpInputClient() } override fun doStart() { runnerSupervisor = SupervisorJob() - httpClient.client.start() } override fun doStop() { runnerSupervisor.cancel() - httpClient.client.close() } override fun doClose() { } @@ -301,29 +292,6 @@ class MonitorRunner( val searchResponse: SearchResponse = client.suspendUntil { client.search(searchRequest, it) } results += searchResponse.convertToMap() } - is HttpInput -> { - val response: HttpResponse = httpClient.client.suspendUntil { - httpClient.client.execute(input.toGetRequest(), it) - } - // Make sure response content length is not larger than 100MB - val contentLengthHeader = response.getFirstHeader("Content-Length").value - - // Use content-length header to check size. If content-length header does not exist, set Alert in Error state. - if (contentLengthHeader != null) { - logger.debug("Content length is $contentLengthHeader") - val contentLength = contentLengthHeader.toInt() - if (contentLength > httpClient.MAX_CONTENT_LENGTH) { - throw Exception("Response content size: $contentLength, is larger than ${httpClient.MAX_CONTENT_LENGTH}.") - } - } else { - logger.debug("Content-length header does not exist, set alert to error state.") - throw IllegalArgumentException("Response does not contain content-length header.") - } - - results += withContext(Dispatchers.IO) { - response.toMap() - } - } else -> { throw IllegalArgumentException("Unsupported input type: ${input.name()}.") } diff --git a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/client/HttpInputClient.kt b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/client/HttpInputClient.kt deleted file mode 100644 index 44c943a8..00000000 --- a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/client/HttpInputClient.kt +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - -package com.amazon.opendistroforelasticsearch.alerting.client - -import com.amazon.opendistroforelasticsearch.alerting.core.model.HttpInput -import org.apache.http.client.config.RequestConfig -import org.apache.http.impl.nio.client.CloseableHttpAsyncClient -import org.apache.http.impl.nio.client.HttpAsyncClientBuilder -import org.elasticsearch.common.unit.ByteSizeUnit -import org.elasticsearch.common.unit.TimeValue -import java.security.AccessController -import java.security.PrivilegedAction - -/** - * This class takes [HttpInput] and performs GET requests to given URIs. - */ -class HttpInputClient { - - // TODO: If possible, these settings should be implemented as changeable via the "_cluster/settings" API. - private val CONNECTION_TIMEOUT_MILLISECONDS = TimeValue.timeValueSeconds(5).millis().toInt() - private val REQUEST_TIMEOUT_MILLISECONDS = TimeValue.timeValueSeconds(10).millis().toInt() - private val SOCKET_TIMEOUT_MILLISECONDS = TimeValue.timeValueSeconds(10).millis().toInt() - val MAX_CONTENT_LENGTH = ByteSizeUnit.MB.toBytes(100) - - val client = createHttpClient() - - /** - * Create [CloseableHttpAsyncClient] as a [PrivilegedAction] in order to avoid [java.net.NetPermission] error. - */ - private fun createHttpClient(): CloseableHttpAsyncClient { - val config = RequestConfig.custom() - .setConnectTimeout(CONNECTION_TIMEOUT_MILLISECONDS) - .setConnectionRequestTimeout(REQUEST_TIMEOUT_MILLISECONDS) - .setSocketTimeout(SOCKET_TIMEOUT_MILLISECONDS) - .build() - - return AccessController.doPrivileged(PrivilegedAction({ - HttpAsyncClientBuilder.create() - .setDefaultRequestConfig(config) - .useSystemProperties() - .build() - } as () -> CloseableHttpAsyncClient)) - } -} diff --git a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestIndexMonitorAction.kt b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestIndexMonitorAction.kt index 8f109c4d..3175a609 100644 --- a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestIndexMonitorAction.kt +++ b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestIndexMonitorAction.kt @@ -14,25 +14,23 @@ */ package com.amazon.opendistroforelasticsearch.alerting.resthandler -import com.amazon.opendistroforelasticsearch.alerting.AlertingPlugin import com.amazon.opendistroforelasticsearch.alerting.core.ScheduledJobIndices -import com.amazon.opendistroforelasticsearch.alerting.core.httpapi.toConstructedUrl -import com.amazon.opendistroforelasticsearch.alerting.core.model.HttpInput import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX import com.amazon.opendistroforelasticsearch.alerting.model.Monitor import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.ALERTING_MAX_MONITORS import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.INDEX_TIMEOUT -import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.MAX_ACTION_THROTTLE_VALUE import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.REQUEST_TIMEOUT +import com.amazon.opendistroforelasticsearch.alerting.util.REFRESH +import com.amazon.opendistroforelasticsearch.alerting.util._ID +import com.amazon.opendistroforelasticsearch.alerting.util._VERSION +import com.amazon.opendistroforelasticsearch.alerting.AlertingPlugin +import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.MAX_ACTION_THROTTLE_VALUE import com.amazon.opendistroforelasticsearch.alerting.util.IF_PRIMARY_TERM import com.amazon.opendistroforelasticsearch.alerting.util.IF_SEQ_NO import com.amazon.opendistroforelasticsearch.alerting.util.IndexUtils -import com.amazon.opendistroforelasticsearch.alerting.util.REFRESH -import com.amazon.opendistroforelasticsearch.alerting.util._ID import com.amazon.opendistroforelasticsearch.alerting.util._PRIMARY_TERM import com.amazon.opendistroforelasticsearch.alerting.util._SEQ_NO -import com.amazon.opendistroforelasticsearch.alerting.util._VERSION import org.apache.logging.log4j.LogManager import org.elasticsearch.action.ActionListener import org.elasticsearch.action.admin.indices.create.CreateIndexResponse @@ -78,7 +76,7 @@ private val log = LogManager.getLogger(RestIndexMonitorAction::class.java) * Rest handlers to create and update monitors. */ class RestIndexMonitorAction( - val settings: Settings, + settings: Settings, controller: RestController, jobIndices: ScheduledJobIndices, clusterService: ClusterService @@ -161,7 +159,6 @@ class RestIndexMonitorAction( */ private fun prepareMonitorIndexing() { validateActionThrottle(newMonitor, maxActionThrottle, TimeValue.timeValueMinutes(1)) - validateLocalPort(newMonitor, settings.get("http.port").toInt()) if (channel.request().method() == PUT) return updateMonitor() val query = QueryBuilders.boolQuery().filter(QueryBuilders.termQuery("${Monitor.MONITOR_TYPE}.type", Monitor.MONITOR_TYPE)) val searchSource = SearchSourceBuilder().query(query).timeout(requestTimeout) @@ -183,23 +180,6 @@ class RestIndexMonitorAction( } } - /** - * This function checks whether the [Monitor] has an [HttpInput] with localhost. If so, make sure the port is same as specified in settings. - */ - private fun validateLocalPort(monitor: Monitor, settingsPort: Int) { - for (input in monitor.inputs) { - if (input is HttpInput) { - val constructedUrl = input.toConstructedUrl() - // Make sure that when host is "localhost", only port number specified in settings is allowed. - if (constructedUrl.host == "localhost") { - require(constructedUrl.port == settingsPort) { - "Host: ${constructedUrl.host} is restricted to port $settingsPort." - } - } - } - } - } - /** * After searching for all existing monitors we validate the system can support another monitor to be created. */ diff --git a/alerting/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/MonitorRunnerIT.kt b/alerting/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/MonitorRunnerIT.kt index 5a6e7744..1f7ca1ce 100644 --- a/alerting/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/MonitorRunnerIT.kt +++ b/alerting/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/MonitorRunnerIT.kt @@ -18,14 +18,14 @@ package com.amazon.opendistroforelasticsearch.alerting import com.amazon.opendistroforelasticsearch.alerting.alerts.AlertError import com.amazon.opendistroforelasticsearch.alerting.alerts.AlertIndices import com.amazon.opendistroforelasticsearch.alerting.core.model.IntervalSchedule -import com.amazon.opendistroforelasticsearch.alerting.core.model.SearchInput -import com.amazon.opendistroforelasticsearch.alerting.model.ActionExecutionResult import com.amazon.opendistroforelasticsearch.alerting.model.Alert import com.amazon.opendistroforelasticsearch.alerting.model.Alert.State.ACKNOWLEDGED import com.amazon.opendistroforelasticsearch.alerting.model.Alert.State.ACTIVE import com.amazon.opendistroforelasticsearch.alerting.model.Alert.State.COMPLETED import com.amazon.opendistroforelasticsearch.alerting.model.Alert.State.ERROR import com.amazon.opendistroforelasticsearch.alerting.model.Monitor +import com.amazon.opendistroforelasticsearch.alerting.core.model.SearchInput +import com.amazon.opendistroforelasticsearch.alerting.model.ActionExecutionResult import com.amazon.opendistroforelasticsearch.alerting.model.action.Throttle import org.elasticsearch.common.settings.Settings import org.elasticsearch.index.query.QueryBuilders @@ -582,116 +582,6 @@ class MonitorRunnerIT : AlertingRestTestCase() { actionResults2[actionThrottleEnabled.id]!!.lastExecutionTime) } - fun `test monitor HttpInput with non JSON response `() { - val clusterIndex = randomInt(clusterHosts.size - 1) - val input = randomHttpInput( - scheme = clusterHosts[clusterIndex].schemeName, - host = clusterHosts[clusterIndex].hostName, - port = clusterHosts[clusterIndex].port, - path = "_cat/indices", - params = mapOf(), - url = "", - connection_timeout = 5, - socket_timeout = 5) - val monitor = createMonitor(randomMonitor(inputs = listOf(input))) - val response = executeMonitor(monitor.id) - val output = entityAsMap(response) - @Suppress("UNCHECKED_CAST") - val inputResults = output.stringMap("input_results") - @Suppress("UNCHECKED_CAST") - val errorMessage = inputResults?.get("error") - assertTrue("Error did not occur from receiving invalid format of response, error message is actually: " + - "$errorMessage\nOutput: $output", - errorMessage.toString().contains("Unrecognized token")) - } - - fun `test monitor HttpInput with JSON response`() { - val clusterIndex = randomInt(clusterHosts.size - 1) - val input = randomHttpInput( - scheme = clusterHosts[clusterIndex].schemeName, - host = clusterHosts[clusterIndex].hostName, - port = clusterHosts[clusterIndex].port, - path = "_cluster/health", - params = mapOf(), - url = "", - connection_timeout = 5, - socket_timeout = 5) - val monitor = createMonitor(randomMonitor(inputs = listOf(input))) - val response = executeMonitor(monitor.id) - - val output = entityAsMap(response) - @Suppress("UNCHECKED_CAST") - val inputResults = output.stringMap("input_results") - val resultsContent = (inputResults?.get("results") as ArrayList<*>).get(0) - val errorMessage = inputResults.get("error") - - assertEquals(monitor.name, output["monitor_name"]) - @Suppress("UNCHECKED_CAST") - assertTrue("Monitor results should contain cluster_name field, but was actually: $resultsContent", - resultsContent.toString().contains("cluster_name")) - @Suppress("UNCHECKED_CAST") - assertTrue("Error message should not exist, error: $errorMessage", errorMessage == null) - } - - fun `test monitor HttpInput with alert triggered`() { - putAlertMappings() // Required as we do not have a create alert API. - val trigger = randomTrigger(condition = Script(""" - return ctx.results[0].number_of_pending_tasks < 1 - """.trimIndent()), destinationId = createDestination().id) - val clusterIndex = randomInt(clusterHosts.size - 1) - val input = randomHttpInput( - scheme = clusterHosts[clusterIndex].schemeName, - host = clusterHosts[clusterIndex].hostName, - port = clusterHosts[clusterIndex].port, - path = "_cluster/health", - params = mapOf(), - url = "", - connection_timeout = 5, - socket_timeout = 5) - val monitor = createMonitor(randomMonitor(inputs = listOf(input), triggers = listOf(trigger))) - val response = executeMonitor(monitor.id) - - val output = entityAsMap(response) - assertEquals(monitor.name, output["monitor_name"]) - for (triggerResult in output.objectMap("trigger_results").values) { - assertTrue("Should be triggered.", triggerResult.objectMap("action_results").isNotEmpty()) - } - - val alerts = searchAlerts(monitor) - assertEquals("Alert not saved, $output", 1, alerts.size) - verifyAlert(alerts.single(), monitor, ACTIVE) - } - - fun `test monitor HttpInput with no alert triggered`() { - putAlertMappings() // Required as we do not have a create alert API. - val trigger = randomTrigger(condition = Script(""" - return ctx.results[0].status.equals("red") - """.trimIndent())) - val clusterIndex = randomInt(clusterHosts.size - 1) - val input = randomHttpInput( - scheme = clusterHosts[clusterIndex].schemeName, - host = clusterHosts[clusterIndex].hostName, - port = clusterHosts[clusterIndex].port, - path = "_cluster/health", - params = mapOf(), - url = "", - connection_timeout = 5, - socket_timeout = 5) - val monitor = createMonitor(randomMonitor(inputs = listOf(input), triggers = listOf(trigger))) - val response = executeMonitor(monitor.id) - - val output = entityAsMap(response) - assertEquals(monitor.name, output["monitor_name"]) - for (triggerResult in output.objectMap("trigger_results").values) { - val actionResults = triggerResult.objectMap("action_results") - @Suppress("UNCHECKED_CAST") - assertTrue("Should not be triggered, $actionResults", actionResults.isEmpty()) - } - - val alerts = searchAlerts(monitor) - assertEquals("Alert saved for test monitor, output: $output", 0, alerts.size) - } - private fun verifyActionExecutionResultInAlert(alert: Alert, expectedResult: Map): MutableMap { val actionResult = mutableMapOf() diff --git a/alerting/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/TestHelpers.kt b/alerting/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/TestHelpers.kt index 32f35064..435d7cd8 100644 --- a/alerting/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/TestHelpers.kt +++ b/alerting/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/TestHelpers.kt @@ -14,7 +14,6 @@ */ package com.amazon.opendistroforelasticsearch.alerting -import com.amazon.opendistroforelasticsearch.alerting.core.model.HttpInput import com.amazon.opendistroforelasticsearch.alerting.model.Alert import com.amazon.opendistroforelasticsearch.alerting.model.Monitor import com.amazon.opendistroforelasticsearch.alerting.model.Trigger @@ -67,27 +66,6 @@ fun randomMonitor( uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf()) } -fun randomHttpInput( - scheme: String = "http", - host: String = "localhost", - port: Int = randomInt(65535), - path: String = ESRestTestCase.randomAlphaOfLength(10), - params: Map = mapOf(), - url: String = "", - connection_timeout: Int = randomInt(10), - socket_timeout: Int = randomInt(10) -): HttpInput { - return HttpInput( - scheme = scheme, - host = host, - port = port, - path = path, - params = params, - url = url, - connection_timeout = connection_timeout, - socket_timeout = socket_timeout) -} - fun randomTrigger( id: String = UUIDs.base64UUID(), name: String = ESRestTestCase.randomAlphaOfLength(10), diff --git a/core/build.gradle b/core/build.gradle index e4ae0277..4e6e3aeb 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -22,9 +22,7 @@ dependencies { compile "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}" compile 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.1.1' compile "com.cronutils:cron-utils:7.0.5" - compile "org.apache.httpcomponents:httpasyncclient:4.1.4" - compile 'commons-validator:commons-validator:1.6' - + testImplementation "org.elasticsearch.test:framework:${es_version}" testImplementation "org.jetbrains.kotlin:kotlin-test:${kotlin_version}" testImplementation "org.jetbrains.kotlin:kotlin-test-junit:${kotlin_version}" diff --git a/core/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/core/httpapi/HttpExtensions.kt b/core/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/core/httpapi/HttpExtensions.kt deleted file mode 100644 index 95463599..00000000 --- a/core/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/core/httpapi/HttpExtensions.kt +++ /dev/null @@ -1,71 +0,0 @@ -package com.amazon.opendistroforelasticsearch.alerting.core.httpapi - -import com.amazon.opendistroforelasticsearch.alerting.core.model.HttpInput -import kotlinx.coroutines.CancellationException -import kotlinx.coroutines.suspendCancellableCoroutine -import org.apache.http.HttpResponse -import org.apache.http.client.config.RequestConfig -import org.apache.http.client.methods.HttpGet -import org.apache.http.client.utils.URIBuilder -import org.apache.http.concurrent.FutureCallback -import org.apache.http.nio.client.HttpAsyncClient -import org.apache.http.util.EntityUtils -import org.elasticsearch.common.xcontent.LoggingDeprecationHandler -import org.elasticsearch.common.xcontent.NamedXContentRegistry -import org.elasticsearch.common.xcontent.XContentType -import java.net.URI -import kotlin.coroutines.resume -import kotlin.coroutines.resumeWithException - -suspend fun C.suspendUntil(block: C.(FutureCallback) -> Unit): T = - suspendCancellableCoroutine { cont -> - block(object : FutureCallback { - override fun cancelled() { - cont.resumeWithException(CancellationException("Request cancelled")) - } - - override fun completed(result: T) { - cont.resume(result) - } - - override fun failed(ex: Exception) { - cont.resumeWithException(ex) - } - }) - } - -fun HttpResponse.toMap(): Map { - val xcp = XContentType.JSON.xContent().createParser( - NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, EntityUtils.toString(entity)) - return xcp.map() -} - -fun HttpInput.toGetRequest(): HttpGet { - // Change timeout values to settings specified from input, multiply by 1000 to convert to milliseconds. - val requestConfig = RequestConfig.custom() - .setConnectTimeout(this.connection_timeout * 1000) - .setSocketTimeout(this.socket_timeout * 1000) - .build() - val constructedUrl = this.toConstructedUrl().toString() - val httpGetRequest = HttpGet(constructedUrl) - httpGetRequest.config = requestConfig - return httpGetRequest -} - -/** - * Construct url either by url or by scheme+host+port+path+params. - */ -fun HttpInput.toConstructedUrl(): URI { - return if (url.isEmpty()) { - val uriBuilder = URIBuilder() - uriBuilder.scheme = scheme - uriBuilder.host = host - uriBuilder.port = port - uriBuilder.path = path - for (e in params.entries) - uriBuilder.addParameter(e.key, e.value) - uriBuilder.build() - } else { - URIBuilder(url).build() - } -} diff --git a/core/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/core/model/HttpInput.kt b/core/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/core/model/HttpInput.kt deleted file mode 100644 index d33dddfd..00000000 --- a/core/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/core/model/HttpInput.kt +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - -package com.amazon.opendistroforelasticsearch.alerting.core.model - -import org.apache.commons.validator.routines.UrlValidator -import org.apache.http.client.utils.URIBuilder -import org.elasticsearch.common.CheckedFunction -import org.elasticsearch.common.ParseField -import org.elasticsearch.common.Strings -import org.elasticsearch.common.xcontent.NamedXContentRegistry -import org.elasticsearch.common.xcontent.ToXContent -import org.elasticsearch.common.xcontent.XContentBuilder -import org.elasticsearch.common.xcontent.XContentParser -import org.elasticsearch.common.xcontent.XContentParserUtils -import java.io.IOException - -/** - * This is a data class of HTTP type of input for Monitors. - */ -data class HttpInput( - val scheme: String, - val host: String, - val port: Int, - val path: String, - val params: Map, - val url: String, - val connection_timeout: Int, - val socket_timeout: Int -) : Input { - - // Verify parameters are valid during creation - init { - require(validateFields()) { - "Either one of url or scheme + host + port+ + path + params can be set." - } - require(connection_timeout in 1..5) { - "Connection timeout: $connection_timeout is not in the range of 1 - 5" - } - require(socket_timeout in 1..60) { - "Socket timeout: $socket_timeout is not in the range of 1 - 60" - } - - // Create an UrlValidator that only accepts "http" and "https" as valid scheme and allows local URLs. - val urlValidator = UrlValidator(arrayOf("http", "https"), UrlValidator.ALLOW_LOCAL_URLS) - - // Build url field by field if not provided as whole. - val constructedUrl = if (Strings.isEmpty(url)) { - val uriBuilder = URIBuilder() - uriBuilder.scheme = scheme - uriBuilder.host = host - uriBuilder.port = port - uriBuilder.path = path - for (e in params.entries) - uriBuilder.addParameter(e.key, e.value) - uriBuilder.build() - } else { - URIBuilder(url).build() - } - - require(urlValidator.isValid(constructedUrl.toString())) { - "Invalid url: $constructedUrl" - } - } - - override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { - return builder.startObject() - .startObject(HTTP_FIELD) - .field(SCHEME_FIELD, scheme) - .field(HOST_FIELD, host) - .field(PORT_FIELD, port) - .field(PATH_FIELD, path) - .field(PARAMS_FIELD, this.params) - .field(URL_FIELD, url) - .field(CONNECTION_TIMEOUT_FIELD, connection_timeout) - .field(SOCKET_TIMEOUT_FIELD, socket_timeout) - .endObject() - .endObject() - } - - override fun name(): String { - return HTTP_FIELD - } - - companion object { - const val SCHEME_FIELD = "scheme" - const val HOST_FIELD = "host" - const val PORT_FIELD = "port" - const val PATH_FIELD = "path" - const val PARAMS_FIELD = "params" - const val URL_FIELD = "url" - const val CONNECTION_TIMEOUT_FIELD = "connection_timeout" - const val SOCKET_TIMEOUT_FIELD = "socket_timeout" - const val HTTP_FIELD = "http" - - val XCONTENT_REGISTRY = NamedXContentRegistry.Entry(Input::class.java, ParseField("http"), CheckedFunction { parseInner(it) }) - - /** - * This parse function uses [XContentParser] to parse JSON input and store corresponding fields to create a [HttpInput] object - */ - @JvmStatic @Throws(IOException::class) - private fun parseInner(xcp: XContentParser): HttpInput { - var scheme = "http" - var host = "" - var port: Int = -1 - var path = "" - var params: Map = mutableMapOf() - var url = "" - var connectionTimeout = 5 - var socketTimeout = 10 - - XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp::getTokenLocation) - - while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { - val fieldName = xcp.currentName() - xcp.nextToken() - when (fieldName) { - SCHEME_FIELD -> scheme = xcp.text() - HOST_FIELD -> host = xcp.text() - PORT_FIELD -> port = xcp.intValue() - PATH_FIELD -> path = xcp.text() - PARAMS_FIELD -> params = xcp.mapStrings() - URL_FIELD -> url = xcp.text() - CONNECTION_TIMEOUT_FIELD -> connectionTimeout = xcp.intValue() - SOCKET_TIMEOUT_FIELD -> socketTimeout = xcp.intValue() - } - } - return HttpInput(scheme, host, port, path, params, url, connectionTimeout, socketTimeout) - } - } - - /** - * Helper function to check whether one of url or scheme+host+port+path+params is defined. - */ - private fun validateFields(): Boolean { - if (url.isNotEmpty()) { - return (host.isEmpty() && (port == -1) && path.isEmpty() && params.isEmpty()) - } - return true - } -} diff --git a/core/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/core/model/HttpInputTest.kt b/core/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/core/model/HttpInputTest.kt deleted file mode 100644 index 3e284a6c..00000000 --- a/core/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/core/model/HttpInputTest.kt +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - -package com.amazon.opendistroforelasticsearch.alerting.core.model - -import java.net.URISyntaxException -import kotlin.test.Test -import kotlin.test.assertEquals -import kotlin.test.assertTrue -import kotlin.test.fail - -class HttpInputTest { - // Test invalid url with different format in one function - @Test - fun `test invalid urls`() { - try { - // Invalid scheme - HttpInput("notAValidScheme", "localhost", 9200, "_cluster/health", mapOf(), "", 5, 5) - fail("Invalid scheme when creating HttpInput should fail.") - } catch (e: IllegalArgumentException) { - assertEquals("Invalid url: notAValidScheme://localhost:9200/_cluster/health", e.message) - } - try { - // Invalid host - HttpInput("http", "loco//host", 9200, "_cluster/health", mapOf(), "", 5, 5) - fail("Invalid host when creating HttpInput should fail.") - } catch (e: IllegalArgumentException) { - assertEquals("Invalid url: http://loco//host:9200/_cluster/health", e.message) - } - try { - // Invalid path - HttpInput("http", "localhost", 9200, "///", mapOf(), "", 5, 5) - fail("Invalid path when creating HttpInput should fail.") - } catch (e: IllegalArgumentException) { - assertEquals("Invalid url: http://localhost:9200///", e.message) - } - try { - // Invalid url - HttpInput("", "", -1, "", mapOf(), "¯¯\\_( ͡° ͜ʖ ͡°)_//¯ ", 5, 5) - fail("Invalid url when creating HttpInput should fail.") - } catch (e: URISyntaxException) { - assertTrue(e.message.toString().contains("Illegal character in path at index"), "Error message is : ${e.message}") - } - try { - // Invalid connection timeout - HttpInput("http", "localhost", 9200, "_cluster/health", mapOf(), "", 70, 5) - fail("Invalid connection timeout when creating HttpInput should fail.") - } catch (e: IllegalArgumentException) { - assertEquals("Connection timeout: 70 is not in the range of 1 - 5", e.message) - } - try { - // Invalid socket timeout - HttpInput("http", "localhost", 9200, "_cluster/health", mapOf(), "", 5, -5) - fail("Invalid socket timeout when creating HttpInput should fail.") - } catch (e: IllegalArgumentException) { - assertEquals("Socket timeout: -5 is not in the range of 1 - 60", e.message) - } - try { - // Setting other fields along with url field is not allowed - HttpInput("http", "localhost", 9200, "_cluster/health", mapOf(), "http://localhost:9200/_cluster/health", 5, 5) - fail("Setting url and other fields at the same time should fail.") - } catch (e: IllegalArgumentException) { - assertEquals("Either one of url or scheme + host + port+ + path + params can be set.", e.message) - } - } - - // Test valid url with complete url - @Test - fun `test valid HttpInput using url`() { - val validHttpInput = HttpInput("", "", -1, "", mapOf(), "http://localhost:9200/_cluster/health/", 5, 5) - assertEquals(validHttpInput.url, "http://localhost:9200/_cluster/health/") - assertEquals(validHttpInput.connection_timeout, 5) - assertEquals(validHttpInput.socket_timeout, 5) - } - - @Test - fun `test valid HttpInput created field by field`() { - val validHttpInput = HttpInput( - scheme = "http", - host = "localhost", - port = 9200, - path = "_cluster/health", - params = mapOf("value" to "x", "secondVal" to "second"), - url = "", - connection_timeout = 5, - socket_timeout = 10) - assertEquals(validHttpInput.scheme, "http") - assertEquals(validHttpInput.host, "localhost") - assertEquals(validHttpInput.port, 9200) - assertEquals(validHttpInput.path, "_cluster/health") - assertEquals(validHttpInput.params, mapOf("value" to "x", "secondVal" to "second")) - assertEquals(validHttpInput.url, "") - assertEquals(validHttpInput.connection_timeout, 5) - assertEquals(validHttpInput.socket_timeout, 10) - } -}