From 310b0a3d652beaf53d95a9956d2666cb2eb5b012 Mon Sep 17 00:00:00 2001 From: Christoph Herzog Date: Thu, 23 Nov 2023 11:03:17 +0100 Subject: [PATCH] fix: Prevent panicking in VirtualTaskManagerExt::spawn_and_block_on This helper method previously would panic if the inner async task failed to send a result, instead of returning an error that can be handled. It also: * used an mpsc channel. a oneshot channel is better here * used the InlineWaker to wait for the result, which is also redundant because tokio channels have sync receive methods --- lib/cli/src/commands/run/mod.rs | 8 ++++---- lib/cli/src/commands/run/wasi.rs | 2 +- lib/wasix/src/runners/wasi.rs | 2 +- lib/wasix/src/runners/wcgi/runner.rs | 2 +- lib/wasix/src/runtime/task_manager/mod.rs | 17 ++++++++++++----- 5 files changed, 19 insertions(+), 12 deletions(-) diff --git a/lib/cli/src/commands/run/mod.rs b/lib/cli/src/commands/run/mod.rs index c7af2cc2c97..ae9493f3fda 100644 --- a/lib/cli/src/commands/run/mod.rs +++ b/lib/cli/src/commands/run/mod.rs @@ -200,7 +200,7 @@ impl Run { .spawn_and_block_on(async move { BinaryPackage::from_registry(&specifier, inner_runtime.as_ref()).await }) - .with_context(|| format!("Unable to load \"{name}\""))? + .with_context(|| format!("Unable to load \"{name}\""))?? }; dependencies.push(pkg); } @@ -490,7 +490,7 @@ impl PackageSource { let inner_rt = rt.clone(); let pkg = rt.task_manager().spawn_and_block_on(async move { BinaryPackage::from_registry(&inner_pck, inner_rt.as_ref()).await - })?; + })??; Ok(ExecutableTarget::Package(pkg)) } } @@ -579,7 +579,7 @@ impl ExecutableTarget { let inner_runtime = runtime.clone(); let pkg = runtime.task_manager().spawn_and_block_on(async move { BinaryPackage::from_webc(&container, inner_runtime.as_ref()).await - })?; + })??; Ok(ExecutableTarget::Package(pkg)) } @@ -624,7 +624,7 @@ impl ExecutableTarget { let inner_runtime = runtime.clone(); let pkg = runtime.task_manager().spawn_and_block_on(async move { BinaryPackage::from_webc(&container, inner_runtime.as_ref()).await - })?; + })??; Ok(ExecutableTarget::Package(pkg)) } } diff --git a/lib/cli/src/commands/run/wasi.rs b/lib/cli/src/commands/run/wasi.rs index 744b2876dfd..79851da2547 100644 --- a/lib/cli/src/commands/run/wasi.rs +++ b/lib/cli/src/commands/run/wasi.rs @@ -180,7 +180,7 @@ impl Wasi { .spawn_and_block_on(async move { BinaryPackage::from_registry(&specifier, &*inner_rt).await }) - .with_context(|| format!("Unable to load \"{name}\""))? + .with_context(|| format!("Unable to load \"{name}\""))?? }; uses.push(pkg); } diff --git a/lib/wasix/src/runners/wasi.rs b/lib/wasix/src/runners/wasi.rs index f1461315066..912401df425 100644 --- a/lib/wasix/src/runners/wasi.rs +++ b/lib/wasix/src/runners/wasi.rs @@ -320,7 +320,7 @@ impl crate::runners::Runner for WasiRunner { .context("Unable to wait for the process to exit") } .in_current_span(), - )?; + )??; if exit_code.raw() == 0 { Ok(()) diff --git a/lib/wasix/src/runners/wcgi/runner.rs b/lib/wasix/src/runners/wcgi/runner.rs index 200481c9425..83beaf348ea 100644 --- a/lib/wasix/src/runners/wcgi/runner.rs +++ b/lib/wasix/src/runners/wcgi/runner.rs @@ -141,7 +141,7 @@ impl crate::runners::Runner for WcgiRunner { }) .await }) - .context("Unable to start the server")?; + .context("Unable to start the server")??; Ok(()) } diff --git a/lib/wasix/src/runtime/task_manager/mod.rs b/lib/wasix/src/runtime/task_manager/mod.rs index 2e6d024fe27..2f59529b4d1 100644 --- a/lib/wasix/src/runtime/task_manager/mod.rs +++ b/lib/wasix/src/runtime/task_manager/mod.rs @@ -367,7 +367,10 @@ impl dyn VirtualTaskManager { pub trait VirtualTaskManagerExt { /// Runs the work in the background via the task managers shared background /// threads while blocking the current execution until it finishs - fn spawn_and_block_on(&self, task: impl Future + Send + 'static) -> A + fn spawn_and_block_on( + &self, + task: impl Future + Send + 'static, + ) -> Result where A: Send + 'static; } @@ -379,16 +382,20 @@ where { /// Runs the work in the background via the task managers shared background /// threads while blocking the current execution until it finishs - fn spawn_and_block_on(&self, task: impl Future + Send + 'static) -> A + fn spawn_and_block_on( + &self, + task: impl Future + Send + 'static, + ) -> Result where A: Send + 'static, { - let (work_tx, mut work_rx) = ::tokio::sync::mpsc::unbounded_channel(); + let (tx, rx) = ::tokio::sync::oneshot::channel(); let work = Box::pin(async move { let ret = task.await; - work_tx.send(ret).ok(); + tx.send(ret).ok(); }); self.task_shared(Box::new(move || work)).unwrap(); - InlineWaker::block_on(work_rx.recv()).unwrap() + rx.blocking_recv() + .map_err(|_| anyhow::anyhow!("task execution failed - result channel dropped")) } }