# Copyright (c) 2007-2009, Mamta Singh. All rights reserved. see README for details.
'''
-This is a simple implementation of a Flash RTMP server to accept connections and stream requests. The module is organized as follows:
-1. The FlashServer class is the main class to provide the server abstraction. It uses the multitask module for co-operative multitasking.
- It also uses the App abstract class to implement the applications.
-2. The Server class implements a simple server to receive new Client connections and inform the FlashServer application. The Client class
- derived from Protocol implements the RTMP client functions. The Protocol class implements the base RTMP protocol parsing. A Client contains
- various streams from the client, represented using the Stream class.
-3. The Message, Header and Command represent RTMP message, header and command respectively. The FLV class implements functions to perform read
- and write of FLV file format.
-
-
-Typically an application can launch this server as follows:
-$ python rtmp.py
-
-To know the command line options use the -h option:
-$ python rtmp.py -h
-
-To start the server with a different directory for recording and playing FLV files from, use the following command.
-$ python rtmp.py -r some-other-directory/
-Note the terminal '/' in the directory name. Without this, it is just used as a prefix in FLV file names.
-
-A test client is available in testClient directory, and can be compiled using Flex Builder. Alternatively, you can use the SWF file to launch
-from testClient/bin-debug after starting the server. Once you have launched the client in the browser, you can connect to
-local host by clicking on 'connect' button. Then click on publish button to publish a stream. Open another browser with
-same URL and first connect then play the same stream name. If everything works fine you should be able to see the video
-from first browser to the second browser. Similar, in the first browser, if you check the record box before publishing,
-it will create a new FLV file for the recorded stream. You can close the publishing stream and play the recorded stream to
-see your recording. Note that due to initial delay in timestamp (in case publish was clicked much later than connect),
-your played video will start appearing after some initial delay.
If an application wants to use this module as a library, it can launch the server as follows:
->>> agent = FlashServer() # a new RTMP server instance
+>>> agent = HTTPServer() # a new RTMP server instance
>>> agent.root = 'flvs/' # set the document root to be 'flvs' directory. Default is current './' directory.
>>> agent.start() # start the server
>>> multitask.run() # this is needed somewhere in the application to actually start the co-operative multitasking.
val = v.OutputString()
self.send_header("Set-Cookie", val)
+
def process_cookies(headers, remote, cookie_key="Cookie", add_sess=True):
ch = headers.getheaders(cookie_key)
- print "messageReceived cookieheaders=", '; '.join(ch)
+ if _debug:
+ print "messageReceived cookieheaders=", '; '.join(ch)
res = []
for c in ch:
c = c.split(";")
has_sess = False
response_cookies = SimpleCookie()
for c in res:
- print "found cookie", repr(c)
+ if _debug:
+ print "found cookie", repr(c)
name, value = c.split("=")
response_cookies[name] = value
#response_cookies[name]['path'] = "/"
#response_cookies['session']['version'] = 0
return response_cookies
+
class ConnectionClosed:
'raised when the client closed the connection'
+
class SockStream(object):
'''A class that represents a socket as a stream'''
def __init__(self, sock):
self.sock, self.buffer = sock, ''
self.bytesWritten = self.bytesRead = 0
-
+
def close(self):
self.sock.shutdown(1) # can't just close it.
self.sock = None
-
+
def readline(self):
try:
while True:
self.buffer += data
except StopIteration: raise
except: raise ConnectionClosed # anything else is treated as connection closed.
-
+
def read(self, count):
try:
while True:
self.buffer += data
except StopIteration: raise
except: raise ConnectionClosed # anything else is treated as connection closed.
-
+
def unread(self, data):
self.buffer = data + self.buffer
-
+
def write(self, data):
while len(data) > 0: # write in 4K chunks each time
chunk, data = data[:4096], data[4096:]
if _debug: print 'socket.written'
-class Header(object):
- FULL, MESSAGE, TIME, SEPARATOR, MASK = 0x00, 0x40, 0x80, 0xC0, 0xC0
-
- def __init__(self, channel=0, time=0, size=None, type=None, streamId=0):
- self.channel, self.time, self.size, self.type, self.streamId = channel, time, size, type, streamId
- self.extendedtime = 0
- if channel<64: self.hdrdata = chr(channel)
- elif channel<(64+256): self.hdrdata = '\x00'+chr(channel-64)
- else: self.hdrdata = '\x01'+chr((channel-64)%256)+chr((channel-64)/256)
-
- def _appendExtendedTimestamp(self, data):
- if self.time == 0xFFFFFF:
- data += struct.pack('>I', self.extendedtime)
- return data
-
- def toBytes(self, control):
- data = chr(ord(self.hdrdata[0]) | control) + self.hdrdata[1:]
- if control == Header.SEPARATOR: return self._appendExtendedTimestamp(data)
-
- data += struct.pack('>I', self.time & 0xFFFFFF)[1:] # add time
- if control == Header.TIME: return self._appendExtendedTimestamp(data)
-
- data += struct.pack('>I', self.size)[1:] # size
- data += chr(self.type) # type
- if control == Header.MESSAGE: return self._appendExtendedTimestamp(data)
-
- data += struct.pack('<I', self.streamId) # add streamId
- return self._appendExtendedTimestamp(data)
-
- def __repr__(self):
- return ("<Header channel=%r time=%r size=%r type=%r (0x%02x) streamId=%r>"
- % (self.channel, self.time, self.size, self.type, self.type or 0, self.streamId))
-
-class Message(object):
- # message types: RPC3, DATA3,and SHAREDOBJECT3 are used with AMF3
- RPC, RPC3, DATA, DATA3, SHAREDOBJ, SHAREDOBJ3, AUDIO, VIDEO, ACK, CHUNK_SIZE = \
- 0x14, 0x11, 0x12, 0x0F, 0x13, 0x10, 0x08, 0x09, 0x03, 0x01
-
- def __init__(self, hdr=None, data=''):
- if hdr is None: hdr = Header()
- self.header, self.data = hdr, data
-
- # define properties type, streamId and time to access self.header.(property)
- for p in ['type', 'streamId', 'time']:
- exec 'def _g%s(self): return self.header.%s'%(p, p)
- exec 'def _s%s(self, %s): self.header.%s = %s'%(p, p, p, p)
- exec '%s = property(fget=_g%s, fset=_s%s)'%(p, p, p)
- @property
- def size(self): return len(self.data)
-
- def __repr__(self):
- return ("<Message header=%r data=%r>"% (self.header, self.data))
-
class Protocol(object):
- # constants
- PING_SIZE = 1536
- DEFAULT_CHUNK_SIZE = 128
- MIN_CHANNEL_ID = 3
- PROTOCOL_CHANNEL_ID = 2
-
+
def __init__(self, sock):
self.stream = SockStream(sock)
- self.lastReadHeaders = dict() # indexed by channelId
- self.incompletePackets = dict() #indexed by channelId
- self.readChunkSize = self.writeChunkSize = Protocol.DEFAULT_CHUNK_SIZE
- self.lastWriteHeaders = dict() # indexed by streamId
- self.nextChannelId = Protocol.MIN_CHANNEL_ID
self.writeLock = threading.Lock()
self.writeQueue = Queue.Queue()
-
+
def messageReceived(self, msg):
yield
-
- def protocolMessage(self, msg):
- if msg.type == Message.ACK: # respond to ACK requests
- response = Message()
- response.type, response.data = msg.type, msg.data
- self.writeMessage(response)
- elif msg.type == Message.CHUNK_SIZE:
- self.readChunkSize = struct.unpack('>L', msg.data)[0]
-
+
def connectionClosed(self):
yield
-
+
def parse(self):
try:
yield self.parseRequests() # parse http requests
#yield self.connectionClosed()
if _debug: print 'parse connection closed'
#yield self.server.queue.put((self, None)) # close connection
-
+
def removeConnection(self):
yield self.server.queue.put((self, None)) # close connection
except:
print traceback.print_exc()
#self.writeQueue.put(message)
-
- def parseHandshake(self):
- '''Parses the rtmp handshake'''
- data = (yield self.stream.read(Protocol.PING_SIZE + 1)) # bound version and first ping
- yield self.stream.write(data)
- data = (yield self.stream.read(Protocol.PING_SIZE)) # bound second ping
- yield self.stream.write(data)
-
+
def parseRequests(self):
'''Parses complete messages until connection closed. Raises ConnectionLost exception.'''
- print "parseRequests start", repr(self)
+ if _debug:
+ print "parseRequests start", repr(self)
self.hr = MultitaskHTTPRequestHandler(self.stream, self.remote, None)
self.hr.close_connection = 1
self.cookies = CookieJar()
# prepare reading the request so that when it's handed
# over to "standard" HTTPRequestHandler, the data's already
# there.
- print "parseRequests"
+ if _debug:
+ print "parseRequests"
try:
readok = (yield multitask.readable(self.stream.sock, 5000))
except select.error:
print "select error: connection closed"
raise ConnectionClosed
- print "readok", readok
- print
+ if _debug:
+ print "readok", readok
+ print
raw_requestline = (yield self.stream.readline())
if _debug: print "parseRequests, line", raw_requestline
if not raw_requestline:
- raise ConnectionClosed
+ raise ConnectionClosed
data = ''
try:
while 1:
print 'parseRequests', \
(traceback and traceback.print_exc() or None)
- raise ConnectionClosed
+ raise ConnectionClosed
self.hr.raw_requestline = raw_requestline
#pos = self.hr.rfile.tell()
pos = 0
self.hr.rfile.truncate(0)
self.hr.rfile.write(data)
- print "parseRequests write after"
+ if _debug:
+ print "parseRequests write after"
self.hr.rfile.seek(pos)
- print "parseRequests seek after"
+ if _debug:
+ print "parseRequests seek after"
if not self.hr.parse_request():
- raise ConnectionClosed
- print "parseRequests parse_req after"
- print "parseRequest headers", repr(self.hr.headers), str(self.hr.headers)
+ raise ConnectionClosed
+ if _debug:
+ print "parseRequests parse_req after"
+ print "parseRequest headers", repr(self.hr.headers), str(self.hr.headers)
try:
yield self.messageReceived(self.hr)
except ConnectionClosed:
if _debug:
print 'parseRequests, ConnectionClosed '
raise StopIteration
-
+
except:
if _debug:
print 'messageReceived', \
while self.writeQueue.empty(): (yield multitask.sleep(0.01))
data = self.writeQueue.get() # TODO this should be used using multitask.Queue and remove previous wait.
if _debug: print "write to stream", repr(data)
- if data is None:
+ if data is None:
# just in case TCP socket is not closed, close it.
try:
- print "stream closing"
- print self.stream
+ if _debug:
+ print "stream closing"
+ print self.stream
yield self.stream.close()
- print "stream closed"
+ if _debug:
+ print "stream closed"
except: pass
break
-
+
try:
yield self.stream.write(data)
except ConnectionClosed:
print traceback.print_exc()
print "ending protocol write loop", repr(self)
-class Command(object):
- ''' Class for command / data messages'''
- def __init__(self, type=Message.RPC, name=None, id=None, cmdData=None, args=[]):
- '''Create a new command with given type, name, id, cmdData and args list.'''
- self.type, self.name, self.id, self.cmdData, self.args = type, name, id, cmdData, args[:]
-
- def __repr__(self):
- return ("<Command type=%r name=%r id=%r data=%r args=%r>" % (self.type, self.name, self.id, self.cmdData, self.args))
-
- def setArg(self, arg):
- self.args.append(arg)
-
- def getArg(self, index):
- return self.args[index]
-
- @classmethod
- def fromMessage(cls, message):
- ''' initialize from a parsed RTMP message'''
- assert (message.type in [Message.RPC, Message.RPC3, Message.DATA, Message.DATA3])
-
- length = len(message.data)
- if length == 0: raise ValueError('zero length message data')
-
- if message.type == Message.RPC3 or message.type == Message.DATA3:
- assert message.data[0] == '\x00' # must be 0 in AMD3
- data = message.data[1:]
- else:
- data = message.data
-
- amfReader = amf.AMF0(data)
-
- inst = cls()
- inst.name = amfReader.read() # first field is command name
-
- try:
- if message.type == Message.RPC:
- inst.id = amfReader.read() # second field *may* be message id
- inst.cmdData = amfReader.read() # third is command data
- else:
- inst.id = 0
- inst.args = [] # others are optional
- while True:
- inst.args.append(amfReader.read())
- except EOFError:
- pass
- return inst
-
- def toMessage(self):
- msg = Message()
- assert self.type
- msg.type = self.type
- output = amf.BytesIO()
- amfWriter = amf.AMF0(output)
- amfWriter.write(self.name)
- if msg.type == Message.RPC or msg.type == Message.RPC3:
- amfWriter.write(self.id)
- amfWriter.write(self.cmdData)
- for arg in self.args:
- amfWriter.write(arg)
- output.seek(0)
- #hexdump.hexdump(output)
- #output.seek(0)
- if msg.type == Message.RPC3 or msg.type == Message.DATA3:
- data = '\x00' + output.read()
- else:
- data = output.read()
- msg.data = data
- output.close()
- return msg
-
-def getfilename(path, name, root):
- '''return the file name for the given stream. The name is derived as root/scope/name.flv where scope is
- the the path present in the path variable.'''
- ignore, ignore, scope = path.partition('/')
- if scope: scope = scope + '/'
- result = root + scope + name + '.flv'
- if _debug: print 'filename=', result
- return result
class Stream(object):
'''The stream object that is used for RTMP stream.'''
- count = 0;
+ count = 0
def __init__(self, client):
self.client, self.id, self.name = client, 0, ''
self.recordfile = self.playfile = None # so that it doesn't complain about missing attribute
self.queue = multitask.Queue()
self._name = 'Stream[' + str(Stream.count) + ']'; Stream.count += 1
if _debug: print self, 'created'
-
+
def close(self):
if _debug: print self, 'closing'
if self.recordfile is not None: self.recordfile.close(); self.recordfile = None
if self.playfile is not None: self.playfile.close(); self.playfile = None
self.client = None # to clear the reference
pass
-
+
def __repr__(self):
- return self._name;
-
+ return self._name
+
def recv(self):
'''Generator to receive new Message on this stream, or None if stream is closed.'''
return self.queue.get()
-
- def send(self, msg):
- '''Method to send a Message or Command on this stream.'''
- if isinstance(msg, Command):
- msg = msg.toMessage()
- msg.streamId = self.id
- # if _debug: print self,'send'
- if self.client is not None: self.client.writeMessage(msg)
-
+
+
class Client(Protocol):
'''The client object represents a single connected client to the server.'''
def __init__(self, sock, server, remote):
Protocol.__init__(self, sock)
self.server = server
self.remote = remote
- self.agent = {}
self.streams = {}
- self._nextCallId = 2
- self._nextStreamId = 1
- self.objectEncoding = 0.0
self.queue = multitask.Queue() # receive queue used by application
multitask.add(self.parse())
#multitask.add(self.write())
def recv(self):
'''Generator to receive new Message (msg, arg) on this stream, or (None,None) if stream is closed.'''
return self.queue.get()
-
+
def connectionClosed(self):
'''Called when the client drops the connection'''
if _debug: 'Client.connectionClosed'
yield self.stream.close()
else:
yield None
-
+
def messageReceived(self, msg):
if _debug: print 'messageReceived cmd=', msg.command, msg.path
msg.response_cookies = process_cookies(msg.headers, self.remote)
- # slightly bad, this: read everything, put it into memory...
+ # slightly bad, this: read everything, put it into memory,
+ # but it helps to jump-start the project and enables use of
+ # standard http://python.org BaseHTTPRequestHandler.
+ # modification of mimetools.Message (actually rfc822) is
+ # otherwise required.
if msg.headers.has_key('content-length'):
max_chunk_size = 10*1024*1024
size_remaining = int(msg.headers["content-length"])
yield self.server.queue.put((self, msg)) # new connection
- def accept(self):
- '''Method to accept an incoming client.'''
- response = Command()
- response.id, response.name, response.type = 1, '_result', Message.RPC
- if _debug: print 'Client.accept() objectEncoding=', self.objectEncoding
- response.setArg(dict(level='status', code='NetConnection.Connect.Success',
- description='Connection succeeded.', details=None,
- objectEncoding=self.objectEncoding))
- self.writeMessage(response.toMessage())
-
def rejectConnection(self, reason=''):
- '''Method to reject an incoming client.'''
- response = Command()
- response.id, response.name, response.type = 1, '_error', Message.RPC
- response.setArg(dict(level='status', code='NetConnection.Connect.Rejected',
- description=reason, details=None))
- self.writeMessage(response.toMessage())
-
- def call(self, method, *args):
- '''Call a (callback) method on the client.'''
- cmd = Command()
- cmd.id, cmd.name, cmd.type = self._nextCallId, method, (self.objectEncoding == 0.0 and Message.RPC or Message.RPC3)
- cmd.args, cmd.cmdData = args, None
- self._nextCallId += 1
- if _debug: print 'Client.call method=', method, 'args=', args, ' msg=', cmd.toMessage()
- self.writeMessage(cmd.toMessage())
-
- def createStream(self):
- ''' Create a stream on the server side'''
- stream = Stream(self)
- stream.id = self._nextStreamId
- self.streams[stream.id] = stream
- self._nextStreamId += 1
- return stream
+ '''Method to reject an incoming client. just close.
+ TODO: report back an HTTP error with "reason" in it.
+ '''
+ self.removeConnection()
class Server(object):
'''Generator to wait for incoming client connections on this server and return
(client, args) or (None, None) if the socket is closed or some error.'''
return self.queue.get()
-
+
def run(self):
try:
while True:
sock, remote = (yield multitask.accept(self.sock)) # receive client TCP
if sock == None:
- if _debug: print 'rtmp.Server accept(sock) returned None.'
+ if _debug: print 'rtmp.Server accept(sock) returned None.'
break
if _debug: print 'connection received from', remote
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) # make it non-block
client = Client(sock, self, remote)
- except:
+ except:
if _debug: print 'rtmp.Server exception ', (sys and sys.exc_info() or None)
traceback.print_exc()
-
+
if (self.sock):
try: self.sock.close(); self.sock = None
except: pass
def clients(self):
'''everytime this property is accessed it returns a new list of clients connected to this instance.'''
return self._clients[1:] if self._clients is not None else []
-
+
+
class App(BaseApp, SimpleAppHTTPRequestHandler):
pass
-
+
+
class HTTPServer(object):
- '''A RTMP server to record and stream Flash video.'''
+ '''A RTMP server to record and stream HTTP.'''
def __init__(self):
'''Construct a new HttpServer. It initializes the local members.'''
self.sock = self.server = None
self.apps = dict({'*': App}) # supported applications: * means any as in {'*': App}
self.clients = dict() # list of clients indexed by scope. First item in list is app instance.
self.root = ''
-
+
def start(self, host='0.0.0.0', port=8080):
'''This should be used to start listening for RTMP connections on the given port, which defaults to 8080.'''
if _debug: print 'start', host, port
sock.listen(5)
server = self.server = Server(sock) # start rtmp server on that socket
multitask.add(self.serverlistener())
-
+
def stop(self):
- if _debug: print 'stopping Flash server'
+ if _debug: print 'stopping HTTP server'
if self.server and self.sock:
try: self.sock.close(); self.sock = None
except: pass
self.server = None
-
+
def serverlistener(self):
'''Server listener (generator). It accepts all connections and invokes client listener'''
try:
del self.clients[session]
continue
- # if client.objectEncoding != 0 and client.objectEncoding != 3:
- if client.objectEncoding != 0:
- yield client.rejectConnection(reason='Unsupported encoding ' + str(client.objectEncoding) + '. Please use NetConnection.defaultObjectEncoding=ObjectEncoding.AMF0')
- yield client.connectionClosed()
- else:
+ if _debug:
print "cookies", str(msg.response_cookies)
- session = msg.response_cookies['session'].value
- client.session = session
- name = msg.path
+ session = msg.response_cookies['session'].value
+ client.session = session
+ name = msg.path
+ if _debug:
print "serverlistener", name
- if '*' not in self.apps and name not in self.apps:
- yield client.rejectConnection(reason='Application not found: ' + name)
- else: # create application instance as needed and add in our list
- if _debug: print 'name=', name, 'name in apps', str(name in self.apps)
- app = self.apps[name] if name in self.apps else self.apps['*'] # application class
+ if '*' not in self.apps and name not in self.apps:
+ yield client.rejectConnection(reason='Application not found: ' + name)
+ else: # create application instance as needed and add in our list
+ if _debug: print 'name=', name, 'name in apps', str(name in self.apps)
+ app = self.apps[name] if name in self.apps else self.apps['*'] # application class
+ if _debug:
print "clients", self.clients.keys()
- if session in self.clients:
- inst = self.clients[session][0]
- else:
- inst = app()
- self.clients[session] = [inst]; inst._clients=self.clients[session]
- msg.server = inst # whew! just in time!
- try:
- methodname = "on%s" % msg.command
+ if session in self.clients:
+ inst = self.clients[session][0]
+ else:
+ inst = app()
+ self.clients[session] = [inst]; inst._clients=self.clients[session]
+ msg.server = inst # whew! just in time!
+ try:
+ methodname = "on%s" % msg.command
+ if _debug:
print methodname, dir(inst)
- method = getattr(inst, methodname, None)
- result = method(client, msg)
- close_connection = msg.close_connection
+ method = getattr(inst, methodname, None)
+ yield method(client, msg)
+ result = None
+ close_connection = msg.close_connection
+ if _debug:
print "close connection", close_connection
- except:
- if _debug: traceback.print_exc()
- yield client.rejectConnection(reason='Exception on %s' % methodname)
- continue
- if result is True or result is None:
- if result is None:
- msg.wfile.seek(0)
- data = msg.wfile.read()
- msg.wfile.seek(0)
- msg.wfile.truncate()
- yield client.writeMessage(data)
- if close_connection:
+ except:
+ if _debug: traceback.print_exc()
+ yield client.rejectConnection(reason='Exception on %s' % methodname)
+ continue
+ if result is True or result is None:
+ if result is None:
+ msg.wfile.seek(0)
+ data = msg.wfile.read()
+ msg.wfile.seek(0)
+ msg.wfile.truncate()
+ yield client.writeMessage(data)
+ if close_connection:
+ if _debug:
+ print 'close_connection requested'
+ try:
+ yield client.connectionClosed()
+ raise ConnectionClosed
+ except ConnectionClosed:
if _debug:
- print 'close_connection requested'
- try:
- yield client.connectionClosed()
- raise ConnectionClosed
- except ConnectionClosed:
- if _debug:
- print 'close_connection done'
- pass
- else:
+ print 'close_connection done'
+ pass
+ else:
+ if _debug:
print "result", result
- yield client.rejectConnection(reason='Rejected in onConnect')
+ yield client.rejectConnection(reason='Rejected in onConnect')
except StopIteration: raise
- except:
+ except:
if _debug:
print 'serverlistener exception', \
(sys and sys.exc_info() or None)
traceback.print_exc()
-
- def clientlistener(self, client):
- '''Client listener (generator). It receives a command and invokes client handler, or receives a new stream and invokes streamlistener.'''
- try:
- while True:
- msg, arg = (yield client.recv()) # receive new message from client
- if not msg: # if the client disconnected,
- if _debug: print 'connection closed from client'
- break # come out of listening loop.
- if msg == 'command': # handle a new command
- multitask.add(self.clienthandler(client, arg))
- elif msg == 'stream': # a new stream is created, handle the stream.
- arg.client = client
- multitask.add(self.streamlistener(arg))
- except StopIteration: raise
- except:
- if _debug: print 'clientlistener exception', (sys and sys.exc_info() or None)
- traceback.print_exc()
-
- # client is disconnected, clear our state for application instance.
- if _debug: print 'cleaning up client', client.path
- inst = None
- if client.path in self.clients:
- inst = self.clients[client.path][0]
- self.clients[client.path].remove(client)
- for stream in client.streams.values(): # for all streams of this client
- self.closehandler(stream)
- client.streams.clear() # and clear the collection of streams
- if client.path in self.clients and len(self.clients[client.path]) == 1: # no more clients left, delete the instance.
- if _debug: print 'removing the application instance'
- inst = self.clients[client.path][0]
- inst._clients = None
- del self.clients[client.path]
- if inst is not None: inst.onDisconnect(client)
-
- def closehandler(self, stream):
- '''A stream is closed explicitly when a closeStream command is received from given client.'''
- if stream.client is not None:
- inst = self.clients[stream.client.path][0]
- if stream.name in inst.publishers and inst.publishers[stream.name] == stream: # clear the published stream
- inst.onClose(stream.client, stream)
- del inst.publishers[stream.name]
- if stream.name in inst.players and stream in inst.players[stream.name]:
- inst.onStop(stream.client, stream)
- inst.players[stream.name].remove(stream)
- if len(inst.players[stream.name]) == 0:
- del inst.players[stream.name]
- stream.close()
-
- def clienthandler(self, client, cmd):
- '''A generator to handle a single command on the client.'''
- inst = self.clients[client.path][0]
- if inst:
- if cmd.name == '_error':
- if hasattr(inst, 'onStatus'):
- result = inst.onStatus(client, cmd.args[0])
- elif cmd.name == '_result':
- if hasattr(inst, 'onResult'):
- result = inst.onResult(client, cmd.args[0])
- else:
- res, code, args = Command(), '_result', dict()
- try: result = inst.onCommand(client, cmd.name, *cmd.args)
- except:
- if _debug: print 'Client.call exception', (sys and sys.exc_info() or None)
- code, args = '_error', dict()
- res.id, res.name, res.type = cmd.id, code, (client.objectEncoding == 0.0 and Message.RPC or Message.RPC3)
- res.args, res.cmdData = args, None
- if _debug: print 'Client.call method=', code, 'args=', args, ' msg=', res.toMessage()
- client.writeMessage(res.toMessage())
- # TODO return result to caller
- yield
-
- def streamlistener(self, stream):
- '''Stream listener (generator). It receives stream message and invokes streamhandler.'''
- stream.recordfile = None # so that it doesn't complain about missing attribute
- while True:
- msg = (yield stream.recv())
- if not msg:
- if _debug: print 'stream closed'
- self.closehandler(stream)
- break
- # if _debug: msg
- multitask.add(self.streamhandler(stream, msg))
-
- def streamhandler(self, stream, message):
- '''A generator to handle a single message on the stream.'''
- try:
- if message.type == Message.RPC:
- cmd = Command.fromMessage(message)
- if _debug: print 'streamhandler received cmd=', cmd
- if cmd.name == 'publish':
- yield self.publishhandler(stream, cmd)
- elif cmd.name == 'play':
- yield self.playhandler(stream, cmd)
- elif cmd.name == 'closeStream':
- self.closehandler(stream)
- else: # audio or video message
- yield self.mediahandler(stream, message)
- except GeneratorExit: pass
- except StopIteration: pass
- except:
- if _debug: print 'exception in streamhandler', (sys and sys.exc_info())
-
- def publishhandler(self, stream, cmd):
- '''A new stream is published. Store the information in the application instance.'''
- try:
- stream.mode = 'live' if len(cmd.args) < 2 else cmd.args[1] # live, record, append
- stream.name = cmd.args[0]
- if _debug: print 'publishing stream=', stream.name, 'mode=', stream.mode
- inst = self.clients[stream.client.path][0]
- if (stream.name in inst.publishers):
- raise ValueError, 'Stream name already in use'
- inst.publishers[stream.name] = stream # store the client for publisher
- result = inst.onPublish(stream.client, stream)
-
- if stream.mode == 'record' or stream.mode == 'append':
- stream.recordfile = FLV().open(getfilename(stream.client.path, stream.name, self.root), stream.mode)
- response = Command(name='onStatus', id=cmd.id, args=[dict(level='status', code='NetStream.Publish.Start', description='', details=None)])
- yield stream.send(response)
- except ValueError, E: # some error occurred. inform the app.
- if _debug: print 'error in publishing stream', str(E)
- response = Command(name='onStatus', id=cmd.id, args=[dict(level='error',code='NetStream.Publish.BadName',description=str(E),details=None)])
- yield stream.send(response)
-
- def playhandler(self, stream, cmd):
- '''A new stream is being played. Just updated the players list with this stream.'''
- inst = self.clients[stream.client.path][0]
- name = stream.name = cmd.args[0] # store the stream's name
- start = cmd.args[1] if len(cmd.args) >= 2 else -2
- if name not in inst.players:
- inst.players[name] = [] # initialize the players for this stream name
- if stream not in inst.players[name]: # store the stream as players of this name
- inst.players[name].append(stream)
- path = getfilename(stream.client.path, stream.name, self.root)
- if os.path.exists(path):
- stream.playfile = FLV().open(path)
- multitask.add(stream.playfile.reader(stream))
- if _debug: print 'playing stream=', name, 'start=', start
- result = inst.onPlay(stream.client, stream)
- response = Command(name='onStatus', id=cmd.id, args=[dict(level='status',code='NetStream.Play.Start', description=stream.name, details=None)])
- yield stream.send(response)
-
- def mediahandler(self, stream, message):
- '''Handle incoming media on the stream, by sending to other stream in this application instance.'''
- if stream.client is not None:
- inst = self.clients[stream.client.path][0]
- result = inst.onPublishData(stream.client, stream, message)
- if result:
- client = stream.client
- for s in (inst.players.get(stream.name, [])):
- # if _debug: print 'D', stream.name, s.name
- result = inst.onPlayData(s.client, s, message)
- if result:
- yield s.send(message)
- if stream.recordfile is not None:
- stream.recordfile.write(message)
+
# The main routine to start, run and stop the service
if __name__ == '__main__':
parser.add_option('-r', '--root', dest='root', default='./', help="document root directory. Default './'")
parser.add_option('-d', '--verbose', dest='verbose', default=False, action='store_true', help='enable debug trace')
(options, args) = parser.parse_args()
-
+
_debug = options.verbose
try:
if _debug: print time.asctime(), 'Options - %s:%d' % (options.host, options.port)
agent = HTTPServer()
agent.root = options.root
agent.start(options.host, options.port)
- if _debug: print time.asctime(), 'Flash Server Starts - %s:%d' % (options.host, options.port)
+ if _debug: print time.asctime(), 'HTTP Server Starts - %s:%d' % (options.host, options.port)
multitask.run()
except KeyboardInterrupt:
print traceback.print_exc()
except:
print "exception"
print traceback.print_exc()
- if _debug: print time.asctime(), 'Flash Server Stops'
+ if _debug: print time.asctime(), 'HTTP Server Stops'