[AnalysisQC] Change configuration logic
Benedikt Volkel committed Apr 10, 2024
1 parent 268702d commit 29396db
Showing 32 changed files with 457 additions and 5,001 deletions.
51 changes: 18 additions & 33 deletions MC/analysis_testing/o2dpg_analysis_test_utils.py
@@ -4,8 +4,10 @@
 # Analysis task utilities
 #
 import sys
-from os import environ
-from os.path import join, exists, abspath, expanduser
+from os import environ, listdir
+from os.path import join, abspath
+
+import json
 
 # make sure O2DPG + O2 is loaded
 O2DPG_ROOT=environ.get('O2DPG_ROOT')
@@ -22,40 +24,23 @@
 ANALYSIS_VALID_DATA = "data"
 ANALYSIS_COLLISION_SYSTEM_PP = "pp"
 ANALYSIS_COLLISION_SYSTEM_PBPB = "pbpb"
-ANALYSIS_CONFIGURATION_PREFIX = "analysis-testing"
-ANALYSIS_DEFAULT_CONFIGURATION = {ANALYSIS_COLLISION_SYSTEM_PP: {ANALYSIS_VALID_MC: join(O2DPG_ROOT, "MC", "config", "analysis_testing", "json", "default", ANALYSIS_COLLISION_SYSTEM_PP, f"{ANALYSIS_CONFIGURATION_PREFIX}-{ANALYSIS_VALID_MC}.json"),
-                                  ANALYSIS_VALID_DATA: join(O2DPG_ROOT, "MC", "config", "analysis_testing", "json", "default", ANALYSIS_COLLISION_SYSTEM_PP, f"{ANALYSIS_CONFIGURATION_PREFIX}-{ANALYSIS_VALID_DATA}.json")},
-                                  ANALYSIS_COLLISION_SYSTEM_PBPB: {ANALYSIS_VALID_MC: join(O2DPG_ROOT, "MC", "config", "analysis_testing", "json", "default", ANALYSIS_COLLISION_SYSTEM_PBPB, f"{ANALYSIS_CONFIGURATION_PREFIX}-{ANALYSIS_VALID_MC}.json"),
-                                  ANALYSIS_VALID_DATA: join(O2DPG_ROOT, "MC", "config", "analysis_testing", "json", "default", ANALYSIS_COLLISION_SYSTEM_PBPB, f"{ANALYSIS_CONFIGURATION_PREFIX}-{ANALYSIS_VALID_DATA}.json")}}


-def sanitize_configuration_path(path):
-    # sanitize path
-    path = path.replace("json://", "")
-    if path[0] != "$":
-        # only do this if there is no potential environment variable given as the first part of the path
-        path = abspath(expanduser(path))
-    return f"json://{path}"
-
-
-def get_default_configuration(data_or_mc, collision_system):
-    path = ANALYSIS_DEFAULT_CONFIGURATION.get(collision_system, None)
-    if not path:
-        print(f"ERROR: Unknown collision system {collision_system}")
-        return None
-    return path[data_or_mc]
-
-
-def get_configuration(analysis_name, data_or_mc, collision_system):
-    path = join(O2DPG_ROOT, "MC", "config", "analysis_testing", "json", analysis_name, collision_system, f"{ANALYSIS_CONFIGURATION_PREFIX}-{data_or_mc}.json")
-    if not exists(path):
-        path = get_default_configuration(data_or_mc, collision_system)
-        if not path:
-            return None
-        print(f"INFO: Use default configuration for {analysis_name}")
-    return sanitize_configuration_path(path)
+def adjust_and_get_configuration_path(data_or_mc, collision_system, output_dir):
+
+    final_config = {}
+    path = join(O2DPG_ROOT, "MC", "config", "analysis_testing", "json", "dpl")
+    for config_path in listdir(path):
+        if not config_path.endswith('.json'):
+            continue
+        with open(join(path, config_path), 'r') as f:
+            final_config |= json.load(f)
+    # now we can do some adjustments
+    output_path = abspath(join(output_dir, 'dpl-config.json'))
+    with open(output_path, 'w') as f:
+        json.dump(final_config, f, indent=2)
+
+    return output_path
 
 
 def get_collision_system(collision_system=None):
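Note on the new logic above: `final_config |= json.load(f)` is the dict union-assignment operator (Python 3.9+), so the merge across the JSON files found under `json/dpl` is shallow and whichever file `listdir` yields last wins on duplicate top-level keys. A minimal sketch of that behavior, with invented file contents standing in for two files in that directory:

import json

# Hypothetical stand-ins for two JSON files under MC/config/analysis_testing/json/dpl
config_a = {"analysis-task-a": {"processMC": "true"},
            "shared-option": {"value": "1"}}
config_b = {"analysis-task-b": {"ptMin": "0.15"},
            "shared-option": {"value": "2"}}

final_config = {}
for cfg in (config_a, config_b):  # listdir() imposes no guaranteed order
    final_config |= cfg           # dict union-assignment, Python >= 3.9; the right operand wins

# The merge is shallow: "shared-option" is now {"value": "2"}, replaced wholesale
print(json.dumps(final_config, indent=2))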
96 changes: 42 additions & 54 deletions MC/analysis_testing/o2dpg_analysis_test_workflow.py
@@ -63,7 +63,7 @@
 import sys
 import importlib.util
 import argparse
-from os import environ
+from os import environ, makedirs
 from os.path import join, exists, abspath, expanduser
 import json

@@ -138,38 +138,6 @@ def load_analyses(analyses_only=None, include_disabled_analyses=False):
     return collect_analyses
 
 
-def add_analysis_post_processing_tasks(workflow):
-    """add post-processing step to analysis tasks if possible
-    Args:
-        workflow: list
-            current list of tasks
-    """
-    analyses_to_add_for = {}
-    # collect analyses in current workflow
-    for task in workflow:
-        if ANALYSIS_LABEL in task["labels"]:
-            analyses_to_add_for[task["name"]] = task
-
-    for ana in load_analyses(include_disabled_analyses=True):
-        if not ana["expected_output"]:
-            continue
-        ana_name_raw = ana["name"]
-        post_processing_macro = join(O2DPG_ROOT, "MC", "analysis_testing", "post_processing", f"{ana_name_raw}.C")
-        if not exists(post_processing_macro):
-            continue
-        ana_name = full_ana_name(ana_name_raw)
-        if ana_name not in analyses_to_add_for:
-            continue
-        pot_ana = analyses_to_add_for[ana_name]
-        cwd = pot_ana["cwd"]
-        needs = [ana_name]
-        task = createTask(name=f"{ANALYSIS_LABEL}_post_processing_{ana_name_raw}", cwd=join(cwd, "post_processing"), lab=[ANALYSIS_LABEL, f"{ANALYSIS_LABEL}PostProcessing", ana_name_raw], cpu=1, mem='2000', needs=needs)
-        input_files = ",".join([f"../{eo}" for eo in ana["expected_output"]])
-        cmd = f"\\(\\\"{input_files}\\\",\\\"./\\\"\\)"
-        task["cmd"] = f"root -l -b -q {post_processing_macro}{cmd}"
-        workflow.append(task)
-
 def get_additional_workflows(input_aod):
     additional_workflows = []
 
@@ -207,7 +175,7 @@ def get_additional_workflows(input_aod):
     return additional_workflows
 
 
-def add_analysis_tasks(workflow, input_aod="./AO2D.root", output_dir="./Analysis", *, analyses_only=None, is_mc=True, collision_system=None, needs=None, autoset_converters=False, include_disabled_analyses=False, timeout=None, add_common_args=None):
+def add_analysis_tasks(workflow, input_aod="./AO2D.root", output_dir="./Analysis", *, analyses_only=None, is_mc=True, collision_system=None, needs=None, autoset_converters=False, include_disabled_analyses=False, timeout=None, add_common_args=None, split_analyses=False):
     """Add default analyses to user workflow
     Args:
@@ -238,6 +206,13 @@ def add_analysis_tasks(workflow, input_aod="./AO2D.root", output_dir="./Analysis
     data_or_mc = ANALYSIS_VALID_MC if is_mc else ANALYSIS_VALID_DATA
     collision_system = get_collision_system(collision_system)
 
+    # list of lists, each sub-list corresponds to one analysis pipe to be executed
+    analysis_pipes = []
+    # collect the names corresponding to analysis pipes
+    analysis_names = []
+    # a list of all tasks to be put together
+    merged_analysis_pipe = additional_workflows.copy()
+
     for ana in load_analyses(analyses_only, include_disabled_analyses=include_disabled_analyses):
         if is_mc and not ana.get("valid_mc", False):
             print(f"INFO: Analysis {ana['name']} not added since not valid in MC")
@@ -246,30 +221,42 @@ def add_analysis_tasks(workflow, input_aod="./AO2D.root", output_dir="./Analysis
print(f"INFO: Analysis {ana['name']} not added since not valid in data")
continue

configuration = get_configuration(ana["name"], data_or_mc, collision_system)
if not configuration:
print(f"INFO: Analysis {ana['name']} excluded due to no valid configuration")
if split_analyses:
# only the individual analyses, no merged
analysis_pipes.append(ana['tasks'])
analysis_names.append(ana['name'])
continue
print(f"INFO: Analysis {ana['name']} uses configuration {configuration}")

add_common_args_ana = get_common_args_as_string(ana, add_common_args)
if not add_common_args_ana:
print(f"ERROR: Cannot parse common args for analysis {ana['name']}")
continue
merged_analysis_pipe.extend(ana['tasks'])

if not split_analyses:
# add the merged analysis
analysis_pipes.append(merged_analysis_pipe)
analysis_names.append('AllAnalyses')

# now we need to create the output directory where we want the final configurations to go
output_dir_config = join(output_dir, 'config')
if not exists(output_dir_config):
makedirs(output_dir_config)

configuration = adjust_and_get_configuration_path(data_or_mc, collision_system, output_dir_config)

for analysis_name, analysis_pipe in zip(analysis_names, analysis_pipes):
# remove duplicates if they are there for nay reason (especially in the merged case)
analysis_pipe = list(set(analysis_pipe))
analysis_pipe_assembled = []
for executable_string in analysis_pipe:
# the input executable might come already with some configurations, the very first token is the actual executable
executable_string += f' --configuration json://{configuration}'
analysis_pipe_assembled.append(executable_string)

for i in additional_workflows:
if i not in ana["tasks"]:
# print("Appending extra task", i, "to analysis", ana["name"], "as it is not there yet and needed for conversion")
ana["tasks"].append(i)
piped_analysis = f" --configuration {configuration} | ".join(ana["tasks"])
piped_analysis += f" --configuration {configuration} --aod-file {input_aod}"
piped_analysis += add_common_args_ana
# put together, add AOD and timeout if requested
analysis_pipe_assembled = ' | '.join(analysis_pipe_assembled)
analysis_pipe_assembled += f' --aod-file {input_aod} --shm-segment-size 2000000000 --readers 1 --aod-memory-rate-limit 500000000'
if timeout is not None:
piped_analysis += f" --time-limit {timeout}"
workflow.append(create_ana_task(ana["name"], piped_analysis, output_dir, needs=needs, is_mc=is_mc))
analysis_pipe_assembled += f' --time-limit {timeout}'

# append potential post-processing
add_analysis_post_processing_tasks(workflow)
workflow.append(create_ana_task(analysis_name, analysis_pipe_assembled, output_dir, needs=needs, is_mc=is_mc))
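For illustration, the assembly above suffixes every executable of a pipe with the shared `--configuration json://...`, joins them into one DPL command, and appends the AOD reader options once at the end. A minimal sketch, with invented executable names and paths:

# Sketch of the pipe assembly; executable names and the configuration path are invented
configuration = '/some/output/Analysis/config/dpl-config.json'
analysis_pipe = ['o2-analysis-task-one', 'o2-analysis-task-two --some-preset']

# each executable (possibly carrying its own presets) gets the shared configuration appended
assembled = [f'{exe} --configuration json://{configuration}' for exe in analysis_pipe]
cmd = ' | '.join(assembled)
cmd += ' --aod-file ./AO2D.root --shm-segment-size 2000000000 --readers 1 --aod-memory-rate-limit 500000000'

print(cmd)
# o2-analysis-task-one --configuration json://... | o2-analysis-task-two --some-preset --configuration json://... --aod-file ./AO2D.root ...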


def add_analysis_qc_upload_tasks(workflow, period_name, run_number, pass_name):
@@ -325,7 +312,7 @@ def run(args):
     ### setup global environment variables which are valid for all tasks, set as first task
     global_env = {"ALICEO2_CCDB_CONDITION_NOT_AFTER": args.condition_not_after} if args.condition_not_after else None
     workflow = [createGlobalInitTask(global_env)]
-    add_analysis_tasks(workflow, args.input_file, expanduser(args.analysis_dir), is_mc=args.is_mc, analyses_only=args.only_analyses, autoset_converters=args.autoset_converters, include_disabled_analyses=args.include_disabled, timeout=args.timeout, collision_system=args.collision_system, add_common_args=args.add_common_args)
+    add_analysis_tasks(workflow, args.input_file, expanduser(args.analysis_dir), is_mc=args.is_mc, analyses_only=args.only_analyses, autoset_converters=args.autoset_converters, include_disabled_analyses=args.include_disabled, timeout=args.timeout, collision_system=args.collision_system, add_common_args=args.add_common_args, split_analyses=args.split_analyses)
     if args.with_qc_upload:
         add_analysis_qc_upload_tasks(workflow, args.period_name, args.run_number, args.pass_name)
     if not workflow:
@@ -353,6 +340,7 @@ def main():
parser.add_argument("--collision-system", dest="collision_system", help="Set the collision system. If not set, tried to be derived from ALIEN_JDL_LPMInterationType. Fallback to pp")
parser.add_argument("--add-common-args", dest="add_common_args", nargs="*", help="Pass additional common arguments per analysis, for instance --add-common-args EMCAL-shm-segment-size 2500000000 will add --shm-segment-size 2500000000 to the EMCAL analysis")
parser.add_argument('--condition-not-after', dest="condition_not_after", type=int, help="only consider CCDB objects not created after this timestamp (for TimeMachine)", default=3385078236000)
parser.add_argument('--split-analyses', dest='split_analyses', action='store_true', help='Split into single analyses pipes to be executed.')

parser.set_defaults(func=run)
args = parser.parse_args()
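A minimal sketch of what the new `--split-analyses` flag changes, mirroring the collection logic in add_analysis_tasks (the analysis definitions here are invented): merged mode yields a single `AllAnalyses` task, split mode one task per analysis.

# Invented analysis definitions standing in for what load_analyses() returns
analyses = [{'name': 'AnalysisA', 'tasks': ['o2-analysis-a']},
            {'name': 'AnalysisB', 'tasks': ['o2-analysis-b']}]

def collect_pipes(split_analyses):
    analysis_pipes, analysis_names, merged = [], [], []
    for ana in analyses:
        if split_analyses:
            # one pipe per analysis
            analysis_pipes.append(ana['tasks'])
            analysis_names.append(ana['name'])
            continue
        merged.extend(ana['tasks'])
    if not split_analyses:
        # everything goes into one merged pipe
        analysis_pipes.append(merged)
        analysis_names.append('AllAnalyses')
    return analysis_names, analysis_pipes

print(collect_pipes(False))  # (['AllAnalyses'], [['o2-analysis-a', 'o2-analysis-b']])
print(collect_pipes(True))   # (['AnalysisA', 'AnalysisB'], [['o2-analysis-a'], ['o2-analysis-b']])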
39 changes: 0 additions & 39 deletions MC/analysis_testing/post_processing/PWGMMMDnDeta.C

This file was deleted.

