Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: scheduler #275

Draft
wants to merge 16 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion aiida_workgraph/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from aiida_workgraph.cli import cmd_graph
from aiida_workgraph.cli import cmd_web
from aiida_workgraph.cli import cmd_task
from aiida_workgraph.cli import cmd_scheduler


__all__ = ["cmd_graph", "cmd_web", "cmd_task"]
__all__ = ["cmd_graph", "cmd_web", "cmd_task", "cmd_scheduler"]
175 changes: 175 additions & 0 deletions aiida_workgraph/cli/cmd_scheduler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
from aiida_workgraph.cli.cmd_workgraph import workgraph
import click
from pathlib import Path
from aiida.cmdline.utils import decorators, echo
from aiida.cmdline.params import options
from aiida_workgraph.engine.scheduler.client import get_scheduler_client
import sys

REACT_PORT = "3000"


def get_package_root():
"""Returns the root directory of the package."""
current_file = Path(__file__)
# Root directory of your package
return current_file.parent


def get_pid_file_path():
"""Get the path to the PID file in the desired directory."""
from aiida.manage.configuration.settings import AIIDA_CONFIG_FOLDER

return AIIDA_CONFIG_FOLDER / "scheduler_processes.pid"


@workgraph.group("scheduler")
def scheduler():
"""Commands to manage the scheduler process."""


@scheduler.command()
def worker():
"""Start the scheduler application."""
from aiida_workgraph.engine.launch import start_scheduler_worker

click.echo("Starting the scheduler worker...")

start_scheduler_worker()


@scheduler.command()
@click.option("--foreground", is_flag=True, help="Run in foreground.")
@options.TIMEOUT(default=None, required=False, type=int)
@decorators.with_dbenv()
@decorators.requires_broker
@decorators.check_circus_zmq_version
def start(foreground, timeout):
"""Start the scheduler application."""

click.echo("Starting the scheduler process...")

client = get_scheduler_client()
client.start_daemon(foreground=foreground)


@scheduler.command()
@click.option("--no-wait", is_flag=True, help="Do not wait for confirmation.")
@click.option("--all", "all_profiles", is_flag=True, help="Stop all daemons.")
@options.TIMEOUT(default=None, required=False, type=int)
@decorators.requires_broker
@click.pass_context
def stop(ctx, no_wait, all_profiles, timeout):
"""Stop the scheduler daemon.

Returns exit code 0 if the daemon was shut down successfully (or was not running), non-zero if there was an error.
"""
if all_profiles is True:
profiles = [
profile
for profile in ctx.obj.config.profiles
if not profile.is_test_profile
]
else:
profiles = [ctx.obj.profile]

for profile in profiles:
echo.echo("Profile: ", fg=echo.COLORS["report"], bold=True, nl=False)
echo.echo(f"{profile.name}", bold=True)
echo.echo("Stopping the daemon... ", nl=False)
try:
client = get_scheduler_client()
client.stop_daemon(wait=not no_wait, timeout=timeout)
except Exception as exception:
echo.echo_error(f"Failed to stop the daemon: {exception}")


@scheduler.command(hidden=True)
@click.option("--foreground", is_flag=True, help="Run in foreground.")
@decorators.with_dbenv()
@decorators.requires_broker
@decorators.check_circus_zmq_version
def start_circus(foreground):
"""This will actually launch the circus daemon, either daemonized in the background or in the foreground.

If run in the foreground all logs are redirected to stdout.

.. note:: this should not be called directly from the commandline!
"""

get_scheduler_client()._start_daemon(foreground=foreground)


@scheduler.command()
@click.option("--all", "all_profiles", is_flag=True, help="Show status of all daemons.")
@options.TIMEOUT(default=None, required=False, type=int)
@click.pass_context
@decorators.requires_loaded_profile()
@decorators.requires_broker
def status(ctx, all_profiles, timeout):
"""Print the status of the scheduler daemon.

Returns exit code 0 if all requested daemons are running, else exit code 3.
"""
from tabulate import tabulate

from aiida.cmdline.utils.common import format_local_time
from aiida.engine.daemon.client import DaemonException

if all_profiles is True:
profiles = [
profile
for profile in ctx.obj.config.profiles
if not profile.is_test_profile
]
else:
profiles = [ctx.obj.profile]

daemons_running = []

for profile in profiles:
client = get_scheduler_client(profile.name)
echo.echo("Profile: ", fg=echo.COLORS["report"], bold=True, nl=False)
echo.echo(f"{profile.name}", bold=True)

try:
client.get_status(timeout=timeout)
except DaemonException as exception:
echo.echo_error(str(exception))
daemons_running.append(False)
continue

worker_response = client.get_worker_info()
daemon_response = client.get_daemon_info()

workers = []
for pid, info in worker_response["info"].items():
if isinstance(info, dict):
row = [
pid,
info["mem"],
info["cpu"],
format_local_time(info["create_time"]),
]
else:
row = [pid, "-", "-", "-"]
workers.append(row)

if workers:
workers_info = tabulate(
workers, headers=["PID", "MEM %", "CPU %", "started"], tablefmt="simple"
)
else:
workers_info = (
"--> No workers are running. Use `verdi daemon incr` to start some!\n"
)

start_time = format_local_time(daemon_response["info"]["create_time"])
echo.echo(
f'Daemon is running as PID {daemon_response["info"]["pid"]} since {start_time}\n'
f"Active workers [{len(workers)}]:\n{workers_info}\n"
"Use `verdi daemon [incr | decr] [num]` to increase / decrease the number of workers"
)

if not all(daemons_running):
sys.exit(3)
Loading
Loading