我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 SimpleXMLRPCServer.SimpleXMLRPCServer()。
def parse_request(self):
    """Parse the request and authenticate the caller with HTTP Basic auth.

    The database name is the request path without its first character.
    When the path names no database, ``self.tryton`` is set to an anonymous
    user/session pair and the base-class result is returned unchanged.
    Otherwise the ``Authorization`` header is decoded and the credentials
    are passed to ``security.login``; the user id and session it returns
    are stored in ``self.tryton``.

    Returns:
        The base-class ``parse_request`` result when parsing fails or when
        authentication succeeds; ``False`` after a ``401 Unauthorized``
        reply has been sent.
    """
    res = SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.parse_request(self)
    if not res:
        return res

    database_name = self.path[1:]
    if not database_name:
        self.tryton = {'user': None, 'session': None}
        return res

    try:
        method, up64 = self.headers['Authorization'].split(None, 1)
        if method.strip().lower() == 'basic':
            # b64decode is the non-deprecated spelling of decodestring.
            user, password = base64.b64decode(up64).split(':', 1)
            user_id, session = security.login(database_name, user, password)
            self.tryton = {'user': user_id, 'session': session}
            return res
    except Exception:
        # Deliberately broad: a missing or malformed header and a failing
        # login all have to end in the same 401 reply below.
        pass

    # send_error() writes a complete reply (status line, headers, body), so
    # a header sent after it never reaches the client as a header.  Build
    # the reply by hand so that WWW-Authenticate is part of the header block.
    self.send_response(401, 'Unauthorized')
    self.send_header("WWW-Authenticate", 'Basic realm="Tryton"')
    self.send_header("Content-Length", "0")
    self.send_header("Connection", "close")
    self.end_headers()
    return False
def rpcserver(self):
    """Run the XML-RPC server for this plugin; meant to run in its own thread.

    Listens on localhost:20758, exposes ``self.connect`` as the XML-RPC
    method ``connect`` and blocks in ``serve_forever()``.  Any error is
    logged through ``self.log`` instead of being propagated.
    """
    server = None
    try:
        self.log.info("plugin.http.ObjectEditor: starting XML RPC Server")
        server = SimpleXMLRPCServer(addr=("localhost", 20758),
                                    logRequests=False, allow_none=1)
        server.register_function(self.connect, "connect")
        server.serve_forever()
    except Exception:
        # Exception (not a bare except) so that SystemExit and
        # KeyboardInterrupt are not silently swallowed.
        self.log.error("plugin.http.ObjectEditor: rpcserver: error "
                       "connecting to remote")
        self.log.error(sys.exc_info())
    finally:
        # Release the listening socket once serving stops for any reason.
        if server is not None:
            server.server_close()
def startXMLRPCServer(rpcInstance, address, port, logFilename):
    """Serve ``rpcInstance`` over XML-RPC on ``address:port`` until interrupted.

    A logger built from ``logFilename`` is attached to ``rpcInstance`` as
    ``logger``.  Requests are handled one at a time in a loop; Ctrl-C ends
    the loop quietly, any other exception propagates.  In every case the
    server socket is closed on the way out.
    """
    rpcInstance.logger = createLogger(logFilename)

    endpoint = (address, port)
    server = VerboseFaultXMLRPCServer(endpoint,
                                      LoggingSimpleXMLRPCRequestHandler,
                                      allow_none=True)
    print("SimpleXMLRPCServer Started. Listening on %s:%d..." % endpoint)
    # Second argument enables dotted-name lookup on the instance.
    server.register_instance(rpcInstance, True)

    try:
        while True:
            server.handle_request()
    except KeyboardInterrupt:
        pass
    finally:
        print("Exiting")
        server.server_close()
def test_basic(self):
    """Check the traceback flag defaults to False and a plain call works."""
    server_cls = SimpleXMLRPCServer.SimpleXMLRPCServer

    # The flag must be off until somebody turns it on.
    self.assertEqual(server_cls._send_traceback_header, False)

    # Turn traceback reporting on.
    server_cls._send_traceback_header = True

    # Smoke test: a call that is not expected to fail.
    try:
        proxy = xmlrpclib.ServerProxy(URL)
        self.assertEqual(proxy.pow(6, 8), 6 ** 8)
    except (xmlrpclib.ProtocolError, socket.error) as e:
        # Non-blocking socket 'unavailable' errors are tolerated.
        if is_unavailable_exception(e):
            return
        # Protocol error: put the extra details into the test output.
        self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
def test_fail_with_info(self):
    """With the traceback flag set, a server error must carry error headers."""
    # Install the broken message class on the request handler.
    SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass

    # With the flag set, server-side errors should send back
    # exception/traceback information.
    SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header = True

    try:
        proxy = xmlrpclib.ServerProxy(URL)
        proxy.pow(6, 8)
    except (xmlrpclib.ProtocolError, socket.error) as e:
        # Skip non-blocking socket 'unavailable' errors and errors that
        # carry no headers at all.
        if is_unavailable_exception(e) or not hasattr(e, "headers"):
            return
        # The error details are expected in the response headers.
        expected_err = "invalid literal for int() with base 10: 'I am broken'"
        self.assertEqual(e.headers.get("x-exception"), expected_err)
        self.assertTrue(e.headers.get("x-traceback") is not None)
    else:
        self.fail('ProtocolError not raised')
def main():
    """Start an XML-RPC server on localhost:9000 exposing the wrapper functions.

    Calls ``autoWait()`` and ``ScreenEA()`` first, registers the wrapper
    functions and an ``IDAWrapper`` instance, serves until the server
    stops, then calls ``qexit(0)``.
    """
    autoWait()
    ScreenEA()  # return value is not used

    server = SimpleXMLRPCServer(("localhost", 9000))

    exported = (
        (is_connected, "is_connected"),
        (wrapper_get_raw, "get_raw"),
        (wrapper_get_function, "get_function"),
        (wrapper_Heads, "Heads"),
        (wrapper_Functions, "Functions"),
    )
    for func, rpc_name in exported:
        server.register_function(func, rpc_name)

    server.register_instance(IDAWrapper())
    server.register_function(wrapper_quit, "quit")

    server.serve_forever()
    qexit(0)
def __init__(self, addr):
    """Create the server on ``addr`` and register its RPC methods.

    ``addr`` is an ``(address, port)`` pair.  Request logging is disabled,
    ``None`` values are allowed and the encoding is UTF-8.  Introspection
    functions, the instance's own service methods and whatever
    ``register_package()`` adds are registered, then a start-up message is
    printed through ``print_msg``.
    """
    SimpleXMLRPCServer.SimpleXMLRPCServer.__init__(
        self, addr=addr, logRequests=False, allow_none=True, encoding='UTF-8')

    self.address, self.port = addr
    self._start_work_time = datetime.datetime.fromtimestamp(time.time())
    self._last_interactive_time = None

    self.register_introspection_functions()

    # Each method is exported under its own name.
    service_methods = (
        'get_start_work_time',
        'get_last_interactive_time',
        'get_pid',
        'get_log',
        'get_binary_data',
        'send_binary_data',
        'is_working',
        'shutdown_driver',
    )
    for method_name in service_methods:
        self.register_function(getattr(self, method_name), method_name)

    self.register_package()
    print_msg('Server(%s:%s) - Started' % (self.address, self.port))
def _dispatch(self, method, params):
    """Dispatch an RPC call through the base class, logging call and reply.

    The call and its parameters are logged first.  If the base dispatcher
    raises, the error and traceback are logged and the exception is
    re-raised.  For methods listed in ``XMLRPCServer.LOG_FILTERED_METHODS``
    the reply is logged as a placeholder instead of the actual result.
    ``self._last_interactive_time`` is refreshed after a successful call.
    """
    print_msg('--- --- --- --- --- ---')
    print_msg('%s <<< %s' % (method, params))

    try:
        outcome = SimpleXMLRPCServer.SimpleXMLRPCServer._dispatch(
            self, method, params)
    except Exception as e:
        print_err('%s - Error: %s' % (method, e))
        print_err('%s - Traceback:\n%s' % (method, traceback.format_exc()))
        raise

    if method in XMLRPCServer.LOG_FILTERED_METHODS:
        reply_line = '%s >>> Binary Data' % method
    else:
        reply_line = '%s >>> %s' % (method, outcome)
    print_msg(reply_line)

    self._last_interactive_time = datetime.datetime.fromtimestamp(time.time())
    return outcome
def setupXMLRPCSocket(self):
    """Listen for XML-RPC requests on ``self.XMLRPCport`` in a new thread.

    Does nothing when the port is 0.  Otherwise a server is created on
    0.0.0.0, a ``listener_instance`` is registered on it and a
    ``listener_thread`` is started.  The thread and the listener object are
    kept as ``self.listener_thread`` and ``self.listener``.
    """
    import SimpleXMLRPCServer
    import threading

    if self.XMLRPCport == 0:
        return

    host = "0.0.0.0"
    address = (host, self.XMLRPCport)
    server_cls = SimpleXMLRPCServer.SimpleXMLRPCServer
    try:
        server = server_cls(address, allow_none=True)
    except TypeError:
        # Fall back to a constructor call without the allow_none keyword.
        print("2.4 Python did not allow allow_none=True!")
        server = server_cls(address)
    self.log("Set up XMLRPC Socket on %s port %d" % address)

    listener_object = listener_instance(self)
    server.register_instance(listener_object)

    # Hand the server over to a new thread.
    worker = listener_thread(server, listener_object)
    worker.start()

    self.listener_thread = worker
    self.listener = listener_object
def __init__(self, host, port, consumer=None):
    """Create the RPC server and expose ``_client_send`` as ``client_send``.

    host: the hostname or IP address of the RPC server
    port: the port for the RPC server
    consumer: the consuming object for received messages
    """
    self.host, self.port, self.consumer = host, port, consumer

    rpc = SimpleXMLRPCServer((host, port), logRequests=False, allow_none=True)
    self.server = rpc
    rpc.register_function(self._client_send, "client_send")

    self.msg = None
def __init__(self, server_address, HandlerClass, logRequests=1):
    """Initialise the handler registry, then the underlying XML-RPC server."""
    # Created before the base initialiser runs.
    self.handlers = set()
    SimpleXMLRPCServer.SimpleXMLRPCServer.__init__(
        self,
        server_address,
        requestHandler=HandlerClass,
        logRequests=logRequests,
    )
def server_bind(self):
    """Set SO_REUSEADDR and SO_KEEPALIVE on ``self.socket``, then bind."""
    for option in (socket.SO_REUSEADDR, socket.SO_KEEPALIVE):
        self.socket.setsockopt(socket.SOL_SOCKET, option, 1)
    SimpleXMLRPCServer.SimpleXMLRPCServer.server_bind(self)
def server_close(self):
    """Close the server, then shut down every request in ``self.handlers``."""
    SimpleXMLRPCServer.SimpleXMLRPCServer.server_close(self)
    # Iterate over a snapshot; presumably handlers drop out of the set
    # while their requests are shut down -- confirm against the handler.
    remaining = self.handlers.copy()
    for active_handler in remaining:
        self.shutdown_request(active_handler.request)
def run(self):
    """Serve ``self.pso_append`` as ``append_entry`` on 127.0.0.1.

    Listens on ``PSO_APPEND_PORT`` and blocks in ``serve_forever()``.
    """
    listen_on = ('127.0.0.1', PSO_APPEND_PORT)
    rpc = SimpleXMLRPCServer(listen_on, allow_none=True)
    rpc.register_function(self.pso_append, "append_entry")
    rpc.serve_forever()
def run(self):
    """Serve ``self.messenger.paxos_append`` as ``append_entry`` on 127.0.0.1.

    Listens on ``PAXOS_APPEND_PORT`` and blocks in ``serve_forever()``.
    """
    listen_on = ('127.0.0.1', PAXOS_APPEND_PORT)
    rpc = SimpleXMLRPCServer(listen_on, allow_none=True)
    rpc.register_function(self.messenger.paxos_append, "append_entry")
    rpc.serve_forever()
def rpc_server():
    """Serve ``announce`` over XML-RPC on localhost:8004 (request log off).

    Prints a start-up message and then blocks in ``serve_forever()``.
    """
    listen_on = ('localhost', 8004)
    rpcserver = SimpleXMLRPCServer.SimpleXMLRPCServer(listen_on,
                                                      logRequests=False)
    rpcserver.register_function(announce, 'announce')
    print('Starting xml rpc server...')
    rpcserver.serve_forever()
def __init__(self, addr, requestHandler=SecureXMLRPCRequestHandler, logRequests=1):
    """Set up dispatcher state and initialise the secure TCP server.

    Per the original author's note this mirrors
    ``SimpleXMLRPCServer.__init__`` but initialises ``SecureTCPServer``
    instead of the plain ``TCPServer``.
    """
    # Dispatcher state: registered functions, registered instance, logging.
    self.funcs, self.instance = {}, None
    self.logRequests = logRequests

    SecureTCPServer.__init__(self, addr, requestHandler)
def run(self):
    """Serve ``self.simulator`` over XML-RPC on localhost:SIMULATOR_PORT.

    The server is stored as ``self.server``; the call blocks in
    ``serve_forever()``.
    """
    rpc = SimpleXMLRPCServer(('localhost', SIMULATOR_PORT), allow_none=True)
    self.server = rpc
    rpc.register_instance(self.simulator)
    log.info("Listening on %s" % SIMULATOR_PORT)
    self.server.serve_forever()
def setUp(self):
    """Turn on traceback reporting and start the server thread."""
    # Traceback reporting on.
    SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header = True

    self.evt = threading.Event()

    # The server thread handles the requests.
    server_thread = threading.Thread(
        target=self.threadFunc,
        args=(self.evt, self.request_count, self.requestHandler),
    )
    server_thread.start()

    # Wait (at most 10 seconds) for the server to signal readiness, then
    # reset the event so it can be waited on again.
    self.evt.wait(10)
    self.evt.clear()
def tearDown(self):
    """Wait for the server thread, then turn traceback reporting off."""
    # Give the server thread up to 10 seconds to terminate.
    self.evt.wait(10)

    # Traceback reporting off again.
    server_cls = SimpleXMLRPCServer.SimpleXMLRPCServer
    server_cls._send_traceback_header = False

# NOTE: The tests in SimpleServerTestCase will ignore failures caused by
# "temporarily unavailable" exceptions raised in SimpleXMLRPCServer.  This
# condition occurs infrequently on some platforms, frequently on others, and
# is apparently caused by using SimpleXMLRPCServer with a non-blocking socket.
# If the server class is updated at some point in the future to handle this
# situation more gracefully, these tests should be modified appropriately.
def test_introspection4(self):
    """``system.methodSignature`` must report that signatures are unsupported."""
    # SimpleXMLRPCServer has no signature support, but the call itself
    # should at least be possible.
    try:
        proxy = xmlrpclib.ServerProxy(URL)
        signature = proxy.system.methodSignature('div')
        self.assertEqual(signature, 'signatures not supported')
    except (xmlrpclib.ProtocolError, socket.error) as e:
        # Non-blocking socket 'unavailable' errors are tolerated.
        if is_unavailable_exception(e):
            return
        # Protocol error: put the extra details into the test output.
        self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
def test_dotted_attribute(self):
    """``resolve_dotted_attribute`` rejects private names, resolves public ones."""
    resolve = SimpleXMLRPCServer.resolve_dotted_attribute

    # Private methods are not allowed, so this raises AttributeError.
    self.assertRaises(AttributeError, resolve, str, '__add')
    self.assertTrue(resolve(str, 'title'))

    # Send a request via test_simple1 so the test finishes faster instead
    # of waiting for the socket timeout.
    self.test_simple1()
def test_fail_no_info(self):
    """Without the traceback flag, a server error must not leak error headers."""
    # Install the broken message class on the request handler.
    SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass

    try:
        proxy = xmlrpclib.ServerProxy(URL)
        proxy.pow(6, 8)
    except (xmlrpclib.ProtocolError, socket.error) as e:
        # Skip non-blocking socket 'unavailable' errors and errors that
        # carry no headers at all.
        if is_unavailable_exception(e) or not hasattr(e, "headers"):
            return
        # Neither server-side error header should come back in this case.
        self.assertTrue(e.headers.get("X-exception") is None)
        self.assertTrue(e.headers.get("X-traceback") is None)
    else:
        self.fail('ProtocolError not raised')
def setUp(self):
    """Instantiate a ``CGIXMLRPCRequestHandler`` and store it as ``self.cgi``."""
    handler_cls = SimpleXMLRPCServer.CGIXMLRPCRequestHandler
    self.cgi = handler_cls()
def main():
    """Serve a ``Participant`` over XML-RPC on localhost.

    Command line: ``argv[1]`` is the port (e.g. 5001) and ``argv[2]`` the
    participant id (e.g. "1").  Any exception is reported by printing its
    ``args`` tuple.
    """
    try:
        port = int(sys.argv[1])
        pid = str(sys.argv[2])

        server = SimpleXMLRPCServer(("localhost", port))
        print("listen")

        participant_name = 'participant' + pid
        server.register_instance(Participant(participant_name, port))
        server.serve_forever()
    except Exception as e:
        print(e.args)
def __init__(self, addr):
    """Initialise the thread and create its XML-RPC server on ``addr``.

    ``addr`` is an ``(ip, port)`` pair.  The server uses ``RequestHandler``,
    has request logging disabled and allows ``None`` values.
    """
    threading.Thread.__init__(self)

    ip, port = addr
    self.addr = addr

    server_options = dict(requestHandler=RequestHandler,
                          logRequests=False,
                          allow_none=True)
    self.server = SimpleXMLRPCServer((ip, port), **server_options)
    # Introspection functions are intentionally not registered here.
def init_server(self):
    """Create the XML-RPC server on localhost:8000 and register its methods.

    ``self.is_even`` and four ``self.db`` methods are exported, each under
    its own name.  The server is stored as ``self.server``; it is not
    started here.
    """
    rpc = SimpleXMLRPCServer(("localhost", 8000), allow_none=True)
    self.server = rpc

    rpc.register_function(self.is_even, "is_even")

    db_methods = (
        "add_condition_order",
        "get_todo_orders",
        "get_all_orders",
        "cancel_cond_order",
    )
    for method_name in db_methods:
        rpc.register_function(getattr(self.db, method_name), method_name)
def tearDown(self):
    """Collect garbage, wait for the server thread, disable traceback reporting."""
    # A collection pass closes the active connections.
    test_support.gc_collect()

    # Give the server thread up to 10 seconds to terminate.
    self.evt.wait(10)

    # Traceback reporting off again.
    server_cls = SimpleXMLRPCServer.SimpleXMLRPCServer
    server_cls._send_traceback_header = False

# NOTE: The tests in SimpleServerTestCase will ignore failures caused by
# "temporarily unavailable" exceptions raised in SimpleXMLRPCServer.  This
# condition occurs infrequently on some platforms, frequently on others, and
# is apparently caused by using SimpleXMLRPCServer with a non-blocking socket.
# If the server class is updated at some point in the future to handle this
# situation more gracefully, these tests should be modified appropriately.