# Review notes (text before the first "diff --git" line is not part of any hunk).
#
# * get_config() now always returns the loaded list of analyses. It used to
#   return the default *path string* when no path was given or when the custom
#   path was missing, although every caller iterates the result as a list of
#   analysis dicts (and the argparse default for --config is removed below).
# * analyses_config.json is now opened with mode 'w' before json.dump(); the
#   default read mode cannot be written to.
# * Fixed the "USe" typo in the warning and added a docstring to get_config();
#   the affected hunk header and the following new-file offsets were adjusted.
# * NOTE(review): this patch was recovered from a whitespace-collapsed copy.
#   Indentation and blank context lines were reconstructed so that they match
#   the hunk line counts; verify them against the target tree before applying.
# * NOTE(review): the "index" hashes are carried over unchanged and therefore
#   do not describe the post-image of the two files edited during review.
# * NOTE(review): o2dpg_analysis_test_workflow.py needs json and
#   ANALYSIS_MERGED_ANALYSIS_NAME in scope; no import is visible in these
#   hunks - confirm against the full file.
diff --git a/MC/analysis_testing/analysis_test.sh b/MC/analysis_testing/analysis_test.sh
index e934b6d47..600118101 100755
--- a/MC/analysis_testing/analysis_test.sh
+++ b/MC/analysis_testing/analysis_test.sh
@@ -49,12 +49,6 @@ else
                 shift
                 shift
                 ;;
-            --add-common-args)
-                add_common_args=" ${2} ${3} "
-                shift
-                shift
-                shift
-                ;;
             *)
                 echo "ERROR: Unknown argument ${1}"
                 exit 1
@@ -66,7 +60,6 @@ fi
 # basic checks
 [[ "${testanalysis}" == "" ]] && { echo "ERROR: No analysis specified to be run" ; exit 1 ; }
 [[ "${aod}" == "" ]] && { echo "ERROR: No AOD found to be analysed" ; exit 1 ; }
-[[ "${add_common_args}" != "" ]] && add_common_args="--add-common-args ${add_common_args}"
 
 # check if enabled
 enabled=$($O2DPG_ROOT/MC/analysis_testing/o2dpg_analysis_test_config.py check -t ${testanalysis} --status)
@@ -77,7 +70,7 @@ mkdir Analysis 2>/dev/null
 include_disabled=${include_disabled:+--include-disabled}
 workflow_path="Analysis/workflow_analysis_test_${testanalysis}.json"
 rm ${workflow_path} 2>/dev/null
-$O2DPG_ROOT/MC/analysis_testing/o2dpg_analysis_test_workflow.py --is-mc -f ${aod} -o ${workflow_path} --only-analyses ${testanalysis} ${include_disabled} ${add_common_args}
+$O2DPG_ROOT/MC/analysis_testing/o2dpg_analysis_test_workflow.py --is-mc --split-analyses -f ${aod} -o ${workflow_path} --only-analyses ${testanalysis} ${include_disabled}
 [[ ! -f "${workflow_path}" ]] && { echo "Could not construct workflow for analysis ${testanalysis}" ; exit 1 ; }
 $O2DPG_ROOT/MC/bin/o2_dpg_workflow_runner.py -f ${workflow_path} -tt Analysis_${testanalysis}$ --rerun-from Analysis_${testanalysis}$
 
diff --git a/MC/analysis_testing/o2dpg_analysis_test_config.py b/MC/analysis_testing/o2dpg_analysis_test_config.py
index 7e9c23d83..46ae5c31c 100755
--- a/MC/analysis_testing/o2dpg_analysis_test_config.py
+++ b/MC/analysis_testing/o2dpg_analysis_test_config.py
@@ -3,7 +3,7 @@
 import sys
 import argparse
 from os import environ
-from os.path import join, exists
+from os.path import join, exists, isdir
 import json
 
 # make sure O2DPG + O2 is loaded
@@ -14,14 +14,36 @@
     sys.exit(1)
 
 
+def get_config(path=None):
+    """
+    Load the list of analyses from a JSON configuration
+
+    path can be a JSON file, a directory that contains analyses_config.json,
+    or empty. The default config under O2DPG_ROOT is used if no path is given
+    or if the given path cannot be located.
+    """
+    default_path = join(O2DPG_ROOT, "MC", "config", "analysis_testing", "json", "analyses_config.json")
+    if not path:
+        path = default_path
+
+    if isdir(path):
+        # assume to look for analyses_config.json in this directory
+        path = join(path, "analyses_config.json")
+
+    if not exists(path):
+        print(f"WARNING: Cannot locate config for AnalysisQC at custom path {path}. Use default at {default_path}")
+        path = default_path
+
+    with open(path, "r") as f:
+        return json.load(f)["analyses"]
+
+
 def modify(args):
     """
     modify and create a new config
     """
 
-    analyses = None
-    with open (args.config, "r") as f:
-        analyses = json.load(f)["analyses"]
+    analyses = get_config(args.config)
 
     for ana in analyses:
         if args.disable_tasks and ana["name"] in args.disable_tasks:
@@ -51,9 +73,7 @@ def print_status(enabled):
             return
         print("DISABLED")
 
-    analyses = None
-    with open (args.config, "r") as f:
-        analyses = json.load(f)["analyses"]
+    analyses = get_config(args.config)
 
     for ana in analyses:
         if ana["name"] == args.task:
@@ -80,9 +100,7 @@ def show_tasks(args):
         args.enabled = True
         args.disabled = True
 
-    analyses = None
-    with open (args.config, "r") as f:
-        analyses = json.load(f)["analyses"]
+    analyses = get_config(args.config)
 
     for ana in analyses:
         if (args.enabled and ana["enabled"]) or (args.disabled and not ana["enabled"]):
@@ -92,9 +110,12 @@ def show_tasks(args):
 
 
 def validate_output(args):
-    analyses = None
-    with open (args.config, "r") as f:
-        analyses = json.load(f)["analyses"]
+
+    if not args.config:
+        # first see if config is not explicitly given, then use the directory where the analyses to check are located
+        args.config = args.directory
+
+    analyses = get_config(args.config)
 
     # global return code
     ret = 0
@@ -105,11 +126,10 @@ def validate_output(args):
         analysis_name = ana["name"]
 
         if args.tasks:
-            if analysis_name in args.tasks:
-                # tasks were specified explicitly, make sure to take them into account at all costs
-                include_disabled = True
-            else:
+            if analysis_name not in args.tasks:
                 continue
+            # tasks were specified explicitly, make sure to take them into account at all costs
+            include_disabled = True
 
         if not ana["enabled"] and not include_disabled:
             # continue if disabled and not including those
@@ -160,7 +180,7 @@ def main():
     sub_parsers = parser.add_subparsers(dest="command")
 
     config_parser = argparse.ArgumentParser(add_help=False)
-    config_parser.add_argument("-c", "--config", help="input configuration to modify", default=join(O2DPG_ROOT, "MC", "config", "analysis_testing", "json", "analyses_config.json"))
+    config_parser.add_argument("-c", "--config", help="input configuration to modify")
 
     # modify config
     modify_parser = sub_parsers.add_parser("modify", parents=[config_parser])
diff --git a/MC/analysis_testing/o2dpg_analysis_test_utils.py b/MC/analysis_testing/o2dpg_analysis_test_utils.py
index ac4555429..888446003 100755
--- a/MC/analysis_testing/o2dpg_analysis_test_utils.py
+++ b/MC/analysis_testing/o2dpg_analysis_test_utils.py
@@ -24,6 +24,7 @@
 ANALYSIS_VALID_DATA = "data"
 ANALYSIS_COLLISION_SYSTEM_PP = "pp"
 ANALYSIS_COLLISION_SYSTEM_PBPB = "pbpb"
+ANALYSIS_MERGED_ANALYSIS_NAME = "MergedAnalyses"
 
 
 def adjust_configuration_line(line, data_or_mc, collision_system):
diff --git a/MC/analysis_testing/o2dpg_analysis_test_workflow.py b/MC/analysis_testing/o2dpg_analysis_test_workflow.py
index a4bd1a9a6..a515b80d5 100755
--- a/MC/analysis_testing/o2dpg_analysis_test_workflow.py
+++ b/MC/analysis_testing/o2dpg_analysis_test_workflow.py
@@ -216,6 +216,10 @@ def add_analysis_tasks(workflow, input_aod="./AO2D.root", output_dir="./Analysis
     merged_analysis_pipe = additional_workflows.copy()
     # cpu and mem of merged analyses
     merged_analysis_cpu_mem = [0, 0]
+    # expected output of merged analysis
+    merged_analysis_expected_output = []
+    # analyses config to write
+    analyses_config = []
 
     for ana in load_analyses(analyses_only, include_disabled_analyses=include_disabled_analyses):
         if is_mc and not ana.get("valid_mc", False):
@@ -233,6 +237,7 @@ def add_analysis_tasks(workflow, input_aod="./AO2D.root", output_dir="./Analysis
             analysis_pipes.append(ana['tasks'])
             analysis_names.append(ana['name'])
             analysis_cpu_mem.append((1, 2000))
+            analyses_config.append(ana)
             continue
 
         merged_analysis_pipe.extend(ana['tasks'])
@@ -240,19 +245,33 @@ def add_analysis_tasks(workflow, input_aod="./AO2D.root", output_dir="./Analysis
         # Putting everything into one big pipe does not mean that the resources scale the same!
         merged_analysis_cpu_mem[0] += 0.5
         merged_analysis_cpu_mem[1] += 700
+        merged_analysis_expected_output.extend(ana['expected_output'])
 
     if not split_analyses:
         # add the merged analysis
         analysis_pipes.append(merged_analysis_pipe)
-        analysis_names.append('MergedAnalyses')
+        analysis_names.append(ANALYSIS_MERGED_ANALYSIS_NAME)
         # take at least the resources estimated for a single analysis
         analysis_cpu_mem.append((max(1, merged_analysis_cpu_mem[0]), max(2000, merged_analysis_cpu_mem[1])))
 
+        merged_analysis_expected_output = list(set(merged_analysis_expected_output))
+        # config of the merged analysis. Since it doesn't exist in the previous config, but we would like to have it defined, do it here
+        analyses_config.append({'name': ANALYSIS_MERGED_ANALYSIS_NAME,
+                                'valid_mc': is_mc,
+                                'valid_data': not is_mc,
+                                'enabled': True,
+                                'tasks': merged_analysis_pipe,
+                                'expected_output': merged_analysis_expected_output})
+
     # now we need to create the output directory where we want the final configurations to go
     output_dir_config = join(output_dir, 'config')
     if not exists(output_dir_config):
         makedirs(output_dir_config)
 
+    # write the analysis config of this
+    with open(join(output_dir, 'analyses_config.json'), 'w') as f:
+        json.dump({'analyses': analyses_config}, f, indent=2)
+
     configuration = adjust_and_get_configuration_path(data_or_mc, collision_system, output_dir_config)
 
     for analysis_name, analysis_pipe, analysis_res in zip(analysis_names, analysis_pipes, analysis_cpu_mem):