Name: [16797] Tim Bower
Member: 65 months
Authored: 8 videos
Description: Assistant Professor, Computer Systems Technology, Kansas State University at Salina. `Tim Bower <http://www.sal.ksu.edu/faculty/tim/>`_ My blog: `Open Source on the Prairie <http://prairiepy.blogspot.com/>`_ ...

Code Walk Through of Multi-Threaded Chat Server [ID:1550] (3/4)

in series: Python Network Programming

video tutorial by Tim Bower, added 07/09


This video uses the example of a multi-party chat server to demonstrate how to write a multi-threaded network server in Python using sockets.

Topics discussed include:

  1. Multi-threaded and asynchronous programming.
  2. Setting up a socket to accept new connections.
  3. Creating and managing threads.
  4. Sending and receiving data using a socket.
  5. Synchronization and locking to implement a readers and writers based message queue.
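
Topics 2 through 4 can be sketched in miniature as a threaded echo server. This is an illustrative sketch only, written for Python 3 (the full listing further down targets Python 2). The names `handle_client`, `serve`, and `start_echo_server` are hypothetical and do not appear in the original program.

```python
import socket
import threading


def handle_client(conn):
    """Echo everything one client sends until it disconnects."""
    with conn:
        while True:
            data = conn.recv(4096)
            if not data:          # an empty read means the peer closed
                break
            conn.sendall(data)    # sendall retries until all bytes are written


def serve(listener):
    """Accept connections and start one daemon thread per client."""
    while True:
        try:
            conn, _addr = listener.accept()
        except OSError:           # the listening socket was closed
            break
        worker = threading.Thread(target=handle_client, args=(conn,))
        worker.daemon = True      # do not keep the process alive on exit
        worker.start()


def start_echo_server(host="127.0.0.1", port=0):
    """Bind, listen, and run the accept loop in a background thread.

    Port 0 asks the operating system for any free port; the port that
    was actually chosen is returned alongside the listening socket.
    """
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listener.bind((host, port))
    listener.listen(3)
    accept_thread = threading.Thread(target=serve, args=(listener,))
    accept_thread.daemon = True
    accept_thread.start()
    return listener, listener.getsockname()[1]
```

Calling `start_echo_server()` returns the listening socket and the chosen port. A client that connects to that port and sends some bytes should receive the same bytes back.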

More discussion of this program, and of other Python network programming topics, can be found in my Network Programming Study Guide.

All of the source code can be downloaded from the corresponding page in the Study Guide.
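
To try the chat server once it is running, a small test client along these lines might help. It is a sketch under assumptions: it is written for Python 3, the helper names `send_commands`, `read_available`, and `demo` are hypothetical, and it assumes the server is listening on port 50000 of the local machine, as in the listing.

```python
import socket
import time


def send_commands(sock, commands, pause=0.2):
    """Send each command as its own write, pausing between them.

    The server treats every recv() result as one message, so commands that
    arrive glued together would be read as a single chat line.
    """
    for command in commands:
        sock.sendall(command.encode("utf-8"))
        time.sleep(pause)


def read_available(sock, timeout=1.0):
    """Collect what the server sends until it is quiet for `timeout` seconds."""
    sock.settimeout(timeout)
    chunks = []
    while True:
        try:
            data = sock.recv(4096)
        except socket.timeout:
            break
        if not data:              # the server closed the connection
            break
        chunks.append(data)
    return b"".join(chunks).decode("utf-8", errors="replace")


def demo(host="localhost", port=50000):
    """Connect to a running chat server, say hello, and quit."""
    sock = socket.create_connection((host, port))
    try:
        send_commands(sock, ["/nick guest\r\n", "hello everyone\r\n"])
        print(read_available(sock))
        send_commands(sock, ["/quit bye\r\n"])
    finally:
        sock.close()
```

Running `demo()` in one terminal while a second client is connected should show the join, nickname, and chat messages arriving on both sides.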

#!/usr/bin/env python
"""
  File: chatserver.py

  Chat Server with Threading
"""
# Copyright 2009 Tim Bower 
# This program was developed for education purposes for the Network
# Programming Class, CMST 355, at Kansas State University at Salina.
#
# This program is licensed as Open Source Software using the Apache License,
# Version 2.0 (the "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# You are free to use, copy, distribute, and display the work, and to make
# derivative works. If you do, you must give the original author credit. The
# author specifically permits (and encourages) teachers to post, reproduce,
# and distribute some or all of this material for use in their classes or by
# their students.

import socket
import sys
import os
import threading
import time
#import traceback
from itertools import cycle

MAX_INDEX = 100                      # The max of cyclic index
MAX_LEN = 10                         # Message queue length
host = ''                            # Bind to all interfaces
port = 50000

def mesg_index(old, last, new):
    """
    This computes the index value of the message queue from where the reader
    should return messages.  It accounts for having a cyclic counter.
    This code is a little tricky because it has to catch all possible
    combinations.
    old -> index of oldest (first) message in queue
    last -> index of last message read by the client thread
    new -> index of newest (last) message in the queue
    """
    if new >= old:
        # normal case
        if last >= old and last < new:
            return (last - old + 1)
        else:
            return 0
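    else:
        # cyclic roll over (new < old)
        if last >= old:
            return (last - old + 1)
        elif 0 <= last < new:
            # last is past the wrap: skip the (MAX_INDEX - old) messages
            # stored before the wrap, then the (last + 1) already read
            return (MAX_INDEX - old + last + 1)
        else:
            # last == -1 (nothing read yet) or last is no longer in the queue
            return 0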

class MSGQueue(object):
    """
    Manage a queue of messages for Chat, the threads will read and write to
    this object.  This is an implementation of the readers - writers problem
    with a bounded buffer.
    """
    def __init__(self):
        self.msg = []
        self.cyclic_count = cycle(range(MAX_INDEX))
        self.current = -1
        self.readers = 0
        self.writers = 0
        self.mutex1 = threading.Lock()
        self.mutex2 = threading.Lock()
        self.readPending = threading.Lock()
        self.writeBlock = threading.Lock()
        self.readBlock = threading.Lock()

# This is kind of complicated locking stuff. Don't worry about why there
# are so many locks, it just came from a book showing the known solution to
# the readers and writers problem. You may wonder if so many locks and
# semaphores are really needed.  Well, lots of really smart people have
# studied the readers and writers problem and this is what they came up
# with as a solution that works with no deadlocks.  The safe plan for us is
# to just use a known solution.

# The only code I did not take directly from a book is what's inside the
# critical section for the reader and writer. The messages are kept in a
# list.  Each list item is a tuple containing an index number, a time stamp and
# the message.  Each thread calls the reader on a regular basis to check if
# there are new messages that it has not yet sent to its client.
# To keep the list from growing without bound, if it is at the MAX_LEN size,
# the oldest item is removed when a new item is added.

# The basic idea of the readers and writers algorithm is to use locks to
# quickly see how many other readers and writers there are.  writeBlock is the
# only lock held while reading the data.  Thus the reader only prevents writers
# from entering the critical section.  Both writeBlock and readBlock are held
# while writing to the data.  Thus the writer blocks readers and other writers.
# So, multiple readers or only one writer are allowed to access the data at a
# given time.  Multiple readers are allowed because the reader does not modify
# the data.  But since the writer does change the data, we can only allow one
# writer access to the critical section at a time.  We also don't want a reader
# in the critical section while a writer is there because the writer could mess
# up the reader by changing the data in the middle of it being read.


    def reader(self, lastread):
        self.readPending.acquire()
        self.readBlock.acquire()
        self.mutex1.acquire()
        self.readers = self.readers + 1
        if self.readers == 1: self.writeBlock.acquire()
        self.mutex1.release()
        self.readBlock.release()
        self.readPending.release()
        # here is the critical section
        if lastread == self.current: # or not len(self.msg):
            retVal = None
        else:
            MsgIndex = mesg_index(self.msg[0][0], lastread, self.current)
            retVal = self.msg[MsgIndex:]
        # End of critical section
        self.mutex1.acquire()
        self.readers = self.readers - 1
        if self.readers == 0: self.writeBlock.release()
        self.mutex1.release()
        return retVal

    def writer(self, data):
        self.mutex2.acquire()
        self.writers = self.writers + 1
        if self.writers == 1: self.readBlock.acquire()
        self.mutex2.release()
        self.writeBlock.acquire()
        # here is the critical section
        self.current = next(self.cyclic_count)  # next() works on 2.6+ and 3
        self.msg.append((self.current, time.localtime(), data))
        while len(self.msg) > MAX_LEN:
            del self.msg[0]     # remove oldest item
        # End of critical section
        self.writeBlock.release()
        self.mutex2.acquire()
        self.writers = self.writers - 1
        if self.writers == 0: self.readBlock.release()
        self.mutex2.release()

def sendAll(sock, lastread):
    # this function just cuts down on some code duplication
    global chatQueue
    reading = chatQueue.reader(lastread)
    if reading is None: return lastread
    for (last, timeStmp, msg) in reading:
        # sendall() keeps writing until the whole message has been sent
        sock.sendall("At %s -- %s" % (time.asctime(timeStmp), msg))
    return last

def clientExit(sock, peer, error=None):
    # this function just cuts down on some code duplication
    global chatQueue
    print "A disconnect by " + peer
    if error:
        msg = peer + " has exited -- " + error + "\r\n"
    else:
        msg = peer + " has exited\r\n"
    chatQueue.writer(msg)

def handlechild(clientsock):
    # Do the sending and receiving of data for one client
    global chatQueue
    # A lastread of -1 gets all available messages on first read, even
    # if the message index has cycled back to zero.
    lastread = -1
    # the identity of each user is called peer - they are the peer on the other
    # end of the socket connection. 
    peer = clientsock.getpeername()
    print "Got connection from ", peer
    msg = str(peer) + " has joined\r\n"
    chatQueue.writer(msg)
    while 1:
        # check for and send any new messages
        lastread = sendAll(clientsock, lastread)
        try:
            data = clientsock.recv(4096)
        except socket.timeout:
            continue
        except socket.error:
            # caused by main thread doing a socket.close on this socket
            # It is a race condition if this exception is raised or not.
            print "Server shutdown"
            return
        except:  # some error or connection reset by peer
            clientExit(clientsock, str(peer))
            break
        if not len(data): # a disconnect (socket.close() by client)
            clientExit(clientsock, str(peer))
            break

        # Process the message received from the client
        # First check if it is one of the special chat protocol messages.
        if data.startswith('/nick'):
            oldpeer = peer
            peer = data.replace('/nick', '', 1).strip()
            if len(peer):
                chatQueue.writer("%s now goes by %s\r\n" \
                                % (str(oldpeer), str(peer)))
            else: peer = oldpeer

        elif data.startswith('/quit'):
            bye = data.replace('/quit', '', 1).strip()
            if len(bye):
                msg = "%s is leaving now -- %s\r\n" % (str(peer), bye)
            else:
                msg = "%s is leaving now\r\n" % (str(peer))
            chatQueue.writer(msg)
            break            # exit the loop to disconnect

        else:
            # Not a special command, but a chat message
            chatQueue.writer("Message from %s:\r\n\t%s\r\n" \
                                % (str(peer), data))

    #-- End looping for messages from/to the client
    # Close the connection
    clientsock.close()

# Begin the main part of the program
if __name__ == '__main__':
    # One global message queue, which uses the readers and writers
    # synchronization algorithm.
    chatQueue = MSGQueue()
    clients = []

    # Set up the socket.
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    s.bind((host, port))
    s.listen(3)

    while 1:
        print "Waiting for Connections"
        try:
            clientsock, clientaddr = s.accept()
            # set a timeout so it won't block forever on socket.recv().
            # Clients that are not doing anything check for new messages 
            # after each timeout.
            clientsock.settimeout(1)
        except KeyboardInterrupt:
            # shutdown - force the threads to close by closing their socket
            s.close()
            for sock in clients:
                sock.close()
            break
        #except:
        #    traceback.print_exc()
        #    continue

        clients.append(clientsock)
        t = threading.Thread(target = handlechild, args = [clientsock])
        t.setDaemon(1)
        t.start()
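
The core idea described in the locking comments, where the first reader locks writers out and the last reader lets them back in, can be sketched on its own. This is a simplified illustration and not the algorithm from the listing: the `SimpleRWLock` class and its method names are hypothetical, it is written for Python 3, and it leaves out the `readBlock`, `readPending`, and `mutex2` locks, which the listing appears to use so that a waiting writer is not held up by a stream of new readers.

```python
import threading
from contextlib import contextmanager


class SimpleRWLock(object):
    """Many readers or one writer; readers are favoured in this sketch."""

    def __init__(self):
        self._readers = 0
        self._count_lock = threading.Lock()   # protects _readers
        self._write_block = threading.Lock()  # held while reading or writing

    @contextmanager
    def reading(self):
        with self._count_lock:
            self._readers += 1
            if self._readers == 1:
                # the first reader shuts writers out
                self._write_block.acquire()
        try:
            yield
        finally:
            with self._count_lock:
                self._readers -= 1
                if self._readers == 0:
                    # the last reader lets writers back in; a plain Lock
                    # may be released by a thread other than the acquirer
                    self._write_block.release()

    @contextmanager
    def writing(self):
        # a writer needs the data to itself
        with self._write_block:
            yield


messages = []
lock = SimpleRWLock()

with lock.writing():
    messages.append("hello")

with lock.reading():
    snapshot = list(messages)
```

Wrapping the acquire and release steps in context managers means the lock is still released if the critical section raises an exception, which the explicit `acquire()` and `release()` calls in the listing do not guarantee.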



All comments excluding tick-boxed quick-comments

It is very good. It gives me a very clear explanation of how multi-threading works in a network program. Thanks!!!


You never remove the clients from your self.clients once they part. Add this after line 214:

self.clients.remove(clientsock)
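
One way to act on this suggestion is sketched below, under assumptions. In the listing, `clients` is a module-level list (there is no `self.clients`) that is shared by the main thread and every client thread, so removals are safer behind a lock. The `ClientRegistry` class is hypothetical and not part of the original program, and the sketch is written for Python 3.

```python
import threading


class ClientRegistry(object):
    """Thread-safe set of connected client sockets."""

    def __init__(self):
        self._lock = threading.Lock()
        self._socks = set()

    def add(self, sock):
        with self._lock:
            self._socks.add(sock)

    def remove(self, sock):
        with self._lock:
            # discard() does nothing if the socket was already removed
            self._socks.discard(sock)

    def close_all(self):
        # copy under the lock, then close outside it so a slow close()
        # does not block threads that are adding or removing sockets
        with self._lock:
            socks = list(self._socks)
            self._socks.clear()
        for sock in socks:
            sock.close()

    def __len__(self):
        with self._lock:
            return len(self._socks)
```

A handler thread would call `remove(clientsock)` just before closing its socket, and the `KeyboardInterrupt` branch of the main loop would call `close_all()`.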

