From dd89bd46b8fff931b4a4d5530c9c139ffa41e7ef Mon Sep 17 00:00:00 2001 From: Alan Malta Rodrigues Date: Wed, 31 Aug 2022 10:22:40 -0400 Subject: [PATCH] Create alert for pileup containers eligible for rule deletion fix Valentins comments --- src/python/WMCore/MicroService/MSCore.py | 22 +++++++++++- .../WMCore/MicroService/MSOutput/MSOutput.py | 2 -- .../MSRuleCleaner/MSRuleCleaner.py | 35 ++++++++++++++++--- .../MicroService/MSTransferor/MSTransferor.py | 15 +------- .../MicroService/MSUnmerged/MSUnmerged.py | 5 --- 5 files changed, 53 insertions(+), 26 deletions(-) diff --git a/src/python/WMCore/MicroService/MSCore.py b/src/python/WMCore/MicroService/MSCore.py index 3f3c2df7aa5..64312239c3f 100644 --- a/src/python/WMCore/MicroService/MSCore.py +++ b/src/python/WMCore/MicroService/MSCore.py @@ -11,6 +11,7 @@ from WMCore.Services.ReqMgr.ReqMgr import ReqMgr from WMCore.Services.ReqMgrAux.ReqMgrAux import ReqMgrAux from WMCore.Services.Rucio.Rucio import Rucio +from WMCore.Services.AlertManager.AlertManagerAPI import AlertManagerAPI class MSCore(object): @@ -21,7 +22,7 @@ class MSCore(object): def __init__(self, msConfig, **kwargs): """ - Provides setup for MSTransferor and MSMonitor classes + Provides a basic setup for all the microservices :param config: MS service configuration :param kwargs: can be used to skip the initialization of specific services, such as: @@ -47,6 +48,8 @@ def __init__(self, msConfig, **kwargs): hostUrl=self.msConfig['rucioUrl'], authUrl=self.msConfig['rucioAuthUrl'], configDict={"logger": self.logger, "user_agent": "wmcore-microservices"}) + self.alertManagerUrl = self.msConfig.get("alertManagerUrl", None) + self.alertManagerApi = AlertManagerAPI(self.alertManagerUrl, logger=self.logger) def unifiedConfig(self): """ @@ -88,3 +91,20 @@ def updateReportDict(self, reportDict, keyName, value): else: reportDict[keyName] = value return reportDict + + def sendAlert(self, alertName, severity, summary, description, service, endSecs=1 * 60 * 60): + """ + Sends an alert to Prometheus. + :param alertName: string with unique alert name + :param severity: string with the alert severity + :param summary: string with a short summary of the alert + :param description: string with a longer description of the alert + :param service: string with the service name generating this alert + :param endSecs: integer with an expiration time + :return: none + """ + try: + self.alertManagerApi.sendAlert(alertName, severity, summary, description, + service, endSecs=endSecs) + except Exception as ex: + self.logger.exception("Failed to send alert to %s. Error: %s", self.alertManagerUrl, str(ex)) diff --git a/src/python/WMCore/MicroService/MSOutput/MSOutput.py b/src/python/WMCore/MicroService/MSOutput/MSOutput.py index 97a68b70db0..6a9d7731a03 100644 --- a/src/python/WMCore/MicroService/MSOutput/MSOutput.py +++ b/src/python/WMCore/MicroService/MSOutput/MSOutput.py @@ -27,7 +27,6 @@ from WMCore.MicroService.MSOutput.MSOutputTemplate import MSOutputTemplate from WMCore.MicroService.MSOutput.RelValPolicy import RelValPolicy from WMCore.WMException import WMException -from WMCore.Services.AlertManager.AlertManagerAPI import AlertManagerAPI class MSOutputException(WMException): @@ -105,7 +104,6 @@ def __init__(self, msConfig, mode, logger=None): self.uConfig = {} # service name used to route alerts via AlertManager self.alertServiceName = "ms-output" - self.alertManagerAPI = AlertManagerAPI(self.msConfig.get("alertManagerUrl", None), logger=logger) # RelVal output data placement policy from the service configuration self.msConfig.setdefault("dbsUrl", "https://cmsweb-prod.cern.ch/dbs/prod/global/DBSReader") diff --git a/src/python/WMCore/MicroService/MSRuleCleaner/MSRuleCleaner.py b/src/python/WMCore/MicroService/MSRuleCleaner/MSRuleCleaner.py index b2396e9f88d..0e2953f95af 100644 --- a/src/python/WMCore/MicroService/MSRuleCleaner/MSRuleCleaner.py +++ b/src/python/WMCore/MicroService/MSRuleCleaner/MSRuleCleaner.py @@ -96,6 +96,8 @@ def __init__(self, msConfig, logger=None): self.msConfig["logDBReporter"], logger=self.logger) self.wmstatsSvc = WMStatsServer(self.msConfig['wmstatsUrl'], logger=self.logger) + # service name used to route alerts via AlertManager + self.alertServiceName = "ms-rulecleaner" # Building all the Pipelines: pName = 'plineMSTrCont' @@ -675,10 +677,16 @@ def getRucioRules(self, wflow, gran, rucioAcct): self.logger.info(msg, dataCont, wflow['RequestName']) continue if gran == 'container': - for rule in self.rucio.listDataRules(dataCont, account=rucioAcct): - wflow['RulesToClean'][currPline].append(rule['id']) - msg = "Found %s container-level rule to be deleted for container %s" - self.logger.info(msg, rule['id'], dataCont) + ruleIds = [rule['id'] for rule in self.rucio.listDataRules(dataCont, account=rucioAcct)] + if ruleIds and dataType in ("MCPileup", "DataPileup"): + msg = "Pileup container %s has the following container-level rules to be removed: %s." + msg += " However, this component is no longer removing pileup rules." + self.logger.info(msg, dataCont, ruleIds) + self.alertDeletablePU(wflow['RequestName'], dataCont, ruleIds) + elif ruleIds: + wflow['RulesToClean'][currPline].extend(ruleIds) + msg = "Container %s has the following container-level rules to be removed: %s" + self.logger.info(msg, dataCont, ruleIds) elif gran == 'block': try: blocks = self.rucio.getBlocksInContainer(dataCont) @@ -735,3 +743,22 @@ def getRequestRecords(self, reqStatus): requests = result[0] self.logger.info(' retrieved %s requests in status: %s', len(requests), reqStatus) return requests + + def alertDeletablePU(self, workflowName, containerName, ruleList): + """ + Send alert notifying that there is a pileup dataset eligible for rule removal + :param workflowName: string with the workflow name + :param containerName: string with the container name + :param ruleList: list of strings with the rule ids + :return: none + """ + alertName = "{}: PU eligible for deletion: {}".format(self.alertServiceName, containerName) + alertSeverity = "high" + alertSummary = "[MSRuleCleaner] Found pileup container no longer locked and available for rule deletion." + alertDescription = "Workflow: {} has the following pileup container ".format(workflowName) + alertDescription += "\n{}\n no longer in the global locks. ".format(containerName) + alertDescription += "These rules\n{}\nare eligible for deletion.".format(ruleList) + # alert to expiry in 2 days + self.sendAlert(alertName, alertSeverity, alertSummary, alertDescription, + service=self.alertServiceName, endSecs=2*24*60*60) + self.logger.critical(alertDescription) diff --git a/src/python/WMCore/MicroService/MSTransferor/MSTransferor.py b/src/python/WMCore/MicroService/MSTransferor/MSTransferor.py index 5917848dfe4..5e1acac389a 100644 --- a/src/python/WMCore/MicroService/MSTransferor/MSTransferor.py +++ b/src/python/WMCore/MicroService/MSTransferor/MSTransferor.py @@ -29,7 +29,7 @@ from WMCore.MicroService.MSTransferor.RequestInfo import RequestInfo from WMCore.MicroService.MSTransferor.DataStructs.RSEQuotas import RSEQuotas from WMCore.Services.CRIC.CRIC import CRIC -from WMCore.Services.AlertManager.AlertManagerAPI import AlertManagerAPI + def newTransferRec(dataIn): """ @@ -105,8 +105,6 @@ def __init__(self, msConfig, logger=None): self.blockCounter = 0 # service name used to route alerts via AlertManager self.alertServiceName = "ms-transferor" - self.alertManagerUrl = self.msConfig.get("alertManagerUrl", None) - self.alertManagerApi = AlertManagerAPI(self.alertManagerUrl, logger=logger) @retry(tries=3, delay=2, jitter=2) def updateCaches(self): @@ -558,17 +556,6 @@ def makeTransferRucio(self, wflow, dataIn, dids, dataSize, grouping, copies, nod self.logger.info(msg, wflow.getName(), dids, rseExpr, ruleAttrs) return success, transferId - def sendAlert(self, alertName, severity, summary, description, service, endSecs=1 * 60 * 60): - """ - Send alert to Prometheus, wrap function in a try-except clause - """ - try: - # alert to expiry in an hour from now - self.alertManagerApi.sendAlert(alertName, severity, summary, description, - service, endSecs=endSecs) - except Exception as ex: - self.logger.exception("Failed to send alert to %s. Error: %s", self.alertManagerUrl, str(ex)) - def alertPUMisconfig(self, workflowName): """ Send alert to Prometheus with PU misconfiguration error diff --git a/src/python/WMCore/MicroService/MSUnmerged/MSUnmerged.py b/src/python/WMCore/MicroService/MSUnmerged/MSUnmerged.py index aead40ae9ee..25b5d097850 100644 --- a/src/python/WMCore/MicroService/MSUnmerged/MSUnmerged.py +++ b/src/python/WMCore/MicroService/MSUnmerged/MSUnmerged.py @@ -36,7 +36,6 @@ from WMCore.MicroService.MSUnmerged.MSUnmergedRSE import MSUnmergedRSE from WMCore.Services.RucioConMon.RucioConMon import RucioConMon from WMCore.Services.WMStatsServer.WMStatsServer import WMStatsServer -# from WMCore.Services.AlertManager.AlertManagerAPI import AlertManagerAPI from WMCore.Database.MongoDB import MongoDB from WMCore.WMException import WMException from Utils.Pipeline import Pipeline, Functor @@ -134,10 +133,6 @@ def __init__(self, msConfig, logger=None): msg += "set to emulate it. Crashing the service!" raise ImportError(msg) - # TODO: Add 'alertManagerUrl' to msConfig' - # self.alertServiceName = "ms-unmerged" - # self.alertManagerAPI = AlertManagerAPI(self.msConfig.get("alertManagerUrl", None), logger=logger) - # Instantiating the Rucio Consistency Monitor Client self.rucioConMon = RucioConMon(self.msConfig['rucioConMon'], logger=self.logger)