ref(server): Make health_check fully async #3567

Merged: 18 commits, May 13, 2024

Changes from 7 commits
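At a glance, the change moves health probing off the request path: a background task runs the probes on an interval and publishes the result through a tokio watch channel, and the endpoint handler only reads the most recently published status, so it responds promptly even when a probe hangs. A minimal sketch of that pattern (simplified, hypothetical names; not the actual Relay code):

```rust
use std::time::Duration;
use tokio::sync::watch;
use tokio::time::sleep;

#[derive(Clone, Copy, Debug)]
enum Status {
    Healthy,
    Unhealthy,
}

#[tokio::main]
async fn main() {
    let (tx, rx) = watch::channel(Status::Healthy);

    // Background task: run the (potentially slow) probes on an interval
    // and publish the latest result through the watch channel.
    tokio::spawn(async move {
        loop {
            let status = run_probes().await;
            let _ = tx.send(status);
            sleep(Duration::from_secs(10)).await;
        }
    });

    // Request handler: only reads the last published value, so it
    // answers immediately even if a probe is currently stuck.
    let current = *rx.borrow();
    println!("readiness: {current:?}");
}

async fn run_probes() -> Status {
    // Stand-in for the real probes (system memory, auth, aggregator, spool).
    Status::Healthy
}
```

The actual implementation in `relay-server/src/services/health_check.rs` below additionally records when the status was published and reports `Unhealthy` once it becomes stale.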
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -5,6 +5,7 @@
**Bug fixes:**

- Properly handle AI metrics from the Python SDK's `@ai_track` decorator ([#3539](https://github.com/getsentry/relay/pull/3539))
- Mitigate occasional slowness and timeouts of the healthcheck endpoint. The endpoint now responds promptly with an unhealthy state. ([#3567](https://github.com/getsentry/relay/pull/3567))

**Internal**:

2 changes: 1 addition & 1 deletion relay-server/src/endpoints/mod.rs
@@ -94,7 +94,7 @@ where
.route("/api/:project_id/minidump/", minidump::route(config))
.route("/api/:project_id/events/:event_id/attachments/", attachments::route(config))
.route("/api/:project_id/unreal/:sentry_key/", unreal::route(config))
.route("/api/:project_id/spans/", spans::route(config))
.route("/api/:project_id/spans/", spans::route(config))
.route_layer(middlewares::cors());

let router = Router::new();
205 changes: 83 additions & 122 deletions relay-server/src/services/health_check.rs
@@ -1,14 +1,15 @@
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, PoisonError};
use std::time::{Duration, Instant};
use std::sync::Arc;

use relay_config::{Config, RelayMode};
use relay_metrics::{AcceptsMetrics, Aggregator};
use relay_statsd::metric;
use relay_system::{Addr, AsyncResponse, Controller, FromMessage, Interface, Sender, Service};
use relay_system::{
Addr, AsyncResponse, Controller, FromMessage, Interface, Sender, Service, ShutdownHandle,
};
use std::future::Future;
use sysinfo::{MemoryRefreshKind, RefreshKind, System};
use tokio::time::timeout;
use sysinfo::{MemoryRefreshKind, System};
use tokio::sync::watch;
use tokio::time::{timeout, Instant};

use crate::services::project_cache::{ProjectCache, SpoolHealth};
use crate::services::upstream::{IsAuthenticated, IsNetworkOutage, UpstreamRelay};
@@ -26,16 +27,6 @@ pub enum IsHealthy {
Readiness,
}

impl IsHealthy {
/// Returns the name of the variant, either `liveness` or `healthy`.
fn variant(&self) -> &'static str {
match self {
Self::Liveness => "liveness",
Self::Readiness => "readiness",
}
}
}

/// Health check status.
#[derive(Debug, Copy, Clone)]
pub enum Status {
@@ -76,15 +67,31 @@ impl FromMessage<IsHealthy> for HealthCheck {
}
}

#[derive(Debug)]
struct Statuses {
live: Status,
ready: Status,
instant: Instant,
}

impl Statuses {
pub fn healthy() -> Self {
Self {
live: Status::Healthy,
ready: Status::Healthy,
instant: Instant::now(),
}
}
}

/// Service implementing the [`HealthCheck`] interface.
#[derive(Debug)]
pub struct HealthCheckService {
is_shutting_down: AtomicBool,
config: Arc<Config>,
aggregator: Addr<Aggregator>,
upstream_relay: Addr<UpstreamRelay>,
project_cache: Addr<ProjectCache>,
system: Mutex<SystemInfo>,
system: System,
}

impl HealthCheckService {
@@ -97,22 +104,29 @@ impl HealthCheckService {
upstream_relay: Addr<UpstreamRelay>,
project_cache: Addr<ProjectCache>,
) -> Self {
HealthCheckService {
is_shutting_down: AtomicBool::new(false),
system: Mutex::new(SystemInfo::new(config.health_sys_info_refresh_interval())),
config,
Self {
system: System::new(),
aggregator,
upstream_relay,
project_cache,
config,
}
}

async fn system_memory_probe(&self) -> Status {
let memory = {
self.system
.lock()
.unwrap_or_else(PoisonError::into_inner)
.memory()
fn system_memory_probe(&mut self) -> Status {
self.system
.refresh_memory_specifics(MemoryRefreshKind::new().with_ram());

// Use the cgroup if available in case Relay is running in a container.
let memory = match self.system.cgroup_limits() {
Some(cgroup) => Memory {
used: cgroup.total_memory.saturating_sub(cgroup.free_memory),
total: cgroup.total_memory,
},
None => Memory {
used: self.system.used_memory(),
total: self.system.total_memory(),
},
};

metric!(gauge(RelayGauges::SystemMemoryUsed) = memory.used);
@@ -182,127 +196,74 @@ impl HealthCheckService {
}
}

async fn handle_is_healthy(&self, message: IsHealthy) -> Status {
let upstream = self.upstream_relay.clone();

async fn check_readiness(&mut self, shutdown: &ShutdownHandle) -> Status {
// Internal metric that we need to be logged in recurring intervals. This is a form of
// health check, but it does not contribute to the status of this service.
if self.config.relay_mode() == RelayMode::Managed {
let fut = upstream.send(IsNetworkOutage);
tokio::spawn(async move {
if let Ok(is_outage) = fut.await {
metric!(gauge(RelayGauges::NetworkOutage) = u64::from(is_outage));
}
});
}

if matches!(message, IsHealthy::Liveness) {
return Status::Healthy;
if let Ok(is_outage) = self.upstream_relay.send(IsNetworkOutage).await {
metric!(gauge(RelayGauges::NetworkOutage) = u64::from(is_outage));
}
}

if self.is_shutting_down.load(Ordering::Relaxed) {
if shutdown.get().is_some() {
return Status::Unhealthy;
}

// System memory is sync and requires mutable access, but we still want to log errors.
let sys_mem = self.system_memory_probe();

let (sys_mem, auth, agg, proj) = tokio::join!(
self.probe("system memory", self.system_memory_probe()),
self.probe("system memory", async { sys_mem }),
self.probe("auth", self.auth_probe()),
self.probe("aggregator", self.aggregator_probe()),
self.probe("spool health", self.spool_health_probe()),
);

Status::from_iter([sys_mem, auth, agg, proj])
}

async fn handle_message(&self, message: HealthCheck) {
let HealthCheck(message, sender) = message;

let ty = message.variant();
let response = relay_statsd::metric!(
timer(RelayTimers::HealthCheckDuration),
type = ty,
{ self.handle_is_healthy(message).await }
);

sender.send(response);
}
}

impl Service for HealthCheckService {
type Interface = HealthCheck;

fn spawn_handler(self, mut rx: relay_system::Receiver<Self::Interface>) {
let service = Arc::new(self);
fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
let (status_tx, status_rx) = watch::channel(Statuses::healthy());
let check_interval = self.config.health_sys_info_refresh_interval();
// Add 10% buffer to the internal timeouts to avoid race conditions.
let status_timeout = (check_interval + self.config.health_probe_timeout()).mul_f64(1.1);

tokio::spawn(async move {
let mut shutdown = Controller::shutdown_handle();
let shutdown = Controller::shutdown_handle();

loop {
tokio::select! {
biased;

Some(message) = rx.recv() => {
let service = service.clone();
tokio::spawn(async move { service.handle_message(message).await });
}
_ = shutdown.notified() => {
service.is_shutting_down.store(true, Ordering::Relaxed);
}
}
let ready = relay_statsd::metric!(
timer(RelayTimers::HealthCheckDuration),
type = "readiness",
{ self.check_readiness(&shutdown).await }
);

let _ = status_tx.send(Statuses {
live: Status::Healthy,
ready,
instant: Instant::now(),
});

tokio::time::sleep(check_interval).await;
}
Member Author:

Note that in many other places we instead create a tokio::time::interval and then use tokio::select! with the shutdown handle and the interval. I opted for a different solution here as we don't need to stick to strict intervals and this version ended up being less code. The alternative would look like this:

let mut ticker = tokio::time::interval(check_interval);
let mut shutdown = Controller::shutdown_handle();

loop {
    let _ = update_tx.send(StatusUpdate::new(relay_statsd::metric!(
        timer(RelayTimers::HealthCheckDuration),
        type = "readiness",
        { self.check_readiness().await }
    )));

    tokio::select! {
        biased;
        _ = ticker.tick() => (),
        _ = shutdown.notified() => break,
    }
}

Member:

With the join!, if one health check times out, other unrelated health checks will be delayed as well IIUC. Unless the timeout is always lower than the check interval.

We could make this a sleep_until(next_check_time) to be a little more robust against timeouts spilling over the check interval.
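
A minimal, self-contained sketch of the suggested `sleep_until` variant (the hypothetical `run_probes` stands in for `check_readiness`; this is not part of the PR):

```rust
use std::time::Duration;
use tokio::time::{sleep_until, Instant};

#[tokio::main]
async fn main() {
    let check_interval = Duration::from_secs(10);

    loop {
        // Fix the next deadline before running the probes, so a probe
        // round that overruns does not push later rounds further back.
        let next_check = Instant::now() + check_interval;

        run_probes().await;

        // Sleeps only for the remainder of the interval; returns
        // immediately if the probes already overran the deadline.
        sleep_until(next_check).await;
    }
}

async fn run_probes() {
    // Stand-in for `check_readiness`; each probe is bounded by its own timeout.
}
```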

Member Author:

We can use the pattern from other places if you prefer, but please check #3567 (comment)

The validity timeout is the interval + poll timeout, so we'd never timeout prematurely.
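
For illustration only (hypothetical numbers, not the actual defaults): with a 10 s check interval and a 0.9 s probe timeout, the staleness threshold is (10 + 0.9) × 1.1 ≈ 12 s, while the worst-case gap between two status updates is about 10 + 0.9 = 10.9 s, so a status that is still being refreshed on schedule never expires prematurely.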

});
}
}

#[derive(Debug)]
struct SystemInfo {
system: System,
last_refresh: Instant,
refresh_interval: Duration,
}

impl SystemInfo {
/// Creates a new [`SystemInfo`] to query system information.
///
/// System information updates are debounced with the passed `refresh_interval`.
pub fn new(refresh_interval: Duration) -> Self {
let system = System::new_with_specifics(
RefreshKind::new().with_memory(MemoryRefreshKind::everything()),
);

Self {
system,
last_refresh: Instant::now(),
refresh_interval,
}
}

/// Current snapshot of system memory.
///
/// On Linux systems it uses the cgroup limits to determine used and total memory,
/// if available.
pub fn memory(&mut self) -> Memory {
self.refresh();

// Use the cgroup if available in case Relay is running in a container.
if let Some(cgroup) = self.system.cgroup_limits() {
Memory {
used: cgroup.total_memory.saturating_sub(cgroup.free_memory),
total: cgroup.total_memory,
}
} else {
Memory {
used: self.system.used_memory(),
total: self.system.total_memory(),
tokio::spawn(async move {
while let Some(HealthCheck(message, sender)) = rx.recv().await {
let statuses = status_rx.borrow();
let is_valid = statuses.instant.elapsed() < status_timeout;

sender.send(match (is_valid, message) {
(false, _) => Status::Unhealthy,
(_, IsHealthy::Liveness) => statuses.live,
(_, IsHealthy::Readiness) => statuses.ready,
});
}
}
}

fn refresh(&mut self) {
if self.last_refresh.elapsed() >= self.refresh_interval {
self.system
.refresh_memory_specifics(MemoryRefreshKind::new().with_ram());

self.last_refresh = Instant::now();
}
});
}
}

5 changes: 5 additions & 0 deletions relay-system/src/controller.rs
@@ -52,6 +52,11 @@ static MANUAL_SHUTDOWN: Lazy<Channel<ShutdownMode>> = Lazy::new(|| watch::channe
pub struct ShutdownHandle(watch::Receiver<Option<Shutdown>>);

impl ShutdownHandle {
/// Returns the current shutdown state.
pub fn get(&self) -> Option<Shutdown> {
self.0.borrow().clone()
}

/// Wait for a shutdown.
///
/// This receives all shutdown signals since the [`Controller`] has been started, even before