Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make the wasm-node no-std #1376

Merged
merged 11 commits into from
Nov 21, 2023
13 changes: 12 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 7 additions & 2 deletions wasm-node/rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,24 @@ repository.workspace = true
include.workspace = true
publish = false

[features]
std = [] # TODO: this `std` feature exists purely in order to bypass weird Cargo behaviour when running `cargo test`

[lib]
crate-type = ["cdylib", "rlib"]

[dependencies]
async-executor = { version = "1.5.1", default-features = false }
async-lock = { version = "3.0.0", default-features = false }
async-task = { version = "4.4.0", default-features = false }
crossbeam-queue = { version = "0.3.8", default-features = false }
derive_more = "0.99.17"
dlmalloc = { version = "0.2.4", default-features = false, features = ["global"] }
event-listener = { version = "3.0.0", default-features = false }
fnv = { version = "1.0.7", default-features = false }
futures-lite = { version = "2.0.0", default-features = false, features = ["alloc"] }
futures-util = { version = "0.3.27", default-features = false }
hashbrown = { version = "0.14.0", default-features = false }
log = { version = "0.4.18", features = ["std"] }
log = { version = "0.4.18", default-features = false }
nom = { version = "7.1.3", default-features = false }
no-std-net = { version = "0.6.0", default-features = false }
pin-project = "1.1.3"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
//! simple, apart from the fact that it counts the total number of bytes that have been allocated.
//! This value can then be retrieved by calling [`total_alloc_bytes`].

use std::{alloc, sync::atomic};
use core::{alloc, sync::atomic};

/// Returns the total number of bytes that have been allocated through the Rust `alloc` crate
/// throughout the entire Wasm node.
Expand All @@ -28,21 +28,18 @@ pub fn total_alloc_bytes() -> usize {
}

struct AllocCounter {
/// Use the default "system" allocator. In the context of Wasm, this uses the `dlmalloc`
/// library. See <https://github.com/rust-lang/rust/tree/1.47.0/library/std/src/sys/wasm>.
///
/// While the `wee_alloc` crate is usually the recommended choice in WebAssembly, testing has
/// shown that using it makes memory usage explode from `~100MiB` to `~2GiB` and more (the
/// environment then refuses to allocate `4GiB`).
inner: alloc::System,
/// Use the `dlmalloc` allocator. This is the library that the Rust standard library uses in
/// the context of Wasm.
/// See <https://github.com/rust-lang/rust/tree/1.47.0/library/std/src/sys/wasm>.
inner: dlmalloc::GlobalDlmalloc,

/// Total number of bytes allocated.
total: atomic::AtomicUsize,
}

#[global_allocator]
static ALLOCATOR: AllocCounter = AllocCounter {
inner: alloc::System,
inner: dlmalloc::GlobalDlmalloc,
total: atomic::AtomicUsize::new(0),
};

Expand Down
2 changes: 2 additions & 0 deletions wasm-node/rust/src/bindings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
//! they are treated as unsigned integers by the JavaScript.
//!

use alloc::vec::Vec;

#[link(wasm_import_module = "smoldot")]
extern "C" {
/// Must stop the execution immediately. The message is a UTF-8 string found in the memory of
Expand Down
24 changes: 13 additions & 11 deletions wasm-node/rust/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use crate::{alloc, bindings, platform, timers::Delay};
use crate::{allocator, bindings, platform, timers::Delay};

use core::time::Duration;
use alloc::{boxed::Box, format, string::String};
use core::{sync::atomic::Ordering, time::Duration};
use futures_util::stream;
use smoldot::informant::BytesDisplay;
use smoldot_light::platform::PlatformRef;
use std::{panic, sync::atomic::Ordering};

pub(crate) struct Client<TPlat: smoldot_light::platform::PlatformRef, TChain> {
pub(crate) smoldot: smoldot_light::Client<TPlat, TChain>,
Expand Down Expand Up @@ -55,8 +55,8 @@ pub(crate) enum Chain {
}

pub(crate) fn init(max_log_level: u32) {
// Try initialize the logging and the panic hook.
let _ = log::set_boxed_logger(Box::new(Logger)).map(|()| {
// Try initialize the logging.
if let Ok(_) = log::set_logger(&LOGGER) {
log::set_max_level(match max_log_level {
0 => log::LevelFilter::Off,
1 => log::LevelFilter::Error,
Expand All @@ -65,10 +65,7 @@ pub(crate) fn init(max_log_level: u32) {
4 => log::LevelFilter::Debug,
_ => log::LevelFilter::Trace,
})
});
panic::set_hook(Box::new(|info| {
panic(info.to_string());
}));
}

// First things first, print the version in order to make it easier to debug issues by
// reading logs provided by third parties.
Expand All @@ -92,7 +89,7 @@ pub(crate) fn init(max_log_level: u32) {

// For the unwrap below to fail, the quantity of allocated would have to
// not fit in a `u64`, which as of 2021 is basically impossible.
let mem = u64::try_from(alloc::total_alloc_bytes()).unwrap();
let mem = u64::try_from(allocator::total_alloc_bytes()).unwrap();

// Due to the way the calculation below is performed, sending or receiving
// more than `type_of(TOTAL_BYTES_RECEIVED or TOTAL_BYTES_SENT)::max_value`
Expand Down Expand Up @@ -125,7 +122,11 @@ pub(crate) fn init(max_log_level: u32) {
}

/// Stops execution, providing a string explaining what happened.
fn panic(message: String) -> ! {
#[cfg(not(any(test, feature = "std")))]
#[panic_handler]
fn panic(info: &core::panic::PanicInfo) -> ! {
let message = alloc::string::ToString::to_string(info);

unsafe {
bindings::panic(
u32::try_from(message.as_bytes().as_ptr() as usize).unwrap(),
Expand All @@ -144,6 +145,7 @@ fn panic(message: String) -> ! {

/// Implementation of [`log::Log`] that sends out logs to the FFI.
struct Logger;
static LOGGER: Logger = Logger;

impl log::Log for Logger {
fn enabled(&self, _: &log::Metadata) -> bool {
Expand Down
117 changes: 35 additions & 82 deletions wasm-node/rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,26 @@

//! Contains a light client implementation usable from a browser environment.

#![cfg_attr(not(feature = "std"), no_std)]
#![deny(rustdoc::broken_intra_doc_links)]
#![deny(unused_crate_dependencies)]

use core::{future, mem, num::NonZeroU32, pin::Pin, str};
extern crate alloc;

use alloc::{
boxed::Box,
string::{String, ToString as _},
sync::Arc,
vec::Vec,
};
use async_lock::Mutex;
use core::{num::NonZeroU32, pin::Pin, str, task};
use futures_util::{stream, Stream as _, StreamExt as _};
use smoldot_light::HandleRpcError;
use std::{
sync::{Arc, Mutex},
task,
};

pub mod bindings;

mod alloc;
mod allocator;
mod init;
mod platform;
mod timers;
Expand All @@ -51,13 +57,13 @@ fn add_chain(
json_rpc_max_subscriptions: u32,
potential_relay_chains: Vec<u8>,
) -> u32 {
let mut client_lock = CLIENT.lock().unwrap();
let mut client_lock = CLIENT.try_lock().unwrap();

// Fail any new chain initialization if we're running low on memory space, which can
// realistically happen as Wasm is a 32 bits platform. This avoids potentially running into
// OOM errors. The threshold is completely empirical and should probably be updated
// regularly to account for changes in the implementation.
if alloc::total_alloc_bytes() >= usize::max_value() - 400 * 1024 * 1024 {
if allocator::total_alloc_bytes() >= usize::max_value() - 400 * 1024 * 1024 {
let chain_id = client_lock.chains.insert(init::Chain::Erroneous {
error:
"Wasm node is running low on memory and will prevent any new chain from being added"
Expand Down Expand Up @@ -154,7 +160,7 @@ fn add_chain(
}

fn remove_chain(chain_id: u32) {
let mut client_lock = CLIENT.lock().unwrap();
let mut client_lock = CLIENT.try_lock().unwrap();

match client_lock
.chains
Expand Down Expand Up @@ -183,7 +189,7 @@ fn remove_chain(chain_id: u32) {
}

fn chain_is_ok(chain_id: u32) -> u32 {
let client_lock = CLIENT.lock().unwrap();
let client_lock = CLIENT.try_lock().unwrap();
if matches!(
client_lock
.chains
Expand All @@ -198,7 +204,7 @@ fn chain_is_ok(chain_id: u32) -> u32 {
}

fn chain_error_len(chain_id: u32) -> u32 {
let client_lock = CLIENT.lock().unwrap();
let client_lock = CLIENT.try_lock().unwrap();
match client_lock
.chains
.get(usize::try_from(chain_id).unwrap())
Expand All @@ -210,7 +216,7 @@ fn chain_error_len(chain_id: u32) -> u32 {
}

fn chain_error_ptr(chain_id: u32) -> u32 {
let client_lock = CLIENT.lock().unwrap();
let client_lock = CLIENT.try_lock().unwrap();
match client_lock
.chains
.get(usize::try_from(chain_id).unwrap())
Expand All @@ -228,7 +234,7 @@ fn json_rpc_send(json_rpc_request: Vec<u8>, chain_id: u32) -> u32 {
let json_rpc_request: String = String::from_utf8(json_rpc_request)
.unwrap_or_else(|_| panic!("non-UTF-8 JSON-RPC request"));

let mut client_lock = CLIENT.lock().unwrap();
let mut client_lock = CLIENT.try_lock().unwrap();
let client_chain_id = match client_lock
.chains
.get(usize::try_from(chain_id).unwrap())
Expand All @@ -250,7 +256,7 @@ fn json_rpc_send(json_rpc_request: Vec<u8>, chain_id: u32) -> u32 {
}

fn json_rpc_responses_peek(chain_id: u32) -> u32 {
let mut client_lock = CLIENT.lock().unwrap();
let mut client_lock = CLIENT.try_lock().unwrap();
match client_lock
.chains
.get_mut(usize::try_from(chain_id).unwrap())
Expand Down Expand Up @@ -316,7 +322,7 @@ fn json_rpc_responses_peek(chain_id: u32) -> u32 {
}

fn json_rpc_responses_pop(chain_id: u32) {
let mut client_lock = CLIENT.lock().unwrap();
let mut client_lock = CLIENT.try_lock().unwrap();
match client_lock
.chains
.get_mut(usize::try_from(chain_id).unwrap())
Expand All @@ -333,82 +339,29 @@ struct JsonRpcResponsesNonEmptyWaker {
chain_id: u32,
}

impl task::Wake for JsonRpcResponsesNonEmptyWaker {
impl alloc::task::Wake for JsonRpcResponsesNonEmptyWaker {
fn wake(self: Arc<Self>) {
unsafe { bindings::json_rpc_responses_non_empty(self.chain_id) }
}
}

/// Since "spawning a task" isn't really something that a browser or Node environment do
/// efficiently, we instead combine all the asynchronous tasks into one executor.
// TODO: we use an Executor instead of LocalExecutor because it is planned to allow multithreading; if this plan is abandoned, switch to SendWrapper<LocalExecutor>
static EXECUTOR: async_executor::Executor = async_executor::Executor::new();

/// While [`EXECUTOR`] is global to all threads, [`EXECUTOR_EXECUTE`] is thread specific. If
/// smoldot eventually gets support for multiple threads, this value would be store in a
/// thread-local storage. Since it only has one thread, it is instead a global variable.
static EXECUTOR_EXECUTE: Mutex<ExecutionState> = Mutex::new(ExecutionState::NotStarted);
enum ExecutionState {
/// Execution has not started. Everything remains to be initialized. Default state.
NotStarted,
/// Execution has been started in the past and is now waiting to be woken up.
NotReady,
/// Execution has been woken up. Ready to continue running.
Ready(async_task::Runnable),
}
/// List of light tasks waiting to be executed.
static TASKS_QUEUE: crossbeam_queue::SegQueue<async_task::Runnable> =
crossbeam_queue::SegQueue::new();

fn advance_execution() {
let runnable = {
let mut executor_execute_guard = EXECUTOR_EXECUTE.lock().unwrap();
match *executor_execute_guard {
ExecutionState::NotStarted => {
// Spawn a task that repeatedly executes one task then yields.
// This makes sure that we return to the JS engine after every task.
let (runnable, task) = {
let run = async move {
loop {
EXECUTOR.tick().await;
let mut has_yielded = false;
future::poll_fn(|cx| {
if has_yielded {
task::Poll::Ready(())
} else {
cx.waker().wake_by_ref();
has_yielded = true;
task::Poll::Pending
}
})
.await;
}
};

async_task::spawn(run, |runnable| {
let mut lock = EXECUTOR_EXECUTE.lock().unwrap();
if !matches!(*lock, ExecutionState::NotReady) {
return;
}
*lock = ExecutionState::Ready(runnable);
unsafe {
bindings::advance_execution_ready();
}
})
};
// This function executes one task then returns. This ensures that the Wasm doesn't use up
// all the available CPU of the host.

task.detach();
*executor_execute_guard = ExecutionState::NotReady;
runnable
}
ExecutionState::NotReady => return,
ExecutionState::Ready(_) => {
let ExecutionState::Ready(runnable) =
mem::replace(&mut *executor_execute_guard, ExecutionState::NotReady)
else {
unreachable!()
};
runnable
}
}
let Some(runnable) = TASKS_QUEUE.pop() else {
return;
};

runnable.run();

if !TASKS_QUEUE.is_empty() {
unsafe {
bindings::advance_execution_ready();
}
}
}
Loading
Loading