From f215afb8490ea00c2d3efe5be30067627ed61bca Mon Sep 17 00:00:00 2001 From: lkcl Date: Thu, 15 Jul 2010 21:06:13 +0100 Subject: [PATCH] whitespace cleanup and remove unneeded code --- httpd.py | 372 ++----------------------------------------------------- 1 file changed, 10 insertions(+), 362 deletions(-) diff --git a/httpd.py b/httpd.py index a5f1dd7..e2dc6fe 100644 --- a/httpd.py +++ b/httpd.py @@ -1,38 +1,10 @@ # Copyright (c) 2007-2009, Mamta Singh. All rights reserved. see README for details. ''' -This is a simple implementation of a Flash RTMP server to accept connections and stream requests. The module is organized as follows: -1. The FlashServer class is the main class to provide the server abstraction. It uses the multitask module for co-operative multitasking. - It also uses the App abstract class to implement the applications. -2. The Server class implements a simple server to receive new Client connections and inform the FlashServer application. The Client class - derived from Protocol implements the RTMP client functions. The Protocol class implements the base RTMP protocol parsing. A Client contains - various streams from the client, represented using the Stream class. -3. The Message, Header and Command represent RTMP message, header and command respectively. The FLV class implements functions to perform read - and write of FLV file format. - - -Typically an application can launch this server as follows: -$ python rtmp.py - -To know the command line options use the -h option: -$ python rtmp.py -h - -To start the server with a different directory for recording and playing FLV files from, use the following command. -$ python rtmp.py -r some-other-directory/ -Note the terminal '/' in the directory name. Without this, it is just used as a prefix in FLV file names. - -A test client is available in testClient directory, and can be compiled using Flex Builder. Alternatively, you can use the SWF file to launch -from testClient/bin-debug after starting the server. Once you have launched the client in the browser, you can connect to -local host by clicking on 'connect' button. Then click on publish button to publish a stream. Open another browser with -same URL and first connect then play the same stream name. If everything works fine you should be able to see the video -from first browser to the second browser. Similar, in the first browser, if you check the record box before publishing, -it will create a new FLV file for the recorded stream. You can close the publishing stream and play the recorded stream to -see your recording. Note that due to initial delay in timestamp (in case publish was clicked much later than connect), -your played video will start appearing after some initial delay. If an application wants to use this module as a library, it can launch the server as follows: ->>> agent = FlashServer() # a new RTMP server instance +>>> agent = HTTPServer() # a new RTMP server instance >>> agent.root = 'flvs/' # set the document root to be 'flvs' directory. Default is current './' directory. >>> agent.start() # start the server >>> multitask.run() # this is needed somewhere in the application to actually start the co-operative multitasking. @@ -102,6 +74,7 @@ class MultitaskHTTPRequestHandler(BaseHTTPRequestHandler): val = v.OutputString() self.send_header("Set-Cookie", val) + def process_cookies(headers, remote, cookie_key="Cookie", add_sess=True): ch = headers.getheaders(cookie_key) if _debug: @@ -137,9 +110,11 @@ def process_cookies(headers, remote, cookie_key="Cookie", add_sess=True): #response_cookies['session']['version'] = 0 return response_cookies + class ConnectionClosed: 'raised when the client closed the connection' + class SockStream(object): '''A class that represents a socket as a stream''' def __init__(self, sock): @@ -192,87 +167,16 @@ class SockStream(object): if _debug: print 'socket.written' -class Header(object): - FULL, MESSAGE, TIME, SEPARATOR, MASK = 0x00, 0x40, 0x80, 0xC0, 0xC0 - - def __init__(self, channel=0, time=0, size=None, type=None, streamId=0): - self.channel, self.time, self.size, self.type, self.streamId = channel, time, size, type, streamId - self.extendedtime = 0 - if channel<64: self.hdrdata = chr(channel) - elif channel<(64+256): self.hdrdata = '\x00'+chr(channel-64) - else: self.hdrdata = '\x01'+chr((channel-64)%256)+chr((channel-64)/256) - - def _appendExtendedTimestamp(self, data): - if self.time == 0xFFFFFF: - data += struct.pack('>I', self.extendedtime) - return data - - def toBytes(self, control): - data = chr(ord(self.hdrdata[0]) | control) + self.hdrdata[1:] - if control == Header.SEPARATOR: return self._appendExtendedTimestamp(data) - - data += struct.pack('>I', self.time & 0xFFFFFF)[1:] # add time - if control == Header.TIME: return self._appendExtendedTimestamp(data) - - data += struct.pack('>I', self.size)[1:] # size - data += chr(self.type) # type - if control == Header.MESSAGE: return self._appendExtendedTimestamp(data) - - data += struct.pack('" - % (self.channel, self.time, self.size, self.type, self.type or 0, self.streamId)) - -class Message(object): - # message types: RPC3, DATA3,and SHAREDOBJECT3 are used with AMF3 - RPC, RPC3, DATA, DATA3, SHAREDOBJ, SHAREDOBJ3, AUDIO, VIDEO, ACK, CHUNK_SIZE = \ - 0x14, 0x11, 0x12, 0x0F, 0x13, 0x10, 0x08, 0x09, 0x03, 0x01 - - def __init__(self, hdr=None, data=''): - if hdr is None: hdr = Header() - self.header, self.data = hdr, data - - # define properties type, streamId and time to access self.header.(property) - for p in ['type', 'streamId', 'time']: - exec 'def _g%s(self): return self.header.%s'%(p, p) - exec 'def _s%s(self, %s): self.header.%s = %s'%(p, p, p, p) - exec '%s = property(fget=_g%s, fset=_s%s)'%(p, p, p) - @property - def size(self): return len(self.data) - - def __repr__(self): - return (""% (self.header, self.data)) - class Protocol(object): - # constants - PING_SIZE = 1536 - DEFAULT_CHUNK_SIZE = 128 - MIN_CHANNEL_ID = 3 - PROTOCOL_CHANNEL_ID = 2 def __init__(self, sock): self.stream = SockStream(sock) - self.lastReadHeaders = dict() # indexed by channelId - self.incompletePackets = dict() #indexed by channelId - self.readChunkSize = self.writeChunkSize = Protocol.DEFAULT_CHUNK_SIZE - self.lastWriteHeaders = dict() # indexed by streamId - self.nextChannelId = Protocol.MIN_CHANNEL_ID self.writeLock = threading.Lock() self.writeQueue = Queue.Queue() def messageReceived(self, msg): yield - def protocolMessage(self, msg): - if msg.type == Message.ACK: # respond to ACK requests - response = Message() - response.type, response.data = msg.type, msg.data - self.writeMessage(response) - elif msg.type == Message.CHUNK_SIZE: - self.readChunkSize = struct.unpack('>L', msg.data)[0] - def connectionClosed(self): yield @@ -296,13 +200,6 @@ class Protocol(object): print traceback.print_exc() #self.writeQueue.put(message) - def parseHandshake(self): - '''Parses the rtmp handshake''' - data = (yield self.stream.read(Protocol.PING_SIZE + 1)) # bound version and first ping - yield self.stream.write(data) - data = (yield self.stream.read(Protocol.PING_SIZE)) # bound second ping - yield self.stream.write(data) - def parseRequests(self): '''Parses complete messages until connection closed. Raises ConnectionLost exception.''' @@ -404,84 +301,6 @@ class Protocol(object): print traceback.print_exc() print "ending protocol write loop", repr(self) -class Command(object): - ''' Class for command / data messages''' - def __init__(self, type=Message.RPC, name=None, id=None, cmdData=None, args=[]): - '''Create a new command with given type, name, id, cmdData and args list.''' - self.type, self.name, self.id, self.cmdData, self.args = type, name, id, cmdData, args[:] - - def __repr__(self): - return ("" % (self.type, self.name, self.id, self.cmdData, self.args)) - - def setArg(self, arg): - self.args.append(arg) - - def getArg(self, index): - return self.args[index] - - @classmethod - def fromMessage(cls, message): - ''' initialize from a parsed RTMP message''' - assert (message.type in [Message.RPC, Message.RPC3, Message.DATA, Message.DATA3]) - - length = len(message.data) - if length == 0: raise ValueError('zero length message data') - - if message.type == Message.RPC3 or message.type == Message.DATA3: - assert message.data[0] == '\x00' # must be 0 in AMD3 - data = message.data[1:] - else: - data = message.data - - amfReader = amf.AMF0(data) - - inst = cls() - inst.name = amfReader.read() # first field is command name - - try: - if message.type == Message.RPC: - inst.id = amfReader.read() # second field *may* be message id - inst.cmdData = amfReader.read() # third is command data - else: - inst.id = 0 - inst.args = [] # others are optional - while True: - inst.args.append(amfReader.read()) - except EOFError: - pass - return inst - - def toMessage(self): - msg = Message() - assert self.type - msg.type = self.type - output = amf.BytesIO() - amfWriter = amf.AMF0(output) - amfWriter.write(self.name) - if msg.type == Message.RPC or msg.type == Message.RPC3: - amfWriter.write(self.id) - amfWriter.write(self.cmdData) - for arg in self.args: - amfWriter.write(arg) - output.seek(0) - #hexdump.hexdump(output) - #output.seek(0) - if msg.type == Message.RPC3 or msg.type == Message.DATA3: - data = '\x00' + output.read() - else: - data = output.read() - msg.data = data - output.close() - return msg - -def getfilename(path, name, root): - '''return the file name for the given stream. The name is derived as root/scope/name.flv where scope is - the the path present in the path variable.''' - ignore, ignore, scope = path.partition('/') - if scope: scope = scope + '/' - result = root + scope + name + '.flv' - if _debug: print 'filename=', result - return result class Stream(object): '''The stream object that is used for RTMP stream.''' @@ -565,16 +384,6 @@ class Client(Protocol): yield self.server.queue.put((self, msg)) # new connection - def accept(self): - '''Method to accept an incoming client.''' - response = Command() - response.id, response.name, response.type = 1, '_result', Message.RPC - if _debug: print 'Client.accept() objectEncoding=', self.objectEncoding - response.setArg(dict(level='status', code='NetConnection.Connect.Success', - description='Connection succeeded.', details=None, - objectEncoding=self.objectEncoding)) - self.writeMessage(response.toMessage()) - def rejectConnection(self, reason=''): '''Method to reject an incoming client.''' response = Command() @@ -592,14 +401,6 @@ class Client(Protocol): if _debug: print 'Client.call method=', method, 'args=', args, ' msg=', cmd.toMessage() self.writeMessage(cmd.toMessage()) - def createStream(self): - ''' Create a stream on the server side''' - stream = Stream(self) - stream.id = self._nextStreamId - self.streams[stream.id] = stream - self._nextStreamId += 1 - return stream - class Server(object): '''A RTMP server listens for incoming connections and informs the app.''' @@ -650,11 +451,13 @@ class BaseApp(object): '''everytime this property is accessed it returns a new list of clients connected to this instance.''' return self._clients[1:] if self._clients is not None else [] + class App(BaseApp, SimpleAppHTTPRequestHandler): pass + class HTTPServer(object): - '''A RTMP server to record and stream Flash video.''' + '''A RTMP server to record and stream HTTP.''' def __init__(self): '''Construct a new HttpServer. It initializes the local members.''' self.sock = self.server = None @@ -675,7 +478,7 @@ class HTTPServer(object): multitask.add(self.serverlistener()) def stop(self): - if _debug: print 'stopping Flash server' + if _debug: print 'stopping HTTP server' if self.server and self.sock: try: self.sock.close(); self.sock = None except: pass @@ -763,161 +566,6 @@ class HTTPServer(object): (sys and sys.exc_info() or None) traceback.print_exc() - def clientlistener(self, client): - '''Client listener (generator). It receives a command and invokes client handler, or receives a new stream and invokes streamlistener.''' - try: - while True: - msg, arg = (yield client.recv()) # receive new message from client - if not msg: # if the client disconnected, - if _debug: print 'connection closed from client' - break # come out of listening loop. - if msg == 'command': # handle a new command - multitask.add(self.clienthandler(client, arg)) - elif msg == 'stream': # a new stream is created, handle the stream. - arg.client = client - multitask.add(self.streamlistener(arg)) - except StopIteration: raise - except: - if _debug: print 'clientlistener exception', (sys and sys.exc_info() or None) - traceback.print_exc() - - # client is disconnected, clear our state for application instance. - if _debug: print 'cleaning up client', client.path - inst = None - if client.path in self.clients: - inst = self.clients[client.path][0] - self.clients[client.path].remove(client) - for stream in client.streams.values(): # for all streams of this client - self.closehandler(stream) - client.streams.clear() # and clear the collection of streams - if client.path in self.clients and len(self.clients[client.path]) == 1: # no more clients left, delete the instance. - if _debug: print 'removing the application instance' - inst = self.clients[client.path][0] - inst._clients = None - del self.clients[client.path] - if inst is not None: inst.onDisconnect(client) - - def closehandler(self, stream): - '''A stream is closed explicitly when a closeStream command is received from given client.''' - if stream.client is not None: - inst = self.clients[stream.client.path][0] - if stream.name in inst.publishers and inst.publishers[stream.name] == stream: # clear the published stream - inst.onClose(stream.client, stream) - del inst.publishers[stream.name] - if stream.name in inst.players and stream in inst.players[stream.name]: - inst.onStop(stream.client, stream) - inst.players[stream.name].remove(stream) - if len(inst.players[stream.name]) == 0: - del inst.players[stream.name] - stream.close() - - def clienthandler(self, client, cmd): - '''A generator to handle a single command on the client.''' - inst = self.clients[client.path][0] - if inst: - if cmd.name == '_error': - if hasattr(inst, 'onStatus'): - result = inst.onStatus(client, cmd.args[0]) - elif cmd.name == '_result': - if hasattr(inst, 'onResult'): - result = inst.onResult(client, cmd.args[0]) - else: - res, code, args = Command(), '_result', dict() - try: result = inst.onCommand(client, cmd.name, *cmd.args) - except: - if _debug: print 'Client.call exception', (sys and sys.exc_info() or None) - code, args = '_error', dict() - res.id, res.name, res.type = cmd.id, code, (client.objectEncoding == 0.0 and Message.RPC or Message.RPC3) - res.args, res.cmdData = args, None - if _debug: print 'Client.call method=', code, 'args=', args, ' msg=', res.toMessage() - client.writeMessage(res.toMessage()) - # TODO return result to caller - yield - - def streamlistener(self, stream): - '''Stream listener (generator). It receives stream message and invokes streamhandler.''' - stream.recordfile = None # so that it doesn't complain about missing attribute - while True: - msg = (yield stream.recv()) - if not msg: - if _debug: print 'stream closed' - self.closehandler(stream) - break - # if _debug: msg - multitask.add(self.streamhandler(stream, msg)) - - def streamhandler(self, stream, message): - '''A generator to handle a single message on the stream.''' - try: - if message.type == Message.RPC: - cmd = Command.fromMessage(message) - if _debug: print 'streamhandler received cmd=', cmd - if cmd.name == 'publish': - yield self.publishhandler(stream, cmd) - elif cmd.name == 'play': - yield self.playhandler(stream, cmd) - elif cmd.name == 'closeStream': - self.closehandler(stream) - else: # audio or video message - yield self.mediahandler(stream, message) - except GeneratorExit: pass - except StopIteration: pass - except: - if _debug: print 'exception in streamhandler', (sys and sys.exc_info()) - - def publishhandler(self, stream, cmd): - '''A new stream is published. Store the information in the application instance.''' - try: - stream.mode = 'live' if len(cmd.args) < 2 else cmd.args[1] # live, record, append - stream.name = cmd.args[0] - if _debug: print 'publishing stream=', stream.name, 'mode=', stream.mode - inst = self.clients[stream.client.path][0] - if (stream.name in inst.publishers): - raise ValueError, 'Stream name already in use' - inst.publishers[stream.name] = stream # store the client for publisher - result = inst.onPublish(stream.client, stream) - - if stream.mode == 'record' or stream.mode == 'append': - stream.recordfile = FLV().open(getfilename(stream.client.path, stream.name, self.root), stream.mode) - response = Command(name='onStatus', id=cmd.id, args=[dict(level='status', code='NetStream.Publish.Start', description='', details=None)]) - yield stream.send(response) - except ValueError, E: # some error occurred. inform the app. - if _debug: print 'error in publishing stream', str(E) - response = Command(name='onStatus', id=cmd.id, args=[dict(level='error',code='NetStream.Publish.BadName',description=str(E),details=None)]) - yield stream.send(response) - - def playhandler(self, stream, cmd): - '''A new stream is being played. Just updated the players list with this stream.''' - inst = self.clients[stream.client.path][0] - name = stream.name = cmd.args[0] # store the stream's name - start = cmd.args[1] if len(cmd.args) >= 2 else -2 - if name not in inst.players: - inst.players[name] = [] # initialize the players for this stream name - if stream not in inst.players[name]: # store the stream as players of this name - inst.players[name].append(stream) - path = getfilename(stream.client.path, stream.name, self.root) - if os.path.exists(path): - stream.playfile = FLV().open(path) - multitask.add(stream.playfile.reader(stream)) - if _debug: print 'playing stream=', name, 'start=', start - result = inst.onPlay(stream.client, stream) - response = Command(name='onStatus', id=cmd.id, args=[dict(level='status',code='NetStream.Play.Start', description=stream.name, details=None)]) - yield stream.send(response) - - def mediahandler(self, stream, message): - '''Handle incoming media on the stream, by sending to other stream in this application instance.''' - if stream.client is not None: - inst = self.clients[stream.client.path][0] - result = inst.onPublishData(stream.client, stream, message) - if result: - client = stream.client - for s in (inst.players.get(stream.name, [])): - # if _debug: print 'D', stream.name, s.name - result = inst.onPlayData(s.client, s, message) - if result: - yield s.send(message) - if stream.recordfile is not None: - stream.recordfile.write(message) # The main routine to start, run and stop the service if __name__ == '__main__': @@ -936,7 +584,7 @@ if __name__ == '__main__': agent = HTTPServer() agent.root = options.root agent.start(options.host, options.port) - if _debug: print time.asctime(), 'Flash Server Starts - %s:%d' % (options.host, options.port) + if _debug: print time.asctime(), 'HTTP Server Starts - %s:%d' % (options.host, options.port) multitask.run() except KeyboardInterrupt: print traceback.print_exc() @@ -944,4 +592,4 @@ if __name__ == '__main__': except: print "exception" print traceback.print_exc() - if _debug: print time.asctime(), 'Flash Server Stops' + if _debug: print time.asctime(), 'HTTP Server Stops' -- 2.30.2