Skip to content

Commit

Permalink
[router] Use borrow if possible to save cost (#2441)
Browse files Browse the repository at this point in the history
  • Loading branch information
ByronHsu authored Dec 11, 2024
1 parent d4de9a6 commit 0fb88aa
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 19 deletions.
37 changes: 23 additions & 14 deletions rust/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ impl Router {
async fn send_request(
&self,
client: &reqwest::Client,
worker_url: String,
worker_url: &str,
route: &str,
) -> HttpResponse {
match client.get(format!("{}{}", worker_url, route)).send().await {
Expand All @@ -275,7 +275,7 @@ impl Router {

pub async fn route_to_first(&self, client: &reqwest::Client, route: &str) -> HttpResponse {
match self.select_first_worker() {
Ok(worker_url) => self.send_request(client, worker_url, route).await,
Ok(worker_url) => self.send_request(client, &worker_url, route).await,
Err(e) => HttpResponse::InternalServerError().body(e),
}
}
Expand Down Expand Up @@ -398,8 +398,8 @@ impl Router {
async fn send_generate_request(
&self,
client: &reqwest::Client,
req: HttpRequest,
body: Bytes,
req: &HttpRequest,
body: &Bytes,
route: &str,
worker_url: &str,
) -> HttpResponse {
Expand Down Expand Up @@ -484,16 +484,16 @@ impl Router {
pub async fn route_generate_request(
&self,
client: &reqwest::Client,
req: HttpRequest,
body: Bytes,
req: &HttpRequest,
body: &Bytes,
route: &str,
) -> HttpResponse {
let worker_url = self.select_generate_worker(&body, route);
self.send_generate_request(client, req, body, route, &worker_url)
.await
}

pub async fn add_worker(&self, worker_url: String) -> Result<String, String> {
pub async fn add_worker(&self, worker_url: &str) -> Result<String, String> {
let interval_secs = 10; // check every 10 seconds
let timeout_secs = 300; // 5 minutes

Expand All @@ -517,11 +517,11 @@ impl Router {
| Router::CacheAware { worker_urls, .. } => {
info!("Worker {} health check passed", worker_url);
let mut urls = worker_urls.write().unwrap();
if urls.contains(&worker_url) {
if urls.contains(&worker_url.to_string()) {
return Err(format!("Worker {} already exists", worker_url));
}
info!("Added worker: {}", worker_url);
urls.push(worker_url.clone());
urls.push(worker_url.to_string());
}
}

Expand All @@ -534,13 +534,16 @@ impl Router {
} = self
{
// Add worker to running queue with initial count of 0
running_queue.lock().unwrap().insert(worker_url.clone(), 0);
running_queue
.lock()
.unwrap()
.insert(worker_url.to_string(), 0);

// Add worker to processed queue with initial count of 0
processed_queue
.lock()
.unwrap()
.insert(worker_url.clone(), 0);
.insert(worker_url.to_string(), 0);

// Add worker to tree
tree.lock().unwrap().insert(&"".to_string(), &worker_url);
Expand Down Expand Up @@ -581,7 +584,7 @@ impl Router {
}
}

pub fn remove_worker(&self, worker_url: String) {
pub fn remove_worker(&self, worker_url: &str) {
match self {
Router::RoundRobin { worker_urls, .. }
| Router::Random { worker_urls }
Expand All @@ -602,8 +605,14 @@ impl Router {
} = self
{
tree.lock().unwrap().remove_tenant(&worker_url);
running_queue.lock().unwrap().remove(&worker_url);
processed_queue.lock().unwrap().remove(&worker_url);
running_queue
.lock()
.unwrap()
.remove(&worker_url.to_string());
processed_queue
.lock()
.unwrap()
.remove(&worker_url.to_string());
info!(
"Removed worker from tree and cleaned up queues: {}",
worker_url
Expand Down
10 changes: 5 additions & 5 deletions rust/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ async fn get_model_info(data: web::Data<AppState>) -> impl Responder {
#[post("/generate")]
async fn generate(req: HttpRequest, body: Bytes, data: web::Data<AppState>) -> impl Responder {
data.router
.route_generate_request(&data.client, req, body, "/generate")
.route_generate_request(&data.client, &req, &body, "/generate")
.await
}

Expand All @@ -74,7 +74,7 @@ async fn v1_chat_completions(
data: web::Data<AppState>,
) -> impl Responder {
data.router
.route_generate_request(&data.client, req, body, "/v1/chat/completions")
.route_generate_request(&data.client, &req, &body, "/v1/chat/completions")
.await
}

Expand All @@ -85,7 +85,7 @@ async fn v1_completions(
data: web::Data<AppState>,
) -> impl Responder {
data.router
.route_generate_request(&data.client, req, body, "/v1/completions")
.route_generate_request(&data.client, &req, &body, "/v1/completions")
.await
}

Expand All @@ -102,7 +102,7 @@ async fn add_worker(
}
};

match data.router.add_worker(worker_url).await {
match data.router.add_worker(&worker_url).await {
Ok(message) => HttpResponse::Ok().body(message),
Err(error) => HttpResponse::BadRequest().body(error),
}
Expand All @@ -117,7 +117,7 @@ async fn remove_worker(
Some(url) => url.to_string(),
None => return HttpResponse::BadRequest().finish(),
};
data.router.remove_worker(worker_url);
data.router.remove_worker(&worker_url);
HttpResponse::Ok().finish()
}

Expand Down

0 comments on commit 0fb88aa

Please sign in to comment.