Note: This website is archived. For up-to-date information about D projects and development, please visit wiki.dlang.org.

root/trunk/phobos/std/concurrency.d

Revision 2260, 31.8 kB (checked in by sean, 14 years ago)

Changed thisTid to construct a MessageBox? if one doesn't exist (true for the main thread and threads created with core.thread instead of spawn). This eliminates a double construction for spawned threads or the alternative of a segfault when sending to a thread not created by spawn.

  • Property svn:eol-style set to native
Line 
1 /**
2  * This is a low-level messaging API upon which more structured or restrictive
3  * APIs may be built.  The general idea is that every messageable entity is
4  * represented by a common handle type (called a Cid in this implementation),
5  * which allows messages to be sent to in-process threads, on-host processes,
6  * and foreign-host processes using the same interface.  This is an important
7  * aspect of scalability because it allows the components of a program to be
8  * spread across available resources with few to no changes to the actual
9  * implementation.
10  *
11  * Right now, only in-process threads are supported and referenced by a more
12  * specialized handle called a Tid.  It is effectively a subclass of Cid, with
13  * additional features specific to in-process messaging.
14  *
15  * Copyright: Copyright Sean Kelly 2009 - 2010.
16  * License:   <a href="http://www.boost.org/LICENSE_1_0.txt">Boost License 1.0</a>.
17  * Authors:   Sean Kelly
18  */
19 /*          Copyright Sean Kelly 2009 - 2010.
20  * Distributed under the Boost Software License, Version 1.0.
21  *    (See accompanying file LICENSE_1_0.txt or copy at
22  *          http://www.boost.org/LICENSE_1_0.txt)
23  */
24 module std.concurrency;
25
26
27 public
28 {
29     import core.atomic;
30     import core.sync.barrier;
31     import core.sync.condition;
32     import core.sync.mutex;
33     import core.sync.rwmutex;
34     import core.sync.semaphore;
35     import std.variant;
36 }
37 private
38 {
39     import core.thread;
40     import std.algorithm;
41     import std.exception;
42     import std.range;
43     import std.range;
44     import std.traits;
45     import std.typecons;
46     import std.typetuple;
47
48     template hasLocalAliasing(T...)
49     {
50         static if( !T.length )
51             enum hasLocalAliasing = false;
52         else
53             enum hasLocalAliasing = (std.traits.hasLocalAliasing!(T[0]) && !is(T[0] == Tid)) ||
54                                     std.concurrency.hasLocalAliasing!(T[1 .. $]);
55     }
56
57     enum MsgType
58     {
59         standard,
60         priority,
61         linkDead,
62     }
63
64     struct Message
65     {
66         MsgType type;
67         Variant data;
68        
69         this(T...)( MsgType t, T vals )
70             if( T.length < 1 )
71         {
72             static assert( false, "messages must contain at least one item" );
73         }
74
75         this(T...)( MsgType t, T vals )
76             if( T.length == 1 )
77         {
78             type = t;
79             data = vals[0];
80         }
81
82         this(T...)( MsgType t, T vals )
83             if( T.length > 1 )
84         {
85             type = t;
86             data = Tuple!(T)( vals );
87         }
88
89         auto convertsTo(T...)()
90         {
91             static if( T.length == 1 )
92                 return is( T[0] == Variant ) ||
93                        data.convertsTo!(T);
94             else
95                 return data.convertsTo!(Tuple!(T));
96         }
97        
98         auto get(T...)()
99         {
100             static if( T.length == 1 )
101             {
102                 static if( is( T[0] == Variant ) )
103                     return data;
104                 else
105                     return data.get!(T);
106             }
107             else
108             {
109                 return data.get!(Tuple!(T));
110             }
111         }
112
113         auto map(Op)( Op op )
114         {
115             alias ParameterTypeTuple!(Op) Args;
116
117             static if( Args.length == 1 )
118             {
119                 static if( is( Args[0] == Variant ) )
120                     return op( data );
121                 else
122                     return op( data.get!(Args) );
123             }
124             else
125             {
126                 return op( data.get!(Tuple!(Args)).expand );
127             }
128         }
129     }
130
131     void checkops(T...)( T ops )
132     {
133         foreach( i, t1; T )
134         {
135             static assert( is( t1 == function ) || is( t1 == delegate ) );
136             alias ParameterTypeTuple!(t1) a1;
137             alias ReturnType!(t1) r1;
138
139             static if( i < T.length - 1 && is( r1 == void ) )
140             {
141                 static assert( a1.length != 1 || !is( a1[0] == Variant ),
142                                "function with arguments " ~ a1.stringof ~
143                                " occludes successive function" );
144
145                 foreach( t2; T[i+1 .. $] )
146                 {
147                     static assert( is( t2 == function ) || is( t2 == delegate ) );
148                     alias ParameterTypeTuple!(t2) a2;
149
150                     static assert( !is( a1 == a2 ),
151                                    "function with arguments " ~ a1.stringof ~
152                                    " occludes successive function" );
153                 }
154             }
155         }
156     }
157
158     MessageBox  mbox;
159     bool[Tid]   links;
160     Tid         owner;
161 }
162
163
164 static this()
165 {
166     // NOTE: thisTid will construct a new MessageBox if one doesn't exist,
167     //       which should only be true of the main thread and threads created
168     //       via core.thread instead of spawn.
169 }
170
171
172 static ~this()
173 {
174     if( mbox !is null )
175     {
176         mbox.close();
177         auto me = thisTid;
178         foreach( tid; links.keys )
179             _send( MsgType.linkDead, tid, me );
180         if( owner != Tid.init )
181             _send( MsgType.linkDead, owner, me );
182     }
183 }
184
185
186 //////////////////////////////////////////////////////////////////////////////
187 // Exceptions
188 //////////////////////////////////////////////////////////////////////////////
189
190
191 /**
192  *
193  */
194 class MessageMismatch : Exception
195 {
196     this( string msg = "Unexpected message type" )
197     {
198         super( msg );
199     }
200 }
201
202
203 /**
204  *
205  */
206 class OwnerTerminated : Exception
207 {
208     this( Tid t, string msg = "Owner terminated" )
209     {
210         super( msg );
211         tid = t;
212     }
213
214     Tid tid;
215 }
216
217
218 /**
219  *
220  */
221 class LinkTerminated : Exception
222 {
223     this( Tid t, string msg = "Link terminated" )
224     {
225         super( msg );
226         tid = t;
227     }
228
229     Tid tid;
230 }
231
232
233 /**
234  *
235  */
236 class PriorityMessageException : Exception
237 {
238     this( Variant vals )
239     {
240         super( "Priority message" );
241         message = vals;
242     }
243
244     Variant message;
245 }
246
247
248 /**
249  *
250  */
251 class MailboxFull : Exception
252 {
253     this( Tid t, string msg = "Mailbox full" )
254     {
255         super( msg );
256         tid = t;
257     }
258
259     Tid tid;
260 }
261
262
263 //////////////////////////////////////////////////////////////////////////////
264 // Thread ID
265 //////////////////////////////////////////////////////////////////////////////
266
267
268 /**
269  * An opaque type used to represent a logical local process.
270  */
271 struct Tid
272 {
273     void send(T...)( T vals )
274     {
275         static assert( !hasLocalAliasing!(T),
276                        "Aliases to mutable thread-local data not allowed." );
277         _send( this, vals );
278     }
279
280
281 private:
282     this( MessageBox m )
283     {
284         mbox = m;
285     }
286
287
288     MessageBox  mbox;
289 }
290
291
292 /**
293  * Returns the caller's Tid.
294  */
295 @property Tid thisTid()
296 {
297     if( mbox )
298         return Tid( mbox );
299     mbox = new MessageBox;
300     return Tid( mbox );
301 }
302
303
304 //////////////////////////////////////////////////////////////////////////////
305 // Thread Creation
306 //////////////////////////////////////////////////////////////////////////////
307
308
309 /**
310  * Executes the supplied function in a new context represented by Tid.  The
311  * calling context is designated as the owner of the new context.  When the
312  * owner context terminated an OwnerTerminated message will be sent to the
313  * new context, causing an OwnerTerminated exception to be thrown on
314  * receive().
315  *
316  * Params:
317  *  fn   = The function to execute.
318  *  args = Arguments to the function.
319  *
320  * Returns:
321  *  A Tid representing the new context.
322  */
323 Tid spawn(T...)( void function(T) fn, T args )
324 {
325     static assert( !hasLocalAliasing!(T),
326                    "Aliases to mutable thread-local data not allowed." );
327     return _spawn( false, fn, args );
328 }
329
330
331 /**
332  * Executes the supplied function in a new context represented by Tid.  This
333  * new context is linked to the calling context so that if either it or the
334  * calling context terminates a LinkTerminated message will be sent to the
335  * other, causing a LinkTerminated exception to be thrown on receive().  The
336  * owner relationship from spawn() is preserved as well, so if the link
337  * between threads is broken, owner termination will still result in an
338  * OwnerTerminated exception to be thrown on receive().
339  *
340  * Params:
341  *  fn   = The function to execute.
342  *  args = Arguments to the function.
343  *
344  * Returns:
345  *  A Tid representing the new context.
346  */
347 Tid spawnLinked(T...)( void function(T) fn, T args )
348 {
349     static assert( !hasLocalAliasing!(T),
350                    "Aliases to mutable thread-local data not allowed." );
351     return _spawn( true, fn, args );
352 }
353
354
355 /*
356  *
357  */
358 private Tid _spawn(T...)( bool linked, void function(T) fn, T args )
359 {
360     // TODO: MessageList and &exec should be shared.
361     auto spawnTid = Tid( new MessageBox );
362     auto ownerTid = thisTid;
363
364     void exec()
365     {
366         mbox  = spawnTid.mbox;
367         owner = ownerTid;
368         fn( args );
369     }
370
371     // TODO: MessageList and &exec should be shared.
372     auto t = new Thread( &exec ); t.start();
373     links[spawnTid] = linked;
374     return spawnTid;
375 }
376
377
378 //////////////////////////////////////////////////////////////////////////////
379 // Sending and Receiving Messages
380 //////////////////////////////////////////////////////////////////////////////
381
382
383 /**
384  * Sends the supplied value to the context represented by tid.
385  */
386 void send(T...)( Tid tid, T vals )
387 {
388     static assert( !hasLocalAliasing!(T),
389                    "Aliases to mutable thread-local data not allowed." );
390     _send( tid, vals );
391 }
392
393
394 /**
395  *
396  */
397 void prioritySend(T...)( Tid tid, T vals )
398 {
399     static assert( !hasLocalAliasing!(T),
400                    "Aliases to mutable thread-local data not allowed." );
401     _send( MsgType.priority, tid, vals );
402 }
403
404
405 /*
406  * ditto
407  */
408 private void _send(T...)( Tid tid, T vals )
409 {
410     _send( MsgType.standard, tid, vals );
411 }
412
413
414 /*
415  * Implementation of send.  This allows parameter checking to be different for
416  * both Tid.send() and .send().
417  */
418 private void _send(T...)( MsgType type, Tid tid, T vals )
419 {
420     tid.mbox.put( Message( type, vals ) );
421 }
422
423
424 /**
425  *
426  */
427 void receive(T...)( T ops )
428 {
429     checkops( ops );
430     mbox.get( ops );
431 }
432
433
434 unittest
435 {
436     assert( __traits( compiles,
437                       {
438                           receive( (Variant x) {} );
439                           receive( (int x) {}, (Variant x) {} );
440                       } ) );
441
442     assert( !__traits( compiles,
443                        {
444                            receive( (Variant x) {}, (int x) {} );
445                        } ) );
446
447     assert( !__traits( compiles,
448                        {
449                            receive( (int x) {}, (int x) {} );
450                        } ) );
451 }
452
453
454 private template receiveOnlyRet(T...)
455 {
456     static if( T.length == 1 )
457         alias T[0] receiveOnlyRet;
458     else
459         alias Tuple!(T) receiveOnlyRet;
460 }
461
462 /**
463  *
464  */
465 receiveOnlyRet!(T) receiveOnly(T...)()
466 {
467     Tuple!(T) ret;
468
469     mbox.get( ( T val )
470               {
471                   static if( T.length )
472                       ret.field = val;
473               },
474               ( Variant val )
475               {
476                   throw new MessageMismatch;
477               } );
478     static if( T.length == 1 )
479         return ret[0];
480     else
481         return ret;
482 }
483
484
485 /**
486  *
487  */
488 bool receiveTimeout(T...)( long ms, T ops )
489 {
490     checkops( ops );
491     static enum long TICKS_PER_MILLI = 10_000;
492     return mbox.get( ms * TICKS_PER_MILLI, ops );
493 }
494
495
496 unittest
497 {
498     assert( __traits( compiles,
499                       {
500                           receiveTimeout( 0, (Variant x) {} );
501                           receiveTimeout( 0, (int x) {}, (Variant x) {} );
502                       } ) );
503
504     assert( !__traits( compiles,
505                        {
506                            receiveTimeout( 0, (Variant x) {}, (int x) {} );
507                        } ) );
508
509     assert( !__traits( compiles,
510                        {
511                            receiveTimeout( 0, (int x) {}, (int x) {} );
512                        } ) );
513 }
514
515
516 //////////////////////////////////////////////////////////////////////////////
517 // MessageBox Limits
518 //////////////////////////////////////////////////////////////////////////////
519
520
521 /**
522  * These behaviors may be specified when a mailbox is full.
523  */
524 enum OnCrowding
525 {
526     block,          /// Wait until room is available.
527     throwException, /// Throw a MailboxFull exception.
528     ignore          /// Abort the send and return.
529 }
530
531
532 private
533 {
534     bool onCrowdingBlock( Tid tid )
535     {
536         return true;
537     }
538
539
540     bool onCrowdingThrow( Tid tid )
541     {
542         throw new MailboxFull( tid );
543     }
544
545
546     bool onCrowdingIgnore( Tid tid )
547     {
548         return false;
549     }
550 }
551
552
553 /**
554  * Sets a limit on the maximum number of user messages allowed in the mailbox.
555  * If this limit is reached, the caller attempting to add a new message will
556  * execute the behavior specified by doThis.  If messages is zero, the mailbox
557  * is unbounded.
558  *
559  * Params:
560  *  tid      = The Tid of the thread for which this limit should be set.
561  *  messages = The maximum number of messages or zero if no limit.
562  *  doThis   = The behavior executed when a message is sent to a full
563  *             mailbox.
564  */
565 void setMaxMailboxSize( Tid tid, size_t messages, OnCrowding doThis )
566 {
567     final switch( doThis )
568     {
569     case OnCrowding.block:
570         return tid.mbox.setMaxMsgs( messages, &onCrowdingBlock );
571     case OnCrowding.throwException:
572         return tid.mbox.setMaxMsgs( messages, &onCrowdingThrow );
573     case OnCrowding.ignore:
574         return tid.mbox.setMaxMsgs( messages, &onCrowdingIgnore );
575     }
576 }
577
578
579 /**
580  * Sets a limit on the maximum number of user messages allowed in the mailbox.
581  * If this limit is reached, the caller attempting to add a new message will
582  * execute onCrowdingDoThis.  If messages is zero, the mailbox is unbounded.
583  *
584  * Params:
585  *  tid      = The Tid of the thread for which this limit should be set.
586  *  messages = The maximum number of messages or zero if no limit.
587  *  onCrowdingDoThis = The routine called when a message is sent to a full
588  *                     mailbox.
589  */
590 void setMaxMailboxSize( Tid tid, size_t messages, bool function(Tid) onCrowdingDoThis )
591 {
592     tid.mbox.setMaxMsgs( messages, onCrowdingDoThis );
593 }
594
595
596 //////////////////////////////////////////////////////////////////////////////
597 // MessageBox Implementation
598 //////////////////////////////////////////////////////////////////////////////
599
600
601 private
602 {
603     /*
604      * A MessageBox is a message queue for one thread.  Other threads may send
605      * messages to this owner by calling put(), and the owner receives them by
606      * calling get().  The put() call is therefore effectively shared and the
607      * get() call is effectively local.  setMaxMsgs may be used by any thread
608      * to limit the size of the message queue.
609      */
610     class MessageBox
611     {
612         this()
613         {
614             m_lock      = new Mutex;
615             m_putMsg    = new Condition( m_lock );
616             m_notFull   = new Condition( m_lock );
617             m_closed    = false;
618         }
619
620
621         /*
622          * Sets a limit on the maximum number of user messages allowed in the
623          * mailbox.  If this limit is reached, the caller attempting to add
624          * a new message will execute call.  If num is zero, there is no limit
625          * on the message queue.
626          *
627          * Params:
628          *  num  = The maximum size of the queue or zero if the queue is
629          *         unbounded.
630          *  call = The routine to call when the queue is full.
631          */
632         final void setMaxMsgs( size_t num, bool function(Tid) call )
633         {
634             synchronized( m_lock )
635             {
636                 m_maxMsgs   = num;
637                 m_onMaxMsgs = call;
638             }
639         }
640
641
642         /*
643          * If maxMsgs is not set, the message is added to the queue and the
644          * owner is notified.  If the queue is full, the message will still be
645          * accepted if it is a control message, otherwise onCrowdingDoThis is
646          * called.  If the routine returns true, this call will block until
647          * the owner has made space available in the queue.  If it returns
648          * false, this call will abort.
649          *
650          * Params:
651          *  msg = The message to put in the queue.
652          *
653          * Throws:
654          *  An exception if the queue is full and onCrowdingDoThis throws.
655          */
656         final void put( ref Message msg )
657         {
658             synchronized( m_lock )
659             {
660                 // TODO: Generate an error here if m_closed is true, or maybe
661                 //       put a message in the caller's queue?
662                 if( !m_closed )
663                 {
664                     while( true )
665                     {
666                         if( isPriorityMsg( msg ) )
667                         {
668                             m_sharedPty.put( msg );
669                             m_putMsg.notify();
670                             return;
671                         }
672                         if( !mboxFull() || isControlMsg( msg ) )
673                         {
674                             m_sharedBox.put( msg );
675                             m_putMsg.notify();
676                             return;
677                         }
678                         if( m_onMaxMsgs !is null && !m_onMaxMsgs( thisTid ) )
679                         {
680                             return;
681                         }
682                         m_putQueue++;
683                         m_notFull.wait();
684                         m_putQueue--;
685                     }
686                 }
687             }
688         }
689
690
691         /*
692          * Matches ops against each message in turn until a match is found.
693          *
694          * Params:
695          *  ops = The operations to match.  Each may return a bool to indicate
696          *        whether a message with a matching type is truly a match.
697          *
698          * Returns:
699          *  true if a message was retrieved and false if not (such as if a
700          *  timeout occurred).
701          *
702          * Throws:
703          *  LinkTerminated if a linked thread terminated, or OwnerTerminated
704          * if the owner thread terminates and no existing messages match the
705          * supplied ops.
706          */
707         final bool get(T...)( T vals )
708         {
709             static assert( T.length );
710
711             static if( isImplicitlyConvertible!(T[0], long) )
712             {
713                 alias TypeTuple!(T[1 .. $]) Ops;
714                 alias vals[1 .. $] ops;
715                 assert( vals[0] >= 0 );
716                 enum timedWait = true;
717                 long period = vals[0];
718             }
719             else
720             {
721                 alias TypeTuple!(T) Ops;
722                 alias vals[0 .. $] ops;
723                 enum timedWait = false;
724             }
725
726             bool onStandardMsg( ref Message msg )
727             {
728                 foreach( i, t; Ops )
729                 {
730                     alias ParameterTypeTuple!(t) Args;
731                     auto op = ops[i];
732                    
733                     if( msg.convertsTo!(Args) )
734                     {
735                         static if( is( ReturnType!(t) == bool ) )
736                         {
737                             return msg.map( op );
738                         }
739                         else
740                         {
741                             msg.map( op );
742                             return true;
743                         }
744                     }
745                 }
746                 return false;
747             }
748
749             bool onLinkDeadMsg( ref Message msg )
750             {
751                 assert( msg.convertsTo!(Tid) );
752                 auto tid = msg.get!(Tid);
753
754                 if( bool* depends = (tid in links) )
755                 {
756                     links.remove( tid );
757                     // Give the owner relationship precedence.
758                     if( *depends && tid != owner )
759                     {                       
760                         auto e = new LinkTerminated( tid );
761                         if( onStandardMsg( Message( MsgType.standard, e ) ) )
762                             return true;
763                         throw e;
764                     }
765                 }
766                 if( tid == owner )
767                 {
768                     owner = Tid.init;
769                     auto e = new OwnerTerminated( tid );
770                     if( onStandardMsg( Message( MsgType.standard, e ) ) )
771                         return true;
772                     throw e;
773                 }
774                 return false;
775             }
776
777             bool onControlMsg( ref Message msg )
778             {
779                 switch( msg.type )
780                 {
781                 case MsgType.linkDead:
782                     return onLinkDeadMsg( msg );
783                 default:
784                     return false;
785                 }
786             }
787
788             bool scan( ref ListT list )
789             {
790                 for( auto range = list[]; !range.empty; )
791                 {
792                     // Only the message handler will throw, so if this occurs
793                     // we can be certain that the message was handled.
794                     scope(failure) list.removeAt( range );
795
796                     if( isControlMsg( range.front ) )
797                     {
798                         if( onControlMsg( range.front ) )
799                         {
800                             // Although the linkDead message is a control message,
801                             // it can be handled by the user.  Since the linkDead
802                             // message throws if not handled, if we get here then
803                             // it has been handled and we can return from receive.
804                             // This is a weird special case that will have to be
805                             // handled in a more general way if more are added.
806                             if( !isLinkDeadMsg( range.front ) )
807                             {
808                                 list.removeAt( range );
809                                 continue;
810                             }
811                             list.removeAt( range );
812                             return true;   
813                         }
814                         range.popFront();
815                         continue;
816                     }
817                     else
818                     {
819                         if( onStandardMsg( range.front ) )
820                         {
821                             list.removeAt( range );
822                             return true;
823                         }
824                         range.popFront();
825                         continue;
826                     }
827                 }
828                 return false;
829             }
830
831
832             bool pty( ref ListT list )
833             {
834                 if( !list.empty )
835                 {
836                     auto range = list[];
837
838                     if( onStandardMsg( range.front ) )
839                     {
840                         list.removeAt( range );
841                         return true;
842                     }
843                     if( range.front.convertsTo!(Throwable) )
844                         throw range.front.get!(Throwable);
845                     else if( range.front.convertsTo!(shared(Throwable)) )
846                         throw range.front.get!(shared(Throwable));
847                     else throw new PriorityMessageException( range.front.data );
848                 }
849                 return false;
850             }
851
852             while( true )
853             {
854                 ListT arrived;
855
856                 if( pty( m_localPty ) ||
857                     scan( m_localBox ) )
858                 {
859                     return true;
860                 }
861                 synchronized( m_lock )
862                 {
863                     updateMsgCount();
864                     while( m_sharedPty.empty && m_sharedBox.empty )
865                     {
866                         // NOTE: We're notifying all waiters here instead of just
867                         //       a few because the onCrowding behavior may have
868                         //       changed and we don't want to block sender threads
869                         //       unnecessarily if the new behavior is not to block.
870                         //       This will admittedly result in spurious wakeups
871                         //       in other situations, but what can you do?
872                         if( m_putQueue && !mboxFull() )
873                             m_notFull.notifyAll();
874                         static if( timedWait )
875                         {
876                             if( !m_putMsg.wait( period ) )
877                                 return false;
878                         }
879                         else
880                         {
881                             m_putMsg.wait();
882                         }
883                     }
884                     m_localPty.put( m_sharedPty );
885                     arrived.put( m_sharedBox );
886                 }
887                 if( m_localPty.empty )
888                 {
889                     scope(exit) m_localBox.put( arrived );
890                     if( scan( arrived ) )
891                         return true;
892                     else continue;
893                 }
894                 m_localBox.put( arrived );
895                 pty( m_localPty );
896                 return true;
897             }
898         }
899
900
901         /*
902          * Called on thread termination.  This routine processes any remaining
903          * control messages, clears out message queues, and sets a flag to
904          * reject any future messages.
905          */
906         final void close()
907         {
908             void onLinkDeadMsg( ref Message msg )
909             {
910                 assert( msg.convertsTo!(Tid) );
911                 auto tid = msg.get!(Tid);
912
913                 links.remove( tid );
914                 if( tid == owner )
915                     owner = Tid.init;
916             }
917
918             void sweep( ref ListT list )
919             {
920                 for( auto range = list[]; !range.empty; range.popFront() )
921                 {
922                     if( range.front.type == MsgType.linkDead )
923                         onLinkDeadMsg( range.front );
924                 }
925             }
926
927             ListT arrived;
928
929             sweep( m_localBox );
930             synchronized( m_lock )
931             {
932                 arrived.put( m_sharedBox );
933                 m_closed = true;
934             }
935             m_localBox.clear();
936             sweep( arrived );
937         }
938
939
940     private:
941         //////////////////////////////////////////////////////////////////////
942         // Routines involving shared data, m_lock must be held.
943         //////////////////////////////////////////////////////////////////////
944
945
946         bool mboxFull()
947         {
948             return m_maxMsgs &&
949                    m_maxMsgs <= m_localMsgs + m_sharedBox.length;
950         }
951
952
953         void updateMsgCount()
954         {
955             m_localMsgs = m_localBox.length;
956         }
957
958
959     private:
960         //////////////////////////////////////////////////////////////////////
961         // Routines involving local data only, no lock needed.
962         //////////////////////////////////////////////////////////////////////
963
964
965         pure final bool isControlMsg( ref Message msg )
966         {
967             return msg.type != MsgType.standard &&
968                    msg.type != MsgType.priority;
969         }
970
971
972         pure final bool isPriorityMsg( ref Message msg )
973         {
974             return msg.type == MsgType.priority;
975         }
976        
977        
978         pure final bool isLinkDeadMsg( ref Message msg )
979         {
980             return msg.type == MsgType.linkDead;
981         }
982
983
984     private:
985         //////////////////////////////////////////////////////////////////////
986         // Type declarations.
987         //////////////////////////////////////////////////////////////////////
988
989
990         alias bool function(Tid) OnMaxFn;
991         alias List!(Message)     ListT;
992
993     private:
994         //////////////////////////////////////////////////////////////////////
995         // Local data, no lock needed.
996         //////////////////////////////////////////////////////////////////////
997
998
999         ListT       m_localBox;
1000         ListT       m_localPty;
1001
1002
1003     private:
1004         //////////////////////////////////////////////////////////////////////
1005         // Shared data, m_lock must be held on access.
1006         //////////////////////////////////////////////////////////////////////
1007
1008
1009         Mutex       m_lock;
1010         Condition   m_putMsg;
1011         Condition   m_notFull;
1012         size_t      m_putQueue;
1013         ListT       m_sharedBox;
1014         ListT       m_sharedPty;
1015         OnMaxFn     m_onMaxMsgs;
1016         size_t      m_localMsgs;
1017         size_t      m_maxMsgs;
1018         bool        m_closed;
1019     }
1020
1021
1022     /*
1023      *
1024      */
1025     struct List(T)
1026     {
1027         struct Range
1028         {
1029             @property bool empty() const
1030             {
1031                 return !m_prev.next;
1032             }
1033
1034             @property ref T front()
1035             {
1036                 enforce( m_prev.next );
1037                 return m_prev.next.val;
1038             }
1039
1040             @property void front( T val )
1041             {
1042                 enforce( m_prev.next );
1043                 m_prev.next.val = val;
1044             }
1045
1046             void popFront()
1047             {
1048                 enforce( m_prev.next );
1049                 m_prev = m_prev.next;
1050             }
1051
1052             //T moveFront()
1053             //{
1054             //    enforce( m_prev.next );
1055             //    return move( m_prev.next.val );
1056             //}
1057
1058             private this( Node* p )
1059             {
1060                 m_prev = p;
1061             }
1062
1063             private Node* m_prev;
1064         }
1065
1066
1067         /*
1068          *
1069          */
1070         void put( T val )
1071         {
1072             put( new Node( val ) );
1073             m_count++;
1074         }
1075
1076
1077         /*
1078          *
1079          */
1080         void put( ref List!(T) rhs )
1081         {
1082             if( !rhs.empty )
1083             {
1084                 put( rhs.m_first );
1085                 while( m_last.next !is null )
1086                 {
1087                     m_last = m_last.next;
1088                     m_count++;
1089                 }
1090                 rhs.m_first = null;
1091                 rhs.m_last  = null;
1092                 rhs.m_count = 0;
1093             }
1094         }
1095
1096
1097         /*
1098          *
1099          */
1100         Range opSlice()
1101         {
1102             return Range( cast(Node*) &m_first );
1103         }
1104
1105
1106         /*
1107          *
1108          */
1109         void removeAt( Range r )
1110         {
1111             Node* n = r.m_prev;
1112             enforce( n && n.next );
1113
1114             if( m_last is m_first )
1115                 m_last = null;
1116             else if( m_last is n.next )
1117                 m_last = n;
1118             Node* todelete = n.next;
1119             n.next = n.next.next;
1120             //delete todelete;
1121             m_count--;
1122         }
1123
1124
1125         /*
1126          *
1127          */
1128         @property size_t length()
1129         {
1130             return m_count;
1131         }
1132
1133
1134         /*
1135          *
1136          */
1137         void clear()
1138         {
1139             m_first = m_last = null;
1140         }
1141
1142
1143         /*
1144          *
1145          */
1146         bool empty()
1147         {
1148             return m_first is null;
1149         }
1150
1151
1152     private:
1153         struct Node
1154         {
1155             Node*   next;
1156             T       val;
1157
1158             this( T v )
1159             {
1160                 val = v;
1161             }
1162         }
1163
1164
1165         /*
1166          *
1167          */
1168         void put( Node* n )
1169         {
1170             if( !empty )
1171             {
1172                 m_last.next = n;
1173                 m_last = n;
1174                 return;
1175             }
1176             m_first = n;
1177             m_last = n;
1178         }
1179
1180
1181         Node*   m_first;
1182         Node*   m_last;
1183         size_t  m_count;
1184     }
1185 }
1186
1187
1188 version( unittest )
1189 {
1190     import std.stdio;
1191
1192     void testfn( Tid tid )
1193     {
1194         receive( (float val) { assert(0); },
1195                  (int val, int val2)
1196                  {
1197                      assert( val == 42 && val2 == 86 );
1198                  } );
1199         receive( (Tuple!(int, int) val)
1200                  {
1201                      assert( val[0] == 42 &&
1202                              val[1] == 86 );
1203                  } );
1204         receive( (Variant val) {} );
1205         receive( (string val)
1206                  {
1207                      if( "the quick brown fox" != val )
1208                          return false;
1209                      return true;
1210                  },
1211                  (string val)
1212                  {
1213                      assert( false );
1214                  } );
1215         prioritySend( tid, "done" );
1216     }
1217
1218
1219     unittest
1220     {
1221         auto tid = spawn( &testfn, thisTid );
1222
1223         send( tid, 42, 86 );
1224         send( tid, tuple(42, 86) );
1225         send( tid, "hello", "there" );
1226         send( tid, "the quick brown fox" );
1227         receive( (string val) { assert(val == "done"); } );
1228     }
1229 }
Note: See TracBrowser for help on using the browser.