Skip to content

Commit

Permalink
[SimWF] Recompute number of workers used in TFs
Browse files Browse the repository at this point in the history
Only do so if --pregenCollContext is set, otherwise up to the user to
decide
  • Loading branch information
Benedikt Volkel committed Mar 21, 2024
1 parent d03ca96 commit 3a932b9
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 13 deletions.
29 changes: 16 additions & 13 deletions MC/bin/o2dpg_sim_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@

sys.path.append(join(dirname(__file__), '.', 'o2dpg_workflow_utils'))

from o2dpg_workflow_utils import createTask, createGlobalInitTask, dump_workflow, adjust_RECO_environment, isActive, activate_detector, deactivate_detector
from o2dpg_workflow_utils import createTask, createGlobalInitTask, dump_workflow, adjust_RECO_environment, isActive, activate_detector, deactivate_detector, compute_n_workers
from o2dpg_qc_finalization_workflow import include_all_QC_finalization
from o2dpg_sim_config import create_sim_config, create_geant_config, constructConfigKeyArg

Expand Down Expand Up @@ -638,6 +638,9 @@ def getDPL_global_options(bigshm=False, ccdbbackend=True):
QEDdigiargs=' --simPrefixQED qed_' + str(tf) + ' --qed-x-section-ratio ' + str(QEDXSecExpected/PbPbXSec)
workflow['stages'].append(QED_task)

# recompute the number of workers to increase CPU efficiency
NWORKERS_TF = compute_n_workers(INTRATE, COLTYPE) if args.pregenCollContext else NWORKERS

# produce the signal configuration
SGN_CONFIG_task=createTask(name='gensgnconf_'+str(tf), tf=tf, cwd=timeframeworkdir)
SGN_CONFIG_task['cmd'] = 'echo "placeholder / dummy task"'
Expand Down Expand Up @@ -695,9 +698,9 @@ def getDPL_global_options(bigshm=False, ccdbbackend=True):
else:
signalneeds = signalneeds + [ BKG_HEADER_task['name'] ]
sgnmem = 6000 if COLTYPE == 'PbPb' else 4000
SGNtask=createTask(name='sgnsim_'+str(tf), needs=signalneeds, tf=tf, cwd='tf'+str(tf), lab=["GEANT"], relative_cpu=7/8, n_workers=NWORKERS, mem=str(sgnmem))
SGNtask=createTask(name='sgnsim_'+str(tf), needs=signalneeds, tf=tf, cwd='tf'+str(tf), lab=["GEANT"], relative_cpu=7/8, n_workers=NWORKERS_TF, mem=str(sgnmem))
SGNtask['cmd']='${O2_ROOT}/bin/o2-sim -e ' + str(SIMENGINE) + ' ' + str(MODULES) + ' -n ' + str(NSIGEVENTS) + ' --seed ' + str(TFSEED) \
+ ' --field ccdb -j ' + str(NWORKERS) + ' -g ' + str(GENERATOR) \
+ ' --field ccdb -j ' + str(NWORKERS_TF) + ' -g ' + str(GENERATOR) \
+ ' ' + str(TRIGGER) + ' ' + str(CONFKEY) + ' ' + str(INIFILE) \
+ ' -o ' + signalprefix + ' ' + embeddinto \
+ ('', ' --timestamp ' + str(args.timestamp))[args.timestamp!=-1] + ' --run ' + str(args.run) \
Expand Down Expand Up @@ -846,10 +849,10 @@ def putConfigValuesNew(listOfMainKeys=[], localCF = {}):

tpcdigimem = 12000 if havePbPb else 9000
TPCDigitask=createTask(name='tpcdigi_'+str(tf), needs=tpcdigineeds,
tf=tf, cwd=timeframeworkdir, lab=["DIGI"], cpu=NWORKERS, mem=str(tpcdigimem))
tf=tf, cwd=timeframeworkdir, lab=["DIGI"], cpu=NWORKERS_TF, mem=str(tpcdigimem))
TPCDigitask['cmd'] = ('','ln -nfs ../bkg_HitsTPC.root . ;')[doembedding]
TPCDigitask['cmd'] += '${O2_ROOT}/bin/o2-sim-digitizer-workflow ' + getDPL_global_options() + ' -n ' + str(args.ns) + simsoption \
+ ' --onlyDet TPC --TPCuseCCDB --interactionRate ' + str(INTRATE) + ' --tpc-lanes ' + str(NWORKERS) \
+ ' --onlyDet TPC --TPCuseCCDB --interactionRate ' + str(INTRATE) + ' --tpc-lanes ' + str(NWORKERS_TF) \
+ ' --incontext ' + str(CONTEXTFILE) + ' --disable-write-ini --early-forward-policy always --forceSelectedDets ' \
+ putConfigValuesNew(["TPCGasParam","TPCGEMParam","TPCEleParam","TPCITCorr","TPCDetParam"],
localCF={"DigiParams.maxOrbitsToDigitize" : str(orbitsPerTF), "DigiParams.seed" : str(TFSEED)})
Expand All @@ -864,11 +867,11 @@ def putConfigValuesNew(listOfMainKeys=[], localCF = {}):
if usebkgcache:
trddigineeds += [ BKG_HITDOWNLOADER_TASKS['TRD']['name'] ]
TRDDigitask=createTask(name='trddigi_'+str(tf), needs=trddigineeds,
tf=tf, cwd=timeframeworkdir, lab=["DIGI"], cpu=NWORKERS, mem='8000')
tf=tf, cwd=timeframeworkdir, lab=["DIGI"], cpu=NWORKERS_TF, mem='8000')
TRDDigitask['cmd'] = ('','ln -nfs ../bkg_HitsTRD.root . ;')[doembedding]
TRDDigitask['cmd'] += '${O2_ROOT}/bin/o2-sim-digitizer-workflow ' + getDPL_global_options() + ' -n ' + str(args.ns) + simsoption \
+ ' --onlyDet TRD --interactionRate ' + str(INTRATE) + ' --incontext ' + str(CONTEXTFILE) + ' --disable-write-ini' \
+ putConfigValuesNew(localCF={"TRDSimParams.digithreads" : NWORKERS, "DigiParams.seed" : str(TFSEED)}) + " --forceSelectedDets"
+ putConfigValuesNew(localCF={"TRDSimParams.digithreads" : NWORKERS_TF, "DigiParams.seed" : str(TFSEED)}) + " --forceSelectedDets"
TRDDigitask['cmd'] += ('',' --disable-mc')[args.no_mc_labels]
workflow['stages'].append(TRDDigitask)

Expand Down Expand Up @@ -958,7 +961,7 @@ def getDigiTaskName(det):
taskname = 'tpcclusterpart' + str((int)(s/sectorpertask)) + '_' + str(tf)
tpcclustertasks.append(taskname)
tpcclussect = createTask(name=taskname, needs=[TPCDigitask['name']], tf=tf, cwd=timeframeworkdir, lab=["RECO"], cpu='2', mem='8000')
digitmergerstr = '${O2_ROOT}/bin/o2-tpc-chunkeddigit-merger --tpc-sectors ' + str(s)+'-'+str(s+sectorpertask-1) + ' --tpc-lanes ' + str(NWORKERS) + ' | '
digitmergerstr = '${O2_ROOT}/bin/o2-tpc-chunkeddigit-merger --tpc-sectors ' + str(s)+'-'+str(s+sectorpertask-1) + ' --tpc-lanes ' + str(NWORKERS_TF) + ' | '
tpcclussect['cmd'] = (digitmergerstr,'')[args.no_tpc_digitchunking] + ' ${O2_ROOT}/bin/o2-tpc-reco-workflow ' + getDPL_global_options(bigshm=True) + ' --input-type ' + ('digitizer','digits')[args.no_tpc_digitchunking] + ' --output-type clusters,send-clusters-per-sector --tpc-native-cluster-writer \" --outfile tpc-native-clusters-part'+ str((int)(s/sectorpertask)) + '.root\" --tpc-sectors ' + str(s)+'-'+str(s+sectorpertask-1) + ' ' + putConfigValuesNew(["GPU_global"], {"GPU_proc.ompThreads" : 4}) + ('',' --disable-mc')[args.no_mc_labels]
tpcclussect['env'] = { "OMP_NUM_THREADS" : "4", "SHMSIZE" : "16000000000" }
tpcclussect['semaphore'] = "tpctriggers.root"
Expand All @@ -970,16 +973,16 @@ def getDigiTaskName(det):
workflow['stages'].append(TPCCLUSMERGEtask)
tpcreconeeds.append(TPCCLUSMERGEtask['name'])
else:
tpcclus = createTask(name='tpccluster_' + str(tf), needs=[TPCDigitask['name']], tf=tf, cwd=timeframeworkdir, lab=["RECO"], cpu=NWORKERS, mem='2000')
tpcclus['cmd'] = '${O2_ROOT}/bin/o2-tpc-chunkeddigit-merger --tpc-lanes ' + str(NWORKERS)
tpcclus = createTask(name='tpccluster_' + str(tf), needs=[TPCDigitask['name']], tf=tf, cwd=timeframeworkdir, lab=["RECO"], cpu=NWORKERS_TF, mem='2000')
tpcclus['cmd'] = '${O2_ROOT}/bin/o2-tpc-chunkeddigit-merger --tpc-lanes ' + str(NWORKERS_TF)
tpcclus['cmd'] += ' | ${O2_ROOT}/bin/o2-tpc-reco-workflow ' + getDPL_global_options() + ' --input-type digitizer --output-type clusters,send-clusters-per-sector ' + putConfigValuesNew(["GPU_global","TPCGasParam","TPCCorrMap"],{"GPU_proc.ompThreads" : 1}) + ('',' --disable-mc')[args.no_mc_labels]
workflow['stages'].append(tpcclus)
tpcreconeeds.append(tpcclus['name'])

tpc_corr_scaling_options = anchorConfig.get('tpc-corr-scaling','')
TPCRECOtask=createTask(name='tpcreco_'+str(tf), needs=tpcreconeeds, tf=tf, cwd=timeframeworkdir, lab=["RECO"], relative_cpu=3/8, mem='16000')
TPCRECOtask['cmd'] = '${O2_ROOT}/bin/o2-tpc-reco-workflow ' + getDPL_global_options(bigshm=True) + ' --input-type clusters --output-type tracks,send-clusters-per-sector ' \
+ putConfigValuesNew(["GPU_global","TPCGasParam", "TPCCorrMap", "GPU_rec_tpc", "trackTuneParams"], {"GPU_proc.ompThreads":NWORKERS}) + ('',' --disable-mc')[args.no_mc_labels] \
+ putConfigValuesNew(["GPU_global","TPCGasParam", "TPCCorrMap", "GPU_rec_tpc", "trackTuneParams"], {"GPU_proc.ompThreads":NWORKERS_TF}) + ('',' --disable-mc')[args.no_mc_labels] \
+ tpc_corr_scaling_options
workflow['stages'].append(TPCRECOtask)

Expand Down Expand Up @@ -1142,7 +1145,7 @@ def getDigiTaskName(det):
pvfinder_matching_sources = anchorConfig.get('', {}).get('vertex-track-matching-sources', 'ITS-TPC,TPC-TRD,ITS-TPC-TRD,TPC-TOF,ITS-TPC-TOF,TPC-TRD-TOF,ITS-TPC-TRD-TOF,MFT-MCH,MCH-MID,ITS,MFT,TPC,TOF,FT0,MID,EMC,PHS,CPV,ZDC,FDD,HMP,FV0,TRD,MCH,CTP')
pvfinderneeds = [TRDTRACKINGtask2['name'], FT0RECOtask['name'], FV0RECOtask['name'], EMCRECOtask['name'], PHSRECOtask['name'], CPVRECOtask['name'], FDDRECOtask['name'], ZDCRECOtask['name'], HMPMATCHtask['name'], HMPMATCHtask['name'], ITSTPCMATCHtask['name'], TOFTPCMATCHERtask['name'], MFTMCHMATCHtask['name'], MCHMIDMATCHtask['name']]

PVFINDERtask = createTask(name='pvfinder_'+str(tf), needs=pvfinderneeds, tf=tf, cwd=timeframeworkdir, lab=["RECO"], cpu=NWORKERS, mem='4000')
PVFINDERtask = createTask(name='pvfinder_'+str(tf), needs=pvfinderneeds, tf=tf, cwd=timeframeworkdir, lab=["RECO"], cpu=NWORKERS_TF, mem='4000')
PVFINDERtask['cmd'] = '${O2_ROOT}/bin/o2-primary-vertexing-workflow ' \
+ getDPL_global_options() + putConfigValuesNew(['ITSAlpideParam','MFTAlpideParam', 'pvertexer', 'TPCGasParam', 'TPCCorrMap', 'ft0tag'], {"NameConf.mDirMatLUT" : ".."})
PVFINDERtask['cmd'] += ' --vertexing-sources ' + pvfinder_sources + ' --vertex-track-matching-sources ' + pvfinder_matching_sources + (' --combine-source-devices','')[args.no_combine_dpl_devices]
Expand Down Expand Up @@ -1300,7 +1303,7 @@ def addQCPerTF(taskName, needs, readerCommand, configFilePath, objectsFile=''):
needs=[PHSRECOtask['name']],
readerCommand='o2-phos-reco-workflow --input-type cells --output-type clusters --disable-mc --disable-root-output',
configFilePath='json://${O2DPG_ROOT}/MC/config/QC/json/phs-cells-clusters-task.json')

### MID
if isActive('MID'):
addQCPerTF(taskName='MIDTaskQC',
Expand Down
22 changes: 22 additions & 0 deletions MC/bin/o2dpg_workflow_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,28 @@ def deactivate_detector(det):
def isActive(det):
return det not in INACTIVE_DETECTORS and ("all" in ACTIVE_DETECTORS or det in ACTIVE_DETECTORS)

def compute_n_workers(interaction_rate, collision_system, n_workers_user=8, n_workers_min=1, interaction_rate_linear_below=300000):
"""
Compute number of workers
n_workers = m * IR + b
based on
https://indico.cern.ch/event/1395900/contributions/5868567/attachments/2823967/4932440/20240320_slides_cpu_eff.pdf, slide 3
Assume n_workers_in=8 to be ideal for pp IR > interaction_rate_linear_below
Start with 1 worker at IR=0
Go linearly until interaction_rate_linear_below
"""
if collision_system == "PbPb" or interaction_rate >= interaction_rate_linear_below:
return n_workers_user

n_workers_min = max(1, n_workers_min)
m = (n_workers_user - n_workers_min) / interaction_rate_linear_below
# at least 1 worker
return max(1, round(m * interaction_rate + n_workers_min))

def relativeCPU(n_rel, n_workers):
# compute number of CPUs from a given number of workers
# n_workers and a fraction n_rel
Expand Down

0 comments on commit 3a932b9

Please sign in to comment.