Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adopt MSPileup data into PileupFetcher #12197

Merged
merged 4 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion src/python/Utils/Patterns.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""
Patterns module provides set of CS patterns
"""

import re

class Singleton(type):
"""Implementation of Singleton class"""
Expand All @@ -11,3 +11,14 @@ def __call__(cls, *args, **kwargs):
cls._instances[cls] = \
super(Singleton, cls).__call__(*args, **kwargs)
return cls._instances[cls]


def getDomainName(urlStr):
"""
Given a URL string, return the domain name.
amaltaro marked this conversation as resolved.
Show resolved Hide resolved
:param urlStr: URL string
:return: a string with the domain name (e.g. "cmsweb-prod")
"""
domainPattern = re.compile(r'https?://([^/]+)\.cern\.ch')
match = domainPattern.search(urlStr)
return match.group(1) if match else ""
72 changes: 55 additions & 17 deletions src/python/WMCore/WMSpec/Steps/Fetchers/PileupFetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,6 @@
of pileup files in the job sandbox for the dataset.

"""
from __future__ import print_function

from future.utils import viewitems

import datetime
import os
import hashlib
Expand All @@ -15,8 +11,10 @@
import logging
from json import JSONEncoder
import WMCore.WMSpec.WMStep as WMStep
from Utils.Patterns import getDomainName
from Utils.Utilities import encodeUnicodeToBytes
from WMCore.Services.DBS.DBSReader import DBSReader
from WMCore.Services.MSPileup.MSPileupUtils import getPileupDocs
from WMCore.Services.Rucio.Rucio import Rucio
from WMCore.WMSpec.Steps.Fetchers.FetcherInterface import FetcherInterface

Expand All @@ -34,7 +32,6 @@ def __init__(self):
Prepare module setup
"""
super(PileupFetcher, self).__init__()
# FIXME: find a way to pass the Rucio account name to this fetcher module
self.rucioAcct = "wmcore_pileup"
self.rucio = None

Expand All @@ -52,41 +49,81 @@ def _queryDbsAndGetPileupConfig(self, stepHelper, dbsReader):
"BlockB": {"FileList": [], "PhEDExNodeName": []}, ....}
"""
resultDict = {}
# first, figure out which instance of MSPileup and Rucio to use
pileupInstance = getDomainName(dbsReader.dbsURL)
msPileupUrl = f"https://{pileupInstance}.cern.ch/ms-pileup/data/pileup"
# FIXME: this juggling with Rucio is tough! We can get away without it,
# but for that we would have to use testbed MSPileup against Prod Rucio
if pileupInstance == "cmsweb-prod" or pileupInstance == "cmsweb":
rucioAuthUrl, rucioUrl = "cms-rucio-auth", "cms-rucio"
else:
rucioAuthUrl, rucioUrl = "cms-rucio-auth-int", "cms-rucio-int"
# initialize Rucio here to avoid this authentication on T0-WMAgent
self.rucio = Rucio(self.rucioAcct,
authUrl=f"https://{rucioAuthUrl}.cern.ch",
hostUrl=f"http://{rucioUrl}.cern.ch")

# iterate over input pileup types (e.g. "cosmics", "minbias")
for pileupType in stepHelper.data.pileup.listSections_():
# the format here is: step.data.pileup.cosmics.dataset = [/some/data/set]
datasets = getattr(getattr(stepHelper.data.pileup, pileupType), "dataset")
# each dataset input can generally be a list, iterate over dataset names
blockDict = {}
for dataset in datasets:

# using the original dataset, resolve blocks, files and number of events with DBS
fCounter = 0
for fileInfo in dbsReader.getFileListByDataset(dataset=dataset, detail=True):
blockDict.setdefault(fileInfo['block_name'], {'FileList': [],
'NumberOfEvents': 0,
'PhEDExNodeNames': []})
blockDict[fileInfo['block_name']]['FileList'].append(fileInfo['logical_file_name'])
blockDict[fileInfo['block_name']]['NumberOfEvents'] += fileInfo['event_count']
fCounter += 1

self._getDatasetLocation(dataset, blockDict)
logging.info(f"Found {len(blockDict)} blocks in DBS for dataset {dataset} with {fCounter} files")
self._getDatasetLocation(dataset, blockDict, msPileupUrl)

resultDict[pileupType] = blockDict
return resultDict

def _getDatasetLocation(self, dset, blockDict):
def _getDatasetLocation(self, dset, blockDict, msPileupUrl):
"""
Given a dataset name, query PhEDEx or Rucio and resolve the block location
:param dset: string with the dataset name
:param blockDict: dictionary with DBS summary info
:param msPileupUrl: string with the MSPileup url
:return: update blockDict in place
"""
# initialize Rucio here to avoid this authentication on T0-WMAgent
self.rucio = Rucio(self.rucioAcct)
blockReplicas = self.rucio.getPileupLockedAndAvailable(dset, account=self.rucioAcct)
for blockName, blockLocation in viewitems(blockReplicas):
try:
blockDict[blockName]['PhEDExNodeNames'] = list(blockLocation)
except KeyError:
logging.warning("Block '%s' present in Rucio but not in DBS", blockName)
# fetch the pileup configuration from MSPileup
try:
queryDict = {'query': {'pileupName': dset},
'filters': ['pileupName', 'customName', 'containerFraction', 'currentRSEs']}
doc = getPileupDocs(msPileupUrl, queryDict, method='POST')[0]
msg = f'Pileup dataset {doc["pileupName"]} with:\n\tcustom name: {doc["customName"]},'
msg += f'\n\tcurrent RSEs: {doc["currentRSEs"]}\n\tand container fraction: {doc["containerFraction"]}'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@amaltaro do we need any translation like this one for currentRSEs in this case? Probably not, since your tests are successful, but better to document it here with this comment, for future reference

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is a relevant question!
We do not need to perform any mapping here because MSPileup (and Rucio) are already returning a PhEDEx Node Name (aka RSE).

And what is expected during runtime is the PNN, as it is loaded from the Site Local Config:
https://github.com/dmwm/WMCore/blob/master/src/python/WMCore/WMRuntime/Scripts/SetupCMSSWPset.py#L378

So we are good.

logging.info(msg)
except Exception as ex:
logging.error(f'Error querying MSPileup for dataset {dset}. Details: {str(ex)}')
raise ex

# custom dataset name means there was a container fraction change, use different scope
puScope = 'cms'
if doc["customName"]:
dset = doc["customName"]
puScope = 'group.wmcore'

blockReplicas = self.rucio.getBlocksInContainer(container=dset, scope=puScope)
logging.info(f"Found {len(blockReplicas)} blocks in container {dset} for scope {puScope}")

# Finally, update blocks present in Rucio with the MSPileup currentRSEs.
# Blocks not present in Rucio - hence only in DBS - are meant to be removed.
for blockName in list(blockDict):
if blockName not in blockReplicas:
logging.warning(f"Block {blockName} present in DBS but not in Rucio. Removing it.")
blockDict.pop(blockName)
else:
blockDict[blockName]['PhEDExNodeNames'] = doc["currentRSEs"]
logging.info(f"Final pileup dataset {dset} has a total of {len(blockDict)} blocks.")

def _getCacheFilePath(self, stepHelper):

Expand Down Expand Up @@ -171,7 +208,8 @@ def createPileupConfigFile(self, helper):
"""
Stores pileup JSON configuration file in the working
directory / sandbox.

:param helper: WMStepHelper instance
:return: None
"""
if self._isCacheValid(helper):
# if file already exist don't make a new dbs call and overwrite the file.
Expand Down
1 change: 1 addition & 0 deletions src/python/WMQuality/Emulators/EmulatedUnitTestCase.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ def setUp(self):
patchMSPileupAt = ['WMCore.MicroService.MSTransferor.MSTransferor.getPileupDocs',
'WMCore.WorkQueue.Policy.Start.StartPolicyInterface.getPileupDocs',
'WMCore.WorkQueue.DataLocationMapper.getPileupDocs',
'WMCore.WMSpec.Steps.Fetchers.PileupFetcher.getPileupDocs',
'WMComponent.WorkflowUpdater.WorkflowUpdaterPoller.getPileupDocs',
'WMCore_t.Services_t.MSPileup_t.MSPileupUtils_t.getPileupDocs']
for module in patchMSPileupAt:
Expand Down
12 changes: 7 additions & 5 deletions src/python/WMQuality/Emulators/MSPileup/MockMSPileupAPI.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,16 @@ def getPileupDocs(mspileupUrl, queryDict=None, method='GET'):
"""
Returns list of Pileup Documents.
"""
print(f"Mocking MSPileup getPileupDocs: \
url: {mspileupUrl}, query: {queryDict}, method: {method}")
print(f"Mocking MSPileup getPileupDocs: url: {mspileupUrl}, query: {queryDict}, method: {method}")

queryDict = queryDict or {}

if 'pileupName' in queryDict.get("query", {}):
pileupName = queryDict['query']['pileupName']
else:
pileupName = "/GammaGammaToEE_Elastic_Pt15_8TeV-lpair/Summer12-START53_V7C-v1/GEN-SIM"
print(f"pileupName: {pileupName}")
pdict = {
# "pileupName": "/MinBias_TuneCP5_14TeV-pythia8/PhaseIITDRSpring19GS-106X_upgrade2023_realistic_v2_ext1-v1/GEN-SIM",
"pileupName": "/GammaGammaToEE_Elastic_Pt15_8TeV-lpair/Summer12-START53_V7C-v1/GEN-SIM",
"pileupName": pileupName,
"pileupType": "classic",
"insertTime": 1680873642,
"lastUpdateTime": 1706216047,
Expand Down
12 changes: 5 additions & 7 deletions src/python/WMQuality/Emulators/RucioClient/MakeRucioMockFile.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,20 @@
"""
Script to produce a json file to mock Rucio data
"""

from __future__ import (division, print_function)

import json
import os
import sys

from future.utils import viewitems

from WMCore.Services.Rucio.Rucio import Rucio
from WMCore.WMBase import getTestBase


### Here goes a list of data that we want to fetch from Rucio and
### persist in our json file to mock those calls/data
CONTAINERS = [u"/MinimumBias/ComissioningHI-v1/RAW",
u"/Cosmics/ComissioningHI-PromptReco-v1/RECO"]
CONTAINERS = ["/MinimumBias/ComissioningHI-v1/RAW",
"/Cosmics/ComissioningHI-PromptReco-v1/RECO",
"/GammaGammaToEE_Elastic_Pt15_8TeV-lpair/Summer12-START53_V7C-v1/GEN-SIM"]
BLOCKS = []

### The output file which will contain all the Rucio mock data
Expand All @@ -45,6 +42,7 @@ def main():
for container in CONTAINERS:
print("Building call list for container: {}".format(container))
calls.append(['getBlocksInContainer', {'container': container}])
calls.append(['getBlocksInContainer', {'container': container, 'scope': SCOPE}])
calls.append(['isContainer', {'didName': container}])
calls.append(['getDID', {'didName': container, 'dynamic': False}])
calls.append(['didExist', {'didName': container}])
Expand All @@ -66,7 +64,7 @@ def main():
for call in calls:
func = getattr(rucio, call[0])
if len(call) > 1:
signature = '%s:%s' % (call[0], sorted(viewitems(call[1])))
signature = '%s:%s' % (call[0], sorted(call[1].items()))
result = func(**call[1])
else:
result = func()
Expand Down
9 changes: 0 additions & 9 deletions src/python/WMQuality/Emulators/RucioClient/MockRucioApi.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,15 +106,6 @@ def genericLookup(*args, **kwargs):

return genericLookup

def getBlocksInContainer(self, container, scope='cms'):
"""
Returns list of block names for given container
"""
cname = self.__class__.__name__
blockNames = [container + '#123', container + '#456']
logging.info("%s getBlocksInContainer %s", cname, blockNames)
return blockNames

def listDataRules(self, name, **kwargs):
"""
Emulate listDataRules Rucio API
Expand Down
Loading