Skip to content

Commit

Permalink
health check
Browse files Browse the repository at this point in the history
  • Loading branch information
jjbayer committed Nov 14, 2024
1 parent 0a28e44 commit d6bbdcc
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 15 deletions.
13 changes: 9 additions & 4 deletions relay-server/src/endpoints/health_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,18 @@ struct Status {
is_healthy: bool,
}

const UNHEALTHY: (hyper::StatusCode, axum::Json<Status>) = (
StatusCode::SERVICE_UNAVAILABLE,
axum::Json(Status { is_healthy: false }),
);

pub async fn handle(state: ServiceState, Path(kind): Path<IsHealthy>) -> impl IntoResponse {
if state.has_crashed() {
return UNHEALTHY;
}
match state.health_check().send(kind).await {
Ok(HealthStatus::Healthy) => (StatusCode::OK, axum::Json(Status { is_healthy: true })),
_ => (
StatusCode::SERVICE_UNAVAILABLE,
axum::Json(Status { is_healthy: false }),
),
_ => UNHEALTHY,
}
}

Expand Down
14 changes: 9 additions & 5 deletions relay-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,11 +305,15 @@ pub fn run(config: Config) -> anyhow::Result<()> {
runner.start(HttpServer::new(config, state.clone())?);

tokio::select! {
_ = runner.join() => {},
// NOTE: when every service implements a shutdown listener,
// awaiting on `finished` becomes unnecessary: We can simply join() and guarantee
// that every service finished its main task.
// See also https://github.com/getsentry/relay/issues/4050.
_ = runner.join(|e| {
if e.is_panic() {
state.report_crash();
}
// NOTE: when every service implements a shutdown listener,
// awaiting on `finished` becomes unnecessary: We can simply join() and guarantee
// that every service finished its main task.
// See also https://github.com/getsentry/relay/issues/4050.
}) => {},
_ = Controller::shutdown_handle().finished() => {}
}

Expand Down
19 changes: 19 additions & 0 deletions relay-server/src/service.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::convert::Infallible;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::time::Duration;

Expand Down Expand Up @@ -135,6 +136,7 @@ struct StateInner {
config: Arc<Config>,
memory_checker: MemoryChecker,
registry: Registry,
service_crashed: AtomicBool,
}

/// Server state.
Expand Down Expand Up @@ -333,6 +335,7 @@ impl ServiceState {
config: config.clone(),
memory_checker: MemoryChecker::new(memory_stat, config.clone()),
registry,
service_crashed: false.into(),
};

Ok((
Expand Down Expand Up @@ -409,6 +412,22 @@ impl ServiceState {
pub fn outcome_aggregator(&self) -> &Addr<TrackOutcome> {
&self.inner.registry.outcome_aggregator
}

/// Checks whether one of the services has crashed.
pub fn has_crashed(&self) -> bool {
self.inner
.service_crashed
.load(std::sync::atomic::Ordering::Relaxed)
}

/// Reports that one of the services has crashed.
///
/// Marks the entire service state as unhealthy.
pub fn report_crash(&self) {
self.inner
.service_crashed
.store(true, std::sync::atomic::Ordering::Relaxed);
}
}

fn create_redis_pool(redis_config: RedisConfigRef) -> Result<RedisPool, RedisError> {
Expand Down
9 changes: 3 additions & 6 deletions relay-system/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use futures::future::Shared;
use futures::stream::FuturesUnordered;
use futures::{FutureExt, StreamExt};
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;
use tokio::task::{JoinError, JoinHandle};
use tokio::time::MissedTickBehavior;

use crate::statsd::{SystemCounters, SystemGauges};
Expand Down Expand Up @@ -1060,13 +1060,10 @@ impl ServiceRunner {
/// Awaits until all services have finished.
///
/// Panics if one of the spawned services has panicked.
pub async fn join(&mut self) {
pub async fn join<F: Fn(JoinError)>(&mut self, error_handler: F) {
while let Some(res) = self.0.next().await {
if let Err(e) = res {
if e.is_panic() {
// Re-trigger panic to terminate the process:
std::panic::resume_unwind(e.into_panic());
}
error_handler(e);
}
}
}
Expand Down

0 comments on commit d6bbdcc

Please sign in to comment.