Skip to content

Commit

Permalink
MS: Update MSOutput record schema
Browse files Browse the repository at this point in the history
few bugfixes for MSOutput
  • Loading branch information
amaltaro committed Sep 9, 2020
1 parent f4d0358 commit 58b8f50
Show file tree
Hide file tree
Showing 2 changed files with 174 additions and 256 deletions.
179 changes: 90 additions & 89 deletions src/python/WMCore/MicroService/DataStructs/MSOutputTemplate.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from __future__ import division, print_function

from time import time

from copy import deepcopy


Expand All @@ -19,31 +18,6 @@ class MSOutputTemplate(dict):
buffer for both inbound (reading from the database) and outbound (writing to
database) documents. And it must have all the methods needed to set or update
various fields in the object. So that we assure the uniformity of all documents.
Template format:
{
"RequestName": "ReqName",
"RequestStatus": "(completed|closed-out|announced)",
'Campaign': "Campaign",
"creationTime": integer timestamp,
"lastUpdate": integer timestamp,
"isRelVal": (True|False),
"isTaken": (True|False),
"isTakenBy": "ThreadIdentifier",
"destination": ["list of locations"],
"OutputDatasets": ["list of datasets"],
"destinationOutputMap": [{"destination": ["list of locations"],
"datasets": ["list of datasets"]},
{"destination": ["list of locations"],
"datasets": ["list of datasets"]}],
"campaignOutputMap": [{"campaignName": "blah",
"datasets": ["list of datasets"]},
{"campaignName": "blah",
"datasets": ["list of datasets"]}],
"transferStatus": "blah", # either "pending" or "done",
"transferIDs": ["list of transfer IDs"],
"numberOfCopies": integer
}
"""
# TODO:
# To add an identifier parameter to __init__
Expand All @@ -68,74 +42,95 @@ class MSOutputTemplate(dict):

def __init__(self, doc=None, **kwargs):
super(MSOutputTemplate, self).__init__(**kwargs)
# super(MSOutputTemplate, self).__init__()

docTemplate = [
('_id', None, unicode),
('RequestName', None, unicode),
('Campaign', None, unicode),
('creationTime', None, int),
('lastUpdate', None, int),
('isRelVal', None, bool),
('isTaken', False, bool),
('isTakenBy', None, (str, unicode)),
('OutputDatasets', None, list),
('destination', None, list),
('destinationOutputMap', None, list),
('campaignOutputMap', None, list),
('transferStatus', None, (str, unicode)),
('transferIDs', None, list),
('numberOfCopies', None, int)]

self.docTemplate = docTemplate
self.required = ['_id', 'RequestName', 'creationTime']
doc = doc or {}
self.required = ['_id', 'RequestName', 'CreationTime']
self.allowEmpty = ['OutputDatasets']
self.allowedStatus = ["pending", "done"]

myDoc = dict()
for tup in docTemplate:
myDoc.update({tup[0]: tup[1]})

# set creation time - if already present in the document it will be
# overwritten with the value from the document itself during the _checkAttr
# call and this value here will be ignored
myDoc['creationTime'] = int(time())

# if no document was passed consider only **kwargs
if doc is not None:
# Search the keys from the document template for their equivalent into
# the document passed and fill them into internal document so that they
# can pass the needed checks later, throw the unneeded/unrecognised
# key/values from the passed document
for key in [tup[0] for tup in docTemplate]:
if key in doc.keys():
myDoc[key] = deepcopy(doc[key])
myDoc['CreationTime'] = int(time())
for tup in self.docSchema():
if tup[0] in doc:
myDoc[tup[0]] = deepcopy(doc[tup[0]])
else:
myDoc.update({tup[0]: tup[1]})

# if any object attribute is passed as a **kwargs parameter then
# overwrite the equivalent parameter which was coming with the doc
self._checkAttr(docTemplate, myDoc, update=True, throw=True, **kwargs)
self._checkAttr(myDoc, update=True, throw=True, **kwargs)

if doc is not None:
if doc:
self._setRelVal(doc)
self._setCampMap(doc, myDoc)

# enforce a full check on the final document
self._checkAttr(docTemplate, myDoc, update=False, throw=True, **myDoc)
self._checkAttr(myDoc, update=False, throw=True, **myDoc)

# final validation:
if self._checkValid(myDoc, throw=True):
self.update(myDoc)

def checkAttr(self):
def docSchema(self):
"""
A function to perform a self consistency check
Return the data schema for a record in MongoDB.
It's a list of tuples; in each tuple:
* 1st element: is the key name / attribute in the request
* 2nd element: is the default value
* 3rd element: is the expected data type
Template format:
{
"_id": "ReqName",
"RequestName": "ReqName",
"Campaign": "Top level campaign name",
"CreationTime": integer timestamp,
"LastUpdate": integer timestamp,
"IsRelVal": (True|False),
"Destination": ["list of locations"],
"OutputDatasets": ["list of output datasets"],
"DestinationOutputMap": [{"Destination": ["list of locations"],
"Datasets": ["list of datasets"]},
{"Destination": ["list of locations"],
"Datasets": ["list of datasets"]}],
"CampaignOutputMap": [{"CampaignName": "blah",
"Datasets": ["list of datasets"]},
{"CampaignName": "blah",
"Datasets": ["list of datasets"]}],
"TransferOutputMap": [{"TransferID": "rucio rule id",
"TransferType": "disk",
"DatasetName": "dataset name"},
{"TransferID": "rucio rule id",
"TransferType": "tape",
"DatasetName": "dataset name"},],
"TransferStatus": "pending"|"done",
"TransferIDs": ["list of transfer IDs"],
"NumberOfCopies": integer
}
:return: a list of tuples
"""
return self._checkAttr(self.docTemplate, self, update=False, throw=False, **self)

def _checkAttr(self, docTemplate, myDoc, update=False, throw=False, **kwargs):
docTemplate = [
('_id', None, (str, unicode)),
('RequestName', None, (str, unicode)),
('Campaign', [], (str, unicode)),
('CreationTime', int(time()), int),
('LastUpdate', None, int),
('IsRelVal', False, bool),
('OutputDatasets', [], list),
('Destination', [], list),
('DestinationOutputMap', [], list),
('CampaignOutputMap', [], list),
('TransferOutputMap', [], list),
('TransferStatus', "pending", (str, unicode)),
('TransferIDs', [], list),
('NumberOfCopies', 1, int)]
return docTemplate

def _checkAttr(self, myDoc, update=False, throw=False, **kwargs):
"""
Basically checks everything given in **kwargs against the docTemplate and
if it passes the check then it is absorbed in myDoc or just the
Basically checks everything given in **kwargs against the document schema
and if it passes the check then it is absorbed in myDoc or just the
result from the check is returned depending on the flags given
:docTemplate: The document template against which the check is performed
:myDoc: The document into which the valid keys are to be copied
:**kwargs: The source set of kwargs whose values are to be checked
:update: Update flag:
Expand All @@ -150,7 +145,7 @@ def _checkAttr(self, docTemplate, myDoc, update=False, throw=False, **kwargs):
for kw in kwargs.keys():
found = False
typeok = False
for tup in docTemplate:
for tup in self.docSchema():
if kw == tup[0]:
found = True
# NOTE: Here we can allow more than one type per field if we
Expand Down Expand Up @@ -219,33 +214,39 @@ def setKey(self, key, value):
A method to be used for setting a key in the document
"""
myDoc = {key: value}
if self._checkAttr(self.docTemplate, myDoc, throw=True, update=False, **myDoc):
if self._checkAttr(myDoc, throw=True, update=False, **myDoc):
self.update(myDoc)
return True
return False

def setRelVal(self, isRelVal):
def _setRelVal(self, myDoc):
"""
A method to be used for setting the isRelval key in the document
Evaluates whether it's a release validation request, if so, set the flag to True
:param myDoc: the request dictionary
"""
myDoc = {'isRelVal': isRelVal}
if self._checkAttr(self.docTemplate, myDoc, throw=False, update=False, **myDoc):
self.update(myDoc)
return True
return False
if myDoc.get('SubRequestType') == 'RelVal':
return self.setKey('IsRelVal', True)

def setTransferStatus(self, newStatus):
    """
    Set the TransferStatus attribute of this document.

    :param newStatus: the new status value; it must be one of the values
        listed in self.allowedStatus
    :return: the value returned by setKey for the 'TransferStatus' key
    :raises RuntimeError: if newStatus is not listed in self.allowedStatus
    """
    # happy path first: a supported status is delegated to setKey
    if newStatus in self.allowedStatus:
        return self.setKey('TransferStatus', newStatus)

    # anything else is rejected, reporting the supported values
    msg = "TransferStatus: '{}' is not supported. Use: {}".format(newStatus,
                                                                  self.allowedStatus)
    raise RuntimeError(msg)

def updateDoc(self, myDoc, throw=False):
"""
A method to be used for updating the document fields from a dictionary
"""
if self._checkAttr(self.docTemplate, myDoc, throw=throw, update=False, **myDoc):
if self._checkAttr(myDoc, throw=throw, update=False, **myDoc):
self.update(myDoc)
return True
return False

def _setCampMap(self, reqDoc, thisDoc):
"""
__setCampMap__
Provided the request content retrieved from ReqMgr2, build a map
of output dataset per campaign.
:param reqDoc: meant to be the request dictionary retrieved from ReqMgr2
Expand All @@ -270,13 +271,13 @@ def _setCampMap(self, reqDoc, thisDoc):
campOutputMap[campName].extend(reqDoc["ChainParentageMap"][key]["ChildDsets"])
# now just write the final data structure out
for campName, dsetList in campOutputMap.items():
finalMap.append(dict(campaignName=campName, datasets=dsetList))
finalMap.append(dict(CampaignName=campName, Datasets=dsetList))
else:
# ReReco and StoreResults (or very old - <=2018 - non-archived workflows)
finalMap.append(dict(campaignName=reqDoc["Campaign"], datasets=reqDoc["OutputDatasets"]))
finalMap.append(dict(CampaignName=reqDoc["Campaign"], Datasets=reqDoc["OutputDatasets"]))

# finally, update this object
thisDoc["campaignOutputMap"] = finalMap
thisDoc["CampaignOutputMap"] = finalMap
return True

def setDestMap(self):
Expand All @@ -289,4 +290,4 @@ def updateTime(self):
"""
__updateTime__
"""
self.setKey('lastUpdate', int(time()))
self.setKey('LastUpdate', int(time()))
Loading

0 comments on commit 58b8f50

Please sign in to comment.