inserting a new item into a delay queue always waits for that new item even if the queue already contained earlier expiring items #1700

Closed
doy opened this issue Oct 28, 2019 · 9 comments · Fixed by #2285
Labels
A-tokio (Area: The main tokio crate), C-bug (Category: This is a bug.), E-help-wanted (Call for participation: Help is requested to fix this issue.), M-time (Module: tokio/time), T-v0.1.x (Topic: tokio 0.1.x)

Comments

@doy

doy commented Oct 28, 2019

Version

tokio v0.1.22

Platform

Linux hush 5.3.7-arch1-1-ARCH #1 SMP PREEMPT Fri Oct 18 00:17:03 UTC 2019 x86_64 GNU/Linux

Subcrates

tokio::timer

Description

Inserting an item into a delay queue after it is already running causes the queue not to be woken up until the newly inserted item is ready, even if the queue already held items that expire earlier. Example code:

use futures::stream::Stream as _;

struct ThingWithDelayQueue {
    delay_queue: tokio::timer::DelayQueue<u8>,
    did_thing: bool,
}

impl ThingWithDelayQueue {
    fn new() -> Self {
        let mut delay_queue = tokio::timer::DelayQueue::new();
        delay_queue.insert(1, std::time::Duration::from_secs(1));
        delay_queue.insert(2, std::time::Duration::from_secs(2));
        delay_queue.insert(3, std::time::Duration::from_secs(3));
        delay_queue.insert(4, std::time::Duration::from_secs(4));
        delay_queue.insert(5, std::time::Duration::from_secs(5));
        Self {
            delay_queue,
            did_thing: false,
        }
    }
}

impl futures::stream::Stream for ThingWithDelayQueue {
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> {
        if let Some(expired) =
            futures::try_ready!(self.delay_queue.poll().map_err(|_| ()))
        {
            let thing = expired.into_inner();
            log::error!("event happened: {}", thing);
            if !self.did_thing {
                self.did_thing = true;
                self.delay_queue
                    .insert(10, std::time::Duration::from_secs(10));
            }
            Ok(futures::Async::Ready(Some(())))
        } else {
            log::error!("done");
            Ok(futures::Async::Ready(None))
        }
    }
}

fn main() {
    env_logger::init();
    log::error!("start");
    tokio::run(ThingWithDelayQueue::new().for_each(|_| Ok(())))
}

this code produces this output:

[2019-10-28T06:12:25Z ERROR delay_queue_bug] start
[2019-10-28T06:12:26Z ERROR delay_queue_bug] event happened: 1
[2019-10-28T06:12:36Z ERROR delay_queue_bug] event happened: 2
[2019-10-28T06:12:36Z ERROR delay_queue_bug] event happened: 3
[2019-10-28T06:12:36Z ERROR delay_queue_bug] event happened: 4
[2019-10-28T06:12:36Z ERROR delay_queue_bug] event happened: 5
[2019-10-28T06:12:36Z ERROR delay_queue_bug] event happened: 10
[2019-10-28T06:12:36Z ERROR delay_queue_bug] done

but if you comment out the self.delay_queue.insert(10, std::time::Duration::from_secs(10)); line, you get this output as expected:

[2019-10-28T06:13:37Z ERROR delay_queue_bug] start
[2019-10-28T06:13:38Z ERROR delay_queue_bug] event happened: 1
[2019-10-28T06:13:39Z ERROR delay_queue_bug] event happened: 2
[2019-10-28T06:13:40Z ERROR delay_queue_bug] event happened: 3
[2019-10-28T06:13:41Z ERROR delay_queue_bug] event happened: 4
[2019-10-28T06:13:42Z ERROR delay_queue_bug] event happened: 5
[2019-10-28T06:13:42Z ERROR delay_queue_bug] done

doy commented Oct 28, 2019

i can't seem to reproduce this in tests though - these two both pass:

master:

#[test]
fn insert_later_item_doesnt_clear_timer() {
    let mut t = MockTask::new();

    clock::mock(|clock| {
        let mut queue = DelayQueue::new();

        queue.insert(1, ms(10));
        queue.insert(2, ms(20));
        queue.insert(3, ms(30));
        queue.insert(4, ms(40));
        queue.insert(5, ms(50));
        assert!(!t.is_woken());
        assert_pending!(poll!(t, queue));

        clock.turn_for(ms(15));
        assert!(t.is_woken());
        assert_ready!(poll!(t, queue));
        assert_pending!(poll!(t, queue));

        queue.insert(10, ms(100));
        assert!(!t.is_woken());
        assert_pending!(poll!(t, queue));

        clock.turn_for(ms(15));
        assert!(t.is_woken());
        assert_ready!(poll!(t, queue));
        assert_pending!(poll!(t, queue));
    })
}

v0.1.x:

#[test]
fn insert_later_item_doesnt_clear_timer() {
    mocked(|timer, time| {
        let mut queue = DelayQueue::new();
        let mut task = MockTask::new();

        let epoch = time.now();

        queue.insert_at(1, epoch + ms(10));
        queue.insert_at(2, epoch + ms(20));
        queue.insert_at(3, epoch + ms(30));
        queue.insert_at(4, epoch + ms(40));
        queue.insert_at(5, epoch + ms(50));
        assert!(!task.is_notified());
        task.enter(|| {
            assert_not_ready!(queue);
        });

        advance(timer, ms(15));
        assert!(task.is_notified());
        assert_ready!(queue);
        task.enter(|| {
            assert_not_ready!(queue);
        });

        queue.insert_at(10, epoch + ms(100));
        assert!(!task.is_notified());
        task.enter(|| {
            assert_not_ready!(queue);
        });

        advance(timer, ms(10));
        assert!(task.is_notified());
        assert_ready!(queue);
        task.enter(|| {
            assert_not_ready!(queue);
        });
    })
}

i'm not super familiar with the structure of the test suite, though - am i doing anything wrong here?

@xoac
Contributor

xoac commented Mar 26, 2020

I can confirm this bug. It was introduced in tokio-timer 0.2.10; my example works fine with 0.2.9.

Version

tokio  = "0.1"
futures = "0.1"

➜ cargo tree | grep tokio

tokio-timer-delay-queue-bug v0.1.0 (/home/sr/projects/github.com/xoac/tokio-timer-delay-queue-bug)
└── tokio v0.1.22
    ├── tokio-codec v0.1.2
    │   └── tokio-io v0.1.13
    ├── tokio-current-thread v0.1.7
    │   └── tokio-executor v0.1.10
    ├── tokio-executor v0.1.10 (*)
    ├── tokio-fs v0.1.7
    │   ├── tokio-io v0.1.13 (*)
    │   └── tokio-threadpool v0.1.18
    │       └── tokio-executor v0.1.10 (*)
    ├── tokio-io v0.1.13 (*)
    ├── tokio-reactor v0.1.12
    │   ├── tokio-executor v0.1.10 (*)
    │   ├── tokio-io v0.1.13 (*)
    │   └── tokio-sync v0.1.8
    ├── tokio-sync v0.1.8 (*)
    ├── tokio-tcp v0.1.4
    │   ├── tokio-io v0.1.13 (*)
    │   └── tokio-reactor v0.1.12 (*)
    ├── tokio-threadpool v0.1.18 (*)
    ├── tokio-timer v0.2.13
    │   └── tokio-executor v0.1.10 (*)
    ├── tokio-udp v0.1.6
    │   ├── tokio-codec v0.1.2 (*)
    │   ├── tokio-io v0.1.13 (*)
    │   └── tokio-reactor v0.1.12 (*)
    └── tokio-uds v0.2.6
        ├── tokio-codec v0.1.2 (*)
        ├── tokio-io v0.1.13 (*)
        └── tokio-reactor v0.1.12 (*)

Platform

Linux dell-inspiron 5.5.11-1-MANJARO #1 SMP PREEMPT Sat Mar 21 16:54:29 UTC 2020 x86_64 GNU/Linux

also on armv7 with musl target.

Description

I have created a separate repository that can be cloned to reproduce the bug.

Steps:

  1. Clone https://github.com/xoac/tokio-timer-delay-queue-bug and go into the cloned directory
  2. cargo run
    Finished dev [unoptimized + debuginfo] target(s) in 0.04s
     Running `target/debug/tokio-timer-delay-queue-bug`
Item { what: "3", duration: 143s, init: 145s }
  3. Force the use of tokio-timer 0.2.9 in Cargo.toml
  4. cargo update
  5. cargo run
   Compiling tokio-timer-delay-queue-bug v0.1.0 (/home/sr/projects/github.com/xoac/tokio-timer-delay-queue-bug)
    Finished dev [unoptimized + debuginfo] target(s) in 1.18s
     Running `target/debug/tokio-timer-delay-queue-bug`
Item { what: "3", duration: 143s, init: 145s }
Item { what: "2", duration: 23s, init: 25s }
Item { what: "1", duration: 3s, init: 5s }
Item { what: "1", duration: 1s, init: 5s }

tokio-compat forces the use of at least tokio-timer 0.2.12.

I don't know how much tokio::time differs from tokio-timer, but it may also be affected.

@xoac
Contributor

xoac commented Mar 26, 2020

tokio v0.2 is also affected.

A minimal example is in the tokio02 branch.

LucioFranco removed the T-v0.1.x label Mar 26, 2020
@LucioFranco
Member

ping @carllerche ^

@kleimkuhler
Contributor

I think this should already be fixed in 0.2 by #2285

@xoac
Contributor

xoac commented Mar 26, 2020

I can confirm that #2285 fixes this for tokio v0.2. I tried to apply the same patch to the latest tokio-timer, but it doesn't work for me.

So this issue shouldn't be closed by this PR. It resolves the problem for v0.2 but not for v0.1.

carllerche pushed a commit that referenced this issue Mar 26, 2020
When the queue was polled and yielded an index from the wheel, the delay
until the next item was never updated. As a result, when one item was
yielded from `poll_idx` the following insert erroneously updated the
delay to the instant of the inserted item.

Fixes: #1700
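
For readers following the thread, the mechanism described in this commit message can be sketched with a toy model. This is not tokio's implementation: `ToyDelayQueue`, `armed`, `refresh_on_yield` and `simulate` are hypothetical names, time is a plain millisecond counter, and the "timer" is a single cached deadline. The sketch assumes that an insert only re-arms the timer when the new deadline is earlier than the cached one, and that the cached deadline is left empty after an item is yielded unless the fix is enabled. It also suggests one possible reason the mock-clock tests earlier in the thread pass: they poll the queue until it is pending before inserting, which re-arms the timer for the next item.

```rust
use std::collections::BTreeSet;

/// Toy stand-in for a delay queue (hypothetical, not tokio's code):
/// a sorted set of deadlines in ms, plus the one deadline the timer is armed for.
struct ToyDelayQueue {
    deadlines: BTreeSet<u64>,
    armed: Option<u64>,
    /// `true` models the fix: re-arm for the next deadline when yielding an item.
    refresh_on_yield: bool,
}

impl ToyDelayQueue {
    fn new(refresh_on_yield: bool) -> Self {
        Self { deadlines: BTreeSet::new(), armed: None, refresh_on_yield }
    }

    fn insert(&mut self, when: u64) {
        self.deadlines.insert(when);
        // Re-arm only if `when` is earlier than the armed deadline.
        // If `armed` is stale (`None`), the new item's deadline wins.
        if self.armed.map_or(true, |armed| when < armed) {
            self.armed = Some(when);
        }
    }

    /// Returns an expired deadline, or `None` for "pending".
    fn poll(&mut self, now: u64) -> Option<u64> {
        if let Some(armed) = self.armed {
            if now < armed {
                return None; // the timer has not fired yet
            }
        }
        self.armed = None;
        let first = self.deadlines.iter().next().copied();
        match first {
            Some(d) if d <= now => {
                self.deadlines.remove(&d);
                if self.refresh_on_yield {
                    self.armed = self.deadlines.iter().next().copied();
                }
                Some(d)
            }
            // Nothing has expired: arm for the earliest remaining deadline, if any.
            other => {
                self.armed = other;
                None
            }
        }
    }
}

/// Replays the report: items due at 1..=5 s, plus one more item inserted
/// (due 10 s later) when the first item is yielded.
/// Returns `(deadline, time_at_which_it_was_yielded)` pairs.
fn simulate(refresh_on_yield: bool, poll_until_pending_first: bool) -> Vec<(u64, u64)> {
    let mut q = ToyDelayQueue::new(refresh_on_yield);
    for secs in 1..=5u64 {
        q.insert(secs * 1000);
    }
    let mut yielded = Vec::new();
    let mut inserted = false;
    for now in (0..=11_000u64).step_by(1000) {
        while let Some(d) = q.poll(now) {
            yielded.push((d, now));
            if !inserted {
                inserted = true;
                if poll_until_pending_first {
                    // Same order as the mock-clock tests: poll to pending, then insert.
                    while let Some(d2) = q.poll(now) {
                        yielded.push((d2, now));
                    }
                }
                q.insert(now + 10_000);
            }
        }
    }
    yielded
}
```

In this model, `simulate(false, false)` yields item 1 on time and everything else only at 11 s, which matches the shape of the reported log. Both `simulate(true, false)` (the fix) and `simulate(false, true)` (polling to pending before the insert) yield every item at its own deadline.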
@xoac
Contributor

xoac commented Mar 26, 2020

@carllerche This resolved the problem in tokio v0.2. Shouldn't this still be open, since tokio v0.1 is still affected?

kleimkuhler reopened this Mar 26, 2020
kleimkuhler added the T-v0.1.x label Mar 26, 2020
Darksonn added the A-tokio, M-time and E-help-wanted labels Jul 25, 2020
@vilgotf
Contributor

vilgotf commented Apr 16, 2024

I believe this can be closed now as 0.1 is no longer maintained.

Darksonn closed this as not planned Apr 16, 2024
8 participants