Skip to content

Commit

Permalink
chore(core): Drop use of num_cpus crate
Browse files Browse the repository at this point in the history
As of the release of Rust 1.61, [`std::thread::available_parallelism` now
takes cgroup quotas into account](rust-lang/rust#92697).
This puts `available_parallelism` into feature parity with `num_cpus`
for our purposes, allowing us to drop our use of this crate.
  • Loading branch information
bruceg committed Jun 16, 2022
1 parent 2fddd2b commit 27bec73
Show file tree
Hide file tree
Showing 8 changed files with 12 additions and 7 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,6 @@ nats = { version = "0.20.1", default-features = false, optional = true }
nkeys = { version = "0.2.0", default-features = false, optional = true }
nom = { version = "7.1.1", default-features = false, optional = true }
notify = { version = "4.0.17", default-features = false }
num_cpus = { version = "1.13.1", default-features = false }
once_cell = { version = "1.12", default-features = false }
openssl = { version = "0.10.40", default-features = false, features = ["vendored"] }
openssl-probe = { version = "0.1.5", default-features = false }
Expand Down
6 changes: 6 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,3 +158,9 @@ where
#[cfg(not(tokio_unstable))]
tokio::spawn(task)
}

pub fn num_threads() -> usize {
std::thread::available_parallelism()
.expect("Could not determine available parallelism")
.into()
}
2 changes: 1 addition & 1 deletion src/sources/aws_s3/sqs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ const fn default_true() -> bool {
}

fn default_client_concurrency() -> u32 {
cmp::max(1, num_cpus::get() as u32)
cmp::max(1, crate::num_threads() as u32)
}

#[derive(Debug, Snafu)]
Expand Down
2 changes: 1 addition & 1 deletion src/sources/aws_sqs/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ const fn default_poll_secs() -> u32 {
}

fn default_client_concurrency() -> u32 {
cmp::max(1, num_cpus::get() as u32)
cmp::max(1, crate::num_threads() as u32)
}

const fn default_visibility_timeout_secs() -> u32 {
Expand Down
3 changes: 2 additions & 1 deletion src/sources/util/tcp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,8 @@ where
let connection_gauge = OpenGauge::new();
let shutdown_clone = cx.shutdown.clone();

let request_limiter = RequestLimiter::new(MAX_IN_FLIGHT_EVENTS_TARGET, num_cpus::get());
let request_limiter =
RequestLimiter::new(MAX_IN_FLIGHT_EVENTS_TARGET, crate::num_threads());

listener
.accept_stream_limited(max_connections)
Expand Down
2 changes: 1 addition & 1 deletion src/topology/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ static TRANSFORM_CONCURRENCY_LIMIT: Lazy<usize> = Lazy::new(|| {
crate::app::WORKER_THREADS
.get()
.map(std::num::NonZeroUsize::get)
.unwrap_or_else(num_cpus::get)
.unwrap_or_else(crate::num_threads)
});

pub(self) async fn load_enrichment_tables<'a>(
Expand Down
2 changes: 1 addition & 1 deletion tests/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ fn fork_test<T: std::future::Future<Output = ()>>(test_name: &'static str, fut:
// Since we are spawning the runtime from within a forked process, use one worker less
// to account for the additional process.
// This adjustment mainly serves to not overload CI workers with low resources.
let rt = runtime_constrained(std::cmp::max(1, num_cpus::get() - 1));
let rt = runtime_constrained(std::cmp::max(1, crate::num_threads() - 1));
rt.block_on(fut);
},
)
Expand Down

0 comments on commit 27bec73

Please sign in to comment.