Skip to content

Commit

Permalink
[WFRunner] Adjust number of workers on the fly
Browse files Browse the repository at this point in the history
  • Loading branch information
Benedikt Volkel committed Mar 21, 2024
1 parent d03ca96 commit 2094c9d
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 12 deletions.
16 changes: 11 additions & 5 deletions MC/bin/o2_dpg_workflow_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import signal
import socket
import sys
from math import ceil, round

Check failure on line 14 in MC/bin/o2_dpg_workflow_runner.py

View workflow job for this annotation

GitHub Actions / Pylint

Pylint error no-name-in-module

No name 'round' in module 'math'
import traceback
import platform
import tarfile
Expand Down Expand Up @@ -539,18 +540,16 @@ def __init__(self, tid, name, cpu, cpu_relative, mem, resource_boundaries):
# the task ID belonging to these resources
self.tid = tid
self.name = name
# original CPUs/MEM assigned (persistent)
self.cpu_assigned_original = cpu
self.mem_assigned_original = mem
# relative CPU, to be multiplied with sampled CPU; set by the user, e.g. to allow to backfill tasks
# only takes effect when sampling resources; persistent
self.cpu_relative = cpu_relative if cpu_relative else 1
# CPUs/MEM assigned (transient)
self.cpu_assigned = cpu
self.mem_assigned = mem
self.n_workers_assigned = round(cpu / cpu_relative)
# global resource settings
self.resource_boundaries = resource_boundaries
# sampled resources of this
# sampled resources of this, note that the n_workers will be derived from cpu
self.cpu_sampled = None
self.mem_sampled = None
# Set these after a task has finished to compute new estomates for related tasks
Expand Down Expand Up @@ -616,7 +615,8 @@ def sample_resources(self):

if len(self.time_collect) < 3:
# Consider at least 3 points to sample from
self.cpu_sampled = self.cpu_assigned
# This task was short, halfen the CPU
self.cpu_sampled = self.cpu_assigned / 2
self.mem_sampled = self.mem_assigned
actionlogger.debug("Task %s has not enough points (< 3) to sample resources, setting to previosuly assigned values.", self.name)
else:
Expand Down Expand Up @@ -651,6 +651,7 @@ def sample_resources(self):
for res in self.related_tasks:
if res.is_done or res.booked:
continue
res.n_workers_assigned = ceil(cpu_sampled)
res.cpu_assigned = cpu_sampled * res.cpu_relative
res.mem_assigned = mem_sampled
# This task has been run before, stay optimistic and limit the resources in case the sampled ones exceed limits
Expand Down Expand Up @@ -733,6 +734,10 @@ def add_task_resources(self, name, related_tasks_name, cpu, cpu_relative, mem, s
self.resources_related_tasks_dict[related_tasks_name].append(resources)
resources.related_tasks = self.resources_related_tasks_dict[related_tasks_name]

def adjust_cmd(self, tid, cmd):
resources = self.resources[tid]
return cmd.replace("!WFR_NWORKERS!", resources.n_workers_assigned)

def add_monitored_resources(self, tid, time_delta_since_start, cpu, mem):
self.resources[tid].add(time_delta_since_start, cpu, mem)

Expand Down Expand Up @@ -1074,6 +1079,7 @@ def submit(self, tid, nice):
"""
actionlogger.debug("Submitting task " + str(self.idtotask[tid]) + " with nice value " + str(nice))
c = self.workflowspec['stages'][tid]['cmd']
c = self.resource_manager.adjust_cmd(tid, c)
workdir = self.workflowspec['stages'][tid]['cwd']
if workdir:
if os.path.exists(workdir) and not os.path.isdir(workdir):
Expand Down
14 changes: 7 additions & 7 deletions MC/bin/o2dpg_sim_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -697,7 +697,7 @@ def getDPL_global_options(bigshm=False, ccdbbackend=True):
sgnmem = 6000 if COLTYPE == 'PbPb' else 4000
SGNtask=createTask(name='sgnsim_'+str(tf), needs=signalneeds, tf=tf, cwd='tf'+str(tf), lab=["GEANT"], relative_cpu=7/8, n_workers=NWORKERS, mem=str(sgnmem))
SGNtask['cmd']='${O2_ROOT}/bin/o2-sim -e ' + str(SIMENGINE) + ' ' + str(MODULES) + ' -n ' + str(NSIGEVENTS) + ' --seed ' + str(TFSEED) \
+ ' --field ccdb -j ' + str(NWORKERS) + ' -g ' + str(GENERATOR) \
+ ' --field ccdb -j !WFR_NWORKERS! -g ' + str(GENERATOR) \
+ ' ' + str(TRIGGER) + ' ' + str(CONFKEY) + ' ' + str(INIFILE) \
+ ' -o ' + signalprefix + ' ' + embeddinto \
+ ('', ' --timestamp ' + str(args.timestamp))[args.timestamp!=-1] + ' --run ' + str(args.run) \
Expand Down Expand Up @@ -849,7 +849,7 @@ def putConfigValuesNew(listOfMainKeys=[], localCF = {}):
tf=tf, cwd=timeframeworkdir, lab=["DIGI"], cpu=NWORKERS, mem=str(tpcdigimem))
TPCDigitask['cmd'] = ('','ln -nfs ../bkg_HitsTPC.root . ;')[doembedding]
TPCDigitask['cmd'] += '${O2_ROOT}/bin/o2-sim-digitizer-workflow ' + getDPL_global_options() + ' -n ' + str(args.ns) + simsoption \
+ ' --onlyDet TPC --TPCuseCCDB --interactionRate ' + str(INTRATE) + ' --tpc-lanes ' + str(NWORKERS) \
+ ' --onlyDet TPC --TPCuseCCDB --interactionRate ' + str(INTRATE) + ' --tpc-lanes !WFR_NWORKERS!' \
+ ' --incontext ' + str(CONTEXTFILE) + ' --disable-write-ini --early-forward-policy always --forceSelectedDets ' \
+ putConfigValuesNew(["TPCGasParam","TPCGEMParam","TPCEleParam","TPCITCorr","TPCDetParam"],
localCF={"DigiParams.maxOrbitsToDigitize" : str(orbitsPerTF), "DigiParams.seed" : str(TFSEED)})
Expand All @@ -868,7 +868,7 @@ def putConfigValuesNew(listOfMainKeys=[], localCF = {}):
TRDDigitask['cmd'] = ('','ln -nfs ../bkg_HitsTRD.root . ;')[doembedding]
TRDDigitask['cmd'] += '${O2_ROOT}/bin/o2-sim-digitizer-workflow ' + getDPL_global_options() + ' -n ' + str(args.ns) + simsoption \
+ ' --onlyDet TRD --interactionRate ' + str(INTRATE) + ' --incontext ' + str(CONTEXTFILE) + ' --disable-write-ini' \
+ putConfigValuesNew(localCF={"TRDSimParams.digithreads" : NWORKERS, "DigiParams.seed" : str(TFSEED)}) + " --forceSelectedDets"
+ putConfigValuesNew(localCF={"TRDSimParams.digithreads" : "!WFR_NWORKERS!", "DigiParams.seed" : str(TFSEED)}) + " --forceSelectedDets"
TRDDigitask['cmd'] += ('',' --disable-mc')[args.no_mc_labels]
workflow['stages'].append(TRDDigitask)

Expand Down Expand Up @@ -958,7 +958,7 @@ def getDigiTaskName(det):
taskname = 'tpcclusterpart' + str((int)(s/sectorpertask)) + '_' + str(tf)
tpcclustertasks.append(taskname)
tpcclussect = createTask(name=taskname, needs=[TPCDigitask['name']], tf=tf, cwd=timeframeworkdir, lab=["RECO"], cpu='2', mem='8000')
digitmergerstr = '${O2_ROOT}/bin/o2-tpc-chunkeddigit-merger --tpc-sectors ' + str(s)+'-'+str(s+sectorpertask-1) + ' --tpc-lanes ' + str(NWORKERS) + ' | '
digitmergerstr = '${O2_ROOT}/bin/o2-tpc-chunkeddigit-merger --tpc-sectors ' + str(s)+'-'+str(s+sectorpertask-1) + ' --tpc-lanes !WFR_NWORKERS! | '
tpcclussect['cmd'] = (digitmergerstr,'')[args.no_tpc_digitchunking] + ' ${O2_ROOT}/bin/o2-tpc-reco-workflow ' + getDPL_global_options(bigshm=True) + ' --input-type ' + ('digitizer','digits')[args.no_tpc_digitchunking] + ' --output-type clusters,send-clusters-per-sector --tpc-native-cluster-writer \" --outfile tpc-native-clusters-part'+ str((int)(s/sectorpertask)) + '.root\" --tpc-sectors ' + str(s)+'-'+str(s+sectorpertask-1) + ' ' + putConfigValuesNew(["GPU_global"], {"GPU_proc.ompThreads" : 4}) + ('',' --disable-mc')[args.no_mc_labels]
tpcclussect['env'] = { "OMP_NUM_THREADS" : "4", "SHMSIZE" : "16000000000" }
tpcclussect['semaphore'] = "tpctriggers.root"
Expand All @@ -971,15 +971,15 @@ def getDigiTaskName(det):
tpcreconeeds.append(TPCCLUSMERGEtask['name'])
else:
tpcclus = createTask(name='tpccluster_' + str(tf), needs=[TPCDigitask['name']], tf=tf, cwd=timeframeworkdir, lab=["RECO"], cpu=NWORKERS, mem='2000')
tpcclus['cmd'] = '${O2_ROOT}/bin/o2-tpc-chunkeddigit-merger --tpc-lanes ' + str(NWORKERS)
tpcclus['cmd'] = '${O2_ROOT}/bin/o2-tpc-chunkeddigit-merger --tpc-lanes !WFR_NWORKERS!'
tpcclus['cmd'] += ' | ${O2_ROOT}/bin/o2-tpc-reco-workflow ' + getDPL_global_options() + ' --input-type digitizer --output-type clusters,send-clusters-per-sector ' + putConfigValuesNew(["GPU_global","TPCGasParam","TPCCorrMap"],{"GPU_proc.ompThreads" : 1}) + ('',' --disable-mc')[args.no_mc_labels]
workflow['stages'].append(tpcclus)
tpcreconeeds.append(tpcclus['name'])

tpc_corr_scaling_options = anchorConfig.get('tpc-corr-scaling','')
TPCRECOtask=createTask(name='tpcreco_'+str(tf), needs=tpcreconeeds, tf=tf, cwd=timeframeworkdir, lab=["RECO"], relative_cpu=3/8, mem='16000')
TPCRECOtask['cmd'] = '${O2_ROOT}/bin/o2-tpc-reco-workflow ' + getDPL_global_options(bigshm=True) + ' --input-type clusters --output-type tracks,send-clusters-per-sector ' \
+ putConfigValuesNew(["GPU_global","TPCGasParam", "TPCCorrMap", "GPU_rec_tpc", "trackTuneParams"], {"GPU_proc.ompThreads":NWORKERS}) + ('',' --disable-mc')[args.no_mc_labels] \
+ putConfigValuesNew(["GPU_global","TPCGasParam", "TPCCorrMap", "GPU_rec_tpc", "trackTuneParams"], {"GPU_proc.ompThreads": "!WFR_NWORKERS!"}) + ('',' --disable-mc')[args.no_mc_labels] \
+ tpc_corr_scaling_options
workflow['stages'].append(TPCRECOtask)

Expand Down Expand Up @@ -1300,7 +1300,7 @@ def addQCPerTF(taskName, needs, readerCommand, configFilePath, objectsFile=''):
needs=[PHSRECOtask['name']],
readerCommand='o2-phos-reco-workflow --input-type cells --output-type clusters --disable-mc --disable-root-output',
configFilePath='json://${O2DPG_ROOT}/MC/config/QC/json/phs-cells-clusters-task.json')

### MID
if isActive('MID'):
addQCPerTF(taskName='MIDTaskQC',
Expand Down

0 comments on commit 2094c9d

Please sign in to comment.