Skip to content

Commit

Permalink
feat(server): Use a single tokio runtime, adjust processor task count
Browse files Browse the repository at this point in the history
  • Loading branch information
Dav1dde committed May 2, 2024
1 parent 6c51a00 commit 8c3d622
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 52 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
**Internal**:

- Add metrics extraction config to global config. ([#3490](https://github.com/getsentry/relay/pull/3490), [#3504](https://github.com/getsentry/relay/pull/3504))
- Adjust worker thread distribution of internal services. ([#3516](https://github.com/getsentry/relay/pull/3516))

## 24.4.2

Expand Down
10 changes: 2 additions & 8 deletions relay-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ use std::sync::Arc;
use relay_config::Config;
use relay_system::{Controller, Service};

use crate::service::{Runtimes, ServiceState};
use crate::service::ServiceState;
use crate::services::server::HttpServer;

/// Runs a relay web server and spawns all internal worker threads.
Expand All @@ -291,23 +291,17 @@ pub fn run(config: Config) -> anyhow::Result<()> {
// Creates the main runtime.
let main_runtime = crate::service::create_runtime("main-rt", config.cpu_concurrency());

// Create secondary service runtimes.
//
// Runtimes must not be dropped within other runtimes, so keep them alive here.
let runtimes = Runtimes::new(&config);

// Run the system and block until a shutdown signal is sent to this process. Inside, start a
// web server and run all relevant services. See the `actors` module documentation for more
// information on all services.
main_runtime.block_on(async {
Controller::start(config.shutdown_timeout());
let service = ServiceState::start(config.clone(), &runtimes)?;
let service = ServiceState::start(config.clone())?;
HttpServer::new(config, service.clone())?.start();
Controller::shutdown_handle().finished().await;
anyhow::Ok(())
})?;

drop(runtimes);
drop(main_runtime);

relay_log::info!("relay shutdown complete");
Expand Down
55 changes: 12 additions & 43 deletions relay-server/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ pub struct ServiceState {

impl ServiceState {
/// Starts all services and returns addresses to all of them.
pub fn start(config: Arc<Config>, runtimes: &Runtimes) -> Result<Self> {
let upstream_relay = UpstreamRelayService::new(config.clone()).start_in(&runtimes.upstream);
pub fn start(config: Arc<Config>) -> Result<Self> {
let upstream_relay = UpstreamRelayService::new(config.clone()).start();
let test_store = TestStoreService::new(config.clone()).start();

let redis_pool = match config.redis() {
Expand All @@ -117,9 +117,8 @@ impl ServiceState {
upstream_relay.clone(),
processor.clone(),
)?
.start_in(&runtimes.outcome);
let outcome_aggregator =
OutcomeAggregator::new(&config, outcome_producer.clone()).start_in(&runtimes.outcome);
.start();
let outcome_aggregator = OutcomeAggregator::new(&config, outcome_producer.clone()).start();

let global_config = GlobalConfigService::new(config.clone(), upstream_relay.clone());
let global_config_handle = global_config.handle();
Expand All @@ -135,7 +134,7 @@ impl ServiceState {
config.secondary_aggregator_configs().clone(),
Some(project_cache.clone().recipient()),
)
.start_in(&runtimes.aggregator);
.start();

let metric_stats = MetricStats::new(
config.clone(),
Expand All @@ -144,18 +143,18 @@ impl ServiceState {
);

#[cfg(feature = "processing")]
let store = match runtimes.store {
Some(ref rt) => Some(
let store = config
.processing_enabled()
.then(|| {
StoreService::create(
config.clone(),
global_config_handle.clone(),
outcome_aggregator.clone(),
metric_stats.clone(),
)?
.start_in(rt),
),
None => None,
};
)
.map(|s| s.start())
})
.transpose()?;

let cogs = CogsService::new(
&config,
Expand Down Expand Up @@ -197,7 +196,6 @@ impl ServiceState {
upstream_relay.clone(),
global_config.clone(),
);
let guard = runtimes.project.enter();
ProjectCacheService::new(
config.clone(),
buffer_guard.clone(),
Expand All @@ -206,7 +204,6 @@ impl ServiceState {
redis_pool,
)
.spawn_handler(project_cache_rx);
drop(guard);

let health_check = HealthCheckService::new(
config.clone(),
Expand Down Expand Up @@ -306,34 +303,6 @@ impl ServiceState {
}
}

/// Contains secondary service runtimes.
///
/// Each field holds a dedicated runtime that specific internal services are
/// spawned onto via `start_in`; the struct must be kept alive for as long as
/// those services run (the caller drops it only after shutdown).
#[derive(Debug)]
pub struct Runtimes {
    /// Runtime the upstream relay service is started on.
    upstream: Runtime,
    /// Runtime entered while spawning the project cache service handler.
    project: Runtime,
    /// Runtime the metrics aggregator service is started on.
    aggregator: Runtime,
    /// Runtime the outcome producer and outcome aggregator are started on.
    outcome: Runtime,
    /// Runtime for the store service; only created when processing is
    /// enabled in the configuration (hence `Option`).
    #[cfg(feature = "processing")]
    store: Option<Runtime>,
}

impl Runtimes {
    /// Creates the secondary runtimes required by services.
    ///
    /// All fixed runtimes get two worker threads; the store runtime is
    /// created with a single thread, and only when the `processing` feature
    /// is compiled in and processing is enabled in `config`.
    // `config` is unused when the `processing` feature is disabled.
    #[allow(unused_variables)]
    pub fn new(config: &Config) -> Self {
        // Decide on the optional store runtime up front so the struct
        // literal below is a plain field list.
        #[cfg(feature = "processing")]
        let store = if config.processing_enabled() {
            Some(create_runtime("store-rt", 1))
        } else {
            None
        };

        Self {
            upstream: create_runtime("upstream-rt", 2),
            project: create_runtime("project-rt", 2),
            aggregator: create_runtime("aggregator-rt", 2),
            outcome: create_runtime("outcome-rt", 2),
            #[cfg(feature = "processing")]
            store,
        }
    }
}

#[axum::async_trait]
impl FromRequestParts<Self> for ServiceState {
type Rejection = Infallible;
Expand Down
9 changes: 8 additions & 1 deletion relay-server/src/services/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2627,7 +2627,14 @@ impl Service for EnvelopeProcessorService {
type Interface = EnvelopeProcessor;

fn spawn_handler(self, mut rx: relay_system::Receiver<Self::Interface>) {
let thread_count = self.inner.config.cpu_concurrency();
// Adjust thread count for small cpu counts to not have too many idle cores
// and distribute workload better.
let thread_count = match self.inner.config.cpu_concurrency() {
0 | 1 => 1,
2 | 3 => 2,
4 => 3,
conc => conc - 2,
};
relay_log::info!("starting {thread_count} envelope processing workers");

tokio::spawn(async move {
Expand Down

0 comments on commit 8c3d622

Please sign in to comment.