diff --git a/NEWS.md b/NEWS.md index e461f0c66f2d0..872d0f8b580ef 100644 --- a/NEWS.md +++ b/NEWS.md @@ -42,6 +42,10 @@ New library functions Standard library changes ------------------------ +* When `wait` (or `@sync`, or `fetch`) is called on a failing `Task`, the exception is propagated as a + `TaskFailedException` wrapping the task. + This makes it possible to see the location of the original failure inside the task (as well as the + location of the `wait` call, as before) ([#32814]). * `Regex` can now be multiplied (`*`) and exponentiated (`^`), like strings ([#23422]). * `Cmd` interpolation (``` `$(x::Cmd) a b c` ``` where) now propagates `x`'s process flags (environment, flags, working directory, etc) if `x` is the first interpolant and errors diff --git a/base/condition.jl b/base/condition.jl index eb136637dc1d5..f70b3660778d1 100644 --- a/base/condition.jl +++ b/base/condition.jl @@ -88,8 +88,8 @@ Block the current task until some event occurs, depending on the type of the arg * [`Condition`](@ref): Wait for [`notify`](@ref) on a condition. * `Process`: Wait for a process or process chain to exit. The `exitcode` field of a process can be used to determine success or failure. -* [`Task`](@ref): Wait for a `Task` to finish. If the task fails with an exception, the - exception is propagated (re-thrown in the task that called `wait`). +* [`Task`](@ref): Wait for a `Task` to finish. If the task fails with an exception, a + `TaskFailedException` (which wraps the failed task) is thrown. * [`RawFD`](@ref): Wait for changes on a file descriptor (see the `FileWatching` package). If no argument is passed, the task blocks for an undefined period. A task can only be diff --git a/base/exports.jl b/base/exports.jl index f89209e5df0ad..6f5ad39ed1f4a 100644 --- a/base/exports.jl +++ b/base/exports.jl @@ -125,6 +125,7 @@ export KeyError, MissingException, ProcessFailedException, + TaskFailedException, SystemError, StringIndexError, diff --git a/base/task.jl b/base/task.jl index 88055d851943e..41c874bf543e0 100644 --- a/base/task.jl +++ b/base/task.jl @@ -56,6 +56,31 @@ function showerror(io::IO, ex::CompositeException) end end +""" + TaskFailedException + +This exception is thrown by a `wait(t)` call when task `t` fails. +`TaskFailedException` wraps the failed task `t`. +""" +struct TaskFailedException <: Exception + task::Task +end + +function showerror(io::IO, ex::TaskFailedException) + stacks = [] + while isa(ex.task.exception, TaskFailedException) + pushfirst!(stacks, ex.task.backtrace) + ex = ex.task.exception + end + println(io, "TaskFailedException:") + showerror(io, ex.task.exception, ex.task.backtrace) + if !isempty(stacks) + for bt in stacks + show_backtrace(io, bt) + end + end +end + function show(io::IO, t::Task) print(io, "Task ($(t.state)) @0x$(string(convert(UInt, pointer_from_objref(t)), base = 16, pad = Sys.WORD_SIZE>>2))") end @@ -204,9 +229,8 @@ function task_local_storage(body::Function, key, val) end end -# NOTE: you can only wait for scheduled tasks -function wait(t::Task) - t === current_task() && error("deadlock detected: cannot wait on current task") +# just wait for a task to be done, no error propagation +function _wait(t::Task) if !istaskdone(t) lock(t.donenotify) try @@ -217,8 +241,14 @@ function wait(t::Task) unlock(t.donenotify) end end + nothing +end + +function wait(t::Task) + t === current_task() && error("deadlock detected: cannot wait on current task") + _wait(t) if istaskfailed(t) - throw(t.exception) + throw(TaskFailedException(t)) end nothing end @@ -228,8 +258,9 @@ fetch(@nospecialize x) = x """ fetch(t::Task) -Wait for a Task to finish, then return its result value. If the task fails with an -exception, the exception is propagated (re-thrown in the task that called fetch). +Wait for a Task to finish, then return its result value. +If the task fails with an exception, a `TaskFailedException` (which wraps the failed task) +is thrown. """ function fetch(t::Task) wait(t) @@ -240,22 +271,32 @@ end ## lexically-scoped waiting for multiple items function sync_end(refs) - c_ex = CompositeException() + local c_ex + defined = false for r in refs - try - wait(r) - catch - if !isa(r, Task) || (isa(r, Task) && !istaskfailed(r)) - rethrow() + if isa(r, Task) + _wait(r) + if istaskfailed(r) + if !defined + defined = true + c_ex = CompositeException() + end + push!(c_ex, TaskFailedException(r)) end - finally - if isa(r, Task) && istaskfailed(r) - push!(c_ex, CapturedException(task_result(r), r.backtrace)) + else + try + wait(r) + catch e + if !defined + defined = true + c_ex = CompositeException() + end + push!(c_ex, e) end end end - if !isempty(c_ex) + if defined throw(c_ex) end nothing @@ -301,6 +342,15 @@ macro async(expr) end end +# add a wait-able object to the sync pool +macro sync_add(expr) + var = esc(sync_varname) + quote + local ref = $(esc(expr)) + push!($var, ref) + ref + end +end function register_taskdone_hook(t::Task, hook) tls = get_task_tls(t) @@ -324,7 +374,7 @@ function task_done_hook(t::Task) try if !isempty(donenotify.waitq) handled = true - notify(donenotify, result, true, err) + notify(donenotify) end finally unlock(donenotify) diff --git a/stdlib/Distributed/src/Distributed.jl b/stdlib/Distributed/src/Distributed.jl index c1d6cc5d5ffff..504d21f97406f 100644 --- a/stdlib/Distributed/src/Distributed.jl +++ b/stdlib/Distributed/src/Distributed.jl @@ -10,7 +10,7 @@ import Base: getindex, wait, put!, take!, fetch, isready, push!, length, hash, ==, kill, close, isopen, showerror # imports for use -using Base: Process, Semaphore, JLOptions, AnyDict, buffer_writes, +using Base: Process, Semaphore, JLOptions, AnyDict, buffer_writes, @sync_add, VERSION_STRING, binding_module, atexit, julia_exename, julia_cmd, AsyncGenerator, acquire, release, invokelatest, shell_escape_posixly, uv_error, something, notnothing, isbuffered @@ -74,7 +74,7 @@ function _require_callback(mod::Base.PkgId) # broadcast top-level (e.g. from Main) import/using from node 1 (only) @sync for p in procs() p == 1 && continue - @async remotecall_wait(p) do + @sync_add remotecall(p) do Base.require(mod) nothing end diff --git a/stdlib/Distributed/src/clusterserialize.jl b/stdlib/Distributed/src/clusterserialize.jl index e21ac32dc39d6..6bca816687af3 100644 --- a/stdlib/Distributed/src/clusterserialize.jl +++ b/stdlib/Distributed/src/clusterserialize.jl @@ -243,7 +243,7 @@ An exception is raised if a global constant is requested to be cleared. """ function clear!(syms, pids=workers(); mod=Main) @sync for p in pids - @async remotecall_wait(clear_impl!, p, syms, mod) + @sync_add remotecall(clear_impl!, p, syms, mod) end end clear!(sym::Symbol, pid::Int; mod=Main) = clear!([sym], [pid]; mod=mod) diff --git a/stdlib/Distributed/src/macros.jl b/stdlib/Distributed/src/macros.jl index e3b40c209a176..d15332a110a53 100644 --- a/stdlib/Distributed/src/macros.jl +++ b/stdlib/Distributed/src/macros.jl @@ -220,7 +220,7 @@ function remotecall_eval(m::Module, procs, ex) if pid == myid() run_locally += 1 else - @async remotecall_wait(Core.eval, pid, m, ex) + @sync_add remotecall(Core.eval, pid, m, ex) end end yield() # ensure that the remotecall_fetch have had a chance to start diff --git a/stdlib/Distributed/test/distributed_exec.jl b/stdlib/Distributed/test/distributed_exec.jl index b76a9414cd558..3bcbd9fc5934c 100644 --- a/stdlib/Distributed/test/distributed_exec.jl +++ b/stdlib/Distributed/test/distributed_exec.jl @@ -420,11 +420,11 @@ try catch ex @test typeof(ex) == CompositeException @test length(ex) == 5 - @test typeof(ex.exceptions[1]) == CapturedException - @test typeof(ex.exceptions[1].ex) == ErrorException + @test typeof(ex.exceptions[1]) == TaskFailedException + @test typeof(ex.exceptions[1].task.exception) == ErrorException # test start, next, and done for (i, i_ex) in enumerate(ex) - @test i == parse(Int, i_ex.ex.msg) + @test i == parse(Int, i_ex.task.exception.msg) end # test showerror err_str = sprint(showerror, ex) @@ -738,7 +738,7 @@ end # full-test let t = @task 42 schedule(t, ErrorException(""), error=true) - @test_throws ErrorException Base.wait(t) + @test_throws TaskFailedException(t) Base.wait(t) end # issue #8207 @@ -964,13 +964,15 @@ let (p, p2) = filter!(p -> p != myid(), procs()) if procs isa Int ex = Any[excpt] else - ex = Any[ (ex::CapturedException).ex for ex in (excpt::CompositeException).exceptions ] + ex = (excpt::CompositeException).exceptions end for (p, ex) in zip(procs, ex) local p if procs isa Int || p != myid() @test (ex::RemoteException).pid == p ex = ((ex::RemoteException).captured::CapturedException).ex + else + ex = (ex::TaskFailedException).task.exception end @test (ex::ErrorException).msg == msg end @@ -1165,7 +1167,7 @@ for (addp_testf, expected_errstr, env) in testruns close(stdout_in) @test isempty(fetch(stdout_txt)) @test isa(ex, CompositeException) - @test ex.exceptions[1].ex.msg == expected_errstr + @test ex.exceptions[1].task.exception.msg == expected_errstr end end diff --git a/stdlib/Serialization/test/runtests.jl b/stdlib/Serialization/test/runtests.jl index dc44c897f8153..e80771de53148 100644 --- a/stdlib/Serialization/test/runtests.jl +++ b/stdlib/Serialization/test/runtests.jl @@ -361,12 +361,12 @@ end struct MyErrorTypeTest <: Exception end create_serialization_stream() do s # user-defined type array t = Task(()->throw(MyErrorTypeTest())) - @test_throws MyErrorTypeTest Base.wait(schedule(t)) + @test_throws TaskFailedException(t) Base.wait(schedule(t)) + @test isa(t.exception, MyErrorTypeTest) serialize(s, t) seek(s, 0) r = deserialize(s) @test r.state == :failed - @test isa(t.exception, MyErrorTypeTest) end # corner case: undefined inside immutable struct diff --git a/test/channels.jl b/test/channels.jl index c26840234d7b2..5712bc64f93d7 100644 --- a/test/channels.jl +++ b/test/channels.jl @@ -327,12 +327,12 @@ end ct = current_task() testerr = ErrorException("expected") @async Base.throwto(t, testerr) - @test try + @test (try Base.wait(t) false catch ex ex - end === testerr + end).task.exception === testerr end @testset "Timer / AsyncCondition triggering and race #12719" begin diff --git a/test/errorshow.jl b/test/errorshow.jl index 039a6f5834d33..5281382e7ad8f 100644 --- a/test/errorshow.jl +++ b/test/errorshow.jl @@ -562,3 +562,19 @@ let buf = IOBuffer() Base.show_method_candidates(buf, Base.MethodError(sin, Tuple{NoMethodsDefinedHere})) @test length(take!(buf)) !== 0 end + +# pr #32814 +let t1 = @async(error(1)), + t2 = @async(wait(t1)) + local e + try + wait(t2) + catch e_ + e = e_ + end + buf = IOBuffer() + showerror(buf, e) + s = String(take!(buf)) + @test length(findall("Stacktrace:", s)) == 2 + @test occursin("[1] error(::Int", s) +end diff --git a/test/exceptions.jl b/test/exceptions.jl index ee7f7e1b8849a..e47862c8312d1 100644 --- a/test/exceptions.jl +++ b/test/exceptions.jl @@ -274,12 +274,12 @@ end @test catch_stack(t, include_bt=false) == [ErrorException("A"), ErrorException("B")] # Exception stacks for tasks which never get the chance to start t = @task nothing - @test try + @test (try @async Base.throwto(t, ErrorException("expected")) wait(t) catch e e - end == ErrorException("expected") + end).task.exception == ErrorException("expected") @test length(catch_stack(t)) == 1 @test length(catch_stack(t)[1][2]) > 0 # backtrace is nonempty # Exception stacks should not be accessed on concurrently running tasks diff --git a/test/worlds.jl b/test/worlds.jl index a1bda0ff9a4e5..218d6d39d0783 100644 --- a/test/worlds.jl +++ b/test/worlds.jl @@ -139,9 +139,10 @@ h265() = true loc_h265 = "$(@__FILE__):$(@__LINE__() - 1)" @test h265() @test_throws MethodError put_n_take!(h265, ()) -@test_throws MethodError fetch(t265) +@test_throws TaskFailedException(t265) fetch(t265) @test istaskdone(t265) let ex = t265.exception + @test ex isa MethodError @test ex.f == h265 @test ex.args == () @test ex.world == wc265