Skip to content

Commit

Permalink
Merge pull request #12197 from amaltaro/fix-12195
Browse files Browse the repository at this point in the history
Adopt MSPileup data into PileupFetcher
  • Loading branch information
amaltaro authored Dec 10, 2024
2 parents beefc74 + 45eaf42 commit 53491a6
Show file tree
Hide file tree
Showing 9 changed files with 405 additions and 48 deletions.
13 changes: 12 additions & 1 deletion src/python/Utils/Patterns.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""
Patterns module provides a set of CS patterns
"""

import re

class Singleton(type):
"""Implementation of Singleton class"""
Expand All @@ -11,3 +11,14 @@ def __call__(cls, *args, **kwargs):
cls._instances[cls] = \
super(Singleton, cls).__call__(*args, **kwargs)
return cls._instances[cls]


def getDomainName(urlStr):
    """
    Extract the host prefix that precedes ".cern.ch" in a URL string.

    :param urlStr: URL string starting with an http:// or https:// scheme
    :return: a string with the domain name (e.g. "cmsweb-prod"), or an
        empty string when the URL does not match the expected pattern
    """
    # the capture group holds everything between the scheme and ".cern.ch"
    found = re.search(r'https?://([^/]+)\.cern\.ch', urlStr)
    if found is None:
        return ""
    return found.group(1)
72 changes: 55 additions & 17 deletions src/python/WMCore/WMSpec/Steps/Fetchers/PileupFetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,6 @@
of pileup files in the job sandbox for the dataset.
"""
from __future__ import print_function

from future.utils import viewitems

import datetime
import os
import hashlib
Expand All @@ -15,8 +11,10 @@
import logging
from json import JSONEncoder
import WMCore.WMSpec.WMStep as WMStep
from Utils.Patterns import getDomainName
from Utils.Utilities import encodeUnicodeToBytes
from WMCore.Services.DBS.DBSReader import DBSReader
from WMCore.Services.MSPileup.MSPileupUtils import getPileupDocs
from WMCore.Services.Rucio.Rucio import Rucio
from WMCore.WMSpec.Steps.Fetchers.FetcherInterface import FetcherInterface

Expand All @@ -34,7 +32,6 @@ def __init__(self):
Prepare module setup
"""
super(PileupFetcher, self).__init__()
# FIXME: find a way to pass the Rucio account name to this fetcher module
self.rucioAcct = "wmcore_pileup"
self.rucio = None

Expand All @@ -52,41 +49,81 @@ def _queryDbsAndGetPileupConfig(self, stepHelper, dbsReader):
"BlockB": {"FileList": [], "PhEDExNodeName": []}, ....}
"""
resultDict = {}
# first, figure out which instance of MSPileup and Rucio to use
pileupInstance = getDomainName(dbsReader.dbsURL)
msPileupUrl = f"https://{pileupInstance}.cern.ch/ms-pileup/data/pileup"
# FIXME: this juggling with Rucio is tough! We can get away without it,
# but for that we would have to use testbed MSPileup against Prod Rucio
if pileupInstance == "cmsweb-prod" or pileupInstance == "cmsweb":
rucioAuthUrl, rucioUrl = "cms-rucio-auth", "cms-rucio"
else:
rucioAuthUrl, rucioUrl = "cms-rucio-auth-int", "cms-rucio-int"
# initialize Rucio here to avoid this authentication on T0-WMAgent
self.rucio = Rucio(self.rucioAcct,
authUrl=f"https://{rucioAuthUrl}.cern.ch",
hostUrl=f"http://{rucioUrl}.cern.ch")

# iterate over input pileup types (e.g. "cosmics", "minbias")
for pileupType in stepHelper.data.pileup.listSections_():
# the format here is: step.data.pileup.cosmics.dataset = [/some/data/set]
datasets = getattr(getattr(stepHelper.data.pileup, pileupType), "dataset")
# each dataset input can generally be a list, iterate over dataset names
blockDict = {}
for dataset in datasets:

# using the original dataset, resolve blocks, files and number of events with DBS
fCounter = 0
for fileInfo in dbsReader.getFileListByDataset(dataset=dataset, detail=True):
blockDict.setdefault(fileInfo['block_name'], {'FileList': [],
'NumberOfEvents': 0,
'PhEDExNodeNames': []})
blockDict[fileInfo['block_name']]['FileList'].append(fileInfo['logical_file_name'])
blockDict[fileInfo['block_name']]['NumberOfEvents'] += fileInfo['event_count']
fCounter += 1

self._getDatasetLocation(dataset, blockDict)
logging.info(f"Found {len(blockDict)} blocks in DBS for dataset {dataset} with {fCounter} files")
self._getDatasetLocation(dataset, blockDict, msPileupUrl)

resultDict[pileupType] = blockDict
return resultDict

def _getDatasetLocation(self, dset, blockDict):
def _getDatasetLocation(self, dset, blockDict, msPileupUrl):
"""
Given a dataset name, query MSPileup and Rucio and resolve the block location
:param dset: string with the dataset name
:param blockDict: dictionary with DBS summary info
:param msPileupUrl: string with the MSPileup url
:return: update blockDict in place
"""
# initialize Rucio here to avoid this authentication on T0-WMAgent
self.rucio = Rucio(self.rucioAcct)
blockReplicas = self.rucio.getPileupLockedAndAvailable(dset, account=self.rucioAcct)
for blockName, blockLocation in viewitems(blockReplicas):
try:
blockDict[blockName]['PhEDExNodeNames'] = list(blockLocation)
except KeyError:
logging.warning("Block '%s' present in Rucio but not in DBS", blockName)
# fetch the pileup configuration from MSPileup
try:
queryDict = {'query': {'pileupName': dset},
'filters': ['pileupName', 'customName', 'containerFraction', 'currentRSEs']}
doc = getPileupDocs(msPileupUrl, queryDict, method='POST')[0]
msg = f'Pileup dataset {doc["pileupName"]} with:\n\tcustom name: {doc["customName"]},'
msg += f'\n\tcurrent RSEs: {doc["currentRSEs"]}\n\tand container fraction: {doc["containerFraction"]}'
logging.info(msg)
except Exception as ex:
logging.error(f'Error querying MSPileup for dataset {dset}. Details: {str(ex)}')
raise ex

# custom dataset name means there was a container fraction change, use different scope
puScope = 'cms'
if doc["customName"]:
dset = doc["customName"]
puScope = 'group.wmcore'

blockReplicas = self.rucio.getBlocksInContainer(container=dset, scope=puScope)
logging.info(f"Found {len(blockReplicas)} blocks in container {dset} for scope {puScope}")

# Finally, update blocks present in Rucio with the MSPileup currentRSEs.
# Blocks not present in Rucio - hence only in DBS - are meant to be removed.
for blockName in list(blockDict):
if blockName not in blockReplicas:
logging.warning(f"Block {blockName} present in DBS but not in Rucio. Removing it.")
blockDict.pop(blockName)
else:
blockDict[blockName]['PhEDExNodeNames'] = doc["currentRSEs"]
logging.info(f"Final pileup dataset {dset} has a total of {len(blockDict)} blocks.")

def _getCacheFilePath(self, stepHelper):

Expand Down Expand Up @@ -171,7 +208,8 @@ def createPileupConfigFile(self, helper):
"""
Stores pileup JSON configuration file in the working
directory / sandbox.
:param helper: WMStepHelper instance
:return: None
"""
if self._isCacheValid(helper):
# if the file already exists, don't make a new DBS call and overwrite the file.
Expand Down
1 change: 1 addition & 0 deletions src/python/WMQuality/Emulators/EmulatedUnitTestCase.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ def setUp(self):
patchMSPileupAt = ['WMCore.MicroService.MSTransferor.MSTransferor.getPileupDocs',
'WMCore.WorkQueue.Policy.Start.StartPolicyInterface.getPileupDocs',
'WMCore.WorkQueue.DataLocationMapper.getPileupDocs',
'WMCore.WMSpec.Steps.Fetchers.PileupFetcher.getPileupDocs',
'WMComponent.WorkflowUpdater.WorkflowUpdaterPoller.getPileupDocs',
'WMCore_t.Services_t.MSPileup_t.MSPileupUtils_t.getPileupDocs']
for module in patchMSPileupAt:
Expand Down
12 changes: 7 additions & 5 deletions src/python/WMQuality/Emulators/MSPileup/MockMSPileupAPI.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,16 @@ def getPileupDocs(mspileupUrl, queryDict=None, method='GET'):
"""
Returns list of Pileup Documents.
"""
print(f"Mocking MSPileup getPileupDocs: \
url: {mspileupUrl}, query: {queryDict}, method: {method}")
print(f"Mocking MSPileup getPileupDocs: url: {mspileupUrl}, query: {queryDict}, method: {method}")

queryDict = queryDict or {}

if 'pileupName' in queryDict.get("query", {}):
pileupName = queryDict['query']['pileupName']
else:
pileupName = "/GammaGammaToEE_Elastic_Pt15_8TeV-lpair/Summer12-START53_V7C-v1/GEN-SIM"
print(f"pileupName: {pileupName}")
pdict = {
# "pileupName": "/MinBias_TuneCP5_14TeV-pythia8/PhaseIITDRSpring19GS-106X_upgrade2023_realistic_v2_ext1-v1/GEN-SIM",
"pileupName": "/GammaGammaToEE_Elastic_Pt15_8TeV-lpair/Summer12-START53_V7C-v1/GEN-SIM",
"pileupName": pileupName,
"pileupType": "classic",
"insertTime": 1680873642,
"lastUpdateTime": 1706216047,
Expand Down
12 changes: 5 additions & 7 deletions src/python/WMQuality/Emulators/RucioClient/MakeRucioMockFile.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,20 @@
"""
Script to produce a json file to mock Rucio data
"""

from __future__ import (division, print_function)

import json
import os
import sys

from future.utils import viewitems

from WMCore.Services.Rucio.Rucio import Rucio
from WMCore.WMBase import getTestBase


### Here goes a list of data that we want to fetch from Rucio and
### persist in our json file to mock those calls/data
CONTAINERS = [u"/MinimumBias/ComissioningHI-v1/RAW",
u"/Cosmics/ComissioningHI-PromptReco-v1/RECO"]
CONTAINERS = ["/MinimumBias/ComissioningHI-v1/RAW",
"/Cosmics/ComissioningHI-PromptReco-v1/RECO",
"/GammaGammaToEE_Elastic_Pt15_8TeV-lpair/Summer12-START53_V7C-v1/GEN-SIM"]
BLOCKS = []

### The output file which will contain all the Rucio mock data
Expand All @@ -45,6 +42,7 @@ def main():
for container in CONTAINERS:
print("Building call list for container: {}".format(container))
calls.append(['getBlocksInContainer', {'container': container}])
calls.append(['getBlocksInContainer', {'container': container, 'scope': SCOPE}])
calls.append(['isContainer', {'didName': container}])
calls.append(['getDID', {'didName': container, 'dynamic': False}])
calls.append(['didExist', {'didName': container}])
Expand All @@ -66,7 +64,7 @@ def main():
for call in calls:
func = getattr(rucio, call[0])
if len(call) > 1:
signature = '%s:%s' % (call[0], sorted(viewitems(call[1])))
signature = '%s:%s' % (call[0], sorted(call[1].items()))
result = func(**call[1])
else:
result = func()
Expand Down
9 changes: 0 additions & 9 deletions src/python/WMQuality/Emulators/RucioClient/MockRucioApi.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,15 +106,6 @@ def genericLookup(*args, **kwargs):

return genericLookup

def getBlocksInContainer(self, container, scope='cms'):
    """
    Emulate the lookup of block names for a given container.

    :param container: string with the container name
    :param scope: string with the scope; accepted but not used by this mock
    :return: list with two block names, built by appending fixed
        '#123' and '#456' suffixes to the container name
    """
    mockedBlocks = [container + suffix for suffix in ('#123', '#456')]
    logging.info("%s getBlocksInContainer %s", self.__class__.__name__, mockedBlocks)
    return mockedBlocks

def listDataRules(self, name, **kwargs):
"""
Emulate listDataRules Rucio API
Expand Down
Loading

0 comments on commit 53491a6

Please sign in to comment.