55c10900a79298815fc15bc3854c453680210a8a
[multitaskhttpd.git] / httpd.py
1 # Copyright (c) 2007-2009, Mamta Singh. All rights reserved. see README for details.
2
3 '''
4 This is a simple implementation of a Flash RTMP server to accept connections and stream requests. The module is organized as follows:
5 1. The FlashServer class is the main class to provide the server abstraction. It uses the multitask module for co-operative multitasking.
6 It also uses the App abstract class to implement the applications.
7 2. The Server class implements a simple server to receive new Client connections and inform the FlashServer application. The Client class
8 derived from Protocol implements the RTMP client functions. The Protocol class implements the base RTMP protocol parsing. A Client contains
9 various streams from the client, represented using the Stream class.
10 3. The Message, Header and Command represent RTMP message, header and command respectively. The FLV class implements functions to perform read
11 and write of FLV file format.
12
13
14 Typically an application can launch this server as follows:
15 $ python rtmp.py
16
17 To know the command line options use the -h option:
18 $ python rtmp.py -h
19
20 To start the server with a different directory for recording and playing FLV files from, use the following command.
21 $ python rtmp.py -r some-other-directory/
22 Note the terminal '/' in the directory name. Without this, it is just used as a prefix in FLV file names.
23
24 A test client is available in testClient directory, and can be compiled using Flex Builder. Alternatively, you can use the SWF file to launch
25 from testClient/bin-debug after starting the server. Once you have launched the client in the browser, you can connect to
26 local host by clicking on 'connect' button. Then click on publish button to publish a stream. Open another browser with
27 same URL and first connect then play the same stream name. If everything works fine you should be able to see the video
28 from first browser to the second browser. Similarly, in the first browser, if you check the record box before publishing,
29 it will create a new FLV file for the recorded stream. You can close the publishing stream and play the recorded stream to
30 see your recording. Note that due to initial delay in timestamp (in case publish was clicked much later than connect),
31 your played video will start appearing after some initial delay.
32
33
34 If an application wants to use this module as a library, it can launch the server as follows:
35 >>> agent = FlashServer() # a new RTMP server instance
36 >>> agent.root = 'flvs/' # set the document root to be 'flvs' directory. Default is current './' directory.
37 >>> agent.start() # start the server
38 >>> multitask.run() # this is needed somewhere in the application to actually start the co-operative multitasking.
39
40
41 If an application wants to specify a different application other than the default App, it can subclass it and supply the application by
42 setting the server's apps property. The following example shows how to define "myapp" which invokes a 'connected()' method on client when
43 the client connects to the server.
44
45 class MyApp(App): # a new MyApp extends the default App in rtmp module.
46 def __init__(self): # constructor just invokes base class constructor
47 App.__init__(self)
48 def onConnect(self, client, *args):
49 result = App.onConnect(self, client, *args) # invoke base class method first
50 def invokeAdded(self, client): # define a method to invoke 'connected("some-arg")' on Flash client
51 client.call('connected', 'some-arg')
52 yield
53 multitask.add(invokeAdded(self, client)) # need to invoke later so that connection is established before callback
54 ...
55 agent.apps = dict({'myapp': MyApp, 'someapp': MyApp, '*': App})
56
57 Now the client can connect to rtmp://server/myapp or rtmp://server/someapp and will get connected to this MyApp application.
58 If the client doesn't define "function connected(arg:String):void" in the NetConnection.client object then the server will
59 throw an exception and display the error message.
60
61 '''
62
63 import os, sys, time, struct, socket, traceback, multitask
64 import threading, Queue
65 import uuid
66 from string import strip
67
68 from BaseHTTPServer import BaseHTTPRequestHandler
69 from SimpleAppHTTPServer import SimpleAppHTTPRequestHandler
70 from cStringIO import StringIO
71 from cookielib import parse_ns_headers, CookieJar
72 from Cookie import SimpleCookie
73
# Module-wide switch consulted by the ``if _debug: print ...`` traces below.
# Tracing is off until set_debug() (or the -d command line option) enables it.
_debug = False


def set_debug(dbg):
    '''Store dbg in the module-level _debug flag that gates diagnostic output.'''
    global _debug
    _debug = dbg
79
80 class MultitaskHTTPRequestHandler(BaseHTTPRequestHandler):
81
82 def setup(self):
83 self.connection = self.request
84 self.rfile = StringIO()
85 self.wfile = StringIO()
86
87 def finish(self):
88 pass
89
90 def info(self):
91 return self.headers
92
93 def get_full_url(self):
94 return self.path
95
96 def get_header(self, hdr, default):
97 return self.headers.getheader(hdr, default)
98
99 def add_cookies(self):
100 for k, v in self.response_cookies.items():
101 val = v.OutputString()
102 self.send_header("Set-Cookie", val)
103
104 def process_cookies(headers, remote, cookie_key="Cookie", add_sess=True):
105 ch = headers.getheaders(cookie_key)
106 print "messageReceived cookieheaders=", '; '.join(ch)
107 res = []
108 for c in ch:
109 c = c.split(";")
110 if len(c) == 0:
111 continue
112 c = map(strip, c)
113 c = filter(lambda x: x, c)
114 res += c
115 has_sess = False
116 response_cookies = SimpleCookie()
117 for c in res:
118 print "found cookie", repr(c)
119 name, value = c.split("=")
120 response_cookies[name] = value
121 #response_cookies[name]['path'] = "/"
122 #response_cookies[name]['domain'] = remote[0]
123 #response_cookies[name]['version'] = 0
124 if name == "session":
125 response_cookies[name]['expires'] = 50000
126 has_sess = True
127 if not add_sess:
128 return response_cookies
129 if not has_sess:
130 response_cookies['session'] = uuid.uuid4().hex
131 response_cookies['session']['expires'] = 50000
132 #response_cookies['session']['path'] = '/'
133 #response_cookies['session']['domain'] = remote[0]
134 #response_cookies['session']['version'] = 0
135 return response_cookies
136
class ConnectionClosed(Exception):
    '''Raised when the client closed the connection.

    Derives from Exception so it behaves like every other exception when it is
    raised, thrown into generators and caught.
    '''
139
class SockStream(object):
    '''A class that represents a socket as a stream.

    Incoming data is accumulated in self.buffer. readline(), read() and
    write() are generators meant to be driven by multitask: they yield
    multitask.recv()/multitask.send() requests and hand their result back by
    raising StopIteration(data).
    '''
    def __init__(self, sock):
        # sock: the connected socket; buffer: received but not yet consumed data
        self.sock, self.buffer = sock, ''
        # running totals of the payload sizes written and read
        self.bytesWritten = self.bytesRead = 0

    def close(self):
        '''Shut down the sending side of the socket and drop the reference to it.'''
        self.sock.shutdown(1) # can't just close it.
        self.sock = None

    def readline(self):
        '''Generator: deliver the next line, including its trailing newline.

        Raises ConnectionClosed when the peer sends no more data or when any
        other error occurs while receiving.
        '''
        try:
            while True:
                nl = self.buffer.find("\n")
                if nl >= 0: # a complete line is already in the buffer
                    data, self.buffer = self.buffer[:nl+1], self.buffer[nl+1:]
                    raise StopIteration(data)
                data = (yield multitask.recv(self.sock, 4096)) # read more from socket
                if not data: raise ConnectionClosed
                if _debug: print 'socket.read[%d] %r'%(len(data), data)
                self.bytesRead += len(data)
                self.buffer += data
        except StopIteration: raise
        except: raise ConnectionClosed # anything else is treated as connection closed.

    def read(self, count):
        '''Generator: deliver exactly count bytes, receiving more data as needed.

        Raises ConnectionClosed when the peer sends no more data or when any
        other error occurs while receiving.
        '''
        try:
            while True:
                if len(self.buffer) >= count: # enough data is already in the buffer
                    data, self.buffer = self.buffer[:count], self.buffer[count:]
                    raise StopIteration(data)
                data = (yield multitask.recv(self.sock, 4096)) # read more from socket
                if not data: raise ConnectionClosed
                # if _debug: print 'socket.read[%d] %r'%(len(data), data)
                self.bytesRead += len(data)
                self.buffer += data
        except StopIteration: raise
        except: raise ConnectionClosed # anything else is treated as connection closed.

    def unread(self, data):
        '''Push data back so that it is returned first by the next read.'''
        self.buffer = data + self.buffer

    def write(self, data):
        '''Generator: send data to the socket, at most 4096 bytes per send.

        Any error while sending is reported as ConnectionClosed.
        '''
        while len(data) > 0: # write in 4K chunks each time
            chunk, data = data[:4096], data[4096:]
            # counted before the send, so it includes a chunk whose send fails
            self.bytesWritten += len(chunk)
            if _debug: print 'socket.write[%d] %r'%(len(chunk), chunk)
            try: yield multitask.send(self.sock, chunk)
            except: raise ConnectionClosed
        if _debug: print 'socket.written'
188 except: raise ConnectionClosed
189 if _debug: print 'socket.written'
190
191
192 class Header(object):
193 FULL, MESSAGE, TIME, SEPARATOR, MASK = 0x00, 0x40, 0x80, 0xC0, 0xC0
194
195 def __init__(self, channel=0, time=0, size=None, type=None, streamId=0):
196 self.channel, self.time, self.size, self.type, self.streamId = channel, time, size, type, streamId
197 self.extendedtime = 0
198 if channel<64: self.hdrdata = chr(channel)
199 elif channel<(64+256): self.hdrdata = '\x00'+chr(channel-64)
200 else: self.hdrdata = '\x01'+chr((channel-64)%256)+chr((channel-64)/256)
201
202 def _appendExtendedTimestamp(self, data):
203 if self.time == 0xFFFFFF:
204 data += struct.pack('>I', self.extendedtime)
205 return data
206
207 def toBytes(self, control):
208 data = chr(ord(self.hdrdata[0]) | control) + self.hdrdata[1:]
209 if control == Header.SEPARATOR: return self._appendExtendedTimestamp(data)
210
211 data += struct.pack('>I', self.time & 0xFFFFFF)[1:] # add time
212 if control == Header.TIME: return self._appendExtendedTimestamp(data)
213
214 data += struct.pack('>I', self.size)[1:] # size
215 data += chr(self.type) # type
216 if control == Header.MESSAGE: return self._appendExtendedTimestamp(data)
217
218 data += struct.pack('<I', self.streamId) # add streamId
219 return self._appendExtendedTimestamp(data)
220
221 def __repr__(self):
222 return ("<Header channel=%r time=%r size=%r type=%r (0x%02x) streamId=%r>"
223 % (self.channel, self.time, self.size, self.type, self.type or 0, self.streamId))
224
def _header_property(name):
    '''Return a property that reads and writes the header attribute called name.'''
    def getter(self):
        return getattr(self.header, name)

    def setter(self, value):
        setattr(self.header, name, value)

    return property(fget=getter, fset=setter)


class Message(object):
    '''A message: a header object plus the raw data string.

    type, streamId and time are forwarded to the header; size is the length of
    the data.
    '''
    # message types: RPC3, DATA3,and SHAREDOBJECT3 are used with AMF3
    RPC, RPC3, DATA, DATA3, SHAREDOBJ, SHAREDOBJ3, AUDIO, VIDEO, ACK, CHUNK_SIZE = \
        0x14, 0x11, 0x12, 0x0F, 0x13, 0x10, 0x08, 0x09, 0x03, 0x01

    def __init__(self, hdr=None, data=''):
        '''Create a message; a fresh default Header is used when hdr is None.'''
        if hdr is None: hdr = Header()
        self.header, self.data = hdr, data

    # properties type, streamId and time access self.header.(property)
    type = _header_property('type')
    streamId = _header_property('streamId')
    time = _header_property('time')

    @property
    def size(self):
        '''Length of the message data.'''
        return len(self.data)

    def __repr__(self):
        return ("<Message header=%r data=%r>"% (self.header, self.data))
244
245 class Protocol(object):
246 # constants
247 PING_SIZE = 1536
248 DEFAULT_CHUNK_SIZE = 128
249 MIN_CHANNEL_ID = 3
250 PROTOCOL_CHANNEL_ID = 2
251
252 def __init__(self, sock):
253 self.stream = SockStream(sock)
254 self.lastReadHeaders = dict() # indexed by channelId
255 self.incompletePackets = dict() #indexed by channelId
256 self.readChunkSize = self.writeChunkSize = Protocol.DEFAULT_CHUNK_SIZE
257 self.lastWriteHeaders = dict() # indexed by streamId
258 self.nextChannelId = Protocol.MIN_CHANNEL_ID
259 self.writeLock = threading.Lock()
260 self.writeQueue = Queue.Queue()
261
262 def messageReceived(self, msg):
263 yield
264
265 def protocolMessage(self, msg):
266 if msg.type == Message.ACK: # respond to ACK requests
267 response = Message()
268 response.type, response.data = msg.type, msg.data
269 self.writeMessage(response)
270 elif msg.type == Message.CHUNK_SIZE:
271 self.readChunkSize = struct.unpack('>L', msg.data)[0]
272
273 def connectionClosed(self):
274 yield
275
276 def parse(self):
277 try:
278 yield self.parseRequests() # parse http requests
279 except ConnectionClosed:
280 yield self.connectionClosed()
281 if _debug: print 'parse connection closed'
282
283 def writeMessage(self, message):
284 self.writeQueue.put(message)
285
286 def parseHandshake(self):
287 '''Parses the rtmp handshake'''
288 data = (yield self.stream.read(Protocol.PING_SIZE + 1)) # bound version and first ping
289 yield self.stream.write(data)
290 data = (yield self.stream.read(Protocol.PING_SIZE)) # bound second ping
291 yield self.stream.write(data)
292
293 def parseRequests(self):
294 '''Parses complete messages until connection closed. Raises ConnectionLost exception.'''
295 self.hr = MultitaskHTTPRequestHandler(self.stream, self.remote, None)
296 self.hr.close_connection = 1
297 self.cookies = CookieJar()
298
299 while True:
300
301 # prepare reading the request so that when it's handed
302 # over to "standard" HTTPRequestHandler, the data's already
303 # there.
304 print "parseRequests"
305 raw_requestline = (yield self.stream.readline())
306 if _debug: print "parseRequests, line", raw_requestline
307 if not raw_requestline:
308 raise ConnectionClosed
309 data = ''
310 try:
311 while 1:
312 line = (yield self.stream.readline())
313 data += line
314 if line in ['\n', '\r\n']:
315 break
316 except StopIteration:
317 if _debug: print "parseRequests, stopiter"
318 raise
319 except:
320 if _debug:
321 print 'parseRequests', \
322 (traceback and traceback.print_exc() or None)
323
324 raise ConnectionClosed
325
326 self.hr.raw_requestline = raw_requestline
327 self.hr.rfile.write(data)
328 print "parseRequests write after"
329 self.hr.rfile.seek(0)
330 print "parseRequests seek after"
331
332 if not self.hr.parse_request():
333 raise ConnectionClosed
334 print "parseRequests parse_req after"
335 try:
336 yield self.messageReceived(self.hr)
337 except:
338 if _debug:
339 print 'messageReceived', \
340 (traceback and traceback.print_exc() or None)
341
342 def write(self):
343 '''Writes messages to stream'''
344 while True:
345 while self.writeQueue.empty(): (yield multitask.sleep(0.01))
346 data = self.writeQueue.get() # TODO this should be used using multitask.Queue and remove previous wait.
347 if _debug: print "write to stream", repr(data)
348 if data is None:
349 # just in case TCP socket is not closed, close it.
350 try:
351 print "stream closing"
352 print self.stream
353 yield self.stream.close()
354 print "stream closed"
355 except: pass
356 break
357
358 try:
359 yield self.stream.write(data)
360 except ConnectionClosed:
361 yield self.connectionClosed()
362 except:
363 print traceback.print_exc()
364
class Command(object):
    '''A command / data message: type, name, id, command data and argument list.

    NOTE(review): fromMessage() and toMessage() use a module called amf that
    is not imported in the visible part of this file.
    '''
    def __init__(self, type=Message.RPC, name=None, id=None, cmdData=None, args=[]):
        '''Create a new command; args is copied, so the default list is never shared.'''
        own_args = args[:]
        self.type = type
        self.name = name
        self.id = id
        self.cmdData = cmdData
        self.args = own_args

    def __repr__(self):
        fields = (self.type, self.name, self.id, self.cmdData, self.args)
        return "<Command type=%r name=%r id=%r data=%r args=%r>" % fields

    def setArg(self, arg):
        '''Append arg to the argument list.'''
        self.args.append(arg)

    def getArg(self, index):
        '''Return the argument at position index.'''
        return self.args[index]

    @classmethod
    def fromMessage(cls, message):
        '''Build a command from a parsed RTMP message.

        Raises ValueError when the message carries no data.
        '''
        assert (message.type in [Message.RPC, Message.RPC3, Message.DATA, Message.DATA3])

        if len(message.data) == 0:
            raise ValueError('zero length message data')

        if message.type == Message.RPC3 or message.type == Message.DATA3:
            assert message.data[0] == '\x00' # must be 0 in AMD3
            payload = message.data[1:]
        else:
            payload = message.data

        reader = amf.AMF0(payload)

        command = cls()
        command.name = reader.read() # first field is command name

        # Everything after the name is optional: reading simply stops at the
        # first EOFError and whatever was read so far is kept.
        try:
            if message.type == Message.RPC:
                command.id = reader.read() # second field *may* be message id
                command.cmdData = reader.read() # third is command data
            else:
                command.id = 0
            command.args = []
            while True:
                command.args.append(reader.read())
        except EOFError:
            pass
        return command

    def toMessage(self):
        '''Serialize this command into a new Message of the same type.'''
        msg = Message()
        assert self.type
        msg.type = self.type
        output = amf.BytesIO()
        writer = amf.AMF0(output)
        writer.write(self.name)
        if msg.type == Message.RPC or msg.type == Message.RPC3:
            # only RPC messages carry the id and the command data
            writer.write(self.id)
            writer.write(self.cmdData)
        for item in self.args:
            writer.write(item)
        output.seek(0)
        encoded = output.read()
        if msg.type == Message.RPC3 or msg.type == Message.DATA3:
            encoded = '\x00' + encoded
        msg.data = encoded
        output.close()
        return msg
432 output.close()
433 return msg
434
435 def getfilename(path, name, root):
436 '''return the file name for the given stream. The name is derived as root/scope/name.flv where scope is
437 the the path present in the path variable.'''
438 ignore, ignore, scope = path.partition('/')
439 if scope: scope = scope + '/'
440 result = root + scope + name + '.flv'
441 if _debug: print 'filename=', result
442 return result
443
444 class Stream(object):
445 '''The stream object that is used for RTMP stream.'''
446 count = 0;
447 def __init__(self, client):
448 self.client, self.id, self.name = client, 0, ''
449 self.recordfile = self.playfile = None # so that it doesn't complain about missing attribute
450 self.queue = multitask.Queue()
451 self._name = 'Stream[' + str(Stream.count) + ']'; Stream.count += 1
452 if _debug: print self, 'created'
453
454 def close(self):
455 if _debug: print self, 'closing'
456 if self.recordfile is not None: self.recordfile.close(); self.recordfile = None
457 if self.playfile is not None: self.playfile.close(); self.playfile = None
458 self.client = None # to clear the reference
459 pass
460
461 def __repr__(self):
462 return self._name;
463
464 def recv(self):
465 '''Generator to receive new Message on this stream, or None if stream is closed.'''
466 return self.queue.get()
467
468 def send(self, msg):
469 '''Method to send a Message or Command on this stream.'''
470 if isinstance(msg, Command):
471 msg = msg.toMessage()
472 msg.streamId = self.id
473 # if _debug: print self,'send'
474 if self.client is not None: self.client.writeMessage(msg)
475
476 class Client(Protocol):
477 '''The client object represents a single connected client to the server.'''
478 def __init__(self, sock, server, remote):
479 Protocol.__init__(self, sock)
480 self.server = server
481 self.remote = remote
482 self.agent = {}
483 self.streams = {}
484 self._nextCallId = 2
485 self._nextStreamId = 1
486 self.objectEncoding = 0.0
487 self.queue = multitask.Queue() # receive queue used by application
488 multitask.add(self.parse())
489 multitask.add(self.write())
490
491 def recv(self):
492 '''Generator to receive new Message (msg, arg) on this stream, or (None,None) if stream is closed.'''
493 return self.queue.get()
494
495 def connectionClosed(self):
496 '''Called when the client drops the connection'''
497 if _debug: 'Client.connectionClosed'
498 self.writeMessage(None)
499 yield self.queue.put((None,None))
500
501 def messageReceived(self, msg):
502 if _debug: print 'messageReceived cmd=', msg.command, msg.path
503 msg.response_cookies = process_cookies(msg.headers, self.remote)
504
505 if msg.headers.has_key('content-length'):
506 max_chunk_size = 10*1024*1024
507 size_remaining = int(msg.headers["content-length"])
508 L = []
509 while size_remaining:
510 chunk_size = min(size_remaining, max_chunk_size)
511 data = (yield self.stream.read(chunk_size))
512 L.append(data)
513 size_remaining -= len(L[-1])
514
515 pos = msg.rfile.tell()
516 msg.rfile.write(''.join(L))
517 msg.rfile.seek(pos)
518
519 yield self.server.queue.put((self, msg)) # new connection
520
521 def accept(self):
522 '''Method to accept an incoming client.'''
523 response = Command()
524 response.id, response.name, response.type = 1, '_result', Message.RPC
525 if _debug: print 'Client.accept() objectEncoding=', self.objectEncoding
526 response.setArg(dict(level='status', code='NetConnection.Connect.Success',
527 description='Connection succeeded.', details=None,
528 objectEncoding=self.objectEncoding))
529 self.writeMessage(response.toMessage())
530
531 def rejectConnection(self, reason=''):
532 '''Method to reject an incoming client.'''
533 response = Command()
534 response.id, response.name, response.type = 1, '_error', Message.RPC
535 response.setArg(dict(level='status', code='NetConnection.Connect.Rejected',
536 description=reason, details=None))
537 self.writeMessage(response.toMessage())
538
539 def call(self, method, *args):
540 '''Call a (callback) method on the client.'''
541 cmd = Command()
542 cmd.id, cmd.name, cmd.type = self._nextCallId, method, (self.objectEncoding == 0.0 and Message.RPC or Message.RPC3)
543 cmd.args, cmd.cmdData = args, None
544 self._nextCallId += 1
545 if _debug: print 'Client.call method=', method, 'args=', args, ' msg=', cmd.toMessage()
546 self.writeMessage(cmd.toMessage())
547
548 def createStream(self):
549 ''' Create a stream on the server side'''
550 stream = Stream(self)
551 stream.id = self._nextStreamId
552 self.streams[stream.id] = stream
553 self._nextStreamId += 1
554 return stream
555
556
557 class Server(object):
558 '''A RTMP server listens for incoming connections and informs the app.'''
559 def __init__(self, sock):
560 '''Create an RTMP server on the given bound TCP socket. The server will terminate
561 when the socket is disconnected, or some other error occurs in listening.'''
562 self.sock = sock
563 self.queue = multitask.Queue() # queue to receive incoming client connections
564 multitask.add(self.run())
565
566 def recv(self):
567 '''Generator to wait for incoming client connections on this server and return
568 (client, args) or (None, None) if the socket is closed or some error.'''
569 return self.queue.get()
570
571 def run(self):
572 try:
573 while True:
574 sock, remote = (yield multitask.accept(self.sock)) # receive client TCP
575 if sock == None:
576 if _debug: print 'rtmp.Server accept(sock) returned None.'
577 break
578 if _debug: print 'connection received from', remote
579 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) # make it non-block
580 client = Client(sock, self, remote)
581 except:
582 if _debug: print 'rtmp.Server exception ', (sys and sys.exc_info() or None)
583 traceback.print_exc()
584
585 if (self.sock):
586 try: self.sock.close(); self.sock = None
587 except: pass
588 if (self.queue):
589 yield self.queue.put((None, None))
590 self.queue = None
591
592 class BaseApp(object):
593 '''An application instance containing any number of streams. Except for constructor all methods are generators.'''
594 count = 0
595 def __init__(self):
596 self.name = str(self.__class__.__name__) + '[' + str(App.count) + ']'; App.count += 1
597 self.players, self.publishers, self._clients = {}, {}, [] # Streams indexed by stream name, and list of clients
598 if _debug: print self.name, 'created'
599 def __del__(self):
600 if _debug: print self.name, 'destroyed'
601 @property
602 def clients(self):
603 '''everytime this property is accessed it returns a new list of clients connected to this instance.'''
604 return self._clients[1:] if self._clients is not None else []
605
class App(BaseApp, SimpleAppHTTPRequestHandler):
    '''Default application: BaseApp combined with SimpleAppHTTPRequestHandler.'''
608
class HTTPServer(object):
    '''Listens on a TCP socket and dispatches each parsed request to an application.

    serverlistener() is the part that handles requests: it picks or creates
    the application instance for the request's session cookie and calls its
    on<COMMAND> method. The remaining handlers (clientlistener, closehandler,
    clienthandler, stream*/publish/play/media handlers) deal with RTMP style
    streams and are not called from serverlistener().
    '''
    def __init__(self):
        '''Construct a new HttpServer. It initializes the local members.'''
        self.sock = self.server = None
        self.apps = dict({'*': App}) # supported applications: * means any as in {'*': App}
        self.clients = dict() # list of clients indexed by scope. First item in list is app instance.
        self.root = ''

    def start(self, host='0.0.0.0', port=8080):
        '''Start listening for connections on host:port (port defaults to 8080).

        Does nothing when the server has already been started.
        '''
        if _debug: print 'start', host, port
        if not self.server:
            sock = self.sock = socket.socket(type=socket.SOCK_STREAM)
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind((host, port))
            if _debug: print 'listening on ', sock.getsockname()
            sock.listen(5)
            server = self.server = Server(sock) # start the accepting server on that socket
            multitask.add(self.serverlistener())

    def stop(self):
        '''Close the listening socket (errors are ignored) and forget the server.'''
        if _debug: print 'stopping Flash server'
        if self.server and self.sock:
            try: self.sock.close(); self.sock = None
            except: pass
        self.server = None

    def serverlistener(self):
        '''Server listener (generator). It receives (client, request) pairs from the
        server and invokes the matching on<COMMAND> method of the application instance.

        Application instances are looked up in self.clients by the value of the
        request's 'session' cookie and created on first use. When the method
        returns None the content of the request's wfile is queued on the client
        as the response.
        '''
        try:
            while True: # main loop to receive new connections on the server
                client, msg = (yield self.server.recv()) # receive an incoming client connection.
                # TODO: we should reject non-localhost client connections.
                if not client: # if the server aborted abnormally,
                    break # hence close the listener.
                if _debug: print 'client connection received', client, msg
                # if client.objectEncoding != 0 and client.objectEncoding != 3:
                if client.objectEncoding != 0:
                    yield client.rejectConnection(reason='Unsupported encoding ' + str(client.objectEncoding) + '. Please use NetConnection.defaultObjectEncoding=ObjectEncoding.AMF0')
                    yield client.connectionClosed()
                else:
                    print "cookies", str(msg.response_cookies)
                    session = msg.response_cookies['session'].value
                    name = msg.path
                    print "serverlistener", name
                    if '*' not in self.apps and name not in self.apps:
                        yield client.rejectConnection(reason='Application not found: ' + name)
                    else: # create application instance as needed and add in our list
                        if _debug: print 'name=', name, 'name in apps', str(name in self.apps)
                        app = self.apps[name] if name in self.apps else self.apps['*'] # application class
                        print "clients", self.clients.keys()
                        if session in self.clients:
                            inst = self.clients[session][0]
                        else:
                            inst = app()
                        msg.server = inst # whew! just in time!
                        try:
                            # e.g. command 'GET' is dispatched to inst.onGET(client, msg)
                            methodname = "on%s" % msg.command
                            print methodname, dir(inst)
                            # a missing method yields None here; calling it raises
                            # and ends up in the except branch below
                            method = getattr(inst, methodname, None)
                            result = method(client, msg)
                            close_connection = msg.close_connection
                            print "close connection", close_connection
                        except:
                            if _debug: traceback.print_exc()
                            yield client.rejectConnection(reason='Exception on %s' % methodname)
                            continue
                        if result is True or result is None:
                            # remember the instance for later requests of this session
                            if session not in self.clients:
                                self.clients[session] = [inst]; inst._clients=self.clients[session]
                            if result is None:
                                # take the response out of wfile and empty the buffer
                                msg.wfile.seek(0)
                                data = msg.wfile.read()
                                msg.wfile.seek(0)
                                msg.wfile.truncate()
                                yield client.writeMessage(data)
                            if close_connection:
                                if _debug:
                                    print 'close_connection requested'
                                try:
                                    yield client.connectionClosed()
                                except ConnectionClosed:
                                    if _debug:
                                        print 'close_connection done'
                                    pass
                        else:
                            print "result", result
                            yield client.rejectConnection(reason='Rejected in onConnect')
        except StopIteration: raise
        except:
            if _debug:
                print 'serverlistener exception', \
                      (sys and sys.exc_info() or None)
            traceback.print_exc()

    def clientlistener(self, client):
        '''Client listener (generator). It receives a command and invokes client handler, or receives a new stream and invokes streamlistener.

        NOTE(review): this method reads client.path, which is not assigned by
        the Client class in the visible part of this file.
        '''
        try:
            while True:
                msg, arg = (yield client.recv()) # receive new message from client
                if not msg: # if the client disconnected,
                    if _debug: print 'connection closed from client'
                    break # come out of listening loop.
                if msg == 'command': # handle a new command
                    multitask.add(self.clienthandler(client, arg))
                elif msg == 'stream': # a new stream is created, handle the stream.
                    arg.client = client
                    multitask.add(self.streamlistener(arg))
        except StopIteration: raise
        except:
            if _debug: print 'clientlistener exception', (sys and sys.exc_info() or None)
            traceback.print_exc()

        # client is disconnected, clear our state for application instance.
        if _debug: print 'cleaning up client', client.path
        inst = None
        if client.path in self.clients:
            inst = self.clients[client.path][0]
            self.clients[client.path].remove(client)
        for stream in client.streams.values(): # for all streams of this client
            self.closehandler(stream)
        client.streams.clear() # and clear the collection of streams
        if client.path in self.clients and len(self.clients[client.path]) == 1: # no more clients left, delete the instance.
            if _debug: print 'removing the application instance'
            inst = self.clients[client.path][0]
            inst._clients = None
            del self.clients[client.path]
        if inst is not None: inst.onDisconnect(client)

    def closehandler(self, stream):
        '''A stream is closed explicitly when a closeStream command is received from given client.

        Removes the stream from the application's publishers and players
        (notifying the application through onClose/onStop) and closes it.
        Streams that no longer have a client are ignored.
        '''
        if stream.client is not None:
            inst = self.clients[stream.client.path][0]
            if stream.name in inst.publishers and inst.publishers[stream.name] == stream: # clear the published stream
                inst.onClose(stream.client, stream)
                del inst.publishers[stream.name]
            if stream.name in inst.players and stream in inst.players[stream.name]:
                inst.onStop(stream.client, stream)
                inst.players[stream.name].remove(stream)
                if len(inst.players[stream.name]) == 0:
                    del inst.players[stream.name]
            stream.close()

    def clienthandler(self, client, cmd):
        '''A generator to handle a single command on the client.

        '_error' and '_result' are passed to the application's onStatus /
        onResult when it defines them; every other command goes to onCommand
        and is answered with '_result', or '_error' when onCommand raised.
        '''
        inst = self.clients[client.path][0]
        if inst:
            if cmd.name == '_error':
                if hasattr(inst, 'onStatus'):
                    result = inst.onStatus(client, cmd.args[0])
            elif cmd.name == '_result':
                if hasattr(inst, 'onResult'):
                    result = inst.onResult(client, cmd.args[0])
            else:
                res, code, args = Command(), '_result', dict()
                try: result = inst.onCommand(client, cmd.name, *cmd.args)
                except:
                    if _debug: print 'Client.call exception', (sys and sys.exc_info() or None)
                    code, args = '_error', dict()
                res.id, res.name, res.type = cmd.id, code, (client.objectEncoding == 0.0 and Message.RPC or Message.RPC3)
                res.args, res.cmdData = args, None
                if _debug: print 'Client.call method=', code, 'args=', args, ' msg=', res.toMessage()
                client.writeMessage(res.toMessage())
                # TODO return result to caller
        yield

    def streamlistener(self, stream):
        '''Stream listener (generator). It receives stream message and invokes streamhandler.'''
        stream.recordfile = None # so that it doesn't complain about missing attribute
        while True:
            msg = (yield stream.recv())
            if not msg:
                if _debug: print 'stream closed'
                self.closehandler(stream)
                break
            # if _debug: msg
            multitask.add(self.streamhandler(stream, msg))

    def streamhandler(self, stream, message):
        '''A generator to handle a single message on the stream.

        RPC messages are decoded into a Command and dispatched by name
        (publish, play, closeStream); every other message is treated as media.
        Exceptions are swallowed, with a trace when debugging is on.
        '''
        try:
            if message.type == Message.RPC:
                cmd = Command.fromMessage(message)
                if _debug: print 'streamhandler received cmd=', cmd
                if cmd.name == 'publish':
                    yield self.publishhandler(stream, cmd)
                elif cmd.name == 'play':
                    yield self.playhandler(stream, cmd)
                elif cmd.name == 'closeStream':
                    self.closehandler(stream)
            else: # audio or video message
                yield self.mediahandler(stream, message)
        except GeneratorExit: pass
        except StopIteration: pass
        except:
            if _debug: print 'exception in streamhandler', (sys and sys.exc_info())

    def publishhandler(self, stream, cmd):
        '''A new stream is published. Store the information in the application instance.

        Answers with NetStream.Publish.Start, or with NetStream.Publish.BadName
        when a ValueError occurs (e.g. the stream name is already published).
        NOTE(review): FLV is not defined in the visible part of this file.
        '''
        try:
            stream.mode = 'live' if len(cmd.args) < 2 else cmd.args[1] # live, record, append
            stream.name = cmd.args[0]
            if _debug: print 'publishing stream=', stream.name, 'mode=', stream.mode
            inst = self.clients[stream.client.path][0]
            if (stream.name in inst.publishers):
                raise ValueError, 'Stream name already in use'
            inst.publishers[stream.name] = stream # store the client for publisher
            result = inst.onPublish(stream.client, stream)

            if stream.mode == 'record' or stream.mode == 'append':
                stream.recordfile = FLV().open(getfilename(stream.client.path, stream.name, self.root), stream.mode)
            response = Command(name='onStatus', id=cmd.id, args=[dict(level='status', code='NetStream.Publish.Start', description='', details=None)])
            yield stream.send(response)
        except ValueError, E: # some error occurred. inform the app.
            if _debug: print 'error in publishing stream', str(E)
            response = Command(name='onStatus', id=cmd.id, args=[dict(level='error',code='NetStream.Publish.BadName',description=str(E),details=None)])
            yield stream.send(response)

    def playhandler(self, stream, cmd):
        '''A new stream is being played. Just updated the players list with this stream.

        When a file for the stream name exists it is opened and its reader is
        scheduled. Answers with NetStream.Play.Start.
        NOTE(review): FLV is not defined in the visible part of this file.
        '''
        inst = self.clients[stream.client.path][0]
        name = stream.name = cmd.args[0] # store the stream's name
        start = cmd.args[1] if len(cmd.args) >= 2 else -2
        if name not in inst.players:
            inst.players[name] = [] # initialize the players for this stream name
        if stream not in inst.players[name]: # store the stream as players of this name
            inst.players[name].append(stream)
        path = getfilename(stream.client.path, stream.name, self.root)
        if os.path.exists(path):
            stream.playfile = FLV().open(path)
            multitask.add(stream.playfile.reader(stream))
        if _debug: print 'playing stream=', name, 'start=', start
        result = inst.onPlay(stream.client, stream)
        response = Command(name='onStatus', id=cmd.id, args=[dict(level='status',code='NetStream.Play.Start', description=stream.name, details=None)])
        yield stream.send(response)

    def mediahandler(self, stream, message):
        '''Handle incoming media on the stream, by sending to other stream in this application instance.

        The message is forwarded only when the application's onPublishData
        returns a true value, and to each player only when onPlayData does;
        it is also written to the stream's record file when there is one.
        '''
        if stream.client is not None:
            inst = self.clients[stream.client.path][0]
            result = inst.onPublishData(stream.client, stream, message)
            if result:
                client = stream.client
                for s in (inst.players.get(stream.name, [])):
                    # if _debug: print 'D', stream.name, s.name
                    result = inst.onPlayData(s.client, s, message)
                    if result:
                        yield s.send(message)
                if stream.recordfile is not None:
                    stream.recordfile.write(message)
858 if stream.recordfile is not None:
859 stream.recordfile.write(message)
860
861 # The main routine to start, run and stop the service
862 if __name__ == '__main__':
863 print "optparse"
864 from optparse import OptionParser
865 parser = OptionParser()
866 parser.add_option('-i', '--host', dest='host', default='0.0.0.0', help="listening IP address. Default '0.0.0.0'")
867 parser.add_option('-p', '--port', dest='port', default=8080, type="int", help='listening port number. Default 8080')
868 parser.add_option('-r', '--root', dest='root', default='./', help="document root directory. Default './'")
869 parser.add_option('-d', '--verbose', dest='verbose', default=False, action='store_true', help='enable debug trace')
870 (options, args) = parser.parse_args()
871
872 _debug = options.verbose
873 try:
874 if _debug: print time.asctime(), 'Options - %s:%d' % (options.host, options.port)
875 agent = HTTPServer()
876 agent.root = options.root
877 agent.start(options.host, options.port)
878 if _debug: print time.asctime(), 'Flash Server Starts - %s:%d' % (options.host, options.port)
879 multitask.run()
880 except KeyboardInterrupt:
881 print traceback.print_exc()
882 pass
883 except:
884 print "exception"
885 print traceback.print_exc()
886 if _debug: print time.asctime(), 'Flash Server Stops'