Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Acto early-stopping controller #386

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions acto/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,24 @@
help="Only generate test cases without executing them",
)
parser.add_argument("--checkonly", action="store_true")
parser.add_argument(
"--num-alarms",
dest="num_alarms",
type=int,
help="Number of alarms to early stop running",
)
parser.add_argument(
"--time-duration",
dest="time_duration",
type=int,
help="Approximate running time (minutes) to early stop",
)
parser.add_argument(

Check warning on line 102 in acto/__main__.py

View workflow job for this annotation

GitHub Actions / coverage-report

Missing coverage

Missing coverage on lines 90-102
"--hard-time-bound",
dest="hard_time_bound",
action="store_true",
help="Use hard time bound to early stop",
)

args = parser.parse_args()

Expand Down Expand Up @@ -149,6 +167,9 @@
apply_testcase_f=apply_testcase_f,
delta_from=None,
focus_fields=config.focus_fields,
num_alarms=args.num_alarms,
time_duration=args.time_duration,
hard_time_bound = args.hard_time_bound,
)
generation_time = datetime.now()
logger.info("Acto initialization finished in %s", generation_time - start_time)
Expand Down
38 changes: 38 additions & 0 deletions acto/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,12 @@
from acto.serialization import ActoEncoder, ContextEncoder
from acto.snapshot import Snapshot
from acto.utils import (
AlarmCounter,
delete_operator_pod,
get_early_stop_time,
get_yaml_existing_namespace,
process_crd,
terminate_threads,
update_preload_images,
)
from acto.utils.thread_logger import get_thread_logger, set_thread_logger_prefix
Expand Down Expand Up @@ -236,6 +239,8 @@
apply_testcase_f: Callable,
acto_namespace: int,
additional_exclude_paths: Optional[list[str]] = None,
alarm_counter: AlarmCounter = None,
early_stop_time: time.time = None,
) -> None:
self.context = context
self.workdir = workdir
Expand Down Expand Up @@ -275,6 +280,9 @@
self.apply_testcase_f = apply_testcase_f
self.curr_trial = 0

self.alarm_counter = alarm_counter
self.early_stop_time = early_stop_time

Check warning on line 284 in acto/engine.py

View workflow job for this annotation

GitHub Actions / coverage-report

Missing coverage

Missing coverage on lines 283-284

def run(
self,
errors: list[Optional[OracleResults]],
Expand All @@ -296,7 +304,15 @@
logger.info("Test finished")
break

if self.alarm_counter is not None and self.alarm_counter.judge(self.worker_id):
logger.info("Test finshed for reaching the number of alarms")
break

trial_start_time = time.time()
if self.early_stop_time is not None and self.early_stop_time < trial_start_time:
logger.info("Test finshed for reaching early stop time")
break

Check warning on line 314 in acto/engine.py

View workflow job for this annotation

GitHub Actions / coverage-report

Missing coverage

Missing coverage on lines 307-314

self.cluster.restart_cluster(self.cluster_name, self.kubeconfig)
apiclient = kubernetes_client(self.kubeconfig, self.context_name)
self.cluster.load_images(self.images_archive, self.cluster_name)
Expand Down Expand Up @@ -549,6 +565,13 @@
generation,
)
if run_result.oracle_result.is_error():

# is alarm, alarm count plus one
if self.alarm_counter is not None and not run_result.is_invalid_input():
self.alarm_counter.increment()
logger.info(f"Alarm count plus one, current count is {self.alarm_counter.get_count()}")

Check warning on line 572 in acto/engine.py

View workflow job for this annotation

GitHub Actions / coverage-report

Missing coverage

Missing coverage on lines 570-572


# before return, run the recovery test case
logger.info("Error result, running recovery")
run_result.oracle_result.differential = self.run_recovery(
Expand Down Expand Up @@ -765,6 +788,9 @@
mount: Optional[list] = None,
focus_fields: Optional[list] = None,
acto_namespace: int = 0,
num_alarms: int = None,
time_duration: int = None,
hard_time_bound: bool = False,
) -> None:
logger = get_thread_logger(with_prefix=False)

Expand Down Expand Up @@ -813,6 +839,10 @@
self.runner_type = Runner
self.checker_type = CheckerSet

self.num_alarms = num_alarms
self.time_duration = time_duration
self.hard_time_bound = hard_time_bound

self.__learn(
context_file=context_file,
helper_crd=helper_crd,
Expand Down Expand Up @@ -1060,7 +1090,9 @@
check=True,
)

alarm_counter = None if self.num_alarms is None else AlarmCounter(self.num_alarms)
start_time = time.time()
early_stop_time = get_early_stop_time(start_time, self.time_duration, self.hard_time_bound)

Check warning on line 1095 in acto/engine.py

View workflow job for this annotation

GitHub Actions / coverage-report

Missing coverage

Missing coverage on lines 1093-1095

errors: list[OracleResults] = []
runners: list[TrialRunner] = []
Expand All @@ -1083,6 +1115,8 @@
self.apply_testcase_f,
self.acto_namespace,
self.operator_config.diff_ignore_fields,
alarm_counter,
early_stop_time,
)
runners.append(runner)

Expand All @@ -1094,6 +1128,10 @@
)
t.start()
threads.append(t)

if self.time_duration is not None and self.hard_time_bound is True:
timer = threading.Timer((self.time_duration) * 60, terminate_threads, args=[threads])
timer.start()

Check warning on line 1134 in acto/engine.py

View workflow job for this annotation

GitHub Actions / coverage-report

Missing coverage

Missing coverage on lines 1132-1134

for t in threads:
t.join()
Expand Down
1 change: 1 addition & 0 deletions acto/utils/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from .early_stop import *
from .error_handler import *
from .k8s_helper import *
from .preprocess import *
Expand Down
42 changes: 42 additions & 0 deletions acto/utils/early_stop.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import ctypes
import threading
import time


class AlarmCounter:
def __init__(self, bound):
self.count = 0
self.bound = bound
self.lock = threading.Lock()

Check warning on line 10 in acto/utils/early_stop.py

View workflow job for this annotation

GitHub Actions / coverage-report

Missing coverage

Missing coverage on lines 8-10

def increment(self, value=1):
with self.lock:
self.count += value

Check warning on line 14 in acto/utils/early_stop.py

View workflow job for this annotation

GitHub Actions / coverage-report

Missing coverage

Missing coverage on lines 13-14

def get_count(self):
return self.count

Check warning on line 17 in acto/utils/early_stop.py

View workflow job for this annotation

GitHub Actions / coverage-report

Missing coverage

Missing coverage on line 17

def judge(self, work_id):
if self.count >= self.bound:
# print(f"Counter of thread {work_id} reached the number of alarms {self.bound}.")
return True
return False

Check warning on line 23 in acto/utils/early_stop.py

View workflow job for this annotation

GitHub Actions / coverage-report

Missing coverage

Missing coverage on lines 20-23


def terminate_threads(threads: list[threading.Thread]):
for thread in threads:
if thread.is_alive():
thread_id = thread.ident
# print(f"Timeout reached, terminating thread {thread_id}")
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(thread_id),
ctypes.py_object(SystemExit))
if res > 1:
ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, 0)
print("Exception raise failure in kill timeout threads")


def get_early_stop_time(start_time: time.time, time_duration: int, hard_time_bound: bool):
if time_duration is None or hard_time_bound is True:
return None
early_stop_time = start_time + time_duration * 60
return early_stop_time
Loading