Changeset 25

Show
Ignore:
Timestamp:
08/31/07 14:59:10 (1 year ago)
Author:
Gregor
Message:

tango/scrapple/thread/Actor.d: Added an enqueueSync that synchronizes on multiple messages.

tests/shakematrix.d: And used it.

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • trunk/tango/scrapple/thread/Actor.d

    r23 r25  
    8989        } 
    9090 
     91        /** Enqueue a group of messages and wait for all of them (really only 
     92            useful with a Dispatcher or some other delegating actor) */ 
     93        void enqueueSync(Message[] msgs) 
     94        { 
     95            // make all our semaphores 
     96            Semaphore[] sems; 
     97            sems.length = msgs.length; 
     98            foreach (i, m; msgs) { 
     99                sems[i] = new Semaphore; 
     100            } 
     101 
     102            // enqueue all the messages 
     103            foreach (i, m; msgs) { 
     104                enqueue(new SyncMessage(sems[i], m)); 
     105            } 
     106 
     107            // then wait on all the semaphores 
     108            foreach (s; sems) { 
     109                s.wait(); 
     110            } 
     111        } 
     112 
    91113        /** Enqueue a delegate to be called */ 
    92114        void enqueueDelegate(void delegate() d) 
  • trunk/tests/shakematrix.d

    r24 r25  
    136136            // and a dispatcher to manage them 
    137137            mm.d = new Dispatcher(nms); 
     138            mm.d.start(); 
    138139 
    139140            // then send messages for each row 
     141            Message[] ms; 
     142            ms.length = mm.res.h; 
    140143            for (int y = 0; y < mm.res.h; y++) { 
    141                 mm.d.enqueue(new RowMultiply(mm, mm.l, mm.r, y)); 
    142             } 
    143  
    144             mm.d.start(); 
     144                ms[y] = new RowMultiply(mm, mm.l, mm.r, y); 
     145            } 
     146            mm.d.enqueueSync(ms); 
    145147 
    146148            return false; 
     
    205207 
    206208            // if we have no more to go, we're done 
    207             if (mm.togo == 0) { 
    208                 synchronized (Stdout) { 
    209                     for (int y = 0; y < mm.res.h; y++) { 
    210                         for (int x = 0; x < mm.res.w; x++) { 
    211                             Stdout(mm.res[x, y])(" "); 
    212                         } 
    213                         Stdout.newline; 
    214                     } 
    215                 } 
     209            if (mm.togo == 0) 
    216210                mm.d.enqueueSync(new DieMessage); 
    217                 return true; 
    218             } 
    219211            return false; 
    220212        } 
     
    256248    mm.start(); 
    257249    mm.enqueueSync(new MatrixMultiply); 
     250    mm.enqueueSync(new DieMessage); 
    258251    mm.join(); 
    259252    swe = sw.stop(); 
     253 
     254    for (int y = 0; y < mm.res.h; y++) { 
     255        for (int x = 0; x < mm.res.w; x++) { 
     256            Stdout(mm.res[x, y])(" "); 
     257        } 
     258        Stdout.newline; 
     259    } 
    260260    Stdout("Time for actors: ").format("{0:10}", cast(real) swe).newline; 
    261261 
     
    263263    sw.start(); 
    264264    auto o = l * r; 
     265    swe = sw.stop(); 
     266 
    265267    for (int y = 0; y < o.h; y++) { 
    266268        for (int x = 0; x < o.w; x++) { 
     
    269271        Stdout.newline; 
    270272    } 
    271     swe = sw.stop(); 
    272273    Stdout("Time for unthreaded: ").format("{0:10}", cast(real) swe).newline; 
    273274