From 3f51f04925edcd345196e1f1a7b486ced81807da Mon Sep 17 00:00:00 2001 From: ptitSeb Date: Thu, 30 Mar 2023 17:04:58 +0200 Subject: [PATCH 01/11] Fix Wait/Notify opcode, the waiters hashmap is now on the Memory itself --- lib/vm/src/instance/mod.rs | 148 +++++++++------------------------ lib/vm/src/lib.rs | 1 + lib/vm/src/memory.rs | 36 ++++++++ lib/vm/src/threadconditions.rs | 108 ++++++++++++++++++++++++ 4 files changed, 183 insertions(+), 110 deletions(-) create mode 100644 lib/vm/src/threadconditions.rs diff --git a/lib/vm/src/instance/mod.rs b/lib/vm/src/instance/mod.rs index bfc62fca674..51cac3b3a7e 100644 --- a/lib/vm/src/instance/mod.rs +++ b/lib/vm/src/instance/mod.rs @@ -33,8 +33,7 @@ use std::fmt; use std::mem; use std::ptr::{self, NonNull}; use std::slice; -use std::sync::{Arc, Mutex}; -use std::thread::{current, park, park_timeout, Thread}; +use std::sync::Arc; use wasmer_types::entity::{packed_option::ReservedValue, BoxedSlice, EntityRef, PrimaryMap}; use wasmer_types::{ DataIndex, DataInitializer, ElemIndex, ExportIndex, FunctionIndex, GlobalIndex, GlobalInit, @@ -42,20 +41,6 @@ use wasmer_types::{ MemoryIndex, ModuleInfo, Pages, SignatureIndex, TableIndex, TableInitializer, VMOffsets, }; -#[derive(Hash, Eq, PartialEq, Clone, Copy)] -struct NotifyLocation { - memory_index: u32, - address: u32, -} - -struct NotifyWaiter { - thread: Thread, - notified: bool, -} -struct NotifyMap { - map: HashMap>, -} - /// A WebAssembly instance. /// /// The type is dynamically-sized. Indeed, the `vmctx` field can @@ -105,9 +90,6 @@ pub(crate) struct Instance { /// will point to elements here for functions imported by this instance. imported_funcrefs: BoxedSlice>, - /// The Hasmap with the Notify for the Notify/wait opcodes - conditions: Arc>, - /// Additional context used by compiled WebAssembly code. This /// field is last, and represents a dynamically-sized array that /// extends beyond the nominal end of the struct (similar to a @@ -276,6 +258,31 @@ impl Instance { } } + /// Get a locally defined or imported memory. + fn get_vmmemory_mut(&mut self, index: MemoryIndex) -> &mut VMMemory { + if let Some(local_index) = self.module.local_memory_index(index) { + unsafe { + self.memories + .get_mut(local_index) + .unwrap() + .get_mut(self.context.as_mut().unwrap()) + } + } else { + let import = self.imported_memory(index); + unsafe { import.handle.get_mut(self.context.as_mut().unwrap()) } + } + } + + /// Get a locally defined memory as mutable. + fn get_local_vmmemory_mut(&mut self, local_index: LocalMemoryIndex) -> &mut VMMemory { + unsafe { + self.memories + .get_mut(local_index) + .unwrap() + .get_mut(self.context.as_mut().unwrap()) + } + } + /// Return the indexed `VMGlobalDefinition`. fn global(&self, index: LocalGlobalIndex) -> VMGlobalDefinition { unsafe { self.global_ptr(index).as_ref().clone() } @@ -797,53 +804,6 @@ impl Instance { } } - // To implement Wait / Notify, a HasMap, behind a mutex, will be used - // to track the address of waiter. The key of the hashmap is based on the memory - // and waiter threads are "park"'d (with or without timeout) - // Notify will wake the waiters by simply "unpark" the thread - // as the Thread info is stored on the HashMap - // once unparked, the waiter thread will remove it's mark on the HashMap - // timeout / awake is tracked with a boolean in the HashMap - // because `park_timeout` doesn't gives any information on why it returns - fn do_wait(&mut self, index: u32, dst: u32, timeout: i64) -> u32 { - // fetch the notifier - let key = NotifyLocation { - memory_index: index, - address: dst, - }; - let mut conds = self.conditions.lock().unwrap(); - let v = conds.map.entry(key).or_insert_with(Vec::new); - v.push(NotifyWaiter { - thread: current(), - notified: false, - }); - drop(conds); - if timeout < 0 { - park(); - } else { - park_timeout(std::time::Duration::from_nanos(timeout as u64)); - } - let mut conds = self.conditions.lock().unwrap(); - let v = conds.map.get_mut(&key).unwrap(); - let id = current().id(); - let mut ret = 0; - v.retain(|cond| { - if cond.thread.id() == id { - ret = if cond.notified { 0 } else { 2 }; - false - } else { - true - } - }); - if v.is_empty() { - conds.map.remove(&key); - } - if conds.map.len() > 1 << 32 { - ret = 0xffff; - } - ret - } - /// Perform an Atomic.Wait32 pub(crate) fn local_memory_wait32( &mut self, @@ -861,7 +821,8 @@ impl Instance { if let Ok(mut ret) = ret { if ret == 0 { - ret = self.do_wait(memory_index.as_u32(), dst, timeout); + let memory = self.get_local_vmmemory_mut(memory_index); + ret = memory.do_wait(dst, timeout); } if ret == 0xffff { // ret is 0xffff if there is more than 2^32 waiter in queue @@ -888,10 +849,10 @@ impl Instance { //} let ret = unsafe { memory32_atomic_check32(memory, dst, val) }; - if let Ok(mut ret) = ret { if ret == 0 { - ret = self.do_wait(memory_index.as_u32(), dst, timeout); + let memory = self.get_vmmemory_mut(memory_index); + ret = memory.do_wait(dst, timeout); } if ret == 0xffff { // ret is 0xffff if there is more than 2^32 waiter in queue @@ -920,7 +881,8 @@ impl Instance { if let Ok(mut ret) = ret { if ret == 0 { - ret = self.do_wait(memory_index.as_u32(), dst, timeout); + let memory = self.get_local_vmmemory_mut(memory_index); + ret = memory.do_wait(dst, timeout); } if ret == 0xffff { // ret is 0xffff if there is more than 2^32 waiter in queue @@ -950,7 +912,8 @@ impl Instance { if let Ok(mut ret) = ret { if ret == 0 { - ret = self.do_wait(memory_index.as_u32(), dst, timeout); + let memory = self.get_vmmemory_mut(memory_index); + ret = memory.do_wait(dst, timeout); } if ret == 0xffff { // ret is 0xffff if there is more than 2^32 waiter in queue @@ -962,21 +925,6 @@ impl Instance { } } - fn do_notify(&mut self, key: NotifyLocation, count: u32) -> Result { - let mut conds = self.conditions.lock().unwrap(); - let mut cnt = 0u32; - if let Some(v) = conds.map.get_mut(&key) { - for waiter in v { - if cnt < count { - waiter.notified = true; // mark as was waiked up - waiter.thread.unpark(); // wakeup! - cnt += 1; - } - } - } - Ok(cnt) - } - /// Perform an Atomic.Notify pub(crate) fn local_memory_notify( &mut self, @@ -984,17 +932,9 @@ impl Instance { dst: u32, count: u32, ) -> Result { - //let memory = self.memory(memory_index); - //if ! memory.shared { - // We should trap according to spec, but official test rely on not trapping... - //} - + let memory = self.get_local_vmmemory_mut(memory_index); // fetch the notifier - let key = NotifyLocation { - memory_index: memory_index.as_u32(), - address: dst, - }; - self.do_notify(key, count) + Ok(memory.do_notify(dst, count)) } /// Perform an Atomic.Notify @@ -1004,18 +944,9 @@ impl Instance { dst: u32, count: u32, ) -> Result { - //let import = self.imported_memory(memory_index); - //let memory = unsafe { import.definition.as_ref() }; - //if ! memory.shared { - // We should trap according to spec, but official test rely on not trapping... - //} - + let memory = self.get_vmmemory_mut(memory_index); // fetch the notifier - let key = NotifyLocation { - memory_index: memory_index.as_u32(), - address: dst, - }; - self.do_notify(key, count) + Ok(memory.do_notify(dst, count)) } } @@ -1125,9 +1056,6 @@ impl VMInstance { funcrefs, imported_funcrefs, vmctx: VMContext {}, - conditions: Arc::new(Mutex::new(NotifyMap { - map: HashMap::new(), - })), }; let mut instance_handle = allocator.into_vminstance(instance); diff --git a/lib/vm/src/lib.rs b/lib/vm/src/lib.rs index b2d69b8616d..6d09f4cce3c 100644 --- a/lib/vm/src/lib.rs +++ b/lib/vm/src/lib.rs @@ -32,6 +32,7 @@ mod probestack; mod sig_registry; mod store; mod table; +mod threadconditions; mod trap; mod vmcontext; diff --git a/lib/vm/src/memory.rs b/lib/vm/src/memory.rs index d3d288a0c04..4b4cb90fc36 100644 --- a/lib/vm/src/memory.rs +++ b/lib/vm/src/memory.rs @@ -5,6 +5,7 @@ //! //! `Memory` is to WebAssembly linear memories what `Table` is to WebAssembly tables. +use crate::threadconditions::ThreadConditions; use crate::trap::Trap; use crate::{mmap::Mmap, store::MaybeInstanceOwned, vmcontext::VMMemoryDefinition}; use more_asserts::assert_ge; @@ -285,6 +286,7 @@ impl VMOwnedMemory { VMSharedMemory { mmap: Arc::new(RwLock::new(self.mmap)), config: self.config, + conditions: ThreadConditions::new(), } } @@ -346,6 +348,8 @@ pub struct VMSharedMemory { mmap: Arc>, // Configuration of this memory config: VMMemoryConfig, + // waiters list for this memory + conditions: ThreadConditions, } unsafe impl Send for VMSharedMemory {} @@ -381,6 +385,7 @@ impl VMSharedMemory { Ok(Self { mmap: Arc::new(RwLock::new(guard.duplicate()?)), config: self.config.clone(), + conditions: ThreadConditions::new(), }) } } @@ -431,6 +436,16 @@ impl LinearMemory for VMSharedMemory { let forked = Self::duplicate(self)?; Ok(Box::new(forked)) } + + // Add current thread to waiter list + fn do_wait(&mut self, dst: u32, timeout: i64) -> u32 { + self.conditions.do_wait(dst, timeout) + } + + /// Notify waiters from the wait list. Return the number of waiters notified + fn do_notify(&mut self, dst: u32, count: u32) -> u32 { + self.conditions.do_notify(dst, count) + } } impl From for VMMemory { @@ -498,6 +513,16 @@ impl LinearMemory for VMMemory { fn duplicate(&mut self) -> Result, MemoryError> { self.0.duplicate() } + + // Add current thread to waiter list + fn do_wait(&mut self, dst: u32, timeout: i64) -> u32 { + self.0.do_wait(dst, timeout) + } + + /// Notify waiters from the wait list. Return the number of waiters notified + fn do_notify(&mut self, dst: u32, count: u32) -> u32 { + self.0.do_notify(dst, count) + } } impl VMMemory { @@ -616,4 +641,15 @@ where /// Copies this memory to a new memory fn duplicate(&mut self) -> Result, MemoryError>; + + /// Add current thread to the waiter hash, and wait until notified or timout. + /// Return 0 if the waiter has been notified, 2 if the timeout occured, or 0xffff if en error happened + fn do_wait(&mut self, _dst: u32, _timeout: i64) -> u32 { + 0xffff + } + + /// Notify waiters from the wait list. Return the number of waiters notified + fn do_notify(&mut self, _dst: u32, _count: u32) -> u32 { + 0 + } } diff --git a/lib/vm/src/threadconditions.rs b/lib/vm/src/threadconditions.rs new file mode 100644 index 00000000000..10b394b4062 --- /dev/null +++ b/lib/vm/src/threadconditions.rs @@ -0,0 +1,108 @@ +use std::collections::HashMap; +use std::sync::{Arc, LockResult, Mutex, MutexGuard}; +use std::thread::{current, park, park_timeout, Thread}; + +#[derive(Hash, Eq, PartialEq, Clone, Copy, Debug)] +struct NotifyLocation { + pub address: u32, +} + +#[derive(Debug)] +struct NotifyWaiter { + pub thread: Thread, + pub notified: bool, +} +#[derive(Debug, Default)] +struct NotifyMap { + pub map: HashMap>, +} + +/// HashMap of Waiters for the Thread/Notify opcodes +#[derive(Debug)] +pub struct ThreadConditions { + inner: Arc>, // The Hasmap with the Notify for the Notify/wait opcodes +} + +impl ThreadConditions { + /// Create a new ThreadConditions + pub fn new() -> Self { + Self { + inner: Arc::new(Mutex::new(NotifyMap::default())), + } + } + + fn lock_conditions(&mut self) -> LockResult> { + self.inner.lock() + } + + // To implement Wait / Notify, a HasMap, behind a mutex, will be used + // to track the address of waiter. The key of the hashmap is based on the memory + // and waiter threads are "park"'d (with or without timeout) + // Notify will wake the waiters by simply "unpark" the thread + // as the Thread info is stored on the HashMap + // once unparked, the waiter thread will remove it's mark on the HashMap + // timeout / awake is tracked with a boolean in the HashMap + // because `park_timeout` doesn't gives any information on why it returns + + /// Add current thread to the waiter hash + pub fn do_wait(&mut self, dst: u32, timeout: i64) -> u32 { + // fetch the notifier + let key = NotifyLocation { address: dst }; + let mut conds = self.lock_conditions().unwrap(); + if conds.map.len() > 1 << 32 { + return 0xffff; + } + let v = conds.map.entry(key).or_insert_with(Vec::new); + v.push(NotifyWaiter { + thread: current(), + notified: false, + }); + drop(conds); + if timeout < 0 { + park(); + } else { + park_timeout(std::time::Duration::from_nanos(timeout as u64)); + } + let mut conds = self.lock_conditions().unwrap(); + let v = conds.map.get_mut(&key).unwrap(); + let id = current().id(); + let mut ret = 0; + v.retain(|cond| { + if cond.thread.id() == id { + ret = if cond.notified { 0 } else { 2 }; + false + } else { + true + } + }); + if v.is_empty() { + conds.map.remove(&key); + } + ret + } + + /// Notify waiters from the wait list + pub fn do_notify(&mut self, dst: u32, count: u32) -> u32 { + let key = NotifyLocation { address: dst }; + let mut conds = self.lock_conditions().unwrap(); + let mut cnt = 0u32; + if let Some(v) = conds.map.get_mut(&key) { + for waiter in v { + if cnt < count && !waiter.notified { + waiter.notified = true; // mark as was waiked up + waiter.thread.unpark(); // wakeup! + cnt += 1; + } + } + } + cnt + } +} + +impl Clone for ThreadConditions { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + } + } +} From 0675310440af9bcc94f212da776422112556e6ba Mon Sep 17 00:00:00 2001 From: ptitSeb Date: Fri, 31 Mar 2023 10:41:02 +0200 Subject: [PATCH 02/11] Refactored for clarity --- lib/vm/src/instance/mod.rs | 40 ++++++++++++++++++++++++++++------ lib/vm/src/lib.rs | 3 ++- lib/vm/src/memory.rs | 14 +++++++----- lib/vm/src/threadconditions.rs | 37 ++++++++++++++++--------------- 4 files changed, 63 insertions(+), 31 deletions(-) diff --git a/lib/vm/src/instance/mod.rs b/lib/vm/src/instance/mod.rs index 51cac3b3a7e..67774846b06 100644 --- a/lib/vm/src/instance/mod.rs +++ b/lib/vm/src/instance/mod.rs @@ -19,8 +19,8 @@ use crate::vmcontext::{ VMFunctionImport, VMFunctionKind, VMGlobalDefinition, VMGlobalImport, VMMemoryDefinition, VMMemoryImport, VMSharedSignatureIndex, VMTableDefinition, VMTableImport, VMTrampoline, }; -use crate::LinearMemory; use crate::{FunctionBodyPtr, MaybeInstanceOwned, TrapHandlerFn, VMFunctionBody}; +use crate::{LinearMemory, NotifyLocation}; use crate::{VMFuncRef, VMFunction, VMGlobal, VMMemory, VMTable}; pub use allocator::InstanceAllocator; use memoffset::offset_of; @@ -822,7 +822,13 @@ impl Instance { if let Ok(mut ret) = ret { if ret == 0 { let memory = self.get_local_vmmemory_mut(memory_index); - ret = memory.do_wait(dst, timeout); + let location = NotifyLocation { address: dst }; + let timeout = if timeout < 0 { + None + } else { + Some(std::time::Duration::from_nanos(timeout as u64)) + }; + ret = memory.do_wait(location, timeout); } if ret == 0xffff { // ret is 0xffff if there is more than 2^32 waiter in queue @@ -852,7 +858,13 @@ impl Instance { if let Ok(mut ret) = ret { if ret == 0 { let memory = self.get_vmmemory_mut(memory_index); - ret = memory.do_wait(dst, timeout); + let location = NotifyLocation { address: dst }; + let timeout = if timeout < 0 { + None + } else { + Some(std::time::Duration::from_nanos(timeout as u64)) + }; + ret = memory.do_wait(location, timeout); } if ret == 0xffff { // ret is 0xffff if there is more than 2^32 waiter in queue @@ -882,7 +894,13 @@ impl Instance { if let Ok(mut ret) = ret { if ret == 0 { let memory = self.get_local_vmmemory_mut(memory_index); - ret = memory.do_wait(dst, timeout); + let location = NotifyLocation { address: dst }; + let timeout = if timeout < 0 { + None + } else { + Some(std::time::Duration::from_nanos(timeout as u64)) + }; + ret = memory.do_wait(location, timeout); } if ret == 0xffff { // ret is 0xffff if there is more than 2^32 waiter in queue @@ -913,7 +931,13 @@ impl Instance { if let Ok(mut ret) = ret { if ret == 0 { let memory = self.get_vmmemory_mut(memory_index); - ret = memory.do_wait(dst, timeout); + let location = NotifyLocation { address: dst }; + let timeout = if timeout < 0 { + None + } else { + Some(std::time::Duration::from_nanos(timeout as u64)) + }; + ret = memory.do_wait(location, timeout); } if ret == 0xffff { // ret is 0xffff if there is more than 2^32 waiter in queue @@ -934,7 +958,8 @@ impl Instance { ) -> Result { let memory = self.get_local_vmmemory_mut(memory_index); // fetch the notifier - Ok(memory.do_notify(dst, count)) + let location = NotifyLocation { address: dst }; + Ok(memory.do_notify(location, count)) } /// Perform an Atomic.Notify @@ -946,7 +971,8 @@ impl Instance { ) -> Result { let memory = self.get_vmmemory_mut(memory_index); // fetch the notifier - Ok(memory.do_notify(dst, count)) + let location = NotifyLocation { address: dst }; + Ok(memory.do_notify(location, count)) } } diff --git a/lib/vm/src/lib.rs b/lib/vm/src/lib.rs index 6d09f4cce3c..b2ef5284bed 100644 --- a/lib/vm/src/lib.rs +++ b/lib/vm/src/lib.rs @@ -48,7 +48,8 @@ pub use crate::imports::Imports; #[allow(deprecated)] pub use crate::instance::{InstanceAllocator, InstanceHandle, VMInstance}; pub use crate::memory::{ - initialize_memory_with_data, LinearMemory, VMMemory, VMOwnedMemory, VMSharedMemory, + initialize_memory_with_data, LinearMemory, NotifyLocation, VMMemory, VMOwnedMemory, + VMSharedMemory, }; pub use crate::mmap::Mmap; pub use crate::probestack::PROBESTACK; diff --git a/lib/vm/src/memory.rs b/lib/vm/src/memory.rs index 4b4cb90fc36..ea1fbfefded 100644 --- a/lib/vm/src/memory.rs +++ b/lib/vm/src/memory.rs @@ -5,6 +5,7 @@ //! //! `Memory` is to WebAssembly linear memories what `Table` is to WebAssembly tables. +pub use crate::threadconditions::NotifyLocation; use crate::threadconditions::ThreadConditions; use crate::trap::Trap; use crate::{mmap::Mmap, store::MaybeInstanceOwned, vmcontext::VMMemoryDefinition}; @@ -14,6 +15,7 @@ use std::convert::TryInto; use std::ptr::NonNull; use std::slice; use std::sync::{Arc, RwLock}; +use std::time::Duration; use wasmer_types::{Bytes, MemoryError, MemoryStyle, MemoryType, Pages}; // The memory mapped area @@ -438,12 +440,12 @@ impl LinearMemory for VMSharedMemory { } // Add current thread to waiter list - fn do_wait(&mut self, dst: u32, timeout: i64) -> u32 { + fn do_wait(&mut self, dst: NotifyLocation, timeout: Option) -> u32 { self.conditions.do_wait(dst, timeout) } /// Notify waiters from the wait list. Return the number of waiters notified - fn do_notify(&mut self, dst: u32, count: u32) -> u32 { + fn do_notify(&mut self, dst: NotifyLocation, count: u32) -> u32 { self.conditions.do_notify(dst, count) } } @@ -515,12 +517,12 @@ impl LinearMemory for VMMemory { } // Add current thread to waiter list - fn do_wait(&mut self, dst: u32, timeout: i64) -> u32 { + fn do_wait(&mut self, dst: NotifyLocation, timeout: Option) -> u32 { self.0.do_wait(dst, timeout) } /// Notify waiters from the wait list. Return the number of waiters notified - fn do_notify(&mut self, dst: u32, count: u32) -> u32 { + fn do_notify(&mut self, dst: NotifyLocation, count: u32) -> u32 { self.0.do_notify(dst, count) } } @@ -644,12 +646,12 @@ where /// Add current thread to the waiter hash, and wait until notified or timout. /// Return 0 if the waiter has been notified, 2 if the timeout occured, or 0xffff if en error happened - fn do_wait(&mut self, _dst: u32, _timeout: i64) -> u32 { + fn do_wait(&mut self, _dst: NotifyLocation, _timeout: Option) -> u32 { 0xffff } /// Notify waiters from the wait list. Return the number of waiters notified - fn do_notify(&mut self, _dst: u32, _count: u32) -> u32 { + fn do_notify(&mut self, _dst: NotifyLocation, _count: u32) -> u32 { 0 } } diff --git a/lib/vm/src/threadconditions.rs b/lib/vm/src/threadconditions.rs index 10b394b4062..98ef3c59237 100644 --- a/lib/vm/src/threadconditions.rs +++ b/lib/vm/src/threadconditions.rs @@ -1,9 +1,12 @@ use std::collections::HashMap; use std::sync::{Arc, LockResult, Mutex, MutexGuard}; use std::thread::{current, park, park_timeout, Thread}; +use std::time::Duration; +/// A location in memory for a Waiter #[derive(Hash, Eq, PartialEq, Clone, Copy, Debug)] -struct NotifyLocation { +pub struct NotifyLocation { + /// The address of the Waiter location pub address: u32, } @@ -23,6 +26,8 @@ pub struct ThreadConditions { inner: Arc>, // The Hasmap with the Notify for the Notify/wait opcodes } +pub const WAIT_ERROR: u32 = 0xffff; + impl ThreadConditions { /// Create a new ThreadConditions pub fn new() -> Self { @@ -45,26 +50,25 @@ impl ThreadConditions { // because `park_timeout` doesn't gives any information on why it returns /// Add current thread to the waiter hash - pub fn do_wait(&mut self, dst: u32, timeout: i64) -> u32 { + pub fn do_wait(&mut self, dst: NotifyLocation, timeout: Option) -> u32 { // fetch the notifier - let key = NotifyLocation { address: dst }; let mut conds = self.lock_conditions().unwrap(); if conds.map.len() > 1 << 32 { - return 0xffff; + return WAIT_ERROR; } - let v = conds.map.entry(key).or_insert_with(Vec::new); + let v = conds.map.entry(dst).or_insert_with(Vec::new); v.push(NotifyWaiter { thread: current(), notified: false, }); drop(conds); - if timeout < 0 { - park(); + if let Some(timeout) = timeout { + park_timeout(timeout); } else { - park_timeout(std::time::Duration::from_nanos(timeout as u64)); + park(); } let mut conds = self.lock_conditions().unwrap(); - let v = conds.map.get_mut(&key).unwrap(); + let v = conds.map.get_mut(&dst).unwrap(); let id = current().id(); let mut ret = 0; v.retain(|cond| { @@ -76,26 +80,25 @@ impl ThreadConditions { } }); if v.is_empty() { - conds.map.remove(&key); + conds.map.remove(&dst); } ret } /// Notify waiters from the wait list - pub fn do_notify(&mut self, dst: u32, count: u32) -> u32 { - let key = NotifyLocation { address: dst }; + pub fn do_notify(&mut self, dst: NotifyLocation, count: u32) -> u32 { let mut conds = self.lock_conditions().unwrap(); - let mut cnt = 0u32; - if let Some(v) = conds.map.get_mut(&key) { + let mut count_token = 0u32; + if let Some(v) = conds.map.get_mut(&dst) { for waiter in v { - if cnt < count && !waiter.notified { + if count_token < count && !waiter.notified { waiter.notified = true; // mark as was waiked up waiter.thread.unpark(); // wakeup! - cnt += 1; + count_token += 1; } } } - cnt + count_token } } From 03fc8210ced16cea069c1ef3a6c4cb443753de25 Mon Sep 17 00:00:00 2001 From: ptitSeb Date: Fri, 31 Mar 2023 10:51:20 +0200 Subject: [PATCH 03/11] Use WAIT_ERROR everywhere --- lib/vm/src/instance/mod.rs | 18 +++++++++--------- lib/vm/src/lib.rs | 1 + lib/vm/src/threadconditions.rs | 1 + 3 files changed, 11 insertions(+), 9 deletions(-) diff --git a/lib/vm/src/instance/mod.rs b/lib/vm/src/instance/mod.rs index 67774846b06..60d809b3436 100644 --- a/lib/vm/src/instance/mod.rs +++ b/lib/vm/src/instance/mod.rs @@ -20,7 +20,7 @@ use crate::vmcontext::{ VMMemoryImport, VMSharedSignatureIndex, VMTableDefinition, VMTableImport, VMTrampoline, }; use crate::{FunctionBodyPtr, MaybeInstanceOwned, TrapHandlerFn, VMFunctionBody}; -use crate::{LinearMemory, NotifyLocation}; +use crate::{LinearMemory, NotifyLocation, WAIT_ERROR}; use crate::{VMFuncRef, VMFunction, VMGlobal, VMMemory, VMTable}; pub use allocator::InstanceAllocator; use memoffset::offset_of; @@ -830,8 +830,8 @@ impl Instance { }; ret = memory.do_wait(location, timeout); } - if ret == 0xffff { - // ret is 0xffff if there is more than 2^32 waiter in queue + if ret == WAIT_ERROR { + // ret is WAIT_ERROR if there is more than 2^32 waiter in queue return Err(Trap::lib(TrapCode::TableAccessOutOfBounds)); } Ok(ret) @@ -866,8 +866,8 @@ impl Instance { }; ret = memory.do_wait(location, timeout); } - if ret == 0xffff { - // ret is 0xffff if there is more than 2^32 waiter in queue + if ret == WAIT_ERROR { + // ret is WAIT_ERROR if there is more than 2^32 waiter in queue return Err(Trap::lib(TrapCode::TableAccessOutOfBounds)); } Ok(ret) @@ -902,8 +902,8 @@ impl Instance { }; ret = memory.do_wait(location, timeout); } - if ret == 0xffff { - // ret is 0xffff if there is more than 2^32 waiter in queue + if ret == WAIT_ERROR { + // ret is WAIT_ERROR if there is more than 2^32 waiter in queue return Err(Trap::lib(TrapCode::TableAccessOutOfBounds)); } Ok(ret) @@ -939,8 +939,8 @@ impl Instance { }; ret = memory.do_wait(location, timeout); } - if ret == 0xffff { - // ret is 0xffff if there is more than 2^32 waiter in queue + if ret == WAIT_ERROR { + // ret is WAIT_ERROR if there is more than 2^32 waiter in queue return Err(Trap::lib(TrapCode::TableAccessOutOfBounds)); } Ok(ret) diff --git a/lib/vm/src/lib.rs b/lib/vm/src/lib.rs index b2ef5284bed..bf24f84c5c5 100644 --- a/lib/vm/src/lib.rs +++ b/lib/vm/src/lib.rs @@ -56,6 +56,7 @@ pub use crate::probestack::PROBESTACK; pub use crate::sig_registry::SignatureRegistry; pub use crate::store::{InternalStoreHandle, MaybeInstanceOwned, StoreHandle, StoreObjects}; pub use crate::table::{TableElement, VMTable}; +pub use crate::threadconditions::WAIT_ERROR; pub use crate::trap::*; pub use crate::vmcontext::{ VMCallerCheckedAnyfunc, VMContext, VMDynamicFunctionContext, VMFunctionContext, diff --git a/lib/vm/src/threadconditions.rs b/lib/vm/src/threadconditions.rs index 98ef3c59237..5238922524f 100644 --- a/lib/vm/src/threadconditions.rs +++ b/lib/vm/src/threadconditions.rs @@ -26,6 +26,7 @@ pub struct ThreadConditions { inner: Arc>, // The Hasmap with the Notify for the Notify/wait opcodes } +/// do_wait will return this in case of error pub const WAIT_ERROR: u32 = 0xffff; impl ThreadConditions { From 9679767d456e1167d23decf677a074392b606f4d Mon Sep 17 00:00:00 2001 From: ptitSeb Date: Fri, 31 Mar 2023 11:33:06 +0200 Subject: [PATCH 04/11] Change from WAIT_ERROR to Option --- lib/vm/src/instance/mod.rs | 46 ++++++++++++++++++---------------- lib/vm/src/lib.rs | 1 - lib/vm/src/memory.rs | 10 ++++---- lib/vm/src/threadconditions.rs | 9 +++---- 4 files changed, 33 insertions(+), 33 deletions(-) diff --git a/lib/vm/src/instance/mod.rs b/lib/vm/src/instance/mod.rs index 60d809b3436..1fdc2f4becf 100644 --- a/lib/vm/src/instance/mod.rs +++ b/lib/vm/src/instance/mod.rs @@ -20,7 +20,7 @@ use crate::vmcontext::{ VMMemoryImport, VMSharedSignatureIndex, VMTableDefinition, VMTableImport, VMTrampoline, }; use crate::{FunctionBodyPtr, MaybeInstanceOwned, TrapHandlerFn, VMFunctionBody}; -use crate::{LinearMemory, NotifyLocation, WAIT_ERROR}; +use crate::{LinearMemory, NotifyLocation}; use crate::{VMFuncRef, VMFunction, VMGlobal, VMMemory, VMTable}; pub use allocator::InstanceAllocator; use memoffset::offset_of; @@ -828,11 +828,12 @@ impl Instance { } else { Some(std::time::Duration::from_nanos(timeout as u64)) }; - ret = memory.do_wait(location, timeout); - } - if ret == WAIT_ERROR { - // ret is WAIT_ERROR if there is more than 2^32 waiter in queue - return Err(Trap::lib(TrapCode::TableAccessOutOfBounds)); + let waiter = memory.do_wait(location, timeout); + if waiter.is_none() { + // ret is None if there is more than 2^32 waiter in queue or some other error + return Err(Trap::lib(TrapCode::TableAccessOutOfBounds)); + } + ret = waiter.unwrap(); } Ok(ret) } else { @@ -864,11 +865,12 @@ impl Instance { } else { Some(std::time::Duration::from_nanos(timeout as u64)) }; - ret = memory.do_wait(location, timeout); - } - if ret == WAIT_ERROR { - // ret is WAIT_ERROR if there is more than 2^32 waiter in queue - return Err(Trap::lib(TrapCode::TableAccessOutOfBounds)); + let waiter = memory.do_wait(location, timeout); + if waiter.is_none() { + // ret is None if there is more than 2^32 waiter in queue or some other error + return Err(Trap::lib(TrapCode::TableAccessOutOfBounds)); + } + ret = waiter.unwrap(); } Ok(ret) } else { @@ -900,11 +902,12 @@ impl Instance { } else { Some(std::time::Duration::from_nanos(timeout as u64)) }; - ret = memory.do_wait(location, timeout); - } - if ret == WAIT_ERROR { - // ret is WAIT_ERROR if there is more than 2^32 waiter in queue - return Err(Trap::lib(TrapCode::TableAccessOutOfBounds)); + let waiter = memory.do_wait(location, timeout); + if waiter.is_none() { + // ret is None if there is more than 2^32 waiter in queue or some other error + return Err(Trap::lib(TrapCode::TableAccessOutOfBounds)); + } + ret = waiter.unwrap(); } Ok(ret) } else { @@ -937,11 +940,12 @@ impl Instance { } else { Some(std::time::Duration::from_nanos(timeout as u64)) }; - ret = memory.do_wait(location, timeout); - } - if ret == WAIT_ERROR { - // ret is WAIT_ERROR if there is more than 2^32 waiter in queue - return Err(Trap::lib(TrapCode::TableAccessOutOfBounds)); + let waiter = memory.do_wait(location, timeout); + if waiter.is_none() { + // ret is None if there is more than 2^32 waiter in queue or some other error + return Err(Trap::lib(TrapCode::TableAccessOutOfBounds)); + } + ret = waiter.unwrap(); } Ok(ret) } else { diff --git a/lib/vm/src/lib.rs b/lib/vm/src/lib.rs index bf24f84c5c5..b2ef5284bed 100644 --- a/lib/vm/src/lib.rs +++ b/lib/vm/src/lib.rs @@ -56,7 +56,6 @@ pub use crate::probestack::PROBESTACK; pub use crate::sig_registry::SignatureRegistry; pub use crate::store::{InternalStoreHandle, MaybeInstanceOwned, StoreHandle, StoreObjects}; pub use crate::table::{TableElement, VMTable}; -pub use crate::threadconditions::WAIT_ERROR; pub use crate::trap::*; pub use crate::vmcontext::{ VMCallerCheckedAnyfunc, VMContext, VMDynamicFunctionContext, VMFunctionContext, diff --git a/lib/vm/src/memory.rs b/lib/vm/src/memory.rs index ea1fbfefded..57037664151 100644 --- a/lib/vm/src/memory.rs +++ b/lib/vm/src/memory.rs @@ -440,7 +440,7 @@ impl LinearMemory for VMSharedMemory { } // Add current thread to waiter list - fn do_wait(&mut self, dst: NotifyLocation, timeout: Option) -> u32 { + fn do_wait(&mut self, dst: NotifyLocation, timeout: Option) -> Option { self.conditions.do_wait(dst, timeout) } @@ -517,7 +517,7 @@ impl LinearMemory for VMMemory { } // Add current thread to waiter list - fn do_wait(&mut self, dst: NotifyLocation, timeout: Option) -> u32 { + fn do_wait(&mut self, dst: NotifyLocation, timeout: Option) -> Option { self.0.do_wait(dst, timeout) } @@ -645,9 +645,9 @@ where fn duplicate(&mut self) -> Result, MemoryError>; /// Add current thread to the waiter hash, and wait until notified or timout. - /// Return 0 if the waiter has been notified, 2 if the timeout occured, or 0xffff if en error happened - fn do_wait(&mut self, _dst: NotifyLocation, _timeout: Option) -> u32 { - 0xffff + /// Return 0 if the waiter has been notified, 2 if the timeout occured, or None if en error happened + fn do_wait(&mut self, _dst: NotifyLocation, _timeout: Option) -> Option { + None } /// Notify waiters from the wait list. Return the number of waiters notified diff --git a/lib/vm/src/threadconditions.rs b/lib/vm/src/threadconditions.rs index 5238922524f..5cdd47ce4f6 100644 --- a/lib/vm/src/threadconditions.rs +++ b/lib/vm/src/threadconditions.rs @@ -26,9 +26,6 @@ pub struct ThreadConditions { inner: Arc>, // The Hasmap with the Notify for the Notify/wait opcodes } -/// do_wait will return this in case of error -pub const WAIT_ERROR: u32 = 0xffff; - impl ThreadConditions { /// Create a new ThreadConditions pub fn new() -> Self { @@ -51,11 +48,11 @@ impl ThreadConditions { // because `park_timeout` doesn't gives any information on why it returns /// Add current thread to the waiter hash - pub fn do_wait(&mut self, dst: NotifyLocation, timeout: Option) -> u32 { + pub fn do_wait(&mut self, dst: NotifyLocation, timeout: Option) -> Option { // fetch the notifier let mut conds = self.lock_conditions().unwrap(); if conds.map.len() > 1 << 32 { - return WAIT_ERROR; + return None; } let v = conds.map.entry(dst).or_insert_with(Vec::new); v.push(NotifyWaiter { @@ -83,7 +80,7 @@ impl ThreadConditions { if v.is_empty() { conds.map.remove(&dst); } - ret + Some(ret) } /// Notify waiters from the wait list From fb7ca81f9b1b523089f244318c02b3b133558a4f Mon Sep 17 00:00:00 2001 From: ptitSeb Date: Fri, 31 Mar 2023 14:10:37 +0200 Subject: [PATCH 05/11] Added some unit test for threadconditions --- lib/vm/src/threadconditions.rs | 92 ++++++++++++++++++++++++++++++++++ 1 file changed, 92 insertions(+) diff --git a/lib/vm/src/threadconditions.rs b/lib/vm/src/threadconditions.rs index 5cdd47ce4f6..4b31cce79cb 100644 --- a/lib/vm/src/threadconditions.rs +++ b/lib/vm/src/threadconditions.rs @@ -107,3 +107,95 @@ impl Clone for ThreadConditions { } } } + +#[cfg(test)] +#[test] +fn threadconditions_notify_nowaiters() { + let mut conditions = ThreadConditions::new(); + let dst = NotifyLocation { address: 0 }; + let ret = conditions.do_notify(dst, 1); + assert_eq!(ret, 0); +} + +#[cfg(test)] +#[test] +fn threadconditions_notify_1waiter() { + use std::thread; + + let mut conditions = ThreadConditions::new(); + let mut threadcond = conditions.clone(); + + thread::spawn(move || { + let dst = NotifyLocation { address: 0 }; + let ret = threadcond.do_wait(dst.clone(), None); + assert_eq!(ret, Some(0)); + }); + thread::sleep(Duration::from_millis(1)); + let dst = NotifyLocation { address: 0 }; + let ret = conditions.do_notify(dst, 1); + assert_eq!(ret, 1); +} + +#[cfg(test)] +#[test] +fn threadconditions_notify_waiter_timeout() { + use std::thread; + + let mut conditions = ThreadConditions::new(); + let mut threadcond = conditions.clone(); + + thread::spawn(move || { + let dst = NotifyLocation { address: 0 }; + let ret = threadcond.do_wait(dst.clone(), Some(Duration::from_millis(1))); + assert_eq!(ret, Some(2)); + }); + thread::sleep(Duration::from_millis(10)); + let dst = NotifyLocation { address: 0 }; + let ret = conditions.do_notify(dst, 1); + assert_eq!(ret, 0); +} + +#[cfg(test)] +#[test] +fn threadconditions_notify_waiter_mismatch() { + use std::thread; + + let mut conditions = ThreadConditions::new(); + let mut threadcond = conditions.clone(); + + thread::spawn(move || { + let dst = NotifyLocation { address: 8 }; + let ret = threadcond.do_wait(dst.clone(), Some(Duration::from_millis(10))); + assert_eq!(ret, Some(2)); + }); + thread::sleep(Duration::from_millis(1)); + let dst = NotifyLocation { address: 0 }; + let ret = conditions.do_notify(dst, 1); + assert_eq!(ret, 0); + thread::sleep(Duration::from_millis(10)); +} + +#[cfg(test)] +#[test] +fn threadconditions_notify_2waiters() { + use std::thread; + + let mut conditions = ThreadConditions::new(); + let mut threadcond = conditions.clone(); + let mut threadcond2 = conditions.clone(); + + thread::spawn(move || { + let dst = NotifyLocation { address: 0 }; + let ret = threadcond.do_wait(dst.clone(), None); + assert_eq!(ret, Some(0)); + }); + thread::spawn(move || { + let dst = NotifyLocation { address: 0 }; + let ret = threadcond2.do_wait(dst.clone(), None); + assert_eq!(ret, Some(0)); + }); + thread::sleep(Duration::from_millis(1)); + let dst = NotifyLocation { address: 0 }; + let ret = conditions.do_notify(dst, 5); + assert_eq!(ret, 2); +} From df578a62604939ea6c87ed05d48498113df8e44a Mon Sep 17 00:00:00 2001 From: ptitSeb Date: Fri, 31 Mar 2023 15:33:52 +0200 Subject: [PATCH 06/11] Switch from `Mutex>` to `DashMap<...>` for the NotifyMap --- Cargo.lock | 14 +++++++++++ lib/vm/Cargo.toml | 1 + lib/vm/src/threadconditions.rs | 46 ++++++++++++++++------------------ 3 files changed, 37 insertions(+), 24 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2165f2188b8..d6f7ae07dee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -980,6 +980,19 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "dashmap" +version = "5.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "907076dfda823b0b36d2a1bb5f90c96660a5bbcd7729e10727f07858f22c4edc" +dependencies = [ + "cfg-if 1.0.0", + "hashbrown 0.12.3", + "lock_api", + "once_cell", + "parking_lot_core 0.9.7", +] + [[package]] name = "derivative" version = "2.2.0" @@ -5729,6 +5742,7 @@ dependencies = [ "cc", "cfg-if 1.0.0", "corosensei", + "dashmap", "derivative", "enum-iterator", "indexmap", diff --git a/lib/vm/Cargo.toml b/lib/vm/Cargo.toml index dec609c0d85..e6aa57b1d0d 100644 --- a/lib/vm/Cargo.toml +++ b/lib/vm/Cargo.toml @@ -26,6 +26,7 @@ lazy_static = "1.4.0" region = { version = "3.0" } corosensei = { version = "0.1.2" } derivative = { version = "^2" } +dashmap = { version = "5.4" } # - Optional shared dependencies. tracing = { version = "0.1", optional = true } diff --git a/lib/vm/src/threadconditions.rs b/lib/vm/src/threadconditions.rs index 4b31cce79cb..fd87b9cdc23 100644 --- a/lib/vm/src/threadconditions.rs +++ b/lib/vm/src/threadconditions.rs @@ -1,5 +1,5 @@ -use std::collections::HashMap; -use std::sync::{Arc, LockResult, Mutex, MutexGuard}; +use dashmap::DashMap; +use std::sync::Arc; use std::thread::{current, park, park_timeout, Thread}; use std::time::Duration; @@ -17,27 +17,23 @@ struct NotifyWaiter { } #[derive(Debug, Default)] struct NotifyMap { - pub map: HashMap>, + pub map: DashMap>, } /// HashMap of Waiters for the Thread/Notify opcodes #[derive(Debug)] pub struct ThreadConditions { - inner: Arc>, // The Hasmap with the Notify for the Notify/wait opcodes + inner: Arc, // The Hasmap with the Notify for the Notify/wait opcodes } impl ThreadConditions { /// Create a new ThreadConditions pub fn new() -> Self { Self { - inner: Arc::new(Mutex::new(NotifyMap::default())), + inner: Arc::new(NotifyMap::default()), } } - fn lock_conditions(&mut self) -> LockResult> { - self.inner.lock() - } - // To implement Wait / Notify, a HasMap, behind a mutex, will be used // to track the address of waiter. The key of the hashmap is based on the memory // and waiter threads are "park"'d (with or without timeout) @@ -50,23 +46,24 @@ impl ThreadConditions { /// Add current thread to the waiter hash pub fn do_wait(&mut self, dst: NotifyLocation, timeout: Option) -> Option { // fetch the notifier - let mut conds = self.lock_conditions().unwrap(); - if conds.map.len() > 1 << 32 { + if self.inner.map.len() >= 1 << 32 { return None; } - let v = conds.map.entry(dst).or_insert_with(Vec::new); - v.push(NotifyWaiter { - thread: current(), - notified: false, - }); - drop(conds); + self.inner + .map + .entry(dst) + .or_insert_with(Vec::new) + .push(NotifyWaiter { + thread: current(), + notified: false, + }); if let Some(timeout) = timeout { park_timeout(timeout); } else { park(); } - let mut conds = self.lock_conditions().unwrap(); - let v = conds.map.get_mut(&dst).unwrap(); + let mut bindding = self.inner.map.get_mut(&dst).unwrap(); + let v = bindding.value_mut(); let id = current().id(); let mut ret = 0; v.retain(|cond| { @@ -77,18 +74,19 @@ impl ThreadConditions { true } }); - if v.is_empty() { - conds.map.remove(&dst); + let empty = v.is_empty(); + drop(bindding); + if empty { + self.inner.map.remove(&dst); } Some(ret) } /// Notify waiters from the wait list pub fn do_notify(&mut self, dst: NotifyLocation, count: u32) -> u32 { - let mut conds = self.lock_conditions().unwrap(); let mut count_token = 0u32; - if let Some(v) = conds.map.get_mut(&dst) { - for waiter in v { + if let Some(mut v) = self.inner.map.get_mut(&dst) { + for waiter in v.value_mut() { if count_token < count && !waiter.notified { waiter.notified = true; // mark as was waiked up waiter.thread.unpark(); // wakeup! From ed2b8957264f9cfe73f49e1979419d5c2260af81 Mon Sep 17 00:00:00 2001 From: ptitSeb Date: Fri, 31 Mar 2023 16:04:07 +0200 Subject: [PATCH 07/11] Use FnvHasher for Dashmap --- Cargo.lock | 1 + lib/vm/Cargo.toml | 1 + lib/vm/src/threadconditions.rs | 3 ++- 3 files changed, 4 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index d6f7ae07dee..ebca58e7b1a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5745,6 +5745,7 @@ dependencies = [ "dashmap", "derivative", "enum-iterator", + "fnv", "indexmap", "lazy_static", "libc", diff --git a/lib/vm/Cargo.toml b/lib/vm/Cargo.toml index e6aa57b1d0d..01387520794 100644 --- a/lib/vm/Cargo.toml +++ b/lib/vm/Cargo.toml @@ -27,6 +27,7 @@ region = { version = "3.0" } corosensei = { version = "0.1.2" } derivative = { version = "^2" } dashmap = { version = "5.4" } +fnv = "1.0.3" # - Optional shared dependencies. tracing = { version = "0.1", optional = true } diff --git a/lib/vm/src/threadconditions.rs b/lib/vm/src/threadconditions.rs index fd87b9cdc23..a75c8bb9300 100644 --- a/lib/vm/src/threadconditions.rs +++ b/lib/vm/src/threadconditions.rs @@ -1,4 +1,5 @@ use dashmap::DashMap; +use fnv::FnvBuildHasher; use std::sync::Arc; use std::thread::{current, park, park_timeout, Thread}; use std::time::Duration; @@ -17,7 +18,7 @@ struct NotifyWaiter { } #[derive(Debug, Default)] struct NotifyMap { - pub map: DashMap>, + pub map: DashMap, FnvBuildHasher>, } /// HashMap of Waiters for the Thread/Notify opcodes From feca793b32edbfa0e62bd89c9c919ff0051a6074 Mon Sep 17 00:00:00 2001 From: ptitSeb Date: Fri, 31 Mar 2023 17:23:06 +0200 Subject: [PATCH 08/11] Change timeout value in unit test to leave more margin to the system to react. --- lib/vm/src/threadconditions.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/vm/src/threadconditions.rs b/lib/vm/src/threadconditions.rs index a75c8bb9300..abc8ba7593a 100644 --- a/lib/vm/src/threadconditions.rs +++ b/lib/vm/src/threadconditions.rs @@ -148,7 +148,7 @@ fn threadconditions_notify_waiter_timeout() { let ret = threadcond.do_wait(dst.clone(), Some(Duration::from_millis(1))); assert_eq!(ret, Some(2)); }); - thread::sleep(Duration::from_millis(10)); + thread::sleep(Duration::from_millis(50)); let dst = NotifyLocation { address: 0 }; let ret = conditions.do_notify(dst, 1); assert_eq!(ret, 0); @@ -171,7 +171,7 @@ fn threadconditions_notify_waiter_mismatch() { let dst = NotifyLocation { address: 0 }; let ret = conditions.do_notify(dst, 1); assert_eq!(ret, 0); - thread::sleep(Duration::from_millis(10)); + thread::sleep(Duration::from_millis(100)); } #[cfg(test)] From 745e31847e6710cc749ca8c4d440ca99a815c857 Mon Sep 17 00:00:00 2001 From: ptitSeb Date: Wed, 12 Apr 2023 12:39:46 +0200 Subject: [PATCH 09/11] Consolidate code, avoid duplication --- lib/vm/src/instance/mod.rs | 67 +++++++++++--------------------------- 1 file changed, 19 insertions(+), 48 deletions(-) diff --git a/lib/vm/src/instance/mod.rs b/lib/vm/src/instance/mod.rs index 1fdc2f4becf..04e78397504 100644 --- a/lib/vm/src/instance/mod.rs +++ b/lib/vm/src/instance/mod.rs @@ -804,6 +804,21 @@ impl Instance { } } + fn memory_wait(memory: &mut VMMemory, dst: u32, timeout: i64) -> Result { + let location = NotifyLocation { address: dst }; + let timeout = if timeout < 0 { + None + } else { + Some(std::time::Duration::from_nanos(timeout as u64)) + }; + let waiter = memory.do_wait(location, timeout); + if waiter.is_none() { + // ret is None if there is more than 2^32 waiter in queue or some other error + return Err(Trap::lib(TrapCode::TableAccessOutOfBounds)); + } + Ok(waiter.unwrap()) + } + /// Perform an Atomic.Wait32 pub(crate) fn local_memory_wait32( &mut self, @@ -822,18 +837,7 @@ impl Instance { if let Ok(mut ret) = ret { if ret == 0 { let memory = self.get_local_vmmemory_mut(memory_index); - let location = NotifyLocation { address: dst }; - let timeout = if timeout < 0 { - None - } else { - Some(std::time::Duration::from_nanos(timeout as u64)) - }; - let waiter = memory.do_wait(location, timeout); - if waiter.is_none() { - // ret is None if there is more than 2^32 waiter in queue or some other error - return Err(Trap::lib(TrapCode::TableAccessOutOfBounds)); - } - ret = waiter.unwrap(); + ret = Instance::memory_wait(memory, dst, timeout)?; } Ok(ret) } else { @@ -859,18 +863,7 @@ impl Instance { if let Ok(mut ret) = ret { if ret == 0 { let memory = self.get_vmmemory_mut(memory_index); - let location = NotifyLocation { address: dst }; - let timeout = if timeout < 0 { - None - } else { - Some(std::time::Duration::from_nanos(timeout as u64)) - }; - let waiter = memory.do_wait(location, timeout); - if waiter.is_none() { - // ret is None if there is more than 2^32 waiter in queue or some other error - return Err(Trap::lib(TrapCode::TableAccessOutOfBounds)); - } - ret = waiter.unwrap(); + ret = Instance::memory_wait(memory, dst, timeout)?; } Ok(ret) } else { @@ -896,18 +889,7 @@ impl Instance { if let Ok(mut ret) = ret { if ret == 0 { let memory = self.get_local_vmmemory_mut(memory_index); - let location = NotifyLocation { address: dst }; - let timeout = if timeout < 0 { - None - } else { - Some(std::time::Duration::from_nanos(timeout as u64)) - }; - let waiter = memory.do_wait(location, timeout); - if waiter.is_none() { - // ret is None if there is more than 2^32 waiter in queue or some other error - return Err(Trap::lib(TrapCode::TableAccessOutOfBounds)); - } - ret = waiter.unwrap(); + ret = Instance::memory_wait(memory, dst, timeout)?; } Ok(ret) } else { @@ -934,18 +916,7 @@ impl Instance { if let Ok(mut ret) = ret { if ret == 0 { let memory = self.get_vmmemory_mut(memory_index); - let location = NotifyLocation { address: dst }; - let timeout = if timeout < 0 { - None - } else { - Some(std::time::Duration::from_nanos(timeout as u64)) - }; - let waiter = memory.do_wait(location, timeout); - if waiter.is_none() { - // ret is None if there is more than 2^32 waiter in queue or some other error - return Err(Trap::lib(TrapCode::TableAccessOutOfBounds)); - } - ret = waiter.unwrap(); + ret = Instance::memory_wait(memory, dst, timeout)?; } Ok(ret) } else { From 050d153c9e61915f1947c9c4518d916635004817 Mon Sep 17 00:00:00 2001 From: ptitSeb Date: Wed, 12 Apr 2023 12:58:43 +0200 Subject: [PATCH 10/11] Put test in a test module --- lib/vm/src/threadconditions.rs | 152 ++++++++++++++++----------------- 1 file changed, 76 insertions(+), 76 deletions(-) diff --git a/lib/vm/src/threadconditions.rs b/lib/vm/src/threadconditions.rs index abc8ba7593a..3b6dd73c7f0 100644 --- a/lib/vm/src/threadconditions.rs +++ b/lib/vm/src/threadconditions.rs @@ -108,93 +108,93 @@ impl Clone for ThreadConditions { } #[cfg(test)] -#[test] -fn threadconditions_notify_nowaiters() { - let mut conditions = ThreadConditions::new(); - let dst = NotifyLocation { address: 0 }; - let ret = conditions.do_notify(dst, 1); - assert_eq!(ret, 0); -} +mod tests { + use super::*; -#[cfg(test)] -#[test] -fn threadconditions_notify_1waiter() { - use std::thread; + #[test] + fn threadconditions_notify_nowaiters() { + let mut conditions = ThreadConditions::new(); + let dst = NotifyLocation { address: 0 }; + let ret = conditions.do_notify(dst, 1); + assert_eq!(ret, 0); + } + + #[test] + fn threadconditions_notify_1waiter() { + use std::thread; - let mut conditions = ThreadConditions::new(); - let mut threadcond = conditions.clone(); + let mut conditions = ThreadConditions::new(); + let mut threadcond = conditions.clone(); - thread::spawn(move || { + thread::spawn(move || { + let dst = NotifyLocation { address: 0 }; + let ret = threadcond.do_wait(dst.clone(), None); + assert_eq!(ret, Some(0)); + }); + thread::sleep(Duration::from_millis(1)); let dst = NotifyLocation { address: 0 }; - let ret = threadcond.do_wait(dst.clone(), None); - assert_eq!(ret, Some(0)); - }); - thread::sleep(Duration::from_millis(1)); - let dst = NotifyLocation { address: 0 }; - let ret = conditions.do_notify(dst, 1); - assert_eq!(ret, 1); -} + let ret = conditions.do_notify(dst, 1); + assert_eq!(ret, 1); + } -#[cfg(test)] -#[test] -fn threadconditions_notify_waiter_timeout() { - use std::thread; + #[test] + fn threadconditions_notify_waiter_timeout() { + use std::thread; - let mut conditions = ThreadConditions::new(); - let mut threadcond = conditions.clone(); + let mut conditions = ThreadConditions::new(); + let mut threadcond = conditions.clone(); - thread::spawn(move || { + thread::spawn(move || { + let dst = NotifyLocation { address: 0 }; + let ret = threadcond.do_wait(dst.clone(), Some(Duration::from_millis(1))); + assert_eq!(ret, Some(2)); + }); + thread::sleep(Duration::from_millis(50)); let dst = NotifyLocation { address: 0 }; - let ret = threadcond.do_wait(dst.clone(), Some(Duration::from_millis(1))); - assert_eq!(ret, Some(2)); - }); - thread::sleep(Duration::from_millis(50)); - let dst = NotifyLocation { address: 0 }; - let ret = conditions.do_notify(dst, 1); - assert_eq!(ret, 0); -} - -#[cfg(test)] -#[test] -fn threadconditions_notify_waiter_mismatch() { - use std::thread; - - let mut conditions = ThreadConditions::new(); - let mut threadcond = conditions.clone(); - - thread::spawn(move || { - let dst = NotifyLocation { address: 8 }; - let ret = threadcond.do_wait(dst.clone(), Some(Duration::from_millis(10))); - assert_eq!(ret, Some(2)); - }); - thread::sleep(Duration::from_millis(1)); - let dst = NotifyLocation { address: 0 }; - let ret = conditions.do_notify(dst, 1); - assert_eq!(ret, 0); - thread::sleep(Duration::from_millis(100)); -} + let ret = conditions.do_notify(dst, 1); + assert_eq!(ret, 0); + } -#[cfg(test)] -#[test] -fn threadconditions_notify_2waiters() { - use std::thread; + #[test] + fn threadconditions_notify_waiter_mismatch() { + use std::thread; - let mut conditions = ThreadConditions::new(); - let mut threadcond = conditions.clone(); - let mut threadcond2 = conditions.clone(); + let mut conditions = ThreadConditions::new(); + let mut threadcond = conditions.clone(); - thread::spawn(move || { + thread::spawn(move || { + let dst = NotifyLocation { address: 8 }; + let ret = threadcond.do_wait(dst.clone(), Some(Duration::from_millis(10))); + assert_eq!(ret, Some(2)); + }); + thread::sleep(Duration::from_millis(1)); let dst = NotifyLocation { address: 0 }; - let ret = threadcond.do_wait(dst.clone(), None); - assert_eq!(ret, Some(0)); - }); - thread::spawn(move || { + let ret = conditions.do_notify(dst, 1); + assert_eq!(ret, 0); + thread::sleep(Duration::from_millis(100)); + } + + #[test] + fn threadconditions_notify_2waiters() { + use std::thread; + + let mut conditions = ThreadConditions::new(); + let mut threadcond = conditions.clone(); + let mut threadcond2 = conditions.clone(); + + thread::spawn(move || { + let dst = NotifyLocation { address: 0 }; + let ret = threadcond.do_wait(dst.clone(), None); + assert_eq!(ret, Some(0)); + }); + thread::spawn(move || { + let dst = NotifyLocation { address: 0 }; + let ret = threadcond2.do_wait(dst.clone(), None); + assert_eq!(ret, Some(0)); + }); + thread::sleep(Duration::from_millis(1)); let dst = NotifyLocation { address: 0 }; - let ret = threadcond2.do_wait(dst.clone(), None); - assert_eq!(ret, Some(0)); - }); - thread::sleep(Duration::from_millis(1)); - let dst = NotifyLocation { address: 0 }; - let ret = conditions.do_notify(dst, 5); - assert_eq!(ret, 2); + let ret = conditions.do_notify(dst, 5); + assert_eq!(ret, 2); + } } From 76f3e57dfb9ffdecbb4d6da1215086245fc7c66a Mon Sep 17 00:00:00 2001 From: ptitSeb Date: Wed, 12 Apr 2023 13:57:19 +0200 Subject: [PATCH 11/11] Use an Result with custom error instead of an option for waiter --- lib/vm/src/instance/mod.rs | 2 +- lib/vm/src/memory.rs | 22 ++++++++++---- lib/vm/src/threadconditions.rs | 54 ++++++++++++++++++++++++---------- 3 files changed, 57 insertions(+), 21 deletions(-) diff --git a/lib/vm/src/instance/mod.rs b/lib/vm/src/instance/mod.rs index 04e78397504..0909e6927f5 100644 --- a/lib/vm/src/instance/mod.rs +++ b/lib/vm/src/instance/mod.rs @@ -812,7 +812,7 @@ impl Instance { Some(std::time::Duration::from_nanos(timeout as u64)) }; let waiter = memory.do_wait(location, timeout); - if waiter.is_none() { + if waiter.is_err() { // ret is None if there is more than 2^32 waiter in queue or some other error return Err(Trap::lib(TrapCode::TableAccessOutOfBounds)); } diff --git a/lib/vm/src/memory.rs b/lib/vm/src/memory.rs index 57037664151..a55b242f809 100644 --- a/lib/vm/src/memory.rs +++ b/lib/vm/src/memory.rs @@ -5,8 +5,8 @@ //! //! `Memory` is to WebAssembly linear memories what `Table` is to WebAssembly tables. -pub use crate::threadconditions::NotifyLocation; use crate::threadconditions::ThreadConditions; +pub use crate::threadconditions::{NotifyLocation, WaiterError}; use crate::trap::Trap; use crate::{mmap::Mmap, store::MaybeInstanceOwned, vmcontext::VMMemoryDefinition}; use more_asserts::assert_ge; @@ -440,7 +440,11 @@ impl LinearMemory for VMSharedMemory { } // Add current thread to waiter list - fn do_wait(&mut self, dst: NotifyLocation, timeout: Option) -> Option { + fn do_wait( + &mut self, + dst: NotifyLocation, + timeout: Option, + ) -> Result { self.conditions.do_wait(dst, timeout) } @@ -517,7 +521,11 @@ impl LinearMemory for VMMemory { } // Add current thread to waiter list - fn do_wait(&mut self, dst: NotifyLocation, timeout: Option) -> Option { + fn do_wait( + &mut self, + dst: NotifyLocation, + timeout: Option, + ) -> Result { self.0.do_wait(dst, timeout) } @@ -646,8 +654,12 @@ where /// Add current thread to the waiter hash, and wait until notified or timout. /// Return 0 if the waiter has been notified, 2 if the timeout occured, or None if en error happened - fn do_wait(&mut self, _dst: NotifyLocation, _timeout: Option) -> Option { - None + fn do_wait( + &mut self, + _dst: NotifyLocation, + _timeout: Option, + ) -> Result { + Err(WaiterError::Unimplemented) } /// Notify waiters from the wait list. Return the number of waiters notified diff --git a/lib/vm/src/threadconditions.rs b/lib/vm/src/threadconditions.rs index 3b6dd73c7f0..8549453d185 100644 --- a/lib/vm/src/threadconditions.rs +++ b/lib/vm/src/threadconditions.rs @@ -3,6 +3,22 @@ use fnv::FnvBuildHasher; use std::sync::Arc; use std::thread::{current, park, park_timeout, Thread}; use std::time::Duration; +use thiserror::Error; + +/// Wait/Notify error type +#[derive(Debug, Error)] +pub enum WaiterError { + /// Wait/Notify is not implemented for this memory + Unimplemented, + /// To many waiter for an address + TooManyWaiters, +} + +impl std::fmt::Display for WaiterError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "WaiterError") + } +} /// A location in memory for a Waiter #[derive(Hash, Eq, PartialEq, Clone, Copy, Debug)] @@ -45,10 +61,14 @@ impl ThreadConditions { // because `park_timeout` doesn't gives any information on why it returns /// Add current thread to the waiter hash - pub fn do_wait(&mut self, dst: NotifyLocation, timeout: Option) -> Option { + pub fn do_wait( + &mut self, + dst: NotifyLocation, + timeout: Option, + ) -> Result { // fetch the notifier if self.inner.map.len() >= 1 << 32 { - return None; + return Err(WaiterError::TooManyWaiters); } self.inner .map @@ -80,7 +100,7 @@ impl ThreadConditions { if empty { self.inner.map.remove(&dst); } - Some(ret) + Ok(ret) } /// Notify waiters from the wait list @@ -128,10 +148,10 @@ mod tests { thread::spawn(move || { let dst = NotifyLocation { address: 0 }; - let ret = threadcond.do_wait(dst.clone(), None); - assert_eq!(ret, Some(0)); + let ret = threadcond.do_wait(dst.clone(), None).unwrap(); + assert_eq!(ret, 0); }); - thread::sleep(Duration::from_millis(1)); + thread::sleep(Duration::from_millis(10)); let dst = NotifyLocation { address: 0 }; let ret = conditions.do_notify(dst, 1); assert_eq!(ret, 1); @@ -146,8 +166,10 @@ mod tests { thread::spawn(move || { let dst = NotifyLocation { address: 0 }; - let ret = threadcond.do_wait(dst.clone(), Some(Duration::from_millis(1))); - assert_eq!(ret, Some(2)); + let ret = threadcond + .do_wait(dst.clone(), Some(Duration::from_millis(1))) + .unwrap(); + assert_eq!(ret, 2); }); thread::sleep(Duration::from_millis(50)); let dst = NotifyLocation { address: 0 }; @@ -164,8 +186,10 @@ mod tests { thread::spawn(move || { let dst = NotifyLocation { address: 8 }; - let ret = threadcond.do_wait(dst.clone(), Some(Duration::from_millis(10))); - assert_eq!(ret, Some(2)); + let ret = threadcond + .do_wait(dst.clone(), Some(Duration::from_millis(10))) + .unwrap(); + assert_eq!(ret, 2); }); thread::sleep(Duration::from_millis(1)); let dst = NotifyLocation { address: 0 }; @@ -184,15 +208,15 @@ mod tests { thread::spawn(move || { let dst = NotifyLocation { address: 0 }; - let ret = threadcond.do_wait(dst.clone(), None); - assert_eq!(ret, Some(0)); + let ret = threadcond.do_wait(dst.clone(), None).unwrap(); + assert_eq!(ret, 0); }); thread::spawn(move || { let dst = NotifyLocation { address: 0 }; - let ret = threadcond2.do_wait(dst.clone(), None); - assert_eq!(ret, Some(0)); + let ret = threadcond2.do_wait(dst.clone(), None).unwrap(); + assert_eq!(ret, 0); }); - thread::sleep(Duration::from_millis(1)); + thread::sleep(Duration::from_millis(20)); let dst = NotifyLocation { address: 0 }; let ret = conditions.do_notify(dst, 5); assert_eq!(ret, 2);