multitasking web server
authorLuke Kenneth Casson Leighton <lkcl@lkcl.net>
Mon, 12 Jul 2010 19:13:25 +0000 (20:13 +0100)
committerLuke Kenneth Casson Leighton <lkcl@lkcl.net>
Mon, 12 Jul 2010 19:13:25 +0000 (20:13 +0100)
SimpleAppHTTPServer.py [new file with mode: 0644]
httpd.py [new file with mode: 0644]
multitask.py [new file with mode: 0644]

diff --git a/SimpleAppHTTPServer.py b/SimpleAppHTTPServer.py
new file mode 100644 (file)
index 0000000..a615e2b
--- /dev/null
@@ -0,0 +1,222 @@
+"""Simple HTTP Server.
+
+This module builds on BaseHTTPServer by implementing the standard GET
+and HEAD requests in a fairly straightforward manner.
+
+"""
+
+
+__version__ = "0.6"
+
+__all__ = ["SimpleHTTPRequestHandler"]
+
+import os
+import posixpath
+import BaseHTTPServer
+import urllib
+import urlparse
+import cgi
+import shutil
+import mimetypes
+try:
+    from cStringIO import StringIO
+except ImportError:
+    from StringIO import StringIO
+
+
+class SimpleAppHTTPRequestHandler(object):
+
+    """Simple HTTP request handler with GET and HEAD commands.
+
+    This serves files from the current directory and any of its
+    subdirectories.  The MIME type for files is determined by
+    calling the .guess_type() method.
+
+    The GET and HEAD requests are identical except that the HEAD
+    request omits the actual contents of the file.
+
+    """
+
+    server_version = "SimpleHTTP/" + __version__
+
+    def onGET(self, client, *args):
+        """Serve a GET request."""
+        self.client = client
+        hr = args[0]
+        self.path = hr.path
+        ka = hr.request_version == "HTTP/1.1"
+        f = self.send_head(hr, ka)
+        if f:
+            self.copyfile(f, hr.wfile)
+            f.close()
+
+    def onHEAD(self):
+        self.client = client
+        """Serve a HEAD request."""
+        hr = args[0]
+        f = self.send_head(hr, False)
+        if f:
+            f.close()
+
+    def send_head(self, hr, ka):
+        """Common code for GET and HEAD commands.
+
+        This sends the response code and MIME headers.
+
+        Return value is either a file object (which has to be copied
+        to the outputfile by the caller unless the command was HEAD,
+        and must be closed by the caller under all circumstances), or
+        None, in which case the caller has nothing further to do.
+
+        """
+        path = self.translate_path(self.path)
+        f = None
+        if os.path.isdir(path):
+            if not self.path.endswith('/'):
+                # redirect browser - doing basically what apache does
+                hr.send_response(301)
+                hr.send_header("Location", self.path + "/")
+                hr.end_headers()
+                return None
+            for index in "index.html", "index.htm":
+                index = os.path.join(path, index)
+                if os.path.exists(index):
+                    path = index
+                    break
+            else:
+                return self.list_directory(hr, path, ka)
+        ctype = self.guess_type(path)
+        if ctype.startswith('text/'):
+            mode = 'r'
+        else:
+            mode = 'rb'
+        try:
+            f = open(path, mode)
+        except IOError:
+            hr.send_error(404, "File not found")
+            return None
+        hr.send_response(200)
+        hr.send_header("Content-type", ctype)
+        fs = os.fstat(f.fileno())
+        hr.send_header("Content-Length", str(fs[6]))
+        hr.send_header("Last-Modified", hr.date_time_string(fs.st_mtime))
+        if ka:
+            hr.send_header("Connection", "keep-alive")
+        self.client.cookies.add_cookie_header(hr)
+        hr.end_headers()
+        return f
+
+    def list_directory(self, hr, path, ka):
+        """Helper to produce a directory listing (absent index.html).
+
+        Return value is either a file object, or None (indicating an
+        error).  In either case, the headers are sent, making the
+        interface the same as for send_head().
+
+        """
+        try:
+            list = os.listdir(path)
+        except os.error:
+            self.send_error(404, "No permission to list directory")
+            return None
+        list.sort(key=lambda a: a.lower())
+        f = StringIO()
+        displaypath = cgi.escape(urllib.unquote(self.path))
+        f.write("<title>Directory listing for %s</title>\n" % displaypath)
+        f.write("<h2>Directory listing for %s</h2>\n" % displaypath)
+        f.write("<hr>\n<ul>\n")
+        for name in list:
+            fullname = os.path.join(path, name)
+            displayname = linkname = name
+            # Append / for directories or @ for symbolic links
+            if os.path.isdir(fullname):
+                displayname = name + "/"
+                linkname = name + "/"
+            if os.path.islink(fullname):
+                displayname = name + "@"
+                # Note: a link to a directory displays with @ and links with /
+            f.write('<li><a href="%s">%s</a>\n'
+                    % (urllib.quote(linkname), cgi.escape(displayname)))
+        f.write("</ul>\n<hr>\n")
+        length = f.tell()
+        f.seek(0)
+        hr.send_response(200)
+        hr.send_header("Content-type", "text/html")
+        hr.send_header("Content-Length", str(length))
+        if ka:
+            hr.send_header("Connection", "keep-alive")
+        self.client.cookies.add_cookie_header(hr)
+        hr.end_headers()
+        return f
+
+    def translate_path(self, path):
+        """Translate a /-separated PATH to the local filename syntax.
+
+        Components that mean special things to the local file system
+        (e.g. drive or directory names) are ignored.  (XXX They should
+        probably be diagnosed.)
+
+        """
+        # abandon query parameters
+        path = urlparse.urlparse(path)[2]
+        path = posixpath.normpath(urllib.unquote(path))
+        words = path.split('/')
+        words = filter(None, words)
+        path = os.getcwd()
+        for word in words:
+            drive, word = os.path.splitdrive(word)
+            head, word = os.path.split(word)
+            if word in (os.curdir, os.pardir): continue
+            path = os.path.join(path, word)
+        return path
+
+    def copyfile(self, source, outputfile):
+        """Copy all data between two file objects.
+
+        The SOURCE argument is a file object open for reading
+        (or anything with a read() method) and the DESTINATION
+        argument is a file object open for writing (or
+        anything with a write() method).
+
+        The only reason for overriding this would be to change
+        the block size or perhaps to replace newlines by CRLF
+        -- note however that this the default server uses this
+        to copy binary data as well.
+
+        """
+        shutil.copyfileobj(source, outputfile)
+
+    def guess_type(self, path):
+        """Guess the type of a file.
+
+        Argument is a PATH (a filename).
+
+        Return value is a string of the form type/subtype,
+        usable for a MIME Content-type header.
+
+        The default implementation looks the file's extension
+        up in the table self.extensions_map, using application/octet-stream
+        as a default; however it would be permissible (if
+        slow) to look inside the data to make a better guess.
+
+        """
+
+        base, ext = posixpath.splitext(path)
+        if ext in self.extensions_map:
+            return self.extensions_map[ext]
+        ext = ext.lower()
+        if ext in self.extensions_map:
+            return self.extensions_map[ext]
+        else:
+            return self.extensions_map['']
+
+    if not mimetypes.inited:
+        mimetypes.init() # try to read system mime.types
+    extensions_map = mimetypes.types_map.copy()
+    extensions_map.update({
+        '': 'application/octet-stream', # Default
+        '.py': 'text/plain',
+        '.c': 'text/plain',
+        '.h': 'text/plain',
+        })
+
diff --git a/httpd.py b/httpd.py
new file mode 100644 (file)
index 0000000..a7cd598
--- /dev/null
+++ b/httpd.py
@@ -0,0 +1,850 @@
+# Copyright (c) 2007-2009, Mamta Singh. All rights reserved. see README for details.
+
+'''
+This is a simple implementation of a Flash RTMP server to accept connections and stream requests. The module is organized as follows:
+1. The FlashServer class is the main class to provide the server abstraction. It uses the multitask module for co-operative multitasking.
+   It also uses the App abstract class to implement the applications.
+2. The Server class implements a simple server to receive new Client connections and inform the FlashServer application. The Client class
+   derived from Protocol implements the RTMP client functions. The Protocol class implements the base RTMP protocol parsing. A Client contains
+   various streams from the client, represented using the Stream class.
+3. The Message, Header and Command represent RTMP message, header and command respectively. The FLV class implements functions to perform read
+   and write of FLV file format.
+
+
+Typically an application can launch this server as follows:
+$ python rtmp.py
+
+To know the command line options use the -h option:
+$ python rtmp.py -h
+
+To start the server with a different directory for recording and playing FLV files from, use the following command.
+$ python rtmp.py -r some-other-directory/
+Note the terminal '/' in the directory name. Without this, it is just used as a prefix in FLV file names.
+
+A test client is available in testClient directory, and can be compiled using Flex Builder. Alternatively, you can use the SWF file to launch
+from testClient/bin-debug after starting the server. Once you have launched the client in the browser, you can connect to
+local host by clicking on 'connect' button. Then click on publish button to publish a stream. Open another browser with
+same URL and first connect then play the same stream name. If everything works fine you should be able to see the video
+from first browser to the second browser. Similar, in the first browser, if you check the record box before publishing,
+it will create a new FLV file for the recorded stream. You can close the publishing stream and play the recorded stream to
+see your recording. Note that due to initial delay in timestamp (in case publish was clicked much later than connect),
+your played video will start appearing after some initial delay.
+
+
+If an application wants to use this module as a library, it can launch the server as follows:
+>>> agent = FlashServer()   # a new RTMP server instance
+>>> agent.root = 'flvs/'    # set the document root to be 'flvs' directory. Default is current './' directory.
+>>> agent.start()           # start the server
+>>> multitask.run()         # this is needed somewhere in the application to actually start the co-operative multitasking.
+
+
+If an application wants to specify a different application other than the default App, it can subclass it and supply the application by
+setting the server's apps property. The following example shows how to define "myapp" which invokes a 'connected()' method on client when
+the client connects to the server.
+
+class MyApp(App):         # a new MyApp extends the default App in rtmp module.
+    def __init__(self):   # constructor just invokes base class constructor
+        App.__init__(self)
+    def onConnect(self, client, *args):
+        result = App.onConnect(self, client, *args)   # invoke base class method first
+        def invokeAdded(self, client):                # define a method to invoke 'connected("some-arg")' on Flash client
+            client.call('connected', 'some-arg')
+            yield
+        multitask.add(invokeAdded(self, client))      # need to invoke later so that connection is established before callback
+...
+agent.apps = dict({'myapp': MyApp, 'someapp': MyApp, '*': App})
+
+Now the client can connect to rtmp://server/myapp or rtmp://server/someapp and will get connected to this MyApp application.
+If the client doesn't define "function connected(arg:String):void" in the NetConnection.client object then the server will
+throw an exception and display the error message.
+
+'''
+
+import os, sys, time, struct, socket, traceback, multitask
+import threading, Queue
+import uuid
+
+from BaseHTTPServer import BaseHTTPRequestHandler
+from SimpleAppHTTPServer import SimpleAppHTTPRequestHandler
+from cStringIO import StringIO
+from cookielib import parse_ns_headers, CookieJar
+
+_debug = False
+
+class MultitaskHTTPRequestHandler(BaseHTTPRequestHandler):
+
+    def setup(self):
+        self.connection = self.request
+        self.rfile = StringIO()
+        self.wfile = StringIO()
+
+    def finish(self):
+        pass
+
+    def info(self):
+        return self.headers
+
+    def get_full_url(self):
+        return self.path
+
+    def get_header(self, hdr, default):
+        return self.headers.getheader(hdr, default)
+
+    def has_header(self, hdr):
+        return False
+
+    def is_unverifiable(self):
+        return False
+
+    def add_unredirected_header(self, name, val):
+        if name == 'Cookie':
+            self.send_header("Set-Cookie", val)
+
+class ConnectionClosed:
+    'raised when the client closed the connection'
+
+class SockStream(object):
+    '''A class that represents a socket as a stream'''
+    def __init__(self, sock):
+        self.sock, self.buffer = sock, ''
+        self.bytesWritten = self.bytesRead = 0
+    
+    def close(self):
+        print "sock close"
+        fno = self.sock.fileno()
+        self.sock.close()
+        print "writable?", self.sock, fno
+        yield multitask.writable(fno, timeout=0.1)
+        print "is it?"
+        del self.sock
+        self.sock = None
+        
+    def readline(self):
+        try:
+            while True:
+                nl = self.buffer.find("\n")
+                if nl >= 0: # do not have newline in buffer
+                    data, self.buffer = self.buffer[:nl+1], self.buffer[nl+1:]
+                    raise StopIteration(data)
+                data = (yield multitask.recv(self.sock, 4096)) # read more from socket
+                if not data: raise ConnectionClosed
+                if _debug: print 'socket.read[%d] %r'%(len(data), data)
+                self.bytesRead += len(data)
+                self.buffer += data
+        except StopIteration: raise
+        except: raise ConnectionClosed # anything else is treated as connection closed.
+        
+    def read(self, count):
+        try:
+            while True:
+                if len(self.buffer) >= count: # do not have data in buffer
+                    data, self.buffer = self.buffer[:count], self.buffer[count:]
+                    raise StopIteration(data)
+                data = (yield multitask.recv(self.sock, 4096)) # read more from socket
+                if not data: raise ConnectionClosed
+                # if _debug: print 'socket.read[%d] %r'%(len(data), data)
+                self.bytesRead += len(data)
+                self.buffer += data
+        except StopIteration: raise
+        except: raise ConnectionClosed # anything else is treated as connection closed.
+        
+    def unread(self, data):
+        self.buffer = data + self.buffer
+            
+    def write(self, data):
+        while len(data) > 0: # write in 4K chunks each time
+            chunk, data = data[:4096], data[4096:]
+            self.bytesWritten += len(chunk)
+            if _debug: print 'socket.write[%d] %r'%(len(chunk), chunk)
+            try: yield multitask.send(self.sock, chunk)
+            except: raise ConnectionClosed
+        if _debug: print 'socket.written'
+
+
+class Header(object):
+    FULL, MESSAGE, TIME, SEPARATOR, MASK = 0x00, 0x40, 0x80, 0xC0, 0xC0
+    
+    def __init__(self, channel=0, time=0, size=None, type=None, streamId=0):
+        self.channel, self.time, self.size, self.type, self.streamId = channel, time, size, type, streamId
+        self.extendedtime = 0
+        if channel<64: self.hdrdata = chr(channel)
+        elif channel<(64+256): self.hdrdata = '\x00'+chr(channel-64)
+        else: self.hdrdata = '\x01'+chr((channel-64)%256)+chr((channel-64)/256) 
+    
+    def _appendExtendedTimestamp(self, data):
+        if self.time == 0xFFFFFF:
+            data += struct.pack('>I', self.extendedtime)
+        return data
+                    
+    def toBytes(self, control):
+        data = chr(ord(self.hdrdata[0]) | control) + self.hdrdata[1:]
+        if control == Header.SEPARATOR: return self._appendExtendedTimestamp(data)
+        
+        data += struct.pack('>I', self.time & 0xFFFFFF)[1:]  # add time
+        if control == Header.TIME: return self._appendExtendedTimestamp(data)
+        
+        data += struct.pack('>I', self.size)[1:]  # size
+        data += chr(self.type)                    # type
+        if control == Header.MESSAGE: return self._appendExtendedTimestamp(data)
+        
+        data += struct.pack('<I', self.streamId)  # add streamId
+        return self._appendExtendedTimestamp(data)
+
+    def __repr__(self):
+        return ("<Header channel=%r time=%r size=%r type=%r (0x%02x) streamId=%r>"
+            % (self.channel, self.time, self.size, self.type, self.type or 0, self.streamId))
+
+class Message(object):
+    # message types: RPC3, DATA3,and SHAREDOBJECT3 are used with AMF3
+    RPC,  RPC3, DATA, DATA3, SHAREDOBJ, SHAREDOBJ3, AUDIO, VIDEO, ACK,  CHUNK_SIZE = \
+    0x14, 0x11, 0x12, 0x0F,  0x13,      0x10,       0x08,  0x09,  0x03, 0x01
+    
+    def __init__(self, hdr=None, data=''):
+        if hdr is None: hdr = Header()
+        self.header, self.data = hdr, data
+    
+    # define properties type, streamId and time to access self.header.(property)
+    for p in ['type', 'streamId', 'time']:
+        exec 'def _g%s(self): return self.header.%s'%(p, p)
+        exec 'def _s%s(self, %s): self.header.%s = %s'%(p, p, p, p)
+        exec '%s = property(fget=_g%s, fset=_s%s)'%(p, p, p)
+    @property
+    def size(self): return len(self.data)
+            
+    def __repr__(self):
+        return ("<Message header=%r data=%r>"% (self.header, self.data))
+                
+class Protocol(object):
+    # constants
+    PING_SIZE           = 1536
+    DEFAULT_CHUNK_SIZE  = 128
+    MIN_CHANNEL_ID      = 3
+    PROTOCOL_CHANNEL_ID = 2
+    
+    def __init__(self, sock):
+        self.stream = SockStream(sock)
+        self.lastReadHeaders = dict() # indexed by channelId
+        self.incompletePackets = dict() #indexed by channelId
+        self.readChunkSize = self.writeChunkSize = Protocol.DEFAULT_CHUNK_SIZE
+        self.lastWriteHeaders = dict() # indexed by streamId
+        self.nextChannelId = Protocol.MIN_CHANNEL_ID
+        self.writeLock = threading.Lock()
+        self.writeQueue = Queue.Queue()
+            
+    def messageReceived(self, msg):
+        yield
+            
+    def protocolMessage(self, msg):
+        if msg.type == Message.ACK: # respond to ACK requests
+            response = Message()
+            response.type, response.data = msg.type, msg.data
+            self.writeMessage(response)
+        elif msg.type == Message.CHUNK_SIZE:
+            self.readChunkSize = struct.unpack('>L', msg.data)[0]
+            
+    def connectionClosed(self):
+        yield
+                            
+    def parse(self):
+        try:
+            yield self.parseRequests()   # parse http requests
+        except ConnectionClosed:
+            yield self.connectionClosed()
+            if _debug: print 'parse connection closed'
+                    
+    def writeMessage(self, message):
+        self.writeQueue.put(message)
+            
+    def parseHandshake(self):
+        '''Parses the rtmp handshake'''
+        data = (yield self.stream.read(Protocol.PING_SIZE + 1)) # bound version and first ping
+        yield self.stream.write(data)
+        data = (yield self.stream.read(Protocol.PING_SIZE)) # bound second ping
+        yield self.stream.write(data)
+    
+    def parseRequests(self):
+        '''Parses complete messages until connection closed. Raises ConnectionLost exception.'''
+        self.hr = MultitaskHTTPRequestHandler(self.stream, ("",0,), self.remote)
+        self.hr.close_connection = 1
+        self.cookies = CookieJar()
+
+        while True:
+
+            # prepare reading the request so that when it's handed
+            # over to "standard" HTTPRequestHandler, the data's already
+            # there.
+            print "parseRequests"
+            raw_requestline = (yield self.stream.readline())
+            if _debug: print "parseRequests, line", raw_requestline
+            if not raw_requestline:
+                raise ConnectionClosed 
+            data = ''
+            try:
+                while 1:
+                    line = (yield self.stream.readline())
+                    data += line
+                    if line in ['\n', '\r\n']:
+                        break
+            except StopIteration:
+                if _debug: print "parseRequests, stopiter"
+                raise
+            except:
+                if _debug:
+                    print 'parseRequests', \
+                          (traceback and traceback.print_exc() or None)
+
+                raise ConnectionClosed 
+
+            self.hr.raw_requestline = raw_requestline
+            self.hr.rfile.write(data)
+            print "parseRequests write after"
+            self.hr.rfile.seek(0)
+            print "parseRequests seek after"
+
+            if not self.hr.parse_request():
+                raise ConnectionClosed 
+            print "parseRequests parse_req after"
+            try:
+                yield self.messageReceived(self.hr)
+            except:
+                if _debug:
+                    print 'messageReceived', \
+                          (traceback and traceback.print_exc() or None)
+
+    def write(self):
+        '''Writes messages to stream'''
+        while True:
+            while self.writeQueue.empty(): (yield multitask.sleep(0.01))
+            data = self.writeQueue.get() # TODO this should be used using multitask.Queue and remove previous wait.
+            if _debug: print "write to stream", repr(data)
+            if data is None: 
+                # just in case TCP socket is not closed, close it.
+                try:
+                    print "stream closing"
+                    print self.stream
+                    yield self.stream.close()
+                    print "stream closed"
+                except: pass
+                break
+            
+            try:
+                yield self.stream.write(data)
+            except ConnectionClosed:
+                yield self.connectionClosed()
+            except:
+                print traceback.print_exc()
+
+class Command(object):
+    ''' Class for command / data messages'''
+    def __init__(self, type=Message.RPC, name=None, id=None, cmdData=None, args=[]):
+        '''Create a new command with given type, name, id, cmdData and args list.'''
+        self.type, self.name, self.id, self.cmdData, self.args = type, name, id, cmdData, args[:]
+        
+    def __repr__(self):
+        return ("<Command type=%r name=%r id=%r data=%r args=%r>" % (self.type, self.name, self.id, self.cmdData, self.args))
+    
+    def setArg(self, arg):
+        self.args.append(arg)
+    
+    def getArg(self, index):
+        return self.args[index]
+    
+    @classmethod
+    def fromMessage(cls, message):
+        ''' initialize from a parsed RTMP message'''
+        assert (message.type in [Message.RPC, Message.RPC3, Message.DATA, Message.DATA3])
+
+        length = len(message.data)
+        if length == 0: raise ValueError('zero length message data')
+        
+        if message.type == Message.RPC3 or message.type == Message.DATA3:
+            assert message.data[0] == '\x00' # must be 0 in AMD3
+            data = message.data[1:]
+        else:
+            data = message.data
+        
+        amfReader = amf.AMF0(data)
+
+        inst = cls()
+        inst.name = amfReader.read() # first field is command name
+
+        try:
+            if message.type == Message.RPC:
+                inst.id = amfReader.read() # second field *may* be message id
+                inst.cmdData = amfReader.read() # third is command data
+            else:
+                inst.id = 0
+            inst.args = [] # others are optional
+            while True:
+                inst.args.append(amfReader.read())
+        except EOFError:
+            pass
+        return inst
+    
+    def toMessage(self):
+        msg = Message()
+        assert self.type
+        msg.type = self.type
+        output = amf.BytesIO()
+        amfWriter = amf.AMF0(output)
+        amfWriter.write(self.name)
+        if msg.type == Message.RPC or msg.type == Message.RPC3:
+            amfWriter.write(self.id)
+            amfWriter.write(self.cmdData)
+        for arg in self.args:
+            amfWriter.write(arg)
+        output.seek(0)
+        #hexdump.hexdump(output)
+        #output.seek(0)
+        if msg.type == Message.RPC3 or msg.type == Message.DATA3:
+            data = '\x00' + output.read()
+        else:
+            data = output.read()
+        msg.data = data
+        output.close()
+        return msg
+
+def getfilename(path, name, root):
+    '''return the file name for the given stream. The name is derived as root/scope/name.flv where scope is
+    the the path present in the path variable.'''
+    ignore, ignore, scope = path.partition('/')
+    if scope: scope = scope + '/'
+    result = root + scope + name + '.flv'
+    if _debug: print 'filename=', result
+    return result
+
+class Stream(object):
+    '''The stream object that is used for RTMP stream.'''
+    count = 0;
+    def __init__(self, client):
+        self.client, self.id, self.name = client, 0, ''
+        self.recordfile = self.playfile = None # so that it doesn't complain about missing attribute
+        self.queue = multitask.Queue()
+        self._name = 'Stream[' + str(Stream.count) + ']'; Stream.count += 1
+        if _debug: print self, 'created'
+        
+    def close(self):
+        if _debug: print self, 'closing'
+        if self.recordfile is not None: self.recordfile.close(); self.recordfile = None
+        if self.playfile is not None: self.playfile.close(); self.playfile = None
+        self.client = None # to clear the reference
+        pass
+    
+    def __repr__(self):
+        return self._name;
+    
+    def recv(self):
+        '''Generator to receive new Message on this stream, or None if stream is closed.'''
+        return self.queue.get()
+    
+    def send(self, msg):
+        '''Method to send a Message or Command on this stream.'''
+        if isinstance(msg, Command):
+            msg = msg.toMessage()
+        msg.streamId = self.id
+        # if _debug: print self,'send'
+        if self.client is not None: self.client.writeMessage(msg)
+        
+class Client(Protocol):
+    '''The client object represents a single connected client to the server.'''
+    def __init__(self, sock, server, remote):
+        Protocol.__init__(self, sock)
+        self.server = server
+        self.remote = remote
+        self.agent = {}
+        self.streams = {}
+        self._nextCallId = 2
+        self._nextStreamId = 1
+        self.objectEncoding = 0.0
+        self.queue = multitask.Queue() # receive queue used by application
+        multitask.add(self.parse())
+        multitask.add(self.write())
+
+    def recv(self):
+        '''Generator to receive new Message (msg, arg) on this stream, or (None,None) if stream is closed.'''
+        return self.queue.get()
+    
+    def connectionClosed(self):
+        '''Called when the client drops the connection'''
+        if _debug: 'Client.connectionClosed'
+        self.writeMessage(None)
+        yield self.queue.put((None,None))
+            
+    def messageReceived(self, msg):
+        if _debug: print 'messageReceived cmd=', msg.command, msg.path
+        ch = msg.headers.getheaders("Cookie")
+        ch = parse_ns_headers(ch)
+        cookies = self.cookies._cookies_from_attrs_set(ch, msg)
+        has_sess = False
+        for c in cookies:
+            self.cookies.set_cookie(c)
+            if c.name == "session":
+                has_sess = True
+                msg.sess_cookie = c
+        if not has_sess:
+            t = ("session", uuid.uuid4().hex, {}, {})
+            msg.sess_cookie = self.cookies._cookie_from_cookie_tuple(t, msg)
+            self.cookies.set_cookie(msg.sess_cookie)
+        yield self.server.queue.put((self, msg)) # new connection
+
+    def accept(self):
+        '''Method to accept an incoming client.'''
+        response = Command()
+        response.id, response.name, response.type = 1, '_result', Message.RPC
+        if _debug: print 'Client.accept() objectEncoding=', self.objectEncoding
+        response.setArg(dict(level='status', code='NetConnection.Connect.Success',
+                        description='Connection succeeded.', details=None,
+                        objectEncoding=self.objectEncoding))
+        self.writeMessage(response.toMessage())
+            
+    def rejectConnection(self, reason=''):
+        '''Method to reject an incoming client.'''
+        response = Command()
+        response.id, response.name, response.type = 1, '_error', Message.RPC
+        response.setArg(dict(level='status', code='NetConnection.Connect.Rejected',
+                        description=reason, details=None))
+        self.writeMessage(response.toMessage())
+            
+    def call(self, method, *args):
+        '''Call a (callback) method on the client.'''
+        cmd = Command()
+        cmd.id, cmd.name, cmd.type = self._nextCallId, method, (self.objectEncoding == 0.0 and Message.RPC or Message.RPC3)
+        cmd.args, cmd.cmdData = args, None
+        self._nextCallId += 1
+        if _debug: print 'Client.call method=', method, 'args=', args, ' msg=', cmd.toMessage()
+        self.writeMessage(cmd.toMessage())
+            
+    def createStream(self):
+        ''' Create a stream on the server side'''
+        stream = Stream(self)
+        stream.id = self._nextStreamId
+        self.streams[stream.id] = stream
+        self._nextStreamId += 1
+        return stream
+
+
+class Server(object):
+    '''A RTMP server listens for incoming connections and informs the app.'''
+    def __init__(self, sock):
+        '''Create an RTMP server on the given bound TCP socket. The server will terminate
+        when the socket is disconnected, or some other error occurs in listening.'''
+        self.sock = sock
+        self.queue = multitask.Queue()  # queue to receive incoming client connections
+        multitask.add(self.run())
+
+    def recv(self):
+        '''Generator to wait for incoming client connections on this server and return
+        (client, args) or (None, None) if the socket is closed or some error.'''
+        return self.queue.get()
+        
+    def run(self):
+        try:
+            while True:
+                sock, remote = (yield multitask.accept(self.sock))  # receive client TCP
+                if sock == None:
+                    if _debug: print 'rtmp.Server accept(sock) returned None.' 
+                    break
+                if _debug: print 'connection received from', remote
+                sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) # make it non-block
+                client = Client(sock, self, remote)
+        except: 
+            if _debug: print 'rtmp.Server exception ', (sys and sys.exc_info() or None)
+            traceback.print_exc()
+        
+        if (self.sock):
+            try: self.sock.close(); self.sock = None
+            except: pass
+        if (self.queue):
+            yield self.queue.put((None, None))
+            self.queue = None
+
+class App(SimpleAppHTTPRequestHandler):
+    '''An application instance containing any number of streams. Except for constructor all methods are generators.'''
+    count = 0
+    def __init__(self):
+        self.name = str(self.__class__.__name__) + '[' + str(App.count) + ']'; App.count += 1
+        self.players, self.publishers, self._clients = {}, {}, [] # Streams indexed by stream name, and list of clients
+        if _debug: print self.name, 'created'
+    def __del__(self):
+        if _debug: print self.name, 'destroyed'
+    @property
+    def clients(self):
+        '''everytime this property is accessed it returns a new list of clients connected to this instance.'''
+        return self._clients[1:] if self._clients is not None else []
+    def onConnect(self, client, *args):
+        if _debug: print self.name, 'onConnect', client.path
+        return True
+    def onDisconnect(self, client):
+        if _debug: print self.name, 'onDisconnect', client.path
+    
+class HttpServer(object):
+    '''A RTMP server to record and stream Flash video.'''
+    def __init__(self):
+        '''Construct a new HttpServer. It initializes the local members.'''
+        self.sock = self.server = None
+        self.apps = dict({'*': App}) # supported applications: * means any as in {'*': App}
+        self.clients = dict()  # list of clients indexed by scope. First item in list is app instance.
+        self.root = ''
+        
+    def start(self, host='0.0.0.0', port=8080):
+        '''This should be used to start listening for RTMP connections on the given port, which defaults to 8080.'''
+        if _debug: print 'start', host, port
+        if not self.server:
+            sock = self.sock = socket.socket(type=socket.SOCK_STREAM)
+            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+            sock.bind((host, port))
+            if _debug: print 'listening on ', sock.getsockname()
+            sock.listen(5)
+            server = self.server = Server(sock) # start rtmp server on that socket
+            multitask.add(self.serverlistener())
+    
+    def stop(self):
+        if _debug: print 'stopping Flash server'
+        if self.server and self.sock:
+            try: self.sock.close(); self.sock = None
+            except: pass
+        self.server = None
+        
+    def serverlistener(self):
+        '''Server listener (generator). It accepts all connections and invokes client listener'''
+        try:
+            while True:  # main loop to receive new connections on the server
+                client, msg = (yield self.server.recv()) # receive an incoming client connection.
+                # TODO: we should reject non-localhost client connections.
+                if not client:                # if the server aborted abnormally,
+                    break                     #    hence close the listener.
+                if _debug: print 'client connection received', client, args
+                # if client.objectEncoding != 0 and client.objectEncoding != 3:
+                if client.objectEncoding != 0:
+                    yield client.rejectConnection(reason='Unsupported encoding ' + str(client.objectEncoding) + '. Please use NetConnection.defaultObjectEncoding=ObjectEncoding.AMF0')
+                    yield client.connectionClosed()
+                else:
+                    print "cookies", str(client.cookies)
+                    session = msg.sess_cookie.value
+                    name, ignore, scope = msg.path.partition('/')
+                    if '*' not in self.apps and name not in self.apps:
+                        yield client.rejectConnection(reason='Application not found: ' + name)
+                    else: # create application instance as needed and add in our list
+                        if _debug: print 'name=', name, 'name in apps', str(name in self.apps)
+                        app = self.apps[name] if name in self.apps else self.apps['*'] # application class
+                        if session in self.clients: inst = self.clients[session][0]
+                        else: inst = app()
+                        try: 
+                            methodname = "on%s" % msg.command
+                            method =  getattr(inst, methodname, None)
+                            result = method(client, msg)
+                            close_connection = msg.close_connection
+                            print "close connection", close_connection
+                        except: 
+                            if _debug: traceback.print_exc()
+                            yield client.rejectConnection(reason='Exception on %s' % methodname)
+                            continue
+                        if result is True or result is None:
+                            if session not in self.clients: 
+                                self.clients[session] = [inst]; inst._clients=self.clients[session]
+                            self.clients[session].append(client)
+                            msg.wfile.seek(0)
+                            data = msg.wfile.read()
+                            msg.wfile.seek(0)
+                            msg.wfile.truncate()
+                            yield client.writeMessage(data)
+                            if close_connection:
+                                if _debug:
+                                    print 'close_connection requested'
+                                try:
+                                    yield client.connectionClosed()
+                                except ClientClosed:
+                                    if _debug:
+                                        print 'close_connection done'
+                                    pass
+                        else: 
+                            yield client.rejectConnection(reason='Rejected in onConnect')
+        except StopIteration: raise
+        except: 
+            if _debug:
+                print 'serverlistener exception', \
+                (sys and sys.exc_info() or None)
+                traceback.print_exc()
+            
+    def clientlistener(self, client):
+        '''Client listener (generator). It receives a command and invokes client handler, or receives a new stream and invokes streamlistener.'''
+        try:
+            while True:
+                msg, arg = (yield client.recv())   # receive new message from client
+                if not msg:                   # if the client disconnected,
+                    if _debug: print 'connection closed from client'
+                    break                     #    come out of listening loop.
+                if msg == 'command':          # handle a new command
+                    multitask.add(self.clienthandler(client, arg))
+                elif msg == 'stream':         # a new stream is created, handle the stream.
+                    arg.client = client
+                    multitask.add(self.streamlistener(arg))
+        except StopIteration: raise
+        except:
+            if _debug: print 'clientlistener exception', (sys and sys.exc_info() or None)
+            traceback.print_exc()
+            
+        # client is disconnected, clear our state for application instance.
+        if _debug: print 'cleaning up client', client.path
+        inst = None
+        if client.path in self.clients:
+            inst = self.clients[client.path][0]
+            self.clients[client.path].remove(client)
+        for stream in client.streams.values(): # for all streams of this client
+            self.closehandler(stream)
+        client.streams.clear() # and clear the collection of streams
+        if client.path in self.clients and len(self.clients[client.path]) == 1: # no more clients left, delete the instance.
+            if _debug: print 'removing the application instance'
+            inst = self.clients[client.path][0]
+            inst._clients = None
+            del self.clients[client.path]
+        if inst is not None: inst.onDisconnect(client)
+        
+    def closehandler(self, stream):
+        '''A stream is closed explicitly when a closeStream command is received from given client.'''
+        if stream.client is not None:
+            inst = self.clients[stream.client.path][0]
+            if stream.name in inst.publishers and inst.publishers[stream.name] == stream: # clear the published stream
+                inst.onClose(stream.client, stream)
+                del inst.publishers[stream.name]
+            if stream.name in inst.players and stream in inst.players[stream.name]:
+                inst.onStop(stream.client, stream)
+                inst.players[stream.name].remove(stream)
+                if len(inst.players[stream.name]) == 0:
+                    del inst.players[stream.name]
+            stream.close()
+        
+    def clienthandler(self, client, cmd):
+        '''A generator to handle a single command on the client.'''
+        inst = self.clients[client.path][0]
+        if inst:
+            if cmd.name == '_error':
+                if hasattr(inst, 'onStatus'):
+                    result = inst.onStatus(client, cmd.args[0])
+            elif cmd.name == '_result':
+                if hasattr(inst, 'onResult'):
+                    result = inst.onResult(client, cmd.args[0])
+            else:
+                res, code, args = Command(), '_result', dict()
+                try: result = inst.onCommand(client, cmd.name, *cmd.args)
+                except:
+                    if _debug: print 'Client.call exception', (sys and sys.exc_info() or None) 
+                    code, args = '_error', dict()
+                res.id, res.name, res.type = cmd.id, code, (client.objectEncoding == 0.0 and Message.RPC or Message.RPC3)
+                res.args, res.cmdData = args, None
+                if _debug: print 'Client.call method=', code, 'args=', args, ' msg=', res.toMessage()
+                client.writeMessage(res.toMessage())
+                # TODO return result to caller
+        yield
+        
+    def streamlistener(self, stream):
+        '''Stream listener (generator). It receives stream message and invokes streamhandler.'''
+        stream.recordfile = None # so that it doesn't complain about missing attribute
+        while True:
+            msg = (yield stream.recv())
+            if not msg:
+                if _debug: print 'stream closed'
+                self.closehandler(stream)
+                break
+            # if _debug: msg
+            multitask.add(self.streamhandler(stream, msg))
+            
+    def streamhandler(self, stream, message):
+        '''A generator to handle a single message on the stream.'''
+        try:
+            if message.type == Message.RPC:
+                cmd = Command.fromMessage(message)
+                if _debug: print 'streamhandler received cmd=', cmd
+                if cmd.name == 'publish':
+                    yield self.publishhandler(stream, cmd)
+                elif cmd.name == 'play':
+                    yield self.playhandler(stream, cmd)
+                elif cmd.name == 'closeStream':
+                    self.closehandler(stream)
+            else: # audio or video message
+                yield self.mediahandler(stream, message)
+        except GeneratorExit: pass
+        except StopIteration: pass
+        except: 
+            if _debug: print 'exception in streamhandler', (sys and sys.exc_info())
+    
+    def publishhandler(self, stream, cmd):
+        '''A new stream is published. Store the information in the application instance.'''
+        try:
+            stream.mode = 'live' if len(cmd.args) < 2 else cmd.args[1] # live, record, append
+            stream.name = cmd.args[0]
+            if _debug: print 'publishing stream=', stream.name, 'mode=', stream.mode
+            inst = self.clients[stream.client.path][0]
+            if (stream.name in inst.publishers):
+                raise ValueError, 'Stream name already in use'
+            inst.publishers[stream.name] = stream # store the client for publisher
+            result = inst.onPublish(stream.client, stream)
+            
+            if stream.mode == 'record' or stream.mode == 'append':
+                stream.recordfile = FLV().open(getfilename(stream.client.path, stream.name, self.root), stream.mode)
+            response = Command(name='onStatus', id=cmd.id, args=[dict(level='status', code='NetStream.Publish.Start', description='', details=None)])
+            yield stream.send(response)
+        except ValueError, E: # some error occurred. inform the app.
+            if _debug: print 'error in publishing stream', str(E)
+            response = Command(name='onStatus', id=cmd.id, args=[dict(level='error',code='NetStream.Publish.BadName',description=str(E),details=None)])
+            yield stream.send(response)
+
+    def playhandler(self, stream, cmd):
+        '''A new stream is being played. Just updated the players list with this stream.'''
+        inst = self.clients[stream.client.path][0]
+        name = stream.name = cmd.args[0]  # store the stream's name
+        start = cmd.args[1] if len(cmd.args) >= 2 else -2
+        if name not in inst.players:
+            inst.players[name] = [] # initialize the players for this stream name
+        if stream not in inst.players[name]: # store the stream as players of this name
+            inst.players[name].append(stream)
+        path = getfilename(stream.client.path, stream.name, self.root)
+        if os.path.exists(path):
+            stream.playfile = FLV().open(path)
+            multitask.add(stream.playfile.reader(stream))
+        if _debug: print 'playing stream=', name, 'start=', start
+        result = inst.onPlay(stream.client, stream)
+        response = Command(name='onStatus', id=cmd.id, args=[dict(level='status',code='NetStream.Play.Start', description=stream.name, details=None)])
+        yield stream.send(response)
+            
+    def mediahandler(self, stream, message):
+        '''Handle incoming media on the stream, by sending to other stream in this application instance.'''
+        if stream.client is not None:
+            inst = self.clients[stream.client.path][0]
+            result = inst.onPublishData(stream.client, stream, message)
+            if result:
+                client = stream.client
+                for s in (inst.players.get(stream.name, [])):
+                    # if _debug: print 'D', stream.name, s.name
+                    result = inst.onPlayData(s.client, s, message)
+                    if result:
+                        yield s.send(message)
+                if stream.recordfile is not None:
+                    stream.recordfile.write(message)
+
+# The main routine to start, run and stop the service
+if __name__ == '__main__':
+    print "optparse"
+    from optparse import OptionParser
+    parser = OptionParser()
+    parser.add_option('-i', '--host',    dest='host',    default='0.0.0.0', help="listening IP address. Default '0.0.0.0'")
+    parser.add_option('-p', '--port',    dest='port',    default=8080, type="int", help='listening port number. Default 8080')
+    parser.add_option('-r', '--root',    dest='root',    default='./',       help="document root directory. Default './'")
+    parser.add_option('-d', '--verbose', dest='verbose', default=False, action='store_true', help='enable debug trace')
+    (options, args) = parser.parse_args()
+    
+    _debug = options.verbose
+    try:
+        if _debug: print time.asctime(), 'Options - %s:%d' % (options.host, options.port)
+        agent = HttpServer()
+        agent.root = options.root
+        agent.start(options.host, options.port)
+        if _debug: print time.asctime(), 'Flash Server Starts - %s:%d' % (options.host, options.port)
+        multitask.run()
+    except KeyboardInterrupt:
+        print traceback.print_exc()
+        pass
+    except:
+        print "exception"
+        print traceback.print_exc()
+    if _debug: print time.asctime(), 'Flash Server Stops'
diff --git a/multitask.py b/multitask.py
new file mode 100644 (file)
index 0000000..0ccdfd2
--- /dev/null
@@ -0,0 +1,1281 @@
+################################################################################
+#
+# Copyright (c) 2007 Christopher J. Stawarz
+#
+# Permission is hereby granted, free of charge, to any person
+# obtaining a copy of this software and associated documentation files
+# (the "Software"), to deal in the Software without restriction,
+# including without limitation the rights to use, copy, modify, merge,
+# publish, distribute, sublicense, and/or sell copies of the Software,
+# and to permit persons to whom the Software is furnished to do so,
+# subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be
+# included in all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+# NONINFRINGEMENT.  IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
+#
+################################################################################
+
+
+
+"""
+
+Cooperative multitasking and asynchronous I/O using generators
+
+multitask allows Python programs to use generators (a.k.a. coroutines)
+to perform cooperative multitasking and asynchronous I/O.
+Applications written using multitask consist of a set of cooperating
+tasks that yield to a shared task manager whenever they perform a
+(potentially) blocking operation, such as I/O on a socket or getting
+data from a queue.  The task manager temporarily suspends the task
+(allowing other tasks to run in the meantime) and then restarts it
+when the blocking operation is complete.  Such an approach is suitable
+for applications that would otherwise have to use select() and/or
+multiple threads to achieve concurrency.
+
+The functions and classes in the multitask module allow tasks to yield
+for I/O operations on sockets and file descriptors, adding/removing
+data to/from queues, or sleeping for a specified interval.  When
+yielding, a task can also specify a timeout.  If the operation for
+which the task yielded has not completed after the given number of
+seconds, the task is restarted, and a Timeout exception is raised at
+the point of yielding.
+
+As a very simple example, here's how one could use multitask to allow
+two unrelated tasks to run concurrently:
+
+  >>> def printer(message):
+  ...     while True:
+  ...         print message
+  ...         yield
+  ... 
+  >>> multitask.add(printer('hello'))
+  >>> multitask.add(printer('goodbye'))
+  >>> multitask.run()
+  hello
+  goodbye
+  hello
+  goodbye
+  hello
+  goodbye
+  [and so on ...]
+
+For a more useful example, here's how one could implement a
+multitasking server that can handle multiple concurrent client
+connections:
+
+  def listener(sock):
+      while True:
+          conn, address = (yield multitask.accept(sock))
+          multitask.add(client_handler(conn))
+
+  def client_handler(sock):
+      while True:
+          request = (yield multitask.recv(sock, 1024))
+          if not request:
+              break
+          response = handle_request(request)
+          yield multitask.send(sock, response)
+
+  multitask.add(listener(sock))
+  multitask.run()
+
+Tasks can also yield other tasks, which allows for composition of
+tasks and reuse of existing multitasking code.  A child task runs
+until it either completes or raises an exception.  To return output to
+its parent, a child task raises StopIteration, passing the output
+value(s) to the StopIteration constructor.  An unhandled exception
+raised within a child task is propagated to its parent.  For example:
+
+  >>> def parent():
+  ...     print (yield return_none())
+  ...     print (yield return_one())
+  ...     print (yield return_many())
+  ...     try:
+  ...         yield raise_exception()
+  ...     except Exception, e:
+  ...         print 'caught exception: %s' % e
+  ... 
+  >>> def return_none():
+  ...     yield
+  ...     # do nothing
+  ...     # or return
+  ...     # or raise StopIteration
+  ...     # or raise StopIteration(None)
+  ... 
+  >>> def return_one():
+  ...     yield
+  ...     raise StopIteration(1)
+  ... 
+  >>> def return_many():
+  ...     yield
+  ...     raise StopIteration(2, 3)  # or raise StopIteration((2, 3))
+  ... 
+  >>> def raise_exception():
+  ...     yield
+  ...     raise RuntimeError('foo')
+  ... 
+  >>> multitask.add(parent())
+  >>> multitask.run()
+  None
+  1
+  (2, 3)
+  caught exception: foo
+
+"""
+
+
+import collections
+import errno
+from functools import partial
+import heapq
+import os
+import select
+import sys
+import time
+import types
+
+
+__author__   = 'Christopher Stawarz <cstawarz@csail.mit.edu>'
+__version__  = '0.2.0'
+__revision__ = int('$Revision: 5025 $'.split()[1])
+
+
+
+################################################################################
+#
+# Timeout exception type
+#
+################################################################################
+
+
+
+class Timeout(Exception):
+    'Raised in a yielding task when an operation times out'
+    pass
+
+
+
+################################################################################
+#
+# _ChildTask class
+#
+################################################################################
+
+
+
+class _ChildTask(object):
+
+    def __init__(self, parent, task):
+        self.parent = parent
+        self.task = task
+
+    def send(self, value):
+        return self.task.send(value)
+
+    def throw(self, type, value=None, traceback=None):
+        return self.task.throw(type, value, traceback)
+
+
+
+################################################################################
+#
+# YieldCondition class
+#
+################################################################################
+
+
+
+class YieldCondition(object):
+
+    """
+
+    Base class for objects that are yielded by a task to the task
+    manager and specify the condition(s) under which the task should
+    be restarted.  Only subclasses of this class are useful to
+    application code.
+
+    """
+
+    def __init__(self, timeout=None):
+        """
+
+        If timeout is None, the task will be suspended indefinitely
+        until the condition is met.  Otherwise, if the condition is
+        not met within timeout seconds, a Timeout exception will be
+        raised in the yielding task.
+
+        """
+
+        self.task = None
+        self.handle_expiration = None
+
+        if timeout is None:
+            self.expiration = None
+        else:
+            self.expiration = time.time() + float(timeout)
+
+    def _expires(self):
+        return (self.expiration is not None)
+
+
+
+################################################################################
+#
+# _SleepDelay class and related functions
+#
+################################################################################
+
+
+
+class _SleepDelay(YieldCondition):
+
+    def __init__(self, seconds):
+        seconds = float(seconds)
+        if seconds <= 0.0:
+            raise ValueError("'seconds' must be greater than 0")
+        super(_SleepDelay, self).__init__(seconds)
+
+
+def sleep(seconds):
+    """
+
+    A task that yields the result of this function will be resumed
+    after the specified number of seconds have elapsed.  For example:
+
+      while too_early():
+          yield sleep(5)  # Sleep for five seconds
+      do_something()      # Done sleeping; get back to work
+
+    """
+
+    return _SleepDelay(seconds)
+
+
+
+################################################################################
+#
+# FDReady class and related functions
+#
+################################################################################
+
+
+
+class FDReady(YieldCondition):
+
+    """
+
+    A task that yields an instance of this class will be suspended
+    until a specified file descriptor is ready for I/O.
+
+    """
+
+    def __init__(self, fd, read=False, write=False, exc=False, timeout=None):
+        """
+
+        Resume the yielding task when fd is ready for reading,
+        writing, and/or "exceptional" condition handling.  fd can be
+        any object accepted by select.select() (meaning an integer or
+        an object with a fileno() method that returns an integer).
+        Any exception raised by select() due to fd will be re-raised
+        in the yielding task.
+
+        If timeout is not None, a Timeout exception will be raised in
+        the yielding task if fd is not ready after timeout seconds
+        have elapsed.
+
+        """
+
+        super(FDReady, self).__init__(timeout)
+
+        self.fd = (fd if _is_file_descriptor(fd) else fd.fileno())
+
+        if not (read or write or exc):
+            raise ValueError("'read', 'write', and 'exc' cannot all be false")
+        self.read = read
+        self.write = write
+        self.exc = exc
+
+    def fileno(self):
+        'Return the file descriptor on which the yielding task is waiting'
+        return self.fd
+
+    def _add_to_fdsets(self, read_fds, write_fds, exc_fds):
+        for add, fdset in ((self.read, read_fds),
+                           (self.write, write_fds),
+                           (self.exc, exc_fds)):
+            if add:
+                fdset.add(self)
+
+    def _remove_from_fdsets(self, read_fds, write_fds, exc_fds):
+        for fdset in (read_fds, write_fds, exc_fds):
+            fdset.discard(self)
+
+
+def _is_file_descriptor(fd):
+    return isinstance(fd, (int, long))
+
+
+def readable(fd, timeout=None):
+    """
+
+    A task that yields the result of this function will be resumed
+    when fd is readable.  If timeout is not None, a Timeout exception
+    will be raised in the yielding task if fd is not readable after
+    timeout seconds have elapsed.  For example:
+
+      try:
+          yield readable(sock, timeout=5)
+          data = sock.recv(1024)
+      except Timeout:
+          # No data after 5 seconds
+
+    """
+
+    return FDReady(fd, read=True, timeout=timeout)
+
+
+def writable(fd, timeout=None):
+    """
+
+    A task that yields the result of this function will be resumed
+    when fd is writable.  If timeout is not None, a Timeout exception
+    will be raised in the yielding task if fd is not writable after
+    timeout seconds have elapsed.  For example:
+
+      try:
+          yield writable(sock, timeout=5)
+          nsent = sock.send(data)
+      except Timeout:
+          # Can't send after 5 seconds
+
+    """
+
+    return FDReady(fd, write=True, timeout=timeout)
+
+
+
+################################################################################
+#
+# FDAction class and related functions
+#
+################################################################################
+
+
+
+class FDAction(FDReady):
+
+    """
+
+    A task that yields an instance of this class will be suspended
+    until an I/O operation on a specified file descriptor is complete.
+
+    """
+
+    def __init__(self, fd, func, args=(), kwargs={}, read=False, write=False,
+                 exc=False):
+        """
+
+        Resume the yielding task when fd is ready for reading,
+        writing, and/or "exceptional" condition handling.  fd can be
+        any object accepted by select.select() (meaning an integer or
+        an object with a fileno() method that returns an integer).
+        Any exception raised by select() due to fd will be re-raised
+        in the yielding task.
+
+        The value of the yield expression will be the result of
+        calling func with the specified args and kwargs (which
+        presumably performs a read, write, or other I/O operation on
+        fd).  If func raises an exception, it will be re-raised in the
+        yielding task.  Thus, FDAction is really just a convenient
+        subclass of FDReady that requests that the task manager
+        perform an I/O operation on the calling task's behalf.
+
+        If kwargs contains a timeout argument that is not None, a
+        Timeout exception will be raised in the yielding task if fd is
+        not ready after timeout seconds have elapsed.
+
+        """
+
+        timeout = kwargs.pop('timeout', None)
+        super(FDAction, self).__init__(fd, read, write, exc, timeout)
+
+        self.func = func
+        self.args = args
+        self.kwargs = kwargs
+
+    def _eval(self):
+        return self.func(*(self.args), **(self.kwargs))
+
+
+def read(fd, *args, **kwargs):
+    """
+
+    A task that yields the result of this function will be resumed
+    when fd is readable, and the value of the yield expression will be
+    the result of reading from fd.  If a timeout keyword is given and
+    is not None, a Timeout exception will be raised in the yielding
+    task if fd is not readable after timeout seconds have elapsed.
+    Other arguments will be passed to the read function (os.read() if
+    fd is an integer, fd.read() otherwise).  For example:
+
+      try:
+          data = (yield read(fd, 1024, timeout=5))
+      except Timeout:
+          # No data after 5 seconds
+
+    """
+
+    func = (partial(os.read, fd) if _is_file_descriptor(fd) else fd.read)
+    return FDAction(fd, func, args, kwargs, read=True)
+
+
+def readline(fd, *args, **kwargs):
+    """
+
+    A task that yields the result of this function will be resumed
+    when fd is readable, and the value of the yield expression will be
+    the result of reading a line from fd.  If a timeout keyword is
+    given and is not None, a Timeout exception will be raised in the
+    yielding task if fd is not readable after timeout seconds have
+    elapsed.  Other arguments will be passed to fd.readline().  For
+    example:
+
+      try:
+          data = (yield readline(fd, timeout=5))
+      except Timeout:
+          # No data after 5 seconds
+
+    """
+
+    return FDAction(fd, fd.readline, args, kwargs, read=True)
+
+
+def write(fd, *args, **kwargs):
+    """
+
+    A task that yields the result of this function will be resumed
+    when fd is writable, and the value of the yield expression will be
+    the result of writing to fd.  If a timeout keyword is given and is
+    not None, a Timeout exception will be raised in the yielding task
+    if fd is not writable after timeout seconds have elapsed.  Other
+    arguments will be passed to the write function (os.write() if fd
+    is an integer, fd.write() otherwise).  For example:
+
+      try:
+          nbytes = (yield write(fd, data, timeout=5))
+      except Timeout:
+          # Can't write after 5 seconds
+
+    """
+
+    func = (partial(os.write, fd) if _is_file_descriptor(fd) else fd.write)
+    return FDAction(fd, func, args, kwargs, write=True)
+
+
+def accept(sock, *args, **kwargs):
+    """
+
+    A task that yields the result of this function will be resumed
+    when sock is readable, and the value of the yield expression will
+    be the result of accepting a new connection on sock.  If a timeout
+    keyword is given and is not None, a Timeout exception will be
+    raised in the yielding task if sock is not readable after timeout
+    seconds have elapsed.  Other arguments will be passed to
+    sock.accept().  For example:
+
+      try:
+          conn, address = (yield accept(sock, timeout=5))
+      except Timeout:
+          # No connections after 5 seconds
+
+    """
+
+    return FDAction(sock, sock.accept, args, kwargs, read=True)
+
+
+def recv(sock, *args, **kwargs):
+    """
+
+    A task that yields the result of this function will be resumed
+    when sock is readable, and the value of the yield expression will
+    be the result of receiving from sock.  If a timeout keyword is
+    given and is not None, a Timeout exception will be raised in the
+    yielding task if sock is not readable after timeout seconds have
+    elapsed.  Other arguments will be passed to sock.recv().  For
+    example:
+
+      try:
+          data = (yield recv(sock, 1024, timeout=5))
+      except Timeout:
+          # No data after 5 seconds
+
+    """
+
+    return FDAction(sock, sock.recv, args, kwargs, read=True)
+
+
+def recvfrom(sock, *args, **kwargs):
+    """
+
+    A task that yields the result of this function will be resumed
+    when sock is readable, and the value of the yield expression will
+    be the result of receiving from sock.  If a timeout keyword is
+    given and is not None, a Timeout exception will be raised in the
+    yielding task if sock is not readable after timeout seconds have
+    elapsed.  Other arguments will be passed to sock.recvfrom().  For
+    example:
+
+      try:
+          data, address = (yield recvfrom(sock, 1024, timeout=5))
+      except Timeout:
+          # No data after 5 seconds
+
+    """
+
+    return FDAction(sock, sock.recvfrom, args, kwargs, read=True)
+
+
+def send(sock, *args, **kwargs):
+    """
+
+    A task that yields the result of this function will be resumed
+    when sock is writable, and the value of the yield expression will
+    be the result of sending to sock.  If a timeout keyword is given
+    and is not None, a Timeout exception will be raised in the
+    yielding task if sock is not writable after timeout seconds have
+    elapsed.  Other arguments will be passed to the sock.send().  For
+    example:
+
+      try:
+          nsent = (yield send(sock, data, timeout=5))
+      except Timeout:
+          # Can't send after 5 seconds
+
+    """
+
+    return FDAction(sock, sock.send, args, kwargs, write=True)
+
+
+def sendto(sock, *args, **kwargs):
+    """
+
+    A task that yields the result of this function will be resumed
+    when sock is writable, and the value of the yield expression will
+    be the result of sending to sock.  If a timeout keyword is given
+    and is not None, a Timeout exception will be raised in the
+    yielding task if sock is not writable after timeout seconds have
+    elapsed.  Other arguments will be passed to the sock.sendto().
+    For example:
+
+      try:
+          nsent = (yield sendto(sock, data, address, timeout=5))
+      except Timeout:
+          # Can't send after 5 seconds
+
+    """
+
+    return FDAction(sock, sock.sendto, args, kwargs, write=True)
+
+
+
+################################################################################
+#
+# Queue and _QueueAction classes
+#
+################################################################################
+
+
+
+class Queue(object):
+
+    """
+
+    A multi-producer, multi-consumer FIFO queue (similar to
+    Queue.Queue) that can be used for exchanging data between tasks
+
+    """
+
+    def __init__(self, contents=(), maxsize=0):
+        """
+
+        Create a new Queue instance.  contents is a sequence (empty by
+        default) containing the initial contents of the queue.  If
+        maxsize is greater than 0, the queue will hold a maximum of
+        maxsize items, and put() will block until space is available
+        in the queue.
+
+        """
+
+        self.maxsize = int(maxsize)
+        self._queue = collections.deque(contents)
+
+    def __len__(self):
+        'Return the number of items in the queue'
+        return len(self._queue)
+
+    def _get(self):
+        return self._queue.popleft()
+
+    def _put(self, item):
+        self._queue.append(item)
+
+    def empty(self):
+        'Return True is the queue is empty, False otherwise'
+        return (len(self) == 0)
+
+    def full(self):
+        'Return True is the queue is full, False otherwise'
+        return ((len(self) >= self.maxsize) if (self.maxsize > 0) else False)
+
+    def get(self, timeout=None):
+        """
+
+        A task that yields the result of this method will be resumed
+        when an item is available in the queue, and the value of the
+        yield expression will be the item.  If timeout is not None, a
+        Timeout exception will be raised in the yielding task if an
+        item is not available after timeout seconds have elapsed.  For
+        example:
+
+          try:
+              item = (yield queue.get(timeout=5))
+          except Timeout:
+              # No item available after 5 seconds
+
+        """
+
+        return _QueueAction(self, timeout=timeout)
+
+    def put(self, item, timeout=None):
+        """
+
+        A task that yields the result of this method will be resumed
+        when item has been added to the queue.  If timeout is not
+        None, a Timeout exception will be raised in the yielding task
+        if no space is available after timeout seconds have elapsed.
+        For example:
+
+          try:
+              yield queue.put(item, timeout=5)
+          except Timeout:
+              # No space available after 5 seconds
+
+        """
+
+        return _QueueAction(self, item, timeout=timeout)
+
+
+class _QueueAction(YieldCondition):
+
+    NO_ITEM = object()
+
+    def __init__(self, queue, item=NO_ITEM, timeout=None):
+        super(_QueueAction, self).__init__(timeout)
+        if not isinstance(queue, Queue):
+            raise TypeError("'queue' must be a Queue instance")
+        self.queue = queue
+        self.item = item
+
+
+################################################################################
+#
+# SmartQueue and _SmartQueueAction classes
+#
+################################################################################
+
+
+
+class SmartQueue(object):
+
+    """
+
+    A multi-producer, multi-consumer FIFO queue (similar to
+    Queue.Queue) that can be used for exchanging data between tasks.
+    The difference with Queue is that this implements filtering criteria
+    on get and allows multiple get to be signalled for the same put. 
+    On the downside, this uses list instead of deque and has lower
+    performance.
+    
+    """
+
+    def __init__(self, contents=(), maxsize=0):
+        """
+
+        Create a new Queue instance.  contents is a sequence (empty by
+        default) containing the initial contents of the queue.  If
+        maxsize is greater than 0, the queue will hold a maximum of
+        maxsize items, and put() will block until space is available
+        in the queue.
+
+        """
+
+        self.maxsize = int(maxsize)
+        self._pending =  list(contents)
+
+    def __len__(self):
+        'Return the number of items in the queue'
+        return len(self._pending)
+
+    def _get(self, criteria=None):
+        #self._pending = filter(lambda x: x[1]<=now, self._pending) # remove expired ones
+        if criteria:
+            found = filter(lambda x: criteria(x), self._pending)   # check any matching criteria
+            if found: 
+                self._pending.remove(found[0])
+                return found[0]
+            else:
+                return None
+        else:
+            return self._pending.pop(0) if self._pending else None
+
+    def _put(self, item):
+        self._pending.append(item)
+
+    def empty(self):
+        'Return True is the queue is empty, False otherwise'
+        return (len(self) == 0)
+
+    def full(self):
+        'Return True is the queue is full, False otherwise'
+        return ((len(self) >= self.maxsize) if (self.maxsize > 0) else False)
+
+    def get(self, timeout=None, criteria=None):
+        """
+
+        A task that yields the result of this method will be resumed
+        when an item is available in the queue and the item matches the
+        given criteria (a function, usually lambda), and the value of the
+        yield expression will be the item.  If timeout is not None, a
+        Timeout exception will be raised in the yielding task if an
+        item is not available after timeout seconds have elapsed.  For
+        example:
+
+          try:
+              item = (yield queue.get(timeout=5, criteria=lambda x: x.name='kundan'))
+          except Timeout:
+              # No item available after 5 seconds
+
+        """
+
+        return _SmartQueueAction(self, timeout=timeout, criteria=criteria)
+
+    def put(self, item, timeout=None):
+        """
+
+        A task that yields the result of this method will be resumed
+        when item has been added to the queue.  If timeout is not
+        None, a Timeout exception will be raised in the yielding task
+        if no space is available after timeout seconds have elapsed.
+        TODO: Otherwise if space is available, the timeout specifies how 
+        long to keep the item in the queue before discarding it if it
+        is not fetched in a get. In this case it doesnot throw exception. 
+        For example:
+
+          try:
+              yield queue.put(item, timeout=5)
+          except Timeout:
+              # No space available after 5 seconds
+
+        """
+
+        return _SmartQueueAction(self, item, timeout=timeout)
+
+
+class _SmartQueueAction(YieldCondition):
+
+    NO_ITEM = object()
+
+    def __init__(self, queue, item=NO_ITEM, timeout=None, criteria=None):
+        super(_SmartQueueAction, self).__init__(timeout)
+        if not isinstance(queue, SmartQueue):
+            raise TypeError("'queue' must be a SmartQueue instance")
+        self.queue = queue
+        self.item = item
+        self.criteria = criteria
+        self.expires = (timeout is not None) and (time.time() + timeout) or 0
+
+
+################################################################################
+#
+# TaskManager class
+#
+################################################################################
+
+
+
+class TaskManager(object):
+
+    """
+
+    Engine for running a set of cooperatively-multitasking tasks
+    within a single Python thread
+
+    """
+
+    def __init__(self):
+        """
+
+        Create a new TaskManager instance.  Generally, there will only
+        be one of these per Python process.  If you want to run two
+        existing instances simultaneously, merge them first, then run
+        one or the other.
+
+        """
+
+        self._queue       = collections.deque()
+        self._read_waits  = set()
+        self._write_waits = set()
+        self._exc_waits   = set()
+        self._queue_waits = collections.defaultdict(self._double_deque)
+        self._timeouts    = []
+
+    @staticmethod
+    def _double_deque():
+        return (collections.deque(), collections.deque())
+
+    def merge(self, other):
+        """
+
+        Merge this TaskManager with another.  After the merge, the two
+        objects share the same (merged) internal data structures, so
+        either can be used to manage the combined task set.
+
+        """
+
+        if not isinstance(other, TaskManager):
+            raise TypeError("'other' must be a TaskManager instance")
+
+        # Merge the data structures
+        self._queue.extend(other._queue)
+        self._read_waits  |= other._read_waits
+        self._write_waits |= other._write_waits
+        self._exc_waits   |= other._exc_waits
+        self._queue_waits.update(other._queue_waits)
+        self._timeouts.extend(other._timeouts)
+        heapq.heapify(self._timeouts)
+
+        # Make other reference the merged data structures.  This is
+        # necessary because other's tasks may reference and use other
+        # (e.g. to add a new task in response to an event).
+        other._queue       = self._queue
+        other._read_waits  = self._read_waits
+        other._write_waits = self._write_waits
+        other._exc_waits   = self._exc_waits
+        other._queue_waits = self._queue_waits
+        other._timeouts    = self._timeouts
+
+    def add(self, task):
+        'Add a new task (i.e. a generator instance) to the run queue'
+
+        if not isinstance(task, types.GeneratorType):
+            raise TypeError("'task' must be a generator")
+        self._enqueue(task)
+
+    def _enqueue(self, task, input=None, exc_info=()):
+        self._queue.append((task, input, exc_info))
+
+    def run(self):
+        """
+
+        Call run_next() repeatedly until there are no tasks that are
+        currently runnable, waiting for I/O, or waiting to time out.
+        Note that this method can block indefinitely (e.g. if there
+        are only I/O waits and no timeouts).  If this is unacceptable,
+        use run_next() instead.
+
+        """
+        while self.has_runnable() or self.has_io_waits() or self.has_timeouts():
+            self.run_next()
+
+    def has_runnable(self):
+        """
+
+        Return True is there are runnable tasks in the queue, False
+        otherwise
+
+        """
+        return bool(self._queue)
+
+    def has_io_waits(self):
+        """
+
+        Return True is there are tasks waiting for I/O, False
+        otherwise
+
+        """
+        return bool(self._read_waits or self._write_waits or self._exc_waits)
+
+    def has_timeouts(self):
+        """
+
+        Return True is there are tasks with pending timeouts, False
+        otherwise
+
+        """
+        return bool(self._timeouts)
+
+    def run_next(self, timeout=None):
+        """
+
+        Perform one iteration of the run cycle: check whether any
+        pending I/O operations can be performed, check whether any
+        timeouts have expired, then run all currently runnable tasks.
+
+        The timeout argument specifies the maximum time to wait for
+        some task to become runnable.  If timeout is None and there
+        are no currently runnable tasks, but there are tasks waiting
+        to perform I/O or time out, then this method will block until
+        at least one of the waiting tasks becomes runnable.  To
+        prevent this method from blocking indefinitely, use timeout to
+        specify the maximum number of seconds to wait.
+
+        If there are runnable tasks in the queue when run_next() is
+        called, then it will check for I/O readiness using a
+        non-blocking call to select() (i.e. a poll), and only
+        already-expired timeouts will be handled.  This ensures both
+        that the task manager is never idle when tasks can be run and
+        that tasks waiting for I/O never starve.
+
+        """
+
+        if self.has_io_waits():
+            self._handle_io_waits(self._fix_run_timeout(timeout))
+
+        if self.has_timeouts():
+            self._handle_timeouts(self._fix_run_timeout(timeout))
+
+        # Run all tasks currently in the queue
+        for dummy in xrange(len(self._queue)):
+            task, input, exc_info = self._queue.popleft()
+            try:
+                if exc_info:
+                    output = task.throw(*exc_info)
+                else:
+                    output = task.send(input)
+            except StopIteration, e:
+                if isinstance(task, _ChildTask):
+                    if not e.args:
+                        output = None
+                    elif len(e.args) == 1:
+                        output = e.args[0]
+                    else:
+                        output = e.args
+                    self._enqueue(task.parent, input=output)
+            except:
+                if isinstance(task, _ChildTask):
+                    # Propagate exception to parent
+                    self._enqueue(task.parent, exc_info=sys.exc_info())
+                else:
+                    # No parent task, so just die
+                    raise
+            else:
+                self._handle_task_output(task, output)
+
+    def _fix_run_timeout(self, timeout):
+        if self.has_runnable():
+            # Don't block if there are tasks in the queue
+            timeout = 0.0
+        elif self.has_timeouts():
+            # If there are timeouts, block only until the first expiration
+            expiration_timeout = max(0.0, self._timeouts[0][0] - time.time())
+            if (timeout is None) or (timeout > expiration_timeout):
+                timeout = expiration_timeout
+        return timeout
+
+    def _handle_io_waits(self, timeout):
+        # The error handling here is (mostly) borrowed from Twisted
+        try:
+            read_ready, write_ready, exc_ready = \
+                select.select(self._read_waits,
+                              self._write_waits,
+                              self._exc_waits,
+                              timeout)
+        except (TypeError, ValueError):
+            self._remove_bad_file_descriptors()
+        except (select.error, IOError), err:
+            if err[0] == errno.EINTR:
+                pass
+            elif ((err[0] == errno.EBADF) or
+                  ((sys.platform == 'win32') and
+                   (err[0] == 10038))):  # WSAENOTSOCK
+                self._remove_bad_file_descriptors()
+            else:
+                # Not an error we can handle, so die
+                raise
+        else:
+            for fd in set(read_ready + write_ready + exc_ready):
+                try:
+                    input = (fd._eval() if isinstance(fd, FDAction) else None)
+                    self._enqueue(fd.task, input=input)
+                except:
+                    self._enqueue(fd.task, exc_info=sys.exc_info())
+                fd._remove_from_fdsets(self._read_waits,
+                                       self._write_waits,
+                                       self._exc_waits)
+                if fd._expires():
+                    self._remove_timeout(fd)
+
+    def _remove_bad_file_descriptors(self):
+        for fd in (self._read_waits | self._write_waits | self._exc_waits):
+            try:
+                select.select([fd], [fd], [fd], 0.0)
+            except:
+                self._enqueue(fd.task, exc_info=sys.exc_info())
+                fd._remove_from_fdsets(self._read_waits,
+                                       self._write_waits,
+                                       self._exc_waits)
+                if fd._expires():
+                    self._remove_timeout(fd)
+
+    def _add_timeout(self, item, handler):
+        item.handle_expiration = handler
+        heapq.heappush(self._timeouts, (item.expiration, item))
+
+    def _remove_timeout(self, item):
+        self._timeouts.remove((item.expiration, item))
+        heapq.heapify(self._timeouts)
+
+    def _handle_timeouts(self, timeout):
+        if (not self.has_runnable()) and (timeout > 0.0):
+            time.sleep(timeout)
+
+        current_time = time.time()
+
+        while self._timeouts and (self._timeouts[0][0] <= current_time):
+            item = heapq.heappop(self._timeouts)[1]
+            if isinstance(item, _SleepDelay):
+                self._enqueue(item.task)
+            else:
+                self._enqueue(item.task, exc_info=(Timeout,))
+                item.handle_expiration()
+
+    def _handle_task_output(self, task, output):
+        if isinstance(output, types.GeneratorType):
+            self._enqueue(_ChildTask(task, output))
+        elif isinstance(output, YieldCondition):
+            output.task = task
+            if isinstance(output, _SleepDelay):
+                self._add_timeout(output, None)
+            elif isinstance(output, FDReady):
+                self._handle_fdready(task, output)
+            elif isinstance(output, _QueueAction):
+                self._handle_queue_action(task, output)
+            elif isinstance(output, _SmartQueueAction):
+                self._handle_smart_queue_action(task, output)
+        else:
+            # Return any other output as input and send task to
+            # end of queue
+            self._enqueue(task, input=output)
+
+    def _handle_fdready(self, task, output):
+        output._add_to_fdsets(self._read_waits,
+                              self._write_waits,
+                              self._exc_waits)
+        if output._expires():
+            self._add_timeout(output,
+                              (lambda:
+                               output._remove_from_fdsets(self._read_waits,
+                                                          self._write_waits,
+                                                          self._exc_waits)))
+
+    def _handle_queue_action(self, task, output):
+        get_waits, put_waits = self._queue_waits[output.queue]
+
+        if output.item is output.NO_ITEM:
+            # Action is a get
+            if output.queue.empty():
+                get_waits.append(output)
+                if output._expires():
+                    self._add_timeout(output,
+                                      (lambda: get_waits.remove(output)))
+            else:
+                item = output.queue._get()
+                self._enqueue(task, input=item)
+                if put_waits:
+                    action = put_waits.popleft()
+                    output.queue._put(action.item)
+                    self._enqueue(action.task)
+                    if action._expires():
+                        self._remove_timeout(action)
+        else:
+            # Action is a put
+            if output.queue.full():
+                put_waits.append(output)
+                if output._expires():
+                    self._add_timeout(output,
+                                      (lambda: put_waits.remove(output)))
+            else:
+                output.queue._put(output.item)
+                self._enqueue(task)
+                if get_waits:
+                    action = get_waits.popleft()
+                    item = output.queue._get()
+                    self._enqueue(action.task, input=item)
+                    if action._expires():
+                        self._remove_timeout(action)
+
+
+    def _handle_smart_queue_action(self, task, output):
+        get_waits, put_waits = self._queue_waits[output.queue]
+
+        if output.item is output.NO_ITEM:
+            # Action is a get
+            item = output.queue._get(criteria=output.criteria)
+            if item is None:
+                get_waits.append(output)
+                if output._expires():
+                    self._add_timeout(output,
+                                      (lambda: get_waits.remove(output)))
+            else:
+                self._enqueue(task, input=item)
+                if put_waits:
+                    action = put_waits.popleft()
+                    output.queue._put(action.item)
+                    self._enqueue(action.task)
+                    if action._expires():
+                        self._remove_timeout(action)
+        else:
+            # Action is a put
+            if output.queue.full():
+                put_waits.append(output)
+                if output._expires():
+                    self._add_timeout(output,
+                                      (lambda: put_waits.remove(output)))
+            else:
+                output.queue._put(output.item)
+                self._enqueue(task)
+                if get_waits:
+                    actions = []
+                    for action in get_waits:
+                        item = output.queue._get(criteria=action.criteria)
+                        if item is not None:
+                            actions.append((action, item))
+                    for action,item in actions:
+                        get_waits.remove(action)
+                        self._enqueue(action.task, input=item)
+                        if action._expires():
+                            self._remove_timeout(action)
+
+
+
+################################################################################
+#
+# Default TaskManager instance
+#
+################################################################################
+
+
+
+_default_task_manager = None
+
+
+def get_default_task_manager():
+    'Return the default TaskManager instance'
+    global _default_task_manager
+    if _default_task_manager is None:
+        _default_task_manager = TaskManager()
+    return _default_task_manager
+
+
+def add(task):
+    'Add a task to the default TaskManager instance'
+    get_default_task_manager().add(task)
+
+
+def run():
+    'Run the default TaskManager instance'
+    get_default_task_manager().run()
+
+
+
+################################################################################
+#
+# Test routine
+#
+################################################################################
+
+
+
+if __name__ == '__main__':
+    if sys.platform == 'win32':
+        # Make sure WSAStartup() is called
+        import socket
+
+    def printer(name):
+        for i in xrange(1, 4):
+            print '%s:\t%d' % (name, i)
+            yield
+
+    t = TaskManager()
+    t.add(printer('first'))
+    t.add(printer('second'))
+    t.add(printer('third'))
+
+    queue = Queue()
+
+    def receiver():
+        print 'receiver started'
+        print 'receiver received: %s' % (yield queue.get())
+        print 'receiver finished'
+
+    def sender():
+        print 'sender started'
+        yield queue.put('from sender')
+        print 'sender finished'
+
+    def bad_descriptor():
+        print 'bad_descriptor running'
+        try:
+            yield readable(12)
+        except:
+            print 'exception in bad_descriptor:', sys.exc_info()[1]
+
+    def sleeper():
+        print 'sleeper started'
+        yield sleep(1)
+        print 'sleeper finished'
+
+    def timeout_immediately():
+        print 'timeout_immediately running'
+        try:
+            yield Queue().get(timeout=0)
+        except Timeout:
+            print 'timeout_immediately timed out'
+
+    t2 = TaskManager()
+    t2.add(receiver())
+    t2.add(bad_descriptor())
+    t2.add(sender())
+    t2.add(sleeper())
+    t2.add(timeout_immediately())
+
+    def parent():
+        print 'child returned: %s' % ((yield child()),)
+        try:
+            yield child(raise_exc=True)
+        except:
+            print 'exception in child:', sys.exc_info()[1]
+
+    def child(raise_exc=False):
+        yield
+        if raise_exc:
+            raise RuntimeError('foo')
+        raise StopIteration(1, 2, 3)
+
+    t3 = TaskManager()
+    t3.add(parent())
+
+    t.merge(t2)
+    t.merge(t3)
+    t.run()
+
+    assert not(t.has_runnable() or t.has_io_waits() or t.has_timeouts())