diff --git a/src/python/WMCore/MicroService/MSOutput/MSOutput.py b/src/python/WMCore/MicroService/MSOutput/MSOutput.py index caca143a59..b6f8529b37 100644 --- a/src/python/WMCore/MicroService/MSOutput/MSOutput.py +++ b/src/python/WMCore/MicroService/MSOutput/MSOutput.py @@ -21,9 +21,11 @@ from WMCore.MicroService.MSCore import MSCore from WMCore.MicroService.Tools.Common import gigaBytes from WMCore.Services.CRIC.CRIC import CRIC +from WMCore.Services.DBS.DBS3Reader import getDataTiers from Utils.Pipeline import Pipeline, Functor from WMCore.Database.MongoDB import MongoDB from WMCore.MicroService.MSOutput.MSOutputTemplate import MSOutputTemplate +from WMCore.MicroService.MSOutput.RelValPolicy import RelValPolicy from WMCore.WMException import WMException from WMCore.Services.AlertManager.AlertManagerAPI import AlertManagerAPI @@ -97,20 +99,25 @@ def __init__(self, msConfig, mode, logger=None): # fetch documents created in the last 6 months (default value) self.msConfig.setdefault("mongoDocsCreatedSecs", 6 * 30 * 24 * 60 * 60) self.msConfig.setdefault("sendNotification", False) + self.msConfig.setdefault("relvalPolicy", []) + self.uConfig = {} # service name used to route alerts via AlertManager self.alertServiceName = "ms-output" self.alertManagerAPI = AlertManagerAPI(self.msConfig.get("alertManagerUrl", None), logger=logger) + # RelVal output data placement policy from the service configuration + self.msConfig.setdefault("dbsUrl", "https://cmsweb-prod.cern.ch/dbs/prod/global/DBSReader") + allDBSDatatiers = getDataTiers(self.msConfig['dbsUrl']) + allDiskRSEs = self.rucio.evaluateRSEExpression("*", returnTape=False) + self.relvalPolicy = RelValPolicy(self.msConfig['relvalPolicy'], + allDBSDatatiers, allDiskRSEs, logger=logger) + self.cric = CRIC(logger=self.logger) self.uConfig = {} self.campaigns = {} self.psn2pnnMap = {} - self.tapeStatus = dict() - for endpoint, quota in viewitems(self.msConfig['tapePledges']): - self.tapeStatus[endpoint] = dict(quota=quota, 
class RelValPolicy:
    """
    Holds the RelVal output data placement policy, where the destinations
    are decided according to the dataset datatier.

    The policy is provided as a list of dictionaries, such as:
      [{"datatier": "tier_1", "destinations": ["rse_name_1", "rse_name_2"]},
       {"datatier": "tier_2", "destinations": ["rse_name_2"]},
       {"datatier": "default", "destinations": ["rse_name_3"]}]

    The mandatory 'default' datatier entry is used for every datatier
    that is not explicitly listed in the policy.
    """

    def __init__(self, policyDesc, listDatatiers, listRSEs, logger=None):
        """
        Validate the policy (structure, datatiers and RSEs) and convert it
        into a flat dictionary for easier data lookup.
        :param policyDesc: list of dictionary items with the output rules
        :param listDatatiers: flat list of existent datatiers in DBS
        :param listRSEs: flat list of existent Disk RSEs in Rucio
        :param logger: logger object, if any
        :raises RelValPolicyException: if the policy fails the validation
        """
        # untouched snapshot of what has been provided, used by __str__
        self.origPolicy = deepcopy(policyDesc)

        self.logger = getMSLogger(verbose=False, logger=logger)

        self._validatePolicy(policyDesc, listDatatiers, listRSEs)
        self.dictPolicy = self._convertPolicy(policyDesc)

    def __str__(self):
        """
        Stringify this object as a JSON document holding both the original
        policy and the datatier-keyed map derived from it.
        """
        objectOut = dict(originalPolicy=self.origPolicy, mappedPolicy=self.dictPolicy)
        return json.dumps(objectOut)

    def _validatePolicy(self, policyDesc, validDBSTiers, validDiskRSEs):
        """
        Validate the overall policy data structure, including:
          * internal and external data types
          * presence of the mandatory keys in each policy item
          * whether the datatiers exist in DBS
          * whether the RSEs exist in Rucio
          * presence of the 'default' datatier entry
        :param policyDesc: list of dictionaries with the policy definition
        :param validDBSTiers: list with existent DBS datatiers
        :param validDiskRSEs: list with existent Rucio Disk RSEs
        :return: nothing, but it will raise an exception if any validation fails
        :raises RelValPolicyException: on the first validation failure found
        """
        if not isinstance(policyDesc, list):
            msg = "The RelVal output data placement policy is not in the expected data type. "
            msg += "Type expected: list, while the current data type is: {}. ".format(type(policyDesc))
            msg += "This critical ERROR must be fixed."
            raise RelValPolicyException(msg) from None

        # policy must have a default/fallback destination for datatiers not explicitly listed
        hasDefault = False
        for item in policyDesc:
            # a malformed item must surface as a policy error, not as TypeError/KeyError
            if not isinstance(item, dict):
                msg = "Each RelVal policy item must be a dictionary, not {}.".format(type(item))
                raise RelValPolicyException(msg) from None
            for requiredKey in ("datatier", "destinations"):
                if requiredKey not in item:
                    msg = "RelVal policy item {} is missing the mandatory '{}' parameter.".format(item,
                                                                                                 requiredKey)
                    raise RelValPolicyException(msg) from None

            # validate the datatier
            if not isinstance(item['datatier'], str):
                msg = "The 'datatier' parameter must be a string, not {}.".format(type(item['datatier']))
                raise RelValPolicyException(msg) from None
            if item['datatier'] == "default":
                hasDefault = True
            elif item['datatier'] not in validDBSTiers:
                msg = "Datatier '{}' does not exist in DBS.".format(item['datatier'])
                raise RelValPolicyException(msg) from None

            # validate the destinations
            if not isinstance(item['destinations'], list):
                msg = "The 'destinations' parameter must be a list, not {}".format(type(item['destinations']))
                raise RelValPolicyException(msg) from None
            for rseName in item['destinations']:
                if rseName not in validDiskRSEs:
                    msg = "Destinations '{}' does not exist in Rucio.".format(rseName)
                    raise RelValPolicyException(msg) from None

        if not hasDefault:
            msg = "A 'default' key must be defined with default destinations."
            raise RelValPolicyException(msg) from None

    def _convertPolicy(self, policyDesc):
        """
        Map the RelVal data policy to a flat dictionary key'ed by datatier.
        If a datatier is listed more than once, the last entry wins.
        :param policyDesc: list of dictionaries with the policy definition
        :return: a dictionary with a map of the RelVal policy
        """
        # copy each destinations list, such that later changes to the data
        # structure provided by the caller cannot alter the mapped policy
        return {item['datatier']: list(item['destinations']) for item in policyDesc}

    def getDestinationByDataset(self, dsetName):
        """
        Provided a dataset name, return the destinations defined for its
        datatier, or the 'default' destinations if the datatier is not
        part of the policy.
        :param dsetName: a string with the full dataset name
        :return: a list of locations (a copy, safe to be modified by the caller)
        :raises ValueError: if the dataset name is not made of 3 slash-separated fields
        """
        try:
            # leading slash yields an empty first field; only the datatier matters here
            _, _, _, dataTier = dsetName.split('/')
        except ValueError:
            msg = "Invalid dataset name '{}'. ".format(dsetName)
            msg += "Expected format is: /<primary name>/<processing string>/<datatier>"
            raise ValueError(msg) from None
        # hand out a copy to protect the internal policy against caller changes
        return list(self.dictPolicy.get(dataTier, self.dictPolicy['default']))
class RelValPolicyTests(unittest.TestCase):
    """Unit tests for the RelValPolicy module"""

    def testBrokenPolicy(self):
        """Tests for the RelValPolicy class with a broken policy"""
        validDatatiers = ["GEN-SIM", "GEN", "SIM", "AOD"]
        validRSEs = ["rse_1", "rse_2", "rse_3"]

        # test output policies with a wrong data type
        for testPolicy in [None, {}, "blah", 123]:
            with self.assertRaises(RelValPolicyException):
                RelValPolicy(testPolicy, validDatatiers, validRSEs)

        # test internal structure with wrong data type for datatier
        testPolicy = [{"datatier": ["tier1", "tier2"], "destinations": ["rse_1", "rse_2"]},
                      {"datatier": "default", "destinations": ["rse_1"]}]
        with self.assertRaises(RelValPolicyException):
            RelValPolicy(testPolicy, validDatatiers, validRSEs)

        # test internal structure with wrong data type for destinations.
        # NOTE: the datatier must be a valid one, otherwise the exception is
        # raised by the datatier check and the destinations are never evaluated
        testPolicy = [{"datatier": "GEN", "destinations": "rse_1"},
                      {"datatier": "default", "destinations": ["rse_1"]}]
        with self.assertRaises(RelValPolicyException):
            RelValPolicy(testPolicy, validDatatiers, validRSEs)

        # test a datatier that does not exist in the list of valid datatiers
        testPolicy = [{"datatier": "tier1", "destinations": ["rse_1"]},
                      {"datatier": "default", "destinations": ["rse_1"]}]
        with self.assertRaises(RelValPolicyException):
            RelValPolicy(testPolicy, validDatatiers, validRSEs)

        # test a destination that does not exist in the list of valid RSEs
        testPolicy = [{"datatier": "GEN", "destinations": ["rse_unknown"]},
                      {"datatier": "default", "destinations": ["rse_1"]}]
        with self.assertRaises(RelValPolicyException):
            RelValPolicy(testPolicy, validDatatiers, validRSEs)

        # test internal structure missing the required "default" key/value pair
        testPolicy = [{"datatier": "GEN", "destinations": ["rse_1", "rse_2"]}]
        with self.assertRaises(RelValPolicyException):
            RelValPolicy(testPolicy, validDatatiers, validRSEs)

    def testValidPolicy(self):
        """Tests for the RelValPolicy class with a valid policy"""
        validDatatiers = ["GEN-SIM", "GEN", "SIM", "AOD"]
        validRSEs = ["rse_1", "rse_2", "rse_3"]

        testPolicy = [{"datatier": "SIM", "destinations": ["rse_1", "rse_2"]},
                      {"datatier": "GEN-SIM", "destinations": ["rse_1"]},
                      {"datatier": "default", "destinations": ["rse_2"]}]
        policyObj = RelValPolicy(testPolicy, validDatatiers, validRSEs)

        # now test the method to get destinations for a given dataset (datatier)
        for policyItem in testPolicy:
            resp = policyObj.getDestinationByDataset("/PD/ProcStr-v1/{}".format(policyItem['datatier']))
            self.assertEqual(resp, policyItem['destinations'])

        # and this should fallback to the 'default' case
        resp = policyObj.getDestinationByDataset("/PD/ProcStr-v1/BLAH")
        self.assertEqual(resp, ["rse_2"])

    def testStringification(self):
        """Test the stringification of the RelValPolicy object"""
        validDatatiers = []
        validRSEs = ["rse_2"]
        testPolicy = [{"datatier": "default", "destinations": ["rse_2"]}]

        policyObj = str(RelValPolicy(testPolicy, validDatatiers, validRSEs))

        self.assertTrue(isinstance(policyObj, str))
        policyObj = json.loads(policyObj)
        # assertEqual is required here: assertCountEqual on a dictionary only
        # compares its keys, leaving the mapped destinations unverified
        self.assertEqual(policyObj["originalPolicy"], testPolicy)
        self.assertEqual(policyObj["mappedPolicy"], {"default": ["rse_2"]})