Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix undesired reordering in queueing logic #903

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 51 additions & 24 deletions include/bm/bm_sim/queueing.h
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,8 @@ class QueueingLogicRL {
auto &w_info = workers_info.at(worker_id);
if (q_info.size >= q_info.capacity) return 0;
q_info.last_sent = get_next_tp(q_info);
w_info.queue.emplace(item, queue_id, q_info.last_sent);
w_info.queue.emplace(
item, queue_id, q_info.last_sent, w_info.wrapping_counter++);
q_info.size++;
w_info.q_not_empty.notify_one();
return 1;
Expand All @@ -289,7 +290,8 @@ class QueueingLogicRL {
auto &w_info = workers_info.at(worker_id);
if (q_info.size >= q_info.capacity) return 0;
q_info.last_sent = get_next_tp(q_info);
w_info.queue.emplace(std::move(item), queue_id, q_info.last_sent);
w_info.queue.emplace(
std::move(item), queue_id, q_info.last_sent, w_info.wrapping_counter++);
q_info.size++;
w_info.q_not_empty.notify_one();
return 1;
Expand Down Expand Up @@ -347,14 +349,14 @@ class QueueingLogicRL {

//! Set the maximum rate of the logical queue with id \p queue_id to \p
//! pps. \p pps is expressed in "number of elements per second". Until this
//! function is called, there will be no rate limit for the queue.
//! function is called, there will be no rate limit for the queue. The same
//! behavior (no rate limit) can be achieved by calling this method with a
//! rate of 0.
void set_rate(size_t queue_id, uint64_t pps) {
using std::chrono::duration;
using std::chrono::duration_cast;
LockType lock(mutex);
auto &q_info = get_queue(queue_id);
q_info.queue_rate_pps = pps;
q_info.pkt_delay_ticks = duration_cast<ticks>(duration<double>(1. / pps));
q_info.pkt_delay_ticks = rate_to_ticks(pps);
}

//! Set the maximum rate of all logical queues to \p pps.
Expand All @@ -365,7 +367,7 @@ class QueueingLogicRL {
for (auto &p : queues_info) {
auto &q_info = p.second;
q_info.queue_rate_pps = pps;
q_info.pkt_delay_ticks = duration_cast<ticks>(duration<double>(1. / pps));
q_info.pkt_delay_ticks = rate_to_ticks(pps);
}
queue_rate_pps = pps;
}
Expand All @@ -386,22 +388,32 @@ class QueueingLogicRL {
// using clock = std::chrono::steady_clock;
using clock = std::chrono::high_resolution_clock;

// Translate a rate given in elements per second into the spacing between two
// consecutive elements, expressed in `ticks`. A rate of 0 is special-cased and
// yields a zero spacing, which also avoids evaluating 1 / 0. For any other
// rate, the period (1 / pps seconds) is converted with duration_cast.
// Kept as a single return statement so that the function remains a valid
// C++11 constexpr function.
static constexpr ticks rate_to_ticks(uint64_t pps) {
  return (pps != 0)
      ? std::chrono::duration_cast<ticks>(
            std::chrono::duration<double>(1. / pps))
      : ticks(0);
}

struct QE {
// QE(T e, size_t queue_id, const clock::time_point &send, size_t id)
// : e(std::move(e)), queue_id(queue_id), send(send), id(id) { }
QE(T e, size_t queue_id, const clock::time_point &send)
: e(std::move(e)), queue_id(queue_id), send(send) { }
QE(T e, size_t queue_id, const clock::time_point &send, size_t id)
: e(std::move(e)), queue_id(queue_id), send(send), id(id) { }

T e;
size_t queue_id;
clock::time_point send;
// size_t id;
size_t id;
};

struct QEComp {
bool operator()(const QE &lhs, const QE &rhs) const {
// return (lhs.send == rhs.send) ? lhs.id > rhs.id : lhs.send > rhs.send;
return lhs.send > rhs.send;
// the point of the id is to avoid re-orderings when the send timestamp is
// the same for 2 items, which seems to happen (when the pps rate is 0)
// with both the steady_clock and the high_resolution_clock on my Linux
// VM.
return (lhs.send == rhs.send) ? lhs.id > rhs.id : lhs.send > rhs.send;
}
};

Expand All @@ -413,8 +425,7 @@ class QueueingLogicRL {
QueueInfo(size_t capacity, uint64_t queue_rate_pps)
: capacity(capacity),
queue_rate_pps(queue_rate_pps),
pkt_delay_ticks(std::chrono::duration_cast<ticks>(
std::chrono::duration<double>(1. / queue_rate_pps))),
pkt_delay_ticks(rate_to_ticks(queue_rate_pps)),
last_sent(clock::now()) { }

size_t size{0};
Expand All @@ -427,6 +438,7 @@ class QueueingLogicRL {
// State kept for each worker.
struct WorkerInfo {
  // elements waiting to be handed to this worker
  MyQ queue{};
  // signalled with notify_one() after an element has been pushed to `queue`
  mutable std::condition_variable q_not_empty{};
  // post-incremented on every push; the value is stored in the queued element
  // as its id, which the comparator uses to order elements that share the same
  // send timestamp
  size_t wrapping_counter = 0;
};

QueueInfo &get_queue(size_t queue_id) {
Expand Down Expand Up @@ -506,7 +518,8 @@ class QueueingLogicPriRL {
auto &q_info_pri = q_info.at(priority);
if (q_info_pri.size >= q_info_pri.capacity) return 0;
q_info_pri.last_sent = get_next_tp(q_info_pri);
w_info.queues[priority].emplace(item, queue_id, q_info_pri.last_sent);
w_info.queues[priority].emplace(
item, queue_id, q_info_pri.last_sent, w_info.wrapping_counter++);
q_info_pri.size++;
q_info.size++;
w_info.size++;
Expand All @@ -528,8 +541,11 @@ class QueueingLogicPriRL {
auto &q_info_pri = q_info.at(priority);
if (q_info_pri.size >= q_info_pri.capacity) return 0;
q_info_pri.last_sent = get_next_tp(q_info_pri);
w_info.queues[priority].emplace(std::move(item), queue_id,
q_info_pri.last_sent);
w_info.queues[priority].emplace(
std::move(item),
queue_id,
q_info_pri.last_sent,
w_info.wrapping_counter++);
q_info_pri.size++;
q_info.size++;
w_info.size++;
Expand Down Expand Up @@ -643,7 +659,8 @@ class QueueingLogicPriRL {
//! Set the maximum rate of all the priority queues for logical queue \p
//! queue_id to \p pps. \p pps is expressed in "number of elements per
//! second". Until this function is called, there will be no rate limit for
//! the queue.
//! the queue. The same behavior (no rate limit) can be achieved by calling
//! this method with a rate of 0.
void set_rate(size_t queue_id, uint64_t pps) {
LockType lock(mutex);
for_each_q(queue_id, SetRateFn(pps));
Expand Down Expand Up @@ -676,21 +693,31 @@ class QueueingLogicPriRL {
private:
using ticks = std::chrono::nanoseconds;
// Which clock to use? Switch to steady_clock if re-ordering is observed.
// In my Linux VM, both clocks seem to behave the same: they can sometimes
// stop increasing for a bit, but they do not go backwards.
// using clock = std::chrono::steady_clock;
using clock = std::chrono::high_resolution_clock;

// Translate a rate given in elements per second into the spacing between two
// consecutive elements, expressed in `ticks`. A rate of 0 is special-cased and
// yields a zero spacing, which also avoids evaluating 1 / 0. For any other
// rate, the period (1 / pps seconds) is converted with duration_cast.
// Kept as a single return statement so that the function remains a valid
// C++11 constexpr function.
static constexpr ticks rate_to_ticks(uint64_t pps) {
  return (pps != 0)
      ? std::chrono::duration_cast<ticks>(
            std::chrono::duration<double>(1. / pps))
      : ticks(0);
}

struct QE {
QE(T e, size_t queue_id, const clock::time_point &send)
: e(std::move(e)), queue_id(queue_id), send(send) { }
QE(T e, size_t queue_id, const clock::time_point &send, size_t id)
: e(std::move(e)), queue_id(queue_id), send(send), id(id) { }

T e;
size_t queue_id;
clock::time_point send;
size_t id;
};

struct QEComp {
bool operator()(const QE &lhs, const QE &rhs) const {
return lhs.send > rhs.send;
return (lhs.send == rhs.send) ? lhs.id > rhs.id : lhs.send > rhs.send;
}
};

Expand All @@ -700,8 +727,7 @@ class QueueingLogicPriRL {
QueueInfoPri(size_t capacity, uint64_t queue_rate_pps)
: capacity(capacity),
queue_rate_pps(queue_rate_pps),
pkt_delay_ticks(std::chrono::duration_cast<ticks>(
std::chrono::duration<double>(1. / queue_rate_pps))),
pkt_delay_ticks(rate_to_ticks(queue_rate_pps)),
last_sent(clock::now()) { }

size_t size{0};
Expand All @@ -723,6 +749,7 @@ class QueueingLogicPriRL {
mutable std::condition_variable q_not_empty{};
size_t size{0};
std::array<MyQ, 32> queues;
size_t wrapping_counter{0};
};

QueueInfo &get_queue(size_t queue_id) {
Expand Down Expand Up @@ -777,7 +804,7 @@ class QueueingLogicPriRL {
: pps(pps) {
using std::chrono::duration;
using std::chrono::duration_cast;
pkt_delay_ticks = duration_cast<ticks>(duration<double>(1. / pps));
pkt_delay_ticks = rate_to_ticks(pps);
}

void operator ()(QueueInfoPri &info) const { // NOLINT(runtime/references)
Expand Down
9 changes: 4 additions & 5 deletions tests/test_queueing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,31 +92,30 @@ void QueueingTest<QType>::produce() {
namespace {

template <typename Q>
void produce_if_dropping(Q &queue, size_t iterations, size_t capacity,
void produce_if_dropping(Q &queue, size_t iterations,
const std::vector<RndInput> &values) {
for (size_t i = 0; i < iterations; i++) {
size_t queue_id = values[i].queue_id;
// this is to avoid drops
// kind of makes me question if using type parameterization is really useful
// for this
while (queue.size(queue_id) > capacity / 2) {
while (!queue.push_front(queue_id, unique_ptr<int>(new int(values[i].v)))) {
// originally, I just had an empty loop, but Valgrind was running forever
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
queue.push_front(queue_id, unique_ptr<int>(new int(values[i].v)));
}
}

} // namespace

template <>
void QueueingTest<QueueingLogicRL<QEm, WorkerMapper> >::produce() {
produce_if_dropping(queue, iterations, capacity, values);
produce_if_dropping(queue, iterations, values);
}

template <>
void QueueingTest<QueueingLogicPriRL<QEm, WorkerMapper> >::produce() {
produce_if_dropping(queue, iterations, capacity, values);
produce_if_dropping(queue, iterations, values);
}

using testing::Types;
Expand Down