Skip to content

Commit

Permalink
Merge pull request #10812 from khurtado/jobprio
Browse files Browse the repository at this point in the history
Update job priority only for processing and production task types
  • Loading branch information
amaltaro authored Jan 6, 2022
2 parents 001cc51 + 9b9e613 commit 0a97b5d
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 13 deletions.
9 changes: 6 additions & 3 deletions src/python/WMComponent/JobUpdater/JobUpdaterPoller.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ def synchronizeJobPriority(self):
priorityCache = {}
workflowsToUpdateWMBS = {}
workflowsToCheck = self.listWorkflowsDAO.execute()
updatedRequest = []
for workflowEntry in workflowsToCheck:
workflow = workflowEntry['name']
if workflow not in priorityCache:
Expand All @@ -157,9 +158,11 @@ def synchronizeJobPriority(self):
self.workqueue.updatePriority(workflow, requestPriority)
# Check if there are executing jobs for this particular task
if self.executingJobsDAO.execute(workflow, workflowEntry['task']) > 0:
self.bossAir.updateJobInformation(workflow, workflowEntry['task'],
requestPriority=priorityCache[workflow],
taskPriority=workflowEntry['task_priority'])
# Only update once per request workflow
if workflow not in updatedRequest:
updatedRequest.append(workflow)
self.bossAir.updateJobInformation(workflow,
requestPriority=priorityCache[workflow])
workflowsToUpdateWMBS[workflow] = priorityCache[workflow]
if workflowsToUpdateWMBS:
logging.info("Updating %d workflows in WMBS.", len(workflowsToUpdateWMBS))
Expand Down
6 changes: 3 additions & 3 deletions src/python/WMCore/BossAir/BossAirAPI.py
Original file line number Diff line number Diff line change
Expand Up @@ -689,18 +689,18 @@ def monitor(self, commonState=True):

return results

def updateJobInformation(self, workflow, task, **kwargs):
def updateJobInformation(self, workflow, **kwargs):
"""
_updateJobInformation_
Update the information of jobs in a particular workflow and task,
Update the information of jobs in a particular workflow,
the data will be updated according the keyword arguments which
will be interpreted by the individual plugins accordingly.
"""
for plugin in self.plugins:
try:
pluginInst = self.plugins[plugin]
pluginInst.updateJobInformation(workflow, task, **kwargs)
pluginInst.updateJobInformation(workflow, **kwargs)
except WMException:
raise
except Exception as ex:
Expand Down
2 changes: 1 addition & 1 deletion src/python/WMCore/BossAir/Plugins/BasePlugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ def kill(self, jobs, raiseEx):

pass

def updateJobInformation(self, workflow, task, **kwargs):
def updateJobInformation(self, workflow, **kwargs):
"""
_updateJobInformation_
Expand Down
15 changes: 9 additions & 6 deletions src/python/WMCore/BossAir/Plugins/SimpleCondorPlugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ def killWorkflowJobs(self, workflow):

return

def updateJobInformation(self, workflow, task, **kwargs):
def updateJobInformation(self, workflow, **kwargs):
"""
_updateJobInformation_
Expand All @@ -475,18 +475,21 @@ def updateJobInformation(self, workflow, task, **kwargs):
The currently supported changes are only priority for which both the task (taskPriority)
and workflow priority (requestPriority) must be provided.
Since the default priority is very high, we only need to adjust new priorities
for processing/production task types (which have a task priority of 0)
"""
schedd = htcondor.Schedd()

if 'taskPriority' in kwargs and 'requestPriority' in kwargs:
newPriority = int(kwargs['requestPriority']) + int(kwargs['taskPriority'] * self.maxTaskPriority)
if 'requestPriority' in kwargs:
newPriority = int(kwargs['requestPriority'])
try:
constraint = "WMAgent_SubTaskName =?= %s" % classad.quote(str(task))
constraint += " && WMAgent_RequestName =?= %s" % classad.quote(str(workflow))
constraint = "WMAgent_RequestName =?= %s" % classad.quote(str(workflow))
constraint += " && JobPrio =!= %d" % newPriority
constraint += " && stringListMember(CMS_JobType, %s) " % classad.quote(str("Production, Processing"))
schedd.edit(constraint, 'JobPrio', classad.Literal(newPriority))
except Exception as ex:
logging.error("Failed to update JobPrio for WMAgent_SubTaskName=%s", task)
logging.error("Failed to update JobPrio for WMAgent_RequestName=%s", str(workflow))
logging.exception(ex)

return
Expand Down

0 comments on commit 0a97b5d

Please sign in to comment.