
Adds support for Elasticsearch 7.10.0 #300

Merged: 11 commits, Nov 18, 2020 (diff shown from 10 commits)
27 changes: 27 additions & 0 deletions .github/workflows/multi-node-test-workflow.yml
@@ -0,0 +1,27 @@
name: Multi node test workflow
# This workflow is triggered on pull requests and pushes to master
on:
  pull_request:
    branches:
      - master
  push:
    branches:
      - master

jobs:
  build:
    # Job name
    name: Build Alerting
    # This job runs on Linux
    runs-on: ubuntu-latest
    steps:
      # This step uses the checkout GitHub Action: https://github.com/actions/checkout
      - name: Checkout Branch
        uses: actions/checkout@v2
      # This step uses the setup-java GitHub Action: https://github.com/actions/setup-java
      - name: Set Up JDK 14
        uses: actions/setup-java@v1
        with:
          java-version: 14
      - name: Run integration tests with multi node config
        run: ./gradlew integTest -PnumNodes=3
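For anyone reproducing this check outside CI, the same invocation the workflow runs should work locally (assuming a local JDK 14, matching the setup-java step above):

```
./gradlew integTest -PnumNodes=3
```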
21 changes: 12 additions & 9 deletions README.md
@@ -63,9 +63,10 @@ When launching a cluster using one of the above commands, logs are placed in `al
#### Run integration tests with Security enabled

1. Set up a local ODFE cluster with the security plugin.
`./gradlew build`
`./gradlew integTest -Dtests.rest.cluster=localhost:9200 -Dtests.cluster=localhost:9200 -Dtests.clustername=es-integrationtest -Dhttps=true -Duser=admin -Dpassword=admin`
`./gradlew :alerting:integTestRunner -Dtests.rest.cluster=localhost:9200 -Dtests.cluster=localhost:9200 -Dtests.clustername=es-integrationtest -Dhttps=true -Duser=admin -Dpassword=admin --tests "com.amazon.opendistroforelasticsearch.alerting.MonitorRunnerIT.test execute monitor returns search result"`

- `./gradlew integTest -Dtests.rest.cluster=localhost:9200 -Dtests.cluster=localhost:9200 -Dtests.clustername=es-integrationtest -Dhttps=true -Duser=admin -Dpassword=admin`

- `./gradlew :alerting:integTestRunner -Dtests.rest.cluster=localhost:9200 -Dtests.cluster=localhost:9200 -Dtests.clustername=es-integrationtest -Dhttps=true -Duser=admin -Dpassword=admin --tests "com.amazon.opendistroforelasticsearch.alerting.MonitorRunnerIT.test execute monitor returns search result"`
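Before running these, it can help to verify that the secured cluster is actually reachable. A minimal sanity check, assuming the default `admin:admin` credentials and a self-signed certificate:

```
curl -sk -u admin:admin https://localhost:9200
```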

### Debugging

@@ -75,7 +76,7 @@ You must start your debugger to listen for remote JVM before running the below c
To debug code running in an actual server, run:

```
./gradlew :alerting:integTest -Des.debug # to start a cluster and run integ tests
./gradlew :alerting:integTest -Dcluster.debug # to start a cluster and run integ tests
```

OR
@@ -84,18 +85,20 @@ OR
./gradlew :alerting:run --debug-jvm # to just start a cluster that can be debugged
```

The Elasticsearch server JVM will launch suspended and wait for a debugger to attach to `localhost:5005` before starting the Elasticsearch server.
The IDE needs to listen for the remote JVM. If using Intellij you must set your debug configuration to "Listen to remote JVM" and make sure "Auto Restart" is checked.
You must start your debugger to listen for remote JVM before running the commands.
The Elasticsearch server JVM will launch suspended and wait for a debugger to attach to `localhost:5005` before starting the Elasticsearch server. The IDE needs to listen for the remote JVM: if using IntelliJ, set your debug configuration to "Listen to remote JVM" and make sure "Auto Restart" is checked. You must start your debugger to listen for the remote JVM before running the commands.

To debug code running in an integ test (which exercises the server from a separate JVM), run:

```
./gradlew -Dtest.debug :alerting:integTest
./gradlew :alerting:integTest -Dtest.debug
```

The test runner JVM will start suspended and wait for a debugger to attach to `localhost:5005` before running the tests.
The test runner JVM will start suspended and wait for a debugger to attach to `localhost:8000` before running the tests.

Additionally, it is possible to attach one debugger to the cluster JVM and another debugger to the test runner. First, make sure one debugger is listening on port `5005` and the other is listening on port `8000`. Then, run:
```
./gradlew :alerting:integTest -Dtest.debug -Dcluster.debug
```
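For reference, these flags map to JDWP agent arguments along the following lines (a sketch based on `alerting/build.gradle` in this PR; `server=n` means each JVM dials out to an already-listening debugger, which is why the IDE configuration must be "Listen to remote JVM"):

```
# test runner JVM, enabled by -Dtest.debug
-agentlib:jdwp=transport=dt_socket,server=n,suspend=y,address=8000
# cluster node JVMs, enabled by -Dcluster.debug; ports start at 5005 and increment per node
-agentlib:jdwp=transport=dt_socket,server=n,suspend=y,address=5005
```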

### Advanced: Launching multi-node clusters locally

91 changes: 52 additions & 39 deletions alerting/build.gradle
@@ -1,6 +1,3 @@
import java.nio.charset.StandardCharsets
import java.nio.file.Files

/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
@@ -15,19 +12,16 @@ import java.nio.file.Files
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

apply plugin: 'java'
apply plugin: 'idea'
apply plugin: 'org.jetbrains.kotlin.jvm'
apply plugin: 'elasticsearch.esplugin'
apply plugin: 'elasticsearch.testclusters'
apply plugin: 'jacoco'

def usingRemoteCluster = System.properties.containsKey('tests.rest.cluster') || System.properties.containsKey('tests.cluster')
def usingMultiNode = project.properties.containsKey('numNodes')
// Only apply jacoco test coverage if we are running a local single node cluster
if (!usingRemoteCluster && !usingMultiNode) {
apply from: '../build-tools/esplugin-coverage.gradle'
}


ext {
projectSubstitutions = [:]
@@ -88,61 +82,74 @@ dependencyLicenses.enabled = false
validateNebulaPom.enabled = false
thirdPartyAudit.enabled = false

loggerUsageCheck.enabled = false
licenseHeaders.enabled = false

def es_tmp_dir = rootProject.file('build/private/es_tmp').absoluteFile
es_tmp_dir.mkdirs()

test {
systemProperty 'tests.security.manager', 'false'
}

def _numNodes = findProperty('numNodes') as Integer ?: 1
integTest {
runner {
systemProperty 'tests.security.manager', 'false'
systemProperty 'java.io.tmpdir', es_tmp_dir.absolutePath

systemProperty "https", System.getProperty("https")
systemProperty "user", System.getProperty("user")
systemProperty "password", System.getProperty("password")

// The 'doFirst' delays till execution time.
doFirst {
// Tell the test JVM if the cluster JVM is running under a debugger so that tests can
// use longer timeouts for requests.
def isDebuggingCluster = getDebug() || System.getProperty("test.debug") != null
systemProperty 'cluster.debug', isDebuggingCluster
// Set number of nodes system property to be used in tests
systemProperty 'cluster.number_of_nodes', "${_numNodes}"
// There seems to be an issue when running multi node run or integ tasks with unicast_hosts
// not being written, the waitForAllConditions ensures it's written
getClusters().forEach { cluster ->
cluster.waitForAllConditions()
}
}

// The --debug-jvm command-line option makes the cluster debuggable; this makes the tests debuggable
if (System.getProperty("test.debug") != null) {
jvmArgs '-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=*:5005'
}
}
import org.elasticsearch.gradle.test.RestIntegTestTask
task integTest(type: RestIntegTestTask) {
description = "Run integration tests against integTest cluster"
testClassesDirs = sourceSets.test.output.classesDirs
classpath = sourceSets.test.runtimeClasspath
}
tasks.named("check").configure { dependsOn(integTest) }
Zip bundle = (Zip) project.getTasks().getByName("bundlePlugin");
integTest.dependsOn(bundle)
integTest.getClusters().forEach{c -> c.plugin(project.getObjects().fileProperty().value(bundle.getArchiveFile()))}

def _numNodes = findProperty('numNodes') as Integer ?: 1
testClusters.integTest {
testDistribution = "OSS"
// Cluster shrink exception thrown if we try to set numberOfNodes to 1, so only apply if > 1
if (_numNodes > 1) numberOfNodes = _numNodes
// When running integration tests it doesn't forward the --debug-jvm to the cluster anymore
// i.e. we have to use a custom property to flag when we want to debug elasticsearch JVM
// since we also support multi node integration tests we increase debugPort per node
if (System.getProperty("es.debug") != null) {
if (System.getProperty("cluster.debug") != null) {
def debugPort = 5005
nodes.forEach { node ->
node.jvmArgs("-agentlib:jdwp=transport=dt_socket,server=n,suspend=y,address=*:${debugPort}")
node.jvmArgs("-agentlib:jdwp=transport=dt_socket,server=n,suspend=y,address=${debugPort}")
debugPort += 1
}
}
}

integTest {
systemProperty 'tests.security.manager', 'false'
systemProperty 'java.io.tmpdir', es_tmp_dir.absolutePath

systemProperty "https", System.getProperty("https")
systemProperty "user", System.getProperty("user")
systemProperty "password", System.getProperty("password")

// The 'doFirst' delays till execution time.
doFirst {
// Tell the test JVM if the cluster JVM is running under a debugger so that tests can
// use longer timeouts for requests.
def isDebuggingCluster = getDebug() || System.getProperty("test.debug") != null
systemProperty 'cluster.debug', isDebuggingCluster
// Set number of nodes system property to be used in tests
systemProperty 'cluster.number_of_nodes', "${_numNodes}"
// There seems to be an issue when running multi node run or integ tasks with unicast_hosts
// not being written, the waitForAllConditions ensures it's written
getClusters().forEach { cluster ->
cluster.waitForAllConditions()
}
}

// The --debug-jvm command-line option makes the cluster debuggable; this makes the tests debuggable
if (System.getProperty("test.debug") != null) {
jvmArgs '-agentlib:jdwp=transport=dt_socket,server=n,suspend=y,address=8000'
}
}

run {
doFirst {
// There seems to be an issue when running multi node run or integ tasks with unicast_hosts
@@ -151,6 +158,12 @@ run {
cluster.waitForAllConditions()
}
}
// useCluster testClusters.integTest
}

// Only apply jacoco test coverage if we are running a local single node cluster
if (!usingRemoteCluster && !usingMultiNode) {
apply from: '../build-tools/esplugin-coverage.gradle'
}

apply from: '../build-tools/pkgbuild.gradle'
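Putting this build script together, a full local debug session against a multi-node cluster might look like the following (a sketch under the assumptions above: the test runner attaches at port 8000 and the three nodes at 5005, 5006, and 5007, with debuggers already listening on each):

```
./gradlew :alerting:integTest -PnumNodes=3 -Dtest.debug -Dcluster.debug
```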
@@ -438,7 +438,7 @@ class MonitorRunner(
private fun contentParser(bytesReference: BytesReference): XContentParser {
val xcp = XContentHelper.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE,
bytesReference, XContentType.JSON)
ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp::getTokenLocation)
ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp)
return xcp
}

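This `ensureExpectedToken` change recurs across the Kotlin files in this PR: the Elasticsearch 7.10 helper takes the parser itself where older versions took an `xcp::getTokenLocation` supplier. A minimal sketch of the new-style call, using a hypothetical helper function:

```kotlin
import org.elasticsearch.common.xcontent.XContentParser
import org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken

// Hypothetical helper: assert the parser is positioned at the start of an object.
// The third argument is the parser itself, replacing the location supplier
// accepted by pre-7.10 versions of ensureExpectedToken.
fun expectStartObject(xcp: XContentParser) {
    ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
}
```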
@@ -52,7 +52,7 @@ data class AlertError(val timestamp: Instant, val message: String) : Writeable,
lateinit var timestamp: Instant
lateinit var message: String

ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp::getTokenLocation)
ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
val fieldName = xcp.currentName()
xcp.nextToken()
@@ -42,7 +42,6 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse
import org.elasticsearch.client.Client
import org.elasticsearch.cluster.ClusterChangedEvent
import org.elasticsearch.cluster.ClusterStateListener
import org.elasticsearch.cluster.LocalNodeMasterListener
import org.elasticsearch.cluster.service.ClusterService
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.common.unit.TimeValue
@@ -66,11 +65,10 @@ class AlertIndices(
private val client: Client,
private val threadPool: ThreadPool,
private val clusterService: ClusterService
) : LocalNodeMasterListener, ClusterStateListener {
) : ClusterStateListener {

init {
clusterService.addListener(this)
clusterService.addLocalNodeMasterListener(this)
clusterService.clusterSettings.addSettingsUpdateConsumer(ALERT_HISTORY_ENABLED) { historyEnabled = it }
clusterService.clusterSettings.addSettingsUpdateConsumer(ALERT_HISTORY_MAX_DOCS) { historyMaxDocs = it }
clusterService.clusterSettings.addSettingsUpdateConsumer(ALERT_HISTORY_INDEX_MAX_AGE) { historyMaxAge = it }
@@ -123,6 +121,8 @@

@Volatile private var requestTimeout = AlertingSettings.REQUEST_TIMEOUT.get(settings)

@Volatile private var isMaster = false

// for JobsMonitor to report
var lastRolloverTime: TimeValue? = null

@@ -132,7 +132,7 @@

private var scheduledRollover: Cancellable? = null

override fun onMaster() {
fun onMaster() {
try {
// TODO: Change current actionGet requests within rolloverHistoryIndex() rolloverAndDeleteHistoryIndices() to use suspendUntil
// try to rollover immediately as we might be restarting the cluster
@@ -147,16 +147,28 @@ }
}
}

override fun offMaster() {
fun offMaster() {
scheduledRollover?.cancel()
}

override fun executorName(): String {
private fun executorName(): String {
return ThreadPool.Names.MANAGEMENT
}

override fun clusterChanged(event: ClusterChangedEvent) {
// if the indexes have been deleted they need to be reinitalized
// Instead of using a LocalNodeMasterListener to track master changes, this service will
// track them here to avoid conditions where master listener events run after other
// listeners that depend on what happened in the master listener
if (this.isMaster != event.localNodeMaster()) {
this.isMaster = event.localNodeMaster()
if (this.isMaster) {
onMaster()
} else {
offMaster()
}
}

// if the indexes have been deleted they need to be reinitialized
alertIndexInitialized = event.state().routingTable().hasIndex(ALERT_INDEX)
historyIndexInitialized = event.state().metadata().hasAlias(HISTORY_WRITE_INDEX)
}
@@ -111,6 +111,6 @@ suspend fun moveAlerts(client: Client, monitorId: String, monitor: Monitor? = nu
private fun alertContentParser(bytesReference: BytesReference): XContentParser {
val xcp = XContentHelper.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE,
bytesReference, XContentType.JSON)
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp::getTokenLocation)
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp)
return xcp
}
@@ -72,7 +72,7 @@ data class ActionExecutionResult(
var throttledCount: Int = 0
var lastExecutionTime: Instant? = null

XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp::getTokenLocation)
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
val fieldName = xcp.currentName()
xcp.nextToken()
@@ -172,7 +172,7 @@ data class Alert(
val errorHistory: MutableList<AlertError> = mutableListOf()
var actionExecutionResults: MutableList<ActionExecutionResult> = mutableListOf()

ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp::getTokenLocation)
ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
val fieldName = xcp.currentName()
xcp.nextToken()
@@ -192,14 +192,14 @@
ACKNOWLEDGED_TIME_FIELD -> acknowledgedTime = xcp.instant()
ERROR_MESSAGE_FIELD -> errorMessage = xcp.textOrNull()
ALERT_HISTORY_FIELD -> {
ensureExpectedToken(XContentParser.Token.START_ARRAY, xcp.currentToken(), xcp::getTokenLocation)
ensureExpectedToken(XContentParser.Token.START_ARRAY, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_ARRAY) {
errorHistory.add(AlertError.parse(xcp))
}
}
SEVERITY_FIELD -> severity = xcp.text()
ACTION_EXECUTION_RESULTS_FIELD -> {
ensureExpectedToken(XContentParser.Token.START_ARRAY, xcp.currentToken(), xcp::getTokenLocation)
ensureExpectedToken(XContentParser.Token.START_ARRAY, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_ARRAY) {
actionExecutionResults.add(ActionExecutionResult.parse(xcp))
}
@@ -183,7 +183,7 @@ data class Monitor(
val triggers: MutableList<Trigger> = mutableListOf()
val inputs: MutableList<Input> = mutableListOf()

ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp::getTokenLocation)
ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != Token.END_OBJECT) {
val fieldName = xcp.currentName()
xcp.nextToken()
@@ -195,13 +195,13 @@
ENABLED_FIELD -> enabled = xcp.booleanValue()
SCHEDULE_FIELD -> schedule = Schedule.parse(xcp)
INPUTS_FIELD -> {
ensureExpectedToken(Token.START_ARRAY, xcp.currentToken(), xcp::getTokenLocation)
ensureExpectedToken(Token.START_ARRAY, xcp.currentToken(), xcp)
while (xcp.nextToken() != Token.END_ARRAY) {
inputs.add(Input.parse(xcp))
}
}
TRIGGERS_FIELD -> {
ensureExpectedToken(Token.START_ARRAY, xcp.currentToken(), xcp::getTokenLocation)
ensureExpectedToken(Token.START_ARRAY, xcp.currentToken(), xcp)
while (xcp.nextToken() != Token.END_ARRAY) {
triggers.add(Trigger.parse(xcp))
}
@@ -87,7 +87,7 @@ data class Trigger(
lateinit var severity: String
lateinit var condition: Script
val actions: MutableList<Action> = mutableListOf()
ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp::getTokenLocation)
ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)

while (xcp.nextToken() != Token.END_OBJECT) {
val fieldName = xcp.currentName()
@@ -106,7 +106,7 @@
xcp.nextToken()
}
ACTIONS_FIELD -> {
ensureExpectedToken(Token.START_ARRAY, xcp.currentToken(), xcp::getTokenLocation)
ensureExpectedToken(Token.START_ARRAY, xcp.currentToken(), xcp)
while (xcp.nextToken() != Token.END_ARRAY) {
actions.add(Action.parse(xcp))
}