Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

@catch, retry, partition, asyncmap and pmap #15409

Merged
merged 2 commits into from
Apr 14, 2016
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ Breaking changes
is now divided among the fields `code`, `slotnames`, `slottypes`, `slotflags`,
`gensymtypes`, `rettype`, `nargs`, and `isva` in the `LambdaInfo` type ([#15609]).

* `pmap` keyword arguments `err_retry=true` and `err_stop=false` are deprecated.
`pmap` no longer retries or returns `Exception` objects in the result collection.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"retries nor returns"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No strong opinion on this. My feeling is that if it was "neither retries", then it would be "nor returns". I believe that in the "does not verb or verb" case, there is no hard rule about "or" or "nor". When I read it out aloud as written, it sounds better to me with "or" rather than "nor". However, I'm Australian, so what sounds normal to me would make an Englishman cringe! Happy to change it if you feel strongly ...

`pmap(retry(f), c)` or `pmap(@catch(f), c)` can be used instead.
([#15409](https://github.com/JuliaLang/julia/pull/15409#discussion_r57494701)).


Library improvements
--------------------

Expand Down
187 changes: 187 additions & 0 deletions base/asyncmap.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
# This file is a part of Julia. License is MIT: http://julialang.org/license


"""
AsyncCollector(f, results, c...; ntasks=100) -> iterator

Apply f to each element of c using at most 100 asynchronous tasks.
For multiple collection arguments, apply f elementwise.
Output is collected into "results".

Note: `next(::AsyncCollector, state) -> (nothing, state)`

Note: `for task in AsyncCollector(f, results, c...) end` is equivalent to
`map!(f, results, c...)`.
"""
type AsyncCollector
f
results
enumerator::Enumerate
ntasks::Int
end

function AsyncCollector(f, results, c...; ntasks=0)
if ntasks == 0
ntasks = 100
end
AsyncCollector(f, results, enumerate(zip(c...)), ntasks)
end


type AsyncCollectorState
enum_state
active_count::Int
task_done::Condition
done::Bool
end


# Busy if the maximum number of concurrent tasks is running.
function isbusy(itr::AsyncCollector, state::AsyncCollectorState)
state.active_count == itr.ntasks
end


# Wait for @async task to end.
wait(state::AsyncCollectorState) = wait(state.task_done)


# Open a @sync block and initialise iterator state.
function start(itr::AsyncCollector)
sync_begin()
AsyncCollectorState(start(itr.enumerator), 0, Condition(), false)
end

# Close @sync block when iterator is done.
function done(itr::AsyncCollector, state::AsyncCollectorState)
if !state.done && done(itr.enumerator, state.enum_state)
state.done = true
sync_end()
end
return state.done
end

function next(itr::AsyncCollector, state::AsyncCollectorState)

# Wait if the maximum number of concurrent tasks are already running...
while isbusy(itr, state)
wait(state)
end

# Get index and mapped function arguments from enumeration iterator...
(i, args), state.enum_state = next(itr.enumerator, state.enum_state)

# Execute function call and save result asynchronously...
@async begin
itr.results[i] = itr.f(args...)
state.active_count -= 1
notify(state.task_done, nothing)
end

# Count number of concurrent tasks...
state.active_count += 1

return (nothing, state)
end



"""
AsyncGenerator(f, c...; ntasks=100) -> iterator

Apply f to each element of c using at most 100 asynchronous tasks.
For multiple collection arguments, apply f elementwise.
Results are returned by the iterator as they become available.
Note: `collect(AsyncGenerator(f, c...; ntasks=1))` is equivalent to
`map(f, c...)`.
"""
type AsyncGenerator
collector::AsyncCollector
end

function AsyncGenerator(f, c...; ntasks=0)
AsyncGenerator(AsyncCollector(f, Dict{Int,Any}(), c...; ntasks=ntasks))
end


type AsyncGeneratorState
i::Int
async_state::AsyncCollectorState
end


start(itr::AsyncGenerator) = AsyncGeneratorState(0, start(itr.collector))

# Done when source async collector is done and all results have been consumed.
function done(itr::AsyncGenerator, state::AsyncGeneratorState)
done(itr.collector, state.async_state) && isempty(itr.collector.results)
end

# Pump the source async collector if it is not already busy...
function pump_source(itr::AsyncGenerator, state::AsyncGeneratorState)
if !isbusy(itr.collector, state.async_state) &&
!done(itr.collector, state.async_state)
ignored, state.async_state = next(itr.collector, state.async_state)
return true
else
return false
end
end

function next(itr::AsyncGenerator, state::AsyncGeneratorState)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think a line break is in order here, even if we allowed the exception of blocks starting with a comment.

state.i += 1

results = itr.collector.results
while !haskey(results, state.i)

# Wait for results to become available...
if !pump_source(itr,state) && !haskey(results, state.i)
wait(state.async_state)
end
end
r = results[state.i]
delete!(results, state.i)

return (r, state)
end

iteratorsize(::Type{AsyncGenerator}) = SizeUnknown()


"""
asyncgenerate(f, c...) -> iterator

Apply `@async f` to each element of `c`.

For multiple collection arguments, apply f elementwise.

Results are returned in order as they become available.
"""
asyncgenerate(f, c...) = AsyncGenerator(f, c...)


"""
asyncmap(f, c...) -> collection

Transform collection `c` by applying `@async f` to each element.

For multiple collection arguments, apply f elementwise.
"""
asyncmap(f, c...) = collect(asyncgenerate(f, c...))


"""
asyncmap!(f, c)

In-place version of `asyncmap()`.
"""
asyncmap!(f, c) = (for x in AsyncCollector(f, c, c) end; c)


"""
asyncmap!(f, results, c...)

Like `asyncmap()`, but stores output in `results` rather returning a collection.
"""
asyncmap!(f, r, c1, c...) = (for x in AsyncCollector(f, r, c1, c...) end; r)
28 changes: 28 additions & 0 deletions base/deprecated.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1003,5 +1003,33 @@ export call
# 1933
@deprecate_binding SingleAsyncWork AsyncCondition


# #12872
@deprecate istext istextmime

#15409
function pmap(f, c...; err_retry=nothing, err_stop=nothing, pids=nothing)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here (and likely elsewhere).

if err_retry != nothing
depwarn("`err_retry` is deprecated, use `pmap(retry(f), c...)`.", :pmap)
if err_retry == true
f = retry(f)
end
end

if err_stop != nothing
depwarn("`err_stop` is deprecated, use `pmap(@catch(f), c...).", :pmap)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing closing backquote. BTW, are you sure backquotes in error messages are the usual way of printing exceptions? I think I made a PR about that ages ago which was rejected.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've removed the backquotes.

if err_stop == false
f = @catch(f)
end
end

if pids == nothing
p = default_worker_pool()
else
depwarn("`pids` is deprecated, use `pmap(::WorkerPool, f, c...).", :pmap)
p = WorkerPool(pids)
end

return pmap(p, f, c...)
end
15 changes: 0 additions & 15 deletions base/docs/helpdb/Base.jl
Original file line number Diff line number Diff line change
Expand Up @@ -241,21 +241,6 @@ If `types` is specified, returns an array of methods whose types match.
"""
methods

"""
pmap(f, lsts...; err_retry=true, err_stop=false, pids=workers())

Transform collections `lsts` by applying `f` to each element in parallel. (Note that
`f` must be made available to all worker processes; see [Code Availability and Loading Packages](:ref:`Code Availability and Loading Packages <man-parallel-computing-code-availability>`)
for details.) If `nprocs() > 1`, the calling process will be dedicated to assigning tasks.
All other available processes will be used as parallel workers, or on the processes
specified by `pids`.

If `err_retry` is `true`, it retries a failed application of `f` on a different worker. If
`err_stop` is `true`, it takes precedence over the value of `err_retry` and `pmap` stops
execution on the first error.
"""
pmap

"""
workers()

Expand Down
60 changes: 60 additions & 0 deletions base/error.jl
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,63 @@ macro assert(ex, msgs...)
end
:($(esc(ex)) ? $(nothing) : throw(Main.Base.AssertionError($msg)))
end


"""
retry(f, [condition]; n=3; max_delay=10) -> Function

Returns a lambda that retries function `f` up to `n` times in the
event of an exception. If `condition` is a `Type` then retry only
for exceptions of that type. If `condition` is a function
`cond(::Exception) -> Bool` then retry only if it is true.

# Examples
```julia
retry(http_get, e -> e.status == "503")(url)
retry(read, UVError)(io)
```
"""
function retry(f::Function, condition::Function=e->true;
n::Int=3, max_delay::Int=10)

(args...) -> begin
delay = 0.05
for i = 1:n
try
return f(args...)
catch e
if i == n || try condition(e) end != true
rethrow(e)
end
end
sleep(delay * (0.8 + (rand() * 0.4)))
delay = min(max_delay, delay * 5)
end
end
end

retry(f::Function, t::Type; kw...) = retry(f, e->isa(e, t); kw...)


"""
@catch(f) -> Function

Returns a lambda that executes `f` and returns either the result of `f` or
an `Exception` thrown by `f`.

# Examples
```julia
julia> r = @catch(length)([1,2,3])
3

julia> r = @catch(length)()
MethodError(length,())

julia> typeof(r)
MethodError
```
"""
catchf(f) = (args...) -> try f(args...) catch ex; ex end
macro catch(f)
esc(:(Base.catchf($f)))
end
5 changes: 5 additions & 0 deletions base/exports.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1045,9 +1045,11 @@ export
# errors
assert,
backtrace,
@catch,
catch_backtrace,
error,
rethrow,
retry,
systemerror,

# stack traces
Expand Down Expand Up @@ -1211,7 +1213,9 @@ export

# multiprocessing
addprocs,
asyncmap,
ClusterManager,
default_worker_pool,
fetch,
init_worker,
interrupt,
Expand All @@ -1233,6 +1237,7 @@ export
timedwait,
wait,
workers,
WorkerPool,

# multimedia I/O
Display,
Expand Down
10 changes: 10 additions & 0 deletions base/generator.jl
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,16 @@ function next(g::Generator, s)
g.f(v), s2
end


"""
generate(f, c...) -> iterator

Return an iterator applying `f` to each element of `c`.
For multiple collection arguments, apply f elementwise.
"""
generate(f, c...) = Generator(f, c...)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please don't export this. It's a very small wrapper and unfortunately close in syntax to @generated

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not call Generator directly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess, because it feels to me like it should not be necessary to think about types to use a Julia API.

It seems like there is a greater burden of understanding if I have to call a type constructor. If I'm making something explicitly this type, what else do I have to understand about this type?

If I just call a function, the type will be whatever clever type the method dispatch system figures out is best for the particular situation. Maybe generate(f, c...) will return a different type for some values of f and c....

Note, that this is not actually called anywhere. It is included mostly for completeness to establish the pattern followed by asyncgenerate and pgenerate.

Also, there are precedents for having a verb function to create an iterator: zip, filter, enumerate etc.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see your point. Clearly, we don't know what policy to follow as regards the naming of iterators and functions that create them...

Anyway, the docs should explicitly say "Return an iterator applying f to each element of c.".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't that what -> iterator implies?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fair enough. done

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just #10452 - though "not having to think about types" sounds like you're using the wrong language. You have to think about types all the time with Julia, and unless there are actual examples of a lowercase function API returning different types for different inputs, exporting a new function that's just a trivial wrapper around an existing type is not necessary.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tkelman I agree with you, but in #10162 no clear decision was taken. I would have preferred to have only upper case constructors for all this family of iterators which are always of the same type. Maybe restart the discussion there?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why wouldn't you want to allow generate to return any one of FallbackGenerator, GeneratorSpecialisedForVectors, GeneratorOptimisedForInputOfKnownLength, GeneratorOptimisedForSingleInputCollection, etc, etc. ?

julia> typeof(zip([1]))
Base.Zip1{Array{Int64,1}}

julia> typeof(zip([1],[2]))
Base.Zip2{Array{Int64,1},Array{Int64,1}}

julia> typeof(zip([1],[2],[3]))
Zip{Array{Int64,1},Base.Zip2{Array{Int64,1},Array{Int64,1}}}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we had any of the latter, I might agree with you. At the moment we don't, so it's redundant to start creating and using API's that don't need to exist yet.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This definition serves no purpose. As long as this is the only method of generate, there's no need for this to have 2 names. Same with asyncgenerate.



## iterator traits

abstract IteratorSize
Expand Down
Loading