Skip to content

Commit

Permalink
add number_workers back for scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
superstar54 committed Aug 29, 2024
1 parent 60b9335 commit 139a004
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 11 deletions.
11 changes: 7 additions & 4 deletions aiida_workgraph/cli/cmd_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from pathlib import Path
from aiida.cmdline.utils import decorators, echo
from aiida.cmdline.params import options
from aiida.cmdline.commands.cmd_daemon import validate_daemon_workers
from aiida_workgraph.engine.scheduler.client import get_scheduler_client
import sys

Expand Down Expand Up @@ -40,17 +41,18 @@ def worker():

@scheduler.command()
@click.option("--foreground", is_flag=True, help="Run in foreground.")
@click.argument("number", required=False, type=int, callback=validate_daemon_workers)
@options.TIMEOUT(default=None, required=False, type=int)
@decorators.with_dbenv()
@decorators.requires_broker
@decorators.check_circus_zmq_version
def start(foreground, timeout):
def start(foreground, number, timeout):
"""Start the scheduler application."""

click.echo("Starting the scheduler process...")

client = get_scheduler_client()
client.start_daemon(foreground=foreground)
client.start_daemon(number_workers=number, foreground=foreground)


@scheduler.command()
Expand Down Expand Up @@ -86,18 +88,19 @@ def stop(ctx, no_wait, all_profiles, timeout):

@scheduler.command(hidden=True)
@click.option("--foreground", is_flag=True, help="Run in foreground.")
@click.argument("number", required=False, type=int, callback=validate_daemon_workers)
@decorators.with_dbenv()
@decorators.requires_broker
@decorators.check_circus_zmq_version
def start_circus(foreground):
def start_circus(foreground, number):
"""This will actually launch the circus daemon, either daemonized in the background or in the foreground.
If run in the foreground all logs are redirected to stdout.
.. note:: this should not be called directly from the commandline!
"""

get_scheduler_client()._start_daemon(foreground=foreground)
get_scheduler_client()._start_daemon(number_workers=number, foreground=foreground)


@scheduler.command()
Expand Down
11 changes: 8 additions & 3 deletions aiida_workgraph/engine/launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from aiida.common import InvalidOperation
from aiida.common.log import AIIDA_LOGGER
from aiida.manage import manager
from aiida.orm import ProcessNode
from aiida.orm import ProcessNode, load_node

from aiida.engine.processes.builder import ProcessBuilder
from aiida.engine.processes.functions import get_stack_size
Expand All @@ -15,6 +15,7 @@

import signal
import sys
import os
import inspect
from typing import (
Type,
Expand Down Expand Up @@ -271,7 +272,8 @@ def start_scheduler_worker(foreground: bool = False) -> None:
for s in signals:
# https://github.com/python/mypy/issues/12557
runner.loop.add_signal_handler(s, lambda s=s: asyncio.create_task(shutdown_worker(runner))) # type: ignore[misc]

# get current process of this thread
current_process = os.getpid()
try:
running_scheduler = get_scheduler()
runner_loop = runner.loop
Expand All @@ -286,10 +288,13 @@ def start_scheduler_worker(foreground: bool = False) -> None:
communicator=None, pid=running_scheduler, nowait=True
)
)
process = load_node(running_scheduler)
process.base.extras.set("daemon_pid", current_process)

except ValueError:
process_inited = instantiate_process(runner, WorkGraphScheduler)
process_inited.base.extras.set("daemon_pid", current_process)
runner.loop.create_task(process_inited.step_until_terminated())

try:
LOGGER.info("Starting a daemon worker")
runner.start()
Expand Down
14 changes: 10 additions & 4 deletions aiida_workgraph/engine/scheduler/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ def filepaths(self):
},
},
"daemon": {
"log": str(DAEMON_LOG_DIR / f"aiida-{self.profile.name}.log"),
"pid": str(DAEMON_DIR / f"aiida-{self.profile.name}.pid"),
"log": str(DAEMON_LOG_DIR / f"aiida-scheduler-{self.profile.name}.log"),
"pid": str(DAEMON_DIR / f"aiida-scheduler-{self.profile.name}.pid"),
},
}

Expand Down Expand Up @@ -102,6 +102,7 @@ def cmd_start_daemon(
self.profile.name,
"scheduler",
"start-circus",
str(number_workers),
]

if foreground:
Expand All @@ -114,7 +115,7 @@ def cmd_start_daemon_worker(self) -> list[str]:
"""Return the command to start a daemon worker process."""
return [self._workgraph_bin, "-p", self.profile.name, "scheduler", "worker"]

def _start_daemon(self, foreground: bool = False) -> None:
def _start_daemon(self, number_workers: int = 1, foreground: bool = False) -> None:
"""Start the daemon.
.. warning:: This will daemonize the current process and put it in the background. It is most likely not what
Expand All @@ -130,6 +131,11 @@ def _start_daemon(self, foreground: bool = False) -> None:
from circus.pidfile import Pidfile
from circus.util import check_future_exception_and_log, configure_logger

if foreground and number_workers > 1:
raise ValueError(
"can only run a single worker when running in the foreground"
)

loglevel = self.loglevel
logoutput = "-"

Expand All @@ -149,7 +155,7 @@ def _start_daemon(self, foreground: bool = False) -> None:
{
"cmd": " ".join(self.cmd_start_daemon_worker),
"name": self.daemon_name,
"numprocesses": 1,
"numprocesses": number_workers,
"virtualenv": self.virtualenv,
"copy_env": True,
"stdout_stream": {
Expand Down

0 comments on commit 139a004

Please sign in to comment.