From 1e650aac413b351b4996d4a5a12734c4c724a327 Mon Sep 17 00:00:00 2001 From: Antonin Bas Date: Wed, 6 Mar 2019 17:07:58 -0800 Subject: [PATCH] Fix ingress thread deadlock for resubmit packets Resubmit packets were written to the input_buffer with a blocking call by the ingress thread. Since the ingress thread is also in charge of draining the input_buffer, this could have led to a deadlock. Resubmit packets can now be dropped if the buffer is full. To limit the number of resubmit packets being lost, we place them in a higher priority queue than "normal" packets. Fixes #729 --- include/bm/bm_sim/queueing.h | 6 +- targets/simple_switch/simple_switch.cpp | 111 ++++++++++++++++++++++-- targets/simple_switch/simple_switch.h | 6 +- 3 files changed, 111 insertions(+), 12 deletions(-) diff --git a/include/bm/bm_sim/queueing.h b/include/bm/bm_sim/queueing.h index e17cb6d56..3601ce162 100644 --- a/include/bm/bm_sim/queueing.h +++ b/include/bm/bm_sim/queueing.h @@ -397,9 +397,9 @@ class QueueingLogicRL { //! high priority can starve lower-priority queues. For example, if the queue //! with priority `0` always contains at least one element, the other queues //! will never be served. -//! As for QueueingLogicRL, the write behavior (push_front()) is blocking: once -//! a logical queue is full, subsequent incoming elements will be dropped until -//! the queue starts draining again. +//! As for QueueingLogicRL, the write behavior (push_front()) is not blocking: +//! once a logical queue is full, subsequent incoming elements will be dropped +//! until the queue starts draining again. //! Look at the documentation for QueueingLogic for more information about the //! template parameters (they are the same). 
template diff --git a/targets/simple_switch/simple_switch.cpp b/targets/simple_switch/simple_switch.cpp index e1822a0c7..15a6e8df7 100644 --- a/targets/simple_switch/simple_switch.cpp +++ b/targets/simple_switch/simple_switch.cpp @@ -18,12 +18,15 @@ * */ +#include #include #include #include #include +#include +#include #include #include #include @@ -98,10 +101,94 @@ class SimpleSwitch::MirroringSessions { std::unordered_map sessions_map; }; +// Arbitrates which packets are processed by the ingress thread. Resubmit and +// recirculate packets go to a high priority queue, while normal packets go to a +// low priority queue. We assume that starvation is not going to be a problem. +// Resubmit packets are dropped if the queue is full in order to make sure the +// ingress thread cannot deadlock. We do the same for recirculate packets even +// though the same argument does not apply for them. Enqueueing normal packets +// is blocking (back pressure is applied to the interface). +class SimpleSwitch::InputBuffer { + public: + enum class PacketType { + NORMAL, + RESUBMIT, + RECIRCULATE, + SENTINEL // signal for the ingress thread to terminate + }; + + InputBuffer(size_t capacity_hi, size_t capacity_lo) + : capacity_hi(capacity_hi), capacity_lo(capacity_lo) { } + + int push_front(PacketType packet_type, std::unique_ptr &&item) { + switch (packet_type) { + case PacketType::NORMAL: + return push_front(&queue_lo, capacity_lo, &cvar_can_push_lo, + std::move(item), true); + case PacketType::RESUBMIT: + case PacketType::RECIRCULATE: + return push_front(&queue_hi, capacity_hi, &cvar_can_push_hi, + std::move(item), false); + case PacketType::SENTINEL: + return push_front(&queue_hi, capacity_hi, &cvar_can_push_hi, + std::move(item), true); + } + _BM_UNREACHABLE("Unreachable statement"); + return 0; + } + + void pop_back(std::unique_ptr *pItem) { + Lock lock(mutex); + cvar_can_pop.wait( + lock, [this] { return (queue_hi.size() + queue_lo.size()) > 0; }); + // give higher priority to 
resubmit/recirculate queue + if (queue_hi.size() > 0) { + *pItem = std::move(queue_hi.back()); + queue_hi.pop_back(); + lock.unlock(); + cvar_can_push_hi.notify_one(); + } else { + *pItem = std::move(queue_lo.back()); + queue_lo.pop_back(); + lock.unlock(); + cvar_can_push_lo.notify_one(); + } + } + + private: + using Mutex = std::mutex; + using Lock = std::unique_lock; + using QueueImpl = std::deque >; + + int push_front(QueueImpl *queue, size_t capacity, + std::condition_variable *cvar, + std::unique_ptr &&item, bool blocking) { + Lock lock(mutex); + while (queue->size() == capacity) { + if (!blocking) return 0; + cvar->wait(lock); + } + queue->push_front(std::move(item)); + lock.unlock(); + cvar_can_pop.notify_one(); + return 1; + } + + mutable std::mutex mutex; + mutable std::condition_variable cvar_can_push_hi; + mutable std::condition_variable cvar_can_push_lo; + mutable std::condition_variable cvar_can_pop; + size_t capacity_hi; + size_t capacity_lo; + QueueImpl queue_hi; + QueueImpl queue_lo; +}; + SimpleSwitch::SimpleSwitch(port_t max_port, bool enable_swap) : Switch(enable_swap), max_port(max_port), - input_buffer(1024), + input_buffer(new InputBuffer( + 1024 /* resubmit/recirc capacity */, 1024 /* normal capacity */)), #ifdef SSWITCH_PRIORITY_QUEUEING_ON egress_buffers(max_port, nb_egress_threads, 64, EgressThreadMapper(nb_egress_threads), @@ -175,7 +262,8 @@ SimpleSwitch::receive_(port_t port_num, const char *buffer, int len) { .set(get_ts().count()); } - input_buffer.push_front(std::move(packet)); + input_buffer->push_front( + InputBuffer::PacketType::NORMAL, std::move(packet)); return 0; } @@ -191,12 +279,17 @@ SimpleSwitch::start_and_return_() { } SimpleSwitch::~SimpleSwitch() { - input_buffer.push_front(nullptr); + input_buffer->push_front( + InputBuffer::PacketType::SENTINEL, nullptr); for (size_t i = 0; i < nb_egress_threads; i++) { + // The push_front call is called inside a while loop because there is no + // guarantee that the sentinel was 
enqueued otherwise. It should not be an + // issue because at this stage the ingress thread has been sent a signal to + // stop, and only egress clones can be sent to the buffer. #ifdef SSWITCH_PRIORITY_QUEUEING_ON - egress_buffers.push_front(i, 0, nullptr); + while (egress_buffers.push_front(i, 0, nullptr) == 0) continue; #else - egress_buffers.push_front(i, nullptr); + while (egress_buffers.push_front(i, nullptr) == 0) continue; #endif } output_buffer.push_front(nullptr); @@ -375,7 +468,7 @@ SimpleSwitch::ingress_thread() { while (1) { std::unique_ptr packet; - input_buffer.pop_back(&packet); + input_buffer->pop_back(&packet); if (packet == nullptr) break; // TODO(antonin): only update these if swapping actually happened? @@ -491,7 +584,8 @@ SimpleSwitch::ingress_thread() { copy_field_list_and_set_type(packet, packet_copy, PKT_INSTANCE_TYPE_RESUBMIT, field_list_id); - input_buffer.push_front(std::move(packet_copy)); + input_buffer->push_front( + InputBuffer::PacketType::RESUBMIT, std::move(packet_copy)); continue; } } @@ -634,7 +728,8 @@ SimpleSwitch::egress_thread(size_t worker_id) { // TODO(antonin): really it may be better to create a new packet here or // to fold this functionality into the Packet class? 
packet_copy->set_ingress_length(packet_size); - input_buffer.push_front(std::move(packet_copy)); + input_buffer->push_front( + InputBuffer::PacketType::RECIRCULATE, std::move(packet_copy)); continue; } } diff --git a/targets/simple_switch/simple_switch.h b/targets/simple_switch/simple_switch.h index 28b82f1cb..ef1121576 100644 --- a/targets/simple_switch/simple_switch.h +++ b/targets/simple_switch/simple_switch.h @@ -126,6 +126,8 @@ class SimpleSwitch : public Switch { class MirroringSessions; + class InputBuffer; + enum PktInstanceType { PKT_INSTANCE_TYPE_NORMAL, PKT_INSTANCE_TYPE_INGRESS_CLONE, @@ -169,7 +171,9 @@ class SimpleSwitch : public Switch { private: port_t max_port; std::vector threads_; - Queue > input_buffer; + std::unique_ptr input_buffer; + // for these queues, the write operation is non-blocking and we drop the + // packet if the queue is full #ifdef SSWITCH_PRIORITY_QUEUEING_ON bm::QueueingLogicPriRL, EgressThreadMapper> #else