Download Reference Manual
The Developer's Library for D
About Wiki Forums Source Search Contact

Ticket #576: ThreadPool.2.d

File ThreadPool.2.d, 5.1 kB (added by anders0, 8 months ago)
Line 
1 /**
2  * Thread pool that runs delegate jobs on a fixed set of worker threads.
3  *
4  * Copyright: Copyright (C) 2007 Anders Halager. All rights reserved.
5  * License:   BSD style: $(LICENSE)
6  * Author:    Anders Halager
7  */
8 module tango.util.ThreadPool;
9
10 import tango.core.Thread,
11        tango.core.Atomic,
12        tango.core.sync.Mutex,
13        tango.core.sync.Condition;
14
/**
 * A pool of worker threads that execute delegates of type
 * void delegate(Args).
 *
 * Jobs can either be handed directly to a worker (assign / tryAssign) or
 * stored in an internal list for later execution (append).  All shared
 * state is protected by a single mutex; the two condition variables are
 * always waited on and signalled with that mutex held so that no wakeup
 * can be lost.
 */
class ThreadPool(Args...)
{
    /// Signature of the jobs this pool can execute.
    alias void delegate(Args) JobD;

    /**
     * Create the pool and start its worker threads.
     *
     * Params:
     *   workers = number of worker threads to start
     *   q_size  = number of pending jobs to pre-allocate room for
     */
    this(size_t workers, size_t q_size = 0)
    {
        // pre-allocate memory for q_size jobs in the queue
        q.length = q_size;
        q.length = 0;

        m = new Mutex;
        poolActivity = new Condition(m);
        workerActivity = new Condition(m);

        priority_job.store(cast(Job*)null);
        active_jobs.store(0u);
        done.store(false);

        for (size_t i = 0; i < workers; i++)
        {
            auto thread = new Thread(&doJob);
            // Allow the OS to kill the threads if we exit the program without
            // handling them ourselves
            thread.isDaemon = true;
            thread.start();
            pool ~= thread;
        }
    }

    /**
     * Assign the given job to a thread immediately or block until one is
     * available.
     *
     * If the pool is shut down before a worker has taken the job, the job
     * is dropped and the call returns instead of blocking forever.
     *
     * Params:
     *   job  = delegate to execute
     *   args = arguments passed to the delegate
     */
    void assign(JobD job, Args args)
    {
        // The job lives on this stack frame; postPriorityJob does not return
        // until no other thread can still see the pointer.
        auto j = Job(job, args);

        m.lock();
        scope(exit) m.unlock();
        postPriorityJob(&j);
    }

    /**
     * Assign the given job to a thread immediately or return false if none is
     * available. (Returns true if one was available)
     *
     * Params:
     *   job  = delegate to execute
     *   args = arguments passed to the delegate
     *
     * Returns: true if a worker took the job, false otherwise
     */
    bool tryAssign(JobD job, Args args)
    {
        auto j = Job(job, args);

        m.lock();
        scope(exit) m.unlock();

        // The availability check and the hand-over happen under the same
        // lock, so nobody can steal the idle worker in between.  A job that
        // is already waiting in the priority slot will consume the next idle
        // worker, so that counts as "none available" too.
        if (done.load()
            || priority_job.load() !is null
            || active_jobs.load() >= pool.length)
            return false;

        return postPriorityJob(&j);
    }

    /**
     * Put a job into the pool for eventual execution.
     *
     * Warning: Acts as a stack, not a queue as you would expect
     *
     * Params:
     *   job  = delegate to execute
     *   args = arguments passed to the delegate
     */
    void append(JobD job, Args args)
    {
        m.lock();
        // scope(exit) keeps the mutex from staying locked if the append throws
        scope(exit) m.unlock();
        q ~= Job(job, args);
        poolActivity.notify();
    }

    /// Get the number of jobs waiting to be executed
    int pendingJobs()
    {
        m.lock(); scope(exit) m.unlock();
        // Explicit narrowing: the public return type stays int
        return cast(int) q.length;
    }

    /**
     * Finish currently executing jobs and shutdown.
     *
     * Pending jobs that have not been started are discarded.  Calling this
     * more than once is harmless: only the first caller joins the threads.
     */
    void shutdown()
    {
        Thread[] toJoin;
        {
            m.lock();
            scope(exit) m.unlock();

            done.store(true);
            q.length = 0;
            // Wake idle workers so they can see the flag and exit ...
            poolActivity.notifyAll();
            // ... and wake anybody blocked in assign() or finish()
            workerActivity.notifyAll();

            // Detach the thread list so a second shutdown() has nothing to
            // join
            toJoin = pool;
            pool = null;
        }
        // Join without holding the mutex: the workers need it to exit
        foreach (thread; toJoin)
            thread.join();
    }

    /// Complete all jobs and then shutdown.
    void finish()
    {
        {
            m.lock();
            scope(exit) m.unlock();
            while (!done.load()
                   && (q.length > 0
                       || priority_job.load() !is null
                       || active_jobs.load() > 0))
                workerActivity.wait();
        }
        shutdown();
    }

private:
    // Our list of threads -- filled during startup, emptied by shutdown.
    // Its length is also read (under the mutex) by tryAssign.
    Thread[] pool;
    struct Job
    {
        JobD dg;
        Args args;
    }
    // Used for storing queued jobs that will be executed eventually
    Job[] q;

    // This is to store a single job for immediate execution, which hopefully
    // means that any program using only assign and tryAssign won't need any
    // heap allocations after startup.
    Atomic!(Job*) priority_job;

    // This must be held when accessing the job queue, the priority slot and
    // when waiting on or signalling either condition
    Mutex m;

    // Notify is called on this condition whenever we have activity in the pool
    // that the workers might want to know about.
    Condition poolActivity;

    // Worker threads call notifyAll on this when they have taken the priority
    // job, are done with a job or are completely done.
    // This allows a graceful shut down and is necessary since assign has to
    // wait for a job to become available
    Condition workerActivity;

    // Are we in the shutdown phase?
    Atomic!(bool) done;

    // Counter for the number of jobs currently being calculated
    Atomic!(uint) active_jobs;

    // Publish *j in the priority slot and block until a worker has copied it.
    // Must be called with m locked.  Returns true if a worker took the job,
    // false if the pool was shut down first.
    bool postPriorityJob(Job* j)
    {
        // There is only one slot: wait for an earlier assigner's job to be
        // picked up instead of overwriting it.
        while (priority_job.load() !is null && !done.load())
            workerActivity.wait();
        if (done.load())
            return false;

        priority_job.store(j);
        poolActivity.notify();

        // Wait until a worker has taken exactly this job
        while (priority_job.load() is j && !done.load())
            workerActivity.wait();

        if (priority_job.load() is j)
        {
            // Shut down before anybody took it: withdraw the pointer so it
            // cannot dangle once the caller's stack frame is gone.
            priority_job.store(cast(Job*)null);
            return false;
        }
        return true;
    }

    // Thread delegate: fetch jobs and run them until shutdown
    void doJob()
    {
        while (true)
        {
            Job job;
            {
                m.lock();
                scope(exit) m.unlock();

                while (q.length == 0 && priority_job.load() is null && !done.load())
                    poolActivity.wait();
                if (done.load())
                    break;

                Job* jobPtr = priority_job.load();
                if (jobPtr !is null)
                {
                    // Copy the job while the assigner's stack frame is still
                    // alive, then release the slot
                    job = *jobPtr;
                    priority_job.store(cast(Job*)null);
                    workerActivity.notifyAll();
                }
                else
                {
                    // A stack -- should be a queue
                    job = q[q.length - 1];
                    q.length = q.length - 1;
                }

                // Count the job as active before releasing the lock so that
                // finish() and tryAssign() never observe a job that is
                // neither pending nor active.
                active_jobs.increment();
            }

            // Do the actual job, with the mutex released
            try
            {
                job.dg(job.args);
            }
            catch (Exception ex)
            {
                // Deliberately ignored: a failing job must not take the
                // worker thread down with it.
            }
            finally
            {
                // Tell the pool that we are done with something.  This is
                // done under the lock so that a thread which is about to wait
                // in finish() cannot miss the notification.
                m.lock();
                active_jobs.decrement();
                workerActivity.notifyAll();
                m.unlock();
            }
            Thread.yield();
        }

        // Tell the pool that we are now done
        m.lock();
        workerActivity.notifyAll();
        m.unlock();
    }
}