a7cd598dd2aab918da3f696831ad4780fe68ed8c
[multitaskhttpd.git] / httpd.py
1 # Copyright (c) 2007-2009, Mamta Singh. All rights reserved. see README for details.
2
3 '''
4 This is a simple implementation of a Flash RTMP server to accept connections and stream requests. The module is organized as follows:
5 1. The FlashServer class is the main class to provide the server abstraction. It uses the multitask module for co-operative multitasking.
6 It also uses the App abstract class to implement the applications.
7 2. The Server class implements a simple server to receive new Client connections and inform the FlashServer application. The Client class
8 derived from Protocol implements the RTMP client functions. The Protocol class implements the base RTMP protocol parsing. A Client contains
9 various streams from the client, represented using the Stream class.
10 3. The Message, Header and Command represent RTMP message, header and command respectively. The FLV class implements functions to perform read
11 and write of FLV file format.
12
13
14 Typically an application can launch this server as follows:
15 $ python rtmp.py
16
17 To know the command line options use the -h option:
18 $ python rtmp.py -h
19
20 To start the server with a different directory for recording and playing FLV files from, use the following command.
21 $ python rtmp.py -r some-other-directory/
22 Note the terminal '/' in the directory name. Without this, it is just used as a prefix in FLV file names.
23
24 A test client is available in testClient directory, and can be compiled using Flex Builder. Alternatively, you can use the SWF file to launch
25 from testClient/bin-debug after starting the server. Once you have launched the client in the browser, you can connect to
26 local host by clicking on 'connect' button. Then click on publish button to publish a stream. Open another browser with
27 same URL and first connect then play the same stream name. If everything works fine you should be able to see the video
28 from first browser to the second browser. Similar, in the first browser, if you check the record box before publishing,
29 it will create a new FLV file for the recorded stream. You can close the publishing stream and play the recorded stream to
30 see your recording. Note that due to initial delay in timestamp (in case publish was clicked much later than connect),
31 your played video will start appearing after some initial delay.
32
33
34 If an application wants to use this module as a library, it can launch the server as follows:
35 >>> agent = FlashServer() # a new RTMP server instance
36 >>> agent.root = 'flvs/' # set the document root to be 'flvs' directory. Default is current './' directory.
37 >>> agent.start() # start the server
38 >>> multitask.run() # this is needed somewhere in the application to actually start the co-operative multitasking.
39
40
41 If an application wants to specify a different application other than the default App, it can subclass it and supply the application by
42 setting the server's apps property. The following example shows how to define "myapp" which invokes a 'connected()' method on client when
43 the client connects to the server.
44
45 class MyApp(App): # a new MyApp extends the default App in rtmp module.
46 def __init__(self): # constructor just invokes base class constructor
47 App.__init__(self)
48 def onConnect(self, client, *args):
49 result = App.onConnect(self, client, *args) # invoke base class method first
50 def invokeAdded(self, client): # define a method to invoke 'connected("some-arg")' on Flash client
51 client.call('connected', 'some-arg')
52 yield
53 multitask.add(invokeAdded(self, client)) # need to invoke later so that connection is established before callback
54 ...
55 agent.apps = dict({'myapp': MyApp, 'someapp': MyApp, '*': App})
56
57 Now the client can connect to rtmp://server/myapp or rtmp://server/someapp and will get connected to this MyApp application.
58 If the client doesn't define "function connected(arg:String):void" in the NetConnection.client object then the server will
59 throw an exception and display the error message.
60
61 '''
62
63 import os, sys, time, struct, socket, traceback, multitask
64 import threading, Queue
65 import uuid
66
67 from BaseHTTPServer import BaseHTTPRequestHandler
68 from SimpleAppHTTPServer import SimpleAppHTTPRequestHandler
69 from cStringIO import StringIO
70 from cookielib import parse_ns_headers, CookieJar
71
72 _debug = False
73
74 class MultitaskHTTPRequestHandler(BaseHTTPRequestHandler):
75
76 def setup(self):
77 self.connection = self.request
78 self.rfile = StringIO()
79 self.wfile = StringIO()
80
81 def finish(self):
82 pass
83
84 def info(self):
85 return self.headers
86
87 def get_full_url(self):
88 return self.path
89
90 def get_header(self, hdr, default):
91 return self.headers.getheader(hdr, default)
92
93 def has_header(self, hdr):
94 return False
95
96 def is_unverifiable(self):
97 return False
98
99 def add_unredirected_header(self, name, val):
100 if name == 'Cookie':
101 self.send_header("Set-Cookie", val)
102
103 class ConnectionClosed:
104 'raised when the client closed the connection'
105
106 class SockStream(object):
107 '''A class that represents a socket as a stream'''
108 def __init__(self, sock):
109 self.sock, self.buffer = sock, ''
110 self.bytesWritten = self.bytesRead = 0
111
112 def close(self):
113 print "sock close"
114 fno = self.sock.fileno()
115 self.sock.close()
116 print "writable?", self.sock, fno
117 yield multitask.writable(fno, timeout=0.1)
118 print "is it?"
119 del self.sock
120 self.sock = None
121
122 def readline(self):
123 try:
124 while True:
125 nl = self.buffer.find("\n")
126 if nl >= 0: # do not have newline in buffer
127 data, self.buffer = self.buffer[:nl+1], self.buffer[nl+1:]
128 raise StopIteration(data)
129 data = (yield multitask.recv(self.sock, 4096)) # read more from socket
130 if not data: raise ConnectionClosed
131 if _debug: print 'socket.read[%d] %r'%(len(data), data)
132 self.bytesRead += len(data)
133 self.buffer += data
134 except StopIteration: raise
135 except: raise ConnectionClosed # anything else is treated as connection closed.
136
137 def read(self, count):
138 try:
139 while True:
140 if len(self.buffer) >= count: # do not have data in buffer
141 data, self.buffer = self.buffer[:count], self.buffer[count:]
142 raise StopIteration(data)
143 data = (yield multitask.recv(self.sock, 4096)) # read more from socket
144 if not data: raise ConnectionClosed
145 # if _debug: print 'socket.read[%d] %r'%(len(data), data)
146 self.bytesRead += len(data)
147 self.buffer += data
148 except StopIteration: raise
149 except: raise ConnectionClosed # anything else is treated as connection closed.
150
151 def unread(self, data):
152 self.buffer = data + self.buffer
153
154 def write(self, data):
155 while len(data) > 0: # write in 4K chunks each time
156 chunk, data = data[:4096], data[4096:]
157 self.bytesWritten += len(chunk)
158 if _debug: print 'socket.write[%d] %r'%(len(chunk), chunk)
159 try: yield multitask.send(self.sock, chunk)
160 except: raise ConnectionClosed
161 if _debug: print 'socket.written'
162
163
164 class Header(object):
165 FULL, MESSAGE, TIME, SEPARATOR, MASK = 0x00, 0x40, 0x80, 0xC0, 0xC0
166
167 def __init__(self, channel=0, time=0, size=None, type=None, streamId=0):
168 self.channel, self.time, self.size, self.type, self.streamId = channel, time, size, type, streamId
169 self.extendedtime = 0
170 if channel<64: self.hdrdata = chr(channel)
171 elif channel<(64+256): self.hdrdata = '\x00'+chr(channel-64)
172 else: self.hdrdata = '\x01'+chr((channel-64)%256)+chr((channel-64)/256)
173
174 def _appendExtendedTimestamp(self, data):
175 if self.time == 0xFFFFFF:
176 data += struct.pack('>I', self.extendedtime)
177 return data
178
179 def toBytes(self, control):
180 data = chr(ord(self.hdrdata[0]) | control) + self.hdrdata[1:]
181 if control == Header.SEPARATOR: return self._appendExtendedTimestamp(data)
182
183 data += struct.pack('>I', self.time & 0xFFFFFF)[1:] # add time
184 if control == Header.TIME: return self._appendExtendedTimestamp(data)
185
186 data += struct.pack('>I', self.size)[1:] # size
187 data += chr(self.type) # type
188 if control == Header.MESSAGE: return self._appendExtendedTimestamp(data)
189
190 data += struct.pack('<I', self.streamId) # add streamId
191 return self._appendExtendedTimestamp(data)
192
193 def __repr__(self):
194 return ("<Header channel=%r time=%r size=%r type=%r (0x%02x) streamId=%r>"
195 % (self.channel, self.time, self.size, self.type, self.type or 0, self.streamId))
196
197 class Message(object):
198 # message types: RPC3, DATA3,and SHAREDOBJECT3 are used with AMF3
199 RPC, RPC3, DATA, DATA3, SHAREDOBJ, SHAREDOBJ3, AUDIO, VIDEO, ACK, CHUNK_SIZE = \
200 0x14, 0x11, 0x12, 0x0F, 0x13, 0x10, 0x08, 0x09, 0x03, 0x01
201
202 def __init__(self, hdr=None, data=''):
203 if hdr is None: hdr = Header()
204 self.header, self.data = hdr, data
205
206 # define properties type, streamId and time to access self.header.(property)
207 for p in ['type', 'streamId', 'time']:
208 exec 'def _g%s(self): return self.header.%s'%(p, p)
209 exec 'def _s%s(self, %s): self.header.%s = %s'%(p, p, p, p)
210 exec '%s = property(fget=_g%s, fset=_s%s)'%(p, p, p)
211 @property
212 def size(self): return len(self.data)
213
214 def __repr__(self):
215 return ("<Message header=%r data=%r>"% (self.header, self.data))
216
217 class Protocol(object):
218 # constants
219 PING_SIZE = 1536
220 DEFAULT_CHUNK_SIZE = 128
221 MIN_CHANNEL_ID = 3
222 PROTOCOL_CHANNEL_ID = 2
223
224 def __init__(self, sock):
225 self.stream = SockStream(sock)
226 self.lastReadHeaders = dict() # indexed by channelId
227 self.incompletePackets = dict() #indexed by channelId
228 self.readChunkSize = self.writeChunkSize = Protocol.DEFAULT_CHUNK_SIZE
229 self.lastWriteHeaders = dict() # indexed by streamId
230 self.nextChannelId = Protocol.MIN_CHANNEL_ID
231 self.writeLock = threading.Lock()
232 self.writeQueue = Queue.Queue()
233
234 def messageReceived(self, msg):
235 yield
236
237 def protocolMessage(self, msg):
238 if msg.type == Message.ACK: # respond to ACK requests
239 response = Message()
240 response.type, response.data = msg.type, msg.data
241 self.writeMessage(response)
242 elif msg.type == Message.CHUNK_SIZE:
243 self.readChunkSize = struct.unpack('>L', msg.data)[0]
244
245 def connectionClosed(self):
246 yield
247
248 def parse(self):
249 try:
250 yield self.parseRequests() # parse http requests
251 except ConnectionClosed:
252 yield self.connectionClosed()
253 if _debug: print 'parse connection closed'
254
255 def writeMessage(self, message):
256 self.writeQueue.put(message)
257
258 def parseHandshake(self):
259 '''Parses the rtmp handshake'''
260 data = (yield self.stream.read(Protocol.PING_SIZE + 1)) # bound version and first ping
261 yield self.stream.write(data)
262 data = (yield self.stream.read(Protocol.PING_SIZE)) # bound second ping
263 yield self.stream.write(data)
264
265 def parseRequests(self):
266 '''Parses complete messages until connection closed. Raises ConnectionLost exception.'''
267 self.hr = MultitaskHTTPRequestHandler(self.stream, ("",0,), self.remote)
268 self.hr.close_connection = 1
269 self.cookies = CookieJar()
270
271 while True:
272
273 # prepare reading the request so that when it's handed
274 # over to "standard" HTTPRequestHandler, the data's already
275 # there.
276 print "parseRequests"
277 raw_requestline = (yield self.stream.readline())
278 if _debug: print "parseRequests, line", raw_requestline
279 if not raw_requestline:
280 raise ConnectionClosed
281 data = ''
282 try:
283 while 1:
284 line = (yield self.stream.readline())
285 data += line
286 if line in ['\n', '\r\n']:
287 break
288 except StopIteration:
289 if _debug: print "parseRequests, stopiter"
290 raise
291 except:
292 if _debug:
293 print 'parseRequests', \
294 (traceback and traceback.print_exc() or None)
295
296 raise ConnectionClosed
297
298 self.hr.raw_requestline = raw_requestline
299 self.hr.rfile.write(data)
300 print "parseRequests write after"
301 self.hr.rfile.seek(0)
302 print "parseRequests seek after"
303
304 if not self.hr.parse_request():
305 raise ConnectionClosed
306 print "parseRequests parse_req after"
307 try:
308 yield self.messageReceived(self.hr)
309 except:
310 if _debug:
311 print 'messageReceived', \
312 (traceback and traceback.print_exc() or None)
313
314 def write(self):
315 '''Writes messages to stream'''
316 while True:
317 while self.writeQueue.empty(): (yield multitask.sleep(0.01))
318 data = self.writeQueue.get() # TODO this should be used using multitask.Queue and remove previous wait.
319 if _debug: print "write to stream", repr(data)
320 if data is None:
321 # just in case TCP socket is not closed, close it.
322 try:
323 print "stream closing"
324 print self.stream
325 yield self.stream.close()
326 print "stream closed"
327 except: pass
328 break
329
330 try:
331 yield self.stream.write(data)
332 except ConnectionClosed:
333 yield self.connectionClosed()
334 except:
335 print traceback.print_exc()
336
337 class Command(object):
338 ''' Class for command / data messages'''
339 def __init__(self, type=Message.RPC, name=None, id=None, cmdData=None, args=[]):
340 '''Create a new command with given type, name, id, cmdData and args list.'''
341 self.type, self.name, self.id, self.cmdData, self.args = type, name, id, cmdData, args[:]
342
343 def __repr__(self):
344 return ("<Command type=%r name=%r id=%r data=%r args=%r>" % (self.type, self.name, self.id, self.cmdData, self.args))
345
346 def setArg(self, arg):
347 self.args.append(arg)
348
349 def getArg(self, index):
350 return self.args[index]
351
352 @classmethod
353 def fromMessage(cls, message):
354 ''' initialize from a parsed RTMP message'''
355 assert (message.type in [Message.RPC, Message.RPC3, Message.DATA, Message.DATA3])
356
357 length = len(message.data)
358 if length == 0: raise ValueError('zero length message data')
359
360 if message.type == Message.RPC3 or message.type == Message.DATA3:
361 assert message.data[0] == '\x00' # must be 0 in AMD3
362 data = message.data[1:]
363 else:
364 data = message.data
365
366 amfReader = amf.AMF0(data)
367
368 inst = cls()
369 inst.name = amfReader.read() # first field is command name
370
371 try:
372 if message.type == Message.RPC:
373 inst.id = amfReader.read() # second field *may* be message id
374 inst.cmdData = amfReader.read() # third is command data
375 else:
376 inst.id = 0
377 inst.args = [] # others are optional
378 while True:
379 inst.args.append(amfReader.read())
380 except EOFError:
381 pass
382 return inst
383
384 def toMessage(self):
385 msg = Message()
386 assert self.type
387 msg.type = self.type
388 output = amf.BytesIO()
389 amfWriter = amf.AMF0(output)
390 amfWriter.write(self.name)
391 if msg.type == Message.RPC or msg.type == Message.RPC3:
392 amfWriter.write(self.id)
393 amfWriter.write(self.cmdData)
394 for arg in self.args:
395 amfWriter.write(arg)
396 output.seek(0)
397 #hexdump.hexdump(output)
398 #output.seek(0)
399 if msg.type == Message.RPC3 or msg.type == Message.DATA3:
400 data = '\x00' + output.read()
401 else:
402 data = output.read()
403 msg.data = data
404 output.close()
405 return msg
406
407 def getfilename(path, name, root):
408 '''return the file name for the given stream. The name is derived as root/scope/name.flv where scope is
409 the the path present in the path variable.'''
410 ignore, ignore, scope = path.partition('/')
411 if scope: scope = scope + '/'
412 result = root + scope + name + '.flv'
413 if _debug: print 'filename=', result
414 return result
415
416 class Stream(object):
417 '''The stream object that is used for RTMP stream.'''
418 count = 0;
419 def __init__(self, client):
420 self.client, self.id, self.name = client, 0, ''
421 self.recordfile = self.playfile = None # so that it doesn't complain about missing attribute
422 self.queue = multitask.Queue()
423 self._name = 'Stream[' + str(Stream.count) + ']'; Stream.count += 1
424 if _debug: print self, 'created'
425
426 def close(self):
427 if _debug: print self, 'closing'
428 if self.recordfile is not None: self.recordfile.close(); self.recordfile = None
429 if self.playfile is not None: self.playfile.close(); self.playfile = None
430 self.client = None # to clear the reference
431 pass
432
433 def __repr__(self):
434 return self._name;
435
436 def recv(self):
437 '''Generator to receive new Message on this stream, or None if stream is closed.'''
438 return self.queue.get()
439
440 def send(self, msg):
441 '''Method to send a Message or Command on this stream.'''
442 if isinstance(msg, Command):
443 msg = msg.toMessage()
444 msg.streamId = self.id
445 # if _debug: print self,'send'
446 if self.client is not None: self.client.writeMessage(msg)
447
448 class Client(Protocol):
449 '''The client object represents a single connected client to the server.'''
450 def __init__(self, sock, server, remote):
451 Protocol.__init__(self, sock)
452 self.server = server
453 self.remote = remote
454 self.agent = {}
455 self.streams = {}
456 self._nextCallId = 2
457 self._nextStreamId = 1
458 self.objectEncoding = 0.0
459 self.queue = multitask.Queue() # receive queue used by application
460 multitask.add(self.parse())
461 multitask.add(self.write())
462
463 def recv(self):
464 '''Generator to receive new Message (msg, arg) on this stream, or (None,None) if stream is closed.'''
465 return self.queue.get()
466
467 def connectionClosed(self):
468 '''Called when the client drops the connection'''
469 if _debug: 'Client.connectionClosed'
470 self.writeMessage(None)
471 yield self.queue.put((None,None))
472
473 def messageReceived(self, msg):
474 if _debug: print 'messageReceived cmd=', msg.command, msg.path
475 ch = msg.headers.getheaders("Cookie")
476 ch = parse_ns_headers(ch)
477 cookies = self.cookies._cookies_from_attrs_set(ch, msg)
478 has_sess = False
479 for c in cookies:
480 self.cookies.set_cookie(c)
481 if c.name == "session":
482 has_sess = True
483 msg.sess_cookie = c
484 if not has_sess:
485 t = ("session", uuid.uuid4().hex, {}, {})
486 msg.sess_cookie = self.cookies._cookie_from_cookie_tuple(t, msg)
487 self.cookies.set_cookie(msg.sess_cookie)
488 yield self.server.queue.put((self, msg)) # new connection
489
490 def accept(self):
491 '''Method to accept an incoming client.'''
492 response = Command()
493 response.id, response.name, response.type = 1, '_result', Message.RPC
494 if _debug: print 'Client.accept() objectEncoding=', self.objectEncoding
495 response.setArg(dict(level='status', code='NetConnection.Connect.Success',
496 description='Connection succeeded.', details=None,
497 objectEncoding=self.objectEncoding))
498 self.writeMessage(response.toMessage())
499
500 def rejectConnection(self, reason=''):
501 '''Method to reject an incoming client.'''
502 response = Command()
503 response.id, response.name, response.type = 1, '_error', Message.RPC
504 response.setArg(dict(level='status', code='NetConnection.Connect.Rejected',
505 description=reason, details=None))
506 self.writeMessage(response.toMessage())
507
508 def call(self, method, *args):
509 '''Call a (callback) method on the client.'''
510 cmd = Command()
511 cmd.id, cmd.name, cmd.type = self._nextCallId, method, (self.objectEncoding == 0.0 and Message.RPC or Message.RPC3)
512 cmd.args, cmd.cmdData = args, None
513 self._nextCallId += 1
514 if _debug: print 'Client.call method=', method, 'args=', args, ' msg=', cmd.toMessage()
515 self.writeMessage(cmd.toMessage())
516
517 def createStream(self):
518 ''' Create a stream on the server side'''
519 stream = Stream(self)
520 stream.id = self._nextStreamId
521 self.streams[stream.id] = stream
522 self._nextStreamId += 1
523 return stream
524
525
526 class Server(object):
527 '''A RTMP server listens for incoming connections and informs the app.'''
528 def __init__(self, sock):
529 '''Create an RTMP server on the given bound TCP socket. The server will terminate
530 when the socket is disconnected, or some other error occurs in listening.'''
531 self.sock = sock
532 self.queue = multitask.Queue() # queue to receive incoming client connections
533 multitask.add(self.run())
534
535 def recv(self):
536 '''Generator to wait for incoming client connections on this server and return
537 (client, args) or (None, None) if the socket is closed or some error.'''
538 return self.queue.get()
539
540 def run(self):
541 try:
542 while True:
543 sock, remote = (yield multitask.accept(self.sock)) # receive client TCP
544 if sock == None:
545 if _debug: print 'rtmp.Server accept(sock) returned None.'
546 break
547 if _debug: print 'connection received from', remote
548 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) # make it non-block
549 client = Client(sock, self, remote)
550 except:
551 if _debug: print 'rtmp.Server exception ', (sys and sys.exc_info() or None)
552 traceback.print_exc()
553
554 if (self.sock):
555 try: self.sock.close(); self.sock = None
556 except: pass
557 if (self.queue):
558 yield self.queue.put((None, None))
559 self.queue = None
560
561 class App(SimpleAppHTTPRequestHandler):
562 '''An application instance containing any number of streams. Except for constructor all methods are generators.'''
563 count = 0
564 def __init__(self):
565 self.name = str(self.__class__.__name__) + '[' + str(App.count) + ']'; App.count += 1
566 self.players, self.publishers, self._clients = {}, {}, [] # Streams indexed by stream name, and list of clients
567 if _debug: print self.name, 'created'
568 def __del__(self):
569 if _debug: print self.name, 'destroyed'
570 @property
571 def clients(self):
572 '''everytime this property is accessed it returns a new list of clients connected to this instance.'''
573 return self._clients[1:] if self._clients is not None else []
574 def onConnect(self, client, *args):
575 if _debug: print self.name, 'onConnect', client.path
576 return True
577 def onDisconnect(self, client):
578 if _debug: print self.name, 'onDisconnect', client.path
579
580 class HttpServer(object):
581 '''A RTMP server to record and stream Flash video.'''
582 def __init__(self):
583 '''Construct a new HttpServer. It initializes the local members.'''
584 self.sock = self.server = None
585 self.apps = dict({'*': App}) # supported applications: * means any as in {'*': App}
586 self.clients = dict() # list of clients indexed by scope. First item in list is app instance.
587 self.root = ''
588
589 def start(self, host='0.0.0.0', port=8080):
590 '''This should be used to start listening for RTMP connections on the given port, which defaults to 8080.'''
591 if _debug: print 'start', host, port
592 if not self.server:
593 sock = self.sock = socket.socket(type=socket.SOCK_STREAM)
594 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
595 sock.bind((host, port))
596 if _debug: print 'listening on ', sock.getsockname()
597 sock.listen(5)
598 server = self.server = Server(sock) # start rtmp server on that socket
599 multitask.add(self.serverlistener())
600
601 def stop(self):
602 if _debug: print 'stopping Flash server'
603 if self.server and self.sock:
604 try: self.sock.close(); self.sock = None
605 except: pass
606 self.server = None
607
608 def serverlistener(self):
609 '''Server listener (generator). It accepts all connections and invokes client listener'''
610 try:
611 while True: # main loop to receive new connections on the server
612 client, msg = (yield self.server.recv()) # receive an incoming client connection.
613 # TODO: we should reject non-localhost client connections.
614 if not client: # if the server aborted abnormally,
615 break # hence close the listener.
616 if _debug: print 'client connection received', client, args
617 # if client.objectEncoding != 0 and client.objectEncoding != 3:
618 if client.objectEncoding != 0:
619 yield client.rejectConnection(reason='Unsupported encoding ' + str(client.objectEncoding) + '. Please use NetConnection.defaultObjectEncoding=ObjectEncoding.AMF0')
620 yield client.connectionClosed()
621 else:
622 print "cookies", str(client.cookies)
623 session = msg.sess_cookie.value
624 name, ignore, scope = msg.path.partition('/')
625 if '*' not in self.apps and name not in self.apps:
626 yield client.rejectConnection(reason='Application not found: ' + name)
627 else: # create application instance as needed and add in our list
628 if _debug: print 'name=', name, 'name in apps', str(name in self.apps)
629 app = self.apps[name] if name in self.apps else self.apps['*'] # application class
630 if session in self.clients: inst = self.clients[session][0]
631 else: inst = app()
632 try:
633 methodname = "on%s" % msg.command
634 method = getattr(inst, methodname, None)
635 result = method(client, msg)
636 close_connection = msg.close_connection
637 print "close connection", close_connection
638 except:
639 if _debug: traceback.print_exc()
640 yield client.rejectConnection(reason='Exception on %s' % methodname)
641 continue
642 if result is True or result is None:
643 if session not in self.clients:
644 self.clients[session] = [inst]; inst._clients=self.clients[session]
645 self.clients[session].append(client)
646 msg.wfile.seek(0)
647 data = msg.wfile.read()
648 msg.wfile.seek(0)
649 msg.wfile.truncate()
650 yield client.writeMessage(data)
651 if close_connection:
652 if _debug:
653 print 'close_connection requested'
654 try:
655 yield client.connectionClosed()
656 except ClientClosed:
657 if _debug:
658 print 'close_connection done'
659 pass
660 else:
661 yield client.rejectConnection(reason='Rejected in onConnect')
662 except StopIteration: raise
663 except:
664 if _debug:
665 print 'serverlistener exception', \
666 (sys and sys.exc_info() or None)
667 traceback.print_exc()
668
669 def clientlistener(self, client):
670 '''Client listener (generator). It receives a command and invokes client handler, or receives a new stream and invokes streamlistener.'''
671 try:
672 while True:
673 msg, arg = (yield client.recv()) # receive new message from client
674 if not msg: # if the client disconnected,
675 if _debug: print 'connection closed from client'
676 break # come out of listening loop.
677 if msg == 'command': # handle a new command
678 multitask.add(self.clienthandler(client, arg))
679 elif msg == 'stream': # a new stream is created, handle the stream.
680 arg.client = client
681 multitask.add(self.streamlistener(arg))
682 except StopIteration: raise
683 except:
684 if _debug: print 'clientlistener exception', (sys and sys.exc_info() or None)
685 traceback.print_exc()
686
687 # client is disconnected, clear our state for application instance.
688 if _debug: print 'cleaning up client', client.path
689 inst = None
690 if client.path in self.clients:
691 inst = self.clients[client.path][0]
692 self.clients[client.path].remove(client)
693 for stream in client.streams.values(): # for all streams of this client
694 self.closehandler(stream)
695 client.streams.clear() # and clear the collection of streams
696 if client.path in self.clients and len(self.clients[client.path]) == 1: # no more clients left, delete the instance.
697 if _debug: print 'removing the application instance'
698 inst = self.clients[client.path][0]
699 inst._clients = None
700 del self.clients[client.path]
701 if inst is not None: inst.onDisconnect(client)
702
703 def closehandler(self, stream):
704 '''A stream is closed explicitly when a closeStream command is received from given client.'''
705 if stream.client is not None:
706 inst = self.clients[stream.client.path][0]
707 if stream.name in inst.publishers and inst.publishers[stream.name] == stream: # clear the published stream
708 inst.onClose(stream.client, stream)
709 del inst.publishers[stream.name]
710 if stream.name in inst.players and stream in inst.players[stream.name]:
711 inst.onStop(stream.client, stream)
712 inst.players[stream.name].remove(stream)
713 if len(inst.players[stream.name]) == 0:
714 del inst.players[stream.name]
715 stream.close()
716
717 def clienthandler(self, client, cmd):
718 '''A generator to handle a single command on the client.'''
719 inst = self.clients[client.path][0]
720 if inst:
721 if cmd.name == '_error':
722 if hasattr(inst, 'onStatus'):
723 result = inst.onStatus(client, cmd.args[0])
724 elif cmd.name == '_result':
725 if hasattr(inst, 'onResult'):
726 result = inst.onResult(client, cmd.args[0])
727 else:
728 res, code, args = Command(), '_result', dict()
729 try: result = inst.onCommand(client, cmd.name, *cmd.args)
730 except:
731 if _debug: print 'Client.call exception', (sys and sys.exc_info() or None)
732 code, args = '_error', dict()
733 res.id, res.name, res.type = cmd.id, code, (client.objectEncoding == 0.0 and Message.RPC or Message.RPC3)
734 res.args, res.cmdData = args, None
735 if _debug: print 'Client.call method=', code, 'args=', args, ' msg=', res.toMessage()
736 client.writeMessage(res.toMessage())
737 # TODO return result to caller
738 yield
739
740 def streamlistener(self, stream):
741 '''Stream listener (generator). It receives stream message and invokes streamhandler.'''
742 stream.recordfile = None # so that it doesn't complain about missing attribute
743 while True:
744 msg = (yield stream.recv())
745 if not msg:
746 if _debug: print 'stream closed'
747 self.closehandler(stream)
748 break
749 # if _debug: msg
750 multitask.add(self.streamhandler(stream, msg))
751
752 def streamhandler(self, stream, message):
753 '''A generator to handle a single message on the stream.'''
754 try:
755 if message.type == Message.RPC:
756 cmd = Command.fromMessage(message)
757 if _debug: print 'streamhandler received cmd=', cmd
758 if cmd.name == 'publish':
759 yield self.publishhandler(stream, cmd)
760 elif cmd.name == 'play':
761 yield self.playhandler(stream, cmd)
762 elif cmd.name == 'closeStream':
763 self.closehandler(stream)
764 else: # audio or video message
765 yield self.mediahandler(stream, message)
766 except GeneratorExit: pass
767 except StopIteration: pass
768 except:
769 if _debug: print 'exception in streamhandler', (sys and sys.exc_info())
770
771 def publishhandler(self, stream, cmd):
772 '''A new stream is published. Store the information in the application instance.'''
773 try:
774 stream.mode = 'live' if len(cmd.args) < 2 else cmd.args[1] # live, record, append
775 stream.name = cmd.args[0]
776 if _debug: print 'publishing stream=', stream.name, 'mode=', stream.mode
777 inst = self.clients[stream.client.path][0]
778 if (stream.name in inst.publishers):
779 raise ValueError, 'Stream name already in use'
780 inst.publishers[stream.name] = stream # store the client for publisher
781 result = inst.onPublish(stream.client, stream)
782
783 if stream.mode == 'record' or stream.mode == 'append':
784 stream.recordfile = FLV().open(getfilename(stream.client.path, stream.name, self.root), stream.mode)
785 response = Command(name='onStatus', id=cmd.id, args=[dict(level='status', code='NetStream.Publish.Start', description='', details=None)])
786 yield stream.send(response)
787 except ValueError, E: # some error occurred. inform the app.
788 if _debug: print 'error in publishing stream', str(E)
789 response = Command(name='onStatus', id=cmd.id, args=[dict(level='error',code='NetStream.Publish.BadName',description=str(E),details=None)])
790 yield stream.send(response)
791
792 def playhandler(self, stream, cmd):
793 '''A new stream is being played. Just updated the players list with this stream.'''
794 inst = self.clients[stream.client.path][0]
795 name = stream.name = cmd.args[0] # store the stream's name
796 start = cmd.args[1] if len(cmd.args) >= 2 else -2
797 if name not in inst.players:
798 inst.players[name] = [] # initialize the players for this stream name
799 if stream not in inst.players[name]: # store the stream as players of this name
800 inst.players[name].append(stream)
801 path = getfilename(stream.client.path, stream.name, self.root)
802 if os.path.exists(path):
803 stream.playfile = FLV().open(path)
804 multitask.add(stream.playfile.reader(stream))
805 if _debug: print 'playing stream=', name, 'start=', start
806 result = inst.onPlay(stream.client, stream)
807 response = Command(name='onStatus', id=cmd.id, args=[dict(level='status',code='NetStream.Play.Start', description=stream.name, details=None)])
808 yield stream.send(response)
809
810 def mediahandler(self, stream, message):
811 '''Handle incoming media on the stream, by sending to other stream in this application instance.'''
812 if stream.client is not None:
813 inst = self.clients[stream.client.path][0]
814 result = inst.onPublishData(stream.client, stream, message)
815 if result:
816 client = stream.client
817 for s in (inst.players.get(stream.name, [])):
818 # if _debug: print 'D', stream.name, s.name
819 result = inst.onPlayData(s.client, s, message)
820 if result:
821 yield s.send(message)
822 if stream.recordfile is not None:
823 stream.recordfile.write(message)
824
825 # The main routine to start, run and stop the service
826 if __name__ == '__main__':
827 print "optparse"
828 from optparse import OptionParser
829 parser = OptionParser()
830 parser.add_option('-i', '--host', dest='host', default='0.0.0.0', help="listening IP address. Default '0.0.0.0'")
831 parser.add_option('-p', '--port', dest='port', default=8080, type="int", help='listening port number. Default 8080')
832 parser.add_option('-r', '--root', dest='root', default='./', help="document root directory. Default './'")
833 parser.add_option('-d', '--verbose', dest='verbose', default=False, action='store_true', help='enable debug trace')
834 (options, args) = parser.parse_args()
835
836 _debug = options.verbose
837 try:
838 if _debug: print time.asctime(), 'Options - %s:%d' % (options.host, options.port)
839 agent = HttpServer()
840 agent.root = options.root
841 agent.start(options.host, options.port)
842 if _debug: print time.asctime(), 'Flash Server Starts - %s:%d' % (options.host, options.port)
843 multitask.run()
844 except KeyboardInterrupt:
845 print traceback.print_exc()
846 pass
847 except:
848 print "exception"
849 print traceback.print_exc()
850 if _debug: print time.asctime(), 'Flash Server Stops'