Skip to content

Commit

Permalink
Add support for pycurl_manager multi_getdata
Browse files Browse the repository at this point in the history
  • Loading branch information
vkuznet committed Apr 15, 2022
1 parent 5a2ac3c commit 3cf4f31
Show file tree
Hide file tree
Showing 2 changed files with 178 additions and 84 deletions.
90 changes: 6 additions & 84 deletions src/python/WMCore/Services/DBS/DBS3Reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,107 +7,29 @@
"""
from __future__ import print_function, division

import os
from builtins import object, str, bytes
from future.utils import viewitems
import concurrent.futures

from Utils.Utilities import decodeBytesToUnicode, encodeUnicodeToBytesConditional

import logging
from collections import defaultdict

from RestClient.ErrorHandling.RestClientExceptions import HTTPError
from dbs.apis.dbsClient import DbsApi, aggFileLumis, aggFileParents
from dbs.apis.dbsClient import DbsApi
from dbs.exceptions.dbsClientException import dbsClientException
from retry import retry

from Utils.IteratorTools import grouper
from Utils.PythonVersion import PY2
from WMCore.Services.DBS.DBSErrors import DBSReaderError, formatEx3
from WMCore.Services.DBS.DBSUtils import listFileParents, listFileLumis

# third-party libraries
import requests

### Needed for the pycurl comment, leave it out for now
# from WMCore.Services.pycurl_manager import getdata as multi_getdata


# global DBS URL to be used by stand-alone functions in concurrent calls
# see wrapperBlock functions
DBS_URL = 'https://cmsweb-prod.cern.ch/dbs/prod/global/DBSReader'

def dbsRequest(url):
    """
    Wrapper function to perform call to given URL using client's certificates

    The certificate and key come from X509_USER_PROXY when that variable holds
    a non-empty value, otherwise from .globus/usercert.pem and
    .globus/userkey.pem under the user's home directory. The CA location comes
    from X509_CERT_DIR and defaults to /etc/grid-security/certificates.

    :param url: url name
    :return: results (decoded JSON body of the HTTP response)
    :raises Exception: if the certificate, the key or the CA location
        does not exist on disk
    """
    proxy = os.environ.get('X509_USER_PROXY', '')
    if proxy:
        # the same proxy file is used as both certificate and key
        crt = key = proxy
    else:
        # expanduser copes with an unset HOME, whereas joining
        # os.getenv('HOME') (None) would fail with TypeError
        globusDir = os.path.join(os.path.expanduser('~'), '.globus')
        crt = os.path.join(globusDir, 'usercert.pem')
        key = os.path.join(globusDir, 'userkey.pem')
    if not os.path.exists(crt):
        raise Exception('Unable to locate user X509 certificate file') from None
    if not os.path.exists(key):
        raise Exception('Unable to locate user X509 key certificate file') from None
    caPath = os.environ.get('X509_CERT_DIR', '/etc/grid-security/certificates')
    if not os.path.exists(caPath):
        raise Exception('Unable to locate X509_CERT_DIR') from None
    res = requests.get(url,
                       cert=(crt, key),
                       headers={'Accept-Encoding': 'gzip'},
                       verify=caPath)
    return res.json()

def wrapperBlockFileParents(blk):
    """
    Fetch file parents of a single block through the DBS fileparents end-point

    The request goes to the module level DBS_URL; the '#' of the block name
    is percent-encoded before it is placed into the query string.

    :param blk: block name
    :return: output of aggFileParents applied to the DBS response
    """
    encodedBlock = blk.replace('#', '%23')
    response = dbsRequest('{}/fileparents?block_name={}'.format(DBS_URL, encodedBlock))
    return aggFileParents(response)

def wrapperBlockFileLumis(blk):
    """
    Fetch file lumis of a single block through the DBS filelumis end-point

    The request goes to the module level DBS_URL; the '#' of the block name
    is percent-encoded before it is placed into the query string.

    :param blk: block name
    :return: output of aggFileLumis applied to the DBS response
    """
    encodedBlock = blk.replace('#', '%23')
    response = dbsRequest('{}/filelumis?block_name={}'.format(DBS_URL, encodedBlock))
    return aggFileLumis(response)

def runConcurrentProcess(func, blocks, maxWorkers=10):
    """
    Apply func to every block using a pool of threads and collect the outcome.
    Built on top of Python concurrent futures primitives, see
    https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures

    :param func: the function to run, it is called with a single block
    :param blocks: list of blocks our function should process
    :param maxWorkers: maximum number of workers to use
    :return: dictionary keyed by block; the value is either whatever func
        returned or, if func raised, a one element list [{'error': message}]
    """
    results = {}
    with concurrent.futures.ThreadPoolExecutor(maxWorkers) as pool:
        # remember which block every submitted task belongs to
        pending = {}
        for blk in blocks:
            pending[pool.submit(func, blk)] = blk
        for done in concurrent.futures.as_completed(pending):
            name = pending[done]
            try:
                results[name] = done.result()
            except Exception as exc:
                # a failure of one block must not abort the others
                results[name] = [{'error': str(exc)}]
    return results

def remapDBS3Keys(data, stringify=False, **others):
"""Fields have been renamed between DBS2 and 3, take fields from DBS3
and map to DBS2 values
Expand Down Expand Up @@ -157,8 +79,6 @@ def __init__(self, url, logger=None, **contact):
# instantiate dbs api object
try:
self.dbsURL = url.replace("cmsweb.cern.ch", "cmsweb-prod.cern.ch")
global DBS_URL
DBS_URL = self.dbsURL
self.dbs = DbsApi(self.dbsURL, **contact)
self.logger = logger or logging.getLogger(self.__class__.__name__)
except Exception as ex:
Expand Down Expand Up @@ -392,13 +312,15 @@ def listDatasetFileDetails(self, datasetPath, getParents=False, getLumis=True, v
# parallel execution for listFileParents and listFileLumis APIs
if parallel:
if getParents:
block_parents = runConcurrentProcess(wrapperBlockFileParents, blocks, parallel)
# block_parents = runConcurrentProcess(wrapperBlockFileParents, blocks, parallel)
block_parents = listFileParents(self.dbsURL, blocks, self.logger)
for blockName, parents in block_parents.items():
for p in parents:
if p['logical_file_name'] in files: # invalid files are not there if validFileOnly=1
files[p['logical_file_name']]['Parents'].extend(p['parent_logical_file_name'])
if getLumis:
block_file_lumis = runConcurrentProcess(wrapperBlockFileLumis, blocks, parallel)
# block_file_lumis = runConcurrentProcess(wrapperBlockFileLumis, blocks, parallel)
block_file_lumis = listFileLumis(self.dbsURL, blocks, self.logger)
for blockName, file_lumis in block_file_lumis.items():
for f in file_lumis:
if f['logical_file_name'] in files: # invalid files are not there if validFileOnly=1
Expand Down
172 changes: 172 additions & 0 deletions src/python/WMCore/Services/DBS/DBSUtils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
import os
import json
import concurrent.futures
from WMCore.Services.pycurl_manager import getdata as multi_getdata
from dbs.apis.dbsClient import aggFileLumis, aggFileParents
from Utils.CertTools import getKeyCertFromEnv

# third-party libraries
import requests


######### NOTE ABOUT code-refactoring/relocation #########

### The following functions are borrowed from WMCore/MicroService/Tools/Common.py
### This implies that we need to move them into a separate area, independent of both DBS
### and MicroServices, so that the two can share them. For instance,
### ckey and cert functions should belong to Utils.CertTools
### while isEmptyResults should probably go to Utils too

def isEmptyResults(row):
    """
    _isEmptyResults_
    Tell whether a result row carries an empty payload for a non-failed call

    :param row: dictionary with mandatory 'data' key and optional 'code' key
    :return: bool, True only if the code is in [200, 400) and data is None or []
    :raises Exception: if row has no 'data' key
    """
    if 'data' not in row:
        raise Exception("provided result dict does not contain 'data' key")
    # a missing code means success (HTTP status code 200)
    status = int(row.get('code', 200))
    payload = row['data']
    return 200 <= status < 400 and payload in (None, [])

def ckey():
    """
    Return the first item of the pair provided by getKeyCertFromEnv,
    presumably the user key taken either from proxy or userkey.pem
    (NOTE(review): confirm the ordering against Utils.CertTools)
    """
    return getKeyCertFromEnv()[0]


def cert():
    """
    Return the second item of the pair provided by getKeyCertFromEnv,
    presumably the user certificate taken either from proxy or usercert.pem
    (NOTE(review): confirm the ordering against Utils.CertTools)
    """
    return getKeyCertFromEnv()[1]

##################

# global DBS URL to be used by stand-alone functions in concurrent calls
# see wrapperBlock functions
DBS_URL = 'https://cmsweb-prod.cern.ch/dbs/prod/global/DBSReader'

def dbsRequest(url):
    """
    Wrapper function to perform call to given URL using client's certificates

    The certificate and key come from X509_USER_PROXY when that variable holds
    a non-empty value, otherwise from .globus/usercert.pem and
    .globus/userkey.pem under the user's home directory. The CA location comes
    from X509_CERT_DIR and defaults to /etc/grid-security/certificates.

    :param url: url name
    :return: results (decoded JSON body of the HTTP response)
    :raises Exception: if the certificate, the key or the CA location
        does not exist on disk
    """
    proxy = os.environ.get('X509_USER_PROXY', '')
    if proxy:
        # the same proxy file is used as both certificate and key
        crt = key = proxy
    else:
        # expanduser copes with an unset HOME, whereas joining
        # os.getenv('HOME') (None) would fail with TypeError
        globusDir = os.path.join(os.path.expanduser('~'), '.globus')
        crt = os.path.join(globusDir, 'usercert.pem')
        key = os.path.join(globusDir, 'userkey.pem')
    if not os.path.exists(crt):
        raise Exception('Unable to locate user X509 certificate file') from None
    if not os.path.exists(key):
        raise Exception('Unable to locate user X509 key certificate file') from None
    caPath = os.environ.get('X509_CERT_DIR', '/etc/grid-security/certificates')
    if not os.path.exists(caPath):
        raise Exception('Unable to locate X509_CERT_DIR') from None
    res = requests.get(url,
                       cert=(crt, key),
                       headers={'Accept-Encoding': 'gzip'},
                       verify=caPath)
    return res.json()

def wrapperBlockFileParents(blk):
    """
    Fetch file parents of a single block through the DBS fileparents end-point

    The request goes to the module level DBS_URL; the '#' of the block name
    is percent-encoded before it is placed into the query string.

    :param blk: block name
    :return: output of aggFileParents applied to the DBS response
    """
    encodedBlock = blk.replace('#', '%23')
    response = dbsRequest('{}/fileparents?block_name={}'.format(DBS_URL, encodedBlock))
    return aggFileParents(response)

def wrapperBlockFileLumis(blk):
    """
    Fetch file lumis of a single block through the DBS filelumis end-point

    The request goes to the module level DBS_URL; the '#' of the block name
    is percent-encoded before it is placed into the query string.

    :param blk: block name
    :return: output of aggFileLumis applied to the DBS response
    """
    encodedBlock = blk.replace('#', '%23')
    response = dbsRequest('{}/filelumis?block_name={}'.format(DBS_URL, encodedBlock))
    return aggFileLumis(response)

def runConcurrentProcess(func, blocks, maxWorkers=10):
    """
    Apply func to every block using a pool of threads and collect the outcome.
    Built on top of Python concurrent futures primitives, see
    https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures

    :param func: the function to run, it is called with a single block
    :param blocks: list of blocks our function should process
    :param maxWorkers: maximum number of workers to use
    :return: dictionary keyed by block; the value is either whatever func
        returned or, if func raised, a one element list [{'error': message}]
    """
    results = {}
    with concurrent.futures.ThreadPoolExecutor(maxWorkers) as pool:
        # remember which block every submitted task belongs to
        pending = {}
        for blk in blocks:
            pending[pool.submit(func, blk)] = blk
        for done in concurrent.futures.as_completed(pending):
            name = pending[done]
            try:
                results[name] = done.result()
            except Exception as exc:
                # a failure of one block must not abort the others
                results[name] = [{'error': str(exc)}]
    return results


def listFileParents(dbsUrl, blocks, logger=None):
    """
    Concurrent counter part of DBS listFileParents API

    The logger is optional, which matches listFileLumis and dbsBlockInfo.

    :param dbsUrl: DBS URL
    :param blocks: list of blocks
    :param logger: logger (optional)
    :return: dictionary produced by dbsBlockInfo, one entry per block holding
        the aggFileParents output for that block
    """
    return dbsBlockInfo(dbsUrl, 'fileparents', blocks, aggFileParents, logger)

def listFileLumis(dbsUrl, blocks, logger=None):
    """
    Concurrent counter part of DBS listFileLumis API

    :param dbsUrl: DBS URL
    :param blocks: list of blocks
    :param logger: logger (optional)
    :return: dictionary produced by dbsBlockInfo, one entry per block holding
        the aggFileLumis output for that block
    """
    return dbsBlockInfo(dbsUrl,
                        api='filelumis',
                        blocks=blocks,
                        func=aggFileLumis,
                        logger=logger)

def dbsBlockInfo(dbsUrl, api, blocks, func, logger=None):
    """
    Generic block related DBS call for given api and set of blocks

    One URL is built per block and all of them are handed to multi_getdata
    in a single call together with the user key and certificate.

    :param dbsUrl: DBS URL
    :param api: api name
    :param blocks: list of blocks
    :param func: function applied to the JSON decoded payload of every response
    :param logger: logger (optional), used to report the number of requests
    :return: dictionary keyed by block name, as given in blocks, whose values
        are the results of func
    :raises RuntimeError: if a response carries a status code outside of
        [200, 400) or an empty payload
    """
    urls = []
    # remember which block every URL was built for, so that results are keyed
    # by the caller's block name and not by its percent-encoded form
    urlToBlock = {}
    for blk in blocks:
        url = '%s/%s?block_name=%s' % (dbsUrl, api, blk.replace('#', '%23'))
        urls.append(url)
        urlToBlock[url] = blk
    if logger:
        logger.info("Executing %d requests against DBS '%s' API", len(urls), api)
    data = multi_getdata(urls, ckey(), cert())

    rdict = {}
    for row in data:
        blk = urlToBlock.get(row['url'])
        if blk is None:
            # the reported URL differs from the requested one, fall back to
            # decoding the block name from its query string
            blk = row['url'].rsplit('=')[-1].replace('%23', '#')
        # if code is not present in row it means it was success (HTTP status code 200)
        code = int(row.get('code', 200))
        # failed requests must be reported here; otherwise they would reach
        # json.loads and surface as an unrelated decoding error
        if not 200 <= code < 400 or isEmptyResults(row):
            msg = "Failure in %s for %s. Error: %s %s" % (api, blk,
                                                          row.get('code'),
                                                          row.get('error'))
            raise RuntimeError(msg)
        res = json.loads(row['data'])
        rdict[blk] = func(res)
    return rdict

0 comments on commit 3cf4f31

Please sign in to comment.