From 8b678f40fedd9e4c6c925a6d4a7ebce8adb3c4a1 Mon Sep 17 00:00:00 2001 From: Imbris Date: Tue, 1 Oct 2024 01:57:43 -0400 Subject: [PATCH 1/2] Fix use after free of task in FuturesUnordered (caused by accidental reference count release when dropping a future panics and unwinds) --- .../src/stream/futures_unordered/mod.rs | 68 +++++++++++++------ .../futures_unordered/ready_to_run_queue.rs | 2 + futures/tests/stream_futures_unordered.rs | 23 +++++++ 3 files changed, 73 insertions(+), 20 deletions(-) diff --git a/futures-util/src/stream/futures_unordered/mod.rs b/futures-util/src/stream/futures_unordered/mod.rs index 2d4f15158..d4ee16937 100644 --- a/futures-util/src/stream/futures_unordered/mod.rs +++ b/futures-util/src/stream/futures_unordered/mod.rs @@ -255,16 +255,6 @@ impl FuturesUnordered { // `wake` from doing any work in the future let prev = task.queued.swap(true, SeqCst); - // Drop the future, even if it hasn't finished yet. This is safe - // because we're dropping the future on the thread that owns - // `FuturesUnordered`, which correctly tracks `Fut`'s lifetimes and - // such. - unsafe { - // Set to `None` rather than `take()`ing to prevent moving the - // future. - *task.future.get() = None; - } - // If the queued flag was previously set, then it means that this task // is still in our internal ready to run queue. We then transfer // ownership of our reference count to the ready to run queue, and it'll @@ -276,8 +266,25 @@ impl FuturesUnordered { // enqueue the task, so our task will never see the ready to run queue // again. The task itself will be deallocated once all reference counts // have been dropped elsewhere by the various wakers that contain it. - if prev { - mem::forget(task); + // + // Use ManuallyDrop to transfer the reference count ownership before + // dropping the future so unwinding won't release the reference count. + let md_slot; + let task = if prev { + md_slot = mem::ManuallyDrop::new(task); + &*md_slot + } else { + &task + }; + + // Drop the future, even if it hasn't finished yet. This is safe + // because we're dropping the future on the thread that owns + // `FuturesUnordered`, which correctly tracks `Fut`'s lifetimes and + // such. + unsafe { + // Set to `None` rather than `take()`ing to prevent moving the + // future. + *task.future.get() = None; } } @@ -566,15 +573,36 @@ impl FuturesUnordered { impl Drop for FuturesUnordered { fn drop(&mut self) { - // When a `FuturesUnordered` is dropped we want to drop all futures - // associated with it. At the same time though there may be tons of - // wakers flying around which contain `Task` references - // inside them. We'll let those naturally get deallocated. - while !self.head_all.get_mut().is_null() { - let head = *self.head_all.get_mut(); - let task = unsafe { self.unlink(head) }; - self.release_task(task); + // Before the strong reference to the queue is dropped we need all + // futures to be dropped. See note at the bottom of this method. + // + // This ensures we drop all futures even in the case of one drop + // panicking and unwinding. + // + // If a second future panics, that will trigger an abort from panicking + // while panicking. + struct DropGuard<'a, Fut>(&'a mut FuturesUnordered); + impl DropGuard<'_, Fut> { + fn drop_futures(&mut self) { + // When a `FuturesUnordered` is dropped we want to drop all futures + // associated with it. At the same time though there may be tons of + // wakers flying around which contain `Task` references + // inside them. We'll let those naturally get deallocated. + while !self.0.head_all.get_mut().is_null() { + let head = *self.0.head_all.get_mut(); + let task = unsafe { self.0.unlink(head) }; + self.0.release_task(task); + } + } + } + impl Drop for DropGuard<'_, Fut> { + fn drop(&mut self) { + self.drop_futures(); + } } + let mut guard = DropGuard(self); + guard.drop_futures(); + mem::forget(guard); // no need to check head_all again // Note that at this point we could still have a bunch of tasks in the // ready to run queue. None of those tasks, however, have futures diff --git a/futures-util/src/stream/futures_unordered/ready_to_run_queue.rs b/futures-util/src/stream/futures_unordered/ready_to_run_queue.rs index 77abdf4ea..6ffaf554d 100644 --- a/futures-util/src/stream/futures_unordered/ready_to_run_queue.rs +++ b/futures-util/src/stream/futures_unordered/ready_to_run_queue.rs @@ -27,6 +27,8 @@ pub(super) struct ReadyToRunQueue { /// An MPSC queue into which the tasks containing the futures are inserted /// whenever the future inside is scheduled for polling. impl ReadyToRunQueue { + // FIXME: this takes raw pointer without safety conditions. + /// The enqueue function from the 1024cores intrusive MPSC queue algorithm. pub(super) fn enqueue(&self, task: *const Task) { unsafe { diff --git a/futures/tests/stream_futures_unordered.rs b/futures/tests/stream_futures_unordered.rs index 727feb062..6be7f05be 100644 --- a/futures/tests/stream_futures_unordered.rs +++ b/futures/tests/stream_futures_unordered.rs @@ -406,3 +406,26 @@ fn clear_in_loop() { } }); } + +// https://github.com/rust-lang/futures-rs/issues/2863#issuecomment-2219441515 +#[test] +#[should_panic] +fn panic_on_drop_fut() { + struct BadFuture; + + impl Drop for BadFuture { + fn drop(&mut self) { + panic!() + } + } + + impl Future for BadFuture { + type Output = (); + + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { + Poll::Pending + } + } + + FuturesUnordered::default().push(BadFuture); +} From f7464a40a1d798ce6ef29d5a5b32892247689737 Mon Sep 17 00:00:00 2001 From: Imbris Date: Wed, 2 Oct 2024 01:06:08 -0400 Subject: [PATCH 2/2] Leak internal queue in FuturesUnordered drop if dropping a future panics and avoid potential abort if we try to continue dropping futures and another one panics --- .github/workflows/ci.yml | 10 ++++- .../src/stream/futures_unordered/mod.rs | 39 +++++++------------ 2 files changed, 23 insertions(+), 26 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 744168c09..cc52ae682 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -246,11 +246,17 @@ jobs: - uses: taiki-e/checkout-action@v1 - name: Install Rust run: rustup toolchain install nightly --component miri && rustup default nightly - - run: cargo miri test --workspace --all-features + - run: cargo miri test --workspace --all-features -- --skip panic_on_drop_fut env: MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-symbolic-alignment-check -Zmiri-disable-isolation RUSTDOCFLAGS: ${{ env.RUSTDOCFLAGS }} -Z randomize-layout RUSTFLAGS: ${{ env.RUSTFLAGS }} -Z randomize-layout + # This test is expected to leak. + - run: cargo miri test --workspace --all-features --test stream_futures_unordered -- panic_on_drop_fut + env: + MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-symbolic-alignment-check -Zmiri-disable-isolation -Zmiri-ignore-leaks + RUSTDOCFLAGS: ${{ env.RUSTDOCFLAGS }} -Z randomize-layout + RUSTFLAGS: ${{ env.RUSTFLAGS }} -Z randomize-layout san: name: cargo test -Z sanitizer=${{ matrix.sanitizer }} @@ -269,7 +275,7 @@ jobs: run: rustup toolchain install nightly --component rust-src && rustup default nightly # https://github.com/google/sanitizers/issues/1716 / https://github.com/actions/runner-images/issues/9491 - run: sudo sysctl vm.mmap_rnd_bits=28 - - run: cargo -Z build-std test --workspace --all-features --target x86_64-unknown-linux-gnu --lib --tests + - run: cargo -Z build-std test --workspace --all-features --target x86_64-unknown-linux-gnu --lib --tests -- --skip panic_on_drop_fut env: # TODO: Once `cfg(sanitize = "..")` is stable, replace # `cfg(futures_sanitizer)` with `cfg(sanitize = "..")` and remove diff --git a/futures-util/src/stream/futures_unordered/mod.rs b/futures-util/src/stream/futures_unordered/mod.rs index d4ee16937..913e260fd 100644 --- a/futures-util/src/stream/futures_unordered/mod.rs +++ b/futures-util/src/stream/futures_unordered/mod.rs @@ -576,33 +576,24 @@ impl Drop for FuturesUnordered { // Before the strong reference to the queue is dropped we need all // futures to be dropped. See note at the bottom of this method. // - // This ensures we drop all futures even in the case of one drop - // panicking and unwinding. - // - // If a second future panics, that will trigger an abort from panicking - // while panicking. - struct DropGuard<'a, Fut>(&'a mut FuturesUnordered); - impl DropGuard<'_, Fut> { - fn drop_futures(&mut self) { - // When a `FuturesUnordered` is dropped we want to drop all futures - // associated with it. At the same time though there may be tons of - // wakers flying around which contain `Task` references - // inside them. We'll let those naturally get deallocated. - while !self.0.head_all.get_mut().is_null() { - let head = *self.0.head_all.get_mut(); - let task = unsafe { self.0.unlink(head) }; - self.0.release_task(task); - } - } - } - impl Drop for DropGuard<'_, Fut> { + // If there is a panic before this completes, we leak the queue. + struct LeakQueueOnDrop<'a, Fut>(&'a mut FuturesUnordered); + impl Drop for LeakQueueOnDrop<'_, Fut> { fn drop(&mut self) { - self.drop_futures(); + mem::forget(Arc::clone(&self.0.ready_to_run_queue)); } } - let mut guard = DropGuard(self); - guard.drop_futures(); - mem::forget(guard); // no need to check head_all again + let guard = LeakQueueOnDrop(self); + // When a `FuturesUnordered` is dropped we want to drop all futures + // associated with it. At the same time though there may be tons of + // wakers flying around which contain `Task` references + // inside them. We'll let those naturally get deallocated. + while !guard.0.head_all.get_mut().is_null() { + let head = *guard.0.head_all.get_mut(); + let task = unsafe { guard.0.unlink(head) }; + guard.0.release_task(task); + } + mem::forget(guard); // safe to release strong reference to queue // Note that at this point we could still have a bunch of tasks in the // ready to run queue. None of those tasks, however, have futures