add Threads.foreach for convenient multithreaded Channel consumption #34543

Merged
jrevels merged 19 commits into master from jr/threaded_foreach on May 28, 2020

jrevels
Member

@jrevels jrevels commented Jan 27, 2020

This is a spiritual successor to #34185, but is a bit more minimal. The motivation came about when I realized #34185 was a wonky solution to a pattern that had started cropping up for me: I wanted a multithreaded foreach(f, channel) where I could communicate multiple results per f call by put!ing them individually to a closed-over Channel (so not a direct map). While #34185 could be hackily employed for this, I thought it was overkill so I ripped this out instead. Once I started using it, I realized I liked it a bit more.

Unlike #34185, it's only defined on Channels, not arbitrary iterators. It could be extended to arbitrary iterators if we wanted via #34185's locking approach; I just thought it'd be better to keep things simple from the get-go. I could also loosen the dispatch constraint to AbstractChannel and just document the expectation of threadsafe iteration.
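As a rough illustration of the pattern described above, here is a minimal sketch of how the merged `Threads.foreach(f, channel)` might be used to emit several results per `f` call by `put!`ing them to a closed-over `Channel`. The names `inputs` and `results` are made up for this example, and it assumes a Julia version that ships `Threads.foreach` (1.6 or later, as far as I know).

```julia
# Producer channel holding the items to consume; it closes when the producer finishes.
inputs = Channel{Int}(32) do ch
    for i in 1:10
        put!(ch, i)
    end
end

# Unbounded results channel closed over by the worker function,
# so `put!` never blocks the workers.
results = Channel{Int}(Inf)

# Each call emits two results, so this is not a direct `map`.
Threads.foreach(inputs) do x
    put!(results, x)
    put!(results, -x)
end

# `Threads.foreach` returns only after all workers are done, so it is safe to close here.
close(results)
collected = sort!(collect(results))
```

The order in which results arrive is not deterministic, hence the `sort!` before inspecting them.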

cc @tkf, who made a wonderful Transducers implementation of #34185

@jrevels jrevels changed the title add threaded_foreach function to convenient multithreaded Channel consumption add threaded_foreach for convenient multithreaded Channel consumption Jan 27, 2020
@ararslan ararslan requested a review from JeffBezanson January 27, 2020 22:32
@ararslan ararslan added multithreading Base.Threads and related functionality needs compat annotation Add !!! compat "Julia x.y" to the docstring needs news A NEWS entry is required for this change needs tests Unit tests are required for this change labels Jan 27, 2020
base/channels.jl Outdated
@tkf
Member

tkf commented Jan 27, 2020

it's only defined on Channels, not arbitrary iterators

It'd be really nice if we had a Channel(iterator) overload, but unfortunately we already have Channel(callable). I've been wondering if it makes sense to add a function (say) into(::Type{T}, iterator) :: T with methods like into(::Type{Channel}, iterator) and into(::Type{<:AbstractVector}, iterator). Then we could just write threaded_foreach(f, into(Channel, iterator)).
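To make the idea concrete, here is a minimal sketch of what such an `into` could look like. `into` is hypothetical and does not exist in Base; the two methods below are only one possible reading of the signatures mentioned above.

```julia
# Hypothetical `into`; this function does not exist in Base.
function into(::Type{Channel}, itr)
    # The channel is bound to the producer task, so it closes once `itr` is exhausted.
    Channel{eltype(itr)}() do ch
        for x in itr
            put!(ch, x)
        end
    end
end

# Vector-like targets: materialize the iterator, then convert to the requested type.
into(::Type{T}, itr) where {T<:AbstractVector} = convert(T, collect(itr))

ch = into(Channel, 1:4)
v = into(Vector{Float64}, 1:3)
```

With something like this, an arbitrary iterator could be passed to the Channel-only method as `f`, `into(Channel, iterator)`.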

base/channels.jl Outdated
If `async` is `false`, this function will `wait` for all internally spawned tasks
to complete before returning.
"""
function threaded_foreach(f, channel::Channel; ntasks=Threads.nthreads(), async=true)
Member

Does it make sense to use the name Threads.foreach, like we have Iterators.filter?

Member

ref: @vchuravy suggested to use C++ -like policy-based interface #34185 (comment)

Member Author

Does it make sense to use the name Threads.foreach, like we have Iterators.filter?

That would be nice.

One tricky detail (but not an actual problem) is that writing code that uses Threads.@spawn from within Threads is pretty wonky right now, because the macro depends on functions that aren't defined until later (in tasks.jl), and Channels also aren't defined until later. We can clearly churn things around so they're defined in the right order, but it might involve some bikeshedding to untangle the dependencies.

Member

Ah, I didn't know that. I guess one way to work around the problem is to just declare the "stub" function like this?

module Threads
function foreach end
...
end

# later
function Threads.foreach(f, channel::Channel)
    ...
end

Member Author

I pushed the above change, since I like Threads.foreach and the stub approach is way less hassle/churn than actually shuffling code :p

Folks should, of course, feel free to continue bikeshedding the name.

base/channels.jl Outdated
    tasks = map(1:ntasks) do _
        Threads.@spawn begin
            for item in channel
                wait(Threads.@spawn f(item))
Member

Maybe a bit tangential, but FYI: I noticed you can get a bit more speedup (see JuliaFolds/Transducers.jl#123 (comment)) if you collect the elements into a buffer Vector and then iterate over it. You may want to optimize for latency rather than throughput, so I guess it's not always the better choice. But I wonder if it makes sense to add a "buffer size" option.

Member Author

This indeed sounds like a useful option! I'd probably want to fiddle around with it in a follow-up PR, though.

@JeffBezanson JeffBezanson changed the title add threaded_foreach for convenient multithreaded Channel consumption add Threads.foreach for convenient multithreaded Channel consumption Jan 30, 2020
@jrevels
Member Author

jrevels commented Feb 4, 2020

Added some extremely basic tests - let me know if folks have other testing ideas and I'll pop 'em in.

@jrevels jrevels marked this pull request as ready for review February 4, 2020 19:43
@jrevels
Member Author

jrevels commented Feb 4, 2020

I've been wondering if it makes sense to add a function (say) into(::Type{T}, iterator) :: T with methods like into(::Type{Channel}, iterator) and into(::Type{<:AbstractVector}, iterator).

In this vein, I feel like I often want a shorthand for Channel{T}(c -> foreach(i -> put!(c, i), iterator), ...). It's not that long really, but the chaining usually makes my eyeballs do a double-scan :p

It might be nice to be able to write e.g. Channel{T}(from(iterator), ...) for this, but from(x) = c -> foreach(i -> put!(c, i), x) is so short and easy to write that I almost hesitate to suggest adding it to Base... maybe something with a less generic/more descriptive name like puteach(iterator) wouldn't be bad though.
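For reference, a minimal runnable sketch of that helper, written with the channel as the first argument to `put!` (which is the order `put!` expects). `from` is just the hypothetical name floated here, not something in Base.

```julia
# `from` is the hypothetical one-liner from this thread, not a Base function.
# Note that `put!` takes the channel first, then the value.
from(itr) = ch -> foreach(x -> put!(ch, x), itr)

# Unbuffered channel fed by the closure; it closes once `itr` is exhausted.
ch = Channel{Int}(from(1:5))
items = collect(ch)
```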

@jrevels jrevels removed the needs tests Unit tests are required for this change label Feb 4, 2020
base/channels.jl Outdated
@tkf
Member

tkf commented Feb 5, 2020

It might be nice to be able to write e.g. Channel{T}(from(iterator), ...) for this, but from(x) = c -> foreach(i -> put!(c, i), x) is so short and easy to write that I almost hesitate to suggest adding it to Base

It looks like from is a nice, useful one-liner, but I agree that's not enough to justify adding it to Base.

@tkf
Member

tkf commented Feb 6, 2020

What should this API do for error handling? If the user function f throws, I suppose we should stop all the tasks immediately and propagate the error to the caller? If so, I guess we need something like this?

function Threads.foreach(f, channel::Channel; ntasks = Threads.nthreads())
    @sync for _ in 1:ntasks
        Threads.@spawn for x in channel
            @sync Threads.@spawn try
                f(x)
            catch
                close(channel)
                rethrow()
            end
        end
    end
end

One question is if it is OK to close the given channel on error. If not, we need something like below.

If we are to include generic non-Channel fallback:

function Threads.foreach(f, itr; ntasks = Threads.nthreads())
    closing = Threads.Atomic{Bool}(false)
    @sync begin
        if itr isa Channel
            # Remove this branch if we are not closing given channel
            # `itr` on error.
            channel = itr
        else
            channel = Channel{eltype(itr)}()
            # Not using `Channel{}(f)` to avoid reporting error twice:
            @async try
                for x in itr
                    try
                        put!(channel, x)
                    catch
                        closing[] && return
                        closing[] = true
                        close(channel)
                        rethrow()
                    end
                end
            finally
                close(channel)
            end
        end
        for _ in 1:ntasks
            Threads.@spawn for x in channel
                @sync Threads.@spawn try
                    f(x)
                catch
                    closing[] = true
                    close(channel)
                    rethrow()
                end
            end
        end
    end
end

(I think we can avoid using @async and do the put!ing in the root task. But it's a bit hairy as @sync is not exception-safe.)

@jrevels
Member Author

jrevels commented Feb 7, 2020

I suppose we should stop all the tasks immediately and propagate the error to the caller

Yeah, that'd probably be the right thing to do.

One question is if it is OK to close the given channel on error.

Eh, this would go against my expectation as a caller - is this generally desirable for a reason I might be missing? It makes sense to me if a use case treated "dropped" elements (i.e. elements removed from the channel but not fully consumed due to the error) as a corruption or something like that, but in that case, the caller should probably just wrap this in a try-catch and close the channel themselves if that's how they desire to handle the situation, right?

@jrevels
Member Author

jrevels commented Feb 7, 2020

is this generally desirable for a reason I might be missing

Oh, you probably mean to prevent other threads from taking objects before we get a chance to kill them? Hmm. Yeah, it would be ideal if there were a different mechanism for that IMO though I obviously haven't thought about this much 😅

@tkf
Member

tkf commented Feb 8, 2020

It was a genuine question as I don't know which one is better.

the caller should probably just wrap this in a try-catch and close the channel themselves if that's how they desire to handle the situation, right?

Ah, I think this is a good point. It's easy to mimic close-on-error behavior on top of a non-closing Threads.foreach. However, inserting a "shim" channel to opt out of close-on-error behavior takes more code.
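A minimal sketch of that caller-side approach, assuming the non-closing `Threads.foreach(f, channel; ntasks)` as merged. The wrapper name `foreach_closing` is made up for illustration and is not part of this PR.

```julia
# Hypothetical wrapper: layer close-on-error on top of a non-closing `Threads.foreach`.
function foreach_closing(f, channel::Channel; kwargs...)
    try
        Threads.foreach(f, channel; kwargs...)
    catch
        # Stop anyone else from taking further items, then propagate the error.
        close(channel)
        rethrow()
    end
end

# Demo: every call to `f` fails, so the wrapper should close the channel and rethrow.
ch = Channel{Int}(10)
foreach(i -> put!(ch, i), 1:5)

threw = try
    foreach_closing(x -> error("boom"), ch; ntasks=2)
    false
catch
    true
end
```

Going the other way (keeping the caller's channel open when the library closes it on error) would need an intermediate channel, which is the extra code mentioned above.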

@jrevels
Member Author

jrevels commented Feb 28, 2020

Bump. @JeffBezanson, any thoughts on this PR (especially the error-handling discussion above)?

@ararslan
Member

Re-bump. It would be really nice to be able to use this.

@ararslan ararslan requested a review from vtjnash March 26, 2020 17:50
@tkf
Member

tkf commented Mar 26, 2020

(I'm thinking to add this overload to ThreadsX.foreach but I feel bad to steal @jrevels's idea twice 😄)

@jrevels
Member Author

jrevels commented Mar 30, 2020

(I'm thinking to add this overload to ThreadsX.foreach but I feel bad to steal @jrevels's idea twice 😄)

Go for it! 😁

@jrevels
Member Author

jrevels commented Mar 31, 2020

@JeffBezanson / @vtjnash if either of you do get some time to look at this, IMO it's essentially mergeable as-is pending approval.

The only remaining decision point AFAICT (discussed above) is whether we should/need to add some exception handling, i.e. to prevent threads from taking objects from the channel before we get a chance to kill them in the event of an error. I'm not sure we do - seems like the caller can handle a good chunk of that if they need to - but would love to hear your thoughts, especially if you have any ideas that would make exception handling here more ergonomic

@ararslan ararslan added the triage This should be discussed on a triage call label Apr 16, 2020
Member

@tkf tkf left a comment

LGTM other than the minor point about the test.

test/threads_exec.jl
base/threads_overloads.jl Outdated
@jrevels
Member Author

jrevels commented May 26, 2020

Assuming no further feedback, it seems like Threads.foreach has iterated towards a fairly uncontroversial state by this point. I'll plan on merging Thursday morning-ish unless there are further concerns; would still very much welcome additional review if folks see something over the next couple days that jumps out at them. 🙂

@jrevels jrevels force-pushed the jr/threaded_foreach branch from 52efaa5 to dcf47e1 Compare May 26, 2020 11:17
@jrevels jrevels merged commit 39fc4ee into master May 28, 2020
@jrevels jrevels deleted the jr/threaded_foreach branch May 28, 2020 16:58
@jrevels jrevels removed the triage This should be discussed on a triage call label May 28, 2020
simeonschaub pushed a commit to simeonschaub/julia that referenced this pull request Aug 11, 2020
JuliaLang#34543)

Co-authored-by: Takafumi Arakaki <[email protected]>
Co-authored-by: Alex Arslan <[email protected]>
Co-authored-by: Valentin Churavy <[email protected]>
Labels
multithreading Base.Threads and related functionality