Skip to content

Commit

Permalink
Merge pull request #11113 from amaltaro/fix-11112
Browse files Browse the repository at this point in the history
Ensure aborted/force-complete workflows leave no elements behind
  • Loading branch information
amaltaro authored Apr 27, 2022
2 parents 3afa523 + ebaf3eb commit f8478de
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 65 deletions.
44 changes: 44 additions & 0 deletions bin/kill-workflow-in-global
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#!/usr/bin/env python3
"""
When to use this script: when a workflow is in status "aborted" but
it still has active GQEs.
Use this script to mimic exactly the same action as the one taken
by ReqMgr2 when aborting a workflow (without a state transition).
This script will mark the global workqueue elements - for a given
workflow - as CancelRequested, such that the agents can proceed
and acknowledge it, moving elements to status Canceled.
"""
from __future__ import print_function

import os
import sys

from WMCore.Configuration import loadConfigurationFile
from WMCore.Services.WorkQueue.WorkQueue import WorkQueue


def main():
args = sys.argv[1:]
if not len(args) == 1:
print("usage: kill-workflow-in-global workflowname")
sys.exit(0)
wflowName = args[0]

# get configuration file path
if "WMAGENT_CONFIG" not in os.environ:
os.environ['WMAGENT_CONFIG'] = '/data/srv/wmagent/current/config/wmagent/config.py'

# load config
wmConfig = loadConfigurationFile(os.environ['WMAGENT_CONFIG'])

gqService = WorkQueue(wmConfig.WorkloadSummary.couchurl,
wmConfig.WorkQueueManager.dbname)

gqService.cancelWorkflow(wflowName)
print("Cancel requested for workflow: {}".format(wflowName))


if __name__ == "__main__":
main()
70 changes: 36 additions & 34 deletions src/python/WMCore/ReqMgr/CherryPyThreads/StatusChangeTasks.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,21 @@
"""
Created on May 19, 2015
"""
from __future__ import (division, print_function)
from builtins import range
from future.utils import viewitems

from WMCore.REST.CherryPyPeriodicTask import CherryPyPeriodicTask
from WMCore.ReqMgr.DataStructs.RequestStatus import AUTO_TRANSITION
from WMCore.ReqMgr.DataStructs.RequestStatus import AUTO_TRANSITION, CANCEL_AUTO_TRANSITION
from WMCore.Services.WorkQueue.WorkQueue import WorkQueue
from WMCore.Services.ReqMgr.ReqMgr import ReqMgr


def moveForwardStatus(reqmgrSvc, wfStatusDict, logger):

for status, nextStatus in viewitems(AUTO_TRANSITION):
for status, nextStatus in AUTO_TRANSITION.items():
count = 0
requests = reqmgrSvc.getRequestByStatus([status], detail=False)
for wf in requests:
stateFromGQ = wfStatusDict.get(wf, None)
if stateFromGQ is None:
if stateFromGQ in [None, "canceled"]:
continue
elif stateFromGQ == status:
continue
Expand All @@ -38,40 +35,45 @@ def moveForwardStatus(reqmgrSvc, wfStatusDict, logger):
except ValueError:
# No state change needed
continue
# special case for aborted workflow - aborted-completed instead of completed
if status == "aborted" and i == 0:
for j in range(i + 1):
count += 1
reqmgrSvc.updateRequestStatus(wf, "aborted-completed")
logger.info("%s in %s moved to %s", wf, status, "aborted-completed")
else:
for j in range(i + 1):
count += 1
reqmgrSvc.updateRequestStatus(wf, nextStatus[j])
logger.info("%s in %s moved to %s", wf, status, nextStatus[j])
reqmgrSvc.updateRequestStatus(wf, nextStatus[j])
logger.info("%s in %s moved to %s", wf, status, nextStatus[j])
logger.info("%s requests moved to new state from %s", count, status)
return


def moveToCompletedForNoWQJobs(reqmgrSvc, wfStatusDict, logger):
def moveToCompletedForNoWQJobs(reqmgrSvc, globalQSvc, wfStatusDict, logger):
"""
Handle the case when request is aborted/rejected before elements are created in GQ
Handle workflows that have been either aborted or force-completed.
This will ensure that no global workqueue elements will be left behind.
:param reqmgrSvc: object instance of the ReqMgr class
:param globalQSvc: object instance of the WorkQueue class
:param wfStatusDict: workflow status according to the workqueue elements
:param logger: a logger object instance
:return: None object
"""

statusTransition = {"aborted": ["aborted-completed"]}

for status, nextStatusList in viewitems(statusTransition):
for status, nextStatus in CANCEL_AUTO_TRANSITION.items():
requests = reqmgrSvc.getRequestByStatus([status], detail=False)
count = 0
for wf in requests:
# check whether wq elements exists for given request
# if not, it means
if wf not in wfStatusDict:
for nextStatus in nextStatusList:
reqmgrSvc.updateRequestStatus(wf, nextStatus)
count += 1
logger.info("Total aborted-completed: %d", count)

return
for wflowName in requests:
stateFromGQ = wfStatusDict.get(wflowName, None)
if stateFromGQ == "canceled":
# elements still in CancelRequested, wait for the agent to do his job
continue
elif stateFromGQ in ["acquired", "running-open", "running-closed"]:
# then something went wrong with the workflow abortion/force-completion
# trigger another cancel request
logger.info("%s in %s but WQEs in %s, cancelling it again!",
wflowName, status, stateFromGQ)
globalQSvc.cancelWorkflow(wflowName)
elif stateFromGQ in ["completed", None]:
# all elements are already in a final state or no longer exist, advance status
count += 1
reqmgrSvc.updateRequestStatus(wflowName, nextStatus)
logger.info("%s in %s moved to %s", wflowName, status, nextStatus)
logger.info("Total %s: %d", nextStatus, count)


class StatusChangeTasks(CherryPyPeriodicTask):
Expand All @@ -89,14 +91,14 @@ def advanceStatus(self, config):
Advance the request status based on the global workqueue elements status
"""
reqmgrSvc = ReqMgr(config.reqmgr2_url, logger=self.logger)
gqService = WorkQueue(config.workqueue_url)
globalQSvc = WorkQueue(config.workqueue_url)

self.logger.info("Getting GQ data for status check")
wfStatusDict = gqService.getWorkflowStatusFromWQE()
wfStatusDict = globalQSvc.getWorkflowStatusFromWQE()

self.logger.info("Advancing statuses")
moveForwardStatus(reqmgrSvc, wfStatusDict, self.logger)
moveToCompletedForNoWQJobs(reqmgrSvc, wfStatusDict, self.logger)
moveToCompletedForNoWQJobs(reqmgrSvc, globalQSvc, wfStatusDict, self.logger)

self.logger.info("Done advancing status")

Expand Down
15 changes: 7 additions & 8 deletions src/python/WMCore/ReqMgr/DataStructs/RequestStatus.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,18 +146,17 @@
"rejected-archived": [],
}

# transition automatically controlled by ReqMgr2
# aborted to completed instead of aborted-completed
# since workqueue mapping doesn't have aborted-completed status.
# but it need to be converted to aborted-completed whenever update db
### NOTE: the order of the list matters and it's used for status transition
# Workflow state transition automatically controlled by ReqMgr2
### NOTE: order of this list matters and it's used for status transition
AUTO_TRANSITION = {"staged": ["acquired", "running-open", "running-closed", "completed"],
"acquired": ["running-open", "running-closed", "completed"],
"running-open": ["running-closed", "completed"],
"aborted": ["completed"],
"running-closed": ["completed"],
"force-complete": ["completed"]}
"running-closed": ["completed"]}

# Workflow state transition automatically controlled by ReqMgr2
# Specific to workflows either aborted or force-completed
CANCEL_AUTO_TRANSITION = {"aborted": "aborted-completed",
"force-complete": "completed"}

# list of destination states which doesn't allow any additional argument update
STATES_ALLOW_ONLY_STATE_TRANSITION = [key for key, val in viewitems(ALLOWED_ACTIONS_FOR_STATUS) if len(val) == 0]
Expand Down
20 changes: 13 additions & 7 deletions src/python/WMCore/Services/WorkQueue/WorkQueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def convertWQElementsStatusToWFStatus(elementsStatusSet):
:param: elementsStatusSet - dictionary of {request_name: set of all WQE status of this request, ...}
:returns: request status
Here is the mapping between request status and it GQE status
Here is the mapping between request status and the GQE status
1. acquired: all the GQEs are either Available or Negotiating.
Work is still in GQ, but not LQ.
2. running-open: at least one of the GQEs are in Acquired status.
Expand All @@ -24,9 +24,9 @@ def convertWQElementsStatusToWFStatus(elementsStatusSet):
All work is finished in WMBS (excluding cleanup and logcollect)
5. failed: all the GQEs are in Failed status. If the workflow has multiple GQEs
and only a few are in Failed status, then just follow the usual request status.
NOTE: CancelRequested status is a transient status and it should not trigger
any request status transition (thus, None gets returned).
6. canceled: used to distinguish requests that have been correctly canceled,
coming from workflows either aborted or force-complete. This state does not
trigger a workflow status transition.
"""
if not elementsStatusSet:
return None
Expand All @@ -37,11 +37,15 @@ def convertWQElementsStatusToWFStatus(elementsStatusSet):
running = set(["Running"])
runningOpen = set(["Available", "Negotiating", "Acquired"])
runningClosed = set(["Running", "Done", "Canceled"])
canceled = set(["CancelRequested", "Done", "Canceled", "Failed"])
completed = set(["Done", "Canceled", "Failed"])
failed = set(["Failed"])

if forceCompleted <= elementsStatusSet: # at least 1 WQE in CancelRequested
return None
# Just a reminder:
# <= every element in the left set is also in the right set
# & return elements common between the left and right set
if elementsStatusSet == forceCompleted: # all WQEs in CancelRequested
return "canceled"
elif elementsStatusSet == acquired: # all WQEs in Acquired
return "running-open"
elif elementsStatusSet == running: # all WQEs in Running
Expand All @@ -52,6 +56,8 @@ def convertWQElementsStatusToWFStatus(elementsStatusSet):
return "acquired"
elif elementsStatusSet <= completed: # all WQEs in a final state
return "completed"
elif elementsStatusSet <= canceled: # some WQEs still waiting to be cancelled
return "canceled"
elif elementsStatusSet & runningOpen: # at least 1 WQE still in GQ
return "running-open"
elif elementsStatusSet & runningClosed: # all WQEs already in LQ and WMBS
Expand Down Expand Up @@ -213,7 +219,7 @@ def getAvailableWorkflows(self):

def cancelWorkflow(self, wf):
"""Cancel a workflow"""
nonCancelableElements = ['Done', 'Canceled', 'Failed']
nonCancelableElements = ['Done', 'CancelRequested', 'Canceled', 'Failed']
data = self.db.loadView('WorkQueue', 'elementsDetailByWorkflowAndStatus',
{'startkey': [wf], 'endkey': [wf, {}],
'reduce': False})
Expand Down
41 changes: 25 additions & 16 deletions test/python/WMCore_t/Services_t/WorkQueue_t/WorkQueue_t.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,17 +214,11 @@ def testConvertWQElementsStatusToWFStatus(self):
self.assertEqual(convertWQElementsStatusToWFStatus(set(["Available", "Negotiating", "Acquired"])), "running-open")
self.assertEqual(convertWQElementsStatusToWFStatus(set(["Available", "Negotiating", "Acquired", "Running"])), "running-open")
self.assertEqual(convertWQElementsStatusToWFStatus(set(["Available", "Negotiating", "Acquired", "Running", "Done"])), "running-open")
self.assertIsNone(convertWQElementsStatusToWFStatus(set(["Available", "Negotiating", "Acquired", "Running", "Done", "CancelRequested"])))
self.assertIsNone(convertWQElementsStatusToWFStatus(set(["Available", "Negotiating", "Acquired", "Running", "Done", "CancelRequested", "Canceled"])))
self.assertEqual(convertWQElementsStatusToWFStatus(set(["Negotiating", "Acquired"])), "running-open")
self.assertEqual(convertWQElementsStatusToWFStatus(set(["Negotiating", "Acquired", "Running"])), "running-open")
self.assertEqual(convertWQElementsStatusToWFStatus(set(["Negotiating", "Acquired", "Running", "Done"])), "running-open")
self.assertIsNone(convertWQElementsStatusToWFStatus(set(["Negotiating", "Acquired", "Running", "Done", "CancelRequested"])))
self.assertIsNone(convertWQElementsStatusToWFStatus(set(["Negotiating", "Acquired", "Running", "Done", "CancelRequested", "Canceled"])))
self.assertEqual(convertWQElementsStatusToWFStatus(set(["Acquired", "Running"])), "running-open")
self.assertEqual(convertWQElementsStatusToWFStatus(set(["Acquired", "Running", "Done"])), "running-open")
self.assertIsNone(convertWQElementsStatusToWFStatus(set(["Acquired", "Running", "Done", "CancelRequested"])))
self.assertIsNone(convertWQElementsStatusToWFStatus(set(["Acquired", "Running", "Done", "CancelRequested", "Canceled"])))
self.assertEqual(convertWQElementsStatusToWFStatus(set(["Available", "Done"])), "running-open")
self.assertEqual(convertWQElementsStatusToWFStatus(set(["Available", "Running", "Done", "Canceled"])), "running-open")
self.assertEqual(convertWQElementsStatusToWFStatus(set(["Acquired", "Done"])), "running-open")
Expand All @@ -234,12 +228,10 @@ def testConvertWQElementsStatusToWFStatus(self):
# workflows completely acquired by the agents
self.assertEqual(convertWQElementsStatusToWFStatus(set(["Running"])), "running-closed")
self.assertEqual(convertWQElementsStatusToWFStatus(set(["Running", "Done"])), "running-closed")
self.assertIsNone(convertWQElementsStatusToWFStatus(set(["Running", "Done", "CancelRequested"])))
self.assertIsNone(convertWQElementsStatusToWFStatus(set(["Running", "Done", "CancelRequested", "Canceled"])))
self.assertEqual(convertWQElementsStatusToWFStatus(set(["Running", "Done", "Canceled"])), "running-closed")

# workflows completed/aborted/force-completed, thus existent elements
# but no more active workqueue elements in the system
# but no more active workqueue elements in the system
self.assertEqual(convertWQElementsStatusToWFStatus(set(["Done"])), "completed")
self.assertEqual(convertWQElementsStatusToWFStatus(set(["Canceled"])), "completed")
self.assertEqual(convertWQElementsStatusToWFStatus(set(["Done", "Canceled"])), "completed")
Expand All @@ -256,9 +248,26 @@ def testConvertWQElementsStatusToWFStatus(self):
self.assertEqual(convertWQElementsStatusToWFStatus(set(["Done", "Failed"])), "completed")
self.assertEqual(convertWQElementsStatusToWFStatus(set(["Canceled", "Failed"])), "completed")

# workflows in a temporary state, nothing to do with them yet
self.assertIsNone(convertWQElementsStatusToWFStatus(set(["Done", "CancelRequested"])))
self.assertIsNone(convertWQElementsStatusToWFStatus(set(["CancelRequested"])))
# workflows that have been aborted but still have workqueue elements around
self.assertEqual("running-open", convertWQElementsStatusToWFStatus(
set(["Available", "Negotiating", "Acquired", "Running", "Done", "CancelRequested"])))
self.assertEqual("running-open", convertWQElementsStatusToWFStatus(
set(["Available", "Negotiating", "Acquired", "Running", "Done", "CancelRequested", "Canceled"])))
self.assertEqual("running-open", convertWQElementsStatusToWFStatus(
set(["Negotiating", "Acquired", "Running", "Done", "CancelRequested"])))
self.assertEqual("running-open", convertWQElementsStatusToWFStatus(
set(["Negotiating", "Acquired", "Running", "Done", "CancelRequested", "Canceled"])))
self.assertEqual("running-open", convertWQElementsStatusToWFStatus(
set(["Acquired", "Running", "Done", "CancelRequested"])))
self.assertEqual("running-open", convertWQElementsStatusToWFStatus(
set(["Acquired", "Running", "Done", "CancelRequested", "Canceled"])))
self.assertEqual("running-closed", convertWQElementsStatusToWFStatus(
set(["Running", "Done", "CancelRequested"])))
self.assertEqual("running-closed", convertWQElementsStatusToWFStatus(
set(["Running", "Done", "CancelRequested", "Canceled"])))
self.assertEqual("canceled", convertWQElementsStatusToWFStatus(
set(["Done", "CancelRequested"])))
self.assertEqual("canceled", convertWQElementsStatusToWFStatus(set(["CancelRequested"])))

def test2ConvertWQElementsStatusToWFStatus(self):
"""
Expand All @@ -271,7 +280,7 @@ def test2ConvertWQElementsStatusToWFStatus(self):
self.assertEqual(convertWQElementsStatusToWFStatus(set(["Acquired"])), "running-open")
self.assertEqual(convertWQElementsStatusToWFStatus(set(["Running"])), "running-closed")
self.assertEqual(convertWQElementsStatusToWFStatus(set(["Done"])), "completed")
self.assertIsNone(convertWQElementsStatusToWFStatus(set(["CancelRequested"])))
self.assertEqual(convertWQElementsStatusToWFStatus(set(["CancelRequested"])), "canceled")
self.assertEqual(convertWQElementsStatusToWFStatus(set(["Canceled"])), "completed")
self.assertEqual(convertWQElementsStatusToWFStatus(set(["Failed"])), "failed")

Expand All @@ -291,9 +300,9 @@ def test2ConvertWQElementsStatusToWFStatus(self):
self.assertEqual(convertWQElementsStatusToWFStatus(set(["Running", "Done"])), 'running-closed')
self.assertEqual(convertWQElementsStatusToWFStatus(set(["Running", "Failed"])), "running-closed")
self.assertEqual(convertWQElementsStatusToWFStatus(set(["Done", "Failed"])), "completed")
self.assertIsNone(convertWQElementsStatusToWFStatus(set(["Done", "CancelRequested"])))
self.assertEqual(convertWQElementsStatusToWFStatus(set(["Done", "CancelRequested"])), "canceled")
self.assertEqual(convertWQElementsStatusToWFStatus(set(["Done", "Canceled"])), "completed")
self.assertIsNone(convertWQElementsStatusToWFStatus(set(["CancelRequested", "Canceled"])))
self.assertEqual(convertWQElementsStatusToWFStatus(set(["CancelRequested", "Canceled"])), "canceled")

# triple WQE with standard state transition
self.assertEqual(convertWQElementsStatusToWFStatus(set(["Available", "Negotiating", "Acquired"])), "running-open")
Expand All @@ -316,7 +325,7 @@ def test2ConvertWQElementsStatusToWFStatus(self):
self.assertEqual(convertWQElementsStatusToWFStatus(set(["Acquired", "Running", "Failed"])), "running-open")
self.assertEqual(convertWQElementsStatusToWFStatus(set(["Acquired", "Done", "Failed"])), "running-open")
self.assertEqual(convertWQElementsStatusToWFStatus(set(["Running", "Done", "Failed"])), "running-closed")
self.assertIsNone(convertWQElementsStatusToWFStatus(set(["CancelRequested", "Done", "Failed"])))
self.assertEqual(convertWQElementsStatusToWFStatus(set(["CancelRequested", "Done", "Failed"])), "canceled")
self.assertEqual(convertWQElementsStatusToWFStatus(set(["Canceled", "Done", "Failed"])), "completed")


Expand Down

0 comments on commit f8478de

Please sign in to comment.