diff --git a/core/platform.h b/core/platform.h index 8453cf25e..14a67c2a2 100644 --- a/core/platform.h +++ b/core/platform.h @@ -90,6 +90,11 @@ typedef _microstep_t microstep_t; #ifdef NUMBER_OF_WORKERS +/** + * @brief Get the number of cores on the host machine. + */ +extern int lf_available_cores(); + /** * Create a new thread, starting with execution of lf_thread * getting passed arguments. The new handle is stored in thread_id. diff --git a/core/platform/lf_linux_support.c b/core/platform/lf_linux_support.c index b7a87d72c..58188f022 100644 --- a/core/platform/lf_linux_support.c +++ b/core/platform/lf_linux_support.c @@ -41,6 +41,7 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #endif #include "lf_unix_clock_support.c" +#include "lf_unix_syscall_support.c" /** * Pause execution for a number of nanoseconds. diff --git a/core/platform/lf_macos_support.c b/core/platform/lf_macos_support.c index 5344d03e1..c0836d02e 100644 --- a/core/platform/lf_macos_support.c +++ b/core/platform/lf_macos_support.c @@ -41,6 +41,7 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #endif #include "lf_unix_clock_support.c" +#include "lf_unix_syscall_support.c" /** * Pause execution for a number of nanoseconds. diff --git a/core/platform/lf_unix_syscall_support.c b/core/platform/lf_unix_syscall_support.c new file mode 100644 index 000000000..e3cacc3ca --- /dev/null +++ b/core/platform/lf_unix_syscall_support.c @@ -0,0 +1,19 @@ +/** + * @file lf_unix_syscall_support.c + * @author Soroush Bateni (soroush@utdallas.edu) + * @brief Platform support for syscalls in Unix-like systems. + * @version 0.1 + * @date 2022-03-09 + * + * @copyright Copyright (c) 2022 The University of Texas at Dallas + * + */ + +#include + +/** + * @brief Get the number of cores on the host machine. + */ +int lf_available_cores() { + return (int)sysconf(_SC_NPROCESSORS_ONLN); +} diff --git a/core/platform/lf_windows_support.c b/core/platform/lf_windows_support.c index f5f3438e7..a09d5880a 100644 --- a/core/platform/lf_windows_support.c +++ b/core/platform/lf_windows_support.c @@ -35,6 +35,7 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #include #include +#include #include #include "lf_windows_support.h" #include "../platform.h" @@ -56,6 +57,15 @@ double _lf_frequency_to_ns = 1.0; #define BILLION 1000000000 #ifdef NUMBER_OF_WORKERS +/** + * @brief Get the number of cores on the host machine. + */ +int lf_available_cores() { + SYSTEM_INFO sysinfo; + GetSystemInfo(&sysinfo); + return sysinfo.dwNumberOfProcessors; +} + #if __STDC_VERSION__ < 201112L || defined (__STDC_NO_THREADS__) // (Not C++11 or later) or no threads support /** diff --git a/core/reactor.h b/core/reactor.h index 0bc0f05a3..607be8a59 100644 --- a/core/reactor.h +++ b/core/reactor.h @@ -943,7 +943,7 @@ bool _lf_check_deadline(self_base_t* self, bool invoke_deadline_handler); * By default, execution is not threaded and this variable will have value 0, * meaning that the execution is not threaded. */ -extern unsigned int _lf_number_of_threads; +extern unsigned int _lf_number_of_workers; #include "trace.h" diff --git a/core/reactor_common.c b/core/reactor_common.c index 32fa3006e..1b20ffeda 100644 --- a/core/reactor_common.c +++ b/core/reactor_common.c @@ -49,10 +49,13 @@ bool fast = false; /** * The number of worker threads for threaded execution. - * By default, execution is not threaded and this variable will have value 0, - * meaning that the execution is not threaded. + * By default, execution is not threaded and this variable will have value 0. + * + * If the execution is threaded, a value of 0 indicates that the runtime should + * decide on the number of workers (which will be decided based on the number of + * available cores on the host machine). */ -unsigned int _lf_number_of_threads = 0u; +unsigned int _lf_number_of_workers = 0u; /** * The logical time to elapse during execution, or -1 if no timeout time has @@ -1719,7 +1722,7 @@ void usage(int argc, char* argv[]) { printf(" nsec, usec, msec, sec, minute, hour, day, week, or the plurals of those.\n\n"); printf(" -k, --keepalive\n"); printf(" Whether continue execution even when there are no events to process.\n\n"); - printf(" -t, --threads \n"); + printf(" -w, --workers \n"); printf(" Executed in threads if possible (optional feature).\n\n"); printf(" -i, --id \n"); printf(" The ID of the federation that this reactor will join.\n\n"); @@ -1819,19 +1822,19 @@ int process_args(int argc, char* argv[]) { } else { error_print("Invalid value for --keepalive: %s", keep_spec); } - } else if (strcmp(arg, "-t") == 0 || strcmp(arg, "--threads") == 0) { + } else if (strcmp(arg, "-t") == 0 || strcmp(arg, "--workers") == 0) { if (argc < i + 1) { - error_print("--threads needs an integer argument.s"); + error_print("--workers needs an integer argument.s"); usage(argc, argv); return 0; } char* threads_spec = argv[i++]; - int num_threads = atoi(threads_spec); - if (num_threads <= 0) { - error_print("Invalid value for --threads: %s. Using 1.", threads_spec); - num_threads = 1; + int num_workers = atoi(threads_spec); + if (num_workers <= 0) { + error_print("Invalid value for --workers: %s. Using 1.", threads_spec); + num_workers = 1; } - _lf_number_of_threads = (unsigned int)num_threads; + _lf_number_of_workers = (unsigned int)num_workers; } #ifdef FEDERATED else if (strcmp(arg, "-i") == 0 || strcmp(arg, "--id") == 0) { diff --git a/core/threaded/reactor_threaded.c b/core/threaded/reactor_threaded.c index 66227a92e..c3b7bc2c2 100644 --- a/core/threaded/reactor_threaded.c +++ b/core/threaded/reactor_threaded.c @@ -1080,13 +1080,37 @@ lf_thread_t* _lf_thread_ids; // Start threads in the thread pool. void start_threads() { - LOG_PRINT("Starting %u worker threads.", _lf_number_of_threads); - _lf_thread_ids = (lf_thread_t*)malloc(_lf_number_of_threads * sizeof(lf_thread_t)); - for (unsigned int i = 0; i < _lf_number_of_threads; i++) { + LOG_PRINT("Starting %u worker threads.", _lf_number_of_workers); + _lf_thread_ids = (lf_thread_t*)malloc(_lf_number_of_workers * sizeof(lf_thread_t)); + for (unsigned int i = 0; i < _lf_number_of_workers; i++) { lf_thread_create(&_lf_thread_ids[i], worker, NULL); } } +/** + * @brief Determine the number of workers. + * + */ +void determine_number_of_workers(void) { + // If _lf_number_of_workers is 0, it means that it was not provided on + // the command-line using the --threads argument. + if (_lf_number_of_workers == 0u) { + #if !defined(NUMBER_OF_WORKERS) || NUMBER_OF_WORKERS == 0 + // Use the number of cores on the host machine. + _lf_number_of_workers = lf_available_cores(); + #else + // Use the provided number of workers by the user + _lf_number_of_workers = NUMBER_OF_WORKERS; + #endif + } + + #if defined(WORKERS_NEEDED_FOR_FEDERATE) + // Add the required number of workers needed for the proper function of + // federated execution + _lf_number_of_workers += WORKERS_NEEDED_FOR_FEDERATE; + #endif +} + /** * The main loop of the LF program. * @@ -1137,12 +1161,17 @@ int lf_reactor_c_main(int argc, char* argv[]) { if (process_args(default_argc, default_argv) && process_args(argc, argv)) { - lf_mutex_lock(&mutex); // Sets start_time - initialize(); + + determine_number_of_workers(); + + lf_mutex_lock(&mutex); + initialize(); // Sets start_time + + info_print("---- Using %d workers.", _lf_number_of_workers); // Initialize the scheduler lf_sched_init( - (size_t)_lf_number_of_threads, + (size_t)_lf_number_of_workers, NULL); // Call the following function only once, rather than per worker thread (although @@ -1156,9 +1185,9 @@ int lf_reactor_c_main(int argc, char* argv[]) { // Wait for the worker threads to exit. void* worker_thread_exit_status = NULL; - DEBUG_PRINT("Number of threads: %d.", _lf_number_of_threads); + DEBUG_PRINT("Number of threads: %d.", _lf_number_of_workers); int ret = 0; - for (int i = 0; i < _lf_number_of_threads; i++) { + for (int i = 0; i < _lf_number_of_workers; i++) { int failure = lf_thread_join(_lf_thread_ids[i], &worker_thread_exit_status); if (failure) { error_print("Failed to join thread listening for incoming messages: %s", strerror(failure)); diff --git a/core/trace.c b/core/trace.c index aa7c88721..379b0d6a8 100644 --- a/core/trace.c +++ b/core/trace.c @@ -329,7 +329,7 @@ void start_trace(char* filename) { // Allocate an array of arrays of trace records, one per worker thread plus one // for the 0 thread (the main thread, or in an unthreaded program, the only // thread). - _lf_number_of_trace_buffers = _lf_number_of_threads + 1; + _lf_number_of_trace_buffers = _lf_number_of_workers + 1; _lf_trace_buffer = (trace_record_t**)malloc(sizeof(trace_record_t*) * _lf_number_of_trace_buffers); for (int i = 0; i < _lf_number_of_trace_buffers; i++) { _lf_trace_buffer[i] = (trace_record_t*)malloc(sizeof(trace_record_t) * TRACE_BUFFER_CAPACITY); diff --git a/lingua-franca-ref.txt b/lingua-franca-ref.txt index 040b9b95d..16d32048d 100644 --- a/lingua-franca-ref.txt +++ b/lingua-franca-ref.txt @@ -1 +1 @@ -14e26feddb2e2770596c22b2e9a2397c53d468dd +276cb2592c20cc6da2f5722dc013581b085e5d4c