Skip to content

Commit

Permalink
Review fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
todor-ivanov committed Jun 7, 2022
1 parent b9758f9 commit df62783
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ class CountUndeletedBlocksByWorkflow(DBFormatter):
sql = """
SELECT
dbsbuffer_workflow.name,
dbsbuffer_block.deleted,
COUNT(DISTINCT dbsbuffer_block.blockname) as count
FROM dbsbuffer_block
INNER JOIN dbsbuffer_file ON
Expand All @@ -35,8 +34,7 @@ class CountUndeletedBlocksByWorkflow(DBFormatter):
dbsbuffer_workflow.id = dbsbuffer_file.workflow
WHERE dbsbuffer_block.deleted=0
GROUP BY
dbsbuffer_workflow.name,
dbsbuffer_block.deleted
dbsbuffer_workflow.name
"""

def execute(self, conn=None, transaction=False, returnCursor=False):
Expand All @@ -45,7 +43,6 @@ def execute(self, conn=None, transaction=False, returnCursor=False):
:param conn: A current database connection to be used if existing
:param transaction: A current database transaction to be used if existing
:return: A list of dictionaries, one record for each database line returned
"""
dictResults = DBFormatter.formatDict(self, self.dbi.processData(self.sql, conn=conn,
transaction=transaction))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
_CountUndeletedBlocksByWorkflow_
Oracle implementation of Workflow.CountUndeletedBlocksByWorkflow
"""

from WMComponent.DBS3Buffer.MySQL.CountUndeletedBlocksByWorkflow import CountUndeletedBlocksByWorkflow as MySQLCountUndeletedBlocksByWorkflow
Expand Down
4 changes: 2 additions & 2 deletions src/python/WMComponent/TaskArchiver/CleanCouchPoller.py
Original file line number Diff line number Diff line change
Expand Up @@ -999,7 +999,7 @@ def getPerformanceFromDQM(self, dqmUrl, dataset, run):
return False
except Exception as ex:
logging.error('Couldnt fetch DQM Performance data for dataset %s , Run %s', dataset, run)
logging.exception(ex) # Let's print the stacktrace with generic Exception
logging.exception(str(ex)) # Let's print the stacktrace with generic Exception
return False

try:
Expand All @@ -1008,7 +1008,7 @@ def getPerformanceFromDQM(self, dqmUrl, dataset, run):
except Exception as ex:
logging.info("Actually got a JSON from DQM perf in for %s run %d , but content was bad, Bailing out",
dataset, run)
logging.exception(ex) # Let's print the stacktrace with generic Exception
logging.exception(str(ex)) # Let's print the stacktrace with generic Exception
return False
# If it gets here before returning False or responseJSON, it went wrong
return False
Expand Down
81 changes: 18 additions & 63 deletions src/python/WMCore/WMBS/MySQL/Workflow/GetDeletedBlocksByWorkflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,15 @@ def format(self, result):
python code variable naming conventions.
e.g.
[{'blockname': '/StreamExpressCosmics/Tier0_REPLAY_2022-SiStripCalZeroBias-Express-v425/ALCARECO#4cb4a954-a96c-43b8-9f2e-2ec5f92c06dd',
[{'blockname': '/a/b/c#123-qwe',
'deleted': 0,
'name': 'Express_Run351572_StreamExpressCosmics_Tier0_REPLAY_2022_ID220531142559_v425_220531_1426'},
{'blockname': '/StreamExpressCosmics/Tier0_REPLAY_2022-SiStripCalZeroBias-Express-v425/ALCARECO#4cb4a954-a96c-43b8-9f2e-2ec5f92c06dd',
'name': 'WorkflowName'},
{'blockname': '/a/b/c#456-rty',
'deleted': 1,
'name': 'Express_Run351572_StreamExpressCosmics_Tier0_REPLAY_2022_ID220531142559_v425_220531_1426'},
{'blockname': '/StreamExpressCosmics/Tier0_REPLAY_2022-Express-v425/DQMIO#d2819fcf-bac2-4fbb-80cf-11b5bc9eeb00',
'name': 'WorkflowName'},
{'blockname': '/a/b/d#123-asd',
'deleted': 0,
'name': 'Express_Run351572_StreamExpressCosmics_Tier0_REPLAY_2022_ID220531142559_v425_220531_1426'}
'name': 'WorkflowName'}
...
]
Expand All @@ -70,10 +70,10 @@ def format(self, result):
This list needs to be further aggregated by name to produce an aggregated structure per workflow like:
[{'name': 'Express_Run351572_StreamExpressCosmics_Tier0_REPLAY_2022_ID220531142559_v425_220531_1426'
'blocksNotDeleted': ['/StreamExpressCosmics/Tier0_REPLAY_2022-SiStripCalZeroBias-Express-v425/ALCARECO#4cb4a954-a96c-43b8-9f2e-2ec5f92c06dd',
/StreamExpressCosmics/Tier0_REPLAY_2022-Express-v425/DQMIO#d2819fcf-bac2-4fbb-80cf-11b5bc9eeb00']
'blocksDeleted': [/StreamExpressCosmics/Tier0_REPLAY_2022-SiStripCalZeroBias-Express-v425/ALCARECO#4cb4a954-a96c-43b8-9f2e-2ec5f92c06dd']
[{'name': 'WorkflowName',
'blocksNotDeleted': ['/a/b/c#123-qwe',
'/a/b/d#123-asd'],
'blocksDeleted': ['/a/b/c#456-rty']
},
...
]
Expand All @@ -86,60 +86,15 @@ def format(self, result):
dictResults = DBFormatter.formatDict(self, result)

# Now aggregate all blocks per workflow:
aggResults = []
aggResults.append(self._createAggRecord(dictResults.pop()))
while dictResults:
aggRecord = self._createAggRecord(dictResults.pop())
if aggRecord['name'] not in [wf['name'] for wf in aggResults]:
aggResults.append(aggRecord)
results = {}
for record in dictResults:
wfName = record['name']
results.setdefault(wfName, {'name': wfName, 'blocksDeleted': [], 'blocksNotDeleted': []})
if record['deleted']:
results[wfName]['blocksDeleted'].append(record['blockname'])
else:
for ind in range(len(aggResults)):
if aggResults[ind]['name'] == aggRecord['name']:
aggResults[ind] = self._mergeAggRecords(aggResults[ind], aggRecord)
break
return aggResults

def _createAggRecord(self, record):
"""
Auxiliary method for creating an aggregated record out of a regular DB record
:param record: A dictionary with a single DB record of the form:
{'name': '...',
'blockname': '...',
'deleted':'...'}
:return aggRecord: A dictionary of the form:
{'name': '...',
'blocksDeleted': [...],
'blocksNotDeleted': [...]}
"""
aggRecord = {'name': record['name'],
'blocksDeleted': [],
'blocksNotDeleted': []}
if record['deleted']:
aggRecord['blocksDeleted'].append(record['blockname'])
else:
aggRecord['blocksNotDeleted'].append(record['blockname'])
return aggRecord

def _mergeAggRecords(self, aggRecordOne, aggRecordTwo):
"""
Auxiliary method for merging two aggregated records into a single one
:param aggRecord*: identical dictionaries with a single aggregated record of the form:
{'name': '...',
'blocksDeleted': [...],
'blocksNotDeleted': [...]}
:return aggRecord: A dictionary with the aggregated record:
"""
# NOTE: The workflowNames for the two aggRecords need to be identical otherwise an empty record is returned
aggRecord = {'name': "",
'blocksDeleted': [],
'blocksNotDeleted': []}

# First we need to check if the workflowNames for the two aggRecords are identical
if aggRecordOne['name'] == aggRecordTwo['name']:
aggRecord['name'] = aggRecordOne['name']
aggRecord['blocksDeleted'] = list(set(aggRecordOne['blocksDeleted'] + aggRecordTwo['blocksDeleted']))
aggRecord['blocksNotDeleted'] = list(set(aggRecordOne['blocksNotDeleted'] + aggRecordTwo['blocksNotDeleted']))
return aggRecord
results[wfName]['blocksNotDeleted'].append(record['blockname'])
return results.values()

def execute(self, conn=None, transaction=False, returnCursor=False):
"""
Expand Down

0 comments on commit df62783

Please sign in to comment.