Skip to content

Commit

Permalink
Add checks for StatusAdvanceTimeout expiration and send alarms.
Browse files Browse the repository at this point in the history
Rename SatusAdvanceTimeout && Change _isStatusAdvanceExpired signature && Fix log messages.

Fix Alert call.

Fix _isStatusAdvanceExpired call.

Typo

Query only for last status transition time && Fix alarm text.

Move the call to _isStatusAdvanceExpired to a single place.

Add configuration flag for enabling alarms from msConfig.

Rename _getStatusTransitionTime
  • Loading branch information
todor-ivanov committed Dec 2, 2022
1 parent dfc07c2 commit e98ff86
Showing 1 changed file with 90 additions and 16 deletions.
106 changes: 90 additions & 16 deletions src/python/WMCore/MicroService/MSRuleCleaner/MSRuleCleaner.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ def __init__(self, msConfig, logger=None):
self.msConfig.setdefault("rucioWmaAccount", "wma_test")
self.msConfig.setdefault("rucioMStrAccount", "wmcore_transferor")
self.msConfig.setdefault('enableRealMode', False)
self.msConfig.setdefault('archiveDelayHours', 24 * 2)
self.msConfig.setdefault('archiveAlarmHours', 24 * 30)
self.msConfig.setdefault("sendNotification", False)

self.currThread = None
self.currThreadIdent = None
Expand Down Expand Up @@ -316,37 +319,42 @@ def _dispatchWflow(self, wflow):
elif wflow['RequestStatus'] == 'announced' and not wflow['ParentageResolved']:
# NOTE: We skip workflows which are not having 'ParentageResolved'
# flag, but we still need some proper logging for them.
msg = "Skipping workflow: %s - 'ParentageResolved' flag set to false."
msg = "Skipping workflow: %s - 'ParentageResolved' flag set to false." % wflow['RequestName']
msg += " Will retry again in the next cycle."
self.logger.info(msg, wflow['RequestName'])
self.logger.info(msg)
self.alertStatusAdvanceExpired(wflow, additionalInfo=msg)
elif wflow['RequestStatus'] == 'announced' and not wflow['TransferDone']:
# NOTE: We skip workflows which have not yet finalised their TransferStatus
# in MSOutput, but we still need some proper logging for them.
msg = "Skipping workflow: %s - 'TransferStatus' is 'pending' or 'TransferInfo' is missing in MSOutput."
msg = "Skipping workflow: %s - 'TransferStatus' is 'pending' or 'TransferInfo' is missing in MSOutput." % wflow['RequestName']
msg += " Will retry again in the next cycle."
self.logger.info(msg, wflow['RequestName'])
self.logger.info(msg)
self.alertStatusAdvanceExpired(wflow, additionalInfo=msg)
elif wflow['RequestStatus'] == 'announced' and not wflow['TransferTape']:
# NOTE: We skip workflows which have not yet finalised their tape transfers.
# (i.e. even if a single output which is supposed to be covered
# by a tape rule is in any of the following transient states:
# {REPLICATING, STUCK, SUSPENDED, WAITING_APPROVAL}.)
# We still need some proper logging for them.
msg = "Skipping workflow: %s - tape transfers are not yet completed."
msg = "Skipping workflow: %s - tape transfers are not yet completed." % wflow['RequestName']
msg += " Will retry again in the next cycle."
self.logger.info(msg, wflow['RequestName'])
self.logger.info(msg)
self.alertStatusAdvanceExpired(wflow, additionalInfo=msg)
elif wflow['RequestStatus'] == 'announced':
for pline in self.cleanuplines:
try:
pline.run(wflow)
except MSRuleCleanerResolveParentError as ex:
msg = "%s: Parentage Resolve Error: %s. "
msg = "%s: Parentage Resolve Error: %s. " % (pline.name, str(ex))
msg += "Will retry again in the next cycle."
self.logger.error(msg, pline.name, str(ex))
self.logger.error(msg)
self.alertStatusAdvanceExpired(wflow, additionalInfo=msg)
continue
except Exception as ex:
msg = "%s: General error from pipeline. Workflow: %s. Error: \n%s. "
msg = "%s: General error from pipeline. Workflow: %s. Error: \n%s. " % (pline.name, wflow['RequestName'], str(ex))
msg += "\nWill retry again in the next cycle."
self.logger.exception(msg, pline.name, wflow['RequestName'], str(ex))
self.logger.exception(msg)
self.alertStatusAdvanceExpired(wflow, additionalInfo=msg)
continue
if wflow['CleanupStatus'][pline.name]:
self.wfCounters['cleaned'][pline.name] += 1
Expand Down Expand Up @@ -404,6 +412,37 @@ def setPlineMarker(self, wflow, pName):
wflow['CleanupStatus'][pName] = False
return wflow

def _getLastStatusTransitionTime(self, wflow):
    """
    A method to return the time of the latest transition of a workflow into
    its current status.
    The 'RequestTransition' history is scanned from its last record backwards
    and the 'UpdateTime' of the first record whose 'Status' equals the
    workflow's 'RequestStatus' is returned.
    :param wflow: A MSRuleCleanerWorkflow instance
    :return: The 'UpdateTime' value of the matching transition record
             (presumably a UTC timestamp in seconds - to be confirmed against
             the producer of the history), None if no record matches or if
             the history is missing or malformed
    """
    try:
        for trans in reversed(wflow['RequestTransition']):
            if trans['Status'] == wflow['RequestStatus']:
                return trans['UpdateTime']
    except (KeyError, TypeError):
        # NOTE: A KeyError signals a missing field, while a TypeError signals
        #       a history (or a single record) which is not of the expected
        #       list/dict shape, e.g. None. Both cases are reported and
        #       treated as "no transition time available".
        self.logger.error("Missing or broken status transition history for %s", wflow['RequestName'])
    return None

def _isStatusAdvanceExpired(self, wflow):
    """
    A method to tell whether a workflow has stayed in its current status for
    longer than the alarm threshold. The threshold is configurable and is
    read, in hours, from msConfig['archiveAlarmHours'].
    :param wflow: A MSRuleCleanerWorkflow instance
    :return: Bool: True if the alarm threshold has been exceeded, False
             otherwise (including when no transition time could be found)
    """
    now = int(time.time())
    thresholdSecs = self.msConfig['archiveAlarmHours'] * 3600
    enteredAt = self._getLastStatusTransitionTime(wflow)

    # NOTE: A missing (None) or zero transition time never counts as expired.
    if not enteredAt:
        return False
    return (now - enteredAt) > thresholdSecs

def _checkClean(self, wflow):
"""
An auxiliary function used to only check the temporary cleanup status.
Expand Down Expand Up @@ -520,12 +559,9 @@ def _checkArchDelayExpired(self, wflow):
archDelayExpired = False
currentTime = int(time.time())
threshold = self.msConfig['archiveDelayHours'] * 3600
try:
lastTransitionTime = wflow['RequestTransition'][-1]['UpdateTime']
if lastTransitionTime and (currentTime - lastTransitionTime) > threshold:
archDelayExpired = True
except KeyError:
self.logger.debug("Could not find status transition history for %s", wflow['RequestName'])
lastTransitionTime = self._getLastStatusTransitionTime(wflow)
if lastTransitionTime and (currentTime - lastTransitionTime) > threshold:
archDelayExpired = True
return archDelayExpired

def setArchivalDelayExpired(self, wflow):
Expand All @@ -545,12 +581,15 @@ def archive(self, wflow):
# Make all the needed checks before trying to archive
if not (wflow['IsClean'] or wflow['ForceArchive']):
msg = "Not properly cleaned workflow: %s" % wflow['RequestName']
self.alertStatusAdvanceExpired(wflow, additionalInfo=msg)
raise MSRuleCleanerArchivalSkip(msg)
if not wflow['TargetStatus']:
msg = "Could not determine which archival status to target for workflow: %s" % wflow['RequestName']
self.alertStatusAdvanceExpired(wflow, additionalInfo=msg)
raise MSRuleCleanerArchivalError(msg)
if not wflow['IsLogDBClean']:
msg = "LogDB records have not been cleaned for workflow: %s" % wflow['RequestName']
self.alertStatusAdvanceExpired(wflow, additionalInfo=msg)
raise MSRuleCleanerArchivalSkip(msg)
if not wflow['IsArchivalDelayExpired']:
msg = "Archival delay period has not yet expired for workflow: %s." % wflow['RequestName']
Expand Down Expand Up @@ -755,6 +794,10 @@ def alertDeletablePU(self, workflowName, containerName, ruleList):
:param ruleList: list of strings with the rule ids
:return: none
"""
# Check if alarms are enabled for this service:
if not self.msConfig["sendNotification"]:
return

alertName = "{}: PU eligible for deletion: {}".format(self.alertServiceName, containerName)
alertSeverity = "high"
alertSummary = "[MSRuleCleaner] Found pileup container no longer locked and available for rule deletion."
Expand All @@ -765,3 +808,34 @@ def alertDeletablePU(self, workflowName, containerName, ruleList):
self.sendAlert(alertName, alertSeverity, alertSummary, alertDescription,
service=self.alertServiceName, endSecs=self.alertExpiration)
self.logger.critical(alertDescription)

def alertStatusAdvanceExpired(self, wflow, additionalInfo=None):
    """
    Send an alert notifying that a workflow has spent too much time in its
    current status.
    Nothing is sent when alarms are disabled through msConfig['sendNotification']
    or while the status advance alarm threshold has not been exceeded.
    :param wflow: MSRuleCleanerWorkflow instance
    :param additionalInfo: String with additional information, appended to
                           the alert description when provided.
    :return: none
    """
    # Alarms must be enabled for this service and the alarm threshold must
    # have expired, otherwise there is nothing to report:
    if not (self.msConfig["sendNotification"] and self._isStatusAdvanceExpired(wflow)):
        return

    sinceTime = self._getLastStatusTransitionTime(wflow)
    alertName = "{}: Not archived workflow: {}".format(self.alertServiceName, wflow['RequestName'])
    alertSeverity = "high"
    sinceStr = time.ctime(sinceTime)

    # Assemble the description piece by piece; every piece carries its own
    # leading separator, so the pieces are joined with an empty string.
    descParts = [
        "Found a workflow not archived since {}.".format(sinceStr),
        "\nWorkflow: {}".format(wflow['RequestName']),
        "\nHas exceeded the Status Advance Timeout of: {} hours".format(self.msConfig['archiveAlarmHours']),
        " for its current status: {}.".format(wflow['RequestStatus']),
    ]
    if additionalInfo:
        descParts.append("\nAdditional info: {}".format(additionalInfo))
    descParts.append("\nActions need to be taken!")
    alertDescription = "".join(descParts)

    alertSummary = "[MSRuleCleaner] Found workflow: {} not archived since {}.".format(wflow['RequestName'], sinceStr)

    # The alert expiration is taken from self.alertExpiration
    self.sendAlert(alertName, alertSeverity, alertSummary, alertDescription,
                   service=self.alertServiceName, endSecs=self.alertExpiration)
    self.logger.critical(alertDescription)

0 comments on commit e98ff86

Please sign in to comment.