Jobserver cleanup #11764

Merged · 4 commits · Mar 1, 2023
24 changes: 0 additions & 24 deletions src/cargo/core/compiler/context/mod.rs
@@ -71,11 +71,6 @@ pub struct Context<'a, 'cfg> {
/// metadata files in addition to the rlib itself.
rmeta_required: HashSet<Unit>,

/// When we're in jobserver-per-rustc process mode, this keeps those
/// jobserver clients for each Unit (which eventually becomes a rustc
/// process).
pub rustc_clients: HashMap<Unit, Client>,

/// Map of the LTO-status of each unit. This indicates what sort of
/// compilation is happening (only object, only bitcode, both, etc), and is
/// precalculated early on.
@@ -124,7 +119,6 @@ impl<'a, 'cfg> Context<'a, 'cfg> {
primary_packages: HashSet::new(),
files: None,
rmeta_required: HashSet::new(),
rustc_clients: HashMap::new(),
lto: HashMap::new(),
metadata_for_doc_units: HashMap::new(),
failed_scrape_units: Arc::new(Mutex::new(HashSet::new())),
@@ -614,24 +608,6 @@ impl<'a, 'cfg> Context<'a, 'cfg> {
self.rmeta_required.contains(unit)
}

/// Used by `-Zjobserver-per-rustc`.
pub fn new_jobserver(&mut self) -> CargoResult<Client> {
let tokens = self.bcx.jobs() as usize;
let client = Client::new(tokens).with_context(|| "failed to create jobserver")?;

// Drain the client fully
for i in 0..tokens {
client.acquire_raw().with_context(|| {
format!(
"failed to fully drain {}/{} token from jobserver at startup",
i, tokens,
)
})?;
}

Ok(client)
}

/// Finds metadata for Doc/Docscrape units.
///
/// rustdoc needs a -Cmetadata flag in order to recognize StableCrateIds that refer to
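For reference, the `new_jobserver` helper removed above created a client that starts *empty*: a fresh `jobserver::Client` is born holding `tokens` tokens, so the helper immediately drained them all, leaving a client that can only hand out tokens Cargo explicitly gives back via `release_raw`. A minimal standalone sketch of that pattern, using the same `jobserver` crate calls as the removed code (the token count and function name are illustrative):

```rust
use anyhow::Context as _;
use jobserver::Client;

/// Create a jobserver client whose token pipe starts out empty, mirroring
/// the `new_jobserver` helper this PR removes: the only tokens a child can
/// acquire are ones we explicitly release to it later.
fn drained_client(tokens: usize) -> anyhow::Result<Client> {
    let client = Client::new(tokens).context("failed to create jobserver")?;

    // A fresh client starts with `tokens` tokens available; drain them all
    // so that the pipe begins empty.
    for i in 0..tokens {
        client
            .acquire_raw()
            .with_context(|| format!("failed to drain token {i}/{tokens}"))?;
    }

    Ok(client)
}

fn main() -> anyhow::Result<()> {
    let client = drained_client(4)?;
    client.release_raw()?; // hand the client exactly one token
    client.acquire_raw()?; // succeeds; a second acquire would block
    Ok(())
}
```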
16 changes: 0 additions & 16 deletions src/cargo/core/compiler/job_queue/job_state.rs
@@ -194,20 +194,4 @@ impl<'a, 'cfg> JobState<'a, 'cfg> {
self.messages
.push(Message::FutureIncompatReport(self.id, report));
}

/// The rustc underlying this Job is about to acquire a jobserver token (i.e., block)
/// on the passed client.
///
/// This should arrange for the associated client to eventually get a token via
/// `client.release_raw()`.
pub fn will_acquire(&self) {
self.messages.push(Message::NeedsToken(self.id));
}

/// The rustc underlying this Job is informing us that it is done with a jobserver token.
///
/// Note that it does *not* write that token back anywhere.
pub fn release_token(&self) {
self.messages.push(Message::ReleaseToken(self.id));
}
}
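Note that the two helpers removed here follow the same shape as every surviving `JobState` method: they do no jobserver work themselves, they only push a `Message` for the scheduler loop to interpret later. A rough sketch of that pattern, with a plain `std::sync::mpsc` channel standing in for Cargo's internal `Queue` type (the `JobId` alias and the `main` wiring are illustrative):

```rust
use std::sync::mpsc::{channel, Sender};

type JobId = u32;

// A trimmed-down version of the `Message` enum from job_queue/mod.rs;
// the two jobserver variants are the ones this PR removes.
enum Message {
    NeedsToken(JobId),
    ReleaseToken(JobId),
}

// Stand-in for `JobState`: jobs report events, they never act on them.
struct JobState {
    id: JobId,
    messages: Sender<Message>,
}

impl JobState {
    fn will_acquire(&self) {
        // Just record the request; the scheduler decides when (or whether)
        // to actually send a token to this job's client.
        self.messages.send(Message::NeedsToken(self.id)).unwrap();
    }

    fn release_token(&self) {
        self.messages.send(Message::ReleaseToken(self.id)).unwrap();
    }
}

fn main() {
    let (tx, rx) = channel();
    let state = JobState { id: 1, messages: tx };

    state.will_acquire();
    state.release_token();

    // Scheduler side: drain events and react centrally.
    for msg in rx.try_iter() {
        match msg {
            Message::NeedsToken(id) => println!("job {id} wants a token"),
            Message::ReleaseToken(id) => println!("job {id} returned a token"),
        }
    }
}
```

Centralizing the reaction in a single consumer is what let the scheduler apply policies such as oldest-job-first token granting without any locking inside the jobs themselves.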
177 changes: 30 additions & 147 deletions src/cargo/core/compiler/job_queue/mod.rs
@@ -30,46 +30,30 @@
//!
//! ## Jobserver
//!
//! Cargo and rustc have a somewhat non-trivial jobserver relationship with each
//! other, which is due to scaling issues with sharing a single jobserver
//! amongst what is potentially hundreds of threads of work on many-cored
//! systems on (at least) Linux, and likely other platforms as well.
//! As of Feb. 2023, Cargo and rustc have a relatively simple jobserver
//! relationship with each other. They share a single jobserver amongst what
//! is potentially hundreds of threads of work on many-cored systems.
//! The jobserver could come from either the environment (e.g., from a `make`
//! invocation), or from Cargo creating its own jobserver server if there is no
//! jobserver to inherit from.
//!
//! Cargo wants to complete the build as quickly as possible, fully saturating
//! all cores (as constrained by the -j=N) parameter. Cargo also must not spawn
//! all cores (as constrained by the `-j=N`) parameter. Cargo also must not spawn
//! more than N threads of work: the total amount of tokens we have floating
//! around must always be limited to N.
//!
//! It is not really possible to optimally choose which crate should build first
//! or last; nor is it possible to decide whether to give an additional token to
//! rustc first or rather spawn a new crate of work. For now, the algorithm we
//! implement prioritizes spawning as many crates (i.e., rustc processes) as
//! possible, and then filling each rustc with tokens on demand.
//! It is not really possible to optimally choose which crate should build
//! first or last; nor is it possible to decide whether to give an additional
//! token to rustc first or rather spawn a new crate of work. The algorithm in
//! Cargo prioritizes spawning as many crates (i.e., rustc processes) as
//! possible. In short, the jobserver relationship among Cargo and rustc
//! processes is **1 `cargo` to N `rustc`**. Cargo knows nothing beyond rustc
//! processes in terms of parallelism[^parallel-rustc].
//!
//! We integrate with the [jobserver], originating from GNU make, to make sure
//! that build scripts which use make to build C code can cooperate with us on
//! the number of used tokens and avoid overfilling the system we're on.
//!
//! The jobserver is unfortunately a very simple protocol, so we enhance it a
//! little when we know that there is a rustc on the other end. Via the stderr
//! pipe we have to rustc, we get messages such as `NeedsToken` and
//! `ReleaseToken` from rustc.
//!
//! [`NeedsToken`] indicates that a rustc is interested in acquiring a token,
//! but never that it would be impossible to make progress without one (i.e.,
//! it would be incorrect for rustc to not terminate due to an unfulfilled
//! `NeedsToken` request); we do not usually fulfill all `NeedsToken` requests for a
//! given rustc.
//!
//! [`ReleaseToken`] indicates that a rustc is done with one of its tokens and
//! is ready for us to re-acquire ownership — we will either release that token
//! back into the general pool or reuse it ourselves. Note that rustc will
//! inform us that it is releasing a token even if it itself is also requesting
//! tokens; it is up to us whether to return the token to that same rustc.
//!
//! `jobserver` also manages the allocation of tokens to rustc beyond
//! the implicit token each rustc owns (i.e., the ones used for parallel LLVM
//! work and parallel rustc threads).
//! We integrate with the [jobserver] crate, originating from GNU make
//! [POSIX jobserver], to make sure that build scripts which use make to
//! build C code can cooperate with us on the number of used tokens and
//! avoid overfilling the system we're on.
//!
//! ## Scheduling
//!
@@ -113,17 +97,24 @@
//!
//! See [`Message`] for all available message kinds.
//!
//! [^parallel-rustc]: In fact, the `jobserver` that Cargo uses also manages the
//! allocation of tokens to rustc beyond the implicit token each rustc owns
//! (i.e., the ones used for parallel LLVM work and parallel rustc threads).
//! See also ["Rust Compiler Development Guide: Parallel Compilation"]
//! and [this comment][rustc-codegen] in rust-lang/rust.
//!
//! ["Rust Compiler Development Guide: Parallel Compilation"]: https://rustc-dev-guide.rust-lang.org/parallel-rustc.html
//! [rustc-codegen]: https://github.com/rust-lang/rust/blob/5423745db8b434fcde54888b35f518f00cce00e4/compiler/rustc_codegen_ssa/src/back/write.rs#L1204-L1217
//! [jobserver]: https://docs.rs/jobserver
//! [`NeedsToken`]: Message::NeedsToken
//! [`ReleaseToken`]: Message::ReleaseToken
//! [POSIX jobserver]: https://www.gnu.org/software/make/manual/html_node/POSIX-Jobserver.html
//! [`push`]: Queue::push
//! [`push_bounded`]: Queue::push_bounded

mod job;
mod job_state;

use std::cell::RefCell;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::collections::{HashMap, HashSet};
use std::fmt::Write as _;
use std::io;
use std::path::{Path, PathBuf};
@@ -133,7 +124,7 @@ use std::time::Duration;

use anyhow::{format_err, Context as _};
use cargo_util::ProcessBuilder;
use jobserver::{Acquired, Client, HelperThread};
use jobserver::{Acquired, HelperThread};
use log::{debug, trace};
use semver::Version;

@@ -199,13 +190,6 @@ struct DrainState<'cfg> {
/// single rustc process.
tokens: Vec<Acquired>,

/// rustc per-thread tokens, when in jobserver-per-rustc mode.
rustc_tokens: HashMap<JobId, Vec<Acquired>>,

/// This represents the list of rustc jobs (processes) and associated
/// clients that are interested in receiving a token.
to_send_clients: BTreeMap<JobId, Vec<Client>>,

/// The list of jobs that we have not yet started executing, but have
/// retrieved from the `queue`. We eagerly pull jobs off the main queue to
/// allow us to request jobserver tokens pretty early.
@@ -387,12 +371,6 @@ enum Message {
Token(io::Result<Acquired>),
Finish(JobId, Artifact, CargoResult<()>),
FutureIncompatReport(JobId, Vec<FutureBreakageItem>),

// This client should get release_raw called on it with one of our tokens
NeedsToken(JobId),

// A token previously passed to a NeedsToken client is being released.
ReleaseToken(JobId),
}

impl<'cfg> JobQueue<'cfg> {
@@ -507,8 +485,6 @@ impl<'cfg> JobQueue<'cfg> {
next_id: 0,
timings: self.timings,
tokens: Vec::new(),
rustc_tokens: HashMap::new(),
to_send_clients: BTreeMap::new(),
pending_queue: Vec::new(),
print: DiagnosticPrinter::new(cx.bcx.config),
finished: 0,
@@ -600,46 +576,9 @@ impl<'cfg> DrainState<'cfg> {
self.active.len() < self.tokens.len() + 1
}

// The oldest job (i.e., least job ID) is the one we grant tokens to first.
fn pop_waiting_client(&mut self) -> (JobId, Client) {
// FIXME: replace this with BTreeMap::first_entry when that stabilizes.
let key = *self
.to_send_clients
.keys()
.next()
.expect("at least one waiter");
let clients = self.to_send_clients.get_mut(&key).unwrap();
let client = clients.pop().unwrap();
if clients.is_empty() {
self.to_send_clients.remove(&key);
}
(key, client)
}

// If we managed to acquire some extra tokens, send them off to a waiting rustc.
fn grant_rustc_token_requests(&mut self) -> CargoResult<()> {
while !self.to_send_clients.is_empty() && self.has_extra_tokens() {
let (id, client) = self.pop_waiting_client();
// This unwrap is guaranteed to succeed. `active` must be at least
// length 1, as otherwise there can't be a client waiting to be sent
// on, so tokens.len() must also be at least one.
let token = self.tokens.pop().unwrap();
self.rustc_tokens
.entry(id)
.or_insert_with(Vec::new)
.push(token);
client
.release_raw()
.with_context(|| "failed to release jobserver token")?;
}

Ok(())
}

fn handle_event(
&mut self,
cx: &mut Context<'_, '_>,
jobserver_helper: &HelperThread,
plan: &mut BuildPlan,
event: Message,
) -> Result<(), ErrorToHandle> {
@@ -699,19 +638,6 @@ impl<'cfg> DrainState<'cfg> {
Artifact::All => {
trace!("end: {:?}", id);
self.finished += 1;
if let Some(rustc_tokens) = self.rustc_tokens.remove(&id) {
// This puts back the tokens that this rustc
// acquired into our primary token list.
//
// This represents a rustc bug: it did not
// release all of its thread tokens but finished
// completely. But we want to make Cargo resilient
// to such rustc bugs, as they're generally not
// fatal in nature (i.e., Cargo can make progress
// still, and the build might not even fail).
self.tokens.extend(rustc_tokens);
}
self.to_send_clients.remove(&id);
self.report_warning_count(
cx.bcx.config,
id,
@@ -756,31 +682,6 @@ impl<'cfg> DrainState<'cfg> {
let token = acquired_token.with_context(|| "failed to acquire jobserver token")?;
self.tokens.push(token);
}
Message::NeedsToken(id) => {
trace!("queue token request");
jobserver_helper.request_token();
let client = cx.rustc_clients[&self.active[&id]].clone();
self.to_send_clients
.entry(id)
.or_insert_with(Vec::new)
.push(client);
}
Message::ReleaseToken(id) => {
// Note that this pops off potentially a completely
// different token, but all tokens of the same job are
// conceptually the same so that's fine.
//
// self.tokens is a "pool" -- the order doesn't matter -- and
// this transfers ownership of the token into that pool. If we
// end up using it on the next go around, then this token will
// be truncated, same as tokens obtained through Message::Token.
let rustc_tokens = self
.rustc_tokens
.get_mut(&id)
.expect("no tokens associated");
self.tokens
.push(rustc_tokens.pop().expect("rustc releases token it has"));
}
}

Ok(())
@@ … @@
// listen for a message with a timeout, and on timeout we run the
// previous parts of the loop again.
let mut events = self.messages.try_pop_all();
trace!(
"tokens in use: {}, rustc_tokens: {:?}, waiting_rustcs: {:?} (events this tick: {})",
self.tokens.len(),
self.rustc_tokens
.iter()
.map(|(k, j)| (k, j.len()))
.collect::<Vec<_>>(),
self.to_send_clients
.iter()
.map(|(k, j)| (k, j.len()))
.collect::<Vec<_>>(),
events.len(),
);
if events.is_empty() {
loop {
self.tick_progress();
@@ -866,17 +754,13 @@
break;
}

if let Err(e) = self.grant_rustc_token_requests() {
self.handle_error(&mut cx.bcx.config.shell(), &mut errors, e);
}

// And finally, before we block waiting for the next event, drop any
// excess tokens we may have accidentally acquired. Due to how our
// jobserver interface is architected we may acquire a token that we
// don't actually use, and if this happens just relinquish it back
// to the jobserver itself.
for event in self.wait_for_events() {
if let Err(event_err) = self.handle_event(cx, jobserver_helper, plan, event) {
if let Err(event_err) = self.handle_event(cx, plan, event) {
self.handle_error(&mut cx.bcx.config.shell(), &mut errors, event_err);
}
}
@@ -970,7 +854,6 @@
self.active.len(),
self.pending_queue.len(),
self.queue.len(),
self.rustc_tokens.len(),
);
self.timings.record_cpu();

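What remains after this PR is the model the rewritten module docs describe: a single jobserver shared by one `cargo` and N `rustc` processes, inherited from the environment when present (e.g., under `make -jN`) and created fresh otherwise, with a `HelperThread` doing the blocking acquires so the scheduler loop never stalls. A rough standalone sketch of that setup against the `jobserver` 0.1 API (the token count and channel plumbing are illustrative, not Cargo's actual code):

```rust
use std::sync::mpsc::channel;

use jobserver::{Acquired, Client};

fn main() -> std::io::Result<()> {
    // Inherit a jobserver from the environment (e.g., a parent `make -jN`),
    // or create our own if there is none to inherit. `from_env` is unsafe
    // because it trusts that the fds named in the environment are valid.
    let client = match unsafe { Client::from_env() } {
        Some(client) => client,
        None => Client::new(4)?, // 4 stands in for the `-j` default
    };

    // The helper thread blocks in `acquire` so the scheduler loop never
    // has to; each acquired token is forwarded as an event, much like
    // `Message::Token` in job_queue/mod.rs.
    let (tx, rx) = channel::<std::io::Result<Acquired>>();
    let helper = client.clone().into_helper_thread(move |token| {
        let _ = tx.send(token);
    })?;

    // Scheduler side: ask for one extra token, then wait for it.
    helper.request_token();
    let token = rx.recv().expect("helper thread hung up")?;

    // Dropping an `Acquired` returns the token to the jobserver pool.
    drop(token);
    Ok(())
}
```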
34 changes: 1 addition & 33 deletions src/cargo/core/compiler/mod.rs
@@ -715,14 +715,7 @@ fn prepare_rustc(
base.env("CARGO_TARGET_TMPDIR", tmp.display().to_string());
}

if cx.bcx.config.cli_unstable().jobserver_per_rustc {
let client = cx.new_jobserver()?;
base.inherit_jobserver(&client);
base.arg("-Z").arg("jobserver-token-requests");
assert!(cx.rustc_clients.insert(unit.clone(), client).is_none());
} else {
base.inherit_jobserver(&cx.jobserver);
}
base.inherit_jobserver(&cx.jobserver);
build_base_args(cx, &mut base, unit, crate_types)?;
build_deps_args(&mut base, cx, unit)?;
Ok(base)
@@ -1701,31 +1694,6 @@ fn on_stderr_line_inner(
return Ok(false);
}

#[derive(serde::Deserialize)]
struct JobserverNotification {
jobserver_event: Event,
}

#[derive(Debug, serde::Deserialize)]
enum Event {
WillAcquire,
Release,
}

if let Ok(JobserverNotification { jobserver_event }) =
serde_json::from_str::<JobserverNotification>(compiler_message.get())
{
trace!(
"found jobserver directive from rustc: `{:?}`",
jobserver_event
);
match jobserver_event {
Event::WillAcquire => state.will_acquire(),
Event::Release => state.release_token(),
}
return Ok(false);
}

// And failing all that above we should have a legitimate JSON diagnostic
// from the compiler, so wrap it in an external Cargo JSON message
// indicating which package it came from and then emit it.
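The block removed above was Cargo's half of `-Zjobserver-token-requests`: rustc emitted single-line JSON notifications on the stderr pipe, and Cargo attempted this parse before falling through to normal diagnostic handling. A self-contained sketch of that parsing (the struct and enum mirror the removed code; the sample line is illustrative, assuming serde's default encoding of the unit variants):

```rust
use serde::Deserialize;

// Mirrors the types removed from `on_stderr_line_inner`.
#[derive(Deserialize)]
struct JobserverNotification {
    jobserver_event: Event,
}

#[derive(Debug, Deserialize)]
enum Event {
    WillAcquire,
    Release,
}

fn main() {
    // One stderr line from rustc: not a diagnostic, but a token request.
    let line = r#"{"jobserver_event":"WillAcquire"}"#;

    // Cargo tried this parse first; on failure the line fell through to
    // the regular JSON-diagnostic handling.
    match serde_json::from_str::<JobserverNotification>(line) {
        Ok(note) => println!("jobserver directive: {:?}", note.jobserver_event),
        Err(_) => println!("not a jobserver notification"),
    }
}
```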