a2aadb3cf4c31a64fc2d3e2f6011cb3f3ff3d129
[multitaskhttpd.git] / httpd.py
1 # Copyright (c) 2007-2009, Mamta Singh. All rights reserved. see README for details.
2
3 '''
4 This is a simple implementation of a Flash RTMP server to accept connections and stream requests. The module is organized as follows:
5 1. The FlashServer class is the main class to provide the server abstraction. It uses the multitask module for co-operative multitasking.
6 It also uses the App abstract class to implement the applications.
7 2. The Server class implements a simple server to receive new Client connections and inform the FlashServer application. The Client class
8 derived from Protocol implements the RTMP client functions. The Protocol class implements the base RTMP protocol parsing. A Client contains
9 various streams from the client, represented using the Stream class.
10 3. The Message, Header and Command represent RTMP message, header and command respectively. The FLV class implements functions to perform read
11 and write of FLV file format.
12
13
14 Typically an application can launch this server as follows:
15 $ python rtmp.py
16
17 To know the command line options use the -h option:
18 $ python rtmp.py -h
19
20 To start the server with a different directory for recording and playing FLV files from, use the following command.
21 $ python rtmp.py -r some-other-directory/
22 Note the terminal '/' in the directory name. Without this, it is just used as a prefix in FLV file names.
23
24 A test client is available in testClient directory, and can be compiled using Flex Builder. Alternatively, you can use the SWF file to launch
25 from testClient/bin-debug after starting the server. Once you have launched the client in the browser, you can connect to
26 local host by clicking on 'connect' button. Then click on publish button to publish a stream. Open another browser with
27 same URL and first connect then play the same stream name. If everything works fine you should be able to see the video
28 from first browser to the second browser. Similar, in the first browser, if you check the record box before publishing,
29 it will create a new FLV file for the recorded stream. You can close the publishing stream and play the recorded stream to
30 see your recording. Note that due to initial delay in timestamp (in case publish was clicked much later than connect),
31 your played video will start appearing after some initial delay.
32
33
34 If an application wants to use this module as a library, it can launch the server as follows:
35 >>> agent = FlashServer() # a new RTMP server instance
36 >>> agent.root = 'flvs/' # set the document root to be 'flvs' directory. Default is current './' directory.
37 >>> agent.start() # start the server
38 >>> multitask.run() # this is needed somewhere in the application to actually start the co-operative multitasking.
39
40
41 If an application wants to specify a different application other than the default App, it can subclass it and supply the application by
42 setting the server's apps property. The following example shows how to define "myapp" which invokes a 'connected()' method on client when
43 the client connects to the server.
44
45 class MyApp(App): # a new MyApp extends the default App in rtmp module.
46 def __init__(self): # constructor just invokes base class constructor
47 App.__init__(self)
48 def onConnect(self, client, *args):
49 result = App.onConnect(self, client, *args) # invoke base class method first
50 def invokeAdded(self, client): # define a method to invoke 'connected("some-arg")' on Flash client
51 client.call('connected', 'some-arg')
52 yield
53 multitask.add(invokeAdded(self, client)) # need to invoke later so that connection is established before callback
54 ...
55 agent.apps = dict({'myapp': MyApp, 'someapp': MyApp, '*': App})
56
57 Now the client can connect to rtmp://server/myapp or rtmp://server/someapp and will get connected to this MyApp application.
58 If the client doesn't define "function connected(arg:String):void" in the NetConnection.client object then the server will
59 throw an exception and display the error message.
60
61 '''
62
63 import os, sys, time, struct, socket, traceback, multitask
64 import threading, Queue
65 import uuid
66 import select
67 from string import strip
68
69 from BaseHTTPServer import BaseHTTPRequestHandler
70 from SimpleAppHTTPServer import SimpleAppHTTPRequestHandler
71 from cStringIO import StringIO
72 from cookielib import parse_ns_headers, CookieJar
73 from Cookie import SimpleCookie
74
75 global _debug
76 _debug = False
77 def set_debug(dbg):
78 global _debug
79 _debug = dbg
80
81 class MultitaskHTTPRequestHandler(BaseHTTPRequestHandler):
82
83 def setup(self):
84 self.connection = self.request
85 self.rfile = StringIO()
86 self.wfile = StringIO()
87
88 def finish(self):
89 pass
90
91 def info(self):
92 return self.headers
93
94 def get_full_url(self):
95 return self.path
96
97 def get_header(self, hdr, default):
98 return self.headers.getheader(hdr, default)
99
100 def add_cookies(self):
101 for k, v in self.response_cookies.items():
102 val = v.OutputString()
103 self.send_header("Set-Cookie", val)
104
105 def process_cookies(headers, remote, cookie_key="Cookie", add_sess=True):
106 ch = headers.getheaders(cookie_key)
107 print "messageReceived cookieheaders=", '; '.join(ch)
108 res = []
109 for c in ch:
110 c = c.split(";")
111 if len(c) == 0:
112 continue
113 c = map(strip, c)
114 c = filter(lambda x: x, c)
115 res += c
116 has_sess = False
117 response_cookies = SimpleCookie()
118 for c in res:
119 print "found cookie", repr(c)
120 name, value = c.split("=")
121 response_cookies[name] = value
122 #response_cookies[name]['path'] = "/"
123 #response_cookies[name]['domain'] = remote[0]
124 #response_cookies[name]['version'] = 0
125 if name == "session":
126 response_cookies[name]['expires'] = 50000
127 has_sess = True
128 if not add_sess:
129 return response_cookies
130 if not has_sess:
131 response_cookies['session'] = uuid.uuid4().hex
132 response_cookies['session']['expires'] = 50000
133 #response_cookies['session']['path'] = '/'
134 #response_cookies['session']['domain'] = remote[0]
135 #response_cookies['session']['version'] = 0
136 return response_cookies
137
138 class ConnectionClosed:
139 'raised when the client closed the connection'
140
141 class SockStream(object):
142 '''A class that represents a socket as a stream'''
143 def __init__(self, sock):
144 self.sock, self.buffer = sock, ''
145 self.bytesWritten = self.bytesRead = 0
146
147 def close(self):
148 self.sock.shutdown(1) # can't just close it.
149 self.sock = None
150
151 def readline(self):
152 try:
153 while True:
154 nl = self.buffer.find("\n")
155 if nl >= 0: # do not have newline in buffer
156 data, self.buffer = self.buffer[:nl+1], self.buffer[nl+1:]
157 raise StopIteration(data)
158 data = (yield multitask.recv(self.sock, 4096)) # read more from socket
159 if not data: raise ConnectionClosed
160 if _debug: print 'socket.read[%d] %r'%(len(data), data)
161 self.bytesRead += len(data)
162 self.buffer += data
163 except StopIteration: raise
164 except: raise ConnectionClosed # anything else is treated as connection closed.
165
166 def read(self, count):
167 try:
168 while True:
169 if len(self.buffer) >= count: # do not have data in buffer
170 data, self.buffer = self.buffer[:count], self.buffer[count:]
171 raise StopIteration(data)
172 data = (yield multitask.recv(self.sock, 4096)) # read more from socket
173 if not data: raise ConnectionClosed
174 # if _debug: print 'socket.read[%d] %r'%(len(data), data)
175 self.bytesRead += len(data)
176 self.buffer += data
177 except StopIteration: raise
178 except: raise ConnectionClosed # anything else is treated as connection closed.
179
180 def unread(self, data):
181 self.buffer = data + self.buffer
182
183 def write(self, data):
184 while len(data) > 0: # write in 4K chunks each time
185 chunk, data = data[:4096], data[4096:]
186 self.bytesWritten += len(chunk)
187 if _debug: print 'socket.write[%d] %r'%(len(chunk), chunk)
188 try: yield multitask.send(self.sock, chunk)
189 except: raise ConnectionClosed
190 if _debug: print 'socket.written'
191
192
193 class Header(object):
194 FULL, MESSAGE, TIME, SEPARATOR, MASK = 0x00, 0x40, 0x80, 0xC0, 0xC0
195
196 def __init__(self, channel=0, time=0, size=None, type=None, streamId=0):
197 self.channel, self.time, self.size, self.type, self.streamId = channel, time, size, type, streamId
198 self.extendedtime = 0
199 if channel<64: self.hdrdata = chr(channel)
200 elif channel<(64+256): self.hdrdata = '\x00'+chr(channel-64)
201 else: self.hdrdata = '\x01'+chr((channel-64)%256)+chr((channel-64)/256)
202
203 def _appendExtendedTimestamp(self, data):
204 if self.time == 0xFFFFFF:
205 data += struct.pack('>I', self.extendedtime)
206 return data
207
208 def toBytes(self, control):
209 data = chr(ord(self.hdrdata[0]) | control) + self.hdrdata[1:]
210 if control == Header.SEPARATOR: return self._appendExtendedTimestamp(data)
211
212 data += struct.pack('>I', self.time & 0xFFFFFF)[1:] # add time
213 if control == Header.TIME: return self._appendExtendedTimestamp(data)
214
215 data += struct.pack('>I', self.size)[1:] # size
216 data += chr(self.type) # type
217 if control == Header.MESSAGE: return self._appendExtendedTimestamp(data)
218
219 data += struct.pack('<I', self.streamId) # add streamId
220 return self._appendExtendedTimestamp(data)
221
222 def __repr__(self):
223 return ("<Header channel=%r time=%r size=%r type=%r (0x%02x) streamId=%r>"
224 % (self.channel, self.time, self.size, self.type, self.type or 0, self.streamId))
225
226 class Message(object):
227 # message types: RPC3, DATA3,and SHAREDOBJECT3 are used with AMF3
228 RPC, RPC3, DATA, DATA3, SHAREDOBJ, SHAREDOBJ3, AUDIO, VIDEO, ACK, CHUNK_SIZE = \
229 0x14, 0x11, 0x12, 0x0F, 0x13, 0x10, 0x08, 0x09, 0x03, 0x01
230
231 def __init__(self, hdr=None, data=''):
232 if hdr is None: hdr = Header()
233 self.header, self.data = hdr, data
234
235 # define properties type, streamId and time to access self.header.(property)
236 for p in ['type', 'streamId', 'time']:
237 exec 'def _g%s(self): return self.header.%s'%(p, p)
238 exec 'def _s%s(self, %s): self.header.%s = %s'%(p, p, p, p)
239 exec '%s = property(fget=_g%s, fset=_s%s)'%(p, p, p)
240 @property
241 def size(self): return len(self.data)
242
243 def __repr__(self):
244 return ("<Message header=%r data=%r>"% (self.header, self.data))
245
246 class Protocol(object):
247 # constants
248 PING_SIZE = 1536
249 DEFAULT_CHUNK_SIZE = 128
250 MIN_CHANNEL_ID = 3
251 PROTOCOL_CHANNEL_ID = 2
252
253 def __init__(self, sock):
254 self.stream = SockStream(sock)
255 self.lastReadHeaders = dict() # indexed by channelId
256 self.incompletePackets = dict() #indexed by channelId
257 self.readChunkSize = self.writeChunkSize = Protocol.DEFAULT_CHUNK_SIZE
258 self.lastWriteHeaders = dict() # indexed by streamId
259 self.nextChannelId = Protocol.MIN_CHANNEL_ID
260 self.writeLock = threading.Lock()
261 self.writeQueue = Queue.Queue()
262
263 def messageReceived(self, msg):
264 yield
265
266 def protocolMessage(self, msg):
267 if msg.type == Message.ACK: # respond to ACK requests
268 response = Message()
269 response.type, response.data = msg.type, msg.data
270 self.writeMessage(response)
271 elif msg.type == Message.CHUNK_SIZE:
272 self.readChunkSize = struct.unpack('>L', msg.data)[0]
273
274 def connectionClosed(self):
275 yield
276
277 def parse(self):
278 try:
279 yield self.parseRequests() # parse http requests
280 except ConnectionClosed:
281 #yield self.connectionClosed()
282 if _debug: print 'parse connection closed'
283 #yield self.server.queue.put((self, None)) # close connection
284
285 def removeConnection(self):
286 yield self.server.queue.put((self, None)) # close connection
287
288 def writeMessage(self, message):
289 try:
290 yield self.stream.write(message)
291 except ConnectionClosed:
292 yield self.connectionClosed()
293 except:
294 print traceback.print_exc()
295 #self.writeQueue.put(message)
296
297 def parseHandshake(self):
298 '''Parses the rtmp handshake'''
299 data = (yield self.stream.read(Protocol.PING_SIZE + 1)) # bound version and first ping
300 yield self.stream.write(data)
301 data = (yield self.stream.read(Protocol.PING_SIZE)) # bound second ping
302 yield self.stream.write(data)
303
304 def parseRequests(self):
305 '''Parses complete messages until connection closed. Raises ConnectionLost exception.'''
306
307 print "parseRequests start", repr(self)
308 self.hr = MultitaskHTTPRequestHandler(self.stream, self.remote, None)
309 self.hr.close_connection = 1
310 self.cookies = CookieJar()
311
312 while True:
313
314 # prepare reading the request so that when it's handed
315 # over to "standard" HTTPRequestHandler, the data's already
316 # there.
317 print "parseRequests"
318 try:
319 readok = (yield multitask.readable(self.stream.sock, 5000))
320 except select.error:
321 print "select error: connection closed"
322 raise ConnectionClosed
323
324 print "readok", readok
325 print
326 raw_requestline = (yield self.stream.readline())
327 if _debug: print "parseRequests, line", raw_requestline
328 if not raw_requestline:
329 raise ConnectionClosed
330 data = ''
331 try:
332 while 1:
333 line = (yield self.stream.readline())
334 data += line
335 if line in ['\n', '\r\n']:
336 break
337 except StopIteration:
338 if _debug: print "parseRequests, stopiter"
339 raise
340 except:
341 if _debug:
342 print 'parseRequests', \
343 (traceback and traceback.print_exc() or None)
344
345 raise ConnectionClosed
346
347 self.hr.raw_requestline = raw_requestline
348 #pos = self.hr.rfile.tell()
349 pos = 0
350 self.hr.rfile.truncate(0)
351 self.hr.rfile.write(data)
352 print "parseRequests write after"
353 self.hr.rfile.seek(pos)
354 print "parseRequests seek after"
355
356 if not self.hr.parse_request():
357 raise ConnectionClosed
358 print "parseRequests parse_req after"
359 print "parseRequest headers", repr(self.hr.headers), str(self.hr.headers)
360 try:
361 yield self.messageReceived(self.hr)
362 except ConnectionClosed:
363 if _debug:
364 print 'parseRequests, ConnectionClosed '
365 raise StopIteration
366
367 except:
368 if _debug:
369 print 'messageReceived', \
370 (traceback and traceback.print_exc() or None)
371
372 def write(self):
373 '''Writes messages to stream'''
374 print "starting protocol write loop", repr(self)
375 while True:
376 while self.writeQueue.empty(): (yield multitask.sleep(0.01))
377 data = self.writeQueue.get() # TODO this should be used using multitask.Queue and remove previous wait.
378 if _debug: print "write to stream", repr(data)
379 if data is None:
380 # just in case TCP socket is not closed, close it.
381 try:
382 print "stream closing"
383 print self.stream
384 yield self.stream.close()
385 print "stream closed"
386 except: pass
387 break
388
389 try:
390 yield self.stream.write(data)
391 except ConnectionClosed:
392 yield self.connectionClosed()
393 except:
394 print traceback.print_exc()
395 print "ending protocol write loop", repr(self)
396
397 class Command(object):
398 ''' Class for command / data messages'''
399 def __init__(self, type=Message.RPC, name=None, id=None, cmdData=None, args=[]):
400 '''Create a new command with given type, name, id, cmdData and args list.'''
401 self.type, self.name, self.id, self.cmdData, self.args = type, name, id, cmdData, args[:]
402
403 def __repr__(self):
404 return ("<Command type=%r name=%r id=%r data=%r args=%r>" % (self.type, self.name, self.id, self.cmdData, self.args))
405
406 def setArg(self, arg):
407 self.args.append(arg)
408
409 def getArg(self, index):
410 return self.args[index]
411
412 @classmethod
413 def fromMessage(cls, message):
414 ''' initialize from a parsed RTMP message'''
415 assert (message.type in [Message.RPC, Message.RPC3, Message.DATA, Message.DATA3])
416
417 length = len(message.data)
418 if length == 0: raise ValueError('zero length message data')
419
420 if message.type == Message.RPC3 or message.type == Message.DATA3:
421 assert message.data[0] == '\x00' # must be 0 in AMD3
422 data = message.data[1:]
423 else:
424 data = message.data
425
426 amfReader = amf.AMF0(data)
427
428 inst = cls()
429 inst.name = amfReader.read() # first field is command name
430
431 try:
432 if message.type == Message.RPC:
433 inst.id = amfReader.read() # second field *may* be message id
434 inst.cmdData = amfReader.read() # third is command data
435 else:
436 inst.id = 0
437 inst.args = [] # others are optional
438 while True:
439 inst.args.append(amfReader.read())
440 except EOFError:
441 pass
442 return inst
443
444 def toMessage(self):
445 msg = Message()
446 assert self.type
447 msg.type = self.type
448 output = amf.BytesIO()
449 amfWriter = amf.AMF0(output)
450 amfWriter.write(self.name)
451 if msg.type == Message.RPC or msg.type == Message.RPC3:
452 amfWriter.write(self.id)
453 amfWriter.write(self.cmdData)
454 for arg in self.args:
455 amfWriter.write(arg)
456 output.seek(0)
457 #hexdump.hexdump(output)
458 #output.seek(0)
459 if msg.type == Message.RPC3 or msg.type == Message.DATA3:
460 data = '\x00' + output.read()
461 else:
462 data = output.read()
463 msg.data = data
464 output.close()
465 return msg
466
467 def getfilename(path, name, root):
468 '''return the file name for the given stream. The name is derived as root/scope/name.flv where scope is
469 the the path present in the path variable.'''
470 ignore, ignore, scope = path.partition('/')
471 if scope: scope = scope + '/'
472 result = root + scope + name + '.flv'
473 if _debug: print 'filename=', result
474 return result
475
476 class Stream(object):
477 '''The stream object that is used for RTMP stream.'''
478 count = 0;
479 def __init__(self, client):
480 self.client, self.id, self.name = client, 0, ''
481 self.recordfile = self.playfile = None # so that it doesn't complain about missing attribute
482 self.queue = multitask.Queue()
483 self._name = 'Stream[' + str(Stream.count) + ']'; Stream.count += 1
484 if _debug: print self, 'created'
485
486 def close(self):
487 if _debug: print self, 'closing'
488 if self.recordfile is not None: self.recordfile.close(); self.recordfile = None
489 if self.playfile is not None: self.playfile.close(); self.playfile = None
490 self.client = None # to clear the reference
491 pass
492
493 def __repr__(self):
494 return self._name;
495
496 def recv(self):
497 '''Generator to receive new Message on this stream, or None if stream is closed.'''
498 return self.queue.get()
499
500 def send(self, msg):
501 '''Method to send a Message or Command on this stream.'''
502 if isinstance(msg, Command):
503 msg = msg.toMessage()
504 msg.streamId = self.id
505 # if _debug: print self,'send'
506 if self.client is not None: self.client.writeMessage(msg)
507
508 class Client(Protocol):
509 '''The client object represents a single connected client to the server.'''
510 def __init__(self, sock, server, remote):
511 Protocol.__init__(self, sock)
512 self.server = server
513 self.remote = remote
514 self.agent = {}
515 self.streams = {}
516 self._nextCallId = 2
517 self._nextStreamId = 1
518 self.objectEncoding = 0.0
519 self.queue = multitask.Queue() # receive queue used by application
520 multitask.add(self.parse())
521 #multitask.add(self.write())
522
523 def recv(self):
524 '''Generator to receive new Message (msg, arg) on this stream, or (None,None) if stream is closed.'''
525 return self.queue.get()
526
527 def connectionClosed(self):
528 '''Called when the client drops the connection'''
529 if _debug: 'Client.connectionClosed'
530 #self.writeMessage(None)
531 #yield self.queue.put((None,None))
532 if self.stream.sock:
533 yield self.stream.close()
534 else:
535 yield None
536
537 def messageReceived(self, msg):
538 if _debug: print 'messageReceived cmd=', msg.command, msg.path
539 msg.response_cookies = process_cookies(msg.headers, self.remote)
540
541 # slightly bad, this: read everything, put it into memory...
542 if msg.headers.has_key('content-length'):
543 max_chunk_size = 10*1024*1024
544 size_remaining = int(msg.headers["content-length"])
545 L = []
546 while size_remaining:
547 chunk_size = min(size_remaining, max_chunk_size)
548 data = (yield self.stream.read(chunk_size))
549 L.append(data)
550 size_remaining -= len(L[-1])
551
552 pos = msg.rfile.tell()
553 msg.rfile.write(''.join(L))
554 msg.rfile.seek(pos)
555
556 yield self.server.queue.put((self, msg)) # new connection
557
558 def accept(self):
559 '''Method to accept an incoming client.'''
560 response = Command()
561 response.id, response.name, response.type = 1, '_result', Message.RPC
562 if _debug: print 'Client.accept() objectEncoding=', self.objectEncoding
563 response.setArg(dict(level='status', code='NetConnection.Connect.Success',
564 description='Connection succeeded.', details=None,
565 objectEncoding=self.objectEncoding))
566 self.writeMessage(response.toMessage())
567
568 def rejectConnection(self, reason=''):
569 '''Method to reject an incoming client.'''
570 response = Command()
571 response.id, response.name, response.type = 1, '_error', Message.RPC
572 response.setArg(dict(level='status', code='NetConnection.Connect.Rejected',
573 description=reason, details=None))
574 self.writeMessage(response.toMessage())
575
576 def call(self, method, *args):
577 '''Call a (callback) method on the client.'''
578 cmd = Command()
579 cmd.id, cmd.name, cmd.type = self._nextCallId, method, (self.objectEncoding == 0.0 and Message.RPC or Message.RPC3)
580 cmd.args, cmd.cmdData = args, None
581 self._nextCallId += 1
582 if _debug: print 'Client.call method=', method, 'args=', args, ' msg=', cmd.toMessage()
583 self.writeMessage(cmd.toMessage())
584
585 def createStream(self):
586 ''' Create a stream on the server side'''
587 stream = Stream(self)
588 stream.id = self._nextStreamId
589 self.streams[stream.id] = stream
590 self._nextStreamId += 1
591 return stream
592
593
594 class Server(object):
595 '''A RTMP server listens for incoming connections and informs the app.'''
596 def __init__(self, sock):
597 '''Create an RTMP server on the given bound TCP socket. The server will terminate
598 when the socket is disconnected, or some other error occurs in listening.'''
599 self.sock = sock
600 self.queue = multitask.Queue() # queue to receive incoming client connections
601 multitask.add(self.run())
602
603 def recv(self):
604 '''Generator to wait for incoming client connections on this server and return
605 (client, args) or (None, None) if the socket is closed or some error.'''
606 return self.queue.get()
607
608 def run(self):
609 try:
610 while True:
611 sock, remote = (yield multitask.accept(self.sock)) # receive client TCP
612 if sock == None:
613 if _debug: print 'rtmp.Server accept(sock) returned None.'
614 break
615 if _debug: print 'connection received from', remote
616 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) # make it non-block
617 client = Client(sock, self, remote)
618 except:
619 if _debug: print 'rtmp.Server exception ', (sys and sys.exc_info() or None)
620 traceback.print_exc()
621
622 if (self.sock):
623 try: self.sock.close(); self.sock = None
624 except: pass
625 if (self.queue):
626 yield self.queue.put((None, None))
627 self.queue = None
628
629 class BaseApp(object):
630 '''An application instance containing any number of streams. Except for constructor all methods are generators.'''
631 count = 0
632 def __init__(self):
633 self.name = str(self.__class__.__name__) + '[' + str(App.count) + ']'; App.count += 1
634 self.players, self.publishers, self._clients = {}, {}, [] # Streams indexed by stream name, and list of clients
635 if _debug: print self.name, 'created'
636 def __del__(self):
637 if _debug: print self.name, 'destroyed'
638 @property
639 def clients(self):
640 '''everytime this property is accessed it returns a new list of clients connected to this instance.'''
641 return self._clients[1:] if self._clients is not None else []
642
643 class App(BaseApp, SimpleAppHTTPRequestHandler):
644 pass
645
646 class HTTPServer(object):
647 '''A RTMP server to record and stream Flash video.'''
648 def __init__(self):
649 '''Construct a new HttpServer. It initializes the local members.'''
650 self.sock = self.server = None
651 self.apps = dict({'*': App}) # supported applications: * means any as in {'*': App}
652 self.clients = dict() # list of clients indexed by scope. First item in list is app instance.
653 self.root = ''
654
655 def start(self, host='0.0.0.0', port=8080):
656 '''This should be used to start listening for RTMP connections on the given port, which defaults to 8080.'''
657 if _debug: print 'start', host, port
658 if not self.server:
659 sock = self.sock = socket.socket(type=socket.SOCK_STREAM)
660 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
661 sock.bind((host, port))
662 if _debug: print 'listening on ', sock.getsockname()
663 sock.listen(5)
664 server = self.server = Server(sock) # start rtmp server on that socket
665 multitask.add(self.serverlistener())
666
667 def stop(self):
668 if _debug: print 'stopping Flash server'
669 if self.server and self.sock:
670 try: self.sock.close(); self.sock = None
671 except: pass
672 self.server = None
673
674 def serverlistener(self):
675 '''Server listener (generator). It accepts all connections and invokes client listener'''
676 try:
677 while True: # main loop to receive new connections on the server
678 client, msg = (yield self.server.recv()) # receive an incoming client connection.
679 # TODO: we should reject non-localhost client connections.
680 if not client: # if the server aborted abnormally,
681 break # hence close the listener.
682 if _debug: print 'client connection received', client, msg
683
684 if msg is None:
685 yield client.connectionClosed()
686 session = client.session
687 del self.clients[session]
688 continue
689
690 # if client.objectEncoding != 0 and client.objectEncoding != 3:
691 if client.objectEncoding != 0:
692 yield client.rejectConnection(reason='Unsupported encoding ' + str(client.objectEncoding) + '. Please use NetConnection.defaultObjectEncoding=ObjectEncoding.AMF0')
693 yield client.connectionClosed()
694 else:
695 print "cookies", str(msg.response_cookies)
696 session = msg.response_cookies['session'].value
697 client.session = session
698 name = msg.path
699 print "serverlistener", name
700 if '*' not in self.apps and name not in self.apps:
701 yield client.rejectConnection(reason='Application not found: ' + name)
702 else: # create application instance as needed and add in our list
703 if _debug: print 'name=', name, 'name in apps', str(name in self.apps)
704 app = self.apps[name] if name in self.apps else self.apps['*'] # application class
705 print "clients", self.clients.keys()
706 if session in self.clients:
707 inst = self.clients[session][0]
708 else:
709 inst = app()
710 self.clients[session] = [inst]; inst._clients=self.clients[session]
711 msg.server = inst # whew! just in time!
712 try:
713 methodname = "on%s" % msg.command
714 print methodname, dir(inst)
715 method = getattr(inst, methodname, None)
716 result = method(client, msg)
717 close_connection = msg.close_connection
718 print "close connection", close_connection
719 except:
720 if _debug: traceback.print_exc()
721 yield client.rejectConnection(reason='Exception on %s' % methodname)
722 continue
723 if result is True or result is None:
724 if result is None:
725 msg.wfile.seek(0)
726 data = msg.wfile.read()
727 msg.wfile.seek(0)
728 msg.wfile.truncate()
729 yield client.writeMessage(data)
730 if close_connection:
731 if _debug:
732 print 'close_connection requested'
733 try:
734 yield client.connectionClosed()
735 raise ConnectionClosed
736 except ConnectionClosed:
737 if _debug:
738 print 'close_connection done'
739 pass
740 else:
741 print "result", result
742 yield client.rejectConnection(reason='Rejected in onConnect')
743 except StopIteration: raise
744 except:
745 if _debug:
746 print 'serverlistener exception', \
747 (sys and sys.exc_info() or None)
748 traceback.print_exc()
749
750 def clientlistener(self, client):
751 '''Client listener (generator). It receives a command and invokes client handler, or receives a new stream and invokes streamlistener.'''
752 try:
753 while True:
754 msg, arg = (yield client.recv()) # receive new message from client
755 if not msg: # if the client disconnected,
756 if _debug: print 'connection closed from client'
757 break # come out of listening loop.
758 if msg == 'command': # handle a new command
759 multitask.add(self.clienthandler(client, arg))
760 elif msg == 'stream': # a new stream is created, handle the stream.
761 arg.client = client
762 multitask.add(self.streamlistener(arg))
763 except StopIteration: raise
764 except:
765 if _debug: print 'clientlistener exception', (sys and sys.exc_info() or None)
766 traceback.print_exc()
767
768 # client is disconnected, clear our state for application instance.
769 if _debug: print 'cleaning up client', client.path
770 inst = None
771 if client.path in self.clients:
772 inst = self.clients[client.path][0]
773 self.clients[client.path].remove(client)
774 for stream in client.streams.values(): # for all streams of this client
775 self.closehandler(stream)
776 client.streams.clear() # and clear the collection of streams
777 if client.path in self.clients and len(self.clients[client.path]) == 1: # no more clients left, delete the instance.
778 if _debug: print 'removing the application instance'
779 inst = self.clients[client.path][0]
780 inst._clients = None
781 del self.clients[client.path]
782 if inst is not None: inst.onDisconnect(client)
783
784 def closehandler(self, stream):
785 '''A stream is closed explicitly when a closeStream command is received from given client.'''
786 if stream.client is not None:
787 inst = self.clients[stream.client.path][0]
788 if stream.name in inst.publishers and inst.publishers[stream.name] == stream: # clear the published stream
789 inst.onClose(stream.client, stream)
790 del inst.publishers[stream.name]
791 if stream.name in inst.players and stream in inst.players[stream.name]:
792 inst.onStop(stream.client, stream)
793 inst.players[stream.name].remove(stream)
794 if len(inst.players[stream.name]) == 0:
795 del inst.players[stream.name]
796 stream.close()
797
798 def clienthandler(self, client, cmd):
799 '''A generator to handle a single command on the client.'''
800 inst = self.clients[client.path][0]
801 if inst:
802 if cmd.name == '_error':
803 if hasattr(inst, 'onStatus'):
804 result = inst.onStatus(client, cmd.args[0])
805 elif cmd.name == '_result':
806 if hasattr(inst, 'onResult'):
807 result = inst.onResult(client, cmd.args[0])
808 else:
809 res, code, args = Command(), '_result', dict()
810 try: result = inst.onCommand(client, cmd.name, *cmd.args)
811 except:
812 if _debug: print 'Client.call exception', (sys and sys.exc_info() or None)
813 code, args = '_error', dict()
814 res.id, res.name, res.type = cmd.id, code, (client.objectEncoding == 0.0 and Message.RPC or Message.RPC3)
815 res.args, res.cmdData = args, None
816 if _debug: print 'Client.call method=', code, 'args=', args, ' msg=', res.toMessage()
817 client.writeMessage(res.toMessage())
818 # TODO return result to caller
819 yield
820
821 def streamlistener(self, stream):
822 '''Stream listener (generator). It receives stream message and invokes streamhandler.'''
823 stream.recordfile = None # so that it doesn't complain about missing attribute
824 while True:
825 msg = (yield stream.recv())
826 if not msg:
827 if _debug: print 'stream closed'
828 self.closehandler(stream)
829 break
830 # if _debug: msg
831 multitask.add(self.streamhandler(stream, msg))
832
833 def streamhandler(self, stream, message):
834 '''A generator to handle a single message on the stream.'''
835 try:
836 if message.type == Message.RPC:
837 cmd = Command.fromMessage(message)
838 if _debug: print 'streamhandler received cmd=', cmd
839 if cmd.name == 'publish':
840 yield self.publishhandler(stream, cmd)
841 elif cmd.name == 'play':
842 yield self.playhandler(stream, cmd)
843 elif cmd.name == 'closeStream':
844 self.closehandler(stream)
845 else: # audio or video message
846 yield self.mediahandler(stream, message)
847 except GeneratorExit: pass
848 except StopIteration: pass
849 except:
850 if _debug: print 'exception in streamhandler', (sys and sys.exc_info())
851
852 def publishhandler(self, stream, cmd):
853 '''A new stream is published. Store the information in the application instance.'''
854 try:
855 stream.mode = 'live' if len(cmd.args) < 2 else cmd.args[1] # live, record, append
856 stream.name = cmd.args[0]
857 if _debug: print 'publishing stream=', stream.name, 'mode=', stream.mode
858 inst = self.clients[stream.client.path][0]
859 if (stream.name in inst.publishers):
860 raise ValueError, 'Stream name already in use'
861 inst.publishers[stream.name] = stream # store the client for publisher
862 result = inst.onPublish(stream.client, stream)
863
864 if stream.mode == 'record' or stream.mode == 'append':
865 stream.recordfile = FLV().open(getfilename(stream.client.path, stream.name, self.root), stream.mode)
866 response = Command(name='onStatus', id=cmd.id, args=[dict(level='status', code='NetStream.Publish.Start', description='', details=None)])
867 yield stream.send(response)
868 except ValueError, E: # some error occurred. inform the app.
869 if _debug: print 'error in publishing stream', str(E)
870 response = Command(name='onStatus', id=cmd.id, args=[dict(level='error',code='NetStream.Publish.BadName',description=str(E),details=None)])
871 yield stream.send(response)
872
873 def playhandler(self, stream, cmd):
874 '''A new stream is being played. Just updated the players list with this stream.'''
875 inst = self.clients[stream.client.path][0]
876 name = stream.name = cmd.args[0] # store the stream's name
877 start = cmd.args[1] if len(cmd.args) >= 2 else -2
878 if name not in inst.players:
879 inst.players[name] = [] # initialize the players for this stream name
880 if stream not in inst.players[name]: # store the stream as players of this name
881 inst.players[name].append(stream)
882 path = getfilename(stream.client.path, stream.name, self.root)
883 if os.path.exists(path):
884 stream.playfile = FLV().open(path)
885 multitask.add(stream.playfile.reader(stream))
886 if _debug: print 'playing stream=', name, 'start=', start
887 result = inst.onPlay(stream.client, stream)
888 response = Command(name='onStatus', id=cmd.id, args=[dict(level='status',code='NetStream.Play.Start', description=stream.name, details=None)])
889 yield stream.send(response)
890
891 def mediahandler(self, stream, message):
892 '''Handle incoming media on the stream, by sending to other stream in this application instance.'''
893 if stream.client is not None:
894 inst = self.clients[stream.client.path][0]
895 result = inst.onPublishData(stream.client, stream, message)
896 if result:
897 client = stream.client
898 for s in (inst.players.get(stream.name, [])):
899 # if _debug: print 'D', stream.name, s.name
900 result = inst.onPlayData(s.client, s, message)
901 if result:
902 yield s.send(message)
903 if stream.recordfile is not None:
904 stream.recordfile.write(message)
905
906 # The main routine to start, run and stop the service
907 if __name__ == '__main__':
908 print "optparse"
909 from optparse import OptionParser
910 parser = OptionParser()
911 parser.add_option('-i', '--host', dest='host', default='0.0.0.0', help="listening IP address. Default '0.0.0.0'")
912 parser.add_option('-p', '--port', dest='port', default=8080, type="int", help='listening port number. Default 8080')
913 parser.add_option('-r', '--root', dest='root', default='./', help="document root directory. Default './'")
914 parser.add_option('-d', '--verbose', dest='verbose', default=False, action='store_true', help='enable debug trace')
915 (options, args) = parser.parse_args()
916
917 _debug = options.verbose
918 try:
919 if _debug: print time.asctime(), 'Options - %s:%d' % (options.host, options.port)
920 agent = HTTPServer()
921 agent.root = options.root
922 agent.start(options.host, options.port)
923 if _debug: print time.asctime(), 'Flash Server Starts - %s:%d' % (options.host, options.port)
924 multitask.run()
925 except KeyboardInterrupt:
926 print traceback.print_exc()
927 pass
928 except:
929 print "exception"
930 print traceback.print_exc()
931 if _debug: print time.asctime(), 'Flash Server Stops'