Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: channel #47

Merged
merged 5 commits into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ It contains:
- `flex_array`: A combination of `std::array` and `std::span`
- `tuple_algorithms`: Some algorithms for iterating tuples
- `generator`: The reference implementation of `std::generator` from [P2502R2](https://www.open-std.org/jtc1/sc22/wg21/docs/papers/2022/p2502r2.pdf)
- `channel`: A single producer, single consumer queue

## Usage

Expand Down Expand Up @@ -82,6 +83,10 @@ If you want this generator to serve as a drop in replacement for `std::generator
use `#define DICE_TEMPLATELIBRARY_GENERATOR_STD_COMPAT 1` before including the generator header. That will export
all generator related things under namespace `std::`.

### `channel`
A single producer, single consume queue. This can be used to communicate between threads in a more high level
fashion than a mutex+container would allow.

### Further Examples

Compilable code examples can be found in [examples](./examples). The example build requires the cmake
Expand Down
7 changes: 7 additions & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,10 @@ target_link_libraries(example_generator
PRIVATE
dice-template-library::dice-template-library
)

add_executable(example_channel
example_channel.cpp)
target_link_libraries(example_channel
PRIVATE
dice-template-library::dice-template-library
)
26 changes: 26 additions & 0 deletions examples/example_channel.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#include <dice/template-library/channel.hpp>

#include <algorithm>
#include <iostream>
#include <thread>
#include <vector>


int main() {
dice::template_library::channel<int> chan{8};

std::jthread thrd{[&chan]() {
std::vector<int> ints;
for (int x : chan) {
ints.push_back(x);
std::cout << x << ' ';
}

assert(std::ranges::equal(ints, std::vector<int>{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
}};

for (int x = 0; x < 10; ++x) {
chan.push(x);
}
chan.close(); // don't forget to close
}
304 changes: 304 additions & 0 deletions include/dice/template-library/channel.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,304 @@
#ifndef DICE_TEMPLATELIBRARY_CHANNEL_HPP
#define DICE_TEMPLATELIBRARY_CHANNEL_HPP

#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <optional>

namespace dice::template_library {

/**
* A single producer, single consumer channel/queue
* @note this can technically be used as a multi producer, single consumer queue, but care must be taken
* of when exactly close() is called
* @warning close() must be called once the producing thread is done, otherwise the reading thread will hang indefinitely or SEGFAULT
liss-h marked this conversation as resolved.
Show resolved Hide resolved
*
* @tparam T value type of the channel
*/
template<typename T>
struct channel {
using value_type = T;
using size_type = size_t;
using difference_type = std::ptrdiff_t;
using reference = value_type &;
using const_reference = value_type const &;
using pointer = T *;
using const_pointer = T const *;

private:
size_t max_cap_; ///< maximum allowed number of elements in queue_
liss-h marked this conversation as resolved.
Show resolved Hide resolved
std::deque<T> queue_; ///< queue for elements
liss-h marked this conversation as resolved.
Show resolved Hide resolved

std::atomic_flag closed_; ///< true if this channel is closed
std::mutex queue_mutex_; ///< mutex for queue_
std::condition_variable queue_not_empty_; ///< condvar for queue_.size() > 0
std::condition_variable queue_not_full_; ///< condvar for queue_.size() < max_cap_;

public:
explicit channel(size_t capacity) : max_cap_{capacity} {
// note: for some reason std::deque does not have .reserve()
// TODO: it would probably make sense to implement a VecDeque for this (https://doc.rust-lang.org/std/collections/struct.VecDeque.html)
liss-h marked this conversation as resolved.
Show resolved Hide resolved
}

// there is no way to safely implement these with concurrent access
channel(channel const &other) = delete;
channel(channel &&other) = delete;
channel &operator=(channel const &other) = delete;
channel &operator=(channel &&other) noexcept = delete;

~channel() noexcept = default;

/**
* Close the channel.
* After calling close calls to push() will return false
* and calls to try_pop will return std::nullopt once the already present elements are exhausted
*/
void close() noexcept {
{
// "Even if the shared variable is atomic, it must be modified while owning the mutex to correctly publish the modification to the waiting thread."
// - https://en.cppreference.com/w/cpp/thread/condition_variable
//
// Here closed_ is the shared variable used by queue_not_empty_ in another thread (the one waiting in try_pop)
std::lock_guard lock{queue_mutex_};
closed_.test_and_set(std::memory_order_release);
liss-h marked this conversation as resolved.
Show resolved Hide resolved
}
queue_not_empty_.notify_one(); // notify try_pop so that it does not get stuck
liss-h marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* @return true if this channel is closed
*/
[[nodiscard]] bool closed() const noexcept {
return closed_.test(std::memory_order_acquire);
}

/**
* Emplace an element into the channel, blocks if there is no capacity left in the channel.
*
* @param args constructor args
* @return true if emplacing the element succeeded because the channel is not yet closed
*/
template<typename ...Args>
bool emplace(Args &&...args) noexcept(std::is_nothrow_constructible_v<value_type, decltype(std::forward<Args>(args))...>) {
if (closed_.test(std::memory_order_acquire)) [[unlikely]] {
return false;
}

{
std::unique_lock lock{queue_mutex_};
queue_not_full_.wait(lock, [this]() noexcept { return queue_.size() < max_cap_; });
queue_.emplace_back(std::forward<Args>(args)...);
}

queue_not_empty_.notify_one();
return true;
}

/**
* Emplace an element into the channel, returns immediately if there is no capacity in the channel.
*
* @param args constructor args
* @return true if emplacing the element succeeded
*/
template<typename ...Args>
bool try_emplace(Args &&...args) noexcept(std::is_nothrow_constructible_v<value_type, decltype(std::forward<Args>(args))...>) {
if (closed_.test(std::memory_order_acquire)) [[unlikely]] {
return false;
}

{
std::unique_lock lock{queue_mutex_};
if (queue_.size() >= max_cap_) {
return false;
}

queue_.emplace_back(std::forward<Args>(args)...);
}

queue_not_empty_.notify_one();
return true;
}

/**
* Push a single element into the channel, blocks if there is no capacity left in the channel.
*
* @param value the element to push
* @return true if pushing the element succeeded because the channel is not yet closed
*/
bool push(value_type const &value) noexcept(std::is_nothrow_copy_constructible_v<value_type>) {
return emplace(value);
}

/**
* Push a single element into the channel, returns immediately if there is no capacity in the channel.
*
* @param value the element to push
* @return true if pushing the element succeeded because the channel is not yet closed
*/
bool try_push(value_type const &value) noexcept(std::is_nothrow_copy_constructible_v<value_type>) {
return try_emplace(value);
}

/**
* Push a single element into the channel, blocks if there is no capacity left in the channel.
*
* @param value the element to push
* @return true if pushing the element succeeded because the channel is not yet closed
*/
bool push(value_type &&value) noexcept(std::is_nothrow_move_constructible_v<value_type>) {
return emplace(std::move(value));
}

/**
* Push a single element into the channel, returns immediately if there is no capacity in the channel.
*
* @param value the element to push
* @return true if pushing the element succeeded because the channel is not yet closed
*/
bool try_push(value_type &&value) noexcept(std::is_nothrow_move_constructible_v<value_type>) {
return try_emplace(std::move(value));
}

/**
* Try to get a (previously pushed) element from the channel.
* If there is no element available, blocks until there is one available or the channel is closed.
*
* @return std::nullopt if the channel was closed, an element otherwise
*/
[[nodiscard]] std::optional<value_type> pop() noexcept(std::is_nothrow_move_constructible_v<value_type>) {
std::unique_lock lock{queue_mutex_};
queue_not_empty_.wait(lock, [this]() noexcept { return !queue_.empty() || closed_.test(std::memory_order_acquire); });

if (queue_.empty()) [[unlikely]] {
// implies closed_ == true
return std::nullopt;
}

auto ret = std::move(queue_.front());
queue_.pop_front();

lock.unlock();
queue_not_full_.notify_one();
return ret;
}

/**
* Try to get a (previously pushed) element from the channel.
* Unlike pop(), if there is no element available, returns std::nullopt immediatly.
*/
[[nodiscard]] std::optional<value_type> try_pop() noexcept(std::is_nothrow_move_constructible_v<value_type>) {
std::unique_lock lock{queue_mutex_};
if (queue_.empty()) {
return std::nullopt;
}

auto ret = std::move(queue_.front());
queue_.pop_front();

lock.unlock();
queue_not_full_.notify_one();
return ret;
}

struct iterator {
using channel_type = channel;
using value_type = T;
using difference_type = std::ptrdiff_t;
using reference = T &;
using const_reference = T const &;
using pointer = typename channel_type::pointer;
using const_pointer = typename channel_type::const_pointer;
using iterator_category = std::input_iterator_tag;

private:
channel_type *chan_;
mutable std::optional<value_type> buf_; ///< this has to be mutable for this iterator to fullfill std::input_iterator

void advance() noexcept {
buf_ = chan_->pop();
}

public:
explicit iterator(channel_type *chan) noexcept : chan_{chan} {
advance();
}

iterator(iterator const &other) noexcept(std::is_nothrow_copy_constructible_v<value_type>)
: chan_{other.chan_},
buf_{other.buf_} {
}

iterator &operator=(iterator const &other) noexcept(std::is_nothrow_copy_assignable_v<value_type>) {
if (this == &other) {
return *this;
}

chan_ = other.chan_;
buf_ = other.buf_;
return *this;
}

iterator(iterator &&other) noexcept(std::is_nothrow_move_constructible_v<value_type>)
: chan_{other.chan_},
buf_{std::move(other.buf_)} {
}

iterator &operator=(iterator &&other) noexcept(std::is_nothrow_swappable_v<value_type>) {
assert(this != &other);
std::swap(chan_, other.chan_);
std::swap(buf_, other.buf_);
return *this;
}

reference operator*() noexcept {
return *buf_;
}

reference operator*() const noexcept {
return *buf_;
}

pointer operator->() noexcept {
return &*buf_;
}

pointer operator->() const noexcept {
return &*buf_;
}

iterator &operator++() noexcept {
advance();
return *this;
}

void operator++(int) noexcept {
advance();
}

bool operator==(std::default_sentinel_t) const noexcept {
return !buf_.has_value();
}
};

using sentinel = std::default_sentinel_t;

/**
* @return an iterator over all present and future elements of this channel
* @note iterator == end() is true once the channel is closed
*/
[[nodiscard]] iterator begin() noexcept {
return iterator{this};
}

[[nodiscard]] sentinel end() const noexcept {
return std::default_sentinel;
}
};

} // namespace dice::template_library

#endif // DICE_TEMPLATELIBRARY_MPSCCHANNEL_HPP
liss-h marked this conversation as resolved.
Show resolved Hide resolved
13 changes: 13 additions & 0 deletions include/dice/template-library/version.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#ifndef DICE_TEMPLATELIBRARY_VERSION_HPP
#define DICE_TEMPLATELIBRARY_VERSION_HPP

#include <array>

namespace dice::template_library {
inline constexpr char name[] = "dice-template-library";
inline constexpr char version[] = "1.5.1";
inline constexpr std::array<int, 3> version_tuple = {1, 5, 1};
inline constexpr int pobr_version = 1; ///< persisted object binary representation version
} // namespace dice::template_library

#endif // DICE_TEMPLATELIBRARY_VERSION_HPP
3 changes: 3 additions & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,6 @@ custom_add_test(tests_flex_array)

add_executable(tests_generator tests_generator.cpp)
custom_add_test(tests_generator)

add_executable(tests_channel tests_channel.cpp)
custom_add_test(tests_channel)
Loading
Loading