diff --git a/etc/WMAgentConfig.py b/etc/WMAgentConfig.py index 671d9fd9176..dc09aacc868 100644 --- a/etc/WMAgentConfig.py +++ b/etc/WMAgentConfig.py @@ -372,7 +372,9 @@ config.RucioInjector.RSEPostfix = False # enable it to append _Test to the RSE names config.RucioInjector.metaDIDProject = "Production" config.RucioInjector.listTiersToInject = [] # ["NANOAOD", "NANOAODSIM"] -config.RucioInjector.skipRulesForTiers = ["NANOAOD", "NANOAODSIM"] +config.RucioInjector.skipRulesForTiers = [] # ["NANOAOD", "NANOAODSIM"] +config.RucioInjector.containerDiskRuleParams = {"weight": "ddm_quota", "copies": 2, "grouping": "DATASET"} +config.RucioInjector.containerDiskRuleRSEExpr = "(tier=2|tier=1)&cms_type=real&rse_type=DISK" config.RucioInjector.rucioAccount = "OVER_WRITE_BY_SECRETS" config.RucioInjector.rucioUrl = "OVER_WRITE_BY_SECRETS" config.RucioInjector.rucioAuthUrl = "OVER_WRITE_BY_SECRETS" diff --git a/src/python/WMComponent/RucioInjector/RucioInjectorPoller.py b/src/python/WMComponent/RucioInjector/RucioInjectorPoller.py index 7464abda26c..0f917356af5 100644 --- a/src/python/WMComponent/RucioInjector/RucioInjectorPoller.py +++ b/src/python/WMComponent/RucioInjector/RucioInjectorPoller.py @@ -84,6 +84,8 @@ def __init__(self, config): self.pollRules = config.RucioInjector.pollIntervalRules self.lastRulesExecTime = 0 self.createBlockRules = config.RucioInjector.createBlockRules + self.containerDiskRuleParams = config.RucioInjector.containerDiskRuleParams + self.containerDiskRuleRSEExpr = config.RucioInjector.containerDiskRuleRSEExpr self.skipRulesForTiers = config.RucioInjector.skipRulesForTiers self.listTiersToInject = config.RucioInjector.listTiersToInject if config.RucioInjector.metaDIDProject not in RUCIO_VALID_PROJECT: @@ -110,6 +112,13 @@ def __init__(self, config): self.testRSEs = config.RucioInjector.RSEPostfix self.filesToRecover = [] + # output data placement has a different behaviour between T0 and Production agents + if hasattr(config, "Tier0Feeder"): + logging.info("RucioInjector running on a T0 WMAgent") + self.isT0agent = True + else: + self.isT0agent = False + if not self.listTiersToInject: logging.info("Component configured to inject all the data tiers") else: @@ -283,7 +292,7 @@ def insertBlockRules(self): rseName = "%s_Test" % item['pnn'] if self.testRSEs else item['pnn'] # DATASET = replicates all files in the same block to the same RSE resp = self.rucio.createReplicationRule(item['blockname'], - rseExpression="rse=%s" % rseName, **kwargs) + rseExpression=rseName, **kwargs) if resp: msg = "Block rule created for block: %s, at: %s, with rule id: %s" logging.info(msg, item['blockname'], item['pnn'], resp[0]) @@ -405,6 +414,9 @@ def insertContainerRules(self): Poll the database for datasets meant to be subscribed and create a container level rule to replicate all files to a given RSE """ + if self.isT0agent: + return self._insertContainerRulesT0() + logging.info("Starting insertContainerRules method") # FIXME also adapt the format returned by this DAO @@ -415,6 +427,67 @@ def insertContainerRules(self): # Keep a list of subscriptions to tick as subscribed in the database subscriptionsMade = [] + # Create the subscription objects and add them to the list + # The list takes care of the sorting internally + for subInfo in unsubscribedDatasets: + container = subInfo['path'] + rseExpr = subInfo['site'] + # we skip Custodial/MSS/Tape data placement + if subInfo['custodial'] in [1, 'y']: + logging.info("Bypassing custodial container rule for container: %s and RSE: %s", + container, rseExpr) + subscriptionsMade.append(subInfo['id']) + continue + + if not self._isContainerTierAllowed(container): + logging.info("Component configured to skip container rule for: %s", container) + subscriptionsMade.append(subInfo['id']) + continue + + kwargs = dict(ask_approval=False, activity="Production Output", + account=self.rucioAcct, grouping="ALL", + comment="WMAgent automatic container rule", meta=self.metaData) + # top it up with component-level parameters + kwargs.update(self.containerDiskRuleParams) + + rseExpr = self.containerDiskRuleRSEExpr + if self.testRSEs: + rseExpr = rseExpr.replace("cms_type=real", "cms_type=test") + logging.info("Creating container rule for %s against RSE %s", container, rseExpr) + try: + resp = self.rucio.createReplicationRule(container, + rseExpression=rseExpr, **kwargs) + except Exception as exc: + msg = "Failed to create container rule for: %s" % container + msg += "\nWill retry again in the next cycle. Error: %s" % str(exc) + continue + if resp: + logging.info("Container rule created for %s under rule id: %s", container, resp) + subscriptionsMade.append(subInfo['id']) + else: + logging.error("Failed to create rule for container: %s", container) + + # Register the result in DBSBuffer + if subscriptionsMade: + self.markSubscribed.execute(subscriptionsMade) + + return + + def _insertContainerRulesT0(self): + """ + T0 specific method to deal with output container-level data placement, as + defined in the workflow spec file. + """ + logging.info("Starting insertContainerRulesT0 method") + + # FIXME also adapt the format returned by this DAO + # Check for completely unsubscribed datasets + # in short, files in phedex, file status in "GLOBAL" or "InDBS", and subscribed=0 + unsubscribedDatasets = self.getUnsubscribedDsets.execute() + + # Keep a list of subscriptions to tick as subscribed in the database + subscriptionsMade = [] + # Create the subscription objects and add them to the list # The list takes care of the sorting internally for subInfo in unsubscribedDatasets: @@ -432,14 +505,14 @@ def insertContainerRules(self): try: # ALL = replicates all files to the same RSE resp = self.rucio.createReplicationRule(container, - rseExpression="rse=%s" % rseName, **kwargs) + rseExpression=rseName, **kwargs) except WMRucioException as exc: msg = "Failed to create container rule for (retrying with approval): %s" % container logging.warning(msg) kwargs["ask_approval"] = True try: resp = self.rucio.createReplicationRule(container, - rseExpression="rse=%s" % rseName, **kwargs) + rseExpression=rseName, **kwargs) except Exception as exc: msg = "Failed once again to create container rule for: %s" % container msg += "\nWill retry again in the next cycle. Error: %s" % str(exc)