Merge branch 'master' into slow-dataset-files
vkuznet authored May 5, 2022
2 parents d318762 + eda31a4 commit e1fdc83
Showing 35 changed files with 961 additions and 913 deletions.
25 changes: 25 additions & 0 deletions CHANGES
@@ -1,3 +1,28 @@
2.0.3.pre3 to 2.0.3.pre4:
- Enforce dataset_lifetime column to be integer and not null (Alan Malta Rodrigues) #11115
- Remove block open logic from DBS3Reader and WorkQueue (Alan Malta Rodrigues) #11123


2.0.3.pre2 to 2.0.3.pre3:
- Change fallback for missing system xrootd-client (Dirk Hufnagel) #11117
- Update ReqMgr2 CP thread to properly deal with aborted/force-complete workflows (Alan Malta Rodrigues) #11113
- Create script to cancel GQEs (Alan Malta Rodrigues) #11113
- Remove support for unpacking user tarballs. No longer used by CRAB. (khurtado) #11114
- Minor test json template updates (Alan Malta Rodrigues) #10864
- Check CMSSW version before getting sim-datatier map (germanfgv) #11110


2.0.3.pre1 to 2.0.3.pre2:
- Move ckey/cert functions to Utils.CertTools (Valentin Kuznetsov) #11101
- Add bin and dependencies to wmcore PyPI package (Erik Gough) #11103
- Remove cPickle support (Erik Gough) #11096
- Update wmagent deploy script to add opportunistic resources one by one (Alan Malta Rodrigues) #11097
- Support adding diskless resources into the database (Alan Malta Rodrigues) #11097
- Remove no longer needed code in the StatusChangeTasks ReqMgr2 thread (Alan Malta Rodrigues) #11095
- Bump WMAgent deployment example to the latest stable version (Alan Malta Rodrigues) #11091
- Remove PFNs from a final document we send to WMArchive (Valentin Kuznetsov) #10998


2.0.2 to 2.0.3.pre1:
- Enhance logic to map ScramArch to OS (Alan Malta Rodrigues) #11088
- When ScramArch is empty str/list/None, return any as required_os (Alan Malta Rodrigues) #11083
44 changes: 44 additions & 0 deletions bin/kill-workflow-in-global
@@ -0,0 +1,44 @@
#!/usr/bin/env python3
"""
When to use this script: when a workflow is in status "aborted" but
it still has active GQEs.
Use this script to mimic exactly the same action as the one taken
by ReqMgr2 when aborting a workflow (without a state transition).
This script will mark the global workqueue elements - for a given
workflow - as CancelRequested, such that the agents can proceed
and acknowledge it, moving elements to status Canceled.
"""
from __future__ import print_function

import os
import sys

from WMCore.Configuration import loadConfigurationFile
from WMCore.Services.WorkQueue.WorkQueue import WorkQueue


def main():
    args = sys.argv[1:]
    if not len(args) == 1:
        print("usage: kill-workflow-in-global workflowname")
        sys.exit(0)
    wflowName = args[0]

    # get configuration file path
    if "WMAGENT_CONFIG" not in os.environ:
        os.environ['WMAGENT_CONFIG'] = '/data/srv/wmagent/current/config/wmagent/config.py'

    # load config
    wmConfig = loadConfigurationFile(os.environ['WMAGENT_CONFIG'])

    gqService = WorkQueue(wmConfig.WorkloadSummary.couchurl,
                          wmConfig.WorkQueueManager.dbname)

    gqService.cancelWorkflow(wflowName)
    print("Cancel requested for workflow: {}".format(wflowName))


if __name__ == "__main__":
    main()
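For reference, the same cancellation can be requested from an interactive Python session on an agent node. This is a minimal sketch using only the calls already shown in the script above; the workflow name and the fallback WMAGENT_CONFIG path are illustrative assumptions:

import os

from WMCore.Configuration import loadConfigurationFile
from WMCore.Services.WorkQueue.WorkQueue import WorkQueue

# load the agent configuration; the fallback path mirrors the script's default
configPath = os.environ.get('WMAGENT_CONFIG', '/data/srv/wmagent/current/config/wmagent/config.py')
wmConfig = loadConfigurationFile(configPath)

# build the global workqueue service and mark all elements of one workflow as CancelRequested
gqService = WorkQueue(wmConfig.WorkloadSummary.couchurl, wmConfig.WorkQueueManager.dbname)
gqService.cancelWorkflow("pdmvserv_ExampleWorkflow_v1_220505_000000_1234")  # hypothetical workflow name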
2 changes: 1 addition & 1 deletion deploy/addUSOpportunistic.sh
@@ -3,7 +3,7 @@ PEND_JOBS=3000
RUNN_JOBS=3000
# default manage location
manage=/data/srv/wmagent/current/config/wmagent/manage
for site in {T3_US_NERSC,T3_US_OSG,T3_US_PSC,T3_US_SDSC,T3_US_TACC,T3_US_Anvil};
for site in {T3_US_NERSC,T3_US_OSG,T3_US_PSC,T3_US_SDSC,T3_US_TACC,T3_US_Anvil,T3_US_Lancium};
do
echo "Adding site: $site into the resource-control with $PEND_JOBS pending and $RUNN_JOBS running slots"
$manage execute-agent wmagent-resource-control --site-name=$site --cms-name=$site --ce-name=$site --pnn=$site --plugin=SimpleCondorPlugin --pending-slots=1000 --running-slots=1000;
2 changes: 1 addition & 1 deletion deploy/deploy-wmagent.sh
@@ -374,7 +374,7 @@ echo "Done!" && echo
echo "*** Setting up US opportunistic resources ***"
if [[ "$HOSTNAME" == *fnal.gov ]]; then
sed -i "s+forceSiteDown = \[\]+forceSiteDown = \[$FORCEDOWN\]+" $MANAGE_DIR/config.py
for resourceName in {T3_US_NERSC,T3_US_OSG,T3_US_PSC,T3_US_SDSC,T3_US_TACC,T3_US_Anvil,T3_ES_PIC_BSC};
for resourceName in {T3_US_NERSC,T3_US_OSG,T3_US_PSC,T3_US_SDSC,T3_US_TACC,T3_US_Anvil,T3_US_Lancium,T3_ES_PIC_BSC};
do
./manage execute-agent wmagent-resource-control --plugin=SimpleCondorPlugin --opportunistic \
--pending-slots=$HPC_PEND_JOBS --running-slots=$HPC_RUNN_JOBS --add-one-site $resourceName
2 changes: 1 addition & 1 deletion src/python/WMComponent/DBS3Buffer/MySQL/Create.py
@@ -49,7 +49,7 @@ def __init__(self, logger=None, dbi=None, params=None):
subscribed INTEGER DEFAULT 0,
phedex_group VARCHAR(100),
delete_blocks INTEGER,
dataset_lifetime INTEGER DEFAULT 0,
dataset_lifetime INTEGER DEFAULT 0 NOT NULL,
PRIMARY KEY (id),
CONSTRAINT uq_dbs_dat_sub UNIQUE (dataset_id, site, custodial, auto_approve, move, priority))"""

32 changes: 23 additions & 9 deletions src/python/WMComponent/DBS3Buffer/MySQL/NewSubscription.py
@@ -24,6 +24,18 @@ class NewSubscription(DBFormatter):
"""

    def _createPhEDExSubBinds(self, datasetID, subscriptionInfo, custodialFlag):
        """
        Creates the database binds for both custodial and non-custodial data
        placements.
        :param datasetID: integer with the dataset id
        :param subscriptionInfo: dictionary object from the request spec
        :param custodialFlag: boolean flag defining whether it's custodial or not
        :return: a list of dictionary binds
        """
        # NOTE: the subscription information is already validated upstream, at
        # the request factory. Thus, there is no need to cast/validate anything
        # at this level.

        # DeleteFromSource is not supported for move subscriptions
        delete_blocks = None
@@ -40,15 +52,17 @@ def _createPhEDExSubBinds(self, datasetID, subscriptionInfo, custodialFlag):

        binds = []
        for site in sites:
            binds.append({'id': datasetID,
                          'site': site,
                          'custodial': custodialFlag,
                          'auto_approve': 1 if site in subscriptionInfo['AutoApproveSites'] else 0,
                          'move': isMove,
                          'priority': subscriptionInfo['Priority'],
                          'phedex_group': phedex_group,
                          'delete_blocks': delete_blocks,
                          'dataset_lifetime': subscriptionInfo['DatasetLifetime']})
            bind = {'id': datasetID,
                    'site': site,
                    'custodial': 1 if custodialFlag else 0,
                    'auto_approve': 1 if site in subscriptionInfo['AutoApproveSites'] else 0,
                    'move': isMove,
                    'priority': subscriptionInfo['Priority'],
                    'phedex_group': phedex_group,
                    'delete_blocks': delete_blocks}
            if subscriptionInfo['DatasetLifetime'] is not None:
                bind.update(dict(dataset_lifetime=subscriptionInfo['DatasetLifetime']))
            binds.append(bind)
        return binds

    def execute(self, datasetID, subscriptionInfo, conn=None, transaction=False):
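For illustration, a minimal sketch of the bind dictionary this helper now produces per site. The input values below are hypothetical; sites, isMove, phedex_group and delete_blocks are derived in the part of the method not shown in this hunk. When 'DatasetLifetime' is None, the 'dataset_lifetime' key is omitted, so the column falls back to the new DEFAULT 0 NOT NULL definition from Create.py:

# hypothetical inputs, for illustration only
datasetID = 1234
subscriptionInfo = {'AutoApproveSites': ['T1_US_FNAL_Disk'],
                    'Priority': 'Low',
                    'DatasetLifetime': None}

# bind produced for site 'T2_CH_CERN' with custodialFlag=False,
# assuming isMove=0, phedex_group='DataOps' and delete_blocks=None
bind = {'id': 1234,
        'site': 'T2_CH_CERN',
        'custodial': 0,            # 1 if custodialFlag else 0
        'auto_approve': 0,         # site is not listed in AutoApproveSites
        'move': 0,
        'priority': 'Low',
        'phedex_group': 'DataOps',
        'delete_blocks': None}
# no 'dataset_lifetime' key here: DatasetLifetime is None, so the database
# default (dataset_lifetime INTEGER DEFAULT 0 NOT NULL) applies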
2 changes: 1 addition & 1 deletion src/python/WMComponent/DBS3Buffer/Oracle/Create.py
@@ -49,7 +49,7 @@ def __init__(self, logger = None, dbi = None, params = None):
subscribed INTEGER DEFAULT 0,
phedex_group VARCHAR(100),
delete_blocks INTEGER,
dataset_lifetime INTEGER DEFAULT 0,
dataset_lifetime INTEGER DEFAULT 0 NOT NULL,
PRIMARY KEY (id),
CONSTRAINT uq_dbs_dat_sub UNIQUE (dataset_id, site, custodial, auto_approve, move, priority)
)"""
70 changes: 36 additions & 34 deletions src/python/WMCore/ReqMgr/CherryPyThreads/StatusChangeTasks.py
@@ -1,24 +1,21 @@
"""
Created on May 19, 2015
"""
from __future__ import (division, print_function)
from builtins import range
from future.utils import viewitems

from WMCore.REST.CherryPyPeriodicTask import CherryPyPeriodicTask
from WMCore.ReqMgr.DataStructs.RequestStatus import AUTO_TRANSITION
from WMCore.ReqMgr.DataStructs.RequestStatus import AUTO_TRANSITION, CANCEL_AUTO_TRANSITION
from WMCore.Services.WorkQueue.WorkQueue import WorkQueue
from WMCore.Services.ReqMgr.ReqMgr import ReqMgr


def moveForwardStatus(reqmgrSvc, wfStatusDict, logger):

for status, nextStatus in viewitems(AUTO_TRANSITION):
for status, nextStatus in AUTO_TRANSITION.items():
count = 0
requests = reqmgrSvc.getRequestByStatus([status], detail=False)
for wf in requests:
stateFromGQ = wfStatusDict.get(wf, None)
if stateFromGQ is None:
if stateFromGQ in [None, "canceled"]:
continue
elif stateFromGQ == status:
continue
@@ -38,40 +35,45 @@ def moveForwardStatus(reqmgrSvc, wfStatusDict, logger):
except ValueError:
# No state change needed
continue
# special case for aborted workflow - aborted-completed instead of completed
if status == "aborted" and i == 0:
for j in range(i + 1):
count += 1
reqmgrSvc.updateRequestStatus(wf, "aborted-completed")
logger.info("%s in %s moved to %s", wf, status, "aborted-completed")
else:
for j in range(i + 1):
count += 1
reqmgrSvc.updateRequestStatus(wf, nextStatus[j])
logger.info("%s in %s moved to %s", wf, status, nextStatus[j])
reqmgrSvc.updateRequestStatus(wf, nextStatus[j])
logger.info("%s in %s moved to %s", wf, status, nextStatus[j])
logger.info("%s requests moved to new state from %s", count, status)
return


def moveToCompletedForNoWQJobs(reqmgrSvc, wfStatusDict, logger):
def moveToCompletedForNoWQJobs(reqmgrSvc, globalQSvc, wfStatusDict, logger):
"""
Handle the case when request is aborted/rejected before elements are created in GQ
Handle workflows that have been either aborted or force-completed.
This will ensure that no global workqueue elements will be left behind.
:param reqmgrSvc: object instance of the ReqMgr class
:param globalQSvc: object instance of the WorkQueue class
:param wfStatusDict: workflow status according to the workqueue elements
:param logger: a logger object instance
:return: None object
"""

statusTransition = {"aborted": ["aborted-completed"]}

for status, nextStatusList in viewitems(statusTransition):
for status, nextStatus in CANCEL_AUTO_TRANSITION.items():
requests = reqmgrSvc.getRequestByStatus([status], detail=False)
count = 0
for wf in requests:
# check whether wq elements exists for given request
# if not, it means
if wf not in wfStatusDict:
for nextStatus in nextStatusList:
reqmgrSvc.updateRequestStatus(wf, nextStatus)
count += 1
logger.info("Total aborted-completed: %d", count)

return
for wflowName in requests:
stateFromGQ = wfStatusDict.get(wflowName, None)
if stateFromGQ == "canceled":
# elements still in CancelRequested, wait for the agent to do its job
continue
elif stateFromGQ in ["acquired", "running-open", "running-closed"]:
# then something went wrong with the workflow abortion/force-completion
# trigger another cancel request
logger.info("%s in %s but WQEs in %s, cancelling it again!",
wflowName, status, stateFromGQ)
globalQSvc.cancelWorkflow(wflowName)
elif stateFromGQ in ["completed", None]:
# all elements are already in a final state or no longer exist, advance status
count += 1
reqmgrSvc.updateRequestStatus(wflowName, nextStatus)
logger.info("%s in %s moved to %s", wflowName, status, nextStatus)
logger.info("Total %s: %d", nextStatus, count)


class StatusChangeTasks(CherryPyPeriodicTask):
@@ -89,14 +91,14 @@ def advanceStatus(self, config):
Advance the request status based on the global workqueue elements status
"""
reqmgrSvc = ReqMgr(config.reqmgr2_url, logger=self.logger)
gqService = WorkQueue(config.workqueue_url)
globalQSvc = WorkQueue(config.workqueue_url)

self.logger.info("Getting GQ data for status check")
wfStatusDict = gqService.getWorkflowStatusFromWQE()
wfStatusDict = globalQSvc.getWorkflowStatusFromWQE()

self.logger.info("Advancing statuses")
moveForwardStatus(reqmgrSvc, wfStatusDict, self.logger)
moveToCompletedForNoWQJobs(reqmgrSvc, wfStatusDict, self.logger)
moveToCompletedForNoWQJobs(reqmgrSvc, globalQSvc, wfStatusDict, self.logger)

self.logger.info("Done advancing status")
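In short, the reworked moveToCompletedForNoWQJobs makes a per-workflow decision based on the aggregate state of its global workqueue elements. A simplified, self-contained sketch of that decision follows; the helper name and return values are illustrative only, while the real method calls the ReqMgr2 and global workqueue services directly, as shown above:

CANCEL_AUTO_TRANSITION = {"aborted": "aborted-completed",
                          "force-complete": "completed"}


def decideCancelAction(reqStatus, stateFromGQ):
    """Illustrative only: map (request status, aggregate GQE state) to an action."""
    if stateFromGQ == "canceled":
        # elements still in CancelRequested, wait for the agents to acknowledge
        return "wait", None
    if stateFromGQ in ["acquired", "running-open", "running-closed"]:
        # cancellation did not stick in global workqueue, request it again
        return "cancel-again", None
    if stateFromGQ in ["completed", None]:
        # all elements are in a final state or gone, advance the request
        return "advance", CANCEL_AUTO_TRANSITION[reqStatus]
    return "wait", None


# e.g. decideCancelAction("aborted", None) -> ("advance", "aborted-completed")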

15 changes: 7 additions & 8 deletions src/python/WMCore/ReqMgr/DataStructs/RequestStatus.py
@@ -146,18 +146,17 @@
"rejected-archived": [],
}

# transition automatically controlled by ReqMgr2
# aborted to completed instead of aborted-completed
# since workqueue mapping doesn't have aborted-completed status.
# but it need to be converted to aborted-completed whenever update db
### NOTE: the order of the list matters and it's used for status transition
# Workflow state transition automatically controlled by ReqMgr2
### NOTE: order of this list matters and it's used for status transition
AUTO_TRANSITION = {"staged": ["acquired", "running-open", "running-closed", "completed"],
"acquired": ["running-open", "running-closed", "completed"],
"running-open": ["running-closed", "completed"],
"aborted": ["completed"],
"running-closed": ["completed"],
"force-complete": ["completed"]}
"running-closed": ["completed"]}

# Workflow state transition automatically controlled by ReqMgr2
# Specific to workflows either aborted or force-completed
CANCEL_AUTO_TRANSITION = {"aborted": "aborted-completed",
"force-complete": "completed"}

# list of destination states which doesn't allow any additional argument update
STATES_ALLOW_ONLY_STATE_TRANSITION = [key for key, val in viewitems(ALLOWED_ACTIONS_FOR_STATUS) if len(val) == 0]
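As the NOTE above states, the order of each AUTO_TRANSITION list matters: moveForwardStatus walks a request through every intermediate status up to the one reported by its workqueue elements. A minimal sketch of that ordered walk; the index lookup is inferred from the ValueError handling shown in the StatusChangeTasks hunk above, and the function name is illustrative:

AUTO_TRANSITION = {"staged": ["acquired", "running-open", "running-closed", "completed"],
                   "acquired": ["running-open", "running-closed", "completed"],
                   "running-open": ["running-closed", "completed"],
                   "running-closed": ["completed"]}


def statusSteps(currentStatus, stateFromGQ):
    """Illustrative only: list the statuses a request is moved through, in order."""
    nextStatus = AUTO_TRANSITION[currentStatus]
    try:
        i = nextStatus.index(stateFromGQ)
    except ValueError:
        return []  # no state change needed
    return nextStatus[:i + 1]


# e.g. statusSteps("staged", "running-closed")
# -> ["acquired", "running-open", "running-closed"]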