diff --git a/README.md b/README.md index 4f0a9fb..a177593 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,7 @@ It contains: - `flex_array`: A combination of `std::array` and `std::span` - `tuple_algorithms`: Some algorithms for iterating tuples - `generator`: The reference implementation of `std::generator` from [P2502R2](https://www.open-std.org/jtc1/sc22/wg21/docs/papers/2022/p2502r2.pdf) +- `channel`: A single producer, single consumer queue ## Usage @@ -82,6 +83,10 @@ If you want this generator to serve as a drop in replacement for `std::generator use `#define DICE_TEMPLATELIBRARY_GENERATOR_STD_COMPAT 1` before including the generator header. That will export all generator related things under namespace `std::`. +### `channel` +A single producer, single consume queue. This can be used to communicate between threads in a more high level +fashion than a mutex+container would allow. + ### Further Examples Compilable code examples can be found in [examples](./examples). The example build requires the cmake diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 78d9517..dd0df24 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -65,3 +65,10 @@ target_link_libraries(example_generator PRIVATE dice-template-library::dice-template-library ) + +add_executable(example_channel + example_channel.cpp) +target_link_libraries(example_channel + PRIVATE + dice-template-library::dice-template-library +) diff --git a/examples/example_channel.cpp b/examples/example_channel.cpp new file mode 100644 index 0000000..9211fa6 --- /dev/null +++ b/examples/example_channel.cpp @@ -0,0 +1,26 @@ +#include + +#include +#include +#include +#include + + +int main() { + dice::template_library::channel chan{8}; + + std::jthread thrd{[&chan]() { + std::vector ints; + for (int x : chan) { + ints.push_back(x); + std::cout << x << ' '; + } + + assert(std::ranges::equal(ints, std::vector{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})); + }}; + + for (int x = 0; x < 10; ++x) { + chan.push(x); + } + chan.close(); // don't forget to close +} diff --git a/include/dice/template-library/channel.hpp b/include/dice/template-library/channel.hpp new file mode 100644 index 0000000..7e605c9 --- /dev/null +++ b/include/dice/template-library/channel.hpp @@ -0,0 +1,304 @@ +#ifndef DICE_TEMPLATELIBRARY_CHANNEL_HPP +#define DICE_TEMPLATELIBRARY_CHANNEL_HPP + +#include +#include +#include +#include +#include +#include +#include + +namespace dice::template_library { + + /** + * A single producer, single consumer channel/queue + * @note this can technically be used as a multi producer, single consumer queue, but care must be taken + * of when exactly close() is called + * @warning close() must be called once the producing thread is done, otherwise the reading thread will hang indefinitely + * + * @tparam T value type of the channel + */ + template + struct channel { + using value_type = T; + using size_type = size_t; + using difference_type = std::ptrdiff_t; + using reference = value_type &; + using const_reference = value_type const &; + using pointer = T *; + using const_pointer = T const *; + + private: + size_t max_cap_; ///< maximum allowed number of elements in queue_ + std::deque queue_; ///< queue for elements + + std::atomic_flag closed_ = ATOMIC_FLAG_INIT; ///< true if this channel is closed + std::mutex queue_mutex_; ///< mutex for queue_ + std::condition_variable queue_not_empty_; ///< condvar for queue_.size() > 0 + std::condition_variable queue_not_full_; ///< condvar for queue_.size() < max_cap_; + + public: + explicit channel(size_t capacity) : max_cap_{capacity} { + // note: for some reason std::deque does not have .reserve() + // TODO: it would probably make sense to implement a VecDeque for this (https://doc.rust-lang.org/std/collections/struct.VecDeque.html) + } + + // there is no way to safely implement these with concurrent access + channel(channel const &other) = delete; + channel(channel &&other) = delete; + channel &operator=(channel const &other) = delete; + channel &operator=(channel &&other) noexcept = delete; + + ~channel() noexcept = default; + + /** + * Close the channel. + * After calling close calls to push() will return false + * and calls to try_pop will return std::nullopt once the already present elements are exhausted + */ + void close() noexcept { + { + // "Even if the shared variable is atomic, it must be modified while owning the mutex to correctly publish the modification to the waiting thread." + // - https://en.cppreference.com/w/cpp/thread/condition_variable + // + // Here closed_ is the shared variable used by queue_not_empty_ in another thread (the one waiting in try_pop) + std::lock_guard lock{queue_mutex_}; + closed_.test_and_set(std::memory_order_release); + } + queue_not_empty_.notify_one(); // notify pop() so that it does not get stuck + } + + /** + * @return true if this channel is closed + */ + [[nodiscard]] bool closed() const noexcept { + return closed_.test(std::memory_order_acquire); + } + + /** + * Emplace an element into the channel, blocks if there is no capacity left in the channel. + * + * @param args constructor args + * @return true if emplacing the element succeeded because the channel is not yet closed + */ + template + bool emplace(Args &&...args) noexcept(std::is_nothrow_constructible_v(args))...>) { + if (closed_.test(std::memory_order_acquire)) [[unlikely]] { + return false; + } + + { + std::unique_lock lock{queue_mutex_}; + queue_not_full_.wait(lock, [this]() noexcept { return queue_.size() < max_cap_; }); + queue_.emplace_back(std::forward(args)...); + } + + queue_not_empty_.notify_one(); + return true; + } + + /** + * Emplace an element into the channel, returns immediately if there is no capacity in the channel. + * + * @param args constructor args + * @return true if emplacing the element succeeded + */ + template + bool try_emplace(Args &&...args) noexcept(std::is_nothrow_constructible_v(args))...>) { + if (closed_.test(std::memory_order_acquire)) [[unlikely]] { + return false; + } + + { + std::unique_lock lock{queue_mutex_}; + if (queue_.size() >= max_cap_) { + return false; + } + + queue_.emplace_back(std::forward(args)...); + } + + queue_not_empty_.notify_one(); + return true; + } + + /** + * Push a single element into the channel, blocks if there is no capacity left in the channel. + * + * @param value the element to push + * @return true if pushing the element succeeded because the channel is not yet closed + */ + bool push(value_type const &value) noexcept(std::is_nothrow_copy_constructible_v) { + return emplace(value); + } + + /** + * Push a single element into the channel, returns immediately if there is no capacity in the channel. + * + * @param value the element to push + * @return true if pushing the element succeeded because the channel is not yet closed + */ + bool try_push(value_type const &value) noexcept(std::is_nothrow_copy_constructible_v) { + return try_emplace(value); + } + + /** + * Push a single element into the channel, blocks if there is no capacity left in the channel. + * + * @param value the element to push + * @return true if pushing the element succeeded because the channel is not yet closed + */ + bool push(value_type &&value) noexcept(std::is_nothrow_move_constructible_v) { + return emplace(std::move(value)); + } + + /** + * Push a single element into the channel, returns immediately if there is no capacity in the channel. + * + * @param value the element to push + * @return true if pushing the element succeeded because the channel is not yet closed + */ + bool try_push(value_type &&value) noexcept(std::is_nothrow_move_constructible_v) { + return try_emplace(std::move(value)); + } + + /** + * Try to get a (previously pushed) element from the channel. + * If there is no element available, blocks until there is one available or the channel is closed. + * + * @return std::nullopt if the channel was closed, an element otherwise + */ + [[nodiscard]] std::optional pop() noexcept(std::is_nothrow_move_constructible_v) { + std::unique_lock lock{queue_mutex_}; + queue_not_empty_.wait(lock, [this]() noexcept { return !queue_.empty() || closed_.test(std::memory_order_acquire); }); + + if (queue_.empty()) [[unlikely]] { + // implies closed_ == true + return std::nullopt; + } + + auto ret = std::move(queue_.front()); + queue_.pop_front(); + + lock.unlock(); + queue_not_full_.notify_one(); + return ret; + } + + /** + * Try to get a (previously pushed) element from the channel. + * Unlike pop(), if there is no element available, returns std::nullopt immediatly. + */ + [[nodiscard]] std::optional try_pop() noexcept(std::is_nothrow_move_constructible_v) { + std::unique_lock lock{queue_mutex_}; + if (queue_.empty()) { + return std::nullopt; + } + + auto ret = std::move(queue_.front()); + queue_.pop_front(); + + lock.unlock(); + queue_not_full_.notify_one(); + return ret; + } + + struct iterator { + using channel_type = channel; + using value_type = T; + using difference_type = std::ptrdiff_t; + using reference = T &; + using const_reference = T const &; + using pointer = typename channel_type::pointer; + using const_pointer = typename channel_type::const_pointer; + using iterator_category = std::input_iterator_tag; + + private: + channel_type *chan_; + mutable std::optional buf_; ///< this has to be mutable for this iterator to fullfill std::input_iterator + + void advance() noexcept { + buf_ = chan_->pop(); + } + + public: + explicit iterator(channel_type *chan) noexcept : chan_{chan} { + advance(); + } + + iterator(iterator const &other) noexcept(std::is_nothrow_copy_constructible_v) + : chan_{other.chan_}, + buf_{other.buf_} { + } + + iterator &operator=(iterator const &other) noexcept(std::is_nothrow_copy_assignable_v) { + if (this == &other) { + return *this; + } + + chan_ = other.chan_; + buf_ = other.buf_; + return *this; + } + + iterator(iterator &&other) noexcept(std::is_nothrow_move_constructible_v) + : chan_{other.chan_}, + buf_{std::move(other.buf_)} { + } + + iterator &operator=(iterator &&other) noexcept(std::is_nothrow_swappable_v) { + assert(this != &other); + std::swap(chan_, other.chan_); + std::swap(buf_, other.buf_); + return *this; + } + + reference operator*() noexcept { + return *buf_; + } + + reference operator*() const noexcept { + return *buf_; + } + + pointer operator->() noexcept { + return &*buf_; + } + + pointer operator->() const noexcept { + return &*buf_; + } + + iterator &operator++() noexcept { + advance(); + return *this; + } + + void operator++(int) noexcept { + advance(); + } + + bool operator==(std::default_sentinel_t) const noexcept { + return !buf_.has_value(); + } + }; + + using sentinel = std::default_sentinel_t; + + /** + * @return an iterator over all present and future elements of this channel + * @note iterator == end() is true once the channel is closed + */ + [[nodiscard]] iterator begin() noexcept { + return iterator{this}; + } + + [[nodiscard]] sentinel end() const noexcept { + return std::default_sentinel; + } + }; + +} // namespace dice::template_library + +#endif // DICE_TEMPLATELIBRARY_CHANNEL_HPP diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index f912eb6..096b8cf 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -55,3 +55,6 @@ custom_add_test(tests_flex_array) add_executable(tests_generator tests_generator.cpp) custom_add_test(tests_generator) + +add_executable(tests_channel tests_channel.cpp) +custom_add_test(tests_channel) diff --git a/tests/tests_channel.cpp b/tests/tests_channel.cpp new file mode 100644 index 0000000..2adc79a --- /dev/null +++ b/tests/tests_channel.cpp @@ -0,0 +1,96 @@ +#define DOCTEST_CONFIG_IMPLEMENT_WITH_MAIN +#include + +#include + +#include +#include +#include +#include + +TEST_SUITE("mpmc_channel") { + using namespace dice::template_library; + + TEST_CASE("is range") { + static_assert(std::input_iterator::iterator>); + static_assert(std::ranges::range>); + static_assert(std::ranges::input_range>); + } + + TEST_CASE("sanity check") { + channel chan{3}; + CHECK_FALSE(chan.closed()); + CHECK_EQ(chan.try_pop(), std::nullopt); + + std::string const s{"a"}; + chan.push(s); + + chan.push(std::string{"b"}); + chan.emplace("c"); + + // no capacity left + CHECK_FALSE(chan.try_push(s)); + CHECK_FALSE(chan.try_push(std::string{"b"})); + CHECK_FALSE(chan.try_emplace("c")); + + chan.close(); + CHECK(chan.closed()); + + CHECK_EQ(chan.pop(), "a"); + CHECK_EQ(chan.pop(), "b"); + CHECK_EQ(chan.pop(), "c"); + CHECK_EQ(chan.pop(), std::nullopt); + CHECK_EQ(chan.pop(), std::nullopt); + CHECK_EQ(chan.try_pop(), std::nullopt); + CHECK_EQ(chan.try_pop(), std::nullopt); + } + + TEST_CASE("usecase sanity check") { + channel chan{3}; + + std::jthread thrd{[&chan]() { + std::vector ints; + for (int x : chan) { + ints.push_back(x); + } + + CHECK_EQ(ints, std::vector{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}); + }}; + + for (int x = 0; x < 10; ++x) { + chan.push(x); + } + chan.close(); // don't forget to close + } + + TEST_CASE("iter") { + channel chan{8}; + chan.emplace("a"); + chan.emplace("b"); + chan.close(); + + std::vector actual; + for (auto x : chan) { + actual.push_back(x); + } + + CHECK_EQ(actual, std::vector{"a", "b"}); + } + + TEST_CASE("closed push") { + channel chan{8}; + chan.close(); + CHECK(chan.closed()); + + std::string const s{"a"}; + CHECK_FALSE(chan.push(s)); + CHECK_FALSE(chan.try_push(s)); + + CHECK_FALSE(chan.push(std::string{"a"})); + CHECK_FALSE(chan.try_push(std::string{"a"})); + CHECK_FALSE(chan.emplace("a")); + + CHECK_EQ(chan.pop(), std::nullopt); + CHECK_EQ(chan.try_pop(), std::nullopt); + } +}