From c24888054c07a4deafa8d3f0d042240879fbf3b7 Mon Sep 17 00:00:00 2001 From: Tobias Wilfert Date: Thu, 14 Jul 2022 11:26:20 +0200 Subject: [PATCH 01/19] Pair programming session, getting started --- relay-server/src/actors/healthcheck.rs | 78 ++++++++++++++++++++++++-- 1 file changed, 73 insertions(+), 5 deletions(-) diff --git a/relay-server/src/actors/healthcheck.rs b/relay-server/src/actors/healthcheck.rs index fd56ccb297..e89ebb38bf 100644 --- a/relay-server/src/actors/healthcheck.rs +++ b/relay-server/src/actors/healthcheck.rs @@ -1,9 +1,9 @@ +use core::num::flt2dec::strategy; use std::sync::Arc; -use actix::prelude::*; - -use futures::future; -use futures::prelude::*; +use actix::SystemService; +use futures03::compat::Future01CompatExt; +use tokio::sync::{mpsc, oneshot}; use relay_config::{Config, RelayMode}; use relay_metrics::{AcceptsMetrics, Aggregator}; @@ -13,6 +13,27 @@ use relay_system::{Controller, Shutdown}; use crate::actors::upstream::{IsAuthenticated, IsNetworkOutage, UpstreamRelay}; use crate::statsd::RelayGauges; +#[derive(Debug)] +struct Message { + data: T, + responder: oneshot::Sender, +} + +pub struct SendError; + +pub struct Addr { + tx: mpsc::UnboundedSender>, +} + +impl Addr { + pub async fn send(&self, data: T) -> Result { + let (responder, rx) = oneshot::channel(); + let message = Message { data, responder }; + self.tx.send(message).map_err(|_| SendError)?; + rx.await.map_err(|_| SendError) + } +} + pub struct Healthcheck { is_shutting_down: bool, config: Arc, @@ -25,8 +46,52 @@ impl Healthcheck { config, } } + + async fn handle(&mut self, message: IsHealthy) -> bool { + let upstream = UpstreamRelay::from_registry(); + + match message { + IsHealthy::Liveness => return true, + IsHealthy::Readiness => { + if self.is_shutting_down { + return false; + } + + if self.config.requires_auth() { + if !upstream + .send(IsAuthenticated) + .compat() + .await + .unwrap_or(false) + { + return false; + } + } + + Aggregator::from_registry() + .send(AcceptsMetrics) + .compat() + .await + .unwrap_or(false) + } + } + } + + pub fn start(self) -> Addr { + let (tx, mut rx) = mpsc::unbounded_channel::>(); + + tokio::spawn(async move { + while let Some(message) = rx.recv().await { + let response = self.handle(message.data).await; + message.responder.send(response).ok(); + } + }); + + Addr { tx } + } } +/* impl Actor for Healthcheck { type Context = Context; @@ -53,7 +118,9 @@ impl Handler for Healthcheck { Ok(()) } } +*/ +#[derive(Debug)] pub enum IsHealthy { /// Check if the Relay is alive at all. Liveness, @@ -61,7 +128,7 @@ pub enum IsHealthy { /// it's both live/alive and not too busy). Readiness, } - +/* impl Message for IsHealthy { type Result = Result; } @@ -110,3 +177,4 @@ impl Handler for Healthcheck { } } } +*/ From 4da1f67f487d0068fa49c84917a2b90797133a79 Mon Sep 17 00:00:00 2001 From: Tobias Wilfert Date: Thu, 21 Jul 2022 09:19:04 +0200 Subject: [PATCH 02/19] Partially updated the Healthcheck actor --- Cargo.lock | 1 + relay-server/src/actors/healthcheck.rs | 119 +++++++++++++++++++++---- relay-server/src/service.rs | 16 +++- relay-system/Cargo.toml | 1 + relay-system/src/controller.rs | 42 +++++++++ 5 files changed, 161 insertions(+), 18 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 09ea7df59d..70fc4cb1bb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3564,6 +3564,7 @@ dependencies = [ "actix", "failure", "futures 0.1.31", + "futures 0.3.21", "relay-log", ] diff --git a/relay-server/src/actors/healthcheck.rs b/relay-server/src/actors/healthcheck.rs index e89ebb38bf..1d9894ae80 100644 --- a/relay-server/src/actors/healthcheck.rs +++ b/relay-server/src/actors/healthcheck.rs @@ -1,45 +1,73 @@ -use core::num::flt2dec::strategy; +use std::borrow::Borrow; +use std::pin::Pin; use std::sync::Arc; use actix::SystemService; use futures03::compat::Future01CompatExt; +use futures03::FutureExt; use tokio::sync::{mpsc, oneshot}; use relay_config::{Config, RelayMode}; use relay_metrics::{AcceptsMetrics, Aggregator}; use relay_statsd::metric; -use relay_system::{Controller, Shutdown}; +use relay_system::{Shutdown, ShutdownReceiver}; use crate::actors::upstream::{IsAuthenticated, IsNetworkOutage, UpstreamRelay}; use crate::statsd::RelayGauges; +// Wrapper for our data and oneshot chanel(needed to retransmit back the reply) #[derive(Debug)] struct Message { data: T, responder: oneshot::Sender, } +// Custom error that acts as an abstraction pub struct SendError; +// tx = transceiver, mpcs = multi-producer, single consumer +#[derive(Clone, Debug)] pub struct Addr { tx: mpsc::UnboundedSender>, } +// Impl the send function which can then be used to send a shut down message +/* +impl ShutdownReceiver for Addr +where + T: From + std::marker::Send, // FIXME: Check if that is an actual solution +{ + fn send<'a>( + &'a self, + message: Shutdown, + ) -> Pin + Send + 'a>> { + Box::pin(self.send(message.into()).map(|_| ())) + } + + fn foo(&self) -> Box { + Bow::new(self.clone()) + } +} +*/ + +// Name send chosen to keep it compatible with the existing code that is invoking it impl Addr { pub async fn send(&self, data: T) -> Result { - let (responder, rx) = oneshot::channel(); - let message = Message { data, responder }; - self.tx.send(message).map_err(|_| SendError)?; - rx.await.map_err(|_| SendError) + let (responder, rx) = oneshot::channel(); // Generate the oneshot channel + let message = Message { data, responder }; // Construct the message + self.tx.send(message).map_err(|_| SendError)?; // Send the message and return an error if it fails + rx.await.map_err(|_| SendError) // await the result and return it, if anything goes wrong, throw a SendError } } +// Unchanged pub struct Healthcheck { is_shutting_down: bool, config: Arc, } impl Healthcheck { + // Unchanged pub fn new(config: Arc) -> Self { Healthcheck { is_shutting_down: false, @@ -47,20 +75,31 @@ impl Healthcheck { } } - async fn handle(&mut self, message: IsHealthy) -> bool { - let upstream = UpstreamRelay::from_registry(); + async fn handle_is_healthy(&mut self, message: IsHealthy) -> bool { + let upstream = UpstreamRelay::from_registry(); // Legacy, still needed for now + + if self.config.relay_mode() == RelayMode::Managed { + let fut = upstream.send(IsNetworkOutage).compat(); + tokio::spawn(async move { + if let Ok(is_outage) = fut.await { + metric!(gauge(RelayGauges::NetworkOutage) = if is_outage { 1 } else { 0 }); + } + }); + } match message { - IsHealthy::Liveness => return true, + IsHealthy::Liveness => return true, // Liveness always returns true IsHealthy::Readiness => { + // If we are shutting down we are no longer Ready if self.is_shutting_down { return false; } + // If we need authentication check that we have authentication if self.config.requires_auth() { if !upstream .send(IsAuthenticated) - .compat() + .compat() // Convert from the old futures to the new futures .await .unwrap_or(false) { @@ -68,19 +107,34 @@ impl Healthcheck { } } - Aggregator::from_registry() + Aggregator::from_registry() // Still legacy? .send(AcceptsMetrics) - .compat() + .compat() // Convert from the old futures to the new futures .await .unwrap_or(false) } } } - pub fn start(self) -> Addr { + fn handle_shutdown(&mut self) -> bool { + self.is_shutting_down = true; + true + } + + async fn handle(&mut self, message: HealthCheckMessage) -> bool { + match message { + HealthCheckMessage::Health(message) => self.handle_is_healthy(message).await, + HealthCheckMessage::Shutdown => self.handle_shutdown(), + } + } + + // Start the healthCheck actor, returns an Addr so that it can be reached + pub fn start(self) -> Addr { + // Make the channel to use for communication let (tx, mut rx) = mpsc::unbounded_channel::>(); tokio::spawn(async move { + // While incoming messages are still being received while let Some(message) = rx.recv().await { let response = self.handle(message.data).await; message.responder.send(response).ok(); @@ -109,13 +163,31 @@ impl Default for Healthcheck { unimplemented!("register with the SystemRegistry instead") } } +*/ + +// - More sophisticated solution - +/* +trait Handler { + type Result; + + fn handle<'a>( + &'a mut self, + message: T, + ) -> Pin + Send + 'a>>; +} impl Handler for Healthcheck { type Result = Result<(), ()>; - fn handle(&mut self, _message: Shutdown, _context: &mut Self::Context) -> Self::Result { - self.is_shutting_down = true; - Ok(()) + fn handle<'a>( + &'a mut self, + _message: Shutdown, + ) -> Pin + Send + 'a>> { + let result_future = async move { + self.is_shutting_down = true; + Ok(()) + }; + Box::pin(result_future) } } */ @@ -128,6 +200,21 @@ pub enum IsHealthy { /// it's both live/alive and not too busy). Readiness, } + +#[derive(Debug)] +pub enum HealthCheckMessage { + Health(IsHealthy), + Shutdown, +} + +// Allows for shutdown to be converted to HealthCheckMessage +// Needed so that that the shutdown message can be send to multiple actors in the same way +impl From for HealthCheckMessage { + fn from(_: Shutdown) -> Self { + HealthCheckMessage::Shutdown + } +} + /* impl Message for IsHealthy { type Result = Result; diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index fb9ceb80e6..f0c96f8c9c 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -7,6 +7,8 @@ use failure::ResultExt; use failure::{Backtrace, Context, Fail}; use listenfd::ListenFd; +use tokio::runtime::{self, Runtime}; + use relay_aws_extension::AwsExtension; use relay_config::Config; use relay_metrics::Aggregator; @@ -108,6 +110,7 @@ impl From> for ServerError { #[derive(Clone)] pub struct ServiceState { config: Arc, + runtime: Arc, } impl ServiceState { @@ -136,7 +139,16 @@ impl ServiceState { let project_cache = ProjectCache::new(config.clone(), redis_pool).start(); registry.set(project_cache.clone()); - registry.set(Healthcheck::new(config.clone()).start()); + + let runtime = Arc::new(tokio::runtime::Runtime::new().unwrap()); // FIXME: Might need to change that later + // Spawn the Health check in the runtime? + runtime.spawn(async { + // FIXME: Need to make a new registry + let v = Healthcheck::new(config.clone()).start(); + }); + + // registry.set(Healthcheck::new(config.clone()).start()); + registry.set(RelayCache::new(config.clone()).start()); registry .set(Aggregator::new(config.aggregator_config(), project_cache.recipient()).start()); @@ -150,7 +162,7 @@ impl ServiceState { } } - Ok(ServiceState { config }) + Ok(ServiceState { config, runtime }) } /// Returns an atomically counted reference to the config. diff --git a/relay-system/Cargo.toml b/relay-system/Cargo.toml index 53743735bc..ded41d4e48 100644 --- a/relay-system/Cargo.toml +++ b/relay-system/Cargo.toml @@ -13,4 +13,5 @@ publish = false actix = "0.7.9" failure = "0.1.8" futures = "0.1.28" +futures03 = { version = "0.3", package = "futures", features = ["compat"] } relay-log = { path = "../relay-log" } diff --git a/relay-system/src/controller.rs b/relay-system/src/controller.rs index 0b629b55f3..86211107ed 100644 --- a/relay-system/src/controller.rs +++ b/relay-system/src/controller.rs @@ -1,4 +1,5 @@ use std::fmt; +use std::pin::Pin; use std::time::Duration; use actix::actors::signal; @@ -6,6 +7,7 @@ use actix::fut; use actix::prelude::*; use futures::future; use futures::prelude::*; +use futures03::{FutureExt, TryFutureExt}; #[doc(inline)] pub use actix::actors::signal::{Signal, SignalType}; @@ -55,11 +57,36 @@ pub use actix::actors::signal::{Signal, SignalType}; /// Ok(()) /// }).unwrap(); /// ``` + pub struct Controller { /// Configured timeout for graceful shutdowns. timeout: Duration, /// Subscribed actors for the shutdown message. subscribers: Vec>, + /// Subscribed actors for the shutdown message. + new_subscribers: Vec>, +} + +/// TODO +pub trait ShutdownReceiver { + // Not sure if this can help us: https://stackoverflow.com/a/53989780/8076979 + // Removing `static also doesn't help + // `?Trait` is not permitted in supertraits + /// TODO + fn send<'a>( + &'a self, + message: Shutdown, + ) -> Pin + Send + 'a>>; + + fn foo(&self) -> Box + where + Self: Sized; + /* + fn clone(&self) -> Self + where + Self: Sized; + */ + // fn clone(&self) -> Box; // Complains if foo exists } impl Controller { @@ -135,6 +162,19 @@ impl Controller { }); fut::ok(()) }) + // TODO: Best approach till now but still not working :/ + .and_then(move |_, slf, _| { + let new_futures: Vec<_> = slf // Would moving this out help us? + .new_subscribers + .iter() // <- Issue + .map(|recipient| recipient.send(Shutdown { timeout })) + .collect(); + let fut = futures03::future::join_all(new_futures) + .map(|_| ()) + .unit_error() + .compat(); + fut::wrap_future::<_, Controller>(fut) + }) .spawn(context); } } @@ -144,6 +184,7 @@ impl Default for Controller { Controller { timeout: Duration::from_secs(0), subscribers: Vec::new(), + new_subscribers: Vec::new(), } } } @@ -153,6 +194,7 @@ impl fmt::Debug for Controller { f.debug_struct("Controller") .field("timeout", &self.timeout) .field("subscribers", &self.subscribers.len()) + .field("new_subscribers", &self.new_subscribers.len()) .finish() } } From 4571eb09aa8b7b746ec4b4b7eb1d9de37c433198 Mon Sep 17 00:00:00 2001 From: tobias-wilfert Date: Fri, 22 Jul 2022 16:20:45 +0200 Subject: [PATCH 03/19] Healthcheck wip --- Cargo.lock | 6 ++- relay-server/Cargo.toml | 1 + relay-server/src/actors/healthcheck.rs | 57 +++++++++++++++++++--- relay-server/src/endpoints/healthcheck.rs | 13 +++-- relay-server/src/service.rs | 6 ++- relay-system/Cargo.toml | 1 + relay-system/src/controller.rs | 59 ++++++++++++++++++++--- 7 files changed, 124 insertions(+), 19 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 70fc4cb1bb..d664d61e92 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2345,9 +2345,9 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.12.0" +version = "1.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7709cef83f0c1f58f666e746a08b21e0085f7440fa6a29cc194d68aac97a4225" +checksum = "18a6dbe30758c9f83eb00cbea4ac95966305f5a7772f3f42ebfc7fc7eddbd8e1" [[package]] name = "oorandom" @@ -3513,6 +3513,7 @@ dependencies = [ "listenfd", "minidump", "native-tls", + "once_cell", "parking_lot 0.10.2", "rdkafka", "rdkafka-sys", @@ -3566,6 +3567,7 @@ dependencies = [ "futures 0.1.31", "futures 0.3.21", "relay-log", + "tokio 1.19.2", ] [[package]] diff --git a/relay-server/Cargo.toml b/relay-server/Cargo.toml index 3b9a4f18b2..55aa43a6c5 100644 --- a/relay-server/Cargo.toml +++ b/relay-server/Cargo.toml @@ -45,6 +45,7 @@ lazy_static = "1.4.0" listenfd = "0.3.3" minidump = { version = "0.10.0", optional = true } native-tls = { version = "0.2.4", optional = true } +once_cell = { version = "1.13.0" } parking_lot = "0.10.0" rdkafka = { version = "0.24", optional = true } rdkafka-sys = { version = "2.1.0", optional = true } diff --git a/relay-server/src/actors/healthcheck.rs b/relay-server/src/actors/healthcheck.rs index 1d9894ae80..991f69ccda 100644 --- a/relay-server/src/actors/healthcheck.rs +++ b/relay-server/src/actors/healthcheck.rs @@ -5,16 +5,20 @@ use std::sync::Arc; use actix::SystemService; use futures03::compat::Future01CompatExt; use futures03::FutureExt; -use tokio::sync::{mpsc, oneshot}; +use once_cell::sync::Lazy; +use tokio::sync::{mpsc, oneshot, watch}; use relay_config::{Config, RelayMode}; use relay_metrics::{AcceptsMetrics, Aggregator}; use relay_statsd::metric; -use relay_system::{Shutdown, ShutdownReceiver}; +use relay_system::{Controller, Shutdown}; use crate::actors::upstream::{IsAuthenticated, IsNetworkOutage, UpstreamRelay}; use crate::statsd::RelayGauges; +static ADDRESS: Lazy>>> = + Lazy::new(|| std::sync::Mutex::new(None)); + // Wrapper for our data and oneshot chanel(needed to retransmit back the reply) #[derive(Debug)] struct Message { @@ -61,6 +65,7 @@ impl Addr { } // Unchanged + pub struct Healthcheck { is_shutting_down: bool, config: Arc, @@ -129,9 +134,11 @@ impl Healthcheck { } // Start the healthCheck actor, returns an Addr so that it can be reached - pub fn start(self) -> Addr { + pub fn start(mut self) -> Addr { // Make the channel to use for communication let (tx, mut rx) = mpsc::unbounded_channel::>(); + let addr = Addr { tx: tx.clone() }; + *ADDRESS.lock().unwrap() = Some(addr.clone()); tokio::spawn(async move { // While incoming messages are still being received @@ -141,7 +148,39 @@ impl Healthcheck { } }); - Addr { tx } + // When receiving a shutdown message forward it to our mpsc channel + tokio::spawn(async move { + // Get the receiving end of the watch channel from the Controller + let mut wrx = Controller::subscribe_v2().await; + + loop { + if wrx.changed().await.is_ok() { + if wrx.borrow().is_none() { + continue; + } + } + let (responder, _) = oneshot::channel(); + tx.send(Message { + data: HealthCheckMessage::Shutdown, + responder, + }); + break; + } + }); + addr + } + + // Maybe give back our own Addr and say that it can be converted into the actix Addr? + // Because Healthcheck is not an actor so doesn't work? + // So escentially need to make a converter? or leave it an Actor + pub fn from_registry() -> Addr { + // TODO: Here need to return the address from the HealthCheck + // HTF do we do this, surely we now need to make a registry + let guard = ADDRESS.lock().unwrap(); + match *guard { + Some(ref addr) => addr.clone(), + None => panic!(), + } } } @@ -192,7 +231,7 @@ impl Handler for Healthcheck { } */ -#[derive(Debug)] +#[derive(Clone, Debug)] pub enum IsHealthy { /// Check if the Relay is alive at all. Liveness, @@ -201,7 +240,7 @@ pub enum IsHealthy { Readiness, } -#[derive(Debug)] +#[derive(Clone, Debug)] pub enum HealthCheckMessage { Health(IsHealthy), Shutdown, @@ -215,6 +254,12 @@ impl From for HealthCheckMessage { } } +impl From for HealthCheckMessage { + fn from(is_healthy: IsHealthy) -> Self { + Self::Health(is_healthy) + } +} + /* impl Message for IsHealthy { type Result = Result; diff --git a/relay-server/src/endpoints/healthcheck.rs b/relay-server/src/endpoints/healthcheck.rs index 788628cea2..fae61485d4 100644 --- a/relay-server/src/endpoints/healthcheck.rs +++ b/relay-server/src/endpoints/healthcheck.rs @@ -2,6 +2,8 @@ use ::actix::prelude::*; use actix_web::{Error, HttpResponse}; use futures::prelude::*; +use futures03::compat::Future01CompatExt; +use futures03::TryFutureExt; use serde::Serialize; use crate::service::ServiceApp; @@ -32,11 +34,16 @@ impl HealthcheckResponse { } fn healthcheck_impl(message: IsHealthy) -> ResponseFuture { + // let addr = Healthcheck::from_registry(); + let fut = async move { + let addr = Healthcheck::from_registry(); + addr.send(message.into()).await + }; + let bp = Box::pin(fut); + Box::new( - Healthcheck::from_registry() - .send(message) + bp.compat() .map_err(|_| ()) - .flatten() .and_then(move |is_healthy| { if !is_healthy { Err(()) diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index f0c96f8c9c..7dd162cbd1 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -140,11 +140,13 @@ impl ServiceState { let project_cache = ProjectCache::new(config.clone(), redis_pool).start(); registry.set(project_cache.clone()); + let v = Controller::from_registry(); let runtime = Arc::new(tokio::runtime::Runtime::new().unwrap()); // FIXME: Might need to change that later // Spawn the Health check in the runtime? - runtime.spawn(async { + let config_copy = config.clone(); + runtime.spawn(async move { // FIXME: Need to make a new registry - let v = Healthcheck::new(config.clone()).start(); + let addr = Healthcheck::new(config_copy).start(); // <- }); // registry.set(Healthcheck::new(config.clone()).start()); diff --git a/relay-system/Cargo.toml b/relay-system/Cargo.toml index ded41d4e48..e4a32a3b1e 100644 --- a/relay-system/Cargo.toml +++ b/relay-system/Cargo.toml @@ -15,3 +15,4 @@ failure = "0.1.8" futures = "0.1.28" futures03 = { version = "0.3", package = "futures", features = ["compat"] } relay-log = { path = "../relay-log" } +tokio = { version = "1.0", features = ["rt-multi-thread"] } # in sync with reqwest diff --git a/relay-system/src/controller.rs b/relay-system/src/controller.rs index 86211107ed..0f574c534c 100644 --- a/relay-system/src/controller.rs +++ b/relay-system/src/controller.rs @@ -7,7 +7,9 @@ use actix::fut; use actix::prelude::*; use futures::future; use futures::prelude::*; -use futures03::{FutureExt, TryFutureExt}; +use futures03::compat::Future01CompatExt; +use tokio::sync::{mpsc, watch}; +// use futures03::{FutureExt, TryFutureExt}; #[doc(inline)] pub use actix::actors::signal::{Signal, SignalType}; @@ -58,16 +60,27 @@ pub use actix::actors::signal::{Signal, SignalType}; /// }).unwrap(); /// ``` +type ShutdownReceiver = watch::Receiver>; // Find a good name for this +type ShutdownSender = watch::Sender>; // Check of that is really needed + // Only used once but gives a nice symetry + pub struct Controller { /// Configured timeout for graceful shutdowns. timeout: Duration, /// Subscribed actors for the shutdown message. subscribers: Vec>, + /// Subscribed actors for the shutdown message. - new_subscribers: Vec>, + // new_subscribers: Vec>, + + // Hand this out to actors that subscribe to us + shutdown_receiver: ShutdownReceiver, + // Use this to send the shutdown message to all the actors that are subscribed to us + shutdown_sender: ShutdownSender, } /// TODO +/* pub trait ShutdownReceiver { // Not sure if this can help us: https://stackoverflow.com/a/53989780/8076979 // Removing `static also doesn't help @@ -78,6 +91,7 @@ pub trait ShutdownReceiver { message: Shutdown, ) -> Pin + Send + 'a>>; + /// TODO fn foo(&self) -> Box where Self: Sized; @@ -88,6 +102,7 @@ pub trait ShutdownReceiver { */ // fn clone(&self) -> Box; // Complains if foo exists } +*/ impl Controller { /// Starts an actix system and runs the `factory` to start actors. @@ -129,6 +144,15 @@ impl Controller { Controller::from_registry().do_send(Subscribe(addr.recipient())) } + /// TODO + pub async fn subscribe_v2() -> ShutdownReceiver { + Controller::from_registry() + .send(SubscribeV2()) + .compat() + .await + .unwrap() // FIXME: Remove this later + } + /// Performs a graceful shutdown with the given timeout. /// /// This sends a `Shutdown` message to all subscribed actors and waits for them to finish. As @@ -137,6 +161,10 @@ impl Controller { // Send a shutdown signal to all registered subscribers (including self). They will report // when the shutdown has completed. Note that we ignore all errors to make sure that we // don't cancel the shutdown of other actors if one actor fails. + + // Send the message + self.shutdown_sender.send(Some(Shutdown { timeout })); + let futures: Vec<_> = self .subscribers .iter() @@ -163,6 +191,7 @@ impl Controller { fut::ok(()) }) // TODO: Best approach till now but still not working :/ + /* .and_then(move |_, slf, _| { let new_futures: Vec<_> = slf // Would moving this out help us? .new_subscribers @@ -174,17 +203,21 @@ impl Controller { .unit_error() .compat(); fut::wrap_future::<_, Controller>(fut) - }) + })*/ .spawn(context); } } impl Default for Controller { fn default() -> Self { + // TODO: Still iffy not sure why we need to pass in a init (also would be nice if we could avoid it) + let (shutdown_sender, mut shutdown_receiver) = watch::channel(None); + Controller { timeout: Duration::from_secs(0), subscribers: Vec::new(), - new_subscribers: Vec::new(), + shutdown_receiver, + shutdown_sender, } } } @@ -193,8 +226,7 @@ impl fmt::Debug for Controller { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Controller") .field("timeout", &self.timeout) - .field("subscribers", &self.subscribers.len()) - .field("new_subscribers", &self.new_subscribers.len()) + .field("subscribers", &self.subscribers.len()) // Ask if we need to update these here .finish() } } @@ -275,6 +307,21 @@ impl Handler for Controller { } } +#[derive(Debug)] +pub struct SubscribeV2(); + +impl Message for SubscribeV2 { + type Result = ShutdownReceiver; +} + +impl Handler for Controller { + type Result = MessageResult; + + fn handle(&mut self, msg: SubscribeV2, ctx: &mut Self::Context) -> Self::Result { + MessageResult(self.shutdown_receiver.clone()) + } +} + /// Shutdown request message sent by the [`Controller`] to subscribed actors. /// /// A handler has to ensure that it doesn't take longer than `timeout` to resolve the future. From f49d36236bf829413a6caf695b0bbd83f9c59c13 Mon Sep 17 00:00:00 2001 From: tobias-wilfert Date: Mon, 25 Jul 2022 10:05:04 +0200 Subject: [PATCH 04/19] Cleaned up code --- Makefile | 2 +- relay-server/src/actors/healthcheck.rs | 24 +++++++++-------------- relay-server/src/endpoints/healthcheck.rs | 1 - relay-server/src/service.rs | 17 ++++++++-------- relay-system/src/controller.rs | 14 +++++++------ 5 files changed, 26 insertions(+), 32 deletions(-) diff --git a/Makefile b/Makefile index 08a18152ca..c78c4fc038 100644 --- a/Makefile +++ b/Makefile @@ -148,7 +148,7 @@ clean-target-dir: @# --copies is necessary because OS X make checks the mtime of the symlink @# target (/usr/local/bin/python), which is always much older than the @# Makefile, and then proceeds to unconditionally rebuild the venv. - $$RELAY_PYTHON_VERSION -m venv --copies .venv + $$RELAY_PYTHON_VERSION -m venv .venv .venv/bin/pip install -U pip wheel @# Work around https://github.com/confluentinc/confluent-kafka-python/issues/1190 diff --git a/relay-server/src/actors/healthcheck.rs b/relay-server/src/actors/healthcheck.rs index 991f69ccda..db44d4ca37 100644 --- a/relay-server/src/actors/healthcheck.rs +++ b/relay-server/src/actors/healthcheck.rs @@ -1,12 +1,9 @@ -use std::borrow::Borrow; -use std::pin::Pin; use std::sync::Arc; use actix::SystemService; use futures03::compat::Future01CompatExt; -use futures03::FutureExt; use once_cell::sync::Lazy; -use tokio::sync::{mpsc, oneshot, watch}; +use tokio::sync::{mpsc, oneshot}; use relay_config::{Config, RelayMode}; use relay_metrics::{AcceptsMetrics, Aggregator}; @@ -93,7 +90,7 @@ impl Healthcheck { } match message { - IsHealthy::Liveness => return true, // Liveness always returns true + IsHealthy::Liveness => true, // Liveness always returns true IsHealthy::Readiness => { // If we are shutting down we are no longer Ready if self.is_shutting_down { @@ -101,15 +98,14 @@ impl Healthcheck { } // If we need authentication check that we have authentication - if self.config.requires_auth() { - if !upstream + if self.config.requires_auth() + && !upstream .send(IsAuthenticated) .compat() // Convert from the old futures to the new futures .await .unwrap_or(false) - { - return false; - } + { + return false; } Aggregator::from_registry() // Still legacy? @@ -154,13 +150,11 @@ impl Healthcheck { let mut wrx = Controller::subscribe_v2().await; loop { - if wrx.changed().await.is_ok() { - if wrx.borrow().is_none() { - continue; - } + if wrx.changed().await.is_ok() && wrx.borrow().is_none() { + continue; } let (responder, _) = oneshot::channel(); - tx.send(Message { + let _ = tx.send(Message { data: HealthCheckMessage::Shutdown, responder, }); diff --git a/relay-server/src/endpoints/healthcheck.rs b/relay-server/src/endpoints/healthcheck.rs index fae61485d4..1bda8fcd41 100644 --- a/relay-server/src/endpoints/healthcheck.rs +++ b/relay-server/src/endpoints/healthcheck.rs @@ -2,7 +2,6 @@ use ::actix::prelude::*; use actix_web::{Error, HttpResponse}; use futures::prelude::*; -use futures03::compat::Future01CompatExt; use futures03::TryFutureExt; use serde::Serialize; diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index 7dd162cbd1..5fc24af628 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -7,8 +7,6 @@ use failure::ResultExt; use failure::{Backtrace, Context, Fail}; use listenfd::ListenFd; -use tokio::runtime::{self, Runtime}; - use relay_aws_extension::AwsExtension; use relay_config::Config; use relay_metrics::Aggregator; @@ -110,7 +108,7 @@ impl From> for ServerError { #[derive(Clone)] pub struct ServiceState { config: Arc, - runtime: Arc, + _runtime: Arc, } impl ServiceState { @@ -140,13 +138,11 @@ impl ServiceState { let project_cache = ProjectCache::new(config.clone(), redis_pool).start(); registry.set(project_cache.clone()); - let v = Controller::from_registry(); - let runtime = Arc::new(tokio::runtime::Runtime::new().unwrap()); // FIXME: Might need to change that later - // Spawn the Health check in the runtime? + let _ = Controller::from_registry(); + let runtime = Arc::new(tokio::runtime::Runtime::new().unwrap()); let config_copy = config.clone(); runtime.spawn(async move { - // FIXME: Need to make a new registry - let addr = Healthcheck::new(config_copy).start(); // <- + let _addr = Healthcheck::new(config_copy).start(); //TODO: Make registry eventually }); // registry.set(Healthcheck::new(config.clone()).start()); @@ -164,7 +160,10 @@ impl ServiceState { } } - Ok(ServiceState { config, runtime }) + Ok(ServiceState { + config, + _runtime: runtime, + }) } /// Returns an atomically counted reference to the config. diff --git a/relay-system/src/controller.rs b/relay-system/src/controller.rs index 0f574c534c..ae08b5dcb9 100644 --- a/relay-system/src/controller.rs +++ b/relay-system/src/controller.rs @@ -1,5 +1,4 @@ use std::fmt; -use std::pin::Pin; use std::time::Duration; use actix::actors::signal; @@ -8,7 +7,7 @@ use actix::prelude::*; use futures::future; use futures::prelude::*; use futures03::compat::Future01CompatExt; -use tokio::sync::{mpsc, watch}; +use tokio::sync::watch; // use futures03::{FutureExt, TryFutureExt}; #[doc(inline)] @@ -64,6 +63,7 @@ type ShutdownReceiver = watch::Receiver>; // Find a good name f type ShutdownSender = watch::Sender>; // Check of that is really needed // Only used once but gives a nice symetry +/// TODO pub struct Controller { /// Configured timeout for graceful shutdowns. timeout: Duration, @@ -163,7 +163,7 @@ impl Controller { // don't cancel the shutdown of other actors if one actor fails. // Send the message - self.shutdown_sender.send(Some(Shutdown { timeout })); + let _ = self.shutdown_sender.send(Some(Shutdown { timeout })); // TODO Ask if it is better to have a warning or an elegant line let futures: Vec<_> = self .subscribers @@ -210,8 +210,7 @@ impl Controller { impl Default for Controller { fn default() -> Self { - // TODO: Still iffy not sure why we need to pass in a init (also would be nice if we could avoid it) - let (shutdown_sender, mut shutdown_receiver) = watch::channel(None); + let (shutdown_sender, shutdown_receiver) = watch::channel(None); Controller { timeout: Duration::from_secs(0), @@ -308,6 +307,8 @@ impl Handler for Controller { } #[derive(Debug)] + +/// TODO pub struct SubscribeV2(); impl Message for SubscribeV2 { @@ -317,7 +318,8 @@ impl Message for SubscribeV2 { impl Handler for Controller { type Result = MessageResult; - fn handle(&mut self, msg: SubscribeV2, ctx: &mut Self::Context) -> Self::Result { + // TODO look into why and if msg and ctx are needed + fn handle(&mut self, _msg: SubscribeV2, _ctx: &mut Self::Context) -> Self::Result { MessageResult(self.shutdown_receiver.clone()) } } From 6e703559bba1099b9d6a9422f7ab9eee1afa475b Mon Sep 17 00:00:00 2001 From: tobias-wilfert Date: Mon, 25 Jul 2022 10:40:36 +0200 Subject: [PATCH 05/19] Added the copie flag again --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index c78c4fc038..08a18152ca 100644 --- a/Makefile +++ b/Makefile @@ -148,7 +148,7 @@ clean-target-dir: @# --copies is necessary because OS X make checks the mtime of the symlink @# target (/usr/local/bin/python), which is always much older than the @# Makefile, and then proceeds to unconditionally rebuild the venv. - $$RELAY_PYTHON_VERSION -m venv .venv + $$RELAY_PYTHON_VERSION -m venv --copies .venv .venv/bin/pip install -U pip wheel @# Work around https://github.com/confluentinc/confluent-kafka-python/issues/1190 From 91efb1a938102d0ad4ff6216d19b92634cce8c57 Mon Sep 17 00:00:00 2001 From: tobias-wilfert Date: Wed, 27 Jul 2022 11:32:03 +0200 Subject: [PATCH 06/19] Possible fix for registry --- Cargo.toml | 2 +- relay-server/src/actors/healthcheck.rs | 8 +++- relay-server/src/service.rs | 54 ++++++++++++++++++++++++-- relay-system/src/controller.rs | 7 +++- 4 files changed, 64 insertions(+), 7 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 4fd4c52a72..5cdfcaff05 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,7 +5,7 @@ default-members = ["relay"] [profile.dev] # Debug information slows down the build and increases caches in the # target folder, but we don't require stack traces in most cases. -debug = false +# debug = false [profile.release] # In release, however, we do want full debug information to report diff --git a/relay-server/src/actors/healthcheck.rs b/relay-server/src/actors/healthcheck.rs index db44d4ca37..625c32e4c7 100644 --- a/relay-server/src/actors/healthcheck.rs +++ b/relay-server/src/actors/healthcheck.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use actix::SystemService; use futures03::compat::Future01CompatExt; use once_cell::sync::Lazy; +use tokio::runtime::{Handle, Runtime}; use tokio::sync::{mpsc, oneshot}; use relay_config::{Config, RelayMode}; @@ -26,7 +27,7 @@ struct Message { // Custom error that acts as an abstraction pub struct SendError; -// tx = transceiver, mpcs = multi-producer, single consumer +// tx = transmitter, mpcs = multi-producer, single consumer #[derive(Clone, Debug)] pub struct Addr { tx: mpsc::UnboundedSender>, @@ -144,10 +145,13 @@ impl Healthcheck { } }); + // let mut wrx = Controller::subscribe_v2(); <- this works but could it be because we don't await it? + // When receiving a shutdown message forward it to our mpsc channel tokio::spawn(async move { // Get the receiving end of the watch channel from the Controller - let mut wrx = Controller::subscribe_v2().await; + // let _guard = handle.enter(); + let mut wrx = Controller::subscribe_v2().await; // <- Is this were the error comes from loop { if wrx.changed().await.is_ok() && wrx.borrow().is_none() { diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index 5fc24af628..da44119f8e 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -6,12 +6,14 @@ use actix_web::{server, App}; use failure::ResultExt; use failure::{Backtrace, Context, Fail}; use listenfd::ListenFd; +use once_cell::sync::Lazy; use relay_aws_extension::AwsExtension; use relay_config::Config; use relay_metrics::Aggregator; use relay_redis::RedisPool; use relay_system::{Configure, Controller}; +use tokio::runtime::Handle; use crate::actors::envelopes::{EnvelopeManager, EnvelopeProcessor}; use crate::actors::healthcheck::Healthcheck; @@ -138,11 +140,57 @@ impl ServiceState { let project_cache = ProjectCache::new(config.clone(), redis_pool).start(); registry.set(project_cache.clone()); - let _ = Controller::from_registry(); - let runtime = Arc::new(tokio::runtime::Runtime::new().unwrap()); let config_copy = config.clone(); + // RefCell> = RefCell::new(None); + static STATICSYSTEM: Lazy>> = + Lazy::new(|| std::sync::RwLock::new(None)); + + *STATICSYSTEM.write().unwrap() = Some(system.clone()); + + let runtime = Arc::new( + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .on_thread_start(|| { + let guard = STATICSYSTEM.read().unwrap(); + let system = guard.as_ref().cloned().unwrap(); + actix::System::set_current(system); + }) + .build() + .unwrap(), + ); + // let runtime = Arc::new(tokio::runtime::Runtime::new().unwrap()); + + let _ = Controller::from_registry(); // <- THIS WORKS + + // let handle = Handle::current(); // Error: thread 'main' panicked at 'there is no reactor running, must be called from the context of a Tokio 1.x runtime', /Users/tobiaswilfert/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.19.2/src/runtime/context.rs:21:19 + + // let _guard = runtime.enter(); // <- Does not solve it sadly: https://docs.rs/tokio/1.20.0/tokio/runtime/struct.Runtime.html#method.enter + // > Only seems to allow for tokio::spawn instead of runtime.spawn, but doesn't fix the System issue + + // > Adding this doesn't solve the problem + // let handle = runtime.handle(); + // let _guard = handle.enter(); + + // > handle.spawn doesn't solve the issue runtime.spawn(async move { - let _addr = Healthcheck::new(config_copy).start(); //TODO: Make registry eventually + // let _guard = runtime.enter(); <- this feels illegal and also complains + + // > Adding this doesn't solve the problem + // let handle = Handle::current(); + // let _guard = handle.enter(); + + // System::current(); // <- Even this doesn't work + // Error: thread 'tokio-runtime-worker' panicked at 'System is not running', /Users/tobiaswilfert/.cargo/registry/src/github.com-1ecc6299db9ec823/actix-0.7.9/src/system.rs:93:21 + + // So why is the 'System is not running' inside this block? + + let _ = Controller::from_registry(); // <- THIS DOES NOT WORK, Even after removing line 145 so it is not like duplicating the line is what causes the error + // Error: thread 'tokio-runtime-worker' panicked at 'System is not running', /Users/tobiaswilfert/.cargo/registry/src/github.com-1ecc6299db9ec823/actix-0.7.9/src/system.rs:118:21 + // panic!("Made it here"); + + // TODO: Make registry eventually + let _addr = Healthcheck::new(config_copy).start(); // <- This causes the problem + // thread 'tokio-runtime-worker' panicked at 'System is not running', /Users/tobiaswilfert/.cargo/registry/src/github.com-1ecc6299db9ec823/actix-0.7.9/src/system.rs:118:21 }); // registry.set(Healthcheck::new(config.clone()).start()); diff --git a/relay-system/src/controller.rs b/relay-system/src/controller.rs index ae08b5dcb9..c666a096c9 100644 --- a/relay-system/src/controller.rs +++ b/relay-system/src/controller.rs @@ -145,12 +145,17 @@ impl Controller { } /// TODO + // So need to fix the registry + // Funny enough Controller can be accessed (as seen by being able to call this function) but + // System can not because system is an Actix thing 🤪 pub async fn subscribe_v2() -> ShutdownReceiver { - Controller::from_registry() + Controller::from_registry() // <- Pretty sure this is the call that breaks it all .send(SubscribeV2()) .compat() .await .unwrap() // FIXME: Remove this later + + // panic!() } /// Performs a graceful shutdown with the given timeout. From 7a2a5d32129556db34e4a8e81843f3e2f0305e46 Mon Sep 17 00:00:00 2001 From: tobias-wilfert Date: Wed, 27 Jul 2022 11:58:12 +0200 Subject: [PATCH 07/19] removed unused imports --- relay-server/src/actors/healthcheck.rs | 1 - relay-server/src/service.rs | 11 +++++------ 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/relay-server/src/actors/healthcheck.rs b/relay-server/src/actors/healthcheck.rs index 625c32e4c7..7af8957bdb 100644 --- a/relay-server/src/actors/healthcheck.rs +++ b/relay-server/src/actors/healthcheck.rs @@ -3,7 +3,6 @@ use std::sync::Arc; use actix::SystemService; use futures03::compat::Future01CompatExt; use once_cell::sync::Lazy; -use tokio::runtime::{Handle, Runtime}; use tokio::sync::{mpsc, oneshot}; use relay_config::{Config, RelayMode}; diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index da44119f8e..1d82d35ff9 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -13,7 +13,6 @@ use relay_config::Config; use relay_metrics::Aggregator; use relay_redis::RedisPool; use relay_system::{Configure, Controller}; -use tokio::runtime::Handle; use crate::actors::envelopes::{EnvelopeManager, EnvelopeProcessor}; use crate::actors::healthcheck::Healthcheck; @@ -140,6 +139,8 @@ impl ServiceState { let project_cache = ProjectCache::new(config.clone(), redis_pool).start(); registry.set(project_cache.clone()); + // let _ = Controller::from_registry(); // <- THIS WORKS + let config_copy = config.clone(); // RefCell> = RefCell::new(None); static STATICSYSTEM: Lazy>> = @@ -160,8 +161,6 @@ impl ServiceState { ); // let runtime = Arc::new(tokio::runtime::Runtime::new().unwrap()); - let _ = Controller::from_registry(); // <- THIS WORKS - // let handle = Handle::current(); // Error: thread 'main' panicked at 'there is no reactor running, must be called from the context of a Tokio 1.x runtime', /Users/tobiaswilfert/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.19.2/src/runtime/context.rs:21:19 // let _guard = runtime.enter(); // <- Does not solve it sadly: https://docs.rs/tokio/1.20.0/tokio/runtime/struct.Runtime.html#method.enter @@ -184,9 +183,9 @@ impl ServiceState { // So why is the 'System is not running' inside this block? - let _ = Controller::from_registry(); // <- THIS DOES NOT WORK, Even after removing line 145 so it is not like duplicating the line is what causes the error - // Error: thread 'tokio-runtime-worker' panicked at 'System is not running', /Users/tobiaswilfert/.cargo/registry/src/github.com-1ecc6299db9ec823/actix-0.7.9/src/system.rs:118:21 - // panic!("Made it here"); + // let _ = Controller::from_registry(); // <- THIS DOES NOT WORK, Even after removing line 145 so it is not like duplicating the line is what causes the error + // Error: thread 'tokio-runtime-worker' panicked at 'System is not running', /Users/tobiaswilfert/.cargo/registry/src/github.com-1ecc6299db9ec823/actix-0.7.9/src/system.rs:118:21 + // panic!("Made it here"); // TODO: Make registry eventually let _addr = Healthcheck::new(config_copy).start(); // <- This causes the problem From e0b25785480e5265ee203abd69188b50436c30f0 Mon Sep 17 00:00:00 2001 From: tobias-wilfert Date: Wed, 27 Jul 2022 12:22:21 +0200 Subject: [PATCH 08/19] Cleanedup service.rs --- relay-server/src/service.rs | 68 +++++++------------------------------ 1 file changed, 12 insertions(+), 56 deletions(-) diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index 1d82d35ff9..f67ae0ad6a 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -6,9 +6,9 @@ use actix_web::{server, App}; use failure::ResultExt; use failure::{Backtrace, Context, Fail}; use listenfd::ListenFd; -use once_cell::sync::Lazy; use relay_aws_extension::AwsExtension; +use relay_common::clone; use relay_config::Config; use relay_metrics::Aggregator; use relay_redis::RedisPool; @@ -118,6 +118,15 @@ impl ServiceState { let system = System::current(); let registry = system.registry(); + let runtime = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .on_thread_start(clone!(system, || System::set_current(system.clone()))) + .build() + .unwrap(); + + // Enter the tokio runtime so we can start spawning tasks from the outside. + let _guard = runtime.enter(); + let upstream_relay = UpstreamRelay::new(config.clone()); registry.set(Arbiter::start(|_| upstream_relay)); @@ -139,60 +148,7 @@ impl ServiceState { let project_cache = ProjectCache::new(config.clone(), redis_pool).start(); registry.set(project_cache.clone()); - // let _ = Controller::from_registry(); // <- THIS WORKS - - let config_copy = config.clone(); - // RefCell> = RefCell::new(None); - static STATICSYSTEM: Lazy>> = - Lazy::new(|| std::sync::RwLock::new(None)); - - *STATICSYSTEM.write().unwrap() = Some(system.clone()); - - let runtime = Arc::new( - tokio::runtime::Builder::new_multi_thread() - .enable_all() - .on_thread_start(|| { - let guard = STATICSYSTEM.read().unwrap(); - let system = guard.as_ref().cloned().unwrap(); - actix::System::set_current(system); - }) - .build() - .unwrap(), - ); - // let runtime = Arc::new(tokio::runtime::Runtime::new().unwrap()); - - // let handle = Handle::current(); // Error: thread 'main' panicked at 'there is no reactor running, must be called from the context of a Tokio 1.x runtime', /Users/tobiaswilfert/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.19.2/src/runtime/context.rs:21:19 - - // let _guard = runtime.enter(); // <- Does not solve it sadly: https://docs.rs/tokio/1.20.0/tokio/runtime/struct.Runtime.html#method.enter - // > Only seems to allow for tokio::spawn instead of runtime.spawn, but doesn't fix the System issue - - // > Adding this doesn't solve the problem - // let handle = runtime.handle(); - // let _guard = handle.enter(); - - // > handle.spawn doesn't solve the issue - runtime.spawn(async move { - // let _guard = runtime.enter(); <- this feels illegal and also complains - - // > Adding this doesn't solve the problem - // let handle = Handle::current(); - // let _guard = handle.enter(); - - // System::current(); // <- Even this doesn't work - // Error: thread 'tokio-runtime-worker' panicked at 'System is not running', /Users/tobiaswilfert/.cargo/registry/src/github.com-1ecc6299db9ec823/actix-0.7.9/src/system.rs:93:21 - - // So why is the 'System is not running' inside this block? - - // let _ = Controller::from_registry(); // <- THIS DOES NOT WORK, Even after removing line 145 so it is not like duplicating the line is what causes the error - // Error: thread 'tokio-runtime-worker' panicked at 'System is not running', /Users/tobiaswilfert/.cargo/registry/src/github.com-1ecc6299db9ec823/actix-0.7.9/src/system.rs:118:21 - // panic!("Made it here"); - - // TODO: Make registry eventually - let _addr = Healthcheck::new(config_copy).start(); // <- This causes the problem - // thread 'tokio-runtime-worker' panicked at 'System is not running', /Users/tobiaswilfert/.cargo/registry/src/github.com-1ecc6299db9ec823/actix-0.7.9/src/system.rs:118:21 - }); - - // registry.set(Healthcheck::new(config.clone()).start()); + Healthcheck::new(config.clone()).start(); // TODO(tobias): Registry is implicit registry.set(RelayCache::new(config.clone()).start()); registry @@ -209,7 +165,7 @@ impl ServiceState { Ok(ServiceState { config, - _runtime: runtime, + _runtime: Arc::new(runtime), }) } From c2ca31979c5471cdbbc36ee61af559f0e1699a36 Mon Sep 17 00:00:00 2001 From: tobias-wilfert Date: Wed, 27 Jul 2022 12:53:11 +0200 Subject: [PATCH 09/19] Cleaned up healthchecck.rs --- Cargo.lock | 1 - Cargo.toml | 2 +- relay-server/Cargo.toml | 1 - relay-server/src/actors/healthcheck.rs | 238 ++++++------------------- 4 files changed, 58 insertions(+), 184 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d664d61e92..741a272f3e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3513,7 +3513,6 @@ dependencies = [ "listenfd", "minidump", "native-tls", - "once_cell", "parking_lot 0.10.2", "rdkafka", "rdkafka-sys", diff --git a/Cargo.toml b/Cargo.toml index 5cdfcaff05..4fd4c52a72 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,7 +5,7 @@ default-members = ["relay"] [profile.dev] # Debug information slows down the build and increases caches in the # target folder, but we don't require stack traces in most cases. -# debug = false +debug = false [profile.release] # In release, however, we do want full debug information to report diff --git a/relay-server/Cargo.toml b/relay-server/Cargo.toml index 55aa43a6c5..3b9a4f18b2 100644 --- a/relay-server/Cargo.toml +++ b/relay-server/Cargo.toml @@ -45,7 +45,6 @@ lazy_static = "1.4.0" listenfd = "0.3.3" minidump = { version = "0.10.0", optional = true } native-tls = { version = "0.2.4", optional = true } -once_cell = { version = "1.13.0" } parking_lot = "0.10.0" rdkafka = { version = "0.24", optional = true } rdkafka-sys = { version = "2.1.0", optional = true } diff --git a/relay-server/src/actors/healthcheck.rs b/relay-server/src/actors/healthcheck.rs index 7af8957bdb..cb45051a97 100644 --- a/relay-server/src/actors/healthcheck.rs +++ b/relay-server/src/actors/healthcheck.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use actix::SystemService; use futures03::compat::Future01CompatExt; -use once_cell::sync::Lazy; +use parking_lot::RwLock; use tokio::sync::{mpsc, oneshot}; use relay_config::{Config, RelayMode}; @@ -13,63 +13,62 @@ use relay_system::{Controller, Shutdown}; use crate::actors::upstream::{IsAuthenticated, IsNetworkOutage, UpstreamRelay}; use crate::statsd::RelayGauges; -static ADDRESS: Lazy>>> = - Lazy::new(|| std::sync::Mutex::new(None)); +lazy_static::lazy_static! { + /// Singleton of the `Healthcheck` service. + static ref ADDRESS: RwLock>> = RwLock::new(None); +} -// Wrapper for our data and oneshot chanel(needed to retransmit back the reply) +/// Internal wrapper of a message sent through an `Addr` with return channel. #[derive(Debug)] struct Message { data: T, + // TODO(tobias): This is hard-coded to return `bool`. responder: oneshot::Sender, } -// Custom error that acts as an abstraction +/// An error when [sending](Addr::send) a message to a service fails. pub struct SendError; -// tx = transmitter, mpcs = multi-producer, single consumer +/// Channel for sending public messages into a service. +/// +/// To send a message, use [`Addr::send`]. #[derive(Clone, Debug)] pub struct Addr { tx: mpsc::UnboundedSender>, } -// Impl the send function which can then be used to send a shut down message -/* -impl ShutdownReceiver for Addr -where - T: From + std::marker::Send, // FIXME: Check if that is an actual solution -{ - fn send<'a>( - &'a self, - message: Shutdown, - ) -> Pin + Send + 'a>> { - Box::pin(self.send(message.into()).map(|_| ())) - } - - fn foo(&self) -> Box { - Bow::new(self.clone()) - } -} -*/ - -// Name send chosen to keep it compatible with the existing code that is invoking it impl Addr { + /// Sends an asynchronous message to the service and waits for the response. + /// + /// The result of the message does not have to be awaited. The message will be delivered and + /// handled regardless. The communication channel with the service is unbounded, so backlogs + /// could occur when sending too many messages. + /// + /// Sending the message can fail with `Err(SendError)` if the service has shut down. pub async fn send(&self, data: T) -> Result { - let (responder, rx) = oneshot::channel(); // Generate the oneshot channel - let message = Message { data, responder }; // Construct the message - self.tx.send(message).map_err(|_| SendError)?; // Send the message and return an error if it fails - rx.await.map_err(|_| SendError) // await the result and return it, if anything goes wrong, throw a SendError + let (responder, rx) = oneshot::channel(); + let message = Message { data, responder }; + self.tx.send(message).map_err(|_| SendError)?; + rx.await.map_err(|_| SendError) } } -// Unchanged - pub struct Healthcheck { is_shutting_down: bool, config: Arc, } impl Healthcheck { - // Unchanged + /// Returns the singleton instance of the Healthcheck service. + /// + /// Prior to using this, the service must be started using [`Healthcheck::start`]. + pub fn from_registry() -> Addr { + ADDRESS.read().as_ref().unwrap().clone() + } + + /// Creates a new instance of the Healthcheck service. + /// + /// The service does not run. To run the service, use [`start`](Self::start). pub fn new(config: Arc) -> Self { Healthcheck { is_shutting_down: false, @@ -78,7 +77,7 @@ impl Healthcheck { } async fn handle_is_healthy(&mut self, message: IsHealthy) -> bool { - let upstream = UpstreamRelay::from_registry(); // Legacy, still needed for now + let upstream = UpstreamRelay::from_registry(); if self.config.relay_mode() == RelayMode::Managed { let fut = upstream.send(IsNetworkOutage).compat(); @@ -92,25 +91,23 @@ impl Healthcheck { match message { IsHealthy::Liveness => true, // Liveness always returns true IsHealthy::Readiness => { - // If we are shutting down we are no longer Ready if self.is_shutting_down { return false; } - // If we need authentication check that we have authentication if self.config.requires_auth() && !upstream .send(IsAuthenticated) - .compat() // Convert from the old futures to the new futures + .compat() .await .unwrap_or(false) { return false; } - Aggregator::from_registry() // Still legacy? + Aggregator::from_registry() .send(AcceptsMetrics) - .compat() // Convert from the old futures to the new futures + .compat() .await .unwrap_or(false) } @@ -119,114 +116,46 @@ impl Healthcheck { fn handle_shutdown(&mut self) -> bool { self.is_shutting_down = true; - true + true // TODO: This should go away once messages are more generic } - async fn handle(&mut self, message: HealthCheckMessage) -> bool { + async fn handle(&mut self, message: HealthcheckMessage) -> bool { match message { - HealthCheckMessage::Health(message) => self.handle_is_healthy(message).await, - HealthCheckMessage::Shutdown => self.handle_shutdown(), + HealthcheckMessage::Health(message) => self.handle_is_healthy(message).await, + HealthcheckMessage::Shutdown => self.handle_shutdown(), } } - // Start the healthCheck actor, returns an Addr so that it can be reached - pub fn start(mut self) -> Addr { - // Make the channel to use for communication + /// Start this service, returning an [`Addr`] for communication. + pub fn start(mut self) -> Addr { let (tx, mut rx) = mpsc::unbounded_channel::>(); - let addr = Addr { tx: tx.clone() }; - *ADDRESS.lock().unwrap() = Some(addr.clone()); + + let addr = Addr { tx }; + *ADDRESS.write() = Some(addr.clone()); tokio::spawn(async move { - // While incoming messages are still being received while let Some(message) = rx.recv().await { + // TODO(tobias): This does not allow for concurrent execution. let response = self.handle(message.data).await; message.responder.send(response).ok(); } }); - // let mut wrx = Controller::subscribe_v2(); <- this works but could it be because we don't await it? - - // When receiving a shutdown message forward it to our mpsc channel + // Forward shutdown signals to the main message channel + let shutdown_addr = addr.clone(); tokio::spawn(async move { - // Get the receiving end of the watch channel from the Controller - // let _guard = handle.enter(); - let mut wrx = Controller::subscribe_v2().await; // <- Is this were the error comes from + let mut shutdown_rx = Controller::subscribe_v2().await; - loop { - if wrx.changed().await.is_ok() && wrx.borrow().is_none() { - continue; + while shutdown_rx.changed().await.is_ok() { + if shutdown_rx.borrow_and_update().is_some() { + let _ = shutdown_addr.send(HealthcheckMessage::Shutdown); } - let (responder, _) = oneshot::channel(); - let _ = tx.send(Message { - data: HealthCheckMessage::Shutdown, - responder, - }); - break; } }); - addr - } - - // Maybe give back our own Addr and say that it can be converted into the actix Addr? - // Because Healthcheck is not an actor so doesn't work? - // So escentially need to make a converter? or leave it an Actor - pub fn from_registry() -> Addr { - // TODO: Here need to return the address from the HealthCheck - // HTF do we do this, surely we now need to make a registry - let guard = ADDRESS.lock().unwrap(); - match *guard { - Some(ref addr) => addr.clone(), - None => panic!(), - } - } -} - -/* -impl Actor for Healthcheck { - type Context = Context; - - fn started(&mut self, context: &mut Self::Context) { - Controller::subscribe(context.address()); - } -} - -impl Supervised for Healthcheck {} - -impl SystemService for Healthcheck {} - -impl Default for Healthcheck { - fn default() -> Self { - unimplemented!("register with the SystemRegistry instead") - } -} -*/ - -// - More sophisticated solution - -/* -trait Handler { - type Result; - - fn handle<'a>( - &'a mut self, - message: T, - ) -> Pin + Send + 'a>>; -} -impl Handler for Healthcheck { - type Result = Result<(), ()>; - - fn handle<'a>( - &'a mut self, - _message: Shutdown, - ) -> Pin + Send + 'a>> { - let result_future = async move { - self.is_shutting_down = true; - Ok(()) - }; - Box::pin(result_future) + addr } } -*/ #[derive(Clone, Debug)] pub enum IsHealthy { @@ -238,72 +167,19 @@ pub enum IsHealthy { } #[derive(Clone, Debug)] -pub enum HealthCheckMessage { +pub enum HealthcheckMessage { Health(IsHealthy), Shutdown, } -// Allows for shutdown to be converted to HealthCheckMessage -// Needed so that that the shutdown message can be send to multiple actors in the same way -impl From for HealthCheckMessage { +impl From for HealthcheckMessage { fn from(_: Shutdown) -> Self { - HealthCheckMessage::Shutdown + HealthcheckMessage::Shutdown } } -impl From for HealthCheckMessage { +impl From for HealthcheckMessage { fn from(is_healthy: IsHealthy) -> Self { Self::Health(is_healthy) } } - -/* -impl Message for IsHealthy { - type Result = Result; -} - -impl Handler for Healthcheck { - type Result = ResponseFuture; - - fn handle(&mut self, message: IsHealthy, context: &mut Self::Context) -> Self::Result { - let upstream = UpstreamRelay::from_registry(); - - if self.config.relay_mode() == RelayMode::Managed { - upstream - .send(IsNetworkOutage) - .map_err(|_| ()) - .map(|is_network_outage| { - metric!( - gauge(RelayGauges::NetworkOutage) = if is_network_outage { 1 } else { 0 } - ); - }) - .into_actor(self) - .spawn(context); - } - - match message { - IsHealthy::Liveness => Box::new(future::ok(true)), - IsHealthy::Readiness => { - if self.is_shutting_down { - return Box::new(future::ok(false)); - } - - let is_aggregator_full = Aggregator::from_registry() - .send(AcceptsMetrics) - .map_err(|_| ()); - let is_authenticated: Self::Result = if self.config.requires_auth() { - Box::new(upstream.send(IsAuthenticated).map_err(|_| ())) - } else { - Box::new(future::ok(true)) - }; - - Box::new( - is_aggregator_full - .join(is_authenticated) - .map(|(a, b)| a && b), - ) - } - } - } -} -*/ From 26756fae44db8a9e34c77fbb1a4414fc486ce390 Mon Sep 17 00:00:00 2001 From: tobias-wilfert Date: Wed, 27 Jul 2022 14:44:44 +0200 Subject: [PATCH 10/19] Cleanedup controller.rs --- relay-system/src/controller.rs | 56 +++------------------------------- 1 file changed, 4 insertions(+), 52 deletions(-) diff --git a/relay-system/src/controller.rs b/relay-system/src/controller.rs index c666a096c9..819b8c9f04 100644 --- a/relay-system/src/controller.rs +++ b/relay-system/src/controller.rs @@ -70,40 +70,12 @@ pub struct Controller { /// Subscribed actors for the shutdown message. subscribers: Vec>, - /// Subscribed actors for the shutdown message. - // new_subscribers: Vec>, - // Hand this out to actors that subscribe to us shutdown_receiver: ShutdownReceiver, // Use this to send the shutdown message to all the actors that are subscribed to us shutdown_sender: ShutdownSender, } -/// TODO -/* -pub trait ShutdownReceiver { - // Not sure if this can help us: https://stackoverflow.com/a/53989780/8076979 - // Removing `static also doesn't help - // `?Trait` is not permitted in supertraits - /// TODO - fn send<'a>( - &'a self, - message: Shutdown, - ) -> Pin + Send + 'a>>; - - /// TODO - fn foo(&self) -> Box - where - Self: Sized; - /* - fn clone(&self) -> Self - where - Self: Sized; - */ - // fn clone(&self) -> Box; // Complains if foo exists -} -*/ - impl Controller { /// Starts an actix system and runs the `factory` to start actors. /// @@ -145,17 +117,12 @@ impl Controller { } /// TODO - // So need to fix the registry - // Funny enough Controller can be accessed (as seen by being able to call this function) but - // System can not because system is an Actix thing 🤪 pub async fn subscribe_v2() -> ShutdownReceiver { - Controller::from_registry() // <- Pretty sure this is the call that breaks it all + Controller::from_registry() .send(SubscribeV2()) .compat() .await .unwrap() // FIXME: Remove this later - - // panic!() } /// Performs a graceful shutdown with the given timeout. @@ -168,7 +135,7 @@ impl Controller { // don't cancel the shutdown of other actors if one actor fails. // Send the message - let _ = self.shutdown_sender.send(Some(Shutdown { timeout })); // TODO Ask if it is better to have a warning or an elegant line + let _ = self.shutdown_sender.send(Some(Shutdown { timeout })); let futures: Vec<_> = self .subscribers @@ -195,20 +162,6 @@ impl Controller { }); fut::ok(()) }) - // TODO: Best approach till now but still not working :/ - /* - .and_then(move |_, slf, _| { - let new_futures: Vec<_> = slf // Would moving this out help us? - .new_subscribers - .iter() // <- Issue - .map(|recipient| recipient.send(Shutdown { timeout })) - .collect(); - let fut = futures03::future::join_all(new_futures) - .map(|_| ()) - .unit_error() - .compat(); - fut::wrap_future::<_, Controller>(fut) - })*/ .spawn(context); } } @@ -231,7 +184,7 @@ impl fmt::Debug for Controller { f.debug_struct("Controller") .field("timeout", &self.timeout) .field("subscribers", &self.subscribers.len()) // Ask if we need to update these here - .finish() + .finish() // TODO(tobias): Add the new fields } } @@ -311,9 +264,8 @@ impl Handler for Controller { } } -#[derive(Debug)] - /// TODO +#[derive(Debug)] pub struct SubscribeV2(); impl Message for SubscribeV2 { From 6ee55da9239f83db75f8e7b4f7b6f837b52ad5f2 Mon Sep 17 00:00:00 2001 From: tobias-wilfert Date: Thu, 28 Jul 2022 09:06:21 +0200 Subject: [PATCH 11/19] Addressed comments on the PR --- relay-server/src/actors/healthcheck.rs | 9 +++++++- relay-server/src/endpoints/healthcheck.rs | 1 - relay-system/src/controller.rs | 27 ++++++++++++----------- 3 files changed, 22 insertions(+), 15 deletions(-) diff --git a/relay-server/src/actors/healthcheck.rs b/relay-server/src/actors/healthcheck.rs index cb45051a97..c10685c4d1 100644 --- a/relay-server/src/actors/healthcheck.rs +++ b/relay-server/src/actors/healthcheck.rs @@ -59,9 +59,13 @@ pub struct Healthcheck { } impl Healthcheck { - /// Returns the singleton instance of the Healthcheck service. + /// Returns the [`Addr`] of the [`Healthcheck`] actor. /// /// Prior to using this, the service must be started using [`Healthcheck::start`]. + /// + /// # Panics + /// + /// Panics if the service was not started using [`Healthcheck::start`] prior to this being used. pub fn from_registry() -> Addr { ADDRESS.read().as_ref().unwrap().clone() } @@ -127,6 +131,8 @@ impl Healthcheck { } /// Start this service, returning an [`Addr`] for communication. + /// + /// Blocks until the [`Controller`] is running pub fn start(mut self) -> Addr { let (tx, mut rx) = mpsc::unbounded_channel::>(); @@ -157,6 +163,7 @@ impl Healthcheck { } } +/// All the message types which can be sent to the [`Healthcheck`] actor. #[derive(Clone, Debug)] pub enum IsHealthy { /// Check if the Relay is alive at all. diff --git a/relay-server/src/endpoints/healthcheck.rs b/relay-server/src/endpoints/healthcheck.rs index 1bda8fcd41..c46a649552 100644 --- a/relay-server/src/endpoints/healthcheck.rs +++ b/relay-server/src/endpoints/healthcheck.rs @@ -33,7 +33,6 @@ impl HealthcheckResponse { } fn healthcheck_impl(message: IsHealthy) -> ResponseFuture { - // let addr = Healthcheck::from_registry(); let fut = async move { let addr = Healthcheck::from_registry(); addr.send(message.into()).await diff --git a/relay-system/src/controller.rs b/relay-system/src/controller.rs index 819b8c9f04..8ded407f15 100644 --- a/relay-system/src/controller.rs +++ b/relay-system/src/controller.rs @@ -8,7 +8,6 @@ use futures::future; use futures::prelude::*; use futures03::compat::Future01CompatExt; use tokio::sync::watch; -// use futures03::{FutureExt, TryFutureExt}; #[doc(inline)] pub use actix::actors::signal::{Signal, SignalType}; @@ -59,21 +58,16 @@ pub use actix::actors::signal::{Signal, SignalType}; /// }).unwrap(); /// ``` -type ShutdownReceiver = watch::Receiver>; // Find a good name for this -type ShutdownSender = watch::Sender>; // Check of that is really needed - // Only used once but gives a nice symetry - /// TODO pub struct Controller { /// Configured timeout for graceful shutdowns. timeout: Duration, /// Subscribed actors for the shutdown message. subscribers: Vec>, - - // Hand this out to actors that subscribe to us - shutdown_receiver: ShutdownReceiver, - // Use this to send the shutdown message to all the actors that are subscribed to us - shutdown_sender: ShutdownSender, + /// Handed out to actors who wish to subscribe to the [`Shutdown`] message. + shutdown_receiver: watch::Receiver>, + /// The sender for the [`Shutdown`] message. + shutdown_sender: watch::Sender>, } impl Controller { @@ -116,8 +110,15 @@ impl Controller { Controller::from_registry().do_send(Subscribe(addr.recipient())) } - /// TODO - pub async fn subscribe_v2() -> ShutdownReceiver { + /// Subscribes to the [`Shutdown`] message to handle graceful shutdown. + /// + /// Returns a receiver for the [`Shutdown`] message, to be used to gracefully + /// shutdown. This sends a message to the [`Controller`] actor so will + /// block until this actor is running. + /// + /// TODO: The receiver of this message can not yet signal they have completed + /// shutdown. + pub async fn subscribe_v2() -> watch::Receiver> { Controller::from_registry() .send(SubscribeV2()) .compat() @@ -269,7 +270,7 @@ impl Handler for Controller { pub struct SubscribeV2(); impl Message for SubscribeV2 { - type Result = ShutdownReceiver; + type Result = watch::Receiver>; } impl Handler for Controller { From 1b5a34abc9ad9e20887f1e96e3d59c7fa7c4377c Mon Sep 17 00:00:00 2001 From: tobias-wilfert Date: Thu, 28 Jul 2022 09:13:36 +0200 Subject: [PATCH 12/19] Adressed comments left on PR --- relay-system/src/controller.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/relay-system/src/controller.rs b/relay-system/src/controller.rs index 8ded407f15..efa96ba5ea 100644 --- a/relay-system/src/controller.rs +++ b/relay-system/src/controller.rs @@ -184,7 +184,7 @@ impl fmt::Debug for Controller { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Controller") .field("timeout", &self.timeout) - .field("subscribers", &self.subscribers.len()) // Ask if we need to update these here + .field("subscribers", &self.subscribers.len()) .finish() // TODO(tobias): Add the new fields } } @@ -265,7 +265,7 @@ impl Handler for Controller { } } -/// TODO +/// Internal message to handle subscribing to the [`Shutdown`] message. #[derive(Debug)] pub struct SubscribeV2(); @@ -276,8 +276,7 @@ impl Message for SubscribeV2 { impl Handler for Controller { type Result = MessageResult; - // TODO look into why and if msg and ctx are needed - fn handle(&mut self, _msg: SubscribeV2, _ctx: &mut Self::Context) -> Self::Result { + fn handle(&mut self, _: SubscribeV2, _: &mut Self::Context) -> Self::Result { MessageResult(self.shutdown_receiver.clone()) } } From 6f1026599d5cdf2430194a770df7cd725b6276aa Mon Sep 17 00:00:00 2001 From: tobias-wilfert Date: Thu, 28 Jul 2022 12:00:54 +0200 Subject: [PATCH 13/19] Addressed some more comments --- relay-server/src/actors/healthcheck.rs | 6 ++---- relay-system/src/controller.rs | 6 ++---- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/relay-server/src/actors/healthcheck.rs b/relay-server/src/actors/healthcheck.rs index c10685c4d1..a816727325 100644 --- a/relay-server/src/actors/healthcheck.rs +++ b/relay-server/src/actors/healthcheck.rs @@ -120,7 +120,7 @@ impl Healthcheck { fn handle_shutdown(&mut self) -> bool { self.is_shutting_down = true; - true // TODO: This should go away once messages are more generic + true // TODO(tobias): This should go away once messages are more generic } async fn handle(&mut self, message: HealthcheckMessage) -> bool { @@ -131,8 +131,6 @@ impl Healthcheck { } /// Start this service, returning an [`Addr`] for communication. - /// - /// Blocks until the [`Controller`] is running pub fn start(mut self) -> Addr { let (tx, mut rx) = mpsc::unbounded_channel::>(); @@ -163,7 +161,6 @@ impl Healthcheck { } } -/// All the message types which can be sent to the [`Healthcheck`] actor. #[derive(Clone, Debug)] pub enum IsHealthy { /// Check if the Relay is alive at all. @@ -173,6 +170,7 @@ pub enum IsHealthy { Readiness, } +/// All the message types which can be sent to the [`Healthcheck`] actor. #[derive(Clone, Debug)] pub enum HealthcheckMessage { Health(IsHealthy), diff --git a/relay-system/src/controller.rs b/relay-system/src/controller.rs index efa96ba5ea..a60ea1d311 100644 --- a/relay-system/src/controller.rs +++ b/relay-system/src/controller.rs @@ -57,8 +57,6 @@ pub use actix::actors::signal::{Signal, SignalType}; /// Ok(()) /// }).unwrap(); /// ``` - -/// TODO pub struct Controller { /// Configured timeout for graceful shutdowns. timeout: Duration, @@ -116,7 +114,7 @@ impl Controller { /// shutdown. This sends a message to the [`Controller`] actor so will /// block until this actor is running. /// - /// TODO: The receiver of this message can not yet signal they have completed + /// TODO(tobias): The receiver of this message can not yet signal they have completed /// shutdown. pub async fn subscribe_v2() -> watch::Receiver> { Controller::from_registry() @@ -136,7 +134,7 @@ impl Controller { // don't cancel the shutdown of other actors if one actor fails. // Send the message - let _ = self.shutdown_sender.send(Some(Shutdown { timeout })); + self.shutdown_sender.send(Some(Shutdown { timeout })).ok(); let futures: Vec<_> = self .subscribers From 8b51aaa5a80c3c93820e192ce1026fd104b8cf1e Mon Sep 17 00:00:00 2001 From: tobias-wilfert Date: Fri, 29 Jul 2022 09:28:38 +0200 Subject: [PATCH 14/19] Added entry to changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2b67bf9c22..8b616bd187 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ - Refactor profile processing into its own crate. ([#1340](https://github.com/getsentry/relay/pull/1340)) - Treat "unknown" transaction source as low cardinality, except for JS. ([#1352](https://github.com/getsentry/relay/pull/1352)) ## 22.7.0 +- Change to the internals of the healthcheck endpoint. ([#1349](https://github.com/getsentry/relay/pull/1349)) **Features**: From dfd863191993c9f2834688e5a7070f794a2220b0 Mon Sep 17 00:00:00 2001 From: tobias-wilfert Date: Fri, 29 Jul 2022 09:34:46 +0200 Subject: [PATCH 15/19] Moved entry to corrrect position --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index cc6fa347d5..431763bc00 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,13 +12,13 @@ - Refactor profile processing into its own crate. ([#1340](https://github.com/getsentry/relay/pull/1340)) - Treat "unknown" transaction source as low cardinality for safe SDKs. ([#1352](https://github.com/getsentry/relay/pull/1352), [#1356](https://github.com/getsentry/relay/pull/1356)) - Conditionally write a default transaction source to the transaction payload. ([#1354](https://github.com/getsentry/relay/pull/1354)) +- Change to the internals of the healthcheck endpoint. ([#1349](https://github.com/getsentry/relay/pull/1349)) **Bug Fixes**: - Fix a bug where unreal crash reports were dropped when metrics extraction is enabled. ([#1355](https://github.com/getsentry/relay/pull/1355)) ## 22.7.0 -- Change to the internals of the healthcheck endpoint. ([#1349](https://github.com/getsentry/relay/pull/1349)) **Features**: From d76f35c85bc64f4aee2196a22763ecb02af86290 Mon Sep 17 00:00:00 2001 From: tobias-wilfert Date: Fri, 29 Jul 2022 11:33:42 +0200 Subject: [PATCH 16/19] Addressed comments --- relay-server/src/endpoints/healthcheck.rs | 8 ++++---- relay-server/src/service.rs | 1 + relay-system/Cargo.toml | 2 +- relay-system/src/controller.rs | 4 +--- 4 files changed, 7 insertions(+), 8 deletions(-) diff --git a/relay-server/src/endpoints/healthcheck.rs b/relay-server/src/endpoints/healthcheck.rs index c46a649552..66a57ab92e 100644 --- a/relay-server/src/endpoints/healthcheck.rs +++ b/relay-server/src/endpoints/healthcheck.rs @@ -2,7 +2,7 @@ use ::actix::prelude::*; use actix_web::{Error, HttpResponse}; use futures::prelude::*; -use futures03::TryFutureExt; +use futures03::{FutureExt, TryFutureExt}; use serde::Serialize; use crate::service::ServiceApp; @@ -33,14 +33,14 @@ impl HealthcheckResponse { } fn healthcheck_impl(message: IsHealthy) -> ResponseFuture { - let fut = async move { + let fut = async { let addr = Healthcheck::from_registry(); addr.send(message.into()).await }; - let bp = Box::pin(fut); Box::new( - bp.compat() + fut.boxed_local() + .compat() .map_err(|_| ()) .and_then(move |is_healthy| { if !is_healthy { diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index f67ae0ad6a..a53bb5955b 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -119,6 +119,7 @@ impl ServiceState { let registry = system.registry(); let runtime = tokio::runtime::Builder::new_multi_thread() + .worker_threads(1) .enable_all() .on_thread_start(clone!(system, || System::set_current(system.clone()))) .build() diff --git a/relay-system/Cargo.toml b/relay-system/Cargo.toml index 1522d3f95a..a01fe5f49c 100644 --- a/relay-system/Cargo.toml +++ b/relay-system/Cargo.toml @@ -15,4 +15,4 @@ failure = "0.1.8" futures = "0.1.28" futures03 = { version = "0.3", package = "futures", features = ["compat"] } relay-log = { path = "../relay-log" } -tokio = { version = "1.0", features = ["rt-multi-thread"] } # in sync with reqwest +tokio = { version = "1.0", features = ["rt-multi-thread"] } diff --git a/relay-system/src/controller.rs b/relay-system/src/controller.rs index a60ea1d311..34a1ad283f 100644 --- a/relay-system/src/controller.rs +++ b/relay-system/src/controller.rs @@ -132,8 +132,6 @@ impl Controller { // Send a shutdown signal to all registered subscribers (including self). They will report // when the shutdown has completed. Note that we ignore all errors to make sure that we // don't cancel the shutdown of other actors if one actor fails. - - // Send the message self.shutdown_sender.send(Some(Shutdown { timeout })).ok(); let futures: Vec<_> = self @@ -183,7 +181,7 @@ impl fmt::Debug for Controller { f.debug_struct("Controller") .field("timeout", &self.timeout) .field("subscribers", &self.subscribers.len()) - .finish() // TODO(tobias): Add the new fields + .finish() } } From 52a7459a284947b9aaa3800c781556a1f377d667 Mon Sep 17 00:00:00 2001 From: tobias-wilfert Date: Tue, 2 Aug 2022 11:41:08 +0200 Subject: [PATCH 17/19] Added fix for legacy Actor message sending issue --- Cargo.lock | 2 + relay-server/src/actors/healthcheck.rs | 15 +++---- relay-system/Cargo.toml | 2 + relay-system/src/compat.rs | 58 ++++++++++++++++++++++++++ relay-system/src/controller.rs | 5 +++ relay-system/src/lib.rs | 1 + 6 files changed, 73 insertions(+), 10 deletions(-) create mode 100644 relay-system/src/compat.rs diff --git a/Cargo.lock b/Cargo.lock index 2ac0b306a4..0e593cd59c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3592,7 +3592,9 @@ dependencies = [ "failure", "futures 0.1.31", "futures 0.3.21", + "lazy_static", "relay-log", + "tokio 0.1.22", "tokio 1.19.2", ] diff --git a/relay-server/src/actors/healthcheck.rs b/relay-server/src/actors/healthcheck.rs index a816727325..93b7e4198c 100644 --- a/relay-server/src/actors/healthcheck.rs +++ b/relay-server/src/actors/healthcheck.rs @@ -1,14 +1,13 @@ use std::sync::Arc; use actix::SystemService; -use futures03::compat::Future01CompatExt; use parking_lot::RwLock; use tokio::sync::{mpsc, oneshot}; use relay_config::{Config, RelayMode}; use relay_metrics::{AcceptsMetrics, Aggregator}; use relay_statsd::metric; -use relay_system::{Controller, Shutdown}; +use relay_system::{compat, Controller, Shutdown}; use crate::actors::upstream::{IsAuthenticated, IsNetworkOutage, UpstreamRelay}; use crate::statsd::RelayGauges; @@ -84,7 +83,7 @@ impl Healthcheck { let upstream = UpstreamRelay::from_registry(); if self.config.relay_mode() == RelayMode::Managed { - let fut = upstream.send(IsNetworkOutage).compat(); + let fut = compat::send(upstream.clone(), IsNetworkOutage); tokio::spawn(async move { if let Ok(is_outage) = fut.await { metric!(gauge(RelayGauges::NetworkOutage) = if is_outage { 1 } else { 0 }); @@ -93,25 +92,21 @@ impl Healthcheck { } match message { - IsHealthy::Liveness => true, // Liveness always returns true + IsHealthy::Liveness => true, IsHealthy::Readiness => { if self.is_shutting_down { return false; } if self.config.requires_auth() - && !upstream - .send(IsAuthenticated) - .compat() + && !compat::send(upstream, IsAuthenticated) .await .unwrap_or(false) { return false; } - Aggregator::from_registry() - .send(AcceptsMetrics) - .compat() + compat::send(Aggregator::from_registry(), AcceptsMetrics) .await .unwrap_or(false) } diff --git a/relay-system/Cargo.toml b/relay-system/Cargo.toml index a01fe5f49c..e37d03c0c8 100644 --- a/relay-system/Cargo.toml +++ b/relay-system/Cargo.toml @@ -14,5 +14,7 @@ actix = "0.7.9" failure = "0.1.8" futures = "0.1.28" futures03 = { version = "0.3", package = "futures", features = ["compat"] } +lazy_static = "1.4.0" relay-log = { path = "../relay-log" } tokio = { version = "1.0", features = ["rt-multi-thread"] } +tokio01 = { version = "0.1", package = "tokio" } diff --git a/relay-system/src/compat.rs b/relay-system/src/compat.rs new file mode 100644 index 0000000000..aa5ab3adef --- /dev/null +++ b/relay-system/src/compat.rs @@ -0,0 +1,58 @@ +//! Compatibility layer for bridging between `actix` and `tokio` 1.0. +//! +//! Needed to allow services to communicate with the legacy actors (using `actix` 0.7.9). +//! The problem that is addressed arises when the queues of the channels fill up and tasks are +//! getting parked. When that happens `task::current()` is executed which panics. + +use actix::prelude::*; +use futures::prelude::*; +use futures03::channel::oneshot; + +lazy_static::lazy_static! { + /// Needed so that the System is running by the time RUNTIME is first initialized. + static ref ACTIX_SYSTEM: System = System::current(); + + /// Custom `tokio` 0.1 runtime. + /// + /// Needed when sending messages from the new Actors to old Actors to avoid panics when the + /// channel queues fill up and tasks are being parked. + static ref RUNTIME: tokio01::runtime::Runtime = tokio01::runtime::Builder::new() + .core_threads(1) + .blocking_threads(1) + .after_start(|| System::set_current(ACTIX_SYSTEM.clone())) + .build() + .unwrap(); + + /// Needed so that `EXECUTOR.spawn` can be called [`send`]. + static ref EXECUTOR: tokio01::runtime::TaskExecutor = RUNTIME.executor(); +} + +/// Initializes the compatibility layer. +pub(crate) fn init() { + lazy_static::initialize(&ACTIX_SYSTEM); + lazy_static::initialize(&EXECUTOR); +} + +/// Sends a message to an actor using `actix` 0.7 from a `tokio` 1.0 context. +/// +/// The message is internally forwarded through a `tokio` 0.1 runtime in order to avoid panics when +/// the channel queues fill up and tasks are getting parked. +/// +/// # Panics +/// +/// Panics if this is invoked outside of the [`Controller`](crate::Controller). +pub async fn send(addr: Addr, msg: M) -> Result +where + A: Actor + Send, // NOTE: Addr only implements Send if the actor does + M: Message + Send + 'static, + M::Result: Send, + A: Handler, + A::Context: dev::ToEnvelope, +{ + let (tx, rx) = oneshot::channel(); + let f = futures::future::lazy(move || addr.send(msg)) + .then(|res| tx.send(res)) + .map_err(|_| ()); + EXECUTOR.spawn(f); + rx.await.map_err(|_| MailboxError::Closed)? +} diff --git a/relay-system/src/controller.rs b/relay-system/src/controller.rs index 34a1ad283f..ad3538ec00 100644 --- a/relay-system/src/controller.rs +++ b/relay-system/src/controller.rs @@ -12,6 +12,8 @@ use tokio::sync::watch; #[doc(inline)] pub use actix::actors::signal::{Signal, SignalType}; +use crate::compat; + /// Actor to start and gracefully stop an actix system. /// /// This actor contains a static `run` method which will run an actix system and block the current @@ -81,6 +83,9 @@ impl Controller { { let sys = System::new("relay"); + // TODO + compat::init(); + // Run the factory and exit early if an error happens. The return value of the factory is // discarded for convenience, to allow shorthand notations. factory()?; diff --git a/relay-system/src/lib.rs b/relay-system/src/lib.rs index 112194c453..c115a7eb2b 100644 --- a/relay-system/src/lib.rs +++ b/relay-system/src/lib.rs @@ -13,6 +13,7 @@ html_favicon_url = "https://raw.githubusercontent.com/getsentry/relay/master/artwork/relay-icon.png" )] +pub mod compat; mod controller; pub use self::controller::*; From 41838a84e03117ae28f466de476d18c00783a8c1 Mon Sep 17 00:00:00 2001 From: tobias-wilfert Date: Tue, 2 Aug 2022 11:44:20 +0200 Subject: [PATCH 18/19] Updated changelog --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 431763bc00..53fdd1d262 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,7 +12,7 @@ - Refactor profile processing into its own crate. ([#1340](https://github.com/getsentry/relay/pull/1340)) - Treat "unknown" transaction source as low cardinality for safe SDKs. ([#1352](https://github.com/getsentry/relay/pull/1352), [#1356](https://github.com/getsentry/relay/pull/1356)) - Conditionally write a default transaction source to the transaction payload. ([#1354](https://github.com/getsentry/relay/pull/1354)) -- Change to the internals of the healthcheck endpoint. ([#1349](https://github.com/getsentry/relay/pull/1349)) +- Change to the internals of the healthcheck endpoint. ([#1374](https://github.com/getsentry/relay/pull/1374)) **Bug Fixes**: From eac0ee1815734469b4eb7b997752841a24401a86 Mon Sep 17 00:00:00 2001 From: tobias-wilfert Date: Tue, 2 Aug 2022 11:48:00 +0200 Subject: [PATCH 19/19] Removed TODO --- relay-system/src/controller.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/relay-system/src/controller.rs b/relay-system/src/controller.rs index ad3538ec00..331a6f56a2 100644 --- a/relay-system/src/controller.rs +++ b/relay-system/src/controller.rs @@ -83,7 +83,6 @@ impl Controller { { let sys = System::new("relay"); - // TODO compat::init(); // Run the factory and exit early if an error happens. The return value of the factory is