From 605e7882e894dd96208a60c264ce21de0971b8c9 Mon Sep 17 00:00:00 2001 From: Antonin Bas Date: Tue, 2 Jun 2020 01:01:48 -0700 Subject: [PATCH] Fix undesired reordering in queueing logic C++ clocks can return the same value in consecutive calls, which was causing potential reordering in the queueing logic, in particular for rate-limited queues when the pps rate was set to 0. This was causing some unit tests to fail. Fixes #900 --- include/bm/bm_sim/queueing.h | 75 ++++++++++++++++++++++++------------ tests/test_queueing.cpp | 9 ++--- 2 files changed, 55 insertions(+), 29 deletions(-) diff --git a/include/bm/bm_sim/queueing.h b/include/bm/bm_sim/queueing.h index 7bf46b7e7..e084a1226 100644 --- a/include/bm/bm_sim/queueing.h +++ b/include/bm/bm_sim/queueing.h @@ -274,7 +274,8 @@ class QueueingLogicRL { auto &w_info = workers_info.at(worker_id); if (q_info.size >= q_info.capacity) return 0; q_info.last_sent = get_next_tp(q_info); - w_info.queue.emplace(item, queue_id, q_info.last_sent); + w_info.queue.emplace( + item, queue_id, q_info.last_sent, w_info.wrapping_counter++); q_info.size++; w_info.q_not_empty.notify_one(); return 1; @@ -289,7 +290,8 @@ class QueueingLogicRL { auto &w_info = workers_info.at(worker_id); if (q_info.size >= q_info.capacity) return 0; q_info.last_sent = get_next_tp(q_info); - w_info.queue.emplace(std::move(item), queue_id, q_info.last_sent); + w_info.queue.emplace( + std::move(item), queue_id, q_info.last_sent, w_info.wrapping_counter++); q_info.size++; w_info.q_not_empty.notify_one(); return 1; @@ -347,14 +349,14 @@ class QueueingLogicRL { //! Set the maximum rate of the logical queue with id \p queue_id to \p //! pps. \p pps is expressed in "number of elements per second". Until this - //! function is called, there will be no rate limit for the queue. + //! function is called, there will be no rate limit for the queue. The same + //! behavior (no rate limit) can be achieved by calling this method with a + //! rate of 0. 
void set_rate(size_t queue_id, uint64_t pps) { - using std::chrono::duration; - using std::chrono::duration_cast; LockType lock(mutex); auto &q_info = get_queue(queue_id); q_info.queue_rate_pps = pps; - q_info.pkt_delay_ticks = duration_cast<ticks>(duration<double>(1. / pps)); + q_info.pkt_delay_ticks = rate_to_ticks(pps); } //! Set the maximum rate of all logical queues to \p pps. @@ -365,7 +367,7 @@ class QueueingLogicRL { for (auto &p : queues_info) { auto &q_info = p.second; q_info.queue_rate_pps = pps; - q_info.pkt_delay_ticks = duration_cast<ticks>(duration<double>(1. / pps)); + q_info.pkt_delay_ticks = rate_to_ticks(pps); } queue_rate_pps = pps; } @@ -386,22 +388,32 @@ class QueueingLogicRL { // using clock = std::chrono::steady_clock; using clock = std::chrono::high_resolution_clock; + static constexpr ticks rate_to_ticks(uint64_t pps) { + using std::chrono::duration; + using std::chrono::duration_cast; + return (pps == 0) ? + ticks(0) : duration_cast<ticks>(duration<double>(1. / pps)); + } + struct QE { // QE(T e, size_t queue_id, const clock::time_point &send, size_t id) // : e(std::move(e)), queue_id(queue_id), send(send), id(id) { } - QE(T e, size_t queue_id, const clock::time_point &send) - : e(std::move(e)), queue_id(queue_id), send(send) { } + QE(T e, size_t queue_id, const clock::time_point &send, size_t id) + : e(std::move(e)), queue_id(queue_id), send(send), id(id) { } T e; size_t queue_id; clock::time_point send; - // size_t id; + size_t id; }; struct QEComp { bool operator()(const QE &lhs, const QE &rhs) const { - // return (lhs.send == rhs.send) ? lhs.id > rhs.id : lhs.send > rhs.send; - return lhs.send > rhs.send; + // the point of the id is to avoid re-orderings when the send timestamp is + // the same for 2 items, which seems to happen (when the pps rate is 0) + // with both the steady_clock and the high_resolution_clock on my Linux + // VM. + return (lhs.send == rhs.send) ? 
lhs.id > rhs.id : lhs.send > rhs.send; } }; @@ -413,8 +425,7 @@ class QueueingLogicRL { QueueInfo(size_t capacity, uint64_t queue_rate_pps) : capacity(capacity), queue_rate_pps(queue_rate_pps), - pkt_delay_ticks(std::chrono::duration_cast<ticks>( - std::chrono::duration<double>(1. / queue_rate_pps))), + pkt_delay_ticks(rate_to_ticks(queue_rate_pps)), last_sent(clock::now()) { } size_t size{0}; @@ -427,6 +438,7 @@ class QueueingLogicRL { struct WorkerInfo { MyQ queue{}; mutable std::condition_variable q_not_empty{}; + size_t wrapping_counter{0}; }; QueueInfo &get_queue(size_t queue_id) { @@ -506,7 +518,8 @@ class QueueingLogicPriRL { auto &q_info_pri = q_info.at(priority); if (q_info_pri.size >= q_info_pri.capacity) return 0; q_info_pri.last_sent = get_next_tp(q_info_pri); - w_info.queues[priority].emplace(item, queue_id, q_info_pri.last_sent); + w_info.queues[priority].emplace( + item, queue_id, q_info_pri.last_sent, w_info.wrapping_counter++); q_info_pri.size++; q_info.size++; w_info.size++; @@ -528,8 +541,11 @@ class QueueingLogicPriRL { auto &q_info_pri = q_info.at(priority); if (q_info_pri.size >= q_info_pri.capacity) return 0; q_info_pri.last_sent = get_next_tp(q_info_pri); - w_info.queues[priority].emplace(std::move(item), queue_id, - q_info_pri.last_sent); + w_info.queues[priority].emplace( + std::move(item), + queue_id, + q_info_pri.last_sent, + w_info.wrapping_counter++); q_info_pri.size++; q_info.size++; w_info.size++; @@ -643,7 +659,8 @@ class QueueingLogicPriRL { //! Set the maximum rate of all the priority queues for logical queue \p //! queue_id to \p pps. \p pps is expressed in "number of elements per //! second". Until this function is called, there will be no rate limit for - //! the queue. + //! the queue. The same behavior (no rate limit) can be achieved by calling + //! this method with a rate of 0. 
void set_rate(size_t queue_id, uint64_t pps) { LockType lock(mutex); for_each_q(queue_id, SetRateFn(pps)); @@ -676,21 +693,31 @@ class QueueingLogicPriRL { private: using ticks = std::chrono::nanoseconds; // clock choice? switch to steady if observing re-ordering + // in my Linux VM, it seems that both clocks behave the same (can sometimes + // stop increasing for a bit but do not go backwards). // using clock = std::chrono::steady_clock; using clock = std::chrono::high_resolution_clock; + static constexpr ticks rate_to_ticks(uint64_t pps) { + using std::chrono::duration; + using std::chrono::duration_cast; + return (pps == 0) ? + ticks(0) : duration_cast<ticks>(duration<double>(1. / pps)); + } + struct QE { - QE(T e, size_t queue_id, const clock::time_point &send) - : e(std::move(e)), queue_id(queue_id), send(send) { } + QE(T e, size_t queue_id, const clock::time_point &send, size_t id) + : e(std::move(e)), queue_id(queue_id), send(send), id(id) { } T e; size_t queue_id; clock::time_point send; + size_t id; }; struct QEComp { bool operator()(const QE &lhs, const QE &rhs) const { - return lhs.send > rhs.send; + return (lhs.send == rhs.send) ? lhs.id > rhs.id : lhs.send > rhs.send; } }; @@ -700,8 +727,7 @@ class QueueingLogicPriRL { QueueInfoPri(size_t capacity, uint64_t queue_rate_pps) : capacity(capacity), queue_rate_pps(queue_rate_pps), - pkt_delay_ticks(std::chrono::duration_cast<ticks>( - std::chrono::duration<double>(1. / queue_rate_pps))), + pkt_delay_ticks(rate_to_ticks(queue_rate_pps)), last_sent(clock::now()) { } size_t size{0}; @@ -723,6 +749,7 @@ class QueueingLogicPriRL { mutable std::condition_variable q_not_empty{}; size_t size{0}; std::array queues; + size_t wrapping_counter{0}; }; QueueInfo &get_queue(size_t queue_id) { @@ -777,7 +804,7 @@ class QueueingLogicPriRL { : pps(pps) { using std::chrono::duration; using std::chrono::duration_cast; - pkt_delay_ticks = duration_cast<ticks>(duration<double>(1. 
/ pps)); + pkt_delay_ticks = rate_to_ticks(pps); } void operator ()(QueueInfoPri &info) const { // NOLINT(runtime/references) diff --git a/tests/test_queueing.cpp b/tests/test_queueing.cpp index 2f8363371..a00eacaeb 100644 --- a/tests/test_queueing.cpp +++ b/tests/test_queueing.cpp @@ -92,18 +92,17 @@ void QueueingTest::produce() { namespace { template -void produce_if_dropping(Q &queue, size_t iterations, size_t capacity, +void produce_if_dropping(Q &queue, size_t iterations, const std::vector &values) { for (size_t i = 0; i < iterations; i++) { size_t queue_id = values[i].queue_id; // this is to avoid drops // kind of makes me question if using type parameterization is really useful // for this - while (queue.size(queue_id) > capacity / 2) { + while (!queue.push_front(queue_id, unique_ptr(new int(values[i].v)))) { // originally, I just had an empty loop, but Valgrind was running forever std::this_thread::sleep_for(std::chrono::milliseconds(1)); } - queue.push_front(queue_id, unique_ptr(new int(values[i].v))); } } @@ -111,12 +110,12 @@ void produce_if_dropping(Q &queue, size_t iterations, size_t capacity, template <> void QueueingTest >::produce() { - produce_if_dropping(queue, iterations, capacity, values); + produce_if_dropping(queue, iterations, values); } template <> void QueueingTest >::produce() { - produce_if_dropping(queue, iterations, capacity, values); + produce_if_dropping(queue, iterations, values); } using testing::Types;