ThreadPools.jl Documentation

Improved thread management for background and nonuniform tasks

A simple package that creates a few functions mimicked from Base (bgforeach, bgmap, and @bgthreads) that behave like the originals but generate spawned tasks that stay purely on background threads. For better throughput with more uniform tasks, fgforeach, fgmap, and @fgthreads are also provided, and logging versions of all of the above and the Base.Threads.@threads macro are included for tuning purposes.

Overview

As of v1.3.1, Julia does not have any built-in mechanism for keeping computational threads off of the primary thread. For many use cases, this restriction is not important - usually, purely computational activities will run faster using all threads. But in some cases, we may want to keep the primary thread free of blocking tasks. For example, a GUI running on the primary thread will become unresponsive if a computational task lands on that thread. Similarly, parallel computations with very nonuniform processing times can benefit from sacrificing the primary thread to manage the loads on the remaining ones.

ThreadPools is a simple package that allows background-only Task assignment for cases where this makes sense. The standard foreach, map, and @threads functions are mimicked, adding a bg prefix to each to denote background operation: bgforeach, bgmap, and @bgthreads. Code that runs with one of those Base functions should run just fine with the bg prefix added, gaining multithreading for free in the foreach and map cases and, in all cases, keeping the primary thread free of blocking Tasks.

When the user would still like to include the primary thread, fg versions of the above functions are provided: fgforeach, fgmap, and @fgthreads. These can provide a little more throughput, though there will be occasional interruption of the thread management by the spawned tasks. Finally, all of the above have a logged counterpart: ThreadPools.logbgforeach, ThreadPools.logbgmap, ThreadPools.@logbgthreads, ThreadPools.logfgforeach, ThreadPools.logfgmap, and ThreadPools.@logfgthreads.

Usage

Each of the simple API functions can be used like the Base versions of the same function, with a bg prepended to the function:

julia> bgforeach([1,2,3]) do x
         println("$(x+1) $(Threads.threadid())")
       end
3 3
4 4
2 2

julia> bgmap([1,2,3]) do x
         println("$x $(Threads.threadid())")
         x^2
       end
2 3
3 4
1 2
3-element Array{Int64,1}:
 1
 4
 9

julia> @bgthreads for x in 1:3
         println("$x $(Threads.threadid())")
       end
2 3
3 4
1 2

For an example of a more complex load-management scenario, see examples/stackdemo.jl.

Logger Usage

The logging versions of the functions take either an IO to use as the log or a string naming a new file to be created and used as the log. The readlog, showstats, and showactivity functions help analyze and visualize the activity (here, a 4-thread system using the primary thread with logfgforeach):

julia> ThreadPools.logfgforeach(x -> sleep(0.1*x), "log.txt", 1:8)

julia> log = ThreadPools.readlog("log.txt")
Dict{Int64,Array{ThreadPools.Job,1}} with 4 entries:
  4 => ThreadPools.Job[Job(3, 4, 0.016, 0.328), Job(7, 4, 0.328, 1.039)]
  2 => ThreadPools.Job[Job(2, 2, 0.016, 0.228), Job(6, 2, 0.228, 0.843)]
  3 => ThreadPools.Job[Job(1, 3, 0.016, 0.128), Job(5, 3, 0.128, 0.629)]
  1 => ThreadPools.Job[Job(4, 1, 0.016, 0.428), Job(8, 1, 0.428, 1.233)]

julia> ThreadPools.showstats(log)

    Total duration: 1.217 s
    Number of jobs: 8
    Average job duration: 0.46 s
    Minimum job duration: 0.112 s
    Maximum job duration: 0.805 s

    Thread 1: Duration 1.217 s, Gap time 0.0 s
    Thread 2: Duration 0.827 s, Gap time 0.0 s
    Thread 3: Duration 0.613 s, Gap time 0.0 s
    Thread 4: Duration 1.023 s, Gap time 0.0 s

julia> ThreadPools.showactivity(log, 0.1)
0.000   -   -   -   -
0.100   4   2   1   3
0.200   4   2   5   3
0.300   4   6   5   3
0.400   4   6   5   7
0.500   8   6   5   7
0.600   8   6   5   7
0.700   8   6   -   7
0.800   8   6   -   7
0.900   8   -   -   7
1.000   8   -   -   7
1.100   8   -   -   -
1.200   8   -   -   -
1.300   -   -   -   -
1.400   -   -   -   -
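
The figures printed by showstats above can be cross-checked by hand from the Job entries returned by readlog. The following is a minimal sketch using only Base; StatJob is a hypothetical stand-in for ThreadPools.Job with the same four fields, and thread_stats is an illustrative helper, not part of the package.

```julia
# Hypothetical stand-in for ThreadPools.Job (same fields as documented).
struct StatJob
    id    :: Int
    tid   :: Int
    start :: Float64
    stop  :: Float64
end

# The log from the readlog call above, keyed by thread id.
joblog = Dict(
    4 => [StatJob(3, 4, 0.016, 0.328), StatJob(7, 4, 0.328, 1.039)],
    2 => [StatJob(2, 2, 0.016, 0.228), StatJob(6, 2, 0.228, 0.843)],
    3 => [StatJob(1, 3, 0.016, 0.128), StatJob(5, 3, 0.128, 0.629)],
    1 => [StatJob(4, 1, 0.016, 0.428), StatJob(8, 1, 0.428, 1.233)],
)

alljobs   = vcat(values(joblog)...)
durations = [j.stop - j.start for j in alljobs]

# Overall span from the earliest start to the latest stop, and the mean job time.
total   = maximum(j.stop for j in alljobs) - minimum(j.start for j in alljobs)
average = sum(durations) / length(durations)

# Per thread: span from first start to last stop, plus idle time between jobs.
function thread_stats(jobs)
    sorted = sort(jobs, by = j -> j.start)
    span = sorted[end].stop - sorted[1].start
    gap = 0.0
    for i in 1:length(sorted)-1
        gap += max(0.0, sorted[i+1].start - sorted[i].stop)
    end
    return span, gap
end

println("Total duration: ", round(total, digits = 3), " s")
println("Average job duration: ", round(average, digits = 3), " s")
for tid in sort(collect(keys(joblog)))
    span, gap = thread_stats(joblog[tid])
    println("Thread $tid: Duration ", round(span, digits = 3), " s, Gap time ", round(gap, digits = 3), " s")
end
```

A gap time of zero on every thread, as in this log, indicates that each thread picked up its next job as soon as the previous one finished.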

Demonstrations

There are a couple of demonstrations in the examples directory. demo.jl shows how jobs are distributed across threads in both the @threads and @bgthreads cases for various workload distributions. Running these demos is fairly simple (results below on 4 threads):

julia> include("examples/demo.jl")
Main.Demo

julia> Demo.run_with_outliers()


@bgthreads, Active Job Per Thread on 200ms Intervals

   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0
   0   6  14  25  29  31  31  40  49  52  62  68  73  83  89   0 100 105 109 109 109 109 109 109 132 137 141 147   0   0   0
   0   8  15  20  30  33  33  33  50  57  63  66  66  84  90  94   0 104 108 112 116 121 123 127 131 134 134 134   0   0   0
   0   9  12  24  24  24  35  38   0  56  61  69   0  82  91  95  98  98  98 113 117 120 120 120 120 135 142 146   0   0   0


@threads, Active Job Per Thread on 200ms Intervals

   0   4   6   9  10  12  15  16  20  24  24  24  28  29  31  31  32  33  33  34  37   0   0   0   0   0
   0  43  46  50  52  54  56  60  62  65  66  66  68  70  73   0   0   0   0   0   0   0   0   0   0   0
   0  79  82  84  87  90  92  94  96  98  98  98  98 100 101 104 106 108 109 109 109 109 109 110 112   0
   0 117 119 120 120 120 120 121 124 127 131 133 134 134 134 137 141 143 146 149   0   0   0   0   0   0

Speed increase using all threads (ideal 33.3%): 14.4%

These demos generate numbered jobs with a randomized work distribution that can be varied. There are normal, uniform, and uniform with 10% outliers of 10x distributions. The activity graphs in these demos present time-sliced snapshots of the thread activities, showing which job number was active in that time slice.

The available demos are:

  • Demo.run_with_uniform()
  • Demo.run_with_variation()
  • Demo.run_with_outliers()
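
To make the "uniform with 10% outliers of 10x" distribution concrete, here is a minimal sketch of how such a workload might be generated. This is not the code in examples/demo.jl; the function name outlier_workload, the base duration, and the seed are all assumptions chosen for illustration.

```julia
using Random

# Hypothetical generator: each job takes `base` seconds, except that roughly
# `fraction` of the jobs are outliers taking `scale` times as long.
function outlier_workload(rng, n; base = 0.05, fraction = 0.1, scale = 10)
    return [rand(rng) < fraction ? scale * base : base for _ in 1:n]
end

rng  = MersenneTwister(42)          # fixed seed so repeated runs match
work = outlier_workload(rng, 150)   # one duration (in seconds) per job

n_outliers = count(>(0.05), work)
println("$n_outliers of $(length(work)) jobs are outliers")
```

Each entry could then be passed to sleep or used to size a busy loop, which is what makes a few jobs hold one thread while the others keep cycling through short jobs.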

There is also a more complex demo at examples/stackdemo.jl. Here, the workload is hierarchical - each job produces a result and possibly more jobs. The primary thread in this case is used purely for managing the job stack.
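
The shape of such a hierarchical workload can be sketched with Base alone. The code below is not taken from stackdemo.jl: process and run_job_stack are hypothetical names, the job rule is invented for illustration, and Threads.@spawn is used in place of a ThreadPool, so unlike the package it does not guarantee that work stays off the primary thread.

```julia
using Base.Threads: @spawn

# Hypothetical job: yields a result and, for n > 1, two smaller child jobs.
process(n) = (n, n > 1 ? [n - 1, n - 1] : Int[])

function run_job_stack(root)
    stack   = [root]
    results = Int[]
    while !isempty(stack)
        # The managing task only dispatches and collects; the spawned tasks
        # do the processing.
        tasks = [@spawn process(job) for job in stack]
        empty!(stack)
        for t in tasks
            result, children = fetch(t)
            push!(results, result)
            append!(stack, children)   # newly produced jobs go back on the stack
        end
    end
    return results
end

results = run_job_stack(3)
```

This version processes the stack in waves for simplicity; a pool-based manager can instead hand out each new job as soon as a thread frees up.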

Simple API

Each function of the simple API tries to mimic an existing function in Base or Base.Threads to keep any code rework to a minimum.

ThreadPools.bgforeach - Method
bgforeach(fn, itrs...) -> Nothing

Mimics the Base.foreach function, but spawns each iteration to a background thread. Falls back to Base.foreach when nthreads() == 1.

Example

julia> bgforeach([1,2,3]) do x
         println("$(x+1) $(Threads.threadid())")
       end
3 3
4 4
2 2

Note that the execution order across the threads is not guaranteed.
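
The spawn-per-iteration pattern with a single-thread fallback can be sketched using only Base. This is an illustration of the general idea rather than the package's implementation: spawn_foreach is a hypothetical name, and Threads.@spawn may schedule work on any thread, including the primary one, which is exactly what bgforeach avoids.

```julia
using Base.Threads: @spawn, nthreads, Atomic, atomic_add!

# Hypothetical helper: one task per element, or a plain foreach when only
# one thread is available.
function spawn_foreach(fn, itr)
    if nthreads() == 1
        foreach(fn, itr)          # single-threaded fallback
    else
        @sync for x in itr
            @spawn fn(x)          # one task per iteration
        end
    end
    return nothing
end

# An atomic accumulator keeps the example free of data races.
total = Atomic{Int}(0)
spawn_foreach(x -> atomic_add!(total, x), 1:10)
println(total[])
```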

ThreadPools.bgmap - Method
bgmap(fn, itrs...) -> collection

Mimics the Base.map function, but spawns each case to a background thread. Falls back to Base.map when nthreads() == 1.

Note that the collection(s) supplied must be of equal and finite length.

Example

julia> bgmap([1,2,3]) do x
         println("$x $(Threads.threadid())")
         x^2
       end
2 3
3 4
1 2
3-element Array{Int64,1}:
 1
 4
 9

Note that while the thread execution order is not guaranteed, the final result will maintain the proper sequence.
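
One common way to get an ordered result from unordered execution is to keep the tasks in input order and fetch them in that same order. The sketch below shows this with Base only; spawn_map is a hypothetical name, and this is not necessarily how bgmap is implemented.

```julia
using Base.Threads: @spawn

# Hypothetical helper: tasks may finish in any order, but they are stored and
# fetched in input order, so the result keeps the input sequence.
function spawn_map(fn, itrs...)
    tasks = [@spawn fn(args...) for args in zip(itrs...)]
    return [fetch(t) for t in tasks]
end

squares = spawn_map(x -> x^2, [1, 2, 3])
sums    = spawn_map(+, [1, 2, 3], [10, 20, 30])
```

Because zip stops at the shortest input, this sketch silently truncates unequal collections, whereas the functions documented here require collections of equal length.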

ThreadPools.@bgthreads - Macro
@bgthreads

A macro to parallelize a for-loop to run with multiple threads, preventing use of the primary.

@bgthreads mimics the Threads.@threads macro, but keeps the activity off of the primary thread. Will fall back gracefully to Base.foreach behavior when nthreads() == 1.

Example

julia> @bgthreads for x in 1:3
         println("$x $(Threads.threadid())")
       end
2 3
3 4
1 2

Note that the execution order across the threads is not guaranteed.

ThreadPools.fgforeach - Method
fgforeach(fn, itrs...) -> Nothing

Equivalent to bgforeach(fn, itrs...), but allows processing on the primary thread.

Example

julia> fgforeach([1,2,3,4,5]) do x
         println("$(x+1) $(Threads.threadid())")
       end
3 1
2 2
4 3
5 4
6 1

Note that the primary thread was used to process indexes 2 and 5, in this case.

ThreadPools.fgmap - Method
fgmap(fn, itrs...) -> collection

Equivalent to bgmap(fn, itrs...), but allows processing on the primary thread.

Note that the collection(s) supplied must be of equal and finite length.

Example

julia> fgmap([1,2,3,4,5]) do x
         println("$x $(Threads.threadid())")
         x^2
       end
4 4
1 2
3 3
5 4
2 1
5-element Array{Int64,1}:
 1
 4
 9
 16
 25

Note that the primary thread was used to process index 2, in this case.

ThreadPools.@fgthreads - Macro
@fgthreads

A macro to parallelize a for-loop to run with multiple threads, allowing use of the primary.

Equivalent to @bgthreads, but allows processing on the primary thread.

Example

julia> @fgthreads for x in 1:5
         println("$x $(Threads.threadid())")
       end
4 3
1 4
2 2
5 3
3 1

Note that the primary thread was used to process index 3, in this case.


ThreadPool API

The ThreadPool mimics the Channel{Task} API, where put!ting a Task causes it to be executed, and take! returns the completed Task. The ThreadPool is iterable over the completed Tasks in the same way a Channel would be.
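
For readers unfamiliar with the Channel{Task} pattern being mimicked, here is a minimal Base-only sketch of it. It is not how ThreadPool is implemented: the names inbox, outbox, and handler are hypothetical, and a single handler runs the tasks one at a time instead of spreading them over threads.

```julia
# Two channels stand in for the pool: one for submitted tasks, one for
# completed tasks.
inbox  = Channel{Task}(Inf)
outbox = Channel{Task}(Inf)

# A single handler runs each submitted task to completion, then publishes it.
handler = @async begin
    for t in inbox
        schedule(t)
        wait(t)
        put!(outbox, t)
    end
    close(outbox)            # no more completed tasks will arrive
end

# Submitting a Task causes it to be executed.
for i in 1:4
    put!(inbox, Task(() -> 2 * i))
end
close(inbox)

# Iterating yields the completed tasks, as it would for a Channel.
doubled = [fetch(t) for t in outbox]
```

With one handler the completed tasks come back in submission order; with several threads pulling from the queue, as in a ThreadPool, the completion order is not guaranteed.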

ThreadPools.ThreadPool - Type
ThreadPool(allow_primary=false)

The main ThreadPool object. Its API mimics that of a Channel{Task}, but each submitted task is executed on a different thread. If allow_primary is true, the assigned thread might be the primary, which will interfere with future thread management for the duration of any computationally heavy (blocking) processes. If it is false, all assigned threads will be off of the primary. Each thread will only be allowed one Task at a time, but each thread will backfill with the next queued Task immediately on completion of the previous, without regard to how busy the other threads may be.

Base.put! - Method
Base.put!(pool::ThreadPool, t::Task)

Put the task t into the pool, blocking until the pool has an available thread.

Base.put! - Method
Base.put!(pool::ThreadPool, fn::Function, args...)
Base.put!(fn::Function, pool::ThreadPool, args...)

Creates a task that runs fn(args...) and adds it to the pool, blocking until the pool has an available thread.

Base.take! - Method
Base.take!(pool::ThreadPool) -> Task

Takes the next available completed task from the pool, blocking until a task is available.

Base.close - Method
Base.close(pool::ThreadPool)

Shuts down the pool, closing the internal thread handlers. It is safe to issue this command after all Tasks have been submitted, regardless of the Task completion status. If issued while the pool is still active, it will yield until all tasks have been completed.

ThreadPools.isactive - Method
ThreadPools.isactive(pool::ThreadPool)

Returns true if there are queued Tasks anywhere in the pool, either awaiting execution, executing, or waiting to be retrieved.

ThreadPools.results - Method
results(pool::ThreadPool) -> result iterator

Returns an iterator over the fetched results of the pooled tasks.

Example

julia> pool = ThreadPool();

julia> @async begin
         for i in 1:4
           put!(pool, x -> 2*x, i)
         end
         close(pool)
       end;

julia> for r in results(pool)
         println(r)
       end
6
2
4
8

Note that the execution order across the threads is not guaranteed.


Logging API

For performance tuning, it can be useful to substitute in a logger that can be used to analyze the thread activity. LoggingThreadPool is provided for this purpose.

ThreadPools.logbgforeach - Function
ThreadPools.logbgforeach(fn, io, itrs...) -> Nothing

Mimics bgforeach, but with a log that can be analyzed with readlog. If io is a string, a file will be opened with that name and used as the log.

Note: This function cannot be used with Threads.nthreads() == 1 and will throw an error if that is attempted.

ThreadPools.logbgmap - Function
ThreadPools.logbgmap(fn, io, itrs...) -> collection

Mimics bgmap, but with a log that can be analyzed with readlog. If io is a string, a file will be opened with that name and used as the log.

Note: This function cannot be used with Threads.nthreads() == 1 and will throw an error if that is attempted.

ThreadPools.@logbgthreads - Macro
ThreadPools.@logbgthreads io

Mimics @bgthreads, but with a log that can be analyzed with readlog. If io is a string, a file will be opened with that name and used as the log.

Note: This macro cannot be used with Threads.nthreads() == 1 and will throw an error if that is attempted.

ThreadPools.logfgforeach - Function
ThreadPools.logfgforeach(fn, io, itrs...) -> Nothing

Mimics fgforeach, but with a log that can be analyzed with readlog. If io is a string, a file will be opened with that name and used as the log.

Note: This function cannot be used with Threads.nthreads() == 1 and will throw an error if that is attempted.

ThreadPools.logfgmap - Function
ThreadPools.logfgmap(fn, io, itrs...) -> collection

Mimics fgmap, but with a log that can be analyzed with readlog. If io is a string, a file will be opened with that name and used as the log.

Note: This function cannot be used with Threads.nthreads() == 1 and will throw an error if that is attempted.

ThreadPools.@logfgthreads - Macro
ThreadPools.@logfgthreads io

Mimics @fgthreads, but with a log that can be analyzed with readlog. If io is a string, a file will be opened with that name and used as the log.

Note: This macro cannot be used with Threads.nthreads() == 1 and will throw an error if that is attempted.

ThreadPools.readlog - Function
ThreadPools.readlog(io) -> Dict of (thread # => job list)

Analyzes the output of a LoggingThreadPool and produces the history of each job on each thread.

Each job in the job list is a struct of:

struct Job
    id    :: Int
    tid   :: Int
    start :: Float64
    stop  :: Float64
end

By default, the jobs in each thread are sorted by stop time. io can either be an IO object or a filename.

Example

julia> log = ThreadPools.readlog("mylog.txt")
Dict{Int64,Array{ThreadPools.Job,1}} with 3 entries:
  4 => ThreadPools.Job[Job(3, 4, 0.016, 0.327), Job(6, 4, 0.327, 0.928)]
  2 => ThreadPools.Job[Job(2, 2, 0.016, 0.233), Job(5, 2, 0.233, 0.749)]
  3 => ThreadPools.Job[Job(1, 3, 0.016, 0.139), Job(4, 3, 0.139, 0.546)]

ThreadPools.showstats - Function
ThreadPools.showstats([io, ]log)

Produces a statistical analysis of the provided log.

Example

julia> ThreadPools.showstats("mylog.txt")

    Total duration: 1.542 s
    Number of jobs: 8
    Average job duration: 0.462 s
    Minimum job duration: 0.111 s
    Maximum job duration: 0.82 s

    Thread 2: Duration 1.542 s, Gap time 0.0 s
    Thread 3: Duration 1.23 s, Gap time 0.0 s
    Thread 4: Duration 0.925 s, Gap time 0.0 s

ThreadPools.showactivity - Function
ThreadPools.showactivity([io, ]log, dt, t0=0, t1=Inf; nthreads=Threads.nthreads())

Produces a textual graph of the thread activity in the provided log.

The format of the output is

julia> ThreadPools.showactivity("mylog.txt", 0.1)
0.000   -   -   -   -
0.100   4   2   1   3
0.200   4   2   5   3
0.300   4   6   5   3
0.400   4   6   5   7
0.500   8   6   5   7
0.600   8   6   5   7
0.700   8   6   -   7
0.800   8   6   -   7
0.900   8   -   -   7
1.000   8   -   -   7
1.100   8   -   -   -
1.200   8   -   -   -
1.300   -   -   -   -
1.400   -   -   -   -

where the first column is time, and each column afterwards is the active job id in each thread (threads 1:nthreads, left to right) at that point in time.

If io is provided, the output will be written there. log may be a log IO object, or a filename to be opened and read. dt is the time step for each row, t0 is the optional starting time, t1 the optional stopping time, and nthreads is the number of threads to print.
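
How each row of the graph follows from the job list can be sketched as below. This is an illustration under stated assumptions, not the package's showactivity: ActivityJob is a hypothetical stand-in for ThreadPools.Job, activity_rows is an invented helper, and the job data is copied from the readlog output in the Logger Usage section.

```julia
using Printf

# Hypothetical stand-in for ThreadPools.Job (same fields as documented).
struct ActivityJob
    id    :: Int
    tid   :: Int
    start :: Float64
    stop  :: Float64
end

joblog = Dict(
    4 => [ActivityJob(3, 4, 0.016, 0.328), ActivityJob(7, 4, 0.328, 1.039)],
    2 => [ActivityJob(2, 2, 0.016, 0.228), ActivityJob(6, 2, 0.228, 0.843)],
    3 => [ActivityJob(1, 3, 0.016, 0.128), ActivityJob(5, 3, 0.128, 0.629)],
    1 => [ActivityJob(4, 1, 0.016, 0.428), ActivityJob(8, 1, 0.428, 1.233)],
)

# For each time step, find the job (if any) running on each thread and print
# its id in a four-character column; "-" marks an idle thread.
function activity_rows(joblog, dt, t1; nthreads = 4)
    rows = String[]
    for t in 0:dt:t1
        cells = map(1:nthreads) do tid
            jobs = get(joblog, tid, ActivityJob[])
            k = findfirst(j -> j.start <= t < j.stop, jobs)
            lpad(k === nothing ? "-" : string(jobs[k].id), 4)
        end
        push!(rows, string(@sprintf("%.3f", t), join(cells)))
    end
    return rows
end

foreach(println, activity_rows(joblog, 0.1, 1.4))
```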

ThreadPools.LoggingThreadPool - Function
ThreadPools.LoggingThreadPool(io, allow_primary=false)

A ThreadPool that will index and log the start/stop times of each Task put into the pool. The log format is:

522 3 S 7.932999849319458
523 4 S 7.932999849319458
522 3 P 8.823155343098272
  ^ ^ ^ ^
  | | | |
  | | | Time
  | | S=Start, P=Stop 
  | Thread ID
 Job #

and is parsed by the readlog and showactivity commands.
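
To illustrate the format, the following sketch pairs each S line with its matching P line and groups the resulting jobs by thread. It is not the package's readlog: LogJob and parse_log are hypothetical names, and the sketch keeps the raw timestamps, whereas the readlog examples in this documentation show times relative to the start of the run.

```julia
# Hypothetical stand-in for ThreadPools.Job (same fields as documented).
struct LogJob
    id    :: Int
    tid   :: Int
    start :: Float64
    stop  :: Float64
end

# Parse lines of the form "<job #> <thread id> <S|P> <time>" into a Dict of
# thread id => completed jobs, sorted by stop time.
function parse_log(lines)
    started = Dict{Int,Float64}()          # job # => start time
    jobs    = Dict{Int,Vector{LogJob}}()   # thread id => completed jobs
    for line in lines
        parts = split(line)
        length(parts) == 4 || continue     # skip blank or malformed lines
        id, tid = parse(Int, parts[1]), parse(Int, parts[2])
        t = parse(Float64, parts[4])
        if parts[3] == "S"
            started[id] = t
        elseif parts[3] == "P" && haskey(started, id)
            push!(get!(jobs, tid, LogJob[]), LogJob(id, tid, pop!(started, id), t))
        end
    end
    for v in values(jobs)
        sort!(v, by = j -> j.stop)
    end
    return jobs
end

parsed = parse_log([
    "522 3 S 7.932999849319458",
    "523 4 S 7.932999849319458",
    "522 3 P 8.823155343098272",
])
```

Job 523 has a start line but no stop line in this excerpt, so the sketch leaves it out of the result. The same function accepts eachline(io) for an IO object or a filename.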
