Skip to content

Commit

Permalink
basic support for printing to TTYs from multiple threads
Browse files Browse the repository at this point in the history
  • Loading branch information
JeffBezanson committed Mar 22, 2019
1 parent d9d8d4c commit b9d36ae
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 43 deletions.
72 changes: 49 additions & 23 deletions base/stream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ mutable struct PipeEndpoint <: LibuvStream
buffer::IOBuffer
readnotify::Condition
connectnotify::Condition
closenotify::Condition
closenotify::ThreadSynchronizer
sendbuf::Union{IOBuffer, Nothing}
lock::ReentrantLock
throttle::Int
Expand All @@ -130,7 +130,7 @@ mutable struct PipeEndpoint <: LibuvStream
PipeBuffer(),
Condition(),
Condition(),
Condition(),
ThreadSynchronizer(),
nothing,
ReentrantLock(),
DEFAULT_READ_BUFFER_SZ)
Expand Down Expand Up @@ -165,7 +165,7 @@ mutable struct TTY <: LibuvStream
status::Int
buffer::IOBuffer
readnotify::Condition
closenotify::Condition
closenotify::ThreadSynchronizer
sendbuf::Union{IOBuffer, Nothing}
lock::ReentrantLock
throttle::Int
Expand All @@ -176,7 +176,7 @@ mutable struct TTY <: LibuvStream
status,
PipeBuffer(),
Condition(),
Condition(),
ThreadSynchronizer(),
nothing,
ReentrantLock(),
DEFAULT_READ_BUFFER_SZ)
Expand Down Expand Up @@ -380,25 +380,38 @@ function wait_readnb(x::LibuvStream, nb::Int)
end

function wait_close(x::Union{LibuvStream, LibuvServer})
if isopen(x)
stream_wait(x, x.closenotify)
lock(x.closenotify)
try
if isopen(x)
stream_wait(x, x.closenotify)
end
finally
unlock(x.closenotify)
end
nothing
end

function close(stream::Union{LibuvStream, LibuvServer})
if stream.status == StatusInit
ccall(:jl_forceclose_uv, Cvoid, (Ptr{Cvoid},), stream.handle)
elseif isopen(stream)
if stream.status != StatusClosing
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), stream.handle)
stream.status = StatusClosing
end
if uv_handle_data(stream) != C_NULL
stream_wait(stream, stream.closenotify)
return nothing
end
lock(stream.closenotify)
try
if isopen(stream)
should_wait = uv_handle_data(stream) != C_NULL
if stream.status != StatusClosing
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), stream.handle)
stream.status = StatusClosing
end
if should_wait
stream_wait(stream, stream.closenotify)
end
end
finally
unlock(stream.closenotify)
end
nothing
return nothing
end

function uvfinalize(uv::Union{LibuvStream, LibuvServer})
Expand Down Expand Up @@ -547,7 +560,12 @@ function uv_readcb(handle::Ptr{Cvoid}, nread::Cssize_t, buf::Ptr{Cvoid})
if isa(stream, TTY)
stream.status = StatusEOF # libuv called uv_stop_reading already
notify(stream.readnotify)
notify(stream.closenotify)
lock(stream.closenotify)
try
notify(stream.closenotify)
finally
unlock(stream.closenotify)
end
elseif stream.status != StatusClosing
# begin shutdown of the stream
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), stream.handle)
Expand Down Expand Up @@ -589,10 +607,15 @@ function reseteof(x::TTY)
end

function _uv_hook_close(uv::Union{LibuvStream, LibuvServer})
uv.handle = C_NULL
uv.status = StatusClosed
# notify any listeners that exist on this libuv stream type
notify(uv.closenotify)
lock(uv.closenotify)
try
uv.handle = C_NULL
uv.status = StatusClosed
# notify any listeners that exist on this libuv stream type
notify(uv.closenotify)
finally
unlock(uv.closenotify)
end
isdefined(uv, :readnotify) && notify(uv.readnotify)
isdefined(uv, :connectnotify) && notify(uv.connectnotify)
nothing
Expand Down Expand Up @@ -842,14 +865,13 @@ end
uv_write(s::LibuvStream, p::Vector{UInt8}) = uv_write(s, pointer(p), UInt(sizeof(p)))

function uv_write(s::LibuvStream, p::Ptr{UInt8}, n::UInt)
uvw = uv_write_async(s, p, n)
ct = current_task()
uvw = uv_write_async(s, p, n, ct)
preserve_handle(ct)
try
# wait for the last chunk to complete (or error)
# assume that any errors would be sticky,
# (so we don't need to monitor the error status of the intermediate writes)
uv_req_set_data(uvw, ct)
wait()
finally
if uv_req_data(uvw) != C_NULL
Expand All @@ -867,11 +889,15 @@ end

# helper function for uv_write that returns the uv_write_t struct for the write
# rather than waiting on it
function uv_write_async(s::LibuvStream, p::Ptr{UInt8}, n::UInt)
function uv_write_async(s::LibuvStream, p::Ptr{UInt8}, n::UInt, reqdata = nothing)
check_open(s)
while true
uvw = Libc.malloc(_sizeof_uv_write)
uv_req_set_data(uvw, C_NULL) # in case we get interrupted before arriving at the wait call
if reqdata === nothing
uv_req_set_data(uvw, C_NULL) # in case we get interrupted before arriving at the wait call
else
uv_req_set_data(uvw, reqdata)
end
nwrite = min(n, MAX_OS_WRITE) # split up the write into chunks the OS can handle.
# TODO: use writev, when that is added to uv-win
err = ccall(:jl_uv_write,
Expand Down
34 changes: 14 additions & 20 deletions src/partr.c
Original file line number Diff line number Diff line change
Expand Up @@ -226,9 +226,7 @@ JL_DLLEXPORT void jl_wakeup_thread(int16_t tid)
uv_cond_broadcast(&sleep_alarm); // TODO: make this uv_cond_signal / just wake up correct thread
uv_mutex_unlock(&sleep_lock);
}
/* stop the event loop too, if on thread 1 and alerting thread 1 */
if (ptls->tid == 0 && (tid == 0 || tid == -1))
uv_stop(jl_global_event_loop());
uv_stop(jl_global_event_loop());
}


Expand Down Expand Up @@ -260,8 +258,8 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *getsticky)
if (task)
return task;

if (ptls->tid == 0) {
if (!_threadedregion) {
if (!_threadedregion) {
if (ptls->tid == 0) {
if (jl_run_once(jl_global_event_loop()) == 0) {
task = get_next_task(getsticky);
if (task)
Expand All @@ -274,29 +272,25 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *getsticky)
}
}
else {
jl_process_events(jl_global_event_loop());
}
}
else {
int sleepnow = 0;
if (!_threadedregion) {
int sleepnow = 0;
uv_mutex_lock(&sleep_lock);
if (!_threadedregion) {
sleepnow = 1;
}
else {
uv_mutex_unlock(&sleep_lock);
}
if (sleepnow) {
int8_t gc_state = jl_gc_safe_enter(ptls);
uv_cond_wait(&sleep_alarm, &sleep_lock);
uv_mutex_unlock(&sleep_lock);
jl_gc_safe_leave(ptls, gc_state);
}
}
else {
jl_cpu_pause();
}
if (sleepnow) {
int8_t gc_state = jl_gc_safe_enter(ptls);
uv_cond_wait(&sleep_alarm, &sleep_lock);
uv_mutex_unlock(&sleep_lock);
jl_gc_safe_leave(ptls, gc_state);
}
}
else {
// for now just have all threads poll during threaded regions
jl_process_events(jl_global_event_loop());
}
}
}
Expand Down

0 comments on commit b9d36ae

Please sign in to comment.