Skip to content

Commit

Permalink
ref(processor): Switch processor to a rayon thread pool
Browse files Browse the repository at this point in the history
  • Loading branch information
Dav1dde committed Jul 18, 2024
1 parent f9c8227 commit 23da3aa
Show file tree
Hide file tree
Showing 8 changed files with 49 additions and 15 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@

## Unreleased

**Features**
**Features**:

- "Cardinality limit" outcomes now report which limit was exceeded. ([#3825](https://github.com/getsentry/relay/pull/3825))

**Internal**:

- Use a dedicated thread pool for CPU intensive workloads. ([#3833](https://github.com/getsentry/relay/pull/3833))

## 24.7.0

**Bug Fixes**:
Expand Down
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ quote = "1.0.2"
r2d2 = "0.8.10"
rand = "0.8.5"
rand_pcg = "0.3.1"
rayon = "1.10"
rdkafka = "0.29.0"
rdkafka-sys = "4.3.0"
# Git revision until https://github.com/redis-rs/redis-rs/pull/1097 (merged) and https://github.com/redis-rs/redis-rs/pull/1253 are released.
Expand Down
1 change: 1 addition & 0 deletions relay-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ multer = { workspace = true }
once_cell = { workspace = true }
pin-project-lite = { workspace = true }
rand = { workspace = true }
rayon = { workspace = true }
regex = { workspace = true }
relay-auth = { workspace = true }
relay-base-schema = { workspace = true }
Expand Down
20 changes: 20 additions & 0 deletions relay-server/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::services::stats::RelayStats;
use anyhow::{Context, Result};
use axum::extract::FromRequestParts;
use axum::http::request::Parts;
use rayon::ThreadPool;
use relay_cogs::Cogs;
use relay_config::{Config, RedisConnection};
use relay_redis::RedisPool;
Expand Down Expand Up @@ -81,6 +82,24 @@ pub fn create_runtime(name: &str, threads: usize) -> Runtime {
.unwrap()
}

fn create_processor_pool(config: &Config) -> Result<ThreadPool> {
// Adjust thread count for small cpu counts to not have too many idle cores
// and distribute workload better.
let thread_count = match config.cpu_concurrency() {
conc @ 0..=2 => conc.max(1),
conc @ 3..=4 => conc - 1,
conc => conc - 2,
};
relay_log::info!("starting {thread_count} envelope processing workers");

let pool = crate::utils::ThreadPoolBuilder::new("processor")
.num_threads(thread_count)
.runtime(tokio::runtime::Handle::current())
.build()?;

Ok(pool)
}

#[derive(Debug)]
struct StateInner {
config: Arc<Config>,
Expand Down Expand Up @@ -171,6 +190,7 @@ impl ServiceState {
let cogs = Cogs::new(CogsServiceRecorder::new(&config, cogs.start()));

EnvelopeProcessorService::new(
create_processor_pool(&config)?,
config.clone(),
global_config_handle,
cogs,
Expand Down
18 changes: 6 additions & 12 deletions relay-server/src/services/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ use crate::services::upstream::{
};
use crate::statsd::{RelayCounters, RelayHistograms, RelayTimers};
use crate::utils::{
self, InvalidProcessingGroupType, ManagedEnvelope, SamplingResult, TypedEnvelope,
self, InvalidProcessingGroupType, ManagedEnvelope, SamplingResult, ThreadPool, TypedEnvelope,
};
use crate::{http, metrics};

Expand Down Expand Up @@ -1113,6 +1113,7 @@ impl Default for Addrs {
}

struct InnerProcessor {
pool: ThreadPool,
config: Arc<Config>,
global_config: GlobalConfigHandle,
cogs: Cogs,
Expand All @@ -1132,6 +1133,7 @@ struct InnerProcessor {
impl EnvelopeProcessorService {
/// Creates a multi-threaded envelope processor.
pub fn new(
pool: ThreadPool,
config: Arc<Config>,
global_config: GlobalConfigHandle,
cogs: Cogs,
Expand All @@ -1150,6 +1152,7 @@ impl EnvelopeProcessorService {
});

let inner = InnerProcessor {
pool,
global_config,
cogs,
#[cfg(feature = "processing")]
Expand Down Expand Up @@ -2811,18 +2814,9 @@ impl Service for EnvelopeProcessorService {
type Interface = EnvelopeProcessor;

fn spawn_handler(self, mut rx: relay_system::Receiver<Self::Interface>) {
// Adjust thread count for small cpu counts to not have too many idle cores
// and distribute workload better.
let thread_count = match self.inner.config.cpu_concurrency() {
conc @ 0..=2 => conc.max(1),
conc @ 3..=4 => conc - 1,
conc => conc - 2,
};
relay_log::info!("starting {thread_count} envelope processing workers");
let semaphore = Arc::new(Semaphore::new(self.inner.pool.current_num_threads()));

tokio::spawn(async move {
let semaphore = Arc::new(Semaphore::new(thread_count));

loop {
let next_msg = async {
let permit_result = semaphore.clone().acquire_owned().await;
Expand All @@ -2837,7 +2831,7 @@ impl Service for EnvelopeProcessorService {

(Some(message), Ok(permit)) = next_msg => {
let service = self.clone();
tokio::task::spawn_blocking(move || {
self.inner.pool.spawn(move || {
service.handle_message(message);
drop(permit);
});
Expand Down
11 changes: 11 additions & 0 deletions relay-server/src/testutils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::services::outcome::TrackOutcome;
use crate::services::processor::{self, EnvelopeProcessorService};
use crate::services::project::ProjectState;
use crate::services::test_store::TestStore;
use crate::utils::{ThreadPool, ThreadPoolBuilder};

pub fn state_with_rule_and_condition(
sample_rate: Option<f64>,
Expand Down Expand Up @@ -138,6 +139,7 @@ pub fn create_test_processor(config: Config) -> EnvelopeProcessorService {

let config = Arc::new(config);
EnvelopeProcessorService::new(
create_processor_pool(),
Arc::clone(&config),
GlobalConfigHandle::fixed(Default::default()),
Cogs::noop(),
Expand Down Expand Up @@ -177,6 +179,7 @@ pub fn create_test_processor_with_addrs(

let config = Arc::new(config);
EnvelopeProcessorService::new(
create_processor_pool(),
Arc::clone(&config),
GlobalConfigHandle::fixed(Default::default()),
Cogs::noop(),
Expand All @@ -192,3 +195,11 @@ pub fn processor_services() -> (Addr<TrackOutcome>, Addr<TestStore>) {
let (test_store, _) = mock_service("test_store", (), |&mut (), _| {});
(outcome_aggregator, test_store)
}

fn create_processor_pool() -> ThreadPool {
ThreadPoolBuilder::new("processor")
.num_threads(1)
.runtime(tokio::runtime::Handle::current())
.build()
.unwrap()
}
2 changes: 2 additions & 0 deletions relay-server/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ mod sizes;
mod sleep_handle;
mod split_off;
mod statsd;
mod thread_pool;

#[cfg(feature = "processing")]
mod native;
Expand All @@ -38,5 +39,6 @@ pub use self::sizes::*;
pub use self::sleep_handle::*;
pub use self::split_off::*;
pub use self::statsd::*;
pub use self::thread_pool::*;
#[cfg(feature = "processing")]
pub use self::unreal::*;

0 comments on commit 23da3aa

Please sign in to comment.