Clean input data rules #10224

Merged
159 changes: 150 additions & 9 deletions src/python/WMCore/MicroService/DataStructs/MSRuleCleanerWflow.py
@@ -7,6 +7,126 @@
from __future__ import division, print_function

from copy import deepcopy
from Utils.IteratorTools import flattenList


class WfParser(object):
"""
Workflow description parser class.
"""
def __init__(self, docSchema):
"""
The init method for the Workflow parser class.
:param docSchema: Document template in the form of a list of tuples as follows:
[('KeyName', DefaultValue, type),
('KeyName', DefaultValue, type),
...]
To be used for identifying the fields to be searched for
in the workflow description

"""
self.extDoc = {}
for tup in docSchema:
self.extDoc[tup[0]] = {'keyName': tup[0],
'values': list(),
'default': tup[1],
'type': tup[2]}

def __call__(self, wfDescr):
"""
The Call method for the Workflow parser class.
"""
self._paramFinder(wfDescr)
self._wfParse()
return self.extDoc

def _paramFinder(self, wfObj):
"""
Private method used to recursively traverse a workflow description
and search for all the keyNames defined in the extDoc auxiliary data
structure. If a 'keyName' happens to be present in several nested levels,
        or in several similar objects at the same level (like {'Task1': {},
        'Task2': {}, ...}), all the values found are accumulated in the respective
        (flat) list at extDoc[keyName]['values'], which is later converted back
        to the originally expected type for the given field, as described in the
        document template.
:param wfObj: Dictionary containing the workflow description

"""
if isinstance(wfObj, (list, set, tuple)):
for value in wfObj:
self._paramFinder(value)
if isinstance(wfObj, dict):
for key, value in wfObj.items():
self._paramFinder(value)
for key in self.extDoc.keys():
if key in wfObj.keys():
self.extDoc[key]['values'].append(deepcopy(wfObj[key]))

def _wfParse(self):
"""
        Workflow description parser. Given a document template representing all the
        keyNames to be searched and a workflow description to search in recursively,
        aggregates all the fields that it can find according to the rules below:
        * if the number of found key instances is 0 - sets the default value from
          the template.
        * if the number of found key instances is 1 - sets the single value found
          in the workflow description and converts it back to the form expected
          and described in the template (removes the outermost list used for
          value aggregation)
* if the number of found key instances is > 1 - the values are aggregated
according to the expected types and data structure defined in the
template as follows:
* bool: sets it to True if any of the values found was set to True
* list: chains/flattens all the sub lists into a single list containing
all the values found
* dict: aggregates/flattens all the key-value pairs from all the
dictionaries found into one big dictionary
WARNING: (if an inner keyName happens to be found in multiple
dictionaries from the aggregated list of dictionaries
it will be overwritten with the values from the last
one to be merged into the finally constructed dictionary)!
* str: will be accumulated in a list containing all the values found
WARNING: (will change the expected structure of the field from
a single string to a list of strings)!

        NOTE: This method takes no parameters: it operates on self.extDoc,
              which holds the document template passed at construction time
              together with the values accumulated by _paramFinder().
"""

        # Convert the aggregated extDoc back to the original structure:
for keyName, data in self.extDoc.items():
if len(data['values']) == 0:
self.extDoc[keyName] = deepcopy(data['default'])
elif len(data['values']) == 1:
self.extDoc[keyName] = deepcopy(data['values'][0])
elif len(data['values']) > 1:
if data['type'] is bool:
self.extDoc[keyName] = any(data['values'])
elif data['type'] is list:
self.extDoc[keyName] = list(set(flattenList(data['values'])))
                # WARNING: If this list happens to be constructed out of elements
                #          which are instances of unhashable types (e.g. dict, list),
                #          the set() call will raise a TypeError. That is unlikely
                #          to happen here, though, see [1] - all the fields we fetch
                #          from the nested Task/Step Chain dictionaries are of
                #          hashable types.
                # [1] https://github.com/dmwm/WMCore/blob/ed40d33069bdddcd98ed5b8430d5ca6662e5941f/src/python/WMCore/WMSpec/StdSpecs/StdBase.py#L1189
elif data['type'] is dict:
self.extDoc[keyName] = {}
for item in data['values']:
self.extDoc[keyName].update(item)
elif (isinstance(data['type'], tuple) and (str in data['type'] or unicode in data['type'])) or \
(data['type'] is str or data['type'] is unicode):
data['values'] = list(set(data['values']))
if len(data['values']) == 1:
self.extDoc[keyName] = deepcopy(data['values'][0])
else:
self.extDoc[keyName] = deepcopy(data['values'])
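
To make the aggregation rules above concrete, here is a minimal usage sketch of WfParser. The schema and the workflow fragment are hypothetical; they only exercise the bool/list/str branches of _wfParse():

schema = [('IncludeParents', False, bool),
          ('OutputDatasets', [], list),
          ('Campaign', None, str)]
wfDescr = {'Campaign': 'HypotheticalCampaign',
           'Task1': {'IncludeParents': True,
                     'OutputDatasets': ['/PrimaryA/ProcA/RECO']},
           'Task2': {'IncludeParents': False,
                     'OutputDatasets': ['/PrimaryA/ProcA/AOD']}}

doc = WfParser(schema)(wfDescr)
# doc['IncludeParents'] -> True: at least one of the two values found is True
# doc['OutputDatasets'] -> both datasets flattened into one list (the set()
#                          call means the order is not guaranteed)
# doc['Campaign']       -> 'HypotheticalCampaign': a single hit, so the
#                          aggregation list is unwrapped back to a plain string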


class MSRuleCleanerWflow(dict):
@@ -15,16 +135,24 @@ class MSRuleCleanerWflow(dict):
of the MSRuleCleaner Micro Service.
"""

-    def __init__(self, doc, **kwargs):
+    def __init__(self, wfDescr, **kwargs):
super(MSRuleCleanerWflow, self).__init__(**kwargs)

# Search for all the keys we need from the ReqManager workflow description
-        myDoc = {}
-        for tup in self.docSchema():
-            if tup[0] in doc:
-                myDoc[tup[0]] = deepcopy(doc[tup[0]])
-            else:
-                myDoc.update({tup[0]: tup[1]})
+        wfParser = WfParser(self.docSchema())
+        myDoc = wfParser(wfDescr)
+
+        # Convert some fields to lists explicitly:
+        # NOTE: Those are fields defined as strings in the original workflow
+        #       representation, but may turn into lists during the recursive
+        #       search and we will use them as lists for the rest of the code.
+        for key in ['DataPileup', 'MCPileup', 'ParentDataset']:
+            if not isinstance(myDoc[key], list):
+                if myDoc[key] is None:
+                    myDoc[key] = []
+                else:
+                    myDoc[key] = [myDoc[key]]

self.update(myDoc)
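
A small construction sketch (with a hypothetical request fragment) illustrating the combined effect of the parser defaults and the explicit list conversion above:

wflow = MSRuleCleanerWflow({'RequestName': 'fake_workflow_name',
                            'MCPileup': '/MCPrimary/ProcA/PREMIX'})
# wflow['MCPileup']      == ['/MCPrimary/ProcA/PREMIX']  - string wrapped in a list
# wflow['DataPileup']    == []    - default None, converted to an empty list
# wflow['ParentDataset'] == []    - default None, converted to an empty list
# wflow['InputDataset']  is None  - not in the conversion list, keeps its default
# All the other keys take the defaults defined in docSchema().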

def docSchema(self):
@@ -54,7 +182,15 @@ def docSchema(self):
'ParentageResolved': Bool,
'PlineMarkers': None,
'IsClean': False
-                 'ForceArchive', False]
+                 'IsLogDBClean': False,
+                 'IsArchivalDelayExpired': False,
+                 'ForceArchive': False,
+                 'RequestTransition': [],
+                 'IncludeParents': False,
+                 'DataPileup': [],
+                 'MCPileup': [],
+                 'InputDataset': None,
+                 'ParentDataset': []
}
:return: a list of tuples
"""
@@ -73,7 +209,12 @@ def docSchema(self):
('IsLogDBClean', False, bool),
('IsArchivalDelayExpired', False, bool),
('ForceArchive', False, bool),
-                     ('RequestTransition', [], list)]
+                     ('RequestTransition', [], list),
+                     ('IncludeParents', False, bool),
+                     ('DataPileup', None, (str, unicode)),
[Review thread on the DataPileup/MCPileup/ParentDataset defaults]

Contributor: And I just noticed these default values don't match the default values in the unit test testTaskChainDefaults. DataPileup, MCPileup and ParentDataset are inconsistent.

Author: Yes, I have not yet started paying attention to the final version of the unit tests. I was about to do that at a later stage, once we agree on the implementation of all the rest, because we kind of depend on that for the unit tests step.

Author: Done

Contributor: Isn't the DataPileup/MCPileup/ParentDataset default value supposed to be a list instead of None?

Contributor: That's precisely my point. This data structure defines the schema of the workflow object used throughout MSRuleCleaner. By just looking at this, I'd expect those keys to have a value equal to None, not an empty list.

Does the whole logic - besides that explicit code you just referred to - change if you set the default value to [] instead of None? If not and we get that for free, then I'd suggest to update the default value to make it consistent with the outcome of this workflow parser.

Author: It should work, but what would happen is that this conversion from a string to a list would happen silently in the workflow parser class here [1], instead of explicitly in the three lines I previously mentioned. I have tested it both ways, and now I do not remember which exactly, but there was a corner case in which the silent conversion of the data structure was not preferable. So I preferred, in the _wfParse() method, to make the best effort to revert the structure back to the original one as described in the document template (which reflects the values as we expect them from reqmgr), and only then make the explicit conversion to lists, only for those I really want, and make them obvious in the __init__() method of MSRuleCleanerWflow.

[1] https://github.com/dmwm/WMCore/pull/10224/files/2f4a6275b7e0af14e3f42aabbca6f40470503ca9#diff-414c04814f1133dc334c7718010c11e70809389d23ec30e9d00d5e8874b84bf5R110-R111

Author: Actually, I just tested it again... And not even a corner case but a rather general one: it tries to create a list of all letters present in all the MCPileup dataset names it can find and explodes.

Contributor: Okay, let it be then. Thanks for checking this out, Todor.

+                     ('MCPileup', None, (str, unicode)),
+                     ('InputDataset', None, (str, unicode)),
+                     ('ParentDataset', None, (str, unicode))]

        # NOTE: ParentageResolved is set by default to True; it will be False only if:
# - RequestType is StepChain
128 changes: 107 additions & 21 deletions src/python/WMCore/MicroService/Unified/MSRuleCleaner.py
@@ -34,6 +34,17 @@
from Utils.Pipeline import Pipeline, Functor
from WMCore.WMException import WMException
from WMCore.Services.LogDB.LogDB import LogDB
from WMCore.Services.WMStatsServer.WMStatsServer import WMStatsServer
from WMCore.MicroService.Unified.Common import findParent


class MSRuleCleanerResolveParentError(WMException):
"""
WMCore exception class for the MSRuleCleaner module, in WMCore MicroServices,
    used to signal an error during the parent dataset resolution step.
"""
def __init__(self, message):
super(MSRuleCleanerResolveParentError, self).__init__(message)


class MSRuleCleanerArchivalError(WMException):
@@ -81,15 +92,20 @@ def __init__(self, msConfig, logger=None):
self.logDB = LogDB(self.msConfig["logDBUrl"],
self.msConfig["logDBReporter"],
logger=self.logger)
self.wmstatsSvc = WMStatsServer(self.msConfig['wmstatsUrl'], logger=self.logger)

# Building all the Pipelines:
pName = 'plineMSTrCont'
self.plineMSTrCont = Pipeline(name=pName,
funcLine=[Functor(self.setPlineMarker, pName),
Functor(self.setParentDatasets),
Functor(self.getRucioRules, 'container', self.msConfig['rucioMStrAccount']),
Functor(self.cleanRucioRules)])
pName = 'plineMSTrBlock'
self.plineMSTrBlock = Pipeline(name=pName,
funcLine=[Functor(self.setPlineMarker, pName),
Functor(self.setParentDatasets),
Functor(self.getRucioRules, 'block', self.msConfig['rucioMStrAccount']),
Functor(self.cleanRucioRules)])
pName = 'plineAgentCont'
self.plineAgentCont = Pipeline(name=pName,
@@ -130,6 +146,32 @@ def __init__(self, msConfig, logger=None):
self.wfCounters = {'cleaned': {},
'archived': {'normalArchived': 0,
'forceArchived': 0}}
self.globalLocks = set()

def getGlobalLocks(self):
"""
Fetches the list of 'globalLocks' from wmstats server and the list of
        'parentLocks' from request manager, and stores/updates their union in
        the 'globalLocks' instance variable.
        """
self.logger.info("Fetching globalLocks list from wmstats server.")
try:
globalLocks = set(self.wmstatsSvc.getGlobalLocks())
except Exception as ex:
msg = "Failed to refresh global locks list for the current polling cycle. Error: %s "
msg += "Skipping this polling cycle."
self.logger.error(msg, str(ex))
raise ex
self.logger.info("Fetching parentLocks list from reqmgr2 server.")
try:
parentLocks = set(self.reqmgr2.getParentLocks())
except Exception as ex:
msg = "Failed to refresh parent locks list for the current poling cycle. Error: %s "
msg += "Skipping this polling cycle."
self.logger.error(msg, str(ex))
raise ex
self.globalLocks = globalLocks | parentLocks
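
For illustration, a quick sketch of the union semantics (the container names are hypothetical):

globalLocks = set(['/PrimaryA/ProcA/RAW'])                           # from wmstats
parentLocks = set(['/PrimaryA/ProcA/RAW', '/PrimaryB/ProcB/RAW'])    # from reqmgr2
# After getGlobalLocks():
# self.globalLocks == set(['/PrimaryA/ProcA/RAW', '/PrimaryB/ProcB/RAW'])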

def resetCounters(self):
"""
@@ -152,7 +194,6 @@ def execute(self, reqStatus):
self.currThreadIdent = self.currThread.name
self.updateReportDict(summary, "thread_id", self.currThreadIdent)
self.resetCounters()

self.logger.info("MSRuleCleaner is running in mode: %s.", self.mode)

# Build the list of workflows to work on:
@@ -167,6 +208,7 @@

# Call _execute() and feed the relevant pipeline with the objects popped from requestRecords
try:
self.getGlobalLocks()
totalNumRequests, cleanNumRequests, normalArchivedNumRequests, forceArchivedNumRequests = self._execute(requestRecords)
msg = "\nNumber of processed workflows: %s."
msg += "\nNumber of properly cleaned workflows: %s."
@@ -280,6 +322,11 @@ def _dispatchWflow(self, wflow):
for pline in self.cleanuplines:
try:
pline.run(wflow)
except MSRuleCleanerResolveParentError as ex:
msg = "%s: Parentage Resolve Error: %s. "
msg += "Will retry again in the next cycle."
self.logger.error(msg, pline.name, ex.message())
continue
except Exception as ex:
msg = "%s: General error from pipeline. Workflow: %s. Error: \n%s. "
msg += "\nWill retry again in the next cycle."
@@ -528,30 +575,75 @@ def getMSOutputTransferInfo(self, wflow):
wflow['TransferDone'] = True
return wflow

def setParentDatasets(self, wflow):
"""
Used to resolve parent datasets for a workflow.
:param wflow: A MSRuleCleaner workflow representation
:return: The workflow object
"""
if wflow['InputDataset'] and wflow['IncludeParents']:
childDataset = wflow['InputDataset']
parentDataset = findParent([childDataset], self.msConfig['dbsUrl'])
# NOTE: If findParent() returned None then the DBS service failed to
# resolve the request (it is considered an ERROR outside WMCore)
if parentDataset.get(childDataset, None) is None:
msg = "Failed to resolve parent dataset for: %s in workflow: %s" % (childDataset, wflow['RequestName'])
raise MSRuleCleanerResolveParentError(msg)
elif parentDataset:
wflow['ParentDataset'] = [parentDataset[childDataset]]
msg = "Found parent %s for input dataset %s in workflow: %s "
self.logger.info(msg, parentDataset, wflow['InputDataset'], wflow['RequestName'])
else:
[Review comment]
Contributor: This else block is no longer needed. But that's minor and I won't ask you to update this code once again.
msg = "Could not find parent for input dataset: %s in workflows: %s"
self.logger.error(msg, wflow['InputDataset'], wflow['RequestName'])
return wflow
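
For clarity, a hedged sketch of the possible outcomes of the parent resolution in setParentDatasets() above (the dataset names are invented):

# findParent(['/PrimaryA/ProcA/MINIAOD'], dbsUrl) may return:
#   {'/PrimaryA/ProcA/MINIAOD': '/PrimaryA/ProcA/AOD'}
#       => wflow['ParentDataset'] = ['/PrimaryA/ProcA/AOD']
#   {} or {'/PrimaryA/ProcA/MINIAOD': None}
#       => raises MSRuleCleanerResolveParentError, so the workflow is retried
#          in a later polling cycle; note that .get() returns None for an
#          empty dict too, so the trailing else block never runs - which is
#          what the review comment above points out.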

def getRucioRules(self, wflow, gran, rucioAcct):
"""
        Queries Rucio and builds the relevant list of container- or block-level
        rules for the given workflow
        :param wflow: A MSRuleCleaner workflow representation
        :param rucioAcct: The Rucio account whose data placement rules are searched
:param gran: Data granularity to search for Rucio rules. Possible values:
-                     'block' || 'container'
+                     'block' or 'container'
:return: The workflow object
"""
currPline = wflow['PlineMarkers'][-1]
-        # Find all the output placement rules created by the agents
-        for dataCont in wflow['OutputDatasets']:
-            if gran == 'container':
-                for rule in self.rucio.listDataRules(dataCont, account=rucioAcct):
-                    wflow['RulesToClean'][currPline].append(rule['id'])
-            elif gran == 'block':
-                try:
-                    blocks = self.rucio.getBlocksInContainer(dataCont)
-                    for block in blocks:
-                        for rule in self.rucio.listDataRules(block, account=rucioAcct):
-                            wflow['RulesToClean'][currPline].append(rule['id'])
-                except WMRucioDIDNotFoundException:
-                    msg = "Container: %s not found in Rucio for workflow: %s."
+        # Map the Rucio accounts to the workflow data fields whose rules they
+        # own, and set the checkGlobalLocks flag:
+        mapRuleType = {self.msConfig['rucioWmaAccount']: ["OutputDatasets"],
+                       self.msConfig['rucioMStrAccount']: ["InputDataset", "MCPileup",
+                                                           "DataPileup", "ParentDataset"]}
+        if rucioAcct == self.msConfig['rucioMStrAccount']:
+            checkGlobalLocks = True
+        else:
+            checkGlobalLocks = False
+
+        # Find all the data placement rules created by the components:
+        for dataType in mapRuleType[rucioAcct]:
+            dataList = wflow[dataType] if isinstance(wflow[dataType], list) else [wflow[dataType]]
+            for dataCont in dataList:
+                self.logger.debug("getRucioRules: dataCont: %s", pformat(dataCont))
+                if checkGlobalLocks and dataCont in self.globalLocks:
+                    msg = "Found dataset: %s in GlobalLocks. NOT considering it for filling the "
+                    msg += "RulesToClean list for both container- and block-level rules for workflow: %s!"
+                    self.logger.info(msg, dataCont, wflow['RequestName'])
+                    continue
+                if gran == 'container':
+                    for rule in self.rucio.listDataRules(dataCont, account=rucioAcct):
+                        wflow['RulesToClean'][currPline].append(rule['id'])
+                        msg = "Found %s container-level rule to be deleted for container %s"
+                        self.logger.info(msg, rule['id'], dataCont)
+                elif gran == 'block':
+                    try:
+                        blocks = self.rucio.getBlocksInContainer(dataCont)
+                        for block in blocks:
+                            for rule in self.rucio.listDataRules(block, account=rucioAcct):
+                                wflow['RulesToClean'][currPline].append(rule['id'])
+                                msg = "Found %s block-level rule to be deleted for container %s"
+                                self.logger.info(msg, rule['id'], dataCont)
+                    except WMRucioDIDNotFoundException:
+                        msg = "Container: %s not found in Rucio for workflow: %s."
self.logger.info(msg, dataCont, wflow['RequestName'])
return wflow
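
A hedged usage sketch of the dispatch above (account lookups as wired in __init__(); the dataset names are hypothetical):

# With the MSRuleCleaner transfer account (e.g. the plineMSTrCont pipeline):
# getRucioRules(wflow, 'container', self.msConfig['rucioMStrAccount'])
#   -> scans InputDataset, MCPileup, DataPileup and ParentDataset, skips any
#      container found in self.globalLocks, and collects container-level rules
# With the agents' account (the plineAgent* pipelines):
# getRucioRules(wflow, 'block', self.msConfig['rucioWmaAccount'])
#   -> scans OutputDatasets only, with no global-lock check, resolving each
#      container to its blocks and collecting block-level rules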

def cleanRucioRules(self, wflow):
Expand Down Expand Up @@ -581,12 +673,6 @@ def cleanRucioRules(self, wflow):

# Set the cleanup flag:
wflow['CleanupStatus'][currPline] = all(delResults)
-        # ----------------------------------------------------------------------
-        # FIXME : To be removed once the plineMSTrBlock && plineMSTrCont are
-        #         developed
-        if wflow['CleanupStatus'][currPline] in ['plineMSTrBlock', 'plineMSTrCont']:
-            wflow['CleanupStatus'][currPline] = True
-        # ----------------------------------------------------------------------
return wflow

def getRequestRecords(self, reqStatus):