ref(actix): Update Healthcheck Actor [INGEST-1481] #1374

Merged
merged 23 commits into from
Aug 2, 2022
Commits
c248880
Pair programming session, getting started
tobias-wilfert Jul 14, 2022
4da1f67
Partially updated the Healthcheck actor
tobias-wilfert Jul 21, 2022
4571eb0
Healthcheck wip
tobias-wilfert Jul 22, 2022
f49d362
Cleaned up code
tobias-wilfert Jul 25, 2022
6e70355
Added the copie flag again
tobias-wilfert Jul 25, 2022
91efb1a
Possible fix for registry
tobias-wilfert Jul 27, 2022
7a2a5d3
removed unused imports
tobias-wilfert Jul 27, 2022
e0b2578
Cleanedup service.rs
tobias-wilfert Jul 27, 2022
c2ca319
Cleaned up healthchecck.rs
tobias-wilfert Jul 27, 2022
26756fa
Cleanedup controller.rs
tobias-wilfert Jul 27, 2022
6ee55da
Addressed comments on the PR
tobias-wilfert Jul 28, 2022
1b5a34a
Adressed comments left on PR
tobias-wilfert Jul 28, 2022
6f10265
Addressed some more comments
tobias-wilfert Jul 28, 2022
89d975f
Merge branch 'master' into tobias-wilfert/future-proofing-relay
tobias-wilfert Jul 28, 2022
8b51aaa
Added entry to changelog
tobias-wilfert Jul 29, 2022
38d7cde
Merge branch 'master' into tobias-wilfert/future-proofing-relay
tobias-wilfert Jul 29, 2022
dfd8631
Moved entry to corrrect position
tobias-wilfert Jul 29, 2022
d76f35c
Addressed comments
tobias-wilfert Jul 29, 2022
5d5b050
Merge branch 'master' into tobias-wilfert/future-proofing-relay
tobias-wilfert Aug 1, 2022
6542a7b
Merge branch 'master' into tobias-wilfert/future-proofing-relay
tobias-wilfert Aug 2, 2022
52a7459
Added fix for legacy Actor message sending issue
tobias-wilfert Aug 2, 2022
41838a8
Updated changelog
tobias-wilfert Aug 2, 2022
eac0ee1
Removed TODO
tobias-wilfert Aug 2, 2022
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -12,6 +12,7 @@
 - Refactor profile processing into its own crate. ([#1340](https://github.com/getsentry/relay/pull/1340))
 - Treat "unknown" transaction source as low cardinality for safe SDKs. ([#1352](https://github.com/getsentry/relay/pull/1352), [#1356](https://github.com/getsentry/relay/pull/1356))
 - Conditionally write a default transaction source to the transaction payload. ([#1354](https://github.com/getsentry/relay/pull/1354))
+- Change to the internals of the healthcheck endpoint. ([#1374](https://github.com/getsentry/relay/pull/1374))

 **Bug Fixes**:

8 changes: 6 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

205 changes: 139 additions & 66 deletions relay-server/src/actors/healthcheck.rs
@@ -1,59 +1,162 @@
 use std::sync::Arc;

-use actix::prelude::*;
-
-use futures::future;
-use futures::prelude::*;
+use actix::SystemService;
+use parking_lot::RwLock;
+use tokio::sync::{mpsc, oneshot};

 use relay_config::{Config, RelayMode};
 use relay_metrics::{AcceptsMetrics, Aggregator};
 use relay_statsd::metric;
-use relay_system::{Controller, Shutdown};
+use relay_system::{compat, Controller, Shutdown};

 use crate::actors::upstream::{IsAuthenticated, IsNetworkOutage, UpstreamRelay};
 use crate::statsd::RelayGauges;

+lazy_static::lazy_static! {
+    /// Singleton of the `Healthcheck` service.
+    static ref ADDRESS: RwLock<Option<Addr<HealthcheckMessage>>> = RwLock::new(None);
+}
+
+/// Internal wrapper of a message sent through an `Addr` with return channel.
+#[derive(Debug)]
+struct Message<T> {
+    data: T,
+    // TODO(tobias): This is hard-coded to return `bool`.
+    responder: oneshot::Sender<bool>,
+}
+
+/// An error when [sending](Addr::send) a message to a service fails.
+pub struct SendError;
+
+/// Channel for sending public messages into a service.
+///
+/// To send a message, use [`Addr::send`].
+#[derive(Clone, Debug)]
+pub struct Addr<T> {
+    tx: mpsc::UnboundedSender<Message<T>>,
+}
+
+impl<T> Addr<T> {
+    /// Sends an asynchronous message to the service and waits for the response.
+    ///
+    /// The result of the message does not have to be awaited. The message will be delivered and
+    /// handled regardless. The communication channel with the service is unbounded, so backlogs
+    /// could occur when sending too many messages.
+    ///
+    /// Sending the message can fail with `Err(SendError)` if the service has shut down.
+    pub async fn send(&self, data: T) -> Result<bool, SendError> {
+        let (responder, rx) = oneshot::channel();
+        let message = Message { data, responder };
+        self.tx.send(message).map_err(|_| SendError)?;
+        rx.await.map_err(|_| SendError)
+    }
+}
+
 pub struct Healthcheck {
     is_shutting_down: bool,
     config: Arc<Config>,
 }

 impl Healthcheck {
+    /// Returns the [`Addr`] of the [`Healthcheck`] actor.
+    ///
+    /// Prior to using this, the service must be started using [`Healthcheck::start`].
+    ///
+    /// # Panics
+    ///
+    /// Panics if the service was not started using [`Healthcheck::start`] prior to this being used.
+    pub fn from_registry() -> Addr<HealthcheckMessage> {
+        ADDRESS.read().as_ref().unwrap().clone()
+    }
+
+    /// Creates a new instance of the Healthcheck service.
+    ///
+    /// The service does not run. To run the service, use [`start`](Self::start).
     pub fn new(config: Arc<Config>) -> Self {
         Healthcheck {
             is_shutting_down: false,
             config,
         }
     }
-}

-impl Actor for Healthcheck {
-    type Context = Context<Self>;
+    async fn handle_is_healthy(&mut self, message: IsHealthy) -> bool {
+        let upstream = UpstreamRelay::from_registry();

-    fn started(&mut self, context: &mut Self::Context) {
-        Controller::subscribe(context.address());
-    }
-}
+        if self.config.relay_mode() == RelayMode::Managed {
+            let fut = compat::send(upstream.clone(), IsNetworkOutage);
+            tokio::spawn(async move {
+                if let Ok(is_outage) = fut.await {
+                    metric!(gauge(RelayGauges::NetworkOutage) = if is_outage { 1 } else { 0 });
+                }
+            });
+        }

-impl Supervised for Healthcheck {}
+        match message {
+            IsHealthy::Liveness => true,
+            IsHealthy::Readiness => {
+                if self.is_shutting_down {
+                    return false;
+                }

-impl SystemService for Healthcheck {}
+                if self.config.requires_auth()
+                    && !compat::send(upstream, IsAuthenticated)
+                        .await
+                        .unwrap_or(false)
+                {
+                    return false;
+                }

-impl Default for Healthcheck {
-    fn default() -> Self {
-        unimplemented!("register with the SystemRegistry instead")
+                compat::send(Aggregator::from_registry(), AcceptsMetrics)
+                    .await
+                    .unwrap_or(false)
+            }
+        }
     }
-}

-impl Handler<Shutdown> for Healthcheck {
-    type Result = Result<(), ()>;
-
-    fn handle(&mut self, _message: Shutdown, _context: &mut Self::Context) -> Self::Result {
+    fn handle_shutdown(&mut self) -> bool {
         self.is_shutting_down = true;
-        Ok(())
+        true // TODO(tobias): This should go away once messages are more generic
     }
+
+    async fn handle(&mut self, message: HealthcheckMessage) -> bool {
+        match message {
+            HealthcheckMessage::Health(message) => self.handle_is_healthy(message).await,
+            HealthcheckMessage::Shutdown => self.handle_shutdown(),
+        }
+    }
+
+    /// Start this service, returning an [`Addr`] for communication.
+    pub fn start(mut self) -> Addr<HealthcheckMessage> {
+        let (tx, mut rx) = mpsc::unbounded_channel::<Message<_>>();
+
+        let addr = Addr { tx };
+        *ADDRESS.write() = Some(addr.clone());
+
+        tokio::spawn(async move {
+            while let Some(message) = rx.recv().await {
+                // TODO(tobias): This does not allow for concurrent execution.
+                let response = self.handle(message.data).await;
+                message.responder.send(response).ok();
+            }
+        });
+
+        // Forward shutdown signals to the main message channel
+        let shutdown_addr = addr.clone();
+        tokio::spawn(async move {
+            let mut shutdown_rx = Controller::subscribe_v2().await;
+
+            while shutdown_rx.changed().await.is_ok() {
+                if shutdown_rx.borrow_and_update().is_some() {
+                    let _ = shutdown_addr.send(HealthcheckMessage::Shutdown);
+                }
+            }
+        });
+
+        addr
+    }
 }

+#[derive(Clone, Debug)]
 pub enum IsHealthy {
     /// Check if the Relay is alive at all.
     Liveness,
@@ -62,51 +165,21 @@ pub enum IsHealthy {
     Readiness,
 }

-impl Message for IsHealthy {
-    type Result = Result<bool, ()>;
+/// All the message types which can be sent to the [`Healthcheck`] actor.
+#[derive(Clone, Debug)]
+pub enum HealthcheckMessage {
+    Health(IsHealthy),
+    Shutdown,
 }

-impl Handler<IsHealthy> for Healthcheck {
-    type Result = ResponseFuture<bool, ()>;
-
-    fn handle(&mut self, message: IsHealthy, context: &mut Self::Context) -> Self::Result {
-        let upstream = UpstreamRelay::from_registry();
-
-        if self.config.relay_mode() == RelayMode::Managed {
-            upstream
-                .send(IsNetworkOutage)
-                .map_err(|_| ())
-                .map(|is_network_outage| {
-                    metric!(
-                        gauge(RelayGauges::NetworkOutage) = if is_network_outage { 1 } else { 0 }
-                    );
-                })
-                .into_actor(self)
-                .spawn(context);
-        }
-
-        match message {
-            IsHealthy::Liveness => Box::new(future::ok(true)),
-            IsHealthy::Readiness => {
-                if self.is_shutting_down {
-                    return Box::new(future::ok(false));
-                }
+impl From<Shutdown> for HealthcheckMessage {
+    fn from(_: Shutdown) -> Self {
+        HealthcheckMessage::Shutdown
+    }
+}

-                let is_aggregator_full = Aggregator::from_registry()
-                    .send(AcceptsMetrics)
-                    .map_err(|_| ());
-                let is_authenticated: Self::Result = if self.config.requires_auth() {
-                    Box::new(upstream.send(IsAuthenticated).map_err(|_| ()))
-                } else {
-                    Box::new(future::ok(true))
-                };
-
-                Box::new(
-                    is_aggregator_full
-                        .join(is_authenticated)
-                        .map(|(a, b)| a && b),
-                )
-            }
-        }
+impl From<IsHealthy> for HealthcheckMessage {
+    fn from(is_healthy: IsHealthy) -> Self {
+        Self::Health(is_healthy)
     }
 }
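The new `Addr`/`Message` pair in `healthcheck.rs` follows a request/response mailbox pattern: each request travels with its own return channel, and a single loop handles requests one at a time. The following is a minimal sketch of that pattern only, not the PR's implementation. It assumes blocking `std::sync::mpsc` channels and a thread in place of tokio's `mpsc`/`oneshot` and a spawned task, and the names `HealthMessage`, `Envelope`, and the free function `start` are hypothetical stand-ins.

```rust
use std::sync::mpsc;
use std::thread;

/// Messages understood by the toy service (hypothetical; mirrors the shape of `HealthcheckMessage`).
#[derive(Debug)]
enum HealthMessage {
    Readiness,
    Shutdown,
}

/// A request paired with its own return channel, similar in role to `Message<T>` in the diff.
struct Envelope {
    data: HealthMessage,
    responder: mpsc::Sender<bool>,
}

/// Error returned when the service loop is no longer running.
#[derive(Debug, PartialEq)]
struct SendError;

/// Cloneable handle to the service mailbox.
#[derive(Clone)]
struct Addr {
    tx: mpsc::Sender<Envelope>,
}

impl Addr {
    /// Sends a message and blocks until the service replies (assumption: blocking, not async).
    fn send(&self, data: HealthMessage) -> Result<bool, SendError> {
        let (responder, rx) = mpsc::channel();
        self.tx
            .send(Envelope { data, responder })
            .map_err(|_| SendError)?;
        rx.recv().map_err(|_| SendError)
    }
}

/// Starts the service loop on its own thread and returns its address.
fn start() -> Addr {
    let (tx, rx) = mpsc::channel::<Envelope>();
    thread::spawn(move || {
        let mut is_shutting_down = false;
        // Messages are handled strictly one at a time, in arrival order.
        for envelope in rx {
            let response = match envelope.data {
                HealthMessage::Readiness => !is_shutting_down,
                HealthMessage::Shutdown => {
                    is_shutting_down = true;
                    true
                }
            };
            // The caller may have stopped waiting; that case is ignored.
            envelope.responder.send(response).ok();
        }
    });
    Addr { tx }
}

fn main() {
    let addr = start();
    assert_eq!(addr.send(HealthMessage::Readiness), Ok(true));
    assert_eq!(addr.send(HealthMessage::Shutdown), Ok(true));
    assert_eq!(addr.send(HealthMessage::Readiness), Ok(false));
    println!("readiness after shutdown: {:?}", addr.send(HealthMessage::Readiness));
}
```

One difference worth keeping in mind: the sketch's `send` does its work eagerly, whereas an `async fn` such as `Addr::send` in the diff does nothing until the returned future is awaited, so a call of the form `let _ = addr.send(..)` without `.await` would not deliver a message.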
11 changes: 8 additions & 3 deletions relay-server/src/endpoints/healthcheck.rs
@@ -2,6 +2,7 @@
 use ::actix::prelude::*;
 use actix_web::{Error, HttpResponse};
 use futures::prelude::*;
+use futures03::{FutureExt, TryFutureExt};
 use serde::Serialize;

 use crate::service::ServiceApp;
@@ -32,11 +33,15 @@ impl HealthcheckResponse {
 }

 fn healthcheck_impl(message: IsHealthy) -> ResponseFuture<HttpResponse, Error> {
+    let fut = async {
+        let addr = Healthcheck::from_registry();
+        addr.send(message.into()).await
+    };
+
     Box::new(
-        Healthcheck::from_registry()
-            .send(message)
+        fut.boxed_local()
+            .compat()
             .map_err(|_| ())
-            .flatten()
             .and_then(move |is_healthy| {
                 if !is_healthy {
                     Err(())
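The endpoint can pass an `IsHealthy` value to `addr.send(message.into())` because the PR adds `From` impls that wrap public message types into the single `HealthcheckMessage` enum. A rough, self-contained sketch of that conversion follows. The enum and impl shapes are copied from the diff, while the unit struct `Shutdown` and the helper `describe` are hypothetical stand-ins introduced only for illustration.

```rust
/// Public health query, as in the diff.
#[derive(Clone, Debug, PartialEq)]
enum IsHealthy {
    Liveness,
    Readiness,
}

/// Stand-in for the shutdown signal type (assumption: a unit struct is enough here).
struct Shutdown;

/// Single message type accepted by the service, as in the diff.
#[derive(Clone, Debug, PartialEq)]
enum HealthcheckMessage {
    Health(IsHealthy),
    Shutdown,
}

impl From<IsHealthy> for HealthcheckMessage {
    fn from(is_healthy: IsHealthy) -> Self {
        Self::Health(is_healthy)
    }
}

impl From<Shutdown> for HealthcheckMessage {
    fn from(_: Shutdown) -> Self {
        HealthcheckMessage::Shutdown
    }
}

/// Hypothetical helper that accepts anything convertible into the service message.
fn describe(message: impl Into<HealthcheckMessage>) -> &'static str {
    match message.into() {
        HealthcheckMessage::Health(IsHealthy::Liveness) => "liveness check",
        HealthcheckMessage::Health(IsHealthy::Readiness) => "readiness check",
        HealthcheckMessage::Shutdown => "shutdown signal",
    }
}

fn main() {
    // `.into()` picks the matching `From` impl based on the target type.
    let message: HealthcheckMessage = IsHealthy::Readiness.into();
    assert_eq!(message, HealthcheckMessage::Health(IsHealthy::Readiness));

    println!("{}", describe(IsHealthy::Liveness));
    println!("{}", describe(Shutdown));
}
```

Keeping one enum per service means the mailbox channel needs only one element type, at the cost of every handler returning the same response type for now, which is what the `TODO(tobias)` comments in the diff point at.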
21 changes: 19 additions & 2 deletions relay-server/src/service.rs
@@ -8,6 +8,7 @@ use failure::{Backtrace, Context, Fail};
 use listenfd::ListenFd;

 use relay_aws_extension::AwsExtension;
+use relay_common::clone;
 use relay_config::Config;
 use relay_metrics::Aggregator;
 use relay_redis::RedisPool;
@@ -108,6 +109,7 @@ impl From<Context<ServerErrorKind>> for ServerError {
 #[derive(Clone)]
 pub struct ServiceState {
     config: Arc<Config>,
+    _runtime: Arc<tokio::runtime::Runtime>,
 }

 impl ServiceState {
@@ -116,6 +118,16 @@ impl ServiceState {
         let system = System::current();
         let registry = system.registry();

+        let runtime = tokio::runtime::Builder::new_multi_thread()
+            .worker_threads(1)
+            .enable_all()
+            .on_thread_start(clone!(system, || System::set_current(system.clone())))
+            .build()
+            .unwrap();
+
+        // Enter the tokio runtime so we can start spawning tasks from the outside.
+        let _guard = runtime.enter();
+
         let upstream_relay = UpstreamRelay::new(config.clone());
         registry.set(Arbiter::start(|_| upstream_relay));

@@ -136,7 +148,9 @@ impl ServiceState {

         let project_cache = ProjectCache::new(config.clone(), redis_pool).start();
         registry.set(project_cache.clone());
-        registry.set(Healthcheck::new(config.clone()).start());
+
+        Healthcheck::new(config.clone()).start(); // TODO(tobias): Registry is implicit
+
         registry.set(RelayCache::new(config.clone()).start());
         registry
             .set(Aggregator::new(config.aggregator_config(), project_cache.recipient()).start());
@@ -150,7 +164,10 @@ impl ServiceState {
             }
         }

-        Ok(ServiceState { config })
+        Ok(ServiceState {
+            config,
+            _runtime: Arc::new(runtime),
+        })
     }

     /// Returns an atomically counted reference to the config.
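The `// TODO(tobias): Registry is implicit` comment refers to `Healthcheck::start` storing its own address in a global slot as a side effect, so `service.rs` no longer calls `registry.set(..)` for it. Below is a small sketch of that implicit-registry idea under stated assumptions: it uses `std::sync::RwLock` in a plain `static` rather than `lazy_static` with `parking_lot`, the `Addr` here wraps a shared `Vec` instead of a channel, and `Service`, `pending`, and `try_from_registry` are hypothetical names.

```rust
use std::sync::{Arc, Mutex, RwLock};

/// Cloneable handle; the shared `Vec` stands in for a real message channel.
#[derive(Clone, Default)]
struct Addr {
    inbox: Arc<Mutex<Vec<String>>>,
}

impl Addr {
    /// Queues a message for the service.
    fn send(&self, message: &str) {
        self.inbox.lock().unwrap().push(message.to_owned());
    }

    /// Number of messages queued so far.
    fn pending(&self) -> usize {
        self.inbox.lock().unwrap().len()
    }
}

/// Process-wide slot holding the address of the running service, if any.
static ADDRESS: RwLock<Option<Addr>> = RwLock::new(None);

struct Service;

impl Service {
    /// Registers the service globally as a side effect and returns its address.
    fn start(self) -> Addr {
        let addr = Addr::default();
        *ADDRESS.write().unwrap() = Some(addr.clone());
        addr
    }

    /// Returns the registered address, or `None` if `start` has not run yet.
    fn try_from_registry() -> Option<Addr> {
        ADDRESS.read().unwrap().clone()
    }

    /// Panicking lookup, matching the documented behavior of `from_registry` in the diff.
    fn from_registry() -> Addr {
        Self::try_from_registry().expect("service was not started")
    }
}

fn main() {
    // Before `start`, the slot is empty and `from_registry` would panic.
    assert!(Service::try_from_registry().is_none());

    let addr = Service.start();
    addr.send("ping");

    // Any caller can now reach the same mailbox through the global slot.
    assert_eq!(Service::from_registry().pending(), 1);
    println!("pending messages: {}", Service::from_registry().pending());
}
```

The trade-off is an ordering requirement that the type system does not enforce: every `from_registry` call has to happen after `start`, which is why the doc comment in the diff spells out the panic.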
4 changes: 4 additions & 0 deletions relay-system/Cargo.toml
@@ -13,4 +13,8 @@ publish = false
 actix = "0.7.9"
 failure = "0.1.8"
 futures = "0.1.28"
+futures03 = { version = "0.3", package = "futures", features = ["compat"] }
+lazy_static = "1.4.0"
 relay-log = { path = "../relay-log" }
+tokio = { version = "1.0", features = ["rt-multi-thread"] }
+tokio01 = { version = "0.1", package = "tokio" }