diff --git a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/transport/TransportDeleteDestinationAction.kt b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/transport/TransportDeleteDestinationAction.kt index cf11349a..1640d7cb 100644 --- a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/transport/TransportDeleteDestinationAction.kt +++ b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/transport/TransportDeleteDestinationAction.kt @@ -18,29 +18,133 @@ package com.amazon.opendistroforelasticsearch.alerting.transport import com.amazon.opendistroforelasticsearch.alerting.action.DeleteDestinationAction import com.amazon.opendistroforelasticsearch.alerting.action.DeleteDestinationRequest import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob +import com.amazon.opendistroforelasticsearch.alerting.model.destination.Destination +import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings import com.amazon.opendistroforelasticsearch.alerting.util.AlertingException +import com.amazon.opendistroforelasticsearch.alerting.util.checkFilterByUserBackendRoles +import com.amazon.opendistroforelasticsearch.alerting.util.checkUserFilterByPermissions +import com.amazon.opendistroforelasticsearch.commons.ConfigConstants +import com.amazon.opendistroforelasticsearch.commons.authuser.User +import org.apache.logging.log4j.LogManager +import org.elasticsearch.ElasticsearchStatusException import org.elasticsearch.action.ActionListener import org.elasticsearch.action.delete.DeleteRequest import org.elasticsearch.action.delete.DeleteResponse +import org.elasticsearch.action.get.GetRequest +import org.elasticsearch.action.get.GetResponse import org.elasticsearch.action.support.ActionFilters import org.elasticsearch.action.support.HandledTransportAction import org.elasticsearch.client.Client +import org.elasticsearch.cluster.service.ClusterService import org.elasticsearch.common.inject.Inject +import org.elasticsearch.common.settings.Settings +import org.elasticsearch.common.xcontent.LoggingDeprecationHandler +import org.elasticsearch.common.xcontent.NamedXContentRegistry +import org.elasticsearch.common.xcontent.XContentFactory +import org.elasticsearch.common.xcontent.XContentParser +import org.elasticsearch.common.xcontent.XContentParserUtils +import org.elasticsearch.common.xcontent.XContentType +import org.elasticsearch.rest.RestStatus import org.elasticsearch.tasks.Task import org.elasticsearch.transport.TransportService +import java.io.IOException + +private val log = LogManager.getLogger(TransportDeleteDestinationAction::class.java) class TransportDeleteDestinationAction @Inject constructor( transportService: TransportService, val client: Client, - actionFilters: ActionFilters + actionFilters: ActionFilters, + val clusterService: ClusterService, + settings: Settings, + val xContentRegistry: NamedXContentRegistry ) : HandledTransportAction( DeleteDestinationAction.NAME, transportService, actionFilters, ::DeleteDestinationRequest ) { + @Volatile private var filterByEnabled = AlertingSettings.FILTER_BY_BACKEND_ROLES.get(settings) + private var user: User? = null + + init { + clusterService.clusterSettings.addSettingsUpdateConsumer(AlertingSettings.FILTER_BY_BACKEND_ROLES) { filterByEnabled = it } + } + override fun doExecute(task: Task, request: DeleteDestinationRequest, actionListener: ActionListener) { + val userStr = client.threadPool().threadContext.getTransient(ConfigConstants.OPENDISTRO_SECURITY_USER_AND_ROLES) + log.debug("User and roles string from thread context: $userStr") + user = User.parse(userStr) val deleteRequest = DeleteRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, request.destinationId) .setRefreshPolicy(request.refreshPolicy) + + if (!checkFilterByUserBackendRoles(filterByEnabled, user, actionListener)) { + return + } client.threadPool().threadContext.stashContext().use { + DeleteDestinationHandler(client, actionListener, deleteRequest, user, request.destinationId).resolveUserAndStart() + } + } + + inner class DeleteDestinationHandler( + private val client: Client, + private val actionListener: ActionListener, + private val deleteRequest: DeleteRequest, + private val user: User?, + private val destinationId: String + ) { + + fun resolveUserAndStart() { + if (user == null) { + // Security is disabled, so we can delete the destination without issues + deleteDestination() + } else if (!filterByEnabled) { + // security is enabled and filterby is disabled. + deleteDestination() + } else { + // security is enabled and filterby is enabled. + try { + start() + } catch (ex: IOException) { + actionListener.onFailure(AlertingException.wrap(ex)) + } + } + } + + fun start() { + val getRequest = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, destinationId) + client.get(getRequest, object : ActionListener { + override fun onResponse(response: GetResponse) { + if (!response.isExists) { + actionListener.onFailure(AlertingException.wrap( + ElasticsearchStatusException("Destination with $destinationId is not found", RestStatus.NOT_FOUND))) + return + } + val id = response.id + val version = response.version + val seqNo = response.seqNo.toInt() + val primaryTerm = response.primaryTerm.toInt() + val xcp = XContentFactory.xContent(XContentType.JSON) + .createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, response.sourceAsString) + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp) + XContentParserUtils.ensureExpectedToken(XContentParser.Token.FIELD_NAME, xcp.nextToken(), xcp) + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp) + val dest = Destination.parse(xcp, id, version, seqNo, primaryTerm) + onGetResponse(dest) + } + override fun onFailure(t: Exception) { + actionListener.onFailure(AlertingException.wrap(t)) + } + }) + } + + private fun onGetResponse(destination: Destination) { + if (!checkUserFilterByPermissions(filterByEnabled, user, destination.user, actionListener, "destination", destinationId)) { + return + } else { + deleteDestination() + } + } + + private fun deleteDestination() { client.delete(deleteRequest, object : ActionListener { override fun onResponse(response: DeleteResponse) { actionListener.onResponse(response) diff --git a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/transport/TransportDeleteMonitorAction.kt b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/transport/TransportDeleteMonitorAction.kt index abaec54e..15d5f79e 100644 --- a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/transport/TransportDeleteMonitorAction.kt +++ b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/transport/TransportDeleteMonitorAction.kt @@ -18,29 +18,123 @@ package com.amazon.opendistroforelasticsearch.alerting.transport import com.amazon.opendistroforelasticsearch.alerting.action.DeleteMonitorAction import com.amazon.opendistroforelasticsearch.alerting.action.DeleteMonitorRequest import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob +import com.amazon.opendistroforelasticsearch.alerting.model.Monitor +import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings import com.amazon.opendistroforelasticsearch.alerting.util.AlertingException +import com.amazon.opendistroforelasticsearch.alerting.util.checkFilterByUserBackendRoles +import com.amazon.opendistroforelasticsearch.alerting.util.checkUserFilterByPermissions +import com.amazon.opendistroforelasticsearch.commons.ConfigConstants +import com.amazon.opendistroforelasticsearch.commons.authuser.User +import org.apache.logging.log4j.LogManager +import org.elasticsearch.ElasticsearchStatusException import org.elasticsearch.action.ActionListener import org.elasticsearch.action.delete.DeleteRequest import org.elasticsearch.action.delete.DeleteResponse +import org.elasticsearch.action.get.GetRequest +import org.elasticsearch.action.get.GetResponse import org.elasticsearch.action.support.ActionFilters import org.elasticsearch.action.support.HandledTransportAction import org.elasticsearch.client.Client +import org.elasticsearch.cluster.service.ClusterService import org.elasticsearch.common.inject.Inject +import org.elasticsearch.common.settings.Settings +import org.elasticsearch.common.xcontent.LoggingDeprecationHandler +import org.elasticsearch.common.xcontent.NamedXContentRegistry +import org.elasticsearch.common.xcontent.XContentHelper +import org.elasticsearch.common.xcontent.XContentType +import org.elasticsearch.rest.RestStatus import org.elasticsearch.tasks.Task import org.elasticsearch.transport.TransportService +import java.io.IOException + +private val log = LogManager.getLogger(TransportDeleteMonitorAction::class.java) class TransportDeleteMonitorAction @Inject constructor( transportService: TransportService, val client: Client, - actionFilters: ActionFilters + actionFilters: ActionFilters, + val clusterService: ClusterService, + settings: Settings, + val xContentRegistry: NamedXContentRegistry ) : HandledTransportAction( DeleteMonitorAction.NAME, transportService, actionFilters, ::DeleteMonitorRequest ) { + @Volatile private var filterByEnabled = AlertingSettings.FILTER_BY_BACKEND_ROLES.get(settings) + private var user: User? = null + + init { + clusterService.clusterSettings.addSettingsUpdateConsumer(AlertingSettings.FILTER_BY_BACKEND_ROLES) { filterByEnabled = it } + } + override fun doExecute(task: Task, request: DeleteMonitorRequest, actionListener: ActionListener) { + val userStr = client.threadPool().threadContext.getTransient(ConfigConstants.OPENDISTRO_SECURITY_USER_AND_ROLES) + log.debug("User and roles string from thread context: $userStr") + user = User.parse(userStr) val deleteRequest = DeleteRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, request.monitorId) .setRefreshPolicy(request.refreshPolicy) + + if (!checkFilterByUserBackendRoles(filterByEnabled, user, actionListener)) { + return + } client.threadPool().threadContext.stashContext().use { + DeleteMonitorHandler(client, actionListener, deleteRequest, user, request.monitorId).resolveUserAndStart() + } + } + + inner class DeleteMonitorHandler( + private val client: Client, + private val actionListener: ActionListener, + private val deleteRequest: DeleteRequest, + private val user: User?, + private val monitorId: String + ) { + + fun resolveUserAndStart() { + if (user == null) { + // Security is disabled, so we can delete the destination without issues + deleteMonitor() + } else if (!filterByEnabled) { + // security is enabled and filterby is disabled. + deleteMonitor() + } else { + try { + start() + } catch (ex: IOException) { + actionListener.onFailure(AlertingException.wrap(ex)) + } + } + } + + fun start() { + val getRequest = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, monitorId) + client.get(getRequest, object : ActionListener { + override fun onResponse(response: GetResponse) { + if (!response.isExists) { + actionListener.onFailure(AlertingException.wrap( + ElasticsearchStatusException("Monitor with $monitorId is not found", RestStatus.NOT_FOUND))) + return + } + val xcp = XContentHelper.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, + response.sourceAsBytesRef, XContentType.JSON) + val monitor = ScheduledJob.parse(xcp, response.id, response.version) as Monitor + onGetResponse(monitor) + } + override fun onFailure(t: Exception) { + actionListener.onFailure(AlertingException.wrap(t)) + } + }) + } + + private fun onGetResponse(monitor: Monitor) { + if (!checkUserFilterByPermissions(filterByEnabled, user, monitor.user, actionListener, "monitor", monitorId)) { + return + } else { + deleteMonitor() + } + } + + private fun deleteMonitor() { client.delete(deleteRequest, object : ActionListener { override fun onResponse(response: DeleteResponse) { actionListener.onResponse(response) diff --git a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/transport/TransportGetMonitorAction.kt b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/transport/TransportGetMonitorAction.kt index 6dbd2b71..22957af7 100644 --- a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/transport/TransportGetMonitorAction.kt +++ b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/transport/TransportGetMonitorAction.kt @@ -20,7 +20,12 @@ import com.amazon.opendistroforelasticsearch.alerting.action.GetMonitorRequest import com.amazon.opendistroforelasticsearch.alerting.action.GetMonitorResponse import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob import com.amazon.opendistroforelasticsearch.alerting.model.Monitor +import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings import com.amazon.opendistroforelasticsearch.alerting.util.AlertingException +import com.amazon.opendistroforelasticsearch.alerting.util.checkFilterByUserBackendRoles +import com.amazon.opendistroforelasticsearch.alerting.util.checkUserFilterByPermissions +import com.amazon.opendistroforelasticsearch.commons.ConfigConstants +import com.amazon.opendistroforelasticsearch.commons.authuser.User import org.apache.logging.log4j.LogManager import org.elasticsearch.ElasticsearchStatusException import org.elasticsearch.action.ActionListener @@ -29,7 +34,9 @@ import org.elasticsearch.action.get.GetResponse import org.elasticsearch.action.support.ActionFilters import org.elasticsearch.action.support.HandledTransportAction import org.elasticsearch.client.Client +import org.elasticsearch.cluster.service.ClusterService import org.elasticsearch.common.inject.Inject +import org.elasticsearch.common.settings.Settings import org.elasticsearch.common.xcontent.LoggingDeprecationHandler import org.elasticsearch.common.xcontent.NamedXContentRegistry import org.elasticsearch.common.xcontent.XContentHelper @@ -44,17 +51,33 @@ class TransportGetMonitorAction @Inject constructor( transportService: TransportService, val client: Client, actionFilters: ActionFilters, - val xContentRegistry: NamedXContentRegistry + val xContentRegistry: NamedXContentRegistry, + val clusterService: ClusterService, + settings: Settings ) : HandledTransportAction ( GetMonitorAction.NAME, transportService, actionFilters, ::GetMonitorRequest ) { + @Volatile private var filterByEnabled = AlertingSettings.FILTER_BY_BACKEND_ROLES.get(settings) + private var user: User? = null + + init { + clusterService.clusterSettings.addSettingsUpdateConsumer(AlertingSettings.FILTER_BY_BACKEND_ROLES) { filterByEnabled = it } + } + override fun doExecute(task: Task, getMonitorRequest: GetMonitorRequest, actionListener: ActionListener) { + val userStr = client.threadPool().threadContext.getTransient(ConfigConstants.OPENDISTRO_SECURITY_USER_AND_ROLES) + log.debug("User and roles string from thread context: $userStr") + user = User.parse(userStr) val getRequest = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, getMonitorRequest.monitorId) .version(getMonitorRequest.version) .fetchSourceContext(getMonitorRequest.srcContext) + if (!checkFilterByUserBackendRoles(filterByEnabled, user, actionListener)) { + return + } + /* * Remove security context before you call elasticsearch api's. By this time, permissions required * to call this api are validated. @@ -66,20 +89,31 @@ class TransportGetMonitorAction @Inject constructor( override fun onResponse(response: GetResponse) { if (!response.isExists) { actionListener.onFailure( - AlertingException.wrap(ElasticsearchStatusException("Monitor not found.", RestStatus.NOT_FOUND))) + AlertingException.wrap(ElasticsearchStatusException("Monitor not found.", RestStatus.NOT_FOUND))) return } var monitor: Monitor? = null if (!response.isSourceEmpty) { XContentHelper.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, - response.sourceAsBytesRef, XContentType.JSON).use { xcp -> + response.sourceAsBytesRef, XContentType.JSON).use { xcp -> monitor = ScheduledJob.parse(xcp, response.id, response.version) as Monitor + + // security is enabled and filterby is enabled + if (!checkUserFilterByPermissions( + filterByEnabled, + user, + monitor?.user, + actionListener, + "monitor", + getMonitorRequest.monitorId)) { + return + } } } actionListener.onResponse( - GetMonitorResponse(response.id, response.version, response.seqNo, response.primaryTerm, RestStatus.OK, monitor) + GetMonitorResponse(response.id, response.version, response.seqNo, response.primaryTerm, RestStatus.OK, monitor) ) } diff --git a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/transport/TransportIndexDestinationAction.kt b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/transport/TransportIndexDestinationAction.kt index 0521744e..24381a76 100644 --- a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/transport/TransportIndexDestinationAction.kt +++ b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/transport/TransportIndexDestinationAction.kt @@ -5,11 +5,13 @@ import com.amazon.opendistroforelasticsearch.alerting.action.IndexDestinationReq import com.amazon.opendistroforelasticsearch.alerting.action.IndexDestinationResponse import com.amazon.opendistroforelasticsearch.alerting.core.ScheduledJobIndices import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob +import com.amazon.opendistroforelasticsearch.alerting.model.destination.Destination import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings import com.amazon.opendistroforelasticsearch.alerting.settings.DestinationSettings import com.amazon.opendistroforelasticsearch.alerting.util.AlertingException import com.amazon.opendistroforelasticsearch.alerting.util.IndexUtils import com.amazon.opendistroforelasticsearch.alerting.util.checkFilterByUserBackendRoles +import com.amazon.opendistroforelasticsearch.alerting.util.checkUserFilterByPermissions import com.amazon.opendistroforelasticsearch.commons.ConfigConstants import com.amazon.opendistroforelasticsearch.commons.authuser.User import org.apache.logging.log4j.LogManager @@ -27,8 +29,14 @@ import org.elasticsearch.client.Client import org.elasticsearch.cluster.service.ClusterService import org.elasticsearch.common.inject.Inject import org.elasticsearch.common.settings.Settings +import org.elasticsearch.common.xcontent.LoggingDeprecationHandler +import org.elasticsearch.common.xcontent.NamedXContentRegistry import org.elasticsearch.common.xcontent.ToXContent +import org.elasticsearch.common.xcontent.XContentFactory import org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder +import org.elasticsearch.common.xcontent.XContentParser +import org.elasticsearch.common.xcontent.XContentParserUtils +import org.elasticsearch.common.xcontent.XContentType import org.elasticsearch.rest.RestRequest import org.elasticsearch.rest.RestStatus import org.elasticsearch.tasks.Task @@ -43,7 +51,8 @@ class TransportIndexDestinationAction @Inject constructor( actionFilters: ActionFilters, val scheduledJobIndices: ScheduledJobIndices, val clusterService: ClusterService, - settings: Settings + settings: Settings, + val xContentRegistry: NamedXContentRegistry ) : HandledTransportAction( IndexDestinationAction.NAME, transportService, actionFilters, ::IndexDestinationRequest ) { @@ -198,7 +207,22 @@ class TransportIndexDestinationAction @Inject constructor( val getRequest = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, request.destinationId) client.get(getRequest, object : ActionListener { override fun onResponse(response: GetResponse) { - onGetResponse(response) + if (!response.isExists) { + actionListener.onFailure(AlertingException.wrap( + ElasticsearchStatusException("Destination with ${request.destinationId} is not found", RestStatus.NOT_FOUND))) + return + } + val id = response.id + val version = response.version + val seqNo = response.seqNo.toInt() + val primaryTerm = response.primaryTerm.toInt() + val xcp = XContentFactory.xContent(XContentType.JSON) + .createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, response.sourceAsString) + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp) + XContentParserUtils.ensureExpectedToken(XContentParser.Token.FIELD_NAME, xcp.nextToken(), xcp) + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp) + val dest = Destination.parse(xcp, id, version, seqNo, primaryTerm) + onGetResponse(dest) } override fun onFailure(t: Exception) { actionListener.onFailure(AlertingException.wrap(t)) @@ -206,17 +230,21 @@ class TransportIndexDestinationAction @Inject constructor( }) } - private fun onGetResponse(response: GetResponse) { - if (!response.isExists) { - actionListener.onFailure(AlertingException.wrap( - ElasticsearchStatusException("Destination with ${request.destinationId} is not found", RestStatus.NOT_FOUND))) + private fun onGetResponse(destination: Destination) { + if (!checkUserFilterByPermissions( + filterByEnabled, + user, + destination.user, + actionListener, + "destination", + request.destinationId)) { return } - val destination = request.destination.copy(schemaVersion = IndexUtils.scheduledJobIndexSchemaVersion) + val indexDestination = request.destination.copy(schemaVersion = IndexUtils.scheduledJobIndexSchemaVersion) val indexRequest = IndexRequest(ScheduledJob.SCHEDULED_JOBS_INDEX) .setRefreshPolicy(request.refreshPolicy) - .source(destination.toXContent(jsonBuilder(), ToXContent.MapParams(mapOf("with_type" to "true")))) + .source(indexDestination.toXContent(jsonBuilder(), ToXContent.MapParams(mapOf("with_type" to "true")))) .id(request.destinationId) .setIfSeqNo(request.seqNo) .setIfPrimaryTerm(request.primaryTerm) @@ -231,7 +259,7 @@ class TransportIndexDestinationAction @Inject constructor( return } actionListener.onResponse(IndexDestinationResponse(response.id, response.version, response.seqNo, - response.primaryTerm, RestStatus.CREATED, destination)) + response.primaryTerm, RestStatus.CREATED, indexDestination)) } override fun onFailure(t: Exception) { actionListener.onFailure(AlertingException.wrap(t)) diff --git a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/transport/TransportIndexMonitorAction.kt b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/transport/TransportIndexMonitorAction.kt index 2d749cb8..5a29a914 100644 --- a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/transport/TransportIndexMonitorAction.kt +++ b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/transport/TransportIndexMonitorAction.kt @@ -35,6 +35,7 @@ import com.amazon.opendistroforelasticsearch.alerting.util.AlertingException import com.amazon.opendistroforelasticsearch.alerting.util.IndexUtils import com.amazon.opendistroforelasticsearch.alerting.util.addUserBackendRolesFilter import com.amazon.opendistroforelasticsearch.alerting.util.checkFilterByUserBackendRoles +import com.amazon.opendistroforelasticsearch.alerting.util.checkUserFilterByPermissions import com.amazon.opendistroforelasticsearch.alerting.util.isADMonitor import com.amazon.opendistroforelasticsearch.commons.ConfigConstants import com.amazon.opendistroforelasticsearch.commons.authuser.User @@ -379,7 +380,15 @@ class TransportIndexMonitorAction @Inject constructor( val getRequest = GetRequest(SCHEDULED_JOBS_INDEX, request.monitorId) client.get(getRequest, object : ActionListener { override fun onResponse(response: GetResponse) { - onGetResponse(response) + if (!response.isExists) { + actionListener.onFailure(AlertingException.wrap( + ElasticsearchStatusException("Monitor with ${request.monitorId} is not found", RestStatus.NOT_FOUND))) + return + } + val xcp = XContentHelper.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, + response.sourceAsBytesRef, XContentType.JSON) + val monitor = ScheduledJob.parse(xcp, response.id, response.version) as Monitor + onGetResponse(monitor) } override fun onFailure(t: Exception) { actionListener.onFailure(AlertingException.wrap(t)) @@ -387,16 +396,11 @@ class TransportIndexMonitorAction @Inject constructor( }) } - private fun onGetResponse(response: GetResponse) { - if (!response.isExists) { - actionListener.onFailure(AlertingException.wrap( - ElasticsearchStatusException("Monitor with ${request.monitorId} is not found", RestStatus.NOT_FOUND))) + private fun onGetResponse(currentMonitor: Monitor) { + if (!checkUserFilterByPermissions(filterByEnabled, user, currentMonitor.user, actionListener, "monitor", request.monitorId)) { return } - val xcp = XContentHelper.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, - response.sourceAsBytesRef, XContentType.JSON) - val currentMonitor = ScheduledJob.parse(xcp, request.monitorId) as Monitor // If both are enabled, use the current existing monitor enabled time, otherwise the next execution will be // incorrect. if (request.monitor.enabled && currentMonitor.enabled) @@ -440,39 +444,5 @@ class TransportIndexMonitorAction @Inject constructor( } return null } - - /*private fun checkForDisallowedDestinations(allowList: List) { - this.request.monitor.triggers.forEach { trigger -> - trigger.actions.forEach { action -> - // Check for empty destinationId for test cases, otherwise we get test failures - if (action.destinationId.isNotEmpty()) checkIfDestinationIsAllowed(action.destinationId, allowList) - } - } - } - - private fun checkIfDestinationIsAllowed(destinationId: String, allowList: List) { - val getRequest = GetRequest(SCHEDULED_JOBS_INDEX, destinationId) - client.threadPool().threadContext.stashContext().use { - client.get(getRequest, object : ActionListener { - override fun onResponse(response: GetResponse) { - val xcp = XContentHelper.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, - response.sourceAsBytesRef, XContentType.JSON) - val destination = Destination.parseWithType(xcp) - if (!allowList.contains(destination.type.value)) { - actionListener.onFailure( - AlertingException.wrap(ElasticsearchStatusException( - "Monitor contains a destination type that is not allowed: ${destination.type.value}", - RestStatus.FORBIDDEN - )) - ) - } - } - - override fun onFailure(e: Exception) { - actionListener.onFailure(e) - } - }) - } - }*/ } } diff --git a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/util/AlertingUtils.kt b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/util/AlertingUtils.kt index 9236747f..2b0f2ffb 100644 --- a/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/util/AlertingUtils.kt +++ b/alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/util/AlertingUtils.kt @@ -72,3 +72,33 @@ fun checkFilterByUserBackendRoles(filterByEnabled: Boolean, user: User } return true } + +/** + * If FilterBy is enabled, this function verifies that the requester user has FilterBy permissions to access + * the resource. If FilterBy is disabled, we will assume the user has permissions and return true. + * + * This check will later to moved to the security plugin. + */ +fun checkUserFilterByPermissions( + filterByEnabled: Boolean, + requesterUser: User?, + resourceUser: User?, + actionListener: ActionListener, + resourceType: String, + resourceId: String +): Boolean { + + if (!filterByEnabled) return true + + val resourceBackendRoles = resourceUser?.backendRoles + val requesterBackendRoles = requesterUser?.backendRoles + + if (resourceBackendRoles == null || requesterBackendRoles == null || resourceBackendRoles.intersect(requesterBackendRoles).isEmpty()) { + actionListener.onFailure(AlertingException.wrap( + ElasticsearchStatusException("Do not have permissions to resource, $resourceType, with id, $resourceId", + RestStatus.FORBIDDEN) + )) + return false + } + return true +} diff --git a/alerting/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/AlertingRestTestCase.kt b/alerting/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/AlertingRestTestCase.kt index 4b1c4ea2..48132191 100644 --- a/alerting/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/AlertingRestTestCase.kt +++ b/alerting/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/AlertingRestTestCase.kt @@ -92,6 +92,14 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { return monitor.copy(id = monitorJson["_id"] as String, version = (monitorJson["_version"] as Int).toLong()) } + protected fun deleteMonitor(monitor: Monitor, refresh: Boolean = true): Response { + val response = client().makeRequest("DELETE", "$ALERTING_BASE_URI/${monitor.id}?refresh=$refresh", emptyMap(), + monitor.toHttpEntity()) + assertEquals("Unable to delete a monitor", RestStatus.OK, response.restStatus()) + + return response + } + protected fun createDestination(destination: Destination = getTestDestination(), refresh: Boolean = true): Destination { val response = client().makeRequest( "POST", @@ -108,6 +116,17 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { ) } + protected fun deleteDestination(destination: Destination = getTestDestination(), refresh: Boolean = true): Response { + val response = client().makeRequest( + "DELETE", + "$DESTINATION_BASE_URI/${destination.id}?refresh=$refresh", + emptyMap(), + destination.toHttpEntity()) + assertEquals("Unable to create a new destination", RestStatus.OK, response.restStatus()) + + return response + } + protected fun updateDestination(destination: Destination, refresh: Boolean = true): Destination { val response = client().makeRequest( "PUT", diff --git a/alerting/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/SecureDestinationRestApiIT.kt b/alerting/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/SecureDestinationRestApiIT.kt index 7f3d7875..5a3558ff 100644 --- a/alerting/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/SecureDestinationRestApiIT.kt +++ b/alerting/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/SecureDestinationRestApiIT.kt @@ -86,6 +86,130 @@ class SecureDestinationRestApiIT : AlertingRestTestCase() { } } + fun `test update destination with disable filter by`() { + disableFilterBy() + + val chime = Chime("http://abc.com") + val destination = Destination( + type = DestinationType.CHIME, + name = "test", + user = randomUser(), + lastUpdateTime = Instant.now(), + chime = chime, + slack = null, + customWebhook = null, + email = null) + val createdDestination = createDestination(destination = destination) + + assertEquals("Incorrect destination name", createdDestination.name, "test") + assertEquals("Incorrect destination type", createdDestination.type, DestinationType.CHIME) + + val slack = Slack("http://abc.com") + val destinationV2 = createdDestination.copy(name = "testUpdate", type = DestinationType.SLACK, chime = null, slack = slack) + + val updatedDestination = updateDestination(destination = destinationV2) + assertEquals("Incorrect destination name", updatedDestination.name, "testUpdate") + assertEquals("Incorrect destination type", updatedDestination.type, DestinationType.SLACK) + } + + fun `test update destination with enable filter by`() { + enableFilterBy() + if (!isHttps()) { + // if security is disabled and filter by is enabled, we can't create monitor + // refer: `test create destination with enable filter by` + return + } + + val chime = Chime("http://abc.com") + val destination = Destination( + type = DestinationType.CHIME, + name = "test", + user = randomUser(), + lastUpdateTime = Instant.now(), + chime = chime, + slack = null, + customWebhook = null, + email = null) + + // 1. create a destination as admin user + val createdDestination = createDestination(destination, true) + + assertEquals("Incorrect destination name", createdDestination.name, "test") + assertEquals("Incorrect destination type", createdDestination.type, DestinationType.CHIME) + + val slack = Slack("http://abc.com") + val destinationV2 = createdDestination.copy(name = "testUpdate", type = DestinationType.SLACK, chime = null, slack = slack) + + val updatedDestination = updateDestination(destination = destinationV2) + assertEquals("Incorrect destination name", updatedDestination.name, "testUpdate") + assertEquals("Incorrect destination type", updatedDestination.type, DestinationType.SLACK) + } + + fun `test delete destination with disable filter by`() { + disableFilterBy() + + val chime = Chime("http://abc.com") + val destination = Destination( + type = DestinationType.CHIME, + name = "test", + user = randomUser(), + lastUpdateTime = Instant.now(), + chime = chime, + slack = null, + customWebhook = null, + email = null) + val createdDestination = createDestination(destination = destination) + + assertEquals("Incorrect destination name", createdDestination.name, "test") + assertEquals("Incorrect destination type", createdDestination.type, DestinationType.CHIME) + + deleteDestination(createdDestination) + + val inputMap = HashMap() + inputMap["missing"] = "_last" + inputMap["destinationType"] = "chime" + + // get destinations as admin user + val adminResponse = getDestinations(client(), inputMap) + assertEquals(0, adminResponse.size) + } + + fun `test delete destination with enable filter by`() { + enableFilterBy() + if (!isHttps()) { + // if security is disabled and filter by is enabled, we can't create monitor + // refer: `test create destination with enable filter by` + return + } + + val chime = Chime("http://abc.com") + val destination = Destination( + type = DestinationType.CHIME, + name = "test", + user = randomUser(), + lastUpdateTime = Instant.now(), + chime = chime, + slack = null, + customWebhook = null, + email = null) + + // 1. create a destination as admin user + val createdDestination = createDestination(destination, true) + + assertEquals("Incorrect destination name", createdDestination.name, "test") + assertEquals("Incorrect destination type", createdDestination.type, DestinationType.CHIME) + + deleteDestination(createdDestination) + + val inputMap = HashMap() + inputMap["missing"] = "_last" + inputMap["destinationType"] = "chime" + + // 2. get destinations as admin user + val adminResponse = getDestinations(client(), inputMap) + assertEquals(0, adminResponse.size) + } + fun `test get destinations with a destination type and disable filter by`() { disableFilterBy() val slack = Slack("url") diff --git a/alerting/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/SecureMonitorRestApiIT.kt b/alerting/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/SecureMonitorRestApiIT.kt index 3e8f865c..b1c18289 100644 --- a/alerting/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/SecureMonitorRestApiIT.kt +++ b/alerting/src/test/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/SecureMonitorRestApiIT.kt @@ -49,6 +49,96 @@ class SecureMonitorRestApiIT : AlertingRestTestCase() { } } + fun `test update monitor with disable filter by`() { + disableFilterBy() + val monitor = randomMonitor(enabled = true) + + val createdMonitor = createMonitor(monitor = monitor) + + assertNotNull("The monitor was not created", createdMonitor) + assertTrue("The monitor was not enabled", createdMonitor.enabled) + + val monitorV2 = createdMonitor.copy(enabled = false, enabledTime = null) + val updatedMonitor = updateMonitor(monitor = monitorV2) + + assertFalse("The monitor was not disabled", updatedMonitor.enabled) + } + + fun `test update monitor with enable filter by`() { + enableFilterBy() + if (!isHttps()) { + // if security is disabled and filter by is enabled, we can't create monitor + // refer: `test create monitor with enable filter by` + return + } + val monitor = randomMonitor(enabled = true) + + val createdMonitor = createMonitor(monitor = monitor) + + assertNotNull("The monitor was not created", createdMonitor) + assertTrue("The monitor was not enabled", createdMonitor.enabled) + + val monitorV2 = createdMonitor.copy(enabled = false, enabledTime = null) + val updatedMonitor = updateMonitor(monitor = monitorV2) + + assertFalse("The monitor was not disabled", updatedMonitor.enabled) + } + + fun `test delete monitor with disable filter by`() { + disableFilterBy() + val monitor = randomMonitor(enabled = true) + + val createdMonitor = createMonitor(monitor = monitor) + + assertNotNull("The monitor was not created", createdMonitor) + assertTrue("The monitor was not enabled", createdMonitor.enabled) + + deleteMonitor(monitor = createdMonitor) + + val search = SearchSourceBuilder().query(QueryBuilders.termQuery("_id", createdMonitor.id)).toString() + // search as "admin" - must get 0 docs + val adminSearchResponse = client().makeRequest("POST", + "$ALERTING_BASE_URI/_search", + emptyMap(), + NStringEntity(search, ContentType.APPLICATION_JSON)) + assertEquals("Search monitor failed", RestStatus.OK, adminSearchResponse.restStatus()) + + val adminHits = createParser(XContentType.JSON.xContent(), + adminSearchResponse.entity.content).map()["hits"]!! as Map> + val adminDocsFound = adminHits["total"]?.get("value") + assertEquals("Monitor found during search", 0, adminDocsFound) + } + + fun `test delete monitor with enable filter by`() { + enableFilterBy() + if (!isHttps()) { + // if security is disabled and filter by is enabled, we can't create monitor + // refer: `test create monitor with enable filter by` + return + } + val monitor = randomMonitor(enabled = true) + + val createdMonitor = createMonitor(monitor = monitor) + + assertNotNull("The monitor was not created", createdMonitor) + assertTrue("The monitor was not enabled", createdMonitor.enabled) + + deleteMonitor(monitor = createdMonitor) + + val search = SearchSourceBuilder().query(QueryBuilders.termQuery("_id", createdMonitor.id)).toString() + // search as "admin" - must get 0 docs + val adminSearchResponse = client().makeRequest("POST", + "$ALERTING_BASE_URI/_search", + emptyMap(), + NStringEntity(search, ContentType.APPLICATION_JSON)) + assertEquals("Search monitor failed", RestStatus.OK, adminSearchResponse.restStatus()) + + val adminHits = createParser(XContentType.JSON.xContent(), + adminSearchResponse.entity.content).map()["hits"]!! as Map> + val adminDocsFound = adminHits["total"]?.get("value") + assertEquals("Monitor found during search", 0, adminDocsFound) + } + fun `test query monitors with disable filter by`() { disableFilterBy() // creates monitor as "admin" user.