Commit

Combines block records in single dict that includes all associated wfs (#11245)

* Combines block records in single dict that includes all associated wfs

* Remove complete workflows check

* Add all sites to single block

* Documentation and styling fixes
germanfgv authored Aug 17, 2022
1 parent 0710cb9 commit d813d61
Showing 2 changed files with 49 additions and 34 deletions.
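
To make the change concrete, here is a minimal sketch of the before/after result shape, using values taken from the docstring example further down (the records themselves are only illustrative):

# Before this commit: one record per (block, workflow) database row, returned
# as a list of dicts, so the same block could appear several times.
old_result = [
    {'blockName': '/Tau/Run2022C-PromptReco-v1/MINIAOD#2dd5a82b-873a-4403-8da1-6b943dac7081',
     'workflowName': 'PromptReco_Run356614_Tau',
     'location': 'T0_CH_CERN_Disk',
     'site': {'T1_ES_PIC_Disk'},
     'dataset': '/Tau/Run2022C-PromptReco-v1/MINIAOD'}]

# After this commit: one record per block, keyed by block name, with all
# associated workflows and sites aggregated into sets.
new_result = {
    '/Tau/Run2022C-PromptReco-v1/MINIAOD#2dd5a82b-873a-4403-8da1-6b943dac7081': {
        'blockName': '/Tau/Run2022C-PromptReco-v1/MINIAOD#2dd5a82b-873a-4403-8da1-6b943dac7081',
        'blockCreateTime': 1659675842,
        'dataset': '/Tau/Run2022C-PromptReco-v1/MINIAOD',
        'location': 'T0_CH_CERN_Disk',
        'sites': {'T1_ES_PIC_Disk', 'T1_ES_PIC_MSS'},
        'workflowNames': {'PromptReco_Run356614_Tau'}}}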
@@ -49,7 +49,6 @@ class GetCompletedBlocks(DBFormatter):
dbsbuffer_dataset_subscription.site,
dbsbuffer_workflow.name,
dbsbuffer_block.create_time
HAVING COUNT(*) = SUM(dbsbuffer_workflow.completed)
"""

def format(self, result):
@@ -59,48 +58,63 @@ def format(self, result):
Format the query results into the proper dictionary expected at the upper layer Python code.
The input should be a list of database objects each representing a line returned from the database
with key names matching the column names from the sql query
The result should be a list of dictionaries one record per line returned from the database
with key names mapped to the python code expected structures.
The result should be a list of dictionaries one record per block returned from the database
with key names mapped to the python code expected structures. All workflows and sites are aggregated
into the same block record.
e.g.
[{'blockName': '/Cosmics/Tier0_REPLAY_2022-v425/RAW#aa3dbb67-dab0-4671-a550-711bdf9f4b08',
'workflowName': 'Repack_Run351572_StreamPhysics_Tier0_REPLAY_2022_ID220513161632_v425_220514_2038',
'location': 'T0_CH_CERN_Disk',
'site': {'T0_CH_CERN_Disk'},
'dataset': '/Cosmics/Tier0_REPLAY_2022-v425/RAW'},
{'blockName': '/NoBPTX/Tier0_REPLAY_2022-v425/RAW#6b8e95c3-656b-4302-8265-275df1195f4c',
'workflowName': 'Repack_Run351572_StreamPhysics_Tier0_REPLAY_2022_ID220513161632_v425_220514_2038',
'location': 'T0_CH_CERN_Disk',
'site': {'T0_CH_CERN_Disk'},
'dataset': '/NoBPTX/Tier0_REPLAY_2022-v425/RAW'}]
{ '/Tau/Run2022C-PromptReco-v1/MINIAOD#2dd5a82b-873a-4403-8da1-6b943dac7081': {'blockCreateTime': 1659675842,
'blockName': '/Tau/Run2022C-PromptReco-v1/MINIAOD#2dd5a82b-873a-4403-8da1-6b943dac7081',
'dataset': '/Tau/Run2022C-PromptReco-v1/MINIAOD',
'location': 'T0_CH_CERN_Disk',
'sites': {'T1_ES_PIC_Disk',
'T1_ES_PIC_MSS'},
'workflowNames': {'PromptReco_Run356614_Tau'}},
'/Tau/Run2022C-PromptReco-v1/MINIAOD#f6bf5cc7-cab2-4572-8f30-574296bb109d': {'blockCreateTime': 1659723755,
'blockName': '/Tau/Run2022C-PromptReco-v1/MINIAOD#f6bf5cc7-cab2-4572-8f30-574296bb109d',
'dataset': '/Tau/Run2022C-PromptReco-v1/MINIAOD',
'location': 'T0_CH_CERN_Disk',
'sites': {'T1_ES_PIC_Disk',
'T1_ES_PIC_MSS'},
'workflowNames': {'PromptReco_Run356615_Tau',
'PromptReco_Run356619_Tau'}}
}
NOTE:
* location: Means where the output block has been created
* site(s): Means where the dataset gets a container-level rule
:param result: The result as returned by the mysql query execution.
:return: List of dictionaries
:return: Dictionary of dictionaries, each one describing a block.
"""

# NOTE: We need to rename all the keys to follow the camelCase standard. And also to comply
# with the key names as expected from the rest of the already existing python code
keyMap = {'blockname': 'blockName',
'name': 'workflowName',
'name': 'workflowNames',
'pnn': 'location',
'site': 'sites',
'path': 'dataset',
'create_time': 'blockCreateTime'}

dictResults = DBFormatter.formatDict(self, result)
for record in dictResults:
for dbKey, pyKey in keyMap.items():
if dbKey == 'site':
sites = record.pop(dbKey)
record[pyKey] = set()
record[pyKey].add(sites)
else:
record[pyKey] = record.pop(dbKey)
listResults = DBFormatter.formatDict(self, result)
dictResults = {}
for record in listResults:
# Populates results dict and adds all workflows and sites of the same block to a single record
blockName = record['blockname']
if blockName in dictResults:
dictResults[blockName]['workflowNames'].add(record['name'])
dictResults[blockName]['sites'].add(record['site'])
else:
for dbKey, pyKey in keyMap.items():
if dbKey == 'site' or dbKey == 'name':
data = record.pop(dbKey)
record[pyKey] = set()
record[pyKey].add(data)
else:
record[pyKey] = record.pop(dbKey)
dictResults[blockName] = record

return dictResults

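The aggregation pattern introduced in format() can be exercised on its own. Below is a self-contained sketch, where the input rows imitate what DBFormatter.formatDict() returns (lowercase column names); the row values themselves are hypothetical:

KEY_MAP = {'blockname': 'blockName',
           'name': 'workflowNames',
           'pnn': 'location',
           'site': 'sites',
           'path': 'dataset',
           'create_time': 'blockCreateTime'}

def aggregateByBlock(rows):
    """Group per-(block, workflow, site) rows into one record per block."""
    results = {}
    for record in rows:
        blockName = record['blockname']
        if blockName in results:
            # Block already seen: only accumulate the workflow and the site
            results[blockName]['workflowNames'].add(record['name'])
            results[blockName]['sites'].add(record['site'])
        else:
            # First occurrence: rename the keys and wrap the multi-valued
            # columns in sets, so that later rows can be merged in
            for dbKey, pyKey in KEY_MAP.items():
                if dbKey in ('site', 'name'):
                    record[pyKey] = {record.pop(dbKey)}
                else:
                    record[pyKey] = record.pop(dbKey)
            results[blockName] = record
    return results

rows = [{'blockname': '/Prim/Proc-v1/TIER#block1', 'name': 'wf_run1',
         'pnn': 'T0_CH_CERN_Disk', 'site': 'T1_ES_PIC_Disk',
         'path': '/Prim/Proc-v1/TIER', 'create_time': 1659723755},
        {'blockname': '/Prim/Proc-v1/TIER#block1', 'name': 'wf_run2',
         'pnn': 'T0_CH_CERN_Disk', 'site': 'T1_ES_PIC_MSS',
         'path': '/Prim/Proc-v1/TIER', 'create_time': 1659723755}]

blocks = aggregateByBlock(rows)
assert blocks['/Prim/Proc-v1/TIER#block1']['workflowNames'] == {'wf_run1', 'wf_run2'}
assert blocks['/Prim/Proc-v1/TIER#block1']['sites'] == {'T1_ES_PIC_Disk', 'T1_ES_PIC_MSS'}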
21 changes: 11 additions & 10 deletions src/python/WMComponent/RucioInjector/RucioInjectorPoller.py
@@ -362,16 +362,16 @@ def deleteBlocks(self):

# Get list of blocks that can be deleted
# blockDict = self.findDeletableBlocks.execute(transaction=False)
completedBlocksList = self.getCompletedBlocks.execute(transaction=False)
completedBlocksDict = self.getCompletedBlocks.execute(transaction=False)

if not completedBlocksList:
if not completedBlocksDict:
logging.info("No candidate blocks found for rule deletion.")
return

logging.info("Found %d completed blocks", len(completedBlocksList))
logging.debug("Full completedBlocksList: %s", pformat(completedBlocksList))
logging.info("Found %d completed blocks", len(completedBlocksDict))
logging.debug("Full completedBlocksDict: %s", pformat(completedBlocksDict))

deletableWfsDict = self.getDeletableWorkflows.execute()
deletableWfsDict = set(self.getDeletableWorkflows.execute())

if not deletableWfsDict:
logging.info("No workflow chains (Parent + child workflows) in fully terminal state found. Skipping block level rule deletion in the current run.")
@@ -382,23 +382,24 @@

now = int(time.time())
blockDict = {}
for block in completedBlocksList:
if block['workflowName'] in deletableWfsDict and \
for block in completedBlocksDict.values():
if block['workflowNames'].issubset(deletableWfsDict) and \
now - block['blockCreateTime'] > self.blockDeletionDelaySeconds:
blockDict[block['blockName']] = block

logging.info("Found %d final candidate blocks for rule deletion", len(blockDict))
logging.debug("Final deletable blocks dict: %s", pformat(blockDict))

for blocksSlice in grouper(blockDict, self.delBlockSlicesize):
logging.info("Handeling %d candidate blocks", len(blocksSlice))
logging.info("Handling %d candidate blocks", len(blocksSlice))
containerDict = {}
# Populate containerDict, assigning each block to its correspondant container
# Populate containerDict, assigning each block to its correspondent container
for blockName in blocksSlice:
container = blockDict[blockName]['dataset']
# If the container is not in the dictionary, create a new entry for it
if container not in containerDict:
# Set of sites to which the container needs to be transferred
# All blocks belonging to a container need to be sent to the same sites, so we simply take the sites list
# from the current block to determine the container's required final RSEs.
sites = set(x.replace("_MSS", "_Tape") for x in blockDict[blockName]['sites'])
containerDict[container] = {'blocks': [], 'rse': sites}
containerDict[container]['blocks'].append(blockName)
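Putting the two pieces together, the selection that deleteBlocks() now performs can be summarised with the following sketch; the workflow names, block names and delay value are invented for illustration, while the issubset() check and the _MSS to _Tape translation mirror the diff above:

import time

blockDeletionDelaySeconds = 3600
deletableWfs = {'wf_run1', 'wf_run2'}

completedBlocks = {
    '/Prim/Proc-v1/TIER#block1': {
        'blockName': '/Prim/Proc-v1/TIER#block1',
        'dataset': '/Prim/Proc-v1/TIER',
        'blockCreateTime': int(time.time()) - 7200,
        'sites': {'T1_ES_PIC_Disk', 'T1_ES_PIC_MSS'},
        'workflowNames': {'wf_run1', 'wf_run2'}},
    '/Prim/Proc-v1/TIER#block2': {
        'blockName': '/Prim/Proc-v1/TIER#block2',
        'dataset': '/Prim/Proc-v1/TIER',
        'blockCreateTime': int(time.time()) - 7200,
        'sites': {'T1_ES_PIC_Disk'},
        # wf_run3 is still active, so this block must be kept
        'workflowNames': {'wf_run1', 'wf_run3'}}}

now = int(time.time())
blockDict = {}
for block in completedBlocks.values():
    # A block is deletable only when every workflow that wrote into it is
    # deletable and the block is older than the configured delay
    if block['workflowNames'].issubset(deletableWfs) and \
       now - block['blockCreateTime'] > blockDeletionDelaySeconds:
        blockDict[block['blockName']] = block

# Group the surviving blocks by container, translating MSS endpoints to Tape RSEs
containerDict = {}
for blockName in blockDict:
    container = blockDict[blockName]['dataset']
    if container not in containerDict:
        sites = set(x.replace("_MSS", "_Tape") for x in blockDict[blockName]['sites'])
        containerDict[container] = {'blocks': [], 'rse': sites}
    containerDict[container]['blocks'].append(blockName)

assert list(blockDict) == ['/Prim/Proc-v1/TIER#block1']
assert containerDict['/Prim/Proc-v1/TIER']['rse'] == {'T1_ES_PIC_Disk', 'T1_ES_PIC_Tape'}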
