Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add TaskFailedException to propagate backtrace of failed task in wait #32814

Merged
merged 1 commit into from
Aug 13, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ New library functions
Standard library changes
------------------------

* When `wait` (or `@sync`, or `fetch`) is called on a failing `Task`, the exception is propagated as a
`TaskFailedException` wrapping the task.
This makes it possible to see the location of the original failure inside the task (as well as the
location of the `wait` call, as before) ([#32814]).
* `Regex` can now be multiplied (`*`) and exponentiated (`^`), like strings ([#23422]).
* `Cmd` interpolation (``` `$(x::Cmd) a b c` ``` where) now propagates `x`'s process flags
(environment, flags, working directory, etc) if `x` is the first interpolant and errors
Expand Down
4 changes: 2 additions & 2 deletions base/condition.jl
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ Block the current task until some event occurs, depending on the type of the arg
* [`Condition`](@ref): Wait for [`notify`](@ref) on a condition.
* `Process`: Wait for a process or process chain to exit. The `exitcode` field of a process
can be used to determine success or failure.
* [`Task`](@ref): Wait for a `Task` to finish. If the task fails with an exception, the
exception is propagated (re-thrown in the task that called `wait`).
* [`Task`](@ref): Wait for a `Task` to finish. If the task fails with an exception, a
`TaskFailedException` (which wraps the failed task) is thrown.
* [`RawFD`](@ref): Wait for changes on a file descriptor (see the `FileWatching` package).

If no argument is passed, the task blocks for an undefined period. A task can only be
Expand Down
1 change: 1 addition & 0 deletions base/exports.jl
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ export
KeyError,
MissingException,
ProcessFailedException,
TaskFailedException,
SystemError,
StringIndexError,

Expand Down
84 changes: 67 additions & 17 deletions base/task.jl
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,31 @@ function showerror(io::IO, ex::CompositeException)
end
end

"""
TaskFailedException

This exception is thrown by a `wait(t)` call when task `t` fails.
`TaskFailedException` wraps the failed task `t`.
"""
struct TaskFailedException <: Exception
task::Task
end

function showerror(io::IO, ex::TaskFailedException)
stacks = []
while isa(ex.task.exception, TaskFailedException)
pushfirst!(stacks, ex.task.backtrace)
ex = ex.task.exception
end
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, this surfaces the underlying error through multiple levels of wait(). That's nice:

julia> t1 = @async sum()
       t2 = @async wait(t1)
       wait(t2)

ERROR: FailedTaskException:
MethodError: no method matching sum()
Closest candidates are:
  sum(::Tuple{Any,Vararg{Any,N} where N}) at tuple.jl:379
  sum(::StepRangeLen) at twiceprecision.jl:536
  sum(::AbstractRange{#s66} where #s66<:Real) at range.jl:978
  ...
Stacktrace:
 [1] (::getfield(Main, Symbol("##23#24")))() at ./task.jl:332
Stacktrace:
 [1] wait(::Task) at ./task.jl:251
 [2] (::getfield(Main, Symbol("##25#26")))() at ./task.jl:332
Stacktrace:
 [1] wait(::Task) at ./task.jl:251
 [2] top-level scope at REPL[5]:3

println(io, "TaskFailedException:")
showerror(io, ex.task.exception, ex.task.backtrace)
if !isempty(stacks)
for bt in stacks
show_backtrace(io, bt)
end
end
end

function show(io::IO, t::Task)
print(io, "Task ($(t.state)) @0x$(string(convert(UInt, pointer_from_objref(t)), base = 16, pad = Sys.WORD_SIZE>>2))")
end
Expand Down Expand Up @@ -204,9 +229,8 @@ function task_local_storage(body::Function, key, val)
end
end

# NOTE: you can only wait for scheduled tasks
function wait(t::Task)
t === current_task() && error("deadlock detected: cannot wait on current task")
# just wait for a task to be done, no error propagation
function _wait(t::Task)
if !istaskdone(t)
lock(t.donenotify)
try
Expand All @@ -217,8 +241,14 @@ function wait(t::Task)
unlock(t.donenotify)
end
end
nothing
end

function wait(t::Task)
t === current_task() && error("deadlock detected: cannot wait on current task")
_wait(t)
if istaskfailed(t)
throw(t.exception)
throw(TaskFailedException(t))
end
nothing
end
Expand All @@ -228,8 +258,9 @@ fetch(@nospecialize x) = x
"""
fetch(t::Task)

Wait for a Task to finish, then return its result value. If the task fails with an
exception, the exception is propagated (re-thrown in the task that called fetch).
Wait for a Task to finish, then return its result value.
If the task fails with an exception, a `TaskFailedException` (which wraps the failed task)
is thrown.
"""
function fetch(t::Task)
wait(t)
Expand All @@ -240,22 +271,32 @@ end
## lexically-scoped waiting for multiple items

function sync_end(refs)
c_ex = CompositeException()
local c_ex
defined = false
for r in refs
try
wait(r)
catch
if !isa(r, Task) || (isa(r, Task) && !istaskfailed(r))
rethrow()
if isa(r, Task)
_wait(r)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

x-ref: #32677

If the first task is blocking on a second task and the second task is throwing an exception, we will never see the exception and @sync will block forever.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 but out of scope for this PR.

if istaskfailed(r)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice improvement not having the extra catch and rethrow.

if !defined
defined = true
c_ex = CompositeException()
end
push!(c_ex, TaskFailedException(r))
end
finally
if isa(r, Task) && istaskfailed(r)
push!(c_ex, CapturedException(task_result(r), r.backtrace))
else
try
wait(r)
catch e
if !defined
defined = true
c_ex = CompositeException()
end
push!(c_ex, e)
end
end
end

if !isempty(c_ex)
if defined
throw(c_ex)
end
nothing
Expand Down Expand Up @@ -301,6 +342,15 @@ macro async(expr)
end
end

# add a wait-able object to the sync pool
macro sync_add(expr)
var = esc(sync_varname)
quote
local ref = $(esc(expr))
push!($var, ref)
ref
end
end

function register_taskdone_hook(t::Task, hook)
tls = get_task_tls(t)
Expand All @@ -324,7 +374,7 @@ function task_done_hook(t::Task)
try
if !isempty(donenotify.waitq)
handled = true
notify(donenotify, result, true, err)
notify(donenotify)
end
finally
unlock(donenotify)
Expand Down
4 changes: 2 additions & 2 deletions stdlib/Distributed/src/Distributed.jl
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import Base: getindex, wait, put!, take!, fetch, isready, push!, length,
hash, ==, kill, close, isopen, showerror

# imports for use
using Base: Process, Semaphore, JLOptions, AnyDict, buffer_writes,
using Base: Process, Semaphore, JLOptions, AnyDict, buffer_writes, @sync_add,
VERSION_STRING, binding_module, atexit, julia_exename,
julia_cmd, AsyncGenerator, acquire, release, invokelatest,
shell_escape_posixly, uv_error, something, notnothing, isbuffered
Expand Down Expand Up @@ -74,7 +74,7 @@ function _require_callback(mod::Base.PkgId)
# broadcast top-level (e.g. from Main) import/using from node 1 (only)
@sync for p in procs()
p == 1 && continue
@async remotecall_wait(p) do
@sync_add remotecall(p) do
Base.require(mod)
nothing
end
Expand Down
2 changes: 1 addition & 1 deletion stdlib/Distributed/src/clusterserialize.jl
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ An exception is raised if a global constant is requested to be cleared.
"""
function clear!(syms, pids=workers(); mod=Main)
@sync for p in pids
@async remotecall_wait(clear_impl!, p, syms, mod)
@sync_add remotecall(clear_impl!, p, syms, mod)
end
end
clear!(sym::Symbol, pid::Int; mod=Main) = clear!([sym], [pid]; mod=mod)
Expand Down
2 changes: 1 addition & 1 deletion stdlib/Distributed/src/macros.jl
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ function remotecall_eval(m::Module, procs, ex)
if pid == myid()
run_locally += 1
else
@async remotecall_wait(Core.eval, pid, m, ex)
@sync_add remotecall(Core.eval, pid, m, ex)
end
end
yield() # ensure that the remotecall_fetch have had a chance to start
Expand Down
14 changes: 8 additions & 6 deletions stdlib/Distributed/test/distributed_exec.jl
Original file line number Diff line number Diff line change
Expand Up @@ -420,11 +420,11 @@ try
catch ex
@test typeof(ex) == CompositeException
@test length(ex) == 5
@test typeof(ex.exceptions[1]) == CapturedException
@test typeof(ex.exceptions[1].ex) == ErrorException
@test typeof(ex.exceptions[1]) == TaskFailedException
@test typeof(ex.exceptions[1].task.exception) == ErrorException
# test start, next, and done
for (i, i_ex) in enumerate(ex)
@test i == parse(Int, i_ex.ex.msg)
@test i == parse(Int, i_ex.task.exception.msg)
end
# test showerror
err_str = sprint(showerror, ex)
Expand Down Expand Up @@ -738,7 +738,7 @@ end # full-test

let t = @task 42
schedule(t, ErrorException(""), error=true)
@test_throws ErrorException Base.wait(t)
@test_throws TaskFailedException(t) Base.wait(t)
end

# issue #8207
Expand Down Expand Up @@ -964,13 +964,15 @@ let (p, p2) = filter!(p -> p != myid(), procs())
if procs isa Int
ex = Any[excpt]
else
ex = Any[ (ex::CapturedException).ex for ex in (excpt::CompositeException).exceptions ]
ex = (excpt::CompositeException).exceptions
end
for (p, ex) in zip(procs, ex)
local p
if procs isa Int || p != myid()
@test (ex::RemoteException).pid == p
ex = ((ex::RemoteException).captured::CapturedException).ex
else
ex = (ex::TaskFailedException).task.exception
end
@test (ex::ErrorException).msg == msg
end
Expand Down Expand Up @@ -1165,7 +1167,7 @@ for (addp_testf, expected_errstr, env) in testruns
close(stdout_in)
@test isempty(fetch(stdout_txt))
@test isa(ex, CompositeException)
@test ex.exceptions[1].ex.msg == expected_errstr
@test ex.exceptions[1].task.exception.msg == expected_errstr
end
end

Expand Down
4 changes: 2 additions & 2 deletions stdlib/Serialization/test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -361,12 +361,12 @@ end
struct MyErrorTypeTest <: Exception end
create_serialization_stream() do s # user-defined type array
t = Task(()->throw(MyErrorTypeTest()))
@test_throws MyErrorTypeTest Base.wait(schedule(t))
@test_throws TaskFailedException(t) Base.wait(schedule(t))
@test isa(t.exception, MyErrorTypeTest)
serialize(s, t)
seek(s, 0)
r = deserialize(s)
@test r.state == :failed
@test isa(t.exception, MyErrorTypeTest)
end

# corner case: undefined inside immutable struct
Expand Down
4 changes: 2 additions & 2 deletions test/channels.jl
Original file line number Diff line number Diff line change
Expand Up @@ -327,12 +327,12 @@ end
ct = current_task()
testerr = ErrorException("expected")
Copy link
Contributor

@oxinabox oxinabox Aug 9, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably out of scope for this PR,
but do you think
we should have a custom error type defined in Test stdlib for testing these things,
so it is harder to get false positives from other errors being thrown?

(If so i can open an issue)

@async Base.throwto(t, testerr)
@test try
@test (try
Base.wait(t)
false
catch ex
ex
end === testerr
end).task.exception === testerr
end

@testset "Timer / AsyncCondition triggering and race #12719" begin
Expand Down
16 changes: 16 additions & 0 deletions test/errorshow.jl
Original file line number Diff line number Diff line change
Expand Up @@ -562,3 +562,19 @@ let buf = IOBuffer()
Base.show_method_candidates(buf, Base.MethodError(sin, Tuple{NoMethodsDefinedHere}))
@test length(take!(buf)) !== 0
end

# pr #32814
let t1 = @async(error(1)),
t2 = @async(wait(t1))
local e
try
wait(t2)
catch e_
e = e_
end
buf = IOBuffer()
showerror(buf, e)
s = String(take!(buf))
@test length(findall("Stacktrace:", s)) == 2
@test occursin("[1] error(::Int", s)
end
4 changes: 2 additions & 2 deletions test/exceptions.jl
Original file line number Diff line number Diff line change
Expand Up @@ -274,12 +274,12 @@ end
@test catch_stack(t, include_bt=false) == [ErrorException("A"), ErrorException("B")]
# Exception stacks for tasks which never get the chance to start
t = @task nothing
@test try
@test (try
@async Base.throwto(t, ErrorException("expected"))
wait(t)
catch e
e
end == ErrorException("expected")
end).task.exception == ErrorException("expected")
@test length(catch_stack(t)) == 1
@test length(catch_stack(t)[1][2]) > 0 # backtrace is nonempty
# Exception stacks should not be accessed on concurrently running tasks
Expand Down
3 changes: 2 additions & 1 deletion test/worlds.jl
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,10 @@ h265() = true
loc_h265 = "$(@__FILE__):$(@__LINE__() - 1)"
@test h265()
@test_throws MethodError put_n_take!(h265, ())
@test_throws MethodError fetch(t265)
@test_throws TaskFailedException(t265) fetch(t265)
@test istaskdone(t265)
let ex = t265.exception
@test ex isa MethodError
@test ex.f == h265
@test ex.args == ()
@test ex.world == wc265
Expand Down