move example to different port
[multitaskhttpd.git] / httpd.py
index 6537ee0e6071358815a45bd18765dd269ff50155..d86c3bece9c7e4b661897c216c71ab25d1f3984a 100644 (file)
--- a/httpd.py
+++ b/httpd.py
@@ -1,38 +1,10 @@
 # Copyright (c) 2007-2009, Mamta Singh. All rights reserved. see README for details.
 
 '''
-This is a simple implementation of a Flash RTMP server to accept connections and stream requests. The module is organized as follows:
-1. The FlashServer class is the main class to provide the server abstraction. It uses the multitask module for co-operative multitasking.
-   It also uses the App abstract class to implement the applications.
-2. The Server class implements a simple server to receive new Client connections and inform the FlashServer application. The Client class
-   derived from Protocol implements the RTMP client functions. The Protocol class implements the base RTMP protocol parsing. A Client contains
-   various streams from the client, represented using the Stream class.
-3. The Message, Header and Command represent RTMP message, header and command respectively. The FLV class implements functions to perform read
-   and write of FLV file format.
-
-
-Typically an application can launch this server as follows:
-$ python rtmp.py
-
-To know the command line options use the -h option:
-$ python rtmp.py -h
-
-To start the server with a different directory for recording and playing FLV files from, use the following command.
-$ python rtmp.py -r some-other-directory/
-Note the terminal '/' in the directory name. Without this, it is just used as a prefix in FLV file names.
-
-A test client is available in testClient directory, and can be compiled using Flex Builder. Alternatively, you can use the SWF file to launch
-from testClient/bin-debug after starting the server. Once you have launched the client in the browser, you can connect to
-local host by clicking on 'connect' button. Then click on publish button to publish a stream. Open another browser with
-same URL and first connect then play the same stream name. If everything works fine you should be able to see the video
-from first browser to the second browser. Similar, in the first browser, if you check the record box before publishing,
-it will create a new FLV file for the recorded stream. You can close the publishing stream and play the recorded stream to
-see your recording. Note that due to initial delay in timestamp (in case publish was clicked much later than connect),
-your played video will start appearing after some initial delay.
 
 
 If an application wants to use this module as a library, it can launch the server as follows:
->>> agent = FlashServer()   # a new RTMP server instance
+>>> agent = HTTPServer()   # a new RTMP server instance
 >>> agent.root = 'flvs/'    # set the document root to be 'flvs' directory. Default is current './' directory.
 >>> agent.start()           # start the server
 >>> multitask.run()         # this is needed somewhere in the application to actually start the co-operative multitasking.
@@ -63,6 +35,7 @@ throw an exception and display the error message.
 import os, sys, time, struct, socket, traceback, multitask
 import threading, Queue
 import uuid
+import select
 from string import strip
 
 from BaseHTTPServer import BaseHTTPRequestHandler
@@ -101,19 +74,57 @@ class MultitaskHTTPRequestHandler(BaseHTTPRequestHandler):
             val = v.OutputString()
             self.send_header("Set-Cookie", val)
 
+
+def process_cookies(headers, remote, cookie_key="Cookie", add_sess=True):
+    ch = headers.getheaders(cookie_key)
+    if _debug:
+        print "messageReceived cookieheaders=", '; '.join(ch)
+    res = []
+    for c in ch:
+        c = c.split(";")
+        if len(c) == 0:
+            continue
+        c = map(strip, c)
+        c = filter(lambda x: x, c)
+        res += c
+    has_sess = False
+    response_cookies = SimpleCookie()
+    for c in res:
+        if _debug:
+            print "found cookie", repr(c)
+        name, value = c.split("=")
+        response_cookies[name] = value
+        #response_cookies[name]['path'] = "/"
+        #response_cookies[name]['domain'] = remote[0]
+        #response_cookies[name]['version'] = 0
+        if name == "session":
+            response_cookies[name]['expires'] = 50000
+            has_sess = True
+    if not add_sess:
+        return response_cookies
+    if not has_sess:
+        response_cookies['session'] = uuid.uuid4().hex
+        response_cookies['session']['expires'] = 50000
+        #response_cookies['session']['path'] = '/'
+        #response_cookies['session']['domain'] = remote[0]
+        #response_cookies['session']['version'] = 0
+    return response_cookies
+
+
 class ConnectionClosed:
     'raised when the client closed the connection'
 
+
 class SockStream(object):
     '''A class that represents a socket as a stream'''
     def __init__(self, sock):
         self.sock, self.buffer = sock, ''
         self.bytesWritten = self.bytesRead = 0
-    
+
     def close(self):
         self.sock.shutdown(1) # can't just close it.
         self.sock = None
-        
+
     def readline(self):
         try:
             while True:
@@ -128,7 +139,7 @@ class SockStream(object):
                 self.buffer += data
         except StopIteration: raise
         except: raise ConnectionClosed # anything else is treated as connection closed.
-        
+
     def read(self, count):
         try:
             while True:
@@ -142,10 +153,10 @@ class SockStream(object):
                 self.buffer += data
         except StopIteration: raise
         except: raise ConnectionClosed # anything else is treated as connection closed.
-        
+
     def unread(self, data):
         self.buffer = data + self.buffer
-            
+
     def write(self, data):
         while len(data) > 0: # write in 4K chunks each time
             chunk, data = data[:4096], data[4096:]
@@ -156,109 +167,44 @@ class SockStream(object):
         if _debug: print 'socket.written'
 
 
-class Header(object):
-    FULL, MESSAGE, TIME, SEPARATOR, MASK = 0x00, 0x40, 0x80, 0xC0, 0xC0
-    
-    def __init__(self, channel=0, time=0, size=None, type=None, streamId=0):
-        self.channel, self.time, self.size, self.type, self.streamId = channel, time, size, type, streamId
-        self.extendedtime = 0
-        if channel<64: self.hdrdata = chr(channel)
-        elif channel<(64+256): self.hdrdata = '\x00'+chr(channel-64)
-        else: self.hdrdata = '\x01'+chr((channel-64)%256)+chr((channel-64)/256) 
-    
-    def _appendExtendedTimestamp(self, data):
-        if self.time == 0xFFFFFF:
-            data += struct.pack('>I', self.extendedtime)
-        return data
-                    
-    def toBytes(self, control):
-        data = chr(ord(self.hdrdata[0]) | control) + self.hdrdata[1:]
-        if control == Header.SEPARATOR: return self._appendExtendedTimestamp(data)
-        
-        data += struct.pack('>I', self.time & 0xFFFFFF)[1:]  # add time
-        if control == Header.TIME: return self._appendExtendedTimestamp(data)
-        
-        data += struct.pack('>I', self.size)[1:]  # size
-        data += chr(self.type)                    # type
-        if control == Header.MESSAGE: return self._appendExtendedTimestamp(data)
-        
-        data += struct.pack('<I', self.streamId)  # add streamId
-        return self._appendExtendedTimestamp(data)
-
-    def __repr__(self):
-        return ("<Header channel=%r time=%r size=%r type=%r (0x%02x) streamId=%r>"
-            % (self.channel, self.time, self.size, self.type, self.type or 0, self.streamId))
-
-class Message(object):
-    # message types: RPC3, DATA3,and SHAREDOBJECT3 are used with AMF3
-    RPC,  RPC3, DATA, DATA3, SHAREDOBJ, SHAREDOBJ3, AUDIO, VIDEO, ACK,  CHUNK_SIZE = \
-    0x14, 0x11, 0x12, 0x0F,  0x13,      0x10,       0x08,  0x09,  0x03, 0x01
-    
-    def __init__(self, hdr=None, data=''):
-        if hdr is None: hdr = Header()
-        self.header, self.data = hdr, data
-    
-    # define properties type, streamId and time to access self.header.(property)
-    for p in ['type', 'streamId', 'time']:
-        exec 'def _g%s(self): return self.header.%s'%(p, p)
-        exec 'def _s%s(self, %s): self.header.%s = %s'%(p, p, p, p)
-        exec '%s = property(fget=_g%s, fset=_s%s)'%(p, p, p)
-    @property
-    def size(self): return len(self.data)
-            
-    def __repr__(self):
-        return ("<Message header=%r data=%r>"% (self.header, self.data))
-                
 class Protocol(object):
-    # constants
-    PING_SIZE           = 1536
-    DEFAULT_CHUNK_SIZE  = 128
-    MIN_CHANNEL_ID      = 3
-    PROTOCOL_CHANNEL_ID = 2
-    
+
     def __init__(self, sock):
         self.stream = SockStream(sock)
-        self.lastReadHeaders = dict() # indexed by channelId
-        self.incompletePackets = dict() #indexed by channelId
-        self.readChunkSize = self.writeChunkSize = Protocol.DEFAULT_CHUNK_SIZE
-        self.lastWriteHeaders = dict() # indexed by streamId
-        self.nextChannelId = Protocol.MIN_CHANNEL_ID
         self.writeLock = threading.Lock()
         self.writeQueue = Queue.Queue()
-            
+
     def messageReceived(self, msg):
         yield
-            
-    def protocolMessage(self, msg):
-        if msg.type == Message.ACK: # respond to ACK requests
-            response = Message()
-            response.type, response.data = msg.type, msg.data
-            self.writeMessage(response)
-        elif msg.type == Message.CHUNK_SIZE:
-            self.readChunkSize = struct.unpack('>L', msg.data)[0]
-            
+
     def connectionClosed(self):
         yield
-                            
+
     def parse(self):
         try:
             yield self.parseRequests()   # parse http requests
         except ConnectionClosed:
-            yield self.connectionClosed()
+            #yield self.connectionClosed()
             if _debug: print 'parse connection closed'
-                    
+            #yield self.server.queue.put((self, None)) # close connection
+
+    def removeConnection(self):
+        yield self.server.queue.put((self, None)) # close connection
+
     def writeMessage(self, message):
-        self.writeQueue.put(message)
-            
-    def parseHandshake(self):
-        '''Parses the rtmp handshake'''
-        data = (yield self.stream.read(Protocol.PING_SIZE + 1)) # bound version and first ping
-        yield self.stream.write(data)
-        data = (yield self.stream.read(Protocol.PING_SIZE)) # bound second ping
-        yield self.stream.write(data)
-    
+        try:
+            yield self.stream.write(message)
+        except ConnectionClosed:
+            yield self.connectionClosed()
+        except:
+            print traceback.print_exc()
+        #self.writeQueue.put(message)
+
     def parseRequests(self):
         '''Parses complete messages until connection closed. Raises ConnectionLost exception.'''
+
+        if _debug:
+            print "parseRequests start", repr(self)
         self.hr = MultitaskHTTPRequestHandler(self.stream, self.remote, None)
         self.hr.close_connection = 1
         self.cookies = CookieJar()
@@ -268,11 +214,21 @@ class Protocol(object):
             # prepare reading the request so that when it's handed
             # over to "standard" HTTPRequestHandler, the data's already
             # there.
-            print "parseRequests"
+            if _debug:
+                print "parseRequests"
+            try:
+                readok = (yield multitask.readable(self.stream.sock, 5000))
+            except select.error:
+                print "select error: connection closed"
+                raise ConnectionClosed
+
+            if _debug:
+                print "readok", readok
+                print
             raw_requestline = (yield self.stream.readline())
             if _debug: print "parseRequests, line", raw_requestline
             if not raw_requestline:
-                raise ConnectionClosed 
+                raise ConnectionClosed
             data = ''
             try:
                 while 1:
@@ -288,19 +244,31 @@ class Protocol(object):
                     print 'parseRequests', \
                           (traceback and traceback.print_exc() or None)
 
-                raise ConnectionClosed 
+                raise ConnectionClosed
 
             self.hr.raw_requestline = raw_requestline
+            #pos = self.hr.rfile.tell()
+            pos = 0
+            self.hr.rfile.truncate(0)
             self.hr.rfile.write(data)
-            print "parseRequests write after"
-            self.hr.rfile.seek(0)
-            print "parseRequests seek after"
+            if _debug:
+                print "parseRequests write after"
+            self.hr.rfile.seek(pos)
+            if _debug:
+                print "parseRequests seek after"
 
             if not self.hr.parse_request():
-                raise ConnectionClosed 
-            print "parseRequests parse_req after"
+                raise ConnectionClosed
+            if _debug:
+                print "parseRequests parse_req after"
+                print "parseRequest headers", repr(self.hr.headers), str(self.hr.headers)
             try:
                 yield self.messageReceived(self.hr)
+            except ConnectionClosed:
+                if _debug:
+                    print 'parseRequests, ConnectionClosed '
+                    raise StopIteration
+
             except:
                 if _debug:
                     print 'messageReceived', \
@@ -308,191 +276,91 @@ class Protocol(object):
 
     def write(self):
         '''Writes messages to stream'''
+        print "starting protocol write loop", repr(self)
         while True:
             while self.writeQueue.empty(): (yield multitask.sleep(0.01))
             data = self.writeQueue.get() # TODO this should be used using multitask.Queue and remove previous wait.
             if _debug: print "write to stream", repr(data)
-            if data is None: 
+            if data is None:
                 # just in case TCP socket is not closed, close it.
                 try:
-                    print "stream closing"
-                    print self.stream
+                    if _debug:
+                        print "stream closing"
+                        print self.stream
                     yield self.stream.close()
-                    print "stream closed"
+                    if _debug:
+                        print "stream closed"
                 except: pass
                 break
-            
+
             try:
                 yield self.stream.write(data)
             except ConnectionClosed:
                 yield self.connectionClosed()
             except:
                 print traceback.print_exc()
+        print "ending protocol write loop", repr(self)
 
-class Command(object):
-    ''' Class for command / data messages'''
-    def __init__(self, type=Message.RPC, name=None, id=None, cmdData=None, args=[]):
-        '''Create a new command with given type, name, id, cmdData and args list.'''
-        self.type, self.name, self.id, self.cmdData, self.args = type, name, id, cmdData, args[:]
-        
-    def __repr__(self):
-        return ("<Command type=%r name=%r id=%r data=%r args=%r>" % (self.type, self.name, self.id, self.cmdData, self.args))
-    
-    def setArg(self, arg):
-        self.args.append(arg)
-    
-    def getArg(self, index):
-        return self.args[index]
-    
-    @classmethod
-    def fromMessage(cls, message):
-        ''' initialize from a parsed RTMP message'''
-        assert (message.type in [Message.RPC, Message.RPC3, Message.DATA, Message.DATA3])
-
-        length = len(message.data)
-        if length == 0: raise ValueError('zero length message data')
-        
-        if message.type == Message.RPC3 or message.type == Message.DATA3:
-            assert message.data[0] == '\x00' # must be 0 in AMD3
-            data = message.data[1:]
-        else:
-            data = message.data
-        
-        amfReader = amf.AMF0(data)
-
-        inst = cls()
-        inst.name = amfReader.read() # first field is command name
-
-        try:
-            if message.type == Message.RPC:
-                inst.id = amfReader.read() # second field *may* be message id
-                inst.cmdData = amfReader.read() # third is command data
-            else:
-                inst.id = 0
-            inst.args = [] # others are optional
-            while True:
-                inst.args.append(amfReader.read())
-        except EOFError:
-            pass
-        return inst
-    
-    def toMessage(self):
-        msg = Message()
-        assert self.type
-        msg.type = self.type
-        output = amf.BytesIO()
-        amfWriter = amf.AMF0(output)
-        amfWriter.write(self.name)
-        if msg.type == Message.RPC or msg.type == Message.RPC3:
-            amfWriter.write(self.id)
-            amfWriter.write(self.cmdData)
-        for arg in self.args:
-            amfWriter.write(arg)
-        output.seek(0)
-        #hexdump.hexdump(output)
-        #output.seek(0)
-        if msg.type == Message.RPC3 or msg.type == Message.DATA3:
-            data = '\x00' + output.read()
-        else:
-            data = output.read()
-        msg.data = data
-        output.close()
-        return msg
-
-def getfilename(path, name, root):
-    '''return the file name for the given stream. The name is derived as root/scope/name.flv where scope is
-    the the path present in the path variable.'''
-    ignore, ignore, scope = path.partition('/')
-    if scope: scope = scope + '/'
-    result = root + scope + name + '.flv'
-    if _debug: print 'filename=', result
-    return result
 
 class Stream(object):
     '''The stream object that is used for RTMP stream.'''
-    count = 0;
+    count = 0
     def __init__(self, client):
         self.client, self.id, self.name = client, 0, ''
         self.recordfile = self.playfile = None # so that it doesn't complain about missing attribute
         self.queue = multitask.Queue()
         self._name = 'Stream[' + str(Stream.count) + ']'; Stream.count += 1
         if _debug: print self, 'created'
-        
+
     def close(self):
         if _debug: print self, 'closing'
         if self.recordfile is not None: self.recordfile.close(); self.recordfile = None
         if self.playfile is not None: self.playfile.close(); self.playfile = None
         self.client = None # to clear the reference
         pass
-    
+
     def __repr__(self):
-        return self._name;
-    
+        return self._name
+
     def recv(self):
         '''Generator to receive new Message on this stream, or None if stream is closed.'''
         return self.queue.get()
-    
-    def send(self, msg):
-        '''Method to send a Message or Command on this stream.'''
-        if isinstance(msg, Command):
-            msg = msg.toMessage()
-        msg.streamId = self.id
-        # if _debug: print self,'send'
-        if self.client is not None: self.client.writeMessage(msg)
-        
+
+
 class Client(Protocol):
     '''The client object represents a single connected client to the server.'''
     def __init__(self, sock, server, remote):
         Protocol.__init__(self, sock)
         self.server = server
         self.remote = remote
-        self.agent = {}
         self.streams = {}
-        self._nextCallId = 2
-        self._nextStreamId = 1
-        self.objectEncoding = 0.0
         self.queue = multitask.Queue() # receive queue used by application
         multitask.add(self.parse())
-        multitask.add(self.write())
+        #multitask.add(self.write())
 
     def recv(self):
         '''Generator to receive new Message (msg, arg) on this stream, or (None,None) if stream is closed.'''
         return self.queue.get()
-    
+
     def connectionClosed(self):
         '''Called when the client drops the connection'''
         if _debug: 'Client.connectionClosed'
-        self.writeMessage(None)
-        yield self.queue.put((None,None))
-            
+        #self.writeMessage(None)
+        #yield self.queue.put((None,None))
+        if self.stream.sock:
+            yield self.stream.close()
+        else:
+            yield None
+
     def messageReceived(self, msg):
         if _debug: print 'messageReceived cmd=', msg.command, msg.path
-        ch = msg.headers.getheaders("Cookie")
-        print "messageReceived cookieheaders=", '; '.join(ch)
-        res = []
-        for c in ch:
-            c = c.split(";")
-            c = map(strip, c)
-            res += c
-        has_sess = False
-        msg.response_cookies = SimpleCookie()
-        for c in res:
-            print "found cookie", str(c)
-            name, value = c.split("=")
-            msg.response_cookies[name] = value
-            msg.response_cookies[name]['path'] = "/"
-            msg.response_cookies[name]['domain'] = self.remote[0]
-            #msg.response_cookies[name]['expires'] = 'None'
-            msg.response_cookies[name]['version'] = 0
-            if name == "session":
-                has_sess = True
-        if not has_sess:
-            msg.response_cookies['session'] = uuid.uuid4().hex
-            #msg.response_cookies['session']['expires'] = 'None'
-            msg.response_cookies['session']['path'] = '/'
-            msg.response_cookies['session']['domain'] = self.remote[0]
-            msg.response_cookies['session']['version'] = 0
+        msg.response_cookies = process_cookies(msg.headers, self.remote)
 
+        # slightly bad, this: read everything, put it into memory,
+        # but it helps to jump-start the project and enables use of
+        # standard http://python.org BaseHTTPRequestHandler.
+        # modification of mimetools.Message (actually rfc822) is
+        # otherwise required.
         if msg.headers.has_key('content-length'):
             max_chunk_size = 10*1024*1024
             size_remaining = int(msg.headers["content-length"])
@@ -509,40 +377,11 @@ class Client(Protocol):
 
         yield self.server.queue.put((self, msg)) # new connection
 
-    def accept(self):
-        '''Method to accept an incoming client.'''
-        response = Command()
-        response.id, response.name, response.type = 1, '_result', Message.RPC
-        if _debug: print 'Client.accept() objectEncoding=', self.objectEncoding
-        response.setArg(dict(level='status', code='NetConnection.Connect.Success',
-                        description='Connection succeeded.', details=None,
-                        objectEncoding=self.objectEncoding))
-        self.writeMessage(response.toMessage())
-            
     def rejectConnection(self, reason=''):
-        '''Method to reject an incoming client.'''
-        response = Command()
-        response.id, response.name, response.type = 1, '_error', Message.RPC
-        response.setArg(dict(level='status', code='NetConnection.Connect.Rejected',
-                        description=reason, details=None))
-        self.writeMessage(response.toMessage())
-            
-    def call(self, method, *args):
-        '''Call a (callback) method on the client.'''
-        cmd = Command()
-        cmd.id, cmd.name, cmd.type = self._nextCallId, method, (self.objectEncoding == 0.0 and Message.RPC or Message.RPC3)
-        cmd.args, cmd.cmdData = args, None
-        self._nextCallId += 1
-        if _debug: print 'Client.call method=', method, 'args=', args, ' msg=', cmd.toMessage()
-        self.writeMessage(cmd.toMessage())
-            
-    def createStream(self):
-        ''' Create a stream on the server side'''
-        stream = Stream(self)
-        stream.id = self._nextStreamId
-        self.streams[stream.id] = stream
-        self._nextStreamId += 1
-        return stream
+        '''Method to reject an incoming client.  just close.
+           TODO: report back an HTTP error with "reason" in it.
+        '''
+        self.removeConnection()
 
 
 class Server(object):
@@ -558,21 +397,21 @@ class Server(object):
         '''Generator to wait for incoming client connections on this server and return
         (client, args) or (None, None) if the socket is closed or some error.'''
         return self.queue.get()
-        
+
     def run(self):
         try:
             while True:
                 sock, remote = (yield multitask.accept(self.sock))  # receive client TCP
                 if sock == None:
-                    if _debug: print 'rtmp.Server accept(sock) returned None.' 
+                    if _debug: print 'rtmp.Server accept(sock) returned None.'
                     break
                 if _debug: print 'connection received from', remote
                 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) # make it non-block
                 client = Client(sock, self, remote)
-        except: 
+        except:
             if _debug: print 'rtmp.Server exception ', (sys and sys.exc_info() or None)
             traceback.print_exc()
-        
+
         if (self.sock):
             try: self.sock.close(); self.sock = None
             except: pass
@@ -593,19 +432,21 @@ class BaseApp(object):
     def clients(self):
         '''everytime this property is accessed it returns a new list of clients connected to this instance.'''
         return self._clients[1:] if self._clients is not None else []
-    
+
+
 class App(BaseApp, SimpleAppHTTPRequestHandler):
     pass
-    
+
+
 class HTTPServer(object):
-    '''A RTMP server to record and stream Flash video.'''
+    '''A RTMP server to record and stream HTTP.'''
     def __init__(self):
         '''Construct a new HttpServer. It initializes the local members.'''
         self.sock = self.server = None
         self.apps = dict({'*': App}) # supported applications: * means any as in {'*': App}
         self.clients = dict()  # list of clients indexed by scope. First item in list is app instance.
         self.root = ''
-        
+
     def start(self, host='0.0.0.0', port=8080):
         '''This should be used to start listening for RTMP connections on the given port, which defaults to 8080.'''
         if _debug: print 'start', host, port
@@ -617,14 +458,14 @@ class HTTPServer(object):
             sock.listen(5)
             server = self.server = Server(sock) # start rtmp server on that socket
             multitask.add(self.serverlistener())
-    
+
     def stop(self):
-        if _debug: print 'stopping Flash server'
+        if _debug: print 'stopping HTTP server'
         if self.server and self.sock:
             try: self.sock.close(); self.sock = None
             except: pass
         self.server = None
-        
+
     def serverlistener(self):
         '''Server listener (generator). It accepts all connections and invokes client listener'''
         try:
@@ -634,41 +475,49 @@ class HTTPServer(object):
                 if not client:                # if the server aborted abnormally,
                     break                     #    hence close the listener.
                 if _debug: print 'client connection received', client, msg
-                # if client.objectEncoding != 0 and client.objectEncoding != 3:
-                if client.objectEncoding != 0:
-                    yield client.rejectConnection(reason='Unsupported encoding ' + str(client.objectEncoding) + '. Please use NetConnection.defaultObjectEncoding=ObjectEncoding.AMF0')
+
+                if msg is None:
                     yield client.connectionClosed()
-                else:
+                    session = client.session
+                    del self.clients[session]
+                    continue
+
+                if _debug:
                     print "cookies", str(msg.response_cookies)
-                    session = msg.response_cookies['session'].value
-                    name = msg.path
+                session = msg.response_cookies['session'].value
+                client.session = session
+                name = msg.path
+                if _debug:
                     print "serverlistener", name
-                    if '*' not in self.apps and name not in self.apps:
-                        yield client.rejectConnection(reason='Application not found: ' + name)
-                    else: # create application instance as needed and add in our list
-                        if _debug: print 'name=', name, 'name in apps', str(name in self.apps)
-                        app = self.apps[name] if name in self.apps else self.apps['*'] # application class
+                if '*' not in self.apps and name not in self.apps:
+                    yield client.rejectConnection(reason='Application not found: ' + name)
+                else: # create application instance as needed and add in our list
+                    if _debug: print 'name=', name, 'name in apps', str(name in self.apps)
+                    app = self.apps[name] if name in self.apps else self.apps['*'] # application class
+                    if _debug:
                         print "clients", self.clients.keys()
-                        if session in self.clients:
-                            inst = self.clients[session][0]
-                        else:
-                            inst = app()
-                        msg.server = inst # whew! just in time!
-                        try: 
-                            methodname = "on%s" % msg.command
+                    if session in self.clients:
+                        inst = self.clients[session][0]
+                    else:
+                        inst = app()
+                    self.clients[session] = [inst]; inst._clients=self.clients[session]
+                    msg.server = inst # whew! just in time!
+                    try:
+                        methodname = "on%s" % msg.command
+                        if _debug:
                             print methodname, dir(inst)
-                            method = getattr(inst, methodname, None)
-                            result = method(client, msg)
-                            close_connection = msg.close_connection
+                        method = getattr(inst, methodname, None)
+                        yield method(client, msg)
+                        result = None
+                        close_connection = msg.close_connection
+                        if _debug:
                             print "close connection", close_connection
-                        except: 
-                            if _debug: traceback.print_exc()
-                            yield client.rejectConnection(reason='Exception on %s' % methodname)
-                            continue
-                        if result is True or result is None:
-                            if session not in self.clients: 
-                                self.clients[session] = [inst]; inst._clients=self.clients[session]
-                            self.clients[session].append(client)
+                    except:
+                        if _debug: traceback.print_exc()
+                        yield client.rejectConnection(reason='Exception on %s' % methodname)
+                        continue
+                    if result is True or result is None:
+                        if result is None:
                             msg.wfile.seek(0)
                             data = msg.wfile.read()
                             msg.wfile.seek(0)
@@ -679,174 +528,22 @@ class HTTPServer(object):
                                     print 'close_connection requested'
                                 try:
                                     yield client.connectionClosed()
-                                except ClientClosed:
+                                    raise ConnectionClosed
+                                except ConnectionClosed:
                                     if _debug:
                                         print 'close_connection done'
                                     pass
-                        else: 
-                            yield client.rejectConnection(reason='Rejected in onConnect')
+                    else:
+                        if _debug:
+                            print "result", result
+                        yield client.rejectConnection(reason='Rejected in onConnect')
         except StopIteration: raise
-        except: 
+        except:
             if _debug:
                 print 'serverlistener exception', \
                 (sys and sys.exc_info() or None)
                 traceback.print_exc()
-            
-    def clientlistener(self, client):
-        '''Client listener (generator). It receives a command and invokes client handler, or receives a new stream and invokes streamlistener.'''
-        try:
-            while True:
-                msg, arg = (yield client.recv())   # receive new message from client
-                if not msg:                   # if the client disconnected,
-                    if _debug: print 'connection closed from client'
-                    break                     #    come out of listening loop.
-                if msg == 'command':          # handle a new command
-                    multitask.add(self.clienthandler(client, arg))
-                elif msg == 'stream':         # a new stream is created, handle the stream.
-                    arg.client = client
-                    multitask.add(self.streamlistener(arg))
-        except StopIteration: raise
-        except:
-            if _debug: print 'clientlistener exception', (sys and sys.exc_info() or None)
-            traceback.print_exc()
-            
-        # client is disconnected, clear our state for application instance.
-        if _debug: print 'cleaning up client', client.path
-        inst = None
-        if client.path in self.clients:
-            inst = self.clients[client.path][0]
-            self.clients[client.path].remove(client)
-        for stream in client.streams.values(): # for all streams of this client
-            self.closehandler(stream)
-        client.streams.clear() # and clear the collection of streams
-        if client.path in self.clients and len(self.clients[client.path]) == 1: # no more clients left, delete the instance.
-            if _debug: print 'removing the application instance'
-            inst = self.clients[client.path][0]
-            inst._clients = None
-            del self.clients[client.path]
-        if inst is not None: inst.onDisconnect(client)
-        
-    def closehandler(self, stream):
-        '''A stream is closed explicitly when a closeStream command is received from given client.'''
-        if stream.client is not None:
-            inst = self.clients[stream.client.path][0]
-            if stream.name in inst.publishers and inst.publishers[stream.name] == stream: # clear the published stream
-                inst.onClose(stream.client, stream)
-                del inst.publishers[stream.name]
-            if stream.name in inst.players and stream in inst.players[stream.name]:
-                inst.onStop(stream.client, stream)
-                inst.players[stream.name].remove(stream)
-                if len(inst.players[stream.name]) == 0:
-                    del inst.players[stream.name]
-            stream.close()
-        
-    def clienthandler(self, client, cmd):
-        '''A generator to handle a single command on the client.'''
-        inst = self.clients[client.path][0]
-        if inst:
-            if cmd.name == '_error':
-                if hasattr(inst, 'onStatus'):
-                    result = inst.onStatus(client, cmd.args[0])
-            elif cmd.name == '_result':
-                if hasattr(inst, 'onResult'):
-                    result = inst.onResult(client, cmd.args[0])
-            else:
-                res, code, args = Command(), '_result', dict()
-                try: result = inst.onCommand(client, cmd.name, *cmd.args)
-                except:
-                    if _debug: print 'Client.call exception', (sys and sys.exc_info() or None) 
-                    code, args = '_error', dict()
-                res.id, res.name, res.type = cmd.id, code, (client.objectEncoding == 0.0 and Message.RPC or Message.RPC3)
-                res.args, res.cmdData = args, None
-                if _debug: print 'Client.call method=', code, 'args=', args, ' msg=', res.toMessage()
-                client.writeMessage(res.toMessage())
-                # TODO return result to caller
-        yield
-        
-    def streamlistener(self, stream):
-        '''Stream listener (generator). It receives stream message and invokes streamhandler.'''
-        stream.recordfile = None # so that it doesn't complain about missing attribute
-        while True:
-            msg = (yield stream.recv())
-            if not msg:
-                if _debug: print 'stream closed'
-                self.closehandler(stream)
-                break
-            # if _debug: msg
-            multitask.add(self.streamhandler(stream, msg))
-            
-    def streamhandler(self, stream, message):
-        '''A generator to handle a single message on the stream.'''
-        try:
-            if message.type == Message.RPC:
-                cmd = Command.fromMessage(message)
-                if _debug: print 'streamhandler received cmd=', cmd
-                if cmd.name == 'publish':
-                    yield self.publishhandler(stream, cmd)
-                elif cmd.name == 'play':
-                    yield self.playhandler(stream, cmd)
-                elif cmd.name == 'closeStream':
-                    self.closehandler(stream)
-            else: # audio or video message
-                yield self.mediahandler(stream, message)
-        except GeneratorExit: pass
-        except StopIteration: pass
-        except: 
-            if _debug: print 'exception in streamhandler', (sys and sys.exc_info())
-    
-    def publishhandler(self, stream, cmd):
-        '''A new stream is published. Store the information in the application instance.'''
-        try:
-            stream.mode = 'live' if len(cmd.args) < 2 else cmd.args[1] # live, record, append
-            stream.name = cmd.args[0]
-            if _debug: print 'publishing stream=', stream.name, 'mode=', stream.mode
-            inst = self.clients[stream.client.path][0]
-            if (stream.name in inst.publishers):
-                raise ValueError, 'Stream name already in use'
-            inst.publishers[stream.name] = stream # store the client for publisher
-            result = inst.onPublish(stream.client, stream)
-            
-            if stream.mode == 'record' or stream.mode == 'append':
-                stream.recordfile = FLV().open(getfilename(stream.client.path, stream.name, self.root), stream.mode)
-            response = Command(name='onStatus', id=cmd.id, args=[dict(level='status', code='NetStream.Publish.Start', description='', details=None)])
-            yield stream.send(response)
-        except ValueError, E: # some error occurred. inform the app.
-            if _debug: print 'error in publishing stream', str(E)
-            response = Command(name='onStatus', id=cmd.id, args=[dict(level='error',code='NetStream.Publish.BadName',description=str(E),details=None)])
-            yield stream.send(response)
-
-    def playhandler(self, stream, cmd):
-        '''A new stream is being played. Just updated the players list with this stream.'''
-        inst = self.clients[stream.client.path][0]
-        name = stream.name = cmd.args[0]  # store the stream's name
-        start = cmd.args[1] if len(cmd.args) >= 2 else -2
-        if name not in inst.players:
-            inst.players[name] = [] # initialize the players for this stream name
-        if stream not in inst.players[name]: # store the stream as players of this name
-            inst.players[name].append(stream)
-        path = getfilename(stream.client.path, stream.name, self.root)
-        if os.path.exists(path):
-            stream.playfile = FLV().open(path)
-            multitask.add(stream.playfile.reader(stream))
-        if _debug: print 'playing stream=', name, 'start=', start
-        result = inst.onPlay(stream.client, stream)
-        response = Command(name='onStatus', id=cmd.id, args=[dict(level='status',code='NetStream.Play.Start', description=stream.name, details=None)])
-        yield stream.send(response)
-            
-    def mediahandler(self, stream, message):
-        '''Handle incoming media on the stream, by sending to other stream in this application instance.'''
-        if stream.client is not None:
-            inst = self.clients[stream.client.path][0]
-            result = inst.onPublishData(stream.client, stream, message)
-            if result:
-                client = stream.client
-                for s in (inst.players.get(stream.name, [])):
-                    # if _debug: print 'D', stream.name, s.name
-                    result = inst.onPlayData(s.client, s, message)
-                    if result:
-                        yield s.send(message)
-                if stream.recordfile is not None:
-                    stream.recordfile.write(message)
+
 
 # The main routine to start, run and stop the service
 if __name__ == '__main__':
@@ -858,14 +555,14 @@ if __name__ == '__main__':
     parser.add_option('-r', '--root',    dest='root',    default='./',       help="document root directory. Default './'")
     parser.add_option('-d', '--verbose', dest='verbose', default=False, action='store_true', help='enable debug trace')
     (options, args) = parser.parse_args()
-    
+
     _debug = options.verbose
     try:
         if _debug: print time.asctime(), 'Options - %s:%d' % (options.host, options.port)
         agent = HTTPServer()
         agent.root = options.root
         agent.start(options.host, options.port)
-        if _debug: print time.asctime(), 'Flash Server Starts - %s:%d' % (options.host, options.port)
+        if _debug: print time.asctime(), 'HTTP Server Starts - %s:%d' % (options.host, options.port)
         multitask.run()
     except KeyboardInterrupt:
         print traceback.print_exc()
@@ -873,4 +570,4 @@ if __name__ == '__main__':
     except:
         print "exception"
         print traceback.print_exc()
-    if _debug: print time.asctime(), 'Flash Server Stops'
+    if _debug: print time.asctime(), 'HTTP Server Stops'