Skip to content

Commit

Permalink
feat: implement memory.atomic.notify,wait32,wait64
Browse files Browse the repository at this point in the history
Added the parking_spot crate, which provides the needed registry for the
operations.

Signed-off-by: Harald Hoyer <[email protected]>
  • Loading branch information
haraldh committed Nov 17, 2022
1 parent 95ca72a commit 35dadcf
Show file tree
Hide file tree
Showing 10 changed files with 666 additions and 38 deletions.
12 changes: 10 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ members = [
"crates/cli-flags",
"crates/environ/fuzz",
"crates/jit-icache-coherence",
"crates/wasmtime-parking_spot",
"crates/winch",
"examples/fib-debug/wasm",
"examples/wasi/wasm",
Expand Down
1 change: 1 addition & 0 deletions crates/runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ anyhow = { workspace = true }
memfd = "0.6.1"
paste = "1.0.3"
encoding_rs = { version = "0.8.31", optional = true }
wasmtime-parking_spot = { path = "../wasmtime-parking_spot" }

[target.'cfg(target_os = "macos")'.dependencies]
mach = "0.3.2"
Expand Down
11 changes: 11 additions & 0 deletions crates/runtime/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,17 @@ impl Instance {
}
}

/// Get a locally defined or imported memory.
///
/// Resolves `index` to the instance that actually defines the memory:
/// a locally defined memory is returned directly, while an imported
/// memory is looked up through the importing instance's `VMContext`.
pub(crate) fn get_runtime_memory(&mut self, index: MemoryIndex) -> &mut Memory {
    if let Some(defined_index) = self.module().defined_memory_index(index) {
        // SAFETY: `get_defined_memory` yields a pointer into this
        // instance's own storage; assumed valid for the duration of the
        // `&mut self` borrow.
        unsafe { &mut *self.get_defined_memory(defined_index) }
    } else {
        let import = self.imported_memory(index);
        // SAFETY: the import's `vmctx` presumably points at a live
        // instance owning the memory — TODO confirm the lifetime and
        // aliasing guarantees against the instantiation code.
        let ctx = unsafe { &mut *import.vmctx };
        unsafe { &mut *ctx.instance_mut().get_defined_memory(import.index) }
    }
}

/// Return the indexed `VMMemoryDefinition`.
fn memory(&self, index: DefinedMemoryIndex) -> VMMemoryDefinition {
unsafe { VMMemoryDefinition::load(self.memory_ptr(index)) }
Expand Down
122 changes: 92 additions & 30 deletions crates/runtime/src/libcalls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,13 @@ use crate::TrapReason;
use anyhow::Result;
use std::mem;
use std::ptr::{self, NonNull};
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use wasmtime_environ::{
DataIndex, ElemIndex, FuncIndex, GlobalIndex, MemoryIndex, TableIndex, TrapCode,
};
use wasmtime_parking_spot::{ParkResult, ParkingSpot};

/// Actually public trampolines which are used by the runtime as the entrypoint
/// for libcalls.
Expand Down Expand Up @@ -434,50 +438,100 @@ unsafe fn externref_global_set(vmctx: *mut VMContext, index: u32, externref: *mu
// Implementation of `memory.atomic.notify` for locally defined memories.
//
// Wakes up to `count` threads parked on `addr_index` of the given memory,
// returning how many were actually woken.
unsafe fn memory_atomic_notify(
    vmctx: *mut VMContext,
    memory_index: u32,
    addr_index: u64,
    count: u32,
) -> Result<u32, TrapReason> {
    let memory = MemoryIndex::from_u32(memory_index);
    let instance = (*vmctx).instance_mut();
    // Validate even when `count == 0`: a misaligned or out-of-bounds
    // address must trap regardless of how many waiters are requested.
    let (_addr, spot) = validate_atomic_addr(instance, memory, addr_index, 4, 4)?;

    if count == 0 {
        return Ok(0);
    }

    // Only shared memories carry a `ParkingSpot`; a non-shared memory can
    // never have waiters, so zero threads are woken.
    let unparked_threads = if let Some(spot) = spot {
        if count == u32::MAX {
            spot.unpark_all(addr_index as _)
        } else {
            spot.unpark(addr_index as _, count as _)
        }
    } else {
        0
    };

    Ok(unparked_threads as u32)
}

// Implementation of `memory.atomic.wait32` for locally defined memories.
//
// Returns the wasm-level result code: 0 == "ok" (woken), 1 == "not-equal",
// 2 == "timed-out".
unsafe fn memory_atomic_wait32(
    vmctx: *mut VMContext,
    memory_index: u32,
    addr_index: u64,
    expected: u32,
    timeout: u64,
) -> Result<u32, TrapReason> {
    // A negative timeout (interpreted as i64) encodes "wait forever";
    // otherwise `timeout` is a duration in nanoseconds.
    let timeout = (timeout as i64 >= 0).then(|| Duration::from_nanos(timeout));

    let memory = MemoryIndex::from_u32(memory_index);
    let instance = (*vmctx).instance_mut();
    let (addr, spot) = validate_atomic_addr(instance, memory, addr_index, 4, 4)?;

    if let Some(spot) = spot {
        // SAFETY: `addr` was checked to be in-bounds and 4-byte aligned by
        // `validate_atomic_addr` above.
        let atomic = unsafe { &*(addr as *const AtomicU32) };
        match spot.park(
            addr_index as usize,
            || atomic.load(Ordering::SeqCst) == expected,
            timeout,
        ) {
            ParkResult::Unparked => Ok(0),
            ParkResult::Invalid => Ok(1),
            ParkResult::TimedOut => Ok(2),
        }
    } else if let Some(timeout) = timeout {
        // NOTE(review): for a non-shared memory this sleeps for the full
        // timeout and reports "timed-out", while the untimed case traps —
        // confirm this asymmetry against the threads proposal.
        std::thread::sleep(timeout);
        Ok(2)
    } else {
        Err(TrapReason::Wasm(TrapCode::AlwaysTrapAdapter))
    }
}

// Implementation of `memory.atomic.wait64` for locally defined memories.
//
// Identical to `memory_atomic_wait32` except for the 64-bit access width.
// Returns 0 == "ok" (woken), 1 == "not-equal", 2 == "timed-out".
unsafe fn memory_atomic_wait64(
    vmctx: *mut VMContext,
    memory_index: u32,
    addr_index: u64,
    expected: u64,
    timeout: u64,
) -> Result<u32, TrapReason> {
    // A negative timeout (interpreted as i64) encodes "wait forever";
    // otherwise `timeout` is a duration in nanoseconds.
    let timeout = (timeout as i64 >= 0).then(|| Duration::from_nanos(timeout));

    let memory = MemoryIndex::from_u32(memory_index);
    let instance = (*vmctx).instance_mut();
    let (addr, spot) = validate_atomic_addr(instance, memory, addr_index, 8, 8)?;

    if let Some(spot) = spot {
        // SAFETY: `addr` was checked to be in-bounds and 8-byte aligned by
        // `validate_atomic_addr` above.
        let atomic = unsafe { &*(addr as *const AtomicU64) };
        match spot.park(
            addr_index as usize,
            || atomic.load(Ordering::SeqCst) == expected,
            timeout,
        ) {
            ParkResult::Unparked => Ok(0),
            ParkResult::Invalid => Ok(1),
            ParkResult::TimedOut => Ok(2),
        }
    } else if let Some(timeout) = timeout {
        // NOTE(review): non-shared memory sleeps out the timeout and
        // reports "timed-out" instead of trapping — confirm intended
        // semantics (see `memory_atomic_wait32`).
        std::thread::sleep(timeout);
        Ok(2)
    } else {
        Err(TrapReason::Wasm(TrapCode::AlwaysTrapAdapter))
    }
}

macro_rules! ensure {
Expand All @@ -493,22 +547,30 @@ macro_rules! ensure {
/// check is here so we don't segfault from Rust. For other configurations,
/// these checks are required anyways.
unsafe fn validate_atomic_addr(
instance: &Instance,
instance: &mut Instance,
memory: MemoryIndex,
addr: u64,
access_size: u64,
access_alignment: u64,
) -> Result<(), TrapCode> {
) -> Result<(*mut u8, Option<Arc<ParkingSpot>>), TrapCode> {
debug_assert!(access_alignment.is_power_of_two());
ensure!(addr % access_alignment == 0, TrapCode::HeapMisaligned);

let length = u64::try_from(instance.get_memory(memory).current_length()).unwrap();
let mem = instance.get_memory(memory);

let length = u64::try_from(mem.current_length()).unwrap();
ensure!(
addr.saturating_add(access_size) < length,
TrapCode::HeapOutOfBounds
);

Ok(())
let mem_definition = instance.get_memory(memory);

let mem = instance.get_runtime_memory(memory);

let spot = mem.as_shared_memory().map(|mem| mem.parking_spot());

Ok((mem_definition.base.add(addr as usize), spot))
}

// Hook for when an instance runs out of fuel.
Expand Down
21 changes: 15 additions & 6 deletions crates/runtime/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use std::convert::TryFrom;
use std::sync::atomic::Ordering;
use std::sync::{Arc, RwLock};
use wasmtime_environ::{MemoryPlan, MemoryStyle, WASM32_MAX_PAGES, WASM64_MAX_PAGES};
use wasmtime_parking_spot::ParkingSpot;

const WASM_PAGE_SIZE: usize = wasmtime_environ::WASM_PAGE_SIZE as usize;
const WASM_PAGE_SIZE_U64: u64 = wasmtime_environ::WASM_PAGE_SIZE as u64;
Expand Down Expand Up @@ -432,7 +433,7 @@ impl RuntimeLinearMemory for StaticMemory {
/// [thread proposal]:
/// https://github.com/WebAssembly/threads/blob/master/proposals/threads/Overview.md#webassemblymemoryprototypegrow
#[derive(Clone)]
pub struct SharedMemory(Arc<RwLock<SharedMemoryInner>>);
pub struct SharedMemory(Arc<RwLock<SharedMemoryInner>>, Arc<ParkingSpot>);
impl SharedMemory {
/// Construct a new [`SharedMemory`].
pub fn new(plan: MemoryPlan) -> Result<Self> {
Expand All @@ -458,11 +459,14 @@ impl SharedMemory {
"cannot re-wrap a shared memory"
);
let def = LongTermVMMemoryDefinition(memory.vmmemory());
Ok(Self(Arc::new(RwLock::new(SharedMemoryInner {
memory: memory,
ty,
def,
}))))
Ok(Self(
Arc::new(RwLock::new(SharedMemoryInner {
memory: memory,
ty,
def,
})),
Arc::new(ParkingSpot::default()),
))
}

/// Return the memory type for this [`SharedMemory`].
Expand All @@ -484,6 +488,11 @@ impl SharedMemory {
/// Return a raw pointer to this shared memory's `VMMemoryDefinition`.
///
/// NOTE(review): the pointer is derived from data behind the `RwLock`,
/// whose read guard is dropped at the end of this statement; callers
/// presumably rely on `LongTermVMMemoryDefinition` keeping the pointee
/// stable — confirm.
pub fn vmmemory_ptr(&self) -> *const VMMemoryDefinition {
&self.0.read().unwrap().def.0 as *const _
}

/// Returns a handle to the [`ParkingSpot`] used to implement the
/// `memory.atomic.wait*`/`memory.atomic.notify` operations on this
/// shared memory.
///
/// Every clone of this [`SharedMemory`] hands out the same spot.
pub fn parking_spot(&self) -> Arc<ParkingSpot> {
    Arc::clone(&self.1)
}
}

struct SharedMemoryInner {
Expand Down
15 changes: 15 additions & 0 deletions crates/wasmtime-parking_spot/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[package]
name = "wasmtime-parking_spot"
version.workspace = true
authors.workspace = true
license = "MIT OR Apache-2.0"
repository = "https://github.com/bytecodealliance/wasmtime"
keywords = ["thread", "park", "unpark", "parking", "lot", "wait", "notify"]
categories = ["concurrency"]
readme = "README.md"
edition.workspace = true

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dev-dependencies]
once_cell = { workspace = true }
44 changes: 44 additions & 0 deletions crates/wasmtime-parking_spot/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# parking_spot

Implements thread wait and notify operations on top of `std::sync` primitives.

This is a simplified version of the `parking_lot_core` crate.

There are two main operations that can be performed:

- *Parking* refers to suspending the thread while simultaneously enqueuing it
on a queue keyed by some address.
- *Unparking* refers to dequeuing a thread from a queue keyed by some address
and resuming it.

## Example

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;
use wasmtime_parking_spot::ParkingSpot;

let parking_spot: Arc<ParkingSpot> = Arc::new(ParkingSpot::default());
let atomic: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
let atomic_key = Arc::as_ptr(&atomic) as usize;

let handle = {
let parking_spot = parking_spot.clone();
let atomic = atomic.clone();
thread::spawn(move || {
atomic.store(1, Ordering::Relaxed);
parking_spot.unpark_all(atomic_key);
parking_spot.park(atomic_key, || atomic.load(Ordering::SeqCst) == 1, None);
assert_eq!(atomic.load(Ordering::SeqCst), 2);
})
};

parking_spot.park(atomic_key, || atomic.load(Ordering::SeqCst) == 0, None);
atomic.store(2, Ordering::Relaxed);
parking_spot.unpark_all(atomic_key);

handle.join().unwrap();
```

License: MIT OR Apache-2.0
Loading

0 comments on commit 35dadcf

Please sign in to comment.