move example to different port
[multitaskhttpd.git] / httpd.py
1 # Copyright (c) 2007-2009, Mamta Singh. All rights reserved. see README for details.
2
3 '''
4
5
6 If an application wants to use this module as a library, it can launch the server as follows:
7 >>> agent = HTTPServer() # a new HTTP server instance
8 >>> agent.root = 'flvs/' # set the document root to be 'flvs' directory. Default is current './' directory.
9 >>> agent.start() # start the server
10 >>> multitask.run() # this is needed somewhere in the application to actually start the co-operative multitasking.
11
12
13 If an application wants to specify a different application other than the default App, it can subclass it and supply the application by
14 setting the server's apps property. The following example shows how to define "myapp" which invokes a 'connected()' method on client when
15 the client connects to the server.
16
17 class MyApp(App): # a new MyApp extends the default App in rtmp module.
18 def __init__(self): # constructor just invokes base class constructor
19 App.__init__(self)
20 def onConnect(self, client, *args):
21 result = App.onConnect(self, client, *args) # invoke base class method first
22 def invokeAdded(self, client): # define a method to invoke 'connected("some-arg")' on Flash client
23 client.call('connected', 'some-arg')
24 yield
25 multitask.add(invokeAdded(self, client)) # need to invoke later so that connection is established before callback
26 ...
27 agent.apps = dict({'myapp': MyApp, 'someapp': MyApp, '*': App})
28
29 Now the client can connect to rtmp://server/myapp or rtmp://server/someapp and will get connected to this MyApp application.
30 If the client doesn't define "function connected(arg:String):void" in the NetConnection.client object then the server will
31 throw an exception and display the error message.
32
33 '''
34
35 import os, sys, time, struct, socket, traceback, multitask
36 import threading, Queue
37 import uuid
38 import select
39 from string import strip
40
41 from BaseHTTPServer import BaseHTTPRequestHandler
42 from SimpleAppHTTPServer import SimpleAppHTTPRequestHandler
43 from cStringIO import StringIO
44 from cookielib import parse_ns_headers, CookieJar
45 from Cookie import SimpleCookie
46
# Module-wide switch for debug tracing; the rest of the module tests it before
# printing trace output.  (A 'global' statement is only meaningful inside a
# function, so none is needed here at module level.)
_debug = False


def set_debug(dbg):
    '''Turn this module's debug tracing on or off.

    dbg is stored unchanged in the module-level _debug flag.
    '''
    global _debug
    _debug = dbg
52
class MultitaskHTTPRequestHandler(BaseHTTPRequestHandler):
    '''A BaseHTTPRequestHandler whose input and output are in-memory buffers.

    The request data is written into rfile by the caller before parsing, and
    the response accumulates in wfile for the caller to send.
    '''

    def setup(self):
        '''Attach fresh StringIO buffers instead of socket-backed files.'''
        self.rfile, self.wfile = StringIO(), StringIO()
        self.connection = self.request

    def finish(self):
        '''Do nothing: the in-memory buffers are left as they are.'''

    def info(self):
        '''Return the parsed request headers.'''
        # NOTE(review): info(), get_full_url() and get_header() look like the
        # accessors cookielib expects on request/response objects -- confirm.
        return self.headers

    def get_full_url(self):
        '''Return the request path.'''
        return self.path

    def get_header(self, hdr, default):
        '''Return the value of header hdr, or default when it is absent.'''
        return self.headers.getheader(hdr, default)

    def add_cookies(self):
        '''Emit one Set-Cookie header for every cookie in response_cookies.'''
        for morsel in self.response_cookies.values():
            self.send_header("Set-Cookie", morsel.OutputString())
76
77
78 def process_cookies(headers, remote, cookie_key="Cookie", add_sess=True):
79 ch = headers.getheaders(cookie_key)
80 if _debug:
81 print "messageReceived cookieheaders=", '; '.join(ch)
82 res = []
83 for c in ch:
84 c = c.split(";")
85 if len(c) == 0:
86 continue
87 c = map(strip, c)
88 c = filter(lambda x: x, c)
89 res += c
90 has_sess = False
91 response_cookies = SimpleCookie()
92 for c in res:
93 if _debug:
94 print "found cookie", repr(c)
95 name, value = c.split("=")
96 response_cookies[name] = value
97 #response_cookies[name]['path'] = "/"
98 #response_cookies[name]['domain'] = remote[0]
99 #response_cookies[name]['version'] = 0
100 if name == "session":
101 response_cookies[name]['expires'] = 50000
102 has_sess = True
103 if not add_sess:
104 return response_cookies
105 if not has_sess:
106 response_cookies['session'] = uuid.uuid4().hex
107 response_cookies['session']['expires'] = 50000
108 #response_cookies['session']['path'] = '/'
109 #response_cookies['session']['domain'] = remote[0]
110 #response_cookies['session']['version'] = 0
111 return response_cookies
112
113
class ConnectionClosed(Exception):
    '''Raised when the client closed the connection.

    Derives from Exception so that it is a regular exception type: it can be
    caught by "except Exception" handlers as well as by name.
    '''
116
117
class SockStream(object):
    '''Buffered, generator-based reader/writer on top of a socket.

    readline(), read() and write() are generators meant to be driven by
    multitask; the two readers deliver their result by raising
    StopIteration(data).  Received bytes that have not been consumed yet are
    kept in self.buffer.
    '''
    def __init__(self, sock):
        self.sock = sock
        self.buffer = ''
        self.bytesRead = 0
        self.bytesWritten = 0

    def close(self):
        '''Shut down the socket (shutdown(1)) and forget it.'''
        self.sock.shutdown(1) # can't just close it.
        self.sock = None

    def readline(self):
        '''Generator: deliver the next line, including its trailing newline.

        Raises ConnectionClosed when the peer sends no more data or when
        anything else goes wrong while receiving.
        '''
        try:
            while True:
                newline_at = self.buffer.find("\n")
                if newline_at >= 0: # a complete line is already buffered
                    cut = newline_at + 1
                    line, rest = self.buffer[:cut], self.buffer[cut:]
                    self.buffer = rest
                    raise StopIteration(line)
                received = (yield multitask.recv(self.sock, 4096)) # need more bytes
                if not received:
                    raise ConnectionClosed
                if _debug:
                    print('socket.read[%d] %r'%(len(received), received))
                self.bytesRead += len(received)
                self.buffer += received
        except StopIteration:
            raise
        except:
            raise ConnectionClosed # anything else is treated as connection closed.

    def read(self, count):
        '''Generator: deliver exactly count bytes.

        Raises ConnectionClosed when the peer sends no more data or when
        anything else goes wrong while receiving.
        '''
        try:
            while True:
                if len(self.buffer) >= count: # enough bytes are already buffered
                    wanted, rest = self.buffer[:count], self.buffer[count:]
                    self.buffer = rest
                    raise StopIteration(wanted)
                received = (yield multitask.recv(self.sock, 4096)) # need more bytes
                if not received:
                    raise ConnectionClosed
                self.bytesRead += len(received)
                self.buffer += received
        except StopIteration:
            raise
        except:
            raise ConnectionClosed # anything else is treated as connection closed.

    def unread(self, data):
        '''Push data back so that it is the next thing to be read.'''
        self.buffer = data + self.buffer

    def write(self, data):
        '''Generator: send data in pieces of at most 4096 bytes.

        Raises ConnectionClosed if sending a piece fails.
        '''
        while len(data) > 0:
            piece, data = data[:4096], data[4096:]
            self.bytesWritten += len(piece)
            if _debug:
                print('socket.write[%d] %r'%(len(piece), piece))
            try:
                yield multitask.send(self.sock, piece)
            except:
                raise ConnectionClosed
            if _debug:
                print('socket.written')
167 if _debug: print 'socket.written'
168
169
170 class Protocol(object):
171
172 def __init__(self, sock):
173 self.stream = SockStream(sock)
174 self.writeLock = threading.Lock()
175 self.writeQueue = Queue.Queue()
176
177 def messageReceived(self, msg):
178 yield
179
180 def connectionClosed(self):
181 yield
182
183 def parse(self):
184 try:
185 yield self.parseRequests() # parse http requests
186 except ConnectionClosed:
187 #yield self.connectionClosed()
188 if _debug: print 'parse connection closed'
189 #yield self.server.queue.put((self, None)) # close connection
190
191 def removeConnection(self):
192 yield self.server.queue.put((self, None)) # close connection
193
194 def writeMessage(self, message):
195 try:
196 yield self.stream.write(message)
197 except ConnectionClosed:
198 yield self.connectionClosed()
199 except:
200 print traceback.print_exc()
201 #self.writeQueue.put(message)
202
203 def parseRequests(self):
204 '''Parses complete messages until connection closed. Raises ConnectionLost exception.'''
205
206 if _debug:
207 print "parseRequests start", repr(self)
208 self.hr = MultitaskHTTPRequestHandler(self.stream, self.remote, None)
209 self.hr.close_connection = 1
210 self.cookies = CookieJar()
211
212 while True:
213
214 # prepare reading the request so that when it's handed
215 # over to "standard" HTTPRequestHandler, the data's already
216 # there.
217 if _debug:
218 print "parseRequests"
219 try:
220 readok = (yield multitask.readable(self.stream.sock, 5000))
221 except select.error:
222 print "select error: connection closed"
223 raise ConnectionClosed
224
225 if _debug:
226 print "readok", readok
227 print
228 raw_requestline = (yield self.stream.readline())
229 if _debug: print "parseRequests, line", raw_requestline
230 if not raw_requestline:
231 raise ConnectionClosed
232 data = ''
233 try:
234 while 1:
235 line = (yield self.stream.readline())
236 data += line
237 if line in ['\n', '\r\n']:
238 break
239 except StopIteration:
240 if _debug: print "parseRequests, stopiter"
241 raise
242 except:
243 if _debug:
244 print 'parseRequests', \
245 (traceback and traceback.print_exc() or None)
246
247 raise ConnectionClosed
248
249 self.hr.raw_requestline = raw_requestline
250 #pos = self.hr.rfile.tell()
251 pos = 0
252 self.hr.rfile.truncate(0)
253 self.hr.rfile.write(data)
254 if _debug:
255 print "parseRequests write after"
256 self.hr.rfile.seek(pos)
257 if _debug:
258 print "parseRequests seek after"
259
260 if not self.hr.parse_request():
261 raise ConnectionClosed
262 if _debug:
263 print "parseRequests parse_req after"
264 print "parseRequest headers", repr(self.hr.headers), str(self.hr.headers)
265 try:
266 yield self.messageReceived(self.hr)
267 except ConnectionClosed:
268 if _debug:
269 print 'parseRequests, ConnectionClosed '
270 raise StopIteration
271
272 except:
273 if _debug:
274 print 'messageReceived', \
275 (traceback and traceback.print_exc() or None)
276
277 def write(self):
278 '''Writes messages to stream'''
279 print "starting protocol write loop", repr(self)
280 while True:
281 while self.writeQueue.empty(): (yield multitask.sleep(0.01))
282 data = self.writeQueue.get() # TODO this should be used using multitask.Queue and remove previous wait.
283 if _debug: print "write to stream", repr(data)
284 if data is None:
285 # just in case TCP socket is not closed, close it.
286 try:
287 if _debug:
288 print "stream closing"
289 print self.stream
290 yield self.stream.close()
291 if _debug:
292 print "stream closed"
293 except: pass
294 break
295
296 try:
297 yield self.stream.write(data)
298 except ConnectionClosed:
299 yield self.connectionClosed()
300 except:
301 print traceback.print_exc()
302 print "ending protocol write loop", repr(self)
303
304
class Stream(object):
    '''A numbered stream that belongs to one client and owns a multitask queue.'''
    count = 0 # number of Stream objects created so far; used to label them

    def __init__(self, client):
        self.client = client
        self.id = 0
        self.name = ''
        # Both file attributes exist from the start so that close() can test them.
        self.recordfile = None
        self.playfile = None
        self.queue = multitask.Queue()
        self._name = 'Stream[%s]' % (Stream.count,)
        Stream.count += 1
        if _debug:
            print('%s created' % (self,))

    def close(self):
        '''Close whichever of recordfile/playfile is open and drop the client reference.'''
        if _debug:
            print('%s closing' % (self,))
        for attr in ('recordfile', 'playfile'):
            handle = getattr(self, attr)
            if handle is not None:
                handle.close()
                setattr(self, attr, None)
        self.client = None # to clear the reference

    def __repr__(self):
        return self._name

    def recv(self):
        '''Return the queue's get() operation for the next item on this stream.'''
        return self.queue.get()
328
329
330 class Client(Protocol):
331 '''The client object represents a single connected client to the server.'''
332 def __init__(self, sock, server, remote):
333 Protocol.__init__(self, sock)
334 self.server = server
335 self.remote = remote
336 self.streams = {}
337 self.queue = multitask.Queue() # receive queue used by application
338 multitask.add(self.parse())
339 #multitask.add(self.write())
340
341 def recv(self):
342 '''Generator to receive new Message (msg, arg) on this stream, or (None,None) if stream is closed.'''
343 return self.queue.get()
344
345 def connectionClosed(self):
346 '''Called when the client drops the connection'''
347 if _debug: 'Client.connectionClosed'
348 #self.writeMessage(None)
349 #yield self.queue.put((None,None))
350 if self.stream.sock:
351 yield self.stream.close()
352 else:
353 yield None
354
355 def messageReceived(self, msg):
356 if _debug: print 'messageReceived cmd=', msg.command, msg.path
357 msg.response_cookies = process_cookies(msg.headers, self.remote)
358
359 # slightly bad, this: read everything, put it into memory,
360 # but it helps to jump-start the project and enables use of
361 # standard http://python.org BaseHTTPRequestHandler.
362 # modification of mimetools.Message (actually rfc822) is
363 # otherwise required.
364 if msg.headers.has_key('content-length'):
365 max_chunk_size = 10*1024*1024
366 size_remaining = int(msg.headers["content-length"])
367 L = []
368 while size_remaining:
369 chunk_size = min(size_remaining, max_chunk_size)
370 data = (yield self.stream.read(chunk_size))
371 L.append(data)
372 size_remaining -= len(L[-1])
373
374 pos = msg.rfile.tell()
375 msg.rfile.write(''.join(L))
376 msg.rfile.seek(pos)
377
378 yield self.server.queue.put((self, msg)) # new connection
379
380 def rejectConnection(self, reason=''):
381 '''Method to reject an incoming client. just close.
382 TODO: report back an HTTP error with "reason" in it.
383 '''
384 self.removeConnection()
385
386
387 class Server(object):
388 '''A RTMP server listens for incoming connections and informs the app.'''
389 def __init__(self, sock):
390 '''Create an RTMP server on the given bound TCP socket. The server will terminate
391 when the socket is disconnected, or some other error occurs in listening.'''
392 self.sock = sock
393 self.queue = multitask.Queue() # queue to receive incoming client connections
394 multitask.add(self.run())
395
396 def recv(self):
397 '''Generator to wait for incoming client connections on this server and return
398 (client, args) or (None, None) if the socket is closed or some error.'''
399 return self.queue.get()
400
401 def run(self):
402 try:
403 while True:
404 sock, remote = (yield multitask.accept(self.sock)) # receive client TCP
405 if sock == None:
406 if _debug: print 'rtmp.Server accept(sock) returned None.'
407 break
408 if _debug: print 'connection received from', remote
409 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) # make it non-block
410 client = Client(sock, self, remote)
411 except:
412 if _debug: print 'rtmp.Server exception ', (sys and sys.exc_info() or None)
413 traceback.print_exc()
414
415 if (self.sock):
416 try: self.sock.close(); self.sock = None
417 except: pass
418 if (self.queue):
419 yield self.queue.put((None, None))
420 self.queue = None
421
class BaseApp(object):
    '''Book-keeping shared by application instances: a name, stream tables and a client list.'''
    count = 0

    def __init__(self):
        # The running number is taken from (and advanced on) App, not BaseApp.
        self.name = '%s[%s]' % (str(self.__class__.__name__), str(App.count))
        App.count += 1
        # players/publishers are keyed by stream name; _clients is a plain list.
        self.players = {}
        self.publishers = {}
        self._clients = []
        if _debug:
            print('%s created' % self.name)

    def __del__(self):
        if _debug:
            print('%s destroyed' % self.name)

    @property
    def clients(self):
        '''A new list holding every entry of _clients except the first; empty when _clients is None.'''
        if self._clients is None:
            return []
        return self._clients[1:]
435
436
class App(BaseApp, SimpleAppHTTPRequestHandler):
    '''Default application: BaseApp book-keeping plus SimpleAppHTTPRequestHandler.

    Adds nothing of its own.  HTTPServer uses it for the '*' entry of its
    apps table.
    '''
439
440
441 class HTTPServer(object):
442 '''A RTMP server to record and stream HTTP.'''
443 def __init__(self):
444 '''Construct a new HttpServer. It initializes the local members.'''
445 self.sock = self.server = None
446 self.apps = dict({'*': App}) # supported applications: * means any as in {'*': App}
447 self.clients = dict() # list of clients indexed by scope. First item in list is app instance.
448 self.root = ''
449
450 def start(self, host='0.0.0.0', port=8080):
451 '''This should be used to start listening for RTMP connections on the given port, which defaults to 8080.'''
452 if _debug: print 'start', host, port
453 if not self.server:
454 sock = self.sock = socket.socket(type=socket.SOCK_STREAM)
455 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
456 sock.bind((host, port))
457 if _debug: print 'listening on ', sock.getsockname()
458 sock.listen(5)
459 server = self.server = Server(sock) # start rtmp server on that socket
460 multitask.add(self.serverlistener())
461
462 def stop(self):
463 if _debug: print 'stopping HTTP server'
464 if self.server and self.sock:
465 try: self.sock.close(); self.sock = None
466 except: pass
467 self.server = None
468
469 def serverlistener(self):
470 '''Server listener (generator). It accepts all connections and invokes client listener'''
471 try:
472 while True: # main loop to receive new connections on the server
473 client, msg = (yield self.server.recv()) # receive an incoming client connection.
474 # TODO: we should reject non-localhost client connections.
475 if not client: # if the server aborted abnormally,
476 break # hence close the listener.
477 if _debug: print 'client connection received', client, msg
478
479 if msg is None:
480 yield client.connectionClosed()
481 session = client.session
482 del self.clients[session]
483 continue
484
485 if _debug:
486 print "cookies", str(msg.response_cookies)
487 session = msg.response_cookies['session'].value
488 client.session = session
489 name = msg.path
490 if _debug:
491 print "serverlistener", name
492 if '*' not in self.apps and name not in self.apps:
493 yield client.rejectConnection(reason='Application not found: ' + name)
494 else: # create application instance as needed and add in our list
495 if _debug: print 'name=', name, 'name in apps', str(name in self.apps)
496 app = self.apps[name] if name in self.apps else self.apps['*'] # application class
497 if _debug:
498 print "clients", self.clients.keys()
499 if session in self.clients:
500 inst = self.clients[session][0]
501 else:
502 inst = app()
503 self.clients[session] = [inst]; inst._clients=self.clients[session]
504 msg.server = inst # whew! just in time!
505 try:
506 methodname = "on%s" % msg.command
507 if _debug:
508 print methodname, dir(inst)
509 method = getattr(inst, methodname, None)
510 yield method(client, msg)
511 result = None
512 close_connection = msg.close_connection
513 if _debug:
514 print "close connection", close_connection
515 except:
516 if _debug: traceback.print_exc()
517 yield client.rejectConnection(reason='Exception on %s' % methodname)
518 continue
519 if result is True or result is None:
520 if result is None:
521 msg.wfile.seek(0)
522 data = msg.wfile.read()
523 msg.wfile.seek(0)
524 msg.wfile.truncate()
525 yield client.writeMessage(data)
526 if close_connection:
527 if _debug:
528 print 'close_connection requested'
529 try:
530 yield client.connectionClosed()
531 raise ConnectionClosed
532 except ConnectionClosed:
533 if _debug:
534 print 'close_connection done'
535 pass
536 else:
537 if _debug:
538 print "result", result
539 yield client.rejectConnection(reason='Rejected in onConnect')
540 except StopIteration: raise
541 except:
542 if _debug:
543 print 'serverlistener exception', \
544 (sys and sys.exc_info() or None)
545 traceback.print_exc()
546
547
548 # The main routine to start, run and stop the service
549 if __name__ == '__main__':
550 print "optparse"
551 from optparse import OptionParser
552 parser = OptionParser()
553 parser.add_option('-i', '--host', dest='host', default='0.0.0.0', help="listening IP address. Default '0.0.0.0'")
554 parser.add_option('-p', '--port', dest='port', default=8080, type="int", help='listening port number. Default 8080')
555 parser.add_option('-r', '--root', dest='root', default='./', help="document root directory. Default './'")
556 parser.add_option('-d', '--verbose', dest='verbose', default=False, action='store_true', help='enable debug trace')
557 (options, args) = parser.parse_args()
558
559 _debug = options.verbose
560 try:
561 if _debug: print time.asctime(), 'Options - %s:%d' % (options.host, options.port)
562 agent = HTTPServer()
563 agent.root = options.root
564 agent.start(options.host, options.port)
565 if _debug: print time.asctime(), 'HTTP Server Starts - %s:%d' % (options.host, options.port)
566 multitask.run()
567 except KeyboardInterrupt:
568 print traceback.print_exc()
569 pass
570 except:
571 print "exception"
572 print traceback.print_exc()
573 if _debug: print time.asctime(), 'HTTP Server Stops'