Skip to content

Commit

Permalink
Add a threadpool parameter to Channel constructor (#50858)
Browse files Browse the repository at this point in the history
Without this, the task created by a `Channel` will run in the threadpool
of the creating task; in the REPL, this could be the interactive
threadpool.

On 1.8, without threadpools:
```julia
% julia +1.8 -t 8  
               _
   _       _ _(_)_     |  Documentation: https://docs.julialang.org
  (_)     | (_) (_)    |
   _ _   _| |_  __ _   |  Type "?" for help, "]?" for Pkg help.
  | | | | | | |/ _` |  |
  | | |_| | | | (_| |  |  Version 1.8.5 (2023-01-08)
 _/ |\__'_|_|_|\__'_|  |  Official https://julialang.org/ release
|__/                   |

julia> for _ in 1:10
           Channel{Int}(1; spawn=true) do _
               Core.print("threadid=$(Threads.threadid())\n")
           end
       end

threadid=2
threadid=5
threadid=2
threadid=2
threadid=1
threadid=6
threadid=7
threadid=8
threadid=3
threadid=4
```

On 1.9, with no interactive threads:
```julia
% julia +1.9 -t 8
               _
   _       _ _(_)_     |  Documentation: https://docs.julialang.org
  (_)     | (_) (_)    |
   _ _   _| |_  __ _   |  Type "?" for help, "]?" for Pkg help.
  | | | | | | |/ _` |  |
  | | |_| | | | (_| |  |  Version 1.9.2 (2023-07-05)
 _/ |\__'_|_|_|\__'_|  |  Official https://julialang.org/ release
|__/                   |

julia> for _ in 1:10
           Channel{Int}(1; spawn=true) do _
               Core.print("threadid=$(Threads.threadid())\n")
           end
       end

threadid=4
threadid=4
threadid=4
threadid=2
threadid=3
threadid=1
threadid=7
threadid=5
threadid=8
threadid=6
```
On 1.9, with an interactive thread:
```julia
% julia +1.9 -t 7,1
               _
   _       _ _(_)_     |  Documentation: https://docs.julialang.org
  (_)     | (_) (_)    |
   _ _   _| |_  __ _   |  Type "?" for help, "]?" for Pkg help.
  | | | | | | |/ _` |  |
  | | |_| | | | (_| |  |  Version 1.9.2 (2023-07-05)
 _/ |\__'_|_|_|\__'_|  |  Official https://julialang.org/ release
|__/                   |

julia> for _ in 1:10
           Channel{Int}(1; spawn=true) do _
               Core.print("threadid=$(Threads.threadid())\n")
           end
       end
threadid=1
threadid=1
threadid=1
threadid=1
threadid=1
threadid=1
threadid=1
threadid=1
threadid=1
threadid=1
```

With this PR, the `:default` threadpool is used instead.
```julia
% julia +master -t7,1
               _
   _       _ _(_)_     |  Documentation: https://docs.julialang.org
  (_)     | (_) (_)    |
   _ _   _| |_  __ _   |  Type "?" for help, "]?" for Pkg help.
  | | | | | | |/ _` |  |
  | | |_| | | | (_| |  |  Version 1.11.0-DEV.244 (2023-08-09)
 _/ |\__'_|_|_|\__'_|  |  Commit d99f249* (0 days old master)
|__/                   |

julia> for _ in 1:10
           Channel{Int}(1; spawn=true) do _
               Core.print("threadid=$(Threads.threadid())\n")
           end
       end

threadid=7
threadid=6
threadid=7
threadid=7
threadid=6
threadid=3
threadid=5
threadid=2
threadid=4
threadid=8
```
And, the behavior can be overridden.
```julia
% julia +master -t7,1
               _
   _       _ _(_)_     |  Documentation: https://docs.julialang.org
  (_)     | (_) (_)    |
   _ _   _| |_  __ _   |  Type "?" for help, "]?" for Pkg help.
  | | | | | | |/ _` |  |
  | | |_| | | | (_| |  |  Version 1.11.0-DEV.244 (2023-08-09)
 _/ |\__'_|_|_|\__'_|  |  Commit d99f249* (0 days old master)
|__/                   |

julia> for _ in 1:10
           Channel{Int}(1; spawn=true, threadpool=:interactive) do _
               Core.print("threadid=$(Threads.threadid())\n")
           end
       end
threadid=1
threadid=1
threadid=1
threadid=1
threadid=1
threadid=1
threadid=1
threadid=1
threadid=1
threadid=1
```

---------

Co-authored-by: Nathan Daly <[email protected]>
  • Loading branch information
kpamnany and NHDaly authored Aug 11, 2023
1 parent cda570e commit 555cd23
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 7 deletions.
28 changes: 21 additions & 7 deletions base/channels.jl
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ Channel(sz=0) = Channel{Any}(sz)

# special constructors
"""
Channel{T=Any}(func::Function, size=0; taskref=nothing, spawn=false)
Channel{T=Any}(func::Function, size=0; taskref=nothing, spawn=false, threadpool=nothing)
Create a new task from `func`, bind it to a new channel of type
`T` and size `size`, and schedule the task, all in a single call.
Expand All @@ -70,9 +70,14 @@ The channel is automatically closed when the task terminates.
If you need a reference to the created task, pass a `Ref{Task}` object via
the keyword argument `taskref`.
If `spawn = true`, the Task created for `func` may be scheduled on another thread
If `spawn=true`, the `Task` created for `func` may be scheduled on another thread
in parallel, equivalent to creating a task via [`Threads.@spawn`](@ref).
If `spawn=true` and the `threadpool` argument is not set, it defaults to `:default`.
If the `threadpool` argument is set (to `:default` or `:interactive`), this implies
that `spawn=true` and the new Task is spawned to the specified threadpool.
Return a `Channel`.
# Examples
Expand Down Expand Up @@ -117,6 +122,9 @@ true
In earlier versions of Julia, Channel used keyword arguments to set `size` and `T`, but
those constructors are deprecated.
!!! compat "Julia 1.9"
The `threadpool=` argument was added in Julia 1.9.
```jldoctest
julia> chnl = Channel{Char}(1, spawn=true) do ch
for c in "hello world"
Expand All @@ -129,12 +137,18 @@ julia> String(collect(chnl))
"hello world"
```
"""
function Channel{T}(func::Function, size=0; taskref=nothing, spawn=false) where T
function Channel{T}(func::Function, size=0; taskref=nothing, spawn=false, threadpool=nothing) where T
chnl = Channel{T}(size)
task = Task(() -> func(chnl))
if threadpool === nothing
threadpool = :default
else
spawn = true
end
task.sticky = !spawn
bind(chnl, task)
if spawn
Threads._spawn_set_thrpool(task, threadpool)
schedule(task) # start it on (potentially) another thread
else
yield(task) # immediately start it, yielding the current thread
Expand All @@ -149,17 +163,17 @@ Channel(func::Function, args...; kwargs...) = Channel{Any}(func, args...; kwargs
# of course not deprecated.)
# We use `nothing` default values to check which arguments were set in order to throw the
# deprecation warning if users try to use `spawn=` with `ctype=` or `csize=`.
function Channel(func::Function; ctype=nothing, csize=nothing, taskref=nothing, spawn=nothing)
function Channel(func::Function; ctype=nothing, csize=nothing, taskref=nothing, spawn=nothing, threadpool=nothing)
# The spawn= keyword argument was added in Julia v1.3, and cannot be used with the
# deprecated keyword arguments `ctype=` or `csize=`.
if (ctype !== nothing || csize !== nothing) && spawn !== nothing
throw(ArgumentError("Cannot set `spawn=` in the deprecated constructor `Channel(f; ctype=Any, csize=0)`. Please use `Channel{T=Any}(f, size=0; taskref=nothing, spawn=false)` instead!"))
if (ctype !== nothing || csize !== nothing) && (spawn !== nothing || threadpool !== nothing)
throw(ArgumentError("Cannot set `spawn=` or `threadpool=` in the deprecated constructor `Channel(f; ctype=Any, csize=0)`. Please use `Channel{T=Any}(f, size=0; taskref=nothing, spawn=false, threadpool=nothing)` instead!"))
end
# Set the actual default values for the arguments.
ctype === nothing && (ctype = Any)
csize === nothing && (csize = 0)
spawn === nothing && (spawn = false)
return Channel{ctype}(func, csize; taskref=taskref, spawn=spawn)
return Channel{ctype}(func, csize; taskref=taskref, spawn=spawn, threadpool=threadpool)
end

closed_exception() = InvalidStateException("Channel is closed.", :closed)
Expand Down
14 changes: 14 additions & 0 deletions test/channel_threadpool.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# This file is a part of Julia. License is MIT: https://julialang.org/license

using Test
using Base.Threads

@testset "Task threadpools" begin
c = Channel{Symbol}() do c; put!(c, threadpool(current_task())); end
@test take!(c) === threadpool(current_task())
c = Channel{Symbol}(spawn = true) do c; put!(c, threadpool(current_task())); end
@test take!(c) === :default
c = Channel{Symbol}(threadpool = :interactive) do c; put!(c, threadpool(current_task())); end
@test take!(c) === :interactive
@test_throws ArgumentError Channel{Symbol}(threadpool = :foo) do c; put!(c, :foo); end
end
5 changes: 5 additions & 0 deletions test/channels.jl
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ end
@test taskref[].sticky == false
@test collect(c) == [0]
end
let cmd = `$(Base.julia_cmd()) --depwarn=error --rr-detach --startup-file=no channel_threadpool.jl`
new_env = copy(ENV)
new_env["JULIA_NUM_THREADS"] = "1,1"
run(pipeline(setenv(cmd, new_env), stdout = stdout, stderr = stderr))
end

@testset "multiple concurrent put!/take! on a channel for different sizes" begin
function testcpt(sz)
Expand Down

0 comments on commit 555cd23

Please sign in to comment.