Skip to content

Commit

Permalink
Create alert for pileup containers eligible for rule deletion
Browse files Browse the repository at this point in the history
fix Valentins comments
  • Loading branch information
amaltaro committed Aug 31, 2022
1 parent 479aefc commit dd89bd4
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 26 deletions.
22 changes: 21 additions & 1 deletion src/python/WMCore/MicroService/MSCore.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from WMCore.Services.ReqMgr.ReqMgr import ReqMgr
from WMCore.Services.ReqMgrAux.ReqMgrAux import ReqMgrAux
from WMCore.Services.Rucio.Rucio import Rucio
from WMCore.Services.AlertManager.AlertManagerAPI import AlertManagerAPI


class MSCore(object):
Expand All @@ -21,7 +22,7 @@ class MSCore(object):

def __init__(self, msConfig, **kwargs):
"""
Provides setup for MSTransferor and MSMonitor classes
Provides a basic setup for all the microservices
:param config: MS service configuration
:param kwargs: can be used to skip the initialization of specific services, such as:
Expand All @@ -47,6 +48,8 @@ def __init__(self, msConfig, **kwargs):
hostUrl=self.msConfig['rucioUrl'],
authUrl=self.msConfig['rucioAuthUrl'],
configDict={"logger": self.logger, "user_agent": "wmcore-microservices"})
self.alertManagerUrl = self.msConfig.get("alertManagerUrl", None)
self.alertManagerApi = AlertManagerAPI(self.alertManagerUrl, logger=self.logger)

def unifiedConfig(self):
"""
Expand Down Expand Up @@ -88,3 +91,20 @@ def updateReportDict(self, reportDict, keyName, value):
else:
reportDict[keyName] = value
return reportDict

def sendAlert(self, alertName, severity, summary, description, service, endSecs=1 * 60 * 60):
"""
Sends an alert to Prometheus.
:param alertName: string with unique alert name
:param severity: string with the alert severity
:param summary: string with a short summary of the alert
:param description: string with a longer description of the alert
:param service: string with the service name generating this alert
:param endSecs: integer with an expiration time
:return: none
"""
try:
self.alertManagerApi.sendAlert(alertName, severity, summary, description,
service, endSecs=endSecs)
except Exception as ex:
self.logger.exception("Failed to send alert to %s. Error: %s", self.alertManagerUrl, str(ex))
2 changes: 0 additions & 2 deletions src/python/WMCore/MicroService/MSOutput/MSOutput.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
from WMCore.MicroService.MSOutput.MSOutputTemplate import MSOutputTemplate
from WMCore.MicroService.MSOutput.RelValPolicy import RelValPolicy
from WMCore.WMException import WMException
from WMCore.Services.AlertManager.AlertManagerAPI import AlertManagerAPI


class MSOutputException(WMException):
Expand Down Expand Up @@ -105,7 +104,6 @@ def __init__(self, msConfig, mode, logger=None):
self.uConfig = {}
# service name used to route alerts via AlertManager
self.alertServiceName = "ms-output"
self.alertManagerAPI = AlertManagerAPI(self.msConfig.get("alertManagerUrl", None), logger=logger)

# RelVal output data placement policy from the service configuration
self.msConfig.setdefault("dbsUrl", "https://cmsweb-prod.cern.ch/dbs/prod/global/DBSReader")
Expand Down
35 changes: 31 additions & 4 deletions src/python/WMCore/MicroService/MSRuleCleaner/MSRuleCleaner.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ def __init__(self, msConfig, logger=None):
self.msConfig["logDBReporter"],
logger=self.logger)
self.wmstatsSvc = WMStatsServer(self.msConfig['wmstatsUrl'], logger=self.logger)
# service name used to route alerts via AlertManager
self.alertServiceName = "ms-rulecleaner"

# Building all the Pipelines:
pName = 'plineMSTrCont'
Expand Down Expand Up @@ -675,10 +677,16 @@ def getRucioRules(self, wflow, gran, rucioAcct):
self.logger.info(msg, dataCont, wflow['RequestName'])
continue
if gran == 'container':
for rule in self.rucio.listDataRules(dataCont, account=rucioAcct):
wflow['RulesToClean'][currPline].append(rule['id'])
msg = "Found %s container-level rule to be deleted for container %s"
self.logger.info(msg, rule['id'], dataCont)
ruleIds = [rule['id'] for rule in self.rucio.listDataRules(dataCont, account=rucioAcct)]
if ruleIds and dataType in ("MCPileup", "DataPileup"):
msg = "Pileup container %s has the following container-level rules to be removed: %s."
msg += " However, this component is no longer removing pileup rules."
self.logger.info(msg, dataCont, ruleIds)
self.alertDeletablePU(wflow['RequestName'], dataCont, ruleIds)
elif ruleIds:
wflow['RulesToClean'][currPline].extend(ruleIds)
msg = "Container %s has the following container-level rules to be removed: %s"
self.logger.info(msg, dataCont, ruleIds)
elif gran == 'block':
try:
blocks = self.rucio.getBlocksInContainer(dataCont)
Expand Down Expand Up @@ -735,3 +743,22 @@ def getRequestRecords(self, reqStatus):
requests = result[0]
self.logger.info(' retrieved %s requests in status: %s', len(requests), reqStatus)
return requests

def alertDeletablePU(self, workflowName, containerName, ruleList):
"""
Send alert notifying that there is a pileup dataset eligible for rule removal
:param workflowName: string with the workflow name
:param containerName: string with the container name
:param ruleList: list of strings with the rule ids
:return: none
"""
alertName = "{}: PU eligible for deletion: {}".format(self.alertServiceName, containerName)
alertSeverity = "high"
alertSummary = "[MSRuleCleaner] Found pileup container no longer locked and available for rule deletion."
alertDescription = "Workflow: {} has the following pileup container ".format(workflowName)
alertDescription += "\n{}\n no longer in the global locks. ".format(containerName)
alertDescription += "These rules\n{}\nare eligible for deletion.".format(ruleList)
# alert to expiry in 2 days
self.sendAlert(alertName, alertSeverity, alertSummary, alertDescription,
service=self.alertServiceName, endSecs=2*24*60*60)
self.logger.critical(alertDescription)
15 changes: 1 addition & 14 deletions src/python/WMCore/MicroService/MSTransferor/MSTransferor.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from WMCore.MicroService.MSTransferor.RequestInfo import RequestInfo
from WMCore.MicroService.MSTransferor.DataStructs.RSEQuotas import RSEQuotas
from WMCore.Services.CRIC.CRIC import CRIC
from WMCore.Services.AlertManager.AlertManagerAPI import AlertManagerAPI


def newTransferRec(dataIn):
"""
Expand Down Expand Up @@ -105,8 +105,6 @@ def __init__(self, msConfig, logger=None):
self.blockCounter = 0
# service name used to route alerts via AlertManager
self.alertServiceName = "ms-transferor"
self.alertManagerUrl = self.msConfig.get("alertManagerUrl", None)
self.alertManagerApi = AlertManagerAPI(self.alertManagerUrl, logger=logger)

@retry(tries=3, delay=2, jitter=2)
def updateCaches(self):
Expand Down Expand Up @@ -558,17 +556,6 @@ def makeTransferRucio(self, wflow, dataIn, dids, dataSize, grouping, copies, nod
self.logger.info(msg, wflow.getName(), dids, rseExpr, ruleAttrs)
return success, transferId

def sendAlert(self, alertName, severity, summary, description, service, endSecs=1 * 60 * 60):
"""
Send alert to Prometheus, wrap function in a try-except clause
"""
try:
# alert to expiry in an hour from now
self.alertManagerApi.sendAlert(alertName, severity, summary, description,
service, endSecs=endSecs)
except Exception as ex:
self.logger.exception("Failed to send alert to %s. Error: %s", self.alertManagerUrl, str(ex))

def alertPUMisconfig(self, workflowName):
"""
Send alert to Prometheus with PU misconfiguration error
Expand Down
5 changes: 0 additions & 5 deletions src/python/WMCore/MicroService/MSUnmerged/MSUnmerged.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
from WMCore.MicroService.MSUnmerged.MSUnmergedRSE import MSUnmergedRSE
from WMCore.Services.RucioConMon.RucioConMon import RucioConMon
from WMCore.Services.WMStatsServer.WMStatsServer import WMStatsServer
# from WMCore.Services.AlertManager.AlertManagerAPI import AlertManagerAPI
from WMCore.Database.MongoDB import MongoDB
from WMCore.WMException import WMException
from Utils.Pipeline import Pipeline, Functor
Expand Down Expand Up @@ -134,10 +133,6 @@ def __init__(self, msConfig, logger=None):
msg += "set to emulate it. Crashing the service!"
raise ImportError(msg)

# TODO: Add 'alertManagerUrl' to msConfig'
# self.alertServiceName = "ms-unmerged"
# self.alertManagerAPI = AlertManagerAPI(self.msConfig.get("alertManagerUrl", None), logger=logger)

# Instantiating the Rucio Consistency Monitor Client
self.rucioConMon = RucioConMon(self.msConfig['rucioConMon'], logger=self.logger)

Expand Down

0 comments on commit dd89bd4

Please sign in to comment.