Skip to content

Commit

Permalink
speed up listDatasetFileDetails API
Browse files Browse the repository at this point in the history
  • Loading branch information
vkuznet committed Apr 15, 2022
1 parent eba0a31 commit 2a48633
Showing 1 changed file with 101 additions and 2 deletions.
103 changes: 101 additions & 2 deletions src/python/WMCore/Services/DBS/DBS3Reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,28 +7,107 @@
"""
from __future__ import print_function, division

import os
from builtins import object, str, bytes
from future.utils import viewitems
import concurrent.futures

from Utils.Utilities import decodeBytesToUnicode, encodeUnicodeToBytesConditional

import logging
from collections import defaultdict

from RestClient.ErrorHandling.RestClientExceptions import HTTPError
from dbs.apis.dbsClient import DbsApi
from dbs.apis.dbsClient import DbsApi, aggFileLumis, aggFileParents
from dbs.exceptions.dbsClientException import dbsClientException
from retry import retry

from Utils.IteratorTools import grouper
from Utils.PythonVersion import PY2
from WMCore.Services.DBS.DBSErrors import DBSReaderError, formatEx3

# third-party libraries
import requests

### Needed for the pycurl comment, leave it out for now
# from WMCore.Services.pycurl_manager import getdata as multi_getdata


# global DBS URL to be used by stand-alone functions in concurrent calls
# see wrapperBlock functions (wrapperBlockFileParents and wrapperBlockFileLumis
# read this name when they build their request URLs)
# NOTE(review): the reader's __init__ contains `DBS_URL = self.dbsURL`; a plain
# assignment inside a function binds a local name unless it is preceded by a
# `global DBS_URL` declaration, in which case this module-level value is what
# the wrapper functions keep using -- confirm and fix in __init__ if needed
DBS_URL = 'https://cmsweb-prod.cern.ch/dbs/prod/global/DBSReader'

def dbsRequest(url):
    """
    Wrapper function to perform call to given URL using client's certificates

    The certificate and key default to ``~/.globus/usercert.pem`` and
    ``~/.globus/userkey.pem``. When the ``X509_USER_PROXY`` environment
    variable is set to a non-empty value, that file is used for both.
    The CA location is read from ``X509_CERT_DIR`` and falls back to
    ``/etc/grid-security/certificates``.

    :param url: url name
    :return: results (the JSON-decoded body of the HTTP response)
    :raises Exception: when the certificate, the key or the CA location
        does not exist on the file system
    """
    proxy = os.environ.get('X509_USER_PROXY', '')
    if proxy:
        # a proxy file carries both the certificate and the key
        crt = key = proxy
    else:
        # expanduser() copes with an unset HOME, whereas
        # os.path.join(os.getenv('HOME'), ...) raises TypeError on None
        globusDir = os.path.join(os.path.expanduser('~'), '.globus')
        crt = os.path.join(globusDir, 'usercert.pem')
        key = os.path.join(globusDir, 'userkey.pem')
    # plain raise statements: there is no active exception to suppress here
    # and the `raise ... from None` form is not valid Python 2 syntax
    if not os.path.exists(crt):
        raise Exception('Unable to locate user X509 certificate file')
    if not os.path.exists(key):
        raise Exception('Unable to locate user X509 key certificate file')
    caPath = os.environ.get('X509_CERT_DIR', '/etc/grid-security/certificates')
    if not os.path.exists(caPath):
        raise Exception('Unable to locate X509_CERT_DIR')
    res = requests.get(url,
                       cert=(crt, key),
                       headers={'Accept-Encoding': 'gzip'},
                       verify=caPath)
    return res.json()

def wrapperBlockFileParents(blk):
    """
    Stand-alone helper which asks the DBS server (DBS_URL) for the file
    parents of one block and aggregates the answer
    :param blk: block name
    :return: output of aggFileParents applied to the fileparents response
    """
    # percent-encode the '#' of the block name, otherwise it would be
    # treated as the start of the URL fragment
    encodedBlock = '%23'.join(blk.split('#'))
    endpoint = '{}/fileparents?block_name={}'.format(DBS_URL, encodedBlock)
    response = dbsRequest(endpoint)
    return aggFileParents(response)

def wrapperBlockFileLumis(blk):
    """
    Stand-alone helper which asks the DBS server (DBS_URL) for the file
    lumis of one block and aggregates the answer
    :param blk: block name
    :return: output of aggFileLumis applied to the filelumis response
    """
    # percent-encode the '#' of the block name, otherwise it would be
    # treated as the start of the URL fragment
    encodedBlock = '%23'.join(blk.split('#'))
    endpoint = '{}/filelumis?block_name={}'.format(DBS_URL, encodedBlock)
    response = dbsRequest(endpoint)
    return aggFileLumis(response)

def runConcurrentProcess(func, blocks, maxWorkers=10):
    """
    Apply the given function to every block using a pool of threads, see
    # https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures
    :param func: the function to run, it is called with a single block
    :param blocks: list of blocks our function should process
    :param maxWorkers: maximum number of workers to use
    :return: dictionary keyed by block; the value is whatever func returned
        for that block or, if func raised an exception, a one element list
        of the form [{'error': <exception message>}]
    """
    results = {}
    with concurrent.futures.ThreadPoolExecutor(maxWorkers) as pool:
        # remember which block every submitted job belongs to
        pending = {}
        for name in blocks:
            pending[pool.submit(func, name)] = name
        # collect the outcomes in the order the jobs finish
        for done in concurrent.futures.as_completed(pending):
            name = pending[done]
            try:
                outcome = done.result()
            except Exception as err:
                outcome = [{'error': str(err)}]
            results[name] = outcome
    return results

def remapDBS3Keys(data, stringify=False, **others):
"""Fields have been renamed between DBS2 and 3, take fields from DBS3
and map to DBS2 values
Expand Down Expand Up @@ -78,6 +157,7 @@ def __init__(self, url, logger=None, **contact):
# instantiate dbs api object
try:
self.dbsURL = url.replace("cmsweb.cern.ch", "cmsweb-prod.cern.ch")
DBS_URL = self.dbsURL
self.dbs = DbsApi(self.dbsURL, **contact)
self.logger = logger or logging.getLogger(self.__class__.__name__)
except Exception as ex:
Expand Down Expand Up @@ -277,7 +357,7 @@ def listDatatiers(self):
"""
return [tier['data_tier_name'] for tier in self.dbs.listDataTiers()]

def listDatasetFileDetails(self, datasetPath, getParents=False, getLumis=True, validFileOnly=1):
def listDatasetFileDetails(self, datasetPath, getParents=False, getLumis=True, validFileOnly=1, parallel=0):
"""
TODO: This is completely wrong need to be redone. or be removed - getting dataset altogether
might be to costly
Expand Down Expand Up @@ -308,6 +388,25 @@ def listDatasetFileDetails(self, datasetPath, getParents=False, getLumis=True, v
files[f['logical_file_name']]['Lumis'] = {}
files[f['logical_file_name']]['Parents'] = []

# parallel execution for listFileParents and listFileLumis APIs
if parallel:
if getParents:
block_parents = runConcurrentProcess(wrapperBLockFileParents, blocks, parallel)
for blockName, parents in block_parents.items():
for p in parents:
if p['logical_file_name'] in files: # invalid files are not there if validFileOnly=1
files[p['logical_file_name']]['Parents'].extend(p['parent_logical_file_name'])
if getLumis:
block_file_lumis = runConcurrentProcess(wrapperBlockFileLumis, blocks, parallel)
for blockName, file_lumis in block_file_lumis.items():
for f in file_lumis:
if f['logical_file_name'] in files: # invalid files are not there if validFileOnly=1
if f['run_num'] in files[f['logical_file_name']]['Lumis']:
files[f['logical_file_name']]['Lumis'][f['run_num']].extend(f['lumi_section_num'])
else:
files[f['logical_file_name']]['Lumis'][f['run_num']] = f['lumi_section_num']
return files

# Iterate over the blocks and get parents and lumis
for blockName in blocks:
# get the parents
Expand Down

0 comments on commit 2a48633

Please sign in to comment.