From dfc527273fe2346236041261e32f8815add7fa87 Mon Sep 17 00:00:00 2001
From: Valentin Kuznetsov
Date: Fri, 15 Apr 2022 08:12:39 -0400
Subject: [PATCH] speed up listDatasetFileDetails API

---
 src/python/WMCore/Services/DBS/DBS3Reader.py  | 117 +++++++++-----
 src/python/WMCore/Services/DBS/DBSUtils.py    | 139 +++++++++++++++++
 .../WMCore_t/Services_t/DBS_t/DBSUtils_t.py   | 145 ++++++++++++++++++
 3 files changed, 364 insertions(+), 37 deletions(-)
 create mode 100644 src/python/WMCore/Services/DBS/DBSUtils.py
 create mode 100644 test/python/WMCore_t/Services_t/DBS_t/DBSUtils_t.py

diff --git a/src/python/WMCore/Services/DBS/DBS3Reader.py b/src/python/WMCore/Services/DBS/DBS3Reader.py
index 488c3117f4..0305f2559f 100644
--- a/src/python/WMCore/Services/DBS/DBS3Reader.py
+++ b/src/python/WMCore/Services/DBS/DBS3Reader.py
@@ -23,6 +23,8 @@
 from Utils.IteratorTools import grouper
 from Utils.PythonVersion import PY2
 from WMCore.Services.DBS.DBSErrors import DBSReaderError, formatEx3
+from WMCore.Services.DBS.DBSUtils import dbsListFileParents, dbsListFileLumis, \
+    dbsBlockOrigin, dbsParentFilesGivenParentDataset


 ### Needed for the pycurl comment, leave it out for now
@@ -72,17 +74,28 @@ class DBS3Reader(object):
     General API for reading data from DBS
     """

-    def __init__(self, url, logger=None, **contact):
+    def __init__(self, url, logger=None, parallel=None, **contact):
+        """
+        DBS3Reader constructor
+
+        :param url: url of the DBS server
+        :param logger: logger to be used by this class
+        :param parallel: optional parameter to enable parallel execution of some APIs.
+            You may pass any true value, e.g. True or 1. The parallel APIs are:
+            listDatasetFileDetails, listFileBlockLocation, getParentFilesGivenParentDataset
+        :param contact: optional parameters to pass to the DbsApi class
+        """
         # instantiate dbs api object
         try:
             self.dbsURL = url.replace("cmsweb.cern.ch", "cmsweb-prod.cern.ch")
             self.dbs = DbsApi(self.dbsURL, **contact)
             self.logger = logger or logging.getLogger(self.__class__.__name__)
+            self.parallel = parallel
         except Exception as ex:
             msg = "Error in DBSReader with DbsApi\n"
             msg += "%s\n" % formatEx3(ex)
-            raise DBSReaderError(msg)
+            raise DBSReaderError(msg) from None

     def _getLumiList(self, blockName=None, lfns=None, validFileOnly=1):
         """
@@ -103,7 +116,7 @@ def _getLumiList(self, blockName=None, lfns=None, validFileOnly=1):
             msg = "Error in "
             msg += "DBSReader.listFileLumiArray(%s)\n" % lfns
             msg += "%s\n" % formatEx3(ex)
-            raise DBSReaderError(msg)
+            raise DBSReaderError(msg) from None

         lumiDict = {}
         for lumisItem in lumiLists:
@@ -128,7 +141,7 @@ def checkDBSServer(self):
             msg = "Error in "
             msg += "DBS server is not up: %s" % self.dbsURL
             msg += "%s\n" % formatEx3(ex)
-            raise DBSReaderError(msg)
+            raise DBSReaderError(msg) from None

     def listPrimaryDatasets(self, match='*'):
         """
@@ -143,7 +156,7 @@ def listPrimaryDatasets(self, match='*'):
         except Exception as ex:
             msg = "Error in DBSReader.listPrimaryDataset(%s)\n" % match
             msg += "%s\n" % formatEx3(ex)
-            raise DBSReaderError(msg)
+            raise DBSReaderError(msg) from None

         result = [x['primary_ds_name'] for x in result]
         return result
@@ -160,7 +173,7 @@ def matchProcessedDatasets(self, primary, tier, process):
         except dbsClientException as ex:
             msg = "Error in DBSReader.listProcessedDatasets(%s)\n" % primary
             msg += "%s\n" % formatEx3(ex)
-            raise DBSReaderError(msg)
+            raise DBSReaderError(msg) from None

         for dataset in datasets:
             dataset = remapDBS3Keys(dataset, processed_ds_name='Name')
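Note on the constructor change above: the new parallel flag is optional and off by default, so existing DBS3Reader callers keep the sequential behavior. A minimal usage sketch (the URL below is the production DBSReader instance used in the tests of this patch; a valid X509 proxy is assumed, as for any DBS call):

    from WMCore.Services.DBS.DBS3Reader import DBS3Reader

    url = "https://cmsweb.cern.ch/dbs/prod/global/DBSReader"
    reader = DBS3Reader(url, parallel=True)  # any truthy value enables the concurrent code paths
    files = reader.listDatasetFileDetails('/VBF1Parked/HIRun2013A-v1/RAW', getParents=True)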
@@ -195,7 +208,7 @@ def listRuns(self, dataset=None, block=None):
         except dbsClientException as ex:
             msg = "Error in DBSReader.listRuns(%s, %s)\n" % (dataset, block)
             msg += "%s\n" % formatEx3(ex)
-            raise DBSReaderError(msg)
+            raise DBSReaderError(msg) from None
         for x in results:
             runs.extend(x['run_num'])
         return runs
@@ -228,7 +241,7 @@ def listRunLumis(self, dataset=None, block=None):
         except dbsClientException as ex:
             msg = "Error in DBSReader.listRuns(%s, %s)\n" % (dataset, block)
             msg += "%s\n" % formatEx3(ex)
-            raise DBSReaderError(msg)
+            raise DBSReaderError(msg) from None

         # send runDict format as result, this format is for sync with dbs2 call
         # which has {run_number: num_lumis} but dbs3 call doesn't return num Lumis
@@ -254,7 +267,7 @@ def listProcessedDatasets(self, primary, dataTier='*'):
         except dbsClientException as ex:
             msg = "Error in DBSReader.listProcessedDatasets(%s)\n" % primary
             msg += "%s\n" % formatEx3(ex)
-            raise DBSReaderError(msg)
+            raise DBSReaderError(msg) from None

         result = [x['dataset'].split('/')[2] for x in result]
         return result
@@ -307,6 +320,25 @@ def listDatasetFileDetails(self, datasetPath, getParents=False, getLumis=True, v
             files[f['logical_file_name']]['Lumis'] = {}
             files[f['logical_file_name']]['Parents'] = []

+        # parallel execution for listFileParents and listFileLumis APIs
+        if self.parallel:
+            if getParents:
+                block_parents = dbsListFileParents(self.dbsURL, blocks)
+                for blockName, parents in block_parents.items():
+                    for p in parents:
+                        if p['logical_file_name'] in files:  # invalid files are not there if validFileOnly=1
+                            files[p['logical_file_name']]['Parents'].extend(p['parent_logical_file_name'])
+            if getLumis:
+                block_file_lumis = dbsListFileLumis(self.dbsURL, blocks)
+                for blockName, file_lumis in block_file_lumis.items():
+                    for f in file_lumis:
+                        if f['logical_file_name'] in files:  # invalid files are not there if validFileOnly=1
+                            if f['run_num'] in files[f['logical_file_name']]['Lumis']:
+                                files[f['logical_file_name']]['Lumis'][f['run_num']].extend(f['lumi_section_num'])
+                            else:
+                                files[f['logical_file_name']]['Lumis'][f['run_num']] = f['lumi_section_num']
+            return files
+
         # Iterate over the blocks and get parents and lumis
         for blockName in blocks:
             # get the parents
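Both the parallel branch added above and the sequential loop that follows in the unchanged code produce the same structure: a dictionary keyed by LFN, whose 'Lumis' entry maps run numbers to lumi-section lists and whose 'Parents' entry lists parent LFNs. A sketch of that shape (the LFN and values are invented; other per-file keys follow the existing sequential branch and are omitted here):

    files = reader.listDatasetFileDetails(datasetPath, getParents=True, getLumis=True)
    # {'/store/data/.../file.root': {
    #     ...,                                # size, events, checksums, etc.
    #     'Lumis': {194050: [26, 27, 28]},    # run_num -> list of lumi_section_num
    #     'Parents': ['/store/data/.../parent.root'],
    # }}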
@@ -344,7 +376,7 @@ def crossCheck(self, datasetPath, *lfns):
         except Exception as exc:
             msg = "Error in DBSReader.crossCheck({}) with {} lfns.".format(datasetPath, len(lfns))
             msg += "\nDetails: {}\n".format(formatEx3(exc))
-            raise DBSReaderError(msg)
+            raise DBSReaderError(msg) from None
         setOfAllLfns = set(allLfns)
         setOfKnownLfns = set(lfns)
         return list(setOfAllLfns.intersection(setOfKnownLfns))
@@ -363,7 +395,7 @@ def crossCheckMissing(self, datasetPath, *lfns):
         except Exception as exc:
             msg = "Error in DBSReader.crossCheckMissing({}) with {} lfns.".format(datasetPath, len(lfns))
             msg += "\nDetails: {}\n".format(formatEx3(exc))
-            raise DBSReaderError(msg)
+            raise DBSReaderError(msg) from None
         setOfAllLfns = set(allLfns)
         setOfKnownLfns = set(lfns)
         knownFiles = setOfAllLfns.intersection(setOfKnownLfns)
@@ -384,7 +416,7 @@ def getDBSSummaryInfo(self, dataset=None, block=None):
         except Exception as ex:
             msg = "Error in DBSReader.getDBSSummaryInfo(%s, %s)\n" % (dataset, block)
             msg += "%s\n" % formatEx3(ex)
-            raise DBSReaderError(msg)
+            raise DBSReaderError(msg) from None

         if not summary:  # missing data or all files invalid
             return {}
@@ -411,7 +443,7 @@ def listFileBlocks(self, dataset, blockName=None):
         except dbsClientException as ex:
             msg = "Error in DBSReader.listFileBlocks(%s)\n" % dataset
             msg += "%s\n" % formatEx3(ex)
-            raise DBSReaderError(msg)
+            raise DBSReaderError(msg) from None

         result = [x['block_name'] for x in blocks]

@@ -435,7 +467,7 @@ def blockExists(self, fileBlockName):
             msg = "Error in "
             msg += "DBSReader.blockExists(%s)\n" % fileBlockName
             msg += "%s\n" % formatEx3(ex)
-            raise DBSReaderError(msg)
+            raise DBSReaderError(msg) from None

         if len(blocks) == 0:
             return False
@@ -454,7 +486,7 @@ def listFilesInBlock(self, fileBlockName, lumis=True, validFileOnly=1):
         """
         if not self.blockExists(fileBlockName):
             msg = "DBSReader.listFilesInBlock(%s): No matching data"
-            raise DBSReaderError(msg % fileBlockName)
+            raise DBSReaderError(msg % fileBlockName) from None
         try:
             files = self.dbs.listFileArray(block_name=fileBlockName, validFileOnly=validFileOnly, detail=True)
@@ -462,7 +494,7 @@ def listFilesInBlock(self, fileBlockName, lumis=True, validFileOnly=1):
             msg = "Error in "
             msg += "DBSReader.listFilesInBlock(%s)\n" % fileBlockName
             msg += "%s\n" % formatEx3(ex)
-            raise DBSReaderError(msg)
+            raise DBSReaderError(msg) from None

         if lumis:
             lumiDict = self._getLumiList(blockName=fileBlockName, validFileOnly=validFileOnly)
@@ -487,7 +519,7 @@ def listFilesInBlockWithParents(self, fileBlockName, lumis=True, validFileOnly=1
         """
         if not self.blockExists(fileBlockName):
             msg = "DBSReader.listFilesInBlockWithParents(%s): No matching data"
-            raise DBSReaderError(msg % fileBlockName)
+            raise DBSReaderError(msg % fileBlockName) from None

         try:
             # TODO: shoud we get only valid block for this?
@@ -499,7 +531,7 @@ def listFilesInBlockWithParents(self, fileBlockName, lumis=True, validFileOnly=1
             msg += "DBSReader.listFilesInBlockWithParents(%s)\n" % (
                 fileBlockName,)
             msg += "%s\n" % formatEx3(ex)
-            raise DBSReaderError(msg)
+            raise DBSReaderError(msg) from None

         childByParents = defaultdict(list)
         for f in files:
@@ -513,7 +545,7 @@ def listFilesInBlockWithParents(self, fileBlockName, lumis=True, validFileOnly=1
             msg = "Error in "
             msg += "DBSReader.listFilesInBlockWithParents(%s)\n There is no parents files" % (
                 fileBlockName)
-            raise DBSReaderError(msg)
+            raise DBSReaderError(msg) from None

         parentFilesDetail = []
         # TODO: slicing parentLFNs util DBS api is handling that.
@@ -549,7 +581,7 @@ def lfnsInBlock(self, fileBlockName):
         """
         if not self.blockExists(fileBlockName):
             msg = "DBSReader.lfnsInBlock(%s): No matching data"
-            raise DBSReaderError(msg % fileBlockName)
+            raise DBSReaderError(msg % fileBlockName) from None

         try:
             lfns = self.dbs.listFileArray(block_name=fileBlockName, validFileOnly=1, detail=False)
@@ -558,7 +590,7 @@ def lfnsInBlock(self, fileBlockName):
             msg = "Error in "
             msg += "DBSReader.listFilesInBlock(%s)\n" % fileBlockName
             msg += "%s\n" % formatEx3(ex)
-            raise DBSReaderError(msg)
+            raise DBSReaderError(msg) from None

     def listFileBlockLocation(self, fileBlockNames):
         """
@@ -581,15 +613,22 @@ def listFileBlockLocation(self, fileBlockNames):

         blocksInfo = {}
         try:
-            for block in fileBlockNames:
-                blocksInfo.setdefault(block, [])
-                # there should be only one element with a single origin site string ...
-                for blockInfo in self.dbs.listBlockOrigin(block_name=block):
-                    blocksInfo[block].append(blockInfo['origin_site_name'])
+            if self.parallel:
+                data = dbsBlockOrigin(self.dbsURL, fileBlockNames)
+                for block, items in data.items():
+                    blocksInfo.setdefault(block, [])
+                    for blockInfo in items:
+                        blocksInfo[block].append(blockInfo['origin_site_name'])
+            else:
+                for block in fileBlockNames:
+                    blocksInfo.setdefault(block, [])
+                    # there should be only one element with a single origin site string ...
+                    for blockInfo in self.dbs.listBlockOrigin(block_name=block):
+                        blocksInfo[block].append(blockInfo['origin_site_name'])
         except dbsClientException as ex:
             msg = "Error in DBS3Reader: self.dbs.listBlockOrigin(block_name=%s)\n" % fileBlockNames
             msg += "%s\n" % formatEx3(ex)
-            raise DBSReaderError(msg)
+            raise DBSReaderError(msg) from None

         for block in fileBlockNames:
             valid_nodes = set(blocksInfo.get(block, [])) - node_filter
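Either branch of listFileBlockLocation produces the same mapping; the parallel one simply issues all blockorigin requests at once instead of looping over blocks. An illustrative call (block name and site are invented):

    reader = DBS3Reader(url, parallel=True)
    # one origin-site lookup per block, executed concurrently
    sites = reader.listFileBlockLocation(['/Prim/Proc-v1/RAW#abc-123'])
    # e.g. {'/Prim/Proc-v1/RAW#abc-123': ['T2_CH_CERN']}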
@@ -629,7 +668,7 @@ def getFileBlockWithParents(self, fileBlockName):
         if not self.blockExists(fileBlockName):
             msg = "DBSReader.getFileBlockWithParents(%s): No matching data"
-            raise DBSReaderError(msg % fileBlockName)
+            raise DBSReaderError(msg % fileBlockName) from None

         result = {"PhEDExNodeNames": [],  # FIXME: we better get rid of this line!
                   "Files": self.listFilesInBlockWithParents(fileBlockName)}
@@ -663,7 +702,7 @@ def blockToDatasetPath(self, blockName):
             msg = "Error in "
             msg += "DBSReader.blockToDatasetPath(%s)\n" % blockName
             msg += "%s\n" % formatEx3(ex)
-            raise DBSReaderError(msg)
+            raise DBSReaderError(msg) from None

         if blocks == []:
             return None
@@ -686,7 +725,7 @@ def listDatasetLocation(self, datasetName):
         except dbsClientException as ex:
             msg = "Error in DBSReader: dbsApi.listBlocks(dataset=%s)\n" % datasetName
             msg += "%s\n" % formatEx3(ex)
-            raise DBSReaderError(msg)
+            raise DBSReaderError(msg) from None

         if not blocksInfo:  # no data location from dbs
             return list(locations)
@@ -703,17 +742,17 @@ def checkDatasetPath(self, pathName):
         and datasets unknown to DBS. Otherwise None is returned.
         """
         if pathName in ("", None):
-            raise DBSReaderError("Invalid Dataset Path name: => %s <=" % pathName)
+            raise DBSReaderError("Invalid Dataset Path name: => %s <=" % pathName) from None
         else:
             try:
                 result = self.dbs.listDatasets(dataset=pathName, dataset_access_type='*')
                 if len(result) == 0:
-                    raise DBSReaderError("Dataset %s doesn't exist in DBS %s" % (pathName, self.dbsURL))
+                    raise DBSReaderError("Dataset %s doesn't exist in DBS %s" % (pathName, self.dbsURL)) from None
             except (dbsClientException, HTTPError) as ex:
                 msg = "Error in "
                 msg += "DBSReader.checkDatasetPath(%s)\n" % pathName
                 msg += "%s\n" % formatEx3(ex)
-                raise DBSReaderError(msg)
+                raise DBSReaderError(msg) from None
         return
@@ -721,7 +760,7 @@ def checkBlockName(self, blockName):
         """
         _checkBlockName_
         """
         if blockName in ("", "*", None):
-            raise DBSReaderError("Invalid Block name: => %s <=" % blockName)
+            raise DBSReaderError("Invalid Block name: => %s <=" % blockName) from None

     def getFileListByDataset(self, dataset, validFileOnly=1, detail=True):
@@ -740,7 +779,7 @@ def getFileListByDataset(self, dataset, validFileOnly=1, detail=True):
             msg = "Error in "
             msg += "DBSReader.getFileListByDataset(%s)\n" % dataset
             msg += "%s\n" % formatEx3(ex)
-            raise DBSReaderError(msg)
+            raise DBSReaderError(msg) from None

     def listDatasetParents(self, childDataset):
         """
@@ -753,7 +792,7 @@ def listDatasetParents(self, childDataset):
             msg = "Error in "
             msg += "DBSReader.listDatasetParents(%s)\n" % childDataset
             msg += "%s\n" % formatEx3(ex)
-            raise DBSReaderError(msg)
+            raise DBSReaderError(msg) from None

     # def getListFilesByLumiAndDataset(self, dataset, files):
     #     "Unsing pycurl to get all the child parents pair for given dataset"
@@ -782,6 +821,9 @@ def getParentFilesGivenParentDataset(self, parentDataset, childLFNs):
         :return: set of parent files for childLFN
         """
         fInfo = self.dbs.listFileLumiArray(logical_file_name=childLFNs)
+        if self.parallel:
+            return dbsParentFilesGivenParentDataset(self.dbsURL, parentDataset, fInfo)
+
         parentFiles = defaultdict(set)
         for f in fInfo:
             pFileList = self.dbs.listFiles(dataset=parentDataset, run_num=f['run_num'], lumi_list=f['lumi_section_num'])
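Both code paths of getParentFilesGivenParentDataset return the same mapping of child LFN to the set of parent LFNs whose run/lumi pairs overlap; the parallel path delegates the per-run /files lookups to the new DBSUtils helper below. An illustrative call (LFNs invented):

    parents = reader.getParentFilesGivenParentDataset(parentDataset, childLFNs)
    # e.g. {'/store/.../child.root': {'/store/.../parent1.root', '/store/.../parent2.root'}}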
@@ -911,7 +953,8 @@ def fixMissingParentageDatasets(self, childDataset, insertFlag=True):
                 numFiles = self.findAndInsertMissingParentage(blockName, parentFullInfo, insertFlag=insertFlag)
                 self.logger.debug("%s file parentage added for block %s", numFiles, blockName)
             except Exception as ex:
-                self.logger.exception("Parentage updated failed for block %s", blockName)
+                self.logger.exception(
+                    "Parentage update failed for block %s with error %s", blockName, str(ex))
                 failedBlocks.append(blockName)

         return failedBlocks
diff --git a/src/python/WMCore/Services/DBS/DBSUtils.py b/src/python/WMCore/Services/DBS/DBSUtils.py
new file mode 100644
index 0000000000..246bb10dd2
--- /dev/null
+++ b/src/python/WMCore/Services/DBS/DBSUtils.py
@@ -0,0 +1,139 @@
+#!/usr/bin/env python
+"""
+_DBSUtils_
+
+set of common utilities for DBS3Reader
+
+"""
+import json
+import urllib
+from urllib.parse import urlparse, parse_qs, quote_plus
+from collections import defaultdict
+
+from Utils.CertTools import cert, ckey
+from dbs.apis.dbsClient import aggFileLumis, aggFileParents
+from WMCore.Services.pycurl_manager import getdata as multi_getdata
+from Utils.PortForward import PortForward
+
+
+def dbsListFileParents(dbsUrl, blocks):
+    """
+    Concurrent counterpart of the DBS listFileParents API
+
+    :param dbsUrl: DBS URL
+    :param blocks: list of blocks
+    :return: dictionary of file parents keyed by block name
+    """
+    urls = ['%s/fileparents?block_name=%s' % (dbsUrl, quote_plus(b)) for b in blocks]
+    func = aggFileParents
+    uKey = 'block_name'
+    return getUrls(urls, func, uKey)
+
+
+def dbsListFileLumis(dbsUrl, blocks):
+    """
+    Concurrent counterpart of the DBS listFileLumis API
+
+    :param dbsUrl: DBS URL
+    :param blocks: list of blocks
+    :return: dictionary of file lumis keyed by block name
+    """
+    urls = ['%s/filelumis?block_name=%s' % (dbsUrl, quote_plus(b)) for b in blocks]
+    func = aggFileLumis
+    uKey = 'block_name'
+    return getUrls(urls, func, uKey)
+
+
+def dbsBlockOrigin(dbsUrl, blocks):
+    """
+    Concurrent counterpart of the DBS blockorigin API
+
+    :param dbsUrl: DBS URL
+    :param blocks: list of blocks
+    :return: dictionary of block origins keyed by block name
+    """
+    urls = ['%s/blockorigin?block_name=%s' % (dbsUrl, quote_plus(b)) for b in blocks]
+    func = None
+    uKey = 'block_name'
+    return getUrls(urls, func, uKey)
+
+
+def dbsParentFilesGivenParentDataset(dbsUrl, parentDataset, fInfo):
+    """
+    Obtain parent files for the given file info records
+
+    :param dbsUrl: DBS URL
+    :param parentDataset: parent dataset name
+    :param fInfo: list of file info records with run and lumi information
+    :return: dictionary mapping child LFNs to sets of parent files
+    """
+    portForwarder = PortForward(8443)
+    urls = []
+    for fileInfo in fInfo:
+        run = fileInfo['run_num']
+        lumis = urllib.parse.quote_plus(str(fileInfo['lumi_section_num']))
+        url = f'{dbsUrl}/files?dataset={parentDataset}&run_num={run}&lumi_list={lumis}'
+        urls.append(portForwarder(url))
+    func = None
+    uKey = None
+    rdict = getUrls(urls, func, uKey)
+    parentFiles = defaultdict(set)
+    for fileInfo in fInfo:
+        run = fileInfo['run_num']
+        lumis = urllib.parse.quote_plus(str(fileInfo['lumi_section_num']))
+        url = f'{dbsUrl}/files?dataset={parentDataset}&run_num={run}&lumi_list={lumis}'
+        url = portForwarder(url)
+        if url in rdict:
+            pFileList = rdict[url]
+            pFiles = {x['logical_file_name'] for x in pFileList}
+            parentFiles[fileInfo['logical_file_name']] = \
+                parentFiles[fileInfo['logical_file_name']].union(pFiles)
+    return parentFiles
+
+
+def getUrls(urls, aggFunc, uKey=None):
+    """
+    Perform parallel DBS calls for a given set of urls and apply the given
+    aggregation function to the results.

+    :param urls: list of DBS urls to call
+    :param aggFunc: aggregation function
+    :param uKey: url parameter to use as key of the final dictionary
+    :return: dictionary of results where keys are urls (or the values of the
+        uKey parameter) and values are the obtained results
+    """
+    data = multi_getdata(urls, ckey(), cert())
+
+    rdict = {}
+    for row in data:
+        url = row['url']
+        code = int(row.get('code', 200))
+        error = row.get('error')
+        if code != 200:
+            msg = f"Failed to query {url}. Error: {code} {error}"
+            raise RuntimeError(msg)
+        if uKey:
+            key = urlParams(url).get(uKey)
+        else:
+            key = url
+        rowData = row.get('data', [])
+        res = json.loads(rowData)
+        if aggFunc:
+            rdict[key] = aggFunc(res)
+        else:
+            rdict[key] = res
+    return rdict
+
+
+def urlParams(url):
+    """
+    Return dictionary of URL parameters
+
+    :param url: URL link
+    :return: dictionary of URL parameters
+    """
+    parsedUrl = urlparse(url)
+    rdict = parse_qs(parsedUrl.query)
+    for key, vals in rdict.items():
+        if len(vals) == 1:
+            rdict[key] = vals[0]
+    return rdict
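All the helpers in DBSUtils share one recipe: build one REST URL per input, fetch them all concurrently with pycurl's multi_getdata, then optionally post-process each response with an aggregation function from the DBS client. A condensed sketch of the same recipe for a hypothetical extra lookup (the 'filesummaries' endpoint name is used only for illustration; getUrls is the helper defined above):

    from urllib.parse import quote_plus

    def dbsFileSummaries(dbsUrl, blocks):
        """Hypothetical helper following the DBSUtils pattern."""
        urls = ['%s/filesummaries?block_name=%s' % (dbsUrl, quote_plus(b)) for b in blocks]
        # no aggregation function: return raw JSON rows keyed by block_name
        return getUrls(urls, None, 'block_name')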
diff --git a/test/python/WMCore_t/Services_t/DBS_t/DBSUtils_t.py b/test/python/WMCore_t/Services_t/DBS_t/DBSUtils_t.py
new file mode 100644
index 0000000000..d1369e68a9
--- /dev/null
+++ b/test/python/WMCore_t/Services_t/DBS_t/DBSUtils_t.py
@@ -0,0 +1,145 @@
+#!/usr/bin/env python
+"""
+_DBSUtils_t_
+
+Unit test for the DBSUtils module
+"""
+
+import time
+import unittest
+
+from nose.plugins.attrib import attr
+
+from WMCore.Services.DBS.DBSUtils import urlParams
+from WMCore.Services.DBS.DBS3Reader import DBS3Reader
+
+
+class DBSUtilsTest(unittest.TestCase):
+    """
+    DBSUtilsTest represents the unit test class for the DBSUtils module
+    """
+
+    def testUrlParams(self):
+        """
+        urlParams should return a dictionary of URL parameters
+        """
+        url = 'http://a.b.com?d=1&f=bla'
+        results = urlParams(url)
+        self.assertEqual(results, {'d': '1', 'f': 'bla'})
+        self.assertEqual(results.get('d'), '1')
+        self.assertEqual(results.get('f'), 'bla')
+
+        url = 'http://a.b.com?d=1&f=bla&d=2'
+        results = urlParams(url)
+        self.assertEqual(results, {'d': ['1', '2'], 'f': 'bla'})
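getUrls itself is only exercised indirectly by the integration tests below; a unit-level check is also possible by stubbing the network layer. A hedged sketch (the test name and data are invented; it assumes getUrls is imported from WMCore.Services.DBS.DBSUtils alongside urlParams, and patches cert/ckey so no grid credentials are needed):

    from unittest import mock

    @mock.patch('WMCore.Services.DBS.DBSUtils.cert')
    @mock.patch('WMCore.Services.DBS.DBSUtils.ckey')
    @mock.patch('WMCore.Services.DBS.DBSUtils.multi_getdata')
    def testGetUrls(self, mockGetData, mockCkey, mockCert):
        # one fake DBS response row; 'data' carries the JSON payload
        mockGetData.return_value = [
            {'url': 'http://x.y.z/fileparents?block_name=b1', 'code': 200, 'data': '[{"a": 1}]'}]
        rdict = getUrls(['http://x.y.z/fileparents?block_name=b1'], None, 'block_name')
        self.assertEqual(rdict, {'b1': [{'a': 1}]})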
+    @attr("integration")
+    def testGetParallelListDatasetFileDetails(self):
+        """
+        test parallel execution of the listDatasetFileDetails DBS API
+        We use a small dataset with the following characteristics:
+
+        dasgoclient -query="dataset=/VBF1Parked/HIRun2013A-v1/RAW summary" | jq
+        [
+          {
+            "file_size": 6053097,
+            "nblocks": 7,
+            "nevents": 0,
+            "nfiles": 7,
+            "nlumis": 428,
+            "num_block": 7,
+            "num_event": 0,
+            "num_file": 7,
+            "num_lumi": 428
+          }
+        ]
+
+        The parallel call should perform better than the sequential one, while
+        the results should remain the same.
+        """
+        url = "https://cmsweb.cern.ch/dbs/prod/global/DBSReader"
+        reader1 = DBS3Reader(url, logger=None, parallel=False, aggregate=True)
+        reader2 = DBS3Reader(url, logger=None, parallel=True, aggregate=True)
+        dataset = '/VBF1Parked/HIRun2013A-v1/RAW'
+        time0 = time.time()
+        res1 = reader1.listDatasetFileDetails(dataset)
+        time1 = time.time() - time0
+        self.assertTrue(time1 > 0)  # to avoid pylint complaining about an unused variable
+        time0 = time.time()
+        res2 = reader2.listDatasetFileDetails(dataset)
+        time2 = time.time() - time0
+        self.assertTrue(time2 > 0)  # to avoid pylint complaining about an unused variable
+        self.assertEqual(res1, res2)
+
+    @attr("integration")
+    def testGetParallelListFileBlockLocation(self):
+        """
+        test parallel execution of the listFileBlockLocation DBS API
+        We use a small dataset with the following blocks:
+
+        dasgoclient -query="block dataset=/VBF1Parked/HIRun2013A-v1/RAW"
+
+        The parallel call should perform better than the sequential one, while
+        the results should remain the same.
+        """
+        url = "https://cmsweb.cern.ch/dbs/prod/global/DBSReader"
+        reader1 = DBS3Reader(url, logger=None, parallel=False, aggregate=True)
+        reader2 = DBS3Reader(url, logger=None, parallel=True, aggregate=True)
+        blocks = [
+            '/VBF1Parked/HIRun2013A-v1/RAW#52da10be-5c87-11e2-912f-842b2b4671d8',
+            '/VBF1Parked/HIRun2013A-v1/RAW#6861d50a-5c7f-11e2-912f-842b2b4671d8',
+            '/VBF1Parked/HIRun2013A-v1/RAW#6dd88910-5c80-11e2-912f-842b2b4671d8',
+            '/VBF1Parked/HIRun2013A-v1/RAW#6e258f12-5c80-11e2-912f-842b2b4671d8',
+            '/VBF1Parked/HIRun2013A-v1/RAW#fc64292c-5c81-11e2-912f-842b2b4671d8',
+            '/VBF1Parked/HIRun2013A-v1/RAW#fc87c8dc-5c81-11e2-912f-842b2b4671d8',
+            '/VBF1Parked/HIRun2013A-v1/RAW#fcd9876c-5c81-11e2-912f-842b2b4671d8'
+        ]
+        time0 = time.time()
+        res1 = reader1.listFileBlockLocation(blocks)
+        time1 = time.time() - time0
+        self.assertTrue(time1 > 0)  # to avoid pylint complaining about an unused variable
+        time0 = time.time()
+        res2 = reader2.listFileBlockLocation(blocks)
+        time2 = time.time() - time0
+        self.assertTrue(time2 > 0)  # to avoid pylint complaining about an unused variable
+        self.assertEqual(res1, res2)
+    @attr("integration")
+    def testGetParallelGetParentFilesGivenParentDataset(self):
+        """
+        test parallel execution of the getParentFilesGivenParentDataset DBS API
+        We use the following data:
+
+        # find lfn for some dataset
+        dasgoclient -query="file dataset=/VBF1Parked/Run2012D-22Jan2013-v1/AOD"
+        ...
+        /store/data/Run2012D/VBF1Parked/AOD/22Jan2013-v1/120000/F64DFA15-15A8-E211-9277-80000048FE80.root
+
+        # find parent dataset
+        dasgoclient -query="parent dataset=/VBF1Parked/Run2012D-22Jan2013-v1/AOD"
+        /VBF1Parked/Run2012D-v1/RAW
+
+        The parallel call should perform better than the sequential one, while
+        the results should remain the same.
+        """
+        url = "https://cmsweb.cern.ch/dbs/prod/global/DBSReader"
+        reader1 = DBS3Reader(url, logger=None, parallel=False, aggregate=True)
+        reader2 = DBS3Reader(url, logger=None, parallel=True, aggregate=True)
+        parentDataset = '/VBF1Parked/Run2012D-v1/RAW'
+        childLFN = '/store/data/Run2012D/VBF1Parked/AOD/22Jan2013-v1/120000/F64DFA15-15A8-E211-9277-80000048FE80.root'
+        time0 = time.time()
+        res1 = reader1.getParentFilesGivenParentDataset(parentDataset, childLFN)
+        time1 = time.time() - time0
+        self.assertTrue(time1 > 0)  # to avoid pylint complaining about an unused variable
+        time0 = time.time()
+        res2 = reader2.getParentFilesGivenParentDataset(parentDataset, childLFN)
+        time2 = time.time() - time0
+        self.assertTrue(time2 > 0)  # to avoid pylint complaining about an unused variable
+        self.assertEqual(res1, res2)
+
+
+if __name__ == '__main__':
+    unittest.main()
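The three integration tests above talk to the production DBS instance and are tagged with the nose "integration" attribute, so a plain unit-test run skips them. Selecting them explicitly should look roughly like the following (the exact invocation depends on the local WMCore test setup, and a valid grid proxy is required):

    nosetests -a integration test/python/WMCore_t/Services_t/DBS_t/DBSUtils_t.py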