diff --git a/src/python/WMCore/Services/DBS/DBS3Reader.py b/src/python/WMCore/Services/DBS/DBS3Reader.py
index d4cbdb8ae3..3281a2a241 100644
--- a/src/python/WMCore/Services/DBS/DBS3Reader.py
+++ b/src/python/WMCore/Services/DBS/DBS3Reader.py
@@ -7,10 +7,8 @@
 """
 from __future__ import print_function, division
 
-import os
 from builtins import object, str, bytes
 from future.utils import viewitems
-import concurrent.futures
 
 from Utils.Utilities import decodeBytesToUnicode, encodeUnicodeToBytesConditional
 
@@ -18,96 +16,20 @@ from collections import defaultdict
 from RestClient.ErrorHandling.RestClientExceptions import HTTPError
 
-from dbs.apis.dbsClient import DbsApi, aggFileLumis, aggFileParents
+from dbs.apis.dbsClient import DbsApi
 from dbs.exceptions.dbsClientException import dbsClientException
 from retry import retry
 
 from Utils.IteratorTools import grouper
 from Utils.PythonVersion import PY2
 
 from WMCore.Services.DBS.DBSErrors import DBSReaderError, formatEx3
+from WMCore.Services.DBS.DBSUtils import listFileParents, listFileLumis
 
-# third-party libraries
-import requests
 
 ### Needed for the pycurl comment, leave it out for now
 # from WMCore.Services.pycurl_manager import getdata as multi_getdata
 
-# global DBS URL to be used by stand-alone functions in concurrent calls
-# see wrapperBlock functions
-DBS_URL = 'https://cmsweb-prod.cern.ch/dbs/prod/global/DBSReader'
-
-def dbsRequest(url):
-    """
-    Wrapper function to perform call to given URL using client's certificates
-
-    :param url: url name
-    :return: results
-    """
-    crt = os.path.join(os.getenv('HOME'), '.globus/usercert.pem')
-    key = os.path.join(os.getenv('HOME'), '.globus/userkey.pem')
-    if os.environ.get('X509_USER_PROXY', ''):
-        crt = os.getenv('X509_USER_PROXY')
-        key = os.getenv('X509_USER_PROXY')
-    if not os.path.exists(crt):
-        raise Exception('Unable to locate user X509 certificate file') from None
-    if not os.path.exists(key):
-        raise Exception('Unable to locate user X509 key certificate file') from None
-    caPath = os.environ.get('X509_CERT_DIR', '/etc/grid-security/certificates')
-    if not os.path.exists(caPath):
-        raise Exception('Unable to locate X509_CERT_DIR') from None
-    res = requests.get(url,
-                       cert=(crt, key),
-                       headers={'Accept-Encoding': 'gzip'},
-                       verify=caPath)
-    return res.json()
-
-def wrapperBlockFileParents(blk):
-    """
-    Wrapper function to perform DBS listFileParents API call for a given block
-
-    :param blk: block name
-    :return: result of DBS listFileParents API
-    """
-    blk = blk.replace('#', '%23')
-    url = '{}/fileparents?block_name={}'.format(DBS_URL, blk)
-    return aggFileParents(dbsRequest(url))
-
-def wrapperBlockFileLumis(blk):
-    """
-    Wrapper function to perform DBS listFileLumis API call for a given block
-
-    :param blk: block name
-    :return: result of DBS listFileLumis API
-    """
-    blk = blk.replace('#', '%23')
-    url = '{}/filelumis?block_name={}'.format(DBS_URL, blk)
-    return aggFileLumis(dbsRequest(url))
-
-def runConcurrentProcess(func, blocks, maxWorkers=10):
-    """
-    run concurrently set of processes for given function and set of blocks.
-    Internally, the code is based on Python concurrent futures primitives, see
-    # https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures
-
-    :param func: the function to run
-    :param blocks: list of blocks our function should process
-    :param maxWorkers: maximum number of workers to use
-    """
-    with concurrent.futures.ThreadPoolExecutor(maxWorkers) as executor:
-        # Start the load operations and mark each future with its blk
-        futures = {executor.submit(func, blk): blk for blk in blocks}
-        rdict = {}
-        for future in concurrent.futures.as_completed(futures):
-            blk = futures[future]
-            try:
-                data = future.result()
-            except Exception as exc:
-                rdict[blk] = [{'error': str(exc)}]
-            else:
-                rdict[blk] = data
-        return rdict
-
 
 def remapDBS3Keys(data, stringify=False, **others):
     """Fields have been renamed between DBS2 and 3, take fields from DBS3
     and map to DBS2 values
@@ -152,15 +74,14 @@ class DBS3Reader(object):
     General API for reading data from DBS
     """
 
-    def __init__(self, url, logger=None, **contact):
+    def __init__(self, url, logger=None, parallel=None, **contact):
         # instantiate dbs api object
         try:
             self.dbsURL = url.replace("cmsweb.cern.ch", "cmsweb-prod.cern.ch")
-            global DBS_URL
-            DBS_URL = self.dbsURL
             self.dbs = DbsApi(self.dbsURL, **contact)
             self.logger = logger or logging.getLogger(self.__class__.__name__)
+            self.parallel = parallel
         except Exception as ex:
             msg = "Error in DBSReader with DbsApi\n"
             msg += "%s\n" % formatEx3(ex)
@@ -358,7 +279,7 @@ def listDatatiers(self):
         """
         return [tier['data_tier_name'] for tier in self.dbs.listDataTiers()]
 
-    def listDatasetFileDetails(self, datasetPath, getParents=False, getLumis=True, validFileOnly=1, parallel=0):
+    def listDatasetFileDetails(self, datasetPath, getParents=False, getLumis=True, validFileOnly=1):
         """
         TODO: This is completely wrong need to be redone. or be removed - getting dataset altogether might be to costly
@@ -390,15 +311,17 @@ def listDatasetFileDetails(self, datasetPath, getParents=False, getLumis=True, v
                 files[f['logical_file_name']]['Parents'] = []
 
         # parallel execution for listFileParents and listFileLumis APIs
-        if parallel:
+        if self.parallel:
             if getParents:
-                block_parents = runConcurrentProcess(wrapperBlockFileParents, blocks, parallel)
+#                block_parents = runConcurrentProcess(wrapperBlockFileParents, blocks, self.parallel)
+                block_parents = listFileParents(self.dbsURL, blocks, self.logger)
                 for blockName, parents in block_parents.items():
                     for p in parents:
                         if p['logical_file_name'] in files:  # invalid files are not there if validFileOnly=1
                             files[p['logical_file_name']]['Parents'].extend(p['parent_logical_file_name'])
             if getLumis:
-                block_file_lumis = runConcurrentProcess(wrapperBlockFileLumis, blocks, parallel)
+#                block_file_lumis = runConcurrentProcess(wrapperBlockFileLumis, blocks, self.parallel)
+                block_file_lumis = listFileLumis(self.dbsURL, blocks, self.logger)
                 for blockName, file_lumis in block_file_lumis.items():
                     for f in file_lumis:
                         if f['logical_file_name'] in files:  # invalid files are not there if validFileOnly=1
diff --git a/src/python/WMCore/Services/DBS/DBSUtils.py b/src/python/WMCore/Services/DBS/DBSUtils.py
new file mode 100644
index 0000000000..630b491020
--- /dev/null
+++ b/src/python/WMCore/Services/DBS/DBSUtils.py
@@ -0,0 +1,166 @@
+#!/usr/bin/env python
+"""
+_DBSUtils_
+
+Set of common utilities for DBS3Reader
+
+"""
+import os
+import json
+import concurrent.futures
+
+# third-party libraries
+import requests
+
+from Utils.CertTools import getKeyCertFromEnv
+from dbs.apis.dbsClient import aggFileLumis, aggFileParents
+from WMCore.Services.pycurl_manager import getdata as multi_getdata
+
+
+######### NOTE ABOUT code-refactoring/relocation #########
+
+### The following functions are borrowed from WMCore/MicroService/Tools/Common.py.
+### This implies that we need to move them into a separate area, independent of both
+### DBS and the MicroServices, that the two can share. For instance, the
+### ckey and cert functions should belong to Utils.CertTools
+
+def ckey():
+    "Return user CA key either from proxy or userkey.pem"
+    pair = getKeyCertFromEnv()
+    return pair[0]
+
+
+def cert():
+    "Return user CA cert either from proxy or usercert.pem"
+    pair = getKeyCertFromEnv()
+    return pair[1]
+
+##################
+
+# global DBS URL to be used by stand-alone functions in concurrent calls
+# see wrapperBlock functions
+DBS_URL = 'https://cmsweb-prod.cern.ch/dbs/prod/global/DBSReader'
+
+def dbsRequest(url):
+    """
+    Wrapper function to perform a call to the given URL using the client's certificates
+
+    :param url: url name
+    :return: results
+    """
+    crt = os.path.join(os.getenv('HOME'), '.globus/usercert.pem')
+    key = os.path.join(os.getenv('HOME'), '.globus/userkey.pem')
+    if os.environ.get('X509_USER_PROXY', ''):
+        crt = os.getenv('X509_USER_PROXY')
+        key = os.getenv('X509_USER_PROXY')
+    if not os.path.exists(crt):
+        raise Exception('Unable to locate user X509 certificate file') from None
+    if not os.path.exists(key):
+        raise Exception('Unable to locate user X509 key certificate file') from None
+    caPath = os.environ.get('X509_CERT_DIR', '/etc/grid-security/certificates')
+    if not os.path.exists(caPath):
+        raise Exception('Unable to locate X509_CERT_DIR') from None
+    res = requests.get(url,
+                       cert=(crt, key),
+                       headers={'Accept-Encoding': 'gzip'},
+                       verify=caPath)
+    return res.json()
+
+def wrapperBlockFileParents(blk):
+    """
+    Wrapper function to perform DBS listFileParents API call for a given block
+
+    :param blk: block name
+    :return: result of DBS listFileParents API
+    """
+    blk = blk.replace('#', '%23')
+    url = '{}/fileparents?block_name={}'.format(DBS_URL, blk)
+    return aggFileParents(dbsRequest(url))
+
+def wrapperBlockFileLumis(blk):
+    """
+    Wrapper function to perform DBS listFileLumis API call for a given block
+
+    :param blk: block name
+    :return: result of DBS listFileLumis API
+    """
+    blk = blk.replace('#', '%23')
+    url = '{}/filelumis?block_name={}'.format(DBS_URL, blk)
+    return aggFileLumis(dbsRequest(url))
+
+def runConcurrentProcess(func, blocks, maxWorkers=10):
+    """
+    Run a set of processes concurrently for the given function and set of blocks.
+    Internally, the code is based on Python concurrent futures primitives, see
+    https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures
+
+    :param func: the function to run
+    :param blocks: list of blocks our function should process
+    :param maxWorkers: maximum number of workers to use
+    """
+    with concurrent.futures.ThreadPoolExecutor(maxWorkers) as executor:
+        # Start the load operations and mark each future with its blk
+        futures = {executor.submit(func, blk): blk for blk in blocks}
+        rdict = {}
+        for future in concurrent.futures.as_completed(futures):
+            blk = futures[future]
+            try:
+                data = future.result()
+            except Exception as exc:
+                rdict[blk] = [{'error': str(exc)}]
+            else:
+                rdict[blk] = data
+        return rdict
+
+
+def listFileParents(dbsUrl, blocks, logger):
+    """
+    Concurrent counterpart of the DBS listFileParents API
+
+    :param dbsUrl: DBS URL
+    :param blocks: list of blocks
+    :param logger: logger
+    :return: dictionary of file parents keyed by block name
+    """
+    return dbsBlockInfo(dbsUrl, 'fileparents', blocks, aggFileParents, logger)
+
+def listFileLumis(dbsUrl, blocks, logger=None):
+    """
+    Concurrent counterpart of the DBS listFileLumis API
+
+    :param dbsUrl: DBS URL
+    :param blocks: list of blocks
+    :param logger: logger
+    :return: dictionary of file lumis keyed by block name
+    """
+    return dbsBlockInfo(dbsUrl, 'filelumis', blocks, aggFileLumis, logger)
+
+def dbsBlockInfo(dbsUrl, api, blocks, func, logger=None):
+    """
+    Generic block-related DBS call for the given api, blocks, aggregation function and logger
+
+    :param dbsUrl: DBS URL
+    :param api: api name
+    :param blocks: list of blocks
+    :return: result of DBS api, aggregated per block
+    """
+    urls = []
+    for blk in blocks:
+        urls.append('%s/%s?block_name=%s' % (dbsUrl, api, blk.replace('#', '%23')))
+    if logger:
+        logger.info("Executing %d requests against DBS '%s' API", len(urls), api)
+    data = multi_getdata(urls, ckey(), cert())
+
+    rdict = {}
+    for row in data:
+        blk = row['url'].rsplit('=')[-1]
+        code = int(row.get('code', 200))
+        if code != 200:
+            msg = "Failure in %s for %s. Error: %s %s" % (api, blk,
+                                                          row.get('code'),
+                                                          row.get('error'))
+            raise RuntimeError(msg)
+        data = row.get('data', [])
+        res = json.loads(data)
+        rdict[blk] = func(res)
+    return rdict
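
Usage sketch for reviewers (not part of the patch): a minimal example of calling the new DBSUtils helpers directly. The DBS URL is the same default used elsewhere in this changeset; the block name is a made-up placeholder, and a valid X509 proxy or usercert/userkey pair is assumed to be available in the environment, since dbsBlockInfo authenticates through ckey()/cert() (getKeyCertFromEnv).

import logging

from WMCore.Services.DBS.DBSUtils import listFileLumis, listFileParents

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("DBSUtilsExample")

dbsUrl = 'https://cmsweb-prod.cern.ch/dbs/prod/global/DBSReader'
# placeholder block name; any valid DBS block name works here
blocks = ['/Primary/Processed-v1/AOD#11111111-2222-3333-4444-555555555555']

# both helpers fan the per-block HTTP requests out through multi_getdata and
# return a dictionary keyed by the (URL-encoded) block name
parents = listFileParents(dbsUrl, blocks, logger)
lumis = listFileLumis(dbsUrl, blocks, logger)

for blk, rows in parents.items():
    logger.info("block %s: %d file-parent records", blk, len(rows))
for blk, rows in lumis.items():
    logger.info("block %s: %d file-lumi records", blk, len(rows))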
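
And a sketch of how the relocated concurrency is switched on from the reader side: any truthy value for the new parallel argument makes listDatasetFileDetails go through the DBSUtils helpers instead of the per-block DbsApi calls. The dataset path below is illustrative only, and the call requires network access to the DBS instance plus the same X509 credentials as above.

import logging

from WMCore.Services.DBS.DBS3Reader import DBS3Reader

logger = logging.getLogger("DBS3ReaderExample")

# parallel is stored on the reader and checked inside listDatasetFileDetails
reader = DBS3Reader('https://cmsweb-prod.cern.ch/dbs/prod/global/DBSReader',
                    logger=logger, parallel=True)

# illustrative dataset path
dataset = '/Primary/Processed-v1/AOD'
files = reader.listDatasetFileDetails(dataset, getParents=True, getLumis=True)
logger.info("collected details for %d files", len(files))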