Download Reference Manual
The Developer's Library for D
About Wiki Forums Source Search Contact

Ticket #576: ThreadPool2.d

File ThreadPool2.d, 5.1 kB (added by anders0, 4 years ago)
Line 
1 /**
2  * A thread pool that runs delegate jobs on a fixed set of worker threads.
3  *
4  * Copyright: Copyright (C) 2007 Anders Halager. All rights reserved.
5  * License:   BSD style: $(LICENSE)
6  * Author:    Anders Halager
7  */
8 module tango.util.ThreadPool2;
9
10 import tango.core.Thread,
11        tango.core.Atomic,
12        tango.core.sync.Mutex,
13        tango.core.sync.Condition;
14
/**
 * A pool of worker threads that execute delegate jobs taking arguments of
 * the types given by Args.
 *
 * A job is either handed directly to a worker (assign / tryAssign) or stored
 * for later execution (append). Stored jobs are taken newest-first.
 */
class ThreadPool(Args...)
{
    /// Signature of a job: a delegate taking the pool's argument types.
    alias void delegate(Args) JobD;

    /**
     * Create the pool and start its worker threads.
     *
     * Params:
     *   workers = number of worker threads to create and start
     *   q_size  = number of stored jobs to pre-allocate room for
     */
    this(size_t workers, size_t q_size = 0)
    {
        // pre-allocate memory for q_size jobs in the queue, then reset the
        // length so the queue starts out empty
        q.length = q_size;
        q.length = 0;

        m = new Mutex;
        poolActivity = new Condition(m);
        workerActivity = new Condition(m);

        priority_job.store(cast(Job*)null);
        active_jobs.store(0u);
        // The workers leave their loop as soon as this flag is true, so it
        // has to be false until shutdown() is called.
        done.store(false);

        for (size_t i = 0; i < workers; i++)
        {
            auto thread = new Thread(&doJob);
            // Allow the OS to kill the threads if we exit the program without
            // handling them ourselves
            thread.isDaemon = true;
            thread.start();
            pool ~= thread;
        }
    }

    /**
     * Assign the given job to a thread immediately or block until one is
     * available.
     *
     * The job is published through the single priority_job slot, and the call
     * returns once a worker has copied it out of that slot.
     *
     * NOTE(review): there is only one slot, so two threads calling assign at
     * the same time can overwrite each other's job -- confirm callers assign
     * from a single thread.
     *
     * Params:
     *   job  = the delegate to execute
     *   args = the arguments to pass to the delegate
     */
    void assign(JobD job, Args args)
    {
        // The mutex is held for the whole hand-over: waiting on a condition
        // requires its mutex to be locked, and wait() releases it only while
        // this thread is blocked.
        m.lock();
        scope(exit) m.unlock();

        // The job lives on this stack frame; that is safe because we do not
        // return before a worker has copied it and cleared the slot.
        auto j = Job(job, args);
        priority_job.store(&j);
        poolActivity.notify();

        // Wait until someone has taken the job
        while (priority_job.load() !is null)
            workerActivity.wait();
    }

    /**
     * Assign the given job to a thread immediately or return false if none is
     * available.
     *
     * Params:
     *   job  = the delegate to execute
     *   args = the arguments to pass to the delegate
     *
     * Returns: true if the job was handed to a worker, false if the number of
     *          running jobs had already reached the number of workers.
     */
    bool tryAssign(JobD job, Args args)
    {
        if (active_jobs.load() >= pool.length)
            return false;
        assign(job, args);
        return true;
    }

    /**
     * Put a job into the pool for eventual execution.
     *
     * Warning: Acts as a stack, not a queue as you would expect
     *
     * Params:
     *   job  = the delegate to execute
     *   args = the arguments to pass to the delegate
     */
    void append(JobD job, Args args)
    {
        m.lock();
        q ~= Job(job, args);
        m.unlock();
        poolActivity.notify();
    }

    /// Get the number of jobs waiting to be executed
    int pendingJobs()
    {
        m.lock(); scope(exit) m.unlock();
        // The return type is kept as int for existing callers, so the
        // narrowing from size_t is spelled out.
        return cast(int) q.length;
    }

    /**
     * Finish currently executing jobs and shutdown.
     *
     * Jobs that are still stored and have not been started are discarded.
     * The call returns after every worker thread has been joined.
     */
    void shutdown()
    {
        {
            // Change the state and wake the workers while holding the mutex:
            // a worker that has just tested its wait condition can then not
            // miss the wakeup, and the queue is never modified unsynchronised.
            m.lock();
            scope(exit) m.unlock();
            done.store(true);
            q.length = 0;
            poolActivity.notifyAll();
        }

        // The mutex must be free here, the workers need it to wake up.
        foreach (thread; pool)
            thread.join();
    }

    /// Complete all jobs and then shutdown.
    void finish()
    {
        {
            m.lock();
            scope(exit) m.unlock();
            // Workers notify workerActivity after each job they complete.
            while (q.length > 0)
                workerActivity.wait();
        }
        shutdown();
    }

private:
    // Our list of threads -- only used during startup and shutdown
    Thread[] pool;

    // A job together with the arguments it is to be called with
    struct Job
    {
        JobD dg;
        Args args;
    }

    // Used for storing queued jobs that will be executed eventually
    Job[] q;

    // This is to store a single job for immediate execution, which hopefully
    // means that any program using only assign and tryAssign won't need any
    // heap allocations after startup.
    Atomic!(Job*) priority_job;

    // This should be used when accessing the job queue
    Mutex m;

    // Notify is called on this condition whenever we have activity in the pool
    // that the workers might want to know about.
    Condition poolActivity;

    // Worker threads call notify on this when they are done with a job or are
    // completely done.
    // This allows a graceful shut down and is necessary since assign has to
    // wait for a job to become available
    Condition workerActivity;

    // Are we in the shutdown phase?
    Atomic!(bool) done;

    // Counter for the number of jobs currently being calculated
    Atomic!(uint) active_jobs;

    // Thread delegate: the loop run by every worker. It takes the priority
    // job if there is one, otherwise the most recently stored job, and runs
    // it outside the mutex until the pool is shut down.
    void doJob()
    {
        while (!done.load())
        {
            m.lock();
            while (q.length == 0 && priority_job.load() is null && !done.load())
                poolActivity.wait();
            if (done.load()) {
                m.unlock(); // not using scope(exit), need to manually unlock
                break;
            }
            Job job;
            Job* jobPtr = priority_job.load();
            if (jobPtr !is null)
            {
                // Copy the job before clearing the slot: it lives on the
                // stack of the thread blocked in assign().
                job = *jobPtr;
                priority_job.store(cast(Job*)null);
                workerActivity.notify();
            }
            else
            {
                // A stack -- should be a queue
                job = q[$ - 1];
                q.length = q.length - 1;
            }

            // Make sure we unlock before we start doing the calculations
            m.unlock();

            // Do the actual job
            active_jobs.increment();
            try {
                job.dg(job.args);
            } catch (Exception ex) {
                // Deliberately ignored: a failing job must not take the
                // worker thread down with it.
            }
            active_jobs.decrement();

            // Tell the pool that we are done with something
            workerActivity.notify();
            Thread.yield();
        }
        // Tell the pool that we are now done
        workerActivity.notify();
    }
}