Skip to content

Commit

Permalink
Combines block records in single dict that includes all associated wfs
Browse files Browse the repository at this point in the history
  • Loading branch information
germanfgv committed Aug 5, 2022
1 parent 254a0c4 commit a3b0d59
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,21 +86,29 @@ def format(self, result):
# NOTE: We need to rename all the keys to follow the cammelCase standard. And also to comply
# with the key names as expected from the rest of the already existing python code
keyMap = {'blockname': 'blockName',
'name': 'workflowName',
'name': 'workflowNames',
'pnn': 'location',
'site': 'sites',
'path': 'dataset',
'create_time': 'blockCreateTime'}

dictResults = DBFormatter.formatDict(self, result)
for record in dictResults:
listResults = DBFormatter.formatDict(self, result)
dictResults = {}
for record in listResults:
for dbKey, pyKey in keyMap.items():
if dbKey == 'site':
sites = record.pop(dbKey)
if dbKey == 'site' or dbKey == 'name':
data = record.pop(dbKey)
record[pyKey] = set()
record[pyKey].add(sites)
record[pyKey].add(data)
else:
record[pyKey] = record.pop(dbKey)
# Populates results dict and adds all workflows of the same block to a single record
blockname = record['blockName']
if blockname in dictResults:
dictResults[blockname]['workflowName'].add(record['name'])
else:
dictResults[blockname] = record


return dictResults

Expand Down
14 changes: 7 additions & 7 deletions src/python/WMComponent/RucioInjector/RucioInjectorPoller.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,16 +362,16 @@ def deleteBlocks(self):

# Get list of blocks that can be deleted
# blockDict = self.findDeletableBlocks.execute(transaction=False)
completedBlocksList = self.getCompletedBlocks.execute(transaction=False)
completedBlocksDict = self.getCompletedBlocks.execute(transaction=False)

if not completedBlocksList:
if not completedBlocksDict:
logging.info("No candidate blocks found for rule deletion.")
return

logging.info("Found %d completed blocks", len(completedBlocksList))
logging.debug("Full completedBlocksList: %s", pformat(completedBlocksList))
logging.info("Found %d completed blocks", len(completedBlocksDict))
logging.debug("Full completedBlocksDict: %s", pformat(completedBlocksDict))

deletableWfsDict = self.getDeletableWorkflows.execute()
deletableWfsDict = set(self.getDeletableWorkflows.execute())

if not deletableWfsDict:
logging.info("No workflow chains (Parent + child workflows) in fully terminal state found. Skipping block level rule deletion in the current run.")
Expand All @@ -382,8 +382,8 @@ def deleteBlocks(self):

now = int(time.time())
blockDict = {}
for block in completedBlocksList:
if block['workflowName'] in deletableWfsDict and \
for block in completedBlocksDict:
if block['workflowNames'].issubset(deletableWfsDict) and \
now - block['blockCreateTime'] > self.blockDeletionDelaySeconds:
blockDict[block['blockName']] = block

Expand Down

0 comments on commit a3b0d59

Please sign in to comment.