Skip to content

Commit

Permalink
Add checks for StatusAdvanceTimeout expiration and send alarms.
Browse files Browse the repository at this point in the history
Rename SatusAdvanceTimeout && Change _isStatusAdvanceExpired signature && Fix log messages.

Fix Alert call.

Fix _isStatusAdvanceExpired call.

Typo
  • Loading branch information
todor-ivanov committed Dec 1, 2022
1 parent dfc07c2 commit 8ed856a
Showing 1 changed file with 91 additions and 16 deletions.
107 changes: 91 additions & 16 deletions src/python/WMCore/MicroService/MSRuleCleaner/MSRuleCleaner.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ def __init__(self, msConfig, logger=None):
self.msConfig.setdefault("rucioWmaAccount", "wma_test")
self.msConfig.setdefault("rucioMStrAccount", "wmcore_transferor")
self.msConfig.setdefault('enableRealMode', False)
self.msConfig.setdefault('archiveDelayHours', 24 * 2)
self.msConfig.setdefault('archiveAlarmHours', 24 * 30)

self.currThread = None
self.currThreadIdent = None
Expand Down Expand Up @@ -316,37 +318,47 @@ def _dispatchWflow(self, wflow):
elif wflow['RequestStatus'] == 'announced' and not wflow['ParentageResolved']:
# NOTE: We skip workflows which are not having 'ParentageResolved'
# flag, but we still need some proper logging for them.
msg = "Skipping workflow: %s - 'ParentageResolved' flag set to false."
msg = "Skipping workflow: %s - 'ParentageResolved' flag set to false." % wflow['RequestName']
msg += " Will retry again in the next cycle."
self.logger.info(msg, wflow['RequestName'])
self.logger.info(msg)
if self._isStatusAdvanceExpired(wflow):
self.alertStatusAdvanceExpired(wflow, additionalInfo=msg)
elif wflow['RequestStatus'] == 'announced' and not wflow['TransferDone']:
# NOTE: We skip workflows which have not yet finalised their TransferStatus
# in MSOutput, but we still need some proper logging for them.
msg = "Skipping workflow: %s - 'TransferStatus' is 'pending' or 'TransferInfo' is missing in MSOutput."
msg = "Skipping workflow: %s - 'TransferStatus' is 'pending' or 'TransferInfo' is missing in MSOutput." % wflow['RequestName']
msg += " Will retry again in the next cycle."
self.logger.info(msg, wflow['RequestName'])
self.logger.info(msg)
if self._isStatusAdvanceExpired(wflow):
self.alertStatusAdvanceExpired(wflow, additionalInfo=msg)
elif wflow['RequestStatus'] == 'announced' and not wflow['TransferTape']:
# NOTE: We skip workflows which have not yet finalised their tape transfers.
# (i.e. even if a single output which is supposed to be covered
# by a tape rule is in any of the following transient states:
# {REPLICATING, STUCK, SUSPENDED, WAITING_APPROVAL}.)
# We still need some proper logging for them.
msg = "Skipping workflow: %s - tape transfers are not yet completed."
msg = "Skipping workflow: %s - tape transfers are not yet completed." % wflow['RequestName']
msg += " Will retry again in the next cycle."
self.logger.info(msg, wflow['RequestName'])
self.logger.info(msg)
if self._isStatusAdvanceExpired(wflow):
self.alertStatusAdvanceExpired(wflow, additionalInfo=msg)
elif wflow['RequestStatus'] == 'announced':
for pline in self.cleanuplines:
try:
pline.run(wflow)
except MSRuleCleanerResolveParentError as ex:
msg = "%s: Parentage Resolve Error: %s. "
msg = "%s: Parentage Resolve Error: %s. " % (pline.name, str(ex))
msg += "Will retry again in the next cycle."
self.logger.error(msg, pline.name, str(ex))
self.logger.error(msg)
if self._isStatusAdvanceExpired(wflow):
self.alertStatusAdvanceExpired(wflow, additionalInfo=msg)
continue
except Exception as ex:
msg = "%s: General error from pipeline. Workflow: %s. Error: \n%s. "
msg = "%s: General error from pipeline. Workflow: %s. Error: \n%s. " % (pline.name, wflow['RequestName'], str(ex))
msg += "\nWill retry again in the next cycle."
self.logger.exception(msg, pline.name, wflow['RequestName'], str(ex))
self.logger.exception(msg)
if self._isStatusAdvanceExpired(wflow):
self.alertStatusAdvanceExpired(wflow, additionalInfo=msg)
continue
if wflow['CleanupStatus'][pline.name]:
self.wfCounters['cleaned'][pline.name] += 1
Expand Down Expand Up @@ -404,6 +416,42 @@ def setPlineMarker(self, wflow, pName):
wflow['CleanupStatus'][pName] = False
return wflow

def _getStatusTransitionTime(self, wflow, status=None):
"""
A method to return status transition time for a workflow.
:param wflow: A MSRuleCleanerWorkflow instance
:param status: The status transition to search for. If status is None
then the last status transition time is returned
:return: The UTC timestamp for the transition if it exists, None otherwise
"""
try:
if status is None:
return wflow['RequestTransition'][-1]['UpdateTime']
else:
for trans in wflow['RequestTransition'][::-1]:
if trans['Status'] == status:
return trans['UpdateTime']
return None
except KeyError:
self.logger.error("Could not find status transition history for %s", wflow['RequestName'])

def _isStatusAdvanceExpired(self, wflow):
"""
A method to check if a given status transition has timed out e.g. before
an alarm being set. The timeout should be configurable and hence taken
from the instance attributes.
:param wflow: A MSRuleCleanerWorkflow instance
:return: Bool: True if status alarm time has expired False otherwise
"""
statusAdvanceExpired = False
currentTime = int(time.time())
alarmThreshold = self.msConfig['archiveAlarmHours'] * 3600
transitionTime = self._getStatusTransitionTime(wflow, status=wflow['RequestStatus'])

if transitionTime and (currentTime - transitionTime) > alarmThreshold:
statusAdvanceExpired = True
return statusAdvanceExpired

def _checkClean(self, wflow):
"""
An auxiliary function used to only check the temporary cleanup status.
Expand Down Expand Up @@ -520,12 +568,9 @@ def _checkArchDelayExpired(self, wflow):
archDelayExpired = False
currentTime = int(time.time())
threshold = self.msConfig['archiveDelayHours'] * 3600
try:
lastTransitionTime = wflow['RequestTransition'][-1]['UpdateTime']
if lastTransitionTime and (currentTime - lastTransitionTime) > threshold:
archDelayExpired = True
except KeyError:
self.logger.debug("Could not find status transition history for %s", wflow['RequestName'])
lastTransitionTime = self._getStatusTransitionTime(wflow)
if lastTransitionTime and (currentTime - lastTransitionTime) > threshold:
archDelayExpired = True
return archDelayExpired

def setArchivalDelayExpired(self, wflow):
Expand All @@ -545,12 +590,18 @@ def archive(self, wflow):
# Make all the needed checks before trying to archive
if not (wflow['IsClean'] or wflow['ForceArchive']):
msg = "Not properly cleaned workflow: %s" % wflow['RequestName']
if self._isStatusAdvanceExpired(wflow):
self.alertStatusAdvanceExpired(wflow, additionalInfo=msg)
raise MSRuleCleanerArchivalSkip(msg)
if not wflow['TargetStatus']:
msg = "Could not determine which archival status to target for workflow: %s" % wflow['RequestName']
if self._isStatusAdvanceExpired(wflow):
self.alertStatusAdvanceExpired(wflow, additionalInfo=msg)
raise MSRuleCleanerArchivalError(msg)
if not wflow['IsLogDBClean']:
msg = "LogDB records have not been cleaned for workflow: %s" % wflow['RequestName']
if self._isStatusAdvanceExpired(wflow):
self.alertStatusAdvanceExpired(wflow, additionalInfo=msg)
raise MSRuleCleanerArchivalSkip(msg)
if not wflow['IsArchivalDelayExpired']:
msg = "Archival delay period has not yet expired for workflow: %s." % wflow['RequestName']
Expand Down Expand Up @@ -765,3 +816,27 @@ def alertDeletablePU(self, workflowName, containerName, ruleList):
self.sendAlert(alertName, alertSeverity, alertSummary, alertDescription,
service=self.alertServiceName, endSecs=self.alertExpiration)
self.logger.critical(alertDescription)

def alertStatusAdvanceExpired(self, wflow, additionalInfo=None):
"""
Send alert notifying that a workflow has spend too much time in a given status
:param wflow: MSRuleCleanerWorkflow instance
::
:return: none
"""
lastTransitionTime = self._getStatusTransitionTime(wflow)
alertName = "{}: Stale Workflow: {}".format(self.alertServiceName, wflow['RequestName'])
alertSeverity = "high"
alertDescription = "[MSRuleCleaner] Found a workflow not archived since {}.".format(time.ctime(lastTransitionTime))
alertDescription += "\nWorkflow: {}".format(wflow['RequestName'])
alertDescription += "\nHas exceeded the Status Advance Timeout of: {} hours".format(self.msConfig['archiveAlarmHours'])
alertDescription += " for status: {}.".format(wflow['RequestStatus'])
if additionalInfo:
alertDescription += "\nAdditional info: {}".format(additionalInfo)
alertDescription += "\nActions need to be taken!"
alertSummary = "[MSRuleCleaner] Found a workflow not archived since {}.".format(time.ctime(lastTransitionTime))

# alert to expiry in 2 days
self.sendAlert(alertName, alertSeverity, alertSummary, alertDescription,
service=self.alertServiceName, endSecs=self.alertExpiration)
self.logger.critical(alertDescription)

0 comments on commit 8ed856a

Please sign in to comment.