Download Reference Manual
The Developer's Library for D
About Wiki Forums Source Search Contact

Ticket #576: ThreadPool_new.d

File ThreadPool_new.d, 9.0 kB (added by anders0, 8 months ago)

Lets hope this is the last one needed :)

Line 
1 /**
2  * This module provides an implementation of the classical thread-pool model.
3  *
4  * Copyright: Copyright (C) 2007-2008 Anders Halager. All rights reserved.
5  * License:   BSD style: $(LICENSE)
6  * Author:    Anders Halager
7  */
8
9 module tango.core.ThreadPool;
10
11 private import  tango.core.Thread,
12                 tango.core.Atomic;
13
14 private import  tango.core.sync.Mutex,
15                 tango.core.sync.Condition;
16
17 /**
18  * A thread pool is a way to process multiple jobs in parallel without creating
19  * a new thread for each job. This way the overhead of creating a thread is
20  * only paid once, and not once for each job and you can limit the maximum
21  * number of threads active at any one point.
22  *
23  * In this case a "job" is simply a delegate and some parameters the delegate
24  * will be called with after having been added to the thread pool's queue.
25  *
26  * Example:
27  * --------------------
28  * // create a new pool with two threads
29  * auto pool = new ThreadPool!(int)(2);
30  * void delegate(int) f = (int x) { Stdout(x).newline; };
31  *
32  * // Now we have three ways of telling the pool to execute our jobs
33  * // First we can say we just want it done at some later point
34  * pool.append(f, 1);
35  * // Secondly we can ask for a job to be done as soon as possible, blocking
36  * // until it is started by some thread
37  * pool.assign(f, 2);
38  * // Finally we can say we either want it done immediately or not at all
39  * if (pool.tryAssign(f, 3))
40  *     Stdout("Someone took the job!").newline;
41  * else
42  *     Stdout("No one was available to do the job right now").newline;
43  * // After giving the pool some jobs to do, we need to give it a chance to
44  * // finish, so we can do one of two things.
45  * // Choice no. 1 is to finish what has already been assigned to the threads,
46  * // but ignore any remaining queued jobs
47  * //   pool.shutdown();
48  * // The other choice is to finish all jobs currently executing or in queue:
49  * pool.finish();
50  * --------------------
51  *
52  * If append isn't called there should be no additional heap allocations after
53  * initialization.
54  */
55
56 class ThreadPool(Args...)
57 {
58     /// An alias for the type of delegates this thread pool considers a job
59     alias void delegate(Args) JobD;
60
61     /**
62      * Create a new ThreadPool.
63      *
64      * Params:
65      *   workers = The amount of threads to spawn
66      *   q_size  = The expected size of the queue (how many elements are
67      *   preallocated)
68      */
69     this(size_t workers, size_t q_size = 0)
70     {
71         // pre-allocate memory for q_size jobs in the queue
72         q.length = q_size;
73         q.length = 0;
74
75         m = new Mutex;
76         poolActivity = new Condition(m);
77         workerActivity = new Condition(m);
78
79         priority_job.store(cast(Job*)null);
80         active_jobs.store(0u);
81         done.store(false);
82
83         for (size_t i = 0; i < workers; i++)
84         {
85             auto thread = new Thread(&doJob);
86             // Allow the OS to kill the threads if we exit the program without
87             // handling them our selves
88             thread.isDaemon = true;
89             thread.start();
90             pool ~= thread;
91         }
92     }
93
94     /**
95       Assign the given job to a thread immediately or block until one is
96       available
97      */
98     void assign(JobD job, Args args)
99     {
100         m.lock();
101         scope(exit) m.unlock();
102         auto j = Job(job, args);
103         priority_job.store(&j);
104         poolActivity.notify();
105         // Wait until someone has taken the job
106         while (priority_job.load() !is null)
107             workerActivity.wait();
108     }
109
110     /**
111       Assign the given job to a thread immediately or return false if none is
112       available. (Returns true if one was available)
113      */
114     bool tryAssign(JobD job, Args args)
115     {
116         if (active_jobs.load() >= pool.length)
117             return false;
118         assign(job, args);
119         return true;
120     }
121
122     /**
123       Put a job into the pool for eventual execution.
124
125       Warning: Acts as a stack, not a queue as you would expect
126      */
127     void append(JobD job, Args args)
128     {
129         m.lock();
130         q ~= Job(job, args);
131         m.unlock();
132         poolActivity.notify();
133     }
134
135     /// Get the number of jobs waiting to be executed
136     size_t pendingJobs()
137     {
138         m.lock(); scope(exit) m.unlock();
139         return q.length;
140     }
141
142     /// Get the number of jobs being executed
143     size_t activeJobs()
144     {
145         return active_jobs.load();
146     }
147
148     /// Finish currently executing jobs and drop all pending.
149     void shutdown()
150     {
151         done.store(true);
152         m.lock();
153         q.length = 0;
154         m.unlock();
155         poolActivity.notifyAll();
156         foreach (thread; pool)
157             thread.join();
158         m.lock();
159         m.unlock();
160     }
161
162     /// Complete all pending jobs and shutdown.
163     void finish()
164     {
165         m.lock();
166         while (q.length > 0 || active_jobs.load() > 0)
167             workerActivity.wait();
168         m.unlock();
169         shutdown();
170     }
171
172 private:
173     // Our list of threads -- only used during startup and shutdown
174     Thread[] pool;
175     struct Job
176     {
177         JobD dg;
178         Args args;
179     }
180     // Used for storing queued jobs that will be executed eventually
181     Job[] q;
182
183     // This is to store a single job for immediate execution, which hopefully
184     // means that any program using only assign and tryAssign wont need any
185     // heap allocations after startup.
186     Atomic!(Job*) priority_job;
187
188     // This should be used when accessing the job queue
189     Mutex m;
190
191     // Notify is called on this condition whenever we have activity in the pool
192     // that the workers might want to know about.
193     Condition poolActivity;
194
195     // Worker threads call notify on this when they are done with a job or are
196     // completely done.
197     // This allows a graceful shut down and is necessary since assign has to
198     // wait for a job to become available
199     Condition workerActivity;
200
201     // Are we in the shutdown phase?
202     Atomic!(bool) done;
203
204     // Counter for the number of jobs currently being calculated
205     Atomic!(size_t) active_jobs;
206
207     // Thread delegate:
208     void doJob()
209     {
210         while (!done.load())
211         {
212             m.lock();
213             while (q.length == 0 && priority_job.load() is null && !done.load())
214                 poolActivity.wait();
215             if (done.load()) {
216                 m.unlock(); // not using scope(exit), need to manually unlock
217                 break;
218             }
219             Job job;
220             Job* jobPtr = priority_job.load();
221             if (jobPtr !is null)
222             {
223                 job = *jobPtr;
224                 priority_job.store(cast(Job*)null);
225                 workerActivity.notify();
226             }
227             else
228             {
229                 // A stack -- should be a queue
230                 job = q[$ - 1];
231                 q.length = q.length - 1;
232             }
233
234             // Make sure we unlock before we start doing the calculations
235             m.unlock();
236
237             // Do the actual job
238             active_jobs.increment();
239             try {
240                 job.dg(job.args);
241             } catch (Exception ex) { }
242             active_jobs.decrement();
243
244             // Tell the pool that we are done with something
245             m.lock();
246             workerActivity.notify();
247             m.unlock();
248         }
249         // Tell the pool that we are now done
250         m.lock();
251         workerActivity.notify();
252         m.unlock();
253     }
254 }
255
256
257
258 /*******************************************************************************
259
260 *******************************************************************************/
261
262 debug (ThreadPool)
263 {
264         import tango.util.log.Trace;
265         import Integer = tango.text.convert.Integer;
266
267         void main(char[][] args)
268         {
269                 long job(long val)
270                 {
271                         // a 'big job'
272                         Thread.sleep (3.0/val);
273                         return val;
274                 }
275
276                 void hashJob(char[] file)
277                 {
278                         // If we don't catch exceptions the thread-pool will still
279                         // work, but the job will fail silently
280                         try {
281                             long n = Integer.parse(file);
282                             Trace.formatln("job({}) = {}", n, job(n));
283                             } catch (Exception ex) {
284                                     Trace.formatln("Exception: {}", ex.msg);
285                                     }
286                 }
287
288                 // Create new thread pool with one worker thread per file given
289                 auto thread_pool = new ThreadPool!(char[])(args.length - 1);
290
291                 Thread.sleep(1);
292                 Trace.formatln ("starting");
293
294                 foreach (file; args[1 .. args.length])
295                          thread_pool.assign(&hashJob, file);
296
297                 thread_pool.finish();
298         }
299 }