Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send alerts to prometheus when a workflow fails to be processed. #10959

Merged
merged 1 commit into from
Feb 15, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 60 additions & 12 deletions src/python/WMCore/MicroService/MSTransferor/MSTransferor.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,7 @@ def execute(self, reqStatus):
self.checkPUDataLocation(wflow)
if wflow.getSecondarySummary() and not wflow.getPURSElist():
# then we still have pileup to be transferred, but with incorrect locations
msg = "Workflow: %s cannot proceed due to some PU misconfiguration. Check previous logs..."
self.logger.critical(msg, wflow.getName())
self.alertPUMisconfig(wflow.getName())
# FIXME: this needs to be logged somewhere and workflow be set to failed
counterProblematicRequests += 1
continue
Expand All @@ -212,8 +211,9 @@ def execute(self, reqStatus):
success, transfers = self.makeTransferRequest(wflow)
except Exception as ex:
success = False
msg = "Unknown exception while making Transfer Request for %s " % wflow.getName()
msg += "\tError: %s" % str(ex)
self.alertUnknownTransferError(wflow.getName())
msg = "Unknown exception while making transfer request for %s " % wflow.getName()
# append (do not overwrite) so the logged message keeps the workflow name
msg += "\tError: %s" % str(ex)
self.logger.exception(msg)
if success:
self.logger.info("Transfer requests successful for %s. Summary: %s",
Expand All @@ -225,6 +225,7 @@ def execute(self, reqStatus):
counterSuccessRequests += 1
else:
counterFailedRequests += 1
self.alertTransferCouchDBError(wflow.getName())
else:
counterFailedRequests += 1
# it can go slightly beyond the limit. It's evaluated for every slice
Expand Down Expand Up @@ -729,7 +730,7 @@ def makeTransferRucio(self, wflow, dataIn, subLevel, blocks, dataSize, nodes, no
self.logger.info("Rules successful created for %s : %s", dataIn['name'], res)
transferId.update(res)
# send an alert, if needed
self.notifyLargeData(aboveWarningThreshold, transferId, wflow.getName(), dataSize, dataIn)
self.alertLargeInputData(aboveWarningThreshold, transferId, wflow.getName(), dataSize, dataIn)
else:
self.logger.error("Failed to create rule for %s, will retry later", dids)
success = False
Expand All @@ -738,7 +739,58 @@ def makeTransferRucio(self, wflow, dataIn, subLevel, blocks, dataSize, nodes, no
self.logger.info(msg, wflow.getName(), dids, rseExpr, ruleAttrs)
return success, transferId

def notifyLargeData(self, aboveWarningThreshold, transferId, wflowName, dataSize, dataIn):
def sendAlert(self, alertName, severity, summary, description, service, endSecs=1 * 60 * 60):
    """
    Send an alert to Prometheus, wrapping the call in a try-except clause.

    Any exception raised while sending the alert is caught and logged, so a
    failure to deliver the alert is never propagated to the caller.

    :param alertName: string with the name of the alert
    :param severity: string with the severity of the alert
    :param summary: string with a short summary of the alert
    :param description: string with the detailed description of the alert
    :param service: string with the name of the service sending the alert
    :param endSecs: integer with the number of seconds from now until the
        alert expires (defaults to one hour)
    :return: None
    """
    try:
        # NOTE(review): endSecs is forwarded by keyword so that it cannot be
        # bound to some other positional parameter of the underlying API;
        # confirm against the signature of alertManagerApi.sendAlert
        self.alertManagerApi.sendAlert(alertName, severity, summary, description,
                                       service, endSecs=endSecs)
    except Exception as ex:
        self.logger.exception("Failed to send alert to %s. Error: %s", self.alertManagerUrl, str(ex))

def alertPUMisconfig(self, workflowName):
    """
    Send alert to Prometheus with PU misconfiguration error, and log the
    same description at critical level.

    :param workflowName: string with the name of the affected workflow
    :return: None
    """
    alertName = "{}: PU misconfiguration error. Workflow: {}".format(self.alertServiceName,
                                                                     workflowName)
    alertSeverity = "high"
    alertSummary = "[MSTransferor] Workflow cannot proceed due to some PU misconfiguration."
    # the first fragment ends with a blank space, such that the two sentences
    # are not glued together when concatenated
    alertDescription = "Workflow: {} could not proceed due to some PU misconfiguration, ".format(workflowName)
    alertDescription += "so it will be skipped."
    self.sendAlert(alertName, alertSeverity, alertSummary, alertDescription,
                   self.alertServiceName)
    self.logger.critical(alertDescription)

def alertUnknownTransferError(self, workflowName):
    """
    Notify Prometheus that an unexpected exception happened while the
    transfer request for a workflow was being made.

    :param workflowName: string with the name of the affected workflow
    :return: None
    """
    name = "{}: Transfer request error. Workflow: {}".format(self.alertServiceName,
                                                             workflowName)
    details = "Unknown exception while making Transfer request for workflow: {}".format(workflowName)
    # severity and summary are constant for this type of alert
    self.sendAlert(name,
                   "high",
                   "[MSTransferor] Unknown exception while making transfer request.",
                   details,
                   self.alertServiceName)

def alertTransferCouchDBError(self, workflowName):
    """
    Notify Prometheus that the transfer document of a workflow could not
    be created in CouchDB, and log the same description as a warning.

    :param workflowName: string with the name of the affected workflow
    :return: None
    """
    nameTemplate = "{}: Failed to create a transfer document in CouchDB for workflow: {}"
    name = nameTemplate.format(self.alertServiceName, workflowName)
    details = "Workflow: {}, failed request due to error posting to CouchDB".format(workflowName)
    # severity and summary are constant for this type of alert
    self.sendAlert(name,
                   "high",
                   "[MSTransferor] Transfer document could not be created in CouchDB.",
                   details,
                   self.alertServiceName)
    self.logger.warning(details)


def alertLargeInputData(self, aboveWarningThreshold, transferId, wflowName, dataSize, dataIn):
"""
Evaluates whether the amount of data placed is too big, if so, send an alert
notification to a few persons
Expand All @@ -758,12 +810,8 @@ def notifyLargeData(self, aboveWarningThreshold, transferId, wflowName, dataSize
alertDescription += "data subscribed: {} TB, ".format(teraBytes(dataSize))
alertDescription += "for {} data: {}.""".format(dataIn['type'], dataIn['name'])

try:
# alert to expiry in an hour from now
self.alertManagerApi.sendAlert(alertName, alertSeverity, alertSummary, alertDescription,
self.alertServiceName, endSecs=1 * 60 * 60)
except Exception as ex:
self.logger.exception("Failed to send alert to %s. Error: %s", self.alertManagerUrl, str(ex))
self.sendAlert(alertName, alertSeverity, alertSummary, alertDescription,
self.alertServiceName)
self.logger.warning(alertDescription)

def _getValidSites(self, wflow, dataIn):
Expand Down