Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Using atomic counters on shared containers #473

Merged
merged 13 commits into from
May 16, 2023
27 changes: 27 additions & 0 deletions .github/workflows/ubuntu-sani-thread-ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
name: Ubuntu-Sanitized-CI

'on':
- push
- pull_request

permissions:
contents: read

jobs:
ci:
name: ubuntu-gcc
runs-on: ubuntu-latest

env:
CC: gcc
CXX: g++

steps:
- uses: actions/checkout@ee0669bd1cc54295c223e0bb666b733df41de1c5 # v2.7.0
- name: Build and Test
run: |
mkdir build
cd build
cmake -DROARING_SANITIZE_THREADS=ON ..
cmake --build .
ctest . --output-on-failure
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ option(ROARING_BUILD_LTO "Build library with Link Time Optimization" OFF)
option(ROARING_BUILD_C_AS_CPP "Build library C files using C++ compilation" OFF)
option(ROARING_BUILD_C_TESTS_AS_CPP "Build test C files using C++ compilation" OFF)
option(ROARING_SANITIZE "Sanitize addresses" OFF)
option(ROARING_SANITIZE_THREADS "Sanitize threads" OFF)

option(ENABLE_ROARING_TESTS "If OFF, disable unit tests altogether" ON)

set(CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/tools/cmake")
Expand Down
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,15 @@ Our AVX-512 code is only enabled on recent hardware (Intel Ice Lake or better an

Like, for example, STL containers or Java's default data structures, the CRoaring library has no built-in thread support. Thus whenever you modify a bitmap in one thread, it is unsafe to query it in others. It is safe however to query bitmaps (without modifying them) from several distinct threads, as long as you do not use the copy-on-write attribute. For example, you can safely copy a bitmap and use both copies in concurrently. One should probably avoid the use of the copy-on-write attribute in a threaded environment.

Some of our users rely on "copy-on-write" (default to disabled). A bitmap with the copy-on-write flag
set to true might generate shared containers. A shared container is just a reference to a single
container with reference counting (we keep track of the number of shallow copies). If you copy shared
containers over several threads, this might be unsafe due to the need to update the counter concurrently.
Thus for shared containers, we use reference counting with an atomic counter. If the library is compiled
as a C library (the default), we use C11 atomics. Unfortunately, Visual Studio does not support C11
atomics at this times (though this is subject to change). To compensate, we
use Windows-specific code in such instances (`_InterlockedDecrement` `_InterlockedIncrement`).


# How to best aggregate bitmaps?

Expand Down
3 changes: 1 addition & 2 deletions include/roaring/containers/containers.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,10 @@ extern "C" { namespace roaring { namespace internal {
* A shared container is a wrapper around a container
* with reference counting.
*/

STRUCT_CONTAINER(shared_container_s) {
container_t *container;
uint8_t typecode;
uint32_t counter; // to be managed atomically
croaring_refcount_t counter; // to be managed atomically
};

typedef struct shared_container_s shared_container_t;
Expand Down
140 changes: 134 additions & 6 deletions include/roaring/portability.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,15 @@
#define CROARING_REGULAR_VISUAL_STUDIO 1
#endif // __clang__
#endif // _MSC_VER
#ifndef CROARING_VISUAL_STUDIO
#define CROARING_VISUAL_STUDIO 0
#endif
#ifndef CROARING_CLANG_VISUAL_STUDIO
#define CROARING_CLANG_VISUAL_STUDIO 0
#endif
#ifndef CROARING_REGULAR_VISUAL_STUDIO
#define CROARING_REGULAR_VISUAL_STUDIO 0
#endif

#if defined(_POSIX_C_SOURCE) && (_POSIX_C_SOURCE < 200809L)
#undef _POSIX_C_SOURCE
Expand All @@ -61,11 +70,6 @@
extern "C" { // portability definitions are in global scope, not a namespace
#endif

#if CROARING_REGULAR_VISUAL_STUDIO && !defined(_WIN64) && !defined(CROARING_ACK_32BIT)
#pragma message( \
"You appear to be attempting a 32-bit build under Visual Studio. We recommend a 64-bit build instead.")
#endif

#if defined(__SIZEOF_LONG_LONG__) && __SIZEOF_LONG_LONG__ != 8
#error This code assumes 64-bit long longs (by use of the GCC intrinsics). Your system is not currently supported.
#endif
Expand Down Expand Up @@ -366,7 +370,7 @@ static inline int roaring_hamming(uint64_t x) {

#if defined(__BYTE_ORDER__) && defined(__ORDER_BIG_ENDIAN__)
#define CROARING_IS_BIG_ENDIAN (__BYTE_ORDER__ == __ORDER_BIG_ENDIAN__)
#elif defined(_WIN32)
#elif defined(_WIN32)
#define CROARING_IS_BIG_ENDIAN 0
#else
#if defined(__APPLE__) || defined(__FreeBSD__) // defined __BYTE_ORDER__ && defined __ORDER_BIG_ENDIAN__
Expand Down Expand Up @@ -395,6 +399,130 @@ static inline int roaring_hamming(uint64_t x) {
#endif // __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__
#endif

// Defines for the possible CROARING atomic implementations
#define CROARING_ATOMIC_IMPL_NONE 1
#define CROARING_ATOMIC_IMPL_CPP 2
#define CROARING_ATOMIC_IMPL_C 3
#define CROARING_ATOMIC_IMPL_C_WINDOWS 4

// If the use has forced a specific implementation, use that, otherwise,
// figure out the best implementation we can use.
#if !defined(CROARING_ATOMIC_IMPL)
#if defined(__cplusplus) && __cplusplus >= 201103L
#ifdef __has_include
#if __has_include(<atomic>)
#define CROARING_ATOMIC_IMPL CROARING_ATOMIC_IMPL_CPP
#endif //__has_include(<atomic>)
#else
// We lack __has_include to check:
#define CROARING_ATOMIC_IMPL CROARING_ATOMIC_IMPL_CPP
#endif //__has_include
#elif __STDC_VERSION__ >= 201112L && !defined(__STDC_NO_ATOMICS__)
#define CROARING_ATOMIC_IMPL CROARING_ATOMIC_IMPL_C
#elif CROARING_REGULAR_VISUAL_STUDIO
// https://www.technetworkhub.com/c11-atomics-in-visual-studio-2022-version-17/
#define CROARING_ATOMIC_IMPL CROARING_ATOMIC_IMPL_C_WINDOWS
#endif
#endif // !defined(CROARING_ATOMIC_IMPL)

#if !defined(CROARING_ATOMIC_IMPL)
#pragma message ( "No atomic implementation found, copy on write bitmaps will not be threadsafe" )
#define CROARING_ATOMIC_IMPL CROARING_ATOMIC_IMPL_NONE
#endif

#if CROARING_ATOMIC_IMPL == CROARING_ATOMIC_IMPL_C
#include <stdatomic.h>
typedef _Atomic(uint32_t) croaring_refcount_t;

static inline void croaring_refcount_inc(croaring_refcount_t *val) {
// Increasing the reference counter can always be done with
// memory_order_relaxed: New references to an object can only be formed from
// an existing reference, and passing an existing reference from one thread to
// another must already provide any required synchronization.
atomic_fetch_add_explicit(val, 1, memory_order_relaxed);
}

static inline bool croaring_refcount_dec(croaring_refcount_t *val) {
// It is important to enforce any possible access to the object in one thread
// (through an existing reference) to happen before deleting the object in a
// different thread. This is achieved by a "release" operation after dropping
// a reference (any access to the object through this reference must obviously
// happened before), and an "acquire" operation before deleting the object.
bool is_zero = atomic_fetch_sub_explicit(val, 1, memory_order_release) == 1;
if (is_zero) {
atomic_thread_fence(memory_order_acquire);
}
return is_zero;
}

static inline uint32_t croaring_refcount_get(croaring_refcount_t *val) {
return atomic_load_explicit(val, memory_order_relaxed);
}
#elif CROARING_ATOMIC_IMPL == CROARING_ATOMIC_IMPL_CPP
#include <atomic>
typedef std::atomic<uint32_t> croaring_refcount_t;

static inline void croaring_refcount_inc(croaring_refcount_t *val) {
val->fetch_add(1, std::memory_order_relaxed);
}

static inline bool croaring_refcount_dec(croaring_refcount_t *val) {
// See above comments on the c11 atomic implementation for memory ordering
bool is_zero = val->fetch_sub(1, std::memory_order_release) == 1;
if (is_zero) {
std::atomic_thread_fence(std::memory_order_acquire);
}
return is_zero;
}

static inline uint32_t croaring_refcount_get(croaring_refcount_t *val) {
return val->load(std::memory_order_relaxed);
}
#elif CROARING_ATOMIC_IMPL == CROARING_ATOMIC_IMPL_C_WINDOWS
#include <intrin.h>
#pragma intrinsic(_InterlockedIncrement)
#pragma intrinsic(_InterlockedDecrement)

// _InterlockedIncrement and _InterlockedDecrement take a (signed) long, and
// overflow is defined to wrap, so we can pretend it is a uint32_t for our case
typedef volatile long croaring_refcount_t;

static inline void croaring_refcount_inc(croaring_refcount_t *val) {
_InterlockedIncrement(val);
}

static inline bool croaring_refcount_dec(croaring_refcount_t *val) {
return _InterlockedDecrement(val) == 0;
}

static inline uint32_t croaring_refcount_get(croaring_refcount_t *val) {
// Per https://learn.microsoft.com/en-us/windows/win32/sync/interlocked-variable-access
// > Simple reads and writes to properly-aligned 32-bit variables are atomic
// > operations. In other words, you will not end up with only one portion
// > of the variable updated; all bits are updated in an atomic fashion.
return *val;
}
#elif CROARING_ATOMIC_IMPL == CROARING_ATOMIC_IMPL_NONE
typedef uint32_t croaring_refcount_t;

static inline void croaring_refcount_inc(croaring_refcount_t *val) {
*val += 1;
}

static inline bool croaring_refcount_dec(croaring_refcount_t *val) {
assert(*val > 0);
*val -= 1;
return val == 0;
}

static inline uint32_t croaring_refcount_get(croaring_refcount_t *val) {
return *val;
}
#else
#error "Unknown atomic implementation"
#endif


// We need portability.h to be included first,
// but we also always want isadetection.h to be
// included (right after).
Expand Down
6 changes: 3 additions & 3 deletions microbenchmarks/performancecounters/apple_arm_events.h
Original file line number Diff line number Diff line change
Expand Up @@ -874,11 +874,11 @@ struct AppleEvents {
u64 counters_1[KPC_MAX_COUNTERS] = {0};
static constexpr usize ev_count =
sizeof(profile_events) / sizeof(profile_events[0]);
bool init = false;
bool worked = false;

inline bool setup_performance_counters() {
static bool init = false;
static bool worked = false;

inline bool setup_performance_counters() {
if (init) {
return worked;
}
Expand Down
15 changes: 7 additions & 8 deletions src/containers/containers.c
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ container_t *get_copy_of_container(
shared_container_t *shared_container;
if (*typecode == SHARED_CONTAINER_TYPE) {
shared_container = CAST_shared(c);
shared_container->counter += 1;
croaring_refcount_inc(&shared_container->counter);
return shared_container;
}
assert(*typecode != SHARED_CONTAINER_TYPE);
Expand All @@ -149,7 +149,10 @@ container_t *get_copy_of_container(

shared_container->container = c;
shared_container->typecode = *typecode;

// At this point, we are creating new shared container
// so there should be no other references, and setting
// the counter to 2 - even non-atomically - is safe as
// long as the value is set before the return statement.
shared_container->counter = 2;
*typecode = SHARED_CONTAINER_TYPE;

Expand Down Expand Up @@ -188,12 +191,10 @@ container_t *container_clone(const container_t *c, uint8_t typecode) {
container_t *shared_container_extract_copy(
shared_container_t *sc, uint8_t *typecode
){
assert(sc->counter > 0);
assert(sc->typecode != SHARED_CONTAINER_TYPE);
sc->counter--;
*typecode = sc->typecode;
container_t *answer;
if (sc->counter == 0) {
if (croaring_refcount_dec(&sc->counter)) {
answer = sc->container;
sc->container = NULL; // paranoid
roaring_free(sc);
Expand All @@ -205,9 +206,7 @@ container_t *shared_container_extract_copy(
}

void shared_container_free(shared_container_t *container) {
assert(container->counter > 0);
container->counter--;
if (container->counter == 0) {
if (croaring_refcount_dec(&container->counter)) {
assert(container->typecode != SHARED_CONTAINER_TYPE);
container_free(container->container, container->typecode);
container->container = NULL; // paranoid
Expand Down
23 changes: 11 additions & 12 deletions src/isadetection.c
Original file line number Diff line number Diff line change
Expand Up @@ -194,33 +194,32 @@ static inline uint32_t dynamic_croaring_detect_supported_architectures() {

#if defined(__x86_64__) || defined(_M_AMD64) // x64

#if defined(__cplusplus)
#if CROARING_ATOMIC_IMPL == CROARING_ATOMIC_IMPL_CPP
static inline uint32_t croaring_detect_supported_architectures() {
// thread-safe as per the C++11 standard.
static uint32_t buffer = dynamic_croaring_detect_supported_architectures();
return buffer;
}
#elif CROARING_VISUAL_STUDIO
// Visual Studio does not support C11 atomics.
static inline uint32_t croaring_detect_supported_architectures() {
static int buffer = CROARING_UNINITIALIZED;
#elif CROARING_ATOMIC_IMPL == CROARING_ATOMIC_IMPL_C
static uint32_t croaring_detect_supported_architectures() {
// we use an atomic for thread safety
static _Atomic uint32_t buffer = CROARING_UNINITIALIZED;
if (buffer == CROARING_UNINITIALIZED) {
// atomicity is sufficient
buffer = dynamic_croaring_detect_supported_architectures();
}
return buffer;
}
#else // CROARING_VISUAL_STUDIO
#include <stdatomic.h>
uint32_t croaring_detect_supported_architectures() {
// we use an atomic for thread safety
static _Atomic uint32_t buffer = CROARING_UNINITIALIZED;
#else
// If we do not have atomics, we do the best we can.
static inline uint32_t croaring_detect_supported_architectures() {
static uint32_t buffer = CROARING_UNINITIALIZED;
if (buffer == CROARING_UNINITIALIZED) {
// atomicity is sufficient
buffer = dynamic_croaring_detect_supported_architectures();
}
return buffer;
}
#endif // CROARING_REGULAR_VISUAL_STUDIO
#endif // CROARING_C_ATOMIC

#ifdef ROARING_DISABLE_AVX

Expand Down
6 changes: 3 additions & 3 deletions src/roaring.c
Original file line number Diff line number Diff line change
Expand Up @@ -348,9 +348,9 @@ void roaring_bitmap_printf_describe(const roaring_bitmap_t *r) {
get_full_container_name(ra->containers[i], ra->typecodes[i]),
container_get_cardinality(ra->containers[i], ra->typecodes[i]));
if (ra->typecodes[i] == SHARED_CONTAINER_TYPE) {
printf(
"(shared count = %" PRIu32 " )",
CAST_shared(ra->containers[i])->counter);
printf("(shared count = %" PRIu32 " )",
croaring_refcount_get(
&(CAST_shared(ra->containers[i])->counter)));
}

if (i + 1 < ra->size) {
Expand Down
18 changes: 18 additions & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,24 @@ add_c_test(format_portability_unit)
add_c_test(robust_deserialization_unit)
add_c_test(container_comparison_unit)
add_c_test(add_offset)
find_package(Threads)
if(Threads_FOUND)
message(STATUS "Your system supports threads.")
add_executable(threads_unit threads_unit.cpp)
target_link_libraries(threads_unit PRIVATE roaring Threads::Threads)
if(ROARING_SANITIZE_THREADS)
# libtsan might be needed
if(CMAKE_SYSTEM_NAME STREQUAL "Linux")
message(STATUS "Under Linux, you may need to install libtsan." )
endif()
target_compile_options(threads_unit PRIVATE -fsanitize=thread -fno-sanitize-recover=all)
target_link_options(threads_unit PRIVATE -fsanitize=thread -fno-sanitize-recover=all)
message(STATUS "Sanitizing threads.")
endif()
add_test(threads_unit threads_unit)
else(Threads_FOUND)
message(STATUS "Your system does not support threads.")
endif(Threads_FOUND)

if (NOT WIN32)
# We exclude POSIX tests from Microsoft Windows
Expand Down
Loading