root/trunk/tango/scrapple/thread/ThreadPool.d

Revision 31, 6.6 kB (checked in by flithm, 1 year ago)

Fix ThreadPool?'s module dec

Line 
1 /++
2     author: Jeff Davey <jeffd@gwava.com>
3     license: BSD style
4
5     A basic thread queueing module.
6
7     This specific module pre-detaches all threads up to the size of the thread pool. The idea is that state
8     can then be saved on each thread's TLS for caching and other purposes.
9
10     All threads are set to "detach" mode, so that you can close the running program and let the OS clean up
11     any thread resources.
12
13     There is also a close function, which specifically waits for all detached threads to move to a closed state
14     before returning.
15
16     Usage is fairly straightforward, and both delegates and functions are supported:
17
18     Example:
19     ---
20     uint a = 0;
21     Mutex m = new Mutex();
22
23     void func(void *arg)
24     {           
25         char[] name = Thread.getThis().name;
26         int *data = cast(int *)arg;
27         m.lock();
28         a++;
29         m.unlock();
30         Thread.sleep(1);
31         return 0;
32     }
33
34
35     ThreadPool pool = new ThreadPool(100, &func);
36     if (pool)
37     {
38         for (uint i = 0; i < 1000; i++)
39         {
40             int *data = new int();
41             *data = i;
42             pool.start(cast(void*)(data));
43         }
44     }
45     pool.close();
46     assert(a == 1000);
47     ---
48 ++/
49
50 module tango.scrapple.thread.ThreadPool;
51
52 private import tango.core.Thread;
53 private import tango.core.sync.Mutex;
54 private import tango.core.sync.Condition;
55
56 private class PooledThread
57 {
58     enum State
59     {
60         IDLE,
61         RUNNING,
62         SHUTDOWN,
63         CLOSED
64     };
65
66     this(void delegate(void *) dg, size_t stackSize = 0)
67     {
68         this._dg = dg;
69         this(stackSize);
70     }
71
72     this(void (*func)(void *arg), size_t stackSize = 0)
73     {
74         this._func = func;
75         this(stackSize);
76     }
77
78     State state()
79     {
80         return this._state;
81     }
82
83     State state(State newState)
84     {
85         return this._state = newState;
86     }
87
88     void join()
89     {
90         _thread.join();
91     }
92
93     void lock()
94     {
95         _lock.lock();
96     }
97
98     bool tryLock()
99     {
100         return _lock.tryLock();
101     }
102
103     void unlock()
104     {
105         _lock.unlock();
106     }
107
108     void* arg()
109     {
110         return this._arg;
111     }
112
113     void* arg(void* newArg)
114     {
115         return this._arg = newArg;
116     }
117    
118     void notify()
119     {
120         _signal.notify();
121     }
122
123     private:
124     Mutex _lock;                        // for condition/mutex pair
125     Condition _signal;                  // used to signal work available
126     void delegate(void *) _dg = null;   // delegate to call
127     void (*_func)(void *) = null;       // or functiont o call
128     void* _arg = null;                  // Arg that has been passed
129     Thread _thread;                     // Underlying Thread class
130     State _state = State.CLOSED;        // Our current state.
131
132    
133     void run()
134     {
135         _lock.lock();
136         scope (exit) _lock.unlock();
137
138         for(;;)
139         {
140             this._state = State.IDLE;
141             _signal.wait(); // wait for signal from pool
142             if (this._state == State.SHUTDOWN)  // shutting down?
143             {
144                 this._state = State.CLOSED;
145                 break;
146             }
147
148             if (this._state == State.RUNNING)   // let's run the work.
149             {
150                 if (_dg)
151                     _dg(_arg);
152                 else if (_func)
153                     _func(_arg);
154             }
155         }
156
157         return 0;
158     }
159
160     this(size_t stackSize)
161     {
162         this._lock = new Mutex();
163         this._signal = new Condition(this._lock);
164
165         _thread = new Thread(&run, stackSize);
166         _thread.isDaemon = true;
167         _thread.start();
168     }
169
170 }
171
172 /++
173     ThreadPool is the thread queueing class.
174 ++/
175
176 class ThreadPool
177 {
178
179     /++
180         Construct a ThreadPool.
181
182         Params:
183         poolSize            =   The size of the thread pool. It will
184                                 actually create this many threads on startup.
185
186         (function/delegate) =   void func(void *arg) argument is the passed
187                                 argument from the start function.
188
189         stackSize           =   Size of the stack per thread. As tango
190                                 doesn't quite support this functionality,
191                                 it's not working quite yet.
192     ++/
193     this(uint poolSize, void (*func)(void *), size_t stackSize = 0)
194     in
195     {
196         assert(func);
197     }
198     body
199     {
200         this._func = func;
201         this(poolSize, stackSize);
202     }
203
204     this(uint poolSize, void delegate(void *) dg, size_t stackSize = 0)
205     in
206     {
207         assert(dg);
208     }
209     body
210     {
211         this._dg = dg;
212         this(poolSize, stackSize);     
213     }
214
215
216     /++
217         Start a unit of work.
218
219         Params:
220         arg                 =   A void * arg that will be passed to the
221                                 underlying worker thread.
222
223         block               =   If true, the start function will wait
224                                 for an existing thread to become
225                                 available, if all are being used. If
226                                 false, the start function will return
227                                 immediately with a value of false if it
228                                 could not schedule work, or a vlue of true
229                                 if it could.
230     
231     ++/
232     bool start(void *arg, bool block = true)
233     {
234         bool rtn = false;
235         for(;;)
236         {
237             uint i = 0;
238             for (; i < _pool.length; i++)
239             {
240                 if (_pool[i].tryLock())
241                 {
242                     scope(exit) _pool[i].unlock();
243                     if (_pool[i].state == PooledThread.State.IDLE)
244                     {
245                         int *data = cast(int *)arg;
246                         _pool[i].state = PooledThread.State.RUNNING;
247                         _pool[i].arg = arg;
248                         _pool[i].notify();
249                         rtn = true;
250                         break;
251                     }
252                 }
253             }
254
255             if (i == _pool.length)
256             {
257                 if (block)
258                     Thread.sleep(0.1);
259                 else
260                     break;
261             }
262             else
263                 break;
264         }
265
266         return rtn;
267     }
268
269
270     /++
271       Close the thread pool. This will go through all running threads,
272       tell them to shutdown, and wait for that to happen before returning.
273     
274       Currently, the poll time between thread checks is hard-coded to 100ms
275     ++/
276     void close()
277     {
278         if (open)
279         {
280             for(;;)
281             {
282                 bool allClosed = true;
283                 for (uint i = 0; i < _pool.length; i++)
284                 {
285                     _pool[i].lock();
286                     if (_pool[i].state == PooledThread.State.IDLE)
287                     {
288                         _pool[i].state = PooledThread.State.SHUTDOWN;       
289                         _pool[i].notify();
290                     }   
291                     if (_pool[i].state != PooledThread.State.CLOSED)
292                         allClosed = false;                                 
293                     _pool[i].unlock();
294                 }
295                 if (allClosed)
296                     break;
297                 else
298                     Thread.sleep(.1);
299             }
300             open = false;
301         }
302     }
303
304
305     private:
306     void delegate(void *) _dg = null;
307     void (*_func)(void *) = null;
308     PooledThread[] _pool = null;
309     bool open = false;
310
311     this(uint poolSize, size_t stackSize)
312     {
313         _pool = new PooledThread[poolSize];
314
315         for (uint i = 0; i < poolSize; i++)
316         {
317             if (_func)
318                 _pool[i] = new PooledThread(_func, stackSize);
319             else
320                 _pool[i] = new PooledThread(_dg, stackSize);
321         }
322         open = true;
323     }
324 }
325
326 version (Test)
327 {
328     import tetra.util.Test;
329     unittest
330     {   
331         Test.Status basicFunctionality(inout char[][] messages)
332         {
333             uint a = 0;
334             Mutex m = new Mutex();
335             void func(void *arg)
336             {           
337                 char[] name = Thread.getThis().name;
338                 int *data = cast(int *)arg;
339                 m.lock();
340                 a++;
341                 m.unlock();
342                 Thread.sleep(1);
343                 return 0;
344             }
345
346
347             ThreadPool pool = new ThreadPool(100, &func);
348             if (pool)
349             {
350                 for (uint i = 0; i < 1000; i++)
351                 {
352                     int *data = new int();
353                     *data = i;
354                     pool.start(cast(void*)(data));
355                 }
356             }
357             pool.close();
358
359             if (a == 1000)
360                 return Test.Status.Success;
361             return Test.Status.Failure;
362         }
363
364         auto t = new Test("tetra.util.ThreadPool");
365         t["Basic Functionality"] = &basicFunctionality;
366         t.run();
367     }
368 }
Note: See TracBrowser for help on using the browser.