Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] generalize tbb's thread_pool_base #1403

Merged
merged 13 commits into from
Oct 9, 2024
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ compile_commands.json
/*.cpp
/*.patch
/*.diff
/include/execpools/asio/asio_config.hpp
callgrind.*
*.pbf
*.o
a.out
*.code-workspace
*.code-workspace
88 changes: 83 additions & 5 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -330,18 +330,96 @@ if (STDEXEC_ENABLE_TBB)
INSTALL_EXPORT_SET stdexec-exports
)

file(GLOB_RECURSE tbbexec_sources include/tbbexec/*.hpp)
add_library(tbbexec INTERFACE ${tbbexec_sources})
list(APPEND stdexec_export_targets tbbexec)
add_library(STDEXEC::tbbexec ALIAS tbbexec)
file(GLOB_RECURSE tbbpool_sources include/execpools/tbb/*.hpp)
add_library(tbbpool INTERFACE ${tbbpool_sources})
list(APPEND stdexec_export_targets tbbpool)
add_library(STDEXEC::tbbpool ALIAS tbbpool)

target_link_libraries(tbbexec
target_link_libraries(tbbpool
INTERFACE
STDEXEC::stdexec
TBB::tbb
)
endif()

option(STDEXEC_ENABLE_TASKFLOW "Enable TaskFlow targets" OFF)

if(STDEXEC_ENABLE_TASKFLOW)
include(rapids-find)
rapids_cpm_find(Taskflow 3.7.0
CPM_ARGS
GITHUB_REPOSITORY taskflow/taskflow
GIT_TAG v3.7.0
)
file(GLOB_RECURSE taskflow_pool include/execpools/taskflow/*.hpp)
add_library(taskflow_pool INTERFACE ${taskflowexec_sources})
list(APPEND stdexec_export_targets taskflow_pool)
add_library(STDEXEC::taskflow_pool ALIAS taskflow_pool)

target_link_libraries(taskflow_pool
INTERFACE
STDEXEC::stdexec
Taskflow
)
endif()

option(STDEXEC_ENABLE_ASIO "Enable Boost targets" OFF)
set(STDEXEC_ASIO_IMPLEMENTATION "boost" CACHE STRING "boost")
set_property(CACHE STDEXEC_ASIO_IMPLEMENTATION PROPERTY STRINGS boost standalone)

if(STDEXEC_ENABLE_ASIO)
set(STDEXEC_ASIO_USES_ASIO FALSE)
set(STDEXEC_ASIO_USES_STANDALONE FALSE)

include(rapids-find)
if(${STDEXEC_ASIO_IMPLEMENTATION} STREQUAL "boost")
set(STDEXEC_ASIO_USES_BOOST TRUE)
elseif(${STDEXEC_ASIO_IMPLEMENTATION} STREQUAL "standalone")
set(STDEXEC_ASIO_USES_STANDALONE TRUE)
else()
message(FATAL_ERROR "Unknown configuration for ASIO implementation: " ${STDEXEC_ASIO_IMPLEMENTATION})
endif()

file(GLOB_RECURSE boost_pool_sources include/execpools/asio/*.hpp)
set(STDEXEC_ASIO_CONFIG_FILE_PATH ${CMAKE_CURRENT_SOURCE_DIR}/include/execpools/asio)
configure_file(include/execpools/asio/asio_config.hpp.in ${STDEXEC_ASIO_CONFIG_FILE_PATH}/asio_config.hpp)

if(${STDEXEC_ASIO_USES_BOOST})
set(BOOST_ENABLE_COMPATIBILITY_TARGETS TRUE)
rapids_cpm_find(Boost 1.86.0
CPM_ARGS
GITHUB_REPOSITORY boostorg/boost
GIT_TAG boost-1.86.0
)
add_library(stdexec_boost_pool INTERFACE ${boost_pool_sources})
list(APPEND stdexec_export_targets stdexec_boost_pool)
add_library(STDEXEC::asio_pool ALIAS stdexec_boost_pool)

target_link_libraries(stdexec_boost_pool
INTERFACE
STDEXEC::stdexec
Boost::boost
)
elseif(${STDEXEC_ASIO_USES_STANDALONE})
include(cmake/import_standalone_asio.cmake)
import_standalone_asio(
TAG "asio-1-31-0"
VERSION "1.31.0")

add_library(stdexec_asio_pool INTERFACE ${boost_pool_sources})
list(APPEND stdexec_export_targets stdexec_asio_pool)
add_library(STDEXEC::asio_pool ALIAS stdexec_asio_pool)

target_link_libraries(stdexec_asio_pool
INTERFACE
STDEXEC::stdexec
asio
)
else()
message(FATAL_ERROR "ASIO implementation is not configured")
endif()
endif()

include(CheckIncludeFileCXX)
if (CMAKE_SYSTEM_NAME STREQUAL "Darwin")
CHECK_INCLUDE_FILE_CXX("dispatch/dispatch.h" STDEXEC_FOUND_LIBDISPATCH)
Expand Down
63 changes: 63 additions & 0 deletions cmake/import_standalone_asio.cmake
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# - This function imports the standalone version of ASIO
#
# Importing standalone asio can't be done via rapids-cpm, because the library has no cmake
# build setup. But still it can be imported with CPM.
#
# This function is based on the CPM example: https://github.com/cpm-cmake/CPM.cmake/blob/master/examples/asio-standalone/CMakeLists.txt
#
# import_standalone_asio([TAG github-tag] VERSION [version-stirng])
#
function(import_standalone_asio)
set(options "")
set(args TAG VERSION)
set(multi_args "")
cmake_parse_arguments(IMPORT_STANDALONE_ASIO "${options}" "${args}" "${multi_args}")

CPMAddPackage("gh:chriskohlhoff/asio#${IMPORT_STANDALONE_ASIO_TAG}@${IMPORT_STANDALONE_ASIO_VERSION}")

# ASIO doesn't use CMake, we have to configure it manually. Extra notes for using on Windows:
#
# 1) If _WIN32_WINNT is not set, ASIO assumes _WIN32_WINNT=0x0501, i.e. Windows XP target, which is
# definitely not the platform which most users target.
#
# 2) WIN32_LEAN_AND_MEAN is defined to make Winsock2 work.
if(asio_ADDED)
add_library(asio INTERFACE)

target_include_directories(asio SYSTEM INTERFACE ${asio_SOURCE_DIR}/asio/include)

target_compile_definitions(asio INTERFACE ASIO_STANDALONE ASIO_NO_DEPRECATED)

target_link_libraries(asio INTERFACE Threads::Threads)

if(WIN32)
# macro see @ https://stackoverflow.com/a/40217291/1746503
macro(get_win32_winnt version)
if(CMAKE_SYSTEM_VERSION)
set(ver ${CMAKE_SYSTEM_VERSION})
string(REGEX MATCH "^([0-9]+).([0-9])" ver ${ver})
string(REGEX MATCH "^([0-9]+)" verMajor ${ver})
# Check for Windows 10, b/c we'll need to convert to hex 'A'.
if("${verMajor}" MATCHES "10")
set(verMajor "A")
string(REGEX REPLACE "^([0-9]+)" ${verMajor} ver ${ver})
endif("${verMajor}" MATCHES "10")
# Remove all remaining '.' characters.
string(REPLACE "." "" ver ${ver})
# Prepend each digit with a zero.
string(REGEX REPLACE "([0-9A-Z])" "0\\1" ver ${ver})
set(${version} "0x${ver}")
endif()
endmacro()

if(NOT DEFINED _WIN32_WINNT)
get_win32_winnt(ver)
set(_WIN32_WINNT ${ver})
endif()

message(STATUS "Set _WIN32_WINNET=${_WIN32_WINNT}")

target_compile_definitions(asio INTERFACE _WIN32_WINNT=${_WIN32_WINNT} WIN32_LEAN_AND_MEAN)
endif()
endif()
endfunction()
18 changes: 14 additions & 4 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,21 @@ endif()

if (STDEXEC_ENABLE_TBB)
add_executable(example.benchmark.tbb_thread_pool benchmark/tbb_thread_pool.cpp)
target_link_libraries(example.benchmark.tbb_thread_pool PRIVATE STDEXEC::tbbexec)
target_link_libraries(example.benchmark.tbb_thread_pool PRIVATE STDEXEC::tbbpool)

add_executable(example.benchmark.tbb_thread_pool_nested benchmark/tbb_thread_pool_nested.cpp)
target_link_libraries(example.benchmark.tbb_thread_pool_nested PRIVATE STDEXEC::tbbexec)
target_link_libraries(example.benchmark.tbb_thread_pool_nested PRIVATE STDEXEC::tbbpool)

add_executable(example.benchmark.fibonacci benchmark/fibonacci.cpp)
target_link_libraries(example.benchmark.fibonacci PRIVATE STDEXEC::tbbexec)
endif()
target_link_libraries(example.benchmark.fibonacci PRIVATE STDEXEC::tbbpool)
endif()

if(STDEXEC_ENABLE_TASKFLOW)
add_executable(example.benchmark.taskflow_thread_pool benchmark/taskflow_thread_pool.cpp)
target_link_libraries(example.benchmark.taskflow_thread_pool PRIVATE STDEXEC::taskflow_pool)
endif()

if(STDEXEC_ENABLE_ASIO)
add_executable(example.benchmark.asio_thread_pool benchmark/asio_thread_pool.cpp)
target_link_libraries(example.benchmark.asio_thread_pool PRIVATE STDEXEC::asio_pool)
endif()
89 changes: 89 additions & 0 deletions examples/benchmark/asio_thread_pool.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright (c) 2023 Maikel Nadolski
* Copyright (c) 2023 NVIDIA Corporation
*
* Licensed under the Apache License Version 2.0 with LLVM Exceptions
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://llvm.org/LICENSE.txt
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "./common.hpp"
#include <execpools/asio/asio_thread_pool.hpp>

struct RunThread {
void operator()(
execpools::asio_thread_pool& pool,
std::size_t total_scheds,
std::size_t tid,
std::barrier<>& barrier,
#ifndef STDEXEC_NO_MONOTONIC_BUFFER_RESOURCE
std::span<char> buffer,
#endif
std::atomic<bool>& stop,
exec::numa_policy numa) {
int numa_node = numa.thread_index_to_node(tid);
numa.bind_to_node(numa_node);
auto scheduler = pool.get_scheduler();
std::mutex mut;
std::condition_variable cv;
while (true) {
barrier.arrive_and_wait();
if (stop.load()) {
break;
}
#ifndef STDEXEC_NO_MONOTONIC_BUFFER_RESOURCE
pmr::monotonic_buffer_resource resource{
buffer.data(), buffer.size(), pmr::null_memory_resource()};
pmr::polymorphic_allocator<char> alloc(&resource);
auto [start, end] = exec::_pool_::even_share(total_scheds, tid, pool.available_parallelism());
std::size_t scheds = end - start;
std::atomic<std::size_t> counter{scheds};
auto env = exec::make_env(stdexec::prop{stdexec::get_allocator, alloc});
while (scheds) {
stdexec::start_detached( //
stdexec::schedule(scheduler) //
| stdexec::then([&] {
auto prev = counter.fetch_sub(1);
if (prev == 1) {
std::lock_guard lock{mut};
cv.notify_one();
}
}),
env);
--scheds;
}
#else
auto [start, end] = exec::_pool_::even_share(total_scheds, tid, pool.available_parallelism());
std::size_t scheds = end - start;
std::atomic<std::size_t> counter{scheds};
while (scheds) {
stdexec::start_detached( //
stdexec::schedule(scheduler) //
| stdexec::then([&] {
auto prev = counter.fetch_sub(1);
if (prev == 1) {
std::lock_guard lock{mut};
cv.notify_one();
}
}));
--scheds;
}
#endif
std::unique_lock lock{mut};
cv.wait(lock, [&] { return counter.load() == 0; });
lock.unlock();
barrier.arrive_and_wait();
}
}
};

int main(int argc, char** argv) {
my_main<execpools::asio_thread_pool, RunThread>(argc, argv);
}
6 changes: 3 additions & 3 deletions examples/benchmark/fibonacci.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
#include <cstdlib>
#include <iostream>

#include <tbbexec/tbb_thread_pool.hpp>
#include <execpools/tbb/tbb_thread_pool.hpp>
#include <exec/static_thread_pool.hpp>

#include <exec/any_sender_of.hpp>
Expand Down Expand Up @@ -100,10 +100,10 @@ int main(int argc, char** argv) {
return -1;
}

std::variant<tbbexec::tbb_thread_pool, exec::static_thread_pool> pool;
std::variant<execpools::tbb_thread_pool, exec::static_thread_pool> pool;

if (argv[4] == std::string_view("tbb")) {
pool.emplace<tbbexec::tbb_thread_pool>(static_cast<int>(std::thread::hardware_concurrency()));
pool.emplace<execpools::tbb_thread_pool>(static_cast<int>(std::thread::hardware_concurrency()));
} else {
pool.emplace<exec::static_thread_pool>(
std::thread::hardware_concurrency(), exec::bwos_params{}, exec::get_numa_policy());
Expand Down
Loading