-
Notifications
You must be signed in to change notification settings - Fork 108
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add support for pycurl_manager multi_getdata
- Loading branch information
Showing
2 changed files
with
176 additions
and
87 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,166 @@ | ||
#!/usr/bin/env python | ||
""" | ||
_DBSUtils_ | ||
set of common utilities for DBS3Reader | ||
""" | ||
import os | ||
import json | ||
import concurrent.futures | ||
|
||
# third-party libraries | ||
import requests | ||
|
||
from Utils.CertTools import getKeyCertFromEnv | ||
from dbs.apis.dbsClient import aggFileLumis, aggFileParents | ||
from WMCore.Services.pycurl_manager import getdata as multi_getdata | ||
|
||
|
||
######### NOTE ABOUT code-refactoring/relocation ######### | ||
|
||
### The following functions are borrowed from WMCore/MicroService/Tools/Common.py. | ||
### This implies that we need to move them into a separate area, independent of both DBS | ||
### and MicroServices, so that they can be shared between the two. For instance, | ||
### the ckey and cert functions should belong to Utils.CertTools. | ||
|
||
def ckey():
    """
    Return the user key, i.e. the first item of the pair provided by
    getKeyCertFromEnv (per the original note: taken either from the proxy
    or from userkey.pem).
    """
    return getKeyCertFromEnv()[0]
|
||
|
||
def cert():
    """
    Return the user certificate, i.e. the second item of the pair provided by
    getKeyCertFromEnv (per the original note: taken either from the proxy
    or from usercert.pem).
    """
    return getKeyCertFromEnv()[1]
|
||
################## | ||
|
||
# Global DBS URL read by the stand-alone wrapperBlockFileParents and | ||
# wrapperBlockFileLumis functions, which take only a block name as argument | ||
# so that they can be submitted to concurrent calls. | ||
DBS_URL = 'https://cmsweb-prod.cern.ch/dbs/prod/global/DBSReader' | ||
|
||
def dbsRequest(url):
    """
    Wrapper function to perform a GET call to the given URL using the client's
    X509 credentials and return the JSON-decoded response.

    Credentials are resolved as follows:
      - if the X509_USER_PROXY environment variable is set (non-empty), that
        file is used both as certificate and as key;
      - otherwise ~/.globus/usercert.pem and ~/.globus/userkey.pem are used.
    The CA directory is read from X509_CERT_DIR and defaults to
    /etc/grid-security/certificates.

    :param url: url name
    :return: results, i.e. the JSON-decoded body of the response
    :raises Exception: if the certificate file, the key file or the CA
        directory cannot be found on disk
    """
    proxy = os.environ.get('X509_USER_PROXY', '')
    if proxy:
        crt = key = proxy
    else:
        # Only look at the home directory when no proxy is configured, and use
        # expanduser instead of os.getenv('HOME'): the latter yields None when
        # HOME is unset, which makes os.path.join fail with a TypeError.
        globusDir = os.path.join(os.path.expanduser('~'), '.globus')
        crt = os.path.join(globusDir, 'usercert.pem')
        key = os.path.join(globusDir, 'userkey.pem')
    if not os.path.exists(crt):
        raise Exception('Unable to locate user X509 certificate file') from None
    if not os.path.exists(key):
        raise Exception('Unable to locate user X509 key certificate file') from None
    caPath = os.environ.get('X509_CERT_DIR', '/etc/grid-security/certificates')
    if not os.path.exists(caPath):
        raise Exception('Unable to locate X509_CERT_DIR') from None
    res = requests.get(url,
                       cert=(crt, key),
                       headers={'Accept-Encoding': 'gzip'},
                       verify=caPath)
    return res.json()
|
||
def wrapperBlockFileParents(blk):
    """
    Query the fileparents API of the DBS instance given by the module-level
    DBS_URL for a single block and aggregate the outcome.
    :param blk: block name
    :return: output of aggFileParents applied to the fileparents response
    """
    # '#' has to be percent-encoded to be part of the query string
    encodedBlock = blk.replace('#', '%23')
    response = dbsRequest('{}/fileparents?block_name={}'.format(DBS_URL, encodedBlock))
    return aggFileParents(response)
|
||
def wrapperBlockFileLumis(blk):
    """
    Query the filelumis API of the DBS instance given by the module-level
    DBS_URL for a single block and aggregate the outcome.
    :param blk: block name
    :return: output of aggFileLumis applied to the filelumis response
    """
    # '#' has to be percent-encoded to be part of the query string
    encodedBlock = blk.replace('#', '%23')
    response = dbsRequest('{}/filelumis?block_name={}'.format(DBS_URL, encodedBlock))
    return aggFileLumis(response)
|
||
def runConcurrentProcess(func, blocks, maxWorkers=10):
    """
    Call the given function once per block through a pool of threads and
    collect the outcome of every call. The implementation relies on the
    Python concurrent futures primitives, see
    https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures
    :param func: the function to run, it is called with a single block
    :param blocks: list of blocks our function should process
    :param maxWorkers: maximum number of workers to use
    :return: dictionary {block: value returned by func}; for a block whose
        call raised an exception the value is [{'error': <exception text>}]
    """
    outcome = {}
    with concurrent.futures.ThreadPoolExecutor(maxWorkers) as pool:
        # remember which block every submitted task works on
        taskToBlock = {}
        for block in blocks:
            taskToBlock[pool.submit(func, block)] = block
        # pick up the tasks in the order they finish
        for task in concurrent.futures.as_completed(taskToBlock):
            block = taskToBlock[task]
            try:
                outcome[block] = task.result()
            except Exception as exc:
                # a failure is reported in place of the result, it is not propagated
                outcome[block] = [{'error': str(exc)}]
    return outcome
|
||
|
||
def listFileParents(dbsUrl, blocks, logger=None):
    """
    Concurrent counter part of DBS listFileParents API
    :param dbsUrl: DBS URL
    :param blocks: list of blocks
    :param logger: optional logger (defaults to None, same as in listFileLumis
        and dbsBlockInfo); it is handed over to dbsBlockInfo as is
    :return: dictionary returned by dbsBlockInfo: one entry per block, whose
        value is aggFileParents applied to the decoded 'fileparents' response
    """
    return dbsBlockInfo(dbsUrl, 'fileparents', blocks, aggFileParents, logger)
|
||
def listFileLumis(dbsUrl, blocks, logger=None):
    """
    Concurrent counter part of DBS listFileLumis API
    :param dbsUrl: DBS URL
    :param blocks: list of blocks
    :param logger: optional logger, handed over to dbsBlockInfo as is
    :return: dictionary returned by dbsBlockInfo: one entry per block, whose
        value is aggFileLumis applied to the decoded 'filelumis' response
    """
    return dbsBlockInfo(dbsUrl,
                        api='filelumis',
                        blocks=blocks,
                        func=aggFileLumis,
                        logger=logger)
|
||
def dbsBlockInfo(dbsUrl, api, blocks, func, logger=None):
    """
    Generic block related DBS call for given api and set of blocks.
    One URL per block is built and the whole set is fetched via multi_getdata
    using the user key and certificate.
    :param dbsUrl: DBS URL
    :param api: api name
    :param blocks: list of blocks
    :param func: callable applied to the JSON-decoded response of every block
    :param logger: optional logger; when given, the number of requests is logged
    :return: dictionary {block name: func(decoded response)}
    :raises RuntimeError: as soon as a response carries a code other than 200
    """
    # '#' has to be percent-encoded to be part of the query string
    urls = ['%s/%s?block_name=%s' % (dbsUrl, api, blk.replace('#', '%23'))
            for blk in blocks]
    if logger:
        logger.info("Executing %d requests against DBS '%s' API", len(urls), api)
    responses = multi_getdata(urls, ckey(), cert())

    rdict = {}
    for row in responses:
        # The block is recovered from the tail of the url; undo the '%23'
        # encoding applied above so that the result is keyed by the very same
        # block name the caller passed in.
        blk = row['url'].rsplit('=', 1)[-1].replace('%23', '#')
        code = int(row.get('code', 200))
        if code != 200:
            msg = "Failure in %s for %s. Error: %s %s" % (api, blk,
                                                          row.get('code'),
                                                          row.get('error'))
            raise RuntimeError(msg)
        # A missing payload is treated as an empty JSON list; the default has
        # to be a string because json.loads rejects a list object.
        payload = row.get('data', '[]')
        rdict[blk] = func(json.loads(payload))
    return rdict