root/trunk/tango/scrapple/thread/Actor.d

Revision 25, 8.4 kB (checked in by Gregor, 1 year ago)

tango/scrapple/thread/Actor.d: Added an enqueueSync that synchronizes on multiple messages.

tests/shakematrix.d: And used it.

Line 
1 /**
2  * An implementation for an actor in the actor model of concurrency
3  *
4  * Authors:
5  *  Gregor Richards
6  *
7  * License:
8  *  Copyright (c) 2007  Gregor Richards
9  * 
10  *  Permission is hereby granted, free of charge, to any person obtaining a
11  *  copy of this software and associated documentation files (the "Software"),
12  *  to deal in the Software without restriction, including without limitation
13  *  the rights to use, copy, modify, merge, publish, distribute, sublicense,
14  *  and/or sell copies of the Software, and to permit persons to whom the
15  *  Software is furnished to do so, subject to the following conditions:
16  * 
17  *  The above copyright notice and this permission notice shall be included in
18  *  all copies or substantial portions of the Software.
19  * 
20  *  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21  *  IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22  *  FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23  *  AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24  *  LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
25  *  FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
26  *  DEALINGS IN THE SOFTWARE.
27  */
28
29 module tango.scrapple.thread.Actor;
30
31 public import tango.scrapple.thread.Message;
32
33 public import tango.core.Thread;
34 import tango.core.sync.Mutex;
35 import tango.core.sync.Semaphore;
36
/**
 * An actor is a thread with builtin message-passing support for concurrency.
 *
 * Any thread may hand a Message to the actor through enqueue(); the actor's
 * own thread (see run()) removes messages from the head of the queue one at a
 * time and passes each to handle().
 */
class Actor : Thread {
    public:
        /** Create an actor without a master */
        this()
        {
            actorLock = new Mutex;
            queueLock = new Semaphore;
            super(&run);
        }

        /** Create an actor with a master */
        this(Actor smaster)
        {
            actorLock = new Mutex;
            queueLock = new Semaphore;
            master = smaster;
            super(&run);
        }

        /** Enqueue a message. Returns as soon as the message has been queued;
            it does not wait for the message to be handled. */
        void enqueue(Message m)
        {
            // before locking, try to give the recipient a fair chance to receive
            Thread.yield();

            actorLock.lock();
            // release the lock on every exit path, including an exception
            // thrown by the append below
            scope (exit) actorLock.unlock();

            // add the new message, then notify of the queue update
            queue ~= m;
            queueLock.notify();
        }

        /** Enqueue a message and wait (must be done from an actor).
            Returns the message.

            The message is wrapped in a SyncMessage together with a fresh
            semaphore, and the caller blocks on that semaphore. */
        Message enqueueSync(Message m)
        {
            // Allocated on the heap rather than as a scope instance: a scope
            // instance is destroyed the moment this function returns, and
            // that can happen while the signalling thread is still inside
            // notify().
            // NOTE(review): assumes SyncMessage signals this semaphore from
            // the handling thread - confirm against Message.d.
            Semaphore done = new Semaphore;

            enqueue(new SyncMessage(done, m));
            done.wait();

            return m;
        }

        /** Enqueue a group of messages and wait for all of them (really only
            useful with a Dispatcher or some other delegating actor) */
        void enqueueSync(Message[] msgs)
        {
            // one semaphore per message, enqueued in the order given
            auto sems = new Semaphore[msgs.length];
            foreach (i, m; msgs) {
                sems[i] = new Semaphore;
                enqueue(new SyncMessage(sems[i], m));
            }

            // then wait on all the semaphores
            foreach (s; sems) {
                s.wait();
            }
        }

        /** Enqueue a delegate to be called. This goes through enqueueSync, so
            the caller blocks until the wrapping SyncMessage's semaphore has
            been signalled. */
        void enqueueDelegate(void delegate() d)
        {
            enqueueSync(new DelegateMessage(d));
        }

        /** You can also enqueue a delegate by just telling an actor to "do"
            something */
        alias enqueueDelegate act;

        /** Set our master */
        void setMaster(Actor smaster)
        {
            master = smaster;
        }

        /** Handle a message. The default handler is to call the Message's own
            handling function. Returns true if this actor should die. */
        bool handle(Message m)
        {
            return m.handle(this);
        }

    private:
        /** The basic actor loop: drain the queue, then sleep until enqueue()
            signals that more has arrived. Returns (ending the thread) as soon
            as handle() returns true. */
        void run()
        {
            while (true) {
                actorLock.lock();

                // check the queue
                while (queue.length != 0) {
                    // get the message off the head
                    Message qhead = queue[0];

                    /* move everything else back (slicing would cause
                       unnecessary overhead when enqueueing) */
                    for (size_t i = 0; i + 1 < queue.length; i++) {
                        queue[i] = queue[i+1];
                    }
                    queue.length = queue.length - 1;

                    // then run it (allowing other messages to come in)
                    actorLock.unlock();
                    if (handle(qhead)) {
                        // that's it for us! (the lock is not held here)
                        return;
                    }
                    actorLock.lock();

                    // empty queue! Let's complain about it. The lock is
                    // dropped while talking to the master so that our own
                    // queue stays open to senders in the meantime.
                    if (queue.length == 0 && master !is null) {
                        actorLock.unlock();
                        master.enqueue(new EmptyQueueMessage(this));
                        actorLock.lock();
                    }
                }

                // now we're just waiting on new data
                actorLock.unlock();

                /* enqueue() notifies once per message, while the loop above
                   drains every queued message per pass, so this wait may
                   return when the queue is already empty; the outer loop
                   then simply comes straight back here. */
                queueLock.wait();
            }
        }

        /** The actor lock. Cannot generally be used through synchronize() */
        Mutex actorLock;

        /** The queue lock informs the thread when new items have entered the
            queue */
        Semaphore queueLock;

        /** The queue of commands. Should only be accessed through the proper
            locking mechanisms */
        Message[] queue;

        /** The master, to which messages such as EmptyQueueMessage are sent */
        Actor master;
}
192
/** A Dispatcher is an actor which delegates messages to other actors. Useful
    when you have a number of actors for different cores/CPUs, but they are
    otherwise indistinct.

    Incoming messages are parked in a private delegate queue and handed out,
    one per slave, to slaves that are currently marked as having an empty
    queue. */
class Dispatcher : Actor {
    public:
        /** Create a dispatcher over the given slaves. The array is copied,
            and every slave gets this dispatcher as its master and starts out
            marked as idle. */
        this(Actor[] sslaves)
        {
            super();

            slaves = sslaves.dup;

            // mark this as the master of each slave
            foreach (s; slaves) {
                s.setMaster(this);
                slaveQueueEmpty[s] = true;
            }
        }

        /** Handle messages. Returns true if this dispatcher should die. */
        bool handle(Message m)
        {
            if (cast(EmptyQueueMessage) m !is null) {
                // just telling us their queue is empty, so mark it
                if (m.sender !is null)
                    slaveQueueEmpty[m.sender] = true;

            } else if (cast(DieMessage) m !is null) {
                // when we die, we take everyone down with us!
                killSlaves();
                return true;

            } else {
                // a SyncMessage could be wrapping a DieMessage
                auto sm = cast(SyncMessage) m;
                if (sm !is null && cast(DieMessage) sm.m !is null) {
                    killSlaves();
                    // let the SyncMessage handle itself against this actor;
                    // its result decides whether we die
                    return m.handle(this);
                }

                // anything else is work to hand to a slave
                delegateQueue ~= m;
            }

            flushDelegateQueue();
            return false;
        }

    private:
        /** Send a DieMessage to every slave, one after another, waiting on
            each through enqueueSync before moving on to the next */
        void killSlaves()
        {
            foreach (s; slaves)
                s.enqueueSync(new DieMessage);
        }

        /** Flush the delegate queue as much as possible: each slave marked
            idle receives at most one pending message per call */
        void flushDelegateQueue()
        {
            foreach (a; slaves) {
                // nothing left to hand out, so no point looking further
                if (delegateQueue.length == 0)
                    break;

                if (!slaveQueueEmpty[a])
                    continue;

                // give them something to do
                a.enqueue(delegateQueue[0]);

                // shift the remaining messages down over the one just sent
                for (size_t i = 0; i + 1 < delegateQueue.length; i++) {
                    delegateQueue[i] = delegateQueue[i+1];
                }
                delegateQueue.length = delegateQueue.length - 1;

                slaveQueueEmpty[a] = false;
            }
        }

        /** The slaves of this delegater */
        Actor[] slaves;

        /** And whether their queues are empty */
        bool[Actor] slaveQueueEmpty;

        /** A separate queue for messages which we have received, but not yet
            delegated */
        Message[] delegateQueue;
}
Note: See TracBrowser for help on using the browser.