From 5c852280623c517ee6bbadc8b77cdc5cef50607a Mon Sep 17 00:00:00 2001 From: Luke Kenneth Casson Leighton Date: Mon, 12 Jul 2010 20:13:25 +0100 Subject: [PATCH] multitasking web server --- SimpleAppHTTPServer.py | 222 +++++++ httpd.py | 850 ++++++++++++++++++++++++++ multitask.py | 1281 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 2353 insertions(+) create mode 100644 SimpleAppHTTPServer.py create mode 100644 httpd.py create mode 100644 multitask.py diff --git a/SimpleAppHTTPServer.py b/SimpleAppHTTPServer.py new file mode 100644 index 0000000..a615e2b --- /dev/null +++ b/SimpleAppHTTPServer.py @@ -0,0 +1,222 @@ +"""Simple HTTP Server. + +This module builds on BaseHTTPServer by implementing the standard GET +and HEAD requests in a fairly straightforward manner. + +""" + + +__version__ = "0.6" + +__all__ = ["SimpleHTTPRequestHandler"] + +import os +import posixpath +import BaseHTTPServer +import urllib +import urlparse +import cgi +import shutil +import mimetypes +try: + from cStringIO import StringIO +except ImportError: + from StringIO import StringIO + + +class SimpleAppHTTPRequestHandler(object): + + """Simple HTTP request handler with GET and HEAD commands. + + This serves files from the current directory and any of its + subdirectories. The MIME type for files is determined by + calling the .guess_type() method. + + The GET and HEAD requests are identical except that the HEAD + request omits the actual contents of the file. + + """ + + server_version = "SimpleHTTP/" + __version__ + + def onGET(self, client, *args): + """Serve a GET request.""" + self.client = client + hr = args[0] + self.path = hr.path + ka = hr.request_version == "HTTP/1.1" + f = self.send_head(hr, ka) + if f: + self.copyfile(f, hr.wfile) + f.close() + + def onHEAD(self): + self.client = client + """Serve a HEAD request.""" + hr = args[0] + f = self.send_head(hr, False) + if f: + f.close() + + def send_head(self, hr, ka): + """Common code for GET and HEAD commands. + + This sends the response code and MIME headers. + + Return value is either a file object (which has to be copied + to the outputfile by the caller unless the command was HEAD, + and must be closed by the caller under all circumstances), or + None, in which case the caller has nothing further to do. + + """ + path = self.translate_path(self.path) + f = None + if os.path.isdir(path): + if not self.path.endswith('/'): + # redirect browser - doing basically what apache does + hr.send_response(301) + hr.send_header("Location", self.path + "/") + hr.end_headers() + return None + for index in "index.html", "index.htm": + index = os.path.join(path, index) + if os.path.exists(index): + path = index + break + else: + return self.list_directory(hr, path, ka) + ctype = self.guess_type(path) + if ctype.startswith('text/'): + mode = 'r' + else: + mode = 'rb' + try: + f = open(path, mode) + except IOError: + hr.send_error(404, "File not found") + return None + hr.send_response(200) + hr.send_header("Content-type", ctype) + fs = os.fstat(f.fileno()) + hr.send_header("Content-Length", str(fs[6])) + hr.send_header("Last-Modified", hr.date_time_string(fs.st_mtime)) + if ka: + hr.send_header("Connection", "keep-alive") + self.client.cookies.add_cookie_header(hr) + hr.end_headers() + return f + + def list_directory(self, hr, path, ka): + """Helper to produce a directory listing (absent index.html). + + Return value is either a file object, or None (indicating an + error). In either case, the headers are sent, making the + interface the same as for send_head(). + + """ + try: + list = os.listdir(path) + except os.error: + self.send_error(404, "No permission to list directory") + return None + list.sort(key=lambda a: a.lower()) + f = StringIO() + displaypath = cgi.escape(urllib.unquote(self.path)) + f.write("Directory listing for %s\n" % displaypath) + f.write("

Directory listing for %s

\n" % displaypath) + f.write("
\n\n
\n") + length = f.tell() + f.seek(0) + hr.send_response(200) + hr.send_header("Content-type", "text/html") + hr.send_header("Content-Length", str(length)) + if ka: + hr.send_header("Connection", "keep-alive") + self.client.cookies.add_cookie_header(hr) + hr.end_headers() + return f + + def translate_path(self, path): + """Translate a /-separated PATH to the local filename syntax. + + Components that mean special things to the local file system + (e.g. drive or directory names) are ignored. (XXX They should + probably be diagnosed.) + + """ + # abandon query parameters + path = urlparse.urlparse(path)[2] + path = posixpath.normpath(urllib.unquote(path)) + words = path.split('/') + words = filter(None, words) + path = os.getcwd() + for word in words: + drive, word = os.path.splitdrive(word) + head, word = os.path.split(word) + if word in (os.curdir, os.pardir): continue + path = os.path.join(path, word) + return path + + def copyfile(self, source, outputfile): + """Copy all data between two file objects. + + The SOURCE argument is a file object open for reading + (or anything with a read() method) and the DESTINATION + argument is a file object open for writing (or + anything with a write() method). + + The only reason for overriding this would be to change + the block size or perhaps to replace newlines by CRLF + -- note however that this the default server uses this + to copy binary data as well. + + """ + shutil.copyfileobj(source, outputfile) + + def guess_type(self, path): + """Guess the type of a file. + + Argument is a PATH (a filename). + + Return value is a string of the form type/subtype, + usable for a MIME Content-type header. + + The default implementation looks the file's extension + up in the table self.extensions_map, using application/octet-stream + as a default; however it would be permissible (if + slow) to look inside the data to make a better guess. + + """ + + base, ext = posixpath.splitext(path) + if ext in self.extensions_map: + return self.extensions_map[ext] + ext = ext.lower() + if ext in self.extensions_map: + return self.extensions_map[ext] + else: + return self.extensions_map[''] + + if not mimetypes.inited: + mimetypes.init() # try to read system mime.types + extensions_map = mimetypes.types_map.copy() + extensions_map.update({ + '': 'application/octet-stream', # Default + '.py': 'text/plain', + '.c': 'text/plain', + '.h': 'text/plain', + }) + diff --git a/httpd.py b/httpd.py new file mode 100644 index 0000000..a7cd598 --- /dev/null +++ b/httpd.py @@ -0,0 +1,850 @@ +# Copyright (c) 2007-2009, Mamta Singh. All rights reserved. see README for details. + +''' +This is a simple implementation of a Flash RTMP server to accept connections and stream requests. The module is organized as follows: +1. The FlashServer class is the main class to provide the server abstraction. It uses the multitask module for co-operative multitasking. + It also uses the App abstract class to implement the applications. +2. The Server class implements a simple server to receive new Client connections and inform the FlashServer application. The Client class + derived from Protocol implements the RTMP client functions. The Protocol class implements the base RTMP protocol parsing. A Client contains + various streams from the client, represented using the Stream class. +3. The Message, Header and Command represent RTMP message, header and command respectively. The FLV class implements functions to perform read + and write of FLV file format. + + +Typically an application can launch this server as follows: +$ python rtmp.py + +To know the command line options use the -h option: +$ python rtmp.py -h + +To start the server with a different directory for recording and playing FLV files from, use the following command. +$ python rtmp.py -r some-other-directory/ +Note the terminal '/' in the directory name. Without this, it is just used as a prefix in FLV file names. + +A test client is available in testClient directory, and can be compiled using Flex Builder. Alternatively, you can use the SWF file to launch +from testClient/bin-debug after starting the server. Once you have launched the client in the browser, you can connect to +local host by clicking on 'connect' button. Then click on publish button to publish a stream. Open another browser with +same URL and first connect then play the same stream name. If everything works fine you should be able to see the video +from first browser to the second browser. Similar, in the first browser, if you check the record box before publishing, +it will create a new FLV file for the recorded stream. You can close the publishing stream and play the recorded stream to +see your recording. Note that due to initial delay in timestamp (in case publish was clicked much later than connect), +your played video will start appearing after some initial delay. + + +If an application wants to use this module as a library, it can launch the server as follows: +>>> agent = FlashServer() # a new RTMP server instance +>>> agent.root = 'flvs/' # set the document root to be 'flvs' directory. Default is current './' directory. +>>> agent.start() # start the server +>>> multitask.run() # this is needed somewhere in the application to actually start the co-operative multitasking. + + +If an application wants to specify a different application other than the default App, it can subclass it and supply the application by +setting the server's apps property. The following example shows how to define "myapp" which invokes a 'connected()' method on client when +the client connects to the server. + +class MyApp(App): # a new MyApp extends the default App in rtmp module. + def __init__(self): # constructor just invokes base class constructor + App.__init__(self) + def onConnect(self, client, *args): + result = App.onConnect(self, client, *args) # invoke base class method first + def invokeAdded(self, client): # define a method to invoke 'connected("some-arg")' on Flash client + client.call('connected', 'some-arg') + yield + multitask.add(invokeAdded(self, client)) # need to invoke later so that connection is established before callback +... +agent.apps = dict({'myapp': MyApp, 'someapp': MyApp, '*': App}) + +Now the client can connect to rtmp://server/myapp or rtmp://server/someapp and will get connected to this MyApp application. +If the client doesn't define "function connected(arg:String):void" in the NetConnection.client object then the server will +throw an exception and display the error message. + +''' + +import os, sys, time, struct, socket, traceback, multitask +import threading, Queue +import uuid + +from BaseHTTPServer import BaseHTTPRequestHandler +from SimpleAppHTTPServer import SimpleAppHTTPRequestHandler +from cStringIO import StringIO +from cookielib import parse_ns_headers, CookieJar + +_debug = False + +class MultitaskHTTPRequestHandler(BaseHTTPRequestHandler): + + def setup(self): + self.connection = self.request + self.rfile = StringIO() + self.wfile = StringIO() + + def finish(self): + pass + + def info(self): + return self.headers + + def get_full_url(self): + return self.path + + def get_header(self, hdr, default): + return self.headers.getheader(hdr, default) + + def has_header(self, hdr): + return False + + def is_unverifiable(self): + return False + + def add_unredirected_header(self, name, val): + if name == 'Cookie': + self.send_header("Set-Cookie", val) + +class ConnectionClosed: + 'raised when the client closed the connection' + +class SockStream(object): + '''A class that represents a socket as a stream''' + def __init__(self, sock): + self.sock, self.buffer = sock, '' + self.bytesWritten = self.bytesRead = 0 + + def close(self): + print "sock close" + fno = self.sock.fileno() + self.sock.close() + print "writable?", self.sock, fno + yield multitask.writable(fno, timeout=0.1) + print "is it?" + del self.sock + self.sock = None + + def readline(self): + try: + while True: + nl = self.buffer.find("\n") + if nl >= 0: # do not have newline in buffer + data, self.buffer = self.buffer[:nl+1], self.buffer[nl+1:] + raise StopIteration(data) + data = (yield multitask.recv(self.sock, 4096)) # read more from socket + if not data: raise ConnectionClosed + if _debug: print 'socket.read[%d] %r'%(len(data), data) + self.bytesRead += len(data) + self.buffer += data + except StopIteration: raise + except: raise ConnectionClosed # anything else is treated as connection closed. + + def read(self, count): + try: + while True: + if len(self.buffer) >= count: # do not have data in buffer + data, self.buffer = self.buffer[:count], self.buffer[count:] + raise StopIteration(data) + data = (yield multitask.recv(self.sock, 4096)) # read more from socket + if not data: raise ConnectionClosed + # if _debug: print 'socket.read[%d] %r'%(len(data), data) + self.bytesRead += len(data) + self.buffer += data + except StopIteration: raise + except: raise ConnectionClosed # anything else is treated as connection closed. + + def unread(self, data): + self.buffer = data + self.buffer + + def write(self, data): + while len(data) > 0: # write in 4K chunks each time + chunk, data = data[:4096], data[4096:] + self.bytesWritten += len(chunk) + if _debug: print 'socket.write[%d] %r'%(len(chunk), chunk) + try: yield multitask.send(self.sock, chunk) + except: raise ConnectionClosed + if _debug: print 'socket.written' + + +class Header(object): + FULL, MESSAGE, TIME, SEPARATOR, MASK = 0x00, 0x40, 0x80, 0xC0, 0xC0 + + def __init__(self, channel=0, time=0, size=None, type=None, streamId=0): + self.channel, self.time, self.size, self.type, self.streamId = channel, time, size, type, streamId + self.extendedtime = 0 + if channel<64: self.hdrdata = chr(channel) + elif channel<(64+256): self.hdrdata = '\x00'+chr(channel-64) + else: self.hdrdata = '\x01'+chr((channel-64)%256)+chr((channel-64)/256) + + def _appendExtendedTimestamp(self, data): + if self.time == 0xFFFFFF: + data += struct.pack('>I', self.extendedtime) + return data + + def toBytes(self, control): + data = chr(ord(self.hdrdata[0]) | control) + self.hdrdata[1:] + if control == Header.SEPARATOR: return self._appendExtendedTimestamp(data) + + data += struct.pack('>I', self.time & 0xFFFFFF)[1:] # add time + if control == Header.TIME: return self._appendExtendedTimestamp(data) + + data += struct.pack('>I', self.size)[1:] # size + data += chr(self.type) # type + if control == Header.MESSAGE: return self._appendExtendedTimestamp(data) + + data += struct.pack('" + % (self.channel, self.time, self.size, self.type, self.type or 0, self.streamId)) + +class Message(object): + # message types: RPC3, DATA3,and SHAREDOBJECT3 are used with AMF3 + RPC, RPC3, DATA, DATA3, SHAREDOBJ, SHAREDOBJ3, AUDIO, VIDEO, ACK, CHUNK_SIZE = \ + 0x14, 0x11, 0x12, 0x0F, 0x13, 0x10, 0x08, 0x09, 0x03, 0x01 + + def __init__(self, hdr=None, data=''): + if hdr is None: hdr = Header() + self.header, self.data = hdr, data + + # define properties type, streamId and time to access self.header.(property) + for p in ['type', 'streamId', 'time']: + exec 'def _g%s(self): return self.header.%s'%(p, p) + exec 'def _s%s(self, %s): self.header.%s = %s'%(p, p, p, p) + exec '%s = property(fget=_g%s, fset=_s%s)'%(p, p, p) + @property + def size(self): return len(self.data) + + def __repr__(self): + return (""% (self.header, self.data)) + +class Protocol(object): + # constants + PING_SIZE = 1536 + DEFAULT_CHUNK_SIZE = 128 + MIN_CHANNEL_ID = 3 + PROTOCOL_CHANNEL_ID = 2 + + def __init__(self, sock): + self.stream = SockStream(sock) + self.lastReadHeaders = dict() # indexed by channelId + self.incompletePackets = dict() #indexed by channelId + self.readChunkSize = self.writeChunkSize = Protocol.DEFAULT_CHUNK_SIZE + self.lastWriteHeaders = dict() # indexed by streamId + self.nextChannelId = Protocol.MIN_CHANNEL_ID + self.writeLock = threading.Lock() + self.writeQueue = Queue.Queue() + + def messageReceived(self, msg): + yield + + def protocolMessage(self, msg): + if msg.type == Message.ACK: # respond to ACK requests + response = Message() + response.type, response.data = msg.type, msg.data + self.writeMessage(response) + elif msg.type == Message.CHUNK_SIZE: + self.readChunkSize = struct.unpack('>L', msg.data)[0] + + def connectionClosed(self): + yield + + def parse(self): + try: + yield self.parseRequests() # parse http requests + except ConnectionClosed: + yield self.connectionClosed() + if _debug: print 'parse connection closed' + + def writeMessage(self, message): + self.writeQueue.put(message) + + def parseHandshake(self): + '''Parses the rtmp handshake''' + data = (yield self.stream.read(Protocol.PING_SIZE + 1)) # bound version and first ping + yield self.stream.write(data) + data = (yield self.stream.read(Protocol.PING_SIZE)) # bound second ping + yield self.stream.write(data) + + def parseRequests(self): + '''Parses complete messages until connection closed. Raises ConnectionLost exception.''' + self.hr = MultitaskHTTPRequestHandler(self.stream, ("",0,), self.remote) + self.hr.close_connection = 1 + self.cookies = CookieJar() + + while True: + + # prepare reading the request so that when it's handed + # over to "standard" HTTPRequestHandler, the data's already + # there. + print "parseRequests" + raw_requestline = (yield self.stream.readline()) + if _debug: print "parseRequests, line", raw_requestline + if not raw_requestline: + raise ConnectionClosed + data = '' + try: + while 1: + line = (yield self.stream.readline()) + data += line + if line in ['\n', '\r\n']: + break + except StopIteration: + if _debug: print "parseRequests, stopiter" + raise + except: + if _debug: + print 'parseRequests', \ + (traceback and traceback.print_exc() or None) + + raise ConnectionClosed + + self.hr.raw_requestline = raw_requestline + self.hr.rfile.write(data) + print "parseRequests write after" + self.hr.rfile.seek(0) + print "parseRequests seek after" + + if not self.hr.parse_request(): + raise ConnectionClosed + print "parseRequests parse_req after" + try: + yield self.messageReceived(self.hr) + except: + if _debug: + print 'messageReceived', \ + (traceback and traceback.print_exc() or None) + + def write(self): + '''Writes messages to stream''' + while True: + while self.writeQueue.empty(): (yield multitask.sleep(0.01)) + data = self.writeQueue.get() # TODO this should be used using multitask.Queue and remove previous wait. + if _debug: print "write to stream", repr(data) + if data is None: + # just in case TCP socket is not closed, close it. + try: + print "stream closing" + print self.stream + yield self.stream.close() + print "stream closed" + except: pass + break + + try: + yield self.stream.write(data) + except ConnectionClosed: + yield self.connectionClosed() + except: + print traceback.print_exc() + +class Command(object): + ''' Class for command / data messages''' + def __init__(self, type=Message.RPC, name=None, id=None, cmdData=None, args=[]): + '''Create a new command with given type, name, id, cmdData and args list.''' + self.type, self.name, self.id, self.cmdData, self.args = type, name, id, cmdData, args[:] + + def __repr__(self): + return ("" % (self.type, self.name, self.id, self.cmdData, self.args)) + + def setArg(self, arg): + self.args.append(arg) + + def getArg(self, index): + return self.args[index] + + @classmethod + def fromMessage(cls, message): + ''' initialize from a parsed RTMP message''' + assert (message.type in [Message.RPC, Message.RPC3, Message.DATA, Message.DATA3]) + + length = len(message.data) + if length == 0: raise ValueError('zero length message data') + + if message.type == Message.RPC3 or message.type == Message.DATA3: + assert message.data[0] == '\x00' # must be 0 in AMD3 + data = message.data[1:] + else: + data = message.data + + amfReader = amf.AMF0(data) + + inst = cls() + inst.name = amfReader.read() # first field is command name + + try: + if message.type == Message.RPC: + inst.id = amfReader.read() # second field *may* be message id + inst.cmdData = amfReader.read() # third is command data + else: + inst.id = 0 + inst.args = [] # others are optional + while True: + inst.args.append(amfReader.read()) + except EOFError: + pass + return inst + + def toMessage(self): + msg = Message() + assert self.type + msg.type = self.type + output = amf.BytesIO() + amfWriter = amf.AMF0(output) + amfWriter.write(self.name) + if msg.type == Message.RPC or msg.type == Message.RPC3: + amfWriter.write(self.id) + amfWriter.write(self.cmdData) + for arg in self.args: + amfWriter.write(arg) + output.seek(0) + #hexdump.hexdump(output) + #output.seek(0) + if msg.type == Message.RPC3 or msg.type == Message.DATA3: + data = '\x00' + output.read() + else: + data = output.read() + msg.data = data + output.close() + return msg + +def getfilename(path, name, root): + '''return the file name for the given stream. The name is derived as root/scope/name.flv where scope is + the the path present in the path variable.''' + ignore, ignore, scope = path.partition('/') + if scope: scope = scope + '/' + result = root + scope + name + '.flv' + if _debug: print 'filename=', result + return result + +class Stream(object): + '''The stream object that is used for RTMP stream.''' + count = 0; + def __init__(self, client): + self.client, self.id, self.name = client, 0, '' + self.recordfile = self.playfile = None # so that it doesn't complain about missing attribute + self.queue = multitask.Queue() + self._name = 'Stream[' + str(Stream.count) + ']'; Stream.count += 1 + if _debug: print self, 'created' + + def close(self): + if _debug: print self, 'closing' + if self.recordfile is not None: self.recordfile.close(); self.recordfile = None + if self.playfile is not None: self.playfile.close(); self.playfile = None + self.client = None # to clear the reference + pass + + def __repr__(self): + return self._name; + + def recv(self): + '''Generator to receive new Message on this stream, or None if stream is closed.''' + return self.queue.get() + + def send(self, msg): + '''Method to send a Message or Command on this stream.''' + if isinstance(msg, Command): + msg = msg.toMessage() + msg.streamId = self.id + # if _debug: print self,'send' + if self.client is not None: self.client.writeMessage(msg) + +class Client(Protocol): + '''The client object represents a single connected client to the server.''' + def __init__(self, sock, server, remote): + Protocol.__init__(self, sock) + self.server = server + self.remote = remote + self.agent = {} + self.streams = {} + self._nextCallId = 2 + self._nextStreamId = 1 + self.objectEncoding = 0.0 + self.queue = multitask.Queue() # receive queue used by application + multitask.add(self.parse()) + multitask.add(self.write()) + + def recv(self): + '''Generator to receive new Message (msg, arg) on this stream, or (None,None) if stream is closed.''' + return self.queue.get() + + def connectionClosed(self): + '''Called when the client drops the connection''' + if _debug: 'Client.connectionClosed' + self.writeMessage(None) + yield self.queue.put((None,None)) + + def messageReceived(self, msg): + if _debug: print 'messageReceived cmd=', msg.command, msg.path + ch = msg.headers.getheaders("Cookie") + ch = parse_ns_headers(ch) + cookies = self.cookies._cookies_from_attrs_set(ch, msg) + has_sess = False + for c in cookies: + self.cookies.set_cookie(c) + if c.name == "session": + has_sess = True + msg.sess_cookie = c + if not has_sess: + t = ("session", uuid.uuid4().hex, {}, {}) + msg.sess_cookie = self.cookies._cookie_from_cookie_tuple(t, msg) + self.cookies.set_cookie(msg.sess_cookie) + yield self.server.queue.put((self, msg)) # new connection + + def accept(self): + '''Method to accept an incoming client.''' + response = Command() + response.id, response.name, response.type = 1, '_result', Message.RPC + if _debug: print 'Client.accept() objectEncoding=', self.objectEncoding + response.setArg(dict(level='status', code='NetConnection.Connect.Success', + description='Connection succeeded.', details=None, + objectEncoding=self.objectEncoding)) + self.writeMessage(response.toMessage()) + + def rejectConnection(self, reason=''): + '''Method to reject an incoming client.''' + response = Command() + response.id, response.name, response.type = 1, '_error', Message.RPC + response.setArg(dict(level='status', code='NetConnection.Connect.Rejected', + description=reason, details=None)) + self.writeMessage(response.toMessage()) + + def call(self, method, *args): + '''Call a (callback) method on the client.''' + cmd = Command() + cmd.id, cmd.name, cmd.type = self._nextCallId, method, (self.objectEncoding == 0.0 and Message.RPC or Message.RPC3) + cmd.args, cmd.cmdData = args, None + self._nextCallId += 1 + if _debug: print 'Client.call method=', method, 'args=', args, ' msg=', cmd.toMessage() + self.writeMessage(cmd.toMessage()) + + def createStream(self): + ''' Create a stream on the server side''' + stream = Stream(self) + stream.id = self._nextStreamId + self.streams[stream.id] = stream + self._nextStreamId += 1 + return stream + + +class Server(object): + '''A RTMP server listens for incoming connections and informs the app.''' + def __init__(self, sock): + '''Create an RTMP server on the given bound TCP socket. The server will terminate + when the socket is disconnected, or some other error occurs in listening.''' + self.sock = sock + self.queue = multitask.Queue() # queue to receive incoming client connections + multitask.add(self.run()) + + def recv(self): + '''Generator to wait for incoming client connections on this server and return + (client, args) or (None, None) if the socket is closed or some error.''' + return self.queue.get() + + def run(self): + try: + while True: + sock, remote = (yield multitask.accept(self.sock)) # receive client TCP + if sock == None: + if _debug: print 'rtmp.Server accept(sock) returned None.' + break + if _debug: print 'connection received from', remote + sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) # make it non-block + client = Client(sock, self, remote) + except: + if _debug: print 'rtmp.Server exception ', (sys and sys.exc_info() or None) + traceback.print_exc() + + if (self.sock): + try: self.sock.close(); self.sock = None + except: pass + if (self.queue): + yield self.queue.put((None, None)) + self.queue = None + +class App(SimpleAppHTTPRequestHandler): + '''An application instance containing any number of streams. Except for constructor all methods are generators.''' + count = 0 + def __init__(self): + self.name = str(self.__class__.__name__) + '[' + str(App.count) + ']'; App.count += 1 + self.players, self.publishers, self._clients = {}, {}, [] # Streams indexed by stream name, and list of clients + if _debug: print self.name, 'created' + def __del__(self): + if _debug: print self.name, 'destroyed' + @property + def clients(self): + '''everytime this property is accessed it returns a new list of clients connected to this instance.''' + return self._clients[1:] if self._clients is not None else [] + def onConnect(self, client, *args): + if _debug: print self.name, 'onConnect', client.path + return True + def onDisconnect(self, client): + if _debug: print self.name, 'onDisconnect', client.path + +class HttpServer(object): + '''A RTMP server to record and stream Flash video.''' + def __init__(self): + '''Construct a new HttpServer. It initializes the local members.''' + self.sock = self.server = None + self.apps = dict({'*': App}) # supported applications: * means any as in {'*': App} + self.clients = dict() # list of clients indexed by scope. First item in list is app instance. + self.root = '' + + def start(self, host='0.0.0.0', port=8080): + '''This should be used to start listening for RTMP connections on the given port, which defaults to 8080.''' + if _debug: print 'start', host, port + if not self.server: + sock = self.sock = socket.socket(type=socket.SOCK_STREAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.bind((host, port)) + if _debug: print 'listening on ', sock.getsockname() + sock.listen(5) + server = self.server = Server(sock) # start rtmp server on that socket + multitask.add(self.serverlistener()) + + def stop(self): + if _debug: print 'stopping Flash server' + if self.server and self.sock: + try: self.sock.close(); self.sock = None + except: pass + self.server = None + + def serverlistener(self): + '''Server listener (generator). It accepts all connections and invokes client listener''' + try: + while True: # main loop to receive new connections on the server + client, msg = (yield self.server.recv()) # receive an incoming client connection. + # TODO: we should reject non-localhost client connections. + if not client: # if the server aborted abnormally, + break # hence close the listener. + if _debug: print 'client connection received', client, args + # if client.objectEncoding != 0 and client.objectEncoding != 3: + if client.objectEncoding != 0: + yield client.rejectConnection(reason='Unsupported encoding ' + str(client.objectEncoding) + '. Please use NetConnection.defaultObjectEncoding=ObjectEncoding.AMF0') + yield client.connectionClosed() + else: + print "cookies", str(client.cookies) + session = msg.sess_cookie.value + name, ignore, scope = msg.path.partition('/') + if '*' not in self.apps and name not in self.apps: + yield client.rejectConnection(reason='Application not found: ' + name) + else: # create application instance as needed and add in our list + if _debug: print 'name=', name, 'name in apps', str(name in self.apps) + app = self.apps[name] if name in self.apps else self.apps['*'] # application class + if session in self.clients: inst = self.clients[session][0] + else: inst = app() + try: + methodname = "on%s" % msg.command + method = getattr(inst, methodname, None) + result = method(client, msg) + close_connection = msg.close_connection + print "close connection", close_connection + except: + if _debug: traceback.print_exc() + yield client.rejectConnection(reason='Exception on %s' % methodname) + continue + if result is True or result is None: + if session not in self.clients: + self.clients[session] = [inst]; inst._clients=self.clients[session] + self.clients[session].append(client) + msg.wfile.seek(0) + data = msg.wfile.read() + msg.wfile.seek(0) + msg.wfile.truncate() + yield client.writeMessage(data) + if close_connection: + if _debug: + print 'close_connection requested' + try: + yield client.connectionClosed() + except ClientClosed: + if _debug: + print 'close_connection done' + pass + else: + yield client.rejectConnection(reason='Rejected in onConnect') + except StopIteration: raise + except: + if _debug: + print 'serverlistener exception', \ + (sys and sys.exc_info() or None) + traceback.print_exc() + + def clientlistener(self, client): + '''Client listener (generator). It receives a command and invokes client handler, or receives a new stream and invokes streamlistener.''' + try: + while True: + msg, arg = (yield client.recv()) # receive new message from client + if not msg: # if the client disconnected, + if _debug: print 'connection closed from client' + break # come out of listening loop. + if msg == 'command': # handle a new command + multitask.add(self.clienthandler(client, arg)) + elif msg == 'stream': # a new stream is created, handle the stream. + arg.client = client + multitask.add(self.streamlistener(arg)) + except StopIteration: raise + except: + if _debug: print 'clientlistener exception', (sys and sys.exc_info() or None) + traceback.print_exc() + + # client is disconnected, clear our state for application instance. + if _debug: print 'cleaning up client', client.path + inst = None + if client.path in self.clients: + inst = self.clients[client.path][0] + self.clients[client.path].remove(client) + for stream in client.streams.values(): # for all streams of this client + self.closehandler(stream) + client.streams.clear() # and clear the collection of streams + if client.path in self.clients and len(self.clients[client.path]) == 1: # no more clients left, delete the instance. + if _debug: print 'removing the application instance' + inst = self.clients[client.path][0] + inst._clients = None + del self.clients[client.path] + if inst is not None: inst.onDisconnect(client) + + def closehandler(self, stream): + '''A stream is closed explicitly when a closeStream command is received from given client.''' + if stream.client is not None: + inst = self.clients[stream.client.path][0] + if stream.name in inst.publishers and inst.publishers[stream.name] == stream: # clear the published stream + inst.onClose(stream.client, stream) + del inst.publishers[stream.name] + if stream.name in inst.players and stream in inst.players[stream.name]: + inst.onStop(stream.client, stream) + inst.players[stream.name].remove(stream) + if len(inst.players[stream.name]) == 0: + del inst.players[stream.name] + stream.close() + + def clienthandler(self, client, cmd): + '''A generator to handle a single command on the client.''' + inst = self.clients[client.path][0] + if inst: + if cmd.name == '_error': + if hasattr(inst, 'onStatus'): + result = inst.onStatus(client, cmd.args[0]) + elif cmd.name == '_result': + if hasattr(inst, 'onResult'): + result = inst.onResult(client, cmd.args[0]) + else: + res, code, args = Command(), '_result', dict() + try: result = inst.onCommand(client, cmd.name, *cmd.args) + except: + if _debug: print 'Client.call exception', (sys and sys.exc_info() or None) + code, args = '_error', dict() + res.id, res.name, res.type = cmd.id, code, (client.objectEncoding == 0.0 and Message.RPC or Message.RPC3) + res.args, res.cmdData = args, None + if _debug: print 'Client.call method=', code, 'args=', args, ' msg=', res.toMessage() + client.writeMessage(res.toMessage()) + # TODO return result to caller + yield + + def streamlistener(self, stream): + '''Stream listener (generator). It receives stream message and invokes streamhandler.''' + stream.recordfile = None # so that it doesn't complain about missing attribute + while True: + msg = (yield stream.recv()) + if not msg: + if _debug: print 'stream closed' + self.closehandler(stream) + break + # if _debug: msg + multitask.add(self.streamhandler(stream, msg)) + + def streamhandler(self, stream, message): + '''A generator to handle a single message on the stream.''' + try: + if message.type == Message.RPC: + cmd = Command.fromMessage(message) + if _debug: print 'streamhandler received cmd=', cmd + if cmd.name == 'publish': + yield self.publishhandler(stream, cmd) + elif cmd.name == 'play': + yield self.playhandler(stream, cmd) + elif cmd.name == 'closeStream': + self.closehandler(stream) + else: # audio or video message + yield self.mediahandler(stream, message) + except GeneratorExit: pass + except StopIteration: pass + except: + if _debug: print 'exception in streamhandler', (sys and sys.exc_info()) + + def publishhandler(self, stream, cmd): + '''A new stream is published. Store the information in the application instance.''' + try: + stream.mode = 'live' if len(cmd.args) < 2 else cmd.args[1] # live, record, append + stream.name = cmd.args[0] + if _debug: print 'publishing stream=', stream.name, 'mode=', stream.mode + inst = self.clients[stream.client.path][0] + if (stream.name in inst.publishers): + raise ValueError, 'Stream name already in use' + inst.publishers[stream.name] = stream # store the client for publisher + result = inst.onPublish(stream.client, stream) + + if stream.mode == 'record' or stream.mode == 'append': + stream.recordfile = FLV().open(getfilename(stream.client.path, stream.name, self.root), stream.mode) + response = Command(name='onStatus', id=cmd.id, args=[dict(level='status', code='NetStream.Publish.Start', description='', details=None)]) + yield stream.send(response) + except ValueError, E: # some error occurred. inform the app. + if _debug: print 'error in publishing stream', str(E) + response = Command(name='onStatus', id=cmd.id, args=[dict(level='error',code='NetStream.Publish.BadName',description=str(E),details=None)]) + yield stream.send(response) + + def playhandler(self, stream, cmd): + '''A new stream is being played. Just updated the players list with this stream.''' + inst = self.clients[stream.client.path][0] + name = stream.name = cmd.args[0] # store the stream's name + start = cmd.args[1] if len(cmd.args) >= 2 else -2 + if name not in inst.players: + inst.players[name] = [] # initialize the players for this stream name + if stream not in inst.players[name]: # store the stream as players of this name + inst.players[name].append(stream) + path = getfilename(stream.client.path, stream.name, self.root) + if os.path.exists(path): + stream.playfile = FLV().open(path) + multitask.add(stream.playfile.reader(stream)) + if _debug: print 'playing stream=', name, 'start=', start + result = inst.onPlay(stream.client, stream) + response = Command(name='onStatus', id=cmd.id, args=[dict(level='status',code='NetStream.Play.Start', description=stream.name, details=None)]) + yield stream.send(response) + + def mediahandler(self, stream, message): + '''Handle incoming media on the stream, by sending to other stream in this application instance.''' + if stream.client is not None: + inst = self.clients[stream.client.path][0] + result = inst.onPublishData(stream.client, stream, message) + if result: + client = stream.client + for s in (inst.players.get(stream.name, [])): + # if _debug: print 'D', stream.name, s.name + result = inst.onPlayData(s.client, s, message) + if result: + yield s.send(message) + if stream.recordfile is not None: + stream.recordfile.write(message) + +# The main routine to start, run and stop the service +if __name__ == '__main__': + print "optparse" + from optparse import OptionParser + parser = OptionParser() + parser.add_option('-i', '--host', dest='host', default='0.0.0.0', help="listening IP address. Default '0.0.0.0'") + parser.add_option('-p', '--port', dest='port', default=8080, type="int", help='listening port number. Default 8080') + parser.add_option('-r', '--root', dest='root', default='./', help="document root directory. Default './'") + parser.add_option('-d', '--verbose', dest='verbose', default=False, action='store_true', help='enable debug trace') + (options, args) = parser.parse_args() + + _debug = options.verbose + try: + if _debug: print time.asctime(), 'Options - %s:%d' % (options.host, options.port) + agent = HttpServer() + agent.root = options.root + agent.start(options.host, options.port) + if _debug: print time.asctime(), 'Flash Server Starts - %s:%d' % (options.host, options.port) + multitask.run() + except KeyboardInterrupt: + print traceback.print_exc() + pass + except: + print "exception" + print traceback.print_exc() + if _debug: print time.asctime(), 'Flash Server Stops' diff --git a/multitask.py b/multitask.py new file mode 100644 index 0000000..0ccdfd2 --- /dev/null +++ b/multitask.py @@ -0,0 +1,1281 @@ +################################################################################ +# +# Copyright (c) 2007 Christopher J. Stawarz +# +# Permission is hereby granted, free of charge, to any person +# obtaining a copy of this software and associated documentation files +# (the "Software"), to deal in the Software without restriction, +# including without limitation the rights to use, copy, modify, merge, +# publish, distribute, sublicense, and/or sell copies of the Software, +# and to permit persons to whom the Software is furnished to do so, +# subject to the following conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS +# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN +# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# +################################################################################ + + + +""" + +Cooperative multitasking and asynchronous I/O using generators + +multitask allows Python programs to use generators (a.k.a. coroutines) +to perform cooperative multitasking and asynchronous I/O. +Applications written using multitask consist of a set of cooperating +tasks that yield to a shared task manager whenever they perform a +(potentially) blocking operation, such as I/O on a socket or getting +data from a queue. The task manager temporarily suspends the task +(allowing other tasks to run in the meantime) and then restarts it +when the blocking operation is complete. Such an approach is suitable +for applications that would otherwise have to use select() and/or +multiple threads to achieve concurrency. + +The functions and classes in the multitask module allow tasks to yield +for I/O operations on sockets and file descriptors, adding/removing +data to/from queues, or sleeping for a specified interval. When +yielding, a task can also specify a timeout. If the operation for +which the task yielded has not completed after the given number of +seconds, the task is restarted, and a Timeout exception is raised at +the point of yielding. + +As a very simple example, here's how one could use multitask to allow +two unrelated tasks to run concurrently: + + >>> def printer(message): + ... while True: + ... print message + ... yield + ... + >>> multitask.add(printer('hello')) + >>> multitask.add(printer('goodbye')) + >>> multitask.run() + hello + goodbye + hello + goodbye + hello + goodbye + [and so on ...] + +For a more useful example, here's how one could implement a +multitasking server that can handle multiple concurrent client +connections: + + def listener(sock): + while True: + conn, address = (yield multitask.accept(sock)) + multitask.add(client_handler(conn)) + + def client_handler(sock): + while True: + request = (yield multitask.recv(sock, 1024)) + if not request: + break + response = handle_request(request) + yield multitask.send(sock, response) + + multitask.add(listener(sock)) + multitask.run() + +Tasks can also yield other tasks, which allows for composition of +tasks and reuse of existing multitasking code. A child task runs +until it either completes or raises an exception. To return output to +its parent, a child task raises StopIteration, passing the output +value(s) to the StopIteration constructor. An unhandled exception +raised within a child task is propagated to its parent. For example: + + >>> def parent(): + ... print (yield return_none()) + ... print (yield return_one()) + ... print (yield return_many()) + ... try: + ... yield raise_exception() + ... except Exception, e: + ... print 'caught exception: %s' % e + ... + >>> def return_none(): + ... yield + ... # do nothing + ... # or return + ... # or raise StopIteration + ... # or raise StopIteration(None) + ... + >>> def return_one(): + ... yield + ... raise StopIteration(1) + ... + >>> def return_many(): + ... yield + ... raise StopIteration(2, 3) # or raise StopIteration((2, 3)) + ... + >>> def raise_exception(): + ... yield + ... raise RuntimeError('foo') + ... + >>> multitask.add(parent()) + >>> multitask.run() + None + 1 + (2, 3) + caught exception: foo + +""" + + +import collections +import errno +from functools import partial +import heapq +import os +import select +import sys +import time +import types + + +__author__ = 'Christopher Stawarz ' +__version__ = '0.2.0' +__revision__ = int('$Revision: 5025 $'.split()[1]) + + + +################################################################################ +# +# Timeout exception type +# +################################################################################ + + + +class Timeout(Exception): + 'Raised in a yielding task when an operation times out' + pass + + + +################################################################################ +# +# _ChildTask class +# +################################################################################ + + + +class _ChildTask(object): + + def __init__(self, parent, task): + self.parent = parent + self.task = task + + def send(self, value): + return self.task.send(value) + + def throw(self, type, value=None, traceback=None): + return self.task.throw(type, value, traceback) + + + +################################################################################ +# +# YieldCondition class +# +################################################################################ + + + +class YieldCondition(object): + + """ + + Base class for objects that are yielded by a task to the task + manager and specify the condition(s) under which the task should + be restarted. Only subclasses of this class are useful to + application code. + + """ + + def __init__(self, timeout=None): + """ + + If timeout is None, the task will be suspended indefinitely + until the condition is met. Otherwise, if the condition is + not met within timeout seconds, a Timeout exception will be + raised in the yielding task. + + """ + + self.task = None + self.handle_expiration = None + + if timeout is None: + self.expiration = None + else: + self.expiration = time.time() + float(timeout) + + def _expires(self): + return (self.expiration is not None) + + + +################################################################################ +# +# _SleepDelay class and related functions +# +################################################################################ + + + +class _SleepDelay(YieldCondition): + + def __init__(self, seconds): + seconds = float(seconds) + if seconds <= 0.0: + raise ValueError("'seconds' must be greater than 0") + super(_SleepDelay, self).__init__(seconds) + + +def sleep(seconds): + """ + + A task that yields the result of this function will be resumed + after the specified number of seconds have elapsed. For example: + + while too_early(): + yield sleep(5) # Sleep for five seconds + do_something() # Done sleeping; get back to work + + """ + + return _SleepDelay(seconds) + + + +################################################################################ +# +# FDReady class and related functions +# +################################################################################ + + + +class FDReady(YieldCondition): + + """ + + A task that yields an instance of this class will be suspended + until a specified file descriptor is ready for I/O. + + """ + + def __init__(self, fd, read=False, write=False, exc=False, timeout=None): + """ + + Resume the yielding task when fd is ready for reading, + writing, and/or "exceptional" condition handling. fd can be + any object accepted by select.select() (meaning an integer or + an object with a fileno() method that returns an integer). + Any exception raised by select() due to fd will be re-raised + in the yielding task. + + If timeout is not None, a Timeout exception will be raised in + the yielding task if fd is not ready after timeout seconds + have elapsed. + + """ + + super(FDReady, self).__init__(timeout) + + self.fd = (fd if _is_file_descriptor(fd) else fd.fileno()) + + if not (read or write or exc): + raise ValueError("'read', 'write', and 'exc' cannot all be false") + self.read = read + self.write = write + self.exc = exc + + def fileno(self): + 'Return the file descriptor on which the yielding task is waiting' + return self.fd + + def _add_to_fdsets(self, read_fds, write_fds, exc_fds): + for add, fdset in ((self.read, read_fds), + (self.write, write_fds), + (self.exc, exc_fds)): + if add: + fdset.add(self) + + def _remove_from_fdsets(self, read_fds, write_fds, exc_fds): + for fdset in (read_fds, write_fds, exc_fds): + fdset.discard(self) + + +def _is_file_descriptor(fd): + return isinstance(fd, (int, long)) + + +def readable(fd, timeout=None): + """ + + A task that yields the result of this function will be resumed + when fd is readable. If timeout is not None, a Timeout exception + will be raised in the yielding task if fd is not readable after + timeout seconds have elapsed. For example: + + try: + yield readable(sock, timeout=5) + data = sock.recv(1024) + except Timeout: + # No data after 5 seconds + + """ + + return FDReady(fd, read=True, timeout=timeout) + + +def writable(fd, timeout=None): + """ + + A task that yields the result of this function will be resumed + when fd is writable. If timeout is not None, a Timeout exception + will be raised in the yielding task if fd is not writable after + timeout seconds have elapsed. For example: + + try: + yield writable(sock, timeout=5) + nsent = sock.send(data) + except Timeout: + # Can't send after 5 seconds + + """ + + return FDReady(fd, write=True, timeout=timeout) + + + +################################################################################ +# +# FDAction class and related functions +# +################################################################################ + + + +class FDAction(FDReady): + + """ + + A task that yields an instance of this class will be suspended + until an I/O operation on a specified file descriptor is complete. + + """ + + def __init__(self, fd, func, args=(), kwargs={}, read=False, write=False, + exc=False): + """ + + Resume the yielding task when fd is ready for reading, + writing, and/or "exceptional" condition handling. fd can be + any object accepted by select.select() (meaning an integer or + an object with a fileno() method that returns an integer). + Any exception raised by select() due to fd will be re-raised + in the yielding task. + + The value of the yield expression will be the result of + calling func with the specified args and kwargs (which + presumably performs a read, write, or other I/O operation on + fd). If func raises an exception, it will be re-raised in the + yielding task. Thus, FDAction is really just a convenient + subclass of FDReady that requests that the task manager + perform an I/O operation on the calling task's behalf. + + If kwargs contains a timeout argument that is not None, a + Timeout exception will be raised in the yielding task if fd is + not ready after timeout seconds have elapsed. + + """ + + timeout = kwargs.pop('timeout', None) + super(FDAction, self).__init__(fd, read, write, exc, timeout) + + self.func = func + self.args = args + self.kwargs = kwargs + + def _eval(self): + return self.func(*(self.args), **(self.kwargs)) + + +def read(fd, *args, **kwargs): + """ + + A task that yields the result of this function will be resumed + when fd is readable, and the value of the yield expression will be + the result of reading from fd. If a timeout keyword is given and + is not None, a Timeout exception will be raised in the yielding + task if fd is not readable after timeout seconds have elapsed. + Other arguments will be passed to the read function (os.read() if + fd is an integer, fd.read() otherwise). For example: + + try: + data = (yield read(fd, 1024, timeout=5)) + except Timeout: + # No data after 5 seconds + + """ + + func = (partial(os.read, fd) if _is_file_descriptor(fd) else fd.read) + return FDAction(fd, func, args, kwargs, read=True) + + +def readline(fd, *args, **kwargs): + """ + + A task that yields the result of this function will be resumed + when fd is readable, and the value of the yield expression will be + the result of reading a line from fd. If a timeout keyword is + given and is not None, a Timeout exception will be raised in the + yielding task if fd is not readable after timeout seconds have + elapsed. Other arguments will be passed to fd.readline(). For + example: + + try: + data = (yield readline(fd, timeout=5)) + except Timeout: + # No data after 5 seconds + + """ + + return FDAction(fd, fd.readline, args, kwargs, read=True) + + +def write(fd, *args, **kwargs): + """ + + A task that yields the result of this function will be resumed + when fd is writable, and the value of the yield expression will be + the result of writing to fd. If a timeout keyword is given and is + not None, a Timeout exception will be raised in the yielding task + if fd is not writable after timeout seconds have elapsed. Other + arguments will be passed to the write function (os.write() if fd + is an integer, fd.write() otherwise). For example: + + try: + nbytes = (yield write(fd, data, timeout=5)) + except Timeout: + # Can't write after 5 seconds + + """ + + func = (partial(os.write, fd) if _is_file_descriptor(fd) else fd.write) + return FDAction(fd, func, args, kwargs, write=True) + + +def accept(sock, *args, **kwargs): + """ + + A task that yields the result of this function will be resumed + when sock is readable, and the value of the yield expression will + be the result of accepting a new connection on sock. If a timeout + keyword is given and is not None, a Timeout exception will be + raised in the yielding task if sock is not readable after timeout + seconds have elapsed. Other arguments will be passed to + sock.accept(). For example: + + try: + conn, address = (yield accept(sock, timeout=5)) + except Timeout: + # No connections after 5 seconds + + """ + + return FDAction(sock, sock.accept, args, kwargs, read=True) + + +def recv(sock, *args, **kwargs): + """ + + A task that yields the result of this function will be resumed + when sock is readable, and the value of the yield expression will + be the result of receiving from sock. If a timeout keyword is + given and is not None, a Timeout exception will be raised in the + yielding task if sock is not readable after timeout seconds have + elapsed. Other arguments will be passed to sock.recv(). For + example: + + try: + data = (yield recv(sock, 1024, timeout=5)) + except Timeout: + # No data after 5 seconds + + """ + + return FDAction(sock, sock.recv, args, kwargs, read=True) + + +def recvfrom(sock, *args, **kwargs): + """ + + A task that yields the result of this function will be resumed + when sock is readable, and the value of the yield expression will + be the result of receiving from sock. If a timeout keyword is + given and is not None, a Timeout exception will be raised in the + yielding task if sock is not readable after timeout seconds have + elapsed. Other arguments will be passed to sock.recvfrom(). For + example: + + try: + data, address = (yield recvfrom(sock, 1024, timeout=5)) + except Timeout: + # No data after 5 seconds + + """ + + return FDAction(sock, sock.recvfrom, args, kwargs, read=True) + + +def send(sock, *args, **kwargs): + """ + + A task that yields the result of this function will be resumed + when sock is writable, and the value of the yield expression will + be the result of sending to sock. If a timeout keyword is given + and is not None, a Timeout exception will be raised in the + yielding task if sock is not writable after timeout seconds have + elapsed. Other arguments will be passed to the sock.send(). For + example: + + try: + nsent = (yield send(sock, data, timeout=5)) + except Timeout: + # Can't send after 5 seconds + + """ + + return FDAction(sock, sock.send, args, kwargs, write=True) + + +def sendto(sock, *args, **kwargs): + """ + + A task that yields the result of this function will be resumed + when sock is writable, and the value of the yield expression will + be the result of sending to sock. If a timeout keyword is given + and is not None, a Timeout exception will be raised in the + yielding task if sock is not writable after timeout seconds have + elapsed. Other arguments will be passed to the sock.sendto(). + For example: + + try: + nsent = (yield sendto(sock, data, address, timeout=5)) + except Timeout: + # Can't send after 5 seconds + + """ + + return FDAction(sock, sock.sendto, args, kwargs, write=True) + + + +################################################################################ +# +# Queue and _QueueAction classes +# +################################################################################ + + + +class Queue(object): + + """ + + A multi-producer, multi-consumer FIFO queue (similar to + Queue.Queue) that can be used for exchanging data between tasks + + """ + + def __init__(self, contents=(), maxsize=0): + """ + + Create a new Queue instance. contents is a sequence (empty by + default) containing the initial contents of the queue. If + maxsize is greater than 0, the queue will hold a maximum of + maxsize items, and put() will block until space is available + in the queue. + + """ + + self.maxsize = int(maxsize) + self._queue = collections.deque(contents) + + def __len__(self): + 'Return the number of items in the queue' + return len(self._queue) + + def _get(self): + return self._queue.popleft() + + def _put(self, item): + self._queue.append(item) + + def empty(self): + 'Return True is the queue is empty, False otherwise' + return (len(self) == 0) + + def full(self): + 'Return True is the queue is full, False otherwise' + return ((len(self) >= self.maxsize) if (self.maxsize > 0) else False) + + def get(self, timeout=None): + """ + + A task that yields the result of this method will be resumed + when an item is available in the queue, and the value of the + yield expression will be the item. If timeout is not None, a + Timeout exception will be raised in the yielding task if an + item is not available after timeout seconds have elapsed. For + example: + + try: + item = (yield queue.get(timeout=5)) + except Timeout: + # No item available after 5 seconds + + """ + + return _QueueAction(self, timeout=timeout) + + def put(self, item, timeout=None): + """ + + A task that yields the result of this method will be resumed + when item has been added to the queue. If timeout is not + None, a Timeout exception will be raised in the yielding task + if no space is available after timeout seconds have elapsed. + For example: + + try: + yield queue.put(item, timeout=5) + except Timeout: + # No space available after 5 seconds + + """ + + return _QueueAction(self, item, timeout=timeout) + + +class _QueueAction(YieldCondition): + + NO_ITEM = object() + + def __init__(self, queue, item=NO_ITEM, timeout=None): + super(_QueueAction, self).__init__(timeout) + if not isinstance(queue, Queue): + raise TypeError("'queue' must be a Queue instance") + self.queue = queue + self.item = item + + +################################################################################ +# +# SmartQueue and _SmartQueueAction classes +# +################################################################################ + + + +class SmartQueue(object): + + """ + + A multi-producer, multi-consumer FIFO queue (similar to + Queue.Queue) that can be used for exchanging data between tasks. + The difference with Queue is that this implements filtering criteria + on get and allows multiple get to be signalled for the same put. + On the downside, this uses list instead of deque and has lower + performance. + + """ + + def __init__(self, contents=(), maxsize=0): + """ + + Create a new Queue instance. contents is a sequence (empty by + default) containing the initial contents of the queue. If + maxsize is greater than 0, the queue will hold a maximum of + maxsize items, and put() will block until space is available + in the queue. + + """ + + self.maxsize = int(maxsize) + self._pending = list(contents) + + def __len__(self): + 'Return the number of items in the queue' + return len(self._pending) + + def _get(self, criteria=None): + #self._pending = filter(lambda x: x[1]<=now, self._pending) # remove expired ones + if criteria: + found = filter(lambda x: criteria(x), self._pending) # check any matching criteria + if found: + self._pending.remove(found[0]) + return found[0] + else: + return None + else: + return self._pending.pop(0) if self._pending else None + + def _put(self, item): + self._pending.append(item) + + def empty(self): + 'Return True is the queue is empty, False otherwise' + return (len(self) == 0) + + def full(self): + 'Return True is the queue is full, False otherwise' + return ((len(self) >= self.maxsize) if (self.maxsize > 0) else False) + + def get(self, timeout=None, criteria=None): + """ + + A task that yields the result of this method will be resumed + when an item is available in the queue and the item matches the + given criteria (a function, usually lambda), and the value of the + yield expression will be the item. If timeout is not None, a + Timeout exception will be raised in the yielding task if an + item is not available after timeout seconds have elapsed. For + example: + + try: + item = (yield queue.get(timeout=5, criteria=lambda x: x.name='kundan')) + except Timeout: + # No item available after 5 seconds + + """ + + return _SmartQueueAction(self, timeout=timeout, criteria=criteria) + + def put(self, item, timeout=None): + """ + + A task that yields the result of this method will be resumed + when item has been added to the queue. If timeout is not + None, a Timeout exception will be raised in the yielding task + if no space is available after timeout seconds have elapsed. + TODO: Otherwise if space is available, the timeout specifies how + long to keep the item in the queue before discarding it if it + is not fetched in a get. In this case it doesnot throw exception. + For example: + + try: + yield queue.put(item, timeout=5) + except Timeout: + # No space available after 5 seconds + + """ + + return _SmartQueueAction(self, item, timeout=timeout) + + +class _SmartQueueAction(YieldCondition): + + NO_ITEM = object() + + def __init__(self, queue, item=NO_ITEM, timeout=None, criteria=None): + super(_SmartQueueAction, self).__init__(timeout) + if not isinstance(queue, SmartQueue): + raise TypeError("'queue' must be a SmartQueue instance") + self.queue = queue + self.item = item + self.criteria = criteria + self.expires = (timeout is not None) and (time.time() + timeout) or 0 + + +################################################################################ +# +# TaskManager class +# +################################################################################ + + + +class TaskManager(object): + + """ + + Engine for running a set of cooperatively-multitasking tasks + within a single Python thread + + """ + + def __init__(self): + """ + + Create a new TaskManager instance. Generally, there will only + be one of these per Python process. If you want to run two + existing instances simultaneously, merge them first, then run + one or the other. + + """ + + self._queue = collections.deque() + self._read_waits = set() + self._write_waits = set() + self._exc_waits = set() + self._queue_waits = collections.defaultdict(self._double_deque) + self._timeouts = [] + + @staticmethod + def _double_deque(): + return (collections.deque(), collections.deque()) + + def merge(self, other): + """ + + Merge this TaskManager with another. After the merge, the two + objects share the same (merged) internal data structures, so + either can be used to manage the combined task set. + + """ + + if not isinstance(other, TaskManager): + raise TypeError("'other' must be a TaskManager instance") + + # Merge the data structures + self._queue.extend(other._queue) + self._read_waits |= other._read_waits + self._write_waits |= other._write_waits + self._exc_waits |= other._exc_waits + self._queue_waits.update(other._queue_waits) + self._timeouts.extend(other._timeouts) + heapq.heapify(self._timeouts) + + # Make other reference the merged data structures. This is + # necessary because other's tasks may reference and use other + # (e.g. to add a new task in response to an event). + other._queue = self._queue + other._read_waits = self._read_waits + other._write_waits = self._write_waits + other._exc_waits = self._exc_waits + other._queue_waits = self._queue_waits + other._timeouts = self._timeouts + + def add(self, task): + 'Add a new task (i.e. a generator instance) to the run queue' + + if not isinstance(task, types.GeneratorType): + raise TypeError("'task' must be a generator") + self._enqueue(task) + + def _enqueue(self, task, input=None, exc_info=()): + self._queue.append((task, input, exc_info)) + + def run(self): + """ + + Call run_next() repeatedly until there are no tasks that are + currently runnable, waiting for I/O, or waiting to time out. + Note that this method can block indefinitely (e.g. if there + are only I/O waits and no timeouts). If this is unacceptable, + use run_next() instead. + + """ + while self.has_runnable() or self.has_io_waits() or self.has_timeouts(): + self.run_next() + + def has_runnable(self): + """ + + Return True is there are runnable tasks in the queue, False + otherwise + + """ + return bool(self._queue) + + def has_io_waits(self): + """ + + Return True is there are tasks waiting for I/O, False + otherwise + + """ + return bool(self._read_waits or self._write_waits or self._exc_waits) + + def has_timeouts(self): + """ + + Return True is there are tasks with pending timeouts, False + otherwise + + """ + return bool(self._timeouts) + + def run_next(self, timeout=None): + """ + + Perform one iteration of the run cycle: check whether any + pending I/O operations can be performed, check whether any + timeouts have expired, then run all currently runnable tasks. + + The timeout argument specifies the maximum time to wait for + some task to become runnable. If timeout is None and there + are no currently runnable tasks, but there are tasks waiting + to perform I/O or time out, then this method will block until + at least one of the waiting tasks becomes runnable. To + prevent this method from blocking indefinitely, use timeout to + specify the maximum number of seconds to wait. + + If there are runnable tasks in the queue when run_next() is + called, then it will check for I/O readiness using a + non-blocking call to select() (i.e. a poll), and only + already-expired timeouts will be handled. This ensures both + that the task manager is never idle when tasks can be run and + that tasks waiting for I/O never starve. + + """ + + if self.has_io_waits(): + self._handle_io_waits(self._fix_run_timeout(timeout)) + + if self.has_timeouts(): + self._handle_timeouts(self._fix_run_timeout(timeout)) + + # Run all tasks currently in the queue + for dummy in xrange(len(self._queue)): + task, input, exc_info = self._queue.popleft() + try: + if exc_info: + output = task.throw(*exc_info) + else: + output = task.send(input) + except StopIteration, e: + if isinstance(task, _ChildTask): + if not e.args: + output = None + elif len(e.args) == 1: + output = e.args[0] + else: + output = e.args + self._enqueue(task.parent, input=output) + except: + if isinstance(task, _ChildTask): + # Propagate exception to parent + self._enqueue(task.parent, exc_info=sys.exc_info()) + else: + # No parent task, so just die + raise + else: + self._handle_task_output(task, output) + + def _fix_run_timeout(self, timeout): + if self.has_runnable(): + # Don't block if there are tasks in the queue + timeout = 0.0 + elif self.has_timeouts(): + # If there are timeouts, block only until the first expiration + expiration_timeout = max(0.0, self._timeouts[0][0] - time.time()) + if (timeout is None) or (timeout > expiration_timeout): + timeout = expiration_timeout + return timeout + + def _handle_io_waits(self, timeout): + # The error handling here is (mostly) borrowed from Twisted + try: + read_ready, write_ready, exc_ready = \ + select.select(self._read_waits, + self._write_waits, + self._exc_waits, + timeout) + except (TypeError, ValueError): + self._remove_bad_file_descriptors() + except (select.error, IOError), err: + if err[0] == errno.EINTR: + pass + elif ((err[0] == errno.EBADF) or + ((sys.platform == 'win32') and + (err[0] == 10038))): # WSAENOTSOCK + self._remove_bad_file_descriptors() + else: + # Not an error we can handle, so die + raise + else: + for fd in set(read_ready + write_ready + exc_ready): + try: + input = (fd._eval() if isinstance(fd, FDAction) else None) + self._enqueue(fd.task, input=input) + except: + self._enqueue(fd.task, exc_info=sys.exc_info()) + fd._remove_from_fdsets(self._read_waits, + self._write_waits, + self._exc_waits) + if fd._expires(): + self._remove_timeout(fd) + + def _remove_bad_file_descriptors(self): + for fd in (self._read_waits | self._write_waits | self._exc_waits): + try: + select.select([fd], [fd], [fd], 0.0) + except: + self._enqueue(fd.task, exc_info=sys.exc_info()) + fd._remove_from_fdsets(self._read_waits, + self._write_waits, + self._exc_waits) + if fd._expires(): + self._remove_timeout(fd) + + def _add_timeout(self, item, handler): + item.handle_expiration = handler + heapq.heappush(self._timeouts, (item.expiration, item)) + + def _remove_timeout(self, item): + self._timeouts.remove((item.expiration, item)) + heapq.heapify(self._timeouts) + + def _handle_timeouts(self, timeout): + if (not self.has_runnable()) and (timeout > 0.0): + time.sleep(timeout) + + current_time = time.time() + + while self._timeouts and (self._timeouts[0][0] <= current_time): + item = heapq.heappop(self._timeouts)[1] + if isinstance(item, _SleepDelay): + self._enqueue(item.task) + else: + self._enqueue(item.task, exc_info=(Timeout,)) + item.handle_expiration() + + def _handle_task_output(self, task, output): + if isinstance(output, types.GeneratorType): + self._enqueue(_ChildTask(task, output)) + elif isinstance(output, YieldCondition): + output.task = task + if isinstance(output, _SleepDelay): + self._add_timeout(output, None) + elif isinstance(output, FDReady): + self._handle_fdready(task, output) + elif isinstance(output, _QueueAction): + self._handle_queue_action(task, output) + elif isinstance(output, _SmartQueueAction): + self._handle_smart_queue_action(task, output) + else: + # Return any other output as input and send task to + # end of queue + self._enqueue(task, input=output) + + def _handle_fdready(self, task, output): + output._add_to_fdsets(self._read_waits, + self._write_waits, + self._exc_waits) + if output._expires(): + self._add_timeout(output, + (lambda: + output._remove_from_fdsets(self._read_waits, + self._write_waits, + self._exc_waits))) + + def _handle_queue_action(self, task, output): + get_waits, put_waits = self._queue_waits[output.queue] + + if output.item is output.NO_ITEM: + # Action is a get + if output.queue.empty(): + get_waits.append(output) + if output._expires(): + self._add_timeout(output, + (lambda: get_waits.remove(output))) + else: + item = output.queue._get() + self._enqueue(task, input=item) + if put_waits: + action = put_waits.popleft() + output.queue._put(action.item) + self._enqueue(action.task) + if action._expires(): + self._remove_timeout(action) + else: + # Action is a put + if output.queue.full(): + put_waits.append(output) + if output._expires(): + self._add_timeout(output, + (lambda: put_waits.remove(output))) + else: + output.queue._put(output.item) + self._enqueue(task) + if get_waits: + action = get_waits.popleft() + item = output.queue._get() + self._enqueue(action.task, input=item) + if action._expires(): + self._remove_timeout(action) + + + def _handle_smart_queue_action(self, task, output): + get_waits, put_waits = self._queue_waits[output.queue] + + if output.item is output.NO_ITEM: + # Action is a get + item = output.queue._get(criteria=output.criteria) + if item is None: + get_waits.append(output) + if output._expires(): + self._add_timeout(output, + (lambda: get_waits.remove(output))) + else: + self._enqueue(task, input=item) + if put_waits: + action = put_waits.popleft() + output.queue._put(action.item) + self._enqueue(action.task) + if action._expires(): + self._remove_timeout(action) + else: + # Action is a put + if output.queue.full(): + put_waits.append(output) + if output._expires(): + self._add_timeout(output, + (lambda: put_waits.remove(output))) + else: + output.queue._put(output.item) + self._enqueue(task) + if get_waits: + actions = [] + for action in get_waits: + item = output.queue._get(criteria=action.criteria) + if item is not None: + actions.append((action, item)) + for action,item in actions: + get_waits.remove(action) + self._enqueue(action.task, input=item) + if action._expires(): + self._remove_timeout(action) + + + +################################################################################ +# +# Default TaskManager instance +# +################################################################################ + + + +_default_task_manager = None + + +def get_default_task_manager(): + 'Return the default TaskManager instance' + global _default_task_manager + if _default_task_manager is None: + _default_task_manager = TaskManager() + return _default_task_manager + + +def add(task): + 'Add a task to the default TaskManager instance' + get_default_task_manager().add(task) + + +def run(): + 'Run the default TaskManager instance' + get_default_task_manager().run() + + + +################################################################################ +# +# Test routine +# +################################################################################ + + + +if __name__ == '__main__': + if sys.platform == 'win32': + # Make sure WSAStartup() is called + import socket + + def printer(name): + for i in xrange(1, 4): + print '%s:\t%d' % (name, i) + yield + + t = TaskManager() + t.add(printer('first')) + t.add(printer('second')) + t.add(printer('third')) + + queue = Queue() + + def receiver(): + print 'receiver started' + print 'receiver received: %s' % (yield queue.get()) + print 'receiver finished' + + def sender(): + print 'sender started' + yield queue.put('from sender') + print 'sender finished' + + def bad_descriptor(): + print 'bad_descriptor running' + try: + yield readable(12) + except: + print 'exception in bad_descriptor:', sys.exc_info()[1] + + def sleeper(): + print 'sleeper started' + yield sleep(1) + print 'sleeper finished' + + def timeout_immediately(): + print 'timeout_immediately running' + try: + yield Queue().get(timeout=0) + except Timeout: + print 'timeout_immediately timed out' + + t2 = TaskManager() + t2.add(receiver()) + t2.add(bad_descriptor()) + t2.add(sender()) + t2.add(sleeper()) + t2.add(timeout_immediately()) + + def parent(): + print 'child returned: %s' % ((yield child()),) + try: + yield child(raise_exc=True) + except: + print 'exception in child:', sys.exc_info()[1] + + def child(raise_exc=False): + yield + if raise_exc: + raise RuntimeError('foo') + raise StopIteration(1, 2, 3) + + t3 = TaskManager() + t3.add(parent()) + + t.merge(t2) + t.merge(t3) + t.run() + + assert not(t.has_runnable() or t.has_io_waits() or t.has_timeouts()) -- 2.30.2