Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

walk: Send WorkerResults in batches #1422

Merged
merged 2 commits into from
Nov 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/dir_entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ use lscolors::{Colorable, LsColors, Style};
use crate::config::Config;
use crate::filesystem::strip_current_dir;

// Private storage behind `DirEntry` (held in its `inner` field).
#[derive(Debug)]
enum DirEntryInner {
// An entry backed by an `ignore::DirEntry`.
Normal(ignore::DirEntry),
// Only a path is stored for this variant. Presumably used for symlinks whose
// target is missing -- TODO confirm at the construction site.
BrokenSymlink(PathBuf),
}

#[derive(Debug)]
pub struct DirEntry {
inner: DirEntryInner,
metadata: OnceCell<Option<Metadata>>,
Expand Down
31 changes: 16 additions & 15 deletions src/exec/job.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
use std::sync::Mutex;

use crossbeam_channel::Receiver;

use crate::config::Config;
use crate::dir_entry::DirEntry;
use crate::error::print_error;
use crate::exit_codes::{merge_exitcodes, ExitCode};
use crate::walk::WorkerResult;
Expand All @@ -14,43 +11,47 @@ use super::CommandSet;
/// generate a command with the supplied command template. The generated command will then
/// be executed, and this process will continue until the receiver's sender has closed.
pub fn job(
rx: Receiver<WorkerResult>,
results: impl IntoIterator<Item = WorkerResult>,
cmd: &CommandSet,
out_perm: &Mutex<()>,
config: &Config,
) -> ExitCode {
// Output should be buffered when running more than one thread
let buffer_output: bool = config.threads > 1;

let mut results: Vec<ExitCode> = Vec::new();
loop {
let mut ret = ExitCode::Success;
for result in results {
// Obtain the next result from the receiver, else if the channel
// has closed, exit from the loop
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// has closed, exit from the loop
// has closed, exit from the loop.

let dir_entry: DirEntry = match rx.recv() {
Ok(WorkerResult::Entry(dir_entry)) => dir_entry,
Ok(WorkerResult::Error(err)) => {
let dir_entry = match result {
WorkerResult::Entry(dir_entry) => dir_entry,
WorkerResult::Error(err) => {
if config.show_filesystem_errors {
print_error(err.to_string());
}
continue;
}
Err(_) => break,
};

// Generate a command, execute it and store its exit code.
results.push(cmd.execute(
let code = cmd.execute(
dir_entry.stripped_path(config),
config.path_separator.as_deref(),
out_perm,
buffer_output,
))
);
ret = merge_exitcodes([ret, code]);
}
// Returns error in case of any error.
merge_exitcodes(results)
ret
}

pub fn batch(rx: Receiver<WorkerResult>, cmd: &CommandSet, config: &Config) -> ExitCode {
let paths = rx
pub fn batch(
results: impl IntoIterator<Item = WorkerResult>,
cmd: &CommandSet,
config: &Config,
) -> ExitCode {
let paths = results
.into_iter()
.filter_map(|worker_result| match worker_result {
WorkerResult::Entry(dir_entry) => Some(dir_entry.into_stripped_path(config)),
Expand Down
166 changes: 129 additions & 37 deletions src/walk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ use std::io::{self, Write};
use std::mem;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Mutex, MutexGuard};
use std::thread;
use std::time::{Duration, Instant};

use anyhow::{anyhow, Result};
use crossbeam_channel::{bounded, Receiver, RecvTimeoutError, Sender};
use crossbeam_channel::{bounded, Receiver, RecvTimeoutError, SendError, Sender};
use etcetera::BaseStrategy;
use ignore::overrides::{Override, OverrideBuilder};
use ignore::{self, WalkBuilder, WalkParallel, WalkState};
Expand All @@ -36,13 +36,91 @@ enum ReceiverMode {

/// The Worker threads can result in a valid entry having PathBuf or an error.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// The Worker threads can result in a valid entry having PathBuf or an error.
/// The Worker threads can result in a valid entry having `PathBuf` or an error.

#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum WorkerResult {
// Errors should be rare, so it's probably better to allow large_enum_variant than
// to box the Entry variant
/// An entry to be handled by the consumer (printed, or passed to a command).
Entry(DirEntry),
/// An `ignore::Error`. The consumers visible in this change only print it when
/// `config.show_filesystem_errors` is set, and otherwise drop it.
Error(ignore::Error),
}

/// A batch of WorkerResults to send over a channel.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// A batch of WorkerResults to send over a channel.
/// A batch of `WorkerResult`s to send over a channel.

// Cloning a `Batch` only clones the `Arc`, so every clone refers to the same
// underlying vector.
#[derive(Clone)]
struct Batch {
/// Shared, mutex-protected storage for the batched results.
///
/// The `Option` is `Some` while the batch can still be appended to, and becomes
/// `None` once the items have been moved out by `into_iter` (which calls `take()`).
items: Arc<Mutex<Option<Vec<WorkerResult>>>>,
}

impl Batch {
    /// Create a new, empty batch whose storage slot is `Some(vec![])`.
    fn new() -> Self {
        Self {
            items: Arc::new(Mutex::new(Some(vec![]))),
        }
    }

    /// Lock the shared storage and return the guard.
    ///
    /// The guarded value is `None` if the items have already been taken out.
    ///
    /// # Panics
    ///
    /// Panics if the mutex is poisoned, i.e. another thread panicked while
    /// holding the lock.
    fn lock(&self) -> MutexGuard<'_, Option<Vec<WorkerResult>>> {
        self.items.lock().unwrap()
    }
}

/// Consuming a `Batch` yields the `WorkerResult`s that were collected in it.
impl IntoIterator for Batch {
    type Item = WorkerResult;
    type IntoIter = std::vec::IntoIter<WorkerResult>;

    /// Move the collected items out of the shared slot and iterate over them.
    ///
    /// The slot is left as `None`, so other clones of this batch can tell that
    /// the items are gone.
    ///
    /// # Panics
    ///
    /// Panics if the items were already taken (the slot is `None`) or if the
    /// mutex is poisoned.
    fn into_iter(self) -> Self::IntoIter {
        let taken: Option<Vec<WorkerResult>> = self.lock().take();
        taken.unwrap().into_iter()
    }
}

/// Wrapper that sends batches of items at once over a channel.
struct BatchSender {
/// The batch currently being filled by `send`.
batch: Batch,
/// Channel over which each newly started batch is sent.
tx: Sender<Batch>,
/// Number of items at which the current batch is considered full and a new
/// one is started.
limit: usize,
}

impl BatchSender {
/// Create a sender that starts with an empty batch.
///
/// `tx` is the channel batches are sent over; `limit` is the batch size at
/// which a new batch is started.
fn new(tx: Sender<Batch>, limit: usize) -> Self {
    let batch = Batch::new();
    Self { batch, tx, limit }
}

/// Check if we need to flush a batch.
///
/// `batch` is the current content of the batch's storage slot. Returns `true`
/// when a fresh batch has to be started before adding another item.
fn needs_flush(&self, batch: Option<&Vec<WorkerResult>>) -> bool {
    // `None`: the batch was already taken by the receiver, so make a new one.
    // `Some`: limit the batch size to provide some backpressure.
    batch.map_or(true, |items| items.len() >= self.limit)
}

/// Add an item to a batch.
fn send(&mut self, item: WorkerResult) -> Result<(), SendError<()>> {
let mut batch = self.batch.lock();

if self.needs_flush(batch.as_ref()) {
drop(batch);
self.batch = Batch::new();
batch = self.batch.lock();
}

let items = batch.as_mut().unwrap();
items.push(item);

if items.len() == 1 {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if it would be better to send over batches after reaching a certain size. That would mean we could know how large to set the initial capacity of the Vec, and possibly avoid contention on the mutex. However, it means receiver threads could end up waiting longer to get results, especially if it takes a while to find more results in the sender threads, so what you have might be better.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just measured the average batch size: 209. I don't think we have to try too hard to send larger batches :)

The risk of a minimum batch size is it could stall a very long time if most results are being filtered out. We could do something like always send the batch after N entries are encountered, regardless of whether they're added to the batch. But I doubt it's worth it.

There isn't very much mutex contention with this design anyway. It's only between the receiver and at most one sender, and the receiver critical section is extremely short (just lock().take().unwrap().into_iter()). I actually just checked with perf trace and the receiver only blocked 136 times over the whole Chromium benchmark (2.1M files). Each sender blocked between 3-12 times.

// New batch, send it over the channel
self.tx
.send(self.batch.clone())
.map_err(|_| SendError(()))?;
}

Ok(())
}
}

/// Maximum size of the output buffer before flushing results to the console
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// Maximum size of the output buffer before flushing results to the console
/// Maximum size of the output buffer before flushing results to the console.

const MAX_BUFFER_LENGTH: usize = 1000;
/// Default duration until output buffering switches to streaming.
Expand All @@ -57,7 +135,7 @@ struct ReceiverBuffer<'a, W> {
/// The ^C notifier.
interrupt_flag: &'a AtomicBool,
/// Receiver for worker results.
rx: Receiver<WorkerResult>,
rx: Receiver<Batch>,
/// Standard output.
stdout: W,
/// The current buffer mode.
Expand All @@ -72,7 +150,7 @@ struct ReceiverBuffer<'a, W> {

impl<'a, W: Write> ReceiverBuffer<'a, W> {
/// Create a new receiver buffer.
fn new(state: &'a WorkerState, rx: Receiver<WorkerResult>, stdout: W) -> Self {
fn new(state: &'a WorkerState, rx: Receiver<Batch>, stdout: W) -> Self {
let config = &state.config;
let quit_flag = state.quit_flag.as_ref();
let interrupt_flag = state.interrupt_flag.as_ref();
Expand Down Expand Up @@ -103,7 +181,7 @@ impl<'a, W: Write> ReceiverBuffer<'a, W> {
}

/// Receive the next worker result.
fn recv(&self) -> Result<WorkerResult, RecvTimeoutError> {
fn recv(&self) -> Result<Batch, RecvTimeoutError> {
match self.mode {
ReceiverMode::Buffering => {
// Wait at most until we should switch to streaming
Expand All @@ -119,36 +197,42 @@ impl<'a, W: Write> ReceiverBuffer<'a, W> {
/// Wait for a result or state change.
fn poll(&mut self) -> Result<(), ExitCode> {
match self.recv() {
Ok(WorkerResult::Entry(dir_entry)) => {
if self.config.quiet {
return Err(ExitCode::HasResults(true));
}
Ok(batch) => {
for result in batch {
match result {
WorkerResult::Entry(dir_entry) => {
if self.config.quiet {
return Err(ExitCode::HasResults(true));
}

match self.mode {
ReceiverMode::Buffering => {
self.buffer.push(dir_entry);
if self.buffer.len() > MAX_BUFFER_LENGTH {
self.stream()?;
}
}
ReceiverMode::Streaming => {
self.print(&dir_entry)?;
self.flush()?;
}
}
match self.mode {
ReceiverMode::Buffering => {
self.buffer.push(dir_entry);
if self.buffer.len() > MAX_BUFFER_LENGTH {
self.stream()?;
}
}
ReceiverMode::Streaming => {
self.print(&dir_entry)?;
self.flush()?;
}
}

self.num_results += 1;
if let Some(max_results) = self.config.max_results {
if self.num_results >= max_results {
return self.stop();
self.num_results += 1;
if let Some(max_results) = self.config.max_results {
if self.num_results >= max_results {
return self.stop();
}
}
}
WorkerResult::Error(err) => {
if self.config.show_filesystem_errors {
print_error(err.to_string());
}
}
}
}
}
Ok(WorkerResult::Error(err)) => {
if self.config.show_filesystem_errors {
print_error(err.to_string());
}
}
Err(RecvTimeoutError::Timeout) => {
self.stream()?;
}
Expand Down Expand Up @@ -319,13 +403,13 @@ impl WorkerState {

/// Run the receiver work, either on this thread or a pool of background
/// threads (for --exec).
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// threads (for --exec).
/// threads (for `--exec`).

fn receive(&self, rx: Receiver<WorkerResult>) -> ExitCode {
fn receive(&self, rx: Receiver<Batch>) -> ExitCode {
let config = &self.config;

// This will be set to `Some` if the `--exec` argument was supplied.
if let Some(ref cmd) = config.command {
if cmd.in_batch_mode() {
exec::batch(rx, cmd, &config)
exec::batch(rx.into_iter().flatten(), cmd, &config)
} else {
let out_perm = Mutex::new(());

Expand All @@ -337,7 +421,8 @@ impl WorkerState {
let rx = rx.clone();

// Spawn a job thread that will listen for and execute inputs.
let handle = scope.spawn(|| exec::job(rx, cmd, &out_perm, &config));
let handle = scope
.spawn(|| exec::job(rx.into_iter().flatten(), cmd, &out_perm, &config));

// Push the handle of the spawned thread into the vector for later joining.
handles.push(handle);
Expand All @@ -355,12 +440,20 @@ impl WorkerState {
}

/// Spawn the sender threads.
fn spawn_senders(&self, walker: WalkParallel, tx: Sender<WorkerResult>) {
fn spawn_senders(&self, walker: WalkParallel, tx: Sender<Batch>) {
walker.run(|| {
let patterns = &self.patterns;
let config = &self.config;
let quit_flag = self.quit_flag.as_ref();
let tx = tx.clone();

let mut limit = 0x100;
if let Some(cmd) = &config.command {
if !cmd.in_batch_mode() && config.threads > 1 {
// Evenly distribute work between multiple receivers
limit = 1;
}
}
let mut tx = BatchSender::new(tx.clone(), limit);

Box::new(move |entry| {
if quit_flag.load(Ordering::Relaxed) {
Expand Down Expand Up @@ -545,8 +638,7 @@ impl WorkerState {
.unwrap();
}

// Channel capacity was chosen empirically to perform similarly to an unbounded channel
let (tx, rx) = bounded(0x4000 * config.threads);
let (tx, rx) = bounded(2 * config.threads);

let exit_code = thread::scope(|scope| {
// Spawn the receiver thread(s)
Expand Down