1 # Copyright (c) 2007-2009, Mamta Singh. All rights reserved. see README for details.
6 If an application wants to use this module as a library, it can launch the server as follows:
7 >>> agent = HTTPServer() # a new HTTP server instance
8 >>> agent.root = 'flvs/' # set the document root to be 'flvs' directory. Default is current './' directory.
9 >>> agent.start() # start the server
10 >>> multitask.run() # this is needed somewhere in the application to actually start the co-operative multitasking.
13 If an application wants to use an application class other than the default App, it can subclass App and supply the subclass by
14 setting the server's apps property. The following example shows how to define "myapp", which invokes a 'connected()' method on the client when
15 the client connects to the server.
17 class MyApp(App): # a new MyApp extends the default App in rtmp module.
18 def __init__(self): # constructor just invokes base class constructor
20 def onConnect(self, client, *args):
21 result = App.onConnect(self, client, *args) # invoke base class method first
22 def invokeAdded(self, client): # define a method to invoke 'connected("some-arg")' on Flash client
23 client.call('connected', 'some-arg')
25 multitask.add(invokeAdded(self, client)) # need to invoke later so that connection is established before callback
27 agent.apps = dict({'myapp': MyApp, 'someapp': MyApp, '*': App})
29 Now the client can connect to rtmp://server/myapp or rtmp://server/someapp and will get connected to this MyApp application.
30 If the client doesn't define "function connected(arg:String):void" in the NetConnection.client object then the server will
31 throw an exception and display the error message.
35 import os
, sys
, time
, struct
, socket
, traceback
, multitask
36 import threading
, Queue
39 from string
import strip
41 from BaseHTTPServer
import BaseHTTPRequestHandler
42 from SimpleAppHTTPServer
import SimpleAppHTTPRequestHandler
43 from cStringIO
import StringIO
44 from cookielib
import parse_ns_headers
, CookieJar
45 from Cookie
import SimpleCookie
53 class MultitaskHTTPRequestHandler(BaseHTTPRequestHandler
):
56 self
.connection
= self
.request
57 self
.rfile
= StringIO()
58 self
.wfile
= StringIO()
66 def get_full_url(self
):
def get_header(self, hdr, default):
    '''Look up request header hdr via self.headers.getheader().

    default is handed to getheader() unchanged as its fallback argument and
    whatever getheader() returns is returned to the caller.
    '''
    header_table = self.headers
    return header_table.getheader(hdr, default)
def add_cookies(self):
    '''Send one "Set-Cookie" header for every entry in self.response_cookies.

    Each entry's OutputString() result is used as the header value.
    '''
    for _name, morsel in self.response_cookies.items():
        self.send_header("Set-Cookie", morsel.OutputString())
78 def process_cookies(headers
, remote
, cookie_key
="Cookie", add_sess
=True):
79 ch
= headers
.getheaders(cookie_key
)
81 print "messageReceived cookieheaders=", '; '.join(ch
)
88 c
= filter(lambda x
: x
, c
)
91 response_cookies
= SimpleCookie()
94 print "found cookie", repr(c
)
95 name
, value
= c
.split("=")
96 response_cookies
[name
] = value
97 #response_cookies[name]['path'] = "/"
98 #response_cookies[name]['domain'] = remote[0]
99 #response_cookies[name]['version'] = 0
100 if name
== "session":
101 response_cookies
[name
]['expires'] = 50000
104 return response_cookies
106 response_cookies
['session'] = uuid
.uuid4().hex
107 response_cookies
['session']['expires'] = 50000
108 #response_cookies['session']['path'] = '/'
109 #response_cookies['session']['domain'] = remote[0]
110 #response_cookies['session']['version'] = 0
111 return response_cookies
class ConnectionClosed(Exception):
    '''Raised when the client closed the connection.

    Derives from Exception so that it is a proper exception type: it can be
    raised and caught as ConnectionClosed exactly as before, and it also
    takes part in the standard exception hierarchy.
    '''
118 class SockStream(object):
119 '''A class that represents a socket as a stream'''
120 def __init__(self
, sock
):
121 self
.sock
, self
.buffer = sock
, ''
122 self
.bytesWritten
= self
.bytesRead
= 0
125 self
.sock
.shutdown(1) # can't just close it.
131 nl
= self
.buffer.find("\n")
132 if nl
>= 0: # do not have newline in buffer
133 data
, self
.buffer = self
.buffer[:nl
+1], self
.buffer[nl
+1:]
134 raise StopIteration(data
)
135 data
= (yield multitask
.recv(self
.sock
, 4096)) # read more from socket
136 if not data
: raise ConnectionClosed
137 if _debug
: print 'socket.read[%d] %r'%(len(data
), data
)
138 self
.bytesRead
+= len(data
)
140 except StopIteration: raise
141 except: raise ConnectionClosed
# anything else is treated as connection closed.
143 def read(self
, count
):
146 if len(self
.buffer) >= count
: # do not have data in buffer
147 data
, self
.buffer = self
.buffer[:count
], self
.buffer[count
:]
148 raise StopIteration(data
)
149 data
= (yield multitask
.recv(self
.sock
, 4096)) # read more from socket
150 if not data
: raise ConnectionClosed
151 # if _debug: print 'socket.read[%d] %r'%(len(data), data)
152 self
.bytesRead
+= len(data
)
154 except StopIteration: raise
155 except: raise ConnectionClosed
# anything else is treated as connection closed.
def unread(self, data):
    '''Put data back at the front of the buffer, ahead of what is already buffered.'''
    restored = data + self.buffer
    self.buffer = restored
160 def write(self
, data
):
161 while len(data
) > 0: # write in 4K chunks each time
162 chunk
, data
= data
[:4096], data
[4096:]
163 self
.bytesWritten
+= len(chunk
)
164 if _debug
: print 'socket.write[%d] %r'%(len(chunk
), chunk
)
165 try: yield multitask
.send(self
.sock
, chunk
)
166 except: raise ConnectionClosed
167 if _debug
: print 'socket.written'
170 class Protocol(object):
def __init__(self, sock):
    '''Wrap sock in a SockStream and create the write lock and the write queue.'''
    stream = SockStream(sock)
    lock = threading.Lock()
    pending = Queue.Queue()
    self.stream, self.writeLock, self.writeQueue = stream, lock, pending
177 def messageReceived(self
, msg
):
180 def connectionClosed(self
):
185 yield self
.parseRequests() # parse http requests
186 except ConnectionClosed
:
187 #yield self.connectionClosed()
188 if _debug
: print 'parse connection closed'
189 #yield self.server.queue.put((self, None)) # close connection
def removeConnection(self):
    '''Generator that posts (self, None) on the server's queue to close this connection.'''
    close_notice = (self, None)  # close connection
    yield self.server.queue.put(close_notice)
194 def writeMessage(self
, message
):
196 yield self
.stream
.write(message
)
197 except ConnectionClosed
:
198 yield self
.connectionClosed()
200 print traceback
.print_exc()
201 #self.writeQueue.put(message)
203 def parseRequests(self
):
204 '''Parses complete messages until connection closed. Raises ConnectionLost exception.'''
207 print "parseRequests start", repr(self
)
208 self
.hr
= MultitaskHTTPRequestHandler(self
.stream
, self
.remote
, None)
209 self
.hr
.close_connection
= 1
210 self
.cookies
= CookieJar()
214 # prepare reading the request so that when it's handed
215 # over to "standard" HTTPRequestHandler, the data's already
218 print "parseRequests"
220 readok
= (yield multitask
.readable(self
.stream
.sock
, 5000))
222 print "select error: connection closed"
223 raise ConnectionClosed
226 print "readok", readok
228 raw_requestline
= (yield self
.stream
.readline())
229 if _debug
: print "parseRequests, line", raw_requestline
230 if not raw_requestline
:
231 raise ConnectionClosed
235 line
= (yield self
.stream
.readline())
237 if line
in ['\n', '\r\n']:
239 except StopIteration:
240 if _debug
: print "parseRequests, stopiter"
244 print 'parseRequests', \
245 (traceback
and traceback
.print_exc() or None)
247 raise ConnectionClosed
249 self
.hr
.raw_requestline
= raw_requestline
250 #pos = self.hr.rfile.tell()
252 self
.hr
.rfile
.truncate(0)
253 self
.hr
.rfile
.write(data
)
255 print "parseRequests write after"
256 self
.hr
.rfile
.seek(pos
)
258 print "parseRequests seek after"
260 if not self
.hr
.parse_request():
261 raise ConnectionClosed
263 print "parseRequests parse_req after"
264 print "parseRequest headers", repr(self
.hr
.headers
), str(self
.hr
.headers
)
266 yield self
.messageReceived(self
.hr
)
267 except ConnectionClosed
:
269 print 'parseRequests, ConnectionClosed '
274 print 'messageReceived', \
275 (traceback
and traceback
.print_exc() or None)
278 '''Writes messages to stream'''
279 print "starting protocol write loop", repr(self
)
281 while self
.writeQueue
.empty(): (yield multitask
.sleep(0.01))
282 data
= self
.writeQueue
.get() # TODO this should be used using multitask.Queue and remove previous wait.
283 if _debug
: print "write to stream", repr(data
)
285 # just in case TCP socket is not closed, close it.
288 print "stream closing"
290 yield self
.stream
.close()
292 print "stream closed"
297 yield self
.stream
.write(data
)
298 except ConnectionClosed
:
299 yield self
.connectionClosed()
301 print traceback
.print_exc()
302 print "ending protocol write loop", repr(self
)
305 class Stream(object):
306 '''The stream object that is used for RTMP stream.'''
308 def __init__(self
, client
):
309 self
.client
, self
.id, self
.name
= client
, 0, ''
310 self
.recordfile
= self
.playfile
= None # so that it doesn't complain about missing attribute
311 self
.queue
= multitask
.Queue()
312 self
._name
= 'Stream[' + str(Stream
.count
) + ']'; Stream
.count
+= 1
313 if _debug
: print self
, 'created'
316 if _debug
: print self
, 'closing'
317 if self
.recordfile
is not None: self
.recordfile
.close(); self
.recordfile
= None
318 if self
.playfile
is not None: self
.playfile
.close(); self
.playfile
= None
319 self
.client
= None # to clear the reference
326 '''Generator to receive new Message on this stream, or None if stream is closed.'''
327 return self
.queue
.get()
330 class Client(Protocol
):
331 '''The client object represents a single connected client to the server.'''
332 def __init__(self
, sock
, server
, remote
):
333 Protocol
.__init
__(self
, sock
)
337 self
.queue
= multitask
.Queue() # receive queue used by application
338 multitask
.add(self
.parse())
339 #multitask.add(self.write())
342 '''Generator to receive new Message (msg, arg) on this stream, or (None,None) if stream is closed.'''
343 return self
.queue
.get()
345 def connectionClosed(self
):
346 '''Called when the client drops the connection'''
347 if _debug
: 'Client.connectionClosed'
348 #self.writeMessage(None)
349 #yield self.queue.put((None,None))
351 yield self
.stream
.close()
355 def messageReceived(self
, msg
):
356 if _debug
: print 'messageReceived cmd=', msg
.command
, msg
.path
357 msg
.response_cookies
= process_cookies(msg
.headers
, self
.remote
)
359 # slightly bad, this: read everything, put it into memory,
360 # but it helps to jump-start the project and enables use of
361 # standard http://python.org BaseHTTPRequestHandler.
362 # modification of mimetools.Message (actually rfc822) is
363 # otherwise required.
364 if msg
.headers
.has_key('content-length'):
365 max_chunk_size
= 10*1024*1024
366 size_remaining
= int(msg
.headers
["content-length"])
368 while size_remaining
:
369 chunk_size
= min(size_remaining
, max_chunk_size
)
370 data
= (yield self
.stream
.read(chunk_size
))
372 size_remaining
-= len(L
[-1])
374 pos
= msg
.rfile
.tell()
375 msg
.rfile
.write(''.join(L
))
378 yield self
.server
.queue
.put((self
, msg
)) # new connection
def rejectConnection(self, reason=''):
    '''Generator to reject an incoming client. just close.

    removeConnection() is itself a generator, so it has to be yielded for
    its body to run; calling it without yielding only creates a generator
    object and leaves the connection in place. Callers are expected to use
    "yield client.rejectConnection(...)".

    reason is accepted but not used yet.
    TODO: report back an HTTP error with "reason" in it.
    '''
    yield self.removeConnection()
387 class Server(object):
388 '''A RTMP server listens for incoming connections and informs the app.'''
389 def __init__(self
, sock
):
390 '''Create an RTMP server on the given bound TCP socket. The server will terminate
391 when the socket is disconnected, or some other error occurs in listening.'''
393 self
.queue
= multitask
.Queue() # queue to receive incoming client connections
394 multitask
.add(self
.run())
397 '''Generator to wait for incoming client connections on this server and return
398 (client, args) or (None, None) if the socket is closed or some error.'''
399 return self
.queue
.get()
404 sock
, remote
= (yield multitask
.accept(self
.sock
)) # receive client TCP
406 if _debug
: print 'rtmp.Server accept(sock) returned None.'
408 if _debug
: print 'connection received from', remote
409 sock
.setsockopt(socket
.IPPROTO_TCP
, socket
.TCP_NODELAY
, 1) # make it non-block
410 client
= Client(sock
, self
, remote
)
412 if _debug
: print 'rtmp.Server exception ', (sys
and sys
.exc_info() or None)
413 traceback
.print_exc()
416 try: self
.sock
.close(); self
.sock
= None
419 yield self
.queue
.put((None, None))
422 class BaseApp(object):
423 '''An application instance containing any number of streams. Except for constructor all methods are generators.'''
426 self
.name
= str(self
.__class
__.__name
__) + '[' + str(App
.count
) + ']'; App
.count
+= 1
427 self
.players
, self
.publishers
, self
._clients
= {}, {}, [] # Streams indexed by stream name, and list of clients
428 if _debug
: print self
.name
, 'created'
430 if _debug
: print self
.name
, 'destroyed'
433 '''everytime this property is accessed it returns a new list of clients connected to this instance.'''
434 return self
._clients
[1:] if self
._clients
is not None else []
437 class App(BaseApp
, SimpleAppHTTPRequestHandler
):
441 class HTTPServer(object):
442 '''A RTMP server to record and stream HTTP.'''
444 '''Construct a new HttpServer. It initializes the local members.'''
445 self
.sock
= self
.server
= None
446 self
.apps
= dict({'*': App
}) # supported applications: * means any as in {'*': App}
447 self
.clients
= dict() # list of clients indexed by scope. First item in list is app instance.
450 def start(self
, host
='0.0.0.0', port
=8080):
451 '''This should be used to start listening for RTMP connections on the given port, which defaults to 8080.'''
452 if _debug
: print 'start', host
, port
454 sock
= self
.sock
= socket
.socket(type=socket
.SOCK_STREAM
)
455 sock
.setsockopt(socket
.SOL_SOCKET
, socket
.SO_REUSEADDR
, 1)
456 sock
.bind((host
, port
))
457 if _debug
: print 'listening on ', sock
.getsockname()
459 server
= self
.server
= Server(sock
) # start rtmp server on that socket
460 multitask
.add(self
.serverlistener())
463 if _debug
: print 'stopping HTTP server'
464 if self
.server
and self
.sock
:
465 try: self
.sock
.close(); self
.sock
= None
469 def serverlistener(self
):
470 '''Server listener (generator). It accepts all connections and invokes client listener'''
472 while True: # main loop to receive new connections on the server
473 client
, msg
= (yield self
.server
.recv()) # receive an incoming client connection.
474 # TODO: we should reject non-localhost client connections.
475 if not client
: # if the server aborted abnormally,
476 break # hence close the listener.
477 if _debug
: print 'client connection received', client
, msg
480 yield client
.connectionClosed()
481 session
= client
.session
482 del self
.clients
[session
]
486 print "cookies", str(msg
.response_cookies
)
487 session
= msg
.response_cookies
['session'].value
488 client
.session
= session
491 print "serverlistener", name
492 if '*' not in self
.apps
and name
not in self
.apps
:
493 yield client
.rejectConnection(reason
='Application not found: ' + name
)
494 else: # create application instance as needed and add in our list
495 if _debug
: print 'name=', name
, 'name in apps', str(name
in self
.apps
)
496 app
= self
.apps
[name
] if name
in self
.apps
else self
.apps
['*'] # application class
498 print "clients", self
.clients
.keys()
499 if session
in self
.clients
:
500 inst
= self
.clients
[session
][0]
503 self
.clients
[session
] = [inst
]; inst
._clients
=self
.clients
[session
]
504 msg
.server
= inst
# whew! just in time!
506 methodname
= "on%s" % msg
.command
508 print methodname
, dir(inst
)
509 method
= getattr(inst
, methodname
, None)
510 result
= method(client
, msg
)
511 close_connection
= msg
.close_connection
513 print "close connection", close_connection
515 if _debug
: traceback
.print_exc()
516 yield client
.rejectConnection(reason
='Exception on %s' % methodname
)
518 if result
is True or result
is None:
521 data
= msg
.wfile
.read()
524 yield client
.writeMessage(data
)
527 print 'close_connection requested'
529 yield client
.connectionClosed()
530 raise ConnectionClosed
531 except ConnectionClosed
:
533 print 'close_connection done'
537 print "result", result
538 yield client
.rejectConnection(reason
='Rejected in onConnect')
539 except StopIteration: raise
542 print 'serverlistener exception', \
543 (sys
and sys
.exc_info() or None)
544 traceback
.print_exc()
547 # The main routine to start, run and stop the service
548 if __name__
== '__main__':
550 from optparse
import OptionParser
551 parser
= OptionParser()
552 parser
.add_option('-i', '--host', dest
='host', default
='0.0.0.0', help="listening IP address. Default '0.0.0.0'")
553 parser
.add_option('-p', '--port', dest
='port', default
=8080, type="int", help='listening port number. Default 8080')
554 parser
.add_option('-r', '--root', dest
='root', default
='./', help="document root directory. Default './'")
555 parser
.add_option('-d', '--verbose', dest
='verbose', default
=False, action
='store_true', help='enable debug trace')
556 (options
, args
) = parser
.parse_args()
558 _debug
= options
.verbose
560 if _debug
: print time
.asctime(), 'Options - %s:%d' % (options
.host
, options
.port
)
562 agent
.root
= options
.root
563 agent
.start(options
.host
, options
.port
)
564 if _debug
: print time
.asctime(), 'HTTP Server Starts - %s:%d' % (options
.host
, options
.port
)
566 except KeyboardInterrupt:
567 print traceback
.print_exc()
571 print traceback
.print_exc()
572 if _debug
: print time
.asctime(), 'HTTP Server Stops'