Skip to content

Commit

Permalink
Web: add setting for scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
superstar54 committed Sep 2, 2024
1 parent 8299fe5 commit 5a29c79
Show file tree
Hide file tree
Showing 6 changed files with 237 additions and 52 deletions.
6 changes: 4 additions & 2 deletions aiida_workgraph/engine/scheduler/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,8 +302,10 @@ def start_scheduler_process(number: int = 1) -> None:

# Restart existing schedulers if they exceed the number to start
for pk in schedulers[:number]:
create_scheduler_action(pk)
print(f"Scheduler with pk {pk} running.")
# When the runner stops, it does not ack the message back to RabbitMQ,
# so the un-acked message is still in the queue;
# we do not need to send the continue message again.
print(f"Scheduler with pk {pk} restart and running.")
count += 1
# not running
for pk in schedulers[number:]:
Expand Down
2 changes: 2 additions & 0 deletions aiida_workgraph/engine/scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,8 @@ def read_wgdata_from_base(self, pk: int) -> t.Dict[str, t.Any]:
if isinstance(prop["value"], PickledLocalFunction):
prop["value"] = prop["value"].value
wgdata["error_handlers"] = deserialize_unsafe(wgdata["error_handlers"])
wgdata["context"] = deserialize_unsafe(wgdata["context"])

return wgdata, node

def update_workgraph_from_base(self, pk: int) -> None:
Expand Down
2 changes: 2 additions & 0 deletions aiida_workgraph/web/backend/app/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from fastapi.middleware.cors import CORSMiddleware
from aiida.manage import manager
from aiida_workgraph.web.backend.app.daemon import router as daemon_router
from aiida_workgraph.web.backend.app.scheduler import router as scheduler_router
from aiida_workgraph.web.backend.app.workgraph import router as workgraph_router
from aiida_workgraph.web.backend.app.datanode import router as datanode_router
from fastapi.staticfiles import StaticFiles
Expand Down Expand Up @@ -47,6 +48,7 @@ async def read_root() -> dict:
app.include_router(workgraph_router)
app.include_router(datanode_router)
app.include_router(daemon_router)
app.include_router(scheduler_router)


@app.get("/debug")
Expand Down
12 changes: 6 additions & 6 deletions aiida_workgraph/web/backend/app/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class DaemonStatusModel(BaseModel):
)


@router.get("/api/daemon/status", response_model=DaemonStatusModel)
@router.get("/api/daemon/task/status", response_model=DaemonStatusModel)
@with_dbenv()
async def get_daemon_status() -> DaemonStatusModel:
"""Return the daemon status."""
Expand All @@ -36,7 +36,7 @@ async def get_daemon_status() -> DaemonStatusModel:
return DaemonStatusModel(running=True, num_workers=response["numprocesses"])


@router.get("/api/daemon/worker")
@router.get("/api/daemon/task/worker")
@with_dbenv()
async def get_daemon_worker():
"""Return the daemon status."""
Expand All @@ -50,7 +50,7 @@ async def get_daemon_worker():
return response["info"]


@router.post("/api/daemon/start", response_model=DaemonStatusModel)
@router.post("/api/daemon/task/start", response_model=DaemonStatusModel)
@with_dbenv()
async def get_daemon_start() -> DaemonStatusModel:
"""Start the daemon."""
Expand All @@ -69,7 +69,7 @@ async def get_daemon_start() -> DaemonStatusModel:
return DaemonStatusModel(running=True, num_workers=response["numprocesses"])


@router.post("/api/daemon/stop", response_model=DaemonStatusModel)
@router.post("/api/daemon/task/stop", response_model=DaemonStatusModel)
@with_dbenv()
async def get_daemon_stop() -> DaemonStatusModel:
"""Stop the daemon."""
Expand All @@ -86,7 +86,7 @@ async def get_daemon_stop() -> DaemonStatusModel:
return DaemonStatusModel(running=False, num_workers=None)


@router.post("/api/daemon/increase", response_model=DaemonStatusModel)
@router.post("/api/daemon/task/increase", response_model=DaemonStatusModel)
@with_dbenv()
async def increase_daemon_worker() -> DaemonStatusModel:
"""increase the daemon worker."""
Expand All @@ -103,7 +103,7 @@ async def increase_daemon_worker() -> DaemonStatusModel:
return DaemonStatusModel(running=False, num_workers=None)


@router.post("/api/daemon/decrease", response_model=DaemonStatusModel)
@router.post("/api/daemon/task/decrease", response_model=DaemonStatusModel)
@with_dbenv()
async def decrease_daemon_worker() -> DaemonStatusModel:
"""decrease the daemon worker."""
Expand Down
127 changes: 127 additions & 0 deletions aiida_workgraph/web/backend/app/scheduler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
# -*- coding: utf-8 -*-
"""Declaration of FastAPI router for daemon endpoints."""
from __future__ import annotations

import typing as t

from aiida.cmdline.utils.decorators import with_dbenv
from aiida.engine.daemon.client import DaemonException
from aiida_workgraph.engine.scheduler.client import get_scheduler_client
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel, Field
from aiida_workgraph.engine.scheduler.client import start_scheduler_process


router = APIRouter()


class DaemonStatusModel(BaseModel):
    """Status payload used as the response model of the scheduler daemon endpoints."""

    # True when the daemon is up, False otherwise.
    running: bool = Field(
        description="Whether the daemon is running or not.",
    )
    # Worker count; the endpoints in this module pass None when not running.
    num_workers: t.Optional[int] = Field(
        description="The number of workers if the daemon is running.",
    )


@router.get("/api/daemon/scheduler/status", response_model=DaemonStatusModel)
@with_dbenv()
async def get_daemon_status() -> DaemonStatusModel:
    """Report whether the scheduler daemon is running and, if so, its worker count."""
    scheduler_client = get_scheduler_client()

    if scheduler_client.is_daemon_running:
        # Only query the process count when there is a daemon to ask.
        status = scheduler_client.get_numprocesses()
        return DaemonStatusModel(running=True, num_workers=status["numprocesses"])

    return DaemonStatusModel(running=False, num_workers=None)


@router.get("/api/daemon/scheduler/worker")
@with_dbenv()
async def get_daemon_worker():
    """Return the scheduler daemon's worker info, or ``{}`` when it is not running."""
    scheduler_client = get_scheduler_client()

    if scheduler_client.is_daemon_running:
        worker_info = scheduler_client.get_worker_info()
        return worker_info["info"]

    return {}


@router.post("/api/daemon/scheduler/start", response_model=DaemonStatusModel)
@with_dbenv()
async def get_daemon_start() -> DaemonStatusModel:
    """Start the scheduler daemon, then call ``start_scheduler_process()``.

    Raises:
        HTTPException: status 400 if the daemon is already running; status 500
            if starting the daemon raises a ``DaemonException``.
    """
    scheduler_client = get_scheduler_client()

    if scheduler_client.is_daemon_running:
        raise HTTPException(status_code=400, detail="The daemon is already running.")

    try:
        scheduler_client.start_daemon()
    except DaemonException as exc:
        raise HTTPException(status_code=500, detail=str(exc)) from exc

    # Fetch the process count before launching the scheduler process,
    # in the same order as the daemon was brought up.
    status = scheduler_client.get_numprocesses()
    start_scheduler_process()

    return DaemonStatusModel(running=True, num_workers=status["numprocesses"])


@router.post("/api/daemon/scheduler/stop", response_model=DaemonStatusModel)
@with_dbenv()
async def get_daemon_stop() -> DaemonStatusModel:
    """Stop the scheduler daemon.

    Raises:
        HTTPException: status 400 if the daemon is not running; status 500
            if stopping the daemon raises a ``DaemonException``.
    """
    scheduler_client = get_scheduler_client()

    if not scheduler_client.is_daemon_running:
        raise HTTPException(status_code=400, detail="The daemon is not running.")

    try:
        scheduler_client.stop_daemon()
    except DaemonException as exc:
        raise HTTPException(status_code=500, detail=str(exc)) from exc

    stopped_status = DaemonStatusModel(running=False, num_workers=None)
    return stopped_status


@router.post("/api/daemon/scheduler/increase", response_model=DaemonStatusModel)
@with_dbenv()
async def increase_daemon_worker() -> DaemonStatusModel:
    """Add one worker to the scheduler daemon and start the scheduler processes.

    Returns:
        The daemon status after the increase: ``running=True`` together with
        the process count reported by the client.

    Raises:
        HTTPException: status 400 if the daemon is not running; status 500
            if increasing the workers raises a ``DaemonException``.
    """
    client = get_scheduler_client()

    if not client.is_daemon_running:
        raise HTTPException(status_code=400, detail="The daemon is not running.")

    try:
        client.increase_workers(1)
    except DaemonException as exception:
        raise HTTPException(status_code=500, detail=str(exception)) from exception

    response = client.get_numprocesses()
    # Start scheduler processes for the new total worker count.
    start_scheduler_process(response["numprocesses"])

    # The daemon was verified to be running above, so report the real state
    # instead of a hard-coded "not running" payload.
    return DaemonStatusModel(running=True, num_workers=response["numprocesses"])


@router.post("/api/daemon/scheduler/decrease", response_model=DaemonStatusModel)
@with_dbenv()
async def decrease_daemon_worker() -> DaemonStatusModel:
    """Remove one worker from the scheduler daemon.

    Returns:
        The daemon status after the decrease: ``running=True`` together with
        the process count reported by the client.

    Raises:
        HTTPException: status 400 if the daemon is not running; status 500
            if decreasing the workers raises a ``DaemonException``.
    """
    client = get_scheduler_client()

    if not client.is_daemon_running:
        raise HTTPException(status_code=400, detail="The daemon is not running.")

    try:
        client.decrease_workers(1)
    except DaemonException as exception:
        raise HTTPException(status_code=500, detail=str(exception)) from exception

    # The daemon was verified to be running above, so report the real state
    # instead of a hard-coded "not running" payload.
    response = client.get_numprocesses()

    return DaemonStatusModel(running=True, num_workers=response["numprocesses"])
140 changes: 96 additions & 44 deletions aiida_workgraph/web/frontend/src/components/Settings.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,79 +3,131 @@ import { ToastContainer, toast } from 'react-toastify';
import 'react-toastify/dist/ReactToastify.css';

function Settings() {
const [workers, setWorkers] = useState([]);
const [taskWorkers, setTaskWorkers] = useState([]);
const [schedulerWorkers, setSchedulerWorkers] = useState([]);

const fetchWorkers = () => {
fetch('http://localhost:8000/api/daemon/worker')
// Fetching task workers
const fetchTaskWorkers = () => {
fetch('http://localhost:8000/api/daemon/task/worker')
.then(response => response.json())
.then(data => setWorkers(Object.values(data)))
.catch(error => console.error('Failed to fetch workers:', error));
.then(data => setTaskWorkers(Object.values(data)))
.catch(error => console.error('Failed to fetch task workers:', error));
};

// Fetching scheduler workers
const fetchSchedulerWorkers = () => {
fetch('http://localhost:8000/api/daemon/scheduler/worker')
.then(response => response.json())
.then(data => setSchedulerWorkers(Object.values(data)))
.catch(error => console.error('Failed to fetch scheduler workers:', error));
};

useEffect(() => {
fetchWorkers();
const interval = setInterval(fetchWorkers, 1000); // Poll every 5 seconds
return () => clearInterval(interval); // Clear interval on component unmount
fetchTaskWorkers();
fetchSchedulerWorkers();
const taskInterval = setInterval(fetchTaskWorkers, 1000);
const schedulerInterval = setInterval(fetchSchedulerWorkers, 1000);
return () => {
clearInterval(taskInterval);
clearInterval(schedulerInterval);
}; // Clear intervals on component unmount
}, []);

const handleDaemonControl = (action) => {
fetch(`http://localhost:8000/api/daemon/${action}`, { method: 'POST' })
const handleDaemonControl = (daemonType, action) => {
fetch(`http://localhost:8000/api/daemon/${daemonType}/${action}`, { method: 'POST' })
.then(response => {
if (!response.ok) {
throw new Error(`Daemon operation failed: ${response.statusText}`);
throw new Error(`${daemonType} daemon operation failed: ${response.statusText}`);
}
return response.json();
})
.then(data => {
toast.success(`Daemon ${action}ed successfully`);
fetchWorkers();
.then(() => {
toast.success(`${daemonType} daemon ${action}ed successfully`);
if (daemonType === 'task') {
fetchTaskWorkers();
} else {
fetchSchedulerWorkers();
}
})
.catch(error => toast.error(error.message));
};

const adjustWorkers = (action) => {
fetch(`http://localhost:8000/api/daemon/${action}`, { method: 'POST' })
const adjustWorkers = (daemonType, action) => {
fetch(`http://localhost:8000/api/daemon/${daemonType}/${action}`, { method: 'POST' })
.then(response => {
if (!response.ok) {
throw new Error(`Failed to ${action} workers: ${response.statusText}`);
throw new Error(`Failed to ${action} workers for ${daemonType}: ${response.statusText}`);
}
return response.json();
})
.then(data => {
toast.success(`Workers ${action}ed successfully`);
fetchWorkers(); // Refetch workers after adjusting
.then(() => {
toast.success(`${daemonType} Workers ${action}ed successfully`);
if (daemonType === 'task') {
fetchTaskWorkers();
} else {
fetchSchedulerWorkers();
}
})
.catch(error => toast.error(error.message));
};

return (
<div>
<h2>Daemon Control</h2>
<ToastContainer />
<table className="table">
<thead>
<tr>
<th>PID</th>
<th>Memory %</th>
<th>CPU %</th>
<th>Started</th>
</tr>
</thead>
<tbody>
{workers.map(worker => (
<tr key={worker.pid}>
<td>{worker.pid}</td>
<td>{worker.mem}</td>
<td>{worker.cpu}</td>
<td>{new Date(worker.started * 1000).toLocaleString()}</td>
<div>
<h3>Task Daemon Control</h3>
<button className="button button-start" onClick={() => handleDaemonControl('task', 'start')}>Start Task Daemon</button>
<button className="button button-stop" onClick={() => handleDaemonControl('task', 'stop')}>Stop Task Daemon</button>
<button className="button button-adjust" onClick={() => adjustWorkers('task', 'increase')}>Increase Task Workers</button>
<button className="button button-adjust" onClick={() => adjustWorkers('task', 'decrease')}>Decrease Task Workers</button>
<table className="table">
<thead>
<tr>
<th>PID</th>
<th>Memory %</th>
<th>CPU %</th>
<th>Started</th>
</tr>
</thead>
<tbody>
{taskWorkers.map(worker => (
<tr key={worker.pid}>
<td>{worker.pid}</td>
<td>{worker.mem}</td>
<td>{worker.cpu}</td>
<td>{new Date(worker.started * 1000).toLocaleString()}</td>
</tr>
))}
</tbody>
</table>
</div>
<div>
<h3>Scheduler Daemon Control</h3>
<button className="button button-start" onClick={() => handleDaemonControl('scheduler', 'start')}>Start Scheduler Daemon</button>
<button className="button button-stop" onClick={() => handleDaemonControl('scheduler', 'stop')}>Stop Scheduler Daemon</button>
<button className="button button-adjust" onClick={() => adjustWorkers('scheduler', 'increase')}>Increase Scheduler Workers</button>
<button className="button button-adjust" onClick={() => adjustWorkers('scheduler', 'decrease')}>Decrease Scheduler Workers</button>
<table className="table">
<thead>
<tr>
<th>PID</th>
<th>Memory %</th>
<th>CPU %</th>
<th>Started</th>
</tr>
))}
</tbody>
</table>
<button className="button button-start" onClick={() => handleDaemonControl('start')}>Start Daemon</button>
<button className="button button-stop" onClick={() => handleDaemonControl('stop')}>Stop Daemon</button>
<button className="button button-adjust" onClick={() => adjustWorkers('increase')}>Increase Workers</button>
<button className="button button-adjust" onClick={() => adjustWorkers('decrease')}>Decrease Workers</button>
</thead>
<tbody>
{schedulerWorkers.map(worker => (
<tr key={worker.pid}>
<td>{worker.pid}</td>
<td>{worker.mem}</td>
<td>{worker.cpu}</td>
<td>{new Date(worker.started * 1000).toLocaleString()}</td>
</tr>
))}
</tbody>
</table>
</div>
</div>
);
}
Expand Down

0 comments on commit 5a29c79

Please sign in to comment.