Changeset 13:8c9b1276f623

Show
Ignore:
Timestamp:
09/20/08 18:33:11 (4 years ago)
Author:
rick@minifunk
Tags:

tip

branch:
default
Message:

bug fixes

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • dreactor/core/Task.d

    r12 r13  
    44import tango.util.container.HashMap; 
    55import tango.util.container.CircularList; 
     6import tango.core.Atomic; 
     7 
    68import dreactor.core.Vat; 
    79import dreactor.protocol.IProvider; 
     10import dreactor.protocol.DefaultProvider; 
    811 
    912alias CircularList!(Message) Messages; 
     
    2023        if (box.get(type, m)) 
    2124        { 
    22             Message msg = m.removeHead(); 
    23             if (msg) 
     25            if (!m.isEmpty) 
     26            { 
     27                Message msg = m.removeHead(); 
    2428                msg_count.store(msg_count.load()-1); 
    2529 
    26             if (m.isEmpty()) 
     30                if (m.isEmpty()) 
     31                    box.removeKey(type); 
     32                return msg; 
     33            } 
     34            else 
    2735                box.removeKey(type); 
    28  
    29             return msg; 
    30         } 
    31         else 
    32             return null; 
     36        } 
     37        Message msg; 
     38        return msg; 
    3339    } 
    3440 
     
    4046        { 
    4147            Message msg = popMessageOfType(i); 
    42             if (msg
     48            if (msg.valid
    4349                return msg; 
    4450        } 
    45         return null; 
     51        Message msg; 
     52        return msg; 
    4653    } 
    4754 
     
    5259        auto itor = box.iterator; 
    5360 
    54         do 
     61        while (true) 
    5562        { 
    5663            if (itor.valid && itor.next(key, m)) 
     
    5966                { 
    6067                    Message msg = m.removeHead(); 
    61                     if (msg
     68                    if (msg.valid
    6269                        msg_count.store(msg_count.load()-1); 
    6370                    if (m.isEmpty()) 
     
    6774                else  
    6875                { 
    69                     iterator.remove(); 
     76                    itor.remove(); 
    7077                } 
    7178            } 
    7279            else 
    73                 return null; 
    74         } 
    75         while (true) 
     80            { 
     81                Message msg;  
     82                return msg; 
     83            } 
     84        } 
    7685    } 
    7786 
     
    108117    int id; 
    109118    Vat vat; 
    110     TaskDG taskdg; 
     119    TaskDg taskdg; 
    111120    IProvider provider; 
    112121 
    113122public 
    114     this(TaskDg tdg = null, IProvider provider = null)  
    115     { 
    116         fiber = new Fiber(&run); 
     123    this(IProvider prov = null)  
     124    { 
     125        fiber = new Fiber(&run, 4096 * 4); 
    117126        mailbox = new Mailbox; 
    118127        lockedMailbox = new Mailbox; 
    119         taskdg = tdg
    120         if (!provider) 
    121             provider = new DefaultProvider
     128        provider = prov ? prov : new DefaultProvider
     129         
     130        Vat.LocalVat.addTask(this)
    122131    } 
    123132 
     
    154163    body 
    155164    { 
    156         while (msg = receive()) 
     165        Message msg; 
     166        while ((msg = receive()).valid) 
    157167        { 
    158168            taskdg(msg); 
     
    171181    { 
    172182        Task t; 
    173         if (t = vat.getTask(taskid)
     183        if ((t = vat.getTask(taskid)) !is null
    174184        { 
    175185            t.appendMessage(m); 
    176186            return true; 
    177187        } 
    178         else if (t = Vat.getGlobalTask(taskid)
     188        else if ((t = Vat.getGlobalTask(taskid)) !is null
    179189        { 
    180190            t.appendIVMessage(m); 
     
    184194    } 
    185195 
     196    char[] getString(Message msg) 
     197    { 
     198        return (cast(char*) msg.payload)[0 .. msg.info]; 
     199    } 
     200 
     201    Fiber.State state() 
     202    { 
     203        return fiber.state(); 
     204    } 
     205    void call() 
     206    { 
     207        fiber.call(); 
     208    } 
    186209protected 
    187210 
     
    198221        { 
    199222            Message m = mailbox.popMessageOfType(types); 
    200             if (!m
     223            if (!m.valid
    201224                Fiber.yield(); 
    202             else if (SYSTEM_QUIT == m.type) 
    203                 break; 
    204             else return m; 
     225            else  
     226                return m; 
    205227             
    206228        } 
    207         return null; 
    208229    } 
    209230 
     
    213234        { 
    214235            Message m = mailbox.popMessage(); 
    215             if (!m
     236            if (!m.valid
    216237                Fiber.yield(); 
    217             else if (SYSTEM_QUIT == m.type) 
    218                 break; 
    219             else return m; 
    220         } 
    221         return null; 
     238            else  
     239                return m; 
     240        } 
    222241    } 
    223242 
  • dreactor/core/Vat.d

    r12 r13  
    1818import tango.core.Thread; 
    1919import tango.core.Atomic; 
    20 import tango.util.collection.CircularSeq; 
    2120import tango.util.log.Log; 
    2221 
    2322import dreactor.transport.AsyncSocketConduit; 
    2423import dreactor.protocol.IProvider; 
     24import dreactor.protocol.DefaultProvider; 
    2525import dreactor.core.Task; 
    2626import dreactor.util.ThreadSafeQueue; 
     
    2828static char[] version_string = "Vat.d 0.1 2008-05-31"; 
    2929 
    30  
    31 enum : int {CLOSE = -2, UNREGISTER = -1, REMAIN = 0, REGISTER = 1, REREGISTER = 2}; 
     30alias bool delegate (Event) RegDg; 
    3231 
    3332class TaskAttachment 
     
    4342class Vat 
    4443{ 
     44    static Vat LocalVat; 
     45 
    4546private 
    4647    Thread thread; 
     
    5253    Selector selector; 
    5354    static Atomic!(int) taskCount; 
    54     TaskAttachment[int] globalTasks; //global registry of tasks 
     55    static TaskAttachment[int] globalTasks; //global registry of tasks 
    5556 
    5657public  
     
    6465        thread.start(); 
    6566    } 
    66       
    67     synchronized int addTask(Task t, IProvider p = null) 
     67 
     68    static this() 
     69    { 
     70        LocalVat = new Vat; 
     71    } 
     72 
     73    int addTask(Task t) 
    6874    { 
    6975        t.setVat(this); 
    70         ++taskCount; 
    71         auto ta = new TaskAttachment(t, (p !is null) ? p : new DefaultProvider); 
    72         tasks[taskCount] = ta; 
    73         globalTask[taskCount] = ta; 
     76        int taskid = taskCount.load() + 1; 
     77        taskCount.store(taskid); 
     78        auto p = t.getProvider(); 
     79        if (p is null) 
     80            p = new DefaultProvider; 
     81        p.setRegisterFunc(createRegFunc(taskid));  //default the task id as a param in the delegate 
     82        auto ta = new TaskAttachment(t, p); 
     83        tasks[taskid] = ta; 
     84        globalTasks[taskid] = ta; 
    7485        selector.register(p.getConduit(), p.getEvents(), ta); 
    75         t.setId(taskCount); 
    76         return taskCount; 
     86        t.setId(taskid); 
     87 
     88        return taskid; 
    7789    } 
    7890  
     
    8799    } 
    88100 
    89     bool addConnection(int tid, Conduit c, Events evts) 
     101    bool register(int tid, Event evts) 
    90102    { 
    91103        log.trace("adding handler"); 
    92         TaskAttachment ta; 
    93         if (ta = (tid in tasks)) 
    94             return selector.register(c, evts, ta); 
     104        TaskAttachment* ta; 
     105        if ((ta = (tid in tasks)) !is null) 
     106        { 
     107            selector.register((*ta).provider.getConduit(), evts, *ta); 
     108            return true; 
     109        } 
    95110        else 
     111        { 
    96112            return false; 
    97     } 
    98       
     113        } 
     114    } 
     115     
     116    RegDg createRegFunc(int taskid) 
     117    { 
     118        class Functor 
     119        {    
     120            int taskid; 
     121            this (int tid) 
     122            { 
     123                taskid = tid; 
     124            } 
     125            bool call(Event evts) 
     126            { 
     127                return register(taskid, evts);     
     128            } 
     129        } 
     130        auto ftor = new Functor(taskid); 
     131        return &ftor.call; 
     132    }    
     133 
    99134    bool remConnection(Conduit c) 
    100135    { 
    101         return selector.unregister(c); 
     136        selector.unregister(c); 
     137        return true; 
    102138    } 
    103139 
    104140    Task getTask(int tid) 
    105141    { 
    106         TaskAttachment ta; 
    107         if (ta = (tid in tasks)
     142        TaskAttachment* ta; 
     143        if ((ta = (tid in tasks)) !is null
    108144            return ta.task; 
    109145        else 
     
    111147    } 
    112148 
    113     static synchronized Task getGlobalTask(int tid) 
    114     { 
    115         TaskAttachment ta; 
    116         if (ta = (tid in globaltasks)
     149    static Task getGlobalTask(int tid) 
     150    { 
     151        TaskAttachment* ta; 
     152        if ((ta = (tid in globalTasks)) !is null
    117153            return ta.task; 
    118154        else 
     
    140176                        log.trace("Read event fired");     
    141177                        auto ta = cast(TaskAttachment) key.attachment; 
    142                         processReturn(ta.appendMessage(ta.provider.handleRead()), key.conduit); 
     178                        ta.task.appendMessage(ta.provider.handleRead()); 
    143179 
    144180                    } 
     
    147183                        log.trace("Write event fired");     
    148184                        auto ta = cast(TaskAttachment) key.attachment; 
    149                         ta.appendMessage(ta.provider.handleWrite()); 
    150                         processReturn(ta.appendMessage(ta.provider.handleWrite()), key.conduit); 
     185                        ta.task.appendMessage(ta.provider.handleWrite()); 
    151186                    } 
    152187                    else if (key.isHangup()) 
     
    154189                        log.trace("Hangup event fired"); 
    155190                        auto ta = cast(TaskAttachment) key.attachment; 
    156                         ta.appendMessage(ta.provider.handleDisconnect()); 
    157                         processReturn(ta.appendMessage(ta.provider.handleDisconnect()), key.conduit); 
     191                        ta.task.appendMessage(ta.provider.handleDisconnect()); 
    158192                    } 
    159193                    else if (key.isError() || key.isInvalidHandle()) 
     
    162196                        // error, close connection 
    163197                        auto ta = cast(TaskAttachment) key.attachment; 
    164                         ta.appendMessage(ta.provider.handleError()); 
    165                         processReturn(ta.appendMessage(ta.provider.handleError()), key.conduit); 
     198                        ta.task.appendMessage(ta.provider.handleError()); 
    166199                    } 
    167200                } 
     
    184217        foreach(int k; tasks.keys) 
    185218        { 
    186             if (tasks[k].state() == Fiber.State.HOLD) 
    187                 tasks[k].call(); 
    188             if (tasks[k].state() == Fiber.State.TERM) 
     219            if (tasks[k].task.state() == Fiber.State.HOLD) 
     220                tasks[k].task.call(); 
     221            if (tasks[k].task.state() == Fiber.State.TERM) 
    189222                tasks.remove(k); 
    190223        }         
    191224    } 
    192225 
    193     void processReturn(int result, Conduit c) 
    194     { 
    195         switch(result) 
    196         { 
    197             case CLOSE: 
    198                 selector.unregister(c); 
    199                 c.detach(); 
    200             break; 
    201             case UNREGISTER: 
    202                 selector.unregister(c); 
    203             break; 
    204             case REMAIN: 
    205                 //this space intentially left blank 
    206             break; 
    207             case REGISTER: 
    208             break; 
    209             case REREGISTER: 
    210             break; 
    211             default: 
    212                 log.error("processReturn: unknown return value"); 
    213         } 
    214     } 
    215226} 
  • dreactor/protocol/DefaultProvider.d

    r12 r13  
    11module dreactor.protocol.DefaultProvider; 
    22 
    3 import tango.io.Selector; 
     3import tango.io.selector.model.ISelector; 
     4import tango.io.device.Conduit; 
    45 
    56import dreactor.protocol.IProvider; 
     
    1112private 
    1213    Conduit cond; 
    13     Events evts; 
     14    Event evts; 
     15    bool delegate (Event) regFn; 
     16public 
    1417 
    15 public 
    16     Message handleRead(Conduit c) 
    17     { 
     18    enum { 
     19        Read = 1000, 
     20        Write, 
     21        Error, 
     22        Connect, 
     23        Disconnect 
    1824    } 
    1925 
    20     Message handleWrite(Conduit c
     26    Message handleRead(
    2127    { 
     28        return Message(cast(void*)cond, Read, 0);  
    2229    } 
    2330 
    24     Message handleError(Conduit c
     31    Message handleWrite(
    2532    { 
     33        return Message(cast(void*)cond, Write, 0);  
    2634    } 
    2735 
    28     Message handleConnect(Conduit c
     36    Message handleError(
    2937    { 
     38        return Message(cast(void*)cond, Error, 0);  
    3039    } 
    3140 
    32     Message handleDisconnect(Conduit c
     41    Message handleConnect(
    3342    { 
     43        return Message(cast(void*)cond, Connect, 0);  
     44    } 
     45 
     46    Message handleDisconnect() 
     47    { 
     48        return Message(cast(void*)cond, Disconnect, 0);  
    3449    } 
    3550 
     
    4055    } 
    4156 
    42     int getEvents() 
     57    void send(char[] buf) 
     58    { 
     59    } 
     60 
     61    Event getEvents() 
    4362    { 
    4463        return evts; 
     
    4766    void setEvents(Event e) 
    4867    { 
    49         evts e; 
     68        evts = e; 
    5069    } 
     70     
     71    void setRegisterFunc( bool delegate (Event) fn) 
     72    { 
     73        regFn = fn; 
     74    }  
    5175} 
  • dreactor/protocol/IProvider.d

    r12 r13  
    11module dreactor.protocol.IProvider; 
    22 
    3 class Message 
     3import tango.io.selector.model.ISelector; 
     4import tango.io.device.Conduit; 
     5 
     6struct Message 
    47{ 
    58public 
     9 
    610    int type; 
    711    int info; 
    8     Object payload; 
    9     this (Object buf, int t, int e)  
     12    void* payload; 
     13    int from; 
     14    bool valid;  
     15 
     16    static Message opCall(void* buf, int t, int e, int f = 0)  
    1017    { 
    11         type = t;  
    12         info = e;  
    13         payload = buf;  
     18        Message m; 
     19        m.type = t;  
     20        m.info = e; 
     21        m.from = f;  
     22        m.payload = buf; 
     23        m.valid = true; 
     24        return m; 
    1425    } 
    1526} 
     
    1728interface IProvider 
    1829{ 
    19     Message handleRead(Conduit c); 
    20     Message handleWrite(Conduit c); 
    21     Message handleError(Conduit c); 
    22     Message handleConnect(Conduit c); 
    23     Message handleDisconnect(Conduit c); 
    24     abstract void send(char []); 
    25  
     30    Message handleRead(); 
     31    Message handleWrite(); 
     32    Message handleError(); 
     33    Message handleConnect(); 
     34    Message handleDisconnect(); 
     35    void send(char []); 
     36    void setRegisterFunc(bool delegate (Event)); 
    2637    Conduit getConduit(); 
    27     int getEvents(); 
    28     void setEvents(); 
     38    Event getEvents(); 
     39    void setEvents(Event e); 
    2940} 
  • dreactor/protocol/TcpProvider.d

    r12 r13  
    1 module dreactor.protocol.RawTcp
     1module dreactor.protocol.TcpProvider
    22 
    33import tango.io.device.Conduit; 
    44import tango.io.selector.model.ISelector; 
    55import tango.net.Socket; 
    6 import tango.util.collection.CircularSeq
     6import tango.util.container.CircularList
    77import tango.util.log.Log; 
    88import tango.util.log.Config; 
     
    1010import dreactor.transport.AsyncSocketConduit; 
    1111import dreactor.core.Vat; 
    12 import dreactor.core.Dispatcher; 
    13  
     12public import dreactor.protocol.IProvider; 
    1413/****************************************************************************** 
    1514     
     
    1716 
    1817******************************************************************************/ 
    19 class TCPProvider : IProvider 
     18class TcpProvider : IProvider 
    2019{ 
    2120public 
    22  
    23     enum  
    24     { 
    25         RECEIVE = 1000, 
    26         SEND_COMPLETE, 
    27         NEW_CONNECTION, 
    28         REMOTE_CLOSED, 
    29         SEND_ERROR, 
    30         RECEIVE_ERROR, 
    31         ERROR 
     21    enum { 
     22        SendComplete = 2000, 
     23        NewConnection, 
     24        Receive, 
     25        RemoteClosed, 
     26        SendError, 
     27        ReceiveError, 
     28        Error 
    3229    } 
    3330 
     
    3734        log.info("log initialized"); 
    3835        cond = c; 
    39     } 
    40  
    41     this(Vat v, IPv4Address addr) 
    42     { 
    43         AsyncSocketConduit cond = new AsyncSocketConduit; 
    44         cond.socket().setAddressReuse(true); 
    45         this(cond); 
    46     } 
    47  
     36        events = Event.Read; 
     37    } 
     38 
     39    this(IPv4Address addr, bool listen = false) 
     40    { 
     41        AsyncSocketConduit c = new AsyncSocketConduit; 
     42        c.socket().setAddressReuse(true); 
     43        if (listen) 
     44        { 
     45            c.bind(addr); 
     46            c.socket().listen(1000); 
     47            listener = listen; 
     48        } 
     49        else 
     50            c.connect(addr); 
     51        this(c); 
     52    } 
     53 
     54     
    4855    ~this() 
    4956    { 
     
    5158    }  
    5259     
    53     Message handleRead(Conduit c
     60    Message handleRead(
    5461    { 
    5562        Logger log = Log.lookup("Handlers.onReceive"); 
     63 
     64        if (listener) 
     65            return handleConnect(); 
    5666 
    5767        char inbuf[8192]; 
    5868        int amt; 
    59         if((amt = h.transport.read(inbuf)) > 0) 
    60         { 
    61             return new Message(inbuf[0 .. amt].dup, RECEIVE, amt); 
     69        if((amt = cond.read(inbuf)) > 0) 
     70        { 
     71            return Message(inbuf[0 .. amt].dup.ptr, Receive, amt); 
    6272        } 
    6373        else 
     
    6575            if (amt == 0) 
    6676            { 
    67                 children.remove(h); 
    68                 (cast(AsyncSocketConduit) h.transport).shutdown(); 
    69                 return Message(null, REMOTE_CLOSED, amt); 
     77                cond.shutdown(); 
     78                return Message(null, RemoteClosed, amt); 
    7079            } 
    7180            log.error("Received no data, err = {}", amt); 
    7281        } 
    73         return new Message(null, ERROR, amt); 
     82        return Message(null, Error, amt); 
    7483    } 
    7584     
     
    8291 
    8392    ***************************************************************************/ 
    84     Message handleWrite(Conduit c
     93    Message handleWrite(
    8594    { 
    8695        Logger log = Log.lookup("Handlers.onSend"); 
     
    96105                    //h.remEvent(Event.Write); 
    97106                    //TODO - How do we handle event re-registering 
    98                     return new Message(null, SEND_COMPLETE, sent); 
     107                    return Message(null, SendComplete, sent); 
    99108                } 
    100109            } 
     
    102111            { 
    103112                log.error("Select said socket was writable, but sent 0 bytes"); 
    104                 return new Message(null, SEND_ERROR, 0); 
     113                return Message(null, Error, 0); 
    105114            } 
    106115            else 
    107116            { 
    108117                log.error("Socket send return ERR"); 
    109                 return new Message(null, SEND_ERROR, sent); 
    110             } 
    111         } 
    112         else 
    113         { 
    114             //h.remEvent(Event.Write); 
    115             //TODO - How do we handle event re-registering 
    116          
    117             return new Message(null, SEND_COMPLETE, 0); 
    118         } 
    119     } 
    120  
    121     Message handleDisconnect(Conduit c) 
    122     { 
    123         return new Message(c, REMOTE_CLOSED, 0); 
    124     } 
    125  
    126     Message handleError(Conduit c) 
    127     { 
    128         return new Messsage(null, ERROR, 0); 
     118                return Message(null, Error, sent); 
     119            } 
     120        } 
     121        else 
     122        { 
     123            remEvent(Event.Write); 
     124            if (!regFn(events)) 
     125            { 
     126                log.error("unable to register mgr"); 
     127            } 
     128            return Message(null, SendComplete, 0); 
     129        } 
     130    } 
     131 
     132    Message handleDisconnect() 
     133    { 
     134        return Message(cast(void*)cond, RemoteClosed, 0); 
     135    } 
     136 
     137    Message handleError() 
     138    { 
     139        return Message(cast(void*)cond, Error, 0); 
    129140    }  
    130141 
    131     Message handleConnect(Conduit c) 
    132     { 
    133         return new Message(accept(), NEW_CONNECTION, 0); 
     142    Message handleConnect() 
     143    { 
     144        log.trace("accepting new connection"); 
     145        return Message(cast(void*)accept(), NewConnection, 0); 
    134146    } 
    135147 
     
    139151    } 
    140152 
    141     int getEvents() 
     153    Event getEvents() 
    142154    { 
    143155        return events; 
     
    153165        AsyncSocketConduit newcond = new AsyncSocketConduit; 
    154166        cond.socket().accept(newcond.socket); 
    155         log.info("accepted new connection"); 
     167        log.info("accepted new connection {} {}", cast(uint) newcond, newcond.fileHandle()); 
    156168        return newcond; 
    157     } 
    158  
    159     int broadcast(char[] outbuf, TCPProvider[] recips) 
    160     { 
    161         foreach(TCPProvider c; recips) 
    162         { 
    163             if (c.appendOutBuffer(outbuf)) 
    164             { 
    165                 h.addEvent(Event.Write); 
    166                 vat.addConnection(h); 
    167             } 
    168         } 
    169         return 0; 
    170169    } 
    171170     
     
    178177 
    179178    **************************************************************************/ 
    180     int send(char[] outbufl
     179    void send(char[] outbuf
    181180    { 
    182181        if (appendOutBuffer(outbuf)) 
    183182        { 
    184             //TODO - should we always register for all events? or update it when needed? 
    185             //d.addEvent(Event.Write); 
    186             if (!vat.addConnection(d)) 
     183            addEvent(Event.Write); 
     184            if (!regFn(events)) 
    187185            { 
    188186                log.error("unable to register mgr"); 
    189187            } 
    190188        } 
    191         return 0; 
    192189    } 
    193190 
     
    199196    } 
    200197     
    201  
    202     ~this() 
    203     { 
    204         (cast(AsyncSocketConduit)manager.transport).shutdown(); 
    205         (cast(AsyncSocketConduit)manager.transport).detach(); 
    206     } 
    207  
    208198    int connect(IPv4Address addr) 
    209199    { 
     
    219209    /************************************************************************** 
    220210     
    221         send 
    222         User-called function to send data to the counterpart at the other 
    223         end of the connection. This sets up the connection manager to send 
    224         data as the socket becomes free.  
    225  
     211        appendOutBuffer 
     212 
     213        Adds an outgoing buffer to the list. This returns true if the list 
     214        was empty, indicating that the handler should be registered with the 
     215        SelectLoop. If it returns false, it was probably already registered. 
     216         
    226217    **************************************************************************/ 
    227     int send(char[] outbuf, IPv4Address addr = null) 
    228     { 
    229         if (!connected) 
    230         { 
    231             log.info("send: not connected, connecting"); 
    232             return -1; 
    233         } 
    234         if (appendOutBuffer(outbuf)) 
    235         { 
    236             addEvent(Event.Write); 
    237             if (!vat.addConnection(manager)) 
    238             { 
    239                 log.error("unable to register mgr"); 
    240             } 
    241         } 
    242         return 0; 
    243     } 
    244      
    245      
     218    bool appendOutBuffer(char[] outbuf) 
     219    { 
     220        out_buffers.append(outbuf); 
     221        out_buffers_len++; 
     222        if (out_buffers_len == 1) 
     223            return true; 
     224        else 
     225            return false; 
     226    } 
     227 
     228    /************************************************************************** 
     229 
     230        addOffset  
     231        Use this function to update the offset position after a successful data 
     232        send. This not only manages the current offset, but will update the  
     233        out buffer chain if necessary.  
     234 
     235        Returns: false if there is nothing left to send, true if there is. 
     236 
     237    **************************************************************************/  
     238    bool addOffset(int off) 
     239    in 
     240    { 
     241        assert(out_buffers_len > 0); 
     242    } 
     243    body 
     244    { 
     245        char[] hd = out_buffers.head(); 
     246        if ((off + o_offset) >= hd.length) 
     247        { 
     248            out_buffers.removeHead(); 
     249            o_offset = 0; 
     250            out_buffers_len--; 
     251            return (out_buffers_len > 0); 
     252        } 
     253        else 
     254            o_offset += off; 
     255        return true; 
     256    } 
     257 
     258    /************************************************************************** 
     259 
     260        char[] nextBuffer 
     261 
     262        Returns a slice of the current outbound buffer, returns a char[] pointing 
     263        to null if there is no current outbound buffer 
     264 
     265    **************************************************************************/ 
     266    char[] nextBuffer() 
     267    { 
     268        if (out_buffers_len < 1) 
     269        { 
     270            return null;  
     271        } 
     272 
     273        return out_buffers.head()[o_offset .. $]; 
     274    } 
     275     
     276    void setRegisterFunc( bool delegate (Event) fn) 
     277    { 
     278        regFn = fn; 
     279    } 
     280 
     281    void addEvent(Event evt) 
     282    { 
     283        events |= evt; 
     284    } 
     285 
     286    void remEvent(Event evt) 
     287    { 
     288        events &= !evt; 
     289    } 
     290 
    246291private 
    247     Vat vat; 
    248     Conduit cond; 
     292    AsyncSocketConduit cond; 
    249293    Logger log; 
    250294    bool listener; 
    251295    Event events; 
     296    bool connected; 
     297    CircularList!(char[]) out_buffers; 
     298    int out_buffers_len; 
     299    int o_offset; 
     300    bool delegate (Event) regFn; 
    252301} 
  • dreactor/transport/AsyncSocketConduit.d

    r12 r13  
    103103    ***********************************************************************/ 
    104104 
    105     override Handle fileHandle () 
     105    Handle fileHandle () 
    106106    { 
    107107       return cast(Handle) socket_.fileHandle; 
  • dreactor/util/Emitter.d

    r12 r13  
    1 module Emitter 
     1module dreactor.util.Emitter; 
    22 
    33 
     
    77 
    88import dreactor.core.Task; 
     9import dreactor.protocol.IProvider; 
    910 
    10 alias Message delegate(void) EmitterDg; 
     11alias Message delegate() EmitterDg; 
    1112 
    1213class Emitter 
     
    4445    Thread thread; 
    4546    bool running; 
    46     EmitterCb callback; 
     47    EmitterDg callback; 
    4748} 
    4849 
  • dsss.conf

    r12 r13  
    33#[test/chatserver.d] 
    44[test/chatclient.d] 
     5[test/chatserver.d] 
  • dsss.last

    r12 r13  
    33#[test/chatserver.d] 
    44[test/chatclient.d] 
     5[test/chatserver.d] 
  • test/chatclient.d

    r12 r13  
    11 
    22module chatclient; 
     3 
     4import tango.io.Stdout; 
     5import tango.io.Console; 
     6import tango.net.Socket; 
     7import tango.util.log.Log; 
    38 
    49import dreactor.core.Vat; 
    510import dreactor.core.Task; 
    611import dreactor.protocol.TcpProvider; 
     12import dreactor.util.Emitter; 
    713 
    8 enum {EMITTER_CHAT_RECEIVE = 42}; 
     14enum { StdinReceive = 42 } 
     15 
     16Logger log; 
    917 
    1018class ChatTask : Task 
     
    1321private 
    1422    TcpProvider client; 
    15  
     23    bool running; 
    1624public 
    1725    this(TcpProvider tcpclient)  
     
    2331    { 
    2432        Message msg; 
    25  
     33        running = true; 
    2634        auto em = new Emitter(this,  
    2735                { 
    2836                     char buf[] = Cin.copyln(true); 
    29                      return new Message(buf, EMITTER_STDIN_RECEIVE, buf.size); 
     37                     return Message(cast(void*)buf.ptr, StdinReceive, buf.length); 
    3038                }); 
    3139         
    32         while (msg = receive()
     40        while (running
    3341        { 
    34             switch(msg.type) 
     42            msg = receive(); 
     43            switch (msg.type) 
    3544            { 
    36                 case EMITTER_CHAT_RECEIVE: 
    37                     char[] inbuf = msg.payload; 
     45                case StdinReceive: 
     46                { 
     47                    char[] inbuf = getString(msg); 
    3848                    if (inbuf == "quit") 
    3949                    { 
    40                         em.stopNow(); 
    41                         return; 
     50                        running = false; 
    4251                    } 
    43                     client.send(msg.payload); 
    44                     break; 
    45  
    46                 case TcpProvider.RECEIVE: 
    47                     Stdout(cast(char[]) msg.payload); 
    48                     break; 
    49  
    50                 case TcpProvider.SEND_COMPLETE: 
    51                     break; 
    52  
    53                 case TcpProvider.REMOTE_CLOSED: 
    54                     Stdout("--- Remote host closed connection \n"); 
    55                     break; 
    56  
     52                    client.send(inbuf); 
     53                } 
     54                case TcpProvider.Receive: 
     55                { 
     56                    Stdout(getString(msg)); 
     57                } 
    5758                default: 
    58                     Stdout("Unknown message received\n"); 
     59                    Stdout("unknown msg received: {}", msg.type); 
    5960            } 
    6061        } 
     
    6465 
    6566 
    66 int main(int argc, char[][] argv
     67int main(char[][] args
    6768{ 
    68     auto vat = new Vat
    69     auto client = new TcpProvider(new IPv4Address("localhost", 5555), vat); 
    70     auto tsk = new ChatTask(client); 
    71     vat.addTask(task)
     69    log = Log.lookup("dreactor.chatserver")
     70    auto provider = new TcpProvider(new IPv4Address("localhost", 5555)); 
     71    auto tsk = new ChatTask(provider); 
     72    return 0
    7273} 
  • test/chatserver.d

    r12 r13  
    11 
    22module chatserver; 
     3 
     4import tango.io.Stdout; 
     5import tango.io.Console; 
     6import tango.util.container.CircularList; 
     7import tango.util.log.Log; 
     8import tango.net.Socket; 
    39 
    410import dreactor.core.Vat; 
    511import dreactor.core.Task; 
    612import dreactor.protocol.TcpProvider; 
     13import dreactor.transport.AsyncSocketConduit; 
    714 
     15typedef Message ChildTCPRequest; 
     16Logger log; 
    817 
    918class ChatConnectionTask : Task 
    1019{ 
    1120public 
     21    this(TcpProvider tcpclient)  
     22    { 
     23        super(tcpclient); 
     24    } 
     25 
     26    enum { 
     27        StdIn = 100, 
     28        RemoteClosed 
     29        } 
    1230 
    1331    void run() 
    1432    { 
     33        running = true; 
    1534        Message msg; 
    16         while (msg = receive()
     35        while (running
    1736        { 
     37            msg = receive(); 
    1838            switch(msg.type) 
    1939            { 
    20                 case TCP_PROVIDER_RECEIVE
    21                     //Stdout(cast(char[]) msg.payload); 
     40                case TcpProvider.Receive
     41                    Stdout(cast(char*) msg.payload); 
    2242                    break; 
    23                 case TCP_PROVIDER_SEND_COMPLETE
     43                case TcpProvider.SendComplete
    2444                    break; 
    25                 case TCP_PROVIDER_REMOTE_CLOSED
    26                     Stdout("--- Remote host closed connection \n"); 
     45                case TcpProvider.RemoteClosed
     46                    log.trace("--- Remote host closed connection \n"); 
    2747                    break; 
    2848                default: 
    29                     Stdout("Unknown message received\n"); 
     49                    log.trace("Unknown message received\n"); 
    3050            } 
    31         } 
    32         em.stopNow(); 
    33     } 
    34  
    35     void send(char[] buf) 
    36     { 
    37         tcp.send(buf); 
    38     } 
    39  
    40     static CircularList!(ChatConnectionTask!( 
    41 } 
    42  
    43  
    44 int main(int argc, char[][] argv) 
    45 { 
    46     auto vat = new Vat; 
    47   
    48     void listentask(Message msg) 
    49     { 
    50         switch(msg.type) 
    51         { 
    52             case TCP_PROVIDER_CONNECT: 
    53                 AsyncSocketConduit cond = cast(AsyncSocketConduit) msg.payload; 
    54                 auto tsk = ChatConnectionTask(new TcpProvider(cond)); 
    55                 vat.addTask(tsk); 
    56                 break; 
    57             default: 
    58                 Stdout("Unknown message received\n"); 
    5951        } 
    6052    } 
    6153 
     54private 
     55    bool running; 
     56} 
    6257 
    63     auto provider = new TcpProvider(new IPv4Address("localhost", 5555), vat); 
    64     auto srvtsk = new Task(&listentask, provider); 
    65     vat.addTask(task, client); 
     58class ListenerTask : Task 
     59
     60    this(TcpProvider tcpclient)  
     61    { 
     62        super(tcpclient); 
     63    } 
     64    void run() 
     65    { 
     66        Message msg; 
     67        running = true; 
     68        while (running) 
     69        { 
     70            msg = receive(); 
     71            auto children = new CircularList!(ChatConnectionTask); 
     72            switch(msg.type) 
     73            { 
     74                case TcpProvider.NewConnection: 
     75                    AsyncSocketConduit cond = cast(AsyncSocketConduit) msg.payload; 
     76                    log.trace("new conduit : {}", cast(uint) cond); 
     77                    auto provider = new TcpProvider(cond); 
     78                    auto tsk = new ChatConnectionTask(new TcpProvider(cond)); 
     79                    children.append(tsk); 
     80                    log.trace("accepted connection"); 
     81                    break; 
     82                case ChatConnectionTask.StdIn: 
     83                    char[] inbuf = (cast(char*) msg.payload)[0 .. msg.info]; 
     84                    break; 
     85                case ChatConnectionTask.RemoteClosed: 
     86                    break; 
     87                default: 
     88                    log.trace("Unknown message received"); 
     89            } 
     90        } 
     91    } 
     92private 
     93    bool running; 
    6694} 
     95 
     96int main(char[][] args) 
     97{ 
     98    log = Log.lookup("dreactor.chatserver"); 
     99    auto provider = new TcpProvider(new IPv4Address("localhost", 5555), true); 
     100    auto srvtsk = new ListenerTask(provider); 
     101    return 0; 
     102}