From 68341a91eecb830c57779cd2423733a7395258ab Mon Sep 17 00:00:00 2001
From: Joshua Lockerman
Date: Fri, 29 Sep 2017 15:58:11 -0400
Subject: [PATCH 1/3] Improve performance of spsc_queue and stream.

This commit makes two main changes.

1. It switches the spsc_queue node caching strategy from keeping a shared
   counter of the number of nodes in the cache to keeping a consumer-only
   counter of the number of nodes eligible to be cached.

2. It separates the consumer and producer fields of spsc_queue and stream
   into a producer cache line and a consumer cache line.
---
 src/libstd/lib.rs                     |   2 +
 src/libstd/sync/mpsc/cache_aligned.rs |  37 ++++++
 src/libstd/sync/mpsc/mod.rs           |   2 +
 src/libstd/sync/mpsc/spsc_queue.rs    | 161 +++++++++++++++++---------
 src/libstd/sync/mpsc/stream.rs        | 105 ++++++++++-------
 5 files changed, 208 insertions(+), 99 deletions(-)
 create mode 100644 src/libstd/sync/mpsc/cache_aligned.rs

diff --git a/src/libstd/lib.rs b/src/libstd/lib.rs
index 9fc7e2c01aa19..83cc9ce582e34 100644
--- a/src/libstd/lib.rs
+++ b/src/libstd/lib.rs
@@ -244,6 +244,7 @@
 #![feature(allow_internal_unstable)]
 #![feature(align_offset)]
 #![feature(asm)]
+#![feature(attr_literals)]
 #![feature(box_syntax)]
 #![feature(cfg_target_has_atomic)]
 #![feature(cfg_target_thread_local)]
@@ -290,6 +291,7 @@
 #![feature(prelude_import)]
 #![feature(rand)]
 #![feature(raw)]
+#![feature(repr_align)]
 #![feature(repr_simd)]
 #![feature(rustc_attrs)]
 #![cfg_attr(not(stage0), feature(rustc_const_unstable))]
diff --git a/src/libstd/sync/mpsc/cache_aligned.rs b/src/libstd/sync/mpsc/cache_aligned.rs
new file mode 100644
index 0000000000000..5af01262573f3
--- /dev/null
+++ b/src/libstd/sync/mpsc/cache_aligned.rs
@@ -0,0 +1,37 @@
+// Copyright 2017 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+use ops::{Deref, DerefMut};
+
+#[derive(Copy, Clone, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
+#[repr(align(64))]
+pub(super) struct Aligner;
+
+#[derive(Copy, Clone, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
+pub(super) struct CacheAligned<T>(pub T, pub Aligner);
+
+impl<T> Deref for CacheAligned<T> {
+    type Target = T;
+    fn deref(&self) -> &Self::Target {
+        &self.0
+    }
+}
+
+impl<T> DerefMut for CacheAligned<T> {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        &mut self.0
+    }
+}
+
+impl<T> CacheAligned<T> {
+    pub(super) fn new(t: T) -> Self {
+        CacheAligned(t, Aligner)
+    }
+}
diff --git a/src/libstd/sync/mpsc/mod.rs b/src/libstd/sync/mpsc/mod.rs
index dcd4c8dfdf549..0bfbcd2d2cd59 100644
--- a/src/libstd/sync/mpsc/mod.rs
+++ b/src/libstd/sync/mpsc/mod.rs
@@ -297,6 +297,8 @@ mod sync;
 mod mpsc_queue;
 mod spsc_queue;
 
+mod cache_aligned;
+
 /// The receiving half of Rust's [`channel`][] (or [`sync_channel`]) type.
 /// This half can only be owned by one thread.
 ///
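The new `cache_aligned` module is the mechanical half of change 2: `Aligner` is a zero-sized type that carries 64-byte alignment, so embedding it pads `CacheAligned<T>` out to a full cache line, and the `attr_literals`/`repr_align` feature gates enabled in lib.rs exist only because `#[repr(align(N))]` was still unstable at the time. The following standalone sketch (not part of the patch; it assumes a modern compiler where the attribute is stable, and the usual but not guaranteed 64-byte line size) shows the layout effect the patch is after:

    use std::mem::{align_of, size_of};
    use std::sync::atomic::AtomicUsize;

    // Zero-sized type whose only job is to carry 64-byte alignment.
    #[repr(align(64))]
    #[derive(Copy, Clone, Default)]
    struct Aligner;

    // Wrapping a value together with `Aligner` rounds the wrapper's size
    // and alignment up to a whole cache line.
    #[allow(dead_code)]
    struct CacheAligned<T>(T, Aligner);

    // Producer and consumer state on separate lines: a core writing one
    // field no longer invalidates the other core's cached copy of the other.
    #[allow(dead_code)]
    struct Padded {
        producer: CacheAligned<AtomicUsize>,
        consumer: CacheAligned<AtomicUsize>,
    }

    fn main() {
        assert_eq!(align_of::<CacheAligned<AtomicUsize>>(), 64);
        assert_eq!(size_of::<CacheAligned<AtomicUsize>>(), 64);
        // Two padded counters occupy two full lines instead of sharing one.
        assert_eq!(size_of::<Padded>(), 128);
    }

Before `repr_align`, the same effect required manual `[u8; 64]`-style padding; the zero-sized `Aligner` gets the compiler to do the padding arithmetic instead.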
diff --git a/src/libstd/sync/mpsc/spsc_queue.rs b/src/libstd/sync/mpsc/spsc_queue.rs
index 1148bc66fbabb..3ce5927033581 100644
--- a/src/libstd/sync/mpsc/spsc_queue.rs
+++ b/src/libstd/sync/mpsc/spsc_queue.rs
@@ -22,12 +22,15 @@ use core::cell::UnsafeCell
 
 use sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
 
+use super::cache_aligned::CacheAligned;
+
 // Node within the linked list queue of messages to send
 struct Node<T> {
     // FIXME: this could be an uninitialized T if we're careful enough, and
     // that would reduce memory usage (and be a bit faster).
     // is it worth it?
     value: Option<T>,          // nullable for re-use of nodes
+    cached: bool,              // This node goes into the node cache
     next: AtomicPtr<Node<T>>,  // next node in the queue
 }
@@ -35,37 +38,45 @@ struct Node<T> {
 /// but it can be safely shared in an Arc if it is guaranteed that there
 /// is only one popper and one pusher touching the queue at any one point in
 /// time.
-pub struct Queue<T> {
+pub struct Queue<T, ProducerAddition = (), ConsumerAddition = ()> {
     // consumer fields
+    consumer: CacheAligned<Consumer<T, ConsumerAddition>>,
+
+    // producer fields
+    producer: CacheAligned<Producer<T, ProducerAddition>>,
+}
+
+struct Consumer<T, Addition> {
     tail: UnsafeCell<*mut Node<T>>, // where to pop from
     tail_prev: AtomicPtr<Node<T>>, // where to pop from
+    cache_bound: usize, // maximum cache size
+    cached_nodes: AtomicUsize, // number of nodes marked as cachable
+    addition: Addition,
+}
 
-    // producer fields
+struct Producer<T, Addition> {
     head: UnsafeCell<*mut Node<T>>,      // where to push to
     first: UnsafeCell<*mut Node<T>>,     // where to get new nodes from
     tail_copy: UnsafeCell<*mut Node<T>>, // between first/tail
-
-    // Cache maintenance fields. Additions and subtractions are stored
-    // separately in order to allow them to use nonatomic addition/subtraction.
-    cache_bound: usize,
-    cache_additions: AtomicUsize,
-    cache_subtractions: AtomicUsize,
+    addition: Addition,
 }
 
-unsafe impl<T: Send> Send for Queue<T> { }
+unsafe impl<T: Send, P: Send + Sync, C: Send + Sync> Send for Queue<T, P, C> { }
 
-unsafe impl<T: Send> Sync for Queue<T> { }
+unsafe impl<T: Send, P: Send + Sync, C: Send + Sync> Sync for Queue<T, P, C> { }
 
 impl<T> Node<T> {
     fn new() -> *mut Node<T> {
         Box::into_raw(box Node {
             value: None,
+            cached: false,
             next: AtomicPtr::new(ptr::null_mut::<Node<T>>()),
         })
     }
 }
 
 impl<T> Queue<T> {
+    #[cfg(test)]
     /// Creates a new queue.
     ///
     /// This is unsafe as the type system doesn't enforce a single
@@ -84,18 +95,60 @@ impl<T> Queue<T> {
     /// no bound. Otherwise, the cache will never grow larger than
     /// `bound` (although the queue itself could be much larger.
     pub unsafe fn new(bound: usize) -> Queue<T> {
+        Self::with_additions(bound, (), ())
+    }
+}
+
+impl<T, ProducerAddition, ConsumerAddition> Queue<T, ProducerAddition, ConsumerAddition> {
+
+    /// Creates a new queue. With given additional elements in the producer and
+    /// consumer portions of the queue.
+    ///
+    /// Due to the performance implications of cache-contention,
+    /// we wish to keep fields used mainly by the producer on a separate cache
+    /// line than those used by the consumer.
+    /// Since cache lines are usually 64 bytes, it is unreasonably expensive to
+    /// allocate one for small fields, so we allow users to insert additional
+    /// fields into the cache lines already allocated by this for the producer
+    /// and consumer.
+    ///
+    /// This is unsafe as the type system doesn't enforce a single
+    /// consumer-producer relationship. It also allows the consumer to `pop`
+    /// items while there is a `peek` active due to all methods having a
+    /// non-mutable receiver.
+    ///
+    /// # Arguments
+    ///
+    ///   * `bound` - This queue implementation is implemented with a linked
+    ///               list, and this means that a push is always a malloc. In
+    ///               order to amortize this cost, an internal cache of nodes is
+    ///               maintained to prevent a malloc from always being
+    ///               necessary. This bound is the limit on the size of the
+    ///               cache (if desired). If the value is 0, then the cache has
+    ///               no bound. Otherwise, the cache will never grow larger than
+    ///               `bound` (although the queue itself could be much larger.
+    pub unsafe fn with_additions(
+        bound: usize,
+        producer_addition: ProducerAddition,
+        consumer_addition: ConsumerAddition,
+    ) -> Self {
         let n1 = Node::new();
         let n2 = Node::new();
         (*n1).next.store(n2, Ordering::Relaxed);
         Queue {
-            tail: UnsafeCell::new(n2),
-            tail_prev: AtomicPtr::new(n1),
-            head: UnsafeCell::new(n2),
-            first: UnsafeCell::new(n1),
-            tail_copy: UnsafeCell::new(n1),
-            cache_bound: bound,
-            cache_additions: AtomicUsize::new(0),
-            cache_subtractions: AtomicUsize::new(0),
+            consumer: CacheAligned::new(Consumer {
+                tail: UnsafeCell::new(n2),
+                tail_prev: AtomicPtr::new(n1),
+                cache_bound: bound,
+                cached_nodes: AtomicUsize::new(0),
+                addition: consumer_addition
+            }),
+            producer: CacheAligned::new(Producer {
+                head: UnsafeCell::new(n2),
+                first: UnsafeCell::new(n1),
+                tail_copy: UnsafeCell::new(n1),
+                addition: producer_addition
+            }),
         }
     }
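Two details of the constructor hunk are worth a note. First, `Queue` grows default type parameters (`ProducerAddition = ()`, `ConsumerAddition = ()`), so existing `Queue<T>` users keep compiling unchanged. Second, because `()` is zero-sized, queues that want no additions pay nothing for the `addition` fields. A standalone sketch of that zero-cost claim, using a hypothetical stand-in for the patch's `Consumer` (not the patch's own code):

    use std::mem::size_of;

    // Hypothetical stand-in for the patch's `Consumer`: the addition slot
    // defaults to the zero-sized unit type.
    #[allow(dead_code)]
    struct Consumer<T, Addition = ()> {
        tail: *mut T,
        addition: Addition,
    }

    fn main() {
        // With the default `()` addition the struct is exactly one pointer:
        // the zero-sized addition vanishes from the layout entirely.
        assert_eq!(size_of::<Consumer<u32>>(), size_of::<*mut u32>());

        // A real addition is laid out inline, in the same (aligned) block
        // as the hot fields: no separate allocation, no pointer chase.
        assert_eq!(size_of::<Consumer<u32, usize>>(), 2 * size_of::<usize>());
    }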
@@ -109,35 +162,25 @@ impl<T> Queue<T> {
             assert!((*n).value.is_none());
             (*n).value = Some(t);
             (*n).next.store(ptr::null_mut(), Ordering::Relaxed);
-            (**self.head.get()).next.store(n, Ordering::Release);
-            *self.head.get() = n;
+            (**self.producer.head.get()).next.store(n, Ordering::Release);
+            *(&self.producer.head).get() = n;
         }
     }
 
     unsafe fn alloc(&self) -> *mut Node<T> {
         // First try to see if we can consume the 'first' node for our uses.
-        // We try to avoid as many atomic instructions as possible here, so
-        // the addition to cache_subtractions is not atomic (plus we're the
-        // only one subtracting from the cache).
-        if *self.first.get() != *self.tail_copy.get() {
-            if self.cache_bound > 0 {
-                let b = self.cache_subtractions.load(Ordering::Relaxed);
-                self.cache_subtractions.store(b + 1, Ordering::Relaxed);
-            }
-            let ret = *self.first.get();
-            *self.first.get() = (*ret).next.load(Ordering::Relaxed);
+        if *self.producer.first.get() != *self.producer.tail_copy.get() {
+            let ret = *self.producer.first.get();
+            *self.producer.0.first.get() = (*ret).next.load(Ordering::Relaxed);
             return ret;
         }
         // If the above fails, then update our copy of the tail and try
         // again.
-        *self.tail_copy.get() = self.tail_prev.load(Ordering::Acquire);
-        if *self.first.get() != *self.tail_copy.get() {
-            if self.cache_bound > 0 {
-                let b = self.cache_subtractions.load(Ordering::Relaxed);
-                self.cache_subtractions.store(b + 1, Ordering::Relaxed);
-            }
-            let ret = *self.first.get();
-            *self.first.get() = (*ret).next.load(Ordering::Relaxed);
+        *self.producer.0.tail_copy.get() =
+            self.consumer.tail_prev.load(Ordering::Acquire);
+        if *self.producer.first.get() != *self.producer.tail_copy.get() {
+            let ret = *self.producer.first.get();
+            *self.producer.0.first.get() = (*ret).next.load(Ordering::Relaxed);
             return ret;
         }
         // If all of that fails, then we have to allocate a new node
@@ -153,27 +196,27 @@ impl<T> Queue<T> {
         // sentinel from where we should start popping from. Hence, look at
         // tail's next field and see if we can use it. If we do a pop, then
         // the current tail node is a candidate for going into the cache.
-        let tail = *self.tail.get();
+        let tail = *self.consumer.tail.get();
         let next = (*tail).next.load(Ordering::Acquire);
         if next.is_null() { return None }
         assert!((*next).value.is_some());
         let ret = (*next).value.take();
 
-        *self.tail.get() = next;
-        if self.cache_bound == 0 {
-            self.tail_prev.store(tail, Ordering::Release);
+        *self.consumer.0.tail.get() = next;
+        if self.consumer.cache_bound == 0 {
+            self.consumer.tail_prev.store(tail, Ordering::Release);
         } else {
-            // FIXME: this is dubious with overflow.
-            let additions = self.cache_additions.load(Ordering::Relaxed);
-            let subtractions = self.cache_subtractions.load(Ordering::Relaxed);
-            let size = additions - subtractions;
-
-            if size < self.cache_bound {
-                self.tail_prev.store(tail, Ordering::Release);
-                self.cache_additions.store(additions + 1, Ordering::Relaxed);
+            let cached_nodes = self.consumer.cached_nodes.load(Ordering::Relaxed);
+            if cached_nodes < self.consumer.cache_bound && !(*tail).cached {
+                self.consumer.cached_nodes.store(cached_nodes, Ordering::Relaxed);
+                (*tail).cached = true;
+            }
+
+            if (*tail).cached {
+                self.consumer.tail_prev.store(tail, Ordering::Release);
             } else {
-                (*self.tail_prev.load(Ordering::Relaxed))
-                    .next.store(next, Ordering::Relaxed);
+                (*self.consumer.tail_prev.load(Ordering::Relaxed))
+                    .next.store(next, Ordering::Relaxed);
                 // We have successfully erased all references to 'tail', so
                 // now we can safely drop it.
                 let _: Box<Node<T>> = Box::from_raw(tail);
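The pop hunk above is the heart of change 1. Previously the producer wrote `cache_subtractions` while the consumer wrote `cache_additions`, so merely maintaining the cache-size estimate caused the cross-core traffic the patch is trying to eliminate (with a `FIXME` about overflow on top). In the new scheme only the consumer writes anything: at pop time it decides, once per node, whether that node may ever be recycled, and a node marked `cached` stays in the rotation for the queue's lifetime, so the count of marked nodes only grows toward `cache_bound` and never needs decrementing. A toy, single-threaded model of that accounting as the commit message describes it (illustrative code, not the patch's):

    // One node of the toy model: `cached` mirrors the new field on `Node`.
    struct Node {
        cached: bool,
    }

    // Models the consumer's decision in `pop`: returns true if the node
    // should be recycled into the cache, false if it should be freed.
    // Only the consumer ever calls this, which is why no shared counter
    // update is needed; the real code uses a relaxed atomic only because
    // the field lives in a structurally shared struct.
    fn on_pop(node: &mut Node, cached_nodes: &mut usize, cache_bound: usize) -> bool {
        if *cached_nodes < cache_bound && !node.cached {
            *cached_nodes += 1; // grows toward the bound, never shrinks
            node.cached = true; // a cached node stays cached forever
        }
        node.cached
    }

    fn main() {
        let mut cached_nodes = 0;
        let mut nodes: Vec<Node> = (0..4).map(|_| Node { cached: false }).collect();
        let recycled: Vec<bool> =
            nodes.iter_mut().map(|n| on_pop(n, &mut cached_nodes, 2)).collect();
        // Only the first `cache_bound` distinct nodes ever enter the cache.
        assert_eq!(recycled, vec![true, true, false, false]);
        assert_eq!(cached_nodes, 2);
    }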
@@ -194,17 +237,25 @@ impl<T> Queue<T> {
         // This is essentially the same as above with all the popping bits
         // stripped out.
         unsafe {
-            let tail = *self.tail.get();
+            let tail = *self.consumer.tail.get();
             let next = (*tail).next.load(Ordering::Acquire);
             if next.is_null() { None } else { (*next).value.as_mut() }
         }
     }
+
+    pub fn producer_addition(&self) -> &ProducerAddition {
+        &self.producer.addition
+    }
+
+    pub fn consumer_addition(&self) -> &ConsumerAddition {
+        &self.consumer.addition
+    }
 }
 
-impl<T> Drop for Queue<T> {
+impl<T, ProducerAddition, ConsumerAddition> Drop for Queue<T, ProducerAddition, ConsumerAddition> {
     fn drop(&mut self) {
         unsafe {
-            let mut cur = *self.first.get();
+            let mut cur = *self.producer.first.get();
             while !cur.is_null() {
                 let next = (*cur).next.load(Ordering::Relaxed);
                 let _n: Box<Node<T>> = Box::from_raw(cur);
diff --git a/src/libstd/sync/mpsc/stream.rs b/src/libstd/sync/mpsc/stream.rs
index 47cd8977fda23..d1515eba68c3e 100644
--- a/src/libstd/sync/mpsc/stream.rs
+++ b/src/libstd/sync/mpsc/stream.rs
@@ -41,15 +41,22 @@ const MAX_STEALS: isize = 5;
 const MAX_STEALS: isize = 1 << 20;
 
 pub struct Packet<T> {
-    queue: spsc::Queue<Message<T>>, // internal queue for all message
+    // internal queue for all messages
+    queue: spsc::Queue<Message<T>, ProducerAddition, ConsumerAddition>,
+}
 
+struct ProducerAddition {
     cnt: AtomicIsize, // How many items are on this channel
-    steals: UnsafeCell<isize>, // How many times has a port received without blocking?
     to_wake: AtomicUsize, // SignalToken for the blocked thread to wake up
 
     port_dropped: AtomicBool, // flag if the channel has been destroyed.
 }
+
+struct ConsumerAddition {
+    steals: UnsafeCell<isize>, // How many times has a port received without blocking?
+}
+
+
 pub enum Failure {
     Empty,
     Disconnected,
 }
@@ -78,13 +85,18 @@ impl<T> Packet<T> {
     pub fn new() -> Packet<T> {
         Packet {
-            queue: unsafe { spsc::Queue::new(128) },
-
-            cnt: AtomicIsize::new(0),
-            steals: UnsafeCell::new(0),
-            to_wake: AtomicUsize::new(0),
-
-            port_dropped: AtomicBool::new(false),
+            queue: unsafe { spsc::Queue::with_additions(
+                128,
+                ProducerAddition {
+                    cnt: AtomicIsize::new(0),
+                    to_wake: AtomicUsize::new(0),
+
+                    port_dropped: AtomicBool::new(false),
+                },
+                ConsumerAddition {
+                    steals: UnsafeCell::new(0),
+                }
+            )},
         }
     }
@@ -92,7 +104,7 @@ impl<T> Packet<T> {
         // If the other port has deterministically gone away, then definitely
         // must return the data back up the stack. Otherwise, the data is
         // considered as being sent.
-        if self.port_dropped.load(Ordering::SeqCst) { return Err(t) }
+        if self.queue.producer_addition().port_dropped.load(Ordering::SeqCst) { return Err(t) }
 
         match self.do_send(Data(t)) {
             UpSuccess | UpDisconnected => {},
@@ -104,14 +116,16 @@ impl<T> Packet<T> {
     pub fn upgrade(&self, up: Receiver<T>) -> UpgradeResult {
         // If the port has gone away, then there's no need to proceed any
         // further.
-        if self.port_dropped.load(Ordering::SeqCst) { return UpDisconnected }
+        if self.queue.producer_addition().port_dropped.load(Ordering::SeqCst) {
+            return UpDisconnected
+        }
 
         self.do_send(GoUp(up))
     }
 
     fn do_send(&self, t: Message<T>) -> UpgradeResult {
         self.queue.push(t);
-        match self.cnt.fetch_add(1, Ordering::SeqCst) {
+        match self.queue.producer_addition().cnt.fetch_add(1, Ordering::SeqCst) {
             // As described in the mod's doc comment, -1 == wakeup
             -1 => UpWoke(self.take_to_wake()),
             // As as described before, SPSC queues must be >= -2
@@ -125,7 +139,7 @@ impl<T> Packet<T> {
             // will never remove this data. We can only have at most one item to
             // drain (the port drains the rest).
             DISCONNECTED => {
-                self.cnt.store(DISCONNECTED, Ordering::SeqCst);
+                self.queue.producer_addition().cnt.store(DISCONNECTED, Ordering::SeqCst);
                 let first = self.queue.pop();
                 let second = self.queue.pop();
                 assert!(second.is_none());
@@ -144,8 +158,8 @@ impl<T> Packet<T> {
 
     // Consumes ownership of the 'to_wake' field.
     fn take_to_wake(&self) -> SignalToken {
-        let ptr = self.to_wake.load(Ordering::SeqCst);
-        self.to_wake.store(0, Ordering::SeqCst);
+        let ptr = self.queue.producer_addition().to_wake.load(Ordering::SeqCst);
+        self.queue.producer_addition().to_wake.store(0, Ordering::SeqCst);
         assert!(ptr != 0);
         unsafe { SignalToken::cast_from_usize(ptr) }
     }
@@ -154,14 +168,16 @@ impl<T> Packet<T> {
     // back if it shouldn't sleep. Note that this is the location where we take
     // steals into account.
     fn decrement(&self, token: SignalToken) -> Result<(), SignalToken> {
-        assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
+        assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0);
         let ptr = unsafe { token.cast_to_usize() };
-        self.to_wake.store(ptr, Ordering::SeqCst);
+        self.queue.producer_addition().to_wake.store(ptr, Ordering::SeqCst);
 
-        let steals = unsafe { ptr::replace(self.steals.get(), 0) };
+        let steals = unsafe { ptr::replace(self.queue.consumer_addition().steals.get(), 0) };
 
-        match self.cnt.fetch_sub(1 + steals, Ordering::SeqCst) {
-            DISCONNECTED => { self.cnt.store(DISCONNECTED, Ordering::SeqCst); }
+        match self.queue.producer_addition().cnt.fetch_sub(1 + steals, Ordering::SeqCst) {
+            DISCONNECTED => {
+                self.queue.producer_addition().cnt.store(DISCONNECTED, Ordering::SeqCst);
+            }
             // If we factor in our steals and notice that the channel has no
             // data, we successfully sleep
             n => {
@@ -170,7 +186,7 @@ impl<T> Packet<T> {
             }
         }
 
-        self.to_wake.store(0, Ordering::SeqCst);
+        self.queue.producer_addition().to_wake.store(0, Ordering::SeqCst);
         Err(unsafe { SignalToken::cast_from_usize(ptr) })
     }
@@ -201,7 +217,7 @@ impl<T> Packet<T> {
             // "steal" factored into the channel count above).
             data @ Ok(..) |
             data @ Err(Upgraded(..)) => unsafe {
-                *self.steals.get() -= 1;
+                *self.queue.consumer_addition().steals.get() -= 1;
                 data
             },
@@ -223,20 +239,21 @@ impl<T> Packet<T> {
             // down as much as possible (without going negative), and then
             // adding back in whatever we couldn't factor into steals.
             Some(data) => unsafe {
-                if *self.steals.get() > MAX_STEALS {
-                    match self.cnt.swap(0, Ordering::SeqCst) {
+                if *self.queue.consumer_addition().steals.get() > MAX_STEALS {
+                    match self.queue.producer_addition().cnt.swap(0, Ordering::SeqCst) {
                         DISCONNECTED => {
-                            self.cnt.store(DISCONNECTED, Ordering::SeqCst);
+                            self.queue.producer_addition().cnt.store(
+                                DISCONNECTED, Ordering::SeqCst);
                         }
                         n => {
-                            let m = cmp::min(n, *self.steals.get());
-                            *self.steals.get() -= m;
+                            let m = cmp::min(n, *self.queue.consumer_addition().steals.get());
+                            *self.queue.consumer_addition().steals.get() -= m;
                             self.bump(n - m);
                         }
                     }
-                    assert!(*self.steals.get() >= 0);
+                    assert!(*self.queue.consumer_addition().steals.get() >= 0);
                 }
-                *self.steals.get() += 1;
+                *self.queue.consumer_addition().steals.get() += 1;
 
                 match data {
                     Data(t) => Ok(t),
                     GoUp(up) => Err(Upgraded(up)),
@@ -244,7 +261,7 @@ impl<T> Packet<T> {
             },
 
             None => {
-                match self.cnt.load(Ordering::SeqCst) {
+                match self.queue.producer_addition().cnt.load(Ordering::SeqCst) {
                     n if n != DISCONNECTED => Err(Empty),
 
                     // This is a little bit of a tricky case. We failed to pop
@@ -273,7 +290,7 @@ impl<T> Packet<T> {
     pub fn drop_chan(&self) {
         // Dropping a channel is pretty simple, we just flag it as disconnected
         // and then wakeup a blocker if there is one.
-        match self.cnt.swap(DISCONNECTED, Ordering::SeqCst) {
+        match self.queue.producer_addition().cnt.swap(DISCONNECTED, Ordering::SeqCst) {
             -1 => { self.take_to_wake().signal(); }
             DISCONNECTED => {}
             n => { assert!(n >= 0); }
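One subtlety threaded through the hunks above: `cnt` (now producer-side) is incremented once per send, while `steals` (now consumer-side, and touched by nobody else) counts receives that never paid down `cnt`. When `steals` outgrows `MAX_STEALS`, `try_recv` settles the books: it swaps `cnt` to zero, cancels as much as possible against `steals`, and bumps back the remainder. A toy model of that settlement on plain integers (illustration only; the real code must also handle the `DISCONNECTED` sentinel):

    use std::cmp;
    use std::mem;

    const MAX_STEALS: isize = 5;

    // Models the `steals > MAX_STEALS` branch of `try_recv`:
    // fold the shared counter into the private steal count.
    fn rebalance(cnt: &mut isize, steals: &mut isize) {
        if *steals > MAX_STEALS {
            let n = mem::replace(cnt, 0); // models cnt.swap(0, SeqCst)
            let m = cmp::min(n, *steals); // how much we can cancel out
            *steals -= m;
            *cnt += n - m;                // models self.bump(n - m)
            assert!(*steals >= 0);
        }
    }

    fn main() {
        // 8 sends counted in `cnt`; 6 already received as "steals",
        // so only 2 messages are really sitting in the queue.
        let (mut cnt, mut steals) = (8, 6);
        rebalance(&mut cnt, &mut steals);
        assert_eq!((cnt, steals), (2, 0)); // books settled, 2 still queued
    }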
@@ -300,7 +317,7 @@ impl<T> Packet<T> {
         // sends are gated on this flag, so we're immediately guaranteed that
         // there are a bounded number of active sends that we'll have to deal
         // with.
-        self.port_dropped.store(true, Ordering::SeqCst);
+        self.queue.producer_addition().port_dropped.store(true, Ordering::SeqCst);
 
         // Now that we're guaranteed to deal with a bounded number of senders,
         // we need to drain the queue. This draining process happens atomically
@@ -310,9 +327,9 @@ impl<T> Packet<T> {
         // continue to fail while active senders send data while we're dropping
         // data, but eventually we're guaranteed to break out of this loop
         // (because there is a bounded number of senders).
-        let mut steals = unsafe { *self.steals.get() };
+        let mut steals = unsafe { *self.queue.consumer_addition().steals.get() };
         while {
-            let cnt = self.cnt.compare_and_swap(
+            let cnt = self.queue.producer_addition().cnt.compare_and_swap(
                             steals, DISCONNECTED, Ordering::SeqCst);
             cnt != DISCONNECTED && cnt != steals
         } {
@@ -353,9 +370,9 @@ impl<T> Packet<T> {
 
     // increment the count on the channel (used for selection)
     fn bump(&self, amt: isize) -> isize {
-        match self.cnt.fetch_add(amt, Ordering::SeqCst) {
+        match self.queue.producer_addition().cnt.fetch_add(amt, Ordering::SeqCst) {
             DISCONNECTED => {
-                self.cnt.store(DISCONNECTED, Ordering::SeqCst);
+                self.queue.producer_addition().cnt.store(DISCONNECTED, Ordering::SeqCst);
                 DISCONNECTED
             }
             n => n
@@ -404,8 +421,8 @@ impl<T> Packet<T> {
         // this end. This is fine because we know it's a small bounded windows
         // of time until the data is actually sent.
         if was_upgrade {
-            assert_eq!(unsafe { *self.steals.get() }, 0);
-            assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
+            assert_eq!(unsafe { *self.queue.consumer_addition().steals.get() }, 0);
+            assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0);
             return Ok(true)
         }
@@ -418,7 +435,7 @@ impl<T> Packet<T> {
         // If we were previously disconnected, then we know for sure that there
        // is no thread in to_wake, so just keep going
        let has_data = if prev == DISCONNECTED {
-            assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
+            assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0);
             true // there is data, that data is that we're disconnected
         } else {
             let cur = prev + steals + 1;
@@ -441,13 +458,13 @@ impl<T> Packet<T> {
         if prev < 0 {
             drop(self.take_to_wake());
         } else {
-            while self.to_wake.load(Ordering::SeqCst) != 0 {
+            while self.queue.producer_addition().to_wake.load(Ordering::SeqCst) != 0 {
                 thread::yield_now();
             }
         }
         unsafe {
-            assert_eq!(*self.steals.get(), 0);
-            *self.steals.get() = steals;
+            assert_eq!(*self.queue.consumer_addition().steals.get(), 0);
+            *self.queue.consumer_addition().steals.get() = steals;
         }
 
         // if we were previously positive, then there's surely data to
@@ -481,7 +498,7 @@ impl<T> Drop for Packet<T> {
         // disconnection, but also a proper fence before the read of
         // `to_wake`, so this assert cannot be removed with also removing
         // the `to_wake` assert.
-        assert_eq!(self.cnt.load(Ordering::SeqCst), DISCONNECTED);
-        assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
+        assert_eq!(self.queue.producer_addition().cnt.load(Ordering::SeqCst), DISCONNECTED);
+        assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0);
     }
 }

From 41320fa52e1e4dd85a200a34195ae32d7efd13e0 Mon Sep 17 00:00:00 2001
From: JLockerman
Date: Sun, 8 Oct 2017 18:40:45 -0400
Subject: [PATCH 2/3] cfg out Queue::new for emscripten

Queue::new is only used in tests at the moment, which causes warnings on
emscripten, which does not run the queue tests.
---
 src/libstd/sync/mpsc/spsc_queue.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/libstd/sync/mpsc/spsc_queue.rs b/src/libstd/sync/mpsc/spsc_queue.rs
index 3ce5927033581..45a00978b3521 100644
--- a/src/libstd/sync/mpsc/spsc_queue.rs
+++ b/src/libstd/sync/mpsc/spsc_queue.rs
@@ -76,7 +76,7 @@ impl<T> Node<T> {
 }
 
 impl<T> Queue<T> {
-    #[cfg(test)]
+    #[cfg(all(test, not(target_os = "emscripten")))]
     /// Creates a new queue.
     ///
     /// This is unsafe as the type system doesn't enforce a single

From bb7945e2fe662c86cb8e9e3a93730f20b7480dca Mon Sep 17 00:00:00 2001
From: Joshua Lockerman
Date: Fri, 29 Sep 2017 15:58:11 -0400
Subject: [PATCH 3/3] Remove Queue::new.

---
 src/libstd/sync/mpsc/spsc_queue.rs | 34 +++++-------------------------
 1 file changed, 5 insertions(+), 29 deletions(-)

diff --git a/src/libstd/sync/mpsc/spsc_queue.rs b/src/libstd/sync/mpsc/spsc_queue.rs
index 45a00978b3521..cc4be92276a3b 100644
--- a/src/libstd/sync/mpsc/spsc_queue.rs
+++ b/src/libstd/sync/mpsc/spsc_queue.rs
@@ -75,30 +75,6 @@ impl<T> Node<T> {
     }
 }
 
-impl<T> Queue<T> {
-    #[cfg(all(test, not(target_os = "emscripten")))]
-    /// Creates a new queue.
-    ///
-    /// This is unsafe as the type system doesn't enforce a single
-    /// consumer-producer relationship. It also allows the consumer to `pop`
-    /// items while there is a `peek` active due to all methods having a
-    /// non-mutable receiver.
-    ///
-    /// # Arguments
-    ///
-    ///   * `bound` - This queue implementation is implemented with a linked
-    ///               list, and this means that a push is always a malloc. In
-    ///               order to amortize this cost, an internal cache of nodes is
-    ///               maintained to prevent a malloc from always being
-    ///               necessary. This bound is the limit on the size of the
-    ///               cache (if desired). If the value is 0, then the cache has
-    ///               no bound. Otherwise, the cache will never grow larger than
-    ///               `bound` (although the queue itself could be much larger.
-    pub unsafe fn new(bound: usize) -> Queue<T> {
-        Self::with_additions(bound, (), ())
-    }
-}
-
 impl<T, ProducerAddition, ConsumerAddition> Queue<T, ProducerAddition, ConsumerAddition> {
 
     /// Creates a new queue. With given additional elements in the producer and
@@ -275,7 +251,7 @@ mod tests {
     #[test]
     fn smoke() {
         unsafe {
-            let queue = Queue::new(0);
+            let queue = Queue::with_additions(0, (), ());
             queue.push(1);
             queue.push(2);
             assert_eq!(queue.pop(), Some(1));
@@ -292,7 +268,7 @@ mod tests {
     #[test]
     fn peek() {
         unsafe {
-            let queue = Queue::new(0);
+            let queue = Queue::with_additions(0, (), ());
             queue.push(vec![1]);
 
             // Ensure the borrowchecker works
@@ -315,7 +291,7 @@ mod tests {
     #[test]
     fn drop_full() {
         unsafe {
-            let q: Queue<Box<_>> = Queue::new(0);
+            let q: Queue<Box<_>> = Queue::with_additions(0, (), ());
             q.push(box 1);
             q.push(box 2);
         }
@@ -324,7 +300,7 @@ mod tests {
     #[test]
     fn smoke_bound() {
         unsafe {
-            let q = Queue::new(0);
+            let q = Queue::with_additions(0, (), ());
             q.push(1);
             q.push(2);
             assert_eq!(q.pop(), Some(1));
@@ -346,7 +322,7 @@ mod tests {
     }
 
     unsafe fn stress_bound(bound: usize) {
-        let q = Arc::new(Queue::new(bound));
+        let q = Arc::new(Queue::with_additions(bound, (), ()));
 
         let (tx, rx) = channel();
         let q2 = q.clone();
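With `Queue::new` gone, `with_additions` is the sole constructor, and ordinary callers pass `()` for both additions exactly as the updated tests do. A closing usage sketch (hypothetical: it pretends `Queue` is importable, which the private mpsc internals do not actually permit; the obligation that makes the constructor an `unsafe fn` is exactly one pushing thread and one popping thread):

    use std::sync::Arc;
    use std::thread;

    // Assumes spsc_queue::Queue has been re-exported for the example.
    fn main() {
        let queue = Arc::new(unsafe { Queue::with_additions(16, (), ()) });
        let producer = queue.clone();

        let sender = thread::spawn(move || {
            for i in 0..1000 {
                producer.push(i); // the single producer
            }
        });

        let mut expected = 0;
        while expected < 1000 {
            if let Some(i) = queue.pop() { // the single consumer
                assert_eq!(i, expected);   // FIFO order is preserved
                expected += 1;
            }
        }
        sender.join().unwrap();
    }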