Skip to content

Commit

Permalink
Fixed HPX barrier synchronization (#783)
Browse files Browse the repository at this point in the history
Details:
- Fixed HPX barrier synchronization. HPX was hanging at larger core
  counts because BLIS was using non-HPX synchronization primitives. When
  running on the HPX runtime, only HPX synchronization primitives should
  be used. Hence, a C-style wrapper, hpx_barrier_t, is introduced to
  perform HPX barrier operations.
- Replaced hpx::for_loop with hpx::futures. Using hpx::for_loop with
  hpx::barrier when n_threads is greater than the actual hardware thread
  count causes synchronization issues that make HPX hang. This can be
  avoided by using hpx::futures, which are relatively lightweight,
  robust, and scalable.
  • Loading branch information
srinivasyadav18 authored Oct 14, 2023
1 parent 8fff1e3 commit 7a87e57
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 39 deletions.
15 changes: 12 additions & 3 deletions frame/thread/bli_thrcomm.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,17 @@ typedef struct barrier_s barrier_t;
#endif
#endif

// Define hpx_barrier_t, which is specific to the barrier used in the HPX
// implementation. This needs to be done first since it is (potentially)
// used within the definition of thrcomm_t below.

#ifdef BLIS_ENABLE_HPX
// Wrapper type for the barrier used by the HPX threading backend. Only
// compiled in when BLIS_ENABLE_HPX is defined.
typedef struct hpx_barrier_t
{
// Opaque pointer to the underlying barrier object. The HPX backend
// (bli_thrcomm_hpx.cpp) stores a heap-allocated hpx::barrier<> here and
// casts the pointer back before using or deleting it.
void* handle;
} hpx_barrier_t;
#endif

// Define the thrcomm_t structure, which will be common to all threading
// implementations.

Expand Down Expand Up @@ -124,9 +135,7 @@ typedef struct thrcomm_s
// -- Fields specific to HPX --

#ifdef BLIS_ENABLE_HPX
#ifdef BLIS_USE_HPX_BARRIER
hpx::barrier<> * barrier;
#endif
hpx_barrier_t barrier;
#endif

} thrcomm_t;
Expand Down
53 changes: 24 additions & 29 deletions frame/thread/bli_thrcomm_hpx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,43 +36,36 @@

#ifdef BLIS_ENABLE_HPX

#include <hpx/synchronization/barrier.hpp>
extern "C" {

#ifdef BLIS_USE_HPX_BARRIER

// Define the pthread_barrier_t implementations of the init, cleanup, and
// barrier functions.

void bli_thrcomm_init_hpx( dim_t n_threads, thrcomm_t* comm )
void hpx_barrier_init( hpx_barrier_t* barrier, dim_t n_threads )
{
if ( comm == nullptr ) return;

//comm->sent_object = nullptr;
//comm->n_threads = n_threads;
comm->ti = BLIS_HPX;
//comm->barrier_sense = 0;
//comm->barrier_threads_arrived = 0;

comm->barrier = new hpx:barrier<>();
if ( barrier == nullptr ) return;
barrier->handle = new hpx::barrier<>( n_threads );
}

void bli_thrcomm_cleanup_hpx( thrcomm_t* comm )
void hpx_barrier_destroy( hpx_barrier_t* barrier )
{
if ( comm == nullptr ) return;
if ( barrier == nullptr ) return;

delete comm->barrier;
}
auto* barrier_ = reinterpret_cast<hpx::barrier<>*>( barrier->handle );
barrier->handle = nullptr;

void bli_thrcomm_barrier( dim_t t_id, thrcomm_t* comm )
{
comm->barrier->arrive_and_wait();
delete barrier_;
}

#else
// Calls arrive_and_wait() on the hpx::barrier<> wrapped by `barrier`.
// The call is a no-op when either the wrapper pointer or the handle stored
// inside it is null.
void hpx_barrier_arrive_and_wait( hpx_barrier_t* barrier )
{
	if ( barrier != nullptr && barrier->handle != nullptr )
	{
		// The handle is stored as a void*; recover the typed barrier pointer.
		auto* hpx_bar = reinterpret_cast<hpx::barrier<>*>( barrier->handle );
		hpx_bar->arrive_and_wait();
	}
}

void bli_thrcomm_init_hpx( dim_t n_threads, thrcomm_t* comm )
{
Expand All @@ -81,22 +74,24 @@ void bli_thrcomm_init_hpx( dim_t n_threads, thrcomm_t* comm )
comm->sent_object = nullptr;
comm->n_threads = n_threads;
comm->ti = BLIS_HPX;
comm->barrier_sense = 0;
comm->barrier_threads_arrived = 0;
// comm->barrier_sense = 0;
// comm->barrier_threads_arrived = 0;

hpx_barrier_init( &comm->barrier, n_threads );
}

// Cleanup hook for an HPX communicator: passes the communicator's barrier
// wrapper to hpx_barrier_destroy(). A null communicator is ignored.
void bli_thrcomm_cleanup_hpx( thrcomm_t* comm )
{
	if ( comm != nullptr )
	{
		hpx_barrier_destroy( &comm->barrier );
	}
}

// Barrier entry point for the HPX threading backend.
//
// Synchronizes the caller on the communicator's HPX barrier only. The
// generic atomic barrier (bli_thrcomm_barrier_atomic) must not be used here:
// it is not an HPX synchronization primitive, and mixing it with the HPX
// runtime was the cause of the hangs this backend previously exhibited.
//
// t_id : id of the calling thread; kept for interface compatibility with the
//        other bli_thrcomm_barrier_* functions, but not needed by the HPX
//        barrier.
// comm : communicator whose `barrier` field was set up by
//        bli_thrcomm_init_hpx().
void bli_thrcomm_barrier_hpx( dim_t t_id, thrcomm_t* comm )
{
	( void )t_id;

	hpx_barrier_arrive_and_wait( &comm->barrier );
}

} // extern "C"

#endif
}

#endif

20 changes: 13 additions & 7 deletions frame/thread/bli_thread_hpx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,22 @@ void bli_thread_launch_hpx
// Allocate a global communicator for the root thrinfo_t structures.
pool_t* gl_comm_pool = nullptr;
thrcomm_t* gl_comm = bli_thrcomm_create( ti, gl_comm_pool, n_threads );

// Execute func on hpx-runtime with n_threads.
hpx::threads::run_as_hpx_thread([&]()
{
hpx::execution::experimental::num_cores num_cores_(n_threads);
hpx::execution::static_chunk_size chunk_size_(1);
hpx::experimental::for_loop(
hpx::execution::par.with(num_cores_).with(chunk_size_), 0, n_threads,
[&gl_comm, &func, &params](const dim_t tid)
std::vector<hpx::future<void>> futures;
futures.reserve(n_threads);

for (dim_t tid = 0; tid < n_threads; ++tid)
{
func( gl_comm, tid, params );
});
futures.push_back(hpx::async([tid, &gl_comm, &func, &params]()
{
func( gl_comm, tid, params );
}));
}

hpx::wait_all(futures);
});

// Free the global communicator, because the root thrinfo_t node
Expand Down

0 comments on commit 7a87e57

Please sign in to comment.