Skip to content

Commit

Permalink
Merge pull request dmwm#5547 from ticoann/reqmgr2
Browse files Browse the repository at this point in the history
Reqmgr2
  • Loading branch information
ticoann committed Dec 30, 2014
2 parents d3ee889 + 4dbbe74 commit 3c669c2
Show file tree
Hide file tree
Showing 8 changed files with 98 additions and 53 deletions.
72 changes: 43 additions & 29 deletions src/python/WMComponent/JobSubmitter/JobSubmitterPoller.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,40 +290,54 @@ def refreshCache(self):

loadedJob['retry_count'] = newJob['retry_count']

# Grab the possible locations
# This should be in terms of siteNames
# Because there can be multiple entry points to a site with one SE
# And each of them can be a separate location
# Note that all the files in a job have the same set of locations
possibleLocations = set()
rawLocations = loadedJob["input_files"][0]["locations"]
siteWhitelist = loadedJob.get("siteWhitelist", [])
siteBlacklist = loadedJob.get("siteBlacklist", [])
trustSitelists = loadedJob.get("trustSitelists", False)

# convert site lists into correct format
if len(siteWhitelist) > 0:
whitelist = []
for cmsName in siteWhitelist:
whitelist.extend(self.cmsNames.get(cmsName, []))
siteWhitelist = whitelist
if len(siteBlacklist) > 0:
blacklist = []
for cmsName in siteBlacklist:
blacklist.extend(self.cmsNames.get(cmsName, []))
siteBlacklist = blacklist

# figure out possible locations for job
if trustSitelists:
possibleLocations = set(siteWhitelist) - set(siteBlacklist)
else:
possibleLocations = set()

# all files in job have same location (in se names)
rawLocations = loadedJob["input_files"][0]["locations"]

# transform se names into site names
for loc in rawLocations:
if not loc in self.siteKeys.keys():
# Then we have a problem
logging.error('Encountered unknown location %s for job %i' % (loc, jobID))
logging.error('Ignoring for now, but watch out for this')
else:
for siteName in self.siteKeys[loc]:
possibleLocations.add(siteName)

# filter with site lists
if len(siteWhitelist) > 0:
possibleLocations = possibleLocations & set(siteWhitelist)
if len(siteBlacklist) > 0:
possibleLocations = possibleLocations - set(siteBlacklist)

# Create another set of locations that may change when a site goes white/black listed
# Does not care about the non_draining or aborted sites, they may change and that is the point
potentialLocations = set()

# Transform se into siteNames
for loc in rawLocations:
if not loc in self.siteKeys.keys():
# Then we have a problem
logging.error('Encountered unknown location %s for job %i' % (loc, jobID))
logging.error('Ignoring for now, but watch out for this')
else:
for siteName in self.siteKeys[loc]:
possibleLocations.add(siteName)

if len(loadedJob["siteWhitelist"]) > 0:
whiteList = []
for cmsName in loadedJob["siteWhitelist"]:
whiteList.extend(self.cmsNames.get(cmsName, []))
possibleLocations = possibleLocations & set(whiteList)
if len(loadedJob["siteBlacklist"]) > 0:
blackList = []
for cmsName in loadedJob["siteBlacklist"]:
blackList.extend(self.cmsNames.get(cmsName, []))
possibleLocations = possibleLocations - set(blackList)

potentialLocations.update(possibleLocations)

# now check for sites in drain and adjust the possible locations
# also check if there is at least one site left to run the job
if len(possibleLocations) == 0:
newJob['name'] = loadedJob['name']
badJobs[61101].append(newJob)
Expand Down
14 changes: 11 additions & 3 deletions src/python/WMCore/JobSplitting/JobFactory.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,9 @@ def __call__(self, jobtype = "Job", grouptype = "JobGroup", *args, **kwargs):
self.currentGroup = None
self.currentJob = None

self.siteBlacklist = kwargs.get("siteBlacklist", [])
self.siteWhitelist = kwargs.get("siteWhitelist", [])
self.siteBlacklist = kwargs.get("siteBlacklist", [])
self.trustSitelists = kwargs.get("trustSitelists", False)

# Every time we restart, re-zero the jobs
self.nJobs = 0
Expand Down Expand Up @@ -123,8 +124,15 @@ def newJob(self, name=None, files=None, failedJob=False, failedReason=None):
self.currentJob["jobType"] = self.subscription["type"]
self.currentJob["taskType"] = self.subscription.workflowType()
self.currentJob["owner"] = self.subscription.owner()
self.currentJob["siteBlacklist"] = self.siteBlacklist
self.currentJob["siteWhitelist"] = self.siteWhitelist

# only put into job object if relevant
# JobSubmitter can deal with absence
if len(self.siteBlacklist) > 0:
self.currentJob["siteBlacklist"] = self.siteBlacklist
if len(self.siteWhitelist) > 0:
self.currentJob["siteWhitelist"] = self.siteWhitelist
if self.trustSitelists:
self.currentJob["trustSitelists"] = self.trustSitelists

# All production jobs must be run 1
if self.subscription["type"] == "Production":
Expand Down
8 changes: 4 additions & 4 deletions src/python/WMCore/WMBS/CreateWMBSBase.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,15 @@ def __init__(self, logger = None, dbi = None, params = None):
self.create["01wmbs_fileset"] = \
"""CREATE TABLE wmbs_fileset (
id INTEGER PRIMARY KEY AUTO_INCREMENT,
name VARCHAR(500) NOT NULL,
name VARCHAR(700) NOT NULL,
open INT(1) NOT NULL DEFAULT 0,
last_update INTEGER NOT NULL,
UNIQUE (name))"""

self.create["02wmbs_file_details"] = \
"""CREATE TABLE wmbs_file_details (
id INTEGER PRIMARY KEY AUTO_INCREMENT,
lfn VARCHAR(500) NOT NULL,
lfn VARCHAR(700) NOT NULL,
filesize BIGINT,
events INTEGER,
first_event BIGINT NOT NULL DEFAULT 0,
Expand Down Expand Up @@ -285,8 +285,8 @@ def __init__(self, logger = None, dbi = None, params = None):
couch_record VARCHAR(255),
location INTEGER,
outcome INTEGER DEFAULT 0,
cache_dir VARCHAR(700) DEFAULT 'None',
fwjr_path VARCHAR(700),
cache_dir VARCHAR(800) DEFAULT 'None',
fwjr_path VARCHAR(800),
FOREIGN KEY (jobgroup)
REFERENCES wmbs_jobgroup(id) ON DELETE CASCADE,
FOREIGN KEY (state) REFERENCES wmbs_job_state(id),
Expand Down
2 changes: 1 addition & 1 deletion src/python/WMCore/WMBS/MySQL/Create.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def __init__(self, logger = None, dbi = None, params = None):
self.create["01wmbs_fileset"] = \
"""CREATE TABLE wmbs_fileset (
id INTEGER PRIMARY KEY AUTO_INCREMENT,
name VARCHAR(512) NOT NULL,
name VARCHAR(700) NOT NULL,
open INT(1) NOT NULL DEFAULT 0,
last_update INT(11) NOT NULL,
UNIQUE (name))"""
Expand Down
8 changes: 4 additions & 4 deletions src/python/WMCore/WMBS/Oracle/Create.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def __init__(self, logger = None, dbi = None, params = None):
self.create["01wmbs_fileset"] = \
"""CREATE TABLE wmbs_fileset (
id INTEGER NOT NULL,
name VARCHAR(500) NOT NULL,
name VARCHAR(700) NOT NULL,
open CHAR(1) CHECK (open IN ('0', '1' )) NOT NULL,
last_update INTEGER NOT NULL
) %s""" % tablespaceTable
Expand All @@ -67,7 +67,7 @@ def __init__(self, logger = None, dbi = None, params = None):
self.create["02wmbs_file_details"] = \
"""CREATE TABLE wmbs_file_details (
id INTEGER NOT NULL,
lfn VARCHAR(500) NOT NULL,
lfn VARCHAR(700) NOT NULL,
filesize INTEGER,
events INTEGER,
first_event INTEGER DEFAULT 0,
Expand Down Expand Up @@ -517,8 +517,8 @@ def __init__(self, logger = None, dbi = None, params = None):
couch_record VARCHAR(255),
location INTEGER,
outcome INTEGER DEFAULT 0,
cache_dir VARCHAR(700) DEFAULT 'None',
fwjr_path VARCHAR(700)
cache_dir VARCHAR(800) DEFAULT 'None',
fwjr_path VARCHAR(800)
) %s""" % tablespaceTable

self.indexes["01_pk_wmbs_job"] = \
Expand Down
12 changes: 8 additions & 4 deletions src/python/WMCore/WMSpec/StdSpecs/StdBase.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ def setupProcessingTask(self, procTask, taskType, inputDataset = None, inputStep
userSandbox = None, userFiles = [], primarySubType = None,
forceMerged = False, forceUnmerged = False,
configCacheUrl = None, timePerEvent = None, memoryReq = None,
sizePerEvent = None, useMulticore = True):
sizePerEvent = None, useMulticore = True, applySiteLists = True):
"""
_setupProcessingTask_
Expand Down Expand Up @@ -275,8 +275,11 @@ def setupProcessingTask(self, procTask, taskType, inputDataset = None, inputStep
procTask.applyTemplates()

procTask.setTaskLogBaseLFN(self.unmergedLFNBase)
procTask.setSiteWhitelist(self.siteWhitelist)
procTask.setSiteBlacklist(self.siteBlacklist)

if applySiteLists:
procTask.setSiteWhitelist(self.siteWhitelist)
procTask.setSiteBlacklist(self.siteBlacklist)
procTask.setTrustSitelists(self.trustSitelists)

newSplitArgs = {}
for argName in splitArgs.keys():
Expand Down Expand Up @@ -507,8 +510,8 @@ def addMergeTask(self, parentTask, parentTaskSplitting, parentOutputModuleName,
mergeTaskLogArch.setStepType("LogArchive")
mergeTaskLogArch.setNewStageoutOverride(self.enableNewStageout)


mergeTask.setTaskLogBaseLFN(self.unmergedLFNBase)

if doLogCollect:
self.addLogCollectTask(mergeTask, taskName = "%s%sMergeLogCollect" % (parentTask.name(), parentOutputModuleName))

Expand Down Expand Up @@ -896,6 +899,7 @@ def getWorkloadArguments():
"validate" : lambda x: all([cmsname(y) for y in x])},
"SiteWhitelist" : {"default" : [], "type" : makeList,
"validate" : lambda x: all([cmsname(y) for y in x])},
"TrustSitelists" : {"default" : False, "type" : strToBool},
"UnmergedLFNBase" : {"default" : "/store/unmerged"},
"MergedLFNBase" : {"default" : "/store/data"},
"MinMergeSize" : {"default" : 2 * 1024 * 1024 * 1024, "type" : int,
Expand Down
12 changes: 6 additions & 6 deletions src/python/WMCore/WMSpec/StdSpecs/StoreResults.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,11 @@ def __call__(self, workloadName, arguments):

mergeTaskLogArch = mergeTaskCmssw.addStep("logArch1")
mergeTaskLogArch.setStepType("LogArchive")

self.addLogCollectTask(mergeTask,
taskName = "StoreResultsLogCollect")

mergeTask.setSiteWhitelist(self.siteWhitelist)
mergeTask.setSiteBlacklist(self.siteBlacklist)

self.addLogCollectTask(mergeTask, taskName = "StoreResultsLogCollect")

mergeTask.setTaskType("Merge")
mergeTask.applyTemplates()
Expand All @@ -70,9 +72,7 @@ def __call__(self, workloadName, arguments):
mergeTask.setSplittingAlgorithm(splitAlgo,
max_merge_size = self.maxMergeSize,
min_merge_size = self.minMergeSize,
max_merge_events = self.maxMergeEvents,
siteWhitelist = self.siteWhitelist,
siteBlacklist = self.siteBlacklist)
max_merge_events = self.maxMergeEvents)

mergeTaskCmsswHelper = mergeTaskCmssw.getTypeHelper()
mergeTaskCmsswHelper.cmsswSetup(self.frameworkVersion, softwareEnvironment = "",
Expand Down
23 changes: 21 additions & 2 deletions src/python/WMCore/WMSpec/WMTask.py
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,7 @@ def jobSplittingParameters(self, performance = True):
del splittingParams['performance']
splittingParams["siteWhitelist"] = self.siteWhitelist()
splittingParams["siteBlacklist"] = self.siteBlacklist()
splittingParams["trustSitelists"] = self.trustSitelists()

if "runWhitelist" not in splittingParams.keys() and self.inputRunWhitelist() != None:
splittingParams["runWhitelist"] = self.inputRunWhitelist()
Expand Down Expand Up @@ -805,7 +806,7 @@ def setSiteWhitelist(self, siteWhitelist):
"""
_setSiteWhitelist_
Set the set white list for this task.
Set the site white list for the task.
"""
self.data.constraints.sites.whitelist = siteWhitelist
return
Expand All @@ -822,11 +823,28 @@ def setSiteBlacklist(self, siteBlacklist):
"""
_setSiteBlacklist_
Set the site black list for this task.
Set the site black list for the task.
"""
self.data.constraints.sites.blacklist = siteBlacklist
return

def trustSitelists(self):
"""
_trustSitelists_
Accessor for the 'trust site lists' flag for the task.
"""
return self.data.constraints.sites.trustlists

def setTrustSitelists(self, trustSitelists):
"""
_setTrustSitelists_
Set the 'trust site lists' flag for the task.
"""
self.data.constraints.sites.trustlists = trustSitelists
return

def listOutputDatasetsAndModules(self):
"""
_listOutputDatasetsAndModules_
Expand Down Expand Up @@ -1338,6 +1356,7 @@ def __init__(self, name):
self.constraints.section_("sites")
self.constraints.sites.whitelist = []
self.constraints.sites.blacklist = []
self.constraints.sites.trustlists = False
self.subscriptions.outputModules = []
self.input.section_("WMBS")

Expand Down

0 comments on commit 3c669c2

Please sign in to comment.