6dc90c4acaa36816e9dea7b24990bdec198152de
[multitaskhttpd.git] / httpd.py
1 # Copyright (c) 2007-2009, Mamta Singh. All rights reserved. see README for details.
2
3 '''
4 This is a simple implementation of a Flash RTMP server to accept connections and stream requests. The module is organized as follows:
5 1. The FlashServer class is the main class to provide the server abstraction. It uses the multitask module for co-operative multitasking.
6 It also uses the App abstract class to implement the applications.
7 2. The Server class implements a simple server to receive new Client connections and inform the FlashServer application. The Client class
8 derived from Protocol implements the RTMP client functions. The Protocol class implements the base RTMP protocol parsing. A Client contains
9 various streams from the client, represented using the Stream class.
10 3. The Message, Header and Command represent RTMP message, header and command respectively. The FLV class implements functions to perform read
11 and write of FLV file format.
12
13
14 Typically an application can launch this server as follows:
15 $ python rtmp.py
16
17 To know the command line options use the -h option:
18 $ python rtmp.py -h
19
20 To start the server with a different directory for recording and playing FLV files from, use the following command.
21 $ python rtmp.py -r some-other-directory/
22 Note the terminal '/' in the directory name. Without this, it is just used as a prefix in FLV file names.
23
24 A test client is available in testClient directory, and can be compiled using Flex Builder. Alternatively, you can use the SWF file to launch
25 from testClient/bin-debug after starting the server. Once you have launched the client in the browser, you can connect to
26 local host by clicking on 'connect' button. Then click on publish button to publish a stream. Open another browser with
27 same URL and first connect then play the same stream name. If everything works fine you should be able to see the video
28 from first browser to the second browser. Similarly, in the first browser, if you check the record box before publishing,
29 it will create a new FLV file for the recorded stream. You can close the publishing stream and play the recorded stream to
30 see your recording. Note that due to initial delay in timestamp (in case publish was clicked much later than connect),
31 your played video will start appearing after some initial delay.
32
33
34 If an application wants to use this module as a library, it can launch the server as follows:
35 >>> agent = FlashServer() # a new RTMP server instance
36 >>> agent.root = 'flvs/' # set the document root to be 'flvs' directory. Default is current './' directory.
37 >>> agent.start() # start the server
38 >>> multitask.run() # this is needed somewhere in the application to actually start the co-operative multitasking.
39
40
41 If an application wants to specify a different application other than the default App, it can subclass it and supply the application by
42 setting the server's apps property. The following example shows how to define "myapp" which invokes a 'connected()' method on client when
43 the client connects to the server.
44
45 class MyApp(App): # a new MyApp extends the default App in rtmp module.
46 def __init__(self): # constructor just invokes base class constructor
47 App.__init__(self)
48 def onConnect(self, client, *args):
49 result = App.onConnect(self, client, *args) # invoke base class method first
50 def invokeAdded(self, client): # define a method to invoke 'connected("some-arg")' on Flash client
51 client.call('connected', 'some-arg')
52 yield
53 multitask.add(invokeAdded(self, client)) # need to invoke later so that connection is established before callback
54 ...
55 agent.apps = dict({'myapp': MyApp, 'someapp': MyApp, '*': App})
56
57 Now the client can connect to rtmp://server/myapp or rtmp://server/someapp and will get connected to this MyApp application.
58 If the client doesn't define "function connected(arg:String):void" in the NetConnection.client object then the server will
59 throw an exception and display the error message.
60
61 '''
62
63 import os, sys, time, struct, socket, traceback, multitask
64 import threading, Queue
65 import uuid
66 from string import strip
67
68 from BaseHTTPServer import BaseHTTPRequestHandler
69 from SimpleAppHTTPServer import SimpleAppHTTPRequestHandler
70 from cStringIO import StringIO
71 from cookielib import parse_ns_headers, CookieJar
72 from Cookie import SimpleCookie
73
# Module-wide switch for verbose tracing; the "if _debug: print ..." statements
# throughout this file consult it.
_debug = False


def set_debug(dbg):
    '''Set the module-level _debug flag to dbg.'''
    globals()['_debug'] = dbg
79
class MultitaskHTTPRequestHandler(BaseHTTPRequestHandler):
    '''HTTP request handler that reads from and writes to in-memory buffers.

    setup() gives the handler empty StringIO objects as rfile and wfile
    instead of files wrapped around the connection. info(), get_full_url()
    and get_header() expose the parsed request under alternative names
    (presumably to look like a urllib2-style request object -- confirm).
    '''

    def setup(self):
        '''Use the request object as the connection and create empty buffers.'''
        self.rfile, self.wfile = StringIO(), StringIO()
        self.connection = self.request

    def finish(self):
        '''Do nothing when the request is finished.'''
        return None

    def info(self):
        '''Return the headers of the parsed request.'''
        return self.headers

    def get_full_url(self):
        '''Return the path of the parsed request.'''
        return self.path

    def get_header(self, hdr, default):
        '''Return the value of header hdr, or default if it is missing.'''
        headers = self.headers
        return headers.getheader(hdr, default)

    def add_cookies(self):
        '''Send a Set-Cookie header for every entry of self.response_cookies.'''
        for morsel in self.response_cookies.values():
            self.send_header("Set-Cookie", morsel.OutputString())
103
def process_cookies(headers, remote, cookie_key="Cookie", add_sess=True):
    '''Build the response cookie jar from the cookies sent by the client.

    headers    -- parsed request headers; must provide getheaders(name)
                  returning the list of values of that header.
    remote     -- address of the peer. Not used at present (only the
                  commented-out 'domain' attribute referred to it); kept
                  so that existing callers keep working.
    cookie_key -- name of the header carrying the cookies.
    add_sess   -- when true, a new 'session' cookie holding a random
                  uuid4 hex string is added if the client sent none.

    Returns a SimpleCookie with every well-formed cookie received. A
    'session' cookie, received or created, gets its 'expires' attribute
    set to 50000. Pairs without '=' or with an empty name are ignored.
    '''
    cookie_headers = headers.getheaders(cookie_key)
    print("messageReceived cookieheaders= %s" % '; '.join(cookie_headers))

    # one header may carry several "name=value" pairs separated by ';'
    pairs = []
    for header in cookie_headers:
        for pair in header.split(";"):
            pair = pair.strip()
            if pair:
                pairs.append(pair)

    has_sess = False
    response_cookies = SimpleCookie()
    for pair in pairs:
        print("found cookie %r" % (pair,))
        # Split on the first '=' only: a value (base64 data, for instance)
        # may itself contain '=', which used to raise ValueError here.
        name, sep, value = pair.partition("=")
        if not sep or not name:
            continue  # malformed pair, nothing usable to store
        response_cookies[name] = value
        #response_cookies[name]['path'] = "/"
        #response_cookies[name]['domain'] = remote[0]
        #response_cookies[name]['version'] = 0
        if name == "session":
            response_cookies[name]['expires'] = 50000
            has_sess = True

    if add_sess and not has_sess:
        response_cookies['session'] = uuid.uuid4().hex
        response_cookies['session']['expires'] = 50000
        #response_cookies['session']['path'] = '/'
        #response_cookies['session']['domain'] = remote[0]
        #response_cookies['session']['version'] = 0
    return response_cookies
136
class ConnectionClosed(Exception):
    '''Raised when the client closed the connection.

    Derives from Exception so that it is a regular exception class: it can
    be raised on every Python version and is caught by "except Exception"
    as well as by "except ConnectionClosed".
    '''
139
class SockStream(object):
    '''A class that represents a socket as a stream.

    Received data is accumulated in self.buffer; bytesRead and bytesWritten
    count the bytes transferred. readline(), read() and write() are
    generators that yield multitask.recv / multitask.send requests;
    readline() and read() hand back their result by raising
    StopIteration(data).
    '''
    def __init__(self, sock):
        '''Wrap sock with an empty receive buffer and zeroed byte counters.'''
        self.sock, self.buffer = sock, ''
        self.bytesWritten = self.bytesRead = 0

    def close(self):
        '''Shut down the sending side of the socket and forget the socket.'''
        self.sock.shutdown(1) # 1 == SHUT_WR; can't just close it.
        self.sock = None

    def readline(self):
        '''Generator: produce the next line, including its trailing newline.

        Reads from the socket in 4096 byte pieces until the buffer holds a
        newline. Raises ConnectionClosed when the peer sends no data or when
        any other error occurs.
        '''
        try:
            while True:
                nl = self.buffer.find("\n")
                if nl >= 0: # a complete line is buffered: split it off and return it
                    data, self.buffer = self.buffer[:nl+1], self.buffer[nl+1:]
                    raise StopIteration(data)
                data = (yield multitask.recv(self.sock, 4096)) # read more from socket
                if not data: raise ConnectionClosed
                if _debug: print 'socket.read[%d] %r'%(len(data), data)
                self.bytesRead += len(data)
                self.buffer += data
        except StopIteration: raise # this is the result, let it through
        except: raise ConnectionClosed # anything else is treated as connection closed.

    def read(self, count):
        '''Generator: produce exactly count bytes.

        Reads from the socket in 4096 byte pieces until the buffer holds at
        least count bytes. Raises ConnectionClosed when the peer sends no
        data or when any other error occurs.
        '''
        try:
            while True:
                if len(self.buffer) >= count: # enough data is buffered: split it off and return it
                    data, self.buffer = self.buffer[:count], self.buffer[count:]
                    raise StopIteration(data)
                data = (yield multitask.recv(self.sock, 4096)) # read more from socket
                if not data: raise ConnectionClosed
                # if _debug: print 'socket.read[%d] %r'%(len(data), data)
                self.bytesRead += len(data)
                self.buffer += data
        except StopIteration: raise # this is the result, let it through
        except: raise ConnectionClosed # anything else is treated as connection closed.

    def unread(self, data):
        '''Push data back to the front of the receive buffer.'''
        self.buffer = data + self.buffer

    def write(self, data):
        '''Generator: send data to the socket. Raises ConnectionClosed if a send fails.'''
        while len(data) > 0: # write in 4K chunks each time
            chunk, data = data[:4096], data[4096:]
            self.bytesWritten += len(chunk) # counted before the send is attempted
            if _debug: print 'socket.write[%d] %r'%(len(chunk), chunk)
            try: yield multitask.send(self.sock, chunk)
            except: raise ConnectionClosed
            if _debug: print 'socket.written'
190
191
class Header(object):
    '''Header of an RTMP message: channel, time, size, type and stream id.'''
    # Control values OR-ed into the first header byte by toBytes(); they
    # decide how many of the header fields are emitted.
    FULL = 0x00
    MESSAGE = 0x40
    TIME = 0x80
    SEPARATOR = 0xC0
    MASK = 0xC0

    def __init__(self, channel=0, time=0, size=None, type=None, streamId=0):
        '''Store the fields and pre-compute the channel bytes (hdrdata).'''
        self.channel = channel
        self.time = time
        self.size = size
        self.type = type
        self.streamId = streamId
        self.extendedtime = 0
        # the channel is encoded in one, two or three bytes depending on its value
        if channel < 64:
            self.hdrdata = chr(channel)
        elif channel < (64 + 256):
            self.hdrdata = '\x00' + chr(channel - 64)
        else:
            high, low = divmod(channel - 64, 256)
            self.hdrdata = '\x01' + chr(low) + chr(high)

    def _appendExtendedTimestamp(self, data):
        '''Return data, followed by extendedtime when time is 0xFFFFFF.'''
        if self.time != 0xFFFFFF:
            return data
        return data + struct.pack('>I', self.extendedtime)

    def toBytes(self, control):
        '''Serialize the header for the given control value.

        SEPARATOR emits the channel bytes only, TIME adds the 3-byte time,
        MESSAGE adds the 3-byte size and the type, and any other value
        (FULL) also adds the little-endian stream id.
        '''
        first = chr(ord(self.hdrdata[0]) | control)
        out = first + self.hdrdata[1:]
        if control != Header.SEPARATOR:
            out += struct.pack('>I', self.time & 0xFFFFFF)[1:]  # add time
            if control != Header.TIME:
                out += struct.pack('>I', self.size)[1:]  # size
                out += chr(self.type)  # type
                if control != Header.MESSAGE:
                    out += struct.pack('<I', self.streamId)  # add streamId
        return self._appendExtendedTimestamp(out)

    def __repr__(self):
        fields = (self.channel, self.time, self.size, self.type,
                  self.type or 0, self.streamId)
        return "<Header channel=%r time=%r size=%r type=%r (0x%02x) streamId=%r>" % fields
224
class Message(object):
    '''A message made of a header and a data payload.

    The type, streamId and time attributes are read from and written to
    the header object; size is the length of the data.
    '''
    # message types: RPC3, DATA3,and SHAREDOBJECT3 are used with AMF3
    RPC, RPC3, DATA, DATA3, SHAREDOBJ, SHAREDOBJ3, AUDIO, VIDEO, ACK, CHUNK_SIZE = \
        0x14, 0x11, 0x12, 0x0F, 0x13, 0x10, 0x08, 0x09, 0x03, 0x01

    def __init__(self, hdr=None, data=''):
        '''Create a message; a fresh Header is used when hdr is None.'''
        if hdr is None: hdr = Header()
        self.header, self.data = hdr, data

    # Properties type, streamId and time give access to self.header.(property).
    # They are written out explicitly instead of being generated with exec;
    # the accessor names match the ones the exec-based version produced.
    def _gtype(self): return self.header.type
    def _stype(self, type): self.header.type = type
    type = property(fget=_gtype, fset=_stype)

    def _gstreamId(self): return self.header.streamId
    def _sstreamId(self, streamId): self.header.streamId = streamId
    streamId = property(fget=_gstreamId, fset=_sstreamId)

    def _gtime(self): return self.header.time
    def _stime(self, time): self.header.time = time
    time = property(fget=_gtime, fset=_stime)

    @property
    def size(self):
        '''Length of the data payload.'''
        return len(self.data)

    def __repr__(self):
        return ("<Message header=%r data=%r>"% (self.header, self.data))
244
245 class Protocol(object):
246 # constants
247 PING_SIZE = 1536
248 DEFAULT_CHUNK_SIZE = 128
249 MIN_CHANNEL_ID = 3
250 PROTOCOL_CHANNEL_ID = 2
251
252 def __init__(self, sock):
253 self.stream = SockStream(sock)
254 self.lastReadHeaders = dict() # indexed by channelId
255 self.incompletePackets = dict() #indexed by channelId
256 self.readChunkSize = self.writeChunkSize = Protocol.DEFAULT_CHUNK_SIZE
257 self.lastWriteHeaders = dict() # indexed by streamId
258 self.nextChannelId = Protocol.MIN_CHANNEL_ID
259 self.writeLock = threading.Lock()
260 self.writeQueue = Queue.Queue()
261
262 def messageReceived(self, msg):
263 yield
264
265 def protocolMessage(self, msg):
266 if msg.type == Message.ACK: # respond to ACK requests
267 response = Message()
268 response.type, response.data = msg.type, msg.data
269 self.writeMessage(response)
270 elif msg.type == Message.CHUNK_SIZE:
271 self.readChunkSize = struct.unpack('>L', msg.data)[0]
272
273 def connectionClosed(self):
274 yield
275
276 def parse(self):
277 try:
278 yield self.parseRequests() # parse http requests
279 except ConnectionClosed:
280 yield self.connectionClosed()
281 if _debug: print 'parse connection closed'
282
283 def writeMessage(self, message):
284 self.writeQueue.put(message)
285
286 def parseHandshake(self):
287 '''Parses the rtmp handshake'''
288 data = (yield self.stream.read(Protocol.PING_SIZE + 1)) # bound version and first ping
289 yield self.stream.write(data)
290 data = (yield self.stream.read(Protocol.PING_SIZE)) # bound second ping
291 yield self.stream.write(data)
292
293 def parseRequests(self):
294 '''Parses complete messages until connection closed. Raises ConnectionLost exception.'''
295 self.hr = MultitaskHTTPRequestHandler(self.stream, self.remote, None)
296 self.hr.close_connection = 1
297 self.cookies = CookieJar()
298
299 while True:
300
301 # prepare reading the request so that when it's handed
302 # over to "standard" HTTPRequestHandler, the data's already
303 # there.
304 print "parseRequests"
305 raw_requestline = (yield self.stream.readline())
306 if _debug: print "parseRequests, line", raw_requestline
307 if not raw_requestline:
308 raise ConnectionClosed
309 data = ''
310 try:
311 while 1:
312 line = (yield self.stream.readline())
313 data += line
314 if line in ['\n', '\r\n']:
315 break
316 except StopIteration:
317 if _debug: print "parseRequests, stopiter"
318 raise
319 except:
320 if _debug:
321 print 'parseRequests', \
322 (traceback and traceback.print_exc() or None)
323
324 raise ConnectionClosed
325
326 self.hr.raw_requestline = raw_requestline
327 pos = self.hr.rfile.tell()
328 #self.hr.rfile.truncate(0)
329 self.hr.rfile.write(data)
330 print "parseRequests write after"
331 self.hr.rfile.seek(pos)
332 print "parseRequests seek after"
333
334 if not self.hr.parse_request():
335 raise ConnectionClosed
336 print "parseRequests parse_req after"
337 print "parseRequest headers", repr(self.hr.headers), str(self.hr.headers)
338 try:
339 yield self.messageReceived(self.hr)
340 except:
341 if _debug:
342 print 'messageReceived', \
343 (traceback and traceback.print_exc() or None)
344
345 def write(self):
346 '''Writes messages to stream'''
347 while True:
348 while self.writeQueue.empty(): (yield multitask.sleep(0.01))
349 data = self.writeQueue.get() # TODO this should be used using multitask.Queue and remove previous wait.
350 if _debug: print "write to stream", repr(data)
351 if data is None:
352 # just in case TCP socket is not closed, close it.
353 try:
354 print "stream closing"
355 print self.stream
356 yield self.stream.close()
357 print "stream closed"
358 except: pass
359 break
360
361 try:
362 yield self.stream.write(data)
363 except ConnectionClosed:
364 yield self.connectionClosed()
365 except:
366 print traceback.print_exc()
367
class Command(object):
    '''A command / data message: type, name, id, command data and arguments.

    NOTE(review): fromMessage() and toMessage() use a module named amf that
    the import lines visible at the top of this file do not bring in --
    confirm it is available before calling them.
    '''
    def __init__(self, type=Message.RPC, name=None, id=None, cmdData=None, args=[]):
        '''Create a new command; args is copied, so the default list is never shared.'''
        self.type = type
        self.name = name
        self.id = id
        self.cmdData = cmdData
        self.args = args[:]

    def __repr__(self):
        fields = (self.type, self.name, self.id, self.cmdData, self.args)
        return "<Command type=%r name=%r id=%r data=%r args=%r>" % fields

    def setArg(self, arg):
        '''Append arg to the argument list.'''
        self.args.append(arg)

    def getArg(self, index):
        '''Return the argument at position index.'''
        return self.args[index]

    @classmethod
    def fromMessage(cls, message):
        '''Build a command from a parsed RTMP message.

        Raises ValueError when the message carries no data.
        '''
        assert message.type in (Message.RPC, Message.RPC3, Message.DATA, Message.DATA3)

        if len(message.data) == 0: raise ValueError('zero length message data')

        payload = message.data
        if message.type in (Message.RPC3, Message.DATA3):
            assert payload[0] == '\x00' # must be 0 in AMD3
            payload = payload[1:]

        reader = amf.AMF0(payload)

        command = cls()
        command.name = reader.read() # first field is command name

        try:
            if message.type != Message.RPC:
                command.id = 0
            else:
                command.id = reader.read() # second field *may* be message id
                command.cmdData = reader.read() # third is command data
            command.args = [] # others are optional
            # keep reading arguments until the reader runs out of data
            while True:
                command.args.append(reader.read())
        except EOFError:
            pass
        return command

    def toMessage(self):
        '''Serialize this command into a new Message.'''
        msg = Message()
        assert self.type
        msg.type = self.type
        output = amf.BytesIO()
        writer = amf.AMF0(output)
        writer.write(self.name)
        if msg.type in (Message.RPC, Message.RPC3):
            writer.write(self.id)
            writer.write(self.cmdData)
        for arg in self.args:
            writer.write(arg)
        output.seek(0)
        #hexdump.hexdump(output)
        #output.seek(0)
        data = output.read()
        if msg.type in (Message.RPC3, Message.DATA3):
            data = '\x00' + data # the AMF3 variants carry a leading zero byte
        msg.data = data
        output.close()
        return msg
437
438 def getfilename(path, name, root):
439 '''return the file name for the given stream. The name is derived as root/scope/name.flv where scope is
440 the the path present in the path variable.'''
441 ignore, ignore, scope = path.partition('/')
442 if scope: scope = scope + '/'
443 result = root + scope + name + '.flv'
444 if _debug: print 'filename=', result
445 return result
446
447 class Stream(object):
448 '''The stream object that is used for RTMP stream.'''
449 count = 0;
450 def __init__(self, client):
451 self.client, self.id, self.name = client, 0, ''
452 self.recordfile = self.playfile = None # so that it doesn't complain about missing attribute
453 self.queue = multitask.Queue()
454 self._name = 'Stream[' + str(Stream.count) + ']'; Stream.count += 1
455 if _debug: print self, 'created'
456
457 def close(self):
458 if _debug: print self, 'closing'
459 if self.recordfile is not None: self.recordfile.close(); self.recordfile = None
460 if self.playfile is not None: self.playfile.close(); self.playfile = None
461 self.client = None # to clear the reference
462 pass
463
464 def __repr__(self):
465 return self._name;
466
467 def recv(self):
468 '''Generator to receive new Message on this stream, or None if stream is closed.'''
469 return self.queue.get()
470
471 def send(self, msg):
472 '''Method to send a Message or Command on this stream.'''
473 if isinstance(msg, Command):
474 msg = msg.toMessage()
475 msg.streamId = self.id
476 # if _debug: print self,'send'
477 if self.client is not None: self.client.writeMessage(msg)
478
479 class Client(Protocol):
480 '''The client object represents a single connected client to the server.'''
481 def __init__(self, sock, server, remote):
482 Protocol.__init__(self, sock)
483 self.server = server
484 self.remote = remote
485 self.agent = {}
486 self.streams = {}
487 self._nextCallId = 2
488 self._nextStreamId = 1
489 self.objectEncoding = 0.0
490 self.queue = multitask.Queue() # receive queue used by application
491 multitask.add(self.parse())
492 multitask.add(self.write())
493
494 def recv(self):
495 '''Generator to receive new Message (msg, arg) on this stream, or (None,None) if stream is closed.'''
496 return self.queue.get()
497
498 def connectionClosed(self):
499 '''Called when the client drops the connection'''
500 if _debug: 'Client.connectionClosed'
501 self.writeMessage(None)
502 yield self.queue.put((None,None))
503
504 def messageReceived(self, msg):
505 if _debug: print 'messageReceived cmd=', msg.command, msg.path
506 msg.response_cookies = process_cookies(msg.headers, self.remote)
507
508 # slightly bad, this: read everything, put it into memory...
509 if msg.headers.has_key('content-length'):
510 max_chunk_size = 10*1024*1024
511 size_remaining = int(msg.headers["content-length"])
512 L = []
513 while size_remaining:
514 chunk_size = min(size_remaining, max_chunk_size)
515 data = (yield self.stream.read(chunk_size))
516 L.append(data)
517 size_remaining -= len(L[-1])
518
519 pos = msg.rfile.tell()
520 msg.rfile.write(''.join(L))
521 msg.rfile.seek(pos)
522
523 yield self.server.queue.put((self, msg)) # new connection
524
525 def accept(self):
526 '''Method to accept an incoming client.'''
527 response = Command()
528 response.id, response.name, response.type = 1, '_result', Message.RPC
529 if _debug: print 'Client.accept() objectEncoding=', self.objectEncoding
530 response.setArg(dict(level='status', code='NetConnection.Connect.Success',
531 description='Connection succeeded.', details=None,
532 objectEncoding=self.objectEncoding))
533 self.writeMessage(response.toMessage())
534
535 def rejectConnection(self, reason=''):
536 '''Method to reject an incoming client.'''
537 response = Command()
538 response.id, response.name, response.type = 1, '_error', Message.RPC
539 response.setArg(dict(level='status', code='NetConnection.Connect.Rejected',
540 description=reason, details=None))
541 self.writeMessage(response.toMessage())
542
543 def call(self, method, *args):
544 '''Call a (callback) method on the client.'''
545 cmd = Command()
546 cmd.id, cmd.name, cmd.type = self._nextCallId, method, (self.objectEncoding == 0.0 and Message.RPC or Message.RPC3)
547 cmd.args, cmd.cmdData = args, None
548 self._nextCallId += 1
549 if _debug: print 'Client.call method=', method, 'args=', args, ' msg=', cmd.toMessage()
550 self.writeMessage(cmd.toMessage())
551
552 def createStream(self):
553 ''' Create a stream on the server side'''
554 stream = Stream(self)
555 stream.id = self._nextStreamId
556 self.streams[stream.id] = stream
557 self._nextStreamId += 1
558 return stream
559
560
561 class Server(object):
562 '''A RTMP server listens for incoming connections and informs the app.'''
563 def __init__(self, sock):
564 '''Create an RTMP server on the given bound TCP socket. The server will terminate
565 when the socket is disconnected, or some other error occurs in listening.'''
566 self.sock = sock
567 self.queue = multitask.Queue() # queue to receive incoming client connections
568 multitask.add(self.run())
569
570 def recv(self):
571 '''Generator to wait for incoming client connections on this server and return
572 (client, args) or (None, None) if the socket is closed or some error.'''
573 return self.queue.get()
574
575 def run(self):
576 try:
577 while True:
578 sock, remote = (yield multitask.accept(self.sock)) # receive client TCP
579 if sock == None:
580 if _debug: print 'rtmp.Server accept(sock) returned None.'
581 break
582 if _debug: print 'connection received from', remote
583 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) # make it non-block
584 client = Client(sock, self, remote)
585 except:
586 if _debug: print 'rtmp.Server exception ', (sys and sys.exc_info() or None)
587 traceback.print_exc()
588
589 if (self.sock):
590 try: self.sock.close(); self.sock = None
591 except: pass
592 if (self.queue):
593 yield self.queue.put((None, None))
594 self.queue = None
595
class BaseApp(object):
    '''An application instance containing any number of streams.'''
    count = 0  # number of instances created so far, used to label them

    def __init__(self):
        '''Name the instance ClassName[n] and create the empty stream tables.'''
        # The counter lives on BaseApp itself rather than on the App
        # subclass, so this class does not depend on a class defined later
        # in the module; App.count still resolves to it through inheritance.
        self.name = str(self.__class__.__name__) + '[' + str(BaseApp.count) + ']'
        BaseApp.count += 1
        self.players, self.publishers, self._clients = {}, {}, [] # Streams indexed by stream name, and list of clients
        if _debug: print('%s created' % (self.name,))

    def __del__(self):
        if _debug: print('%s destroyed' % (self.name,))

    @property
    def clients(self):
        '''everytime this property is accessed it returns a new list of clients connected to this instance.'''
        # the first entry of _clients is skipped; _clients may have been set to None
        return self._clients[1:] if self._clients is not None else []
609
class App(BaseApp, SimpleAppHTTPRequestHandler):
    '''Default application class: combines BaseApp with
    SimpleAppHTTPRequestHandler and adds nothing of its own.'''
    pass
612
class HTTPServer(object):
    '''Top-level server: owns the listening socket and dispatches every
    received request to an application instance.

    NOTE(review): clientlistener() and the close/client/stream/publish/
    play/media handlers use client.path, FLV and Command; they look like
    leftovers from the RTMP server this file was derived from, and nothing
    visible in this file calls clientlistener() -- confirm before relying
    on them.
    '''
    def __init__(self):
        '''Construct a new HttpServer. It initializes the local members.'''
        self.sock = self.server = None
        self.apps = dict({'*': App}) # supported applications: * means any as in {'*': App}
        self.clients = dict() # list of clients indexed by scope. First item in list is app instance.
        self.root = ''

    def start(self, host='0.0.0.0', port=8080):
        '''Start listening for connections on the given host and port (default 8080).
        Does nothing if the server is already started.'''
        if _debug: print 'start', host, port
        if not self.server:
            sock = self.sock = socket.socket(type=socket.SOCK_STREAM)
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind((host, port))
            if _debug: print 'listening on ', sock.getsockname()
            sock.listen(5)
            server = self.server = Server(sock) # start the server on that socket
            multitask.add(self.serverlistener())

    def stop(self):
        '''Close the listening socket (errors are ignored) and forget the server.'''
        if _debug: print 'stopping Flash server'
        if self.server and self.sock:
            try: self.sock.close(); self.sock = None
            except: pass
        self.server = None

    def serverlistener(self):
        '''Server listener (generator). Receives (client, msg) pairs from the
        server and calls the on<COMMAND> method of the application instance
        that belongs to the request's session cookie.'''
        try:
            while True: # main loop to receive new connections on the server
                client, msg = (yield self.server.recv()) # receive an incoming client connection.
                # TODO: we should reject non-localhost client connections.
                if not client: # if the server aborted abnormally,
                    break # hence close the listener.
                if _debug: print 'client connection received', client, msg
                # if client.objectEncoding != 0 and client.objectEncoding != 3:
                if client.objectEncoding != 0:
                    yield client.rejectConnection(reason='Unsupported encoding ' + str(client.objectEncoding) + '. Please use NetConnection.defaultObjectEncoding=ObjectEncoding.AMF0')
                    yield client.connectionClosed()
                else:
                    print "cookies", str(msg.response_cookies)
                    session = msg.response_cookies['session'].value
                    name = msg.path
                    print "serverlistener", name
                    if '*' not in self.apps and name not in self.apps:
                        yield client.rejectConnection(reason='Application not found: ' + name)
                    else: # create application instance as needed and add in our list
                        if _debug: print 'name=', name, 'name in apps', str(name in self.apps)
                        app = self.apps[name] if name in self.apps else self.apps['*'] # application class
                        print "clients", self.clients.keys()
                        # one application instance per session: reuse it if known
                        if session in self.clients:
                            inst = self.clients[session][0]
                        else:
                            inst = app()
                        msg.server = inst # whew! just in time!
                        try:
                            # dispatch on the request command, e.g. onGET
                            methodname = "on%s" % msg.command
                            print methodname, dir(inst)
                            # a missing method yields None here; calling it
                            # raises and ends up in the except clause below
                            method = getattr(inst, methodname, None)
                            result = method(client, msg)
                            close_connection = msg.close_connection
                            print "close connection", close_connection
                        except:
                            if _debug: traceback.print_exc()
                            yield client.rejectConnection(reason='Exception on %s' % methodname)
                            continue
                        if result is True or result is None:
                            if session not in self.clients:
                                self.clients[session] = [inst]; inst._clients=self.clients[session]
                            if result is None:
                                # send whatever the handler wrote to wfile,
                                # then empty the buffer for the next request
                                msg.wfile.seek(0)
                                data = msg.wfile.read()
                                msg.wfile.seek(0)
                                msg.wfile.truncate()
                                yield client.writeMessage(data)
                                # NOTE(review): nesting of this block under
                                # "if result is None" was reconstructed -- confirm
                                if close_connection:
                                    if _debug:
                                        print 'close_connection requested'
                                    try:
                                        yield client.connectionClosed()
                                    except ConnectionClosed:
                                        if _debug:
                                            print 'close_connection done'
                                        pass
                        else:
                            print "result", result
                            yield client.rejectConnection(reason='Rejected in onConnect')
        except StopIteration: raise
        except:
            if _debug:
                print 'serverlistener exception', \
                    (sys and sys.exc_info() or None)
            # NOTE(review): placement outside "if _debug" was reconstructed -- confirm
            traceback.print_exc()

    def clientlistener(self, client):
        '''Client listener (generator). It receives a command and invokes client handler, or receives a new stream and invokes streamlistener.'''
        try:
            while True:
                msg, arg = (yield client.recv()) # receive new message from client
                if not msg: # if the client disconnected,
                    if _debug: print 'connection closed from client'
                    break # come out of listening loop.
                if msg == 'command': # handle a new command
                    multitask.add(self.clienthandler(client, arg))
                elif msg == 'stream': # a new stream is created, handle the stream.
                    arg.client = client
                    multitask.add(self.streamlistener(arg))
        except StopIteration: raise
        except:
            if _debug: print 'clientlistener exception', (sys and sys.exc_info() or None)
            traceback.print_exc()

        # client is disconnected, clear our state for application instance.
        if _debug: print 'cleaning up client', client.path
        inst = None
        if client.path in self.clients:
            inst = self.clients[client.path][0]
            self.clients[client.path].remove(client)
        for stream in client.streams.values(): # for all streams of this client
            self.closehandler(stream)
        client.streams.clear() # and clear the collection of streams
        if client.path in self.clients and len(self.clients[client.path]) == 1: # no more clients left, delete the instance.
            if _debug: print 'removing the application instance'
            inst = self.clients[client.path][0]
            inst._clients = None
            del self.clients[client.path]
        if inst is not None: inst.onDisconnect(client)

    def closehandler(self, stream):
        '''A stream is closed explicitly when a closeStream command is received from given client.'''
        if stream.client is not None:
            inst = self.clients[stream.client.path][0]
            if stream.name in inst.publishers and inst.publishers[stream.name] == stream: # clear the published stream
                inst.onClose(stream.client, stream)
                del inst.publishers[stream.name]
            if stream.name in inst.players and stream in inst.players[stream.name]:
                inst.onStop(stream.client, stream)
                inst.players[stream.name].remove(stream)
                if len(inst.players[stream.name]) == 0:
                    del inst.players[stream.name]
            stream.close()

    def clienthandler(self, client, cmd):
        '''A generator to handle a single command on the client.'''
        inst = self.clients[client.path][0]
        if inst:
            if cmd.name == '_error':
                if hasattr(inst, 'onStatus'):
                    result = inst.onStatus(client, cmd.args[0])
            elif cmd.name == '_result':
                if hasattr(inst, 'onResult'):
                    result = inst.onResult(client, cmd.args[0])
            else:
                # any other command goes to onCommand; the reply is
                # '_result', or '_error' if the handler raised
                res, code, args = Command(), '_result', dict()
                try: result = inst.onCommand(client, cmd.name, *cmd.args)
                except:
                    if _debug: print 'Client.call exception', (sys and sys.exc_info() or None)
                    code, args = '_error', dict()
                res.id, res.name, res.type = cmd.id, code, (client.objectEncoding == 0.0 and Message.RPC or Message.RPC3)
                res.args, res.cmdData = args, None
                if _debug: print 'Client.call method=', code, 'args=', args, ' msg=', res.toMessage()
                client.writeMessage(res.toMessage())
                # TODO return result to caller
        yield

    def streamlistener(self, stream):
        '''Stream listener (generator). It receives stream message and invokes streamhandler.'''
        stream.recordfile = None # so that it doesn't complain about missing attribute
        while True:
            msg = (yield stream.recv())
            if not msg:
                if _debug: print 'stream closed'
                self.closehandler(stream)
                break
            # if _debug: msg
            multitask.add(self.streamhandler(stream, msg))

    def streamhandler(self, stream, message):
        '''A generator to handle a single message on the stream.'''
        try:
            if message.type == Message.RPC:
                cmd = Command.fromMessage(message)
                if _debug: print 'streamhandler received cmd=', cmd
                if cmd.name == 'publish':
                    yield self.publishhandler(stream, cmd)
                elif cmd.name == 'play':
                    yield self.playhandler(stream, cmd)
                elif cmd.name == 'closeStream':
                    self.closehandler(stream)
            else: # audio or video message
                yield self.mediahandler(stream, message)
        except GeneratorExit: pass
        except StopIteration: pass
        except:
            if _debug: print 'exception in streamhandler', (sys and sys.exc_info())

    def publishhandler(self, stream, cmd):
        '''A new stream is published. Store the information in the application instance.'''
        try:
            stream.mode = 'live' if len(cmd.args) < 2 else cmd.args[1] # live, record, append
            stream.name = cmd.args[0]
            if _debug: print 'publishing stream=', stream.name, 'mode=', stream.mode
            inst = self.clients[stream.client.path][0]
            if (stream.name in inst.publishers):
                raise ValueError, 'Stream name already in use'
            inst.publishers[stream.name] = stream # store the client for publisher
            result = inst.onPublish(stream.client, stream)

            if stream.mode == 'record' or stream.mode == 'append':
                stream.recordfile = FLV().open(getfilename(stream.client.path, stream.name, self.root), stream.mode)
            response = Command(name='onStatus', id=cmd.id, args=[dict(level='status', code='NetStream.Publish.Start', description='', details=None)])
            yield stream.send(response)
        except ValueError, E: # some error occurred. inform the app.
            if _debug: print 'error in publishing stream', str(E)
            response = Command(name='onStatus', id=cmd.id, args=[dict(level='error',code='NetStream.Publish.BadName',description=str(E),details=None)])
            yield stream.send(response)

    def playhandler(self, stream, cmd):
        '''A new stream is being played. Just updated the players list with this stream.'''
        inst = self.clients[stream.client.path][0]
        name = stream.name = cmd.args[0] # store the stream's name
        start = cmd.args[1] if len(cmd.args) >= 2 else -2
        if name not in inst.players:
            inst.players[name] = [] # initialize the players for this stream name
        if stream not in inst.players[name]: # store the stream as players of this name
            inst.players[name].append(stream)
        # if a file exists for this stream, schedule its reader for this stream
        path = getfilename(stream.client.path, stream.name, self.root)
        if os.path.exists(path):
            stream.playfile = FLV().open(path)
            multitask.add(stream.playfile.reader(stream))
        if _debug: print 'playing stream=', name, 'start=', start
        result = inst.onPlay(stream.client, stream)
        response = Command(name='onStatus', id=cmd.id, args=[dict(level='status',code='NetStream.Play.Start', description=stream.name, details=None)])
        yield stream.send(response)

    def mediahandler(self, stream, message):
        '''Handle incoming media on the stream, by sending to other stream in this application instance.'''
        if stream.client is not None:
            inst = self.clients[stream.client.path][0]
            result = inst.onPublishData(stream.client, stream, message)
            if result:
                client = stream.client
                # forward to every player of this stream name that accepts the data
                for s in (inst.players.get(stream.name, [])):
                    # if _debug: print 'D', stream.name, s.name
                    result = inst.onPlayData(s.client, s, message)
                    if result:
                        yield s.send(message)
                if stream.recordfile is not None:
                    stream.recordfile.write(message)
864
865 # The main routine to start, run and stop the service
866 if __name__ == '__main__':
867 print "optparse"
868 from optparse import OptionParser
869 parser = OptionParser()
870 parser.add_option('-i', '--host', dest='host', default='0.0.0.0', help="listening IP address. Default '0.0.0.0'")
871 parser.add_option('-p', '--port', dest='port', default=8080, type="int", help='listening port number. Default 8080')
872 parser.add_option('-r', '--root', dest='root', default='./', help="document root directory. Default './'")
873 parser.add_option('-d', '--verbose', dest='verbose', default=False, action='store_true', help='enable debug trace')
874 (options, args) = parser.parse_args()
875
876 _debug = options.verbose
877 try:
878 if _debug: print time.asctime(), 'Options - %s:%d' % (options.host, options.port)
879 agent = HTTPServer()
880 agent.root = options.root
881 agent.start(options.host, options.port)
882 if _debug: print time.asctime(), 'Flash Server Starts - %s:%d' % (options.host, options.port)
883 multitask.run()
884 except KeyboardInterrupt:
885 print traceback.print_exc()
886 pass
887 except:
888 print "exception"
889 print traceback.print_exc()
890 if _debug: print time.asctime(), 'Flash Server Stops'