Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

speed up listDatasetFileDetails API #11099

Merged
merged 1 commit into from
May 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 80 additions & 37 deletions src/python/WMCore/Services/DBS/DBS3Reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
from Utils.IteratorTools import grouper
from Utils.PythonVersion import PY2
from WMCore.Services.DBS.DBSErrors import DBSReaderError, formatEx3
from WMCore.Services.DBS.DBSUtils import dbsListFileParents, dbsListFileLumis, \
dbsBlockOrigin, dbsParentFilesGivenParentDataset


### Needed for the pycurl comment, leave it out for now
Expand Down Expand Up @@ -72,17 +74,28 @@ class DBS3Reader(object):
General API for reading data from DBS
"""

def __init__(self, url, logger=None, **contact):
def __init__(self, url, logger=None, parallel=None, **contact):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please create a docstring for this class and specify what parallel is for?

vkuznet marked this conversation as resolved.
Show resolved Hide resolved
"""
DBS3Reader constructor
:param url: url of DBS server
:param logger: logger to be used by this class
:param parallel: optional parameter to specify parallel execution of some APIs
You may pass any true value, e.g. True or 1. The parallel APIs are:
listDatasetFileDetails, listFileBlockLocation, getParentFilesGivenParentDataset
:param contact: optional parameters to pass to DbsApi class
"""

# instantiate dbs api object
try:
self.dbsURL = url.replace("cmsweb.cern.ch", "cmsweb-prod.cern.ch")
self.dbs = DbsApi(self.dbsURL, **contact)
self.logger = logger or logging.getLogger(self.__class__.__name__)
self.parallel = parallel
except Exception as ex:
msg = "Error in DBSReader with DbsApi\n"
msg += "%s\n" % formatEx3(ex)
raise DBSReaderError(msg)
raise DBSReaderError(msg) from None

def _getLumiList(self, blockName=None, lfns=None, validFileOnly=1):
"""
Expand All @@ -103,7 +116,7 @@ def _getLumiList(self, blockName=None, lfns=None, validFileOnly=1):
msg = "Error in "
msg += "DBSReader.listFileLumiArray(%s)\n" % lfns
msg += "%s\n" % formatEx3(ex)
raise DBSReaderError(msg)
raise DBSReaderError(msg) from None

lumiDict = {}
for lumisItem in lumiLists:
Expand All @@ -128,7 +141,7 @@ def checkDBSServer(self):
msg = "Error in "
msg += "DBS server is not up: %s" % self.dbsURL
msg += "%s\n" % formatEx3(ex)
raise DBSReaderError(msg)
raise DBSReaderError(msg) from None

def listPrimaryDatasets(self, match='*'):
"""
Expand All @@ -143,7 +156,7 @@ def listPrimaryDatasets(self, match='*'):
except Exception as ex:
msg = "Error in DBSReader.listPrimaryDataset(%s)\n" % match
msg += "%s\n" % formatEx3(ex)
raise DBSReaderError(msg)
raise DBSReaderError(msg) from None

result = [x['primary_ds_name'] for x in result]
return result
Expand All @@ -160,7 +173,7 @@ def matchProcessedDatasets(self, primary, tier, process):
except dbsClientException as ex:
msg = "Error in DBSReader.listProcessedDatasets(%s)\n" % primary
msg += "%s\n" % formatEx3(ex)
raise DBSReaderError(msg)
raise DBSReaderError(msg) from None

for dataset in datasets:
dataset = remapDBS3Keys(dataset, processed_ds_name='Name')
Expand Down Expand Up @@ -195,7 +208,7 @@ def listRuns(self, dataset=None, block=None):
except dbsClientException as ex:
msg = "Error in DBSReader.listRuns(%s, %s)\n" % (dataset, block)
msg += "%s\n" % formatEx3(ex)
raise DBSReaderError(msg)
raise DBSReaderError(msg) from None
for x in results:
runs.extend(x['run_num'])
return runs
Expand Down Expand Up @@ -228,7 +241,7 @@ def listRunLumis(self, dataset=None, block=None):
except dbsClientException as ex:
msg = "Error in DBSReader.listRuns(%s, %s)\n" % (dataset, block)
msg += "%s\n" % formatEx3(ex)
raise DBSReaderError(msg)
raise DBSReaderError(msg) from None

# send runDict format as result, this format is for sync with dbs2 call
# which has {run_number: num_lumis} but dbs3 call doesn't return num Lumis
Expand All @@ -254,7 +267,7 @@ def listProcessedDatasets(self, primary, dataTier='*'):
except dbsClientException as ex:
msg = "Error in DBSReader.listProcessedDatasets(%s)\n" % primary
msg += "%s\n" % formatEx3(ex)
raise DBSReaderError(msg)
raise DBSReaderError(msg) from None

result = [x['dataset'].split('/')[2] for x in result]
return result
Expand Down Expand Up @@ -307,6 +320,25 @@ def listDatasetFileDetails(self, datasetPath, getParents=False, getLumis=True, v
files[f['logical_file_name']]['Lumis'] = {}
files[f['logical_file_name']]['Parents'] = []

# parallel execution for listFileParents and listFileLumis APIs
if self.parallel:
amaltaro marked this conversation as resolved.
Show resolved Hide resolved
if getParents:
block_parents = dbsListFileParents(self.dbsURL, blocks)
for blockName, parents in block_parents.items():
for p in parents:
if p['logical_file_name'] in files: # invalid files are not there if validFileOnly=1
files[p['logical_file_name']]['Parents'].extend(p['parent_logical_file_name'])
if getLumis:
block_file_lumis = dbsListFileLumis(self.dbsURL, blocks)
for blockName, file_lumis in block_file_lumis.items():
for f in file_lumis:
if f['logical_file_name'] in files: # invalid files are not there if validFileOnly=1
if f['run_num'] in files[f['logical_file_name']]['Lumis']:
files[f['logical_file_name']]['Lumis'][f['run_num']].extend(f['lumi_section_num'])
else:
files[f['logical_file_name']]['Lumis'][f['run_num']] = f['lumi_section_num']
return files

# Iterate over the blocks and get parents and lumis
for blockName in blocks:
# get the parents
Expand Down Expand Up @@ -344,7 +376,7 @@ def crossCheck(self, datasetPath, *lfns):
except Exception as exc:
msg = "Error in DBSReader.crossCheck({}) with {} lfns.".format(datasetPath, len(lfns))
msg += "\nDetails: {}\n".format(formatEx3(exc))
raise DBSReaderError(msg)
raise DBSReaderError(msg) from None
setOfAllLfns = set(allLfns)
setOfKnownLfns = set(lfns)
return list(setOfAllLfns.intersection(setOfKnownLfns))
Expand All @@ -363,7 +395,7 @@ def crossCheckMissing(self, datasetPath, *lfns):
except Exception as exc:
msg = "Error in DBSReader.crossCheckMissing({}) with {} lfns.".format(datasetPath, len(lfns))
msg += "\nDetails: {}\n".format(formatEx3(exc))
raise DBSReaderError(msg)
raise DBSReaderError(msg) from None
setOfAllLfns = set(allLfns)
setOfKnownLfns = set(lfns)
knownFiles = setOfAllLfns.intersection(setOfKnownLfns)
Expand All @@ -384,7 +416,7 @@ def getDBSSummaryInfo(self, dataset=None, block=None):
except Exception as ex:
msg = "Error in DBSReader.getDBSSummaryInfo(%s, %s)\n" % (dataset, block)
msg += "%s\n" % formatEx3(ex)
raise DBSReaderError(msg)
raise DBSReaderError(msg) from None

if not summary: # missing data or all files invalid
return {}
Expand All @@ -411,7 +443,7 @@ def listFileBlocks(self, dataset, blockName=None):
except dbsClientException as ex:
msg = "Error in DBSReader.listFileBlocks(%s)\n" % dataset
msg += "%s\n" % formatEx3(ex)
raise DBSReaderError(msg)
raise DBSReaderError(msg) from None

result = [x['block_name'] for x in blocks]

Expand All @@ -435,7 +467,7 @@ def blockExists(self, fileBlockName):
msg = "Error in "
msg += "DBSReader.blockExists(%s)\n" % fileBlockName
msg += "%s\n" % formatEx3(ex)
raise DBSReaderError(msg)
raise DBSReaderError(msg) from None

if len(blocks) == 0:
return False
Expand All @@ -454,15 +486,15 @@ def listFilesInBlock(self, fileBlockName, lumis=True, validFileOnly=1):
"""
if not self.blockExists(fileBlockName):
msg = "DBSReader.listFilesInBlock(%s): No matching data"
raise DBSReaderError(msg % fileBlockName)
raise DBSReaderError(msg % fileBlockName) from None

try:
files = self.dbs.listFileArray(block_name=fileBlockName, validFileOnly=validFileOnly, detail=True)
except dbsClientException as ex:
msg = "Error in "
msg += "DBSReader.listFilesInBlock(%s)\n" % fileBlockName
msg += "%s\n" % formatEx3(ex)
raise DBSReaderError(msg)
raise DBSReaderError(msg) from None

if lumis:
lumiDict = self._getLumiList(blockName=fileBlockName, validFileOnly=validFileOnly)
Expand All @@ -487,7 +519,7 @@ def listFilesInBlockWithParents(self, fileBlockName, lumis=True, validFileOnly=1
"""
if not self.blockExists(fileBlockName):
msg = "DBSReader.listFilesInBlockWithParents(%s): No matching data"
raise DBSReaderError(msg % fileBlockName)
raise DBSReaderError(msg % fileBlockName) from None

try:
# TODO: shoud we get only valid block for this?
Expand All @@ -499,7 +531,7 @@ def listFilesInBlockWithParents(self, fileBlockName, lumis=True, validFileOnly=1
msg += "DBSReader.listFilesInBlockWithParents(%s)\n" % (
fileBlockName,)
msg += "%s\n" % formatEx3(ex)
raise DBSReaderError(msg)
raise DBSReaderError(msg) from None

childByParents = defaultdict(list)
for f in files:
Expand All @@ -513,7 +545,7 @@ def listFilesInBlockWithParents(self, fileBlockName, lumis=True, validFileOnly=1
msg = "Error in "
msg += "DBSReader.listFilesInBlockWithParents(%s)\n There is no parents files" % (
fileBlockName)
raise DBSReaderError(msg)
raise DBSReaderError(msg) from None

parentFilesDetail = []
# TODO: slicing parentLFNs util DBS api is handling that.
Expand Down Expand Up @@ -549,7 +581,7 @@ def lfnsInBlock(self, fileBlockName):
"""
if not self.blockExists(fileBlockName):
msg = "DBSReader.lfnsInBlock(%s): No matching data"
raise DBSReaderError(msg % fileBlockName)
raise DBSReaderError(msg % fileBlockName) from None

try:
lfns = self.dbs.listFileArray(block_name=fileBlockName, validFileOnly=1, detail=False)
Expand All @@ -558,7 +590,7 @@ def lfnsInBlock(self, fileBlockName):
msg = "Error in "
msg += "DBSReader.listFilesInBlock(%s)\n" % fileBlockName
msg += "%s\n" % formatEx3(ex)
raise DBSReaderError(msg)
raise DBSReaderError(msg) from None

def listFileBlockLocation(self, fileBlockNames):
"""
Expand All @@ -581,15 +613,22 @@ def listFileBlockLocation(self, fileBlockNames):

blocksInfo = {}
try:
for block in fileBlockNames:
blocksInfo.setdefault(block, [])
# there should be only one element with a single origin site string ...
for blockInfo in self.dbs.listBlockOrigin(block_name=block):
blocksInfo[block].append(blockInfo['origin_site_name'])
if self.parallel:
data = dbsBlockOrigin(self.dbsURL, fileBlockNames)
for block, items in data.items():
blocksInfo.setdefault(block, [])
for blockInfo in items:
blocksInfo[block].append(blockInfo['origin_site_name'])
else:
for block in fileBlockNames:
blocksInfo.setdefault(block, [])
# there should be only one element with a single origin site string ...
for blockInfo in self.dbs.listBlockOrigin(block_name=block):
blocksInfo[block].append(blockInfo['origin_site_name'])
except dbsClientException as ex:
msg = "Error in DBS3Reader: self.dbs.listBlockOrigin(block_name=%s)\n" % fileBlockNames
msg += "%s\n" % formatEx3(ex)
raise DBSReaderError(msg)
raise DBSReaderError(msg) from None

for block in fileBlockNames:
valid_nodes = set(blocksInfo.get(block, [])) - node_filter
Expand Down Expand Up @@ -629,7 +668,7 @@ def getFileBlockWithParents(self, fileBlockName):

if not self.blockExists(fileBlockName):
msg = "DBSReader.getFileBlockWithParents(%s): No matching data"
raise DBSReaderError(msg % fileBlockName)
raise DBSReaderError(msg % fileBlockName) from None

result = {"PhEDExNodeNames": [], # FIXME: we better get rid of this line!
"Files": self.listFilesInBlockWithParents(fileBlockName)}
Expand Down Expand Up @@ -663,7 +702,7 @@ def blockToDatasetPath(self, blockName):
msg = "Error in "
msg += "DBSReader.blockToDatasetPath(%s)\n" % blockName
msg += "%s\n" % formatEx3(ex)
raise DBSReaderError(msg)
raise DBSReaderError(msg) from None

if blocks == []:
return None
Expand All @@ -686,7 +725,7 @@ def listDatasetLocation(self, datasetName):
except dbsClientException as ex:
msg = "Error in DBSReader: dbsApi.listBlocks(dataset=%s)\n" % datasetName
msg += "%s\n" % formatEx3(ex)
raise DBSReaderError(msg)
raise DBSReaderError(msg) from None

if not blocksInfo: # no data location from dbs
return list(locations)
Expand All @@ -703,25 +742,25 @@ def checkDatasetPath(self, pathName):
and datasets unknown to DBS. Otherwise None is returned.
"""
if pathName in ("", None):
raise DBSReaderError("Invalid Dataset Path name: => %s <=" % pathName)
raise DBSReaderError("Invalid Dataset Path name: => %s <=" % pathName) from None
else:
try:
result = self.dbs.listDatasets(dataset=pathName, dataset_access_type='*')
if len(result) == 0:
raise DBSReaderError("Dataset %s doesn't exist in DBS %s" % (pathName, self.dbsURL))
raise DBSReaderError("Dataset %s doesn't exist in DBS %s" % (pathName, self.dbsURL)) from None
except (dbsClientException, HTTPError) as ex:
msg = "Error in "
msg += "DBSReader.checkDatasetPath(%s)\n" % pathName
msg += "%s\n" % formatEx3(ex)
raise DBSReaderError(msg)
raise DBSReaderError(msg) from None
return

def checkBlockName(self, blockName):
"""
_checkBlockName_
"""
if blockName in ("", "*", None):
raise DBSReaderError("Invalid Block name: => %s <=" % blockName)
raise DBSReaderError("Invalid Block name: => %s <=" % blockName) from None

def getFileListByDataset(self, dataset, validFileOnly=1, detail=True):

Expand All @@ -740,7 +779,7 @@ def getFileListByDataset(self, dataset, validFileOnly=1, detail=True):
msg = "Error in "
msg += "DBSReader.getFileListByDataset(%s)\n" % dataset
msg += "%s\n" % formatEx3(ex)
raise DBSReaderError(msg)
raise DBSReaderError(msg) from None

def listDatasetParents(self, childDataset):
"""
Expand All @@ -753,7 +792,7 @@ def listDatasetParents(self, childDataset):
msg = "Error in "
msg += "DBSReader.listDatasetParents(%s)\n" % childDataset
msg += "%s\n" % formatEx3(ex)
raise DBSReaderError(msg)
raise DBSReaderError(msg) from None

# def getListFilesByLumiAndDataset(self, dataset, files):
# "Unsing pycurl to get all the child parents pair for given dataset"
Expand Down Expand Up @@ -782,6 +821,9 @@ def getParentFilesGivenParentDataset(self, parentDataset, childLFNs):
:return: set of parent files for childLFN
"""
fInfo = self.dbs.listFileLumiArray(logical_file_name=childLFNs)
if self.parallel:
return dbsParentFilesGivenParentDataset(self.dbsURL, parentDataset, fInfo)

parentFiles = defaultdict(set)
for f in fInfo:
pFileList = self.dbs.listFiles(dataset=parentDataset, run_num=f['run_num'], lumi_list=f['lumi_section_num'])
Expand Down Expand Up @@ -911,7 +953,8 @@ def fixMissingParentageDatasets(self, childDataset, insertFlag=True):
numFiles = self.findAndInsertMissingParentage(blockName, parentFullInfo, insertFlag=insertFlag)
self.logger.debug("%s file parentage added for block %s", numFiles, blockName)
except Exception as ex:
self.logger.exception("Parentage updated failed for block %s", blockName)
self.logger.exception(
"Parentage updated failed for block %s with error %s", blockName, str(ex))
failedBlocks.append(blockName)

return failedBlocks
Expand Down
Loading