Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Support filterBy in update/delete destination/monitor APIs (#311)
Browse files Browse the repository at this point in the history
* Support filterBy in update/delete destination/monitor APIs

* cleanup code

* fix message

* add comments to checkUserFilterByPermissions function
  • Loading branch information
lezzago authored Dec 2, 2020
1 parent 57995b6 commit 2451e21
Show file tree
Hide file tree
Showing 9 changed files with 550 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,29 +18,133 @@ package com.amazon.opendistroforelasticsearch.alerting.transport
import com.amazon.opendistroforelasticsearch.alerting.action.DeleteDestinationAction
import com.amazon.opendistroforelasticsearch.alerting.action.DeleteDestinationRequest
import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob
import com.amazon.opendistroforelasticsearch.alerting.model.destination.Destination
import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings
import com.amazon.opendistroforelasticsearch.alerting.util.AlertingException
import com.amazon.opendistroforelasticsearch.alerting.util.checkFilterByUserBackendRoles
import com.amazon.opendistroforelasticsearch.alerting.util.checkUserFilterByPermissions
import com.amazon.opendistroforelasticsearch.commons.ConfigConstants
import com.amazon.opendistroforelasticsearch.commons.authuser.User
import org.apache.logging.log4j.LogManager
import org.elasticsearch.ElasticsearchStatusException
import org.elasticsearch.action.ActionListener
import org.elasticsearch.action.delete.DeleteRequest
import org.elasticsearch.action.delete.DeleteResponse
import org.elasticsearch.action.get.GetRequest
import org.elasticsearch.action.get.GetResponse
import org.elasticsearch.action.support.ActionFilters
import org.elasticsearch.action.support.HandledTransportAction
import org.elasticsearch.client.Client
import org.elasticsearch.cluster.service.ClusterService
import org.elasticsearch.common.inject.Inject
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler
import org.elasticsearch.common.xcontent.NamedXContentRegistry
import org.elasticsearch.common.xcontent.XContentFactory
import org.elasticsearch.common.xcontent.XContentParser
import org.elasticsearch.common.xcontent.XContentParserUtils
import org.elasticsearch.common.xcontent.XContentType
import org.elasticsearch.rest.RestStatus
import org.elasticsearch.tasks.Task
import org.elasticsearch.transport.TransportService
import java.io.IOException

private val log = LogManager.getLogger(TransportDeleteDestinationAction::class.java)

class TransportDeleteDestinationAction @Inject constructor(
transportService: TransportService,
val client: Client,
actionFilters: ActionFilters
actionFilters: ActionFilters,
val clusterService: ClusterService,
settings: Settings,
val xContentRegistry: NamedXContentRegistry
) : HandledTransportAction<DeleteDestinationRequest, DeleteResponse>(
DeleteDestinationAction.NAME, transportService, actionFilters, ::DeleteDestinationRequest
) {

@Volatile private var filterByEnabled = AlertingSettings.FILTER_BY_BACKEND_ROLES.get(settings)
private var user: User? = null

init {
clusterService.clusterSettings.addSettingsUpdateConsumer(AlertingSettings.FILTER_BY_BACKEND_ROLES) { filterByEnabled = it }
}

override fun doExecute(task: Task, request: DeleteDestinationRequest, actionListener: ActionListener<DeleteResponse>) {
val userStr = client.threadPool().threadContext.getTransient<String>(ConfigConstants.OPENDISTRO_SECURITY_USER_AND_ROLES)
log.debug("User and roles string from thread context: $userStr")
user = User.parse(userStr)
val deleteRequest = DeleteRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, request.destinationId)
.setRefreshPolicy(request.refreshPolicy)

if (!checkFilterByUserBackendRoles(filterByEnabled, user, actionListener)) {
return
}
client.threadPool().threadContext.stashContext().use {
DeleteDestinationHandler(client, actionListener, deleteRequest, user, request.destinationId).resolveUserAndStart()
}
}

inner class DeleteDestinationHandler(
private val client: Client,
private val actionListener: ActionListener<DeleteResponse>,
private val deleteRequest: DeleteRequest,
private val user: User?,
private val destinationId: String
) {

fun resolveUserAndStart() {
if (user == null) {
// Security is disabled, so we can delete the destination without issues
deleteDestination()
} else if (!filterByEnabled) {
// security is enabled and filterby is disabled.
deleteDestination()
} else {
// security is enabled and filterby is enabled.
try {
start()
} catch (ex: IOException) {
actionListener.onFailure(AlertingException.wrap(ex))
}
}
}

fun start() {
val getRequest = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, destinationId)
client.get(getRequest, object : ActionListener<GetResponse> {
override fun onResponse(response: GetResponse) {
if (!response.isExists) {
actionListener.onFailure(AlertingException.wrap(
ElasticsearchStatusException("Destination with $destinationId is not found", RestStatus.NOT_FOUND)))
return
}
val id = response.id
val version = response.version
val seqNo = response.seqNo.toInt()
val primaryTerm = response.primaryTerm.toInt()
val xcp = XContentFactory.xContent(XContentType.JSON)
.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, response.sourceAsString)
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp)
XContentParserUtils.ensureExpectedToken(XContentParser.Token.FIELD_NAME, xcp.nextToken(), xcp)
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp)
val dest = Destination.parse(xcp, id, version, seqNo, primaryTerm)
onGetResponse(dest)
}
override fun onFailure(t: Exception) {
actionListener.onFailure(AlertingException.wrap(t))
}
})
}

private fun onGetResponse(destination: Destination) {
if (!checkUserFilterByPermissions(filterByEnabled, user, destination.user, actionListener, "destination", destinationId)) {
return
} else {
deleteDestination()
}
}

private fun deleteDestination() {
client.delete(deleteRequest, object : ActionListener<DeleteResponse> {
override fun onResponse(response: DeleteResponse) {
actionListener.onResponse(response)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,29 +18,123 @@ package com.amazon.opendistroforelasticsearch.alerting.transport
import com.amazon.opendistroforelasticsearch.alerting.action.DeleteMonitorAction
import com.amazon.opendistroforelasticsearch.alerting.action.DeleteMonitorRequest
import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob
import com.amazon.opendistroforelasticsearch.alerting.model.Monitor
import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings
import com.amazon.opendistroforelasticsearch.alerting.util.AlertingException
import com.amazon.opendistroforelasticsearch.alerting.util.checkFilterByUserBackendRoles
import com.amazon.opendistroforelasticsearch.alerting.util.checkUserFilterByPermissions
import com.amazon.opendistroforelasticsearch.commons.ConfigConstants
import com.amazon.opendistroforelasticsearch.commons.authuser.User
import org.apache.logging.log4j.LogManager
import org.elasticsearch.ElasticsearchStatusException
import org.elasticsearch.action.ActionListener
import org.elasticsearch.action.delete.DeleteRequest
import org.elasticsearch.action.delete.DeleteResponse
import org.elasticsearch.action.get.GetRequest
import org.elasticsearch.action.get.GetResponse
import org.elasticsearch.action.support.ActionFilters
import org.elasticsearch.action.support.HandledTransportAction
import org.elasticsearch.client.Client
import org.elasticsearch.cluster.service.ClusterService
import org.elasticsearch.common.inject.Inject
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler
import org.elasticsearch.common.xcontent.NamedXContentRegistry
import org.elasticsearch.common.xcontent.XContentHelper
import org.elasticsearch.common.xcontent.XContentType
import org.elasticsearch.rest.RestStatus
import org.elasticsearch.tasks.Task
import org.elasticsearch.transport.TransportService
import java.io.IOException

private val log = LogManager.getLogger(TransportDeleteMonitorAction::class.java)

class TransportDeleteMonitorAction @Inject constructor(
transportService: TransportService,
val client: Client,
actionFilters: ActionFilters
actionFilters: ActionFilters,
val clusterService: ClusterService,
settings: Settings,
val xContentRegistry: NamedXContentRegistry
) : HandledTransportAction<DeleteMonitorRequest, DeleteResponse>(
DeleteMonitorAction.NAME, transportService, actionFilters, ::DeleteMonitorRequest
) {

@Volatile private var filterByEnabled = AlertingSettings.FILTER_BY_BACKEND_ROLES.get(settings)
private var user: User? = null

init {
clusterService.clusterSettings.addSettingsUpdateConsumer(AlertingSettings.FILTER_BY_BACKEND_ROLES) { filterByEnabled = it }
}

override fun doExecute(task: Task, request: DeleteMonitorRequest, actionListener: ActionListener<DeleteResponse>) {
val userStr = client.threadPool().threadContext.getTransient<String>(ConfigConstants.OPENDISTRO_SECURITY_USER_AND_ROLES)
log.debug("User and roles string from thread context: $userStr")
user = User.parse(userStr)
val deleteRequest = DeleteRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, request.monitorId)
.setRefreshPolicy(request.refreshPolicy)

if (!checkFilterByUserBackendRoles(filterByEnabled, user, actionListener)) {
return
}
client.threadPool().threadContext.stashContext().use {
DeleteMonitorHandler(client, actionListener, deleteRequest, user, request.monitorId).resolveUserAndStart()
}
}

inner class DeleteMonitorHandler(
private val client: Client,
private val actionListener: ActionListener<DeleteResponse>,
private val deleteRequest: DeleteRequest,
private val user: User?,
private val monitorId: String
) {

fun resolveUserAndStart() {
if (user == null) {
// Security is disabled, so we can delete the destination without issues
deleteMonitor()
} else if (!filterByEnabled) {
// security is enabled and filterby is disabled.
deleteMonitor()
} else {
try {
start()
} catch (ex: IOException) {
actionListener.onFailure(AlertingException.wrap(ex))
}
}
}

fun start() {
val getRequest = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, monitorId)
client.get(getRequest, object : ActionListener<GetResponse> {
override fun onResponse(response: GetResponse) {
if (!response.isExists) {
actionListener.onFailure(AlertingException.wrap(
ElasticsearchStatusException("Monitor with $monitorId is not found", RestStatus.NOT_FOUND)))
return
}
val xcp = XContentHelper.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE,
response.sourceAsBytesRef, XContentType.JSON)
val monitor = ScheduledJob.parse(xcp, response.id, response.version) as Monitor
onGetResponse(monitor)
}
override fun onFailure(t: Exception) {
actionListener.onFailure(AlertingException.wrap(t))
}
})
}

private fun onGetResponse(monitor: Monitor) {
if (!checkUserFilterByPermissions(filterByEnabled, user, monitor.user, actionListener, "monitor", monitorId)) {
return
} else {
deleteMonitor()
}
}

private fun deleteMonitor() {
client.delete(deleteRequest, object : ActionListener<DeleteResponse> {
override fun onResponse(response: DeleteResponse) {
actionListener.onResponse(response)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@ import com.amazon.opendistroforelasticsearch.alerting.action.GetMonitorRequest
import com.amazon.opendistroforelasticsearch.alerting.action.GetMonitorResponse
import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob
import com.amazon.opendistroforelasticsearch.alerting.model.Monitor
import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings
import com.amazon.opendistroforelasticsearch.alerting.util.AlertingException
import com.amazon.opendistroforelasticsearch.alerting.util.checkFilterByUserBackendRoles
import com.amazon.opendistroforelasticsearch.alerting.util.checkUserFilterByPermissions
import com.amazon.opendistroforelasticsearch.commons.ConfigConstants
import com.amazon.opendistroforelasticsearch.commons.authuser.User
import org.apache.logging.log4j.LogManager
import org.elasticsearch.ElasticsearchStatusException
import org.elasticsearch.action.ActionListener
Expand All @@ -29,7 +34,9 @@ import org.elasticsearch.action.get.GetResponse
import org.elasticsearch.action.support.ActionFilters
import org.elasticsearch.action.support.HandledTransportAction
import org.elasticsearch.client.Client
import org.elasticsearch.cluster.service.ClusterService
import org.elasticsearch.common.inject.Inject
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler
import org.elasticsearch.common.xcontent.NamedXContentRegistry
import org.elasticsearch.common.xcontent.XContentHelper
Expand All @@ -44,17 +51,33 @@ class TransportGetMonitorAction @Inject constructor(
transportService: TransportService,
val client: Client,
actionFilters: ActionFilters,
val xContentRegistry: NamedXContentRegistry
val xContentRegistry: NamedXContentRegistry,
val clusterService: ClusterService,
settings: Settings
) : HandledTransportAction<GetMonitorRequest, GetMonitorResponse> (
GetMonitorAction.NAME, transportService, actionFilters, ::GetMonitorRequest
) {

@Volatile private var filterByEnabled = AlertingSettings.FILTER_BY_BACKEND_ROLES.get(settings)
private var user: User? = null

init {
clusterService.clusterSettings.addSettingsUpdateConsumer(AlertingSettings.FILTER_BY_BACKEND_ROLES) { filterByEnabled = it }
}

override fun doExecute(task: Task, getMonitorRequest: GetMonitorRequest, actionListener: ActionListener<GetMonitorResponse>) {
val userStr = client.threadPool().threadContext.getTransient<String>(ConfigConstants.OPENDISTRO_SECURITY_USER_AND_ROLES)
log.debug("User and roles string from thread context: $userStr")
user = User.parse(userStr)

val getRequest = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, getMonitorRequest.monitorId)
.version(getMonitorRequest.version)
.fetchSourceContext(getMonitorRequest.srcContext)

if (!checkFilterByUserBackendRoles(filterByEnabled, user, actionListener)) {
return
}

/*
* Remove security context before you call elasticsearch api's. By this time, permissions required
* to call this api are validated.
Expand All @@ -66,20 +89,31 @@ class TransportGetMonitorAction @Inject constructor(
override fun onResponse(response: GetResponse) {
if (!response.isExists) {
actionListener.onFailure(
AlertingException.wrap(ElasticsearchStatusException("Monitor not found.", RestStatus.NOT_FOUND)))
AlertingException.wrap(ElasticsearchStatusException("Monitor not found.", RestStatus.NOT_FOUND)))
return
}

var monitor: Monitor? = null
if (!response.isSourceEmpty) {
XContentHelper.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE,
response.sourceAsBytesRef, XContentType.JSON).use { xcp ->
response.sourceAsBytesRef, XContentType.JSON).use { xcp ->
monitor = ScheduledJob.parse(xcp, response.id, response.version) as Monitor

// security is enabled and filterby is enabled
if (!checkUserFilterByPermissions(
filterByEnabled,
user,
monitor?.user,
actionListener,
"monitor",
getMonitorRequest.monitorId)) {
return
}
}
}

actionListener.onResponse(
GetMonitorResponse(response.id, response.version, response.seqNo, response.primaryTerm, RestStatus.OK, monitor)
GetMonitorResponse(response.id, response.version, response.seqNo, response.primaryTerm, RestStatus.OK, monitor)
)
}

Expand Down
Loading

0 comments on commit 2451e21

Please sign in to comment.