sort out cookie session headers
[multitaskhttpd.git] / httpd.py
1 # Copyright (c) 2007-2009, Mamta Singh. All rights reserved. see README for details.
2
3 '''
4 This is a simple implementation of a Flash RTMP server to accept connections and stream requests. The module is organized as follows:
5 1. The FlashServer class is the main class to provide the server abstraction. It uses the multitask module for co-operative multitasking.
6 It also uses the App abstract class to implement the applications.
7 2. The Server class implements a simple server to receive new Client connections and inform the FlashServer application. The Client class
8 derived from Protocol implements the RTMP client functions. The Protocol class implements the base RTMP protocol parsing. A Client contains
9 various streams from the client, represented using the Stream class.
10 3. The Message, Header and Command represent RTMP message, header and command respectively. The FLV class implements functions to perform read
11 and write of FLV file format.
12
13
14 Typically an application can launch this server as follows:
15 $ python rtmp.py
16
17 To know the command line options use the -h option:
18 $ python rtmp.py -h
19
20 To start the server with a different directory for recording and playing FLV files from, use the following command.
21 $ python rtmp.py -r some-other-directory/
22 Note the terminal '/' in the directory name. Without this, it is just used as a prefix in FLV file names.
23
24 A test client is available in testClient directory, and can be compiled using Flex Builder. Alternatively, you can use the SWF file to launch
25 from testClient/bin-debug after starting the server. Once you have launched the client in the browser, you can connect to
26 local host by clicking on 'connect' button. Then click on publish button to publish a stream. Open another browser with
27 same URL and first connect then play the same stream name. If everything works fine you should be able to see the video
28 from first browser to the second browser. Similar, in the first browser, if you check the record box before publishing,
29 it will create a new FLV file for the recorded stream. You can close the publishing stream and play the recorded stream to
30 see your recording. Note that due to initial delay in timestamp (in case publish was clicked much later than connect),
31 your played video will start appearing after some initial delay.
32
33
34 If an application wants to use this module as a library, it can launch the server as follows:
35 >>> agent = FlashServer() # a new RTMP server instance
36 >>> agent.root = 'flvs/' # set the document root to be 'flvs' directory. Default is current './' directory.
37 >>> agent.start() # start the server
38 >>> multitask.run() # this is needed somewhere in the application to actually start the co-operative multitasking.
39
40
41 If an application wants to specify a different application other than the default App, it can subclass it and supply the application by
42 setting the server's apps property. The following example shows how to define "myapp" which invokes a 'connected()' method on client when
43 the client connects to the server.
44
45 class MyApp(App): # a new MyApp extends the default App in rtmp module.
46 def __init__(self): # constructor just invokes base class constructor
47 App.__init__(self)
48 def onConnect(self, client, *args):
49 result = App.onConnect(self, client, *args) # invoke base class method first
50 def invokeAdded(self, client): # define a method to invoke 'connected("some-arg")' on Flash client
51 client.call('connected', 'some-arg')
52 yield
53 multitask.add(invokeAdded(self, client)) # need to invoke later so that connection is established before callback
54 ...
55 agent.apps = dict({'myapp': MyApp, 'someapp': MyApp, '*': App})
56
57 Now the client can connect to rtmp://server/myapp or rtmp://server/someapp and will get connected to this MyApp application.
58 If the client doesn't define "function connected(arg:String):void" in the NetConnection.client object then the server will
59 throw an exception and display the error message.
60
61 '''
62
63 import os, sys, time, struct, socket, traceback, multitask
64 import threading, Queue
65 import uuid
66 from string import strip
67
68 from BaseHTTPServer import BaseHTTPRequestHandler
69 from SimpleAppHTTPServer import SimpleAppHTTPRequestHandler
70 from cStringIO import StringIO
71 from cookielib import parse_ns_headers, CookieJar
72 from Cookie import SimpleCookie
73
74 _debug = False
75
76 class MultitaskHTTPRequestHandler(BaseHTTPRequestHandler):
77
78 def setup(self):
79 self.connection = self.request
80 self.rfile = StringIO()
81 self.wfile = StringIO()
82
83 def finish(self):
84 pass
85
86 def info(self):
87 return self.headers
88
89 def get_full_url(self):
90 return self.path
91
92 def get_header(self, hdr, default):
93 return self.headers.getheader(hdr, default)
94
95 def add_cookies(self):
96 for k, v in self.response_cookies.items():
97 val = v.OutputString()
98 self.send_header("Set-Cookie", val)
99
100 class ConnectionClosed:
101 'raised when the client closed the connection'
102
103 class SockStream(object):
104 '''A class that represents a socket as a stream'''
105 def __init__(self, sock):
106 self.sock, self.buffer = sock, ''
107 self.bytesWritten = self.bytesRead = 0
108
109 def close(self):
110 print "sock close"
111 fno = self.sock.fileno()
112 self.sock.close()
113 print "writable?", self.sock, fno
114 yield multitask.writable(fno, timeout=0.1)
115 print "is it?"
116 del self.sock
117 self.sock = None
118
119 def readline(self):
120 try:
121 while True:
122 nl = self.buffer.find("\n")
123 if nl >= 0: # do not have newline in buffer
124 data, self.buffer = self.buffer[:nl+1], self.buffer[nl+1:]
125 raise StopIteration(data)
126 data = (yield multitask.recv(self.sock, 4096)) # read more from socket
127 if not data: raise ConnectionClosed
128 if _debug: print 'socket.read[%d] %r'%(len(data), data)
129 self.bytesRead += len(data)
130 self.buffer += data
131 except StopIteration: raise
132 except: raise ConnectionClosed # anything else is treated as connection closed.
133
134 def read(self, count):
135 try:
136 while True:
137 if len(self.buffer) >= count: # do not have data in buffer
138 data, self.buffer = self.buffer[:count], self.buffer[count:]
139 raise StopIteration(data)
140 data = (yield multitask.recv(self.sock, 4096)) # read more from socket
141 if not data: raise ConnectionClosed
142 # if _debug: print 'socket.read[%d] %r'%(len(data), data)
143 self.bytesRead += len(data)
144 self.buffer += data
145 except StopIteration: raise
146 except: raise ConnectionClosed # anything else is treated as connection closed.
147
148 def unread(self, data):
149 self.buffer = data + self.buffer
150
151 def write(self, data):
152 while len(data) > 0: # write in 4K chunks each time
153 chunk, data = data[:4096], data[4096:]
154 self.bytesWritten += len(chunk)
155 if _debug: print 'socket.write[%d] %r'%(len(chunk), chunk)
156 try: yield multitask.send(self.sock, chunk)
157 except: raise ConnectionClosed
158 if _debug: print 'socket.written'
159
160
161 class Header(object):
162 FULL, MESSAGE, TIME, SEPARATOR, MASK = 0x00, 0x40, 0x80, 0xC0, 0xC0
163
164 def __init__(self, channel=0, time=0, size=None, type=None, streamId=0):
165 self.channel, self.time, self.size, self.type, self.streamId = channel, time, size, type, streamId
166 self.extendedtime = 0
167 if channel<64: self.hdrdata = chr(channel)
168 elif channel<(64+256): self.hdrdata = '\x00'+chr(channel-64)
169 else: self.hdrdata = '\x01'+chr((channel-64)%256)+chr((channel-64)/256)
170
171 def _appendExtendedTimestamp(self, data):
172 if self.time == 0xFFFFFF:
173 data += struct.pack('>I', self.extendedtime)
174 return data
175
176 def toBytes(self, control):
177 data = chr(ord(self.hdrdata[0]) | control) + self.hdrdata[1:]
178 if control == Header.SEPARATOR: return self._appendExtendedTimestamp(data)
179
180 data += struct.pack('>I', self.time & 0xFFFFFF)[1:] # add time
181 if control == Header.TIME: return self._appendExtendedTimestamp(data)
182
183 data += struct.pack('>I', self.size)[1:] # size
184 data += chr(self.type) # type
185 if control == Header.MESSAGE: return self._appendExtendedTimestamp(data)
186
187 data += struct.pack('<I', self.streamId) # add streamId
188 return self._appendExtendedTimestamp(data)
189
190 def __repr__(self):
191 return ("<Header channel=%r time=%r size=%r type=%r (0x%02x) streamId=%r>"
192 % (self.channel, self.time, self.size, self.type, self.type or 0, self.streamId))
193
194 class Message(object):
195 # message types: RPC3, DATA3,and SHAREDOBJECT3 are used with AMF3
196 RPC, RPC3, DATA, DATA3, SHAREDOBJ, SHAREDOBJ3, AUDIO, VIDEO, ACK, CHUNK_SIZE = \
197 0x14, 0x11, 0x12, 0x0F, 0x13, 0x10, 0x08, 0x09, 0x03, 0x01
198
199 def __init__(self, hdr=None, data=''):
200 if hdr is None: hdr = Header()
201 self.header, self.data = hdr, data
202
203 # define properties type, streamId and time to access self.header.(property)
204 for p in ['type', 'streamId', 'time']:
205 exec 'def _g%s(self): return self.header.%s'%(p, p)
206 exec 'def _s%s(self, %s): self.header.%s = %s'%(p, p, p, p)
207 exec '%s = property(fget=_g%s, fset=_s%s)'%(p, p, p)
208 @property
209 def size(self): return len(self.data)
210
211 def __repr__(self):
212 return ("<Message header=%r data=%r>"% (self.header, self.data))
213
214 class Protocol(object):
215 # constants
216 PING_SIZE = 1536
217 DEFAULT_CHUNK_SIZE = 128
218 MIN_CHANNEL_ID = 3
219 PROTOCOL_CHANNEL_ID = 2
220
221 def __init__(self, sock):
222 self.stream = SockStream(sock)
223 self.lastReadHeaders = dict() # indexed by channelId
224 self.incompletePackets = dict() #indexed by channelId
225 self.readChunkSize = self.writeChunkSize = Protocol.DEFAULT_CHUNK_SIZE
226 self.lastWriteHeaders = dict() # indexed by streamId
227 self.nextChannelId = Protocol.MIN_CHANNEL_ID
228 self.writeLock = threading.Lock()
229 self.writeQueue = Queue.Queue()
230
231 def messageReceived(self, msg):
232 yield
233
234 def protocolMessage(self, msg):
235 if msg.type == Message.ACK: # respond to ACK requests
236 response = Message()
237 response.type, response.data = msg.type, msg.data
238 self.writeMessage(response)
239 elif msg.type == Message.CHUNK_SIZE:
240 self.readChunkSize = struct.unpack('>L', msg.data)[0]
241
242 def connectionClosed(self):
243 yield
244
245 def parse(self):
246 try:
247 yield self.parseRequests() # parse http requests
248 except ConnectionClosed:
249 yield self.connectionClosed()
250 if _debug: print 'parse connection closed'
251
252 def writeMessage(self, message):
253 self.writeQueue.put(message)
254
255 def parseHandshake(self):
256 '''Parses the rtmp handshake'''
257 data = (yield self.stream.read(Protocol.PING_SIZE + 1)) # bound version and first ping
258 yield self.stream.write(data)
259 data = (yield self.stream.read(Protocol.PING_SIZE)) # bound second ping
260 yield self.stream.write(data)
261
262 def parseRequests(self):
263 '''Parses complete messages until connection closed. Raises ConnectionLost exception.'''
264 self.hr = MultitaskHTTPRequestHandler(self.stream, ("",0,), self.remote)
265 self.hr.close_connection = 1
266 self.cookies = CookieJar()
267
268 while True:
269
270 # prepare reading the request so that when it's handed
271 # over to "standard" HTTPRequestHandler, the data's already
272 # there.
273 print "parseRequests"
274 raw_requestline = (yield self.stream.readline())
275 if _debug: print "parseRequests, line", raw_requestline
276 if not raw_requestline:
277 raise ConnectionClosed
278 data = ''
279 try:
280 while 1:
281 line = (yield self.stream.readline())
282 data += line
283 if line in ['\n', '\r\n']:
284 break
285 except StopIteration:
286 if _debug: print "parseRequests, stopiter"
287 raise
288 except:
289 if _debug:
290 print 'parseRequests', \
291 (traceback and traceback.print_exc() or None)
292
293 raise ConnectionClosed
294
295 self.hr.raw_requestline = raw_requestline
296 self.hr.rfile.write(data)
297 print "parseRequests write after"
298 self.hr.rfile.seek(0)
299 print "parseRequests seek after"
300
301 if not self.hr.parse_request():
302 raise ConnectionClosed
303 print "parseRequests parse_req after"
304 try:
305 yield self.messageReceived(self.hr)
306 except:
307 if _debug:
308 print 'messageReceived', \
309 (traceback and traceback.print_exc() or None)
310
311 def write(self):
312 '''Writes messages to stream'''
313 while True:
314 while self.writeQueue.empty(): (yield multitask.sleep(0.01))
315 data = self.writeQueue.get() # TODO this should be used using multitask.Queue and remove previous wait.
316 if _debug: print "write to stream", repr(data)
317 if data is None:
318 # just in case TCP socket is not closed, close it.
319 try:
320 print "stream closing"
321 print self.stream
322 yield self.stream.close()
323 print "stream closed"
324 except: pass
325 break
326
327 try:
328 yield self.stream.write(data)
329 except ConnectionClosed:
330 yield self.connectionClosed()
331 except:
332 print traceback.print_exc()
333
334 class Command(object):
335 ''' Class for command / data messages'''
336 def __init__(self, type=Message.RPC, name=None, id=None, cmdData=None, args=[]):
337 '''Create a new command with given type, name, id, cmdData and args list.'''
338 self.type, self.name, self.id, self.cmdData, self.args = type, name, id, cmdData, args[:]
339
340 def __repr__(self):
341 return ("<Command type=%r name=%r id=%r data=%r args=%r>" % (self.type, self.name, self.id, self.cmdData, self.args))
342
343 def setArg(self, arg):
344 self.args.append(arg)
345
346 def getArg(self, index):
347 return self.args[index]
348
349 @classmethod
350 def fromMessage(cls, message):
351 ''' initialize from a parsed RTMP message'''
352 assert (message.type in [Message.RPC, Message.RPC3, Message.DATA, Message.DATA3])
353
354 length = len(message.data)
355 if length == 0: raise ValueError('zero length message data')
356
357 if message.type == Message.RPC3 or message.type == Message.DATA3:
358 assert message.data[0] == '\x00' # must be 0 in AMD3
359 data = message.data[1:]
360 else:
361 data = message.data
362
363 amfReader = amf.AMF0(data)
364
365 inst = cls()
366 inst.name = amfReader.read() # first field is command name
367
368 try:
369 if message.type == Message.RPC:
370 inst.id = amfReader.read() # second field *may* be message id
371 inst.cmdData = amfReader.read() # third is command data
372 else:
373 inst.id = 0
374 inst.args = [] # others are optional
375 while True:
376 inst.args.append(amfReader.read())
377 except EOFError:
378 pass
379 return inst
380
381 def toMessage(self):
382 msg = Message()
383 assert self.type
384 msg.type = self.type
385 output = amf.BytesIO()
386 amfWriter = amf.AMF0(output)
387 amfWriter.write(self.name)
388 if msg.type == Message.RPC or msg.type == Message.RPC3:
389 amfWriter.write(self.id)
390 amfWriter.write(self.cmdData)
391 for arg in self.args:
392 amfWriter.write(arg)
393 output.seek(0)
394 #hexdump.hexdump(output)
395 #output.seek(0)
396 if msg.type == Message.RPC3 or msg.type == Message.DATA3:
397 data = '\x00' + output.read()
398 else:
399 data = output.read()
400 msg.data = data
401 output.close()
402 return msg
403
404 def getfilename(path, name, root):
405 '''return the file name for the given stream. The name is derived as root/scope/name.flv where scope is
406 the the path present in the path variable.'''
407 ignore, ignore, scope = path.partition('/')
408 if scope: scope = scope + '/'
409 result = root + scope + name + '.flv'
410 if _debug: print 'filename=', result
411 return result
412
413 class Stream(object):
414 '''The stream object that is used for RTMP stream.'''
415 count = 0;
416 def __init__(self, client):
417 self.client, self.id, self.name = client, 0, ''
418 self.recordfile = self.playfile = None # so that it doesn't complain about missing attribute
419 self.queue = multitask.Queue()
420 self._name = 'Stream[' + str(Stream.count) + ']'; Stream.count += 1
421 if _debug: print self, 'created'
422
423 def close(self):
424 if _debug: print self, 'closing'
425 if self.recordfile is not None: self.recordfile.close(); self.recordfile = None
426 if self.playfile is not None: self.playfile.close(); self.playfile = None
427 self.client = None # to clear the reference
428 pass
429
430 def __repr__(self):
431 return self._name;
432
433 def recv(self):
434 '''Generator to receive new Message on this stream, or None if stream is closed.'''
435 return self.queue.get()
436
437 def send(self, msg):
438 '''Method to send a Message or Command on this stream.'''
439 if isinstance(msg, Command):
440 msg = msg.toMessage()
441 msg.streamId = self.id
442 # if _debug: print self,'send'
443 if self.client is not None: self.client.writeMessage(msg)
444
445 class Client(Protocol):
446 '''The client object represents a single connected client to the server.'''
447 def __init__(self, sock, server, remote):
448 Protocol.__init__(self, sock)
449 self.server = server
450 self.remote = remote
451 self.agent = {}
452 self.streams = {}
453 self._nextCallId = 2
454 self._nextStreamId = 1
455 self.objectEncoding = 0.0
456 self.queue = multitask.Queue() # receive queue used by application
457 multitask.add(self.parse())
458 multitask.add(self.write())
459
460 def recv(self):
461 '''Generator to receive new Message (msg, arg) on this stream, or (None,None) if stream is closed.'''
462 return self.queue.get()
463
464 def connectionClosed(self):
465 '''Called when the client drops the connection'''
466 if _debug: 'Client.connectionClosed'
467 self.writeMessage(None)
468 yield self.queue.put((None,None))
469
470 def messageReceived(self, msg):
471 if _debug: print 'messageReceived cmd=', msg.command, msg.path
472 ch = msg.headers.getheaders("Cookie")
473 print "messageReceived cookieheaders=", '; '.join(ch)
474 res = []
475 for c in ch:
476 c = c.split(";")
477 c = map(strip, c)
478 res += c
479 has_sess = False
480 msg.response_cookies = SimpleCookie()
481 for c in res:
482 print "found cookie", str(c)
483 name, value = c.split("=")
484 msg.response_cookies[name] = value
485 msg.response_cookies[name]['path'] = "/"
486 msg.response_cookies[name]['domain'] = self.remote[0]
487 #msg.response_cookies[name]['expires'] = 'None'
488 msg.response_cookies[name]['version'] = 0
489 if name == "session":
490 has_sess = True
491 if not has_sess:
492 msg.response_cookies['session'] = uuid.uuid4().hex
493 #msg.response_cookies['session']['expires'] = 'None'
494 msg.response_cookies['session']['path'] = '/'
495 msg.response_cookies['session']['domain'] = self.remote[0]
496 msg.response_cookies['session']['version'] = 0
497 yield self.server.queue.put((self, msg)) # new connection
498
499 def accept(self):
500 '''Method to accept an incoming client.'''
501 response = Command()
502 response.id, response.name, response.type = 1, '_result', Message.RPC
503 if _debug: print 'Client.accept() objectEncoding=', self.objectEncoding
504 response.setArg(dict(level='status', code='NetConnection.Connect.Success',
505 description='Connection succeeded.', details=None,
506 objectEncoding=self.objectEncoding))
507 self.writeMessage(response.toMessage())
508
509 def rejectConnection(self, reason=''):
510 '''Method to reject an incoming client.'''
511 response = Command()
512 response.id, response.name, response.type = 1, '_error', Message.RPC
513 response.setArg(dict(level='status', code='NetConnection.Connect.Rejected',
514 description=reason, details=None))
515 self.writeMessage(response.toMessage())
516
517 def call(self, method, *args):
518 '''Call a (callback) method on the client.'''
519 cmd = Command()
520 cmd.id, cmd.name, cmd.type = self._nextCallId, method, (self.objectEncoding == 0.0 and Message.RPC or Message.RPC3)
521 cmd.args, cmd.cmdData = args, None
522 self._nextCallId += 1
523 if _debug: print 'Client.call method=', method, 'args=', args, ' msg=', cmd.toMessage()
524 self.writeMessage(cmd.toMessage())
525
526 def createStream(self):
527 ''' Create a stream on the server side'''
528 stream = Stream(self)
529 stream.id = self._nextStreamId
530 self.streams[stream.id] = stream
531 self._nextStreamId += 1
532 return stream
533
534
535 class Server(object):
536 '''A RTMP server listens for incoming connections and informs the app.'''
537 def __init__(self, sock):
538 '''Create an RTMP server on the given bound TCP socket. The server will terminate
539 when the socket is disconnected, or some other error occurs in listening.'''
540 self.sock = sock
541 self.queue = multitask.Queue() # queue to receive incoming client connections
542 multitask.add(self.run())
543
544 def recv(self):
545 '''Generator to wait for incoming client connections on this server and return
546 (client, args) or (None, None) if the socket is closed or some error.'''
547 return self.queue.get()
548
549 def run(self):
550 try:
551 while True:
552 sock, remote = (yield multitask.accept(self.sock)) # receive client TCP
553 if sock == None:
554 if _debug: print 'rtmp.Server accept(sock) returned None.'
555 break
556 if _debug: print 'connection received from', remote
557 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) # make it non-block
558 client = Client(sock, self, remote)
559 except:
560 if _debug: print 'rtmp.Server exception ', (sys and sys.exc_info() or None)
561 traceback.print_exc()
562
563 if (self.sock):
564 try: self.sock.close(); self.sock = None
565 except: pass
566 if (self.queue):
567 yield self.queue.put((None, None))
568 self.queue = None
569
570 class App(SimpleAppHTTPRequestHandler):
571 '''An application instance containing any number of streams. Except for constructor all methods are generators.'''
572 count = 0
573 def __init__(self):
574 self.name = str(self.__class__.__name__) + '[' + str(App.count) + ']'; App.count += 1
575 self.players, self.publishers, self._clients = {}, {}, [] # Streams indexed by stream name, and list of clients
576 if _debug: print self.name, 'created'
577 def __del__(self):
578 if _debug: print self.name, 'destroyed'
579 @property
580 def clients(self):
581 '''everytime this property is accessed it returns a new list of clients connected to this instance.'''
582 return self._clients[1:] if self._clients is not None else []
583 def onConnect(self, client, *args):
584 if _debug: print self.name, 'onConnect', client.path
585 return True
586 def onDisconnect(self, client):
587 if _debug: print self.name, 'onDisconnect', client.path
588
589 class HttpServer(object):
590 '''A RTMP server to record and stream Flash video.'''
591 def __init__(self):
592 '''Construct a new HttpServer. It initializes the local members.'''
593 self.sock = self.server = None
594 self.apps = dict({'*': App}) # supported applications: * means any as in {'*': App}
595 self.clients = dict() # list of clients indexed by scope. First item in list is app instance.
596 self.root = ''
597
598 def start(self, host='0.0.0.0', port=8080):
599 '''This should be used to start listening for RTMP connections on the given port, which defaults to 8080.'''
600 if _debug: print 'start', host, port
601 if not self.server:
602 sock = self.sock = socket.socket(type=socket.SOCK_STREAM)
603 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
604 sock.bind((host, port))
605 if _debug: print 'listening on ', sock.getsockname()
606 sock.listen(5)
607 server = self.server = Server(sock) # start rtmp server on that socket
608 multitask.add(self.serverlistener())
609
610 def stop(self):
611 if _debug: print 'stopping Flash server'
612 if self.server and self.sock:
613 try: self.sock.close(); self.sock = None
614 except: pass
615 self.server = None
616
617 def serverlistener(self):
618 '''Server listener (generator). It accepts all connections and invokes client listener'''
619 try:
620 while True: # main loop to receive new connections on the server
621 client, msg = (yield self.server.recv()) # receive an incoming client connection.
622 # TODO: we should reject non-localhost client connections.
623 if not client: # if the server aborted abnormally,
624 break # hence close the listener.
625 if _debug: print 'client connection received', client, args
626 # if client.objectEncoding != 0 and client.objectEncoding != 3:
627 if client.objectEncoding != 0:
628 yield client.rejectConnection(reason='Unsupported encoding ' + str(client.objectEncoding) + '. Please use NetConnection.defaultObjectEncoding=ObjectEncoding.AMF0')
629 yield client.connectionClosed()
630 else:
631 print "cookies", str(msg.response_cookies)
632 session = msg.response_cookies['session'].value
633 name, ignore, scope = msg.path.partition('/')
634 if '*' not in self.apps and name not in self.apps:
635 yield client.rejectConnection(reason='Application not found: ' + name)
636 else: # create application instance as needed and add in our list
637 if _debug: print 'name=', name, 'name in apps', str(name in self.apps)
638 app = self.apps[name] if name in self.apps else self.apps['*'] # application class
639 print "clients", self.clients.keys()
640 if session in self.clients: inst = self.clients[session][0]
641 else: inst = app()
642 try:
643 methodname = "on%s" % msg.command
644 method = getattr(inst, methodname, None)
645 result = method(client, msg)
646 close_connection = msg.close_connection
647 print "close connection", close_connection
648 except:
649 if _debug: traceback.print_exc()
650 yield client.rejectConnection(reason='Exception on %s' % methodname)
651 continue
652 if result is True or result is None:
653 if session not in self.clients:
654 self.clients[session] = [inst]; inst._clients=self.clients[session]
655 self.clients[session].append(client)
656 msg.wfile.seek(0)
657 data = msg.wfile.read()
658 msg.wfile.seek(0)
659 msg.wfile.truncate()
660 yield client.writeMessage(data)
661 if close_connection:
662 if _debug:
663 print 'close_connection requested'
664 try:
665 yield client.connectionClosed()
666 except ClientClosed:
667 if _debug:
668 print 'close_connection done'
669 pass
670 else:
671 yield client.rejectConnection(reason='Rejected in onConnect')
672 except StopIteration: raise
673 except:
674 if _debug:
675 print 'serverlistener exception', \
676 (sys and sys.exc_info() or None)
677 traceback.print_exc()
678
679 def clientlistener(self, client):
680 '''Client listener (generator). It receives a command and invokes client handler, or receives a new stream and invokes streamlistener.'''
681 try:
682 while True:
683 msg, arg = (yield client.recv()) # receive new message from client
684 if not msg: # if the client disconnected,
685 if _debug: print 'connection closed from client'
686 break # come out of listening loop.
687 if msg == 'command': # handle a new command
688 multitask.add(self.clienthandler(client, arg))
689 elif msg == 'stream': # a new stream is created, handle the stream.
690 arg.client = client
691 multitask.add(self.streamlistener(arg))
692 except StopIteration: raise
693 except:
694 if _debug: print 'clientlistener exception', (sys and sys.exc_info() or None)
695 traceback.print_exc()
696
697 # client is disconnected, clear our state for application instance.
698 if _debug: print 'cleaning up client', client.path
699 inst = None
700 if client.path in self.clients:
701 inst = self.clients[client.path][0]
702 self.clients[client.path].remove(client)
703 for stream in client.streams.values(): # for all streams of this client
704 self.closehandler(stream)
705 client.streams.clear() # and clear the collection of streams
706 if client.path in self.clients and len(self.clients[client.path]) == 1: # no more clients left, delete the instance.
707 if _debug: print 'removing the application instance'
708 inst = self.clients[client.path][0]
709 inst._clients = None
710 del self.clients[client.path]
711 if inst is not None: inst.onDisconnect(client)
712
713 def closehandler(self, stream):
714 '''A stream is closed explicitly when a closeStream command is received from given client.'''
715 if stream.client is not None:
716 inst = self.clients[stream.client.path][0]
717 if stream.name in inst.publishers and inst.publishers[stream.name] == stream: # clear the published stream
718 inst.onClose(stream.client, stream)
719 del inst.publishers[stream.name]
720 if stream.name in inst.players and stream in inst.players[stream.name]:
721 inst.onStop(stream.client, stream)
722 inst.players[stream.name].remove(stream)
723 if len(inst.players[stream.name]) == 0:
724 del inst.players[stream.name]
725 stream.close()
726
727 def clienthandler(self, client, cmd):
728 '''A generator to handle a single command on the client.'''
729 inst = self.clients[client.path][0]
730 if inst:
731 if cmd.name == '_error':
732 if hasattr(inst, 'onStatus'):
733 result = inst.onStatus(client, cmd.args[0])
734 elif cmd.name == '_result':
735 if hasattr(inst, 'onResult'):
736 result = inst.onResult(client, cmd.args[0])
737 else:
738 res, code, args = Command(), '_result', dict()
739 try: result = inst.onCommand(client, cmd.name, *cmd.args)
740 except:
741 if _debug: print 'Client.call exception', (sys and sys.exc_info() or None)
742 code, args = '_error', dict()
743 res.id, res.name, res.type = cmd.id, code, (client.objectEncoding == 0.0 and Message.RPC or Message.RPC3)
744 res.args, res.cmdData = args, None
745 if _debug: print 'Client.call method=', code, 'args=', args, ' msg=', res.toMessage()
746 client.writeMessage(res.toMessage())
747 # TODO return result to caller
748 yield
749
750 def streamlistener(self, stream):
751 '''Stream listener (generator). It receives stream message and invokes streamhandler.'''
752 stream.recordfile = None # so that it doesn't complain about missing attribute
753 while True:
754 msg = (yield stream.recv())
755 if not msg:
756 if _debug: print 'stream closed'
757 self.closehandler(stream)
758 break
759 # if _debug: msg
760 multitask.add(self.streamhandler(stream, msg))
761
762 def streamhandler(self, stream, message):
763 '''A generator to handle a single message on the stream.'''
764 try:
765 if message.type == Message.RPC:
766 cmd = Command.fromMessage(message)
767 if _debug: print 'streamhandler received cmd=', cmd
768 if cmd.name == 'publish':
769 yield self.publishhandler(stream, cmd)
770 elif cmd.name == 'play':
771 yield self.playhandler(stream, cmd)
772 elif cmd.name == 'closeStream':
773 self.closehandler(stream)
774 else: # audio or video message
775 yield self.mediahandler(stream, message)
776 except GeneratorExit: pass
777 except StopIteration: pass
778 except:
779 if _debug: print 'exception in streamhandler', (sys and sys.exc_info())
780
781 def publishhandler(self, stream, cmd):
782 '''A new stream is published. Store the information in the application instance.'''
783 try:
784 stream.mode = 'live' if len(cmd.args) < 2 else cmd.args[1] # live, record, append
785 stream.name = cmd.args[0]
786 if _debug: print 'publishing stream=', stream.name, 'mode=', stream.mode
787 inst = self.clients[stream.client.path][0]
788 if (stream.name in inst.publishers):
789 raise ValueError, 'Stream name already in use'
790 inst.publishers[stream.name] = stream # store the client for publisher
791 result = inst.onPublish(stream.client, stream)
792
793 if stream.mode == 'record' or stream.mode == 'append':
794 stream.recordfile = FLV().open(getfilename(stream.client.path, stream.name, self.root), stream.mode)
795 response = Command(name='onStatus', id=cmd.id, args=[dict(level='status', code='NetStream.Publish.Start', description='', details=None)])
796 yield stream.send(response)
797 except ValueError, E: # some error occurred. inform the app.
798 if _debug: print 'error in publishing stream', str(E)
799 response = Command(name='onStatus', id=cmd.id, args=[dict(level='error',code='NetStream.Publish.BadName',description=str(E),details=None)])
800 yield stream.send(response)
801
802 def playhandler(self, stream, cmd):
803 '''A new stream is being played. Just updated the players list with this stream.'''
804 inst = self.clients[stream.client.path][0]
805 name = stream.name = cmd.args[0] # store the stream's name
806 start = cmd.args[1] if len(cmd.args) >= 2 else -2
807 if name not in inst.players:
808 inst.players[name] = [] # initialize the players for this stream name
809 if stream not in inst.players[name]: # store the stream as players of this name
810 inst.players[name].append(stream)
811 path = getfilename(stream.client.path, stream.name, self.root)
812 if os.path.exists(path):
813 stream.playfile = FLV().open(path)
814 multitask.add(stream.playfile.reader(stream))
815 if _debug: print 'playing stream=', name, 'start=', start
816 result = inst.onPlay(stream.client, stream)
817 response = Command(name='onStatus', id=cmd.id, args=[dict(level='status',code='NetStream.Play.Start', description=stream.name, details=None)])
818 yield stream.send(response)
819
820 def mediahandler(self, stream, message):
821 '''Handle incoming media on the stream, by sending to other stream in this application instance.'''
822 if stream.client is not None:
823 inst = self.clients[stream.client.path][0]
824 result = inst.onPublishData(stream.client, stream, message)
825 if result:
826 client = stream.client
827 for s in (inst.players.get(stream.name, [])):
828 # if _debug: print 'D', stream.name, s.name
829 result = inst.onPlayData(s.client, s, message)
830 if result:
831 yield s.send(message)
832 if stream.recordfile is not None:
833 stream.recordfile.write(message)
834
835 # The main routine to start, run and stop the service
836 if __name__ == '__main__':
837 print "optparse"
838 from optparse import OptionParser
839 parser = OptionParser()
840 parser.add_option('-i', '--host', dest='host', default='0.0.0.0', help="listening IP address. Default '0.0.0.0'")
841 parser.add_option('-p', '--port', dest='port', default=8080, type="int", help='listening port number. Default 8080')
842 parser.add_option('-r', '--root', dest='root', default='./', help="document root directory. Default './'")
843 parser.add_option('-d', '--verbose', dest='verbose', default=False, action='store_true', help='enable debug trace')
844 (options, args) = parser.parse_args()
845
846 _debug = options.verbose
847 try:
848 if _debug: print time.asctime(), 'Options - %s:%d' % (options.host, options.port)
849 agent = HttpServer()
850 agent.root = options.root
851 agent.start(options.host, options.port)
852 if _debug: print time.asctime(), 'Flash Server Starts - %s:%d' % (options.host, options.port)
853 multitask.run()
854 except KeyboardInterrupt:
855 print traceback.print_exc()
856 pass
857 except:
858 print "exception"
859 print traceback.print_exc()
860 if _debug: print time.asctime(), 'Flash Server Stops'