Add support for pycurl_manager multi_getdata
vkuznet committed Apr 19, 2022
1 parent 2438454 commit b337ce1
Showing 2 changed files with 197 additions and 93 deletions.
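
As a usage illustration (not part of the commit itself), the new 'parallel' constructor option added below would be exercised roughly as follows; the class, method and DBS URL are taken from the diff, while the dataset path is hypothetical:

from WMCore.Services.DBS.DBS3Reader import DBS3Reader

# a truthy 'parallel' value switches the reader to the pycurl_manager multi_getdata code path
reader = DBS3Reader('https://cmsweb-prod.cern.ch/dbs/prod/global/DBSReader', parallel=True)
# hypothetical dataset path, used only for illustration
fileDetails = reader.listDatasetFileDetails('/Primary/Processed-v1/AOD', getParents=True, getLumis=True)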
114 changes: 21 additions & 93 deletions src/python/WMCore/Services/DBS/DBS3Reader.py
@@ -7,107 +7,30 @@
"""
from __future__ import print_function, division

import os
from builtins import object, str, bytes
from future.utils import viewitems
import concurrent.futures

from Utils.Utilities import decodeBytesToUnicode, encodeUnicodeToBytesConditional

import logging
from collections import defaultdict

from RestClient.ErrorHandling.RestClientExceptions import HTTPError
from dbs.apis.dbsClient import DbsApi, aggFileLumis, aggFileParents
from dbs.apis.dbsClient import DbsApi
from dbs.exceptions.dbsClientException import dbsClientException
from retry import retry

from Utils.IteratorTools import grouper
from Utils.PythonVersion import PY2
from WMCore.Services.DBS.DBSErrors import DBSReaderError, formatEx3
from WMCore.Services.DBS.DBSUtils import dbsListFileParents, dbsListFileLumis, \
dbsParentFilesDetail

# third-party libraries
import requests

### Needed for the pycurl comment, leave it out for now
# from WMCore.Services.pycurl_manager import getdata as multi_getdata


# global DBS URL to be used by stand-alone functions in concurrent calls
# see wrapperBlock functions
DBS_URL = 'https://cmsweb-prod.cern.ch/dbs/prod/global/DBSReader'

def dbsRequest(url):
"""
Wrapper function to perform call to given URL using client's certificates
:param url: url name
:return: results
"""
crt = os.path.join(os.getenv('HOME'), '.globus/usercert.pem')
key = os.path.join(os.getenv('HOME'), '.globus/userkey.pem')
if os.environ.get('X509_USER_PROXY', ''):
crt = os.getenv('X509_USER_PROXY')
key = os.getenv('X509_USER_PROXY')
if not os.path.exists(crt):
raise Exception('Unable to locate user X509 certificate file') from None
if not os.path.exists(key):
raise Exception('Unable to locate user X509 key certificate file') from None
caPath = os.environ.get('X509_CERT_DIR', '/etc/grid-security/certificates')
if not os.path.exists(caPath):
raise Exception('Unable to locate X509_CERT_DIR') from None
res = requests.get(url,
cert=(crt, key),
headers={'Accept-Encoding': 'gzip'},
verify=caPath)
return res.json()

def wrapperBlockFileParents(blk):
"""
Wrapper function to perform DBS listFileParents API call for a given block
:param blk: block name
:return: result of DBS listFileParents API
"""
blk = blk.replace('#', '%23')
url = '{}/fileparents?block_name={}'.format(DBS_URL, blk)
return aggFileParents(dbsRequest(url))

def wrapperBlockFileLumis(blk):
"""
Wrapper function to perform DBS listFileLumis API call for a given block
:param blk: block name
:return: result of DBS listFileLumis API
"""
blk = blk.replace('#', '%23')
url = '{}/filelumis?block_name={}'.format(DBS_URL, blk)
return aggFileLumis(dbsRequest(url))

def runConcurrentProcess(func, blocks, maxWorkers=10):
"""
run concurrently a set of processes for a given function and set of blocks.
Internally, the code is based on Python concurrent futures primitives, see
# https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures
:param func: the function to run
:param blocks: list of blocks our function should process
:param maxWorkers: maximum number of workers to use
"""
with concurrent.futures.ThreadPoolExecutor(maxWorkers) as executor:
# Start the load operations and mark each future with its blk
futures = {executor.submit(func, blk): blk for blk in blocks}
rdict = {}
for future in concurrent.futures.as_completed(futures):
blk = futures[future]
try:
data = future.result()
except Exception as exc:
rdict[blk] = [{'error': str(exc)}]
else:
rdict[blk] = data
return rdict

def remapDBS3Keys(data, stringify=False, **others):
"""Fields have been renamed between DBS2 and 3, take fields from DBS3
and map to DBS2 values
@@ -152,15 +75,14 @@ class DBS3Reader(object):
General API for reading data from DBS
"""

def __init__(self, url, logger=None, **contact):
def __init__(self, url, logger=None, parallel=None, **contact):

# instantiate dbs api object
try:
self.dbsURL = url.replace("cmsweb.cern.ch", "cmsweb-prod.cern.ch")
global DBS_URL
DBS_URL = self.dbsURL
self.dbs = DbsApi(self.dbsURL, **contact)
self.logger = logger or logging.getLogger(self.__class__.__name__)
self.parallel = parallel
except Exception as ex:
msg = "Error in DBSReader with DbsApi\n"
msg += "%s\n" % formatEx3(ex)
@@ -358,7 +280,7 @@ def listDatatiers(self):
"""
return [tier['data_tier_name'] for tier in self.dbs.listDataTiers()]

def listDatasetFileDetails(self, datasetPath, getParents=False, getLumis=True, validFileOnly=1, parallel=0):
def listDatasetFileDetails(self, datasetPath, getParents=False, getLumis=True, validFileOnly=1):
"""
TODO: This is completely wrong and needs to be redone, or be removed - getting the whole dataset
might be too costly
@@ -390,15 +312,17 @@ def listDatasetFileDetails(self, datasetPath, getParents=False, getLumis=True, v
files[f['logical_file_name']]['Parents'] = []

# parallel execution for listFileParents and listFileLumis APIs
if parallel:
if self.parallel:
if getParents:
block_parents = runConcurrentProcess(wrapperBlockFileParents, blocks, parallel)
# block_parents = runConcurrentProcess(wrapperBlockFileParents, blocks, self.parallel)
block_parents = dbsListFileParents(self.dbsURL, blocks, self.logger)
for blockName, parents in block_parents.items():
for p in parents:
if p['logical_file_name'] in files: # invalid files are not there if validFileOnly=1
files[p['logical_file_name']]['Parents'].extend(p['parent_logical_file_name'])
if getLumis:
block_file_lumis = runConcurrentProcess(wrapperBlockFileLumis, blocks, parallel)
# block_file_lumis = runConcurrentProcess(wrapperBlockFileLumis, blocks, self.parallel)
block_file_lumis = dbsListFileLumis(self.dbsURL, blocks, self.logger)
for blockName, file_lumis in block_file_lumis.items():
for f in file_lumis:
if f['logical_file_name'] in files: # invalid files are not there if validFileOnly=1
@@ -641,13 +565,17 @@ def listFilesInBlockWithParents(self, fileBlockName, lumis=True, validFileOnly=1
raise DBSReaderError(msg)

parentFilesDetail = []
# TODO: slicing parentLFNs until the DBS API handles that.
# Remove the slicing once the DBS API handles it
for pLFNs in grouper(parentsLFNs, 50):
parentFilesDetail.extend(self.dbs.listFileArray(logical_file_name=pLFNs, detail=True))
if self.parallel:
detail = True
parentFilesDetail = dbsParentFilesDetail(self.dbsURL, parentsLFNs, self.logger)
else:
# TODO: slicing parentLFNs until the DBS API handles that.
# Remove the slicing once the DBS API handles it
for pLFNs in grouper(parentsLFNs, 50):
parentFilesDetail.extend(self.dbs.listFileArray(logical_file_name=pLFNs, detail=True))

if lumis:
parentLumis = self._getLumiList(lfns=parentsLFNs)
if lumis:
parentLumis = self._getLumiList(lfns=parentsLFNs)

parentsByLFN = defaultdict(list)

176 changes: 176 additions & 0 deletions src/python/WMCore/Services/DBS/DBSUtils.py
@@ -0,0 +1,176 @@
#!/usr/bin/env python
"""
_DBSUtils_
set of common utilities for DBS3Reader
"""
import os
import json
import concurrent.futures

# third-party libraries
import requests

from Utils.CertTools import getKeyCertFromEnv, cert, ckey
from dbs.apis.dbsClient import aggFileLumis, aggFileParents
from WMCore.Services.pycurl_manager import getdata as multi_getdata



######## NOTE: the code below demonstrates how to use Python concurrent.futures
# we decided to keep it around since it may be used as an example
# for other concurrent use-cases
# For logic please see runConcurrentProcess function
# The wrapperXXX functions can be passed to runConcurrentProcess
#
# global DBS URL to be used by stand-alone functions in concurrent calls
# see wrapperBlock functions
# DBS_URL = 'https://cmsweb-prod.cern.ch/dbs/prod/global/DBSReader'

# def dbsRequest(url):
# """
# Wrapper function to perform call to given URL using client's certificates

# :param url: url name
# :return: results
# """
# crt = os.path.join(os.getenv('HOME'), '.globus/usercert.pem')
# key = os.path.join(os.getenv('HOME'), '.globus/userkey.pem')
# if os.environ.get('X509_USER_PROXY', ''):
# crt = os.getenv('X509_USER_PROXY')
# key = os.getenv('X509_USER_PROXY')
# if not os.path.exists(crt):
# raise Exception('Unable to locate user X509 certificate file') from None
# if not os.path.exists(key):
# raise Exception('Unable to locate user X509 key certificate file') from None
# caPath = os.environ.get('X509_CERT_DIR', '/etc/grid-security/certificates')
# if not os.path.exists(caPath):
# raise Exception('Unable to locate X509_CERT_DIR') from None
# res = requests.get(url,
# cert=(crt, key),
# headers={'Accept-Encoding': 'gzip'},
# verify=caPath)
# return res.json()

# def wrapperBlockFileParents(blk):
# """
# Wrapper function to perform DBS listFileParents API call for a given block

# :param blk: block name
# :return: result of DBS listFileParents API
# """
# blk = blk.replace('#', '%23')
# url = '{}/fileparents?block_name={}'.format(DBS_URL, blk)
# return aggFileParents(dbsRequest(url))

# def wrapperBlockFileLumis(blk):
# """
# Wrapper function to perform DBS listFileLumis API call for a given block

# :param blk: block name
# :return: result of DBS listFileLumis API
# """
# blk = blk.replace('#', '%23')
# url = '{}/filelumis?block_name={}'.format(DBS_URL, blk)
# return aggFileLumis(dbsRequest(url))

# def runConcurrentProcess(func, blocks, maxWorkers=10):
# """
# run concurrently a set of processes for a given function and set of blocks.
# Internally, the code is based on Python concurrent futures primitives, see
# # https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures

# :param func: the function to run
# :param blocks: list of blocks our function should process
# :param maxWorkers: maximum number of workers to use
# """
# with concurrent.futures.ThreadPoolExecutor(maxWorkers) as executor:
# futures = {executor.submit(func, blk): blk for blk in blocks}
# rdict = {}
# for future in concurrent.futures.as_completed(futures):
# blk = futures[future]
# try:
# data = future.result()
# except Exception as exc:
# rdict[blk] = [{'error': str(exc)}]
# else:
# rdict[blk] = data
# return rdict


def dbsListFileParents(dbsUrl, blocks, logger):
"""
Concurrent counterpart of the DBS listFileParents API
:param dbsUrl: DBS URL
:param blocks: list of blocks
:param logger: logger
:return: dictionary of aggregated file parents keyed by block name
"""
params = {}
return dbsParallelApi(dbsUrl, 'fileparents', 'block_name', blocks, params, aggFileParents, logger)

def dbsListFileLumis(dbsUrl, blocks, logger=None):
"""
Concurrent counterpart of the DBS listFileLumis API
:param dbsUrl: DBS URL
:param blocks: list of blocks
:param logger: logger
:return: dictionary of aggregated file lumis keyed by block name
"""
params = {}
return dbsParallelApi(dbsUrl, 'filelumis', 'block_name', blocks, params, aggFileLumis, logger)

def dbsParentFilesDetail(dbsUrl, lfns, logger=None):
"""
Concurrent counterpart of the DBS files API
:param dbsUrl: DBS URL
:param lfns: list of parent LFNs
:param logger: logger
:return: dictionary of file details keyed by parent LFN
"""
params = {'detail': True}
func = None
return dbsParallelApi(dbsUrl, 'files', 'logical_file_name', lfns, params, func, logger)

def dbsParallelApi(dbsUrl, api, attr, values, params, aggFunc, logger=None):
"""
Generic parallel DBS call for a given API, attribute and list of values
:param dbsUrl: DBS URL
:param api: API name
:param attr: API attribute
:param values: API attribute values
:param params: optional params dict
:param aggFunc: aggregation function
:param logger: logger
:return: dictionary of results keyed by attribute value
"""
urls = []
for item in values:
if '#' in item:
item = item.replace('#', '%23')
url = '%s/%s?%s=%s' % (dbsUrl, api, attr, item)
for key, val in params.items():
url += '&%s=%s' % (key, val)
urls.append(url)
if logger:
logger.info("Executing %d requests against DBS '%s' API", len(urls), api)
data = multi_getdata(urls, ckey(), cert())

rdict = {}
for row in data:
item = row['url'].rsplit('=')[-1]
code = int(row.get('code', 200))
if code != 200:
msg = "Failure in %s for %s. Error: %s %s" % (api, item,
row.get('code'),
row.get('error'))
raise RuntimeError(msg)
data = row.get('data', [])
res = json.loads(data)
if aggFunc:
rdict[item] = aggFunc(res)
else:
# no aggregation function provided, keep the raw DBS records
rdict[item] = res
return rdict

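For reference, a minimal sketch of calling the new DBSUtils helpers directly, assuming the module is importable as WMCore.Services.DBS.DBSUtils (as the file path suggests) and that valid X509 credentials are discoverable by Utils.CertTools (ckey/cert); the block name is hypothetical:

import logging
from WMCore.Services.DBS.DBSUtils import dbsListFileParents, dbsListFileLumis

logger = logging.getLogger('DBSUtilsExample')
dbsUrl = 'https://cmsweb-prod.cern.ch/dbs/prod/global/DBSReader'
blocks = ['/Primary/Processed-v1/AOD#abc-123']  # hypothetical block name
parents = dbsListFileParents(dbsUrl, blocks, logger)  # dict of aggregated fileparents records per block
lumis = dbsListFileLumis(dbsUrl, blocks, logger)      # dict of aggregated filelumis records per block

Note that the result dictionaries are keyed by the value parsed back from each request URL, i.e. the URL-encoded block name (with '#' replaced by '%23').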