|
1 | 1 | # This file is a part of Julia. License is MIT: https://julialang.org/license |
2 | 2 |
|
3 | | -export threadid, nthreads, @threads, @spawn |
| 3 | +export threadid, nthreads, @threads, @spawn, |
| 4 | + threadpool, nthreadpools |
4 | 5 |
|
5 | 6 | """ |
6 | | - Threads.threadid() |
| 7 | + Threads.threadid() -> Int |
7 | 8 |
|
8 | | -Get the ID number of the current thread of execution. The master thread has ID `1`. |
| 9 | +Get the ID number of the current thread of execution. The master thread has |
| 10 | +ID `1`. |
9 | 11 | """ |
10 | 12 | threadid() = Int(ccall(:jl_threadid, Int16, ())+1) |
11 | 13 |
|
12 | | -# Inclusive upper bound on threadid() |
13 | 14 | """ |
14 | | - Threads.nthreads() |
| 15 | + Threads.nthreads([:default|:interactive]) -> Int |
15 | 16 |
|
16 | | -Get the number of threads available to the Julia process. This is the inclusive upper bound |
17 | | -on [`threadid()`](@ref). |
| 17 | +Get the number of threads (across all thread pools or within the specified |
| 18 | +thread pool) available to Julia. The number of threads across all thread |
| 19 | +pools is the inclusive upper bound on [`threadid()`](@ref). |
18 | 20 |
|
19 | 21 | See also: `BLAS.get_num_threads` and `BLAS.set_num_threads` in the |
20 | 22 | [`LinearAlgebra`](@ref man-linalg) standard library, and `nprocs()` in the |
21 | 23 | [`Distributed`](@ref man-distributed) standard library. |
22 | 24 | """ |
| 25 | +function nthreads end |
| 26 | + |
23 | 27 | nthreads() = Int(unsafe_load(cglobal(:jl_n_threads, Cint))) |
| 28 | +function nthreads(pool::Symbol) |
| 29 | + if pool == :default |
| 30 | + tpid = Int8(0) |
| 31 | + elseif pool == :interactive |
| 32 | + tpid = Int8(1) |
| 33 | + else |
| 34 | + error("invalid threadpool specified") |
| 35 | + end |
| 36 | + return _nthreads_in_pool(tpid) |
| 37 | +end |
| 38 | +function _nthreads_in_pool(tpid::Int8) |
| 39 | + p = unsafe_load(cglobal(:jl_n_threads_per_pool, Ptr{Cint})) |
| 40 | + return Int(unsafe_load(p, tpid + 1)) |
| 41 | +end |
| 42 | + |
| 43 | +""" |
| 44 | + Threads.threadpool(tid = threadid()) -> Symbol |
| 45 | +
|
| 46 | +Returns the specified thread's threadpool; either `:default` or `:interactive`. |
| 47 | +""" |
| 48 | +function threadpool(tid = threadid()) |
| 49 | + tpid = ccall(:jl_threadpoolid, Int8, (Int16,), tid-1) |
| 50 | + return tpid == 0 ? :default : :interactive |
| 51 | +end |
| 52 | + |
| 53 | +""" |
| 54 | + Threads.nthreadpools() -> Int |
| 55 | +
|
| 56 | +Returns the number of threadpools currently configured. |
| 57 | +""" |
| 58 | +nthreadpools() = Int(unsafe_load(cglobal(:jl_n_threadpools, Cint))) |
| 59 | + |
24 | 60 |
|
25 | 61 | function threading_run(fun, static) |
26 | 62 | ccall(:jl_enter_threaded_region, Cvoid, ()) |
@@ -48,7 +84,7 @@ function _threadsfor(iter, lbody, schedule) |
48 | 84 | quote |
49 | 85 | local threadsfor_fun |
50 | 86 | let range = $(esc(range)) |
51 | | - function threadsfor_fun(tid=1; onethread=false) |
| 87 | + function threadsfor_fun(tid = 1; onethread = false) |
52 | 88 | r = range # Load into local variable |
53 | 89 | lenr = length(r) |
54 | 90 | # divide loop iterations among threads |
@@ -232,35 +268,63 @@ macro threads(args...) |
232 | 268 | end |
233 | 269 |
|
234 | 270 | """ |
235 | | - Threads.@spawn expr |
| 271 | + Threads.@spawn [:default|:interactive] expr |
236 | 272 |
|
237 | | -Create a [`Task`](@ref) and [`schedule`](@ref) it to run on any available thread. |
238 | | -The task is allocated to a thread after it becomes available. To wait for the task |
239 | | -to finish, call [`wait`](@ref) on the result of this macro, or call [`fetch`](@ref) to |
240 | | -wait and then obtain its return value. |
| 273 | +Create a [`Task`](@ref) and [`schedule`](@ref) it to run on any available |
| 274 | +thread in the specified threadpool (`:default` if unspecified). The task is |
| 275 | +allocated to a thread once one becomes available. To wait for the task to |
| 276 | +finish, call [`wait`](@ref) on the result of this macro, or call |
| 277 | +[`fetch`](@ref) to wait and then obtain its return value. |
241 | 278 |
|
242 | | -Values can be interpolated into `@spawn` via `\$`, which copies the value directly into the |
243 | | -constructed underlying closure. This allows you to insert the _value_ of a variable, |
244 | | -isolating the asynchronous code from changes to the variable's value in the current task. |
| 279 | +Values can be interpolated into `@spawn` via `\$`, which copies the value |
| 280 | +directly into the constructed underlying closure. This allows you to insert |
| 281 | +the _value_ of a variable, isolating the asynchronous code from changes to |
| 282 | +the variable's value in the current task. |
245 | 283 |
|
246 | 284 | !!! note |
247 | | - See the manual chapter on threading for important caveats. |
| 285 | + See the manual chapter on [multi-threading](@ref man-multithreading) |
| 286 | + for important caveats. See also the chapter on [threadpools](@ref man-threadpools). |
248 | 287 |
|
249 | 288 | !!! compat "Julia 1.3" |
250 | 289 | This macro is available as of Julia 1.3. |
251 | 290 |
|
252 | 291 | !!! compat "Julia 1.4" |
253 | 292 | Interpolating values via `\$` is available as of Julia 1.4. |
| 293 | +
|
| 294 | +!!! compat "Julia 1.9" |
| 295 | + A threadpool may be specified as of Julia 1.9. |
254 | 296 | """ |
255 | | -macro spawn(expr) |
256 | | - letargs = Base._lift_one_interp!(expr) |
| 297 | +macro spawn(args...) |
| 298 | + tpid = Int8(0) |
| 299 | + na = length(args) |
| 300 | + if na == 2 |
| 301 | + ttype, ex = args |
| 302 | + if ttype isa QuoteNode |
| 303 | + ttype = ttype.value |
| 304 | + elseif ttype isa Symbol |
| 305 | + # TODO: allow unquoted symbols |
| 306 | + ttype = nothing |
| 307 | + end |
| 308 | + if ttype === :interactive |
| 309 | + tpid = Int8(1) |
| 310 | + elseif ttype !== :default |
| 311 | + throw(ArgumentError("unsupported threadpool in @spawn: $ttype")) |
| 312 | + end |
| 313 | + elseif na == 1 |
| 314 | + ex = args[1] |
| 315 | + else |
| 316 | + throw(ArgumentError("wrong number of arguments in @spawn")) |
| 317 | + end |
| 318 | + |
| 319 | + letargs = Base._lift_one_interp!(ex) |
257 | 320 |
|
258 | | - thunk = esc(:(()->($expr))) |
| 321 | + thunk = esc(:(()->($ex))) |
259 | 322 | var = esc(Base.sync_varname) |
260 | 323 | quote |
261 | 324 | let $(letargs...) |
262 | 325 | local task = Task($thunk) |
263 | 326 | task.sticky = false |
| 327 | + ccall(:jl_set_task_threadpoolid, Cint, (Any, Int8), task, $tpid) |
264 | 328 | if $(Expr(:islocal, var)) |
265 | 329 | put!($var, task) |
266 | 330 | end |
|
0 commit comments