ThreadPools.jl Documentation
Improved thread management for background and nonuniform tasks
A simple package that provides a few functions mimicking ones from Base (bgforeach, bgmap, and @bgthreads) that behave like the originals but spawn tasks that stay purely on background threads. For better throughput with more uniform tasks, fgforeach, fgmap, and @fgthreads are also provided, and logging versions of all of the above, as well as of the Base.Threads.@threads macro, are included for tuning purposes.
Overview
As of v1.3.1, Julia does not have any built-in mechanism for keeping computational tasks off of the primary thread. For many use cases, this restriction is not important: usually, purely computational work will run faster using all threads. But in some cases, we may want to keep the primary thread free of blocking tasks. For example, a GUI running on the primary thread will become unresponsive if a long computational task is scheduled there. For another, parallel computations with very nonuniform processing times can benefit from sacrificing the primary thread to manage the loads on the remaining ones.
ThreadPools is a simple package that allows background-only Task assignment for cases where this makes sense. The standard foreach, map, and @threads functions are mimicked, adding a bg prefix to each to denote background operation: bgforeach, bgmap, and @bgthreads. Code that runs with one of those Base functions should run just fine with the bg prepended, gaining multithreading for free in the foreach and map cases, and in all cases keeping the primary thread free of blocking Tasks.
When the user would still like to include the primary thread, fg versions of the above functions are provided: fgforeach, fgmap, and @fgthreads. These can provide a little more throughput, though the spawned tasks will occasionally interrupt the thread management. Finally, all of the above have a logged counterpart: ThreadPools.logbgforeach, ThreadPools.logbgmap, ThreadPools.@logbgthreads, ThreadPools.logfgforeach, ThreadPools.logfgmap, and ThreadPools.@logfgthreads.
Usage
Each of the simple API functions can be used like the Base version of the same function, with a bg prepended to the function name:
julia> bgforeach([1,2,3]) do x
    println("$(x+1) $(Threads.threadid())")
end
3 3
4 4
2 2
julia> bgmap([1,2,3]) do x
    println("$x $(Threads.threadid())")
    x^2
end
2 3
3 4
1 2
3-element Array{Int64,1}:
1
4
9
julia> @bgthreads for x in 1:3
    println("$x $(Threads.threadid())")
end
2 3
3 4
1 2
For an example of a more complex load-management scenario, see examples/stackdemo.jl.
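The thread ids printed in these examples come from Threads.threadid(). To check for yourself whether a loop touched the primary thread (id 1), you can tally the ids per iteration. The helper below is a hypothetical sketch, not part of ThreadPools; it uses Base's Threads.@threads so that it runs without the package, and swapping that macro for @bgthreads (after loading ThreadPools) is an assumed way of using it for comparison.

```julia
# Hypothetical helper (not a ThreadPools function): record which thread ran
# each iteration, then count iterations per thread id. With ThreadPools loaded,
# replacing Threads.@threads by @bgthreads should, per the description above,
# leave thread 1 (the primary) out of the counts when nthreads() > 1.
function thread_histogram(n::Integer)
    ids = zeros(Int, n)              # one slot per iteration, so no two tasks share a slot
    Threads.@threads for i in 1:n
        ids[i] = Threads.threadid()
    end
    counts = Dict{Int,Int}()
    for id in ids
        counts[id] = get(counts, id, 0) + 1
    end
    return counts                    # thread id => number of iterations
end

thread_histogram(100)
```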
Logger Usage
The logging versions of the functions take either an IO to use as the log, or a string naming a new file to be created and used as the log. The readlog, showstats, and showactivity functions help analyze and visualize the activity (here, a 4-thread system using the primary with fgforeach):
julia> ThreadPools.logfgforeach(x -> sleep(0.1*x), "log.txt", 1:8)
julia> log = ThreadPools.readlog("log.txt")
Dict{Int64,Array{ThreadPools.Job,1}} with 4 entries:
4 => ThreadPools.Job[Job(3, 4, 0.016, 0.328), Job(7, 4, 0.328, 1.039)]
2 => ThreadPools.Job[Job(2, 2, 0.016, 0.228), Job(6, 2, 0.228, 0.843)]
3 => ThreadPools.Job[Job(1, 3, 0.016, 0.128), Job(5, 3, 0.128, 0.629)]
1 => ThreadPools.Job[Job(4, 1, 0.016, 0.428), Job(8, 1, 0.428, 1.233)]
julia> ThreadPools.showstats(log)
Total duration: 1.217 s
Number of jobs: 8
Average job duration: 0.46 s
Minimum job duration: 0.112 s
Maximum job duration: 0.805 s
Thread 1: Duration 1.217 s, Gap time 0.0 s
Thread 2: Duration 0.827 s, Gap time 0.0 s
Thread 3: Duration 0.613 s, Gap time 0.0 s
Thread 4: Duration 1.023 s, Gap time 0.0 s
julia> ThreadPools.showactivity(log, 0.1)
0.000 - - - -
0.100 4 2 1 3
0.200 4 2 5 3
0.300 4 6 5 3
0.400 4 6 5 7
0.500 8 6 5 7
0.600 8 6 5 7
0.700 8 6 - 7
0.800 8 6 - 7
0.900 8 - - 7
1.000 8 - - 7
1.100 8 - - -
1.200 8 - - -
1.300 - - - -
1.400 - - - -
Demonstrations
There are a couple of demonstrations in the examples directory. demo.jl shows how jobs are distributed across threads in both the @threads and @bgthreads cases for various workload distributions. Running these demos is fairly simple (results below on 4 threads):
julia> include("examples/demo.jl")
Main.Demo
julia> Demo.run_with_outliers()
@bgthreads, Active Job Per Thread on 200ms Intervals
0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
0 6 14 25 29 31 31 40 49 52 62 68 73 83 89 0 100 105 109 109 109 109 109 109 132 137 141 147 0 0 0
0 8 15 20 30 33 33 33 50 57 63 66 66 84 90 94 0 104 108 112 116 121 123 127 131 134 134 134 0 0 0
0 9 12 24 24 24 35 38 0 56 61 69 0 82 91 95 98 98 98 113 117 120 120 120 120 135 142 146 0 0 0
@threads, Active Job Per Thread on 200ms Intervals
0 4 6 9 10 12 15 16 20 24 24 24 28 29 31 31 32 33 33 34 37 0 0 0 0 0
0 43 46 50 52 54 56 60 62 65 66 66 68 70 73 0 0 0 0 0 0 0 0 0 0 0
0 79 82 84 87 90 92 94 96 98 98 98 98 100 101 104 106 108 109 109 109 109 109 110 112 0
0 117 119 120 120 120 120 121 124 127 131 133 134 134 134 137 141 143 146 149 0 0 0 0 0 0
Speed increase using all threads (ideal 33.3%): 14.4%
These demos generate numbered jobs with a randomized work distribution that can be varied: normal, uniform, or uniform with 10% of the jobs as 10x outliers. The activity graphs in these demos present time-sliced snapshots of the thread activity, showing which job number was active on each thread in that time slice.
The available demos are:
Demo.run_with_uniform()
Demo.run_with_variation()
Demo.run_with_outliers()
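Why outliers favor the backfilling approach can be reasoned about with a small deterministic simulation. The sketch below is not taken from demo.jl; it assumes, as the @threads output above suggests, that @threads hands each thread one contiguous block of jobs up front, while the pool backfills each worker with the next job as soon as it is free (as described under ThreadPool below). Both function names are hypothetical. The demo's "ideal 33.3%" figure is presumably 4/3 - 1, i.e. four threads sharing perfectly divisible work versus three background threads.

```julia
# Hypothetical: makespan (time until the last job finishes) when jobs are split
# into contiguous, equal-count blocks, one block per thread, assigned up front.
function static_makespan(durations::AbstractVector{<:Real}, nthreads::Integer)
    n = length(durations)
    chunk = cld(n, nthreads)                   # jobs per block, rounded up
    return maximum(sum(durations[i:min(i + chunk - 1, n)]) for i in 1:chunk:n)
end

# Hypothetical: makespan when each job, in order, goes to whichever worker
# becomes free first (greedy backfilling, one job per worker at a time).
function greedy_makespan(durations::AbstractVector{<:Real}, nworkers::Integer)
    free_at = zeros(Float64, nworkers)         # time at which each worker is free
    for d in durations
        k = argmin(free_at)                    # earliest-available worker
        free_at[k] += d
    end
    return maximum(free_at)
end

durations = [10.0, 1, 1, 1, 1, 1, 1, 1]        # one long outlier, then short jobs
static_makespan(durations, 4)                  # 11.0: four threads, contiguous blocks
greedy_makespan(durations, 3)                  # 10.0: three workers, backfilled
```

Even with one worker fewer, backfilling finishes first here, because the block containing the outlier no longer drags its neighbors along with it.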
There is also a more complex demo at examples/stackdemo.jl. Here, the workload is hierarchical: each job produces a result and possibly more jobs. The primary thread in this case is used purely for managing the job stack.
Simple API
Each function of the simple API tries to mimic an existing function in Base or Base.Threads to keep any code rework to a minimum.
ThreadPools.bgforeach (Method)
bgforeach(fn, itrs...) -> Nothing
Mimics the Base.foreach function, but spawns each iteration to a background thread. Falls back to Base.foreach when nthreads() == 1.
Example
julia> bgforeach([1,2,3]) do x
    println("$(x+1) $(Threads.threadid())")
end
3 3
4 4
2 2
Note that the execution order across the threads is not guaranteed.
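For readers curious how a foreach can be spread over threads, here is a minimal Base-only sketch: it spawns one task per element with Threads.@spawn and waits for all of them with @sync, falling back to Base.foreach on a single thread. It is not the ThreadPools implementation; Base's scheduler is free to run these tasks on the primary thread too, which is exactly what bgforeach avoids. The name spawnforeach is hypothetical, and it takes a single iterable for brevity.

```julia
# Hypothetical sketch, not ThreadPools' implementation: one task per element,
# scheduled anywhere by Julia (including the primary thread, unlike bgforeach).
function spawnforeach(fn, itr)
    if Threads.nthreads() == 1
        foreach(fn, itr)                # same fallback that bgforeach documents
    else
        @sync for x in itr
            Threads.@spawn fn(x)        # @sync waits for every spawned task
        end
    end
    return nothing
end

# Usage: sum 1:10 through a thread-safe atomic counter.
acc = Threads.Atomic{Int}(0)
spawnforeach(x -> Threads.atomic_add!(acc, x), 1:10)
acc[]
```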
ThreadPools.bgmap (Method)
bgmap(fn, itrs...) -> collection
Mimics the Base.map function, but spawns each case to a background thread. Falls back to Base.map when nthreads() == 1.
Note that the collection(s) supplied must be of equal and finite length.
Example
julia> bgmap([1,2,3]) do x
    println("$x $(Threads.threadid())")
    x^2
end
2 3
3 4
1 2
3-element Array{Int64,1}:
1
4
9
Note that while the thread execution order is not guaranteed, the final result will maintain the proper sequence.
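Preserving order despite out-of-order execution comes down to collecting results in submission order rather than completion order. A Base-only sketch of that idea follows; spawnmap is a hypothetical name, it handles a single collection, and it lacks the primary-thread exclusion that bgmap provides.

```julia
# Hypothetical sketch: spawn every call up front, then fetch the tasks in the
# order they were created, so results line up with the input even though the
# tasks may finish in any order.
function spawnmap(fn, xs)
    Threads.nthreads() == 1 && return map(fn, xs)   # mirrors bgmap's fallback
    tasks = [Threads.@spawn fn(x) for x in xs]
    return [fetch(t) for t in tasks]
end

spawnmap(x -> x^2, [1, 2, 3])
```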
ThreadPools.@bgthreads (Macro)
@bgthreads
A macro to parallelize a for-loop to run with multiple threads, preventing use of the primary.
@bgthreads mimics the Threads.@threads macro, but keeps the activity off of the primary thread. It falls back gracefully to Base.foreach behavior when nthreads() == 1.
Example
julia> @bgthreads for x in 1:3
    println("$x $(Threads.threadid())")
end
2 3
3 4
1 2
Note that the execution order across the threads is not guaranteed.
ThreadPools.fgforeach (Method)
fgforeach(fn, itrs...) -> Nothing
Equivalent to bgforeach(fn, itrs...), but allows processing on the primary thread.
Example
julia> fgforeach([1,2,3,4,5]) do x
    println("$(x+1) $(Threads.threadid())")
end
3 1
2 2
4 3
5 4
6 1
Note that the primary thread was used to process indexes 2 and 5, in this case.
ThreadPools.fgmap (Method)
fgmap(fn, itrs...) -> collection
Equivalent to bgmap(fn, itrs...), but allows processing on the primary thread.
Note that the collection(s) supplied must be of equal and finite length.
Example
julia> fgmap([1,2,3,4,5]) do x
    println("$x $(Threads.threadid())")
    x^2
end
4 4
1 2
3 3
5 4
2 1
5-element Array{Int64,1}:
1
4
9
16
25
Note that the primary thread was used to process index 2, in this case.
ThreadPools.@fgthreads (Macro)
@fgthreads
A macro to parallelize a for-loop to run with multiple threads, allowing use of the primary.
Equivalent to @bgthreads, but allows processing on the primary thread.
Example
julia> @fgthreads for x in 1:5
    println("$x $(Threads.threadid())")
end
4 3
1 4
2 2
5 3
3 1
Note that the primary thread was used to process index 3, in this case.
ThreadPool API
The ThreadPool mimics the Channel{Task} API, where put!ting a Task causes it to be executed, and take! returns the completed Task. The ThreadPool is iterable over the completed Tasks in the same way a Channel would be.
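The Channel{Task} analogy can be pictured with plain Base channels. The sketch below is an assumption-laden stand-in, not the ThreadPool implementation: it has no one-task-per-thread limit and no primary-thread exclusion, and the name run_collect is hypothetical. Each spawned task is pushed onto a Channel{Task} when it completes, so iterating the channel yields tasks in completion order, much as take! and results do.

```julia
# Hypothetical stand-in for the put!/take! flow of a ThreadPool.
function run_collect(fn, xs)
    done = Channel{Task}(length(xs))    # buffered, so put! below never blocks
    @sync for x in xs
        t = Threads.@spawn fn(x)        # analogous to put!: start the work
        @async begin
            wait(t)                     # wait until this particular task finishes
            put!(done, t)               # completed tasks arrive in finish order
        end
    end
    close(done)                         # no more tasks; buffered items stay readable
    return [fetch(t) for t in done]     # analogous to take!/results: fetch each
end

run_collect(x -> 2 * x, 1:4)            # same values as the results example, order may vary
```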
ThreadPools.ThreadPool
Base.put!(pool::ThreadPool, t::Task)
Base.put!(pool::ThreadPool, fn::Function, args...)
Base.take!(pool::ThreadPool, ind::Integer)
Base.close(pool::ThreadPool)
isactive(pool::ThreadPool)
results(pool::ThreadPool)
ThreadPools.ThreadPool (Type)
ThreadPool(allow_primary=false)
The main ThreadPool object. Its API mimics that of a Channel{Task}, but each submitted task is executed on a different thread. If allow_primary is true, the assigned thread might be the primary, which will interfere with future thread management for the duration of any heavy-computational (blocking) processes. If it is false, all assigned threads will be off of the primary. Each thread will only be allowed one Task at a time, but each thread will backfill with the next queued Task immediately on completion of the previous, without regard to how busy the other threads may be.
Base.put! (Method)
Base.put!(pool::ThreadPool, t::Task)
Put the task t into the pool, blocking until the pool has an available thread.
Base.put! (Method)
Base.put!(pool::ThreadPool, fn::Function, args...)
Base.put!(fn::Function, pool::ThreadPool, args...)
Creates a task that runs fn(args...) and adds it to the pool, blocking until the pool has an available thread.
Base.take! (Method)
Base.take!(pool::ThreadPool) -> Task
Takes the next available completed task from the pool, blocking until a task is available.
Base.close (Method)
Base.close(pool::ThreadPool)
Shuts down the pool, closing the internal thread handlers. It is safe to issue this command after all Tasks have been submitted, regardless of the Task completion status. If issued while the pool is still active, it will yield until all tasks have been completed.
ThreadPools.isactive (Method)
ThreadPools.isactive(pool::ThreadPool)
Returns true if there are queued Tasks anywhere in the pool, either awaiting execution, executing, or waiting to be retrieved.
ThreadPools.results (Method)
results(pool::ThreadPool) -> result iterator
Returns an iterator over the fetched results of the pooled tasks.
Example
julia> pool = ThreadPool();
julia> @async begin
    for i in 1:4
        put!(pool, x -> 2*x, i)
    end
    close(pool)
end;
julia> for r in results(pool)
    println(r)
end
6
2
4
8
Note that the execution order across the threads is not guaranteed.
Logging API
For performance tuning, it can be useful to substitute in a logger that can be used to analyze the thread activity. LoggingThreadPool is provided for this purpose.
ThreadPools.logbgforeach
ThreadPools.logbgmap
ThreadPools.@logbgthreads
ThreadPools.logfgforeach
ThreadPools.logfgmap
ThreadPools.@logfgthreads
ThreadPools.@logthreads
ThreadPools.readlog
ThreadPools.showstats
ThreadPools.showactivity
ThreadPools.LoggingThreadPool
ThreadPools.logbgforeach (Function)
ThreadPools.logbgforeach(fn, io, itrs...) -> Nothing
Mimics bgforeach, but with a log that can be analyzed with readlog. If io is a string, a file will be opened with that name and used as the log.
Note: this function cannot be used when Threads.nthreads() == 1, and will throw an error if this is tried.
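Every logging entry point accepts either an IO or a file name. In Julia that convention is usually expressed with multiple dispatch; the helper below is a hypothetical illustration of the idea, not a ThreadPools function, and the assumption that the file is opened for writing follows from the "new file" wording in Logger Usage.

```julia
# Hypothetical helper: an IO is used as-is (the caller owns and closes it),
# while a string is treated as a file name that is opened for writing and
# closed again once f returns.
withlog(f, io::IO) = f(io)
withlog(f, path::AbstractString) = open(f, path, "w")

buf = IOBuffer()
withlog(io -> println(io, "1 2 S 0.5"), buf)          # write to an in-memory IO
withlog(io -> println(io, "1 2 S 0.5"), tempname())   # or to a new file
```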
ThreadPools.logbgmap (Function)
ThreadPools.logbgmap(fn, io, itrs...) -> Nothing
Mimics bgmap, but with a log that can be analyzed with readlog. If io is a string, a file will be opened with that name and used as the log.
Note: this function cannot be used when Threads.nthreads() == 1, and will throw an error if this is tried.
ThreadPools.@logbgthreads (Macro)
ThreadPools.@logbgthreads io
Mimics @bgthreads, but with a log that can be analyzed with readlog. If io is a string, a file will be opened with that name and used as the log.
Note: this macro cannot be used when Threads.nthreads() == 1, and will throw an error if this is tried.
ThreadPools.logfgforeach (Function)
ThreadPools.logfgforeach(fn, io, itrs...) -> Nothing
Mimics fgforeach, but with a log that can be analyzed with readlog. If io is a string, a file will be opened with that name and used as the log.
Note: this function cannot be used when Threads.nthreads() == 1, and will throw an error if this is tried.
ThreadPools.logfgmap (Function)
ThreadPools.logfgmap(fn, io, itrs...) -> Nothing
Mimics fgmap, but with a log that can be analyzed with readlog. If io is a string, a file will be opened with that name and used as the log.
Note: this function cannot be used when Threads.nthreads() == 1, and will throw an error if this is tried.
ThreadPools.@logfgthreads (Macro)
ThreadPools.@logfgthreads io
Mimics @fgthreads, but with a log that can be analyzed with readlog. If io is a string, a file will be opened with that name and used as the log.
Note: this macro cannot be used when Threads.nthreads() == 1, and will throw an error if this is tried.
ThreadPools.@logthreads (Macro)
ThreadPools.@logthreads io
Mimics Base.Threads.@threads, but with a log that can be analyzed with readlog to help tune performance. If io is a string, a file will be opened with that name and used as the log.
ThreadPools.readlog (Function)
ThreadPools.readlog(io) -> Dict of (thread # => job list)
Analyzes the output of a LoggingThreadPool and produces the history of each job on each thread.
Each job in the job list is a struct of:
struct Job
    id :: Int
    tid :: Int
    start :: Float64
    stop :: Float64
end
By default, the jobs in each thread are sorted by stop time. io can be either an IO object or a filename.
Example
julia> log = ThreadPools.readlog("mylog.txt")
Dict{Int64,Array{ThreadPools.Job,1}} with 3 entries:
4 => ThreadPools.Job[Job(3, 4, 0.016, 0.327), Job(6, 4, 0.327, 0.928)]
2 => ThreadPools.Job[Job(2, 2, 0.016, 0.233), Job(5, 2, 0.233, 0.749)]
3 => ThreadPools.Job[Job(1, 3, 0.016, 0.139), Job(4, 3, 0.139, 0.546)]
ThreadPools.showstats (Function)
ThreadPools.showstats([io, ]log)
Produces a statistical analysis of the provided log.
Example
julia> ThreadPools.showstats("mylog.txt")
Total duration: 1.542 s
Number of jobs: 8
Average job duration: 0.462 s
Minimum job duration: 0.111 s
Maximum job duration: 0.82 s
Thread 2: Duration 1.542 s, Gap time 0.0 s
Thread 3: Duration 1.23 s, Gap time 0.0 s
Thread 4: Duration 0.925 s, Gap time 0.0 s
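The statistics can be re-derived from a readlog result. The sketch below infers the definitions from the sample output in the Logger Usage section rather than from the ThreadPools source: total duration is the last stop minus the first start, a thread's duration is its last stop minus its first start, and gap time is idle time between consecutive jobs on a thread. It redefines the documented Job struct locally, assumes each thread's jobs are sorted by stop time (readlog's default), and uses the hypothetical names jobstats and idle_time.

```julia
# Local copy of the Job struct documented under readlog.
struct Job
    id::Int
    tid::Int
    start::Float64
    stop::Float64
end

# Idle time between the end of one job and the start of the next on a thread
# (assumes the list is sorted by stop time, as readlog returns it).
idle_time(jobs::Vector{Job}) =
    mapreduce(k -> jobs[k].start - jobs[k-1].stop, +, 2:length(jobs); init = 0.0)

# Hypothetical re-derivation of the showstats figures; the definitions are
# inferred from sample output, not taken from the ThreadPools source.
function jobstats(log::Dict{Int,Vector{Job}})
    alljobs = reduce(vcat, collect(values(log)))
    durs = [j.stop - j.start for j in alljobs]
    overall = (total = maximum(j.stop for j in alljobs) - minimum(j.start for j in alljobs),
               njobs = length(alljobs),
               average = sum(durs) / length(durs),
               shortest = minimum(durs),
               longest = maximum(durs))
    per_thread = Dict(tid => (duration = jobs[end].stop - jobs[1].start,
                              gap = idle_time(jobs)) for (tid, jobs) in log)
    return overall, per_thread
end
```

Fed the four-thread log from the Logger Usage section, this gives a total of 1.217 s, an average of 0.46 s, a minimum of 0.112 s and a maximum of 0.805 s (up to floating-point rounding), matching the showstats output shown there.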
ThreadPools.showactivity (Function)
ThreadPools.showactivity([io, ]log, dt, t0=0, t1=Inf; nthreads=Threads.nthreads())
Produces a textual graph of the thread activity in the provided log.
The format of the output is:
julia> ThreadPools.showactivity("mylog.txt", 0.1)
0.000 - - - -
0.100 4 2 1 3
0.200 4 2 5 3
0.300 4 6 5 3
0.400 4 6 5 7
0.500 8 6 5 7
0.600 8 6 5 7
0.700 8 6 - 7
0.800 8 6 - 7
0.900 8 - - 7
1.000 8 - - 7
1.100 8 - - -
1.200 8 - - -
1.300 - - - -
1.400 - - - -
where the first column is time, and each column afterwards is the active job id in each thread (threads 1:nthreads, left to right) at that point in time.
If io is provided, the output will be written there. log may be a log IO object, or a filename to be opened and read. dt is the time step for each row, t0 is the optional starting time, t1 the optional stopping time, and nthreads is the number of threads to print.
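The table can be reproduced from a readlog result by sampling each thread's job list at every time step. This is a hypothetical re-creation, not the showactivity source: activity_rows is an invented name, it treats a job as active on the half-open interval [start, stop), it defaults nthreads to the highest thread id in the log (the real function defaults to Threads.nthreads()), and it returns the rows instead of printing them.

```julia
using Printf

# Local copy of the Job struct documented under readlog.
struct Job
    id::Int
    tid::Int
    start::Float64
    stop::Float64
end

# Hypothetical re-creation of the showactivity table: one row per time step,
# one column per thread, each cell the id of the job whose [start, stop)
# interval contains that time, or "-" if the thread is idle.
function activity_rows(log::Dict{Int,Vector{Job}}, dt, t0 = 0.0, t1 = Inf;
                       nthreads = maximum(keys(log)))
    tend = min(t1, maximum(j.stop for jobs in values(log) for j in jobs))
    rows = String[]
    for t in t0:dt:tend
        cells = map(1:nthreads) do tid
            jobs = get(log, tid, Job[])
            k = findfirst(j -> j.start <= t < j.stop, jobs)
            k === nothing ? "-" : string(jobs[k].id)
        end
        push!(rows, @sprintf("%.3f ", t) * join(cells, " "))
    end
    return rows
end
```

With the four-thread log from Logger Usage and dt = 0.1, the rows from 0.000 to 1.200 match the showactivity output shown there; the sketch simply stops at the last job's stop time instead of printing trailing idle rows.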
ThreadPools.LoggingThreadPool (Function)
ThreadPools.LoggingThreadPool(io, allow_primary=false)
A ThreadPool that will index and log the start/stop times of each Task put into the pool. The log format is:
522 3 S 7.932999849319458
523 4 S 7.932999849319458
522 3 P 8.823155343098272
^   ^ ^ ^
|   | | |
|   | | Time
|   | S=Start, P=Stop
|   Thread ID
Job #
and is parsed by the readlog and showactivity commands.
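A reader for this format can be sketched in a few lines. This is a hypothetical parser (parse_joblog is an invented name), not readlog itself: it pairs each S line with the matching P line for the same job and thread, groups the resulting jobs by thread, and sorts each thread's list by stop time like readlog's default. It keeps the raw timestamps; the readlog examples show times near zero, so readlog may rebase them, which this sketch does not attempt.

```julia
# Local copy of the Job struct documented under readlog.
struct Job
    id::Int
    tid::Int
    start::Float64
    stop::Float64
end

# Hypothetical parser for "job thread S|P time" lines; not ThreadPools.readlog.
function parse_joblog(io::IO)
    starts = Dict{Tuple{Int,Int},Float64}()   # (job, thread) => start time
    jobs = Dict{Int,Vector{Job}}()            # thread => completed jobs
    for line in eachline(io)
        fields = split(line)
        length(fields) == 4 || continue       # skip blank or malformed lines
        id, tid = parse(Int, fields[1]), parse(Int, fields[2])
        t = parse(Float64, fields[4])
        if fields[3] == "S"
            starts[(id, tid)] = t
        elseif fields[3] == "P"
            start = pop!(starts, (id, tid))   # errors if a stop has no start
            push!(get!(jobs, tid, Job[]), Job(id, tid, start, t))
        end
    end
    for list in values(jobs)
        sort!(list, by = j -> j.stop)         # per-thread order: by stop time
    end
    return jobs                               # jobs with no P line are left out
end
```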