Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Introducing libuv global lock on master #29706

Closed
wants to merge 8 commits into from
8 changes: 4 additions & 4 deletions base/event.jl
Original file line number Diff line number Diff line change
@@ -86,7 +86,7 @@ global const Workqueue = Task[]

function enq_work(t::Task)
t.state == :runnable || error("schedule: Task not runnable")
ccall(:uv_stop, Cvoid, (Ptr{Cvoid},), eventloop())
ccall(:jl_uv_stop, Cvoid, (Ptr{Cvoid},), eventloop())
push!(Workqueue, t)
t.state = :queued
return t
@@ -359,8 +359,8 @@ mutable struct Timer
associate_julia_struct(this.handle, this)
finalizer(uvfinalize, this)

ccall(:uv_update_time, Cvoid, (Ptr{Cvoid},), eventloop())
ccall(:uv_timer_start, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, UInt64, UInt64),
ccall(:jl_uv_update_time, Cvoid, (Ptr{Cvoid},), eventloop())
ccall(:jl_uv_timer_start, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, UInt64, UInt64),
this, uv_jl_timercb::Ptr{Cvoid},
UInt64(round(timeout * 1000)) + 1, UInt64(round(interval * 1000)))
return this
@@ -380,7 +380,7 @@ isopen(t::Union{Timer, AsyncCondition}) = t.isopen
function close(t::Union{Timer, AsyncCondition})
if t.handle != C_NULL && isopen(t)
t.isopen = false
isa(t, Timer) && ccall(:uv_timer_stop, Cint, (Ptr{Cvoid},), t)
isa(t, Timer) && ccall(:jl_uv_timer_stop, Cint, (Ptr{Cvoid},), t)
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), t)
end
nothing
4 changes: 2 additions & 2 deletions base/file.jl
Original file line number Diff line number Diff line change
@@ -620,7 +620,7 @@ function readdir(path::AbstractString)
uv_readdir_req = zeros(UInt8, ccall(:jl_sizeof_uv_fs_t, Int32, ()))

# defined in sys.c, to call uv_fs_readdir, which sets errno on error.
err = ccall(:uv_fs_scandir, Int32, (Ptr{Cvoid}, Ptr{UInt8}, Cstring, Cint, Ptr{Cvoid}),
err = ccall(:jl_uv_fs_scandir, Int32, (Ptr{Cvoid}, Ptr{UInt8}, Cstring, Cint, Ptr{Cvoid}),
eventloop(), uv_readdir_req, path, 0, C_NULL)
err < 0 && throw(SystemError("unable to read directory $path", -err))
#uv_error("unable to read directory $path", err)
@@ -805,7 +805,7 @@ Return the target location a symbolic link `path` points to.
function readlink(path::AbstractString)
req = Libc.malloc(_sizeof_uv_fs)
try
ret = ccall(:uv_fs_readlink, Int32,
ret = ccall(:jl_uv_fs_readlink, Int32,
(Ptr{Cvoid}, Ptr{Cvoid}, Cstring, Ptr{Cvoid}),
eventloop(), req, path, C_NULL)
if ret < 0
6 changes: 3 additions & 3 deletions base/filesystem.jl
Original file line number Diff line number Diff line change
@@ -73,7 +73,7 @@ function open(path::AbstractString, flags::Integer, mode::Integer=0)
req = Libc.malloc(_sizeof_uv_fs)
local handle
try
ret = ccall(:uv_fs_open, Int32,
ret = ccall(:jl_uv_fs_open, Int32,
(Ptr{Cvoid}, Ptr{Cvoid}, Cstring, Int32, Int32, Ptr{Cvoid}),
eventloop(), req, path, flags, mode, C_NULL)
handle = ccall(:jl_uv_fs_result, Cssize_t, (Ptr{Cvoid},), req)
@@ -125,7 +125,7 @@ write(f::File, c::UInt8) = write(f, Ref{UInt8}(c))
function truncate(f::File, n::Integer)
check_open(f)
req = Libc.malloc(_sizeof_uv_fs)
err = ccall(:uv_fs_ftruncate, Int32,
err = ccall(:jl_uv_fs_ftruncate, Int32,
(Ptr{Cvoid}, Ptr{Cvoid}, OS_HANDLE, Int64, Ptr{Cvoid}),
eventloop(), req, f.handle, n, C_NULL)
Libc.free(req)
@@ -136,7 +136,7 @@ end
function futime(f::File, atime::Float64, mtime::Float64)
check_open(f)
req = Libc.malloc(_sizeof_uv_fs)
err = ccall(:uv_fs_futime, Int32,
err = ccall(:jl_uv_fs_futime, Int32,
(Ptr{Cvoid}, Ptr{Cvoid}, OS_HANDLE, Float64, Float64, Ptr{Cvoid}),
eventloop(), req, f.handle, atime, mtime, C_NULL)
Libc.free(req)
6 changes: 3 additions & 3 deletions base/stream.jl
Original file line number Diff line number Diff line change
@@ -502,7 +502,7 @@ function uv_readcb(handle::Ptr{Cvoid}, nread::Cssize_t, buf::Ptr{Cvoid})
((bytesavailable(stream.buffer) >= stream.throttle) ||
(bytesavailable(stream.buffer) >= stream.buffer.maxsize)))
# save cycles by stopping kernel notifications from arriving
ccall(:uv_read_stop, Cint, (Ptr{Cvoid},), stream)
ccall(:jl_uv_read_stop, Cint, (Ptr{Cvoid},), stream)
stream.status = StatusOpen
end
nothing
@@ -641,7 +641,7 @@ function start_reading(stream::LibuvStream)
# libuv may call the alloc callback immediately
# for a TTY on Windows, so ensure the status is set first
stream.status = StatusActive
ret = ccall(:uv_read_start, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, Ptr{Cvoid}),
ret = ccall(:jl_uv_read_start, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, Ptr{Cvoid}),
stream, uv_jl_alloc_buf::Ptr{Cvoid}, uv_jl_readcb::Ptr{Cvoid})
return ret
elseif stream.status == StatusPaused
@@ -663,7 +663,7 @@ if Sys.iswindows()
function stop_reading(stream::LibuvStream)
if stream.status == StatusActive
stream.status = StatusOpen
ccall(:uv_read_stop, Cint, (Ptr{Cvoid},), stream)
ccall(:jl_uv_read_stop, Cint, (Ptr{Cvoid},), stream)
end
nothing
end
1 change: 1 addition & 0 deletions src/atomics.h
Original file line number Diff line number Diff line change
@@ -43,6 +43,7 @@
* which is very hard unless the layout of the object is fully
* specified.
*/

#if defined(__GNUC__)
# define jl_signal_fence() __atomic_signal_fence(__ATOMIC_SEQ_CST)
# define jl_atomic_fetch_add_relaxed(obj, arg) \
14 changes: 13 additions & 1 deletion src/init.c
Original file line number Diff line number Diff line change
@@ -153,9 +153,11 @@ static void jl_uv_exitcleanup_add(uv_handle_t *handle, struct uv_shutdown_queue
struct uv_shutdown_queue_item *item = (struct uv_shutdown_queue_item*)malloc(sizeof(struct uv_shutdown_queue_item));
item->h = handle;
item->next = NULL;
JL_UV_LOCK();
if (queue->last) queue->last->next = item;
if (!queue->first) queue->first = item;
queue->last = item;
JL_UV_UNLOCK();
}

static void jl_uv_exitcleanup_walk(uv_handle_t *handle, void *arg)
@@ -255,6 +257,7 @@ JL_DLLEXPORT void jl_atexit_hook(int exitcode)
}

struct uv_shutdown_queue queue = {NULL, NULL};
JL_UV_LOCK();
uv_walk(loop, jl_uv_exitcleanup_walk, &queue);
struct uv_shutdown_queue_item *item = queue.first;
if (ptls->current_task != NULL) {
@@ -284,6 +287,7 @@ JL_DLLEXPORT void jl_atexit_hook(int exitcode)

// force libuv to spin until everything has finished closing
loop->stop_flag = 0;
JL_UV_UNLOCK();
while (uv_run(loop, UV_RUN_DEFAULT)) { }

// TODO: Destroy threads
@@ -407,10 +411,13 @@ static void *init_stdio_handle(const char *stdio, uv_os_fd_t fd, int readable)
break;
case UV_NAMED_PIPE:
handle = malloc(sizeof(uv_pipe_t));
JL_UV_LOCK();
if ((err = uv_pipe_init(jl_io_loop, (uv_pipe_t*)handle, 0))) {
// JL_UV_UNLOCK() equivalent is done during unwinding
jl_errorf("error initializing %s in uv_pipe_init: %s (%s %d)", stdio, uv_strerror(err), uv_err_name(err), err);
}
if ((err = uv_pipe_open((uv_pipe_t*)handle, fd))) {
// JL_UV_UNLOCK() equivalent is done during unwinding
jl_errorf("error initializing %s in uv_pipe_open: %s (%s %d)", stdio, uv_strerror(err), uv_err_name(err), err);
}
#ifndef _OS_WINDOWS_
@@ -421,16 +428,21 @@ static void *init_stdio_handle(const char *stdio, uv_os_fd_t fd, int readable)
((uv_pipe_t*)handle)->flags &= ~UV_STREAM_READABLE;
#endif
((uv_pipe_t*)handle)->data = NULL;
JL_UV_UNLOCK();
break;
case UV_TCP:
handle = malloc(sizeof(uv_tcp_t));
JL_UV_LOCK();
if ((err = uv_tcp_init(jl_io_loop, (uv_tcp_t*)handle))) {
// JL_UV_UNLOCK() equivalent is done during unwinding
jl_errorf("error initializing %s in uv_tcp_init: %s (%s %d)", stdio, uv_strerror(err), uv_err_name(err), err);
}
if ((err = uv_tcp_open((uv_tcp_t*)handle, (uv_os_sock_t)fd))) {
// JL_UV_UNLOCK() equivalent is done during unwinding
jl_errorf("error initializing %s in uv_tcp_open: %s (%s %d)", stdio, uv_strerror(err), uv_err_name(err), err);
}
((uv_tcp_t*)handle)->data = NULL;
JL_UV_UNLOCK();
break;
}
return handle;
@@ -618,7 +630,7 @@ void _julia_init(JL_IMAGE_SEARCH rel)
ios_set_io_wait_func = jl_set_io_wait;
jl_io_loop = uv_default_loop(); // this loop will internal events (spawning process etc.),
// best to call this first, since it also initializes libuv
jl_init_signal_async();
jl_init_uv();
restore_signals();

jl_resolve_sysimg_location(rel);
232 changes: 197 additions & 35 deletions src/jl_uv.c

Large diffs are not rendered by default.

14 changes: 13 additions & 1 deletion src/julia_internal.h
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@
#define JL_INTERNAL_H

#include "options.h"
#include "locks.h"
#include <uv.h>
#if !defined(_MSC_VER) && !defined(__MINGW32__)
#include <unistd.h>
@@ -108,6 +109,15 @@ static inline void jl_assume_(int cond)
# define JL_USE_IFUNC 0
#endif

// If this is detected in a backtrace of segfault, it means the functions
// that use this value must be reworked into their async form with cb arg
// provided and with JL_UV_LOCK used around the calls
static uv_loop_t *const unused_uv_loop_arg = (uv_loop_t *)0xBAD10;

extern jl_mutex_t jl_uv_mutex;
#define JL_UV_LOCK() JL_LOCK_NOGC(&jl_uv_mutex)
#define JL_UV_UNLOCK() JL_UNLOCK_NOGC(&jl_uv_mutex)

#ifdef __cplusplus
extern "C" {
#endif
@@ -488,7 +498,7 @@ void jl_init_stack_limits(int ismaster, void **stack_hi, void **stack_lo);
void jl_init_root_task(void *stack_lo, void *stack_hi);
void jl_init_serializer(void);
void jl_gc_init(void);
void jl_init_signal_async(void);
void jl_init_uv(void);
void jl_init_debuginfo(void);
void jl_init_thread_heap(jl_ptls_t ptls);

@@ -831,6 +841,8 @@ int jl_array_store_unboxed(jl_value_t *el_type);
JL_DLLEXPORT jl_value_t *(jl_array_data_owner)(jl_array_t *a);
JL_DLLEXPORT int jl_array_isassigned(jl_array_t *a, size_t i);

JL_DLLEXPORT void jl_uv_stop(uv_loop_t* loop);

// -- synchronization utilities -- //

extern jl_mutex_t typecache_lock;
1 change: 1 addition & 0 deletions src/julia_threads.h
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@
#ifndef JL_THREADS_H
#define JL_THREADS_H

#include <atomics.h>
// threading ------------------------------------------------------------------

// WARNING: Threading support is incomplete and experimental
6 changes: 3 additions & 3 deletions src/sys.c
Original file line number Diff line number Diff line change
@@ -132,7 +132,7 @@ JL_DLLEXPORT int32_t jl_stat(const char *path, char *statbuf)

// Ideally one would use the statbuf for the storage in req, but
// it's not clear that this is possible using libuv
ret = uv_fs_stat(uv_default_loop(), &req, path, NULL);
ret = uv_fs_stat(unused_uv_loop_arg, &req, path, NULL);
if (ret == 0)
memcpy(statbuf, req.ptr, sizeof(uv_stat_t));
uv_fs_req_cleanup(&req);
@@ -144,7 +144,7 @@ JL_DLLEXPORT int32_t jl_lstat(const char *path, char *statbuf)
uv_fs_t req;
int ret;

ret = uv_fs_lstat(uv_default_loop(), &req, path, NULL);
ret = uv_fs_lstat(unused_uv_loop_arg, &req, path, NULL);
if (ret == 0)
memcpy(statbuf, req.ptr, sizeof(uv_stat_t));
uv_fs_req_cleanup(&req);
@@ -156,7 +156,7 @@ JL_DLLEXPORT int32_t jl_fstat(uv_os_fd_t fd, char *statbuf)
uv_fs_t req;
int ret;

ret = uv_fs_fstat(uv_default_loop(), &req, fd, NULL);
ret = uv_fs_fstat(unused_uv_loop_arg, &req, fd, NULL);
if (ret == 0)
memcpy(statbuf, req.ptr, sizeof(uv_stat_t));
uv_fs_req_cleanup(&req);