Commit

Add extra protection for T0 to prevent archival of workflows having blocks not yet deleted.

Add GetDeletedBlocksByWorkflow DAO to WMBS.

Aggregate all results in the DAO per workflowName

Add extra protection for T0 at cleanCouchPoller

Typo

Update docstrings and log messages

Remove redundant statements:

Remove redundant range() start argument

Remove redundant pass statement

Remove redundant DISTINCT statement
todor-ivanov committed Jun 1, 2022
1 parent f2aef9c commit f4ef897
Showing 3 changed files with 186 additions and 0 deletions.
13 changes: 13 additions & 0 deletions src/python/WMComponent/TaskArchiver/CleanCouchPoller.py
@@ -380,6 +380,19 @@ def deleteWorkflowFromWMBSAndDisk(self):
deletableWorkflowsDAO = self.daoFactory(classname="Workflow.GetDeletableWorkflows")
deletablewfs = deletableWorkflowsDAO.execute()

        # For T0, subtract the workflows which do not yet have all of their blocks deleted:
if not self.useReqMgrForCompletionCheck:
deletedBlocksByWorkflowDAO = self.daoFactory(classname="Workflow.GetDeletedBlocksByWorkflow")
deletedBlocksByWorkflow = deletedBlocksByWorkflowDAO.execute()
for workflow in list(deletablewfs):
for ind in range(len(deletedBlocksByWorkflow)):
                    if deletedBlocksByWorkflow[ind]['workflowName'] == workflow and \
                       len(deletedBlocksByWorkflow[ind]['blocksNotDeleted']) != 0:
msg = "Removing workflow: %s from the list of deletable workflows. It still has blocks NOT deleted."
self.logger.debug(msg, workflow)
deletablewfs.pop(workflow)
break

# Only delete those where the upload and notification succeeded
logging.info("Found %d candidate workflows for deletion.", len(deletablewfs))
# update the completed flag in dbsbuffer_workflow table so blocks can be closed
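To illustrate the new guard, here is a minimal, self-contained sketch (not taken from the commit) of the filtering performed above, using hypothetical workflow and block names; `deletablewfs` is assumed to behave like a dictionary keyed by workflow name, as implied by the `pop(workflow)` call:

# Hypothetical stand-ins for the two DAO results used in CleanCouchPoller:
deletablewfs = {"wf_all_blocks_deleted": {}, "wf_blocks_pending": {}}
deletedBlocksByWorkflow = [
    {"workflowName": "wf_all_blocks_deleted",
     "blocksDeleted": ["/Primary/Era-v1/AOD#block-1"],
     "blocksNotDeleted": []},
    {"workflowName": "wf_blocks_pending",
     "blocksDeleted": [],
     "blocksNotDeleted": ["/Primary/Era-v1/AOD#block-2"]},
]

# Same filtering idea as above: drop every workflow that still has blocks not yet deleted.
for workflow in list(deletablewfs):
    for record in deletedBlocksByWorkflow:
        if record["workflowName"] == workflow and len(record["blocksNotDeleted"]) != 0:
            deletablewfs.pop(workflow)
            break

print(sorted(deletablewfs))  # ['wf_all_blocks_deleted']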
159 changes: 159 additions & 0 deletions src/python/WMCore/WMBS/MySQL/Workflow/GetDeletedBlocksByWorkflow.py
@@ -0,0 +1,159 @@
"""
_GetDeletedBlocksByWorkflow_
MySQL implementation of Workflows.GetDeletedBlocksByWorkflow
Retrieves a list of workflows and blocks per workflow with counters for deleted blocks,
"""


from WMCore.Database.DBFormatter import DBFormatter


class GetDeletedBlocksByWorkflow(DBFormatter):
"""
Retrieves a list of all workflows and the relative deleted blocks lists
"""
sql = """SELECT
dbsbuffer_block.blockname,
dbsbuffer_block.deleted,
wmbs_workflow.name
FROM dbsbuffer_block
INNER JOIN dbsbuffer_file ON
dbsbuffer_file.block_id = dbsbuffer_block.id
INNER JOIN dbsbuffer_workflow ON
dbsbuffer_workflow.id = dbsbuffer_file.workflow
INNER JOIN wmbs_workflow ON
wmbs_workflow.name = dbsbuffer_workflow.name
GROUP BY
dbsbuffer_block.blockname,
dbsbuffer_block.deleted,
wmbs_workflow.name
"""

def format(self, result):
"""
_format_
        Format the query results into the dictionary structure expected by the calling Python code.
        The input should be a list of database objects, each one representing a single row returned
        from the database, with key names matching the column names from the sql query.
        The intermediate (not yet aggregated) result, representing the raw database output in Python,
        is a list of dictionaries, one record per row returned from the database, with key names
        remapped to the naming conventions used in the Python code.
e.g.
[{'blockName': '/StreamExpressCosmics/Tier0_REPLAY_2022-SiStripCalZeroBias-Express-v425/ALCARECO#4cb4a954-a96c-43b8-9f2e-2ec5f92c06dd',
'deleted': 0,
'workflowName': 'Express_Run351572_StreamExpressCosmics_Tier0_REPLAY_2022_ID220531142559_v425_220531_1426'},
{'blockName': '/StreamExpressCosmics/Tier0_REPLAY_2022-SiStripCalZeroBias-Express-v425/ALCARECO#4cb4a954-a96c-43b8-9f2e-2ec5f92c06dd',
'deleted': 1,
'workflowName': 'Express_Run351572_StreamExpressCosmics_Tier0_REPLAY_2022_ID220531142559_v425_220531_1426'},
{'blockName': '/StreamExpressCosmics/Tier0_REPLAY_2022-Express-v425/DQMIO#d2819fcf-bac2-4fbb-80cf-11b5bc9eeb00',
'deleted': 0,
'workflowName': 'Express_Run351572_StreamExpressCosmics_Tier0_REPLAY_2022_ID220531142559_v425_220531_1426'}
...
]
        NOTE:
          * The number of records returned per workflow and block (i.e. the number of records per group
            in the GROUP BY statement) is related neither to the number of blocks nor to the number of
            workflows, but rather to the number of files in the block combined with another factor that
            increases the granularity (it appears to be the number of records per file in the
            dbsbuffer_workflow table, aggregated by workflow). No `DISTINCT` clause is needed in the
            SELECT statement because the records are already properly grouped.
          * Once a block has been deleted we should NOT expect duplicate records with two different
            values of the deleted flag for that block, but we still build the lists of deleted and
            not-deleted blocks as sets and eventually compute their intersection as a consistency check.
        This list needs to be further aggregated by workflowName to produce an aggregated structure per workflow like:
        [{'workflowName': 'Express_Run351572_StreamExpressCosmics_Tier0_REPLAY_2022_ID220531142559_v425_220531_1426',
          'blocksNotDeleted': ['/StreamExpressCosmics/Tier0_REPLAY_2022-SiStripCalZeroBias-Express-v425/ALCARECO#4cb4a954-a96c-43b8-9f2e-2ec5f92c06dd',
                               '/StreamExpressCosmics/Tier0_REPLAY_2022-Express-v425/DQMIO#d2819fcf-bac2-4fbb-80cf-11b5bc9eeb00'],
          'blocksDeleted': ['/StreamExpressCosmics/Tier0_REPLAY_2022-SiStripCalZeroBias-Express-v425/ALCARECO#4cb4a954-a96c-43b8-9f2e-2ec5f92c06dd']
         },
         ...
        ]
:param result: The result as returned by the mysql query execution.
:return: List of dictionaries
"""

# First reformat the output in a list of dictionaries per DB record and remap all keys to be more descriptive:
keyMap = {'blockname': 'blockName',
'name': 'workflowName',
'deleted': 'deleted'}

dictResults = DBFormatter.formatDict(self, result)
for record in dictResults:
for dbKey, pyKey in keyMap.items():
record[pyKey] = record.pop(dbKey)

        # Now aggregate all blocks per workflow (an empty dictResults simply yields an empty list):
        aggResults = []
        while dictResults:
            aggRecord = self._createAggRecord(dictResults.pop())
            if aggRecord['workflowName'] not in [wf['workflowName'] for wf in aggResults]:
                aggResults.append(aggRecord)
            else:
                for ind in range(len(aggResults)):
                    if aggResults[ind]['workflowName'] == aggRecord['workflowName']:
                        aggResults[ind] = self._mergeAggRecords(aggResults[ind], aggRecord)
                        break
        return aggResults

    def _createAggRecord(self, record):
        """
        Auxiliary method for creating an aggregated record out of a regular DB record
        :param record: A dictionary with a single DB record of the form:
                       {'workflowName': '...',
                        'blockName': '...',
                        'deleted': '...'}
        :return aggRecord: A dictionary of the form:
                       {'workflowName': '...',
                        'blocksDeleted': [...],
                        'blocksNotDeleted': [...]}
        """
        aggRecord = {'workflowName': record['workflowName'],
                     'blocksDeleted': [],
                     'blocksNotDeleted': []}
        if record['deleted']:
            aggRecord['blocksDeleted'].append(record['blockName'])
        else:
            aggRecord['blocksNotDeleted'].append(record['blockName'])
        return aggRecord

    def _mergeAggRecords(self, aggRecordOne, aggRecordTwo):
        """
        Auxiliary method for merging two aggregated records into a single one
        :param aggRecordOne: An aggregated record of the form:
                             {'workflowName': '...',
                              'blocksDeleted': [...],
                              'blocksNotDeleted': [...]}
        :param aggRecordTwo: An aggregated record of the same form as aggRecordOne
        :return aggRecord: A dictionary with the merged aggregated record
        """
        # NOTE: The workflowNames of the two aggRecords need to be identical, otherwise an empty record is returned
        aggRecord = {'workflowName': "",
                     'blocksDeleted': [],
                     'blocksNotDeleted': []}

        # First we need to check if the workflowNames of the two aggRecords are identical
        if aggRecordOne['workflowName'] == aggRecordTwo['workflowName']:
            aggRecord['workflowName'] = aggRecordOne['workflowName']
            aggRecord['blocksDeleted'] = list(set(aggRecordOne['blocksDeleted'] + aggRecordTwo['blocksDeleted']))
            aggRecord['blocksNotDeleted'] = list(set(aggRecordOne['blocksNotDeleted'] + aggRecordTwo['blocksNotDeleted']))
        return aggRecord

    def execute(self, conn=None, transaction=False, returnCursor=False):
        """
        Execute the current sql query.
        :param conn: An existing database connection to be reused, if provided
        :param transaction: An existing database transaction to be reused, if provided
        :param returnCursor: Unused here, kept for interface compatibility with other DAOs
        :return: A list of dictionaries, one aggregated record per workflow, as produced by format()
        """
results = self.dbi.processData(self.sql, conn=conn,
transaction=transaction)

return self.format(results)
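As an aside, here is a standalone sketch of the per-workflow aggregation that format() implements, written with a plain dictionary for brevity; the rows below are hypothetical, already key-remapped records mimicking the intermediate result described in the docstring above:

# Hypothetical intermediate records (one per database row, keys already remapped):
rows = [
    {"blockName": "/Primary/Era-v1/ALCARECO#block-a", "deleted": 0, "workflowName": "Express_Run1"},
    {"blockName": "/Primary/Era-v1/ALCARECO#block-a", "deleted": 1, "workflowName": "Express_Run1"},
    {"blockName": "/Primary/Era-v1/DQMIO#block-b", "deleted": 0, "workflowName": "Express_Run1"},
]

# Group blocks per workflow, splitting them by the value of the deleted flag.
aggregated = {}
for row in rows:
    record = aggregated.setdefault(row["workflowName"],
                                   {"workflowName": row["workflowName"],
                                    "blocksDeleted": set(),
                                    "blocksNotDeleted": set()})
    key = "blocksDeleted" if row["deleted"] else "blocksNotDeleted"
    record[key].add(row["blockName"])

# Convert the sets back to lists to match the structure returned by format().
aggResults = [{"workflowName": rec["workflowName"],
               "blocksDeleted": sorted(rec["blocksDeleted"]),
               "blocksNotDeleted": sorted(rec["blocksNotDeleted"])}
              for rec in aggregated.values()]
print(aggResults)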
14 changes: 14 additions & 0 deletions src/python/WMCore/WMBS/Oracle/Workflow/GetDeletedBlocksByWorkflow.py
@@ -0,0 +1,14 @@
#!/usr/bin/env python
"""
_GetDeletedBlocksByWorkflow_
Oracle implementation of Workflow.GetDeletedBlocksByWorkflow
"""

from WMCore.WMBS.MySQL.Workflow.GetDeletedBlocksByWorkflow import GetDeletedBlocksByWorkflow as MySQLGetDeletedBlocksByWorkflow

class GetDeletedBlocksByWorkflow(MySQLGetDeletedBlocksByWorkflow):
"""
Retrieves a list of all workflows and the relative deleted blocks lists
"""

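For completeness, a hedged usage sketch of the new DAO through the standard WMCore DAOFactory, mirroring the call added in CleanCouchPoller; the logger is created ad hoc and myDbi is a placeholder for an already initialised database interface (e.g. myThread.dbi inside a running component):

import logging

from WMCore.DAOFactory import DAOFactory

myLogger = logging.getLogger(__name__)
# myDbi is a placeholder: an already initialised WMCore database interface is assumed here.
daoFactory = DAOFactory(package="WMCore.WMBS", logger=myLogger, dbinterface=myDbi)

deletedBlocksByWorkflowDAO = daoFactory(classname="Workflow.GetDeletedBlocksByWorkflow")
deletedBlocksByWorkflow = deletedBlocksByWorkflowDAO.execute()
for record in deletedBlocksByWorkflow:
    print(record["workflowName"],
          "deleted:", len(record["blocksDeleted"]),
          "not deleted:", len(record["blocksNotDeleted"]))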