Skip to content

Commit

Permalink
Fix undesired reordering in queueing logic
Browse files Browse the repository at this point in the history
C++ clocks can return the same value in consecutive calls, which was
causing potential reordering in the queueing logic, in particular for
rate-limited queues when the pps rate was set to 0.

This was causing some unit tests to fail.

Fixes #900
  • Loading branch information
antoninbas committed Jun 2, 2020
1 parent 00b8e4a commit 77cc1a3
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 29 deletions.
75 changes: 51 additions & 24 deletions include/bm/bm_sim/queueing.h
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,8 @@ class QueueingLogicRL {
auto &w_info = workers_info.at(worker_id);
if (q_info.size >= q_info.capacity) return 0;
q_info.last_sent = get_next_tp(q_info);
w_info.queue.emplace(item, queue_id, q_info.last_sent);
w_info.queue.emplace(
item, queue_id, q_info.last_sent, w_info.wrapping_counter++);
q_info.size++;
w_info.q_not_empty.notify_one();
return 1;
Expand All @@ -289,7 +290,8 @@ class QueueingLogicRL {
auto &w_info = workers_info.at(worker_id);
if (q_info.size >= q_info.capacity) return 0;
q_info.last_sent = get_next_tp(q_info);
w_info.queue.emplace(std::move(item), queue_id, q_info.last_sent);
w_info.queue.emplace(
std::move(item), queue_id, q_info.last_sent, w_info.wrapping_counter++);
q_info.size++;
w_info.q_not_empty.notify_one();
return 1;
Expand Down Expand Up @@ -347,14 +349,14 @@ class QueueingLogicRL {

//! Set the maximum rate of the logical queue with id \p queue_id to \p
//! pps. \p pps is expressed in "number of elements per second". Until this
//! function is called, there will be no rate limit for the queue.
//! function is called, there will be no rate limit for the queue. The same
//! behavior (no rate limit) can be achieved by calling this method with a
//! rate of 0.
void set_rate(size_t queue_id, uint64_t pps) {
using std::chrono::duration;
using std::chrono::duration_cast;
LockType lock(mutex);
auto &q_info = get_queue(queue_id);
q_info.queue_rate_pps = pps;
q_info.pkt_delay_ticks = duration_cast<ticks>(duration<double>(1. / pps));
q_info.pkt_delay_ticks = rate_to_ticks(pps);
}

//! Set the maximum rate of all logical queues to \p pps.
Expand All @@ -365,7 +367,7 @@ class QueueingLogicRL {
for (auto &p : queues_info) {
auto &q_info = p.second;
q_info.queue_rate_pps = pps;
q_info.pkt_delay_ticks = duration_cast<ticks>(duration<double>(1. / pps));
q_info.pkt_delay_ticks = rate_to_ticks(pps);
}
queue_rate_pps = pps;
}
Expand All @@ -386,22 +388,32 @@ class QueueingLogicRL {
// using clock = std::chrono::steady_clock;
using clock = std::chrono::high_resolution_clock;

//! Convert a rate \p pps (number of elements per second) into the delay between
//! two consecutive elements, expressed in ticks. A rate of 0 yields a delay of
//! 0 ticks; in that case 1. / pps is never evaluated.
static constexpr ticks rate_to_ticks(uint64_t pps) {
  // single return expression, so the function also satisfies the C++11
  // restrictions on constexpr function bodies
  return (pps != 0)
      ? std::chrono::duration_cast<ticks>(
            std::chrono::duration<double>(1. / pps))
      : ticks(0);
}

// Queue element: the payload `e`, the id of the logical queue it was pushed
// to, the time point `send` handed in by the push paths (they pass the value
// they just stored in last_sent), and an `id` that the push paths take from
// the worker's wrapping_counter.
// NOTE(review): two constructors are visible here. The 3-argument one does not
// initialize `id`, while QEComp's tie-break reads it. This looks like the
// before/after lines of a diff shown together -- confirm which constructor is
// meant to remain.
struct QE {
// QE(T e, size_t queue_id, const clock::time_point &send, size_t id)
// : e(std::move(e)), queue_id(queue_id), send(send), id(id) { }
// 3-argument form: leaves `id` without an initializer
QE(T e, size_t queue_id, const clock::time_point &send)
: e(std::move(e)), queue_id(queue_id), send(send) { }
// 4-argument form: initializes every member, including `id`
QE(T e, size_t queue_id, const clock::time_point &send, size_t id)
: e(std::move(e)), queue_id(queue_id), send(send), id(id) { }

T e;
size_t queue_id;
clock::time_point send;
// size_t id;
size_t id;
};

// Comparison functor over QE entries, based on their `send` time points.
// NOTE(review): operator() contains two return statements. The first returns
// unconditionally, so the id-based tie-break after it is never reached as
// written. This looks like the before/after lines of a diff shown together --
// confirm which return is meant to be live.
struct QEComp {
bool operator()(const QE &lhs, const QE &rhs) const {
// return (lhs.send == rhs.send) ? lhs.id > rhs.id : lhs.send > rhs.send;
return lhs.send > rhs.send;
// the point of the id is to avoid re-orderings when the send timestamp is
// the same for 2 items, which seems to happen (when the pps rate is 0)
// with both the steady_clock and the high_resolution_clock on my Linux
// VM.
return (lhs.send == rhs.send) ? lhs.id > rhs.id : lhs.send > rhs.send;
}
};

Expand All @@ -413,8 +425,7 @@ class QueueingLogicRL {
QueueInfo(size_t capacity, uint64_t queue_rate_pps)
: capacity(capacity),
queue_rate_pps(queue_rate_pps),
pkt_delay_ticks(std::chrono::duration_cast<ticks>(
std::chrono::duration<double>(1. / queue_rate_pps))),
pkt_delay_ticks(rate_to_ticks(queue_rate_pps)),
last_sent(clock::now()) { }

size_t size{0};
Expand All @@ -427,6 +438,7 @@ class QueueingLogicRL {
// Per-worker state used by the push paths: the worker's queue of pending
// elements, the condition variable used to signal it, and a counter used to
// tag new elements.
struct WorkerInfo {
// elements pushed for this worker; the push paths emplace into it
MyQ queue{};
// the push paths call notify_one() on it after enqueueing an element
mutable std::condition_variable q_not_empty{};
// post-incremented on every push and stored as the element's id, which QEComp
// compares when two elements have the same send timestamp; presumably meant to
// wrap around on overflow, as the name suggests -- TODO confirm
size_t wrapping_counter{0};
};

QueueInfo &get_queue(size_t queue_id) {
Expand Down Expand Up @@ -506,7 +518,8 @@ class QueueingLogicPriRL {
auto &q_info_pri = q_info.at(priority);
if (q_info_pri.size >= q_info_pri.capacity) return 0;
q_info_pri.last_sent = get_next_tp(q_info_pri);
w_info.queues[priority].emplace(item, queue_id, q_info_pri.last_sent);
w_info.queues[priority].emplace(
item, queue_id, q_info_pri.last_sent, w_info.wrapping_counter++);
q_info_pri.size++;
q_info.size++;
w_info.size++;
Expand All @@ -528,8 +541,11 @@ class QueueingLogicPriRL {
auto &q_info_pri = q_info.at(priority);
if (q_info_pri.size >= q_info_pri.capacity) return 0;
q_info_pri.last_sent = get_next_tp(q_info_pri);
w_info.queues[priority].emplace(std::move(item), queue_id,
q_info_pri.last_sent);
w_info.queues[priority].emplace(
std::move(item),
queue_id,
q_info_pri.last_sent,
w_info.wrapping_counter++);
q_info_pri.size++;
q_info.size++;
w_info.size++;
Expand Down Expand Up @@ -643,7 +659,8 @@ class QueueingLogicPriRL {
//! Set the maximum rate of all the priority queues for logical queue \p
//! queue_id to \p pps. \p pps is expressed in "number of elements per
//! second". Until this function is called, there will be no rate limit for
//! the queue.
//! the queue. The same behavior (no rate limit) can be achieved by calling
//! this method with a rate of 0.
void set_rate(size_t queue_id, uint64_t pps) {
LockType lock(mutex);
for_each_q(queue_id, SetRateFn(pps));
Expand Down Expand Up @@ -676,21 +693,31 @@ class QueueingLogicPriRL {
private:
using ticks = std::chrono::nanoseconds;
// clock choice? switch to steady if observing re-ordering
// in my Linux VM, it seems that both clocks behave the same (can sometimes
// stop increasing for a bit but do not go backwards).
// using clock = std::chrono::steady_clock;
using clock = std::chrono::high_resolution_clock;

//! Translate \p pps (number of elements per second) into the per-element delay
//! in ticks. When \p pps is 0 the result is a delay of 0 ticks and the division
//! 1. / pps is not evaluated.
static constexpr ticks rate_to_ticks(uint64_t pps) {
  // written as one return expression so it remains a valid constexpr function
  // body under C++11 rules as well
  return (pps != 0)
      ? std::chrono::duration_cast<ticks>(
            std::chrono::duration<double>(1. / pps))
      : ticks(0);
}

// Queue element: the payload `e`, the id of the logical queue it was pushed
// to, the time point `send` handed in by the push paths, and an `id` that the
// push paths take from the worker's wrapping_counter.
// NOTE(review): two constructors are visible here. The 3-argument one does not
// initialize `id`, while QEComp's tie-break reads it. This looks like the
// before/after lines of a diff shown together -- confirm which constructor is
// meant to remain.
struct QE {
// 3-argument form: leaves `id` without an initializer
QE(T e, size_t queue_id, const clock::time_point &send)
: e(std::move(e)), queue_id(queue_id), send(send) { }
// 4-argument form: initializes every member, including `id`
QE(T e, size_t queue_id, const clock::time_point &send, size_t id)
: e(std::move(e)), queue_id(queue_id), send(send), id(id) { }

T e;
size_t queue_id;
clock::time_point send;
size_t id;
};

// Comparison functor over QE entries, based on their `send` time points.
// NOTE(review): operator() contains two return statements. The first returns
// unconditionally, so the second one (which falls back to comparing `id` when
// the two `send` values are equal) is never reached as written. This looks
// like the before/after lines of a diff shown together -- confirm which return
// is meant to be live.
struct QEComp {
bool operator()(const QE &lhs, const QE &rhs) const {
return lhs.send > rhs.send;
return (lhs.send == rhs.send) ? lhs.id > rhs.id : lhs.send > rhs.send;
}
};

Expand All @@ -700,8 +727,7 @@ class QueueingLogicPriRL {
QueueInfoPri(size_t capacity, uint64_t queue_rate_pps)
: capacity(capacity),
queue_rate_pps(queue_rate_pps),
pkt_delay_ticks(std::chrono::duration_cast<ticks>(
std::chrono::duration<double>(1. / queue_rate_pps))),
pkt_delay_ticks(rate_to_ticks(queue_rate_pps)),
last_sent(clock::now()) { }

size_t size{0};
Expand All @@ -723,6 +749,7 @@ class QueueingLogicPriRL {
mutable std::condition_variable q_not_empty{};
size_t size{0};
std::array<MyQ, 32> queues;
size_t wrapping_counter{0};
};

QueueInfo &get_queue(size_t queue_id) {
Expand Down Expand Up @@ -777,7 +804,7 @@ class QueueingLogicPriRL {
: pps(pps) {
using std::chrono::duration;
using std::chrono::duration_cast;
pkt_delay_ticks = duration_cast<ticks>(duration<double>(1. / pps));
pkt_delay_ticks = rate_to_ticks(pps);
}

void operator ()(QueueInfoPri &info) const { // NOLINT(runtime/references)
Expand Down
9 changes: 4 additions & 5 deletions tests/test_queueing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,31 +92,30 @@ void QueueingTest<QType>::produce() {
namespace {

template <typename Q>
void produce_if_dropping(Q &queue, size_t iterations, size_t capacity,
void produce_if_dropping(Q &queue, size_t iterations,
const std::vector<RndInput> &values) {
for (size_t i = 0; i < iterations; i++) {
size_t queue_id = values[i].queue_id;
// this is to avoid drops
// kind of makes me question if using type parameterization is really useful
// for this
while (queue.size(queue_id) > capacity / 2) {
while (!queue.push_front(queue_id, unique_ptr<int>(new int(values[i].v)))) {
// originally, I just had an empty loop, but Valgrind was running forever
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
queue.push_front(queue_id, unique_ptr<int>(new int(values[i].v)));
}
}

} // namespace

template <>
void QueueingTest<QueueingLogicRL<QEm, WorkerMapper> >::produce() {
produce_if_dropping(queue, iterations, capacity, values);
produce_if_dropping(queue, iterations, values);
}

template <>
void QueueingTest<QueueingLogicPriRL<QEm, WorkerMapper> >::produce() {
produce_if_dropping(queue, iterations, capacity, values);
produce_if_dropping(queue, iterations, values);
}

using testing::Types;
Expand Down

0 comments on commit 77cc1a3

Please sign in to comment.