add jsonrpc service and test
authorLuke Kenneth Casson Leighton <lkcl@lkcl.net>
Mon, 12 Jul 2010 22:29:56 +0000 (23:29 +0100)
committerLuke Kenneth Casson Leighton <lkcl@lkcl.net>
Mon, 12 Jul 2010 22:29:56 +0000 (23:29 +0100)
SimpleJSONRPCServer.py [new file with mode: 0644]
httpd.py
jsonapp.py [new file with mode: 0644]
testjsonrpc.py [new file with mode: 0644]

diff --git a/SimpleJSONRPCServer.py b/SimpleJSONRPCServer.py
new file mode 100644 (file)
index 0000000..987503d
--- /dev/null
@@ -0,0 +1,375 @@
+
+# Originally taken from:
+# http://code.activestate.com/recipes/552751/
+# thanks to david decotigny
+
+# Heavily based on the XML-RPC implementation in python.
+# Based on the json-rpc specs: http://json-rpc.org/wiki/specification
+# The main deviation is on the error treatment. The official spec
+# would set the 'error' attribute to a string. This implementation
+# sets it to a dictionary with keys: message/traceback/type
+
+import cjson
+import SocketServer
+import SimpleAppHTTPServer
+#import BaseHTTPServer
+import sys
+import traceback
+try:
+    import fcntl
+except ImportError:
+    fcntl = None
+
+
+###
+### Server code
+###
+import SimpleXMLRPCServer
+
+
+class SimpleJSONRPCRequestHandler(SimpleAppHTTPServer.SimpleAppHTTPRequestHandler):
+    """Simple JSONRPC request handler class and HTTP GET Server
+
+    Handles all HTTP POST requests and attempts to decode them as
+    JSONRPC requests.
+
+    Handles all HTTP GET requests and serves the content from the
+    current directory.
+
+    """
+
+    # Class attribute listing the accessible path components;
+    # paths not on this list will result in a 404 error.
+    rpc_paths = ('/', '/JSON')
+
+    def __init__(self):
+
+        self.funcs = {}
+
+    def is_rpc_path_valid(self):
+        return True
+        if self.rpc_paths:
+            return self.path in self.rpc_paths
+        else:
+            # If .rpc_paths is empty, just assume all paths are legal
+            return True
+
+    def onPOST(self, client, *args):
+        """Handles the HTTP POST request.
+
+        Attempts to interpret all HTTP POST requests as XML-RPC calls,
+        which are forwarded to the server's _dispatch method for handling.
+        """
+        print "onPost", client, args
+        self.client = client
+        self.hr = args[0]
+
+        # Check that the path is legal
+        if not self.is_rpc_path_valid():
+            self.report_404()
+            return
+
+        print "about to read data"
+        try:
+            # Get arguments by reading body of request.
+            # We read this in chunks to avoid straining
+            # socket.read(); around the 10 or 15Mb mark, some platforms
+            # begin to have problems (bug #792570).
+            max_chunk_size = 10*1024*1024
+            size_remaining = int(self.hr.headers["content-length"])
+            L = []
+            print "size_remaining", size_remaining
+            while size_remaining:
+                chunk_size = min(size_remaining, max_chunk_size)
+                data = self.hr.rfile.read(chunk_size)
+                L.append(data)
+                size_remaining -= len(L[-1])
+            data = ''.join(L)
+
+            # In previous versions of SimpleXMLRPCServer, _dispatch
+            # could be overridden in this class, instead of in
+            # SimpleXMLRPCDispatcher. To maintain backwards compatibility,
+            # check to see if a subclass implements _dispatch and dispatch
+            # using that method if present.
+            response = self._marshaled_dispatch(
+                    data, getattr(self, '_dispatch', None)
+                )
+        except: # This should only happen if the module is buggy
+            # internal error, report as HTTP server error
+            self.hr.send_response(500)
+            self.hr.end_headers()
+        else:
+            # got a valid JSONRPC response
+            self.hr.send_response(200)
+            self.hr.send_header("Content-type", "text/x-json")
+            self.hr.send_header("Content-length", str(len(response)))
+            self.hr.end_headers()
+            self.hr.wfile.write(response)
+
+            # shut down the connection
+            #self.wfile.flush()
+            #self.connection.shutdown(1)
+
+    def report_404 (self):
+            # Report a 404 error
+        self.hr.send_response(404)
+        response = 'No such page'
+        self.hr.send_header("Content-type", "text/plain")
+        self.hr.send_header("Content-length", str(len(response)))
+        self.hr.end_headers()
+        self.hr.wfile.write(response)
+        # shut down the connection
+        self.hr.wfile.flush()
+        self.hr.connection.shutdown(1)
+
+    def register_function(self, function, name = None):
+        """Registers a function to respond to XML-RPC requests.
+
+        The optional name argument can be used to set a Unicode name
+        for the function.
+        """
+
+        if name is None:
+            name = function.__name__
+        self.funcs[name] = function
+
+
+    def _marshaled_dispatch(self, data, dispatch_method = None):
+        id = None
+        try:
+            req = cjson.decode(data)
+            method = req['method']
+            params = req['params']
+            id     = req['id']
+
+            if dispatch_method is not None:
+                result = dispatch_method(method, params)
+            else:
+                result = self._dispatch(method, params)
+            response = dict(id=id, result=result, error=None)
+        except:
+            extpe, exv, extrc = sys.exc_info()
+            err = dict(type=str(extpe),
+                       message=str(exv),
+                       traceback=''.join(traceback.format_tb(extrc)))
+            response = dict(id=id, result=None, error=err)
+        try:
+            return cjson.encode(response)
+        except:
+            extpe, exv, extrc = sys.exc_info()
+            err = dict(type=str(extpe),
+                       message=str(exv),
+                       traceback=''.join(traceback.format_tb(extrc)))
+            response = dict(id=id, result=None, error=err)
+            return cjson.encode(response)
+
+    def _dispatch(self, method, params):
+        """Dispatches the XML-RPC method.
+
+        XML-RPC calls are forwarded to a registered function that
+        matches the called XML-RPC method name. If no such function
+        exists then the call is forwarded to the registered instance,
+        if available.
+
+        If the registered instance has a _dispatch method then that
+        method will be called with the name of the XML-RPC method and
+        its parameters as a tuple
+        e.g. instance._dispatch('add',(2,3))
+
+        If the registered instance does not have a _dispatch method
+        then the instance will be searched to find a matching method
+        and, if found, will be called.
+
+        Methods beginning with an '_' are considered private and will
+        not be called.
+        """
+
+        func = self.funcs.get(method, None)
+
+        if func is not None:
+            print "params", params
+            return func(*params)
+        else:
+            raise Exception('method "%s" is not supported' % method)
+
+
+    #def log_request(self, code='-', size='-'):
+    #    """Selectively log an accepted request."""
+
+    #    if self.server.logRequests:
+    #        BaseHTTPServer.BaseHTTPRequestHandler.log_request(self, code, size)
+
+
+
+class SimpleJSONRPCServer:
+    """Simple JSON-RPC server.
+
+    Simple JSON-RPC server that allows functions and a single instance
+    to be installed to handle requests. The default implementation
+    attempts to dispatch JSON-RPC calls to the functions or instance
+    installed in the server. Override the _dispatch method inhereted
+    from SimpleJSONRPCDispatcher to change this behavior.
+    """
+
+    allow_reuse_address = True
+
+    def __init__(self, addr, requestHandler=SimpleJSONRPCRequestHandler,
+                 logRequests=True):
+        self.logRequests = logRequests
+
+        # [Bug #1222790] If possible, set close-on-exec flag; if a
+        # method spawns a subprocess, the subprocess shouldn't have
+        # the listening socket open.
+        if fcntl is not None and hasattr(fcntl, 'FD_CLOEXEC'):
+            flags = fcntl.fcntl(self.fileno(), fcntl.F_GETFD)
+            flags |= fcntl.FD_CLOEXEC
+            fcntl.fcntl(self.fileno(), fcntl.F_SETFD, flags)
+
+
+###
+### Client code
+###
+import xmlrpclib
+
+class ResponseError(xmlrpclib.ResponseError):
+    pass
+class Fault(xmlrpclib.ResponseError):
+    pass
+
+def _get_response(file, sock):
+    data = ""
+    while 1:
+        if sock:
+            response = sock.recv(1024)
+        else:
+            response = file.read(1024)
+        if not response:
+            break
+        data += response
+
+    file.close()
+
+    return data
+
+class Transport(xmlrpclib.Transport):
+    def _parse_response(self, file, sock):
+        return _get_response(file, sock)
+
+class SafeTransport(xmlrpclib.SafeTransport):
+    def _parse_response(self, file, sock):
+        return _get_response(file, sock)
+
+class ServerProxy:
+    def __init__(self, uri, id=None, transport=None, use_datetime=0):
+        # establish a "logical" server connection
+
+        # get the url
+        import urllib
+        type, uri = urllib.splittype(uri)
+        if type not in ("http", "https"):
+            raise IOError, "unsupported JSON-RPC protocol"
+        self.__host, self.__handler = urllib.splithost(uri)
+        if not self.__handler:
+            self.__handler = "/JSON"
+
+        if transport is None:
+            if type == "https":
+                transport = SafeTransport(use_datetime=use_datetime)
+            else:
+                transport = Transport(use_datetime=use_datetime)
+
+        self.__transport = transport
+        self.__id        = id
+
+    def __request(self, methodname, params):
+        # call a method on the remote server
+
+        request = cjson.encode(dict(id=self.__id, method=methodname,
+                                    params=params))
+
+        data = self.__transport.request(
+            self.__host,
+            self.__handler,
+            request,
+            verbose=False
+            )
+
+        response = cjson.decode(data)
+
+        if response["id"] != self.__id:
+            raise ResponseError("Invalid request id (is: %s, expected: %s)" \
+                                % (response["id"], self.__id))
+        if response["error"] is not None:
+            raise Fault("JSON Error", response["error"])
+        return response["result"]
+
+    def __repr__(self):
+        return (
+            "<ServerProxy for %s%s>" %
+            (self.__host, self.__handler)
+            )
+
+    __str__ = __repr__
+
+    def __getattr__(self, name):
+        # magic method dispatcher
+        return xmlrpclib._Method(self.__request, name)
+
+
+def jsonremote(service):
+    """Make JSONRPCService a decorator so that you can write :
+    
+    chatservice = SimpleJSONRPCServer()
+
+    @jsonremote(chatservice, 'login')
+    def login(request, user_name):
+        (...)
+    """
+    def remotify(func):
+        if isinstance(service, SimpleJSONRPCServer):
+            service.register_function(func, func.__name__)
+        else:
+            emsg = 'Service "%s" not found' % str(service.__name__)
+            raise NotImplementedError, emsg
+        return func
+    return remotify
+
+
+if __name__ == '__main__':
+    if not len(sys.argv) > 1:
+        import socket
+        print 'Running JSON-RPC server on port 8000'
+        server = SimpleJSONRPCServer(("localhost", 8000))
+        server.register_function(pow)
+        server.register_function(lambda x,y: x+y, 'add')
+        server.register_function(lambda x: x, 'echo')
+        server.serve_forever()
+    else:
+        remote = ServerProxy(sys.argv[1])
+        print 'Using connection', remote
+
+        print repr(remote.add(1, 2))
+        aaa = remote.add
+        print repr(remote.pow(2, 4))
+        print aaa(5, 6)
+
+        try:
+            # Invalid parameters
+            aaa(5, "toto")
+            print "Successful execution of invalid code"
+        except Fault:
+            pass
+
+        try:
+            # Invalid parameters
+            aaa(5, 6, 7)
+            print "Successful execution of invalid code"
+        except Fault:
+            pass
+
+        try:
+            # Invalid method name
+            print repr(remote.powx(2, 4))
+            print "Successful execution of invalid code"
+        except Fault:
+            pass
index e85058e6764183d75d904388313412920f08c6f3..6537ee0e6071358815a45bd18765dd269ff50155 100644 (file)
--- a/httpd.py
+++ b/httpd.py
@@ -71,7 +71,11 @@ from cStringIO import StringIO
 from cookielib import parse_ns_headers, CookieJar
 from Cookie import SimpleCookie
 
+global _debug
 _debug = False
+def set_debug(dbg):
+    global _debug
+    _debug = dbg
 
 class MultitaskHTTPRequestHandler(BaseHTTPRequestHandler):
 
@@ -107,13 +111,7 @@ class SockStream(object):
         self.bytesWritten = self.bytesRead = 0
     
     def close(self):
-        print "sock close"
-        fno = self.sock.fileno()
-        self.sock.close()
-        print "writable?", self.sock, fno
-        yield multitask.writable(fno, timeout=0.1)
-        print "is it?"
-        del self.sock
+        self.sock.shutdown(1) # can't just close it.
         self.sock = None
         
     def readline(self):
@@ -261,7 +259,7 @@ class Protocol(object):
     
     def parseRequests(self):
         '''Parses complete messages until connection closed. Raises ConnectionLost exception.'''
-        self.hr = MultitaskHTTPRequestHandler(self.stream, ("",0,), self.remote)
+        self.hr = MultitaskHTTPRequestHandler(self.stream, self.remote, None)
         self.hr.close_connection = 1
         self.cookies = CookieJar()
 
@@ -494,6 +492,21 @@ class Client(Protocol):
             msg.response_cookies['session']['path'] = '/'
             msg.response_cookies['session']['domain'] = self.remote[0]
             msg.response_cookies['session']['version'] = 0
+
+        if msg.headers.has_key('content-length'):
+            max_chunk_size = 10*1024*1024
+            size_remaining = int(msg.headers["content-length"])
+            L = []
+            while size_remaining:
+                chunk_size = min(size_remaining, max_chunk_size)
+                data = (yield self.stream.read(chunk_size))
+                L.append(data)
+                size_remaining -= len(L[-1])
+
+            pos = msg.rfile.tell()
+            msg.rfile.write(''.join(L))
+            msg.rfile.seek(pos)
+
         yield self.server.queue.put((self, msg)) # new connection
 
     def accept(self):
@@ -567,7 +580,7 @@ class Server(object):
             yield self.queue.put((None, None))
             self.queue = None
 
-class App(SimpleAppHTTPRequestHandler):
+class BaseApp(object):
     '''An application instance containing any number of streams. Except for constructor all methods are generators.'''
     count = 0
     def __init__(self):
@@ -580,13 +593,11 @@ class App(SimpleAppHTTPRequestHandler):
     def clients(self):
         '''everytime this property is accessed it returns a new list of clients connected to this instance.'''
         return self._clients[1:] if self._clients is not None else []
-    def onConnect(self, client, *args):
-        if _debug: print self.name, 'onConnect', client.path
-        return True
-    def onDisconnect(self, client):
-        if _debug: print self.name, 'onDisconnect', client.path
     
-class HttpServer(object):
+class App(BaseApp, SimpleAppHTTPRequestHandler):
+    pass
+    
+class HTTPServer(object):
     '''A RTMP server to record and stream Flash video.'''
     def __init__(self):
         '''Construct a new HttpServer. It initializes the local members.'''
@@ -622,7 +633,7 @@ class HttpServer(object):
                 # TODO: we should reject non-localhost client connections.
                 if not client:                # if the server aborted abnormally,
                     break                     #    hence close the listener.
-                if _debug: print 'client connection received', client, args
+                if _debug: print 'client connection received', client, msg
                 # if client.objectEncoding != 0 and client.objectEncoding != 3:
                 if client.objectEncoding != 0:
                     yield client.rejectConnection(reason='Unsupported encoding ' + str(client.objectEncoding) + '. Please use NetConnection.defaultObjectEncoding=ObjectEncoding.AMF0')
@@ -630,18 +641,23 @@ class HttpServer(object):
                 else:
                     print "cookies", str(msg.response_cookies)
                     session = msg.response_cookies['session'].value
-                    name, ignore, scope = msg.path.partition('/')
+                    name = msg.path
+                    print "serverlistener", name
                     if '*' not in self.apps and name not in self.apps:
                         yield client.rejectConnection(reason='Application not found: ' + name)
                     else: # create application instance as needed and add in our list
                         if _debug: print 'name=', name, 'name in apps', str(name in self.apps)
                         app = self.apps[name] if name in self.apps else self.apps['*'] # application class
                         print "clients", self.clients.keys()
-                        if session in self.clients: inst = self.clients[session][0]
-                        else: inst = app()
+                        if session in self.clients:
+                            inst = self.clients[session][0]
+                        else:
+                            inst = app()
+                        msg.server = inst # whew! just in time!
                         try: 
                             methodname = "on%s" % msg.command
-                            method =  getattr(inst, methodname, None)
+                            print methodname, dir(inst)
+                            method = getattr(inst, methodname, None)
                             result = method(client, msg)
                             close_connection = msg.close_connection
                             print "close connection", close_connection
@@ -846,7 +862,7 @@ if __name__ == '__main__':
     _debug = options.verbose
     try:
         if _debug: print time.asctime(), 'Options - %s:%d' % (options.host, options.port)
-        agent = HttpServer()
+        agent = HTTPServer()
         agent.root = options.root
         agent.start(options.host, options.port)
         if _debug: print time.asctime(), 'Flash Server Starts - %s:%d' % (options.host, options.port)
diff --git a/jsonapp.py b/jsonapp.py
new file mode 100644 (file)
index 0000000..92f13e1
--- /dev/null
@@ -0,0 +1,23 @@
+import multitask
+import httpd
+from SimpleJSONRPCServer import SimpleJSONRPCRequestHandler
+from httpd import HTTPServer, App, BaseApp
+
+class MyApp(SimpleJSONRPCRequestHandler, BaseApp):
+    '''An application instance containing any number of streams. Except for constructor all methods are generators.'''
+    count = 0
+    def __init__(self):
+        BaseApp.__init__(self)
+        SimpleJSONRPCRequestHandler.__init__(self)
+
+        self.register_function(self.echo)
+
+    def echo(self, parm):
+        return parm
+
+httpd.set_debug(True)
+agent = HTTPServer()   
+agent.apps = dict({'/json': MyApp, '*': App})
+agent.start()
+multitask.run() 
+
diff --git a/testjsonrpc.py b/testjsonrpc.py
new file mode 100644 (file)
index 0000000..47eae40
--- /dev/null
@@ -0,0 +1,13 @@
+import unittest
+import jsonrpclib
+
+class TestJsolait(unittest.TestCase):
+    def test_echo(self):
+        s = jsonrpclib.ServerProxy("http://127.0.0.1:8080/json", verbose=0)
+        reply = s.echo("hello")
+        print reply
+        #self.assert_(reply["result"] == "foo bar")
+
+
+if __name__=="__main__":
+    unittest.main()