Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

main: use C11 thread mutex #357

Merged
merged 9 commits into from
May 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions src/main/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,10 @@ int libre_init(void)
#ifdef USE_OPENSSL
err = openssl_init();
if (err)
goto out;
return err;
#endif

err = net_sock_init();
if (err)
goto out;

out:
if (err)
net_sock_close();

Expand Down
186 changes: 79 additions & 107 deletions src/main/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
* @file main.c Main polling routine
*
* Copyright (C) 2010 Creytiv.com
* Copyright (C) 2020-2022 Sebastian Reimers
*/
#include <stdlib.h>
#ifdef HAVE_SYS_TIME_H
#include <sys/time.h>
#endif
Expand Down Expand Up @@ -42,12 +44,8 @@
#include <re_list.h>
#include <re_tmr.h>
#include <re_main.h>
#include <re_thread.h>
#include "main.h"
#ifdef HAVE_PTHREAD
#define __USE_GNU 1
#include <stdlib.h>
#include <pthread.h>
#endif


#define DEBUG_MODULE "main"
Expand Down Expand Up @@ -97,76 +95,92 @@ struct re {
struct kevent *evlist;
int kqfd;
#endif

#ifdef HAVE_PTHREAD
pthread_mutex_t mutex; /**< Mutex for thread synchronization */
pthread_mutex_t *mutexp; /**< Pointer to active mutex */
#endif
};

static struct re global_re = {
NULL,
0,
0,
METHOD_NULL,
false,
false,
0,
LIST_INIT,
#ifdef HAVE_POLL
NULL,
#endif
#ifdef HAVE_EPOLL
NULL,
-1,
#endif
#ifdef HAVE_KQUEUE
NULL,
-1,
#endif
#ifdef HAVE_PTHREAD
#if MAIN_DEBUG && defined (PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP)
PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP,
#else
PTHREAD_MUTEX_INITIALIZER,
#endif
&global_re.mutex,
#endif
mtx_t mutex; /**< Mutex for thread synchronization */
mtx_t *mutexp; /**< Pointer to active mutex */
};


#ifdef HAVE_PTHREAD
static struct re *re_global = NULL;
static tss_t key;
static once_flag flag = ONCE_FLAG_INIT;

static void poll_close(struct re *re);

static pthread_once_t pt_once = PTHREAD_ONCE_INIT;
static pthread_key_t pt_key;


/** fallback destructor if thread gets destroyed before re_thread_close() */
static void thread_destructor(void *arg)
{
poll_close(arg);
free(arg);
}


static void re_once(void)
static int re_init(void)
{
pthread_key_create(&pt_key, thread_destructor);
struct re *re;
int err;

re = malloc(sizeof(struct re));
if (!re)
return ENOMEM;

memset(re, 0, sizeof(*re));

err = mtx_init(&re->mutex, mtx_plain);
if (err) {
DEBUG_WARNING("thread_init: mtx_init error\n");
goto out;
}
re->mutexp = &re->mutex;

list_init(&re->tmrl);

#ifdef HAVE_EPOLL
re->epfd = -1;
#endif

#ifdef HAVE_KQUEUE
re->kqfd = -1;
#endif

err = tss_set(key, re);
if (err)
DEBUG_WARNING("thread_init: tss_set error\n");

out:
if (err)
mem_deref(re);

return err;
}


static struct re *re_get(void)
static void re_once(void)
{
struct re *re;
int err;

pthread_once(&pt_once, re_once);
err = tss_create(&key, thread_destructor);
if (err) {
DEBUG_WARNING("tss_create failed: %d\n", err);
exit(err);
}

re = pthread_getspecific(pt_key);
if (!re) {
re = &global_re;
err = re_init();
if (err) {
DEBUG_WARNING("re_init failed: %d\n", err);
exit(err);
}

re_global = tss_get(key);
}


static struct re *re_get(void)
{
struct re *re;

call_once(&flag, re_once);
re = tss_get(key);
if (!re)
re = re_global;
return re;
}

Expand All @@ -175,35 +189,21 @@ static inline void re_lock(struct re *re)
{
int err;

err = pthread_mutex_lock(re->mutexp);
if (err) {
err = mtx_lock(re->mutexp);
if (err)
DEBUG_WARNING("re_lock: %m\n", err);
}
}


static inline void re_unlock(struct re *re)
{
int err;

err = pthread_mutex_unlock(re->mutexp);
if (err) {
err = mtx_unlock(re->mutexp);
if (err)
DEBUG_WARNING("re_unlock: %m\n", err);
}
}


#else

static struct re *re_get(void)
{
return &global_re;
}

#define re_lock(x) /**< Stub */
#define re_unlock(x) /**< Stub */

#endif

#ifdef WIN32
/**
Expand Down Expand Up @@ -1152,39 +1152,17 @@ int poll_method_set(enum poll_method method)
*/
int re_thread_init(void)
{
#ifdef HAVE_PTHREAD
struct re *re;

pthread_once(&pt_once, re_once);
call_once(&flag, re_once);

re = pthread_getspecific(pt_key);
re = tss_get(key);
if (re) {
DEBUG_WARNING("thread_init: already added for thread %d\n",
pthread_self());
DEBUG_WARNING("thread_init: already added for thread\n");
return EALREADY;
}

re = malloc(sizeof(*re));
if (!re)
return ENOMEM;

memset(re, 0, sizeof(*re));
pthread_mutex_init(&re->mutex, NULL);
re->mutexp = &re->mutex;

#ifdef HAVE_EPOLL
re->epfd = -1;
#endif

#ifdef HAVE_KQUEUE
re->kqfd = -1;
#endif

pthread_setspecific(pt_key, re);
return 0;
#else
return ENOSYS;
#endif
return re_init();
}


Expand All @@ -1193,18 +1171,16 @@ int re_thread_init(void)
*/
void re_thread_close(void)
{
#ifdef HAVE_PTHREAD
struct re *re;

pthread_once(&pt_once, re_once);
call_once(&flag, re_once);

re = pthread_getspecific(pt_key);
re = tss_get(key);
if (re) {
poll_close(re);
free(re);
pthread_setspecific(pt_key, NULL);
tss_set(key, NULL);
}
#endif
}


Expand Down Expand Up @@ -1237,13 +1213,9 @@ void re_thread_leave(void)
*/
void re_set_mutex(void *mutexp)
{
#ifdef HAVE_PTHREAD
struct re *re = re_get();

re->mutexp = mutexp ? mutexp : &re->mutex;
#else
(void)mutexp;
#endif
}


Expand Down