Skip to content

Commit

Permalink
Add SchedulerClient, use circus to start scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
superstar54 committed Aug 27, 2024
1 parent 8669675 commit 56ef506
Show file tree
Hide file tree
Showing 4 changed files with 347 additions and 92 deletions.
227 changes: 135 additions & 92 deletions aiida_workgraph/cli/cmd_scheduler.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
from aiida_workgraph.cli.cmd_workgraph import workgraph
from aiida import orm
import click
import os
from pathlib import Path
from aiida.cmdline.utils import echo
from .cmd_graph import REPAIR_INSTRUCTIONS

from aiida.cmdline.utils import decorators, echo
from aiida.cmdline.params import options
from aiida_workgraph.engine.scheduler.client import get_scheduler_client
import sys

REACT_PORT = "3000"

Expand All @@ -29,107 +28,151 @@ def scheduler():
"""Commands to manage the scheduler process."""


@scheduler.command()
def start_worker():
"""Start the scheduler application."""
from aiida_workgraph.engine.launch import start_scheduler_worker
# @scheduler.command()
# def worker():
# """Start the scheduler application."""
# from aiida_workgraph.engine.launch import start_scheduler_worker

click.echo("Starting the scheduler worker...")
# click.echo("Starting the scheduler worker...")

start_scheduler_worker()
# start_scheduler_worker()


@scheduler.command()
def start():
@click.option("--foreground", is_flag=True, help="Run in foreground.")
@options.TIMEOUT(default=None, required=False, type=int)
@decorators.with_dbenv()
@decorators.requires_broker
@decorators.check_circus_zmq_version
def start(foreground, timeout):
"""Start the scheduler application."""
from aiida_workgraph.engine.scheduler import WorkGraphScheduler
from aiida.engine import submit

click.echo("Starting the scheduler process...")

pid_file_path = get_pid_file_path()
# if the PID file already exists, check if the process is running
if pid_file_path.exists():
with open(pid_file_path, "r") as pid_file:
for line in pid_file:
_, pid = line.strip().split(":")
if pid:
try:
node = orm.load_node(pid)
if node.is_sealed:
click.echo(
"PID file exists but no running scheduler process found."
)
else:
click.echo(
f"Scheduler process with PID {node.pk} already running."
)
return
except Exception:
click.echo(
"PID file exists but no running scheduler process found."
)

with open(pid_file_path, "w") as pid_file:
node = submit(WorkGraphScheduler)
pid_file.write(f"Scheduler:{node.pk}\n")
click.echo(f"Scheduler process started with PID {node.pk}.")
try:
client = get_scheduler_client()
client.start_daemon(foreground=foreground)
except Exception as exception:
echo.echo(f"Failed to start the scheduler process: {exception}")


@scheduler.command()
def stop():
"""Stop the scheduler application."""
from aiida.engine.processes import control

pid_file_path = get_pid_file_path()

if not pid_file_path.exists():
click.echo("No running scheduler application found.")
return

with open(pid_file_path, "r") as pid_file:
for line in pid_file:
_, pid = line.strip().split(":")
if pid:
click.confirm(
"Are you sure you want to kill the scheduler process?", abort=True
)
process = orm.load_node(pid)
try:
message = "Killed through `verdi process kill`"
control.kill_processes(
[process],
timeout=5,
wait=True,
message=message,
)
except control.ProcessTimeoutException as exception:
echo.echo_critical(f"{exception}\n{REPAIR_INSTRUCTIONS}")
os.remove(pid_file_path)
@click.option("--no-wait", is_flag=True, help="Do not wait for confirmation.")
@click.option("--all", "all_profiles", is_flag=True, help="Stop all daemons.")
@options.TIMEOUT(default=None, required=False, type=int)
@decorators.requires_broker
@click.pass_context
def stop(ctx, no_wait, all_profiles, timeout):
"""Stop the scheduler daemon.
Returns exit code 0 if the daemon was shut down successfully (or was not running), non-zero if there was an error.
"""
if all_profiles is True:
profiles = [
profile
for profile in ctx.obj.config.profiles
if not profile.is_test_profile
]
else:
profiles = [ctx.obj.profile]

for profile in profiles:
echo.echo("Profile: ", fg=echo.COLORS["report"], bold=True, nl=False)
echo.echo(f"{profile.name}", bold=True)
echo.echo("Stopping the daemon... ", nl=False)
try:
client = get_scheduler_client()
client.stop_daemon(wait=not no_wait, timeout=timeout)
except Exception as exception:
echo.echo_error(f"Failed to stop the daemon: {exception}")


@scheduler.command(hidden=True)
@click.option("--foreground", is_flag=True, help="Run in foreground.")
@decorators.with_dbenv()
@decorators.requires_broker
@decorators.check_circus_zmq_version
def start_circus(foreground):
"""This will actually launch the circus daemon, either daemonized in the background or in the foreground.
If run in the foreground all logs are redirected to stdout.
.. note:: this should not be called directly from the commandline!
"""

get_scheduler_client()._start_daemon(foreground=foreground)


@scheduler.command()
def status():
"""Check the status of the scheduler application."""
from aiida.orm import QueryBuilder
from aiida_workgraph.engine.scheduler import WorkGraphScheduler

qb = QueryBuilder()
projections = ["id"]
filters = {
"or": [
{"attributes.sealed": False},
{"attributes": {"!has_key": "sealed"}},
@click.option("--all", "all_profiles", is_flag=True, help="Show status of all daemons.")
@options.TIMEOUT(default=None, required=False, type=int)
@click.pass_context
@decorators.requires_loaded_profile()
@decorators.requires_broker
def status(ctx, all_profiles, timeout):
"""Print the status of the scheduler daemon.
Returns exit code 0 if all requested daemons are running, else exit code 3.
"""
from tabulate import tabulate

from aiida.cmdline.utils.common import format_local_time
from aiida.engine.daemon.client import DaemonException

if all_profiles is True:
profiles = [
profile
for profile in ctx.obj.config.profiles
if not profile.is_test_profile
]
}
qb.append(
WorkGraphScheduler,
filters=filters,
project=projections,
tag="process",
)
results = qb.all()
if len(results) == 0:
click.echo("No scheduler found. Please start the scheduler first.")
else:
click.echo(f"Scheduler process is running with PID: {results[0][0]}")
profiles = [ctx.obj.profile]

daemons_running = []

for profile in profiles:
client = get_scheduler_client(profile.name)
echo.echo("Profile: ", fg=echo.COLORS["report"], bold=True, nl=False)
echo.echo(f"{profile.name}", bold=True)

try:
client.get_status(timeout=timeout)
except DaemonException as exception:
echo.echo_error(str(exception))
daemons_running.append(False)
continue

worker_response = client.get_worker_info()
daemon_response = client.get_daemon_info()

workers = []
for pid, info in worker_response["info"].items():
if isinstance(info, dict):
row = [
pid,
info["mem"],
info["cpu"],
format_local_time(info["create_time"]),
]
else:
row = [pid, "-", "-", "-"]
workers.append(row)

if workers:
workers_info = tabulate(
workers, headers=["PID", "MEM %", "CPU %", "started"], tablefmt="simple"
)
else:
workers_info = (
"--> No workers are running. Use `verdi daemon incr` to start some!\n"
)

start_time = format_local_time(daemon_response["info"]["create_time"])
echo.echo(
f'Daemon is running as PID {daemon_response["info"]["pid"]} since {start_time}\n'
f"Active workers [{len(workers)}]:\n{workers_info}\n"
"Use `verdi daemon [incr | decr] [num]` to increase / decrease the number of workers"
)

if not all(daemons_running):
sys.exit(3)
3 changes: 3 additions & 0 deletions aiida_workgraph/engine/scheduler/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .scheduler import WorkGraphScheduler

__all__ = ("WorkGraphScheduler",)
Loading

0 comments on commit 56ef506

Please sign in to comment.