Merge branch 'master' into slow-dataset-files
vkuznet authored May 6, 2022
2 parents e2bfb55 + d3d7076 commit c987a77
Showing 43 changed files with 1,216 additions and 1,158 deletions.
33 changes: 33 additions & 0 deletions CHANGES
@@ -1,3 +1,36 @@
2.0.3.pre4 to 2.0.3.pre5:
- fix unit tests (Alan Malta Rodrigues) #11119
- Properly handle OpenRunningTimeout in WorkQueue (Alan Malta Rodrigues) #11119
- Switch from pep8 to pycodestyle (Erik Gough) #11128
- first draft of iam-token script (Valentin Kuznetsov) #11093
- Add T3_US_Lancium. (Todor Ivanov) #11126


2.0.3.pre3 to 2.0.3.pre4:
- Enforce dataset_lifetime column to be integer and not null (Alan Malta Rodrigues) #11115
- Remove block open logic from DBS3Reader and WorkQueue (Alan Malta Rodrigues) #11123


2.0.3.pre2 to 2.0.3.pre3:
- change fallback for missing system xrootd-client (Dirk Hufnagel) #11117
- Update ReqMgr2 CP thread to properly deal with aborted/force-complete workflows (Alan Malta Rodrigues) #11113
- Create script to cancel GQEs (Alan Malta Rodrigues) #11113
- Remove support for Unpacking user tarballs. No longer used by CRAB. (khurtado) #11114
- Minor test json template updates (Alan Malta Rodrigues) #10864
- Check CMSSW version before getting sim-datatier map (germanfgv) #11110


2.0.3.pre1 to 2.0.3.pre2:
- move ckey/cert functions to Utils.CertTools (Valentin Kuznetsov) #11101
- Add bin and dependencies to wmcore PyPI package (Erik Gough) #11103
- Remove cPickle support (Erik Gough) #11096
- Update wmagent deploy script to add opportunistic resources one by one (Alan Malta Rodrigues) #11097
- Support adding diskless resources into the database (Alan Malta Rodrigues) #11097
- Remove no longer needed code in the StatusChangeTasks ReqMgr2 thread (Alan Malta Rodrigues) #11095
- Bump WMAgent deployment example to the latest stable version (Alan Malta Rodrigues) #11091
- Remove PFNs from a final document we send to WMArchive (Valentin Kuznetsov) #10998


2.0.2 to 2.0.3.pre1:
- Enhance logic to map ScramArch to OS (Alan Malta Rodrigues) #11088
- When ScramArch is empty str/list/None, return any as required_os (Alan Malta Rodrigues) #11083
54 changes: 54 additions & 0 deletions bin/create-iam-token.sh
@@ -0,0 +1,54 @@
#!/bin/bash
# The create-iam-token.sh script generates a new IAM token.
# It relies on the following environment variables:
# IAM_CLIENT_ID path to a file holding the client id obtained from the IAM provider
# IAM_CLIENT_SECRET path to a file holding the client secret obtained from the IAM provider
# IAM_TOKEN output file name to store the obtained IAM token
# All steps to obtain client credentials can be found at:
# https://github.com/dmwm/WMCore/pull/11093#issuecomment-1098131010

# check if curl exists on the system
if ! command -v curl &> /dev/null
then
    echo "curl could not be found, please install it on your system"
    exit 1
fi
# check if jq exists on the system
if ! command -v jq &> /dev/null
then
    echo "jq could not be found, please install it on your system"
    exit 1
fi

#echo "tools are checked"

# read the client id from the file that IAM_CLIENT_ID points to, or fail
if [ -n "$IAM_CLIENT_ID" ] && [ -f "$IAM_CLIENT_ID" ]; then
    export client_id=$(cat "$IAM_CLIENT_ID")
else
    echo "unable to locate client_id file, please set IAM_CLIENT_ID to point to your client_id file name"
    exit 1
fi
#echo "use client_id=$client_id"

# read the client secret from the file that IAM_CLIENT_SECRET points to, or fail
if [ -n "$IAM_CLIENT_SECRET" ] && [ -f "$IAM_CLIENT_SECRET" ]; then
    export client_secret=$(cat "$IAM_CLIENT_SECRET")
else
    echo "unable to locate client_secret file, please set IAM_CLIENT_SECRET to point to your client_secret file name"
    exit 1
fi
#echo "use client_secret=$client_secret"

# obtain new token using client credentials
if [ -n "$IAM_TOKEN" ]; then
    # grant_type=client_credentials key=value pair is required by the IAM provider
    # to specify that the request contains client credentials
    curl -s -k -d grant_type=client_credentials \
        -u "${client_id}:${client_secret}" \
        https://cms-auth.web.cern.ch/token | jq -r '.access_token' > "$IAM_TOKEN"
    echo "New IAM token generated and can be found at $IAM_TOKEN"
else
    echo "Please set the IAM_TOKEN environment variable to the file name where the token will be written"
    exit 1
fi
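
The request that the script sends with curl can be sketched in Python as follows. This is only an illustrative, offline sketch: the helper names `build_token_request` and `extract_access_token`, the sample credentials, and the sample response payload are assumptions and are not part of the commit. Only the token URL and the `grant_type=client_credentials` pair come from the script above.

```python
import base64
import json
import urllib.parse

# endpoint taken from the shell script above
TOKEN_URL = "https://cms-auth.web.cern.ch/token"


def build_token_request(client_id, client_secret):
    """Return (url, headers, body) for a client_credentials token request.

    Hypothetical helper mirroring `curl -d grant_type=... -u id:secret`.
    """
    # `curl -u id:secret` sends HTTP Basic auth, i.e. base64("id:secret")
    creds = base64.b64encode(f"{client_id}:{client_secret}".encode()).decode()
    headers = {"Authorization": f"Basic {creds}",
               "Content-Type": "application/x-www-form-urlencoded"}
    # `curl -d key=value` sends a form-encoded request body
    body = urllib.parse.urlencode({"grant_type": "client_credentials"}).encode()
    return TOKEN_URL, headers, body


def extract_access_token(response_text):
    """Rough equivalent of `jq -r '.access_token'` on the JSON response."""
    return json.loads(response_text)["access_token"]


# offline demonstration with made-up credentials and a made-up response
url, headers, body = build_token_request("my-id", "my-secret")
sample_response = '{"access_token": "abc.def.ghi", "token_type": "Bearer"}'
token = extract_access_token(sample_response)
print(url, body.decode(), token)
```

Unlike `jq -r`, which prints `null` when the key is missing, this sketch raises `KeyError`, which makes a failed token request easier to notice.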
44 changes: 44 additions & 0 deletions bin/kill-workflow-in-global
@@ -0,0 +1,44 @@
#!/usr/bin/env python3
"""
When to use this script: when a workflow is in status "aborted" but
it still has active GQEs.
Use this script to mimic exactly the same action as the one taken
by ReqMgr2 when aborting a workflow (without a state transition).
This script will mark the global workqueue elements - for a given
workflow - as CancelRequested, such that the agents can proceed
and acknowledge it, moving elements to status Canceled.
"""
from __future__ import print_function

import os
import sys

from WMCore.Configuration import loadConfigurationFile
from WMCore.Services.WorkQueue.WorkQueue import WorkQueue


def main():
    args = sys.argv[1:]
    if len(args) != 1:
        print("usage: kill-workflow-in-global workflowname")
        # wrong number of arguments is a usage error
        sys.exit(1)
    wflowName = args[0]

    # get configuration file path
    if "WMAGENT_CONFIG" not in os.environ:
        os.environ['WMAGENT_CONFIG'] = '/data/srv/wmagent/current/config/wmagent/config.py'

    # load config
    wmConfig = loadConfigurationFile(os.environ['WMAGENT_CONFIG'])

    gqService = WorkQueue(wmConfig.WorkloadSummary.couchurl,
                          wmConfig.WorkQueueManager.dbname)

    gqService.cancelWorkflow(wflowName)
    print("Cancel requested for workflow: {}".format(wflowName))


if __name__ == "__main__":
    main()
2 changes: 1 addition & 1 deletion deploy/addUSOpportunistic.sh
@@ -3,7 +3,7 @@ PEND_JOBS=3000
RUNN_JOBS=3000
# default manage location
manage=/data/srv/wmagent/current/config/wmagent/manage
for site in {T3_US_NERSC,T3_US_OSG,T3_US_PSC,T3_US_SDSC,T3_US_TACC,T3_US_Anvil};
for site in {T3_US_NERSC,T3_US_OSG,T3_US_PSC,T3_US_SDSC,T3_US_TACC,T3_US_Anvil,T3_US_Lancium};
do
echo "Adding site: $site into the resource-control with $PEND_JOBS pending and $RUNN_JOBS running slots"
$manage execute-agent wmagent-resource-control --site-name=$site --cms-name=$site --ce-name=$site --pnn=$site --plugin=SimpleCondorPlugin --pending-slots=1000 --running-slots=1000;
2 changes: 1 addition & 1 deletion deploy/deploy-wmagent.sh
@@ -374,7 +374,7 @@ echo "Done!" && echo
echo "*** Setting up US opportunistic resources ***"
if [[ "$HOSTNAME" == *fnal.gov ]]; then
sed -i "s+forceSiteDown = \[\]+forceSiteDown = \[$FORCEDOWN\]+" $MANAGE_DIR/config.py
for resourceName in {T3_US_NERSC,T3_US_OSG,T3_US_PSC,T3_US_SDSC,T3_US_TACC,T3_US_Anvil,T3_ES_PIC_BSC};
for resourceName in {T3_US_NERSC,T3_US_OSG,T3_US_PSC,T3_US_SDSC,T3_US_TACC,T3_US_Anvil,T3_US_Lancium,T3_ES_PIC_BSC};
do
./manage execute-agent wmagent-resource-control --plugin=SimpleCondorPlugin --opportunistic \
--pending-slots=$HPC_PEND_JOBS --running-slots=$HPC_RUNN_JOBS --add-one-site $resourceName
6 changes: 3 additions & 3 deletions requirements.txt
@@ -12,7 +12,7 @@
# for more details please refer to official Python documentation, see
# https://www.python.org/dev/peps/pep-0440/#version-specifiers

Cheetah3~=3.2.6.post2 # wmcore,wmagent,reqmgr2,reqmon
Cheetah3~=3.2.6.post1 # wmcore,wmagent,reqmgr2,reqmon
CherryPy~=17.4.0 # wmcore,wmagent,wmagent-devtools,reqmgr2,reqmon,global-workqueue,reqmgr2ms
CMSCouchapp~=1.3.4 # wmcore,wmagent
CMSMonitoring~=0.3.4 # wmcore,wmagent,reqmgr2,reqmon,global-workqueue,reqmgr2ms
@@ -30,10 +30,10 @@ mox3~=1.1.0 # wmcore,wmagent-devtools
mysqlclient~=2.0.3 # wmcore,wmagent
nose~=1.3.7 # wmcore,wmagent-devtools
nose2~=0.10.0 # wmcore,wmagent-devtools
pep8~=1.7.1 # wmcore,wmagent-devtools
pycodestyle~=2.8.0 # wmcore,wmagent-devtools
psutil~=5.8.0 # wmcore,wmagent,wmagent-devtools,reqmgr2,reqmon,global-workqueue
pycurl~=7.43.0.6 # wmcore,wmagent,reqmgr2,reqmon,global-workqueue,reqmgr2ms
pylint~=2.7.0 # wmcore,wmagent-devtools
pylint~=2.13.5 # wmcore,wmagent-devtools
pymongo~=4.0.1 # wmcore,wmagent-devtools,reqmgr2ms
pyOpenSSL~=18.0.0 # wmcore,wmagent
pyzmq~=19.0.2 # wmcore,wmagent
2 changes: 1 addition & 1 deletion setup.cfg
@@ -1,4 +1,4 @@
[pep8]
[pycodestyle]
format=pylint
hang-closing=True
max-line-length = 160
2 changes: 1 addition & 1 deletion src/python/WMComponent/DBS3Buffer/MySQL/Create.py
@@ -49,7 +49,7 @@ def __init__(self, logger=None, dbi=None, params=None):
subscribed INTEGER DEFAULT 0,
phedex_group VARCHAR(100),
delete_blocks INTEGER,
dataset_lifetime INTEGER DEFAULT 0,
dataset_lifetime INTEGER DEFAULT 0 NOT NULL,
PRIMARY KEY (id),
CONSTRAINT uq_dbs_dat_sub UNIQUE (dataset_id, site, custodial, auto_approve, move, priority))"""
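
The effect of `DEFAULT 0 NOT NULL` on a column can be sketched with an in-memory SQLite database. This is an assumption-laden illustration: the real schema targets MySQL and Oracle, and the table name `subs` and the site names used here are purely illustrative. The point is that omitting the column falls back to the default, while an explicit NULL is rejected.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# reduced, hypothetical table; only dataset_lifetime mirrors the real column definition
conn.execute("""CREATE TABLE subs (
    id INTEGER PRIMARY KEY,
    site VARCHAR(100),
    dataset_lifetime INTEGER DEFAULT 0 NOT NULL)""")

# column omitted from the INSERT: the DEFAULT value is stored
conn.execute("INSERT INTO subs (site) VALUES (?)", ("T1_US_FNAL_Disk",))
lifetime = conn.execute("SELECT dataset_lifetime FROM subs").fetchone()[0]

# explicit NULL: the NOT NULL constraint rejects the row
try:
    conn.execute("INSERT INTO subs (site, dataset_lifetime) VALUES (?, ?)",
                 ("T2_CH_CERN", None))
    rejected = False
except sqlite3.IntegrityError:
    rejected = True

print("stored lifetime:", lifetime, "| NULL rejected:", rejected)
```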

32 changes: 23 additions & 9 deletions src/python/WMComponent/DBS3Buffer/MySQL/NewSubscription.py
@@ -24,6 +24,18 @@ class NewSubscription(DBFormatter):
"""

def _createPhEDExSubBinds(self, datasetID, subscriptionInfo, custodialFlag):
"""
Creates the database binds for both custodial and non custodial data
placements.
:param datasetID: integer with the dataset id
:param subscriptionInfo: dictionary object from the request spec
:param custodialFlag: boolean flag defining whether it's custodial or not
:return: a list of dictionary binds
"""
# NOTE: the subscription information is already validated upstream, at
# the request factory. Thus, there is no need to cast/validate anything
# at this level.

# DeleteFromSource is not supported for move subscriptions
delete_blocks = None
@@ -40,15 +52,17 @@ def _createPhEDExSubBinds(self, datasetID, subscriptionInfo, custodialFlag):

binds = []
for site in sites:
binds.append({'id': datasetID,
'site': site,
'custodial': custodialFlag,
'auto_approve': 1 if site in subscriptionInfo['AutoApproveSites'] else 0,
'move': isMove,
'priority': subscriptionInfo['Priority'],
'phedex_group': phedex_group,
'delete_blocks': delete_blocks,
'dataset_lifetime': subscriptionInfo['DatasetLifetime']})
bind = {'id': datasetID,
'site': site,
'custodial': 1 if custodialFlag else 0,
'auto_approve': 1 if site in subscriptionInfo['AutoApproveSites'] else 0,
'move': isMove,
'priority': subscriptionInfo['Priority'],
'phedex_group': phedex_group,
'delete_blocks': delete_blocks}
if subscriptionInfo['DatasetLifetime'] is not None:
bind.update(dict(dataset_lifetime=subscriptionInfo['DatasetLifetime']))
binds.append(bind)
return binds

def execute(self, datasetID, subscriptionInfo, conn=None, transaction=False):
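
The bind construction changed in this hunk can be exercised in isolation with a simplified sketch. The function name `create_binds`, the explicit `sites` argument, and the sample values are assumptions for illustration; the `move`, `phedex_group`, and `delete_blocks` keys are left out for brevity. It shows that `dataset_lifetime` is only bound when the request provides a value, and that the custodial flag is normalised to an integer.

```python
def create_binds(dataset_id, sites, subscription_info, custodial_flag):
    """Simplified, hypothetical stand-in for _createPhEDExSubBinds."""
    binds = []
    for site in sites:
        bind = {"id": dataset_id,
                "site": site,
                # boolean flag stored as 1/0
                "custodial": 1 if custodial_flag else 0,
                "auto_approve": 1 if site in subscription_info["AutoApproveSites"] else 0,
                "priority": subscription_info["Priority"]}
        # only bind the lifetime when it is actually set
        if subscription_info["DatasetLifetime"] is not None:
            bind["dataset_lifetime"] = subscription_info["DatasetLifetime"]
        binds.append(bind)
    return binds


# made-up subscription information
info = {"AutoApproveSites": ["T2_CH_CERN"], "Priority": "Low", "DatasetLifetime": None}
without_lifetime = create_binds(1, ["T2_CH_CERN"], info, True)

info_with = dict(info, DatasetLifetime=3600)
with_lifetime = create_binds(1, ["T2_CH_CERN", "T1_US_FNAL_Disk"], info_with, False)

print(without_lifetime)
print(with_lifetime)
```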
2 changes: 1 addition & 1 deletion src/python/WMComponent/DBS3Buffer/Oracle/Create.py
@@ -49,7 +49,7 @@ def __init__(self, logger = None, dbi = None, params = None):
subscribed INTEGER DEFAULT 0,
phedex_group VARCHAR(100),
delete_blocks INTEGER,
dataset_lifetime INTEGER DEFAULT 0,
dataset_lifetime INTEGER DEFAULT 0 NOT NULL,
PRIMARY KEY (id),
CONSTRAINT uq_dbs_dat_sub UNIQUE (dataset_id, site, custodial, auto_approve, move, priority)
)"""
Original file line number Diff line number Diff line change
@@ -28,8 +28,8 @@ def interactWithReqmgr(self, config):
4. record this activity
"""
tStart = time()
self.logger.info("Executing WorkQueue/ReqMgr thread thread")
self.logger.info("Executing WorkQueue/ReqMgr thread")
self.reqMgrInt(self.globalQ)
self.logger.info("%s executed in %.3f secs.", self.__class__.__name__, time() - tStart)
self.logger.info("%s executed in %.3f secs.\n", self.__class__.__name__, time() - tStart)

return
70 changes: 36 additions & 34 deletions src/python/WMCore/ReqMgr/CherryPyThreads/StatusChangeTasks.py
@@ -1,24 +1,21 @@
"""
Created on May 19, 2015
"""
from __future__ import (division, print_function)
from builtins import range
from future.utils import viewitems

from WMCore.REST.CherryPyPeriodicTask import CherryPyPeriodicTask
from WMCore.ReqMgr.DataStructs.RequestStatus import AUTO_TRANSITION
from WMCore.ReqMgr.DataStructs.RequestStatus import AUTO_TRANSITION, CANCEL_AUTO_TRANSITION
from WMCore.Services.WorkQueue.WorkQueue import WorkQueue
from WMCore.Services.ReqMgr.ReqMgr import ReqMgr


def moveForwardStatus(reqmgrSvc, wfStatusDict, logger):

for status, nextStatus in viewitems(AUTO_TRANSITION):
for status, nextStatus in AUTO_TRANSITION.items():
count = 0
requests = reqmgrSvc.getRequestByStatus([status], detail=False)
for wf in requests:
stateFromGQ = wfStatusDict.get(wf, None)
if stateFromGQ is None:
if stateFromGQ in [None, "canceled"]:
continue
elif stateFromGQ == status:
continue
@@ -38,40 +35,45 @@ def moveForwardStatus(reqmgrSvc, wfStatusDict, logger):
except ValueError:
# No state change needed
continue
# special case for aborted workflow - aborted-completed instead of completed
if status == "aborted" and i == 0:
for j in range(i + 1):
count += 1
reqmgrSvc.updateRequestStatus(wf, "aborted-completed")
logger.info("%s in %s moved to %s", wf, status, "aborted-completed")
else:
for j in range(i + 1):
count += 1
reqmgrSvc.updateRequestStatus(wf, nextStatus[j])
logger.info("%s in %s moved to %s", wf, status, nextStatus[j])
reqmgrSvc.updateRequestStatus(wf, nextStatus[j])
logger.info("%s in %s moved to %s", wf, status, nextStatus[j])
logger.info("%s requests moved to new state from %s", count, status)
return


def moveToCompletedForNoWQJobs(reqmgrSvc, wfStatusDict, logger):
def moveToCompletedForNoWQJobs(reqmgrSvc, globalQSvc, wfStatusDict, logger):
"""
Handle the case when request is aborted/rejected before elements are created in GQ
Handle workflows that have been either aborted or force-completed.
This will ensure that no global workqueue elements will be left behind.
:param reqmgrSvc: object instance of the ReqMgr class
:param globalQSvc: object instance of the WorkQueue class
:param wfStatusDict: workflow status according to the workqueue elements
:param logger: a logger object instance
:return: None object
"""

statusTransition = {"aborted": ["aborted-completed"]}

for status, nextStatusList in viewitems(statusTransition):
for status, nextStatus in CANCEL_AUTO_TRANSITION.items():
requests = reqmgrSvc.getRequestByStatus([status], detail=False)
count = 0
for wf in requests:
# check whether wq elements exists for given request
# if not, it means
if wf not in wfStatusDict:
for nextStatus in nextStatusList:
reqmgrSvc.updateRequestStatus(wf, nextStatus)
count += 1
logger.info("Total aborted-completed: %d", count)

return
for wflowName in requests:
stateFromGQ = wfStatusDict.get(wflowName, None)
if stateFromGQ == "canceled":
# elements still in CancelRequested, wait for the agent to do its job
continue
elif stateFromGQ in ["acquired", "running-open", "running-closed"]:
# then something went wrong with the workflow abortion/force-completion
# trigger another cancel request
logger.info("%s in %s but WQEs in %s, cancelling it again!",
wflowName, status, stateFromGQ)
globalQSvc.cancelWorkflow(wflowName)
elif stateFromGQ in ["completed", None]:
# all elements are already in a final state or no longer exist, advance status
count += 1
reqmgrSvc.updateRequestStatus(wflowName, nextStatus)
logger.info("%s in %s moved to %s", wflowName, status, nextStatus)
logger.info("Total %s: %d", nextStatus, count)
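
The three branches of the new `moveToCompletedForNoWQJobs` logic can be tried out with stub services. This is a sketch under stated assumptions: `FakeReqMgr`, `FakeWorkQueue`, and `sweep` are hypothetical stand-ins, the workflow names are made up, and the transition map below only contains the `aborted` to `aborted-completed` entry seen in the removed code. The actual contents of `CANCEL_AUTO_TRANSITION` are not shown in this commit.

```python
class FakeReqMgr:
    """Hypothetical stub exposing the two ReqMgr calls used by the thread."""
    def __init__(self, by_status):
        self.by_status = by_status
        self.updates = []

    def getRequestByStatus(self, statuses, detail=False):
        return [wf for status in statuses for wf in self.by_status.get(status, [])]

    def updateRequestStatus(self, wf, status):
        self.updates.append((wf, status))


class FakeWorkQueue:
    """Hypothetical stub that records cancel requests."""
    def __init__(self):
        self.cancelled = []

    def cancelWorkflow(self, wf):
        self.cancelled.append(wf)


def sweep(reqmgr, global_q, wf_status, transitions):
    for status, next_status in transitions.items():
        for wf in reqmgr.getRequestByStatus([status], detail=False):
            state = wf_status.get(wf)
            if state == "canceled":
                # elements in CancelRequested: wait for the agent
                continue
            elif state in ("acquired", "running-open", "running-closed"):
                # cancellation did not take effect: request it again
                global_q.cancelWorkflow(wf)
            elif state in ("completed", None):
                # nothing left in the global workqueue: advance the status
                reqmgr.updateRequestStatus(wf, next_status)


transitions = {"aborted": "aborted-completed"}  # assumed mapping
reqmgr = FakeReqMgr({"aborted": ["wf_a", "wf_b", "wf_c", "wf_d"]})
gq = FakeWorkQueue()
# wf_d has no workqueue elements at all
sweep(reqmgr, gq, {"wf_a": "canceled", "wf_b": "running-open", "wf_c": "completed"}, transitions)
print(gq.cancelled, reqmgr.updates)
```

Here `wf_a` is left alone, `wf_b` gets a second cancel request, and `wf_c` and `wf_d` are advanced to the next status.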


class StatusChangeTasks(CherryPyPeriodicTask):
@@ -89,14 +91,14 @@ def advanceStatus(self, config):
Advance the request status based on the global workqueue elements status
"""
reqmgrSvc = ReqMgr(config.reqmgr2_url, logger=self.logger)
gqService = WorkQueue(config.workqueue_url)
globalQSvc = WorkQueue(config.workqueue_url)

self.logger.info("Getting GQ data for status check")
wfStatusDict = gqService.getWorkflowStatusFromWQE()
wfStatusDict = globalQSvc.getWorkflowStatusFromWQE()

self.logger.info("Advancing statuses")
moveForwardStatus(reqmgrSvc, wfStatusDict, self.logger)
moveToCompletedForNoWQJobs(reqmgrSvc, wfStatusDict, self.logger)
moveToCompletedForNoWQJobs(reqmgrSvc, globalQSvc, wfStatusDict, self.logger)

self.logger.info("Done advancing status")
