root/trunk/futurism/future.d

Revision 12, 27.5 kB (checked in by kevinbealer, 5 years ago)

- Minor change.

Line 
1 // future.d
2 //
3 // Futurism - A "Future" library for D.
4 //
5 // I wrote this after watching a talk by Herb Sutter about concurrent
6 // programming.  This library implements the "future" concept for D.
7 // The future concept is designed to promote scalable concurrency.
8 // The original invention of the concept is attributed to Henry Baker
9 // and Carl Hewitt in 1977.
10 //
11 // Author:   Kevin Bealer
12 // License:  Public Domain
13 // Modified: January 2007
14
15 module future;
16
17 version(Tango) {
18     import tango.core.Thread;
19     import tango.io.FileConduit;
20     import tango.io.Conduit;
21     import tango.stdc.stdlib;
22     import tango.stdc.string;
23     import tango.text.convert.Integer;
24     import tango.text.Util;
25     import tango.core.Version;
26 } else {
27     import std.c.stdlib;
28     import std.c.string;
29     import std.conv;
30     import std.file;
31     import std.string;
32     import std.thread;
33     import std.traits;
34     import std.cpuid;
35     import std.stream;
36 }
37
38 template FutureTypeGroup(Delegate, Args...) {
39     alias ReturnType!(Delegate)          Return;
40     alias ParameterTypeTuple!(Delegate)  Params;
41     alias Future!(Return, Params)        TFuture;
42 }
43
44 /*****
45  * Build a future given a delegate and (optional) arguments.
46  */
47
48 FutureTypeGroup!(Delegate, Args).TFuture
49 make_future(Delegate, Args...)(Delegate cmd, Args args)
50 {
51     return new FutureTypeGroup!(Delegate, Args).TFuture(cmd, args);
52 }
53
54 FutureTypeGroup!(Delegate, Args).TFuture
55 make_future_priority(Delegate, Args...)(int p, Delegate cmd, Args args)
56 {
57     return new FutureTypeGroup!(Delegate, Args).TFuture(p, cmd, args);
58 }
59
60 /*****
61  * Scoped wrapper for ThreadPool.  If the number of threads is not
62  * specified, this selects the number based on what it can find about
63  * the hardware.  An environment variable, "FUTURISM_DEFAULT_THREADS",
64  * allows users to adjust the default for compiled apps.
65  */
66 scope class ThreadPoolScope {
67     // Create a pool with N threads.
68     this(int n = 0)
69     {
70         if (n < 1) {
71             // Automatically generated values are 0-based.
72             n = recommendThreads();
73         } else {
74             // A caller-specified value should be 1-based.
75             n ++;
76         }
77        
78         nthr_ = n;
79        
80         if (nthr_ > 1) {
81             // Subtract one for the 'main' thread.
82             pool_ = new ThreadPool(nthr_ - 1);
83         }
84     }
85    
86     ~this()
87     {
88         stop();
89     }
90    
91     // Stop the thread pool.
92     void stop()
93     {
94         if (pool_ !is null) {
95             pool_.stop();
96         }
97     }
98    
99     // Create a thread pool, selecting the number of threads based on
100     // an examination of the environment, hardware, and other factors.
101    
102     static int recommendThreads()
103     {
104         int N = getUserCount();
105        
106         if (! N)
107             N = getHardwareCount();
108        
109         if (! N)
110             N = getDefaultCount();
111        
112         assert(N);
113         return N;
114     }
115    
116     static int getUserCount()
117     {
118         int N = 0;
119        
120         char * cfg = getenv("FUTURISM_DEFAULT_THREADS");
121        
122         if (cfg) {
123             char[] nstr = cfg[0..strlen(cfg)];
124             N = toInt(nstr);
125         }
126        
127         if (N < 1)
128             N = 0;
129        
130         if (N > maxEnvarThreads)
131             N = maxEnvarThreads;
132        
133         return N;
134     }
135    
136     // Number of threads to use, including the main thread.  The
137     // actual worker threads in the thread pool will be one less than
138     // what is returned here.
139    
140     int threadCount()
141     {
142         return nthr_;
143     }
144    
145 private:
146     // The rest of these are not public because they are heuristics
147     // with little meaning outside this class.
148    
149    
150     // The number of threads we might want per CPU; this is actually
151     // something of a fudge factor.  This allows I/O bound threads to
152     // run efficiently, lets a few futures wait on each other without
153     // piling up too bad, and doesn't do much harm for CPU bound
154     // systems.
155    
156     // My guess is that for futures we probably want a higher number
157     // here than would be used for dedicated number cruncher threads
158     // in a workhorse system.
159    
160     static int defaultOverlap()
161     {
162         return 4;
163     }
164    
165     // Try to find some number representing the amoung of hardware
166     // threading available here.
167    
168     static int getHardwareCount()
169     {
170         int N = 0;
171        
172         version(linux) {
173             try {
174                 // Linux gives a seperate line for each CPU core, so
175                 // no additional std.cpuid is needed or useful here.
176                
177                 version(Tango) {
178                     // Use a FileConduit because 'files' in /proc do
179                     // not report a correct length.
180                    
181                     auto f = new FileConduit("/proc/cpuinfo");
182                    
183                     char[] cpuinfo = "\n";
184                     void[] tmp = new byte[4096];
185                     int amt = 0;
186                    
187                     while(0 != (amt = f.fill(tmp))) {
188                         cpuinfo ~= cast(char[]) tmp[0..amt];
189                     }
190                    
191                     static if (Tango == 0.95) {
192                         N = demarcate(cpuinfo, "\nprocessor").length-1;
193                     } else {
194                         N = split(cpuinfo, "\nprocessor").length-1;
195                     }
196                 } else {
197                     // std.file.read checks the file length first,
198                     // so it is necessary to use a stream here.
199                    
200                     int new_N = 0;
201                    
202                     Stream file = new BufferedFile("/proc/cpuinfo");
203                     foreach(ulong n, char[] line; file) {
204                         if (find("\n" ~ line, "\nprocessor:") != -1) {
205                             new_N ++;
206                         }
207                     }
208                    
209                     if (new_N) {
210                         N = new_N;
211                     }
212                 }
213                
214                 N *= defaultOverlap;
215             }
216             catch(Exception e) {
217                 // Maybe the /proc fs is not enabled?
218             }
219         }
220        
221         if (N == 0) {
222             // There is no good way to find number of CPUs, so use the
223             // number of threads per CPU.  This only works for x86 and
224             // is not yet supported in Tango.
225            
226             version(Tango) {
227             } else {
228                 N = threadsPerCPU * defaultOverlap;
229             }
230         }
231        
232         return N;
233     }
234    
235     // Just select a number.
236    
237     static int getDefaultCount()
238     {
239         // The number 2 here represents a "guess" of two CPUs or
240         // cores, which will soon (its 2007 as I write this) be a
241         // reasonable middle ground for consumer boxes.
242        
243         return 2 * defaultOverlap;
244     }
245    
246     // Limit the range of the envar.
247    
248     static int maxEnvarThreads()
249     {
250         // Creating too many threads can cause the application to
251         // crash.  This limit applies only to the envar, and is meant
252         // to protect overly ambitious users from crashes.
253        
254         return 128;
255     }
256    
257     // Number of threads we decided on.
258     int        nthr_;
259    
260     // Pool, unless nthr_ is less than 2.
261     ThreadPool pool_;
262 }
263
264 /*****
265  * This class allows application writers to queue tasks that can be
266  * done on concurrent worker threads.  Create an object of this type,
267  * passing a delegate that represents some work that needs to be done
268  * to the constructor.  A task object will be created and placed on a
269  * global queue.
270  *
271  * Normally, a ThreadPool object (or more than one) is created with
272  * several worker threads that can handle these queued requests whlie
273  * the main thread does other things.
274  *
275  * When the results are needed, fetch them with the value() method.
276  * If the results have not been computed by the time value() is
277  * called, the thread asking for them will run the computation inside
278  * the call to value().
279  */
280
281 class Future(T, U...) {
282     alias AsyncTask!(T, U) Task;
283    
284     // Construct a task at priority 0.
285     this(T delegate(U) dg, U args)
286     {
287         task_ = new Task(0, dg, args);
288     }
289    
290     // Construct a task, assigning a priority.
291     this(int priority, T delegate(U) dg, U args)
292     {
293         task_ = new Task(priority, dg, args);
294     }
295    
296     // Get the results of the calculation, waiting if necessary.
297     T value()
298     {
299         return task_.getValue();
300     }
301    
302     // Wait for the calculation to be finished.
303     void wait()
304     {
305         task_.getValue();
306     }
307    
308     // Test whether the calculation has finished.
309     bool isDone()
310     {
311         return task_.isDone();
312     }
313    
314  private:
315     Task task_;
316 }
317
318
319 /*****
320  * This class creates several worker threads.  If any asynchronous
321  * jobs are queued by the Future class, these worker thread will try
322  * to do the calculations concurrently with the main application and
323  * each other.  Each thread pool must be stopped by calling stop()
324  * before it is deleted or the application terminates.
325  */
326 class ThreadPool {
327     // Construct a thread pool with n threads.
328     this(int n)
329     {
330         list_ = TaskList();
331        
332         for(int i = 0; i < n; i++) {
333             version(Tango) {
334                 Thread t = new Thread(& workLoopTango);
335             } else {
336                 Thread t = new Thread(& workLoopPhobos);
337             }
338             threads_ ~= t;
339         }
340        
341         // Start all the threads.
342         synchronized(this) {
343             foreach(Thread t; threads_) {
344                 t.start();
345             }
346         }
347        
348         // Wait until all the threads are running before returning, so
349         // that stop() cannot be called before all threads have
350         // started.
351        
352         while(1) {
353             synchronized(this) {
354                 volatile {
355                     if (running_ == n) {
356                         break;
357                     }
358                 }
359             }
360             Thread.yield();
361         }
362     }
363    
364     // Destructor - you must call 'stop' before this runs.  We can't
365     // call stop() here directly, because we don't know in what order
366     // the destructors will be called for this and the Thread objects.
367    
368     ~this()
369     {
370         assert(stopped_);
371     }
372    
373     // Call this to stop this thread pool -- it will wait for all
374     // threads to terminate before returning.
375    
376     void stop()
377     {
378         bool done = false;
379        
380         // Signal all threads to resume and wait for them to finish
381         // their workLoop() call.  If they are blocked in the task
382         // list, this will unblock them.
383        
384         while(! done) {
385             synchronized(this) {
386                 volatile {
387                     stopping_ = true;
388                    
389                     if (running_ == 0) {
390                         done = true;
391                     }
392                 }
393                
394                 version(Tango) {
395                     // no resume needed.
396                     Thread.yield();
397                 } else {
398                     Thread.resumeAll();
399                 }
400             }
401             Thread.yield();
402         }
403        
404         foreach(Thread t; threads_) {
405             version(Tango) {
406                 t.join();
407             } else {
408                 t.wait();
409             }
410         }
411        
412         stopped_ = true;
413     }
414    
415     // Check whether a global stop has been requested.
416    
417     static bool checkGlobalStop()
418     {
419         volatile {
420             return global_stop;
421         }
422     }
423    
424     // Request a global stop.  This prevents tasks from being queued
425     // on thread pools, but is not a substitute for calling stop() on
426     // each thread pool.
427    
428     static void stopAllTasks()
429     {
430         volatile {
431             global_stop = true;
432         }
433     }
434    
435 private:
436     alias AsyncTaskBase Task;
437    
438     static bool global_stop = false;
439    
440     void workLoopTango()
441     {
442         void adjust_count(bool up)
443         {
444             synchronized(this) {
445                 volatile {
446                     if (up) {
447                         running_ ++;
448                     } else {
449                         running_ --;
450                     }
451                 }
452             }
453         }
454        
455         // Keep track of number of threads still running in the while
456         // loop below (and therefore possibly waiting in task_list.)
457        
458         adjust_count(true);
459         scope(exit) adjust_count(false);
460        
461         bool do_break = false;
462        
463         while(1) {
464             // first, check for and bail out if this thread pool is
465             // closing shop.
466            
467             synchronized(this) {
468                 volatile {
469                     if (stopping_ || checkGlobalStop()) {
470                         do_break = true;
471                     }
472                 }
473             }
474            
475             if (do_break)
476                 break;
477
478             // This method may call pause().
479             Task task = list_.getTask();
480            
481             volatile {
482                 if ((! stopping_) && (task !is null)) {
483                     task.checkTask(false);
484                 }
485             }
486         }
487     }
488    
489     // Different signature for Phobos threads.
490     int workLoopPhobos()
491     {
492         workLoopTango();
493        
494         return 0;
495     }
496    
497     // All running tasks in the application.
498     TaskList list_;
499    
500     // Threads in this thread pool.
501     Thread[] threads_;
502    
503     // Count of running threads in workLoop()
504     int      running_ = 0;
505    
506     // True when this thread pool is shutting down.
507     bool     stopping_ = false;
508    
509     // True when shut-down has completed.
510     bool     stopped_ = false;
511 }
512
513
514 // Asynchronous task.
515 //
516 // For a given data type T, this class represents a process that
517 // produces that data and might be run asynchronously.
518
519 private class AsyncTask(T, U...) : AsyncTaskBase {
520     // Job to do.
521     alias T delegate(U) Work;
522    
523     // Build a task item.
524     this(int priority, Work w, U args)
525     {
526         super(priority);
527         work_ = w;
528        
529         foreach(i,a; args) {
530             args_[i] = a;
531         }
532        
533         TaskList().addTask(this);
534     }
535    
536     // do work and/or get result for consumer of this calculation.
537     T getValue()
538     {
539         checkTask(true);
540
541         if (error_ !is null) {
542             throw error_;
543         }
544
545         static if(! is(T == void)) {
546             return value_;
547         }
548     }
549
550     // try to do work
551     void work()
552     {
553         TaskList().removeTask(this);
554
555         try {
556             static if (is(T == void)) {
557                 work_(args_);
558             } else {
559                 value_ = work_(args_);
560             }
561         }
562         catch(Exception e) {
563             error_ = e;
564         }
565     }
566
567  private:
568      Work      work_;
569      U         args_;
570      Exception error_;
571      bool      doing_ = false;
572      
573      static if (! is(T == void)) {
574          T      value_;
575      }
576 }
577
578 // Base class for AsyncTask.
579
580 abstract private class AsyncTaskBase : IndexPQNode {
581     this(int p)
582     {
583         synchronized {
584             volatile {
585                 serial_ = next_serial_ ++;
586             }
587         }
588        
589         priority_ = p;
590     }
591    
592     // Do task or wait for completion.  Value consumers should use
593     // wait = true to sleep until results are available; thread pool
594     // worker threads should use wait = false to avoid blocking on
595     // already-queued work items.
596    
597     void checkTask(bool will_wait)
598     {
599         bool do_work = false;
600         bool need_wait = false;
601        
602         synchronized(this) {
603             volatile {
604                 if (state_ == State.start) {
605                     state_ = State.run;
606                     do_work = true;
607                 } else if (state_ == State.run) {
608                     need_wait = true;
609                 } else {
610                     assert(state_ == State.done);
611                     return;
612                 }
613             }
614         }
615        
616         if (do_work) {
617             work();
618             haveResult();
619         } else if (will_wait && need_wait) {
620             needResult();
621         }
622     }
623    
624     int opCmp(Object obj)
625     {
626         AsyncTaskBase other = cast(AsyncTaskBase) obj;
627         assert(other !is null);
628        
629         if (this is other) {
630             return 0;
631         }
632        
633         if (priority_ < other.priority_) {
634             return -1;
635         } else if (priority_ > other.priority_) {
636             return 1;
637         }
638        
639         if (serial_ < other.serial_) {
640             return 1;
641         }
642        
643         assert(serial_ > other.serial_);
644         return -1;
645     }
646    
647     bool isDone()
648     {
649         bool done = false;
650        
651         synchronized(this) {
652             volatile {
653                 if (state_ == State.done) {
654                     done = true;
655                 }
656             }
657         }
658        
659         return done;
660     }
661    
662 protected:
663     abstract void work();
664    
665 private:
666     enum State {
667         start, run, done
668     }
669    
670     void haveResult()
671     {
672         // This is intended to change the state from start to done;
673         // then it loops, calling resumeAll(), until there are no
674         // waiters.
675        
676         bool set_done = false;
677        
678         while(1) {
679             synchronized(this) {
680                 volatile {
681                     if (set_done) {
682                         assert(state_ == State.done);
683                     } else {
684                         assert(state_ == State.run);
685                         state_ = State.done;
686                         set_done = true;
687                     }
688                    
689                     if (waiters_ == 0) {
690                         return;
691                     } else {
692                         version(Tango) {
693                             // no resume needed.
694                         } else {
695                             Thread.resumeAll();
696                         }
697                     }
698                 }
699             }
700            
701             // To allow the waiters to wake up and leave.
702             Thread.yield();
703         }
704     }
705    
706     void needResult()
707     {
708         version(Tango) {
709             PseudoPause pauser;
710             pauser.msg = "needResult";
711         }
712        
713         synchronized(this) {
714             volatile {
715                 if (state_ == State.done) {
716                     return;
717                 } else {
718                     assert(state_ == State.run);
719                     waiters_ ++;
720                 }
721             }
722         }
723        
724         while(1) {
725             synchronized(this) {
726                 volatile {
727                     if (state_ == State.done) {
728                         waiters_ --;
729                         return;
730                     }
731                 }
732             }
733            
734             version(Tango) {
735                 // Simulate pause with yields and timed sleeps.
736                 pauser.pause();
737             } else {
738                 Thread.getThis().pause();
739             }
740         }
741     }
742    
743  private:
744     State state_   = State.start;
745     int   waiters_ = 0;
746    
747     long  serial_;
748     int   priority_;
749    
750     static long next_serial_ = 1;
751 }
752
753 // Element of an index priority queue.
754 abstract private class IndexPQNode {
755     abstract int opCmp(Object o);
756    
757     // position in the queue.
758     int position()
759     {
760         return pq_index;
761     }
762    
763 private:
764     long pq_index = -1;
765 }
766
767 // Index priority queue -- this is like any priority queue, but the
768 // elements know their own location, so that elements can be deleted
769 // in constant time.  This is a base class, because it doesn't know
770 // what type of node it is working with.
771
772 private class IndexPQ_Base {
773     alias IndexPQNode Node;
774    
775     this()
776     {
777         nodes_.length = 1;
778     }
779    
780     void add(Node n)
781     {
782         // Add the node
783         int pos = nodes_.length;
784         n.pq_index = pos;
785         nodes_ ~= n;
786        
787         // Position it
788         bubble_up_(pos);
789     }
790    
791     void remove(Node n)
792     in
793     {
794         assert(n !is null);
795         assert(nodes_.length > 1);
796         assert(n.pq_index != -1);
797         assert(nodes_[n.pq_index] is n);
798     }
799     body
800     {
801         // Remove the indicated node, replacing it with the last node
802        
803         int pos = n.pq_index;
804        
805         if (pos != nodes_.length-1) {
806             nodes_[pos] = nodes_[$-1];
807             nodes_[pos].pq_index = pos;
808         }
809        
810         nodes_[$-1] = null;
811         nodes_.length = nodes_.length - 1;
812        
813         // Fix the position of the moved node.
814         if (nodes_.length > 2 && nodes_.length != pos) {
815             bubble_(pos);
816         }
817     }
818    
819     // Get the top node.
820     Node top()
821     in
822     {
823         assert(nodes_.length > 1);
824     }
825     out(result)
826     {
827         assert(result.pq_index == 1);
828     }
829     body
830     {
831         return nodes_[1];
832     }
833    
834     // Return true if the queue is empty.
835     int length()
836     {
837         assert(nodes_.length > 0);
838         return nodes_.length - 1;
839     }
840    
841 private:
842     void bubble_up_(int pos)
843     in
844     {
845         assert(pos < nodes_.length && pos >= 1);
846     }
847     body
848     {
849         while(pos != 1 && higher_(pos, pos/2)) {
850             swap_(pos, pos/2);
851             pos = pos/2;
852         }
853        
854         // Always try to bubble down after bubbling up
855         bubble_down_(pos);
856     }
857    
858     void bubble_down_(int pos)
859     in
860     {
861         assert(pos < nodes_.length && pos >= 1);
862     }
863     body
864     {
865         while(pos*2 < nodes_.length) {
866             int left = pos*2, right = pos*2 + 1;
867        
868             bool go_left = higher_(left, pos),
869                 go_right = higher_(right, pos);
870        
871             if (go_left && go_right) {
872                 if (higher_(left, right)) {
873                     swap_(pos, left);
874                     pos = left;
875                 } else {
876                     swap_(pos, right);
877                     pos = right;
878                 }
879             } else if (go_left) {
880                 swap_(left, pos);
881                 pos = left;
882             } else if (go_right) {
883                 swap_(right, pos);
884                 pos = right;
885             } else {
886                 return;
887             }
888         }
889     }
890    
891     void bubble_(int pos)
892     in
893     {
894         assert(pos < nodes_.length && pos >= 1);
895     }
896     body
897     {
898         // when in doubt, bubble up.
899         bubble_up_(pos);
900     }
901    
902     bool higher_(int pos1, int pos2)
903     {
904         if (pos1 < 1 || pos1 >= nodes_.length ||
905             pos2 < 1 || pos2 >= nodes_.length) {
906            
907             return false;
908         }
909        
910         return (nodes_[pos1] > nodes_[pos2]);
911     }
912    
913     void swap_(int pos1, int pos2)
914     in
915     {
916         assert(pos1 >= 1); assert(pos1 < nodes_.length);
917         assert(pos2 >= 1); assert(pos2 < nodes_.length);
918     }
919     body
920     {
921         Node n1 = nodes_[pos1];
922         Node n2 = nodes_[pos2];
923        
924         n1.pq_index = pos2;
925         n2.pq_index = pos1;
926        
927         nodes_[n1.pq_index] = n1;
928         nodes_[n2.pq_index] = n2;
929     }
930    
931     Node[] nodes_;
932 }
933
934 // Index priority queue, specialized by node type.
935
936 private class IndexPQ(T) : IndexPQ_Base {
937     void add(T n)
938     {
939         super.add(n);
940     }
941    
942     void remove(T n)
943     {
944         super.remove(n);
945     }
946    
947     T top()
948     {
949         return cast(T) super.top();
950     }
951 }
952
953 // All tasks in the system are queued here.
954
955 private class TaskList {
956     alias AsyncTaskBase Task;
957    
958     // Get the singleton object.
959    
960     static TaskList opCall()
961     {
962         return instance_;
963     }
964    
965     // This is called to add a new task to the queue.
966    
967     void addTask(Task task)
968     {
969         synchronized(this) {
970             queue_.add(task);
971            
972             // Wake up threads waiting for work, if any.
973            
974             if (waiters_) {
975                 version(Tango) {
976                     // no resume needed.
977                 } else {
978                     Thread.resumeAll();
979                 }
980             }
981         }
982     }
983    
984     // Remove a task because a thread is working on it.
985    
986     void removeTask(Task task)
987     {
988         synchronized(this) {
989             removeTask_nl(task);
990         }
991     }
992    
993     // Get any new task to work on.
994    
995     Task getTask()
996     {
997         // If there are unassigned tasks, find the highest priority
998         // one, move it to the assigned tasks queue, and return it to
999         // the caller.
1000        
1001         synchronized(this) {
1002             volatile {
1003                 if (queue_.length) {
1004                     Task t = getTask_nl();
1005                     return t;
1006                 }
1007                 waiters_ ++;
1008             }
1009         }
1010        
1011         // Otherwise, wait.
1012        
1013         version(Tango) {
1014             // done in caller
1015             PseudoPause p;
1016             p.sleep();
1017         } else {
1018             Thread.getThis().pause();
1019         }
1020        
1021         // Instead of checking the queue again, we return null.  This
1022         // allows the caller to check for other messages or conditions
1023         // before calling us again in a loop.
1024        
1025         synchronized(this) {
1026             volatile {
1027                 waiters_ --;
1028             }
1029         }
1030        
1031         return null;
1032     }
1033    
1034 private:
1035     // Constructor.
1036     this()
1037     {
1038         queue_ = new IndexPQ!(Task);
1039     }
1040    
1041     // Class constructor.
1042     static this()
1043     {
1044         instance_ = new TaskList();
1045     }
1046    
1047     // Remove a task from the unassigned tasks queue (if this is being
1048     // run "in-line" by a value consumer) or the assigned tasks list
1049     // (by a worker thread).
1050    
1051     void removeTask_nl(Task task)
1052     {
1053         for(int i = 0; i < assigned_.length; i++) {
1054             if (assigned_[i] is task) {
1055                 assigned_[i] = assigned_[$-1];
1056                 assigned_.length = assigned_.length - 1;
1057                
1058                 return;
1059             }
1060         }
1061        
1062         queue_.remove(task);
1063     }
1064    
1065     // Move a task from the unassigned tasks queue to the assigned
1066     // tasks list.
1067    
1068     Task getTask_nl()
1069     {
1070         assert(queue_.length);
1071         Task t = queue_.top();
1072         queue_.remove(t);
1073         assigned_ ~= t;
1074
1075         return t;
1076     }
1077    
1078     static TaskList instance_;
1079    
1080     IndexPQ!(Task) queue_;
1081     Task[] assigned_;
1082    
1083     // threads waiting here.
1084     int waiters_;
1085 }
1086
1087 // The phobos version of Futurism uses pause and resumeAll to wait for
1088 // state-change events.  Tango has a different tool set for this, but
1089 // it is not available yet.  For now, I simulate pause events with
1090 // yields and short sleeps, and do nothing for "resume".  The resume
1091 // operations are not needed because the timed sleeps wake up.
1092 //
1093 // I expected the phobos version to run faster, because it wakes up
1094 // waiters right away, whereas the tango version needs to wait for the
1095 // sleep intervals to expire.  However, at the moment, the tango
1096 // implementation seems to perform better (for some cases).
1097 //
1098 // Two possible explanations come to mind.  First, yield()ing several
1099 // times before sleeping may be beneficial enough that the phobos
1100 // version of this code should adopt a variant of it.  Secondly, the
1101 // resumeAll() operation is inefficient because it causes unnecessary
1102 // thread wakeups, and possibly also unnecessary signal handling in
1103 // non-sleeping threads.  These together may explain the difference.
1104
1105 version(Tango) {
1106     struct PseudoPause {
1107         void pause()
1108         {
1109             if (serial1 == 0) {
1110                 serial1 = serial ++;
1111             }
1112            
1113             if (count < yields) {
1114                 Thread.yield();
1115             } else {
1116                 sleep();
1117             }
1118            
1119             count ++;
1120         }
1121        
1122         void sleep()
1123         {
1124             Thread.sleep(Interval.second/10);
1125         }
1126        
1127         char[] msg;
1128         int count = 0;
1129         int yields = 10;
1130         int report = 1;
1131         static int serial = 100;
1132         int serial1 = 0;
1133     }
1134 }
Note: See TracBrowser for help on using the browser.