Changeset 13:8c9b1276f623
- Timestamp:
- 09/20/08 18:33:11 (4 years ago)
- Tags:
- Files:
-
- dreactor/core/Dispatcher.d (deleted)
- dreactor/core/Task.d (modified) (12 diffs)
- dreactor/core/Vat.d (modified) (12 diffs)
- dreactor/protocol/DefaultProvider.d (modified) (4 diffs)
- dreactor/protocol/IProvider.d (modified) (2 diffs)
- dreactor/protocol/TcpProvider.d (modified) (14 diffs)
- dreactor/transport/AsyncSocketConduit.d (modified) (1 diff)
- dreactor/util/Emitter.d (moved) (moved from dreactor/protocol/Emitter.d) (3 diffs)
- dsss.conf (modified) (1 diff)
- dsss.last (modified) (1 diff)
- test/chatclient (modified) (previous)
- test/chatclient.d (modified) (4 diffs)
- test/chatserver (modified) (previous)
- test/chatserver.d (modified) (1 diff)
- test/testtuple.d (added)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
dreactor/core/Task.d
r12 r13 4 4 import tango.util.container.HashMap; 5 5 import tango.util.container.CircularList; 6 import tango.core.Atomic; 7 6 8 import dreactor.core.Vat; 7 9 import dreactor.protocol.IProvider; 10 import dreactor.protocol.DefaultProvider; 8 11 9 12 alias CircularList!(Message) Messages; … … 20 23 if (box.get(type, m)) 21 24 { 22 Message msg = m.removeHead(); 23 if (msg) 25 if (!m.isEmpty) 26 { 27 Message msg = m.removeHead(); 24 28 msg_count.store(msg_count.load()-1); 25 29 26 if (m.isEmpty()) 30 if (m.isEmpty()) 31 box.removeKey(type); 32 return msg; 33 } 34 else 27 35 box.removeKey(type); 28 29 return msg; 30 } 31 else 32 return null; 36 } 37 Message msg; 38 return msg; 33 39 } 34 40 … … 40 46 { 41 47 Message msg = popMessageOfType(i); 42 if (msg )48 if (msg.valid) 43 49 return msg; 44 50 } 45 return null; 51 Message msg; 52 return msg; 46 53 } 47 54 … … 52 59 auto itor = box.iterator; 53 60 54 do61 while (true) 55 62 { 56 63 if (itor.valid && itor.next(key, m)) … … 59 66 { 60 67 Message msg = m.removeHead(); 61 if (msg )68 if (msg.valid) 62 69 msg_count.store(msg_count.load()-1); 63 70 if (m.isEmpty()) … … 67 74 else 68 75 { 69 it erator.remove();76 itor.remove(); 70 77 } 71 78 } 72 79 else 73 return null; 74 } 75 while (true) 80 { 81 Message msg; 82 return msg; 83 } 84 } 76 85 } 77 86 … … 108 117 int id; 109 118 Vat vat; 110 TaskD Gtaskdg;119 TaskDg taskdg; 111 120 IProvider provider; 112 121 113 122 public 114 this( TaskDg tdg = null, IProvider provider= null)115 { 116 fiber = new Fiber(&run );123 this(IProvider prov = null) 124 { 125 fiber = new Fiber(&run, 4096 * 4); 117 126 mailbox = new Mailbox; 118 127 lockedMailbox = new Mailbox; 119 taskdg = tdg;120 if (!provider)121 provider = new DefaultProvider;128 provider = prov ? prov : new DefaultProvider; 129 130 Vat.LocalVat.addTask(this); 122 131 } 123 132 … … 154 163 body 155 164 { 156 while (msg = receive()) 165 Message msg; 166 while ((msg = receive()).valid) 157 167 { 158 168 taskdg(msg); … … 171 181 { 172 182 Task t; 173 if ( t = vat.getTask(taskid))183 if ((t = vat.getTask(taskid)) !is null) 174 184 { 175 185 t.appendMessage(m); 176 186 return true; 177 187 } 178 else if ( t = Vat.getGlobalTask(taskid))188 else if ((t = Vat.getGlobalTask(taskid)) !is null) 179 189 { 180 190 t.appendIVMessage(m); … … 184 194 } 185 195 196 char[] getString(Message msg) 197 { 198 return (cast(char*) msg.payload)[0 .. msg.info]; 199 } 200 201 Fiber.State state() 202 { 203 return fiber.state(); 204 } 205 void call() 206 { 207 fiber.call(); 208 } 186 209 protected 187 210 … … 198 221 { 199 222 Message m = mailbox.popMessageOfType(types); 200 if (!m )223 if (!m.valid) 201 224 Fiber.yield(); 202 else if (SYSTEM_QUIT == m.type) 203 break; 204 else return m; 225 else 226 return m; 205 227 206 228 } 207 return null;208 229 } 209 230 … … 213 234 { 214 235 Message m = mailbox.popMessage(); 215 if (!m )236 if (!m.valid) 216 237 Fiber.yield(); 217 else if (SYSTEM_QUIT == m.type) 218 break; 219 else return m; 220 } 221 return null; 238 else 239 return m; 240 } 222 241 } 223 242 dreactor/core/Vat.d
r12 r13 18 18 import tango.core.Thread; 19 19 import tango.core.Atomic; 20 import tango.util.collection.CircularSeq;21 20 import tango.util.log.Log; 22 21 23 22 import dreactor.transport.AsyncSocketConduit; 24 23 import dreactor.protocol.IProvider; 24 import dreactor.protocol.DefaultProvider; 25 25 import dreactor.core.Task; 26 26 import dreactor.util.ThreadSafeQueue; … … 28 28 static char[] version_string = "Vat.d 0.1 2008-05-31"; 29 29 30 31 enum : int {CLOSE = -2, UNREGISTER = -1, REMAIN = 0, REGISTER = 1, REREGISTER = 2}; 30 alias bool delegate (Event) RegDg; 32 31 33 32 class TaskAttachment … … 43 42 class Vat 44 43 { 44 static Vat LocalVat; 45 45 46 private 46 47 Thread thread; … … 52 53 Selector selector; 53 54 static Atomic!(int) taskCount; 54 TaskAttachment[int] globalTasks; //global registry of tasks55 static TaskAttachment[int] globalTasks; //global registry of tasks 55 56 56 57 public … … 64 65 thread.start(); 65 66 } 66 67 synchronized int addTask(Task t, IProvider p = null) 67 68 static this() 69 { 70 LocalVat = new Vat; 71 } 72 73 int addTask(Task t) 68 74 { 69 75 t.setVat(this); 70 ++taskCount; 71 auto ta = new TaskAttachment(t, (p !is null) ? p : new DefaultProvider); 72 tasks[taskCount] = ta; 73 globalTask[taskCount] = ta; 76 int taskid = taskCount.load() + 1; 77 taskCount.store(taskid); 78 auto p = t.getProvider(); 79 if (p is null) 80 p = new DefaultProvider; 81 p.setRegisterFunc(createRegFunc(taskid)); //default the task id as a param in the delegate 82 auto ta = new TaskAttachment(t, p); 83 tasks[taskid] = ta; 84 globalTasks[taskid] = ta; 74 85 selector.register(p.getConduit(), p.getEvents(), ta); 75 t.setId(taskCount); 76 return taskCount; 86 t.setId(taskid); 87 88 return taskid; 77 89 } 78 90 … … 87 99 } 88 100 89 bool addConnection(int tid, Conduit c, Eventsevts)101 bool register(int tid, Event evts) 90 102 { 91 103 log.trace("adding handler"); 92 TaskAttachment ta; 93 if (ta = (tid in tasks)) 94 return selector.register(c, evts, ta); 104 TaskAttachment* ta; 105 if ((ta = (tid in tasks)) !is null) 106 { 107 selector.register((*ta).provider.getConduit(), evts, *ta); 108 return true; 109 } 95 110 else 111 { 96 112 return false; 97 } 98 113 } 114 } 115 116 RegDg createRegFunc(int taskid) 117 { 118 class Functor 119 { 120 int taskid; 121 this (int tid) 122 { 123 taskid = tid; 124 } 125 bool call(Event evts) 126 { 127 return register(taskid, evts); 128 } 129 } 130 auto ftor = new Functor(taskid); 131 return &ftor.call; 132 } 133 99 134 bool remConnection(Conduit c) 100 135 { 101 return selector.unregister(c); 136 selector.unregister(c); 137 return true; 102 138 } 103 139 104 140 Task getTask(int tid) 105 141 { 106 TaskAttachment ta;107 if ( ta = (tid in tasks))142 TaskAttachment* ta; 143 if ((ta = (tid in tasks)) !is null) 108 144 return ta.task; 109 145 else … … 111 147 } 112 148 113 static synchronizedTask getGlobalTask(int tid)114 { 115 TaskAttachment ta;116 if ( ta = (tid in globaltasks))149 static Task getGlobalTask(int tid) 150 { 151 TaskAttachment* ta; 152 if ((ta = (tid in globalTasks)) !is null) 117 153 return ta.task; 118 154 else … … 140 176 log.trace("Read event fired"); 141 177 auto ta = cast(TaskAttachment) key.attachment; 142 processReturn(ta.appendMessage(ta.provider.handleRead()), key.conduit);178 ta.task.appendMessage(ta.provider.handleRead()); 143 179 144 180 } … … 147 183 log.trace("Write event fired"); 148 184 auto ta = cast(TaskAttachment) key.attachment; 149 ta.appendMessage(ta.provider.handleWrite()); 150 processReturn(ta.appendMessage(ta.provider.handleWrite()), key.conduit); 185 ta.task.appendMessage(ta.provider.handleWrite()); 151 186 } 152 187 else if (key.isHangup()) … … 154 189 log.trace("Hangup event fired"); 155 190 auto ta = cast(TaskAttachment) key.attachment; 156 ta.appendMessage(ta.provider.handleDisconnect()); 157 processReturn(ta.appendMessage(ta.provider.handleDisconnect()), key.conduit); 191 ta.task.appendMessage(ta.provider.handleDisconnect()); 158 192 } 159 193 else if (key.isError() || key.isInvalidHandle()) … … 162 196 // error, close connection 163 197 auto ta = cast(TaskAttachment) key.attachment; 164 ta.appendMessage(ta.provider.handleError()); 165 processReturn(ta.appendMessage(ta.provider.handleError()), key.conduit); 198 ta.task.appendMessage(ta.provider.handleError()); 166 199 } 167 200 } … … 184 217 foreach(int k; tasks.keys) 185 218 { 186 if (tasks[k]. state() == Fiber.State.HOLD)187 tasks[k]. call();188 if (tasks[k]. state() == Fiber.State.TERM)219 if (tasks[k].task.state() == Fiber.State.HOLD) 220 tasks[k].task.call(); 221 if (tasks[k].task.state() == Fiber.State.TERM) 189 222 tasks.remove(k); 190 223 } 191 224 } 192 225 193 void processReturn(int result, Conduit c)194 {195 switch(result)196 {197 case CLOSE:198 selector.unregister(c);199 c.detach();200 break;201 case UNREGISTER:202 selector.unregister(c);203 break;204 case REMAIN:205 //this space intentially left blank206 break;207 case REGISTER:208 break;209 case REREGISTER:210 break;211 default:212 log.error("processReturn: unknown return value");213 }214 }215 226 } dreactor/protocol/DefaultProvider.d
r12 r13 1 1 module dreactor.protocol.DefaultProvider; 2 2 3 import tango.io.Selector; 3 import tango.io.selector.model.ISelector; 4 import tango.io.device.Conduit; 4 5 5 6 import dreactor.protocol.IProvider; … … 11 12 private 12 13 Conduit cond; 13 Events evts; 14 Event evts; 15 bool delegate (Event) regFn; 16 public 14 17 15 public 16 Message handleRead(Conduit c) 17 { 18 enum { 19 Read = 1000, 20 Write, 21 Error, 22 Connect, 23 Disconnect 18 24 } 19 25 20 Message handle Write(Conduit c)26 Message handleRead() 21 27 { 28 return Message(cast(void*)cond, Read, 0); 22 29 } 23 30 24 Message handle Error(Conduit c)31 Message handleWrite() 25 32 { 33 return Message(cast(void*)cond, Write, 0); 26 34 } 27 35 28 Message handle Connect(Conduit c)36 Message handleError() 29 37 { 38 return Message(cast(void*)cond, Error, 0); 30 39 } 31 40 32 Message handle Disconnect(Conduit c)41 Message handleConnect() 33 42 { 43 return Message(cast(void*)cond, Connect, 0); 44 } 45 46 Message handleDisconnect() 47 { 48 return Message(cast(void*)cond, Disconnect, 0); 34 49 } 35 50 … … 40 55 } 41 56 42 int getEvents() 57 void send(char[] buf) 58 { 59 } 60 61 Event getEvents() 43 62 { 44 63 return evts; … … 47 66 void setEvents(Event e) 48 67 { 49 evts e;68 evts = e; 50 69 } 70 71 void setRegisterFunc( bool delegate (Event) fn) 72 { 73 regFn = fn; 74 } 51 75 } dreactor/protocol/IProvider.d
r12 r13 1 1 module dreactor.protocol.IProvider; 2 2 3 class Message 3 import tango.io.selector.model.ISelector; 4 import tango.io.device.Conduit; 5 6 struct Message 4 7 { 5 8 public 9 6 10 int type; 7 11 int info; 8 Object payload; 9 this (Object buf, int t, int e) 12 void* payload; 13 int from; 14 bool valid; 15 16 static Message opCall(void* buf, int t, int e, int f = 0) 10 17 { 11 type = t; 12 info = e; 13 payload = buf; 18 Message m; 19 m.type = t; 20 m.info = e; 21 m.from = f; 22 m.payload = buf; 23 m.valid = true; 24 return m; 14 25 } 15 26 } … … 17 28 interface IProvider 18 29 { 19 Message handleRead( Conduit c);20 Message handleWrite( Conduit c);21 Message handleError( Conduit c);22 Message handleConnect( Conduit c);23 Message handleDisconnect( Conduit c);24 abstractvoid send(char []);25 30 Message handleRead(); 31 Message handleWrite(); 32 Message handleError(); 33 Message handleConnect(); 34 Message handleDisconnect(); 35 void send(char []); 36 void setRegisterFunc(bool delegate (Event)); 26 37 Conduit getConduit(); 27 int getEvents();28 void setEvents( );38 Event getEvents(); 39 void setEvents(Event e); 29 40 } dreactor/protocol/TcpProvider.d
r12 r13 1 module dreactor.protocol. RawTcp;1 module dreactor.protocol.TcpProvider; 2 2 3 3 import tango.io.device.Conduit; 4 4 import tango.io.selector.model.ISelector; 5 5 import tango.net.Socket; 6 import tango.util.co llection.CircularSeq;6 import tango.util.container.CircularList; 7 7 import tango.util.log.Log; 8 8 import tango.util.log.Config; … … 10 10 import dreactor.transport.AsyncSocketConduit; 11 11 import dreactor.core.Vat; 12 import dreactor.core.Dispatcher; 13 12 public import dreactor.protocol.IProvider; 14 13 /****************************************************************************** 15 14 … … 17 16 18 17 ******************************************************************************/ 19 class T CPProvider : IProvider18 class TcpProvider : IProvider 20 19 { 21 20 public 22 23 enum 24 { 25 RECEIVE = 1000, 26 SEND_COMPLETE, 27 NEW_CONNECTION, 28 REMOTE_CLOSED, 29 SEND_ERROR, 30 RECEIVE_ERROR, 31 ERROR 21 enum { 22 SendComplete = 2000, 23 NewConnection, 24 Receive, 25 RemoteClosed, 26 SendError, 27 ReceiveError, 28 Error 32 29 } 33 30 … … 37 34 log.info("log initialized"); 38 35 cond = c; 39 } 40 41 this(Vat v, IPv4Address addr) 42 { 43 AsyncSocketConduit cond = new AsyncSocketConduit; 44 cond.socket().setAddressReuse(true); 45 this(cond); 46 } 47 36 events = Event.Read; 37 } 38 39 this(IPv4Address addr, bool listen = false) 40 { 41 AsyncSocketConduit c = new AsyncSocketConduit; 42 c.socket().setAddressReuse(true); 43 if (listen) 44 { 45 c.bind(addr); 46 c.socket().listen(1000); 47 listener = listen; 48 } 49 else 50 c.connect(addr); 51 this(c); 52 } 53 54 48 55 ~this() 49 56 { … … 51 58 } 52 59 53 Message handleRead( Conduit c)60 Message handleRead() 54 61 { 55 62 Logger log = Log.lookup("Handlers.onReceive"); 63 64 if (listener) 65 return handleConnect(); 56 66 57 67 char inbuf[8192]; 58 68 int amt; 59 if((amt = h.transport.read(inbuf)) > 0)60 { 61 return new Message(inbuf[0 .. amt].dup, RECEIVE, amt);69 if((amt = cond.read(inbuf)) > 0) 70 { 71 return Message(inbuf[0 .. amt].dup.ptr, Receive, amt); 62 72 } 63 73 else … … 65 75 if (amt == 0) 66 76 { 67 children.remove(h); 68 (cast(AsyncSocketConduit) h.transport).shutdown(); 69 return Message(null, REMOTE_CLOSED, amt); 77 cond.shutdown(); 78 return Message(null, RemoteClosed, amt); 70 79 } 71 80 log.error("Received no data, err = {}", amt); 72 81 } 73 return new Message(null, ERROR, amt);82 return Message(null, Error, amt); 74 83 } 75 84 … … 82 91 83 92 ***************************************************************************/ 84 Message handleWrite( Conduit c)93 Message handleWrite() 85 94 { 86 95 Logger log = Log.lookup("Handlers.onSend"); … … 96 105 //h.remEvent(Event.Write); 97 106 //TODO - How do we handle event re-registering 98 return new Message(null, SEND_COMPLETE, sent);107 return Message(null, SendComplete, sent); 99 108 } 100 109 } … … 102 111 { 103 112 log.error("Select said socket was writable, but sent 0 bytes"); 104 return new Message(null, SEND_ERROR, 0);113 return Message(null, Error, 0); 105 114 } 106 115 else 107 116 { 108 117 log.error("Socket send return ERR"); 109 return new Message(null, SEND_ERROR, sent); 110 } 111 } 112 else 113 { 114 //h.remEvent(Event.Write); 115 //TODO - How do we handle event re-registering 116 117 return new Message(null, SEND_COMPLETE, 0); 118 } 119 } 120 121 Message handleDisconnect(Conduit c) 122 { 123 return new Message(c, REMOTE_CLOSED, 0); 124 } 125 126 Message handleError(Conduit c) 127 { 128 return new Messsage(null, ERROR, 0); 118 return Message(null, Error, sent); 119 } 120 } 121 else 122 { 123 remEvent(Event.Write); 124 if (!regFn(events)) 125 { 126 log.error("unable to register mgr"); 127 } 128 return Message(null, SendComplete, 0); 129 } 130 } 131 132 Message handleDisconnect() 133 { 134 return Message(cast(void*)cond, RemoteClosed, 0); 135 } 136 137 Message handleError() 138 { 139 return Message(cast(void*)cond, Error, 0); 129 140 } 130 141 131 Message handleConnect(Conduit c) 132 { 133 return new Message(accept(), NEW_CONNECTION, 0); 142 Message handleConnect() 143 { 144 log.trace("accepting new connection"); 145 return Message(cast(void*)accept(), NewConnection, 0); 134 146 } 135 147 … … 139 151 } 140 152 141 int getEvents()153 Event getEvents() 142 154 { 143 155 return events; … … 153 165 AsyncSocketConduit newcond = new AsyncSocketConduit; 154 166 cond.socket().accept(newcond.socket); 155 log.info("accepted new connection ");167 log.info("accepted new connection {} {}", cast(uint) newcond, newcond.fileHandle()); 156 168 return newcond; 157 }158 159 int broadcast(char[] outbuf, TCPProvider[] recips)160 {161 foreach(TCPProvider c; recips)162 {163 if (c.appendOutBuffer(outbuf))164 {165 h.addEvent(Event.Write);166 vat.addConnection(h);167 }168 }169 return 0;170 169 } 171 170 … … 178 177 179 178 **************************************************************************/ 180 int send(char[] outbufl)179 void send(char[] outbuf) 181 180 { 182 181 if (appendOutBuffer(outbuf)) 183 182 { 184 //TODO - should we always register for all events? or update it when needed? 185 //d.addEvent(Event.Write); 186 if (!vat.addConnection(d)) 183 addEvent(Event.Write); 184 if (!regFn(events)) 187 185 { 188 186 log.error("unable to register mgr"); 189 187 } 190 188 } 191 return 0;192 189 } 193 190 … … 199 196 } 200 197 201 202 ~this()203 {204 (cast(AsyncSocketConduit)manager.transport).shutdown();205 (cast(AsyncSocketConduit)manager.transport).detach();206 }207 208 198 int connect(IPv4Address addr) 209 199 { … … 219 209 /************************************************************************** 220 210 221 send 222 User-called function to send data to the counterpart at the other 223 end of the connection. This sets up the connection manager to send 224 data as the socket becomes free. 225 211 appendOutBuffer 212 213 Adds an outgoing buffer to the list. This returns true if the list 214 was empty, indicating that the handler should be registered with the 215 SelectLoop. If it returns false, it was probably already registered. 216 226 217 **************************************************************************/ 227 int send(char[] outbuf, IPv4Address addr = null) 228 { 229 if (!connected) 230 { 231 log.info("send: not connected, connecting"); 232 return -1; 233 } 234 if (appendOutBuffer(outbuf)) 235 { 236 addEvent(Event.Write); 237 if (!vat.addConnection(manager)) 238 { 239 log.error("unable to register mgr"); 240 } 241 } 242 return 0; 243 } 244 245 218 bool appendOutBuffer(char[] outbuf) 219 { 220 out_buffers.append(outbuf); 221 out_buffers_len++; 222 if (out_buffers_len == 1) 223 return true; 224 else 225 return false; 226 } 227 228 /************************************************************************** 229 230 addOffset 231 Use this function to update the offset position after a successful data 232 send. This not only manages the current offset, but will update the 233 out buffer chain if necessary. 234 235 Returns: false if there is nothing left to send, true if there is. 236 237 **************************************************************************/ 238 bool addOffset(int off) 239 in 240 { 241 assert(out_buffers_len > 0); 242 } 243 body 244 { 245 char[] hd = out_buffers.head(); 246 if ((off + o_offset) >= hd.length) 247 { 248 out_buffers.removeHead(); 249 o_offset = 0; 250 out_buffers_len--; 251 return (out_buffers_len > 0); 252 } 253 else 254 o_offset += off; 255 return true; 256 } 257 258 /************************************************************************** 259 260 char[] nextBuffer 261 262 Returns a slice of the current outbound buffer, returns a char[] pointing 263 to null if there is no current outbound buffer 264 265 **************************************************************************/ 266 char[] nextBuffer() 267 { 268 if (out_buffers_len < 1) 269 { 270 return null; 271 } 272 273 return out_buffers.head()[o_offset .. $]; 274 } 275 276 void setRegisterFunc( bool delegate (Event) fn) 277 { 278 regFn = fn; 279 } 280 281 void addEvent(Event evt) 282 { 283 events |= evt; 284 } 285 286 void remEvent(Event evt) 287 { 288 events &= !evt; 289 } 290 246 291 private 247 Vat vat; 248 Conduit cond; 292 AsyncSocketConduit cond; 249 293 Logger log; 250 294 bool listener; 251 295 Event events; 296 bool connected; 297 CircularList!(char[]) out_buffers; 298 int out_buffers_len; 299 int o_offset; 300 bool delegate (Event) regFn; 252 301 } dreactor/transport/AsyncSocketConduit.d
r12 r13 103 103 ***********************************************************************/ 104 104 105 overrideHandle fileHandle ()105 Handle fileHandle () 106 106 { 107 107 return cast(Handle) socket_.fileHandle; dreactor/util/Emitter.d
r12 r13 1 module Emitter1 module dreactor.util.Emitter; 2 2 3 3 … … 7 7 8 8 import dreactor.core.Task; 9 import dreactor.protocol.IProvider; 9 10 10 alias Message delegate( void) EmitterDg;11 alias Message delegate() EmitterDg; 11 12 12 13 class Emitter … … 44 45 Thread thread; 45 46 bool running; 46 Emitter Cbcallback;47 EmitterDg callback; 47 48 } 48 49 dsss.conf
r12 r13 3 3 #[test/chatserver.d] 4 4 [test/chatclient.d] 5 [test/chatserver.d] dsss.last
r12 r13 3 3 #[test/chatserver.d] 4 4 [test/chatclient.d] 5 [test/chatserver.d] test/chatclient.d
r12 r13 1 1 2 2 module chatclient; 3 4 import tango.io.Stdout; 5 import tango.io.Console; 6 import tango.net.Socket; 7 import tango.util.log.Log; 3 8 4 9 import dreactor.core.Vat; 5 10 import dreactor.core.Task; 6 11 import dreactor.protocol.TcpProvider; 12 import dreactor.util.Emitter; 7 13 8 enum {EMITTER_CHAT_RECEIVE = 42}; 14 enum { StdinReceive = 42 } 15 16 Logger log; 9 17 10 18 class ChatTask : Task … … 13 21 private 14 22 TcpProvider client; 15 23 bool running; 16 24 public 17 25 this(TcpProvider tcpclient) … … 23 31 { 24 32 Message msg; 25 33 running = true; 26 34 auto em = new Emitter(this, 27 35 { 28 36 char buf[] = Cin.copyln(true); 29 return new Message(buf, EMITTER_STDIN_RECEIVE, buf.size);37 return Message(cast(void*)buf.ptr, StdinReceive, buf.length); 30 38 }); 31 39 32 while ( msg = receive())40 while (running) 33 41 { 34 switch(msg.type) 42 msg = receive(); 43 switch (msg.type) 35 44 { 36 case EMITTER_CHAT_RECEIVE: 37 char[] inbuf = msg.payload; 45 case StdinReceive: 46 { 47 char[] inbuf = getString(msg); 38 48 if (inbuf == "quit") 39 49 { 40 em.stopNow(); 41 return; 50 running = false; 42 51 } 43 client.send(msg.payload); 44 break; 45 46 case TcpProvider.RECEIVE: 47 Stdout(cast(char[]) msg.payload); 48 break; 49 50 case TcpProvider.SEND_COMPLETE: 51 break; 52 53 case TcpProvider.REMOTE_CLOSED: 54 Stdout("--- Remote host closed connection \n"); 55 break; 56 52 client.send(inbuf); 53 } 54 case TcpProvider.Receive: 55 { 56 Stdout(getString(msg)); 57 } 57 58 default: 58 Stdout(" Unknown message received\n");59 Stdout("unknown msg received: {}", msg.type); 59 60 } 60 61 } … … 64 65 65 66 66 int main( int argc, char[][] argv)67 int main(char[][] args) 67 68 { 68 auto vat = new Vat;69 auto client = new TcpProvider(new IPv4Address("localhost", 5555), vat);70 auto tsk = new ChatTask( client);71 vat.addTask(task);69 log = Log.lookup("dreactor.chatserver"); 70 auto provider = new TcpProvider(new IPv4Address("localhost", 5555)); 71 auto tsk = new ChatTask(provider); 72 return 0; 72 73 } test/chatserver.d
r12 r13 1 1 2 2 module chatserver; 3 4 import tango.io.Stdout; 5 import tango.io.Console; 6 import tango.util.container.CircularList; 7 import tango.util.log.Log; 8 import tango.net.Socket; 3 9 4 10 import dreactor.core.Vat; 5 11 import dreactor.core.Task; 6 12 import dreactor.protocol.TcpProvider; 13 import dreactor.transport.AsyncSocketConduit; 7 14 15 typedef Message ChildTCPRequest; 16 Logger log; 8 17 9 18 class ChatConnectionTask : Task 10 19 { 11 20 public 21 this(TcpProvider tcpclient) 22 { 23 super(tcpclient); 24 } 25 26 enum { 27 StdIn = 100, 28 RemoteClosed 29 } 12 30 13 31 void run() 14 32 { 33 running = true; 15 34 Message msg; 16 while ( msg = receive())35 while (running) 17 36 { 37 msg = receive(); 18 38 switch(msg.type) 19 39 { 20 case T CP_PROVIDER_RECEIVE:21 //Stdout(cast(char[]) msg.payload);40 case TcpProvider.Receive: 41 Stdout(cast(char*) msg.payload); 22 42 break; 23 case T CP_PROVIDER_SEND_COMPLETE:43 case TcpProvider.SendComplete: 24 44 break; 25 case T CP_PROVIDER_REMOTE_CLOSED:26 Stdout("--- Remote host closed connection \n");45 case TcpProvider.RemoteClosed: 46 log.trace("--- Remote host closed connection \n"); 27 47 break; 28 48 default: 29 Stdout("Unknown message received\n");49 log.trace("Unknown message received\n"); 30 50 } 31 }32 em.stopNow();33 }34 35 void send(char[] buf)36 {37 tcp.send(buf);38 }39 40 static CircularList!(ChatConnectionTask!(41 }42 43 44 int main(int argc, char[][] argv)45 {46 auto vat = new Vat;47 48 void listentask(Message msg)49 {50 switch(msg.type)51 {52 case TCP_PROVIDER_CONNECT:53 AsyncSocketConduit cond = cast(AsyncSocketConduit) msg.payload;54 auto tsk = ChatConnectionTask(new TcpProvider(cond));55 vat.addTask(tsk);56 break;57 default:58 Stdout("Unknown message received\n");59 51 } 60 52 } 61 53 54 private 55 bool running; 56 } 62 57 63 auto provider = new TcpProvider(new IPv4Address("localhost", 5555), vat); 64 auto srvtsk = new Task(&listentask, provider); 65 vat.addTask(task, client); 58 class ListenerTask : Task 59 { 60 this(TcpProvider tcpclient) 61 { 62 super(tcpclient); 63 } 64 void run() 65 { 66 Message msg; 67 running = true; 68 while (running) 69 { 70 msg = receive(); 71 auto children = new CircularList!(ChatConnectionTask); 72 switch(msg.type) 73 { 74 case TcpProvider.NewConnection: 75 AsyncSocketConduit cond = cast(AsyncSocketConduit) msg.payload; 76 log.trace("new conduit : {}", cast(uint) cond); 77 auto provider = new TcpProvider(cond); 78 auto tsk = new ChatConnectionTask(new TcpProvider(cond)); 79 children.append(tsk); 80 log.trace("accepted connection"); 81 break; 82 case ChatConnectionTask.StdIn: 83 char[] inbuf = (cast(char*) msg.payload)[0 .. msg.info]; 84 break; 85 case ChatConnectionTask.RemoteClosed: 86 break; 87 default: 88 log.trace("Unknown message received"); 89 } 90 } 91 } 92 private 93 bool running; 66 94 } 95 96 int main(char[][] args) 97 { 98 log = Log.lookup("dreactor.chatserver"); 99 auto provider = new TcpProvider(new IPv4Address("localhost", 5555), true); 100 auto srvtsk = new ListenerTask(provider); 101 return 0; 102 }
