Skip to content

Commit

Permalink
Drop Tape RSEs from the pileup location
Browse files Browse the repository at this point in the history
  • Loading branch information
amaltaro committed Sep 22, 2020
1 parent 548b50f commit a2e077e
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 16 deletions.
46 changes: 40 additions & 6 deletions src/python/WMCore/Services/Rucio/Rucio.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,32 @@ def weightedChoice(choices):
assert False, "Shouldn't get here"


def isTapeRSE(rseName):
"""
Given an RSE name, return True if it's a Tape RSE (rse_type=TAPE), otherwise False
:param rseName: string with the RSE name
:return: True or False
"""
# NOTE: a more reliable - but more expensive - way to know that would be
# to query `get_rse` and evaluate the rse_type parameter
return rseName.endswith("_Tape")


def dropTapeRSEs(listRSEs):
"""
Method to parse a list of RSE names and return only those that
are not a rse_type=TAPE, so in general only Disk endpoints
:param listRSEs: list with the RSE names
:return: a new list with only DISK RSE names
"""
diskRSEs = []
for rse in listRSEs:
if rse.endswith("_Tape"):
continue
diskRSEs.append(rse)
return diskRSEs


class Rucio(object):
"""
Service class providing additional Rucio functionality on top of the
Expand Down Expand Up @@ -629,17 +655,20 @@ def deleteRule(self, ruleId, purgeReplicas=False):
res = False
return res

def evaluateRSEExpression(self, rseExpr, useCache=True):
def evaluateRSEExpression(self, rseExpr, useCache=True, returnTape=True):
"""
Provided an RSE expression, resolve it and return a flat list of RSEs
:param rseExpr: an RSE expression (which could be the RSE itself...)
:param useCache: boolean defining whether cached data is meant to be used or not
:param returnTape: boolean to also return Tape RSEs from the RSE expression result
:return: a list of RSE names
"""
if self.cachedRSEs.isCacheExpired():
self.cachedRSEs.reset()
if useCache and rseExpr in self.cachedRSEs:
return self.cachedRSEs[rseExpr]
if returnTape:
return self.cachedRSEs[rseExpr]
return dropTapeRSEs(self.cachedRSEs[rseExpr])
else:
matchingRSEs = []
try:
Expand All @@ -650,7 +679,9 @@ def evaluateRSEExpression(self, rseExpr, useCache=True):
raise WMRucioException(msg)
# add this key/value pair to the cache
self.cachedRSEs.addItemToCache({rseExpr: matchingRSEs})
return matchingRSEs
if returnTape:
return matchingRSEs
return dropTapeRSEs(matchingRSEs)

def pickRSE(self, rseExpression='rse_type=TAPE\cms_type=test', rseAttribute='ddm_quota', minNeeded=0):
"""
Expand Down Expand Up @@ -879,11 +910,12 @@ def getPileupLockedAndAvailable(self, container, account, scope="cms"):

# First, find all the rules and where data is supposed to be locked
for rule in self.cli.list_replication_rules(kargs):
rses = self.evaluateRSEExpression(rule['rse_expression'])
if rule['copies'] == len(rses) and rule['state'] == "OK":
rses = self.evaluateRSEExpression(rule['rse_expression'], returnTape=False)
if rses and rule['copies'] == len(rses) and rule['state'] == "OK":
# then we can guarantee that data is locked and available on these RSEs
finalRSEs.update(set(rses))
else:
# it could be that the rule was made against Tape only, so check
elif rses:
multiRSERules.append(rule['id'])
self.logger.info("Pileup container location for %s from single RSE locks at: %s",
kargs['name'], list(finalRSEs))
Expand All @@ -900,6 +932,8 @@ def getPileupLockedAndAvailable(self, container, account, scope="cms"):
# List every single block lock and check if the rule belongs to the WMCore system
for blockName in result:
for blockLock in self.cli.get_dataset_locks(scope, blockName):
if isTapeRSE(blockLock['rse']):
continue
if blockLock['state'] == 'OK' and blockLock['rule_id'] in multiRSERules:
result[blockName].add(blockLock['rse'])
return result
61 changes: 51 additions & 10 deletions test/python/WMCore_t/Services_t/Rucio_t/Rucio_t.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@

from rucio.client import Client as testClient

from WMCore.Services.Rucio.Rucio import Rucio, validateMetaData, RUCIO_VALID_PROJECT, WMRucioException
from WMCore.Services.Rucio import Rucio
from WMQuality.Emulators.EmulatedUnitTestCase import EmulatedUnitTestCase

DSET = "/SingleElectron/Run2017F-17Nov2017-v1/MINIAOD"
BLOCK = "/SingleElectron/Run2017F-17Nov2017-v1/MINIAOD#f924e248-e029-11e7-aa2a-02163e01b396"

PUDSET = "/WPhi_2e_M-10_H_TuneCP5_madgraph-pythia8/RunIIAutumn18NanoAODv6-Nano25Oct2019_102X_upgrade2018_realistic_v20-v1/NANOAODSIM"

class RucioTest(EmulatedUnitTestCase):
"""
Expand Down Expand Up @@ -45,7 +45,7 @@ def setUp(self):
"""
super(RucioTest, self).setUp()

self.myRucio = Rucio(self.acct,
self.myRucio = Rucio.Rucio(self.acct,
hostUrl=self.defaultArgs['host'],
authUrl=self.defaultArgs['auth_host'],
configDict=self.defaultArgs)
Expand Down Expand Up @@ -80,7 +80,7 @@ def testConfig(self):
newKeys = newParams.keys()
newKeys.remove("phedexCompatible")

rucio = Rucio(newParams['account'], hostUrl=newParams['host'],
rucio = Rucio.Rucio(newParams['account'], hostUrl=newParams['host'],
authUrl=newParams['auth_host'], configDict=newParams)

self.assertEqual(getattr(rucio, "phedexCompat"), False)
Expand Down Expand Up @@ -169,7 +169,7 @@ def testGetBlocksInContainer(self):
inside a container.
"""
# test a CMS dataset that does not exist
with self.assertRaises(WMRucioException):
with self.assertRaises(Rucio.WMRucioException):
self.myRucio.getBlocksInContainer("Alan")

# provide a CMS block instead of a dataset
Expand Down Expand Up @@ -208,7 +208,7 @@ def testGetReplicaInfoForBlocksRucio(self):
"""
theseArgs = self.defaultArgs.copy()
theseArgs['phedexCompatible'] = False
myRucio = Rucio(self.acct,
myRucio = Rucio.Rucio(self.acct,
hostUrl=theseArgs['host'],
authUrl=theseArgs['auth_host'],
configDict=theseArgs)
Expand Down Expand Up @@ -271,16 +271,16 @@ def testMetaDataValidation(self):
"""
Test the `validateMetaData` validation function
"""
for thisProj in RUCIO_VALID_PROJECT:
response = validateMetaData("any_DID_name", dict(project=thisProj), self.myRucio.logger)
for thisProj in Rucio.RUCIO_VALID_PROJECT:
response = Rucio.validateMetaData("any_DID_name", dict(project=thisProj), self.myRucio.logger)
self.assertTrue(response)

# test with no "project" meta data at all
response = validateMetaData("any_DID_name", dict(), self.myRucio.logger)
response = Rucio.validateMetaData("any_DID_name", dict(), self.myRucio.logger)
self.assertTrue(response)

# now an invalid "project" meta data
response = validateMetaData("any_DID_name", dict(project="mistake"), self.myRucio.logger)
response = Rucio.validateMetaData("any_DID_name", dict(project="mistake"), self.myRucio.logger)
self.assertFalse(response)

def testEvaluateRSEExpression(self):
Expand All @@ -299,3 +299,44 @@ def testPickRSE(self):
resp = self.myRucio.pickRSE(rseExpression="ddm_quota>0", rseAttribute="ddm_quota")
self.assertTrue(len(resp) == 2)
self.assertTrue(resp[1] is True or resp[1] is False)

def testIsTapeRSE(self):
"""
Test the `isTapeRSE` utilitarian function
"""
self.assertTrue(Rucio.isTapeRSE("T1_US_FNAL_Tape"))
self.assertFalse(Rucio.isTapeRSE("T1_US_FNAL_Disk"))
self.assertFalse(Rucio.isTapeRSE("T1_US_FNAL_Disk_Test"))
self.assertFalse(Rucio.isTapeRSE("T1_US_FNAL_Tape_Test"))
self.assertFalse(Rucio.isTapeRSE(""))

def testDropTapeRSEs(self):
"""
Test the `dropTapeRSEs` utilitarian function
"""
tapeOnly = ["T1_US_FNAL_Tape", "T1_ES_PIC_Tape"]
diskOnly = ["T1_US_FNAL_Disk", "T1_US_FNAL_Disk_Test", "T2_CH_CERN"]
mixed = ["T1_US_FNAL_Tape", "T1_US_FNAL_Disk", "T1_US_FNAL_Disk_Test", "T1_ES_PIC_Tape"]
self.assertItemsEqual(Rucio.dropTapeRSEs(tapeOnly), [])
self.assertItemsEqual(Rucio.dropTapeRSEs(diskOnly), diskOnly)
self.assertItemsEqual(Rucio.dropTapeRSEs(mixed), ["T1_US_FNAL_Disk", "T1_US_FNAL_Disk_Test"])

def testGetPileupLockedAndAvailable(self):
"""
Test `getPileupLockedAndAvailable` method
"""
# as much as I dislike it, we need to use the production instance...
newParams = {"host": 'http://cms-rucio.cern.ch',
"auth_host": 'https://cms-rucio-auth.cern.ch',
"auth_type": "x509", "account": "wmcore_transferor",
"ca_cert": False, "timeout": 5}
prodRucio = Rucio.Rucio(newParams['account'],
hostUrl=newParams['host'],
authUrl=newParams['auth_host'],
configDict=newParams)
res = prodRucio.getPileupLockedAndAvailable(PUDSET, "transfer_ops")
# this dataset contains 10 blocks
self.assertEqual(len(res), 10)
# with more than 10 block replicas in the grid
for block, rses in res.viewitems():
self.assertTrue(len(rses) > 5)

0 comments on commit a2e077e

Please sign in to comment.