
asyncmap: Include original backtrace in rethrown exception #32749

Closed
richiejp wants to merge 4 commits from the async-rethrow branch

Conversation

richiejp

Wrap the exception from the user code in a CapturedException, which preserves the original backtrace into the user's code.
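
Roughly, the idea is the following (a simplified, self-contained sketch, not the exact diff in this PR):

# Simplified sketch: wrap the user exception together with the backtrace
# captured at the point of failure, so the original location survives the
# rethrow out of the worker task.
f = x -> error("boom")   # stand-in for the user-supplied mapping function
try
    f(1)
catch e
    throw(CapturedException(e, catch_backtrace()))
end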

@richiejp
Author

richiejp commented Aug 1, 2019

Ah Distributed uses asyncmap and the tests expect it to return a particular Exception type which already captures the original stack trace AFAICT.

@richiejp
Author

richiejp commented Aug 1, 2019

Maybe what we should really do is rethrow the original exception, but because it is outside of a catch block, rethrow would need to be extended so that the backtrace in get_ptls_states()->current_task()->excstack can be set from Julia.

@amitmurthy
Contributor

The suggested change does aid in debugging. However, it would be a breaking change, since any existing error-checking code would need to check e.ex, where e is the caught CapturedException and ex points to the originally raised exception.

  • This line in Distributed:

        extract_exception(e) = isa(e, RemoteException) ? e.captured.ex : e

    would need to be changed accordingly (a sketch of one possible adjustment follows this list).

  • Where affected, Distributed tests would need to be modified to extract the original exception via the additionally introduced CapturedException.

  • Information about this breaking change to be added to the release notes.

  • Needs tests and docs.
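
For illustration, one possible adjustment to extract_exception under that change (hypothetical, not from this PR):

# Unwrap a CapturedException introduced by asyncmap in addition to the
# existing RemoteException case.
extract_exception(e) = isa(e, RemoteException)   ? e.captured.ex :
                       isa(e, CapturedException) ? e.ex          : e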

Ah Distributed uses asyncmap and the tests expect it to return a particular Exception type which already captures the original stack trace AFAICT.

The larger issue here is how to deal with stack traces belonging to different asynchronously executed tasks. We don't have a clean approach for this yet. The stack traces you are seeing captured come from where some of the Distributed APIs internally use asyncmap; this is limited to the stack trace of the task calling asyncmap internally. asyncmap itself starts multiple tasks, and stack traces for exceptions raised in those tasks are not being captured.

@richiejp
Author

richiejp commented Aug 1, 2019

Well this seems to be a much more complex problem than I originally thought and I don't really see a simple and obvious solution.

Just some thoughts: maybe I am being over-optimistic, but it seems trivial to capture the backtraces and put them in a CapturedException, and if there are exceptions from multiple tasks running in parallel, these can be combined in a CompositeException (a minimal sketch follows).
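
A minimal sketch of that capture-and-combine idea, using only existing Base types (not this PR's code):

# Each piece of work records its failure together with its backtrace;
# the caller combines any failures into a single CompositeException.
exs = Any[]
for work in (() -> error("task 1 failed"), () -> 42)
    try
        work()
    catch e
        push!(exs, CapturedException(e, catch_backtrace()))
    end
end
isempty(exs) || throw(CompositeException(exs))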

The problem is that the end user will be given a complex tree of exceptions which may change in type, depth and width depending on the number of exceptions. It would probably be necessary to write some helper functions so that they could deal with it and get at the original exceptions easily. Of course this would be quite a large breaking change as well.

Alternatively I think it would be possible to make this wrapping of exceptions internal by unwrapping them before rethrowing the exception, concatenating the backtraces, then passing them to a new version of throw. I am not really sure if that is sane, but I don't see why it could not be done and in theory at least it shouldn't be a breaking change.

Finally I suppose it would also be possible to add a backtrace(s) field to every existing exception structure in the std/base library and push! the backtrace(s) there as well as creating an interface for retrieving backtraces from Exceptions which falls back to using the one in thread local storage. I have thought before that Exceptions and backtraces are a bit magical because the Exception structure doesn't contain that data. This would remove the need for CapturedException at least.

@amitmurthy
Contributor

For user code, a valid solution is for the mapping function to ensure that it never throws an exception but instead returns the error as an object (a CapturedException in the example below) in case of any errors, i.e., the mapping function wraps its logic in an overall try-catch block.

julia> result = asyncmap(x -> begin 
           try 
               iseven(x) ? x : error("foo")
           catch e
               CapturedException(e, catch_backtrace())
           end
       end, 1:2)
2-element Array{Any,1}:
  CapturedException(ErrorException("foo"), Any[(error(::String) at error.jl:33, 1), ((::getfield(Main, Symbol("##27#28")))(::Int64) at REPL[24]:3, 1), ((::getfield(Base, Symbol("##687#692")){getfield(Main, Symbol("##27#28"))})(::Base.RefValue{Any}, ::Tuple{Int64}) at asyncmap.jl:100, 1), (macro expansion at asyncmap.jl:235 [inlined], 1), ((::getfield(Base, Symbol("##703#704")){getfield(Base, Symbol("##687#692")){getfield(Main, Symbol("##27#28"))},Channel{Any},Nothing})() at task.jl:261, 1)])
 2

So, if you know the type of result you are expecting (say, Array{Int}), you can use that to check for errors:

if !isa(result, Array{Int})
    process_errors(result)  # Look for CapturedExceptions in the result
end

@c42f
Member

c42f commented Aug 19, 2019

One option here might be to remove the try...catch wrapper completely from start_worker_task! and allow the task to fail hard. Then collect any failed tasks and ultimately throw a TaskFailedException as was newly introduced in #32814.

The benefit of that approach is you preserve the entire exception stack for the failed task, not just the most recently caught exception.
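
For reference, a minimal illustration of that behaviour (Julia 1.3+ with #32814; nothing beyond Base assumed):

# A task that fails hard surfaces as a TaskFailedException when waited on,
# and the original exception stays attached to the task.
t = @async error("worker failed")
try
    wait(t)
catch e
    e isa TaskFailedException || rethrow()
    @show e.task.exception   # the original ErrorException
end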

@richiejp richiejp changed the title asyncmap: Include original backtrace in rethrown exception [WIP] asyncmap: Include original backtrace in rethrown exception Aug 20, 2019
@richiejp
Author

OK, I will make another attempt at this.

@c42f
Member

c42f left a comment

I think this PR goes in a good direction because it starts to unify the many ways julia can distribute work, at least a little bit ;-)

A high-level question: what are the desired cancellation semantics for async work when a subset of the tasks fail? From a quick scan of the code it seems the current implementation is fail-fast, since a single failure will bring down a whole worker task which might be working on a batch of more than one piece of work.

base/asyncmap.jl Outdated
end
end
end
exs != nothing && throw(CompositeException(exs))

This loop is functionally almost exactly the same as a @sync block with the new macro @sync_add from #32814, but @sync calls `wait` rather than `fetch`. Would the following work?

@sync begin
    foreach(t->@sync_add(t), worker_tasks)
end

(or just sync_end(worker_tasks), though that couples this to the implementation detail in task.jl)

@richiejp
Author

what are the desired cancellation semantics for async work when a subset of the the tasks fail?

Good question; I suppose fail-fast because the user can wrap their loop body in try...catch to make it robust, but they can not force fail-fast unless the function supports it (except by raising a signal or similar). Perhaps we should try to cancel all the tasks as soon as one fails?

In my use case I want it to be robust, but I just did something similar to what @amitmurthy posted. For a large 'mapreduce' style calculation the user probably wants it to fail fast instead of waiting until the reduce stage to find out there was an error in one of the map batches.

@richiejp
Author

FYI: it seems like there is a race condition; I'm not sure yet whether I introduced it. The below hangs on my system.

julia> asyncmap(t -> error("foo"), [1, 2, 3, 4]; ntasks=3)
^CERROR: InterruptException:
Stacktrace:
 [1] poptaskref(::Base.InvasiveLinkedListSynchronized{Task}) at ./task.jl:651
 [2] wait() at ./task.jl:658
 [3] wait(::Base.GenericCondition{ReentrantLock}) at ./condition.jl:107
 [4] put_unbuffered(::Channel{Any}, ::Tuple{Base.RefValue{Any},Tuple{Int64}}) at ./channels.jl:350
 [5] collect_to!(::Array{Base.RefValue{Any},1}, ::Base.Generator{Array{Int64,1},Base.var"##732#734"{Channel{Any}}}, ::Int64, ::Int64) at ./channels.jl:325
 [6] collect_to_with_first! at ./array.jl:644 [inlined]
 [7] _collect(::Array{Int64,1}, ::Base.Generator{Array{Int64,1},Base.var"##732#734"{Channel{Any}}}, ::Base.EltypeUnknown, ::Base.HasShape{1}) at ./array.jl:638
 [8] collect_similar(::Array{Int64,1}, ::Base.Generator{Array{Int64,1},Base.var"##732#734"{Channel{Any}}}) at ./array.jl:562
 [9] map(::Function, ::Array{Int64,1}) at ./abstractarray.jl:2073
 [10] maptwice(::Function, ::Channel{Any}, ::Array{Any,1}, ::Array{Int64,1}) at ./asyncmap.jl:162
 [11] #async_usemap#721 at ./asyncmap.jl:154 [inlined]
 [12] #async_usemap at ./none:0 [inlined]
 [13] #asyncmap#720 at ./asyncmap.jl:81 [inlined]
 [14] (::Base.var"#kw##asyncmap")(::NamedTuple{(:ntasks,),Tuple{Int64}}, ::typeof(asyncmap), ::Function, ::Array{Int64,1}) at ./none:0
 [15] top-level scope at REPL[10]:1

Only happens with ntasks set and a simple call to error in the lambda. I guess the task exits too early or something. Doesn't happen on older versions of Julia, but I haven't tried it with current master yet.

@richiejp richiejp changed the title [WIP] asyncmap: Include original backtrace in rethrown exception asyncmap: Include original backtrace in rethrown exception Aug 24, 2019
@c42f
Member

c42f left a comment

The code looks quite neat here now.

TBH I'm not sure whether we should emit CompositeException in the fail-fast case; cancellation seems to be a rich topic in its own right and we've not really scratched the surface in designing such a system for Julia.

(Kotlin, Go, and Python's Trio all have interesting ideas for structured cancellation. Some interesting reading: https://vorpus.org/blog/timeouts-and-cancellation-for-humans/
https://kotlinlang.org/docs/reference/coroutines/exception-handling.html)

base/asyncmap.jl Outdated
end
end
2-element Array{Any,1}:
CapturedException(ErrorException("foo"), Any[(error(::String) at error.jl:33, 1), ...])

Great to see these docs 👍

@richiejp
Author

TBH I'm not sure whether we should emit CompositeException in the fail-fast case; cancellation seems to be a rich topic in its own right and we've not really scratched the surface in designing such a system for Julia.

I suppose this is not really fail-fast at the moment, at least relative to a system with (more) cancellation points:

julia> asyncmap([3, 2, 1]) do i
       sleep(0.01 * i)
       error("$i")
       end
ERROR: TaskFailedException:
3
Stacktrace:
 [1] error(::String) at ./error.jl:33
 [2] (::var"##9#10")(::Int64) at ./REPL[2]:3
 [3] (::Base.var"##726#731"{var"##9#10"})(::Base.RefValue{Any}, ::Tuple{Int64}) at ./asyncmap.jl:127
 [4] macro expansion at ./asyncmap.jl:260 [inlined]
 [5] (::Base.var"##742#743"{Base.var"##726#731"{var"##9#10"},Channel{Any},Nothing})() at ./task.jl:333

...and 2 more exception(s).

Stacktrace:
 [1] sync_end(::Array{Any,1}) at ./task.jl:300
 [2] macro expansion at ./task.jl:319 [inlined]
 [3] maptwice(::Function, ::Channel{Any}, ::Array{Any,1}, ::Array{Int64,1}) at ./asyncmap.jl:205
 [4] #async_usemap#721 at ./asyncmap.jl:181 [inlined]
 [5] #async_usemap at ./none:0 [inlined]
 [6] #asyncmap#720 at ./asyncmap.jl:108 [inlined]
 [7] asyncmap(::Function, ::Array{Int64,1}) at ./asyncmap.jl:108
 [8] top-level scope at REPL[2]:1

https://vorpus.org/blog/timeouts-and-cancellation-for-humans/

Yes, this looks good. If sleep were a cancellation point and the tasks were all in the same 'nursery', then a cancellation event could be raised for all tasks.

It seems like the task would have to track anything which wait or lock can be called on and know some way to safely interrupt it. I guess calls to sleep could be interrupted by calling close on the Timer, or (maybe unsafely) by calling notify on its lock. I'm not sure how one would obtain a reference to the timer or lock though; this probably needs some tracking mechanism to be implemented.
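
A rough illustration of the Timer idea (assuming, as far as I can tell, that closing a Timer wakes any waiter with an error):

# The waiting task is "cancelled" by closing the timer it waits on.
timer = Timer(60)            # stands in for sleep's internal timer
waiter = @async try
    wait(timer)
    :finished
catch
    :cancelled               # wait throws once the timer is closed
end
close(timer)                 # the "cancellation point": wake the waiter
@show fetch(waiter)          # => :cancelled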

Anyway, for this PR, I am wondering whether to simply modify the documentation to state that it will be cancelled at the end of each loop (assuming I am not missing anything)? If cancellation points are introduced, this would be a huge breaking change, so removing the composite exception would not matter too much, relatively speaking.

@c42f
Member

c42f commented Aug 28, 2019

Anyway, for this PR, I am wondering whether to simply modify the documentation to state that it will be cancelled at the end of each loop (assuming I am not missing anything)? If cancellation points are introduced, this would be a huge breaking change, so removing the composite exception would not matter too much, relatively speaking.

Yes, I think that's a fair comment; for the moment it seems good to be explicit about never losing exceptions. However, it's also true that there's a particular exception which causes the cancellation (the first one which calls close), so you could use the channel to communicate which task this was and at least put it first in the resulting CompositeException?

Personally I think it would be reasonable to go ahead with what you have here (with updated docs, and perhaps a guarantee about the ordering inside the CompositeException). But it would be interesting to hear from @amitmurthy or other people who are actually using asyncmap.

Some extra thoughts, a little off topic:

If we do introduce a whole system for cancellation and structured concurrency I'm not certain asyncmap would be the right API in that system anyway. This is just a very speculative thought bubble, but it seems somehow oddly non-composable to need something other than map itself.

It's also interesting that kotlin has chosen not to use an equivalent of CompositeException — see https://kotlinlang.org/docs/reference/coroutines/exception-handling.html#exceptions-aggregation. Their argument is that a function should be free to use internal concurrency and that this should be an implementation detail which callers don't need to worry about in terms of which exception is thrown. This seems to be a good point but perhaps the solution should be more powerful exception matching at the catch site rather than suppressing exceptions per se.

@richiejp
Author

However, it's also true that there's a particular exception which causes the cancellation (the first one which calls close) so you could use the channel to communicate which task this was and at least put it first in the resulting CompositeException?

I will try it.

If we do introduce a whole system for cancellation and structured concurrency I'm not certain asyncmap would be the right API in that system anyway.

Yes, this sounds reasonable. I was thinking this might be best done as an external library for initial prototyping.

Their argument is that a function should be free to use internal concurrency and that this should be an implementation detail which callers don't need to worry about in terms of which exception is thrown.

If a function fails for a truly unexpected reason (i.e. a real exception, not a timeout from a remote resource or a documented error condition) then a bunch of implementation details should come spilling out. You don't want anything to be hidden. Especially as the error may not be easily reproducible. Plus I think it would be a bit odd if Julia started trying to hide implementation details because it currently doesn't appear to hide anything :-p

This seems to be a good point but perhaps the solution should be more powerful exception matching at the catch site rather than suppressing exceptions per se.

Maybe (I have sometimes wanted this), or it could be that exceptions are being abused for expected errors and a Union return value should be used instead (see the sketch below). Also I think it is often best to fail completely when an exception is thrown and use Erlang-style error recovery (i.e. turn it off and on again).
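
For example, the Union-return style for expected failures might look like this (purely illustrative; the names are made up):

# Expected failures are returned as values rather than thrown, so the caller
# handles them with an ordinary branch instead of try/catch.
function fetch_config(path::AbstractString)::Union{String, ErrorException}
    isfile(path) || return ErrorException("no such file: $path")
    return read(path, String)
end

result = fetch_config("/nonexistent/config.toml")
result isa ErrorException && @warn "expected failure" result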

@c42f
Member

c42f commented Aug 29, 2019

I was thinking, this might be best done as an external library for initial prototyping.

Right. In 1.3 the runtime pieces are in place to start experimenting more seriously with the API. I know other people are interested, for example @tkf has done some prototyping at https://github.com/tkf/Awaits.jl and @vchuravy has #31086. I'd be interested to know about other efforts.

@vchuravy
Member

vchuravy left a comment

This seems like a strict improvement, but we do need to think more carefully about structured concurrency (single-process and multi-process interactions)

@@ -595,7 +595,7 @@ let error_thrown = false
 try
     pmap(x -> x == 50 ? error("foobar") : x, 1:100)
 catch e
-    @test e.captured.ex.msg == "foobar"
+    @test e.exceptions[1].task.exception.captured.ex.msg == "foobar"

This is accessing fairly deep into the implementation details and just recently tripped up DistributedArrays.jl

JuliaParallel/DistributedArrays.jl#212

So either the test should be a series of isa checks documenting the expected object chain, or you use something like unpack (a sketch of the isa-chain version follows).
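
For instance, the test could document the chain explicitly with isa checks (a sketch, assuming the Test stdlib is loaded as in the existing test file; the wrapper types follow my reading of the diff above and are not verified):

# Spell out the expected object chain instead of reaching through it blindly.
@test e isa CompositeException
tfe = e.exceptions[1]
@test tfe isa TaskFailedException
inner = tfe.task.exception        # exception recorded on the failed task
@test inner isa RemoteException   # pmap failures arrive wrapped from the worker
@test inner.captured.ex.msg == "foobar"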

@richiejp
Author

However, it's also true that there's a particular exception which causes the cancellation (the first one which calls close) so you could use the channel to communicate which task this was and at least put it first in the resulting CompositeException?

Sorry, I can't figure out how to do this without creating a mess. Also, it only solves the problem for one specific case, whereas this seems to be an issue for all asynchronous code. I am wondering whether the Task itself should have a termination 'time stamp' if we want the errors in chronological order? Then sync_end could sort them. I suppose this may help establish a chain of cause and effect in the cases where the failure of the first task then causes the failure of others.

I suppose another way would be to have an extra error or output channel which could collect any exceptions in order.

FYI I will be away from my computer for a few days.

@c42f
Member

c42f commented Sep 4, 2019

I suppose another way would be to have an extra error or output channel which could collect any exceptions in order.

Right, that's what I was thinking; I misunderstood the other uses the code currently makes of chnl.

@richiejp
Author

richiejp commented Sep 4, 2019

So I was able to use another channel for delivering errors in order. Back to using CapturedException though, but then technically the task doesn't fail. Still need to fix some other issues.

@richiejp richiejp force-pushed the async-rethrow branch 2 times, most recently from ccbffbc to 9d25661 Compare September 5, 2019 18:58
@richiejp
Author

richiejp commented Sep 6, 2019

This seems to work quite well, if everyone is happy with it then I can fixup the commits.

Perhaps Tasks themselves and/or the @sync macro should use a common (scoped) error Channel? It would probably be best though to see how this goes first.

@c42f
Member

c42f commented Sep 19, 2019

Heh, thinking too hard about cancellation has got me into all sorts of confusion (#33248)

For the current issue (and for some action we can do in the short term) I've been mulling over the CapturedException vs TaskFailedException distinction. It's kind of annoying that these are almost but not quite the same thing; the former makes good sense when the task acts as a worker for a sequence of async jobs. On the other hand it seems odd to me that we're building a "scheduler on top of a scheduler" in this situation rather than doing a single piece of work per task and relying on the julia scheduler itself.

Overall I think it would be cleaner to go with TaskFailedException and to do a single piece of work per task, allowing the tasks to fail hard. That means ntasks would be reinterpreted as the number of tasks which will be running concurrently rather than the number of tasks actually used to do the work.

Having worked on the actual code, do you think it would be easy to set things up this way? It would mean that we don't use CapturedException at all in Base which seems like a win for simplicity. (Arguably that could even be moved to Distributed in the future, given that it seems designed to be serialized).

@richiejp
Author

Assuming Tasks are cheap to create, I think it makes perfect sense to have a one-to-one correspondence between each chunk of asynchronous work and a task. This gives the system scheduler much more information about the extent of the parallelism inherent in a calculation and simplifies asyncmap. If Tasks are not cheap, then they should be.

However, how do we limit the number of concurrent tasks with ntasks unless the scheduler provides some mechanism (say a scheduling token/context with independent limits and queues) or asyncmap implements something to restrict the number of concurrent tasks?

I believe there is precedent for having scheduling contexts in Operating Systems and it could fit with cancellation tokens, but I think it is way outside the scope of this PR. Currently I think it makes sense to limit the number of tasks and have them take work off of a queue.
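
For what it's worth, one way ntasks could remain a pure concurrency limit while still spawning one task per work item is a counting semaphore; a rough sketch using Base.Semaphore (not what this PR implements):

# Spawn one task per item, but block the spawning loop until a slot is free.
function asyncmap_limited(f, itr; ntasks=100)
    sem = Base.Semaphore(ntasks)
    tasks = map(itr) do x
        Base.acquire(sem)         # wait here while ntasks tasks are running
        @async try
            f(x)
        finally
            Base.release(sem)     # free the slot even if f throws
        end
    end
    fetch.(tasks)                 # results in input order; failures rethrow on fetch
end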

Also I think I can remove CapturedException by making it so a failed task puts itself on the error channel. Then I can use this to create a TaskFailedException while preserving the chronological order of the errors.

In fact, instead of wrapping the user-supplied function in an outer function, it would be possible to create a wrapping task which runs an inner task synchronously. The inner task just runs the user function as-is and can fail hard, producing a TaskFailedException which the outer task can then put on the error channel (a sketch of this follows).
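
A sketch of that outer/inner arrangement (illustrative; echnl is a hypothetical error channel, not an existing API):

# The inner task runs the user function unmodified and may fail hard; the
# outer task waits on it and reports the failure in completion order.
echnl = Channel(1)
outer = @async begin
    inner = @task error("user code failed")   # would run f(x) as-is
    schedule(inner)
    try
        wait(inner)                            # throws TaskFailedException on failure
    catch e
        put!(echnl, e)
    end
end
wait(outer)
isready(echnl) && @show take!(echnl)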

@richiejp
Author

richiejp commented Sep 23, 2019

I think the choices are:

  1. A clean, efficient implementation
  2. Output data in the order of input
  3. Errors in the order of completion

Pick two.

Ignoring batching and composite exceptions, the implementation could almost be as simple as the following:

function asyncmap(f, itr, ntasks)
  concur = 0  # number of currently scheduled tasks
  tsks = map(itr) do x
    t = @task begin
      y = f(x)
      concur -= 1  # free a slot once this work item is done
      y
    end
    # Cooperatively wait for a free slot before scheduling the next task.
    while concur >= ntasks
      yield()
    end
    concur += 1
    schedule(t)
    t
  end
  fetch.(tsks)  # results in input order; a failed task throws on fetch
end

Regardless of this, I have pushed an update to the current version so that it uses TaskFailedException.

@richiejp
Author

Actually it is maybe possible to have something quite clean...

function asyncmap(f, c...; ntasks=0, batch_size=nothing)
    ntasks = ntasks == 0 ? 100 : ntasks  # default to 100 concurrent tasks when unset

    do_asyncmap(f, c, ntasks)
end

function do_asyncmap(f, c, ntasks)
    echnl = Channel(ntasks)
    concur = 0
    tasks = Task[]

    for x in zip(c...)
        t = @task try
            f(x...)
        catch
            put!(echnl, current_task())
            rethrow()
        finally
            concur -= 1
        end

        while concur >= ntasks && !isready(echnl)
            yield()
        end

        isready(echnl) && break;

        concur += 1
        schedule(t)
        push!(tasks, t)
    end

    _wait.(tasks)
    close(echnl)

    isready(echnl) &&
        throw(CompositeException(TaskFailedException.(echnl)))

    task_result.(tasks)
end

There doesn't seem to be a way to exit map early, but still return the results. I guess this is possible with a 'take until' iterator.

@richiejp
Author

I have completely rewritten it now (syntactically), although I have gone in a bit of a circle to come back round to something more similar to the original.

It now has one task per work item, partial fail-fast semantics, errors are delivered in the order they happen and it is maybe a little shorter. However things would be a lot cleaner if map were simply an alias for collect(Generator(...)).

I still have to add some new tests for ensuring ntasks is obeyed as the old ones just checked the number of tasks which is no longer relevant.

@richiejp richiejp force-pushed the async-rethrow branch 2 times, most recently from 2e349ed to 39410c7 Compare October 3, 2019 09:27
@richiejp
Author

richiejp commented Oct 3, 2019

I added more tests, updated the docs and cleaned up the commits. I think/hope it is ready for final review now.

Richard Palethorpe and others added 4 commits November 30, 2019 10:04
Collect the TaskExceptions into a CompositeException.
Avoids more complicated scheduling within asyncmap by limiting the number of
concurrent tasks with a simple counter. The counter may be removed in the
future as the main scheduler advances.
@ericphanson
Contributor

I didn’t see this PR at the time, but #42105 does something similar FYI

@richiejp richiejp closed this Jun 27, 2022