Merge branch 'master' of pyjs.org:multitaskhttpd
[multitaskhttpd.git] / httpd.py
1 # Copyright (c) 2007-2009, Mamta Singh. All rights reserved. see README for details.
2
3 '''
4 This is a simple implementation of a Flash RTMP server to accept connections and stream requests. The module is organized as follows:
5 1. The FlashServer class is the main class to provide the server abstraction. It uses the multitask module for co-operative multitasking.
6 It also uses the App abstract class to implement the applications.
7 2. The Server class implements a simple server to receive new Client connections and inform the FlashServer application. The Client class
8 derived from Protocol implements the RTMP client functions. The Protocol class implements the base RTMP protocol parsing. A Client contains
9 various streams from the client, represented using the Stream class.
10 3. The Message, Header and Command represent RTMP message, header and command respectively. The FLV class implements functions to perform read
11 and write of FLV file format.
12
13
14 Typically an application can launch this server as follows:
15 $ python rtmp.py
16
17 To know the command line options use the -h option:
18 $ python rtmp.py -h
19
20 To start the server with a different directory for recording and playing FLV files from, use the following command.
21 $ python rtmp.py -r some-other-directory/
22 Note the terminal '/' in the directory name. Without this, it is just used as a prefix in FLV file names.
23
24 A test client is available in testClient directory, and can be compiled using Flex Builder. Alternatively, you can use the SWF file to launch
25 from testClient/bin-debug after starting the server. Once you have launched the client in the browser, you can connect to
26 local host by clicking on 'connect' button. Then click on publish button to publish a stream. Open another browser with
27 same URL and first connect then play the same stream name. If everything works fine you should be able to see the video
28 from first browser to the second browser. Similar, in the first browser, if you check the record box before publishing,
29 it will create a new FLV file for the recorded stream. You can close the publishing stream and play the recorded stream to
30 see your recording. Note that due to initial delay in timestamp (in case publish was clicked much later than connect),
31 your played video will start appearing after some initial delay.
32
33
34 If an application wants to use this module as a library, it can launch the server as follows:
35 >>> agent = FlashServer() # a new RTMP server instance
36 >>> agent.root = 'flvs/' # set the document root to be 'flvs' directory. Default is current './' directory.
37 >>> agent.start() # start the server
38 >>> multitask.run() # this is needed somewhere in the application to actually start the co-operative multitasking.
39
40
41 If an application wants to specify a different application other than the default App, it can subclass it and supply the application by
42 setting the server's apps property. The following example shows how to define "myapp" which invokes a 'connected()' method on client when
43 the client connects to the server.
44
45 class MyApp(App): # a new MyApp extends the default App in rtmp module.
46 def __init__(self): # constructor just invokes base class constructor
47 App.__init__(self)
48 def onConnect(self, client, *args):
49 result = App.onConnect(self, client, *args) # invoke base class method first
50 def invokeAdded(self, client): # define a method to invoke 'connected("some-arg")' on Flash client
51 client.call('connected', 'some-arg')
52 yield
53 multitask.add(invokeAdded(self, client)) # need to invoke later so that connection is established before callback
54 ...
55 agent.apps = dict({'myapp': MyApp, 'someapp': MyApp, '*': App})
56
57 Now the client can connect to rtmp://server/myapp or rtmp://server/someapp and will get connected to this MyApp application.
58 If the client doesn't define "function connected(arg:String):void" in the NetConnection.client object then the server will
59 throw an exception and display the error message.
60
61 '''
62
63 import os, sys, time, struct, socket, traceback, multitask
64 import threading, Queue
65 import uuid
66 from string import strip
67
68 from BaseHTTPServer import BaseHTTPRequestHandler
69 from SimpleAppHTTPServer import SimpleAppHTTPRequestHandler
70 from cStringIO import StringIO
71 from cookielib import parse_ns_headers, CookieJar
72 from Cookie import SimpleCookie
73
74 global _debug
75 _debug = False
76 def set_debug(dbg):
77 global _debug
78 _debug = dbg
79
80 class MultitaskHTTPRequestHandler(BaseHTTPRequestHandler):
81
82 def setup(self):
83 self.connection = self.request
84 self.rfile = StringIO()
85 self.wfile = StringIO()
86
87 def finish(self):
88 pass
89
90 def info(self):
91 return self.headers
92
93 def get_full_url(self):
94 return self.path
95
96 def get_header(self, hdr, default):
97 return self.headers.getheader(hdr, default)
98
99 def add_cookies(self):
100 for k, v in self.response_cookies.items():
101 val = v.OutputString()
102 self.send_header("Set-Cookie", val)
103
104 class ConnectionClosed:
105 'raised when the client closed the connection'
106
107 class SockStream(object):
108 '''A class that represents a socket as a stream'''
109 def __init__(self, sock):
110 self.sock, self.buffer = sock, ''
111 self.bytesWritten = self.bytesRead = 0
112
113 def close(self):
114 self.sock.shutdown(1) # can't just close it.
115 self.sock = None
116
117 def readline(self):
118 try:
119 while True:
120 nl = self.buffer.find("\n")
121 if nl >= 0: # do not have newline in buffer
122 data, self.buffer = self.buffer[:nl+1], self.buffer[nl+1:]
123 raise StopIteration(data)
124 data = (yield multitask.recv(self.sock, 4096)) # read more from socket
125 if not data: raise ConnectionClosed
126 if _debug: print 'socket.read[%d] %r'%(len(data), data)
127 self.bytesRead += len(data)
128 self.buffer += data
129 except StopIteration: raise
130 except: raise ConnectionClosed # anything else is treated as connection closed.
131
132 def read(self, count):
133 try:
134 while True:
135 if len(self.buffer) >= count: # do not have data in buffer
136 data, self.buffer = self.buffer[:count], self.buffer[count:]
137 raise StopIteration(data)
138 data = (yield multitask.recv(self.sock, 4096)) # read more from socket
139 if not data: raise ConnectionClosed
140 # if _debug: print 'socket.read[%d] %r'%(len(data), data)
141 self.bytesRead += len(data)
142 self.buffer += data
143 except StopIteration: raise
144 except: raise ConnectionClosed # anything else is treated as connection closed.
145
146 def unread(self, data):
147 self.buffer = data + self.buffer
148
149 def write(self, data):
150 while len(data) > 0: # write in 4K chunks each time
151 chunk, data = data[:4096], data[4096:]
152 self.bytesWritten += len(chunk)
153 if _debug: print 'socket.write[%d] %r'%(len(chunk), chunk)
154 try: yield multitask.send(self.sock, chunk)
155 except: raise ConnectionClosed
156 if _debug: print 'socket.written'
157
158
159 class Header(object):
160 FULL, MESSAGE, TIME, SEPARATOR, MASK = 0x00, 0x40, 0x80, 0xC0, 0xC0
161
162 def __init__(self, channel=0, time=0, size=None, type=None, streamId=0):
163 self.channel, self.time, self.size, self.type, self.streamId = channel, time, size, type, streamId
164 self.extendedtime = 0
165 if channel<64: self.hdrdata = chr(channel)
166 elif channel<(64+256): self.hdrdata = '\x00'+chr(channel-64)
167 else: self.hdrdata = '\x01'+chr((channel-64)%256)+chr((channel-64)/256)
168
169 def _appendExtendedTimestamp(self, data):
170 if self.time == 0xFFFFFF:
171 data += struct.pack('>I', self.extendedtime)
172 return data
173
174 def toBytes(self, control):
175 data = chr(ord(self.hdrdata[0]) | control) + self.hdrdata[1:]
176 if control == Header.SEPARATOR: return self._appendExtendedTimestamp(data)
177
178 data += struct.pack('>I', self.time & 0xFFFFFF)[1:] # add time
179 if control == Header.TIME: return self._appendExtendedTimestamp(data)
180
181 data += struct.pack('>I', self.size)[1:] # size
182 data += chr(self.type) # type
183 if control == Header.MESSAGE: return self._appendExtendedTimestamp(data)
184
185 data += struct.pack('<I', self.streamId) # add streamId
186 return self._appendExtendedTimestamp(data)
187
188 def __repr__(self):
189 return ("<Header channel=%r time=%r size=%r type=%r (0x%02x) streamId=%r>"
190 % (self.channel, self.time, self.size, self.type, self.type or 0, self.streamId))
191
192 class Message(object):
193 # message types: RPC3, DATA3,and SHAREDOBJECT3 are used with AMF3
194 RPC, RPC3, DATA, DATA3, SHAREDOBJ, SHAREDOBJ3, AUDIO, VIDEO, ACK, CHUNK_SIZE = \
195 0x14, 0x11, 0x12, 0x0F, 0x13, 0x10, 0x08, 0x09, 0x03, 0x01
196
197 def __init__(self, hdr=None, data=''):
198 if hdr is None: hdr = Header()
199 self.header, self.data = hdr, data
200
201 # define properties type, streamId and time to access self.header.(property)
202 for p in ['type', 'streamId', 'time']:
203 exec 'def _g%s(self): return self.header.%s'%(p, p)
204 exec 'def _s%s(self, %s): self.header.%s = %s'%(p, p, p, p)
205 exec '%s = property(fget=_g%s, fset=_s%s)'%(p, p, p)
206 @property
207 def size(self): return len(self.data)
208
209 def __repr__(self):
210 return ("<Message header=%r data=%r>"% (self.header, self.data))
211
212 class Protocol(object):
213 # constants
214 PING_SIZE = 1536
215 DEFAULT_CHUNK_SIZE = 128
216 MIN_CHANNEL_ID = 3
217 PROTOCOL_CHANNEL_ID = 2
218
219 def __init__(self, sock):
220 self.stream = SockStream(sock)
221 self.lastReadHeaders = dict() # indexed by channelId
222 self.incompletePackets = dict() #indexed by channelId
223 self.readChunkSize = self.writeChunkSize = Protocol.DEFAULT_CHUNK_SIZE
224 self.lastWriteHeaders = dict() # indexed by streamId
225 self.nextChannelId = Protocol.MIN_CHANNEL_ID
226 self.writeLock = threading.Lock()
227 self.writeQueue = Queue.Queue()
228
229 def messageReceived(self, msg):
230 yield
231
232 def protocolMessage(self, msg):
233 if msg.type == Message.ACK: # respond to ACK requests
234 response = Message()
235 response.type, response.data = msg.type, msg.data
236 self.writeMessage(response)
237 elif msg.type == Message.CHUNK_SIZE:
238 self.readChunkSize = struct.unpack('>L', msg.data)[0]
239
240 def connectionClosed(self):
241 yield
242
243 def parse(self):
244 try:
245 yield self.parseRequests() # parse http requests
246 except ConnectionClosed:
247 yield self.connectionClosed()
248 if _debug: print 'parse connection closed'
249
250 def writeMessage(self, message):
251 self.writeQueue.put(message)
252
253 def parseHandshake(self):
254 '''Parses the rtmp handshake'''
255 data = (yield self.stream.read(Protocol.PING_SIZE + 1)) # bound version and first ping
256 yield self.stream.write(data)
257 data = (yield self.stream.read(Protocol.PING_SIZE)) # bound second ping
258 yield self.stream.write(data)
259
260 def parseRequests(self):
261 '''Parses complete messages until connection closed. Raises ConnectionLost exception.'''
262 self.hr = MultitaskHTTPRequestHandler(self.stream, self.remote, None)
263 self.hr.close_connection = 1
264 self.cookies = CookieJar()
265
266 while True:
267
268 # prepare reading the request so that when it's handed
269 # over to "standard" HTTPRequestHandler, the data's already
270 # there.
271 print "parseRequests"
272 raw_requestline = (yield self.stream.readline())
273 if _debug: print "parseRequests, line", raw_requestline
274 if not raw_requestline:
275 raise ConnectionClosed
276 data = ''
277 try:
278 while 1:
279 line = (yield self.stream.readline())
280 data += line
281 if line in ['\n', '\r\n']:
282 break
283 except StopIteration:
284 if _debug: print "parseRequests, stopiter"
285 raise
286 except:
287 if _debug:
288 print 'parseRequests', \
289 (traceback and traceback.print_exc() or None)
290
291 raise ConnectionClosed
292
293 self.hr.raw_requestline = raw_requestline
294 self.hr.rfile.write(data)
295 print "parseRequests write after"
296 self.hr.rfile.seek(0)
297 print "parseRequests seek after"
298
299 if not self.hr.parse_request():
300 raise ConnectionClosed
301 print "parseRequests parse_req after"
302 try:
303 yield self.messageReceived(self.hr)
304 except:
305 if _debug:
306 print 'messageReceived', \
307 (traceback and traceback.print_exc() or None)
308
309 def write(self):
310 '''Writes messages to stream'''
311 while True:
312 while self.writeQueue.empty(): (yield multitask.sleep(0.01))
313 data = self.writeQueue.get() # TODO this should be used using multitask.Queue and remove previous wait.
314 if _debug: print "write to stream", repr(data)
315 if data is None:
316 # just in case TCP socket is not closed, close it.
317 try:
318 print "stream closing"
319 print self.stream
320 yield self.stream.close()
321 print "stream closed"
322 except: pass
323 break
324
325 try:
326 yield self.stream.write(data)
327 except ConnectionClosed:
328 yield self.connectionClosed()
329 except:
330 print traceback.print_exc()
331
332 class Command(object):
333 ''' Class for command / data messages'''
334 def __init__(self, type=Message.RPC, name=None, id=None, cmdData=None, args=[]):
335 '''Create a new command with given type, name, id, cmdData and args list.'''
336 self.type, self.name, self.id, self.cmdData, self.args = type, name, id, cmdData, args[:]
337
338 def __repr__(self):
339 return ("<Command type=%r name=%r id=%r data=%r args=%r>" % (self.type, self.name, self.id, self.cmdData, self.args))
340
341 def setArg(self, arg):
342 self.args.append(arg)
343
344 def getArg(self, index):
345 return self.args[index]
346
347 @classmethod
348 def fromMessage(cls, message):
349 ''' initialize from a parsed RTMP message'''
350 assert (message.type in [Message.RPC, Message.RPC3, Message.DATA, Message.DATA3])
351
352 length = len(message.data)
353 if length == 0: raise ValueError('zero length message data')
354
355 if message.type == Message.RPC3 or message.type == Message.DATA3:
356 assert message.data[0] == '\x00' # must be 0 in AMD3
357 data = message.data[1:]
358 else:
359 data = message.data
360
361 amfReader = amf.AMF0(data)
362
363 inst = cls()
364 inst.name = amfReader.read() # first field is command name
365
366 try:
367 if message.type == Message.RPC:
368 inst.id = amfReader.read() # second field *may* be message id
369 inst.cmdData = amfReader.read() # third is command data
370 else:
371 inst.id = 0
372 inst.args = [] # others are optional
373 while True:
374 inst.args.append(amfReader.read())
375 except EOFError:
376 pass
377 return inst
378
379 def toMessage(self):
380 msg = Message()
381 assert self.type
382 msg.type = self.type
383 output = amf.BytesIO()
384 amfWriter = amf.AMF0(output)
385 amfWriter.write(self.name)
386 if msg.type == Message.RPC or msg.type == Message.RPC3:
387 amfWriter.write(self.id)
388 amfWriter.write(self.cmdData)
389 for arg in self.args:
390 amfWriter.write(arg)
391 output.seek(0)
392 #hexdump.hexdump(output)
393 #output.seek(0)
394 if msg.type == Message.RPC3 or msg.type == Message.DATA3:
395 data = '\x00' + output.read()
396 else:
397 data = output.read()
398 msg.data = data
399 output.close()
400 return msg
401
402 def getfilename(path, name, root):
403 '''return the file name for the given stream. The name is derived as root/scope/name.flv where scope is
404 the the path present in the path variable.'''
405 ignore, ignore, scope = path.partition('/')
406 if scope: scope = scope + '/'
407 result = root + scope + name + '.flv'
408 if _debug: print 'filename=', result
409 return result
410
411 class Stream(object):
412 '''The stream object that is used for RTMP stream.'''
413 count = 0;
414 def __init__(self, client):
415 self.client, self.id, self.name = client, 0, ''
416 self.recordfile = self.playfile = None # so that it doesn't complain about missing attribute
417 self.queue = multitask.Queue()
418 self._name = 'Stream[' + str(Stream.count) + ']'; Stream.count += 1
419 if _debug: print self, 'created'
420
421 def close(self):
422 if _debug: print self, 'closing'
423 if self.recordfile is not None: self.recordfile.close(); self.recordfile = None
424 if self.playfile is not None: self.playfile.close(); self.playfile = None
425 self.client = None # to clear the reference
426 pass
427
428 def __repr__(self):
429 return self._name;
430
431 def recv(self):
432 '''Generator to receive new Message on this stream, or None if stream is closed.'''
433 return self.queue.get()
434
435 def send(self, msg):
436 '''Method to send a Message or Command on this stream.'''
437 if isinstance(msg, Command):
438 msg = msg.toMessage()
439 msg.streamId = self.id
440 # if _debug: print self,'send'
441 if self.client is not None: self.client.writeMessage(msg)
442
443 class Client(Protocol):
444 '''The client object represents a single connected client to the server.'''
445 def __init__(self, sock, server, remote):
446 Protocol.__init__(self, sock)
447 self.server = server
448 self.remote = remote
449 self.agent = {}
450 self.streams = {}
451 self._nextCallId = 2
452 self._nextStreamId = 1
453 self.objectEncoding = 0.0
454 self.queue = multitask.Queue() # receive queue used by application
455 multitask.add(self.parse())
456 multitask.add(self.write())
457
458 def recv(self):
459 '''Generator to receive new Message (msg, arg) on this stream, or (None,None) if stream is closed.'''
460 return self.queue.get()
461
462 def connectionClosed(self):
463 '''Called when the client drops the connection'''
464 if _debug: 'Client.connectionClosed'
465 self.writeMessage(None)
466 yield self.queue.put((None,None))
467
468 def messageReceived(self, msg):
469 if _debug: print 'messageReceived cmd=', msg.command, msg.path
470 ch = msg.headers.getheaders("Cookie")
471 print "messageReceived cookieheaders=", '; '.join(ch)
472 res = []
473 for c in ch:
474 c = c.split(";")
475 c = map(strip, c)
476 res += c
477 has_sess = False
478 msg.response_cookies = SimpleCookie()
479 for c in res:
480 print "found cookie", str(c)
481 name, value = c.split("=")
482 msg.response_cookies[name] = value
483 msg.response_cookies[name]['path'] = "/"
484 msg.response_cookies[name]['domain'] = self.remote[0]
485 #msg.response_cookies[name]['expires'] = 'None'
486 msg.response_cookies[name]['version'] = 0
487 if name == "session":
488 has_sess = True
489 if not has_sess:
490 msg.response_cookies['session'] = uuid.uuid4().hex
491 #msg.response_cookies['session']['expires'] = 'None'
492 msg.response_cookies['session']['path'] = '/'
493 msg.response_cookies['session']['domain'] = self.remote[0]
494 msg.response_cookies['session']['version'] = 0
495
496 if msg.headers.has_key('content-length'):
497 max_chunk_size = 10*1024*1024
498 size_remaining = int(msg.headers["content-length"])
499 L = []
500 while size_remaining:
501 chunk_size = min(size_remaining, max_chunk_size)
502 data = (yield self.stream.read(chunk_size))
503 L.append(data)
504 size_remaining -= len(L[-1])
505
506 pos = msg.rfile.tell()
507 msg.rfile.write(''.join(L))
508 msg.rfile.seek(pos)
509
510 yield self.server.queue.put((self, msg)) # new connection
511
512 def accept(self):
513 '''Method to accept an incoming client.'''
514 response = Command()
515 response.id, response.name, response.type = 1, '_result', Message.RPC
516 if _debug: print 'Client.accept() objectEncoding=', self.objectEncoding
517 response.setArg(dict(level='status', code='NetConnection.Connect.Success',
518 description='Connection succeeded.', details=None,
519 objectEncoding=self.objectEncoding))
520 self.writeMessage(response.toMessage())
521
522 def rejectConnection(self, reason=''):
523 '''Method to reject an incoming client.'''
524 response = Command()
525 response.id, response.name, response.type = 1, '_error', Message.RPC
526 response.setArg(dict(level='status', code='NetConnection.Connect.Rejected',
527 description=reason, details=None))
528 self.writeMessage(response.toMessage())
529
530 def call(self, method, *args):
531 '''Call a (callback) method on the client.'''
532 cmd = Command()
533 cmd.id, cmd.name, cmd.type = self._nextCallId, method, (self.objectEncoding == 0.0 and Message.RPC or Message.RPC3)
534 cmd.args, cmd.cmdData = args, None
535 self._nextCallId += 1
536 if _debug: print 'Client.call method=', method, 'args=', args, ' msg=', cmd.toMessage()
537 self.writeMessage(cmd.toMessage())
538
539 def createStream(self):
540 ''' Create a stream on the server side'''
541 stream = Stream(self)
542 stream.id = self._nextStreamId
543 self.streams[stream.id] = stream
544 self._nextStreamId += 1
545 return stream
546
547
548 class Server(object):
549 '''A RTMP server listens for incoming connections and informs the app.'''
550 def __init__(self, sock):
551 '''Create an RTMP server on the given bound TCP socket. The server will terminate
552 when the socket is disconnected, or some other error occurs in listening.'''
553 self.sock = sock
554 self.queue = multitask.Queue() # queue to receive incoming client connections
555 multitask.add(self.run())
556
557 def recv(self):
558 '''Generator to wait for incoming client connections on this server and return
559 (client, args) or (None, None) if the socket is closed or some error.'''
560 return self.queue.get()
561
562 def run(self):
563 try:
564 while True:
565 sock, remote = (yield multitask.accept(self.sock)) # receive client TCP
566 if sock == None:
567 if _debug: print 'rtmp.Server accept(sock) returned None.'
568 break
569 if _debug: print 'connection received from', remote
570 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) # make it non-block
571 client = Client(sock, self, remote)
572 except:
573 if _debug: print 'rtmp.Server exception ', (sys and sys.exc_info() or None)
574 traceback.print_exc()
575
576 if (self.sock):
577 try: self.sock.close(); self.sock = None
578 except: pass
579 if (self.queue):
580 yield self.queue.put((None, None))
581 self.queue = None
582
583 class BaseApp(object):
584 '''An application instance containing any number of streams. Except for constructor all methods are generators.'''
585 count = 0
586 def __init__(self):
587 self.name = str(self.__class__.__name__) + '[' + str(App.count) + ']'; App.count += 1
588 self.players, self.publishers, self._clients = {}, {}, [] # Streams indexed by stream name, and list of clients
589 if _debug: print self.name, 'created'
590 def __del__(self):
591 if _debug: print self.name, 'destroyed'
592 @property
593 def clients(self):
594 '''everytime this property is accessed it returns a new list of clients connected to this instance.'''
595 return self._clients[1:] if self._clients is not None else []
596
597 class App(BaseApp, SimpleAppHTTPRequestHandler):
598 pass
599
600 class HTTPServer(object):
601 '''A RTMP server to record and stream Flash video.'''
602 def __init__(self):
603 '''Construct a new HttpServer. It initializes the local members.'''
604 self.sock = self.server = None
605 self.apps = dict({'*': App}) # supported applications: * means any as in {'*': App}
606 self.clients = dict() # list of clients indexed by scope. First item in list is app instance.
607 self.root = ''
608
609 def start(self, host='0.0.0.0', port=8080):
610 '''This should be used to start listening for RTMP connections on the given port, which defaults to 8080.'''
611 if _debug: print 'start', host, port
612 if not self.server:
613 sock = self.sock = socket.socket(type=socket.SOCK_STREAM)
614 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
615 sock.bind((host, port))
616 if _debug: print 'listening on ', sock.getsockname()
617 sock.listen(5)
618 server = self.server = Server(sock) # start rtmp server on that socket
619 multitask.add(self.serverlistener())
620
621 def stop(self):
622 if _debug: print 'stopping Flash server'
623 if self.server and self.sock:
624 try: self.sock.close(); self.sock = None
625 except: pass
626 self.server = None
627
628 def serverlistener(self):
629 '''Server listener (generator). It accepts all connections and invokes client listener'''
630 try:
631 while True: # main loop to receive new connections on the server
632 client, msg = (yield self.server.recv()) # receive an incoming client connection.
633 # TODO: we should reject non-localhost client connections.
634 if not client: # if the server aborted abnormally,
635 break # hence close the listener.
636 if _debug: print 'client connection received', client, msg
637 # if client.objectEncoding != 0 and client.objectEncoding != 3:
638 if client.objectEncoding != 0:
639 yield client.rejectConnection(reason='Unsupported encoding ' + str(client.objectEncoding) + '. Please use NetConnection.defaultObjectEncoding=ObjectEncoding.AMF0')
640 yield client.connectionClosed()
641 else:
642 print "cookies", str(msg.response_cookies)
643 session = msg.response_cookies['session'].value
644 name = msg.path
645 print "serverlistener", name
646 if '*' not in self.apps and name not in self.apps:
647 yield client.rejectConnection(reason='Application not found: ' + name)
648 else: # create application instance as needed and add in our list
649 if _debug: print 'name=', name, 'name in apps', str(name in self.apps)
650 app = self.apps[name] if name in self.apps else self.apps['*'] # application class
651 print "clients", self.clients.keys()
652 if session in self.clients:
653 inst = self.clients[session][0]
654 else:
655 inst = app()
656 msg.server = inst # whew! just in time!
657 try:
658 methodname = "on%s" % msg.command
659 print methodname, dir(inst)
660 method = getattr(inst, methodname, None)
661 result = method(client, msg)
662 close_connection = msg.close_connection
663 print "close connection", close_connection
664 except:
665 if _debug: traceback.print_exc()
666 yield client.rejectConnection(reason='Exception on %s' % methodname)
667 continue
668 if result is True or result is None:
669 if session not in self.clients:
670 self.clients[session] = [inst]; inst._clients=self.clients[session]
671 self.clients[session].append(client)
672 msg.wfile.seek(0)
673 data = msg.wfile.read()
674 msg.wfile.seek(0)
675 msg.wfile.truncate()
676 yield client.writeMessage(data)
677 if close_connection:
678 if _debug:
679 print 'close_connection requested'
680 try:
681 yield client.connectionClosed()
682 except ClientClosed:
683 if _debug:
684 print 'close_connection done'
685 pass
686 else:
687 yield client.rejectConnection(reason='Rejected in onConnect')
688 except StopIteration: raise
689 except:
690 if _debug:
691 print 'serverlistener exception', \
692 (sys and sys.exc_info() or None)
693 traceback.print_exc()
694
695 def clientlistener(self, client):
696 '''Client listener (generator). It receives a command and invokes client handler, or receives a new stream and invokes streamlistener.'''
697 try:
698 while True:
699 msg, arg = (yield client.recv()) # receive new message from client
700 if not msg: # if the client disconnected,
701 if _debug: print 'connection closed from client'
702 break # come out of listening loop.
703 if msg == 'command': # handle a new command
704 multitask.add(self.clienthandler(client, arg))
705 elif msg == 'stream': # a new stream is created, handle the stream.
706 arg.client = client
707 multitask.add(self.streamlistener(arg))
708 except StopIteration: raise
709 except:
710 if _debug: print 'clientlistener exception', (sys and sys.exc_info() or None)
711 traceback.print_exc()
712
713 # client is disconnected, clear our state for application instance.
714 if _debug: print 'cleaning up client', client.path
715 inst = None
716 if client.path in self.clients:
717 inst = self.clients[client.path][0]
718 self.clients[client.path].remove(client)
719 for stream in client.streams.values(): # for all streams of this client
720 self.closehandler(stream)
721 client.streams.clear() # and clear the collection of streams
722 if client.path in self.clients and len(self.clients[client.path]) == 1: # no more clients left, delete the instance.
723 if _debug: print 'removing the application instance'
724 inst = self.clients[client.path][0]
725 inst._clients = None
726 del self.clients[client.path]
727 if inst is not None: inst.onDisconnect(client)
728
729 def closehandler(self, stream):
730 '''A stream is closed explicitly when a closeStream command is received from given client.'''
731 if stream.client is not None:
732 inst = self.clients[stream.client.path][0]
733 if stream.name in inst.publishers and inst.publishers[stream.name] == stream: # clear the published stream
734 inst.onClose(stream.client, stream)
735 del inst.publishers[stream.name]
736 if stream.name in inst.players and stream in inst.players[stream.name]:
737 inst.onStop(stream.client, stream)
738 inst.players[stream.name].remove(stream)
739 if len(inst.players[stream.name]) == 0:
740 del inst.players[stream.name]
741 stream.close()
742
743 def clienthandler(self, client, cmd):
744 '''A generator to handle a single command on the client.'''
745 inst = self.clients[client.path][0]
746 if inst:
747 if cmd.name == '_error':
748 if hasattr(inst, 'onStatus'):
749 result = inst.onStatus(client, cmd.args[0])
750 elif cmd.name == '_result':
751 if hasattr(inst, 'onResult'):
752 result = inst.onResult(client, cmd.args[0])
753 else:
754 res, code, args = Command(), '_result', dict()
755 try: result = inst.onCommand(client, cmd.name, *cmd.args)
756 except:
757 if _debug: print 'Client.call exception', (sys and sys.exc_info() or None)
758 code, args = '_error', dict()
759 res.id, res.name, res.type = cmd.id, code, (client.objectEncoding == 0.0 and Message.RPC or Message.RPC3)
760 res.args, res.cmdData = args, None
761 if _debug: print 'Client.call method=', code, 'args=', args, ' msg=', res.toMessage()
762 client.writeMessage(res.toMessage())
763 # TODO return result to caller
764 yield
765
766 def streamlistener(self, stream):
767 '''Stream listener (generator). It receives stream message and invokes streamhandler.'''
768 stream.recordfile = None # so that it doesn't complain about missing attribute
769 while True:
770 msg = (yield stream.recv())
771 if not msg:
772 if _debug: print 'stream closed'
773 self.closehandler(stream)
774 break
775 # if _debug: msg
776 multitask.add(self.streamhandler(stream, msg))
777
778 def streamhandler(self, stream, message):
779 '''A generator to handle a single message on the stream.'''
780 try:
781 if message.type == Message.RPC:
782 cmd = Command.fromMessage(message)
783 if _debug: print 'streamhandler received cmd=', cmd
784 if cmd.name == 'publish':
785 yield self.publishhandler(stream, cmd)
786 elif cmd.name == 'play':
787 yield self.playhandler(stream, cmd)
788 elif cmd.name == 'closeStream':
789 self.closehandler(stream)
790 else: # audio or video message
791 yield self.mediahandler(stream, message)
792 except GeneratorExit: pass
793 except StopIteration: pass
794 except:
795 if _debug: print 'exception in streamhandler', (sys and sys.exc_info())
796
797 def publishhandler(self, stream, cmd):
798 '''A new stream is published. Store the information in the application instance.'''
799 try:
800 stream.mode = 'live' if len(cmd.args) < 2 else cmd.args[1] # live, record, append
801 stream.name = cmd.args[0]
802 if _debug: print 'publishing stream=', stream.name, 'mode=', stream.mode
803 inst = self.clients[stream.client.path][0]
804 if (stream.name in inst.publishers):
805 raise ValueError, 'Stream name already in use'
806 inst.publishers[stream.name] = stream # store the client for publisher
807 result = inst.onPublish(stream.client, stream)
808
809 if stream.mode == 'record' or stream.mode == 'append':
810 stream.recordfile = FLV().open(getfilename(stream.client.path, stream.name, self.root), stream.mode)
811 response = Command(name='onStatus', id=cmd.id, args=[dict(level='status', code='NetStream.Publish.Start', description='', details=None)])
812 yield stream.send(response)
813 except ValueError, E: # some error occurred. inform the app.
814 if _debug: print 'error in publishing stream', str(E)
815 response = Command(name='onStatus', id=cmd.id, args=[dict(level='error',code='NetStream.Publish.BadName',description=str(E),details=None)])
816 yield stream.send(response)
817
818 def playhandler(self, stream, cmd):
819 '''A new stream is being played. Just updated the players list with this stream.'''
820 inst = self.clients[stream.client.path][0]
821 name = stream.name = cmd.args[0] # store the stream's name
822 start = cmd.args[1] if len(cmd.args) >= 2 else -2
823 if name not in inst.players:
824 inst.players[name] = [] # initialize the players for this stream name
825 if stream not in inst.players[name]: # store the stream as players of this name
826 inst.players[name].append(stream)
827 path = getfilename(stream.client.path, stream.name, self.root)
828 if os.path.exists(path):
829 stream.playfile = FLV().open(path)
830 multitask.add(stream.playfile.reader(stream))
831 if _debug: print 'playing stream=', name, 'start=', start
832 result = inst.onPlay(stream.client, stream)
833 response = Command(name='onStatus', id=cmd.id, args=[dict(level='status',code='NetStream.Play.Start', description=stream.name, details=None)])
834 yield stream.send(response)
835
836 def mediahandler(self, stream, message):
837 '''Handle incoming media on the stream, by sending to other stream in this application instance.'''
838 if stream.client is not None:
839 inst = self.clients[stream.client.path][0]
840 result = inst.onPublishData(stream.client, stream, message)
841 if result:
842 client = stream.client
843 for s in (inst.players.get(stream.name, [])):
844 # if _debug: print 'D', stream.name, s.name
845 result = inst.onPlayData(s.client, s, message)
846 if result:
847 yield s.send(message)
848 if stream.recordfile is not None:
849 stream.recordfile.write(message)
850
851 # The main routine to start, run and stop the service
852 if __name__ == '__main__':
853 print "optparse"
854 from optparse import OptionParser
855 parser = OptionParser()
856 parser.add_option('-i', '--host', dest='host', default='0.0.0.0', help="listening IP address. Default '0.0.0.0'")
857 parser.add_option('-p', '--port', dest='port', default=8080, type="int", help='listening port number. Default 8080')
858 parser.add_option('-r', '--root', dest='root', default='./', help="document root directory. Default './'")
859 parser.add_option('-d', '--verbose', dest='verbose', default=False, action='store_true', help='enable debug trace')
860 (options, args) = parser.parse_args()
861
862 _debug = options.verbose
863 try:
864 if _debug: print time.asctime(), 'Options - %s:%d' % (options.host, options.port)
865 agent = HTTPServer()
866 agent.root = options.root
867 agent.start(options.host, options.port)
868 if _debug: print time.asctime(), 'Flash Server Starts - %s:%d' % (options.host, options.port)
869 multitask.run()
870 except KeyboardInterrupt:
871 print traceback.print_exc()
872 pass
873 except:
874 print "exception"
875 print traceback.print_exc()
876 if _debug: print time.asctime(), 'Flash Server Stops'