remove debug info on debuglevel > 0
[multitaskhttpd.git] / httpd.py
1 # Copyright (c) 2007-2009, Mamta Singh. All rights reserved. see README for details.
2
3 '''
4 This is a simple implementation of a Flash RTMP server to accept connections and stream requests. The module is organized as follows:
5 1. The FlashServer class is the main class to provide the server abstraction. It uses the multitask module for co-operative multitasking.
6 It also uses the App abstract class to implement the applications.
7 2. The Server class implements a simple server to receive new Client connections and inform the FlashServer application. The Client class
8 derived from Protocol implements the RTMP client functions. The Protocol class implements the base RTMP protocol parsing. A Client contains
9 various streams from the client, represented using the Stream class.
10 3. The Message, Header and Command represent RTMP message, header and command respectively. The FLV class implements functions to perform read
11 and write of FLV file format.
12
13
14 Typically an application can launch this server as follows:
15 $ python rtmp.py
16
17 To know the command line options use the -h option:
18 $ python rtmp.py -h
19
20 To start the server with a different directory for recording and playing FLV files from, use the following command.
21 $ python rtmp.py -r some-other-directory/
22 Note the terminal '/' in the directory name. Without this, it is just used as a prefix in FLV file names.
23
24 A test client is available in testClient directory, and can be compiled using Flex Builder. Alternatively, you can use the SWF file to launch
25 from testClient/bin-debug after starting the server. Once you have launched the client in the browser, you can connect to
26 local host by clicking on 'connect' button. Then click on publish button to publish a stream. Open another browser with
27 same URL and first connect then play the same stream name. If everything works fine you should be able to see the video
28 from first browser to the second browser. Similarly, in the first browser, if you check the record box before publishing,
29 it will create a new FLV file for the recorded stream. You can close the publishing stream and play the recorded stream to
30 see your recording. Note that due to initial delay in timestamp (in case publish was clicked much later than connect),
31 your played video will start appearing after some initial delay.
32
33
34 If an application wants to use this module as a library, it can launch the server as follows:
35 >>> agent = FlashServer() # a new RTMP server instance
36 >>> agent.root = 'flvs/' # set the document root to be 'flvs' directory. Default is current './' directory.
37 >>> agent.start() # start the server
38 >>> multitask.run() # this is needed somewhere in the application to actually start the co-operative multitasking.
39
40
41 If an application wants to specify a different application other than the default App, it can subclass it and supply the application by
42 setting the server's apps property. The following example shows how to define "myapp" which invokes a 'connected()' method on client when
43 the client connects to the server.
44
45 class MyApp(App): # a new MyApp extends the default App in rtmp module.
46 def __init__(self): # constructor just invokes base class constructor
47 App.__init__(self)
48 def onConnect(self, client, *args):
49 result = App.onConnect(self, client, *args) # invoke base class method first
50 def invokeAdded(self, client): # define a method to invoke 'connected("some-arg")' on Flash client
51 client.call('connected', 'some-arg')
52 yield
53 multitask.add(invokeAdded(self, client)) # need to invoke later so that connection is established before callback
54 ...
55 agent.apps = dict({'myapp': MyApp, 'someapp': MyApp, '*': App})
56
57 Now the client can connect to rtmp://server/myapp or rtmp://server/someapp and will get connected to this MyApp application.
58 If the client doesn't define "function connected(arg:String):void" in the NetConnection.client object then the server will
59 throw an exception and display the error message.
60
61 '''
62
63 import os, sys, time, struct, socket, traceback, multitask
64 import threading, Queue
65 import uuid
66 import select
67 from string import strip
68
69 from BaseHTTPServer import BaseHTTPRequestHandler
70 from SimpleAppHTTPServer import SimpleAppHTTPRequestHandler
71 from cStringIO import StringIO
72 from cookielib import parse_ns_headers, CookieJar
73 from Cookie import SimpleCookie
74
# Module-wide debug switch: the code below prints trace output when it is true.
_debug = False


def set_debug(dbg):
    '''Set the module-wide debug flag to dbg (a true value enables trace output).'''
    global _debug
    _debug = dbg
80
class MultitaskHTTPRequestHandler(BaseHTTPRequestHandler):
    '''Request handler that works on in-memory buffers instead of socket files.

    setup() replaces the usual socket-backed rfile/wfile with StringIO objects,
    so the request data is written into rfile by the caller before parsing and
    the response accumulates in wfile.  info(), get_full_url() and get_header()
    expose the parsed request in accessor form (presumably so the handler can
    stand in for a urllib2-style request object -- confirm against callers).
    '''

    def setup(self):
        '''Attach empty in-memory input/output buffers to this handler.'''
        self.rfile, self.wfile = StringIO(), StringIO()
        self.connection = self.request

    def finish(self):
        '''Nothing to do: the buffers created by setup() need no flushing here.'''
        return None

    def info(self):
        '''Return the parsed request headers.'''
        return self.headers

    def get_full_url(self):
        '''Return the request path as given on the request line.'''
        return self.path

    def get_header(self, hdr, default):
        '''Return the value of request header hdr, or default when it is absent.'''
        return self.headers.getheader(hdr, default)

    def add_cookies(self):
        '''Emit one Set-Cookie header per entry of self.response_cookies.

        response_cookies is not created by this class; it has to be assigned
        to the handler before this method is called.
        '''
        for morsel in self.response_cookies.values():
            self.send_header("Set-Cookie", morsel.OutputString())
104
def process_cookies(headers, remote, cookie_key="Cookie", add_sess=True):
    '''Build the cookies to send back from the cookies a client sent.

    headers    -- object with getheaders(name) returning the list of values of
                  that header (one string per header line).
    remote     -- peer address; not used at present, kept for the callers.
    cookie_key -- name of the header the cookies are read from.
    add_sess   -- when true, a "session" cookie is guaranteed in the result:
                  a fresh random one is created if the client sent none.

    Returns a SimpleCookie holding every name=value pair received.  A
    "session" cookie, whether received or created, gets expires=50000.
    Entries without "=" are ignored; only the first "=" separates name from
    value, so values may themselves contain "=".
    '''
    ch = headers.getheaders(cookie_key)
    if _debug:
        print("messageReceived cookieheaders= %s" % '; '.join(ch))

    # flatten all header lines into a list of non-empty "name=value" strings
    pairs = []
    for header in ch:
        for part in header.split(";"):
            part = part.strip()
            if part:
                pairs.append(part)

    has_sess = False
    response_cookies = SimpleCookie()
    for pair in pairs:
        if _debug:
            print("found cookie %r" % (pair,))
        name, sep, value = pair.partition("=")
        if not sep:
            # not a name=value pair: nothing meaningful to echo back
            continue
        response_cookies[name] = value
        if name == "session":
            response_cookies[name]['expires'] = 50000
            has_sess = True

    if add_sess and not has_sess:
        response_cookies['session'] = uuid.uuid4().hex
        response_cookies['session']['expires'] = 50000
    return response_cookies
139
class ConnectionClosed(Exception):
    'raised when the client closed the connection'
142
class SockStream(object):
    '''Buffered stream view of a socket for use from multitask generators.

    readline(), read() and write() are generators that yield multitask socket
    operations.  readline() and read() deliver their result by raising
    StopIteration(value).  Any failure while reading or writing is reported
    as ConnectionClosed.
    '''

    def __init__(self, sock):
        self.sock = sock
        self.buffer = ''  # data received from the socket but not yet consumed
        self.bytesRead = 0
        self.bytesWritten = 0

    def close(self):
        '''Shut down the socket (how=1) and drop the reference to it.'''
        self.sock.shutdown(1)  # can't just close it.
        self.sock = None

    def readline(self):
        '''Generator delivering the next line, including its terminating "\\n".'''
        try:
            while True:
                end = self.buffer.find("\n") + 1
                if end:  # a complete line is buffered: hand it back
                    line, self.buffer = self.buffer[:end], self.buffer[end:]
                    raise StopIteration(line)
                received = (yield multitask.recv(self.sock, 4096))  # read more from socket
                if not received:
                    raise ConnectionClosed
                if _debug:
                    print('socket.read[%d] %r' % (len(received), received))
                self.bytesRead += len(received)
                self.buffer += received
        except StopIteration:
            raise
        except:
            # anything else is treated as connection closed.
            raise ConnectionClosed

    def read(self, count):
        '''Generator delivering exactly count bytes of data.'''
        try:
            while len(self.buffer) < count:  # not enough buffered yet
                received = (yield multitask.recv(self.sock, 4096))  # read more from socket
                if not received:
                    raise ConnectionClosed
                self.bytesRead += len(received)
                self.buffer += received
            wanted, self.buffer = self.buffer[:count], self.buffer[count:]
            raise StopIteration(wanted)
        except StopIteration:
            raise
        except:
            # anything else is treated as connection closed.
            raise ConnectionClosed

    def unread(self, data):
        '''Push data back so that it is the next thing read.'''
        self.buffer = data + self.buffer

    def write(self, data):
        '''Generator sending data over the socket in chunks of at most 4096 bytes.'''
        offset, total = 0, len(data)
        while offset < total:
            chunk = data[offset:offset + 4096]
            offset += 4096
            self.bytesWritten += len(chunk)
            if _debug:
                print('socket.write[%d] %r' % (len(chunk), chunk))
            try:
                yield multitask.send(self.sock, chunk)
            except:
                raise ConnectionClosed
        if _debug:
            print('socket.written')
193
194
class Header(object):
    '''An RTMP header: channel, time, size, type and streamId.

    hdrdata holds the encoded channel id (1, 2 or 3 bytes depending on the
    channel number); toBytes() ORs one of the control values below into its
    first byte and appends as many further fields as that control selects.
    '''
    FULL, MESSAGE, TIME, SEPARATOR, MASK = 0x00, 0x40, 0x80, 0xC0, 0xC0

    def __init__(self, channel=0, time=0, size=None, type=None, streamId=0):
        self.channel, self.time, self.size, self.type, self.streamId = channel, time, size, type, streamId
        self.extendedtime = 0
        if channel < 64:
            self.hdrdata = chr(channel)
        elif channel < (64 + 256):
            self.hdrdata = '\x00' + chr(channel - 64)
        else:
            # low byte first, then high byte; floor division keeps an int for chr()
            self.hdrdata = '\x01' + chr((channel - 64) % 256) + chr((channel - 64) // 256)

    def _appendExtendedTimestamp(self, data):
        '''Return data, followed by the 4-byte extended time when time is 0xFFFFFF.'''
        if self.time == 0xFFFFFF:
            data += struct.pack('>I', self.extendedtime)
        return data

    def toBytes(self, control):
        '''Serialize the header; control is one of FULL, MESSAGE, TIME, SEPARATOR.

        SEPARATOR emits only the channel bytes, TIME adds the 3-byte time,
        MESSAGE adds the 3-byte size and 1-byte type, and anything else (FULL)
        also adds the little-endian 4-byte streamId.
        '''
        data = chr(ord(self.hdrdata[0]) | control) + self.hdrdata[1:]
        if control == Header.SEPARATOR: return self._appendExtendedTimestamp(data)

        data += struct.pack('>I', self.time & 0xFFFFFF)[1:]  # add time
        if control == Header.TIME: return self._appendExtendedTimestamp(data)

        data += struct.pack('>I', self.size)[1:]  # size
        data += chr(self.type)  # type
        if control == Header.MESSAGE: return self._appendExtendedTimestamp(data)

        data += struct.pack('<I', self.streamId)  # add streamId
        return self._appendExtendedTimestamp(data)

    def __repr__(self):
        return ("<Header channel=%r time=%r size=%r type=%r (0x%02x) streamId=%r>"
                % (self.channel, self.time, self.size, self.type, self.type or 0, self.streamId))
227
class Message(object):
    '''A message: a header object plus a data payload.

    type, streamId and time are forwarded to the header; size is the length
    of the payload.
    '''
    # message types: RPC3, DATA3,and SHAREDOBJECT3 are used with AMF3
    RPC, RPC3, DATA, DATA3, SHAREDOBJ, SHAREDOBJ3, AUDIO, VIDEO, ACK, CHUNK_SIZE = \
        0x14, 0x11, 0x12, 0x0F, 0x13, 0x10, 0x08, 0x09, 0x03, 0x01

    def __init__(self, hdr=None, data=''):
        '''Create a message from header hdr (a new Header() when None) and data.'''
        if hdr is None: hdr = Header()
        self.header, self.data = hdr, data

    def _header_property(attr):
        '''Build a property that reads and writes self.header.<attr>.'''
        def _get(self):
            return getattr(self.header, attr)

        def _set(self, value):
            setattr(self.header, attr, value)
        return property(fget=_get, fset=_set)

    # properties type, streamId and time access self.header.(property)
    type = _header_property('type')
    streamId = _header_property('streamId')
    time = _header_property('time')
    del _header_property  # class-construction helper only, not part of the API

    @property
    def size(self): return len(self.data)

    def __repr__(self):
        return ("<Message header=%r data=%r>" % (self.header, self.data))
247
class Protocol(object):
    '''Per-connection request reader built on a SockStream.

    parse() reads HTTP requests off the stream one after another; each parsed
    request is handed to messageReceived(), which subclasses override.  All
    methods except __init__ and protocolMessage are generators meant to be run
    by multitask.  parseRequests() reads self.remote, which this class does
    not set; the subclass has to provide it.
    '''
    # constants
    PING_SIZE = 1536
    DEFAULT_CHUNK_SIZE = 128
    MIN_CHANNEL_ID = 3
    PROTOCOL_CHANNEL_ID = 2

    def __init__(self, sock):
        self.stream = SockStream(sock)
        self.lastReadHeaders = dict()  # indexed by channelId
        self.incompletePackets = dict()  # indexed by channelId
        self.readChunkSize = self.writeChunkSize = Protocol.DEFAULT_CHUNK_SIZE
        self.lastWriteHeaders = dict()  # indexed by streamId
        self.nextChannelId = Protocol.MIN_CHANNEL_ID
        self.writeLock = threading.Lock()
        self.writeQueue = Queue.Queue()

    def messageReceived(self, msg):
        '''Hook called with every parsed request; the base version does nothing.'''
        yield

    def protocolMessage(self, msg):
        '''Answer an ACK message with a copy of it, or record a new read chunk size.'''
        if msg.type == Message.ACK:  # respond to ACK requests
            response = Message()
            response.type, response.data = msg.type, msg.data
            # NOTE(review): writeMessage() is a generator; calling it without
            # yielding or scheduling it creates the generator but never runs it.
            self.writeMessage(response)
        elif msg.type == Message.CHUNK_SIZE:
            self.readChunkSize = struct.unpack('>L', msg.data)[0]

    def connectionClosed(self):
        '''Hook called when the connection is found closed; does nothing here.'''
        yield

    def parse(self):
        '''Run parseRequests() and finish quietly once the connection is closed.'''
        try:
            yield self.parseRequests()  # parse http requests
        except ConnectionClosed:
            if _debug: print('parse connection closed')

    def removeConnection(self):
        '''Tell the server, through its queue, that this connection is finished.'''
        yield self.server.queue.put((self, None))  # close connection

    def writeMessage(self, message):
        '''Write message to the stream; a closed connection triggers connectionClosed().'''
        try:
            yield self.stream.write(message)
        except ConnectionClosed:
            yield self.connectionClosed()
        except:
            # print_exc() writes the traceback itself and returns None, so its
            # result must not be printed.
            traceback.print_exc()

    def parseHandshake(self):
        '''Parses the rtmp handshake'''
        data = (yield self.stream.read(Protocol.PING_SIZE + 1))  # bound version and first ping
        yield self.stream.write(data)
        data = (yield self.stream.read(Protocol.PING_SIZE))  # bound second ping
        yield self.stream.write(data)

    def parseRequests(self):
        '''Parses complete requests until the connection is closed.

        For every request the request line and the header lines are read from
        the stream, placed in the handler's rfile and parsed with
        parse_request(); the handler is then passed to messageReceived().
        Raises ConnectionClosed when the peer goes away or a request cannot
        be read or parsed.
        '''
        if _debug:
            print("parseRequests start %r" % (self,))
        self.hr = MultitaskHTTPRequestHandler(self.stream, self.remote, None)
        self.hr.close_connection = 1
        self.cookies = CookieJar()

        while True:

            # prepare reading the request so that when it's handed
            # over to "standard" HTTPRequestHandler, the data's already
            # there.
            if _debug:
                print("parseRequests")
            try:
                readok = (yield multitask.readable(self.stream.sock, 5000))
            except select.error:
                print("select error: connection closed")
                raise ConnectionClosed

            if _debug:
                print("readok %s" % (readok,))
                print("")
            raw_requestline = (yield self.stream.readline())
            if _debug: print("parseRequests, line %s" % (raw_requestline,))
            if not raw_requestline:
                raise ConnectionClosed

            # collect the header block, up to and including the empty line
            data = ''
            try:
                while 1:
                    line = (yield self.stream.readline())
                    data += line
                    if line in ['\n', '\r\n']:
                        break
            except StopIteration:
                if _debug: print("parseRequests, stopiter")
                raise
            except:
                if _debug:
                    print('parseRequests')
                    traceback.print_exc()
                raise ConnectionClosed

            # load the header block into the handler's input buffer and
            # rewind, so that parse_request() reads it from the start.
            self.hr.raw_requestline = raw_requestline
            pos = 0
            self.hr.rfile.truncate(0)
            self.hr.rfile.write(data)
            if _debug:
                print("parseRequests write after")
            self.hr.rfile.seek(pos)
            if _debug:
                print("parseRequests seek after")

            if not self.hr.parse_request():
                raise ConnectionClosed
            if _debug:
                print("parseRequests parse_req after")
                print("parseRequest headers %r %s" % (self.hr.headers, self.hr.headers))
            try:
                yield self.messageReceived(self.hr)
            except ConnectionClosed:
                if _debug:
                    print('parseRequests, ConnectionClosed ')
                raise StopIteration
            except:
                # a failing request must not end the read loop
                if _debug:
                    print('messageReceived')
                    traceback.print_exc()

    def write(self):
        '''Writes messages to stream'''
        print("starting protocol write loop %r" % (self,))
        while True:
            while self.writeQueue.empty(): (yield multitask.sleep(0.01))
            data = self.writeQueue.get()  # TODO this should be used using multitask.Queue and remove previous wait.
            if _debug: print("write to stream %r" % (data,))
            if data is None:
                # just in case TCP socket is not closed, close it.
                try:
                    if _debug:
                        print("stream closing")
                        print(self.stream)
                    yield self.stream.close()
                    if _debug:
                        print("stream closed")
                except: pass
                break

            try:
                yield self.stream.write(data)
            except ConnectionClosed:
                yield self.connectionClosed()
            except:
                traceback.print_exc()
        print("ending protocol write loop %r" % (self,))
406
class Command(object):
    ''' Class for command / data messages'''

    def __init__(self, type=Message.RPC, name=None, id=None, cmdData=None, args=None):
        '''Create a new command with given type, name, id, cmdData and args list.

        args is copied, so later setArg() calls do not touch the caller's
        sequence; omitting it gives an empty list.
        '''
        self.type, self.name, self.id, self.cmdData = type, name, id, cmdData
        self.args = args[:] if args is not None else []

    def __repr__(self):
        return ("<Command type=%r name=%r id=%r data=%r args=%r>" % (self.type, self.name, self.id, self.cmdData, self.args))

    def setArg(self, arg):
        '''Append arg to the argument list.'''
        self.args.append(arg)

    def getArg(self, index):
        '''Return the argument at position index.'''
        return self.args[index]

    @classmethod
    def fromMessage(cls, message):
        ''' initialize from a parsed RTMP message'''
        assert (message.type in [Message.RPC, Message.RPC3, Message.DATA, Message.DATA3])

        length = len(message.data)
        if length == 0: raise ValueError('zero length message data')

        if message.type == Message.RPC3 or message.type == Message.DATA3:
            assert message.data[0] == '\x00'  # must be 0 in AMF3
            data = message.data[1:]
        else:
            data = message.data

        # NOTE(review): no import of "amf" is visible in this file; unless it
        # is provided elsewhere this line raises NameError.
        amfReader = amf.AMF0(data)

        inst = cls()
        inst.name = amfReader.read()  # first field is command name

        try:
            if message.type == Message.RPC:
                inst.id = amfReader.read()  # second field *may* be message id
                inst.cmdData = amfReader.read()  # third is command data
            else:
                inst.id = 0
            inst.args = []  # others are optional
            while True:
                inst.args.append(amfReader.read())
        except EOFError:
            # running out of data simply ends the optional fields
            pass
        return inst

    def toMessage(self):
        '''Serialize this command into a new Message of the same type.'''
        msg = Message()
        assert self.type
        msg.type = self.type
        # NOTE(review): "amf" is not imported in the visible part of the file.
        output = amf.BytesIO()
        amfWriter = amf.AMF0(output)
        amfWriter.write(self.name)
        if msg.type == Message.RPC or msg.type == Message.RPC3:
            amfWriter.write(self.id)
            amfWriter.write(self.cmdData)
        for arg in self.args:
            amfWriter.write(arg)
        output.seek(0)
        if msg.type == Message.RPC3 or msg.type == Message.DATA3:
            data = '\x00' + output.read()  # the AMF3 types carry a leading zero byte
        else:
            data = output.read()
        msg.data = data
        output.close()
        return msg
476
def getfilename(path, name, root):
    '''Return the FLV file name for a stream: root + scope + name + '.flv'.

    scope is whatever follows the first '/' in path, with a '/' appended when
    it is not empty.  root is used as a plain prefix, so it needs its own
    trailing '/' to denote a directory.
    '''
    scope = path.partition('/')[2]
    if scope:
        scope += '/'
    result = ''.join((root, scope, name, '.flv'))
    if _debug:
        print('filename= %s' % (result,))
    return result
485
class Stream(object):
    '''The stream object that is used for RTMP stream.'''
    count = 0  # number of streams created so far; used to label each new one

    def __init__(self, client):
        self.client = client
        self.id = 0
        self.name = ''
        # set up front so that nothing complains about a missing attribute
        self.recordfile = None
        self.playfile = None
        self.queue = multitask.Queue()
        self._name = ''.join(('Stream[', str(Stream.count), ']'))
        Stream.count += 1
        if _debug:
            print('%s created' % (self,))

    def close(self):
        '''Close the record and play files, if any, and let go of the client.'''
        if _debug:
            print('%s closing' % (self,))
        for attr in ('recordfile', 'playfile'):
            handle = getattr(self, attr)
            if handle is not None:
                handle.close()
                setattr(self, attr, None)
        self.client = None  # to clear the reference

    def __repr__(self):
        return self._name

    def recv(self):
        '''Generator to receive new Message on this stream, or None if stream is closed.'''
        return self.queue.get()

    def send(self, msg):
        '''Method to send a Message or Command on this stream.'''
        if isinstance(msg, Command):
            msg = msg.toMessage()
        msg.streamId = self.id
        if self.client is not None:
            self.client.writeMessage(msg)
517
class Client(Protocol):
    '''The client object represents a single connected client to the server.'''

    def __init__(self, sock, server, remote):
        '''Wrap the accepted socket and schedule the request parser for it.

        server is the Server that accepted the connection; parsed requests are
        put on its queue.  remote is the peer address.
        '''
        Protocol.__init__(self, sock)
        self.server = server
        self.remote = remote
        self.agent = {}
        self.streams = {}
        self._nextCallId = 2
        self._nextStreamId = 1
        self.objectEncoding = 0.0
        self.queue = multitask.Queue()  # receive queue used by application
        multitask.add(self.parse())

    def recv(self):
        '''Generator to receive new Message (msg, arg) on this stream, or (None,None) if stream is closed.'''
        return self.queue.get()

    def connectionClosed(self):
        '''Called when the client drops the connection'''
        if _debug: print('Client.connectionClosed')
        if self.stream.sock:
            yield self.stream.close()
        else:
            # already closed; still yield once so this remains a generator
            yield None

    def messageReceived(self, msg):
        '''Attach cookies and the request body to msg, then queue it for the server.

        When a content-length header is present, that many bytes are read from
        the stream and placed in msg.rfile, positioned at the start of the body.
        '''
        if _debug: print('messageReceived cmd= %s %s' % (msg.command, msg.path))
        msg.response_cookies = process_cookies(msg.headers, self.remote)

        # slightly bad, this: read everything, put it into memory...
        if 'content-length' in msg.headers:
            max_chunk_size = 10*1024*1024
            size_remaining = int(msg.headers["content-length"])
            L = []
            # "> 0" so that a negative length sent by a client cannot keep
            # this loop spinning
            while size_remaining > 0:
                chunk_size = min(size_remaining, max_chunk_size)
                data = (yield self.stream.read(chunk_size))
                L.append(data)
                size_remaining -= len(L[-1])

            pos = msg.rfile.tell()
            msg.rfile.write(''.join(L))
            msg.rfile.seek(pos)

        yield self.server.queue.put((self, msg))  # new connection

    # NOTE(review): accept(), rejectConnection() and call() below invoke
    # writeMessage(), which is a generator, without yielding or scheduling it,
    # so the message is built but not written.  They also rely on
    # Command.toMessage(), which uses the "amf" name that is not imported in
    # the visible part of this file.

    def accept(self):
        '''Method to accept an incoming client.'''
        response = Command()
        response.id, response.name, response.type = 1, '_result', Message.RPC
        if _debug: print('Client.accept() objectEncoding= %s' % (self.objectEncoding,))
        response.setArg(dict(level='status', code='NetConnection.Connect.Success',
                             description='Connection succeeded.', details=None,
                             objectEncoding=self.objectEncoding))
        self.writeMessage(response.toMessage())

    def rejectConnection(self, reason=''):
        '''Method to reject an incoming client.'''
        response = Command()
        response.id, response.name, response.type = 1, '_error', Message.RPC
        response.setArg(dict(level='status', code='NetConnection.Connect.Rejected',
                             description=reason, details=None))
        self.writeMessage(response.toMessage())

    def call(self, method, *args):
        '''Call a (callback) method on the client.'''
        cmd = Command()
        cmd.id, cmd.name = self._nextCallId, method
        cmd.type = Message.RPC if self.objectEncoding == 0.0 else Message.RPC3
        cmd.args, cmd.cmdData = args, None
        self._nextCallId += 1
        if _debug: print('Client.call method= %s args= %s  msg= %s' % (method, args, cmd.toMessage()))
        self.writeMessage(cmd.toMessage())

    def createStream(self):
        ''' Create a stream on the server side'''
        stream = Stream(self)
        stream.id = self._nextStreamId
        self.streams[stream.id] = stream
        self._nextStreamId += 1
        return stream
602
603
class Server(object):
    '''Listens for incoming connections and creates a Client for each of them.'''

    def __init__(self, sock):
        '''Create a server on the given bound TCP socket. The server will terminate
        when the socket is disconnected, or some other error occurs in listening.'''
        self.sock = sock
        self.queue = multitask.Queue()  # queue read by recv(); Clients put (client, msg) on it
        multitask.add(self.run())

    def recv(self):
        '''Generator to wait for the next item on this server's queue and return
        (client, args) or (None, None) if the socket is closed or some error.'''
        return self.queue.get()

    def run(self):
        '''Accept loop (generator): one Client per accepted connection.

        When accepting ends, for whatever reason, the listening socket is
        closed and (None, None) is queued to signal the end to the reader.
        '''
        try:
            while True:
                sock, remote = (yield multitask.accept(self.sock))  # receive client TCP
                if sock is None:
                    if _debug: print('rtmp.Server accept(sock) returned None.')
                    break
                if _debug: print('connection received from %s' % (remote,))
                # TCP_NODELAY: send small writes immediately instead of coalescing them
                sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
                # the Client schedules its own parser task, so no reference is kept here
                Client(sock, self, remote)
        except:
            if _debug: print('rtmp.Server exception  %s' % (sys.exc_info(),))
            traceback.print_exc()

        if self.sock:
            try: self.sock.close(); self.sock = None
            except: pass
        if self.queue:
            yield self.queue.put((None, None))
            self.queue = None
638
class BaseApp(object):
    '''An application instance containing any number of streams. Except for constructor all methods are generators.'''
    count = 0

    def __init__(self):
        # the instance is labelled "<ClassName>[<n>]", numbered through App.count
        self.name = ''.join((str(type(self).__name__), '[', str(App.count), ']'))
        App.count += 1
        self.players = {}  # streams indexed by stream name
        self.publishers = {}  # streams indexed by stream name
        self._clients = []  # list of clients
        if _debug:
            print('%s created' % (self.name,))

    def __del__(self):
        if _debug:
            print('%s destroyed' % (self.name,))

    @property
    def clients(self):
        '''everytime this property is accessed it returns a new list of clients connected to this instance.'''
        if self._clients is None:
            return []
        # the first entry of _clients is skipped
        return self._clients[1:]
652
class App(BaseApp, SimpleAppHTTPRequestHandler):
    '''Default application class: BaseApp combined with SimpleAppHTTPRequestHandler.'''
655
656 class HTTPServer(object):
657 '''A RTMP server to record and stream Flash video.'''
658 def __init__(self):
659 '''Construct a new HttpServer. It initializes the local members.'''
660 self.sock = self.server = None
661 self.apps = dict({'*': App}) # supported applications: * means any as in {'*': App}
662 self.clients = dict() # list of clients indexed by scope. First item in list is app instance.
663 self.root = ''
664
def start(self, host='0.0.0.0', port=8080):
    '''Start listening for connections on (host, port); port defaults to 8080.

    Does nothing when the server is already started.  Otherwise a listening
    TCP socket is created, wrapped in a Server and the serverlistener task is
    scheduled.
    '''
    if _debug:
        print('start %s %s' % (host, port))
    if self.server:
        return
    listener = socket.socket(type=socket.SOCK_STREAM)
    self.sock = listener
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listener.bind((host, port))
    if _debug:
        print('listening on  %s' % (listener.getsockname(),))
    listener.listen(5)
    self.server = Server(listener)  # start accepting on that socket
    multitask.add(self.serverlistener())
676
677 def stop(self):
678 if _debug: print 'stopping Flash server'
679 if self.server and self.sock:
680 try: self.sock.close(); self.sock = None
681 except: pass
682 self.server = None
683
684 def serverlistener(self):
685 '''Server listener (generator). It accepts all connections and invokes client listener'''
686 try:
687 while True: # main loop to receive new connections on the server
688 client, msg = (yield self.server.recv()) # receive an incoming client connection.
689 # TODO: we should reject non-localhost client connections.
690 if not client: # if the server aborted abnormally,
691 break # hence close the listener.
692 if _debug: print 'client connection received', client, msg
693
694 if msg is None:
695 yield client.connectionClosed()
696 session = client.session
697 del self.clients[session]
698 continue
699
700 # if client.objectEncoding != 0 and client.objectEncoding != 3:
701 if client.objectEncoding != 0:
702 yield client.rejectConnection(reason='Unsupported encoding ' + str(client.objectEncoding) + '. Please use NetConnection.defaultObjectEncoding=ObjectEncoding.AMF0')
703 yield client.connectionClosed()
704 else:
705 if _debug:
706 print "cookies", str(msg.response_cookies)
707 session = msg.response_cookies['session'].value
708 client.session = session
709 name = msg.path
710 if _debug:
711 print "serverlistener", name
712 if '*' not in self.apps and name not in self.apps:
713 yield client.rejectConnection(reason='Application not found: ' + name)
714 else: # create application instance as needed and add in our list
715 if _debug: print 'name=', name, 'name in apps', str(name in self.apps)
716 app = self.apps[name] if name in self.apps else self.apps['*'] # application class
717 if _debug:
718 print "clients", self.clients.keys()
719 if session in self.clients:
720 inst = self.clients[session][0]
721 else:
722 inst = app()
723 self.clients[session] = [inst]; inst._clients=self.clients[session]
724 msg.server = inst # whew! just in time!
725 try:
726 methodname = "on%s" % msg.command
727 if _debug:
728 print methodname, dir(inst)
729 method = getattr(inst, methodname, None)
730 result = method(client, msg)
731 close_connection = msg.close_connection
732 if _debug:
733 print "close connection", close_connection
734 except:
735 if _debug: traceback.print_exc()
736 yield client.rejectConnection(reason='Exception on %s' % methodname)
737 continue
738 if result is True or result is None:
739 if result is None:
740 msg.wfile.seek(0)
741 data = msg.wfile.read()
742 msg.wfile.seek(0)
743 msg.wfile.truncate()
744 yield client.writeMessage(data)
745 if close_connection:
746 if _debug:
747 print 'close_connection requested'
748 try:
749 yield client.connectionClosed()
750 raise ConnectionClosed
751 except ConnectionClosed:
752 if _debug:
753 print 'close_connection done'
754 pass
755 else:
756 if _debug:
757 print "result", result
758 yield client.rejectConnection(reason='Rejected in onConnect')
759 except StopIteration: raise
760 except:
761 if _debug:
762 print 'serverlistener exception', \
763 (sys and sys.exc_info() or None)
764 traceback.print_exc()
765
766 def clientlistener(self, client):
767 '''Client listener (generator). It receives a command and invokes client handler, or receives a new stream and invokes streamlistener.'''
768 try:
769 while True:
770 msg, arg = (yield client.recv()) # receive new message from client
771 if not msg: # if the client disconnected,
772 if _debug: print 'connection closed from client'
773 break # come out of listening loop.
774 if msg == 'command': # handle a new command
775 multitask.add(self.clienthandler(client, arg))
776 elif msg == 'stream': # a new stream is created, handle the stream.
777 arg.client = client
778 multitask.add(self.streamlistener(arg))
779 except StopIteration: raise
780 except:
781 if _debug: print 'clientlistener exception', (sys and sys.exc_info() or None)
782 traceback.print_exc()
783
784 # client is disconnected, clear our state for application instance.
785 if _debug: print 'cleaning up client', client.path
786 inst = None
787 if client.path in self.clients:
788 inst = self.clients[client.path][0]
789 self.clients[client.path].remove(client)
790 for stream in client.streams.values(): # for all streams of this client
791 self.closehandler(stream)
792 client.streams.clear() # and clear the collection of streams
793 if client.path in self.clients and len(self.clients[client.path]) == 1: # no more clients left, delete the instance.
794 if _debug: print 'removing the application instance'
795 inst = self.clients[client.path][0]
796 inst._clients = None
797 del self.clients[client.path]
798 if inst is not None: inst.onDisconnect(client)
799
800 def closehandler(self, stream):
801 '''A stream is closed explicitly when a closeStream command is received from given client.'''
802 if stream.client is not None:
803 inst = self.clients[stream.client.path][0]
804 if stream.name in inst.publishers and inst.publishers[stream.name] == stream: # clear the published stream
805 inst.onClose(stream.client, stream)
806 del inst.publishers[stream.name]
807 if stream.name in inst.players and stream in inst.players[stream.name]:
808 inst.onStop(stream.client, stream)
809 inst.players[stream.name].remove(stream)
810 if len(inst.players[stream.name]) == 0:
811 del inst.players[stream.name]
812 stream.close()
813
814 def clienthandler(self, client, cmd):
815 '''A generator to handle a single command on the client.'''
816 inst = self.clients[client.path][0]
817 if inst:
818 if cmd.name == '_error':
819 if hasattr(inst, 'onStatus'):
820 result = inst.onStatus(client, cmd.args[0])
821 elif cmd.name == '_result':
822 if hasattr(inst, 'onResult'):
823 result = inst.onResult(client, cmd.args[0])
824 else:
825 res, code, args = Command(), '_result', dict()
826 try: result = inst.onCommand(client, cmd.name, *cmd.args)
827 except:
828 if _debug: print 'Client.call exception', (sys and sys.exc_info() or None)
829 code, args = '_error', dict()
830 res.id, res.name, res.type = cmd.id, code, (client.objectEncoding == 0.0 and Message.RPC or Message.RPC3)
831 res.args, res.cmdData = args, None
832 if _debug: print 'Client.call method=', code, 'args=', args, ' msg=', res.toMessage()
833 client.writeMessage(res.toMessage())
834 # TODO return result to caller
835 yield
836
837 def streamlistener(self, stream):
838 '''Stream listener (generator). It receives stream message and invokes streamhandler.'''
839 stream.recordfile = None # so that it doesn't complain about missing attribute
840 while True:
841 msg = (yield stream.recv())
842 if not msg:
843 if _debug: print 'stream closed'
844 self.closehandler(stream)
845 break
846 # if _debug: msg
847 multitask.add(self.streamhandler(stream, msg))
848
849 def streamhandler(self, stream, message):
850 '''A generator to handle a single message on the stream.'''
851 try:
852 if message.type == Message.RPC:
853 cmd = Command.fromMessage(message)
854 if _debug: print 'streamhandler received cmd=', cmd
855 if cmd.name == 'publish':
856 yield self.publishhandler(stream, cmd)
857 elif cmd.name == 'play':
858 yield self.playhandler(stream, cmd)
859 elif cmd.name == 'closeStream':
860 self.closehandler(stream)
861 else: # audio or video message
862 yield self.mediahandler(stream, message)
863 except GeneratorExit: pass
864 except StopIteration: pass
865 except:
866 if _debug: print 'exception in streamhandler', (sys and sys.exc_info())
867
868 def publishhandler(self, stream, cmd):
869 '''A new stream is published. Store the information in the application instance.'''
870 try:
871 stream.mode = 'live' if len(cmd.args) < 2 else cmd.args[1] # live, record, append
872 stream.name = cmd.args[0]
873 if _debug: print 'publishing stream=', stream.name, 'mode=', stream.mode
874 inst = self.clients[stream.client.path][0]
875 if (stream.name in inst.publishers):
876 raise ValueError, 'Stream name already in use'
877 inst.publishers[stream.name] = stream # store the client for publisher
878 result = inst.onPublish(stream.client, stream)
879
880 if stream.mode == 'record' or stream.mode == 'append':
881 stream.recordfile = FLV().open(getfilename(stream.client.path, stream.name, self.root), stream.mode)
882 response = Command(name='onStatus', id=cmd.id, args=[dict(level='status', code='NetStream.Publish.Start', description='', details=None)])
883 yield stream.send(response)
884 except ValueError, E: # some error occurred. inform the app.
885 if _debug: print 'error in publishing stream', str(E)
886 response = Command(name='onStatus', id=cmd.id, args=[dict(level='error',code='NetStream.Publish.BadName',description=str(E),details=None)])
887 yield stream.send(response)
888
889 def playhandler(self, stream, cmd):
890 '''A new stream is being played. Just updated the players list with this stream.'''
891 inst = self.clients[stream.client.path][0]
892 name = stream.name = cmd.args[0] # store the stream's name
893 start = cmd.args[1] if len(cmd.args) >= 2 else -2
894 if name not in inst.players:
895 inst.players[name] = [] # initialize the players for this stream name
896 if stream not in inst.players[name]: # store the stream as players of this name
897 inst.players[name].append(stream)
898 path = getfilename(stream.client.path, stream.name, self.root)
899 if os.path.exists(path):
900 stream.playfile = FLV().open(path)
901 multitask.add(stream.playfile.reader(stream))
902 if _debug: print 'playing stream=', name, 'start=', start
903 result = inst.onPlay(stream.client, stream)
904 response = Command(name='onStatus', id=cmd.id, args=[dict(level='status',code='NetStream.Play.Start', description=stream.name, details=None)])
905 yield stream.send(response)
906
def mediahandler(self, stream, message):
    '''Handle incoming media on the stream, by sending it to the players of this stream name.

    The message is first offered to the application via onPublishData; if that returns a
    false value nothing else happens. Otherwise the message is sent to every player stream
    for which onPlayData returns a true value, and then written to the stream's record
    file when one is open. Streams without a client are ignored.
    '''
    if stream.client is None:
        return
    inst = self.clients[stream.client.path][0]
    if not inst.onPublishData(stream.client, stream, message):
        return
    # Iterate over a snapshot: other tasks run at each yield below and may add or remove
    # players, and mutating a list while iterating over it makes the loop skip entries.
    for s in list(inst.players.get(stream.name, [])):
        if s not in inst.players.get(stream.name, ()):
            continue  # this player was removed while an earlier send was in progress
        if inst.onPlayData(s.client, s, message):
            yield s.send(message)
    # NOTE(review): recording is kept inside the accepted-publish path; confirm the
    # nesting against the original file's indentation.
    if stream.recordfile is not None:
        stream.recordfile.write(message)
921
922 # The main routine to start, run and stop the service
923 if __name__ == '__main__':
924 print "optparse"
925 from optparse import OptionParser
926 parser = OptionParser()
927 parser.add_option('-i', '--host', dest='host', default='0.0.0.0', help="listening IP address. Default '0.0.0.0'")
928 parser.add_option('-p', '--port', dest='port', default=8080, type="int", help='listening port number. Default 8080')
929 parser.add_option('-r', '--root', dest='root', default='./', help="document root directory. Default './'")
930 parser.add_option('-d', '--verbose', dest='verbose', default=False, action='store_true', help='enable debug trace')
931 (options, args) = parser.parse_args()
932
933 _debug = options.verbose
934 try:
935 if _debug: print time.asctime(), 'Options - %s:%d' % (options.host, options.port)
936 agent = HTTPServer()
937 agent.root = options.root
938 agent.start(options.host, options.port)
939 if _debug: print time.asctime(), 'Flash Server Starts - %s:%d' % (options.host, options.port)
940 multitask.run()
941 except KeyboardInterrupt:
942 print traceback.print_exc()
943 pass
944 except:
945 print "exception"
946 print traceback.print_exc()
947 if _debug: print time.asctime(), 'Flash Server Stops'