Skip to content

Commit

Permalink
speed up listDatasetFileDetails API
Browse files Browse the repository at this point in the history
  • Loading branch information
vkuznet committed May 6, 2022
1 parent e879178 commit dfc5272
Show file tree
Hide file tree
Showing 3 changed files with 364 additions and 37 deletions.
117 changes: 80 additions & 37 deletions src/python/WMCore/Services/DBS/DBS3Reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
from Utils.IteratorTools import grouper
from Utils.PythonVersion import PY2
from WMCore.Services.DBS.DBSErrors import DBSReaderError, formatEx3
from WMCore.Services.DBS.DBSUtils import dbsListFileParents, dbsListFileLumis, \
dbsBlockOrigin, dbsParentFilesGivenParentDataset


### Needed for the pycurl comment, leave it out for now
Expand Down Expand Up @@ -72,17 +74,28 @@ class DBS3Reader(object):
General API for reading data from DBS
"""

def __init__(self, url, logger=None, **contact):
def __init__(self, url, logger=None, parallel=None, **contact):
"""
DBS3Reader constructor
:param url: url of DBS server
:param logger: logger to be used by this class
:param parallel: optional parameter to specify parallel execution of some APIs
You may pass any true value, e.g. True or 1. The parallel APIs are:
listDatasetFileDetails, listFileBlockLocation, getParentFilesGivenParentDataset
:param contact: optional parameters to pass to DbsApi class
"""

# instantiate dbs api object
try:
self.dbsURL = url.replace("cmsweb.cern.ch", "cmsweb-prod.cern.ch")
self.dbs = DbsApi(self.dbsURL, **contact)
self.logger = logger or logging.getLogger(self.__class__.__name__)
self.parallel = parallel
except Exception as ex:
msg = "Error in DBSReader with DbsApi\n"
msg += "%s\n" % formatEx3(ex)
raise DBSReaderError(msg)
raise DBSReaderError(msg) from None

def _getLumiList(self, blockName=None, lfns=None, validFileOnly=1):
"""
Expand All @@ -103,7 +116,7 @@ def _getLumiList(self, blockName=None, lfns=None, validFileOnly=1):
msg = "Error in "
msg += "DBSReader.listFileLumiArray(%s)\n" % lfns
msg += "%s\n" % formatEx3(ex)
raise DBSReaderError(msg)
raise DBSReaderError(msg) from None

lumiDict = {}
for lumisItem in lumiLists:
Expand All @@ -128,7 +141,7 @@ def checkDBSServer(self):
msg = "Error in "
msg += "DBS server is not up: %s" % self.dbsURL
msg += "%s\n" % formatEx3(ex)
raise DBSReaderError(msg)
raise DBSReaderError(msg) from None

def listPrimaryDatasets(self, match='*'):
"""
Expand All @@ -143,7 +156,7 @@ def listPrimaryDatasets(self, match='*'):
except Exception as ex:
msg = "Error in DBSReader.listPrimaryDataset(%s)\n" % match
msg += "%s\n" % formatEx3(ex)
raise DBSReaderError(msg)
raise DBSReaderError(msg) from None

result = [x['primary_ds_name'] for x in result]
return result
Expand All @@ -160,7 +173,7 @@ def matchProcessedDatasets(self, primary, tier, process):
except dbsClientException as ex:
msg = "Error in DBSReader.listProcessedDatasets(%s)\n" % primary
msg += "%s\n" % formatEx3(ex)
raise DBSReaderError(msg)
raise DBSReaderError(msg) from None

for dataset in datasets:
dataset = remapDBS3Keys(dataset, processed_ds_name='Name')
Expand Down Expand Up @@ -195,7 +208,7 @@ def listRuns(self, dataset=None, block=None):
except dbsClientException as ex:
msg = "Error in DBSReader.listRuns(%s, %s)\n" % (dataset, block)
msg += "%s\n" % formatEx3(ex)
raise DBSReaderError(msg)
raise DBSReaderError(msg) from None
for x in results:
runs.extend(x['run_num'])
return runs
Expand Down Expand Up @@ -228,7 +241,7 @@ def listRunLumis(self, dataset=None, block=None):
except dbsClientException as ex:
msg = "Error in DBSReader.listRuns(%s, %s)\n" % (dataset, block)
msg += "%s\n" % formatEx3(ex)
raise DBSReaderError(msg)
raise DBSReaderError(msg) from None

# send runDict format as result, this format is for sync with dbs2 call
# which has {run_number: num_lumis} but dbs3 call doesn't return num Lumis
Expand All @@ -254,7 +267,7 @@ def listProcessedDatasets(self, primary, dataTier='*'):
except dbsClientException as ex:
msg = "Error in DBSReader.listProcessedDatasets(%s)\n" % primary
msg += "%s\n" % formatEx3(ex)
raise DBSReaderError(msg)
raise DBSReaderError(msg) from None

result = [x['dataset'].split('/')[2] for x in result]
return result
Expand Down Expand Up @@ -307,6 +320,25 @@ def listDatasetFileDetails(self, datasetPath, getParents=False, getLumis=True, v
files[f['logical_file_name']]['Lumis'] = {}
files[f['logical_file_name']]['Parents'] = []

# parallel execution for listFileParents and listFileLumis APIs
if self.parallel:
if getParents:
block_parents = dbsListFileParents(self.dbsURL, blocks)
for blockName, parents in block_parents.items():
for p in parents:
if p['logical_file_name'] in files: # invalid files are not there if validFileOnly=1
files[p['logical_file_name']]['Parents'].extend(p['parent_logical_file_name'])
if getLumis:
block_file_lumis = dbsListFileLumis(self.dbsURL, blocks)
for blockName, file_lumis in block_file_lumis.items():
for f in file_lumis:
if f['logical_file_name'] in files: # invalid files are not there if validFileOnly=1
if f['run_num'] in files[f['logical_file_name']]['Lumis']:
files[f['logical_file_name']]['Lumis'][f['run_num']].extend(f['lumi_section_num'])
else:
files[f['logical_file_name']]['Lumis'][f['run_num']] = f['lumi_section_num']
return files

# Iterate over the blocks and get parents and lumis
for blockName in blocks:
# get the parents
Expand Down Expand Up @@ -344,7 +376,7 @@ def crossCheck(self, datasetPath, *lfns):
except Exception as exc:
msg = "Error in DBSReader.crossCheck({}) with {} lfns.".format(datasetPath, len(lfns))
msg += "\nDetails: {}\n".format(formatEx3(exc))
raise DBSReaderError(msg)
raise DBSReaderError(msg) from None
setOfAllLfns = set(allLfns)
setOfKnownLfns = set(lfns)
return list(setOfAllLfns.intersection(setOfKnownLfns))
Expand All @@ -363,7 +395,7 @@ def crossCheckMissing(self, datasetPath, *lfns):
except Exception as exc:
msg = "Error in DBSReader.crossCheckMissing({}) with {} lfns.".format(datasetPath, len(lfns))
msg += "\nDetails: {}\n".format(formatEx3(exc))
raise DBSReaderError(msg)
raise DBSReaderError(msg) from None
setOfAllLfns = set(allLfns)
setOfKnownLfns = set(lfns)
knownFiles = setOfAllLfns.intersection(setOfKnownLfns)
Expand All @@ -384,7 +416,7 @@ def getDBSSummaryInfo(self, dataset=None, block=None):
except Exception as ex:
msg = "Error in DBSReader.getDBSSummaryInfo(%s, %s)\n" % (dataset, block)
msg += "%s\n" % formatEx3(ex)
raise DBSReaderError(msg)
raise DBSReaderError(msg) from None

if not summary: # missing data or all files invalid
return {}
Expand All @@ -411,7 +443,7 @@ def listFileBlocks(self, dataset, blockName=None):
except dbsClientException as ex:
msg = "Error in DBSReader.listFileBlocks(%s)\n" % dataset
msg += "%s\n" % formatEx3(ex)
raise DBSReaderError(msg)
raise DBSReaderError(msg) from None

result = [x['block_name'] for x in blocks]

Expand All @@ -435,7 +467,7 @@ def blockExists(self, fileBlockName):
msg = "Error in "
msg += "DBSReader.blockExists(%s)\n" % fileBlockName
msg += "%s\n" % formatEx3(ex)
raise DBSReaderError(msg)
raise DBSReaderError(msg) from None

if len(blocks) == 0:
return False
Expand All @@ -454,15 +486,15 @@ def listFilesInBlock(self, fileBlockName, lumis=True, validFileOnly=1):
"""
if not self.blockExists(fileBlockName):
msg = "DBSReader.listFilesInBlock(%s): No matching data"
raise DBSReaderError(msg % fileBlockName)
raise DBSReaderError(msg % fileBlockName) from None

try:
files = self.dbs.listFileArray(block_name=fileBlockName, validFileOnly=validFileOnly, detail=True)
except dbsClientException as ex:
msg = "Error in "
msg += "DBSReader.listFilesInBlock(%s)\n" % fileBlockName
msg += "%s\n" % formatEx3(ex)
raise DBSReaderError(msg)
raise DBSReaderError(msg) from None

if lumis:
lumiDict = self._getLumiList(blockName=fileBlockName, validFileOnly=validFileOnly)
Expand All @@ -487,7 +519,7 @@ def listFilesInBlockWithParents(self, fileBlockName, lumis=True, validFileOnly=1
"""
if not self.blockExists(fileBlockName):
msg = "DBSReader.listFilesInBlockWithParents(%s): No matching data"
raise DBSReaderError(msg % fileBlockName)
raise DBSReaderError(msg % fileBlockName) from None

try:
# TODO: should we get only valid block for this?
Expand All @@ -499,7 +531,7 @@ def listFilesInBlockWithParents(self, fileBlockName, lumis=True, validFileOnly=1
msg += "DBSReader.listFilesInBlockWithParents(%s)\n" % (
fileBlockName,)
msg += "%s\n" % formatEx3(ex)
raise DBSReaderError(msg)
raise DBSReaderError(msg) from None

childByParents = defaultdict(list)
for f in files:
Expand All @@ -513,7 +545,7 @@ def listFilesInBlockWithParents(self, fileBlockName, lumis=True, validFileOnly=1
msg = "Error in "
msg += "DBSReader.listFilesInBlockWithParents(%s)\n There is no parents files" % (
fileBlockName)
raise DBSReaderError(msg)
raise DBSReaderError(msg) from None

parentFilesDetail = []
# TODO: slicing parentLFNs until DBS api is handling that.
Expand Down Expand Up @@ -549,7 +581,7 @@ def lfnsInBlock(self, fileBlockName):
"""
if not self.blockExists(fileBlockName):
msg = "DBSReader.lfnsInBlock(%s): No matching data"
raise DBSReaderError(msg % fileBlockName)
raise DBSReaderError(msg % fileBlockName) from None

try:
lfns = self.dbs.listFileArray(block_name=fileBlockName, validFileOnly=1, detail=False)
Expand All @@ -558,7 +590,7 @@ def lfnsInBlock(self, fileBlockName):
msg = "Error in "
msg += "DBSReader.listFilesInBlock(%s)\n" % fileBlockName
msg += "%s\n" % formatEx3(ex)
raise DBSReaderError(msg)
raise DBSReaderError(msg) from None

def listFileBlockLocation(self, fileBlockNames):
"""
Expand All @@ -581,15 +613,22 @@ def listFileBlockLocation(self, fileBlockNames):

blocksInfo = {}
try:
for block in fileBlockNames:
blocksInfo.setdefault(block, [])
# there should be only one element with a single origin site string ...
for blockInfo in self.dbs.listBlockOrigin(block_name=block):
blocksInfo[block].append(blockInfo['origin_site_name'])
if self.parallel:
data = dbsBlockOrigin(self.dbsURL, fileBlockNames)
for block, items in data.items():
blocksInfo.setdefault(block, [])
for blockInfo in items:
blocksInfo[block].append(blockInfo['origin_site_name'])
else:
for block in fileBlockNames:
blocksInfo.setdefault(block, [])
# there should be only one element with a single origin site string ...
for blockInfo in self.dbs.listBlockOrigin(block_name=block):
blocksInfo[block].append(blockInfo['origin_site_name'])
except dbsClientException as ex:
msg = "Error in DBS3Reader: self.dbs.listBlockOrigin(block_name=%s)\n" % fileBlockNames
msg += "%s\n" % formatEx3(ex)
raise DBSReaderError(msg)
raise DBSReaderError(msg) from None

for block in fileBlockNames:
valid_nodes = set(blocksInfo.get(block, [])) - node_filter
Expand Down Expand Up @@ -629,7 +668,7 @@ def getFileBlockWithParents(self, fileBlockName):

if not self.blockExists(fileBlockName):
msg = "DBSReader.getFileBlockWithParents(%s): No matching data"
raise DBSReaderError(msg % fileBlockName)
raise DBSReaderError(msg % fileBlockName) from None

result = {"PhEDExNodeNames": [], # FIXME: we better get rid of this line!
"Files": self.listFilesInBlockWithParents(fileBlockName)}
Expand Down Expand Up @@ -663,7 +702,7 @@ def blockToDatasetPath(self, blockName):
msg = "Error in "
msg += "DBSReader.blockToDatasetPath(%s)\n" % blockName
msg += "%s\n" % formatEx3(ex)
raise DBSReaderError(msg)
raise DBSReaderError(msg) from None

if blocks == []:
return None
Expand All @@ -686,7 +725,7 @@ def listDatasetLocation(self, datasetName):
except dbsClientException as ex:
msg = "Error in DBSReader: dbsApi.listBlocks(dataset=%s)\n" % datasetName
msg += "%s\n" % formatEx3(ex)
raise DBSReaderError(msg)
raise DBSReaderError(msg) from None

if not blocksInfo: # no data location from dbs
return list(locations)
Expand All @@ -703,25 +742,25 @@ def checkDatasetPath(self, pathName):
and datasets unknown to DBS. Otherwise None is returned.
"""
if pathName in ("", None):
raise DBSReaderError("Invalid Dataset Path name: => %s <=" % pathName)
raise DBSReaderError("Invalid Dataset Path name: => %s <=" % pathName) from None
else:
try:
result = self.dbs.listDatasets(dataset=pathName, dataset_access_type='*')
if len(result) == 0:
raise DBSReaderError("Dataset %s doesn't exist in DBS %s" % (pathName, self.dbsURL))
raise DBSReaderError("Dataset %s doesn't exist in DBS %s" % (pathName, self.dbsURL)) from None
except (dbsClientException, HTTPError) as ex:
msg = "Error in "
msg += "DBSReader.checkDatasetPath(%s)\n" % pathName
msg += "%s\n" % formatEx3(ex)
raise DBSReaderError(msg)
raise DBSReaderError(msg) from None
return

def checkBlockName(self, blockName):
"""
_checkBlockName_
"""
if blockName in ("", "*", None):
raise DBSReaderError("Invalid Block name: => %s <=" % blockName)
raise DBSReaderError("Invalid Block name: => %s <=" % blockName) from None

def getFileListByDataset(self, dataset, validFileOnly=1, detail=True):

Expand All @@ -740,7 +779,7 @@ def getFileListByDataset(self, dataset, validFileOnly=1, detail=True):
msg = "Error in "
msg += "DBSReader.getFileListByDataset(%s)\n" % dataset
msg += "%s\n" % formatEx3(ex)
raise DBSReaderError(msg)
raise DBSReaderError(msg) from None

def listDatasetParents(self, childDataset):
"""
Expand All @@ -753,7 +792,7 @@ def listDatasetParents(self, childDataset):
msg = "Error in "
msg += "DBSReader.listDatasetParents(%s)\n" % childDataset
msg += "%s\n" % formatEx3(ex)
raise DBSReaderError(msg)
raise DBSReaderError(msg) from None

# def getListFilesByLumiAndDataset(self, dataset, files):
# "Using pycurl to get all the child-parent pairs for the given dataset"
Expand Down Expand Up @@ -782,6 +821,9 @@ def getParentFilesGivenParentDataset(self, parentDataset, childLFNs):
:return: set of parent files for childLFN
"""
fInfo = self.dbs.listFileLumiArray(logical_file_name=childLFNs)
if self.parallel:
return dbsParentFilesGivenParentDataset(self.dbsURL, parentDataset, fInfo)

parentFiles = defaultdict(set)
for f in fInfo:
pFileList = self.dbs.listFiles(dataset=parentDataset, run_num=f['run_num'], lumi_list=f['lumi_section_num'])
Expand Down Expand Up @@ -911,7 +953,8 @@ def fixMissingParentageDatasets(self, childDataset, insertFlag=True):
numFiles = self.findAndInsertMissingParentage(blockName, parentFullInfo, insertFlag=insertFlag)
self.logger.debug("%s file parentage added for block %s", numFiles, blockName)
except Exception as ex:
self.logger.exception("Parentage updated failed for block %s", blockName)
self.logger.exception(
"Parentage updated failed for block %s with error %s", blockName, str(ex))
failedBlocks.append(blockName)

return failedBlocks
Expand Down
Loading

0 comments on commit dfc5272

Please sign in to comment.