Download Reference Manual
The Developer's Library for D
About Wiki Forums Source Search Contact

ServerSocket, EPoll & ThreadPools problem

Moderators: kris

Posted: 02/18/09 09:34:36

I am trying to create a server that uses Epoll for the socket connection management and ThreadPools? for the parallel handling of requests. The server just reads a file and writes it to the socket. Everything is working fine so far accept, that the server shuts down if some load is put on it. I used apache's ab to test the server with different scenarios. When I use 'ab -n 100000 -c 500' then the test will be conducted with 100.000 request and 500 concurrent requests. ab runs fine, but at the end or in a second run receives a 'connection reset by peer'. How does this happen?

In the example below I just open a file and stream it to the socket conduit. If I open the file only once at the startup of the server and just write the it over and over to the socket without opening the again and again I don't get the error? Do I miss something? Do I need to make sure that the file can be opened? I didn't get a "too many files open" exception, though. Any help would be great.

    this (uint numberThreads, uint socketPort, uint backLog, bool reuse = false)
    {
        this.numberThreads  = numberThreads;
        this.socketPort     = socketPort;
        this.backLog        = backLog;
        this.reuse          = reuse;  
        this.child_id       = getpid(); 
        this.initializeSocket();
    }

    private void initializeSocket() 
    {
        this.socket = new ServerSocket( new InternetAddress(this.socketPort), 
                                        this.backLog, 
                                        this.reuse);
    }
    
    private void initializeThreads()
    {
        this.thread_pool    = new ThreadPool!(http_thread_param)(this.numberThreads);    
    }
    
    public void start() 
    {  
        EpollSelector       selector;               
        int                 _conduit_filehandle;
        
        this.initializeThreads();       
        
        // initialize EpollSelector
        selector = new EpollSelector();
        selector.open(1, 1);
        selector.register(this.socket, Event.Read);                        
        this.socket.socket.blocking(false);
              
        while(1)
        {  
            if (selector.select(EPOLLWAIT_INFINITE) > 0) 
            {   
                foreach (SelectionKey key; selector.selectedSet())
                {
                    try {
                        SocketConduit _conduit                  = (cast(ServerSocket) key.conduit).accept(); 
                        _conduit_filehandle                     = _conduit.fileHandle();                        
                        this.connections[_conduit_filehandle]   = _conduit;
                    }                    
                    catch (Exception e) {
                        selector.reregister(key.conduit, Event.Read);
                        break;
                    } 
                    
                    if (key.isReadable()) 
                    {   
                        // just an object to handle the incoming request strings 
                        Request _req = new Request(this.connections[_conduit_filehandle]);
                        
                        if (_req.getInputLength > 0) {   
                            this.requests[_conduit_filehandle] = _req;
                            selector.reregister(key.conduit, Event.Write);                            
                        }
                        else {         
                            selector.reregister(key.conduit, Event.Read);
                            break;
                        }   
                    }
                    
                    if (key.isWritable()) 
                    {  
                        this.serviceThread(this.connections[_conduit_filehandle], this.requests[_conduit_filehandle]);
                        selector.reregister(key.conduit, Event.Read);                        
                    }
                                       
                    if (key.isError() || key.isHangup() || key.isInvalidHandle())
                    {   
                        selector.unregister(key.conduit);
                        this.connections[_conduit_filehandle].detach();    
                    }                                                              
                }
            }
        }
        
        scope(exit) {            
            selector.close();            
        }  
        
        return 0;
    }
        
    private void serviceThread ( SocketConduit conduit, HttpRequest request ) 
    {          
        this.thread_pool.assign(&this.threadAction, thread_params);
    }
    
    private void threadAction(http_thread_param thread_params)
    {         
        FileInput fi = new FileInput(<somefile>);        
        thread_params.conduit.copy(fi);        
        thread_params.conduit.flush();
        thread_params.conduit.detach();        
        fi.close();        
    }

Strangely the error does not occur if I am opening the file again and again, but do not write the content to the conduit, but instead use write with some dummy text.

    private void threadAction(http_thread_param thread_params)
    {         
        FileInput fi = new FileInput(<somefile>);        
        thread_params.conduit.write('TEST');        
        thread_params.conduit.flush();
        thread_params.conduit.detach();        
        fi.close();        
    }
Author Message

Posted: 02/18/09 09:57:46

Since the error message normally means that the receiver signals some error (or close down situation) (in this case I assume that would be ab?), it sounds like the issue isn't with your Tango application but rather the receiver? Maybe your example is too much for it ;)

Posted: 02/18/09 21:56:44

You have some problems in your code.

But worst here is that you are calling accept on ANY socket that is readable. This will undoubtedly throw an exception when called on a non-server socket. It appears that you break from the loop so no further processing will happen. You should check that the conduit is the server socket before calling accept.

I'm not sure what you are doing with all the reregister calls.

Have you tried logging what is happening to see how your code behaves? I'd be surprised if it is serving any data.

-Steve

Posted: 02/18/09 23:57:44 -- Modified: 02/19/09 00:04:00 by
lars_kirchhoff

Thanks for your comments. I'm new to Epoll and this is my first attempt. It is working and is sending data correctly. I've written a test application, that request different files randomly and checks if they have been sent correctly (I was suspicious because of the threads, that different requests would be mixing up). But it is running fine.

As far as I understood Epoll and the Epoll implementation of tango, I am able to specify the socket to which Epoll should listen and also specify the event I would like to check. I thought this would be done with:

selector.register(this.socket, Event.Read);

So I register the server socket, which I have created in the constructor, with the Epoll Selector to listen to read events. If I get a read event I grab the conduit from the socket accept and read the input. I finally reregister the Epoll selector to the conduit I got back from the socket accept to write events in order to wait for a next available spot to write on the socket conduit.

But after looking at the code I see your (@Steve) point that trying to accept a connection on each selector key event is stupid as some events are not from ServerSockets? anymore, but rather SocketConduits?. But how do I know that it is a ServerSocket? or a SocketConduit?, that is provided by key.conduit?

Another possible reason could be that I start the serviceThread and immediately reregister the conduit to be readable. That means the socket could potentially read before it has even written anything. This would explain why the "reset by peer" error occurs after fewer request, when I request and send files that are larger (1Mb).

So two questions remain? How do I differentiate the type of key.conduit (ServerSocket? vs. SocketConduit?)? And do I need to reregister the SocketConduit? after writing? And if yes, how do I know if a thread is completed? (Ok more than two, ;)).. thanks for any comments.

/lars

Posted: 02/19/09 09:35:52

After some sleep and more thoughts I think the code is correct. If the Epoll register function works the way that it sets a socket on which to listen and reregister allows to change the events on which action should be performed than the code should be alright. If you look closely, I do not register the SocketConduit?, but only register and reregister the ServerSocket? conduit. First I register the ServerSocket? and within the loop I always register the key.conduit, which should be always the ServerSocket? that is initialized before the loop.

That leaves only the unregister in case of the an error, hangup and invalid handle to be a reason and the problem with the reregister of the socket after the thread is called.

/lars