Skip to content

Commit

Permalink
[router] Health check on worker before adding to the router (#2392)
Browse files Browse the repository at this point in the history
  • Loading branch information
ByronHsu authored Dec 7, 2024
1 parent 75ae968 commit ef995da
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 31 deletions.
7 changes: 4 additions & 3 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ http = "1.1.0"
env_logger = "0.11.5"
log = "0.4.22"
chrono = "0.4.38"
tokio = "1.42.0"

[profile.release]
lto = "thin"
Expand Down
28 changes: 11 additions & 17 deletions rust/py_test/test_launch_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ def popen_launch_router(
base_url: str,
dp_size: int,
timeout: float,
policy: str = "cache_aware",
):
"""
Launch the router server process.
Expand All @@ -29,6 +30,7 @@ def popen_launch_router(
base_url: Server base URL
dp_size: Data parallel size
timeout: Server launch timeout
policy: Router policy, one of "cache_aware", "round_robin", "random"
"""
_, host, port = base_url.split(":")
host = host[2:]
Expand All @@ -47,11 +49,10 @@ def popen_launch_router(
str(dp_size), # Convert dp_size to string
"--router-eviction-interval",
"5", # frequent eviction for testing
"--router-policy",
policy,
]

# Use current environment
env = None

process = subprocess.Popen(command, stdout=None, stderr=None)

start_time = time.time()
Expand Down Expand Up @@ -99,19 +100,8 @@ def popen_launch_server(

process = subprocess.Popen(command, stdout=None, stderr=None)

start_time = time.time()
with requests.Session() as session:
while time.time() - start_time < timeout:
try:
response = session.get(f"{base_url}/health")
if response.status_code == 200:
print(f"Server {base_url} is healthy")
return process
except requests.RequestException:
pass
time.sleep(10)

raise TimeoutError("Server failed to start within the timeout period.")
# intentionally don't wait and defer the job to the router health check
return process


class TestLaunchServer(unittest.TestCase):
Expand All @@ -135,6 +125,7 @@ def test_mmlu(self):
self.base_url,
dp_size=2,
timeout=DEFAULT_TIMEOUT_FOR_SERVER_LAUNCH,
policy="cache_aware",
)

args = SimpleNamespace(
Expand All @@ -160,6 +151,7 @@ def test_add_and_remove_worker(self):
self.base_url,
dp_size=1,
timeout=DEFAULT_TIMEOUT_FOR_SERVER_LAUNCH,
policy="round_robin", # use round robin to make sure every worker processes requests
)
# 1. start a worker, and wait until it is healthy
port = find_available_port()
Expand All @@ -168,11 +160,13 @@ def test_add_and_remove_worker(self):
self.model, worker_url, DEFAULT_TIMEOUT_FOR_SERVER_LAUNCH
)
TestLaunchServer.other_process.append(worker_process)
# 2. use /add_worker api to add it the the router

# 2. use /add_worker api to add it the the router. It will be used by router after it is healthy
with requests.Session() as session:
response = session.post(f"{self.base_url}/add_worker?url={worker_url}")
print(f"status code: {response.status_code}, response: {response.text}")
self.assertEqual(response.status_code, 200)

# 3. run mmlu
args = SimpleNamespace(
base_url=self.base_url,
Expand Down
71 changes: 62 additions & 9 deletions rust/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ use actix_web::http::header::{HeaderValue, CONTENT_TYPE};
use actix_web::{HttpRequest, HttpResponse};
use bytes::Bytes;
use futures_util::{StreamExt, TryStreamExt};
use log::{debug, info};
use log::{debug, info, warn};
use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::atomic::AtomicUsize;
use std::sync::{Arc, Mutex, RwLock};
use std::thread;
use std::time::Duration;
use tokio;

#[derive(Debug)]
pub enum Router {
Expand Down Expand Up @@ -385,14 +386,66 @@ impl Router {
}
}

pub fn add_worker(&self, worker_url: String) {
match self {
Router::RoundRobin { worker_urls, .. }
| Router::Random { worker_urls }
| Router::CacheAware { worker_urls, .. } => {
let mut urls = worker_urls.write().unwrap();
info!("Added worker: {}", worker_url);
urls.push(worker_url);
pub async fn add_worker(&self, worker_url: String) -> HttpResponse {
let interval_secs = 10; // check every 10 seconds
let timeout_secs = 300; // 5 minutes

let start_time = std::time::Instant::now();
let client = reqwest::Client::new();

loop {
if start_time.elapsed() > Duration::from_secs(timeout_secs) {
return HttpResponse::InternalServerError().body(format!(
"Timeout {}s waiting for worker {} to become healthy",
timeout_secs, worker_url
));
}

match client.get(&format!("{}/health", worker_url)).send().await {
Ok(res) => {
if res.status().is_success() {
match self {
Router::RoundRobin { worker_urls, .. }
| Router::Random { worker_urls }
| Router::CacheAware { worker_urls, .. } => {
info!("Worker {} health check passed", worker_url);
let mut urls = worker_urls.write().unwrap();
if urls.contains(&worker_url) {
return HttpResponse::BadRequest()
.body(format!("Worker {} already exists", worker_url));
}
info!("Added worker: {}", worker_url);
urls.push(worker_url.clone());
}
}
return HttpResponse::Ok()
.body(format!("Successfully added worker: {}", worker_url));
} else {
info!(
"Worker {} health check failed with status: {}. The worker might still be starting up.",
worker_url, res.status()
);
// if the url does not have http or https prefix, warn users
if !worker_url.starts_with("http://") && !worker_url.starts_with("https://")
{
warn!("The worker url {} does not have http or https prefix. Please add the prefix to the url.", worker_url);
}

tokio::time::sleep(Duration::from_secs(interval_secs)).await;
continue;
}
}
Err(e) => {
info!("Worker {} health check failed: {}", worker_url, e);

// if the url does not have http or https prefix, warn users
if !worker_url.starts_with("http://") && !worker_url.starts_with("https://") {
warn!("The worker url {} does not have http or https prefix. Please add the prefix to the url.", worker_url);
}

tokio::time::sleep(Duration::from_secs(interval_secs)).await;
continue;
}
}
}
}
Expand Down
3 changes: 1 addition & 2 deletions rust/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,7 @@ async fn add_worker(
.body("Worker URL required. Provide 'url' query parameter")
}
};
data.router.add_worker(worker_url);
HttpResponse::Ok().finish()
data.router.add_worker(worker_url).await
}

#[post("/remove_worker")]
Expand Down

0 comments on commit ef995da

Please sign in to comment.