Skip to content

Commit

Permalink
Create alert for pileup containers eligible for rule deletion
Browse files Browse the repository at this point in the history
fix Valentins comments

Make alert expiration configurable
  • Loading branch information
amaltaro committed Aug 31, 2022
1 parent 479aefc commit 91d391e
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 26 deletions.
22 changes: 21 additions & 1 deletion src/python/WMCore/MicroService/MSCore.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from WMCore.Services.ReqMgr.ReqMgr import ReqMgr
from WMCore.Services.ReqMgrAux.ReqMgrAux import ReqMgrAux
from WMCore.Services.Rucio.Rucio import Rucio
from WMCore.Services.AlertManager.AlertManagerAPI import AlertManagerAPI


class MSCore(object):
Expand All @@ -21,7 +22,7 @@ class MSCore(object):

def __init__(self, msConfig, **kwargs):
"""
Provides setup for MSTransferor and MSMonitor classes
Provides a basic setup for all the microservices
:param config: MS service configuration
:param kwargs: can be used to skip the initialization of specific services, such as:
Expand All @@ -47,6 +48,8 @@ def __init__(self, msConfig, **kwargs):
hostUrl=self.msConfig['rucioUrl'],
authUrl=self.msConfig['rucioAuthUrl'],
configDict={"logger": self.logger, "user_agent": "wmcore-microservices"})
self.alertManagerUrl = self.msConfig.get("alertManagerUrl", None)
self.alertManagerApi = AlertManagerAPI(self.alertManagerUrl, logger=self.logger)

def unifiedConfig(self):
"""
Expand Down Expand Up @@ -88,3 +91,20 @@ def updateReportDict(self, reportDict, keyName, value):
else:
reportDict[keyName] = value
return reportDict

def sendAlert(self, alertName, severity, summary, description, service, endSecs=1 * 60 * 60):
"""
Sends an alert to Prometheus.
:param alertName: string with unique alert name
:param severity: string with the alert severity
:param summary: string with a short summary of the alert
:param description: string with a longer description of the alert
:param service: string with the service name generating this alert
:param endSecs: integer with an expiration time
:return: none
"""
try:
self.alertManagerApi.sendAlert(alertName, severity, summary, description,
service, endSecs=endSecs)
except Exception as ex:
self.logger.exception("Failed to send alert to %s. Error: %s", self.alertManagerUrl, str(ex))
2 changes: 0 additions & 2 deletions src/python/WMCore/MicroService/MSOutput/MSOutput.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
from WMCore.MicroService.MSOutput.MSOutputTemplate import MSOutputTemplate
from WMCore.MicroService.MSOutput.RelValPolicy import RelValPolicy
from WMCore.WMException import WMException
from WMCore.Services.AlertManager.AlertManagerAPI import AlertManagerAPI


class MSOutputException(WMException):
Expand Down Expand Up @@ -105,7 +104,6 @@ def __init__(self, msConfig, mode, logger=None):
self.uConfig = {}
# service name used to route alerts via AlertManager
self.alertServiceName = "ms-output"
self.alertManagerAPI = AlertManagerAPI(self.msConfig.get("alertManagerUrl", None), logger=logger)

# RelVal output data placement policy from the service configuration
self.msConfig.setdefault("dbsUrl", "https://cmsweb-prod.cern.ch/dbs/prod/global/DBSReader")
Expand Down
37 changes: 33 additions & 4 deletions src/python/WMCore/MicroService/MSRuleCleaner/MSRuleCleaner.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ def __init__(self, msConfig, logger=None):
self.msConfig["logDBReporter"],
logger=self.logger)
self.wmstatsSvc = WMStatsServer(self.msConfig['wmstatsUrl'], logger=self.logger)
# service name used to route alerts via AlertManager
self.alertServiceName = "ms-rulecleaner"
# 2 days of expiration time
self.alertExpiration = self.msConfig.get("alertExpirSecs", 2 * 24 * 60 * 60)

# Building all the Pipelines:
pName = 'plineMSTrCont'
Expand Down Expand Up @@ -675,10 +679,16 @@ def getRucioRules(self, wflow, gran, rucioAcct):
self.logger.info(msg, dataCont, wflow['RequestName'])
continue
if gran == 'container':
for rule in self.rucio.listDataRules(dataCont, account=rucioAcct):
wflow['RulesToClean'][currPline].append(rule['id'])
msg = "Found %s container-level rule to be deleted for container %s"
self.logger.info(msg, rule['id'], dataCont)
ruleIds = [rule['id'] for rule in self.rucio.listDataRules(dataCont, account=rucioAcct)]
if ruleIds and dataType in ("MCPileup", "DataPileup"):
msg = "Pileup container %s has the following container-level rules to be removed: %s."
msg += " However, this component is no longer removing pileup rules."
self.logger.info(msg, dataCont, ruleIds)
self.alertDeletablePU(wflow['RequestName'], dataCont, ruleIds)
elif ruleIds:
wflow['RulesToClean'][currPline].extend(ruleIds)
msg = "Container %s has the following container-level rules to be removed: %s"
self.logger.info(msg, dataCont, ruleIds)
elif gran == 'block':
try:
blocks = self.rucio.getBlocksInContainer(dataCont)
Expand Down Expand Up @@ -735,3 +745,22 @@ def getRequestRecords(self, reqStatus):
requests = result[0]
self.logger.info(' retrieved %s requests in status: %s', len(requests), reqStatus)
return requests

def alertDeletablePU(self, workflowName, containerName, ruleList):
"""
Send alert notifying that there is a pileup dataset eligible for rule removal
:param workflowName: string with the workflow name
:param containerName: string with the container name
:param ruleList: list of strings with the rule ids
:return: none
"""
alertName = "{}: PU eligible for deletion: {}".format(self.alertServiceName, containerName)
alertSeverity = "high"
alertSummary = "[MSRuleCleaner] Found pileup container no longer locked and available for rule deletion."
alertDescription = "Workflow: {} has the following pileup container ".format(workflowName)
alertDescription += "\n{}\n no longer in the global locks. ".format(containerName)
alertDescription += "These rules\n{}\nare eligible for deletion.".format(ruleList)
# alert to expiry in 2 days
self.sendAlert(alertName, alertSeverity, alertSummary, alertDescription,
service=self.alertServiceName, endSecs=self.alertExpiration)
self.logger.critical(alertDescription)
15 changes: 1 addition & 14 deletions src/python/WMCore/MicroService/MSTransferor/MSTransferor.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from WMCore.MicroService.MSTransferor.RequestInfo import RequestInfo
from WMCore.MicroService.MSTransferor.DataStructs.RSEQuotas import RSEQuotas
from WMCore.Services.CRIC.CRIC import CRIC
from WMCore.Services.AlertManager.AlertManagerAPI import AlertManagerAPI


def newTransferRec(dataIn):
"""
Expand Down Expand Up @@ -105,8 +105,6 @@ def __init__(self, msConfig, logger=None):
self.blockCounter = 0
# service name used to route alerts via AlertManager
self.alertServiceName = "ms-transferor"
self.alertManagerUrl = self.msConfig.get("alertManagerUrl", None)
self.alertManagerApi = AlertManagerAPI(self.alertManagerUrl, logger=logger)

@retry(tries=3, delay=2, jitter=2)
def updateCaches(self):
Expand Down Expand Up @@ -558,17 +556,6 @@ def makeTransferRucio(self, wflow, dataIn, dids, dataSize, grouping, copies, nod
self.logger.info(msg, wflow.getName(), dids, rseExpr, ruleAttrs)
return success, transferId

def sendAlert(self, alertName, severity, summary, description, service, endSecs=1 * 60 * 60):
"""
Send alert to Prometheus, wrap function in a try-except clause
"""
try:
# alert to expiry in an hour from now
self.alertManagerApi.sendAlert(alertName, severity, summary, description,
service, endSecs=endSecs)
except Exception as ex:
self.logger.exception("Failed to send alert to %s. Error: %s", self.alertManagerUrl, str(ex))

def alertPUMisconfig(self, workflowName):
"""
Send alert to Prometheus with PU misconfiguration error
Expand Down
5 changes: 0 additions & 5 deletions src/python/WMCore/MicroService/MSUnmerged/MSUnmerged.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
from WMCore.MicroService.MSUnmerged.MSUnmergedRSE import MSUnmergedRSE
from WMCore.Services.RucioConMon.RucioConMon import RucioConMon
from WMCore.Services.WMStatsServer.WMStatsServer import WMStatsServer
# from WMCore.Services.AlertManager.AlertManagerAPI import AlertManagerAPI
from WMCore.Database.MongoDB import MongoDB
from WMCore.WMException import WMException
from Utils.Pipeline import Pipeline, Functor
Expand Down Expand Up @@ -134,10 +133,6 @@ def __init__(self, msConfig, logger=None):
msg += "set to emulate it. Crashing the service!"
raise ImportError(msg)

# TODO: Add 'alertManagerUrl' to msConfig'
# self.alertServiceName = "ms-unmerged"
# self.alertManagerAPI = AlertManagerAPI(self.msConfig.get("alertManagerUrl", None), logger=logger)

# Instantiating the Rucio Consistency Monitor Client
self.rucioConMon = RucioConMon(self.msConfig['rucioConMon'], logger=self.logger)

Expand Down

0 comments on commit 91d391e

Please sign in to comment.