Make parallel_pipelines default, with a limit #711

Merged · 6 commits · Dec 12, 2023
compiler/config.py (+18 −3)
@@ -6,6 +6,7 @@
 
 from util import *
 
+
 ## Global
 __version__ = "0.12.2" # FIXME add libdash version
 GIT_TOP_CMD = [
@@ -209,10 +210,22 @@ def add_common_arguments(parser):
     )
     parser.add_argument(
         "--parallel_pipelines",
-        help="Run multiple pipelines in parallel if they are safe to run",
+        help="(obsolete) Run multiple pipelines in parallel if they are safe to run. Now true by default. See --no_parallel_pipelines.",
         action="store_true",
+        default=True,
     )
+    parser.add_argument(
+        "--no_parallel_pipelines",
+        help="Disable parallel running of independent pipelines",
+        action="store_true",
+        default=False,
+    )
+    parser.add_argument(
+        "--parallel_pipelines_limit",
+        help="Maximum number of parallel independent pipelines",
+        type=int,
+        default=2,
+    )
     parser.add_argument(
         "--r_split_batch_size",
         type=int,
@@ -301,10 +314,12 @@ def pass_common_arguments(pash_arguments):
         arguments.append("--distributed_exec")
     if pash_arguments.speculative:
         arguments.append("--speculative")
-    if pash_arguments.parallel_pipelines:
-        arguments.append("--parallel_pipelines")
+    if pash_arguments.no_parallel_pipelines:
+        arguments.append("--no_parallel_pipelines")
     if pash_arguments.daemon_communicates_through_unix_pipes:
         arguments.append("--daemon_communicates_through_unix_pipes")
+    arguments.append("--parallel_pipelines_limit")
+    arguments.append(str(pash_arguments.parallel_pipelines_limit))
     arguments.append("--r_split_batch_size")
     arguments.append(str(pash_arguments.r_split_batch_size))
     arguments.append("--debug")
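For context, the net effect of the three arguments above: parallel pipelines are now on by default, `--parallel_pipelines` is kept only so existing invocations do not break, and opting out goes through the new negative flag. A minimal standalone sketch of those semantics (plain argparse, not PaSh code):

```python
# Standalone sketch of the new flag semantics; mirrors only the three
# parser.add_argument calls above, nothing else from the codebase.
import argparse

parser = argparse.ArgumentParser()
# Obsolete: now always True; kept for backward compatibility.
parser.add_argument("--parallel_pipelines", action="store_true", default=True)
# The switch that actually matters now.
parser.add_argument("--no_parallel_pipelines", action="store_true", default=False)
# Concurrency cap for independent pipelines.
parser.add_argument("--parallel_pipelines_limit", type=int, default=2)

args = parser.parse_args([])
assert args.parallel_pipelines and not args.no_parallel_pipelines  # on by default
args = parser.parse_args(["--no_parallel_pipelines"])
assert args.no_parallel_pipelines  # the only way to opt out
```

Consistently, `pass_common_arguments` now forwards only `--no_parallel_pipelines` and the limit; `--parallel_pipelines` is never propagated, since it no longer carries information.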
compiler/orchestrator_runtime/pash_init_setup.sh (+3 −3)
@@ -17,7 +17,7 @@ export pash_checking_log_file=0
 export pash_checking_debug_level=0
 export pash_avoid_pash_runtime_completion_flag=0
 export pash_profile_driven_flag=1
-export pash_parallel_pipelines=0
+export pash_no_parallel_pipelines=0
 export pash_daemon_communicates_through_unix_pipes_flag=0
 export pash_speculative_flag=0
 export show_version=0
@@ -67,8 +67,8 @@ do
         pash_checking_debug_level=1
     fi
 
-    if [ "--parallel_pipelines" == "$item" ]; then
-        export pash_parallel_pipelines=1
+    if [ "--no_parallel_pipelines" == "$item" ]; then
+        export pash_no_parallel_pipelines=1
     fi
 
     if [ "--daemon_communicates_through_unix_pipes" == "$item" ]; then
compiler/pash_compilation_server.py (+16 −5)
@@ -219,8 +219,9 @@ def get_averages_per_width(self, input_ir_file):
 
     ## This adds the time measurement, or just removes the entry if there is no exec_time (for space reclamation)
     def handle_time_measurement(self, process_id, exec_time):
-        ## TODO: Could put those behind the profile_driven check too to not fill memory
-        assert self.process_id_input_ir_map[process_id].exec_time is None
+        ## 2023-12-08 KK: When in parallel pipelines we receive two exits (when I tried to make it one something got stuck...)
+        ##                so this assert is not true
+        # assert self.process_id_input_ir_map[process_id].exec_time is None
 
         ## If we don't have the exec time we do Nothing
         ##
@@ -315,7 +316,11 @@ def compile_and_add(self, compiled_script_file, var_file, input_ir_file):
         )
 
         if not run_parallel:
+            ## If we are not running in parallel everything has to finish first before scheduling for execution
             self.wait_for_all()
+        else:
+            ## Wait if we have more pipelines running than our current limit
+            self.wait_until_limit(config.pash_args.parallel_pipelines_limit)
 
         if compile_success:
             response = server_util.success_response(
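The `compile_and_add` change turns scheduling into admission control: an unsafe region still drains every running process, while a safe region only waits until the process count drops below the limit. A hypothetical condensed view of that decision (the two `wait_*` methods are the real ones changed in the next hunk):

```python
# Hypothetical sketch of the scheduling decision in compile_and_add;
# running_procs bookkeeping is simplified.
class SchedulerSketch:
    def __init__(self, limit: int):
        self.limit = limit            # from --parallel_pipelines_limit
        self.running_procs = 0

    def wait_for_all(self) -> None: ...              # drain to zero
    def wait_until_limit(self, n: int) -> None: ...  # drain below n

    def admit(self, run_parallel: bool) -> None:
        if not run_parallel:
            # Unsafe region: everything must finish before it may run.
            self.wait_for_all()
        else:
            # Safe region: block only while at the concurrency cap.
            self.wait_until_limit(self.limit)
        self.running_procs += 1       # the region is now scheduled
```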
Expand Down Expand Up @@ -367,18 +372,24 @@ def get_next_id(self):

def wait_for_all(self):
log(
"Waiting for all processes to finish. There are",
"Waiting for all processes to finish."
)
self.wait_until_limit(1)
self.unsafe_running = False

def wait_until_limit(self, limit: int):
log(
f"Waiting for less than {limit} processes to be running. There are",
self.running_procs,
"processes remaining.",
)
while self.running_procs > 0:
while self.running_procs >= limit:
input_cmd = self.get_input()
# must be exit command or something is wrong
if input_cmd.startswith("Exit:"):
self.handle_exit(input_cmd)
else:
raise Exception(f"Command should be exit but it was {input_cmd}")
self.unsafe_running = False

def handle_exit(self, input_cmd):
assert input_cmd.startswith("Exit:")
Expand Down
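`wait_for_all` and `wait_until_limit` together behave like a counting semaphore drained by `Exit:` messages, with `wait_for_all` as the `limit == 1` special case. A self-contained sketch of the pattern (assumptions: a plain queue stands in for the daemon's `get_input()`, and decrementing a counter stands in for `handle_exit()`):

```python
from queue import Queue

class Scheduler:
    # Toy model of the compilation server's process bookkeeping.
    def __init__(self):
        self.running_procs = 0
        self.inbox = Queue()

    def wait_until_limit(self, limit: int) -> None:
        # Block until strictly fewer than `limit` pipelines are running,
        # consuming one "Exit:" message per finished pipeline.
        while self.running_procs >= limit:
            input_cmd = self.inbox.get()
            if not input_cmd.startswith("Exit:"):
                raise Exception(f"Command should be exit but it was {input_cmd}")
            self.running_procs -= 1

    def wait_for_all(self) -> None:
        # "Fewer than 1 running" means "all finished".
        self.wait_until_limit(1)

sched = Scheduler()
sched.running_procs = 2
sched.inbox.put("Exit:7")
sched.wait_until_limit(2)   # returns once one pipeline reports its exit
print(sched.running_procs)  # -> 1
```

With the default limit of 2, a third safe pipeline therefore blocks until one of the two running ones reports its exit.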
compiler/pash_runtime.sh (+8 −9)
@@ -103,21 +103,14 @@ else
     ## Invoke the compiler and make any necessary preparations
     source "$RUNTIME_DIR/pash_prepare_call_compiler.sh"
 
-    function run_parallel() {
-        trap inform_daemon_exit SIGTERM SIGINT EXIT
-        export SCRIPT_TO_EXECUTE="$pash_script_to_execute"
-        source "$RUNTIME_DIR/pash_restore_state_and_execute.sh"
-        inform_daemon_exit
-    }
-
     ## Check if there are traps set, and if so do not execute in parallel
     ## TODO: This might be an overkill but is conservative
     traps_set=$(trap)
     pash_redir_output echo "$$: (2) Traps set: $traps_set"
     # Don't fork if compilation failed. The script might have effects on the shell state.
     if [ "$pash_runtime_return_code" -ne 0 ] ||
-        ## If parallel pipelines is not enabled we shouldn't fork
-        [ "$pash_parallel_pipelines" -eq 0 ] ||
+        ## If parallel pipelines is disabled using a flag we shouldn't fork
+        [ "$pash_no_parallel_pipelines" -eq 1 ] ||
         ## If parallel pipelines is explicitly disabled (e.g., due to context), no forking
         [ "$pash_disable_parallel_pipelines" -eq 1 ] ||
         ## If traps are set, no forking
@@ -147,6 +140,12 @@
 
         pash_redir_output echo "$$: (5) BaSh script exited with ec: $pash_runtime_final_status"
     else
+        function run_parallel() {
+            trap inform_daemon_exit SIGTERM SIGINT EXIT
+            export SCRIPT_TO_EXECUTE="$pash_script_to_execute"
+            source "$RUNTIME_DIR/pash_restore_state_and_execute.sh"
+            inform_daemon_exit
+        }
         # Should we redirect errors aswell?
         # TODO: capturing the return state here isn't completely correct.
         run_parallel "$@" <&0 &
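Note that `run_parallel` moved into the `else` branch, so it is only defined on the forking path. Its trap guarantees the daemon hears about the pipeline's exit even when the backgrounded script is signalled; a rough Python analogue of `trap inform_daemon_exit SIGTERM SIGINT EXIT` (the notification message and the script being run are hypothetical stand-ins):

```python
import atexit
import signal
import subprocess
import sys

def inform_daemon_exit():
    # Hypothetical stand-in for the runtime's daemon notification:
    # report the exit so the scheduler can free a parallel-pipeline slot.
    print("Exit:pipeline", flush=True)

# Run the handler on normal exit, and turn signals into normal exits,
# mirroring `trap inform_daemon_exit SIGTERM SIGINT EXIT`.
atexit.register(inform_daemon_exit)
signal.signal(signal.SIGTERM, lambda *_: sys.exit(143))
signal.signal(signal.SIGINT, lambda *_: sys.exit(130))

# Stand-in for exporting SCRIPT_TO_EXECUTE and sourcing the runner.
subprocess.run(["bash", "-c", "echo running compiled script"])
```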
evaluation/tests/interface_tests/run.sh (+1 −1)
@@ -4,7 +4,7 @@ export PASH_TOP=${PASH_TOP:-$(git rev-parse --show-toplevel --show-superproject-
 # time: print real in seconds, to simplify parsing
 
 bash="bash"
-pash="$PASH_TOP/pa.sh --parallel_pipelines --profile_driven"
+pash="$PASH_TOP/pa.sh --profile_driven"
 
 output_dir="$PASH_TOP/evaluation/tests/interface_tests/output"
 rm -rf "$output_dir"
evaluation/tests/test_evaluation_scripts.sh (+2 −3)
@@ -47,12 +47,11 @@ n_inputs=(
 
 if [ "$EXPERIMENTAL" -eq 1 ]; then
     configurations=(
-        # "" # Commenting this out since the tests take a lot of time to finish
-        "--parallel_pipelines"
+        ""
     )
 else
     configurations=(
-        "--parallel_pipelines --profile_driven"
+        "--profile_driven"
     )
 fi
 
requirements.txt (+1 −1)
@@ -1,5 +1,5 @@
 graphviz
 libdash
-pash-annotations==0.2.0
+pash-annotations==0.2.2
 shasta==0.1.0
 sh-expand>=0.1.3