Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Workers #50

Merged
merged 11 commits into from
Mar 10, 2022
Merged
5 changes: 5 additions & 0 deletions core/platform.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ typedef _microstep_t microstep_t;

#ifdef NUMBER_OF_WORKERS

/**
* @brief Get the number of cores on the host machine.
*/
extern int lf_available_cores();

/**
* Create a new thread, starting with execution of lf_thread
* getting passed arguments. The new handle is stored in thread_id.
Expand Down
1 change: 1 addition & 0 deletions core/platform/lf_linux_support.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endif

#include "lf_unix_clock_support.c"
#include "lf_unix_syscall_support.c"

/**
* Pause execution for a number of nanoseconds.
Expand Down
1 change: 1 addition & 0 deletions core/platform/lf_macos_support.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endif

#include "lf_unix_clock_support.c"
#include "lf_unix_syscall_support.c"

/**
* Pause execution for a number of nanoseconds.
Expand Down
19 changes: 19 additions & 0 deletions core/platform/lf_unix_syscall_support.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/**
* @file lf_unix_syscall_support.c
* @author Soroush Bateni ([email protected])
* @brief Platform support for syscalls in Unix-like systems.
* @version 0.1
* @date 2022-03-09
*
* @copyright Copyright (c) 2022 The University of Texas at Dallas
*
*/

#include <unistd.h>

/**
* @brief Get the number of cores on the host machine.
*/
int lf_available_cores() {
return (int)sysconf(_SC_NPROCESSORS_ONLN);
}
10 changes: 10 additions & 0 deletions core/platform/lf_windows_support.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

#include <windows.h>
#include <process.h>
#include <sysinfoapi.h>
#include <errno.h>
#include "lf_windows_support.h"
#include "../platform.h"
Expand All @@ -56,6 +57,15 @@ double _lf_frequency_to_ns = 1.0;
#define BILLION 1000000000

#ifdef NUMBER_OF_WORKERS
/**
* @brief Get the number of cores on the host machine.
*/
int lf_available_cores() {
SYSTEM_INFO sysinfo;
GetSystemInfo(&sysinfo);
return sysinfo.dwNumberOfProcessors;
}

#if __STDC_VERSION__ < 201112L || defined (__STDC_NO_THREADS__) // (Not C++11 or later) or no threads support

/**
Expand Down
2 changes: 1 addition & 1 deletion core/reactor.h
Original file line number Diff line number Diff line change
Expand Up @@ -943,7 +943,7 @@ bool _lf_check_deadline(self_base_t* self, bool invoke_deadline_handler);
* By default, execution is not threaded and this variable will have value 0,
* meaning that the execution is not threaded.
*/
extern unsigned int _lf_number_of_threads;
extern unsigned int _lf_number_of_workers;

#include "trace.h"

Expand Down
25 changes: 14 additions & 11 deletions core/reactor_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,13 @@ bool fast = false;

/**
* The number of worker threads for threaded execution.
* By default, execution is not threaded and this variable will have value 0,
* meaning that the execution is not threaded.
* By default, execution is not threaded and this variable will have value 0.
*
* If the execution is threaded, a value of 0 indicates that the runtime should
* decide on the number of workers (which will be decided based on the number of
* available cores on the host machine).
*/
unsigned int _lf_number_of_threads = 0u;
unsigned int _lf_number_of_workers = 0u;

/**
* The logical time to elapse during execution, or -1 if no timeout time has
Expand Down Expand Up @@ -1719,7 +1722,7 @@ void usage(int argc, char* argv[]) {
printf(" nsec, usec, msec, sec, minute, hour, day, week, or the plurals of those.\n\n");
printf(" -k, --keepalive\n");
printf(" Whether continue execution even when there are no events to process.\n\n");
printf(" -t, --threads <n>\n");
printf(" -w, --workers <n>\n");
printf(" Executed in <n> threads if possible (optional feature).\n\n");
printf(" -i, --id <n>\n");
printf(" The ID of the federation that this reactor will join.\n\n");
Expand Down Expand Up @@ -1819,19 +1822,19 @@ int process_args(int argc, char* argv[]) {
} else {
error_print("Invalid value for --keepalive: %s", keep_spec);
}
} else if (strcmp(arg, "-t") == 0 || strcmp(arg, "--threads") == 0) {
} else if (strcmp(arg, "-t") == 0 || strcmp(arg, "--workers") == 0) {
if (argc < i + 1) {
error_print("--threads needs an integer argument.s");
error_print("--workers needs an integer argument.s");
usage(argc, argv);
return 0;
}
char* threads_spec = argv[i++];
int num_threads = atoi(threads_spec);
if (num_threads <= 0) {
error_print("Invalid value for --threads: %s. Using 1.", threads_spec);
num_threads = 1;
int num_workers = atoi(threads_spec);
if (num_workers <= 0) {
error_print("Invalid value for --workers: %s. Using 1.", threads_spec);
num_workers = 1;
}
_lf_number_of_threads = (unsigned int)num_threads;
_lf_number_of_workers = (unsigned int)num_workers;
}
#ifdef FEDERATED
else if (strcmp(arg, "-i") == 0 || strcmp(arg, "--id") == 0) {
Expand Down
45 changes: 37 additions & 8 deletions core/threaded/reactor_threaded.c
Original file line number Diff line number Diff line change
Expand Up @@ -1080,13 +1080,37 @@ lf_thread_t* _lf_thread_ids;

// Start threads in the thread pool.
void start_threads() {
LOG_PRINT("Starting %u worker threads.", _lf_number_of_threads);
_lf_thread_ids = (lf_thread_t*)malloc(_lf_number_of_threads * sizeof(lf_thread_t));
for (unsigned int i = 0; i < _lf_number_of_threads; i++) {
LOG_PRINT("Starting %u worker threads.", _lf_number_of_workers);
_lf_thread_ids = (lf_thread_t*)malloc(_lf_number_of_workers * sizeof(lf_thread_t));
for (unsigned int i = 0; i < _lf_number_of_workers; i++) {
lf_thread_create(&_lf_thread_ids[i], worker, NULL);
}
}

/**
* @brief Determine the number of workers.
*
*/
void determine_number_of_workers(void) {
// If _lf_number_of_workers is 0, it means that it was not provided on
// the command-line using the --threads argument.
if (_lf_number_of_workers == 0u) {
#if !defined(NUMBER_OF_WORKERS) || NUMBER_OF_WORKERS == 0
// Use the number of cores on the host machine.
_lf_number_of_workers = lf_available_cores();
#else
// Use the provided number of workers by the user
_lf_number_of_workers = NUMBER_OF_WORKERS;
#endif
}

#if defined(WORKERS_NEEDED_FOR_FEDERATE)
// Add the required number of workers needed for the proper function of
// federated execution
_lf_number_of_workers += WORKERS_NEEDED_FOR_FEDERATE;
#endif
}

/**
* The main loop of the LF program.
*
Expand Down Expand Up @@ -1137,12 +1161,17 @@ int lf_reactor_c_main(int argc, char* argv[]) {

if (process_args(default_argc, default_argv)
&& process_args(argc, argv)) {
lf_mutex_lock(&mutex); // Sets start_time
initialize();

determine_number_of_workers();

lf_mutex_lock(&mutex);
initialize(); // Sets start_time

info_print("---- Using %d workers.", _lf_number_of_workers);

// Initialize the scheduler
lf_sched_init(
(size_t)_lf_number_of_threads,
(size_t)_lf_number_of_workers,
NULL);

// Call the following function only once, rather than per worker thread (although
Expand All @@ -1156,9 +1185,9 @@ int lf_reactor_c_main(int argc, char* argv[]) {

// Wait for the worker threads to exit.
void* worker_thread_exit_status = NULL;
DEBUG_PRINT("Number of threads: %d.", _lf_number_of_threads);
DEBUG_PRINT("Number of threads: %d.", _lf_number_of_workers);
int ret = 0;
for (int i = 0; i < _lf_number_of_threads; i++) {
for (int i = 0; i < _lf_number_of_workers; i++) {
int failure = lf_thread_join(_lf_thread_ids[i], &worker_thread_exit_status);
if (failure) {
error_print("Failed to join thread listening for incoming messages: %s", strerror(failure));
Expand Down
2 changes: 1 addition & 1 deletion core/trace.c
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ void start_trace(char* filename) {
// Allocate an array of arrays of trace records, one per worker thread plus one
// for the 0 thread (the main thread, or in an unthreaded program, the only
// thread).
_lf_number_of_trace_buffers = _lf_number_of_threads + 1;
_lf_number_of_trace_buffers = _lf_number_of_workers + 1;
_lf_trace_buffer = (trace_record_t**)malloc(sizeof(trace_record_t*) * _lf_number_of_trace_buffers);
for (int i = 0; i < _lf_number_of_trace_buffers; i++) {
_lf_trace_buffer[i] = (trace_record_t*)malloc(sizeof(trace_record_t) * TRACE_BUFFER_CAPACITY);
Expand Down
2 changes: 1 addition & 1 deletion lingua-franca-ref.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
14e26feddb2e2770596c22b2e9a2397c53d468dd
276cb2592c20cc6da2f5722dc013581b085e5d4c