auto merge of #8356 : toddaaro/rust/ws, r=brson
This pull request converts the scheduler from a naive shared-queue scheduler to a naive work-stealing scheduler. The deque is still just a queue inside a lock, but there is nonetheless a substantial performance gain. Fiddling with the messaging benchmark, I got a ~10x speedup and observed massively reduced memory usage.

There are still *many* opportunities for optimization, but based on my experience so far it is a clear performance win as it stands.
bors committed Aug 8, 2013
2 parents 8f65dbf + af2e039 commit 936f70b
Showing 9 changed files with 328 additions and 63 deletions.
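As background for the diffs below, here is a minimal sketch of the kind of queue-in-a-lock deque the commit message describes, in modern Rust syntax. The `WorkQueue` type and its `push`/`pop`/`steal` split are illustrative stand-ins for the runtime's actual types, not the patch's code: the owning scheduler pushes and pops at one end, while other schedulers steal from the other.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

// A naive work-stealing deque: still just a queue behind a lock.
pub struct WorkQueue<T> {
    inner: Arc<Mutex<VecDeque<T>>>,
}

impl<T> Clone for WorkQueue<T> {
    // Cloning shares the same underlying queue, which is what the
    // patch's .clone() calls rely on. (A derived Clone would wrongly
    // require T: Clone, so we implement it by hand.)
    fn clone(&self) -> Self {
        WorkQueue { inner: Arc::clone(&self.inner) }
    }
}

impl<T> WorkQueue<T> {
    pub fn new() -> Self {
        WorkQueue { inner: Arc::new(Mutex::new(VecDeque::new())) }
    }

    // Owner side: push and pop at the front.
    pub fn push(&self, task: T) {
        self.inner.lock().unwrap().push_front(task);
    }

    pub fn pop(&self) -> Option<T> {
        self.inner.lock().unwrap().pop_front()
    }

    // Thief side: take from the back, away from the owner's end.
    pub fn steal(&self) -> Option<T> {
        self.inner.lock().unwrap().pop_back()
    }
}
```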
7 changes: 4 additions & 3 deletions src/libstd/rt/comm.rs
@@ -225,9 +225,10 @@ impl<T> Select for PortOne<T> {
fn optimistic_check(&mut self) -> bool {
// The optimistic check is never necessary for correctness. For testing
// purposes, making it randomly return false simulates a racing sender.
use rand::{Rand, rng};
let mut rng = rng();
let actually_check = Rand::rand(&mut rng);
use rand::{Rand};
let actually_check = do Local::borrow::<Scheduler, bool> |sched| {
Rand::rand(&mut sched.rng)
};
if actually_check {
unsafe { (*self.packet()).state.load(Acquire) == STATE_ONE }
} else {
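The hunk above replaces a freshly constructed RNG with the scheduler's own `XorShiftRng`, borrowed through `Local`, so the optimistic check no longer pays for RNG construction on every call. A rough modern analogue using a thread-local xorshift state (entirely illustrative; `random_bool` and the seed constant are assumptions, not the runtime's API):

```rust
use std::cell::Cell;

thread_local! {
    // Stand-in for the per-scheduler XorShiftRng borrowed via Local.
    static RNG_STATE: Cell<u64> = Cell::new(0x9E37_79B9_7F4A_7C15);
}

// One xorshift64 step on the shared state; returns a pseudo-random bool,
// playing the role of Rand::rand(&mut sched.rng) in the optimistic check.
fn random_bool() -> bool {
    RNG_STATE.with(|s| {
        let mut x = s.get();
        x ^= x << 13;
        x ^= x >> 7;
        x ^= x << 17;
        s.set(x);
        x & 1 == 1
    })
}
```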
32 changes: 22 additions & 10 deletions src/libstd/rt/mod.rs
@@ -63,8 +63,7 @@ Several modules in `core` are clients of `rt`:
use cell::Cell;
use clone::Clone;
use container::Container;
use iter::Times;
use iterator::{Iterator, IteratorUtil};
use iterator::{Iterator, IteratorUtil, range};
use option::{Some, None};
use ptr::RawPtr;
use rt::local::Local;
@@ -247,24 +246,32 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int {

let main = Cell::new(main);

// The shared list of sleeping schedulers. Schedulers wake each other
// occassionally to do new work.
// The shared list of sleeping schedulers.
let sleepers = SleeperList::new();
// The shared work queue. Temporary until work stealing is implemented.
let work_queue = WorkQueue::new();

// Create a work queue for each scheduler, ntimes. Create an extra
// for the main thread if that flag is set. We won't steal from it.
let mut work_queues = ~[];
for _ in range(0u, nscheds) {
let work_queue: WorkQueue<~Task> = WorkQueue::new();
work_queues.push(work_queue);
}

// The schedulers.
let mut scheds = ~[];
// Handles to the schedulers. When the main task ends these will be
// sent the Shutdown message to terminate the schedulers.
let mut handles = ~[];

do nscheds.times {
for i in range(0u, nscheds) {
rtdebug!("inserting a regular scheduler");

// Every scheduler is driven by an I/O event loop.
let loop_ = ~UvEventLoop::new();
let mut sched = ~Scheduler::new(loop_, work_queue.clone(), sleepers.clone());
let mut sched = ~Scheduler::new(loop_,
work_queues[i].clone(),
work_queues.clone(),
sleepers.clone());
let handle = sched.make_handle();

scheds.push(sched);
@@ -280,9 +287,14 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int {
let friend_handle = friend_sched.make_handle();
scheds.push(friend_sched);

// This scheduler needs a queue that isn't part of the stealee
// set.
let work_queue = WorkQueue::new();

let main_loop = ~UvEventLoop::new();
let mut main_sched = ~Scheduler::new_special(main_loop,
work_queue.clone(),
work_queue,
work_queues.clone(),
sleepers.clone(),
false,
Some(friend_handle));
@@ -371,7 +383,7 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int {
let mut main_task = ~Task::new_root_homed(&mut main_sched.stack_pool, None,
home, main.take());
main_task.death.on_exit = Some(on_exit.take());
rtdebug!("boostrapping main_task");
rtdebug!("bootstrapping main_task");

main_sched.bootstrap(main_task);
}
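The `run_` changes above follow a simple pattern: build one queue per scheduler up front, then hand scheduler `i` its own queue plus a clone of the full list (the stealee set), while the main scheduler gets a private queue that is deliberately kept out of that list. A sketch under those assumptions, reusing the `WorkQueue` type sketched earlier (the `Task` alias and `setup` function are hypothetical):

```rust
type Task = Box<dyn FnOnce() + Send>;

fn setup(nscheds: usize) {
    // One queue per regular scheduler; all of them are stealable.
    let mut work_queues: Vec<WorkQueue<Task>> = Vec::new();
    for _ in 0..nscheds {
        work_queues.push(WorkQueue::new());
    }

    for i in 0..nscheds {
        let own_queue = work_queues[i].clone(); // this scheduler's queue
        let stealees = work_queues.clone();     // everyone's queues, incl. ours
        // ... construct a scheduler from (own_queue, stealees) and spawn it
        let _ = (own_queue, stealees);
    }

    // The main scheduler's queue is *not* pushed into `work_queues`,
    // so no other scheduler can steal main-thread-only work from it.
    let main_queue: WorkQueue<Task> = WorkQueue::new();
    let _ = main_queue;
}
```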
159 changes: 114 additions & 45 deletions src/libstd/rt/sched.rs
@@ -13,7 +13,6 @@ use option::{Option, Some, None};
use cast::{transmute, transmute_mut_region, transmute_mut_unsafe};
use clone::Clone;
use unstable::raw;

use super::sleeper_list::SleeperList;
use super::work_queue::WorkQueue;
use super::stack::{StackPool};
@@ -28,6 +27,9 @@ use rt::rtio::RemoteCallback;
use rt::metrics::SchedMetrics;
use borrow::{to_uint};
use cell::Cell;
use rand::{XorShiftRng, RngUtil};
use iterator::{range};
use vec::{OwnedVector};

/// The Scheduler is responsible for coordinating execution of Coroutines
/// on a single thread. When the scheduler is running it is owned by
@@ -37,9 +39,11 @@ use cell::Cell;
/// XXX: This creates too many callbacks to run_sched_once, resulting
/// in too much allocation and too many events.
pub struct Scheduler {
/// A queue of available work. Under a work-stealing policy there
/// is one per Scheduler.
work_queue: WorkQueue<~Task>,
/// There are N work queues, one per scheduler.
priv work_queue: WorkQueue<~Task>,
/// Work queues for the other schedulers. These are created by
/// cloning the core work queues.
work_queues: ~[WorkQueue<~Task>],
/// The queue of incoming messages from other schedulers.
/// These are enqueued by SchedHandles after which a remote callback
/// is triggered to handle the message.
@@ -70,7 +74,10 @@ pub struct Scheduler {
run_anything: bool,
/// If the scheduler shouldn't run some tasks, a friend to send
/// them to.
friend_handle: Option<SchedHandle>
friend_handle: Option<SchedHandle>,
/// A fast XorShift rng for scheduler use
rng: XorShiftRng

}

pub struct SchedHandle {
@@ -97,17 +104,21 @@ impl Scheduler {

pub fn new(event_loop: ~EventLoopObject,
work_queue: WorkQueue<~Task>,
work_queues: ~[WorkQueue<~Task>],
sleeper_list: SleeperList)
-> Scheduler {

Scheduler::new_special(event_loop, work_queue, sleeper_list, true, None)
Scheduler::new_special(event_loop, work_queue,
work_queues,
sleeper_list, true, None)

}

// When you create a scheduler it isn't yet "in" a task, so the
// task field is None.
pub fn new_special(event_loop: ~EventLoopObject,
work_queue: WorkQueue<~Task>,
work_queues: ~[WorkQueue<~Task>],
sleeper_list: SleeperList,
run_anything: bool,
friend: Option<SchedHandle>)
@@ -120,12 +131,14 @@ impl Scheduler {
no_sleep: false,
event_loop: event_loop,
work_queue: work_queue,
work_queues: work_queues,
stack_pool: StackPool::new(),
sched_task: None,
cleanup_job: None,
metrics: SchedMetrics::new(),
run_anything: run_anything,
friend_handle: friend
friend_handle: friend,
rng: XorShiftRng::new()
}
}

@@ -248,7 +261,7 @@ impl Scheduler {

// Second activity is to try resuming a task from the queue.

let result = sched.resume_task_from_queue();
let result = sched.do_work();
let mut sched = match result {
Some(sched) => {
// Failed to dequeue a task, so we return.
@@ -415,47 +428,98 @@ impl Scheduler {
}
}

// Resume a task from the queue - but also take into account that
// it might not belong here.
// Workstealing: In this iteration of the runtime each scheduler
// thread has a distinct work queue. When no work is available
// locally, make a few attempts to steal work from the queues of
// other scheduler threads. If a few steals fail we end up in the
// old "no work" path which is fine.

// First step in the process is to find a task. This function does
// that by first checking the local queue, and if there is no work
// there, trying to steal from the remote work queues.
fn find_work(&mut self) -> Option<~Task> {
rtdebug!("scheduler looking for work");
match self.work_queue.pop() {
Some(task) => {
rtdebug!("found a task locally");
return Some(task)
}
None => {
// Our naive stealing, try kinda hard.
rtdebug!("scheduler trying to steal");
let _len = self.work_queues.len();
return self.try_steals(2);
}
}
}

// With no backoff try stealing n times from the queues the
// scheduler knows about. This naive implementation can steal from
// our own queue or from other special schedulers.
fn try_steals(&mut self, n: uint) -> Option<~Task> {
for _ in range(0, n) {
let index = self.rng.gen_uint_range(0, self.work_queues.len());
let work_queues = &mut self.work_queues;
match work_queues[index].steal() {
Some(task) => {
rtdebug!("found task by stealing"); return Some(task)
}
None => ()
}
};
rtdebug!("giving up on stealing");
return None;
}

// If we perform a scheduler action we give away the scheduler ~
// pointer, if it is still available we return it.
// Given a task, execute it correctly.
fn process_task(~self, task: ~Task) -> Option<~Scheduler> {
let mut this = self;
let mut task = task;

fn resume_task_from_queue(~self) -> Option<~Scheduler> {
rtdebug!("processing a task");

let home = task.take_unwrap_home();
match home {
Sched(home_handle) => {
if home_handle.sched_id != this.sched_id() {
rtdebug!("sending task home");
task.give_home(Sched(home_handle));
Scheduler::send_task_home(task);
return Some(this);
} else {
rtdebug!("running task here");
task.give_home(Sched(home_handle));
this.resume_task_immediately(task);
return None;
}
}
AnySched if this.run_anything => {
rtdebug!("running anysched task here");
task.give_home(AnySched);
this.resume_task_immediately(task);
return None;
}
AnySched => {
rtdebug!("sending task to friend");
task.give_home(AnySched);
this.send_to_friend(task);
return Some(this);
}
}
}

// Bundle the helpers together.
fn do_work(~self) -> Option<~Scheduler> {
let mut this = self;

match this.work_queue.pop() {
rtdebug!("scheduler calling do work");
match this.find_work() {
Some(task) => {
let mut task = task;
let home = task.take_unwrap_home();
match home {
Sched(home_handle) => {
if home_handle.sched_id != this.sched_id() {
task.give_home(Sched(home_handle));
Scheduler::send_task_home(task);
return Some(this);
} else {
this.event_loop.callback(Scheduler::run_sched_once);
task.give_home(Sched(home_handle));
this.resume_task_immediately(task);
return None;
}
}
AnySched if this.run_anything => {
this.event_loop.callback(Scheduler::run_sched_once);
task.give_home(AnySched);
this.resume_task_immediately(task);
return None;
}
AnySched => {
task.give_home(AnySched);
this.send_to_friend(task);
return Some(this);
}
}
rtdebug!("found some work! processing the task");
return this.process_task(task);
}
None => {
rtdebug!("no work was found, returning the scheduler struct");
return Some(this);
}
}
@@ -711,7 +775,6 @@ impl Scheduler {
GiveTask(task, f) => f.to_fn()(self, task)
}
}

}

// The cases for the below function.
Expand Down Expand Up @@ -745,6 +808,8 @@ impl ClosureConverter for UnsafeTaskReceiver {

#[cfg(test)]
mod test {
extern mod extra;

use prelude::*;
use rt::test::*;
use unstable::run_in_bare_thread;
@@ -862,12 +927,15 @@ mod test {
do run_in_bare_thread {

let sleepers = SleeperList::new();
let work_queue = WorkQueue::new();
let normal_queue = WorkQueue::new();
let special_queue = WorkQueue::new();
let queues = ~[normal_queue.clone(), special_queue.clone()];

// Our normal scheduler
let mut normal_sched = ~Scheduler::new(
~UvEventLoop::new(),
work_queue.clone(),
normal_queue,
queues.clone(),
sleepers.clone());

let normal_handle = Cell::new(normal_sched.make_handle());
@@ -877,7 +945,8 @@ mod test {
// Our special scheduler
let mut special_sched = ~Scheduler::new_special(
~UvEventLoop::new(),
work_queue.clone(),
special_queue.clone(),
queues.clone(),
sleepers.clone(),
false,
Some(friend_handle));
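The new `find_work`/`try_steals` pair shown above boils down to: pop the local queue first, then make a fixed number of random steal attempts with no backoff, and report failure if none land. A self-contained sketch of that policy, reusing the `WorkQueue` sketched earlier (the `Sched` struct and xorshift generator are illustrative stand-ins for the patch's `Scheduler` and `XorShiftRng`):

```rust
// Minimal xorshift64 generator, standing in for the scheduler's XorShiftRng.
struct XorShift64(u64);

impl XorShift64 {
    fn next(&mut self) -> u64 {
        let mut x = self.0;
        x ^= x << 13;
        x ^= x >> 7;
        x ^= x << 17;
        self.0 = x;
        x
    }
}

struct Sched<T> {
    work_queue: WorkQueue<T>,        // our own queue
    work_queues: Vec<WorkQueue<T>>,  // every scheduler's queue, incl. ours
    rng: XorShift64,
}

impl<T> Sched<T> {
    // Mirrors find_work: check the local queue, then try a couple of steals.
    fn find_work(&mut self) -> Option<T> {
        match self.work_queue.pop() {
            Some(task) => Some(task),
            None => self.try_steals(2),
        }
    }

    // Mirrors try_steals: n attempts against randomly chosen victims,
    // with no backoff; as in the patch, a steal may harmlessly pick
    // the scheduler's own (empty) queue.
    fn try_steals(&mut self, n: usize) -> Option<T> {
        for _ in 0..n {
            let index = (self.rng.next() as usize) % self.work_queues.len();
            if let Some(task) = self.work_queues[index].steal() {
                return Some(task);
            }
        }
        None
    }
}
```

In the patch, the `do_work` wrapper then hands any found task to `process_task`, which either runs it here, sends it home to its owning scheduler, or forwards it to the friend scheduler when this one is not allowed to run it.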
2 changes: 2 additions & 0 deletions src/libstd/rt/select.rs
@@ -182,6 +182,7 @@ mod test {
fn select_stream() {
use util;
use comm::GenericChan;
use iter::Times;

// Sends 10 buffered packets, and uses select to retrieve them all.
// Puts the port in a different spot in the vector each time.
@@ -263,6 +264,7 @@

fn select_racing_senders_helper(killable: bool, send_on_chans: ~[uint]) {
use rt::test::spawntask_random;
use iter::Times;

do run_in_newsched_task {
// A bit of stress, since ordinarily this is just smoke and mirrors.
