Download Reference Manual
The Developer's Library for D
About Wiki Forums Source Search Contact

Ticket #576: ThreadPool.4.d

File ThreadPool.4.d, 7.1 kB (added by anders0, 8 months ago)
Line 
1 /**
2  * This module provides an implementation of the classical thread pool model.
3  *
4  * Copyright: Copyright (C) 2007-2008 Anders Halager. All rights reserved.
5  * License:   BSD style: $(LICENSE)
6  * Author:    Anders Halager
7  */
8 module tango.util.ThreadPool;
9
10 import tango.core.Thread,
11        tango.core.Atomic,
12        tango.core.sync.Mutex,
13        tango.core.sync.Condition;
14
15 /**
16  * A thread pool is a way to process multiple jobs in parallel without creating
17  * a new thread for each job. This way the overhead of creating a thread is
18  * only paid once, and not once for each job and you can limit the maximum
19  * number of threads active at any one point.
20  *
21  * In this case a "job" is simply a delegate and some parameters the delegate
22  * will be called with after having been added to the thread pool's queue.
23  *
24  * Example:
25  * --------------------
26  * // create a new pool with two threads
27  * auto pool = new ThreadPool!(int)(2);
28  * void delegate(int) f = (int x) { Stdout(x).newline; };
29  *
30  * // Now we have three ways of telling the pool to execute our jobs
31  * // First we can say we just want it done at some later point
32  * pool.append(f, 1);
33  * // Secondly we can ask for a job to be done as soon as possible, blocking
34  * // until it is started by some thread
35  * pool.assign(f, 2);
36  * // Finally we can say we either want it done immediately or not at all
37  * if (pool.tryAssign(f, 3))
38  *     Stdout("Someone took the job!").newline;
39  * else
40  *     Stdout("No one was available to do the job right now").newline;
41  * // After giving the pool some jobs to do, we need to give it a chance to
42  * // finish, so we can do one of two things.
43  * // Choice no. 1 is to finish what has already been assigned to a thread:
44  * //   pool.shutdown();
45  * // The other choice is to finish all jobs currently executing or in queue:
46  * pool.finish();
47  * --------------------
48  *
49  * If append isn't called there should be no additional heap allocations after
50  * initialization.
51  */
class ThreadPool(Args...)
{
    /// An alias for the type of delegates this thread pool considers a job
    alias void delegate(Args) JobD;

    /**
     * Create a new ThreadPool and start its worker threads.
     *
     * Params:
     *   workers = The amount of threads to spawn
     *   q_size  = The expected size of the queue (how many elements are
     *   preallocated)
     */
    this(size_t workers, size_t q_size = 0)
    {
        // Pre-allocate memory for q_size jobs in the queue: growing the array
        // and then resetting its length keeps the allocated block around.
        q.length = q_size;
        q.length = 0;

        m = new Mutex;
        poolActivity = new Condition(m);
        workerActivity = new Condition(m);

        priority_job.store(cast(Job*)null);
        active_jobs.store(0u);
        done.store(false);

        for (size_t i = 0; i < workers; i++)
        {
            auto thread = new Thread(&doJob);
            // Allow the OS to kill the threads if we exit the program without
            // handling them ourselves
            thread.isDaemon = true;
            thread.start();
            pool ~= thread;
        }
    }

    /**
     * Assign the given job to a thread immediately or block until one is
     * available.
     *
     * Only one job can be handed over at a time; concurrent callers take
     * turns using the hand-over slot instead of overwriting each other's job.
     *
     * If the pool is shut down before a worker has picked the job up, the job
     * is dropped and the call returns instead of blocking forever.
     *
     * Params:
     *   job  = The delegate to execute
     *   args = The arguments the delegate is called with
     */
    void assign(JobD job, Args args)
    {
        m.lock();
        scope(exit) m.unlock();

        // Wait for the hand-over slot to become free, so a job published by
        // another caller that has not been picked up yet is not overwritten.
        while (priority_job.load() !is null && !done.load())
            workerActivity.wait();
        if (done.load())
            return;

        // The job lives on this stack frame; the worker copies it while
        // holding the mutex, before this function is allowed to return.
        auto j = Job(job, args);
        priority_job.store(&j);
        poolActivity.notify();

        // Wait until someone has taken *our* job. Comparing against &j keeps
        // us from waiting on a job published by a later caller.
        while (priority_job.load() is &j && !done.load())
            workerActivity.wait();

        if (priority_job.load() is &j)
        {
            // Shut down before pickup: withdraw the pointer so that nothing
            // references this stack frame after we return, and let other
            // waiters re-evaluate their predicates.
            priority_job.store(cast(Job*)null);
            workerActivity.notifyAll();
        }
    }

    /**
     * Assign the given job to a thread immediately or return false if none is
     * available. (Returns true if one was available)
     *
     * Note: the availability check and the hand-over are two separate steps,
     * so under contention this call may still block briefly in assign, and a
     * concurrent shutdown may drop the job even though true is returned.
     */
    bool tryAssign(JobD job, Args args)
    {
        if (done.load() || active_jobs.load() >= pool.length)
            return false;
        assign(job, args);
        return true;
    }

    /**
     * Put a job into the pool for eventual execution.
     *
     * Warning: Acts as a stack, not a queue as you would expect
     */
    void append(JobD job, Args args)
    {
        m.lock();
        // scope(exit) so the mutex is released even if the append throws
        scope(exit) m.unlock();
        q ~= Job(job, args);
        poolActivity.notify();
    }

    /// Get the number of jobs waiting to be executed
    size_t pendingJobs()
    {
        m.lock(); scope(exit) m.unlock();
        return q.length;
    }

    /// Get the number of jobs being executed
    size_t activeJobs()
    {
        return active_jobs.load();
    }

    /// Finish currently executing jobs and drop all pending.
    void shutdown()
    {
        // The flag, the queue and the notifications are all handled under the
        // mutex: a worker that has just evaluated its wait predicate cannot
        // miss the wake-up, and the queue is never modified concurrently.
        m.lock();
        done.store(true);
        q.length = 0;
        poolActivity.notifyAll();
        // Release callers blocked in assign as well
        workerActivity.notifyAll();
        m.unlock();

        // Join without holding the mutex; the workers need it to exit.
        foreach (thread; pool)
            thread.join();
    }

    /// Complete all pending jobs and shutdown.
    void finish()
    {
        m.lock();
        // Workers update the queue, the hand-over slot and the active counter
        // under the mutex, so this predicate is consistent.
        while (q.length > 0
            || priority_job.load() !is null
            || active_jobs.load() > 0)
            workerActivity.wait();
        m.unlock();
        shutdown();
    }

private:
    // Our list of threads -- only used during startup and shutdown
    Thread[] pool;

    // A job is a delegate together with the arguments it will be called with
    struct Job
    {
        JobD dg;
        Args args;
    }

    // Used for storing queued jobs that will be executed eventually
    Job[] q;

    // This is to store a single job for immediate execution, which hopefully
    // means that any program using only assign and tryAssign won't need any
    // heap allocations after startup.
    Atomic!(Job*) priority_job;

    // This should be used when accessing the job queue
    Mutex m;

    // Notify is called on this condition whenever we have activity in the pool
    // that the workers might want to know about.
    Condition poolActivity;

    // Worker threads call notifyAll on this when they have taken a job, are
    // done with a job or are completely done.
    // This allows a graceful shut down and is necessary since assign has to
    // wait for a job to become available
    Condition workerActivity;

    // Are we in the shutdown phase?
    Atomic!(bool) done;

    // Counter for the number of jobs currently being calculated
    Atomic!(size_t) active_jobs;

    // Thread delegate: repeatedly take a job (the priority job first, then the
    // most recently appended one) and run it until the pool is shut down.
    void doJob()
    {
        while (true)
        {
            Job job;

            m.lock();
            while (q.length == 0 && priority_job.load() is null && !done.load())
                poolActivity.wait();
            if (done.load())
            {
                m.unlock(); // not using scope(exit), need to manually unlock
                break;
            }

            Job* jobPtr = priority_job.load();
            if (jobPtr !is null)
            {
                // Copy the job out of the assigning thread's stack frame
                // before that thread is released.
                job = *jobPtr;
                priority_job.store(cast(Job*)null);
            }
            else
            {
                // A stack -- should be a queue
                job = q[$ - 1];
                q.length = q.length - 1;
            }

            // Count the job as active while still holding the mutex, so it is
            // never invisible to finish() between the queue and the counter.
            active_jobs.increment();
            // Wake everybody: assign callers and finish wait on the same
            // condition with different predicates.
            workerActivity.notifyAll();

            // Make sure we unlock before we start doing the calculations
            m.unlock();

            // Do the actual job. Exceptions are deliberately swallowed so a
            // failing job cannot take the worker thread down with it.
            try {
                job.dg(job.args);
            } catch (Exception ex) { }

            // Tell the pool that we are done with something. Taking the mutex
            // guarantees a waiter cannot miss this between checking its
            // predicate and going to sleep.
            m.lock();
            active_jobs.decrement();
            workerActivity.notifyAll();
            m.unlock();

            Thread.yield();
        }

        // Tell the pool that we are now done
        m.lock();
        workerActivity.notifyAll();
        m.unlock();
    }
}