Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[router] Health check on worker before added to the router #2392

Merged
merged 1 commit into from
Dec 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ http = "1.1.0"
env_logger = "0.11.5"
log = "0.4.22"
chrono = "0.4.38"
tokio = "1.42.0"

[profile.release]
lto = "thin"
Expand Down
28 changes: 11 additions & 17 deletions rust/py_test/test_launch_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ def popen_launch_router(
base_url: str,
dp_size: int,
timeout: float,
policy: str = "cache_aware",
):
"""
Launch the router server process.
Expand All @@ -29,6 +30,7 @@ def popen_launch_router(
base_url: Server base URL
dp_size: Data parallel size
timeout: Server launch timeout
policy: Router policy, one of "cache_aware", "round_robin", "random"
"""
_, host, port = base_url.split(":")
host = host[2:]
Expand All @@ -47,11 +49,10 @@ def popen_launch_router(
str(dp_size), # Convert dp_size to string
"--router-eviction-interval",
"5", # frequent eviction for testing
"--router-policy",
policy,
]

# Use current environment
env = None

process = subprocess.Popen(command, stdout=None, stderr=None)

start_time = time.time()
Expand Down Expand Up @@ -99,19 +100,8 @@ def popen_launch_server(

process = subprocess.Popen(command, stdout=None, stderr=None)

start_time = time.time()
with requests.Session() as session:
while time.time() - start_time < timeout:
try:
response = session.get(f"{base_url}/health")
if response.status_code == 200:
print(f"Server {base_url} is healthy")
return process
except requests.RequestException:
pass
time.sleep(10)

raise TimeoutError("Server failed to start within the timeout period.")
# intentionally don't wait and defer the job to the router health check
return process


class TestLaunchServer(unittest.TestCase):
Expand All @@ -135,6 +125,7 @@ def test_mmlu(self):
self.base_url,
dp_size=2,
timeout=DEFAULT_TIMEOUT_FOR_SERVER_LAUNCH,
policy="cache_aware",
)

args = SimpleNamespace(
Expand All @@ -160,6 +151,7 @@ def test_add_and_remove_worker(self):
self.base_url,
dp_size=1,
timeout=DEFAULT_TIMEOUT_FOR_SERVER_LAUNCH,
policy="round_robin", # use round robin to make sure every worker processes requests
)
# 1. start a worker, and wait until it is healthy
port = find_available_port()
Expand All @@ -168,11 +160,13 @@ def test_add_and_remove_worker(self):
self.model, worker_url, DEFAULT_TIMEOUT_FOR_SERVER_LAUNCH
)
TestLaunchServer.other_process.append(worker_process)
# 2. use /add_worker api to add it the the router

# 2. use /add_worker api to add it the the router. It will be used by router after it is healthy
with requests.Session() as session:
response = session.post(f"{self.base_url}/add_worker?url={worker_url}")
print(f"status code: {response.status_code}, response: {response.text}")
self.assertEqual(response.status_code, 200)

# 3. run mmlu
args = SimpleNamespace(
base_url=self.base_url,
Expand Down
71 changes: 62 additions & 9 deletions rust/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ use actix_web::http::header::{HeaderValue, CONTENT_TYPE};
use actix_web::{HttpRequest, HttpResponse};
use bytes::Bytes;
use futures_util::{StreamExt, TryStreamExt};
use log::{debug, info};
use log::{debug, info, warn};
use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::atomic::AtomicUsize;
use std::sync::{Arc, Mutex, RwLock};
use std::thread;
use std::time::Duration;
use tokio;

#[derive(Debug)]
pub enum Router {
Expand Down Expand Up @@ -385,14 +386,66 @@ impl Router {
}
}

pub fn add_worker(&self, worker_url: String) {
match self {
Router::RoundRobin { worker_urls, .. }
| Router::Random { worker_urls }
| Router::CacheAware { worker_urls, .. } => {
let mut urls = worker_urls.write().unwrap();
info!("Added worker: {}", worker_url);
urls.push(worker_url);
pub async fn add_worker(&self, worker_url: String) -> HttpResponse {
let interval_secs = 10; // check every 10 seconds
let timeout_secs = 300; // 5 minutes

let start_time = std::time::Instant::now();
let client = reqwest::Client::new();

loop {
if start_time.elapsed() > Duration::from_secs(timeout_secs) {
return HttpResponse::InternalServerError().body(format!(
"Timeout {}s waiting for worker {} to become healthy",
timeout_secs, worker_url
));
}

match client.get(&format!("{}/health", worker_url)).send().await {
Ok(res) => {
if res.status().is_success() {
match self {
Router::RoundRobin { worker_urls, .. }
| Router::Random { worker_urls }
| Router::CacheAware { worker_urls, .. } => {
info!("Worker {} health check passed", worker_url);
let mut urls = worker_urls.write().unwrap();
if urls.contains(&worker_url) {
return HttpResponse::BadRequest()
.body(format!("Worker {} already exists", worker_url));
}
info!("Added worker: {}", worker_url);
urls.push(worker_url.clone());
}
}
return HttpResponse::Ok()
.body(format!("Successfully added worker: {}", worker_url));
} else {
info!(
"Worker {} health check failed with status: {}. The worker might still be starting up.",
worker_url, res.status()
);
// if the url does not have http or https prefix, warn users
if !worker_url.starts_with("http://") && !worker_url.starts_with("https://")
{
warn!("The worker url {} does not have http or https prefix. Please add the prefix to the url.", worker_url);
}

tokio::time::sleep(Duration::from_secs(interval_secs)).await;
continue;
}
}
Err(e) => {
info!("Worker {} health check failed: {}", worker_url, e);

// if the url does not have http or https prefix, warn users
if !worker_url.starts_with("http://") && !worker_url.starts_with("https://") {
warn!("The worker url {} does not have http or https prefix. Please add the prefix to the url.", worker_url);
}

tokio::time::sleep(Duration::from_secs(interval_secs)).await;
continue;
}
}
}
}
Expand Down
3 changes: 1 addition & 2 deletions rust/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,7 @@ async fn add_worker(
.body("Worker URL required. Provide 'url' query parameter")
}
};
data.router.add_worker(worker_url);
HttpResponse::Ok().finish()
data.router.add_worker(worker_url).await
}

#[post("/remove_worker")]
Expand Down
Loading