diff --git a/Cargo.lock b/Cargo.lock
index e33c76f0..19f0a9b4 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -420,6 +420,7 @@ dependencies = [
  "chrono",
  "chrono-humanize",
  "crates-index",
+ "crossbeam-channel",
  "crossbeam-utils 0.5.0",
  "csv",
  "ctrlc",
@@ -906,18 +907,18 @@ checksum = "4c7e4c2612746b0df8fed4ce0c69156021b704c9aefa360311c04e6e9e002eed"
 
 [[package]]
 name = "futures-channel"
-version = "0.3.8"
+version = "0.3.21"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4b7109687aa4e177ef6fe84553af6280ef2778bdb7783ba44c9dc3399110fe64"
+checksum = "c3083ce4b914124575708913bca19bfe887522d6e2e6d0952943f5eac4a74010"
 dependencies = [
  "futures-core",
 ]
 
 [[package]]
 name = "futures-core"
-version = "0.3.8"
+version = "0.3.21"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "847ce131b72ffb13b6109a221da9ad97a64cbe48feb1028356b836b47b8f1748"
+checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3"
 
 [[package]]
 name = "futures-cpupool"
@@ -931,17 +932,16 @@ dependencies = [
 
 [[package]]
 name = "futures-io"
-version = "0.3.19"
+version = "0.3.21"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b1f9d34af5a1aac6fb380f735fe510746c38067c5bf16c7fd250280503c971b2"
+checksum = "fc4045962a5a5e935ee2fdedaa4e08284547402885ab326734432bed5d12966b"
 
 [[package]]
 name = "futures-macro"
-version = "0.3.8"
+version = "0.3.21"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "77408a692f1f97bcc61dc001d752e00643408fbc922e4d634c655df50d595556"
+checksum = "33c1e13800337f4d4d7a316bf45a567dbcb6ffe087f16424852d97e97a91f512"
 dependencies = [
- "proc-macro-hack",
  "proc-macro2 1.0.36",
  "quote 1.0.7",
  "syn 1.0.85",
@@ -949,34 +949,29 @@ dependencies = [
 ]
 
 [[package]]
 name = "futures-sink"
-version = "0.3.19"
+version = "0.3.21"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e3055baccb68d74ff6480350f8d6eb8fcfa3aa11bdc1a1ae3afdd0514617d508"
+checksum = "21163e139fa306126e6eedaf49ecdb4588f939600f0b1e770f4205ee4b7fa868"
 
 [[package]]
 name = "futures-task"
-version = "0.3.8"
+version = "0.3.21"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7c554eb5bf48b2426c4771ab68c6b14468b6e76cc90996f528c3338d761a4d0d"
-dependencies = [
- "once_cell",
-]
+checksum = "57c66a976bf5909d801bbef33416c41372779507e7a6b3a5e25e4749c58f776a"
 
 [[package]]
 name = "futures-util"
-version = "0.3.8"
+version = "0.3.21"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d304cff4a7b99cfb7986f7d43fbe93d175e72e704a8860787cc95e9ffd85cbd2"
+checksum = "d8b7abd5d659d9b90c8cba917f6ec750a74e2dc23902ef9cd4cc8c8b22e6036a"
 dependencies = [
  "futures-core",
  "futures-io",
  "futures-macro",
  "futures-task",
  "memchr",
- "pin-project",
+ "pin-project-lite",
  "pin-utils",
- "proc-macro-hack",
- "proc-macro-nested",
  "slab",
 ]
@@ -1085,7 +1080,7 @@ dependencies = [
  "http 0.2.1",
  "indexmap",
  "slab",
- "tokio 1.1.0",
+ "tokio 1.16.1",
  "tokio-util",
  "tracing",
 ]
@@ -1322,7 +1317,7 @@ dependencies = [
  "itoa",
  "pin-project",
  "socket2 0.4.2",
- "tokio 1.1.0",
+ "tokio 1.16.1",
  "tower-service",
  "tracing",
  "want 0.3.0",
@@ -1350,7 +1345,7 @@ dependencies = [
  "bytes 1.0.1",
  "hyper 0.14.8",
  "native-tls",
- "tokio 1.1.0",
+ "tokio 1.16.1",
  "tokio-native-tls",
 ]
 
@@ -2104,9 +2099,9 @@ dependencies = [
 
 [[package]]
 name = "pin-project-lite"
-version = "0.2.0"
+version = "0.2.8"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6b063f57ec186e6140e2b8b6921e5f1bd89c7356dda5b33acc5401203ca6131c"
+checksum = "e280fbe77cc62c91527259e9442153f4688736748d24660126286329742b4c6c"
 
 [[package]]
 name = "pin-utils"
@@ -2155,18 +2150,6 @@ dependencies = [
  "treeline",
 ]
 
-[[package]]
-name = "proc-macro-hack"
-version = "0.5.19"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5"
-
-[[package]]
-name = "proc-macro-nested"
-version = "0.1.6"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "eba180dafb9038b050a4c280019bbedf9f2467b61e5d892dcad585bb57aadc5a"
-
 [[package]]
 name = "proc-macro2"
 version = "0.4.30"
@@ -2564,7 +2547,7 @@ dependencies = [
  "serde",
  "serde_json",
  "serde_urlencoded 0.7.0",
- "tokio 1.1.0",
+ "tokio 1.16.1",
  "tokio-native-tls",
  "url 2.2.0",
  "wasm-bindgen",
@@ -2705,7 +2688,7 @@ dependencies = [
  "tar",
  "tempfile",
  "thiserror",
- "tokio 1.1.0",
+ "tokio 1.16.1",
  "tokio-stream",
  "toml 0.5.7",
  "walkdir",
@@ -3253,11 +3236,10 @@ dependencies = [
 
 [[package]]
 name = "tokio"
-version = "1.1.0"
+version = "1.16.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8efab2086f17abcddb8f756117665c958feee6b2e39974c2f1600592ab3a4195"
+checksum = "0c27a64b625de6d309e8c57716ba93021dccf1b3b5c97edd6d3dd2d2135afc0a"
 dependencies = [
- "autocfg 1.0.1",
  "bytes 1.0.1",
  "libc",
  "memchr",
@@ -3340,7 +3322,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "f7d995660bd2b7f8c1568414c1126076c13fbb725c40112dc0120b78eb9b717b"
 dependencies = [
  "native-tls",
- "tokio 1.1.0",
+ "tokio 1.16.1",
 ]
 
 [[package]]
@@ -3370,7 +3352,7 @@ checksum = "76066865172052eb8796c686f0b441a93df8b08d40a950b062ffb9a426f00edd"
 dependencies = [
  "futures-core",
  "pin-project-lite",
- "tokio 1.1.0",
+ "tokio 1.16.1",
 ]
 
 [[package]]
@@ -3470,7 +3452,7 @@ dependencies = [
  "futures-sink",
  "log 0.4.11",
  "pin-project-lite",
- "tokio 1.1.0",
+ "tokio 1.16.1",
 ]
 
 [[package]]
diff --git a/Cargo.toml b/Cargo.toml
index 822657cc..82693627 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -19,6 +19,7 @@ chrono = { version = "0.4", features = ["serde"] }
 chrono-humanize = "0.1.1"
 crates-index = "0.16.2"
 crossbeam-utils = "0.5"
+crossbeam-channel = "0.5"
 csv = "1.0.2"
 dotenv = "0.13"
 failure = "0.1.3"
diff --git a/src/agent/api.rs b/src/agent/api.rs
index c2a35e93..64adb530 100644
--- a/src/agent/api.rs
+++ b/src/agent/api.rs
@@ -36,6 +36,7 @@ impl ResponseExt for ::reqwest::blocking::Response {
         match self.status() {
             StatusCode::NOT_FOUND => return Err(AgentApiError::InvalidEndpoint.into()),
             StatusCode::BAD_GATEWAY
+            | StatusCode::TOO_MANY_REQUESTS
             | StatusCode::SERVICE_UNAVAILABLE
             | StatusCode::GATEWAY_TIMEOUT => {
                 return Err(AgentApiError::ServerUnavailable.into());
@@ -50,6 +51,7 @@ impl ResponseExt for ::reqwest::blocking::Response {
             .with_context(|_| format!("failed to parse API response (status code {})", status,))?;
         match result {
             ApiResponse::Success { result } => Ok(result),
+            ApiResponse::SlowDown => Err(AgentApiError::ServerUnavailable.into()),
             ApiResponse::InternalError { error } => {
                 Err(AgentApiError::InternalServerError(error).into())
             }
diff --git a/src/server/api_types.rs b/src/server/api_types.rs
index 0b93fabe..61430182 100644
--- a/src/server/api_types.rs
+++ b/src/server/api_types.rs
@@ -20,6 +20,7 @@ pub struct AgentConfig {
 #[serde(tag = "status", rename_all = "kebab-case")]
 pub enum ApiResponse<T> {
     Success { result: T },
+    SlowDown,
     InternalError { error: String },
     Unauthorized,
     NotFound,
@@ -41,8 +42,9 @@ impl ApiResponse<()> {
 
 impl<T: Serialize> ApiResponse<T> {
     fn status_code(&self) -> StatusCode {
-        match *self {
+        match self {
             ApiResponse::Success { .. } => StatusCode::OK,
+            ApiResponse::SlowDown => StatusCode::TOO_MANY_REQUESTS,
             ApiResponse::InternalError { .. } => StatusCode::INTERNAL_SERVER_ERROR,
             ApiResponse::Unauthorized => StatusCode::UNAUTHORIZED,
             ApiResponse::NotFound => StatusCode::NOT_FOUND,
diff --git a/src/server/metrics.rs b/src/server/metrics.rs
index 0fc0cb6c..24f73a27 100644
--- a/src/server/metrics.rs
+++ b/src/server/metrics.rs
@@ -5,8 +5,8 @@ use crate::server::agents::Agent;
 use chrono::{DateTime, Utc};
 use prometheus::proto::{Metric, MetricFamily};
 use prometheus::{
-    HistogramVec, IntCounterVec, IntGauge, IntGaugeVec, __register_counter_vec, __register_gauge,
-    __register_gauge_vec,
+    HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, __register_counter_vec,
+    __register_gauge, __register_gauge_vec, opts, register_counter, register_int_counter,
 };
 
 const JOBS_METRIC: &str = "crater_completed_jobs_total";
@@ -18,6 +18,7 @@ const ENDPOINT_TIME: &str = "crater_endpoint_time_seconds";
 #[derive(Clone)]
 pub struct Metrics {
     crater_completed_jobs_total: IntCounterVec,
+    pub crater_bounced_record_progress: IntCounter,
     crater_agent_failure: IntCounterVec,
     crater_work_status: IntGaugeVec,
     crater_last_crates_update: IntGauge,
@@ -29,6 +30,10 @@ impl Metrics {
         let jobs_opts = prometheus::opts!(JOBS_METRIC, "total completed jobs");
         let crater_completed_jobs_total =
             prometheus::register_int_counter_vec!(jobs_opts, &["agent", "experiment"])?;
+        let crater_bounced_record_progress = prometheus::register_int_counter!(
+            "crater_bounced_record_progress",
+            "hits with full record progress queue"
+        )?;
         let failure_opts = prometheus::opts!(AGENT_FAILED, "total completed jobs");
         let crater_agent_failure =
             prometheus::register_int_counter_vec!(failure_opts, &["agent", "experiment"])?;
@@ -47,6 +52,7 @@ impl Metrics {
 
         Ok(Metrics {
             crater_completed_jobs_total,
+            crater_bounced_record_progress,
             crater_agent_failure,
             crater_work_status,
             crater_last_crates_update,
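Together with the src/agent/api.rs hunk earlier in this patch, the new ApiResponse::SlowDown variant and its 429 (TOO_MANY_REQUESTS) status code form a small back-pressure protocol: when the server sheds load, the agent maps both the status code and the decoded SlowDown body to AgentApiError::ServerUnavailable, so overload looks like any other transient server hiccup and can be handled by the agent's existing retry path. The sketch below illustrates that retry shape in isolation; send_report, the ApiError enum, and the one-second backoff are illustrative stand-ins, not code from this repository.

use std::thread::sleep;
use std::time::Duration;

// Stand-ins for the agent-side types; only the control flow matters here.
enum ApiError {
    // Covers 429/502/503/504 responses as well as a `SlowDown` body.
    ServerUnavailable,
    Fatal(String),
}

// Hypothetical call that would POST one record-progress payload.
fn send_report(attempt: u32) -> Result<(), ApiError> {
    if attempt < 3 {
        Err(ApiError::ServerUnavailable) // pretend the queue was full twice
    } else {
        Ok(())
    }
}

fn main() {
    let mut attempt = 0;
    loop {
        attempt += 1;
        match send_report(attempt) {
            Ok(()) => break,
            // Transient overload: back off and retry the same payload.
            Err(ApiError::ServerUnavailable) => {
                eprintln!("server busy on attempt {attempt}, backing off");
                sleep(Duration::from_secs(1));
            }
            // Anything else is a real error; stop instead of hammering the server.
            Err(ApiError::Fatal(msg)) => panic!("giving up: {msg}"),
        }
    }
}

The benefit of folding SlowDown into the same error as 502/503/504 is that the agent needs no new logic to participate in load shedding.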
diff --git a/src/server/mod.rs b/src/server/mod.rs
index 1ac0ccfb..b6e83683 100644
--- a/src/server/mod.rs
+++ b/src/server/mod.rs
@@ -44,6 +44,7 @@ pub struct Data {
     pub agents: Agents,
     pub db: Database,
     pub reports_worker: reports::ReportsWorker,
+    pub record_progress_worker: routes::agent::RecordProgressThread,
     pub acl: ACL,
     pub metrics: Metrics,
 }
@@ -80,6 +81,10 @@ pub fn run(config: Config, bind: SocketAddr) -> Fallible<()> {
     info!("initialized metrics...");
 
     let data = Data {
+        record_progress_worker: routes::agent::RecordProgressThread::new(
+            db.clone(),
+            metrics.clone(),
+        ),
         config,
         tokens,
         agents,
@@ -100,7 +105,9 @@ pub fn run(config: Config, bind: SocketAddr) -> Fallible<()> {
     let data = Arc::new(data);
     let github_data = github_data.map(Arc::new);
 
+    let record_progress_worker = data.record_progress_worker.clone();
     let routes = warp::any()
+        .and(warp::any().map(move || record_progress_worker.clone().start_request()))
        .and(
             warp::any()
                 .and(
@@ -118,13 +125,15 @@ pub fn run(config: Config, bind: SocketAddr) -> Fallible<()> {
                 .or(routes::ui::routes(data))
                 .unify(),
         )
-        .map(|mut resp: Response<Body>| {
-            resp.headers_mut().insert(
-                http::header::SERVER,
-                HeaderValue::from_static(&SERVER_HEADER),
-            );
-            resp
-        });
+        .map(
+            |_guard: routes::agent::RequestGuard, mut resp: Response<Body>| {
+                resp.headers_mut().insert(
+                    http::header::SERVER,
+                    HeaderValue::from_static(&SERVER_HEADER),
+                );
+                resp
+            },
+        );
 
     warp::serve(routes).run(bind);
 
diff --git a/src/server/routes/agent.rs b/src/server/routes/agent.rs
index 6d848910..3d11aec9 100644
--- a/src/server/routes/agent.rs
+++ b/src/server/routes/agent.rs
@@ -6,11 +6,12 @@ use crate::server::api_types::{AgentConfig, ApiResponse};
 use crate::server::auth::{auth_filter, AuthDetails, TokenType};
 use crate::server::messages::Message;
 use crate::server::{Data, GithubData, HttpError};
+use crossbeam_channel::Sender;
 use failure::Compat;
 use http::{Response, StatusCode};
 use hyper::Body;
 use std::collections::HashMap;
-use std::sync::{Arc, Mutex};
+use std::sync::{Arc, Condvar, Mutex};
 use warp::{self, Filter, Rejection};
 
 #[derive(Deserialize)]
@@ -144,6 +145,122 @@ fn endpoint_next_experiment(
     Ok(ApiResponse::Success { result }.into_response()?)
 }
 
+#[derive(Clone)]
+pub struct RecordProgressThread {
+    // String is the worker name
+    queue: Sender<(ExperimentData<ProgressData>, String)>,
+    in_flight_requests: Arc<(Mutex<usize>, Condvar)>,
+}
+
+impl RecordProgressThread {
+    pub fn new(
+        db: crate::db::Database,
+        metrics: crate::server::metrics::Metrics,
+    ) -> RecordProgressThread {
+        // 64-message queue, after which we start load shedding automatically.
+        let (tx, rx) = crossbeam_channel::bounded(64);
+        let in_flight_requests = Arc::new((Mutex::new(0), Condvar::new()));
+
+        let this = RecordProgressThread {
+            queue: tx,
+            in_flight_requests,
+        };
+        let ret = this.clone();
+        std::thread::spawn(move || loop {
+            // Panics should already be logged and otherwise there's not much we
+            // can/should do.
+            let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
+                let (result, worker_name) = rx.recv().unwrap();
+                this.block_until_idle();
+
+                let start = std::time::Instant::now();
+
+                if let Some(ex) = Experiment::get(&db, &result.experiment_name).unwrap() {
+                    metrics.record_completed_jobs(
+                        &worker_name,
+                        &ex.name,
+                        result.data.results.len() as i64,
+                    );
+
+                    let db = DatabaseDB::new(&db);
+                    if let Err(e) = db.store(&ex, &result.data, EncodingType::Plain) {
+                        // Failing to record a result is basically fine -- this
+                        // just means that we'll have to re-try this job.
+                        log::error!("Failed to store result into database: {:?}", e);
+                        crate::utils::report_failure(&e);
+                    }
+
+                    metrics
+                        .crater_endpoint_time
+                        .with_label_values(&["record_progress"])
+                        .observe(start.elapsed().as_secs_f64());
+                }
+            }));
+        });
+
+        ret
+    }
+
+    pub fn block_until_idle(&self) {
+        // Wait until there are zero in-flight requests.
+        //
+        // Note: We do **not** keep the lock here for the subsequent
+        // computation. That means that if we ever observe zero, then we're
+        // going to kick off the below computation; obviously requests may keep
+        // coming in -- we don't want to block those requests.
+        //
+        // The expectation that we will see zero here also implies that
+        // the server is *sometimes* idle (i.e., we are not constantly
+        // processing requests at 100% load). It's not clear that's 100%
+        // a valid assumption, but if we are at 100% load in terms of
+        // requests coming in, that's a problem in and of itself (since
+        // the majority of expected requests are record-progress, which
+        // should be *very* fast now that the work for them is async and
+        // offloaded to this thread).
+        //
+        // Ignore the mutex guard (see above).
+        drop(
+            self.in_flight_requests
+                .1
+                .wait_while(
+                    self.in_flight_requests
+                        .0
+                        .lock()
+                        .unwrap_or_else(|l| l.into_inner()),
+                    |g| *g != 0,
+                )
+                .unwrap_or_else(|g| g.into_inner()),
+        );
+    }
+
+    pub fn start_request(&self) -> RequestGuard {
+        *self
+            .in_flight_requests
+            .0
+            .lock()
+            .unwrap_or_else(|l| l.into_inner()) += 1;
+        RequestGuard {
+            thread: self.clone(),
+        }
+    }
+}
+
+pub struct RequestGuard {
+    thread: RecordProgressThread,
+}
+
+impl Drop for RequestGuard {
+    fn drop(&mut self) {
+        *self
+            .thread
+            .in_flight_requests
+            .0
+            .lock()
+            .unwrap_or_else(|l| l.into_inner()) -= 1;
+        self.thread.in_flight_requests.1.notify_one();
+    }
+}
+
 // This endpoint does not use the mutex data wrapper to exclude running in
 // parallel with other endpoints, which may mean that we (for example) are
 // recording results for an abort'd experiment. This should generally be fine --
@@ -159,22 +276,18 @@ fn endpoint_record_progress(
     data: Arc<Data>,
     auth: AuthDetails,
 ) -> Fallible<Response<Body>> {
-    let start = std::time::Instant::now();
-    let ex = Experiment::get(&data.db, &result.experiment_name)?
-        .ok_or_else(|| err_msg("no experiment run by this agent"))?;
-
-    data.metrics
-        .record_completed_jobs(&auth.name, &ex.name, result.data.results.len() as i64);
-
-    let db = DatabaseDB::new(&data.db);
-    db.store(&ex, &result.data, EncodingType::Plain)?;
-
-    let ret = Ok(ApiResponse::Success { result: true }.into_response()?);
-    data.metrics
-        .crater_endpoint_time
-        .with_label_values(&["record_progress"])
-        .observe(start.elapsed().as_secs_f64());
-    ret
+    match data
+        .record_progress_worker
+        .queue
+        .try_send((result, auth.name))
+    {
+        Ok(()) => Ok(ApiResponse::Success { result: true }.into_response()?),
+        Err(crossbeam_channel::TrySendError::Full(_)) => {
+            data.metrics.crater_bounced_record_progress.inc_by(1);
+            Ok(ApiResponse::<()>::SlowDown.into_response()?)
+        }
+        Err(crossbeam_channel::TrySendError::Disconnected(_)) => unreachable!(),
+    }
 }
 
 fn endpoint_heartbeat(data: Arc<Data>, auth: AuthDetails) -> Fallible<Response<Body>> {
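The new RecordProgressThread boils down to a reusable pattern: the HTTP endpoint only does a try_send into a bounded crossbeam channel (answering SlowDown when it is full), a single background thread drains the queue, and an Arc<(Mutex<usize>, Condvar)> counter of in-flight requests lets that thread yield to live traffic. The standalone sketch below reproduces that shape with plain strings in place of ExperimentData<ProgressData>; the Worker/submit names and the queue depth of 8 are illustrative, and it omits the catch_unwind and poison-tolerant locking that the real worker adds on top.

use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

use crossbeam_channel::{bounded, Sender, TrySendError};

#[derive(Clone)]
struct Worker {
    queue: Sender<String>,
    // Count of requests currently being served; the condvar is notified
    // whenever the count drops so the worker can re-check it.
    in_flight: Arc<(Mutex<usize>, Condvar)>,
}

impl Worker {
    fn new() -> Worker {
        let (tx, rx) = bounded::<String>(8);
        let in_flight = Arc::new((Mutex::new(0usize), Condvar::new()));
        let worker = Worker { queue: tx, in_flight: in_flight.clone() };

        thread::spawn(move || {
            while let Ok(job) = rx.recv() {
                // Block until no request is in flight, then release the lock
                // before doing the (potentially slow) work.
                let idle = in_flight
                    .1
                    .wait_while(in_flight.0.lock().unwrap(), |n| *n != 0)
                    .unwrap();
                drop(idle);
                println!("processing {job}");
            }
        });

        worker
    }

    // Called at the start of every request; dropping the guard marks the
    // request as finished and wakes the worker.
    fn start_request(&self) -> RequestGuard {
        *self.in_flight.0.lock().unwrap() += 1;
        RequestGuard { worker: self.clone() }
    }

    // The endpoint's fast path: enqueue the payload or report overload.
    fn submit(&self, job: String) -> Result<(), &'static str> {
        match self.queue.try_send(job) {
            Ok(()) => Ok(()),
            Err(TrySendError::Full(_)) => Err("slow down"),
            Err(TrySendError::Disconnected(_)) => unreachable!("worker thread owns rx"),
        }
    }
}

struct RequestGuard {
    worker: Worker,
}

impl Drop for RequestGuard {
    fn drop(&mut self) {
        *self.worker.in_flight.0.lock().unwrap() -= 1;
        self.worker.in_flight.1.notify_one();
    }
}

fn main() {
    let worker = Worker::new();
    {
        // Simulate an HTTP request: while the guard is alive the worker stays idle.
        let _guard = worker.start_request();
        worker.submit("progress #1".to_string()).unwrap();
    } // guard dropped here; the worker may now process the job
    thread::sleep(Duration::from_millis(100));
}

Like the patch, the sketch makes the trade-off explicit: a full queue is answered immediately with a "slow down" signal instead of blocking the HTTP handler, and the expensive work only runs while the in-flight counter reads zero.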