Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Support filterBy in update/delete destination/monitor APIs #311

Merged
merged 5 commits into from
Dec 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,29 +18,133 @@ package com.amazon.opendistroforelasticsearch.alerting.transport
import com.amazon.opendistroforelasticsearch.alerting.action.DeleteDestinationAction
import com.amazon.opendistroforelasticsearch.alerting.action.DeleteDestinationRequest
import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob
import com.amazon.opendistroforelasticsearch.alerting.model.destination.Destination
import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings
import com.amazon.opendistroforelasticsearch.alerting.util.AlertingException
import com.amazon.opendistroforelasticsearch.alerting.util.checkFilterByUserBackendRoles
import com.amazon.opendistroforelasticsearch.alerting.util.checkUserFilterByPermissions
import com.amazon.opendistroforelasticsearch.commons.ConfigConstants
import com.amazon.opendistroforelasticsearch.commons.authuser.User
import org.apache.logging.log4j.LogManager
import org.elasticsearch.ElasticsearchStatusException
import org.elasticsearch.action.ActionListener
import org.elasticsearch.action.delete.DeleteRequest
import org.elasticsearch.action.delete.DeleteResponse
import org.elasticsearch.action.get.GetRequest
import org.elasticsearch.action.get.GetResponse
import org.elasticsearch.action.support.ActionFilters
import org.elasticsearch.action.support.HandledTransportAction
import org.elasticsearch.client.Client
import org.elasticsearch.cluster.service.ClusterService
import org.elasticsearch.common.inject.Inject
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler
import org.elasticsearch.common.xcontent.NamedXContentRegistry
import org.elasticsearch.common.xcontent.XContentFactory
import org.elasticsearch.common.xcontent.XContentParser
import org.elasticsearch.common.xcontent.XContentParserUtils
import org.elasticsearch.common.xcontent.XContentType
import org.elasticsearch.rest.RestStatus
import org.elasticsearch.tasks.Task
import org.elasticsearch.transport.TransportService
import java.io.IOException

private val log = LogManager.getLogger(TransportDeleteDestinationAction::class.java)

class TransportDeleteDestinationAction @Inject constructor(
transportService: TransportService,
val client: Client,
actionFilters: ActionFilters
actionFilters: ActionFilters,
val clusterService: ClusterService,
settings: Settings,
val xContentRegistry: NamedXContentRegistry
) : HandledTransportAction<DeleteDestinationRequest, DeleteResponse>(
DeleteDestinationAction.NAME, transportService, actionFilters, ::DeleteDestinationRequest
) {

@Volatile private var filterByEnabled = AlertingSettings.FILTER_BY_BACKEND_ROLES.get(settings)
private var user: User? = null

init {
clusterService.clusterSettings.addSettingsUpdateConsumer(AlertingSettings.FILTER_BY_BACKEND_ROLES) { filterByEnabled = it }
}

override fun doExecute(task: Task, request: DeleteDestinationRequest, actionListener: ActionListener<DeleteResponse>) {
val userStr = client.threadPool().threadContext.getTransient<String>(ConfigConstants.OPENDISTRO_SECURITY_USER_AND_ROLES)
log.debug("User and roles string from thread context: $userStr")
user = User.parse(userStr)
val deleteRequest = DeleteRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, request.destinationId)
.setRefreshPolicy(request.refreshPolicy)

if (!checkFilterByUserBackendRoles(filterByEnabled, user, actionListener)) {
return
}
client.threadPool().threadContext.stashContext().use {
DeleteDestinationHandler(client, actionListener, deleteRequest, user, request.destinationId).resolveUserAndStart()
}
}

inner class DeleteDestinationHandler(
private val client: Client,
private val actionListener: ActionListener<DeleteResponse>,
private val deleteRequest: DeleteRequest,
private val user: User?,
private val destinationId: String
) {

fun resolveUserAndStart() {
if (user == null) {
// Security is disabled, so we can delete the destination without issues
deleteDestination()
} else if (!filterByEnabled) {
// security is enabled and filterby is disabled.
deleteDestination()
} else {
// security is enabled and filterby is enabled.
try {
start()
} catch (ex: IOException) {
actionListener.onFailure(AlertingException.wrap(ex))
}
}
}

fun start() {
val getRequest = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, destinationId)
client.get(getRequest, object : ActionListener<GetResponse> {
override fun onResponse(response: GetResponse) {
if (!response.isExists) {
actionListener.onFailure(AlertingException.wrap(
ElasticsearchStatusException("Destination with $destinationId is not found", RestStatus.NOT_FOUND)))
return
}
val id = response.id
val version = response.version
val seqNo = response.seqNo.toInt()
val primaryTerm = response.primaryTerm.toInt()
val xcp = XContentFactory.xContent(XContentType.JSON)
.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, response.sourceAsString)
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp)
XContentParserUtils.ensureExpectedToken(XContentParser.Token.FIELD_NAME, xcp.nextToken(), xcp)
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp)
val dest = Destination.parse(xcp, id, version, seqNo, primaryTerm)
onGetResponse(dest)
}
override fun onFailure(t: Exception) {
actionListener.onFailure(AlertingException.wrap(t))
}
})
}

private fun onGetResponse(destination: Destination) {
if (!checkUserFilterByPermissions(filterByEnabled, user, destination.user, actionListener, "destination", destinationId)) {
return
} else {
deleteDestination()
}
}

private fun deleteDestination() {
client.delete(deleteRequest, object : ActionListener<DeleteResponse> {
override fun onResponse(response: DeleteResponse) {
actionListener.onResponse(response)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,29 +18,123 @@ package com.amazon.opendistroforelasticsearch.alerting.transport
import com.amazon.opendistroforelasticsearch.alerting.action.DeleteMonitorAction
import com.amazon.opendistroforelasticsearch.alerting.action.DeleteMonitorRequest
import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob
import com.amazon.opendistroforelasticsearch.alerting.model.Monitor
import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings
import com.amazon.opendistroforelasticsearch.alerting.util.AlertingException
import com.amazon.opendistroforelasticsearch.alerting.util.checkFilterByUserBackendRoles
import com.amazon.opendistroforelasticsearch.alerting.util.checkUserFilterByPermissions
import com.amazon.opendistroforelasticsearch.commons.ConfigConstants
import com.amazon.opendistroforelasticsearch.commons.authuser.User
import org.apache.logging.log4j.LogManager
import org.elasticsearch.ElasticsearchStatusException
import org.elasticsearch.action.ActionListener
import org.elasticsearch.action.delete.DeleteRequest
import org.elasticsearch.action.delete.DeleteResponse
import org.elasticsearch.action.get.GetRequest
import org.elasticsearch.action.get.GetResponse
import org.elasticsearch.action.support.ActionFilters
import org.elasticsearch.action.support.HandledTransportAction
import org.elasticsearch.client.Client
import org.elasticsearch.cluster.service.ClusterService
import org.elasticsearch.common.inject.Inject
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler
import org.elasticsearch.common.xcontent.NamedXContentRegistry
import org.elasticsearch.common.xcontent.XContentHelper
import org.elasticsearch.common.xcontent.XContentType
import org.elasticsearch.rest.RestStatus
import org.elasticsearch.tasks.Task
import org.elasticsearch.transport.TransportService
import java.io.IOException

private val log = LogManager.getLogger(TransportDeleteMonitorAction::class.java)

class TransportDeleteMonitorAction @Inject constructor(
transportService: TransportService,
val client: Client,
actionFilters: ActionFilters
actionFilters: ActionFilters,
val clusterService: ClusterService,
settings: Settings,
val xContentRegistry: NamedXContentRegistry
) : HandledTransportAction<DeleteMonitorRequest, DeleteResponse>(
DeleteMonitorAction.NAME, transportService, actionFilters, ::DeleteMonitorRequest
) {

@Volatile private var filterByEnabled = AlertingSettings.FILTER_BY_BACKEND_ROLES.get(settings)
private var user: User? = null

init {
clusterService.clusterSettings.addSettingsUpdateConsumer(AlertingSettings.FILTER_BY_BACKEND_ROLES) { filterByEnabled = it }
}

override fun doExecute(task: Task, request: DeleteMonitorRequest, actionListener: ActionListener<DeleteResponse>) {
val userStr = client.threadPool().threadContext.getTransient<String>(ConfigConstants.OPENDISTRO_SECURITY_USER_AND_ROLES)
log.debug("User and roles string from thread context: $userStr")
user = User.parse(userStr)
val deleteRequest = DeleteRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, request.monitorId)
.setRefreshPolicy(request.refreshPolicy)

if (!checkFilterByUserBackendRoles(filterByEnabled, user, actionListener)) {
return
}
client.threadPool().threadContext.stashContext().use {
DeleteMonitorHandler(client, actionListener, deleteRequest, user, request.monitorId).resolveUserAndStart()
}
}

inner class DeleteMonitorHandler(
private val client: Client,
private val actionListener: ActionListener<DeleteResponse>,
private val deleteRequest: DeleteRequest,
private val user: User?,
private val monitorId: String
) {

fun resolveUserAndStart() {
if (user == null) {
// Security is disabled, so we can delete the destination without issues
deleteMonitor()
} else if (!filterByEnabled) {
// security is enabled and filterby is disabled.
deleteMonitor()
} else {
try {
start()
} catch (ex: IOException) {
actionListener.onFailure(AlertingException.wrap(ex))
}
}
}

fun start() {
val getRequest = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, monitorId)
client.get(getRequest, object : ActionListener<GetResponse> {
override fun onResponse(response: GetResponse) {
if (!response.isExists) {
actionListener.onFailure(AlertingException.wrap(
ElasticsearchStatusException("Monitor with $monitorId is not found", RestStatus.NOT_FOUND)))
return
}
val xcp = XContentHelper.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE,
response.sourceAsBytesRef, XContentType.JSON)
val monitor = ScheduledJob.parse(xcp, response.id, response.version) as Monitor
onGetResponse(monitor)
}
override fun onFailure(t: Exception) {
actionListener.onFailure(AlertingException.wrap(t))
}
})
}

private fun onGetResponse(monitor: Monitor) {
if (!checkUserFilterByPermissions(filterByEnabled, user, monitor.user, actionListener, "monitor", monitorId)) {
return
} else {
deleteMonitor()
}
}

private fun deleteMonitor() {
client.delete(deleteRequest, object : ActionListener<DeleteResponse> {
override fun onResponse(response: DeleteResponse) {
actionListener.onResponse(response)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@ import com.amazon.opendistroforelasticsearch.alerting.action.GetMonitorRequest
import com.amazon.opendistroforelasticsearch.alerting.action.GetMonitorResponse
import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob
import com.amazon.opendistroforelasticsearch.alerting.model.Monitor
import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings
import com.amazon.opendistroforelasticsearch.alerting.util.AlertingException
import com.amazon.opendistroforelasticsearch.alerting.util.checkFilterByUserBackendRoles
import com.amazon.opendistroforelasticsearch.alerting.util.checkUserFilterByPermissions
import com.amazon.opendistroforelasticsearch.commons.ConfigConstants
import com.amazon.opendistroforelasticsearch.commons.authuser.User
import org.apache.logging.log4j.LogManager
import org.elasticsearch.ElasticsearchStatusException
import org.elasticsearch.action.ActionListener
Expand All @@ -29,7 +34,9 @@ import org.elasticsearch.action.get.GetResponse
import org.elasticsearch.action.support.ActionFilters
import org.elasticsearch.action.support.HandledTransportAction
import org.elasticsearch.client.Client
import org.elasticsearch.cluster.service.ClusterService
import org.elasticsearch.common.inject.Inject
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler
import org.elasticsearch.common.xcontent.NamedXContentRegistry
import org.elasticsearch.common.xcontent.XContentHelper
Expand All @@ -44,17 +51,33 @@ class TransportGetMonitorAction @Inject constructor(
transportService: TransportService,
val client: Client,
actionFilters: ActionFilters,
val xContentRegistry: NamedXContentRegistry
val xContentRegistry: NamedXContentRegistry,
val clusterService: ClusterService,
settings: Settings
) : HandledTransportAction<GetMonitorRequest, GetMonitorResponse> (
GetMonitorAction.NAME, transportService, actionFilters, ::GetMonitorRequest
) {

@Volatile private var filterByEnabled = AlertingSettings.FILTER_BY_BACKEND_ROLES.get(settings)
private var user: User? = null

init {
clusterService.clusterSettings.addSettingsUpdateConsumer(AlertingSettings.FILTER_BY_BACKEND_ROLES) { filterByEnabled = it }
}

override fun doExecute(task: Task, getMonitorRequest: GetMonitorRequest, actionListener: ActionListener<GetMonitorResponse>) {
val userStr = client.threadPool().threadContext.getTransient<String>(ConfigConstants.OPENDISTRO_SECURITY_USER_AND_ROLES)
log.debug("User and roles string from thread context: $userStr")
user = User.parse(userStr)

val getRequest = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, getMonitorRequest.monitorId)
.version(getMonitorRequest.version)
.fetchSourceContext(getMonitorRequest.srcContext)

if (!checkFilterByUserBackendRoles(filterByEnabled, user, actionListener)) {
return
}

/*
* Remove security context before you call elasticsearch api's. By this time, permissions required
* to call this api are validated.
Expand All @@ -66,20 +89,31 @@ class TransportGetMonitorAction @Inject constructor(
override fun onResponse(response: GetResponse) {
if (!response.isExists) {
actionListener.onFailure(
AlertingException.wrap(ElasticsearchStatusException("Monitor not found.", RestStatus.NOT_FOUND)))
AlertingException.wrap(ElasticsearchStatusException("Monitor not found.", RestStatus.NOT_FOUND)))
return
}

var monitor: Monitor? = null
if (!response.isSourceEmpty) {
XContentHelper.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE,
response.sourceAsBytesRef, XContentType.JSON).use { xcp ->
response.sourceAsBytesRef, XContentType.JSON).use { xcp ->
monitor = ScheduledJob.parse(xcp, response.id, response.version) as Monitor

// security is enabled and filterby is enabled
if (!checkUserFilterByPermissions(
filterByEnabled,
user,
monitor?.user,
actionListener,
"monitor",
getMonitorRequest.monitorId)) {
return
}
}
}

actionListener.onResponse(
GetMonitorResponse(response.id, response.version, response.seqNo, response.primaryTerm, RestStatus.OK, monitor)
GetMonitorResponse(response.id, response.version, response.seqNo, response.primaryTerm, RestStatus.OK, monitor)
)
}

Expand Down
Loading