diff --git a/src/python/WMCore/Services/Rucio/Rucio.py b/src/python/WMCore/Services/Rucio/Rucio.py index be2ee10c45e..b4315607d8e 100644 --- a/src/python/WMCore/Services/Rucio/Rucio.py +++ b/src/python/WMCore/Services/Rucio/Rucio.py @@ -60,6 +60,32 @@ def weightedChoice(choices): assert False, "Shouldn't get here" +def isTapeRSE(rseName): + """ + Given an RSE name, return True if it's a Tape RSE (rse_type=TAPE), otherwise False + :param rseName: string with the RSE name + :return: True or False + """ + # NOTE: a more reliable - but more expensive - way to know that would be + # to query `get_rse` and evaluate the rse_type parameter + return rseName.endswith("_Tape") + + +def dropTapeRSEs(listRSEs): + """ + Method to parse a list of RSE names and return only those that + are not a rse_type=TAPE, so in general only Disk endpoints + :param listRSEs: list with the RSE names + :return: a new list with only DISK RSE names + """ + diskRSEs = [] + for rse in listRSEs: + if rse.endswith("_Tape"): + continue + diskRSEs.append(rse) + return diskRSEs + + class Rucio(object): """ Service class providing additional Rucio functionality on top of the @@ -629,17 +655,20 @@ def deleteRule(self, ruleId, purgeReplicas=False): res = False return res - def evaluateRSEExpression(self, rseExpr, useCache=True): + def evaluateRSEExpression(self, rseExpr, useCache=True, returnTape=True): """ Provided an RSE expression, resolve it and return a flat list of RSEs :param rseExpr: an RSE expression (which could be the RSE itself...) :param useCache: boolean defining whether cached data is meant to be used or not + :param returnTape: boolean to also return Tape RSEs from the RSE expression result :return: a list of RSE names """ if self.cachedRSEs.isCacheExpired(): self.cachedRSEs.reset() if useCache and rseExpr in self.cachedRSEs: - return self.cachedRSEs[rseExpr] + if returnTape: + return self.cachedRSEs[rseExpr] + return dropTapeRSEs(self.cachedRSEs[rseExpr]) else: matchingRSEs = [] try: @@ -650,7 +679,9 @@ def evaluateRSEExpression(self, rseExpr, useCache=True): raise WMRucioException(msg) # add this key/value pair to the cache self.cachedRSEs.addItemToCache({rseExpr: matchingRSEs}) - return matchingRSEs + if returnTape: + return matchingRSEs + return dropTapeRSEs(matchingRSEs) def pickRSE(self, rseExpression='rse_type=TAPE\cms_type=test', rseAttribute='ddm_quota', minNeeded=0): """ @@ -879,11 +910,12 @@ def getPileupLockedAndAvailable(self, container, account, scope="cms"): # First, find all the rules and where data is supposed to be locked for rule in self.cli.list_replication_rules(kargs): - rses = self.evaluateRSEExpression(rule['rse_expression']) - if rule['copies'] == len(rses) and rule['state'] == "OK": + rses = self.evaluateRSEExpression(rule['rse_expression'], returnTape=False) + if rses and rule['copies'] == len(rses) and rule['state'] == "OK": # then we can guarantee that data is locked and available on these RSEs finalRSEs.update(set(rses)) - else: + # it could be that the rule was made against Tape only, so check + elif rses: multiRSERules.append(rule['id']) self.logger.info("Pileup container location for %s from single RSE locks at: %s", kargs['name'], list(finalRSEs)) @@ -900,6 +932,8 @@ def getPileupLockedAndAvailable(self, container, account, scope="cms"): # List every single block lock and check if the rule belongs to the WMCore system for blockName in result: for blockLock in self.cli.get_dataset_locks(scope, blockName): + if isTapeRSE(blockLock['rse']): + continue if blockLock['state'] == 'OK' and blockLock['rule_id'] in multiRSERules: result[blockName].add(blockLock['rse']) return result diff --git a/test/python/WMCore_t/Services_t/Rucio_t/Rucio_t.py b/test/python/WMCore_t/Services_t/Rucio_t/Rucio_t.py index 76268de017f..cc29ce4033f 100644 --- a/test/python/WMCore_t/Services_t/Rucio_t/Rucio_t.py +++ b/test/python/WMCore_t/Services_t/Rucio_t/Rucio_t.py @@ -8,12 +8,12 @@ from rucio.client import Client as testClient -from WMCore.Services.Rucio.Rucio import Rucio, validateMetaData, RUCIO_VALID_PROJECT, WMRucioException +from WMCore.Services.Rucio import Rucio from WMQuality.Emulators.EmulatedUnitTestCase import EmulatedUnitTestCase DSET = "/SingleElectron/Run2017F-17Nov2017-v1/MINIAOD" BLOCK = "/SingleElectron/Run2017F-17Nov2017-v1/MINIAOD#f924e248-e029-11e7-aa2a-02163e01b396" - +PUDSET = "/WPhi_2e_M-10_H_TuneCP5_madgraph-pythia8/RunIIAutumn18NanoAODv6-Nano25Oct2019_102X_upgrade2018_realistic_v20-v1/NANOAODSIM" class RucioTest(EmulatedUnitTestCase): """ @@ -45,7 +45,7 @@ def setUp(self): """ super(RucioTest, self).setUp() - self.myRucio = Rucio(self.acct, + self.myRucio = Rucio.Rucio(self.acct, hostUrl=self.defaultArgs['host'], authUrl=self.defaultArgs['auth_host'], configDict=self.defaultArgs) @@ -80,7 +80,7 @@ def testConfig(self): newKeys = newParams.keys() newKeys.remove("phedexCompatible") - rucio = Rucio(newParams['account'], hostUrl=newParams['host'], + rucio = Rucio.Rucio(newParams['account'], hostUrl=newParams['host'], authUrl=newParams['auth_host'], configDict=newParams) self.assertEqual(getattr(rucio, "phedexCompat"), False) @@ -169,7 +169,7 @@ def testGetBlocksInContainer(self): inside a container. """ # test a CMS dataset that does not exist - with self.assertRaises(WMRucioException): + with self.assertRaises(Rucio.WMRucioException): self.myRucio.getBlocksInContainer("Alan") # provide a CMS block instead of a dataset @@ -208,7 +208,7 @@ def testGetReplicaInfoForBlocksRucio(self): """ theseArgs = self.defaultArgs.copy() theseArgs['phedexCompatible'] = False - myRucio = Rucio(self.acct, + myRucio = Rucio.Rucio(self.acct, hostUrl=theseArgs['host'], authUrl=theseArgs['auth_host'], configDict=theseArgs) @@ -271,16 +271,16 @@ def testMetaDataValidation(self): """ Test the `validateMetaData` validation function """ - for thisProj in RUCIO_VALID_PROJECT: - response = validateMetaData("any_DID_name", dict(project=thisProj), self.myRucio.logger) + for thisProj in Rucio.RUCIO_VALID_PROJECT: + response = Rucio.validateMetaData("any_DID_name", dict(project=thisProj), self.myRucio.logger) self.assertTrue(response) # test with no "project" meta data at all - response = validateMetaData("any_DID_name", dict(), self.myRucio.logger) + response = Rucio.validateMetaData("any_DID_name", dict(), self.myRucio.logger) self.assertTrue(response) # now an invalid "project" meta data - response = validateMetaData("any_DID_name", dict(project="mistake"), self.myRucio.logger) + response = Rucio.validateMetaData("any_DID_name", dict(project="mistake"), self.myRucio.logger) self.assertFalse(response) def testEvaluateRSEExpression(self): @@ -299,3 +299,44 @@ def testPickRSE(self): resp = self.myRucio.pickRSE(rseExpression="ddm_quota>0", rseAttribute="ddm_quota") self.assertTrue(len(resp) == 2) self.assertTrue(resp[1] is True or resp[1] is False) + + def testIsTapeRSE(self): + """ + Test the `isTapeRSE` utilitarian function + """ + self.assertTrue(Rucio.isTapeRSE("T1_US_FNAL_Tape")) + self.assertFalse(Rucio.isTapeRSE("T1_US_FNAL_Disk")) + self.assertFalse(Rucio.isTapeRSE("T1_US_FNAL_Disk_Test")) + self.assertFalse(Rucio.isTapeRSE("T1_US_FNAL_Tape_Test")) + self.assertFalse(Rucio.isTapeRSE("")) + + def testDropTapeRSEs(self): + """ + Test the `dropTapeRSEs` utilitarian function + """ + tapeOnly = ["T1_US_FNAL_Tape", "T1_ES_PIC_Tape"] + diskOnly = ["T1_US_FNAL_Disk", "T1_US_FNAL_Disk_Test", "T2_CH_CERN"] + mixed = ["T1_US_FNAL_Tape", "T1_US_FNAL_Disk", "T1_US_FNAL_Disk_Test", "T1_ES_PIC_Tape"] + self.assertItemsEqual(Rucio.dropTapeRSEs(tapeOnly), []) + self.assertItemsEqual(Rucio.dropTapeRSEs(diskOnly), diskOnly) + self.assertItemsEqual(Rucio.dropTapeRSEs(mixed), ["T1_US_FNAL_Disk", "T1_US_FNAL_Disk_Test"]) + + def testGetPileupLockedAndAvailable(self): + """ + Test `getPileupLockedAndAvailable` method + """ + # as much as I dislike it, we need to use the production instance... + newParams = {"host": 'http://cms-rucio.cern.ch', + "auth_host": 'https://cms-rucio-auth.cern.ch', + "auth_type": "x509", "account": "wmcore_transferor", + "ca_cert": False, "timeout": 5} + prodRucio = Rucio.Rucio(newParams['account'], + hostUrl=newParams['host'], + authUrl=newParams['auth_host'], + configDict=newParams) + res = prodRucio.getPileupLockedAndAvailable(PUDSET, "transfer_ops") + # this dataset contains 10 blocks + self.assertEqual(len(res), 10) + # with more than 10 block replicas in the grid + for block, rses in res.viewitems(): + self.assertTrue(len(rses) > 5)