From f07765c11698db8879699c9a50d4fdcc4c4b3816 Mon Sep 17 00:00:00 2001 From: Valentin Kuznetsov Date: Fri, 15 Apr 2022 09:54:25 -0400 Subject: [PATCH] Add support for pycurl_manager multi_getdata --- src/python/WMCore/Services/DBS/DBS3Reader.py | 181 ++++++------------ src/python/WMCore/Services/DBS/DBSUtils.py | 140 ++++++++++++++ .../WMCore_t/Services_t/DBS_t/DBSUtils_t.py | 146 ++++++++++++++ 3 files changed, 343 insertions(+), 124 deletions(-) create mode 100644 src/python/WMCore/Services/DBS/DBSUtils.py create mode 100644 test/python/WMCore_t/Services_t/DBS_t/DBSUtils_t.py diff --git a/src/python/WMCore/Services/DBS/DBS3Reader.py b/src/python/WMCore/Services/DBS/DBS3Reader.py index d4cbdb8ae39..a9cfba5c794 100644 --- a/src/python/WMCore/Services/DBS/DBS3Reader.py +++ b/src/python/WMCore/Services/DBS/DBS3Reader.py @@ -7,10 +7,8 @@ """ from __future__ import print_function, division -import os from builtins import object, str, bytes from future.utils import viewitems -import concurrent.futures from Utils.Utilities import decodeBytesToUnicode, encodeUnicodeToBytesConditional @@ -18,96 +16,21 @@ from collections import defaultdict from RestClient.ErrorHandling.RestClientExceptions import HTTPError -from dbs.apis.dbsClient import DbsApi, aggFileLumis, aggFileParents +from dbs.apis.dbsClient import DbsApi from dbs.exceptions.dbsClientException import dbsClientException from retry import retry from Utils.IteratorTools import grouper from Utils.PythonVersion import PY2 from WMCore.Services.DBS.DBSErrors import DBSReaderError, formatEx3 +from WMCore.Services.DBS.DBSUtils import dbsListFileParents, dbsListFileLumis, \ + dbsBlockOrigin, dbsParentFilesGivenParentDataset -# third-party libraries -import requests ### Needed for the pycurl comment, leave it out for now # from WMCore.Services.pycurl_manager import getdata as multi_getdata -# global DBS URL to be used by stand-alone functions in concurrent calls -# see wrapperBlock functions -DBS_URL = 'https://cmsweb-prod.cern.ch/dbs/prod/global/DBSReader' - -def dbsRequest(url): - """ - Wrapper function to perform call to given URL using client's certificates - - :param url: url name - :return: results - """ - crt = os.path.join(os.getenv('HOME'), '.globus/usercert.pem') - key = os.path.join(os.getenv('HOME'), '.globus/userkey.pem') - if os.environ.get('X509_USER_PROXY', ''): - crt = os.getenv('X509_USER_PROXY') - key = os.getenv('X509_USER_PROXY') - if not os.path.exists(crt): - raise Exception('Unable to locate user X509 certificate file') from None - if not os.path.exists(key): - raise Exception('Unable to locate user X509 key certificate file') from None - caPath = os.environ.get('X509_CERT_DIR', '/etc/grid-security/certificates') - if not os.path.exists(caPath): - raise Exception('Unable to locate X509_CERT_DIR') from None - res = requests.get(url, - cert=(crt, key), - headers={'Accept-Encoding': 'gzip'}, - verify=caPath) - return res.json() - -def wrapperBlockFileParents(blk): - """ - Wrapper function to perform DBS listFileParents API call for a given block - - :param blk: block name - :return: result of DBS listFileParents API - """ - blk = blk.replace('#', '%23') - url = '{}/fileparents?block_name={}'.format(DBS_URL, blk) - return aggFileParents(dbsRequest(url)) - -def wrapperBlockFileLumis(blk): - """ - Wrapper function to perform DBS listFileLumis API call for a given block - - :param blk: block name - :return: result of DBS listFileLumis API - """ - blk = blk.replace('#', '%23') - url = '{}/filelumis?block_name={}'.format(DBS_URL, blk) - 
return aggFileLumis(dbsRequest(url)) - -def runConcurrentProcess(func, blocks, maxWorkers=10): - """ - run concurrently set of processes for given function and set of blocks. - Internally, the code is based on Python concurrent futures primitives, see - # https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures - - :param func: the function to run - :param blocks: list of blocks our function should process - :param maxWorkers: maximum number of workers to use - """ - with concurrent.futures.ThreadPoolExecutor(maxWorkers) as executor: - # Start the load operations and mark each future with its blk - futures = {executor.submit(func, blk): blk for blk in blocks} - rdict = {} - for future in concurrent.futures.as_completed(futures): - blk = futures[future] - try: - data = future.result() - except Exception as exc: - rdict[blk] = [{'error': str(exc)}] - else: - rdict[blk] = data - return rdict - def remapDBS3Keys(data, stringify=False, **others): """Fields have been renamed between DBS2 and 3, take fields from DBS3 and map to DBS2 values @@ -152,19 +75,18 @@ class DBS3Reader(object): General API for reading data from DBS """ - def __init__(self, url, logger=None, **contact): + def __init__(self, url, logger=None, parallel=None, **contact): # instantiate dbs api object try: self.dbsURL = url.replace("cmsweb.cern.ch", "cmsweb-prod.cern.ch") - global DBS_URL - DBS_URL = self.dbsURL self.dbs = DbsApi(self.dbsURL, **contact) self.logger = logger or logging.getLogger(self.__class__.__name__) + self.parallel = parallel except Exception as ex: msg = "Error in DBSReader with DbsApi\n" msg += "%s\n" % formatEx3(ex) - raise DBSReaderError(msg) + raise DBSReaderError(msg) from None def _getLumiList(self, blockName=None, lfns=None, validFileOnly=1): """ @@ -185,7 +107,7 @@ def _getLumiList(self, blockName=None, lfns=None, validFileOnly=1): msg = "Error in " msg += "DBSReader.listFileLumiArray(%s)\n" % lfns msg += "%s\n" % formatEx3(ex) - raise DBSReaderError(msg) + raise DBSReaderError(msg) from None lumiDict = {} for lumisItem in lumiLists: @@ -210,7 +132,7 @@ def checkDBSServer(self): msg = "Error in " msg += "DBS server is not up: %s" % self.dbsURL msg += "%s\n" % formatEx3(ex) - raise DBSReaderError(msg) + raise DBSReaderError(msg) from None def listPrimaryDatasets(self, match='*'): """ @@ -225,7 +147,7 @@ def listPrimaryDatasets(self, match='*'): except Exception as ex: msg = "Error in DBSReader.listPrimaryDataset(%s)\n" % match msg += "%s\n" % formatEx3(ex) - raise DBSReaderError(msg) + raise DBSReaderError(msg) from None result = [x['primary_ds_name'] for x in result] return result @@ -242,7 +164,7 @@ def matchProcessedDatasets(self, primary, tier, process): except dbsClientException as ex: msg = "Error in DBSReader.listProcessedDatasets(%s)\n" % primary msg += "%s\n" % formatEx3(ex) - raise DBSReaderError(msg) + raise DBSReaderError(msg) from None for dataset in datasets: dataset = remapDBS3Keys(dataset, processed_ds_name='Name') @@ -277,7 +199,7 @@ def listRuns(self, dataset=None, block=None): except dbsClientException as ex: msg = "Error in DBSReader.listRuns(%s, %s)\n" % (dataset, block) msg += "%s\n" % formatEx3(ex) - raise DBSReaderError(msg) + raise DBSReaderError(msg) from None for x in results: runs.extend(x['run_num']) return runs @@ -310,7 +232,7 @@ def listRunLumis(self, dataset=None, block=None): except dbsClientException as ex: msg = "Error in DBSReader.listRuns(%s, %s)\n" % (dataset, block) msg += "%s\n" % formatEx3(ex) - raise DBSReaderError(msg) + raise 
DBSReaderError(msg) from None # send runDict format as result, this format is for sync with dbs2 call # which has {run_number: num_lumis} but dbs3 call doesn't return num Lumis @@ -336,7 +258,7 @@ def listProcessedDatasets(self, primary, dataTier='*'): except dbsClientException as ex: msg = "Error in DBSReader.listProcessedDatasets(%s)\n" % primary msg += "%s\n" % formatEx3(ex) - raise DBSReaderError(msg) + raise DBSReaderError(msg) from None result = [x['dataset'].split('/')[2] for x in result] return result @@ -358,7 +280,7 @@ def listDatatiers(self): """ return [tier['data_tier_name'] for tier in self.dbs.listDataTiers()] - def listDatasetFileDetails(self, datasetPath, getParents=False, getLumis=True, validFileOnly=1, parallel=0): + def listDatasetFileDetails(self, datasetPath, getParents=False, getLumis=True, validFileOnly=1): """ TODO: This is completely wrong need to be redone. or be removed - getting dataset altogether might be to costly @@ -390,15 +312,15 @@ def listDatasetFileDetails(self, datasetPath, getParents=False, getLumis=True, v files[f['logical_file_name']]['Parents'] = [] # parallel execution for listFileParents and listFileLumis APIs - if parallel: + if self.parallel: if getParents: - block_parents = runConcurrentProcess(wrapperBlockFileParents, blocks, parallel) + block_parents = dbsListFileParents(self.dbsURL, blocks) for blockName, parents in block_parents.items(): for p in parents: if p['logical_file_name'] in files: # invalid files are not there if validFileOnly=1 files[p['logical_file_name']]['Parents'].extend(p['parent_logical_file_name']) if getLumis: - block_file_lumis = runConcurrentProcess(wrapperBlockFileLumis, blocks, parallel) + block_file_lumis = dbsListFileLumis(self.dbsURL, blocks) for blockName, file_lumis in block_file_lumis.items(): for f in file_lumis: if f['logical_file_name'] in files: # invalid files are not there if validFileOnly=1 @@ -445,7 +367,7 @@ def crossCheck(self, datasetPath, *lfns): except Exception as exc: msg = "Error in DBSReader.crossCheck({}) with {} lfns.".format(datasetPath, len(lfns)) msg += "\nDetails: {}\n".format(formatEx3(exc)) - raise DBSReaderError(msg) + raise DBSReaderError(msg) from None setOfAllLfns = set(allLfns) setOfKnownLfns = set(lfns) return list(setOfAllLfns.intersection(setOfKnownLfns)) @@ -464,7 +386,7 @@ def crossCheckMissing(self, datasetPath, *lfns): except Exception as exc: msg = "Error in DBSReader.crossCheckMissing({}) with {} lfns.".format(datasetPath, len(lfns)) msg += "\nDetails: {}\n".format(formatEx3(exc)) - raise DBSReaderError(msg) + raise DBSReaderError(msg) from None setOfAllLfns = set(allLfns) setOfKnownLfns = set(lfns) knownFiles = setOfAllLfns.intersection(setOfKnownLfns) @@ -485,7 +407,7 @@ def getDBSSummaryInfo(self, dataset=None, block=None): except Exception as ex: msg = "Error in DBSReader.getDBSSummaryInfo(%s, %s)\n" % (dataset, block) msg += "%s\n" % formatEx3(ex) - raise DBSReaderError(msg) + raise DBSReaderError(msg) from None if not summary: # missing data or all files invalid return {} @@ -513,7 +435,7 @@ def listFileBlocks(self, dataset, onlyClosedBlocks=False, blockName=None): except dbsClientException as ex: msg = "Error in DBSReader.listFileBlocks(%s)\n" % dataset msg += "%s\n" % formatEx3(ex) - raise DBSReaderError(msg) + raise DBSReaderError(msg) from None if onlyClosedBlocks: result = [x['block_name'] for x in blocks if str(x['open_for_writing']) != "1"] @@ -536,7 +458,7 @@ def listOpenFileBlocks(self, dataset): except dbsClientException as ex: msg = "Error in 
DBSReader.listFileBlocks(%s)\n" % dataset msg += "%s\n" % formatEx3(ex) - raise DBSReaderError(msg) + raise DBSReaderError(msg) from None result = [x['block_name'] for x in blocks if str(x['open_for_writing']) == "1"] @@ -560,7 +482,7 @@ def blockExists(self, fileBlockName): msg = "Error in " msg += "DBSReader.blockExists(%s)\n" % fileBlockName msg += "%s\n" % formatEx3(ex) - raise DBSReaderError(msg) + raise DBSReaderError(msg) from None if len(blocks) == 0: return False @@ -579,7 +501,7 @@ def listFilesInBlock(self, fileBlockName, lumis=True, validFileOnly=1): """ if not self.blockExists(fileBlockName): msg = "DBSReader.listFilesInBlock(%s): No matching data" - raise DBSReaderError(msg % fileBlockName) + raise DBSReaderError(msg % fileBlockName) from None try: files = self.dbs.listFileArray(block_name=fileBlockName, validFileOnly=validFileOnly, detail=True) @@ -587,7 +509,7 @@ def listFilesInBlock(self, fileBlockName, lumis=True, validFileOnly=1): msg = "Error in " msg += "DBSReader.listFilesInBlock(%s)\n" % fileBlockName msg += "%s\n" % formatEx3(ex) - raise DBSReaderError(msg) + raise DBSReaderError(msg) from None if lumis: lumiDict = self._getLumiList(blockName=fileBlockName, validFileOnly=validFileOnly) @@ -612,7 +534,7 @@ def listFilesInBlockWithParents(self, fileBlockName, lumis=True, validFileOnly=1 """ if not self.blockExists(fileBlockName): msg = "DBSReader.listFilesInBlockWithParents(%s): No matching data" - raise DBSReaderError(msg % fileBlockName) + raise DBSReaderError(msg % fileBlockName) from None try: # TODO: shoud we get only valid block for this? @@ -624,7 +546,7 @@ def listFilesInBlockWithParents(self, fileBlockName, lumis=True, validFileOnly=1 msg += "DBSReader.listFilesInBlockWithParents(%s)\n" % ( fileBlockName,) msg += "%s\n" % formatEx3(ex) - raise DBSReaderError(msg) + raise DBSReaderError(msg) from None childByParents = defaultdict(list) for f in files: @@ -638,7 +560,7 @@ def listFilesInBlockWithParents(self, fileBlockName, lumis=True, validFileOnly=1 msg = "Error in " msg += "DBSReader.listFilesInBlockWithParents(%s)\n There is no parents files" % ( fileBlockName) - raise DBSReaderError(msg) + raise DBSReaderError(msg) from None parentFilesDetail = [] # TODO: slicing parentLFNs util DBS api is handling that. @@ -674,7 +596,7 @@ def lfnsInBlock(self, fileBlockName): """ if not self.blockExists(fileBlockName): msg = "DBSReader.lfnsInBlock(%s): No matching data" - raise DBSReaderError(msg % fileBlockName) + raise DBSReaderError(msg % fileBlockName) from None try: lfns = self.dbs.listFileArray(block_name=fileBlockName, validFileOnly=1, detail=False) @@ -683,7 +605,7 @@ def lfnsInBlock(self, fileBlockName): msg = "Error in " msg += "DBSReader.listFilesInBlock(%s)\n" % fileBlockName msg += "%s\n" % formatEx3(ex) - raise DBSReaderError(msg) + raise DBSReaderError(msg) from None def listFileBlockLocation(self, fileBlockNames): """ @@ -706,15 +628,22 @@ def listFileBlockLocation(self, fileBlockNames): blocksInfo = {} try: - for block in fileBlockNames: - blocksInfo.setdefault(block, []) - # there should be only one element with a single origin site string ... 
- for blockInfo in self.dbs.listBlockOrigin(block_name=block): - blocksInfo[block].append(blockInfo['origin_site_name']) + if self.parallel: + data = dbsBlockOrigin(self.dbsURL, fileBlockNames) + for block, items in data.items(): + blocksInfo.setdefault(block, []) + for blockInfo in items: + blocksInfo[block].append(blockInfo['origin_site_name']) + else: + for block in fileBlockNames: + blocksInfo.setdefault(block, []) + # there should be only one element with a single origin site string ... + for blockInfo in self.dbs.listBlockOrigin(block_name=block): + blocksInfo[block].append(blockInfo['origin_site_name']) except dbsClientException as ex: msg = "Error in DBS3Reader: self.dbs.listBlockOrigin(block_name=%s)\n" % fileBlockNames msg += "%s\n" % formatEx3(ex) - raise DBSReaderError(msg) + raise DBSReaderError(msg) from None for block in fileBlockNames: valid_nodes = set(blocksInfo.get(block, [])) - node_filter @@ -757,7 +686,7 @@ def getFileBlockWithParents(self, fileBlockName): if not self.blockExists(fileBlockName): msg = "DBSReader.getFileBlockWithParents(%s): No matching data" - raise DBSReaderError(msg % fileBlockName) + raise DBSReaderError(msg % fileBlockName) from None result = {"PhEDExNodeNames": [], # FIXME: we better get rid of this line! "Files": self.listFilesInBlockWithParents(fileBlockName), @@ -810,7 +739,7 @@ def blockToDatasetPath(self, blockName): msg = "Error in " msg += "DBSReader.blockToDatasetPath(%s)\n" % blockName msg += "%s\n" % formatEx3(ex) - raise DBSReaderError(msg) + raise DBSReaderError(msg) from None if blocks == []: return None @@ -833,7 +762,7 @@ def listDatasetLocation(self, datasetName): except dbsClientException as ex: msg = "Error in DBSReader: dbsApi.listBlocks(dataset=%s)\n" % datasetName msg += "%s\n" % formatEx3(ex) - raise DBSReaderError(msg) + raise DBSReaderError(msg) from None if not blocksInfo: # no data location from dbs return list(locations) @@ -850,17 +779,17 @@ def checkDatasetPath(self, pathName): and datasets unknown to DBS. Otherwise None is returned. 
""" if pathName in ("", None): - raise DBSReaderError("Invalid Dataset Path name: => %s <=" % pathName) + raise DBSReaderError("Invalid Dataset Path name: => %s <=" % pathName) from None else: try: result = self.dbs.listDatasets(dataset=pathName, dataset_access_type='*') if len(result) == 0: - raise DBSReaderError("Dataset %s doesn't exist in DBS %s" % (pathName, self.dbsURL)) + raise DBSReaderError("Dataset %s doesn't exist in DBS %s" % (pathName, self.dbsURL)) from None except (dbsClientException, HTTPError) as ex: msg = "Error in " msg += "DBSReader.checkDatasetPath(%s)\n" % pathName msg += "%s\n" % formatEx3(ex) - raise DBSReaderError(msg) + raise DBSReaderError(msg) from None return def checkBlockName(self, blockName): @@ -868,7 +797,7 @@ def checkBlockName(self, blockName): _checkBlockName_ """ if blockName in ("", "*", None): - raise DBSReaderError("Invalid Block name: => %s <=" % blockName) + raise DBSReaderError("Invalid Block name: => %s <=" % blockName) from None def getFileListByDataset(self, dataset, validFileOnly=1, detail=True): @@ -887,7 +816,7 @@ def getFileListByDataset(self, dataset, validFileOnly=1, detail=True): msg = "Error in " msg += "DBSReader.getFileListByDataset(%s)\n" % dataset msg += "%s\n" % formatEx3(ex) - raise DBSReaderError(msg) + raise DBSReaderError(msg) from None def listDatasetParents(self, childDataset): """ @@ -900,7 +829,7 @@ def listDatasetParents(self, childDataset): msg = "Error in " msg += "DBSReader.listDatasetParents(%s)\n" % childDataset msg += "%s\n" % formatEx3(ex) - raise DBSReaderError(msg) + raise DBSReaderError(msg) from None # def getListFilesByLumiAndDataset(self, dataset, files): # "Unsing pycurl to get all the child parents pair for given dataset" @@ -929,6 +858,9 @@ def getParentFilesGivenParentDataset(self, parentDataset, childLFNs): :return: set of parent files for childLFN """ fInfo = self.dbs.listFileLumiArray(logical_file_name=childLFNs) + if self.parallel: + return dbsParentFilesGivenParentDataset(self.dbsURL, parentDataset, fInfo) + parentFiles = defaultdict(set) for f in fInfo: pFileList = self.dbs.listFiles(dataset=parentDataset, run_num=f['run_num'], lumi_list=f['lumi_section_num']) @@ -1058,7 +990,8 @@ def fixMissingParentageDatasets(self, childDataset, insertFlag=True): numFiles = self.findAndInsertMissingParentage(blockName, parentFullInfo, insertFlag=insertFlag) self.logger.debug("%s file parentage added for block %s", numFiles, blockName) except Exception as ex: - self.logger.exception("Parentage updated failed for block %s", blockName) + self.logger.exception( + "Parentage updated failed for block %s with error %s", blockName, str(ex)) failedBlocks.append(blockName) return failedBlocks diff --git a/src/python/WMCore/Services/DBS/DBSUtils.py b/src/python/WMCore/Services/DBS/DBSUtils.py new file mode 100644 index 00000000000..ede4e297330 --- /dev/null +++ b/src/python/WMCore/Services/DBS/DBSUtils.py @@ -0,0 +1,140 @@ +#!/usr/bin/env python +""" +_DBSUtils_ + +set of common utilities for DBS3Reader + +""" +import json +import urllib +from urllib.parse import urlparse, parse_qs, quote_plus +from collections import defaultdict + +from Utils.CertTools import cert, ckey +from dbs.apis.dbsClient import aggFileLumis, aggFileParents +from WMCore.Services.pycurl_manager import getdata as multi_getdata + + +def dbsListFileParents(dbsUrl, blocks): + """ + Concurrent counter part of DBS listFileParents API + + :param dbsUrl: DBS URL + :param blocks: list of blocks + :return: list of file parents + """ + urls = 
diff --git a/src/python/WMCore/Services/DBS/DBSUtils.py b/src/python/WMCore/Services/DBS/DBSUtils.py
new file mode 100644
index 00000000000..ede4e297330
--- /dev/null
+++ b/src/python/WMCore/Services/DBS/DBSUtils.py
@@ -0,0 +1,140 @@
+#!/usr/bin/env python
+"""
+_DBSUtils_
+
+Set of common utilities for DBS3Reader
+
+"""
+import json
+import urllib
+from urllib.parse import urlparse, parse_qs, quote_plus
+from collections import defaultdict
+
+from Utils.CertTools import cert, ckey
+from dbs.apis.dbsClient import aggFileLumis, aggFileParents
+from WMCore.Services.pycurl_manager import getdata as multi_getdata
+
+
+def dbsListFileParents(dbsUrl, blocks):
+    """
+    Concurrent counterpart of the DBS listFileParents API
+
+    :param dbsUrl: DBS URL
+    :param blocks: list of blocks
+    :return: dictionary of file parents keyed by block name
+    """
+    urls = ['%s/fileparents?block_name=%s' % (dbsUrl, quote_plus(b)) for b in blocks]
+    func = aggFileParents
+    uKey = 'block_name'
+    return getUrls(urls, func, uKey)
+
+
+def dbsListFileLumis(dbsUrl, blocks):
+    """
+    Concurrent counterpart of the DBS listFileLumis API
+
+    :param dbsUrl: DBS URL
+    :param blocks: list of blocks
+    :return: dictionary of file lumis keyed by block name
+    """
+    urls = ['%s/filelumis?block_name=%s' % (dbsUrl, quote_plus(b)) for b in blocks]
+    func = aggFileLumis
+    uKey = 'block_name'
+    return getUrls(urls, func, uKey)
+
+
+def dbsBlockOrigin(dbsUrl, blocks):
+    """
+    Concurrent counterpart of the DBS blockorigin API
+
+    :param dbsUrl: DBS URL
+    :param blocks: list of blocks
+    :return: dictionary of block origins keyed by block name
+    """
+    urls = ['%s/blockorigin?block_name=%s' % (dbsUrl, quote_plus(b)) for b in blocks]
+    func = None
+    uKey = 'block_name'
+    return getUrls(urls, func, uKey)
+
+
+def dbsParentFilesGivenParentDataset(dbsUrl, parentDataset, fInfo):
+    """
+    Obtain parent files for given fileInfo object
+
+    :param dbsUrl: DBS URL
+    :param parentDataset: parent dataset name
+    :param fInfo: list of file info records (run/lumi data per LFN)
+    :return: dictionary of parent files keyed by child logical file name
+    """
+    urls = []
+    for fileInfo in fInfo:
+        run = fileInfo['run_num']
+        lumis = urllib.parse.quote_plus(str(fileInfo['lumi_section_num']))
+        url = f'{dbsUrl}/files?dataset={parentDataset}&run_num={run}&lumi_list={lumis}'
+        if ':8443' not in url:
+            url = url.replace("cern.ch", "cern.ch:8443")
+        urls.append(url)
+    func = None
+    uKey = None
+    rdict = getUrls(urls, func, uKey)
+    parentFiles = defaultdict(set)
+    for fileInfo in fInfo:
+        run = fileInfo['run_num']
+        lumis = urllib.parse.quote_plus(str(fileInfo['lumi_section_num']))
+        url = f'{dbsUrl}/files?dataset={parentDataset}&run_num={run}&lumi_list={lumis}'
+        if ':8443' not in url:
+            url = url.replace("cern.ch", "cern.ch:8443")
+        if url in rdict:
+            pFileList = rdict[url]
+            pFiles = {x['logical_file_name'] for x in pFileList}
+            parentFiles[fileInfo['logical_file_name']] = \
+                parentFiles[fileInfo['logical_file_name']].union(pFiles)
+    return parentFiles
+
+
+def getUrls(urls, aggFunc, uKey=None):
+    """
+    Perform parallel DBS calls for given set of urls and apply given aggregation
+    function to the results.
+
+    :param urls: list of DBS urls to call
+    :param aggFunc: aggregation function
+    :param uKey: url parameter to use for final dictionary
+    :return: dictionary of results where keys are urls and values are obtained results
+    """
+    data = multi_getdata(urls, ckey(), cert())
+
+    rdict = {}
+    for row in data:
+        url = row['url']
+        code = int(row.get('code', 200))
+        error = row.get('error')
+        if code != 200:
+            msg = f"Failed to query {url}. Error: {code} {error}"
+            raise RuntimeError(msg)
+        if uKey:
+            key = urlParams(url).get(uKey)
+        else:
+            key = url
+        payload = row.get('data', [])
+        res = json.loads(payload)
+        if aggFunc:
+            rdict[key] = aggFunc(res)
+        else:
+            rdict[key] = res
+    return rdict
+
+
+def urlParams(url):
+    """
+    Return dictionary of URL parameters
+
+    :param url: URL link
+    :return: dictionary of URL parameters
+    """
+    parsedUrl = urlparse(url)
+    rdict = parse_qs(parsedUrl.query)
+    for key, vals in rdict.items():
+        if len(vals) == 1:
+            rdict[key] = vals[0]
+    return rdict
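To illustrate the keying that getUrls relies on: urlParams flattens single-valued query parameters and keeps repeated ones as lists, so each result row can be mapped back to the block_name its URL was built from. A small sketch (the URLs are illustrative):

    from WMCore.Services.DBS.DBSUtils import urlParams

    # parse_qs unquotes the quote_plus-encoded block name again
    print(urlParams('https://cmsweb-prod.cern.ch/dbs/prod/global/DBSReader/filelumis?block_name=%2Fa%2Fb%2FRAW%23abc'))
    # {'block_name': '/a/b/RAW#abc'}
    print(urlParams('http://a.b.com?d=1&f=bla&d=2'))
    # {'d': ['1', '2'], 'f': 'bla'}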
diff --git a/test/python/WMCore_t/Services_t/DBS_t/DBSUtils_t.py b/test/python/WMCore_t/Services_t/DBS_t/DBSUtils_t.py
new file mode 100644
index 00000000000..03eefb0a7e7
--- /dev/null
+++ b/test/python/WMCore_t/Services_t/DBS_t/DBSUtils_t.py
@@ -0,0 +1,146 @@
+#!/usr/bin/env python
+"""
+_DBSUtils_t_
+
+Unit test for the DBSUtils module
+"""
+
+import time
+import unittest
+
+from nose.plugins.attrib import attr
+
+from WMCore.Services.DBS.DBSUtils import urlParams
+from WMCore.Services.DBS.DBS3Reader import DBS3Reader
+
+
+class DBSUtilsTest(unittest.TestCase):
+    """
+    DBSUtilsTest represents the unit test class for DBSUtils
+    """
+
+    def testUrlParams(self):
+        """
+        urlParams should return a dictionary of URL parameters
+        """
+        url = 'http://a.b.com?d=1&f=bla'
+        results = urlParams(url)
+        self.assertEqual(results, {'d': '1', 'f': 'bla'})
+        self.assertEqual(results.get('d'), '1')
+        self.assertEqual(results.get('f'), 'bla')
+
+        url = 'http://a.b.com?d=1&f=bla&d=2'
+        results = urlParams(url)
+        self.assertEqual(results, {'d': ['1', '2'], 'f': 'bla'})
+
+    @attr("integration")
+    def testGetParallelListDatasetFileDetails(self):
+        """
+        test parallel execution of the listDatasetFileDetails DBS API
+        We use a small dataset with the following characteristics:
+
+        dasgoclient -query="dataset=/VBF1Parked/HIRun2013A-v1/RAW summary" | jq
+        [
+          {
+            "file_size": 6053097,
+            "nblocks": 7,
+            "nevents": 0,
+            "nfiles": 7,
+            "nlumis": 428,
+            "num_block": 7,
+            "num_event": 0,
+            "num_file": 7,
+            "num_lumi": 428
+          }
+        ]
+
+        The parallel call should perform better than the sequential one while
+        the results should remain the same.
+        """
+        url = "https://cmsweb.cern.ch/dbs/prod/global/DBSReader"
+        reader1 = DBS3Reader(url, logger=None, parallel=False, aggregate=True)
+        reader2 = DBS3Reader(url, logger=None, parallel=True, aggregate=True)
+        dataset = '/VBF1Parked/HIRun2013A-v1/RAW'
+        time0 = time.time()
+        res1 = reader1.listDatasetFileDetails(dataset)
+        time1 = time.time() - time0
+        self.assertTrue(time1 > 0)  # to avoid pylint complaining about an unused variable
+        time0 = time.time()
+        res2 = reader2.listDatasetFileDetails(dataset)
+        time2 = time.time() - time0
+        self.assertTrue(time2 > 0)  # to avoid pylint complaining about an unused variable
+        self.assertEqual(res1, res2)
+
+    @attr("integration")
+    def testGetParallelListFileBlockLocation(self):
+        """
+        test parallel execution of the listFileBlockLocation DBS API
+        We use a small dataset with the following characteristics:
+
+        dasgoclient -query="block dataset=/VBF1Parked/HIRun2013A-v1/RAW"
+
+        The parallel call should perform better than the sequential one while
+        the results should remain the same.
+        """
+        url = "https://cmsweb.cern.ch/dbs/prod/global/DBSReader"
+        reader1 = DBS3Reader(url, logger=None, parallel=False, aggregate=True)
+        reader2 = DBS3Reader(url, logger=None, parallel=True, aggregate=True)
+        blocks = [
+            '/VBF1Parked/HIRun2013A-v1/RAW#52da10be-5c87-11e2-912f-842b2b4671d8',
+            '/VBF1Parked/HIRun2013A-v1/RAW#6861d50a-5c7f-11e2-912f-842b2b4671d8',
+            '/VBF1Parked/HIRun2013A-v1/RAW#6dd88910-5c80-11e2-912f-842b2b4671d8',
+            '/VBF1Parked/HIRun2013A-v1/RAW#6e258f12-5c80-11e2-912f-842b2b4671d8',
+            '/VBF1Parked/HIRun2013A-v1/RAW#fc64292c-5c81-11e2-912f-842b2b4671d8',
+            '/VBF1Parked/HIRun2013A-v1/RAW#fc87c8dc-5c81-11e2-912f-842b2b4671d8',
+            '/VBF1Parked/HIRun2013A-v1/RAW#fcd9876c-5c81-11e2-912f-842b2b4671d8'
+        ]
+        time0 = time.time()
+        res1 = reader1.listFileBlockLocation(blocks)
+        time1 = time.time() - time0
+        self.assertTrue(time1 > 0)  # to avoid pylint complaining about an unused variable
+        time0 = time.time()
+        res2 = reader2.listFileBlockLocation(blocks)
+        time2 = time.time() - time0
+        self.assertTrue(time2 > 0)  # to avoid pylint complaining about an unused variable
+        self.assertEqual(res1, res2)
+
+    @attr("integration")
+    def testGetParallelGetParentFilesGivenParentDataset(self):
+        """
+        test parallel execution of the getParentFilesGivenParentDataset DBS API
+        We use a small dataset with the following characteristics:
+
+        # find lfn for some dataset
+        dasgoclient -query="file dataset=/VBF1Parked/Run2012D-22Jan2013-v1/AOD"
+        ...
+        /store/data/Run2012D/VBF1Parked/AOD/22Jan2013-v1/120000/F64DFA15-15A8-E211-9277-80000048FE80.root
+
+        # find parent dataset
+        dasgoclient -query="parent dataset=/VBF1Parked/Run2012D-22Jan2013-v1/AOD"
+        /VBF1Parked/Run2012D-v1/RAW
+
+        The parallel call should perform better than the sequential one while
+        the results should remain the same.
+        """
+        url = "https://cmsweb.cern.ch/dbs/prod/global/DBSReader"
+        reader1 = DBS3Reader(url, logger=None, parallel=False, aggregate=True)
+        reader2 = DBS3Reader(url, logger=None, parallel=True, aggregate=True)
+        parentDataset = '/VBF1Parked/Run2012D-v1/RAW'
+        childLFN = '/store/data/Run2012D/VBF1Parked/AOD/22Jan2013-v1/120000/F64DFA15-15A8-E211-9277-80000048FE80.root'
+        time0 = time.time()
+        res1 = reader1.getParentFilesGivenParentDataset(parentDataset, childLFN)
+        time1 = time.time() - time0
+        self.assertTrue(time1 > 0)  # to avoid pylint complaining about an unused variable
+        time0 = time.time()
+        res2 = reader2.getParentFilesGivenParentDataset(parentDataset, childLFN)
+        time2 = time.time() - time0
+        self.assertTrue(time2 > 0)  # to avoid pylint complaining about an unused variable
+        self.assertEqual(res1, res2)
+
+
+if __name__ == '__main__':
+    unittest.main()
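Beyond the test harness, the DBSUtils helpers can also be exercised directly; a sketch using one of the block names from the tests above (a valid proxy is assumed, since getUrls authenticates with ckey()/cert()):

    from WMCore.Services.DBS.DBSUtils import dbsBlockOrigin

    dbsUrl = 'https://cmsweb-prod.cern.ch/dbs/prod/global/DBSReader'
    blocks = ['/VBF1Parked/HIRun2013A-v1/RAW#52da10be-5c87-11e2-912f-842b2b4671d8']
    # the result is keyed by block name; each row carries an 'origin_site_name'
    for blk, rows in dbsBlockOrigin(dbsUrl, blocks).items():
        print(blk, [row['origin_site_name'] for row in rows])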