Skip to content

Commit

Permalink
Skip Custodial and make generic Disk NonCustodial data placement in R…
Browse files Browse the repository at this point in the history
…ucioInjector
  • Loading branch information
amaltaro committed Oct 13, 2020
1 parent 8f6d51a commit b636354
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 4 deletions.
4 changes: 3 additions & 1 deletion etc/WMAgentConfig.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,9 @@
config.RucioInjector.RSEPostfix = False # enable it to append _Test to the RSE names
config.RucioInjector.metaDIDProject = "Production"
config.RucioInjector.listTiersToInject = [] # ["NANOAOD", "NANOAODSIM"]
config.RucioInjector.skipRulesForTiers = ["NANOAOD", "NANOAODSIM"]
config.RucioInjector.skipRulesForTiers = [] # ["NANOAOD", "NANOAODSIM"]
config.RucioInjector.containerDiskRuleParams = {"weight": "ddm_quota", "copies": 2, "grouping": "DATASET"}
config.RucioInjector.containerDiskRuleRSEExpr = "(tier=2|tier=1)&cms_type=real&rse_type=DISK"
config.RucioInjector.rucioAccount = "OVER_WRITE_BY_SECRETS"
config.RucioInjector.rucioUrl = "OVER_WRITE_BY_SECRETS"
config.RucioInjector.rucioAuthUrl = "OVER_WRITE_BY_SECRETS"
79 changes: 76 additions & 3 deletions src/python/WMComponent/RucioInjector/RucioInjectorPoller.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ def __init__(self, config):
self.pollRules = config.RucioInjector.pollIntervalRules
self.lastRulesExecTime = 0
self.createBlockRules = config.RucioInjector.createBlockRules
self.containerDiskRuleParams = config.RucioInjector.containerDiskRuleParams
self.containerDiskRuleRSEExpr = config.RucioInjector.containerDiskRuleRSEExpr
self.skipRulesForTiers = config.RucioInjector.skipRulesForTiers
self.listTiersToInject = config.RucioInjector.listTiersToInject
if config.RucioInjector.metaDIDProject not in RUCIO_VALID_PROJECT:
Expand All @@ -110,6 +112,13 @@ def __init__(self, config):
self.testRSEs = config.RucioInjector.RSEPostfix
self.filesToRecover = []

# output data placement has a different behaviour between T0 and Production agents
if hasattr(config, "Tier0Feeder"):
logging.info("RucioInjector running on a T0 WMAgent")
self.isT0agent = True
else:
self.isT0agent = False

if not self.listTiersToInject:
logging.info("Component configured to inject all the data tiers")
else:
Expand Down Expand Up @@ -283,7 +292,7 @@ def insertBlockRules(self):
rseName = "%s_Test" % item['pnn'] if self.testRSEs else item['pnn']
# DATASET = replicates all files in the same block to the same RSE
resp = self.rucio.createReplicationRule(item['blockname'],
rseExpression="rse=%s" % rseName, **kwargs)
rseExpression=rseName, **kwargs)
if resp:
msg = "Block rule created for block: %s, at: %s, with rule id: %s"
logging.info(msg, item['blockname'], item['pnn'], resp[0])
Expand Down Expand Up @@ -405,6 +414,9 @@ def insertContainerRules(self):
Poll the database for datasets meant to be subscribed and create
a container level rule to replicate all files to a given RSE
"""
if self.isT0agent:
return self._insertContainerRulesT0()

logging.info("Starting insertContainerRules method")

# FIXME also adapt the format returned by this DAO
Expand All @@ -415,6 +427,67 @@ def insertContainerRules(self):
# Keep a list of subscriptions to tick as subscribed in the database
subscriptionsMade = []

# Create the subscription objects and add them to the list
# The list takes care of the sorting internally
for subInfo in unsubscribedDatasets:
container = subInfo['path']
rseExpr = subInfo['site']
# we skip Custodial/MSS/Tape data placement
if subInfo['custodial'] in [1, 'y']:
logging.info("Bypassing custodial container rule for container: %s and RSE: %s",
container, rseExpr)
subscriptionsMade.append(subInfo['id'])
continue

if not self._isContainerTierAllowed(container):
logging.info("Component configured to skip container rule for: %s", container)
subscriptionsMade.append(subInfo['id'])
continue

kwargs = dict(ask_approval=False, activity="Production Output",
account=self.rucioAcct, grouping="ALL",
comment="WMAgent automatic container rule", meta=self.metaData)
# top it up with component-level parameters
kwargs.update(self.containerDiskRuleParams)

rseExpr = self.containerDiskRuleRSEExpr
if self.testRSEs:
rseExpr = rseExpr.replace("cms_type=real", "cms_type=test")
logging.info("Creating container rule for %s against RSE %s", container, rseExpr)
try:
resp = self.rucio.createReplicationRule(container,
rseExpression=rseExpr, **kwargs)
except Exception as exc:
msg = "Failed to create container rule for: %s" % container
msg += "\nWill retry again in the next cycle. Error: %s" % str(exc)
continue
if resp:
logging.info("Container rule created for %s under rule id: %s", container, resp)
subscriptionsMade.append(subInfo['id'])
else:
logging.error("Failed to create rule for container: %s", container)

# Register the result in DBSBuffer
if subscriptionsMade:
self.markSubscribed.execute(subscriptionsMade)

return

def _insertContainerRulesT0(self):
"""
T0 specific method to deal with output container-level data placement, as
defined in the workflow spec file.
"""
logging.info("Starting insertContainerRulesT0 method")

# FIXME also adapt the format returned by this DAO
# Check for completely unsubscribed datasets
# in short, files in phedex, file status in "GLOBAL" or "InDBS", and subscribed=0
unsubscribedDatasets = self.getUnsubscribedDsets.execute()

# Keep a list of subscriptions to tick as subscribed in the database
subscriptionsMade = []

# Create the subscription objects and add them to the list
# The list takes care of the sorting internally
for subInfo in unsubscribedDatasets:
Expand All @@ -432,14 +505,14 @@ def insertContainerRules(self):
try:
# ALL = replicates all files to the same RSE
resp = self.rucio.createReplicationRule(container,
rseExpression="rse=%s" % rseName, **kwargs)
rseExpression=rseName, **kwargs)
except WMRucioException as exc:
msg = "Failed to create container rule for (retrying with approval): %s" % container
logging.warning(msg)
kwargs["ask_approval"] = True
try:
resp = self.rucio.createReplicationRule(container,
rseExpression="rse=%s" % rseName, **kwargs)
rseExpression=rseName, **kwargs)
except Exception as exc:
msg = "Failed once again to create container rule for: %s" % container
msg += "\nWill retry again in the next cycle. Error: %s" % str(exc)
Expand Down

0 comments on commit b636354

Please sign in to comment.