diff --git a/src/python/WMCore/ReqMgr/CherryPyThreads/StatusChangeTasks.py b/src/python/WMCore/ReqMgr/CherryPyThreads/StatusChangeTasks.py index f20d4b180bf..438d63477cf 100644 --- a/src/python/WMCore/ReqMgr/CherryPyThreads/StatusChangeTasks.py +++ b/src/python/WMCore/ReqMgr/CherryPyThreads/StatusChangeTasks.py @@ -1,24 +1,21 @@ """ Created on May 19, 2015 """ -from __future__ import (division, print_function) -from builtins import range -from future.utils import viewitems from WMCore.REST.CherryPyPeriodicTask import CherryPyPeriodicTask -from WMCore.ReqMgr.DataStructs.RequestStatus import AUTO_TRANSITION +from WMCore.ReqMgr.DataStructs.RequestStatus import AUTO_TRANSITION, ABORTED_AUTO_TRANSITION from WMCore.Services.WorkQueue.WorkQueue import WorkQueue from WMCore.Services.ReqMgr.ReqMgr import ReqMgr def moveForwardStatus(reqmgrSvc, wfStatusDict, logger): - for status, nextStatus in viewitems(AUTO_TRANSITION): + for status, nextStatus in AUTO_TRANSITION.items(): count = 0 requests = reqmgrSvc.getRequestByStatus([status], detail=False) for wf in requests: stateFromGQ = wfStatusDict.get(wf, None) - if stateFromGQ is None: + if stateFromGQ in [None, "aborted"]: continue elif stateFromGQ == status: continue @@ -38,40 +35,45 @@ def moveForwardStatus(reqmgrSvc, wfStatusDict, logger): except ValueError: # No state change needed continue - # special case for aborted workflow - aborted-completed instead of completed - if status == "aborted" and i == 0: + for j in range(i + 1): count += 1 - reqmgrSvc.updateRequestStatus(wf, "aborted-completed") - logger.info("%s in %s moved to %s", wf, status, "aborted-completed") - else: - for j in range(i + 1): - count += 1 - reqmgrSvc.updateRequestStatus(wf, nextStatus[j]) - logger.info("%s in %s moved to %s", wf, status, nextStatus[j]) + reqmgrSvc.updateRequestStatus(wf, nextStatus[j]) + logger.info("%s in %s moved to %s", wf, status, nextStatus[j]) logger.info("%s requests moved to new state from %s", count, status) return -def moveToCompletedForNoWQJobs(reqmgrSvc, wfStatusDict, logger): +def moveToCompletedForNoWQJobs(reqmgrSvc, globalQSvc, wfStatusDict, logger): """ - Handle the case when request is aborted/rejected before elements are created in GQ + Handle workflows that have been either aborted or force-completed. + This will ensure that no global workqueue elements will be left behind. + + :param reqmgrSvc: object instance of the ReqMgr class + :param globalQSvc: object instance of the WorkQueue class + :param wfStatusDict: workflow status according to the workqueue elements + :param logger: a logger object instance + :return: None object """ - - statusTransition = {"aborted": ["aborted-completed"]} - - for status, nextStatusList in viewitems(statusTransition): + for status, nextStatus in ABORTED_AUTO_TRANSITION.items(): requests = reqmgrSvc.getRequestByStatus([status], detail=False) count = 0 - for wf in requests: - # check whether wq elements exists for given request - # if not, it means - if wf not in wfStatusDict: - for nextStatus in nextStatusList: - reqmgrSvc.updateRequestStatus(wf, nextStatus) - count += 1 - logger.info("Total aborted-completed: %d", count) - - return + for wflowName in requests: + stateFromGQ = wfStatusDict.get(wflowName, None) + if stateFromGQ == "aborted": + # elements still in CancelRequested, wait for the agent to do his job + continue + elif stateFromGQ in ["acquired", "running-open", "running-closed"]: + # then something went wrong with the workflow abortion/force-completion + # trigger another cancel request + logger.info("%s in %s but WQEs in %s, cancelling it again!", + wflowName, status, stateFromGQ) + globalQSvc.cancelWorkflow(wflowName) + elif stateFromGQ in ["completed", None]: + # all elements are already in a final state or no longer exist, advance status + count += 1 + reqmgrSvc.updateRequestStatus(wflowName, nextStatus) + logger.info("%s in %s moved to %s", wflowName, status, nextStatus) + logger.info("Total %s: %d", nextStatus, count) class StatusChangeTasks(CherryPyPeriodicTask): @@ -89,14 +91,14 @@ def advanceStatus(self, config): Advance the request status based on the global workqueue elements status """ reqmgrSvc = ReqMgr(config.reqmgr2_url, logger=self.logger) - gqService = WorkQueue(config.workqueue_url) + globalQSvc = WorkQueue(config.workqueue_url) self.logger.info("Getting GQ data for status check") - wfStatusDict = gqService.getWorkflowStatusFromWQE() + wfStatusDict = globalQSvc.getWorkflowStatusFromWQE() self.logger.info("Advancing statuses") moveForwardStatus(reqmgrSvc, wfStatusDict, self.logger) - moveToCompletedForNoWQJobs(reqmgrSvc, wfStatusDict, self.logger) + moveToCompletedForNoWQJobs(reqmgrSvc, globalQSvc, wfStatusDict, self.logger) self.logger.info("Done advancing status") diff --git a/src/python/WMCore/ReqMgr/DataStructs/RequestStatus.py b/src/python/WMCore/ReqMgr/DataStructs/RequestStatus.py index 138c31160cf..734292c3619 100644 --- a/src/python/WMCore/ReqMgr/DataStructs/RequestStatus.py +++ b/src/python/WMCore/ReqMgr/DataStructs/RequestStatus.py @@ -146,18 +146,17 @@ "rejected-archived": [], } -# transition automatically controlled by ReqMgr2 -# aborted to completed instead of aborted-completed -# since workqueue mapping doesn't have aborted-completed status. -# but it need to be converted to aborted-completed whenever update db -### NOTE: the order of the list matters and it's used for status transition +# Workflow state transition automatically controlled by ReqMgr2 +### NOTE: order of this list matters and it's used for status transition AUTO_TRANSITION = {"staged": ["acquired", "running-open", "running-closed", "completed"], "acquired": ["running-open", "running-closed", "completed"], "running-open": ["running-closed", "completed"], - "aborted": ["completed"], - "running-closed": ["completed"], - "force-complete": ["completed"]} + "running-closed": ["completed"]} +# Workflow state transition automatically controlled by ReqMgr2 +# Specific to workflows either aborted or force-completed +ABORTED_AUTO_TRANSITION = {"aborted": "aborted-completed", + "force-complete": "completed"} # list of destination states which doesn't allow any additional argument update STATES_ALLOW_ONLY_STATE_TRANSITION = [key for key, val in viewitems(ALLOWED_ACTIONS_FOR_STATUS) if len(val) == 0] diff --git a/src/python/WMCore/Services/WorkQueue/WorkQueue.py b/src/python/WMCore/Services/WorkQueue/WorkQueue.py index e95505b3b72..2888057969e 100644 --- a/src/python/WMCore/Services/WorkQueue/WorkQueue.py +++ b/src/python/WMCore/Services/WorkQueue/WorkQueue.py @@ -41,7 +41,7 @@ def convertWQElementsStatusToWFStatus(elementsStatusSet): failed = set(["Failed"]) if forceCompleted <= elementsStatusSet: # at least 1 WQE in CancelRequested - return None + return "aborted" elif elementsStatusSet == acquired: # all WQEs in Acquired return "running-open" elif elementsStatusSet == running: # all WQEs in Running @@ -213,7 +213,7 @@ def getAvailableWorkflows(self): def cancelWorkflow(self, wf): """Cancel a workflow""" - nonCancelableElements = ['Done', 'Canceled', 'Failed'] + nonCancelableElements = ['Done', 'CancelRequested', 'Canceled', 'Failed'] data = self.db.loadView('WorkQueue', 'elementsDetailByWorkflowAndStatus', {'startkey': [wf], 'endkey': [wf, {}], 'reduce': False})