Download Reference Manual
The Developer's Library for D
About Wiki Forums Source Search Contact

Ticket #576: ThreadPool.d

File ThreadPool.d, 11.2 kB (added by Nietsnie, 5 years ago)
Line 
1 /++
2     author: Jeff Davey <jeffd@gwava.com>
3     changed: Darryl Bleau <darrylb@gwava.com> (altered to a variadic template class)
4     license: BSD style
5
6     A basic thread queueing module.
7
8     This specific module pre-detaches all threads up to the size of the thread pool. The idea is that state
9     can then be saved on each thread's TLS for caching and other purposes.
10
11     All threads are set to "detach" mode, so that you can close the running program and let the OS clean up
12     any thread resources.
13
14     There is also a close function, which specifically waits for all detached threads to move to a closed state
15     before returning.
16
17     Thread pools are templated variadically to the type and number of arguments that each threaded function will take.
18
19     Usage is fairly straightforward, and both delegates and functions are supported:
20
21     Example:
22     ---
23     uint a = 0;
24     Mutex m = new Mutex();
25
26     void func(void *arg)
27     {           
28         char[] name = Thread.getThis().name; // get the name of this thread
29         int *data = cast(int *)arg; // do some fun with casting
30         m.lock(); // lock something
31         a++; // increment something
32         m.unlock(); // unlock something
33         Thread.sleep(1); // zzz...
34     }
35
36     ThreadPool!(void*) pool = new ThreadPool!(void*)(100, &func);
37     if (pool)
38     {
39         for (uint i = 0; i < 1000; i++)
40         {
41             int *data = new int();
42             *data = i;
43             pool.start(cast(void*)(data));
44         }
45     }
46     pool.close();
47     assert(a == 1000);
48     ---
49
50     ThreadPool functions don't have to take a void*, you can declare the ThreadPool with any type of arguments that you like:
51     ---
52     ThreadPool!(char[],float)
53     ThreadPool!(Object,uint,char[])
54     ---
55 ++/
56
57 module tetra.util.ThreadPool;
58
59 import tango.core.Thread;
60 import tango.core.sync.Mutex;
61 import tango.core.sync.Condition;
62
63 private class PooledThread(T...)
64 {
65     static enum State
66     {
67         IDLE,
68         RUNNING,
69         SHUTDOWN,
70         CLOSED
71     }
72
73     this(void delegate(T) dg, size_t stackSize = 0)
74     {
75         this._dg = dg;
76         this(stackSize);
77     }
78
79     this(void (*func)(T), size_t stackSize = 0)
80     {
81         this._func = func;
82         this(stackSize);
83     }
84
85     void join()
86     {
87         _thread.join();
88     }
89
90     void lock()
91     {
92         _lock.lock();
93     }
94
95     bool tryLock()
96     {
97         return _lock.tryLock();
98     }
99
100     void unlock()
101     {
102         _lock.unlock();
103     }
104
105     void notify()
106     {
107         _signal.notify();
108     }
109
110     private:
111     Mutex _lock;                         // for condition/mutex pair
112     Condition _signal;                     // used to signal work available
113     void delegate(T) _dg = null;        // delegate to call
114     void (*_func)(T) = null;            // or function to call
115     Thread _thread;                        // Underlying Thread class
116     T arg;                                // Arg that has been passed
117     State state = State.CLOSED;         // Our current state.
118    
119     void run()
120     {
121         _lock.lock();
122         scope (exit)
123         {   
124             this.state = State.CLOSED;   
125             _lock.unlock();
126         }
127
128         for(;;)
129         {
130             this.state = State.IDLE;
131             _signal.wait();    // wait for signal from pool
132             if (this.state == State.SHUTDOWN)    // shutting down?
133                 break;
134
135             if (this.state == State.RUNNING)    // let's run the work.
136             {           
137                 try
138                 {   
139                     if (_dg)
140                         _dg(arg);
141                     else if (_func)
142                         _func(arg);
143                 }
144                 catch (Exception ex) {} // if the underlying function call blows up, we still want the pool to be available.
145                                         // In practice, the underlying delegate/function passed, should make sure to capture
146                                         // it's exceptions.
147             }
148         }
149     }
150
151     this(size_t stackSize)
152     {
153         this._lock = new Mutex();
154         this._signal = new Condition(this._lock);
155
156         _thread = new Thread(&run, stackSize);
157         _thread.isDaemon = true;
158         _thread.start();
159     }
160 }
161
162 /++
163     ThreadPool is the thread queueing class.
164 ++/
165 class ThreadPool(T...)
166 {
167     /++
168         Construct a ThreadPool of type T
169
170         Params:
171         poolSize             =     The size of the thread pool. It will
172                                 actually create this many threads on startup.
173
174         (function/delegate) =    void func(T argument). argument is the passed
175                                 argument from the start function.
176
177         stackSize            =    Size of the stack per thread. As tango
178                                 doesn't quite support this functionality,
179                                 it's not working quite yet.
180     ++/   
181     this(uint poolSize, void (*func)(T), size_t stackSize = 0)
182     in
183     {
184         assert(func);
185     }
186     body
187     {
188         this._func = func;
189         this(poolSize, stackSize);
190     }
191
192     this(uint poolSize, void delegate(T) dg, size_t stackSize = 0)
193     in
194     {
195         assert(dg);
196     }
197     body
198     {
199         this._dg = dg;
200         this(poolSize, stackSize);       
201     }
202
203     /++
204         Start a unit of work.
205
206         Params:
207         arg                    =    A templated (T) arg that will be passed to the
208                                 underlying worker thread.
209
210         block                =    If true, the start function will wait
211                                 for an existing thread to become
212                                 available, if all are being used. If
213                                 false, the start function will return
214                                 immediately with a value of false if it
215                                 could not schedule work, or a vlue of true
216                                 if it could.
217     
218     ++/
219     bool start(T arg, bool block = true)
220     {
221         bool rtn = false;
222         for(;;)
223         {
224             uint i = 0;
225             for (; i < _pool.length; i++)
226             {
227                 if (_pool[i].tryLock())
228                 {
229                     scope(exit) _pool[i].unlock();
230                     if (_pool[i].state == PooledThread!(T).State.IDLE)
231                     {
232                         _pool[i].state = PooledThread!(T).State.RUNNING;
233                         foreach(k,a; arg)
234                             _pool[i].arg[k] = a;
235                         _pool[i].notify();
236                         rtn = true;
237                         break;
238                     }
239                 }
240             }
241
242             if (i == _pool.length)
243             {
244                 if (block)
245                     Thread.sleep(.1);
246                 else
247                     break;
248             }
249             else
250                 break;
251         }
252         return rtn;
253     }
254
255     /++
256       Close the thread pool. This will go through all running threads,
257       tell them to shutdown, and wait for that to happen before returning.
258     
259       Currently, the poll time between thread checks is hard-coded to 100ms
260     ++/
261     void close()
262     {
263         if (open)
264         {
265             for(;;)
266             {
267                 bool allClosed = true;
268                 for (uint i = 0; i < _pool.length; i++)
269                 {
270                     _pool[i].lock();
271                     if (_pool[i].state == PooledThread!(T).State.IDLE)
272                     {
273                         _pool[i].state = PooledThread!(T).State.SHUTDOWN;       
274                         _pool[i].notify();
275                     }   
276                     if (_pool[i].state != PooledThread!(T).State.CLOSED)
277                         allClosed = false;                                   
278                     _pool[i].unlock();
279                 }
280                 if (allClosed)
281                     break;
282                 else
283                     Thread.sleep(.1);
284             }
285             open = false;
286         }
287     }
288
289     private:
290     void delegate(T) _dg = null;
291     void (*_func)(T) = null;
292     PooledThread!(T)[] _pool = null;
293     bool open = false;
294
295     this(uint poolSize, size_t stackSize)
296     {
297         _pool = new PooledThread!(T)[poolSize];
298
299         for (uint i = 0; i < poolSize; i++)
300         {
301             if (_func)
302                 _pool[i] = new PooledThread!(T)(_func, stackSize);
303             else
304                 _pool[i] = new PooledThread!(T)(_dg, stackSize);
305         }
306         open = true;
307     }
308 }
309
310 debug (UnitTest)
311 {
312     unittest
313     {   
314         int voidTemplate()
315         {
316             uint a = 0;
317             Mutex m = new Mutex();
318             void func(void *arg)
319             {           
320                 char[] name = Thread.getThis().name;
321                 int *data = cast(int *)arg;
322                 m.lock();
323                 a++;
324                 m.unlock();
325                 Thread.sleep(.1);
326                 return 0;
327             }
328
329             ThreadPool!(void*) pool = new ThreadPool!(void*)(50, &func);
330             if (pool)
331             {
332                 for (uint i = 0; i < 1000; i++)
333                 {
334                     int *data = new int();
335                     *data = i;
336                     pool.start(cast(void*)(data));
337                 }
338             }
339             pool.close();
340             if (a == 1000)
341                 return 1;
342             return 0;
343         }
344
345         int uintTemplate()
346         {
347             uint a = 0;
348             Mutex m = new Mutex();
349             void func(uint arg)
350             {           
351                 m.lock();
352                 a += arg;
353                 m.unlock();
354             }
355
356             ThreadPool!(uint) pool = new ThreadPool!(uint)(50, &func);
357             if (pool)
358             {
359                 for (uint i = 0; i < 1000; i++)
360                     pool.start(i);
361             }
362             pool.close();
363             if (a == 499500)
364                 return 1;
365             return 0;
366         }
367
368         int uintUintTemplate()
369         {
370             uint a = 0;
371             uint b = 0;
372             Mutex m = new Mutex();
373             void func(uint first, uint second)
374             {
375                 m.lock();
376                 a += first;
377                 b += second;
378                 m.unlock();
379             }
380
381             ThreadPool!(uint, uint) pool = new ThreadPool!(uint, uint)(50, &func);
382             if (pool)
383             {
384                 for (uint i = 0; i < 1000; i++)
385                     pool.start(1, i);
386             }
387             pool.close();
388             if ((a == 1000) && (b == 499500))
389                 return 1;
390             return 0;
391         }
392
393         int objectTemplate()
394         {
395             class TestOb
396             {
397                 int a;
398             }
399             Mutex m = new Mutex;
400             TestOb a = new TestOb;
401            
402             void func(TestOb arg)
403             {
404                 m.lock();
405                 arg.a += 1;
406                 m.unlock();
407             }
408
409             ThreadPool!(TestOb) pool = new ThreadPool!(TestOb)(50, &func);
410             if (pool)
411             {
412                 for (uint i = 0; i < 1000; i++)
413                     pool.start(a);
414             }
415             pool.close();
416             if (a.a == 1000)
417                 return 1;
418             return 0;
419         }
420
421         assert(voidTemplate());
422         assert(uintTemplate());
423         assert(uintUintTemplate());
424         assert(objectTemplate());
425     }
426 }