diff --git a/include/umappp/Umap.hpp b/include/umappp/Umap.hpp
index 14b0945..1ca1a40 100644
--- a/include/umappp/Umap.hpp
+++ b/include/umappp/Umap.hpp
@@ -160,6 +160,11 @@ class Umap {
          * See `set_num_threads()`.
          */
         static constexpr int num_threads = 1;
+
+        /**
+         * See `set_parallel_optimization()`.
+         */
+        static constexpr bool parallel_optimization = false;
     };

private:
@@ -180,6 +185,7 @@ class Umap {
        Float repulsion_strength = Defaults::repulsion_strength;
        Float learning_rate = Defaults::learning_rate;
        int nthreads = Defaults::num_threads;
+       bool parallel_optimization = Defaults::parallel_optimization;
    };

    RuntimeParameters rparams;
@@ -359,6 +365,7 @@ class Umap {
     * @return A reference to this `Umap` object.
     *
     * This setting affects nearest neighbor detection (if an existing list of neighbors is not supplied in `initialize()` or `run()`) and spectral initialization.
+    * If `set_parallel_optimization()` is set to `true`, it will also affect the layout optimization, i.e., the gradient descent iterations.
     *
     * The `UMAPPP_CUSTOM_PARALLEL` macro can be set to a function that specifies a custom parallelization scheme.
     * This function should be a template that accepts three arguments:
@@ -381,6 +388,26 @@ class Umap {
        return *this;
    }

+    /**
+     * @param p Whether to enable parallel optimization.
+     * If `true`, the number of threads specified in `set_num_threads()` will also be used for the layout optimization step.
+     *
+     * @return A reference to this `Umap` object.
+     *
+     * By default, this is set to `false`, as increasing the number of threads is usually not cost-effective for layout optimization.
+     * Specifically, while CPU usage scales with the number of threads, the time spent does not decrease by the same factor.
+     * We also expect that the number of available CPUs is at least equal to the requested number of threads, otherwise contention will greatly degrade performance.
+     * Nonetheless, users can enable parallel optimization if cost is no issue; usually, a higher number of threads (above 4) is required to see a reduction in time.
+     *
+     * If the `UMAPPP_NO_PARALLEL_OPTIMIZATION` macro is defined, **umappp** will not be compiled with support for parallel optimization.
+     * This may be desirable in environments that have no support for threading or atomics, or to reduce the binary size if parallelization is not of interest.
+     * In such cases, enabling parallel optimization and calling `Status::run()` will raise an error.
+     */
+    Umap& set_parallel_optimization(bool p = Defaults::parallel_optimization) {
+        rparams.parallel_optimization = p;
+        return *this;
+    }
+
public:
    /**
     * @brief Status of the UMAP optimization iterations.
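For illustration, a minimal usage sketch of the new option; the helper and its values are hypothetical, `umappp::Umap<>` is assumed to default `Float` to `double`, and the call pattern mirrors the tests below:

    #include "umappp/Umap.hpp"
    #include <cstddef>
    #include <vector>

    // Hypothetical helper: 'data' holds a column-major matrix of 'ndim' rows
    // (dimensions) and 'nobs' columns (observations).
    std::vector<double> embed(int ndim, std::size_t nobs, const double* data) {
        umappp::Umap<> runner;
        runner.set_num_threads(4);               // used for neighbor search and initialization...
        runner.set_parallel_optimization(true);  // ...and now for the layout optimization as well.
        std::vector<double> embedding(ndim * nobs);
        runner.run(ndim, nobs, data, ndim, embedding.data());
        return embedding;
    }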
@@ -448,17 +475,32 @@ class Umap {
         *
         */
        void run(int epoch_limit = 0) {
-            optimize_layout(
-                ndim_,
-                embedding_,
-                epochs,
-                rparams.a,
-                rparams.b,
-                rparams.repulsion_strength,
-                rparams.learning_rate,
-                engine,
-                epoch_limit
-            );
+            if (rparams.nthreads == 1 || !rparams.parallel_optimization) {
+                optimize_layout(
+                    ndim_,
+                    embedding_,
+                    epochs,
+                    rparams.a,
+                    rparams.b,
+                    rparams.repulsion_strength,
+                    rparams.learning_rate,
+                    engine,
+                    epoch_limit
+                );
+            } else {
+                optimize_layout_parallel(
+                    ndim_,
+                    embedding_,
+                    epochs,
+                    rparams.a,
+                    rparams.b,
+                    rparams.repulsion_strength,
+                    rparams.learning_rate,
+                    engine,
+                    epoch_limit,
+                    rparams.nthreads
+                );
+            }
            return;
        }
    };
diff --git a/include/umappp/optimize_layout.hpp b/include/umappp/optimize_layout.hpp
index f6787bb..733adb5 100644
--- a/include/umappp/optimize_layout.hpp
+++ b/include/umappp/optimize_layout.hpp
@@ -5,6 +5,11 @@
 #include <limits>
 #include <algorithm>
 #include <cmath>
+#include <stdexcept>
+#ifndef UMAPPP_NO_PARALLEL_OPTIMIZATION
+#include <thread>
+#include <atomic>
+#endif

 #include "NeighborList.hpp"
 #include "aarand/aarand.hpp"
@@ -85,27 +90,9 @@ Float clamp(Float input) {
     return std::min(std::max(input, min_gradient), max_gradient);
 }

-template<typename Float, class Setup, class Rng>
-void optimize_sample(
-    size_t i,
-    int ndim,
-    Float* embedding,
-    Float* buffer,
-    Setup& setup,
-    Float a,
-    Float b,
-    Float gamma,
-    Float alpha,
-    Rng& rng,
-    Float epoch
-) {
-    const auto& head = setup.head;
-    const auto& tail = setup.tail;
-    const auto& epochs_per_sample = setup.epochs_per_sample;
-    auto& epoch_of_next_sample = setup.epoch_of_next_sample;
-    auto& epoch_of_next_negative_sample = setup.epoch_of_next_negative_sample;
-
-}
+/*****************************************************
+ ***************** Serial code ***********************
+ *****************************************************/

 template<typename Float, class Setup, class Rng>
 void optimize_layout(
@@ -188,6 +175,397 @@ void optimize_layout(
     return;
 }

+/*****************************************************
+ **************** Parallel code **********************
+ *****************************************************/
+
+#ifndef UMAPPP_NO_PARALLEL_OPTIMIZATION
+template<typename Float, class Setup>
+struct BusyWaiterThread {
+public:
+    std::vector<size_t> selections;
+    std::vector<unsigned char> skips;
+    size_t observation;
+    Float alpha;
+
+private:
+    int ndim;
+    Float* embedding;
+    const Setup* setup;
+    Float a;
+    Float b;
+    Float gamma;
+
+    std::vector<Float> self_modified;
+
+private:
+    std::thread pool;
+    std::atomic<bool> ready = false;
+    bool finished = false;
+    bool active = false;
+
+public:
+    void run() {
+        ready.store(true, std::memory_order_release);
+    }
+
+    void wait() {
+        while (ready.load(std::memory_order_acquire)) {
+            ;
+        }
+    }
+
+    void migrate_parameters(BusyWaiterThread& src) {
+        selections.swap(src.selections);
+        skips.swap(src.skips);
+        alpha = src.alpha;
+        observation = src.observation;
+    }
+
+    void transfer_coordinates() {
+        std::copy(self_modified.begin(), self_modified.end(), embedding + observation * ndim);
+    }
+
+public:
+    void run_direct() {
+        auto seIt = selections.begin();
+        auto skIt = skips.begin();
+        const size_t i = observation;
+        const size_t start = (i == 0 ? 0 : setup->head[i-1]), end = setup->head[i];
+
+        // Copying the self coordinates into a thread-local buffer to avoid false sharing.
+        // We don't bother doing this for the neighbors, though, as it's
+        // tedious to make sure that the modified values are available during negative sampling.
+        // (This isn't a problem for the self, as the self cannot be its own negative sample.)
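+        // Note that the in-place updates to the neighbor coordinates below are safe:
+        // the scheduling loop in optimize_layout_parallel() never submits a job
+        // whose observations conflict with those of a job already in flight.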
+        {
+            const Float* left = embedding + i * ndim;
+            std::copy(left, left + ndim, self_modified.data());
+        }
+
+        for (size_t j = start; j < end; ++j) {
+            if (*(skIt++)) {
+                continue;
+            }
+
+            {
+                Float* left = self_modified.data();
+                Float* right = embedding + setup->tail[j] * ndim;
+
+                Float dist2 = quick_squared_distance(left, right, ndim);
+                const Float pd2b = std::pow(dist2, b);
+                const Float grad_coef = (-2 * a * b * pd2b) / (dist2 * (a * pd2b + 1.0));
+
+                for (int d = 0; d < ndim; ++d, ++left, ++right) {
+                    Float gradient = alpha * clamp(grad_coef * (*left - *right));
+                    *left += gradient;
+                    *right -= gradient;
+                }
+            }
+
+            while (seIt != selections.end() && *seIt != -1) {
+                Float* left = self_modified.data();
+                const Float* right = embedding + (*seIt) * ndim;
+
+                Float dist2 = quick_squared_distance(left, right, ndim);
+                const Float grad_coef = 2 * gamma * b / ((0.001 + dist2) * (a * std::pow(dist2, b) + 1.0));
+
+                for (int d = 0; d < ndim; ++d, ++left, ++right) {
+                    *left += alpha * clamp(grad_coef * (*left - *right));
+                }
+                ++seIt;
+            }
+            ++seIt; // get past the -1 sentinel.
+        }
+    }
+
+private:
+    void loop() {
+        while (true) {
+            while (!ready.load(std::memory_order_acquire)) {
+                ;
+            }
+            if (finished) {
+                break;
+            }
+            run_direct();
+            ready.store(false, std::memory_order_release);
+        }
+    }
+
+public:
+    BusyWaiterThread() {}
+
+    BusyWaiterThread(int ndim_, Float* embedding_, Setup& setup_, Float a_, Float b_, Float gamma_) :
+        ndim(ndim_),
+        embedding(embedding_),
+        setup(&setup_),
+        a(a_),
+        b(b_),
+        gamma(gamma_),
+        self_modified(ndim)
+    {}
+
+    void start() {
+        active = true;
+        pool = std::thread(&BusyWaiterThread::loop, this);
+    }
+
+public:
+    ~BusyWaiterThread() {
+        if (active) {
+            finished = true;
+            ready.store(true, std::memory_order_release);
+            pool.join();
+        }
+    }
+
+    BusyWaiterThread(BusyWaiterThread&&) = default;
+    BusyWaiterThread& operator=(BusyWaiterThread&&) = default;
+
+    BusyWaiterThread(const BusyWaiterThread& src) :
+        selections(src.selections),
+        skips(src.skips),
+        observation(src.observation),
+        alpha(src.alpha),
+
+        ndim(src.ndim),
+        embedding(src.embedding),
+        setup(src.setup),
+        a(src.a),
+        b(src.b),
+        gamma(src.gamma),
+
+        self_modified(src.self_modified)
+    {}
+
+    BusyWaiterThread& operator=(const BusyWaiterThread& src) {
+        selections = src.selections;
+        skips = src.skips;
+        observation = src.observation;
+        alpha = src.alpha;
+
+        ndim = src.ndim;
+        embedding = src.embedding;
+        setup = src.setup;
+        a = src.a;
+        b = src.b;
+        gamma = src.gamma;
+
+        self_modified = src.self_modified;
+        return *this;
+    }
+};
+#endif
+
+//#define PRINT false
+
+template<typename Float, class Setup, class Rng>
+void optimize_layout_parallel(
+    int ndim,
+    Float* embedding,
+    Setup& setup,
+    Float a,
+    Float b,
+    Float gamma,
+    Float initial_alpha,
+    Rng& rng,
+    int epoch_limit,
+    int nthreads
+) {
+#ifndef UMAPPP_NO_PARALLEL_OPTIMIZATION
+    auto& n = setup.current_epoch;
+    auto num_epochs = setup.total_epochs;
+    auto limit_epochs = num_epochs;
+    if (epoch_limit > 0) {
+        limit_epochs = std::min(epoch_limit, num_epochs);
+    }
+
+    const size_t num_obs = setup.head.size();
+    std::vector<int> last_touched(num_obs);
+    std::vector<unsigned char> touch_type(num_obs);
+
+    // We run some things directly in this main thread to avoid excessive busy-waiting.
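+    // The 'staging' object is always filled on the main thread; in particular,
+    // all RNG draws happen in this serial section, so results are reproducible
+    // for any number of threads. Each job is then migrated to a pool thread, or
+    // (if it is the last job of a batch) executed directly on the main thread.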
+    BusyWaiterThread<Float, Setup> staging(ndim, embedding, setup, a, b, gamma);
+
+    int nthreadsm1 = nthreads - 1;
+    std::vector<BusyWaiterThread<Float, Setup> > pool;
+    pool.reserve(nthreadsm1);
+    for (int t = 0; t < nthreadsm1; ++t) {
+        pool.emplace_back(ndim, embedding, setup, a, b, gamma);
+        pool.back().start();
+    }
+
+    std::vector<int> jobs_in_progress;
+
+    for (; n < limit_epochs; ++n) {
+        const Float epoch = n;
+        const Float alpha = initial_alpha * (1.0 - epoch / num_epochs);
+
+        int base_iteration = 0;
+        std::fill(last_touched.begin(), last_touched.end(), -1);
+
+        size_t i = 0;
+        while (i < num_obs) {
+            bool is_clear = true;
+//            if (PRINT) { std::cout << "size is " << jobs_in_progress.size() << std::endl; }
+
+            for (int t = jobs_in_progress.size(); t < nthreads && i < num_obs; ++t) {
+                staging.alpha = alpha;
+                staging.observation = i;
+
+                // Tapping the RNG here in the serial section.
+                auto& selections = staging.selections;
+                selections.clear();
+                auto& skips = staging.skips;
+                skips.clear();
+
+                const int self_iteration = i;
+                constexpr unsigned char READONLY = 0;
+                constexpr unsigned char WRITE = 1;
+
+                {
+                    auto& touched = last_touched[i];
+                    auto& ttype = touch_type[i];
+//                    if (PRINT) { std::cout << "SELF: " << i << ": " << touched << " (" << ttype << ")" << std::endl; }
+                    if (touched >= base_iteration) {
+                        is_clear = false;
+//                        if (PRINT) { std::cout << "=== FAILED! ===" << std::endl; }
+                    }
+                    touched = self_iteration;
+                    ttype = WRITE;
+                }
+
+                const size_t start = (i == 0 ? 0 : setup.head[i-1]), end = setup.head[i];
+                for (size_t j = start; j < end; ++j) {
+                    bool skip = setup.epoch_of_next_sample[j] > epoch;
+                    skips.push_back(skip);
+                    if (skip) {
+                        continue;
+                    }
+
+                    {
+                        auto neighbor = setup.tail[j];
+                        auto& touched = last_touched[neighbor];
+                        auto& ttype = touch_type[neighbor];
+//                        if (PRINT) { std::cout << "\tNEIGHBOR: " << neighbor << ": " << touched << " (" << ttype << ")" << std::endl; }
+                        if (touched >= base_iteration) {
+                            if (touched != self_iteration) {
+                                is_clear = false;
+//                                if (PRINT) { std::cout << "=== FAILED! ===" << std::endl; }
+                            }
+                        }
+                        touched = self_iteration;
+                        ttype = WRITE;
+                    }
+
+                    const size_t num_neg_samples = (epoch - setup.epoch_of_next_negative_sample[j]) *
+                        setup.negative_sample_rate / setup.epochs_per_sample[j];
+
+                    for (size_t p = 0; p < num_neg_samples; ++p) {
+                        size_t sampled = aarand::discrete_uniform(rng, num_obs);
+                        if (sampled == i) {
+                            continue;
+                        }
+                        selections.push_back(sampled);
+
+                        auto& touched = last_touched[sampled];
+                        auto& ttype = touch_type[sampled];
+//                        if (PRINT) { std::cout << "\t\tSAMPLED: " << sampled << ": " << touched << " (" << ttype << ")" << std::endl; }
+                        if (touched >= base_iteration) {
+                            if (touched != self_iteration) {
+                                if (ttype == WRITE) {
+                                    is_clear = false;
+//                                    if (PRINT) { std::cout << "=== FAILED! ===" << std::endl; }
+                                }
+                            }
+                        } else {
+                            // Only updating if it wasn't touched by a previous thread in this
+                            // round of thread iterations.
+                            ttype = READONLY;
+                            touched = self_iteration;
+                        }
+                    }
+
+                    selections.push_back(-1);
+
+                    setup.epoch_of_next_sample[j] += setup.epochs_per_sample[j];
+                    setup.epoch_of_next_negative_sample[j] = epoch;
+                }
+
+                if (!is_clear) {
+                    // As we only updated the access for 'sampled' to READONLY
+                    // if they weren't touched by another thread, we need to go
+                    // through and manually update them now that the next round
+                    // of thread iterations will use 'self_iteration' as the
+                    // 'base_iteration'.
+                    // This ensures that the flags are properly set for the next
+                    // round, under the expectation that the pending thread becomes
+                    // the first thread.
+                    for (auto s : selections) {
+                        if (s != -1) {
+                            auto& touched = last_touched[s];
+                            if (touched != self_iteration) {
+                                touched = self_iteration;
+                                touch_type[s] = READONLY;
+                            }
+                        }
+                    }
+                    break;
+                }
+
+                // Submitting the job if it's not the final one, otherwise just running
+                // it directly. This avoids a busy-wait on the main thread that would
+                // otherwise use up an extra CPU.
+                if (t < nthreadsm1) {
+                    const int thread_index = i % nthreadsm1;
+                    pool[thread_index].migrate_parameters(staging);
+                    pool[thread_index].run();
+                    jobs_in_progress.push_back(thread_index);
+                } else {
+                    staging.run_direct();
+                    staging.transfer_coordinates();
+                }
+
+                ++i;
+            }
+
+            // Waiting for all the jobs that were submitted.
+            for (auto job : jobs_in_progress) {
+                pool[job].wait();
+                pool[job].transfer_coordinates();
+            }
+            jobs_in_progress.clear();
+
+//            if (PRINT) { std::cout << "###################### OK ##########################" << std::endl; }
+
+            base_iteration = i;
+            if (!is_clear) {
+                const int thread_index = i % nthreadsm1;
+                pool[thread_index].migrate_parameters(staging);
+                pool[thread_index].run();
+                jobs_in_progress.push_back(thread_index);
+                ++i;
+            }
+        }
+
+        for (auto job : jobs_in_progress) {
+            pool[job].wait();
+            pool[job].transfer_coordinates();
+        }
+        jobs_in_progress.clear();
+    }
+
+    return;
+#else
+    throw std::runtime_error("umappp was not compiled with support for parallel optimization");
+#endif
+}
+
 }

 #endif
diff --git a/tests/src/Umap.cpp b/tests/src/Umap.cpp
index 4169efb..6a13ef1 100644
--- a/tests/src/Umap.cpp
+++ b/tests/src/Umap.cpp
@@ -63,7 +63,13 @@ TEST_P(UmapTest, Basic) {
     EXPECT_EQ(copy, output);

     // Same results with multiple threads.
-    runner.set_num_threads(3);
+    runner.set_num_threads(2);
+    std::fill(copy.begin(), copy.end(), 0);
+    runner.run(ndim, nobs, data.data(), ndim, copy.data());
+    EXPECT_EQ(copy, output);
+
+    // Same results with multiple threads and parallel optimization enabled.
+    runner.set_parallel_optimization(true);
     std::fill(copy.begin(), copy.end(), 0);
     runner.run(ndim, nobs, data.data(), ndim, copy.data());
     EXPECT_EQ(copy, output);
diff --git a/tests/src/optimize_layout.cpp b/tests/src/optimize_layout.cpp
index ba2f2a9..429cdcb 100644
--- a/tests/src/optimize_layout.cpp
+++ b/tests/src/optimize_layout.cpp
@@ -96,6 +96,28 @@ TEST_P(OptimizeTest, RestartedRun) {
     EXPECT_EQ(embedding, embedding2);
 }

+TEST_P(OptimizeTest, ParallelRun) {
+    assemble(GetParam());
+    auto epoch = umappp::similarities_to_epochs(stored, 500, 5.0);
+    auto epoch2 = epoch;
+
+    std::vector<double> embedding(data);
+    {
+        std::mt19937_64 rng(100);
+        umappp::optimize_layout<>(5, embedding.data(), epoch, 2.0, 1.0, 1.0, 1.0, rng, 0);
+    }
+
+    // Trying with two threads.
+    std::vector<double> embedding2(data);
+    {
+        std::mt19937_64 rng(100);
+        umappp::optimize_layout_parallel<>(5, embedding2.data(), epoch2, 2.0, 1.0, 1.0, 1.0, rng, 0, 2);
+    }
+
+    EXPECT_NE(data, embedding); // some kind of change happened!
+    EXPECT_EQ(embedding, embedding2);
+}
+
 INSTANTIATE_TEST_SUITE_P(
     Optimize,
     OptimizeTest,