Skip to content

Commit

Permalink
Update ReqMgr2 CP thread to properly deal with aborted/force-complete…
Browse files Browse the repository at this point in the history
… workflows
  • Loading branch information
amaltaro committed Apr 25, 2022
1 parent e39af22 commit 4217d1b
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 44 deletions.
70 changes: 36 additions & 34 deletions src/python/WMCore/ReqMgr/CherryPyThreads/StatusChangeTasks.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,21 @@
"""
Created on May 19, 2015
"""
from __future__ import (division, print_function)
from builtins import range
from future.utils import viewitems

from WMCore.REST.CherryPyPeriodicTask import CherryPyPeriodicTask
from WMCore.ReqMgr.DataStructs.RequestStatus import AUTO_TRANSITION
from WMCore.ReqMgr.DataStructs.RequestStatus import AUTO_TRANSITION, ABORTED_AUTO_TRANSITION
from WMCore.Services.WorkQueue.WorkQueue import WorkQueue
from WMCore.Services.ReqMgr.ReqMgr import ReqMgr


def moveForwardStatus(reqmgrSvc, wfStatusDict, logger):

for status, nextStatus in viewitems(AUTO_TRANSITION):
for status, nextStatus in AUTO_TRANSITION.items():
count = 0
requests = reqmgrSvc.getRequestByStatus([status], detail=False)
for wf in requests:
stateFromGQ = wfStatusDict.get(wf, None)
if stateFromGQ is None:
if stateFromGQ in [None, "aborted"]:
continue
elif stateFromGQ == status:
continue
Expand All @@ -38,40 +35,45 @@ def moveForwardStatus(reqmgrSvc, wfStatusDict, logger):
except ValueError:
# No state change needed
continue
# special case for aborted workflow - aborted-completed instead of completed
if status == "aborted" and i == 0:
for j in range(i + 1):
count += 1
reqmgrSvc.updateRequestStatus(wf, "aborted-completed")
logger.info("%s in %s moved to %s", wf, status, "aborted-completed")
else:
for j in range(i + 1):
count += 1
reqmgrSvc.updateRequestStatus(wf, nextStatus[j])
logger.info("%s in %s moved to %s", wf, status, nextStatus[j])
reqmgrSvc.updateRequestStatus(wf, nextStatus[j])
logger.info("%s in %s moved to %s", wf, status, nextStatus[j])
logger.info("%s requests moved to new state from %s", count, status)
return


def moveToCompletedForNoWQJobs(reqmgrSvc, wfStatusDict, logger):
def moveToCompletedForNoWQJobs(reqmgrSvc, globalQSvc, wfStatusDict, logger):
"""
Handle the case when request is aborted/rejected before elements are created in GQ
Handle workflows that have been either aborted or force-completed.
This will ensure that no global workqueue elements will be left behind.
:param reqmgrSvc: object instance of the ReqMgr class
:param globalQSvc: object instance of the WorkQueue class
:param wfStatusDict: workflow status according to the workqueue elements
:param logger: a logger object instance
:return: None object
"""

statusTransition = {"aborted": ["aborted-completed"]}

for status, nextStatusList in viewitems(statusTransition):
for status, nextStatus in ABORTED_AUTO_TRANSITION.items():
requests = reqmgrSvc.getRequestByStatus([status], detail=False)
count = 0
for wf in requests:
# check whether wq elements exist for the given request
# if not, the request was presumably aborted before any elements were created in GQ
if wf not in wfStatusDict:
for nextStatus in nextStatusList:
reqmgrSvc.updateRequestStatus(wf, nextStatus)
count += 1
logger.info("Total aborted-completed: %d", count)

return
for wflowName in requests:
stateFromGQ = wfStatusDict.get(wflowName, None)
if stateFromGQ == "aborted":
# elements still in CancelRequested, wait for the agent to do its job
continue
elif stateFromGQ in ["acquired", "running-open", "running-closed"]:
# then something went wrong with the workflow abortion/force-completion
# trigger another cancel request
logger.info("%s in %s but WQEs in %s, cancelling it again!",
wflowName, status, stateFromGQ)
globalQSvc.cancelWorkflow(wflowName)
elif stateFromGQ in ["completed", None]:
# all elements are already in a final state or no longer exist, advance status
count += 1
reqmgrSvc.updateRequestStatus(wflowName, nextStatus)
logger.info("%s in %s moved to %s", wflowName, status, nextStatus)
logger.info("Total %s: %d", nextStatus, count)


class StatusChangeTasks(CherryPyPeriodicTask):
Expand All @@ -89,14 +91,14 @@ def advanceStatus(self, config):
Advance the request status based on the global workqueue elements status
"""
reqmgrSvc = ReqMgr(config.reqmgr2_url, logger=self.logger)
gqService = WorkQueue(config.workqueue_url)
globalQSvc = WorkQueue(config.workqueue_url)

self.logger.info("Getting GQ data for status check")
wfStatusDict = gqService.getWorkflowStatusFromWQE()
wfStatusDict = globalQSvc.getWorkflowStatusFromWQE()

self.logger.info("Advancing statuses")
moveForwardStatus(reqmgrSvc, wfStatusDict, self.logger)
moveToCompletedForNoWQJobs(reqmgrSvc, wfStatusDict, self.logger)
moveToCompletedForNoWQJobs(reqmgrSvc, globalQSvc, wfStatusDict, self.logger)

self.logger.info("Done advancing status")

Expand Down
15 changes: 7 additions & 8 deletions src/python/WMCore/ReqMgr/DataStructs/RequestStatus.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,18 +146,17 @@
"rejected-archived": [],
}

# transition automatically controlled by ReqMgr2
# aborted to completed instead of aborted-completed
# since workqueue mapping doesn't have aborted-completed status.
# but it needs to be converted to aborted-completed whenever the db is updated
### NOTE: the order of the list matters and it's used for status transition
# Workflow state transition automatically controlled by ReqMgr2
### NOTE: order of this list matters and it's used for status transition
AUTO_TRANSITION = {"staged": ["acquired", "running-open", "running-closed", "completed"],
"acquired": ["running-open", "running-closed", "completed"],
"running-open": ["running-closed", "completed"],
"aborted": ["completed"],
"running-closed": ["completed"],
"force-complete": ["completed"]}
"running-closed": ["completed"]}

# Workflow state transition automatically controlled by ReqMgr2
# Specific to workflows either aborted or force-completed
ABORTED_AUTO_TRANSITION = {"aborted": "aborted-completed",
"force-complete": "completed"}

# list of destination states which don't allow any additional argument update
STATES_ALLOW_ONLY_STATE_TRANSITION = [key for key, val in viewitems(ALLOWED_ACTIONS_FOR_STATUS) if len(val) == 0]
Expand Down
4 changes: 2 additions & 2 deletions src/python/WMCore/Services/WorkQueue/WorkQueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def convertWQElementsStatusToWFStatus(elementsStatusSet):
failed = set(["Failed"])

if forceCompleted <= elementsStatusSet: # at least 1 WQE in CancelRequested
return None
return "aborted"
elif elementsStatusSet == acquired: # all WQEs in Acquired
return "running-open"
elif elementsStatusSet == running: # all WQEs in Running
Expand Down Expand Up @@ -213,7 +213,7 @@ def getAvailableWorkflows(self):

def cancelWorkflow(self, wf):
"""Cancel a workflow"""
nonCancelableElements = ['Done', 'Canceled', 'Failed']
nonCancelableElements = ['Done', 'CancelRequested', 'Canceled', 'Failed']
data = self.db.loadView('WorkQueue', 'elementsDetailByWorkflowAndStatus',
{'startkey': [wf], 'endkey': [wf, {}],
'reduce': False})
Expand Down

0 comments on commit 4217d1b

Please sign in to comment.