From f022a6af30ef9ae9d839f4c3bed4e96de5f80946 Mon Sep 17 00:00:00 2001 From: Liss Heidrich <31625940+Clueliss@users.noreply.github.com> Date: Mon, 29 Jul 2024 11:29:07 +0200 Subject: [PATCH 1/4] WIP --- README.md | 5 + examples/CMakeLists.txt | 7 + examples/example_channel.cpp | 26 +++ include/dice/template-library/channel.hpp | 234 ++++++++++++++++++++++ tests/CMakeLists.txt | 3 + tests/tests_channel.cpp | 19 ++ 6 files changed, 294 insertions(+) create mode 100644 examples/example_channel.cpp create mode 100644 include/dice/template-library/channel.hpp create mode 100644 tests/tests_channel.cpp diff --git a/README.md b/README.md index 4f0a9fb..a177593 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,7 @@ It contains: - `flex_array`: A combination of `std::array` and `std::span` - `tuple_algorithms`: Some algorithms for iterating tuples - `generator`: The reference implementation of `std::generator` from [P2502R2](https://www.open-std.org/jtc1/sc22/wg21/docs/papers/2022/p2502r2.pdf) +- `channel`: A single producer, single consumer queue ## Usage @@ -82,6 +83,10 @@ If you want this generator to serve as a drop in replacement for `std::generator use `#define DICE_TEMPLATELIBRARY_GENERATOR_STD_COMPAT 1` before including the generator header. That will export all generator related things under namespace `std::`. +### `channel` +A single producer, single consume queue. This can be used to communicate between threads in a more high level +fashion than a mutex+container would allow. + ### Further Examples Compilable code examples can be found in [examples](./examples). The example build requires the cmake diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 78d9517..dd0df24 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -65,3 +65,10 @@ target_link_libraries(example_generator PRIVATE dice-template-library::dice-template-library ) + +add_executable(example_channel + example_channel.cpp) +target_link_libraries(example_channel + PRIVATE + dice-template-library::dice-template-library +) diff --git a/examples/example_channel.cpp b/examples/example_channel.cpp new file mode 100644 index 0000000..9211fa6 --- /dev/null +++ b/examples/example_channel.cpp @@ -0,0 +1,26 @@ +#include + +#include +#include +#include +#include + + +int main() { + dice::template_library::channel chan{8}; + + std::jthread thrd{[&chan]() { + std::vector ints; + for (int x : chan) { + ints.push_back(x); + std::cout << x << ' '; + } + + assert(std::ranges::equal(ints, std::vector{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})); + }}; + + for (int x = 0; x < 10; ++x) { + chan.push(x); + } + chan.close(); // don't forget to close +} diff --git a/include/dice/template-library/channel.hpp b/include/dice/template-library/channel.hpp new file mode 100644 index 0000000..78a7959 --- /dev/null +++ b/include/dice/template-library/channel.hpp @@ -0,0 +1,234 @@ +#ifndef DICE_TEMPLATELIBRARY_CHANNEL_HPP +#define DICE_TEMPLATELIBRARY_CHANNEL_HPP + +#include +#include +#include +#include +#include +#include +#include + +namespace dice::template_library { + + /** + * A single producer, single consumer channel/queue + * @note this can technically be used as a multi producer, single consumer queue, but care must be taken + * of when exactly close() is called + * @warning close() must be called once the producing thread is done, otherwise the reading thread will hang indefinitely or SEGFAULT + * + * @tparam T value type of the channel + */ + template + struct channel { + using value_type = T; + using size_type = size_t; + using difference_type = std::ptrdiff_t; + using reference = value_type &; + using const_reference = value_type const &; + using pointer = T *; + using const_pointer = T const *; + + private: + size_t max_cap_; ///< maximum allowed number of elements in queue_ + std::deque queue_; ///< queue for elements + + std::atomic_flag closed_; ///< true if this channel is closed + std::mutex queue_mutex_; ///< mutex for queue_ + std::condition_variable queue_not_empty_; ///< condvar for queue_.size() > 0 + std::condition_variable queue_not_full_; ///< condvar for queue_.size() < max_cap_; + + public: + explicit channel(size_t capacity) : max_cap_{capacity} { + // note: for some reason std::deque does not have .reserve() + // TODO: it would probably make sense to implement a VecDeque for this (https://doc.rust-lang.org/std/collections/struct.VecDeque.html) + } + + // there is no way to safely implement these with concurrent access + channel(channel const &other) = delete; + channel(channel &&other) = delete; + channel &operator=(channel const &other) = delete; + channel &operator=(channel &&other) noexcept = delete; + + ~channel() noexcept = default; + + /** + * Close the channel. + * After calling close calls to push() will return false + * and calls to try_pop will return std::nullopt once the already present elements are exhausted + */ + void close() noexcept { + { + // "Even if the shared variable is atomic, it must be modified while owning the mutex to correctly publish the modification to the waiting thread." + // - https://en.cppreference.com/w/cpp/thread/condition_variable + // + // Here closed_ is the shared variable used by queue_not_empty_ in another thread (the one waiting in try_pop) + std::lock_guard lock{queue_mutex_}; + closed_.test_and_set(std::memory_order_release); + } + queue_not_empty_.notify_one(); // notify try_pop so that it does not get stuck + } + + /** + * Emplace an element into the channel + * + * @param args constructor args + * @return true if emplacing the element succeeded because the channel is not yet closed + */ + template + bool emplace(Args &&...args) noexcept(std::is_nothrow_constructible_v(args))...>) { + if (closed_.test(std::memory_order_acquire)) [[unlikely]] { + return false; + } + + { + std::unique_lock lock{queue_mutex_}; + queue_not_full_.wait(lock, [this]() noexcept { return queue_.size() < max_cap_; }); + queue_.emplace_back(std::forward(args)...); + } + + queue_not_empty_.notify_one(); + return true; + } + + /** + * Push a single element into the channel + * + * @param value the element to push + * @return true if pushing the element succeeded because the channel is not yet closed + */ + bool push(value_type const &value) noexcept(std::is_nothrow_copy_constructible_v) { + return emplace(value); + } + + /** + * Push a single element into the channel + * + * @param value the element to push + * @return true if pushing the element succeeded because the channel is not yet closed + */ + bool push(value_type &&value) noexcept(std::is_nothrow_move_constructible_v) { + return emplace(std::move(value)); + } + + /** + * Try to get a (previously pushed) element from the channel. + * If there is no element available, blocks until there is one available or the channel is closed. + * + * @return std::nullopt if the channel was closed, an element otherwise + */ + [[nodiscard]] std::optional try_pop() noexcept(std::is_nothrow_move_constructible_v) { + std::unique_lock lock{queue_mutex_}; + queue_not_empty_.wait(lock, [this]() noexcept { return !queue_.empty() || closed_.test(std::memory_order_acquire); }); + + if (queue_.empty()) [[unlikely]] { + // implies closed_ == true + return std::nullopt; + } + + auto ret = std::move(queue_.front()); + queue_.pop_front(); + + lock.unlock(); + queue_not_full_.notify_one(); + return ret; + } + + struct iterator { + using channel_type = channel; + using value_type = T; + using difference_type = std::ptrdiff_t; + using reference = T &; + using const_reference = T const &; + using pointer = typename channel_type::pointer; + using const_pointer = typename channel_type::const_pointer; + using iterator_category = std::input_iterator_tag; + + private: + channel_type *chan_; + mutable std::optional buf_; ///< this has to be mutable for this iterator to fullfill std::input_iterator + + void advance() noexcept { + buf_ = chan_->try_pop(); + } + + public: + explicit iterator(channel_type *chan) noexcept : chan_{chan} { + advance(); + } + + iterator(iterator const &other) noexcept(std::is_nothrow_copy_constructible_v) + : chan_{other.chan_}, + buf_{other.buf_} { + } + + iterator &operator=(iterator const &other) noexcept(std::is_nothrow_copy_assignable_v) { + if (this == &other) { + return *this; + } + + chan_ = other.chan_; + buf_ = other.buf_; + return *this; + } + + iterator(iterator &&other) noexcept(std::is_nothrow_move_constructible_v) + : chan_{other.chan_}, + buf_{std::move(other.buf_)} { + } + + iterator &operator=(iterator &&other) noexcept(std::is_nothrow_swappable_v) { + assert(this != &other); + std::swap(chan_, other.chan_); + std::swap(buf_, other.buf_); + return *this; + } + + reference operator*() noexcept { + return *buf_; + } + + reference operator*() const noexcept { + return *buf_; + } + + pointer operator->() noexcept { + return &*buf_; + } + + pointer operator->() const noexcept { + return &*buf_; + } + + iterator &operator++() noexcept { + advance(); + return *this; + } + + void operator++(int) noexcept { + advance(); + } + + bool operator==(std::default_sentinel_t) const noexcept { + return !buf_.has_value(); + } + }; + + using sentinel = std::default_sentinel_t; + + /** + * @return an iterator over all present and future elements of this channel + * @note iterator == end() is true once the channel is closed + */ + [[nodiscard]] iterator begin() noexcept { + return iterator{this}; + } + + [[nodiscard]] sentinel end() const noexcept { + return std::default_sentinel; + } + }; + +} // namespace dice::template_library + +#endif // DICE_TEMPLATELIBRARY_MPSCCHANNEL_HPP diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index f912eb6..096b8cf 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -55,3 +55,6 @@ custom_add_test(tests_flex_array) add_executable(tests_generator tests_generator.cpp) custom_add_test(tests_generator) + +add_executable(tests_channel tests_channel.cpp) +custom_add_test(tests_channel) diff --git a/tests/tests_channel.cpp b/tests/tests_channel.cpp new file mode 100644 index 0000000..0b531d7 --- /dev/null +++ b/tests/tests_channel.cpp @@ -0,0 +1,19 @@ +#define DOCTEST_CONFIG_IMPLEMENT_WITH_MAIN +#include + +#include + +#include +#include + +TEST_SUITE("mpmc_channel") { + using namespace dice::template_library; + + TEST_CASE("is range") { + static_assert(std::input_iterator::iterator>); + static_assert(std::ranges::range>); + static_assert(std::ranges::input_range>); + } + + // TODO tests?? +} From cacaf886c6ed006baed3a045df46dc0a5bcaa425 Mon Sep 17 00:00:00 2001 From: Liss Heidrich <31625940+Clueliss@users.noreply.github.com> Date: Mon, 29 Jul 2024 13:47:25 +0200 Subject: [PATCH 2/4] tests --- include/dice/template-library/channel.hpp | 80 +++++++++++++++++++++-- include/dice/template-library/version.hpp | 13 ++++ tests/tests_channel.cpp | 79 +++++++++++++++++++++- 3 files changed, 166 insertions(+), 6 deletions(-) create mode 100644 include/dice/template-library/version.hpp diff --git a/include/dice/template-library/channel.hpp b/include/dice/template-library/channel.hpp index 78a7959..4f15865 100644 --- a/include/dice/template-library/channel.hpp +++ b/include/dice/template-library/channel.hpp @@ -70,7 +70,14 @@ namespace dice::template_library { } /** - * Emplace an element into the channel + * @return true if this channel is closed + */ + [[nodiscard]] bool closed() const noexcept { + return closed_.test(std::memory_order_acquire); + } + + /** + * Emplace an element into the channel, blocks if there is no capacity left in the channel. * * @param args constructor args * @return true if emplacing the element succeeded because the channel is not yet closed @@ -87,12 +94,37 @@ namespace dice::template_library { queue_.emplace_back(std::forward(args)...); } + queue_not_empty_.notify_one(); + return true; + } + + /** + * Emplace an element into the channel, returns immediately if there is no capacity in the channel. + * + * @param args constructor args + * @return true if emplacing the element succeeded + */ + template + bool try_emplace(Args &&...args) noexcept(std::is_nothrow_constructible_v(args))...>) { + if (closed_.test(std::memory_order_acquire)) [[unlikely]] { + return false; + } + + { + std::unique_lock lock{queue_mutex_}; + if (queue_.size() >= max_cap_) { + return false; + } + + queue_.emplace_back(std::forward(args)...); + } + queue_not_empty_.notify_one(); return true; } /** - * Push a single element into the channel + * Push a single element into the channel, blocks if there is no capacity left in the channel. * * @param value the element to push * @return true if pushing the element succeeded because the channel is not yet closed @@ -102,7 +134,17 @@ namespace dice::template_library { } /** - * Push a single element into the channel + * Push a single element into the channel, returns immediately if there is no capacity in the channel. + * + * @param value the element to push + * @return true if pushing the element succeeded because the channel is not yet closed + */ + bool try_push(value_type const &value) noexcept(std::is_nothrow_copy_constructible_v) { + return try_emplace(value); + } + + /** + * Push a single element into the channel, blocks if there is no capacity left in the channel. * * @param value the element to push * @return true if pushing the element succeeded because the channel is not yet closed @@ -111,13 +153,23 @@ namespace dice::template_library { return emplace(std::move(value)); } + /** + * Push a single element into the channel, returns immediately if there is no capacity in the channel. + * + * @param value the element to push + * @return true if pushing the element succeeded because the channel is not yet closed + */ + bool try_push(value_type &&value) noexcept(std::is_nothrow_move_constructible_v) { + return try_emplace(std::move(value)); + } + /** * Try to get a (previously pushed) element from the channel. * If there is no element available, blocks until there is one available or the channel is closed. * * @return std::nullopt if the channel was closed, an element otherwise */ - [[nodiscard]] std::optional try_pop() noexcept(std::is_nothrow_move_constructible_v) { + [[nodiscard]] std::optional pop() noexcept(std::is_nothrow_move_constructible_v) { std::unique_lock lock{queue_mutex_}; queue_not_empty_.wait(lock, [this]() noexcept { return !queue_.empty() || closed_.test(std::memory_order_acquire); }); @@ -134,6 +186,24 @@ namespace dice::template_library { return ret; } + /** + * Try to get a (previously pushed) element from the channel. + * Unlike pop(), if there is no element available, returns std::nullopt immediatly. + */ + [[nodiscard]] std::optional try_pop() noexcept(std::is_nothrow_move_constructible_v) { + std::unique_lock lock{queue_mutex_}; + if (queue_.empty()) { + return std::nullopt; + } + + auto ret = std::move(queue_.front()); + queue_.pop_front(); + + lock.unlock(); + queue_not_full_.notify_one(); + return ret; + } + struct iterator { using channel_type = channel; using value_type = T; @@ -149,7 +219,7 @@ namespace dice::template_library { mutable std::optional buf_; ///< this has to be mutable for this iterator to fullfill std::input_iterator void advance() noexcept { - buf_ = chan_->try_pop(); + buf_ = chan_->pop(); } public: diff --git a/include/dice/template-library/version.hpp b/include/dice/template-library/version.hpp new file mode 100644 index 0000000..ee2e7bc --- /dev/null +++ b/include/dice/template-library/version.hpp @@ -0,0 +1,13 @@ +#ifndef DICE_TEMPLATELIBRARY_VERSION_HPP +#define DICE_TEMPLATELIBRARY_VERSION_HPP + +#include + +namespace dice::template_library { + inline constexpr char name[] = "dice-template-library"; + inline constexpr char version[] = "1.5.1"; + inline constexpr std::array version_tuple = {1, 5, 1}; + inline constexpr int pobr_version = 1; ///< persisted object binary representation version +} // namespace dice::template_library + +#endif // DICE_TEMPLATELIBRARY_VERSION_HPP diff --git a/tests/tests_channel.cpp b/tests/tests_channel.cpp index 0b531d7..2adc79a 100644 --- a/tests/tests_channel.cpp +++ b/tests/tests_channel.cpp @@ -3,6 +3,8 @@ #include +#include +#include #include #include @@ -15,5 +17,80 @@ TEST_SUITE("mpmc_channel") { static_assert(std::ranges::input_range>); } - // TODO tests?? + TEST_CASE("sanity check") { + channel chan{3}; + CHECK_FALSE(chan.closed()); + CHECK_EQ(chan.try_pop(), std::nullopt); + + std::string const s{"a"}; + chan.push(s); + + chan.push(std::string{"b"}); + chan.emplace("c"); + + // no capacity left + CHECK_FALSE(chan.try_push(s)); + CHECK_FALSE(chan.try_push(std::string{"b"})); + CHECK_FALSE(chan.try_emplace("c")); + + chan.close(); + CHECK(chan.closed()); + + CHECK_EQ(chan.pop(), "a"); + CHECK_EQ(chan.pop(), "b"); + CHECK_EQ(chan.pop(), "c"); + CHECK_EQ(chan.pop(), std::nullopt); + CHECK_EQ(chan.pop(), std::nullopt); + CHECK_EQ(chan.try_pop(), std::nullopt); + CHECK_EQ(chan.try_pop(), std::nullopt); + } + + TEST_CASE("usecase sanity check") { + channel chan{3}; + + std::jthread thrd{[&chan]() { + std::vector ints; + for (int x : chan) { + ints.push_back(x); + } + + CHECK_EQ(ints, std::vector{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}); + }}; + + for (int x = 0; x < 10; ++x) { + chan.push(x); + } + chan.close(); // don't forget to close + } + + TEST_CASE("iter") { + channel chan{8}; + chan.emplace("a"); + chan.emplace("b"); + chan.close(); + + std::vector actual; + for (auto x : chan) { + actual.push_back(x); + } + + CHECK_EQ(actual, std::vector{"a", "b"}); + } + + TEST_CASE("closed push") { + channel chan{8}; + chan.close(); + CHECK(chan.closed()); + + std::string const s{"a"}; + CHECK_FALSE(chan.push(s)); + CHECK_FALSE(chan.try_push(s)); + + CHECK_FALSE(chan.push(std::string{"a"})); + CHECK_FALSE(chan.try_push(std::string{"a"})); + CHECK_FALSE(chan.emplace("a")); + + CHECK_EQ(chan.pop(), std::nullopt); + CHECK_EQ(chan.try_pop(), std::nullopt); + } } From 4ebfed1dd5346be42a50f32b3e65d83593a46b6a Mon Sep 17 00:00:00 2001 From: Liss Heidrich <31625940+Clueliss@users.noreply.github.com> Date: Tue, 30 Jul 2024 13:21:16 +0200 Subject: [PATCH 3/4] address review --- include/dice/template-library/channel.hpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/include/dice/template-library/channel.hpp b/include/dice/template-library/channel.hpp index 4f15865..7e605c9 100644 --- a/include/dice/template-library/channel.hpp +++ b/include/dice/template-library/channel.hpp @@ -15,7 +15,7 @@ namespace dice::template_library { * A single producer, single consumer channel/queue * @note this can technically be used as a multi producer, single consumer queue, but care must be taken * of when exactly close() is called - * @warning close() must be called once the producing thread is done, otherwise the reading thread will hang indefinitely or SEGFAULT + * @warning close() must be called once the producing thread is done, otherwise the reading thread will hang indefinitely * * @tparam T value type of the channel */ @@ -33,7 +33,7 @@ namespace dice::template_library { size_t max_cap_; ///< maximum allowed number of elements in queue_ std::deque queue_; ///< queue for elements - std::atomic_flag closed_; ///< true if this channel is closed + std::atomic_flag closed_ = ATOMIC_FLAG_INIT; ///< true if this channel is closed std::mutex queue_mutex_; ///< mutex for queue_ std::condition_variable queue_not_empty_; ///< condvar for queue_.size() > 0 std::condition_variable queue_not_full_; ///< condvar for queue_.size() < max_cap_; @@ -66,7 +66,7 @@ namespace dice::template_library { std::lock_guard lock{queue_mutex_}; closed_.test_and_set(std::memory_order_release); } - queue_not_empty_.notify_one(); // notify try_pop so that it does not get stuck + queue_not_empty_.notify_one(); // notify pop() so that it does not get stuck } /** @@ -301,4 +301,4 @@ namespace dice::template_library { } // namespace dice::template_library -#endif // DICE_TEMPLATELIBRARY_MPSCCHANNEL_HPP +#endif // DICE_TEMPLATELIBRARY_CHANNEL_HPP From a63a4640fb51b229047141da4db0d176aa0979d4 Mon Sep 17 00:00:00 2001 From: Liss Heidrich <31625940+Clueliss@users.noreply.github.com> Date: Tue, 30 Jul 2024 13:31:27 +0200 Subject: [PATCH 4/4] fix --- include/dice/template-library/version.hpp | 13 ------------- 1 file changed, 13 deletions(-) delete mode 100644 include/dice/template-library/version.hpp diff --git a/include/dice/template-library/version.hpp b/include/dice/template-library/version.hpp deleted file mode 100644 index ee2e7bc..0000000 --- a/include/dice/template-library/version.hpp +++ /dev/null @@ -1,13 +0,0 @@ -#ifndef DICE_TEMPLATELIBRARY_VERSION_HPP -#define DICE_TEMPLATELIBRARY_VERSION_HPP - -#include - -namespace dice::template_library { - inline constexpr char name[] = "dice-template-library"; - inline constexpr char version[] = "1.5.1"; - inline constexpr std::array version_tuple = {1, 5, 1}; - inline constexpr int pobr_version = 1; ///< persisted object binary representation version -} // namespace dice::template_library - -#endif // DICE_TEMPLATELIBRARY_VERSION_HPP