Skip to content

Commit

Permalink
[WFRunner] Better treatment of resource limits
Browse files Browse the repository at this point in the history
  • Loading branch information
Benedikt Volkel committed Mar 14, 2024
1 parent cb357d0 commit 1a29dde
Showing 1 changed file with 39 additions and 15 deletions.
54 changes: 39 additions & 15 deletions MC/bin/o2_dpg_workflow_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,31 @@ def __init__(self, tid, name, cpu, mem, resource_boundaries):
def is_done(self):
return self.time_collect and not self.booked

def is_within_limits(self):
"""
Check if assigned resources respect limits
"""
cpu_within_limits = True
mem_within_limits = True
if self.cpu_assigned > self.resource_boundaries.cpu_limit:
cpu_within_limits = False
actionlogger.warning("CPU of task %s exceeds limits %d > %d", self.name, self.cpu_assigned, self.resource_boundaries.cpu_limit)
if self.cpu_assigned > self.resource_boundaries.mem_limit:
mem_within_limits = False
actionlogger.warning("MEM of task %s exceeds limits %d > %d", self.name, self.cpu_assigned, self.resource_boundaries.cpu_limit)
return cpu_within_limits and mem_within_limits

def limit_resources(self, cpu_limit=None, mem_limit=None):
"""
Limit resources of this specific task
"""
if not cpu_limit:
cpu_limit = self.resource_boundaries.cpu_limit
if not mem_limit:
mem_limit = self.resource_boundaries.mem_limit
self.cpu_assigned = min(self.cpu_assigned, cpu_limit)
self.mem_assigned = min(self.mem_assigned, mem_limit)

def add(self, time_passed, cpu, mem):
"""
Brief interface to add resources that were measured after time_passed
Expand Down Expand Up @@ -610,22 +635,23 @@ def sample_resources(self):
# This task ran already with the assigned resources, so let's set it to the limit
if cpu_sampled > self.resource_boundaries.cpu_limit:
actionlogger.warning("Sampled CPU (%.2f) exceeds assigned CPU limit (%.2f)", cpu_sampled, self.resource_boundaries.cpu_limit)
cpu_sampled = self.resource_boundaries.cpu_limit
elif cpu_sampled < 0:
actionlogger.debug("Sampled CPU for %s is %.2f < 0, setting to previously assigned value %.2f", self.name, cpu_sampled, self.cpu_assigned)
cpu_sampled = self.cpu_assigned

if mem_sampled > self.resource_boundaries.mem_limit:
actionlogger.warning("Sampled MEM (%.2f) exceeds assigned MEM limit (%.2f)", mem_sampled, self.resource_boundaries.mem_limit)
mem_sampled = self.resource_boundaries.mem_limit

if mem_sampled <= 0:
elif mem_sampled <= 0:
actionlogger.debug("Sampled memory for %s is %.2f <= 0, setting to previously assigned value %.2f", self.name, mem_sampled, self.mem_assigned)
mem_sampled = self.mem_assigned
if cpu_sampled < 0:
actionlogger.debug("Sampled CPU for %s is %.2f < 0, setting to previously assigned value %.2f", self.name, cpu_sampled, self.cpu_assigned)
cpu_sampled = self.cpu_assigned

for res in self.related_tasks:
if res.is_done or res.booked:
continue
res.cpu_assigned = cpu_sampled
res.mem_assigned = mem_sampled
# This task has been run before, stay optimistic and limit the resources in case the sampled ones exceed limits
res.limit_resources()


class ResourceManager:
Expand Down Expand Up @@ -681,14 +707,12 @@ def add_task_resources(self, name, related_tasks_name, cpu, mem, semaphore_strin
Construct and Add a new TaskResources object
"""
resources = TaskResources(len(self.resources), name, cpu, mem, self.resource_boundaries)
if cpu > self.resource_boundaries.cpu_limit or mem > self.resource_boundaries.mem_limit:
actionlogger.warning(f"Resource estimates of id {len(self.resources)} overestimates limits, CPU limit: {self.resource_boundaries.cpu_limit}, MEM limit: {self.resource_boundaries.mem_limit}; might not run")
if not self.resource_boundaries.optimistic_resources:
# exit if we don't dare to try
print(f"Resources of task {name} are exceeding the boundaries.\nCPU: {cpu} (estimate) vs. {self.resource_boundaries.cpu_limit} (boundary)\nMEM: {mem} (estimated) vs. {self.resource_boundaries.mem_limit} (boundary).")
exit(1)
# or we do dare, let's see what happens...
actionlogger.info("We will try to run this task anyway with maximum available resources")
if not resources.is_within_limits() and not self.resource_boundaries.optimistic_resources:
# exit if we don't dare to try
print(f"Resources of task {name} are exceeding the boundaries.\nCPU: {cpu} (estimate) vs. {self.resource_boundaries.cpu_limit} (boundary)\nMEM: {mem} (estimated) vs. {self.resource_boundaries.mem_limit} (boundary).")
exit(1)
# if we get here, either all is good or the user decided to be optimistic and we limit the resources, by default to the given CPU and mem limits.
resources.limit_resources()

self.resources.append(resources)
# do the following to have the same Semaphore object for all corresponding TaskResources so that we do not need a lookup
Expand Down

0 comments on commit 1a29dde

Please sign in to comment.