keep proxies in separate dictionary
[multitaskhttpd.git] / httpd.py
1 # Copyright (c) 2007-2009, Mamta Singh. All rights reserved. see README for details.
2
3 '''
4 This is a simple implementation of a Flash RTMP server to accept connections and stream requests. The module is organized as follows:
5 1. The FlashServer class is the main class to provide the server abstraction. It uses the multitask module for co-operative multitasking.
6 It also uses the App abstract class to implement the applications.
7 2. The Server class implements a simple server to receive new Client connections and inform the FlashServer application. The Client class
8 derived from Protocol implements the RTMP client functions. The Protocol class implements the base RTMP protocol parsing. A Client contains
9 various streams from the client, represented using the Stream class.
10 3. The Message, Header and Command represent RTMP message, header and command respectively. The FLV class implements functions to perform read
11 and write of FLV file format.
12
13
14 Typically an application can launch this server as follows:
15 $ python rtmp.py
16
17 To know the command line options use the -h option:
18 $ python rtmp.py -h
19
20 To start the server with a different directory for recording and playing FLV files from, use the following command.
21 $ python rtmp.py -r some-other-directory/
22 Note the terminal '/' in the directory name. Without this, it is just used as a prefix in FLV file names.
23
24 A test client is available in testClient directory, and can be compiled using Flex Builder. Alternatively, you can use the SWF file to launch
25 from testClient/bin-debug after starting the server. Once you have launched the client in the browser, you can connect to
26 local host by clicking on 'connect' button. Then click on publish button to publish a stream. Open another browser with
27 same URL and first connect then play the same stream name. If everything works fine you should be able to see the video
28 from first browser to the second browser. Similarly, in the first browser, if you check the record box before publishing,
29 it will create a new FLV file for the recorded stream. You can close the publishing stream and play the recorded stream to
30 see your recording. Note that due to initial delay in timestamp (in case publish was clicked much later than connect),
31 your played video will start appearing after some initial delay.
32
33
34 If an application wants to use this module as a library, it can launch the server as follows:
35 >>> agent = FlashServer() # a new RTMP server instance
36 >>> agent.root = 'flvs/' # set the document root to be 'flvs' directory. Default is current './' directory.
37 >>> agent.start() # start the server
38 >>> multitask.run() # this is needed somewhere in the application to actually start the co-operative multitasking.
39
40
41 If an application wants to specify a different application other than the default App, it can subclass it and supply the application by
42 setting the server's apps property. The following example shows how to define "myapp" which invokes a 'connected()' method on client when
43 the client connects to the server.
44
45 class MyApp(App): # a new MyApp extends the default App in rtmp module.
46 def __init__(self): # constructor just invokes base class constructor
47 App.__init__(self)
48 def onConnect(self, client, *args):
49 result = App.onConnect(self, client, *args) # invoke base class method first
50 def invokeAdded(self, client): # define a method to invoke 'connected("some-arg")' on Flash client
51 client.call('connected', 'some-arg')
52 yield
53 multitask.add(invokeAdded(self, client)) # need to invoke later so that connection is established before callback
54 ...
55 agent.apps = dict({'myapp': MyApp, 'someapp': MyApp, '*': App})
56
57 Now the client can connect to rtmp://server/myapp or rtmp://server/someapp and will get connected to this MyApp application.
58 If the client doesn't define "function connected(arg:String):void" in the NetConnection.client object then the server will
59 throw an exception and display the error message.
60
61 '''
62
63 import os, sys, time, struct, socket, traceback, multitask
64 import threading, Queue
65 import uuid
66 import select
67 from string import strip
68
69 from BaseHTTPServer import BaseHTTPRequestHandler
70 from SimpleAppHTTPServer import SimpleAppHTTPRequestHandler
71 from cStringIO import StringIO
72 from cookielib import parse_ns_headers, CookieJar
73 from Cookie import SimpleCookie
74
# Module-wide verbosity switch; the "if _debug:" checks throughout this file
# read it to decide whether to print diagnostics.
_debug = False


def set_debug(dbg):
    '''Replace the module-level _debug flag with dbg.'''
    global _debug
    _debug = dbg
80
class MultitaskHTTPRequestHandler(BaseHTTPRequestHandler):
    '''Request handler whose rfile and wfile are in-memory StringIO buffers
    instead of files wrapped around the socket.'''

    def setup(self):
        '''Keep the request object as the connection and create empty buffers.'''
        self.connection = self.request
        self.rfile, self.wfile = StringIO(), StringIO()

    def finish(self):
        '''Deliberately does nothing.'''
        return None

    def info(self):
        '''Return the headers of the parsed request.'''
        return self.headers

    def get_full_url(self):
        '''Return the path of the parsed request.'''
        return self.path

    def get_header(self, hdr, default):
        '''Return the value of request header hdr, or default when absent.'''
        return self.headers.getheader(hdr, default)

    def add_cookies(self):
        '''Send one Set-Cookie header for every entry in self.response_cookies.'''
        for morsel in self.response_cookies.values():
            self.send_header("Set-Cookie", morsel.OutputString())
104
def process_cookies(headers, remote, cookie_key="Cookie", add_sess=True):
    '''Build a SimpleCookie from the cookie headers of a request.

    headers    -- object providing getheaders(name) -> list of header values.
    remote     -- remote address; currently unused, kept for the callers.
    cookie_key -- name of the header to read (default "Cookie").
    add_sess   -- when true, a new random "session" cookie is added if the
                  request did not carry one.

    Each header value is split on ";" and each non-empty fragment on its
    FIRST "=", so values that themselves contain "=" are kept intact.
    Fragments without any "=" are skipped instead of raising ValueError.
    A "session" cookie always gets an 'expires' attribute of 50000.
    Returns the SimpleCookie.
    '''
    ch = headers.getheaders(cookie_key)
    print("messageReceived cookieheaders= " + '; '.join(ch))

    # flatten all header values into a list of stripped, non-empty fragments
    fragments = []
    for header in ch:
        for part in header.split(";"):
            part = part.strip()
            if part:
                fragments.append(part)

    has_sess = False
    response_cookies = SimpleCookie()
    for frag in fragments:
        print("found cookie %r" % (frag,))
        name, sep, value = frag.partition("=")
        if not sep:
            # no "name=value" shape at all: nothing usable to store
            continue
        response_cookies[name] = value
        #response_cookies[name]['path'] = "/"
        #response_cookies[name]['domain'] = remote[0]
        #response_cookies[name]['version'] = 0
        if name == "session":
            response_cookies[name]['expires'] = 50000
            has_sess = True

    if add_sess and not has_sess:
        response_cookies['session'] = uuid.uuid4().hex
        response_cookies['session']['expires'] = 50000
        #response_cookies['session']['path'] = '/'
        #response_cookies['session']['domain'] = remote[0]
        #response_cookies['session']['version'] = 0
    return response_cookies
137
class ConnectionClosed(Exception):
    '''raised when the client closed the connection'''
140
class SockStream(object):
    '''A class that represents a socket as a stream.

    readline(), read() and write() are generators meant to be driven by the
    multitask scheduler; read results are delivered via StopIteration(value).
    '''
    def __init__(self, sock):
        self.sock, self.buffer = sock, ''
        self.bytesWritten = self.bytesRead = 0

    def close(self):
        '''Shut down the sending side of the socket and forget it.

        Calling close() again after the socket was dropped is a no-op.
        '''
        if self.sock is None:
            return
        self.sock.shutdown(1)  # can't just close it.
        self.sock = None

    def readline(self):
        '''Generator: deliver the next line (including its "\\n") from the stream.

        Raises ConnectionClosed when the peer closed the socket or any other
        error occurred while receiving.
        '''
        try:
            while True:
                nl = self.buffer.find("\n")
                if nl >= 0:  # a complete line is already buffered
                    data, self.buffer = self.buffer[:nl+1], self.buffer[nl+1:]
                    raise StopIteration(data)
                data = (yield multitask.recv(self.sock, 4096))  # read more from socket
                if not data: raise ConnectionClosed
                if _debug: print('socket.read[%d] %r' % (len(data), data))
                self.bytesRead += len(data)
                self.buffer += data
        except (StopIteration, GeneratorExit): raise  # result delivery / generator shutdown
        except: raise ConnectionClosed  # anything else is treated as connection closed.

    def read(self, count):
        '''Generator: deliver exactly count bytes from the stream.

        Raises ConnectionClosed when the peer closed the socket or any other
        error occurred while receiving.
        '''
        try:
            while True:
                if len(self.buffer) >= count:  # enough data is already buffered
                    data, self.buffer = self.buffer[:count], self.buffer[count:]
                    raise StopIteration(data)
                data = (yield multitask.recv(self.sock, 4096))  # read more from socket
                if not data: raise ConnectionClosed
                # if _debug: print('socket.read[%d] %r' % (len(data), data))
                self.bytesRead += len(data)
                self.buffer += data
        except (StopIteration, GeneratorExit): raise  # result delivery / generator shutdown
        except: raise ConnectionClosed  # anything else is treated as connection closed.

    def unread(self, data):
        '''Push data back so that it is returned first by the next read.'''
        self.buffer = data + self.buffer

    def write(self, data):
        '''Generator: send data on the socket in chunks of at most 4096 bytes.

        Raises ConnectionClosed when sending fails.
        '''
        while len(data) > 0:  # write in 4K chunks each time
            chunk, data = data[:4096], data[4096:]
            self.bytesWritten += len(chunk)
            if _debug: print('socket.write[%d] %r' % (len(chunk), chunk))
            try: yield multitask.send(self.sock, chunk)
            except GeneratorExit: raise  # generator is being shut down, not a socket error
            except: raise ConnectionClosed
        if _debug: print('socket.written')
191
192
class Header(object):
    '''A chunk header: channel, time, size, type and streamId plus the
    pre-computed channel bytes (hdrdata) that start its wire form.'''
    # control values OR-ed into the first header byte; MASK selects those bits
    FULL, MESSAGE, TIME, SEPARATOR, MASK = 0x00, 0x40, 0x80, 0xC0, 0xC0

    def __init__(self, channel=0, time=0, size=None, type=None, streamId=0):
        self.channel, self.time, self.size, self.type, self.streamId = channel, time, size, type, streamId
        self.extendedtime = 0
        # channel ids below 64 fit in one byte, below 320 in two, otherwise three
        if channel < 64:
            self.hdrdata = chr(channel)
        elif channel < (64 + 256):
            self.hdrdata = '\x00' + chr(channel - 64)
        else:
            offset = channel - 64
            # floor division: chr() needs an integer regardless of division mode
            self.hdrdata = '\x01' + chr(offset % 256) + chr(offset // 256)

    def _appendExtendedTimestamp(self, data):
        '''Return data, followed by the 4-byte extendedtime when time is 0xFFFFFF.'''
        if self.time == 0xFFFFFF:
            data += struct.pack('>I', self.extendedtime)
        return data

    def toBytes(self, control):
        '''Serialize the header; control (SEPARATOR, TIME, MESSAGE or FULL)
        decides how many of the fields after the channel bytes are emitted.'''
        data = chr(ord(self.hdrdata[0]) | control) + self.hdrdata[1:]
        if control == Header.SEPARATOR: return self._appendExtendedTimestamp(data)

        data += struct.pack('>I', self.time & 0xFFFFFF)[1:]  # add time (3 bytes)
        if control == Header.TIME: return self._appendExtendedTimestamp(data)

        data += struct.pack('>I', self.size)[1:]  # size (3 bytes)
        data += chr(self.type)                    # type
        if control == Header.MESSAGE: return self._appendExtendedTimestamp(data)

        data += struct.pack('<I', self.streamId)  # add streamId
        return self._appendExtendedTimestamp(data)

    def __repr__(self):
        return ("<Header channel=%r time=%r size=%r type=%r (0x%02x) streamId=%r>"
            % (self.channel, self.time, self.size, self.type, self.type or 0, self.streamId))
225
class Message(object):
    '''A message: a header object plus the payload data.'''
    # message types: RPC3, DATA3,and SHAREDOBJECT3 are used with AMF3
    RPC, RPC3, DATA, DATA3, SHAREDOBJ, SHAREDOBJ3, AUDIO, VIDEO, ACK, CHUNK_SIZE = \
        0x14, 0x11, 0x12, 0x0F, 0x13, 0x10, 0x08, 0x09, 0x03, 0x01

    def __init__(self, hdr=None, data=''):
        if hdr is None: hdr = Header()
        self.header, self.data = hdr, data

    # type, streamId and time are read/write views of the same-named
    # attributes on self.header
    def _header_property(attr):
        def getter(self):
            return getattr(self.header, attr)
        def setter(self, value):
            setattr(self.header, attr, value)
        return property(fget=getter, fset=setter)

    type = _header_property('type')
    streamId = _header_property('streamId')
    time = _header_property('time')
    del _header_property  # only needed while the class body executes

    @property
    def size(self):
        '''Length of the payload data.'''
        return len(self.data)

    def __repr__(self):
        return ("<Message header=%r data=%r>" % (self.header, self.data))
245
class Protocol(object):
    '''Per-connection protocol handler.

    parse() reads HTTP requests from the socket and hands each parsed
    request to messageReceived(); subclasses override messageReceived() and
    connectionClosed(). parseRequests() reads self.remote, which this class
    does not set itself. All methods containing "yield" are generators meant
    to be driven by the multitask scheduler.
    '''
    # constants
    PING_SIZE = 1536
    DEFAULT_CHUNK_SIZE = 128
    MIN_CHANNEL_ID = 3
    PROTOCOL_CHANNEL_ID = 2

    def __init__(self, sock):
        self.stream = SockStream(sock)
        self.lastReadHeaders = dict()    # indexed by channelId
        self.incompletePackets = dict()  # indexed by channelId
        self.readChunkSize = self.writeChunkSize = Protocol.DEFAULT_CHUNK_SIZE
        self.lastWriteHeaders = dict()   # indexed by streamId
        self.nextChannelId = Protocol.MIN_CHANNEL_ID
        self.writeLock = threading.Lock()
        self.writeQueue = Queue.Queue()

    def messageReceived(self, msg):
        '''Hook (generator) invoked for every parsed request; does nothing here.'''
        yield

    def protocolMessage(self, msg):
        '''Handle ACK and CHUNK_SIZE control messages.'''
        if msg.type == Message.ACK:  # respond to ACK requests
            response = Message()
            response.type, response.data = msg.type, msg.data
            # NOTE(review): writeMessage() is a generator; calling it without
            # scheduling it sends nothing. Left as in the original.
            self.writeMessage(response)
        elif msg.type == Message.CHUNK_SIZE:
            self.readChunkSize = struct.unpack('>L', msg.data)[0]

    def connectionClosed(self):
        '''Hook (generator) invoked when the connection is gone; does nothing here.'''
        yield

    def parse(self):
        '''Generator: run parseRequests() until the connection is closed.'''
        try:
            yield self.parseRequests()  # parse http requests
        except ConnectionClosed:
            #yield self.connectionClosed()
            if _debug: print('parse connection closed')

    def writeMessage(self, message):
        '''Generator: write message to the stream.

        A closed connection triggers connectionClosed(); any other error is
        reported with a traceback and otherwise ignored.
        '''
        try:
            yield self.stream.write(message)
        except ConnectionClosed:
            yield self.connectionClosed()
        except:
            traceback.print_exc()
        #self.writeQueue.put(message)

    def parseHandshake(self):
        '''Parses the rtmp handshake'''
        data = (yield self.stream.read(Protocol.PING_SIZE + 1))  # bound version and first ping
        yield self.stream.write(data)
        data = (yield self.stream.read(Protocol.PING_SIZE))  # bound second ping
        yield self.stream.write(data)

    def parseRequests(self):
        '''Parses complete requests until the connection is closed.

        Raises ConnectionClosed when the socket fails, the peer closes the
        connection, or a request cannot be parsed.
        '''
        print("parseRequests start %r" % (self,))
        self.hr = MultitaskHTTPRequestHandler(self.stream, self.remote, None)
        self.hr.close_connection = 1
        self.cookies = CookieJar()

        while True:

            # prepare reading the request so that when it's handed
            # over to "standard" HTTPRequestHandler, the data's already
            # there.
            print("parseRequests")
            try:
                readok = (yield multitask.readable(self.stream.sock, 5000))
            except select.error:
                print("select error: connection closed")
                raise ConnectionClosed

            print("readok %s" % (readok,))
            print("")
            raw_requestline = (yield self.stream.readline())
            if _debug: print("parseRequests, line %s" % (raw_requestline,))
            if not raw_requestline:
                raise ConnectionClosed

            # collect the header lines up to and including the blank line
            data = ''
            try:
                while 1:
                    line = (yield self.stream.readline())
                    data += line
                    if line in ['\n', '\r\n']:
                        break
            except StopIteration:
                if _debug: print("parseRequests, stopiter")
                raise
            except:
                if _debug:
                    traceback.print_exc()
                raise ConnectionClosed

            # hand the request line and headers to the request handler
            self.hr.raw_requestline = raw_requestline
            #pos = self.hr.rfile.tell()
            pos = 0
            self.hr.rfile.truncate(0)
            self.hr.rfile.write(data)
            print("parseRequests write after")
            self.hr.rfile.seek(pos)
            print("parseRequests seek after")

            if not self.hr.parse_request():
                raise ConnectionClosed
            print("parseRequests parse_req after")
            print("parseRequest headers %r %s" % (self.hr.headers, self.hr.headers))
            try:
                yield self.messageReceived(self.hr)
            except ConnectionClosed:
                if _debug:
                    print('parseRequests, ConnectionClosed ')
                # stop parsing whether or not debugging is enabled
                raise StopIteration
            except:
                if _debug:
                    print('messageReceived')
                    traceback.print_exc()

    def write(self):
        '''Writes messages to stream'''
        print("starting protocol write loop %r" % (self,))
        while True:
            while self.writeQueue.empty(): (yield multitask.sleep(0.01))
            data = self.writeQueue.get()  # TODO this should be used using multitask.Queue and remove previous wait.
            if _debug: print("write to stream %r" % (data,))
            if data is None:
                # just in case TCP socket is not closed, close it.
                try:
                    print("stream closing")
                    print(self.stream)
                    yield self.stream.close()
                    print("stream closed")
                except: pass
                break

            try:
                yield self.stream.write(data)
            except ConnectionClosed:
                yield self.connectionClosed()
            except:
                traceback.print_exc()
        print("ending protocol write loop %r" % (self,))
392
class Command(object):
    '''A command / data message: type, name, id, command data and arguments.

    NOTE(review): fromMessage() and toMessage() use a name "amf" that is not
    imported in the visible part of this file -- confirm it is available.
    '''
    def __init__(self, type=Message.RPC, name=None, id=None, cmdData=None, args=[]):
        '''Create a new command with given type, name, id, cmdData and args list.'''
        self.type = type
        self.name = name
        self.id = id
        self.cmdData = cmdData
        self.args = args[:]  # own copy, so the shared default is never mutated

    def __repr__(self):
        fields = (self.type, self.name, self.id, self.cmdData, self.args)
        return "<Command type=%r name=%r id=%r data=%r args=%r>" % fields

    def setArg(self, arg):
        '''Append arg to the argument list.'''
        self.args.append(arg)

    def getArg(self, index):
        '''Return the argument at position index.'''
        return self.args[index]

    @classmethod
    def fromMessage(cls, message):
        '''Build a Command by decoding the data of a parsed message.

        Raises ValueError when the message carries no data.
        '''
        assert (message.type in (Message.RPC, Message.RPC3, Message.DATA, Message.DATA3))

        if len(message.data) == 0:
            raise ValueError('zero length message data')

        if message.type in (Message.RPC3, Message.DATA3):
            assert message.data[0] == '\x00'  # the leading byte must be 0 for the "3" types
            payload = message.data[1:]
        else:
            payload = message.data

        reader = amf.AMF0(payload)

        inst = cls()
        inst.name = reader.read()  # first field is command name

        try:
            if message.type == Message.RPC:
                inst.id = reader.read()       # second field *may* be message id
                inst.cmdData = reader.read()  # third is command data
            else:
                inst.id = 0
            inst.args = []                    # others are optional
            while True:                       # ends when the reader raises EOFError
                inst.args.append(reader.read())
        except EOFError:
            pass
        return inst

    def toMessage(self):
        '''Encode this command into a new Message and return it.'''
        msg = Message()
        assert self.type
        msg.type = self.type
        output = amf.BytesIO()
        writer = amf.AMF0(output)
        writer.write(self.name)
        if msg.type in (Message.RPC, Message.RPC3):
            writer.write(self.id)
            writer.write(self.cmdData)
        for arg in self.args:
            writer.write(arg)
        output.seek(0)
        #hexdump.hexdump(output)
        #output.seek(0)
        encoded = output.read()
        if msg.type in (Message.RPC3, Message.DATA3):
            encoded = '\x00' + encoded
        msg.data = encoded
        output.close()
        return msg
462
def getfilename(path, name, root):
    '''Return the file name for the given stream. The name is derived as root/scope/name.flv where scope is
    the part of the path variable that follows its first '/'.'''
    scope = path.partition('/')[2]
    if scope:
        scope += '/'
    result = ''.join((root, scope, name, '.flv'))
    if _debug:
        print('filename= %s' % (result,))
    return result
471
class Stream(object):
    '''A single stream belonging to a client.'''
    count = 0  # number of Stream objects created so far; used to label them

    def __init__(self, client):
        self.client = client
        self.id = 0
        self.name = ''
        self.recordfile = self.playfile = None  # so that it doesn't complain about missing attribute
        self.queue = multitask.Queue()
        self._name = 'Stream[%s]' % (Stream.count,)
        Stream.count += 1
        if _debug:
            print('%s created' % (self,))

    def close(self):
        '''Close the record and play files, if any, and drop the client reference.'''
        if _debug:
            print('%s closing' % (self,))
        if self.recordfile is not None:
            self.recordfile.close()
            self.recordfile = None
        if self.playfile is not None:
            self.playfile.close()
            self.playfile = None
        self.client = None  # to clear the reference

    def __repr__(self):
        return self._name

    def recv(self):
        '''Return the result of get() on this stream's queue (used to receive the next message).'''
        return self.queue.get()

    def send(self, msg):
        '''Method to send a Message or Command on this stream.'''
        if isinstance(msg, Command):
            msg = msg.toMessage()
        msg.streamId = self.id
        # if _debug: print self,'send'
        if self.client is not None:
            self.client.writeMessage(msg)
503
class Client(Protocol):
    '''The client object represents a single connected client to the server.'''
    def __init__(self, sock, server, remote):
        '''Set up the per-client state and schedule parse() on the connection.'''
        Protocol.__init__(self, sock)
        self.server = server
        self.remote = remote
        self.agent = {}
        self.streams = {}
        self._nextCallId = 2
        self._nextStreamId = 1
        self.objectEncoding = 0.0
        self.queue = multitask.Queue()  # receive queue used by application
        multitask.add(self.parse())
        #multitask.add(self.write())

    def recv(self):
        '''Return the result of get() on this client's receive queue.'''
        return self.queue.get()

    def connectionClosed(self):
        '''Called when the client drops the connection'''
        if _debug: print('Client.connectionClosed')
        #self.writeMessage(None)
        #yield self.queue.put((None,None))
        if self.stream.sock:
            yield self.stream.close()
        else:
            yield None

    def messageReceived(self, msg):
        '''Generator: finish reading a parsed request and queue it on the server.

        Attaches the response cookies to msg, reads the request body (when a
        content-length header is present) into msg.rfile, then puts
        (self, msg) on the server queue.
        '''
        if _debug: print('messageReceived cmd= %s %s' % (msg.command, msg.path))
        msg.response_cookies = process_cookies(msg.headers, self.remote)

        # slightly bad, this: read everything, put it into memory...
        if msg.headers.has_key('content-length'):
            max_chunk_size = 10*1024*1024
            size_remaining = int(msg.headers["content-length"])
            chunks = []
            # "> 0" so that a negative Content-Length cannot loop forever
            while size_remaining > 0:
                chunk_size = min(size_remaining, max_chunk_size)
                data = (yield self.stream.read(chunk_size))
                chunks.append(data)
                size_remaining -= len(data)

            # append the body and leave rfile positioned at its start
            pos = msg.rfile.tell()
            msg.rfile.write(''.join(chunks))
            msg.rfile.seek(pos)

        yield self.server.queue.put((self, msg))  # new connection

    def accept(self):
        '''Method to accept an incoming client.'''
        response = Command()
        response.id, response.name, response.type = 1, '_result', Message.RPC
        if _debug: print('Client.accept() objectEncoding= %s' % (self.objectEncoding,))
        response.setArg(dict(level='status', code='NetConnection.Connect.Success',
                        description='Connection succeeded.', details=None,
                        objectEncoding=self.objectEncoding))
        self.writeMessage(response.toMessage())

    def rejectConnection(self, reason=''):
        '''Method to reject an incoming client.'''
        response = Command()
        response.id, response.name, response.type = 1, '_error', Message.RPC
        response.setArg(dict(level='status', code='NetConnection.Connect.Rejected',
                        description=reason, details=None))
        self.writeMessage(response.toMessage())

    def call(self, method, *args):
        '''Call a (callback) method on the client.'''
        cmd = Command()
        cmd.id, cmd.name, cmd.type = self._nextCallId, method, (self.objectEncoding == 0.0 and Message.RPC or Message.RPC3)
        cmd.args, cmd.cmdData = args, None
        self._nextCallId += 1
        if _debug: print('Client.call method= %s args= %s  msg= %s' % (method, args, cmd.toMessage()))
        self.writeMessage(cmd.toMessage())

    def createStream(self):
        ''' Create a stream on the server side'''
        stream = Stream(self)
        stream.id = self._nextStreamId
        self.streams[stream.id] = stream
        self._nextStreamId += 1
        return stream
588
589
class Server(object):
    '''Accepts TCP connections on a listening socket and creates a Client for each.'''
    def __init__(self, sock):
        '''Create a server on the given bound TCP socket. The server will terminate
        when the socket is disconnected, or some other error occurs in listening.'''
        self.sock = sock
        self.queue = multitask.Queue()  # queue to receive incoming client connections
        multitask.add(self.run())

    def recv(self):
        '''Return the result of get() on the server queue. run() puts
        (None, None) on that queue when it stops accepting.'''
        return self.queue.get()

    def run(self):
        '''Generator: accept connections until accept fails, then clean up.'''
        try:
            while True:
                sock, remote = (yield multitask.accept(self.sock))  # receive client TCP
                if sock == None:
                    if _debug:
                        print('rtmp.Server accept(sock) returned None.')
                    break
                if _debug:
                    print('connection received from %s' % (remote,))
                # TCP_NODELAY: send small writes immediately
                sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
                client = Client(sock, self, remote)
        except:
            if _debug:
                print('rtmp.Server exception  %s' % (sys and sys.exc_info() or None,))
            traceback.print_exc()

        # release the listening socket and tell the consumer we are done
        if self.sock:
            try:
                self.sock.close()
                self.sock = None
            except:
                pass
        if self.queue:
            yield self.queue.put((None, None))
            self.queue = None
624
class BaseApp(object):
    '''An application instance holding streams (by name) and its list of clients.'''
    count = 0

    def __init__(self):
        # the label uses the counter kept on App, which is bumped per instance
        self.name = '%s[%s]' % (str(self.__class__.__name__), str(App.count))
        App.count += 1
        # Streams indexed by stream name, and list of clients
        self.players = {}
        self.publishers = {}
        self._clients = []
        if _debug:
            print('%s created' % (self.name,))

    def __del__(self):
        if _debug:
            print('%s destroyed' % (self.name,))

    @property
    def clients(self):
        '''Return a new list holding every entry of _clients except the first;
        an empty list when _clients is None.'''
        if self._clients is None:
            return []
        return self._clients[1:]
638
class App(BaseApp, SimpleAppHTTPRequestHandler):
    '''Default application: BaseApp combined with SimpleAppHTTPRequestHandler,
    adding nothing of its own.'''
641
642 class HTTPServer(object):
643 '''A RTMP server to record and stream Flash video.'''
def __init__(self):
    '''Construct a new HttpServer with no socket, the default application
    table, no clients and an empty root.'''
    self.sock = None
    self.server = None
    self.apps = dict({'*': App})  # supported applications: * means any as in {'*': App}
    self.clients = dict()         # lists indexed by scope; first item in each list is the app instance
    self.root = ''
650
def start(self, host='0.0.0.0', port=8080):
    '''Start listening for connections on (host, port); port defaults to 8080.
    Does nothing when a server is already running.'''
    if _debug:
        print('start %s %s' % (host, port))
    if self.server:
        return
    sock = self.sock = socket.socket(type=socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((host, port))
    if _debug:
        print('listening on  %s' % (sock.getsockname(),))
    sock.listen(5)
    self.server = Server(sock)  # start accepting on that socket
    multitask.add(self.serverlistener())
662
def stop(self):
    '''Close the listening socket (best effort) and forget the server.'''
    if _debug:
        print('stopping Flash server')
    if self.server and self.sock:
        try:
            self.sock.close()
            self.sock = None
        except:
            pass
    self.server = None
669
def serverlistener(self):
    '''Server listener (generator). Takes (client, request) pairs from the
    server queue, finds or creates the application instance for the
    request's session cookie and calls its "on<COMMAND>" method.'''
    try:
        while True:  # main loop to receive new requests from the server
            client, msg = (yield self.server.recv())
            # TODO: we should reject non-localhost client connections.
            if not client:  # if the server aborted abnormally,
                break       # hence close the listener.
            if _debug:
                print('client connection received %s %s' % (client, msg))
            # if client.objectEncoding != 0 and client.objectEncoding != 3:
            if client.objectEncoding != 0:
                yield client.rejectConnection(reason='Unsupported encoding ' + str(client.objectEncoding) + '. Please use NetConnection.defaultObjectEncoding=ObjectEncoding.AMF0')
                yield client.connectionClosed()
                continue

            print("cookies %s" % (str(msg.response_cookies),))
            session = msg.response_cookies['session'].value
            client.session = session
            name = msg.path
            print("serverlistener %s" % (name,))
            if '*' not in self.apps and name not in self.apps:
                yield client.rejectConnection(reason='Application not found: ' + name)
                continue

            # create application instance as needed and add in our list
            if _debug:
                print('name= %s name in apps %s' % (name, str(name in self.apps)))
            if name in self.apps:
                app = self.apps[name]  # application class
            else:
                app = self.apps['*']
            print("clients %s" % (self.clients.keys(),))
            if session in self.clients:
                inst = self.clients[session][0]
            else:
                inst = app()
                # NOTE(review): nesting reconstructed -- assumed to belong to
                # the "new session" branch.
                self.clients[session] = [inst]
                inst._clients = self.clients[session]
            msg.server = inst  # whew! just in time!
            try:
                methodname = "on%s" % msg.command
                print("%s %s" % (methodname, dir(inst)))
                method = getattr(inst, methodname, None)
                result = method(client, msg)
                close_connection = msg.close_connection
                print("close connection %s" % (close_connection,))
            except:
                if _debug: traceback.print_exc()
                yield client.rejectConnection(reason='Exception on %s' % methodname)
                continue
            if result is True or result is None:
                if result is None:
                    # the handler left its response in wfile: drain and send it
                    msg.wfile.seek(0)
                    data = msg.wfile.read()
                    msg.wfile.seek(0)
                    msg.wfile.truncate()
                    yield client.writeMessage(data)
                # NOTE(review): nesting reconstructed -- assumed to apply to
                # both the True and the None result.
                if close_connection:
                    if _debug:
                        print('close_connection requested')
                    try:
                        yield client.connectionClosed()
                        raise ConnectionClosed
                    except ConnectionClosed:
                        if _debug:
                            print('close_connection done')
            else:
                print("result %s" % (result,))
                yield client.rejectConnection(reason='Rejected in onConnect')
    except StopIteration: raise
    except:
        if _debug:
            print('serverlistener exception %s' % (sys and sys.exc_info() or None,))
        # NOTE(review): nesting reconstructed -- assumed unconditional, as in
        # the other listeners of this file.
        traceback.print_exc()
738
def clientlistener(self, client):
    '''Client listener (generator). It receives a command and invokes client handler, or receives a new stream and invokes streamlistener.
    When the client is gone, its streams are closed and the application instance is informed.'''
    try:
        while True:
            msg, arg = (yield client.recv())  # receive new message from client
            if not msg:                       # if the client disconnected,
                if _debug:
                    print('connection closed from client')
                break                         # come out of listening loop.
            if msg == 'command':              # handle a new command
                multitask.add(self.clienthandler(client, arg))
            elif msg == 'stream':             # a new stream is created, handle the stream.
                arg.client = client
                multitask.add(self.streamlistener(arg))
    except StopIteration: raise
    except:
        if _debug:
            print('clientlistener exception %s' % (sys and sys.exc_info() or None,))
        traceback.print_exc()

    # client is disconnected, clear our state for application instance.
    if _debug:
        print('cleaning up client %s' % (client.path,))
    inst = None
    if client.path in self.clients:
        members = self.clients[client.path]
        inst = members[0]
        members.remove(client)
    for stream in client.streams.values():  # for all streams of this client
        self.closehandler(stream)
    client.streams.clear()                  # and clear the collection of streams
    if client.path in self.clients and len(self.clients[client.path]) == 1:
        # no more clients left, delete the instance.
        if _debug:
            print('removing the application instance')
        inst = self.clients[client.path][0]
        inst._clients = None
        del self.clients[client.path]
    if inst is not None:
        inst.onDisconnect(client)
772
def closehandler(self, stream):
    '''Close a stream: remove it from its application's publishers and players
    (notifying the application) and close it. Streams without a client are ignored.'''
    if stream.client is None:
        return
    inst = self.clients[stream.client.path][0]
    name = stream.name
    if name in inst.publishers and inst.publishers[name] == stream:  # clear the published stream
        inst.onClose(stream.client, stream)
        del inst.publishers[name]
    if name in inst.players and stream in inst.players[name]:
        inst.onStop(stream.client, stream)
        inst.players[name].remove(stream)
        if len(inst.players[name]) == 0:  # last player of that name is gone
            del inst.players[name]
    # NOTE(review): nesting reconstructed -- close() assumed to run only
    # for streams that still have a client.
    stream.close()
786
def clienthandler(self, client, cmd):
    '''A generator to handle a single command on the client.'''
    inst = self.clients[client.path][0]
    if inst:
        if cmd.name == '_error':
            if hasattr(inst, 'onStatus'):
                result = inst.onStatus(client, cmd.args[0])
        elif cmd.name == '_result':
            if hasattr(inst, 'onResult'):
                result = inst.onResult(client, cmd.args[0])
        else:
            # any other command goes to onCommand; answer with _result or _error
            res = Command()
            code = '_result'
            args = dict()
            try:
                result = inst.onCommand(client, cmd.name, *cmd.args)
            except:
                if _debug:
                    print('Client.call exception %s' % (sys and sys.exc_info() or None,))
                code, args = '_error', dict()
            res.id = cmd.id
            res.name = code
            res.type = (client.objectEncoding == 0.0 and Message.RPC or Message.RPC3)
            res.args, res.cmdData = args, None
            if _debug:
                print('Client.call method= %s args= %s  msg= %s' % (code, args, res.toMessage()))
            client.writeMessage(res.toMessage())
            # TODO return result to caller
    yield
809
def streamlistener(self, stream):
    '''Stream listener (generator). Schedules streamhandler for every message
    received on the stream; a false message closes the stream and ends the loop.'''
    stream.recordfile = None  # so that it doesn't complain about missing attribute
    while True:
        msg = (yield stream.recv())
        if msg:
            # if _debug: msg
            multitask.add(self.streamhandler(stream, msg))
            continue
        if _debug:
            print('stream closed')
        self.closehandler(stream)
        break
821
def streamhandler(self, stream, message):
    '''A generator to handle a single message on the stream: RPC messages are
    dispatched by command name, everything else goes to mediahandler.'''
    try:
        if message.type != Message.RPC:  # audio or video message
            yield self.mediahandler(stream, message)
        else:
            cmd = Command.fromMessage(message)
            if _debug:
                print('streamhandler received cmd= %s' % (cmd,))
            if cmd.name == 'publish':
                yield self.publishhandler(stream, cmd)
            elif cmd.name == 'play':
                yield self.playhandler(stream, cmd)
            elif cmd.name == 'closeStream':
                self.closehandler(stream)
    except GeneratorExit: pass
    except StopIteration: pass
    except:
        if _debug:
            print('exception in streamhandler %s' % ((sys and sys.exc_info()),))
840
841 def publishhandler(self, stream, cmd):
842 '''A new stream is published. Store the information in the application instance.'''
843 try:
844 stream.mode = 'live' if len(cmd.args) < 2 else cmd.args[1] # live, record, append
845 stream.name = cmd.args[0]
846 if _debug: print 'publishing stream=', stream.name, 'mode=', stream.mode
847 inst = self.clients[stream.client.path][0]
848 if (stream.name in inst.publishers):
849 raise ValueError, 'Stream name already in use'
850 inst.publishers[stream.name] = stream # store the client for publisher
851 result = inst.onPublish(stream.client, stream)
852
853 if stream.mode == 'record' or stream.mode == 'append':
854 stream.recordfile = FLV().open(getfilename(stream.client.path, stream.name, self.root), stream.mode)
855 response = Command(name='onStatus', id=cmd.id, args=[dict(level='status', code='NetStream.Publish.Start', description='', details=None)])
856 yield stream.send(response)
857 except ValueError, E: # some error occurred. inform the app.
858 if _debug: print 'error in publishing stream', str(E)
859 response = Command(name='onStatus', id=cmd.id, args=[dict(level='error',code='NetStream.Publish.BadName',description=str(E),details=None)])
860 yield stream.send(response)
861
862 def playhandler(self, stream, cmd):
863 '''A new stream is being played. Just updated the players list with this stream.'''
864 inst = self.clients[stream.client.path][0]
865 name = stream.name = cmd.args[0] # store the stream's name
866 start = cmd.args[1] if len(cmd.args) >= 2 else -2
867 if name not in inst.players:
868 inst.players[name] = [] # initialize the players for this stream name
869 if stream not in inst.players[name]: # store the stream as players of this name
870 inst.players[name].append(stream)
871 path = getfilename(stream.client.path, stream.name, self.root)
872 if os.path.exists(path):
873 stream.playfile = FLV().open(path)
874 multitask.add(stream.playfile.reader(stream))
875 if _debug: print 'playing stream=', name, 'start=', start
876 result = inst.onPlay(stream.client, stream)
877 response = Command(name='onStatus', id=cmd.id, args=[dict(level='status',code='NetStream.Play.Start', description=stream.name, details=None)])
878 yield stream.send(response)
879
def mediahandler(self, stream, message):
    '''Handle incoming media on the stream, by forwarding it to the players of the same stream name
    in this application instance.

    Nothing happens when the stream has no client or when inst.onPublishData returns a false value.
    Otherwise the message is sent to every player for which inst.onPlayData returns a true value
    (each send is yielded), and finally written to stream.recordfile when that is set.
    '''
    publisher = stream.client
    if publisher is None:
        return
    inst = self.clients[publisher.path][0]
    if not inst.onPublishData(publisher, stream, message):
        return
    for player in inst.players.get(stream.name, []):
        if inst.onPlayData(player.client, player, message):
            yield player.send(message)
    if stream.recordfile is not None:
        stream.recordfile.write(message)
894
895 # The main routine to start, run and stop the service
896 if __name__ == '__main__':
897 print "optparse"
898 from optparse import OptionParser
899 parser = OptionParser()
900 parser.add_option('-i', '--host', dest='host', default='0.0.0.0', help="listening IP address. Default '0.0.0.0'")
901 parser.add_option('-p', '--port', dest='port', default=8080, type="int", help='listening port number. Default 8080')
902 parser.add_option('-r', '--root', dest='root', default='./', help="document root directory. Default './'")
903 parser.add_option('-d', '--verbose', dest='verbose', default=False, action='store_true', help='enable debug trace')
904 (options, args) = parser.parse_args()
905
906 _debug = options.verbose
907 try:
908 if _debug: print time.asctime(), 'Options - %s:%d' % (options.host, options.port)
909 agent = HTTPServer()
910 agent.root = options.root
911 agent.start(options.host, options.port)
912 if _debug: print time.asctime(), 'Flash Server Starts - %s:%d' % (options.host, options.port)
913 multitask.run()
914 except KeyboardInterrupt:
915 print traceback.print_exc()
916 pass
917 except:
918 print "exception"
919 print traceback.print_exc()
920 if _debug: print time.asctime(), 'Flash Server Stops'