Server with non-blocking user input without select module

andreasu 0 Tallied Votes 585 Views Share

A simple streaming server-client pair with non-blocking user input can easily be coded in Python by using the 'select' module, which provides a straightforward interface to the Unix select() system call. For Windows users, however, this is not an option: on Windows, select() is provided by the WinSock library and accepts only sockets, so a file object such as sys.stdin cannot be included in the list being watched. Consider the following expression:

inputready,outputready,exceptready = select.select(input,[],[])

where input = [server,sys.stdin]. Client sockets are also appended to this list.
inputready is then tested for the server socket, the client sockets and stdin, each of which can be handled separately. This is the way select is normally used for non-blocking user input; it is known to work on Unix but unfortunately not on Windows.

The answer to this problem on Windows machines is the clever use of threading! See the code below

Global lists are used to store the client connections, the client threads and the open server socket so that their data can be accessed from all threads. I've extended the code quite a bit and added some extra features. It's actually a complete client-server pair. The server accepts multiple clients. Client state changes are also reported. If the server goes down, all clients are disconnected but their input threads stay alive. By typing 'reconnect' the client can attempt to reestablish the connection to the server. My implementation is just one way of accomplishing this. Feel free to modify, extend and reuse the code as you see fit. Here's the code:

Threaded server with non-blocking user input

# Threaded streaming server with non-blocking input.
# Without the use of 'select' module.
# Version: 1.0    Date: 3.12.2010
# Author: Andreas Urbanski

from socket import *
import threading
import time
import sys

# Connected client sockets. Server.run appends every accepted connection,
# Client.run removes its own socket when it finishes, and the Input thread
# iterates the list to broadcast console input and to close everything.
clients = []
# Client handler threads, appended by Server.run for each accepted connection.
threads = []
# Listening sockets; Server.run appends its socket once it is bound.
opensockets = []

# Global switch passed to timestamp(): 1 shows timestamps, 0 hides them.
ts = 1

def timestamp(use=1):
    """Return the current local time formatted as a log-line suffix.

    use -- when true (the default) return '  [YYYY-MM-DD HH:MM:SS]';
           when false return an empty string, so callers can always
           append the result to their message.
    """
    if not use:
        # An empty string rather than chr(0): a NUL byte would be written
        # to the terminal (or a redirected log) along with the message.
        return ''
    return time.strftime('  [%Y-%m-%d %H:%M:%S]')
    
# Server class handles all server operations and client threads
class Server:

    def __init__( self ):
        
        # Use 'localhost' for local use
        self.host = gethostbyname(gethostname())
        if len(sys.argv) < 2:
            self.port = 7070    # default port
        else:
            self.port = int(sys.argv[1])
        self.addr = (self.host, self.port)
        self.backlog = 5
        self.size = 1024
        self.sobj = None

    def run ( self ):
        
        try:
            # Open the socket
            self.sobj = socket(AF_INET, SOCK_STREAM)
            self.sobj.bind(self.addr)
            self.sobj.listen(self.backlog)
            # server socket open, store to list
            opensockets.append(self.sobj)
            
            print 'Streaming server started %s. Name %s at\nAddress %s and port %d'\
                %( time.strftime('%Y-%m-%d %H:%M:%S'),
                   repr(gethostname()),
                   gethostbyname(gethostname()),
                   self.port)
        # handle errors    
        except error, (value, message):
            if self.sobj:
                self.sobj.close()
                print 'Could not open socket: ' + message
                sys.exit(1)
                
        running = 1
        # Start the thread to handle user input
        Input().start()
        while running:
            client, address = self.sobj.accept()
            cli = Client((client, address))
            cli.start()
            # Append the client thread to threads list
            threads.append(cli)
            # Append the client connection handle to clients list 
            clients.append(client)
            print 'Client connected on '+repr(address)\
                  +timestamp(ts)

        self.sobj.close()
        # finally join all threads
        for cli in threads:
            cli.join()

# Input class is a threaded handler for user input.
class Input( Server, threading.Thread ):

    def __init__( self ):
        
        threading.Thread.__init__(self)
        Server.__init__(self)
        
    def exitall( self ):
        # Close all sockets and exit
        for cli in clients:
            cli.close()
        sys.exit(0)

    def run( self ):
        
        print 'Input ready\n'
        running = 1
        while running:
            # Get user input
            data = raw_input('')
            if data:
                # To exit
                if data == 'quit':
                    self.exitall()
                    running = 0
                    sys.exit(0)
                else:
                    # Send the input data to all clients
                    for client in clients:
                        client.send(data)
                    print 'Sent '+repr(data)+' to all clients.'\
                          +timestamp(ts)

# Client threads
class Client ( threading.Thread ):

    def __init__( self, (client, address) ):
        
        threading.Thread.__init__(self)
        self.client = client
        self.address = address
        self.size = 1024

    def run ( self ):
        
        running = 1
        while running:
            try:
                # Accept client data
                data = self.client.recv(1024)
                # Close if quit received
                if data == 'quit':
                    self.client.close()
                    print 'Client '+repr(self.address)+' disconnected.'\
                          +timestamp(ts)
                    running = 0
                elif data:
                    print 'Got  '+repr(data)+'  from  '+repr(self.address)\
                          +timestamp(ts)
            # Handle errors
            except:
                print 'Connection to '+repr(self.address)+' was reset by peer.'\
                      +timestamp(ts)
                running = 0
        # Cleanup and close thread
        clients.remove(self.client)
        self.client.close()

# Script entry point: create the server and start it. run() opens the
# listening socket and then blocks in its accept loop.
if __name__ == '__main__':
    main = Server()
    main.run()

Threaded client with non-blocking user input

# Threaded streaming server client with non-blocking input.
# Without the use of 'select' module.
# Version: 1.0    Date: 3.12.2010
# Author: Andreas Urbanski

from socket import *
import threading
import time
import sys

# Open sockets to the server. Client.run appends its connection and the
# Input thread sends every typed line to each entry.
connections = []
# Seconds Client.run sleeps after losing the connection before it closes
# its socket and exits.
exittimeout = 3
# The Client instance created in the __main__ block; Input.run calls
# main.run() again when the user types 'reconnect'.
main = None

def timestamp():
    """Return the current local time as '  [YYYY-MM-DD HH:MM:SS]'."""
    stamp_format = '  [%Y-%m-%d %H:%M:%S]'
    return time.strftime(stamp_format)

class Client:

    def __init__( self ):
        # Unless bound to 'localhost'
        self.host = '10.0.0.3'
        if len(sys.argv) < 2:
            self.port = 7070
        else:
            self.port = int(sys.argv[1])
        self.addr = (self.host, self.port)
        self.dsize = 1024
        self.sobj = None

    def run ( self ):

        self.sobj = socket(AF_INET, SOCK_STREAM)
        self.sobj.connect(self.addr)
        connections.append(self.sobj)
        
        # Store the connection address away
        addr = self.sobj.getpeername()
        print 'Client connected to '+repr(self.host)+' on '+str(self.port)+'.',
        print 'It is %s'% time.ctime()
        print 'Input ready\n'
        running = 1
        Input().start()
        
        while running:
            try:
                data = self.sobj.recv(self.dsize)
                if data:
                    print 'Got  '+repr(data)+'  from  '+repr(addr)+timestamp()
            except:
                print 'Connection to '+repr(addr)+' was reset by peer.'\
                      +timestamp()
                connections.remove(self.sobj)
                running = 0
        print '\nIf exit halts type \'quit\' to kill the stdinput thread.'
        time.sleep(exittimeout)
        self.sobj.close()
        sys.exit(0)

class Input ( threading.Thread ):
    # Works even with multiple connections
    def __init__( self ):
        threading.Thread.__init__(self)

    def run ( self ):
        running = 1
        while running:
            data = raw_input('')
            if data:
                for conn in connections:
                    addr = conn.getpeername()
                    try:
                        conn.send(data)
                        print 'Sent '+repr(data)+' to '+repr(addr[0])
                    except:
                        print 'Connection to '+repr(addr)+' was reset by peer.'\
                              +timestamp()
                        connections.remove(conn)
                if data == 'reconnect':
                    main.run()
                if data == 'quit':
                    print 'Input thread killed.'
                    running = 0
        # Cleanup and exit
        for conn in connections:
            conn.close()
            sys.exit(0)
        
# Script entry point. The instance is bound to the module-level name 'main'
# because Input.run calls main.run() again on 'reconnect'.
if __name__ == '__main__':
    main = Client()
    main.run()

I've used a limited amount of code commenting in there but most parts should be self-explanatory. If you don't understand some part of the code feel free to ask and comment. Any comments are welcome!