feat(server): Use a single tokio runtime, adjust processor task count #3516

Merged 4 commits on May 6, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -5,6 +5,7 @@
**Internal**:

- Add metrics extraction config to global config. ([#3490](https://github.com/getsentry/relay/pull/3490), [#3504](https://github.com/getsentry/relay/pull/3504))
- Adjust worker thread distribution of internal services. ([#3516](https://github.com/getsentry/relay/pull/3516))
- Extract `cache.item_size` from measurements instead of data. ([#3510](https://github.com/getsentry/relay/pull/3510))
- Collect `enviornment` tag as part of exclusive_time_light for cache spans. ([#3510](https://github.com/getsentry/relay/pull/3510))

2 changes: 1 addition & 1 deletion relay-config/src/config.rs
@@ -586,7 +586,7 @@ struct Limits {
max_replay_message_size: ByteSize,
/// The maximum number of threads to spawn for CPU and web work, each.
///
/// The total number of threads spawned will roughly be `2 * max_thread_count + 1`. Defaults to
/// The total number of threads spawned will roughly be `2 * max_thread_count`. Defaults to
Member commented:

Can we add to this comment that Relay will use at most `max_thread_count` CPUs?

Member Author replied:

With the current setup this isn't guaranteed, right? If the runtime needs more processing power than expected, it can go over.

/// the number of logical CPU cores on the host.
max_thread_count: usize,
/// The maximum number of seconds a query is allowed to take across retries. Individual requests
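To make the updated doc comment concrete: under the single-runtime layout, the total is roughly the main runtime's workers plus the envelope-processor pool adjusted further down in this diff. A small illustrative sketch, not Relay's code, assuming a host with more than four cores (where the pool is `max_thread_count - 2`):

```rust
/// Rough thread budget under the single-runtime layout (illustrative only).
/// The main tokio runtime gets `max_thread_count` workers and the envelope
/// processor pool gets roughly `max_thread_count - 2` (see processor.rs below).
fn rough_thread_total(max_thread_count: usize) -> usize {
    let runtime_workers = max_thread_count;
    let processor_workers = max_thread_count.saturating_sub(2);
    runtime_workers + processor_workers
}

// rough_thread_total(8) == 14, i.e. roughly `2 * max_thread_count`.
```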
10 changes: 2 additions & 8 deletions relay-server/src/lib.rs
@@ -276,7 +276,7 @@ use std::sync::Arc;
use relay_config::Config;
use relay_system::{Controller, Service};

use crate::service::{Runtimes, ServiceState};
use crate::service::ServiceState;
use crate::services::server::HttpServer;

/// Runs a relay web server and spawns all internal worker threads.
@@ -291,23 +291,17 @@ pub fn run(config: Config) -> anyhow::Result<()> {
// Creates the main runtime.
let main_runtime = crate::service::create_runtime("main-rt", config.cpu_concurrency());

// Create secondary service runtimes.
//
// Runtimes must not be dropped within other runtimes, so keep them alive here.
let runtimes = Runtimes::new(&config);

// Run the system and block until a shutdown signal is sent to this process. Inside, start a
// web server and run all relevant services. See the `actors` module documentation for more
// information on all services.
main_runtime.block_on(async {
Controller::start(config.shutdown_timeout());
let service = ServiceState::start(config.clone(), &runtimes)?;
let service = ServiceState::start(config.clone())?;
HttpServer::new(config, service.clone())?.start();
Controller::shutdown_handle().finished().await;
anyhow::Ok(())
})?;

drop(runtimes);
drop(main_runtime);

relay_log::info!("relay shutdown complete");
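The helper behind `create_runtime("main-rt", ...)` is not part of this diff. A minimal sketch of what such a helper might look like, assuming tokio's `Builder` API (the real `crate::service::create_runtime` may differ):

```rust
use tokio::runtime::Runtime;

/// Builds a named multi-threaded tokio runtime with a fixed worker count.
/// Sketch only; not the actual helper from relay-server/src/service.rs.
pub fn create_runtime(name: &str, threads: usize) -> Runtime {
    tokio::runtime::Builder::new_multi_thread()
        .thread_name(name)
        .worker_threads(threads)
        .enable_all()
        .build()
        .expect("failed to build the tokio runtime")
}
```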
55 changes: 12 additions & 43 deletions relay-server/src/service.rs
@@ -96,8 +96,8 @@ pub struct ServiceState {

impl ServiceState {
/// Starts all services and returns addresses to all of them.
pub fn start(config: Arc<Config>, runtimes: &Runtimes) -> Result<Self> {
let upstream_relay = UpstreamRelayService::new(config.clone()).start_in(&runtimes.upstream);
pub fn start(config: Arc<Config>) -> Result<Self> {
let upstream_relay = UpstreamRelayService::new(config.clone()).start();
let test_store = TestStoreService::new(config.clone()).start();

let redis_pool = match config.redis() {
@@ -117,9 +117,8 @@ impl ServiceState {
upstream_relay.clone(),
processor.clone(),
)?
.start_in(&runtimes.outcome);
let outcome_aggregator =
OutcomeAggregator::new(&config, outcome_producer.clone()).start_in(&runtimes.outcome);
.start();
let outcome_aggregator = OutcomeAggregator::new(&config, outcome_producer.clone()).start();

let global_config = GlobalConfigService::new(config.clone(), upstream_relay.clone());
let global_config_handle = global_config.handle();
@@ -135,7 +134,7 @@ impl ServiceState {
config.secondary_aggregator_configs().clone(),
Some(project_cache.clone().recipient()),
)
.start_in(&runtimes.aggregator);
.start();

let metric_stats = MetricStats::new(
config.clone(),
@@ -144,18 +143,18 @@
);

#[cfg(feature = "processing")]
let store = match runtimes.store {
Some(ref rt) => Some(
let store = config
.processing_enabled()
.then(|| {
StoreService::create(
config.clone(),
global_config_handle.clone(),
outcome_aggregator.clone(),
metric_stats.clone(),
)?
.start_in(rt),
),
None => None,
};
)
.map(|s| s.start())
})
.transpose()?;

let cogs = CogsService::new(
&config,
@@ -197,7 +196,6 @@ impl ServiceState {
upstream_relay.clone(),
global_config.clone(),
);
let guard = runtimes.project.enter();
ProjectCacheService::new(
config.clone(),
buffer_guard.clone(),
@@ -206,7 +204,6 @@ impl ServiceState {
redis_pool,
)
.spawn_handler(project_cache_rx);
drop(guard);

let health_check = HealthCheckService::new(
config.clone(),
@@ -306,34 +303,6 @@ impl ServiceState {
}
}

/// Contains secondary service runtimes.
#[derive(Debug)]
pub struct Runtimes {
upstream: Runtime,
project: Runtime,
aggregator: Runtime,
outcome: Runtime,
#[cfg(feature = "processing")]
store: Option<Runtime>,
}

impl Runtimes {
/// Creates the secondary runtimes required by services.
#[allow(unused_variables)]
pub fn new(config: &Config) -> Self {
Self {
upstream: create_runtime("upstream-rt", 2),
project: create_runtime("project-rt", 2),
aggregator: create_runtime("aggregator-rt", 2),
outcome: create_runtime("outcome-rt", 2),
#[cfg(feature = "processing")]
store: config
.processing_enabled()
.then(|| create_runtime("store-rt", 1)),
}
}
}

#[axum::async_trait]
impl FromRequestParts<Self> for ServiceState {
type Rejection = Infallible;
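One detail from the store-service change above: the `match` on `runtimes.store` is replaced by `bool::then` plus `Option::transpose`, which builds and starts the service only when processing is enabled while still propagating construction errors. A standalone sketch of that shape (the `Store` type and its methods here are stand-ins, not Relay's API):

```rust
use std::io;

/// Stand-in service type for illustration; not Relay's StoreService.
struct Store;

impl Store {
    fn create() -> io::Result<Self> {
        Ok(Store)
    }
    fn start(self) -> Self {
        self
    }
}

/// Option<Result<T, E>> -> Result<Option<T>, E>: build and start the service
/// only when processing is enabled, bubbling up any construction error.
fn maybe_start_store(processing_enabled: bool) -> io::Result<Option<Store>> {
    processing_enabled
        .then(|| Store::create().map(|s| s.start()))
        .transpose()
}
```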
8 changes: 7 additions & 1 deletion relay-server/src/services/processor.rs
@@ -2627,7 +2627,13 @@ impl Service for EnvelopeProcessorService {
type Interface = EnvelopeProcessor;

fn spawn_handler(self, mut rx: relay_system::Receiver<Self::Interface>) {
let thread_count = self.inner.config.cpu_concurrency();
// Adjust thread count for small cpu counts to not have too many idle cores
// and distribute workload better.
let thread_count = match self.inner.config.cpu_concurrency() {
conc @ 0..=2 => conc.max(1),
conc @ 3..=4 => conc - 1,
conc => conc - 2,
};
relay_log::info!("starting {thread_count} envelope processing workers");

tokio::spawn(async move {
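For a quick sanity check on the new worker-count mapping, a small test mirroring the `match` above (the assertions follow directly from its arms):

```rust
#[test]
fn processor_worker_counts() {
    // Mirrors the match in spawn_handler: small hosts keep every core busy,
    // larger hosts leave one or two cores for the rest of the runtime.
    let workers = |conc: usize| match conc {
        c @ 0..=2 => c.max(1),
        c @ 3..=4 => c - 1,
        c => c - 2,
    };
    assert_eq!(workers(1), 1);
    assert_eq!(workers(2), 2);
    assert_eq!(workers(3), 2);
    assert_eq!(workers(4), 3);
    assert_eq!(workers(8), 6);
    assert_eq!(workers(16), 14);
}
```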