diff --git a/src/python/WMCore/MicroService/MSRuleCleaner/MSRuleCleaner.py b/src/python/WMCore/MicroService/MSRuleCleaner/MSRuleCleaner.py
index ed9598068df..f1e0fc24b4f 100644
--- a/src/python/WMCore/MicroService/MSRuleCleaner/MSRuleCleaner.py
+++ b/src/python/WMCore/MicroService/MSRuleCleaner/MSRuleCleaner.py
@@ -85,6 +85,9 @@ def __init__(self, msConfig, logger=None):
         self.msConfig.setdefault("rucioWmaAccount", "wma_test")
         self.msConfig.setdefault("rucioMStrAccount", "wmcore_transferor")
         self.msConfig.setdefault('enableRealMode', False)
+        self.msConfig.setdefault('archiveDelayHours', 24 * 2)
+        self.msConfig.setdefault('archiveAlarmHours', 24 * 30)
+        self.msConfig.setdefault("sendNotification", False)
 
         self.currThread = None
         self.currThreadIdent = None
@@ -316,37 +319,42 @@ def _dispatchWflow(self, wflow):
         elif wflow['RequestStatus'] == 'announced' and not wflow['ParentageResolved']:
             # NOTE: We skip workflows which are not having 'ParentageResolved'
             # flag, but we still need some proper logging for them.
-            msg = "Skipping workflow: %s - 'ParentageResolved' flag set to false."
+            msg = "Skipping workflow: %s - 'ParentageResolved' flag set to false." % wflow['RequestName']
             msg += " Will retry again in the next cycle."
-            self.logger.info(msg, wflow['RequestName'])
+            self.logger.info(msg)
+            self.alertStatusAdvanceExpired(wflow, additionalInfo=msg)
         elif wflow['RequestStatus'] == 'announced' and not wflow['TransferDone']:
             # NOTE: We skip workflows which have not yet finalised their TransferStatus
             # in MSOutput, but we still need some proper logging for them.
-            msg = "Skipping workflow: %s - 'TransferStatus' is 'pending' or 'TransferInfo' is missing in MSOutput."
+            msg = "Skipping workflow: %s - 'TransferStatus' is 'pending' or 'TransferInfo' is missing in MSOutput." % wflow['RequestName']
             msg += " Will retry again in the next cycle."
-            self.logger.info(msg, wflow['RequestName'])
+            self.logger.info(msg)
+            self.alertStatusAdvanceExpired(wflow, additionalInfo=msg)
         elif wflow['RequestStatus'] == 'announced' and not wflow['TransferTape']:
             # NOTE: We skip workflows which have not yet finalised their tape transfers.
             # (i.e. even if a single output which is supposed to be covered
             # by a tape rule is in any of the following transient states:
             # {REPLICATING, STUCK, SUSPENDED, WAITING_APPROVAL}.)
             # We still need some proper logging for them.
-            msg = "Skipping workflow: %s - tape transfers are not yet completed."
+            msg = "Skipping workflow: %s - tape transfers are not yet completed." % wflow['RequestName']
             msg += " Will retry again in the next cycle."
-            self.logger.info(msg, wflow['RequestName'])
+            self.logger.info(msg)
+            self.alertStatusAdvanceExpired(wflow, additionalInfo=msg)
         elif wflow['RequestStatus'] == 'announced':
             for pline in self.cleanuplines:
                 try:
                     pline.run(wflow)
                 except MSRuleCleanerResolveParentError as ex:
-                    msg = "%s: Parentage Resolve Error: %s. "
+                    msg = "%s: Parentage Resolve Error: %s. " % (pline.name, str(ex))
                     msg += "Will retry again in the next cycle."
-                    self.logger.error(msg, pline.name, str(ex))
+                    self.logger.error(msg)
+                    self.alertStatusAdvanceExpired(wflow, additionalInfo=msg)
                     continue
                 except Exception as ex:
-                    msg = "%s: General error from pipeline. Workflow: %s. Error: \n%s. "
+                    msg = "%s: General error from pipeline. Workflow: %s. Error: \n%s. " % (pline.name, wflow['RequestName'], str(ex))
                     msg += "\nWill retry again in the next cycle."
-                    self.logger.exception(msg, pline.name, wflow['RequestName'], str(ex))
+                    self.logger.exception(msg)
+                    self.alertStatusAdvanceExpired(wflow, additionalInfo=msg)
                     continue
                 if wflow['CleanupStatus'][pline.name]:
                     self.wfCounters['cleaned'][pline.name] += 1
@@ -404,6 +412,37 @@ def setPlineMarker(self, wflow, pName):
             wflow['CleanupStatus'][pName] = False
         return wflow
 
+    def _getLastStatusTransitionTime(self, wflow):
+        """
+        A method to return status transition time for a workflow.
+        :param wflow: A MSRuleCleanerWorkflow instance
+        :return: The UTC timestamp for the transition if it exists, None otherwise
+        """
+        try:
+            for trans in reversed(wflow['RequestTransition']):
+                if trans['Status'] == wflow['RequestStatus']:
+                    return trans['UpdateTime']
+        except (KeyError, TypeError):
+            self.logger.error("Missing or broken status transition history for %s", wflow['RequestName'])
+        return None
+
+    def _isStatusAdvanceExpired(self, wflow):
+        """
+        A method to check if a given status transition has timed out e.g. before
+        an alarm being set. The timeout should be configurable and hence taken
+        from the instance attributes.
+        :param wflow: A MSRuleCleanerWorkflow instance
+        :return: Bool: True if status alarm time has expired False otherwise
+        """
+        statusAdvanceExpired = False
+        currentTime = int(time.time())
+        alarmThreshold = self.msConfig['archiveAlarmHours'] * 3600
+        transitionTime = self._getLastStatusTransitionTime(wflow)
+
+        if transitionTime and (currentTime - transitionTime) > alarmThreshold:
+            statusAdvanceExpired = True
+        return statusAdvanceExpired
+
     def _checkClean(self, wflow):
         """
         An auxiliary function used to only check the temporary cleanup status.
@@ -520,12 +559,9 @@ def _checkArchDelayExpired(self, wflow):
         archDelayExpired = False
         currentTime = int(time.time())
         threshold = self.msConfig['archiveDelayHours'] * 3600
-        try:
-            lastTransitionTime = wflow['RequestTransition'][-1]['UpdateTime']
-            if lastTransitionTime and (currentTime - lastTransitionTime) > threshold:
-                archDelayExpired = True
-        except KeyError:
-            self.logger.debug("Could not find status transition history for %s", wflow['RequestName'])
+        lastTransitionTime = self._getLastStatusTransitionTime(wflow)
+        if lastTransitionTime and (currentTime - lastTransitionTime) > threshold:
+            archDelayExpired = True
         return archDelayExpired
 
     def setArchivalDelayExpired(self, wflow):
@@ -545,12 +581,15 @@ def archive(self, wflow):
         # Make all the needed checks before trying to archive
         if not (wflow['IsClean'] or wflow['ForceArchive']):
             msg = "Not properly cleaned workflow: %s" % wflow['RequestName']
+            self.alertStatusAdvanceExpired(wflow, additionalInfo=msg)
             raise MSRuleCleanerArchivalSkip(msg)
         if not wflow['TargetStatus']:
             msg = "Could not determine which archival status to target for workflow: %s" % wflow['RequestName']
+            self.alertStatusAdvanceExpired(wflow, additionalInfo=msg)
             raise MSRuleCleanerArchivalError(msg)
         if not wflow['IsLogDBClean']:
             msg = "LogDB records have not been cleaned for workflow: %s" % wflow['RequestName']
+            self.alertStatusAdvanceExpired(wflow, additionalInfo=msg)
             raise MSRuleCleanerArchivalSkip(msg)
         if not wflow['IsArchivalDelayExpired']:
             msg = "Archival delay period has not yet expired for workflow: %s." % wflow['RequestName']
@@ -755,6 +794,10 @@ def alertDeletablePU(self, workflowName, containerName, ruleList):
         :param ruleList: list of strings with the rule ids
         :return: none
         """
+        # Check if alarms are enabled for this service:
+        if not self.msConfig["sendNotification"]:
+            return
+
         alertName = "{}: PU eligible for deletion: {}".format(self.alertServiceName, containerName)
         alertSeverity = "high"
         alertSummary = "[MSRuleCleaner] Found pileup container no longer locked and available for rule deletion."
@@ -765,3 +808,34 @@ def alertDeletablePU(self, workflowName, containerName, ruleList):
         self.sendAlert(alertName, alertSeverity, alertSummary, alertDescription,
                        service=self.alertServiceName, endSecs=self.alertExpiration)
         self.logger.critical(alertDescription)
+
+    def alertStatusAdvanceExpired(self, wflow, additionalInfo=None):
+        """
+        Send alert notifying that a workflow has spent too much time in a given status
+        :param wflow: MSRuleCleanerWorkflow instance
+        :param additionalInfo: String with additional information.
+        :return: none
+        """
+        # Check if alarms are enabled for this service:
+        if not self.msConfig["sendNotification"]:
+            return
+
+        # Check if alarm threshold has expired:
+        if not self._isStatusAdvanceExpired(wflow):
+            return
+        lastTransitionTime = self._getLastStatusTransitionTime(wflow)
+        alertName = "{}: Not archived workflow: {}".format(self.alertServiceName, wflow['RequestName'])
+        alertSeverity = "high"
+        alertDescription = "Found a workflow not archived since {}.".format(time.ctime(lastTransitionTime))
+        alertDescription += "\nWorkflow: {}".format(wflow['RequestName'])
+        alertDescription += "\nHas exceeded the Status Advance Timeout of: {} hours".format(self.msConfig['archiveAlarmHours'])
+        alertDescription += " for its current status: {}.".format(wflow['RequestStatus'])
+        if additionalInfo:
+            alertDescription += "\nAdditional info: {}".format(additionalInfo)
+        alertDescription += "\nActions need to be taken!"
+        alertSummary = "[MSRuleCleaner] Found workflow: {} not archived since {}.".format(wflow['RequestName'], time.ctime(lastTransitionTime))
+
+        # alert expiration is governed by self.alertExpiration (passed as endSecs)
+        self.sendAlert(alertName, alertSeverity, alertSummary, alertDescription,
+                       service=self.alertServiceName, endSecs=self.alertExpiration)
+        self.logger.critical(alertDescription)