Skip to content

Commit

Permalink
Construct the PU location via rucio considering MSTransferor locks
Browse files Browse the repository at this point in the history
  • Loading branch information
amaltaro committed Sep 22, 2020
1 parent bab64dc commit 548b50f
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 33 deletions.
60 changes: 59 additions & 1 deletion src/python/WMCore/Services/Rucio/Rucio.py
Original file line number Diff line number Diff line change
Expand Up @@ -826,7 +826,7 @@ def _getContainerLockedAndAvailable(self, multiRSERules, **kwargs):
rsesByBlocks.setdefault(block, set())
### FIXME: feature request made to the Rucio team to support bulk operations:
### https://github.com/rucio/rucio/issues/3982
for blockLock in self.cli.get_dataset_locks(block, kwargs['name']):
for blockLock in self.cli.get_dataset_locks(kwargs['scope'], block):
if blockLock['state'] == 'OK' and blockLock['rule_id'] in multiRSERules:
rsesByBlocks[block].add(blockLock['rse'])

Expand All @@ -845,3 +845,61 @@ def _getContainerLockedAndAvailable(self, multiRSERules, **kwargs):
for _block, rses in rsesByBlocks.viewitems():
finalRSEs = finalRSEs | rses
return finalRSEs

def getPileupLockedAndAvailable(self, container, account, scope="cms"):
"""
Method to resolve where the pileup container (and all its blocks)
is locked and available.
Pileup location resolution involves the following logic:
1. find replication rules at the container level
* if num of copies is equal to num of rses, and state is Ok, use
those RSEs as container location (thus, every single block)
* elif there are more rses than copies, keep that rule id for the next step
2. discover all the blocks in the container
3. if there are no multi RSEs rules, just build the block location map and return
3. otherwise, for every block, list their current locks and if they are in state=OK
and they belong to one of our multiRSEs rule, use that RSE as block location
:param container: string with the container name
:param account: string with the account name
:param scope: string with the scope name (default is "cms")
:return: a flat dictionary where the keys are the block names, and the value is
a set with the RSE locations
NOTE: This is somewhat complex, so I decided to make it more readable with
a specific method for this process, even though that adds some code duplication.
"""
result = dict()
if not self.isContainer(container):
raise WMRucioException("Pileup location needs to be resolved for a container DID type")

multiRSERules = []
finalRSEs = set()
kargs = dict(name=container, account=account, scope=scope)

# First, find all the rules and where data is supposed to be locked
for rule in self.cli.list_replication_rules(kargs):
rses = self.evaluateRSEExpression(rule['rse_expression'])
if rule['copies'] == len(rses) and rule['state'] == "OK":
# then we can guarantee that data is locked and available on these RSEs
finalRSEs.update(set(rses))
else:
multiRSERules.append(rule['id'])
self.logger.info("Pileup container location for %s from single RSE locks at: %s",
kargs['name'], list(finalRSEs))

# Second, find all the blocks in this pileup container and assign the container
# level locations to them
for blockName in self.getBlocksInContainer(kargs['name']):
result.update({blockName: finalRSEs})
if not multiRSERules:
# then that is it, we can return the current RSEs holding and locking this data
return result

# if we got here, then there is a third step to be done.
# List every single block lock and check if the rule belongs to the WMCore system
for blockName in result:
for blockLock in self.cli.get_dataset_locks(scope, blockName):
if blockLock['state'] == 'OK' and blockLock['rule_id'] in multiRSERules:
result[blockName].add(blockLock['rse'])
return result
2 changes: 1 addition & 1 deletion src/python/WMCore/WMRuntime/Scripts/SetupCMSSWPset.py
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ def _processPileupMixingModules(self, pileupDict, PhEDExNodeName,
eventsAvailable += int(blockDict.get('NumberOfEvents', 0))
for fileLFN in blockDict["FileList"]:
# vstring does not support unicode
inputTypeAttrib.fileNames.append(str(fileLFN['logical_file_name']))
inputTypeAttrib.fileNames.append(str(fileLFN))
if requestedPileupType == 'data':
if getattr(self.jobBag, 'skipPileupEvents', None) is not None:
# For deterministic pileup, we want to shuffle the list the
Expand Down
47 changes: 16 additions & 31 deletions src/python/WMCore/WMSpec/Steps/Fetchers/PileupFetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,32 +33,24 @@ def __init__(self):
"""
super(PileupFetcher, self).__init__()
if usingRucio():
# Too much work to pass the rucio account name all the way to here
# just use the production rucio account for resolving pileup location
self.rucio = Rucio("wma_prod", configDict={'phedexCompatible': False})
# FIXME: find a way to pass the Rucio account name to this fetcher module
self.rucioAcct = "wmcore_transferor"
self.rucio = Rucio(self.rucioAcct)
else:
self.phedex = PhEDEx() # this will go away eventually

def _queryDbsAndGetPileupConfig(self, stepHelper, dbsReader):
"""
Method iterates over components of the pileup configuration input
and queries DBS. Then iterates over results from DBS.
and queries DBS for valid files in the dataset, plus some extra
information about each file.
There needs to be a list of files and their locations for each
dataset name.
Use dbsReader
the result data structure is a Python dict following dictionary:
FileList is a list of LFNs
Information is organized at block level, listing all its files,
number of events in the block, and its data location (to be resolved
by a different method using either PhEDEx or Rucio), such as:
{"pileupTypeA": {"BlockA": {"FileList": [], "PhEDExNodeNames": []},
{"pileupTypeA": {"BlockA": {"FileList": [], "PhEDExNodeNames": [], "NumberOfEvents": 123},
"BlockB": {"FileList": [], "PhEDExNodeName": []}, ....}
this structure preserves knowledge of where particular files of dataset
are physically (list of PNNs) located. DBS only lists sites which
have all files belonging to blocks but e.g. BlockA of dataset DS1 may
be located at site1 and BlockB only at site2 - it's possible that only
a subset of the blocks in a dataset will be at a site.
"""
resultDict = {}
# iterate over input pileup types (e.g. "cosmics", "minbias")
Expand All @@ -69,14 +61,11 @@ def _queryDbsAndGetPileupConfig(self, stepHelper, dbsReader):
blockDict = {}
for dataset in datasets:

blockFileInfo = dbsReader.getFileListByDataset(dataset=dataset, detail=True)

for fileInfo in blockFileInfo:
for fileInfo in dbsReader.getFileListByDataset(dataset=dataset, detail=True):
blockDict.setdefault(fileInfo['block_name'], {'FileList': [],
'NumberOfEvents': 0,
'PhEDExNodeNames': []})
blockDict[fileInfo['block_name']]['FileList'].append(
{'logical_file_name': fileInfo['logical_file_name']})
blockDict[fileInfo['block_name']]['FileList'].append(fileInfo['logical_file_name'])
blockDict[fileInfo['block_name']]['NumberOfEvents'] += fileInfo['event_count']

self._getDatasetLocation(dataset, blockDict)
Expand All @@ -91,22 +80,18 @@ def _getDatasetLocation(self, dset, blockDict):
:param blockDict: dictionary with DBS summary info
:return: update blockDict in place
"""
if hasattr(self, "rucio"):
# then it's Rucio!!
blockReplicasInfo = self.rucio.getReplicaInfoForBlocks(dataset=dset)
for item in blockReplicasInfo:
block = item['name']
if usingRucio():
blockReplicas = self.rucio.getPileupLockedAndAvailable(dset, account=self.rucioAcct)
for blockName, blockLocation in blockReplicas.viewitems():
try:
blockDict[block]['PhEDExNodeNames'] = item['replica']
blockDict[block]['FileList'] = sorted(blockDict[block]['FileList'])
blockDict[blockName]['PhEDExNodeNames'] = list(blockLocation)
except KeyError:
logging.warning("Block '%s' does not have any complete Rucio replica", block)
logging.warning("Block '%s' present in Rucio but not in DBS", blockName)
else:
blockReplicasInfo = self.phedex.getReplicaPhEDExNodesForBlocks(dataset=dset, complete='y')
for block in blockReplicasInfo:
try:
blockDict[block]['PhEDExNodeNames'] = list(blockReplicasInfo[block])
blockDict[block]['FileList'] = sorted(blockDict[block]['FileList'])
except KeyError:
logging.warning("Block '%s' does not have any complete PhEDEx replica", block)

Expand Down

0 comments on commit 548b50f

Please sign in to comment.