-
Notifications
You must be signed in to change notification settings - Fork 0
/
WorkQue.h
80 lines (75 loc) · 2.98 KB
/
WorkQue.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
#pragma once
#include "pch.h"
/// Splits the index range [0, itemsToProcess) into equally sized partitions
/// (one per worker) and runs them either in parallel or one after another.
///
/// The callable handed to schedule() is invoked as `process(part, i)`, where
/// `part` is the partition index and `i` is the item index. Items that do not
/// divide evenly among the workers are collected in a single "remainder" task
/// that runs on the calling thread after all partition tasks have finished.
///
/// Every task is stored as a `noexcept` closure, so an exception escaping
/// `process` terminates the program.
class WorkQue {
    using size_type = size_t;

    /// Clamps a thread count to at least one worker.
    /// std::thread::hardware_concurrency() may legitimately return 0, and a
    /// zero count would make calculate_partition_size() divide by zero.
    static constexpr size_type at_least_one(size_type count) noexcept {
        return count == 0 ? size_type{1} : count;
    }

    std::vector<std::function<void()>> tasks;
    std::function<void()> remainder;
    size_type number_of_threads = at_least_one(std::min(SUGGESTED_THREAD_COUNT, std::thread::hardware_concurrency()));
    size_type _partition_size = 0;

    /// Builds the closure shared by partition tasks and the remainder task:
    /// calls `process(part, i)` for every `i` in [start, end).
    template<class Callable>
    static auto make_range_task(Callable process, size_type start, size_type end, size_type part) {
        return [process, start, end, part]() noexcept {
            for (auto i = start; i < end; ++i) {
                std::invoke(process, part, i);
            }
        };
    }

    /// Appends one partition task covering [start, end) to the task list.
    template<class Callable>
    void schedule(const Callable& process, size_type start, size_type end, size_type part) {
        assert(start < end && part <= tasks.size());
        tasks.emplace_back(make_range_task(process, start, end, part));
    }

    /// Stores the task for the leftover range [start, end).
    /// Any previously stored remainder is overwritten.
    template<class Callable>
    void schedule_remainder(const Callable& process, size_type start, size_type end, size_type part) {
        assert(start < end && part <= tasks.size());
        remainder = make_range_task(process, start, end, part);
    }

public:
    /// Uses min(SUGGESTED_THREAD_COUNT, hardware_concurrency()) workers,
    /// but never fewer than one.
    WorkQue() noexcept = default;

    /// @param numberOfThreads number of partitions to create; 0 is treated as 1.
    explicit WorkQue(size_type numberOfThreads) noexcept : number_of_threads(at_least_one(numberOfThreads)) {}

    /// Computes, stores and returns the number of items per partition
    /// (`itemsToProcess / thread_count()`, rounded down).
    constexpr size_type calculate_partition_size(size_type itemsToProcess) noexcept {
        _partition_size = itemsToProcess / thread_count();
        return _partition_size;
    }

    /// @return the partition size computed by the last calculate_partition_size()
    ///         or schedule() call; 0 after construction or clear().
    constexpr size_type partition_size() const noexcept {
        return _partition_size;
    }

    /// @return the number of partitions schedule() creates; always >= 1.
    constexpr size_type thread_count() const noexcept {
        return number_of_threads;
    }

    /// Runs all partition tasks with the parallel execution policy, then the
    /// remainder task (if any) on the calling thread.
    void run_in_parallel() const noexcept {
        std::for_each(std::execution::par, tasks.begin(), tasks.end(),
            [](const auto& task) { std::invoke(task); }
        );
        if (remainder) {
            std::invoke(remainder);
        }
    }

    /// Runs all partition tasks in scheduling order on the calling thread,
    /// then the remainder task (if any).
    void run_sequentially() const noexcept {
        for (const auto& task : tasks) {
            std::invoke(task);
        }
        if (remainder) {
            std::invoke(remainder);
        }
    }

    /// Partitions [0, itemsToProcess) into thread_count() equal ranges and
    /// queues one task per range; leftover items go to the remainder task.
    ///
    /// When there are fewer items than workers the partition size is 0, no
    /// partition tasks are created and everything lands in the remainder.
    ///
    /// @param itemsToProcess total number of items.
    /// @param process        callable invoked as `process(part, i)`.
    template<class Callable>
    void schedule(size_type itemsToProcess, Callable process) {
        const size_type chunk = calculate_partition_size(itemsToProcess);
        size_type end = 0;
        if (chunk != 0) {
            for (size_type part = 0; part < thread_count(); ++part) {
                const auto start = part * chunk;
                end = start + chunk;
                schedule(process, start, end, part);
            }
        }
        if (end < itemsToProcess) {
            // The remainder reuses the partition index of the last queued task
            // (or 0 when there is none); it runs only after that task is done.
            const size_type part = tasks.empty() ? 0 : tasks.size() - 1;
            schedule_remainder(process, end, itemsToProcess, part);
        }
    }

    /// Drops all queued tasks and the remainder and resets the partition size.
    void clear() noexcept {
        tasks.clear();
        remainder = {};
        _partition_size = 0;
    }
};