Download Reference Manual
The Developer's Library for D
About Wiki Forums Source Search Contact

Ticket #576: pool.d

File pool.d, 3.8 kB (added by Nietsnie, 1 year ago)

Example of a Thread Pool class.

Line 
1 /***************************************
2 * Authors: Jeff Davey, jeffd@gwava.com
3 * Thread pooling library
4 */
5
6 module pool;
7
8 private import tango.core.Thread;
9 private import tango.core.sync.Mutex;
10 private import tango.core.sync.Condition;
11
12 private import tango.io.Stdout;
13
14
15 private class PooledThread
16 {
17     enum State
18     {
19         IDLE,
20         RUNNING,
21         SHUTDOWN,
22         CLOSED
23     };
24
25     this(void delegate(void *) dg, size_t stackSize = 0)
26     {
27         this._dg = dg;
28         this(stackSize);
29     }
30
31     this(void (*func)(void *arg), size_t stackSize = 0)
32     {
33         this._func = func;
34         this(stackSize);
35     }
36
37     State state()
38     {
39         return this._state;
40     }
41
42     State state(State newState)
43     {
44         return this._state = newState;
45     }
46
47     void join()
48     {
49         _thread.join();
50     }
51
52     void lock()
53     {
54         _lock.lock();
55     }
56
57     bool tryLock()
58     {
59         return _lock.tryLock();
60     }
61
62     void unlock()
63     {
64         _lock.unlock();
65     }
66
67     void* arg()
68     {
69         return this._arg;
70     }
71
72     void* arg(void* newArg)
73     {
74         return this._arg = newArg;
75     }
76    
77     void notify()
78     {
79         _signal.notify();
80     }
81
82     private:
83     Mutex _lock;
84     Condition _signal;
85     void delegate(void *) _dg = null;
86     void (*_func)(void *) = null;
87     void* _arg = null;
88     Thread _thread;
89     State _state = State.CLOSED;
90
91    
92     void run()
93     {
94         _lock.lock();
95         scope (exit) _lock.unlock();
96
97         for(;;)
98         {
99             this._state = State.IDLE;
100             _signal.wait();
101             if (this._state == State.SHUTDOWN)
102             {
103                 this._state = State.CLOSED;
104                 break;
105             }
106
107             if (this._state == State.RUNNING)
108             {
109                 if (_dg)
110                     _dg(_arg);
111                 else if (_func)
112                     _func(_arg);
113             }
114         }
115
116         return 0;
117     }
118
119     this(size_t stackSize)
120     {
121         this._lock = new Mutex();
122         this._signal = new Condition(this._lock);
123
124 //      _thread = new Thread(&run, stackSize); // TODO: WE NEED STACKSIZE!
125         _thread = new Thread(&run);
126         _thread.start();
127     }
128
129 }
130
131 class ThreadPool
132 {
133     this(uint poolSize, void (*func)(void *), size_t stackSize = 0)
134     in
135     {
136         assert(func);
137     }
138     body
139     {
140         this._func = func;
141         this(poolSize, stackSize);
142     }
143
144     this(uint poolSize, void delegate(void *) dg, size_t stackSize = 0)
145     in
146     {
147         assert(dg);
148     }
149     body
150     {
151         this._dg = dg;
152         this(poolSize, stackSize);     
153     }
154
155     bool start(void *arg, bool block = true)
156     {
157         bool rtn = false;
158         for(;;)
159         {
160             uint i = 0;
161             for (; i < _pool.length; i++)
162             {
163                 if (_pool[i].tryLock())
164                 {
165                     scope(exit) _pool[i].unlock();
166                     if (_pool[i].state == PooledThread.State.IDLE)
167                     {
168                         int *data = cast(int *)arg;
169                         _pool[i].state = PooledThread.State.RUNNING;
170                         _pool[i].arg = arg;
171                         _pool[i].notify();
172                         rtn = true;
173                         break;
174                     }
175                 }
176             }
177
178             if (i == _pool.length)
179             {
180                 if (block)
181                     Thread.sleep(0.1);
182                 else
183                     break;
184             }
185             else
186                 break;
187         }
188
189         return rtn;
190     }
191
192
193     void close()
194     {
195         if (open)
196         {
197             for (uint i = 0; i < _pool.length; i++)         
198             {
199                 _pool[i].lock();
200                 if (_pool[i].state != PooledThread.State.RUNNING)
201                 {
202                     _pool[i].state = PooledThread.State.SHUTDOWN;       
203                     _pool[i].notify();
204                 }
205                 _pool[i].unlock();
206             }
207             for (uint i = 0; i < _pool.length; i++)
208                 _pool[i].join();
209             open = false;
210         }
211     }
212
213
214     private:
215     void delegate(void *) _dg = null;
216     void (*_func)(void *) = null;
217     PooledThread[] _pool = null;
218     bool open = false;
219
220     this(uint poolSize, size_t stackSize)
221     {
222         _pool = new PooledThread[poolSize];
223
224         for (uint i = 0; i < poolSize; i++)
225         {
226             if (_func)
227                 _pool[i] = new PooledThread(_func, stackSize);
228             else
229                 _pool[i] = new PooledThread(_dg, stackSize);
230         }
231         open = true;
232     }
233
234     unittest
235     {   
236         uint a = 0;
237         Mutex m = new Mutex();
238
239         void func(void *arg)
240         {           
241             char[] name = Thread.getThis().name;
242             int *data = cast(int *)arg;
243             m.lock();
244             Stdout.format("{}", *data).newline;
245             a++;
246             m.unlock();
247             Thread.sleep(1);
248             return 0;
249         }
250
251
252         ThreadPool pool = new ThreadPool(100, &func);
253         if (pool)
254         {
255             for (uint i = 0; i < 1000; i++)
256             {
257                 int *data = new int();
258                 *data = i;
259                 pool.start(cast(void*)(data));
260             }
261         }
262         pool.close();
263
264         assert(a == 1000);
265     }
266 }