Skip to content

Commit

Permalink
[AnalysisQC] Change configuration logic
Browse files Browse the repository at this point in the history
* remove unused analyses

* split config JSONs into executable-specific single JSONs
  Most of those are seen as configurations for "service wagons".
  Each analyses is forced to use those comon configurations and it is
  not possible to have analysis-specific configurations for service
  wagons.

* possible to use special configuration strings in configs; this is done
  to avoid duplications of configs
  * "!ANALYSIS_QC_is_mc!" ("!ANALYSIS_QC_is_data!") will be replaced
    with "true"("false") or "false"("true") depending on whether the
    analyses are run on MC(data)

* new default is to pipe all analyses together;
  to split into single analyses, run
  o2dpg_analysis_testing_workflow.py [...] --split-analyses
  • Loading branch information
Benedikt Volkel committed Apr 10, 2024
1 parent 54a5527 commit 3d9abaf
Show file tree
Hide file tree
Showing 33 changed files with 585 additions and 6,070 deletions.
60 changes: 27 additions & 33 deletions MC/analysis_testing/o2dpg_analysis_test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
# Analsysis task utilities
#
import sys
from os import environ
from os.path import join, exists, abspath, expanduser
from os import environ, listdir
from os.path import join, abspath

import json

# make sure O2DPG + O2 is loaded
O2DPG_ROOT=environ.get('O2DPG_ROOT')
Expand All @@ -22,40 +24,32 @@
ANALYSIS_VALID_DATA = "data"
ANALYSIS_COLLISION_SYSTEM_PP = "pp"
ANALYSIS_COLLISION_SYSTEM_PBPB = "pbpb"
ANALYSIS_CONFIGURATION_PREFIX = "analysis-testing"
ANALYSIS_DEFAULT_CONFIGURATION = {ANALYSIS_COLLISION_SYSTEM_PP: {ANALYSIS_VALID_MC: join(O2DPG_ROOT, "MC", "config", "analysis_testing", "json", "default", ANALYSIS_COLLISION_SYSTEM_PP, f"{ANALYSIS_CONFIGURATION_PREFIX}-{ANALYSIS_VALID_MC}.json"),
ANALYSIS_VALID_DATA: join(O2DPG_ROOT, "MC", "config", "analysis_testing", "json", "default", ANALYSIS_COLLISION_SYSTEM_PP, f"{ANALYSIS_CONFIGURATION_PREFIX}-{ANALYSIS_VALID_DATA}.json")},
ANALYSIS_COLLISION_SYSTEM_PBPB: {ANALYSIS_VALID_MC: join(O2DPG_ROOT, "MC", "config", "analysis_testing", "json", "default", ANALYSIS_COLLISION_SYSTEM_PBPB, f"{ANALYSIS_CONFIGURATION_PREFIX}-{ANALYSIS_VALID_MC}.json"),
ANALYSIS_VALID_DATA: join(O2DPG_ROOT, "MC", "config", "analysis_testing", "json", "default", ANALYSIS_COLLISION_SYSTEM_PBPB, f"{ANALYSIS_CONFIGURATION_PREFIX}-{ANALYSIS_VALID_DATA}.json")}}


def sanitize_configuration_path(path):
# sanitize path
path = path.replace("json://", "")
if path[0] != "$":
# only do this if there is no potential environment variable given as the first part of the path
path = abspath(expanduser(path))
return f"json://{path}"


def get_default_configuration(data_or_mc, collision_system):
path = ANALYSIS_DEFAULT_CONFIGURATION.get(collision_system, None)
if not path:
print(f"ERROR: Unknown collision system {collision_system}")
return None
return path[data_or_mc]


def get_configuration(analysis_name, data_or_mc, collision_system):
path = join(O2DPG_ROOT, "MC", "config", "analysis_testing", "json", analysis_name, collision_system, f"{ANALYSIS_CONFIGURATION_PREFIX}-{data_or_mc}.json")
if not exists(path):
path = get_default_configuration(data_or_mc, collision_system)
if not path:
return None
print(f"INFO: Use default configuration for {analysis_name}")
return sanitize_configuration_path(path)
def adjust_configuration_line(line, data_or_mc, collision_system):
line = line.replace('!ANALYSIS_QC_is_mc!', str(data_or_mc == ANALYSIS_VALID_MC).lower())
line = line.replace('!ANALYSIS_QC_is_data!', str(data_or_mc == ANALYSIS_VALID_DATA).lower())
return line


return sanitize_configuration_path(path)
def adjust_and_get_configuration_path(data_or_mc, collision_system, output_dir):

final_config = {}
path = join(O2DPG_ROOT, "MC", "config", "analysis_testing", "json", "dpl")
for config_path in listdir(path):
if not config_path.endswith('.json'):
continue
json_string = ""
with open(join(path, config_path), 'r') as f:
for line in f:
json_string += adjust_configuration_line(line, data_or_mc, collision_system)
final_config |= json.loads(json_string)
# now we can do some adjustments
output_path = abspath(join(output_dir, 'dpl-config.json'))
with open(output_path, 'w') as f:
json.dump(final_config, f, indent=2)

return output_path


def get_collision_system(collision_system=None):
Expand Down
116 changes: 58 additions & 58 deletions MC/analysis_testing/o2dpg_analysis_test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
import sys
import importlib.util
import argparse
from os import environ
from os import environ, makedirs
from os.path import join, exists, abspath, expanduser
import json

Expand All @@ -90,7 +90,7 @@
from o2dpg_analysis_test_utils import *


def create_ana_task(name, cmd, output_dir, *, needs=None, extraarguments="-b", is_mc=False):
def create_ana_task(name, cmd, output_dir, *, cpu=1, mem='2000', needs=None, extraarguments="-b", is_mc=False):
"""Quick helper to create analysis task
This creates an analysis task from various arguments
Expand All @@ -114,7 +114,7 @@ def create_ana_task(name, cmd, output_dir, *, needs=None, extraarguments="-b", i
if needs is None:
# set to empty list
needs = []
task = createTask(name=full_ana_name(name), cwd=join(output_dir, name), lab=[ANALYSIS_LABEL, name], cpu=1, mem='2000', needs=needs)
task = createTask(name=full_ana_name(name), cwd=join(output_dir, name), lab=[ANALYSIS_LABEL, name], cpu=cpu, mem=mem, needs=needs)
if is_mc:
task["labels"].append(ANALYSIS_LABEL_ON_MC)
task['cmd'] = f"{cmd} {extraarguments}"
Expand All @@ -138,38 +138,6 @@ def load_analyses(analyses_only=None, include_disabled_analyses=False):
return collect_analyses


def add_analysis_post_processing_tasks(workflow):
"""add post-processing step to analysis tasks if possible
Args:
workflow: list
current list of tasks
"""
analyses_to_add_for = {}
# collect analyses in current workflow
for task in workflow:
if ANALYSIS_LABEL in task["labels"]:
analyses_to_add_for[task["name"]] = task

for ana in load_analyses(include_disabled_analyses=True):
if not ana["expected_output"]:
continue
ana_name_raw = ana["name"]
post_processing_macro = join(O2DPG_ROOT, "MC", "analysis_testing", "post_processing", f"{ana_name_raw}.C")
if not exists(post_processing_macro):
continue
ana_name = full_ana_name(ana_name_raw)
if ana_name not in analyses_to_add_for:
continue
pot_ana = analyses_to_add_for[ana_name]
cwd = pot_ana["cwd"]
needs = [ana_name]
task = createTask(name=f"{ANALYSIS_LABEL}_post_processing_{ana_name_raw}", cwd=join(cwd, "post_processing"), lab=[ANALYSIS_LABEL, f"{ANALYSIS_LABEL}PostProcessing", ana_name_raw], cpu=1, mem='2000', needs=needs)
input_files = ",".join([f"../{eo}" for eo in ana["expected_output"]])
cmd = f"\\(\\\"{input_files}\\\",\\\"./\\\"\\)"
task["cmd"] = f"root -l -b -q {post_processing_macro}{cmd}"
workflow.append(task)

def get_additional_workflows(input_aod):
additional_workflows = []

Expand Down Expand Up @@ -207,7 +175,7 @@ def get_additional_workflows(input_aod):
return additional_workflows


def add_analysis_tasks(workflow, input_aod="./AO2D.root", output_dir="./Analysis", *, analyses_only=None, is_mc=True, collision_system=None, needs=None, autoset_converters=False, include_disabled_analyses=False, timeout=None, add_common_args=None):
def add_analysis_tasks(workflow, input_aod="./AO2D.root", output_dir="./Analysis", *, analyses_only=None, is_mc=True, collision_system=None, needs=None, autoset_converters=False, include_disabled_analyses=False, timeout=None, split_analyses=False):
"""Add default analyses to user workflow
Args:
Expand Down Expand Up @@ -238,38 +206,71 @@ def add_analysis_tasks(workflow, input_aod="./AO2D.root", output_dir="./Analysis
data_or_mc = ANALYSIS_VALID_MC if is_mc else ANALYSIS_VALID_DATA
collision_system = get_collision_system(collision_system)

# list of lists, each sub-list corresponds to one analysis pipe to be executed
analysis_pipes = []
# collect the names corresponding to analysis pipes
analysis_names = []
# cpu and mem of each task
analysis_cpu_mem = []
# a list of all tasks to be put together
merged_analysis_pipe = additional_workflows.copy()
# cpu and mem of merged analyses
merged_analysis_cpu_mem = [0, 0]

for ana in load_analyses(analyses_only, include_disabled_analyses=include_disabled_analyses):
if is_mc and not ana.get("valid_mc", False):
print(f"INFO: Analysis {ana['name']} not added since not valid in MC")
continue
if not is_mc and not ana.get("valid_data", False):
print(f"INFO: Analysis {ana['name']} not added since not valid in data")
continue

configuration = get_configuration(ana["name"], data_or_mc, collision_system)
if not configuration:
print(f"INFO: Analysis {ana['name']} excluded due to no valid configuration")
if analyses_only and ana['name'] not in analyses_only:
# filter on analyses if requested
continue
print(f"INFO: Analysis {ana['name']} uses configuration {configuration}")

add_common_args_ana = get_common_args_as_string(ana, add_common_args)
if not add_common_args_ana:
print(f"ERROR: Cannot parse common args for analysis {ana['name']}")
if split_analyses:
# only the individual analyses, no merged
analysis_pipes.append(ana['tasks'])
analysis_names.append(ana['name'])
analysis_cpu_mem.append((1, 2000))
continue

for i in additional_workflows:
if i not in ana["tasks"]:
# print("Appending extra task", i, "to analysis", ana["name"], "as it is not there yet and needed for conversion")
ana["tasks"].append(i)
piped_analysis = f" --configuration {configuration} | ".join(ana["tasks"])
piped_analysis += f" --configuration {configuration} --aod-file {input_aod}"
piped_analysis += add_common_args_ana
merged_analysis_pipe.extend(ana['tasks'])
# underestimate what a single analysis would take in the merged case.
# Putting everything into one big pipe does not mean that the resources scale the same!
merged_analysis_cpu_mem[0] += 0.5
merged_analysis_cpu_mem[1] += 700

if not split_analyses:
# add the merged analysis
analysis_pipes.append(merged_analysis_pipe)
analysis_names.append('MergedAnalyses')
# take at least the resources estimated for a single analysis
analysis_cpu_mem.append((max(1, merged_analysis_cpu_mem[0]), max(2000, merged_analysis_cpu_mem[1])))

# now we need to create the output directory where we want the final configurations to go
output_dir_config = join(output_dir, 'config')
if not exists(output_dir_config):
makedirs(output_dir_config)

configuration = adjust_and_get_configuration_path(data_or_mc, collision_system, output_dir_config)

for analysis_name, analysis_pipe, analysis_res in zip(analysis_names, analysis_pipes, analysis_cpu_mem):
# remove duplicates if they are there for nay reason (especially in the merged case)
analysis_pipe = list(set(analysis_pipe))
analysis_pipe_assembled = []
for executable_string in analysis_pipe:
# the input executable might come already with some configurations, the very first token is the actual executable
executable_string += f' --configuration json://{configuration}'
analysis_pipe_assembled.append(executable_string)

# put together, add AOD and timeout if requested
analysis_pipe_assembled = ' | '.join(analysis_pipe_assembled)
analysis_pipe_assembled += f' --aod-file {input_aod} --shm-segment-size 3000000000 --readers 1 --aod-memory-rate-limit 500000000'
if timeout is not None:
piped_analysis += f" --time-limit {timeout}"
workflow.append(create_ana_task(ana["name"], piped_analysis, output_dir, needs=needs, is_mc=is_mc))
analysis_pipe_assembled += f' --time-limit {timeout}'

# append potential post-processing
add_analysis_post_processing_tasks(workflow)
workflow.append(create_ana_task(analysis_name, analysis_pipe_assembled, output_dir, cpu=analysis_res[0], mem=analysis_res[1], needs=needs, is_mc=is_mc))


def add_analysis_qc_upload_tasks(workflow, period_name, run_number, pass_name):
Expand Down Expand Up @@ -300,7 +301,6 @@ def add_analysis_qc_upload_tasks(workflow, period_name, run_number, pass_name):
# search through workflow stages if we can find the requested analysis
pot_ana = analyses_to_add_for[ana_name]
cwd = pot_ana["cwd"]
qc_tag = f"Analysis{ana_name_raw}"
needs = [ana_name]
provenance = "qc_mc" if ANALYSIS_LABEL_ON_MC in pot_ana["labels"] else "qc"
for eo in ana["expected_output"]:
Expand All @@ -325,7 +325,7 @@ def run(args):
### setup global environment variables which are valid for all tasks, set as first task
global_env = {"ALICEO2_CCDB_CONDITION_NOT_AFTER": args.condition_not_after} if args.condition_not_after else None
workflow = [createGlobalInitTask(global_env)]
add_analysis_tasks(workflow, args.input_file, expanduser(args.analysis_dir), is_mc=args.is_mc, analyses_only=args.only_analyses, autoset_converters=args.autoset_converters, include_disabled_analyses=args.include_disabled, timeout=args.timeout, collision_system=args.collision_system, add_common_args=args.add_common_args)
add_analysis_tasks(workflow, args.input_file, expanduser(args.analysis_dir), is_mc=args.is_mc, analyses_only=args.only_analyses, autoset_converters=args.autoset_converters, include_disabled_analyses=args.include_disabled, timeout=args.timeout, collision_system=args.collision_system, split_analyses=args.split_analyses)
if args.with_qc_upload:
add_analysis_qc_upload_tasks(workflow, args.period_name, args.run_number, args.pass_name)
if not workflow:
Expand All @@ -351,8 +351,8 @@ def main():
parser.add_argument("--autoset-converters", dest="autoset_converters", action="store_true", help="Compatibility mode to automatically set the converters for the analysis")
parser.add_argument("--timeout", type=int, default=None, help="Timeout for analysis tasks in seconds.")
parser.add_argument("--collision-system", dest="collision_system", help="Set the collision system. If not set, tried to be derived from ALIEN_JDL_LPMInterationType. Fallback to pp")
parser.add_argument("--add-common-args", dest="add_common_args", nargs="*", help="Pass additional common arguments per analysis, for instance --add-common-args EMCAL-shm-segment-size 2500000000 will add --shm-segment-size 2500000000 to the EMCAL analysis")
parser.add_argument('--condition-not-after', dest="condition_not_after", type=int, help="only consider CCDB objects not created after this timestamp (for TimeMachine)", default=3385078236000)
parser.add_argument('--split-analyses', dest='split_analyses', action='store_true', help='Split into single analyses pipes to be executed.')

parser.set_defaults(func=run)
args = parser.parse_args()
Expand Down
39 changes: 0 additions & 39 deletions MC/analysis_testing/post_processing/PWGMMMDnDeta.C

This file was deleted.

Loading

0 comments on commit 3d9abaf

Please sign in to comment.