The following 50 code examples, extracted from open-source Python projects, illustrate how to use xmlrpc.client.ServerProxy().
def __init__(self, options, provider_options=None):
    """Initialize the Gandi DNS provider and its XML-RPC client."""
    super(Provider, self).__init__(options)
    if provider_options is None:
        provider_options = {}
    endpoint = provider_options.get('api_endpoint') or 'https://rpc.gandi.net/xmlrpc/'
    self.apikey = self.options['auth_token']
    self.api = xmlrpclib.ServerProxy(endpoint, allow_none=True)
    self.default_ttl = 3600
    # self.domain_id is required by the test suite
    self.domain_id = None
    self.zone_id = None
    self.domain = self.options['domain'].lower()
    # Authenticate against the provider and make any requests required to get
    # the domain's id for this provider, so it can be used in subsequent
    # calls.  Should throw an error if authentication fails for any reason,
    # or if the domain does not exist.
def __init__(self, algorithm_host, algorithm, algorithm_port):
    """Select the storage backend for *algorithm* and bind an append function."""
    self.algorithm = algorithm
    self.algorithm_port = algorithm_port
    if algorithm == "rethinkdb":
        self.rdb_connection = r.connect('localhost', self.algorithm_port)
        logging.info("Connection with RethinkDB successful")
        rethinkdb_setup(self.rdb_connection)
        self.appendFunction = partial(rethinkdb_append_entry, self.rdb_connection)
    elif algorithm in ("paxos", "pso"):
        # An empty host (or the literal two-quote string "''") means loopback.
        if algorithm_host in ('\'\'', ''):
            algorithm_host = '127.0.0.1'
        self.cluster_rpc_client = ServerProxy(
            "http://{}:{}".format(algorithm_host, CLUSTER_APPEND_PORT),
            allow_none=True)
        self.appendFunction = partial(cluster_append_entry, self.cluster_rpc_client)
    elif algorithm == "datastore":
        self.appendFunction = partial(datastore_append_entry)
# wrapper used to execute multiple operations and register times
def configure_rethinkdb_gce(cluster, configure_daemons):
    """Configure a RethinkDB cluster on GCE through remote configure daemons.

    RethinkDB requires followers to join the master and every node to expose
    a set of ports (driver_port for clients, the rest framework-internal).
    On GCE the same port set is always free because the VMs were just spun up.
    """
    ports = GCE_RETHINKDB_PORTS
    master = cluster[0]
    print("Trying to contact remote master configure daemon...")
    print(configure_daemons[0].configure_rethinkdb_master(
        ports['cluster_port'], ports['driver_port'],
        ports['http_port'], master['address']))
    print("Trying to contact remote followers configure daemons...")
    for i, node in enumerate(cluster[1:]):
        # NOTE(review): cluster_port is passed twice here, matching the
        # original call — confirm the daemon really takes it twice.
        print(configure_daemons[i + 1].configure_rethinkdb_follower(
            node['id'], master['address'],
            ports['cluster_port'], ports['cluster_port'],
            ports['driver_port'], ports['http_port']))
# main
def test_multicall(self):
    """add/pow/div batched through MultiCall round-trip correctly."""
    try:
        proxy = xmlrpclib.ServerProxy(URL)
        batch = xmlrpclib.MultiCall(proxy)
        batch.add(2, 3)
        batch.pow(6, 8)
        batch.div(127, 42)
        add_result, pow_result, div_result = batch()
        self.assertEqual(add_result, 2 + 3)
        self.assertEqual(pow_result, 6 ** 8)
        self.assertEqual(div_result, 127 // 42)
    except (xmlrpclib.ProtocolError, socket.error) as e:
        # ignore failures due to non-blocking socket 'unavailable' errors
        if not is_unavailable_exception(e):
            # protocol error; provide additional information in test output
            self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
def test_non_existing_multicall(self):
    """Calling a missing method via MultiCall yields a fault entry."""
    try:
        proxy = xmlrpclib.ServerProxy(URL)
        batch = xmlrpclib.MultiCall(proxy)
        batch.this_is_not_exists()
        result = batch()
        # result.results holds one fault dict per failed call, e.g.
        # [{'faultCode': 1, 'faultString': '...is not supported'}]
        fault = result.results[0]
        self.assertEqual(fault['faultCode'], 1)
        self.assertEqual(fault['faultString'],
                         '<class \'Exception\'>:method "this_is_not_exists" '
                         'is not supported')
    except (xmlrpclib.ProtocolError, socket.error) as e:
        # ignore failures due to non-blocking socket 'unavailable' errors
        if not is_unavailable_exception(e):
            # protocol error; provide additional information in test output
            self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
def test_two(self):
    """Three requests over one keep-alive connection use one handler."""
    proxy = xmlrpclib.ServerProxy(URL)
    # do three requests.
    for _ in range(3):
        self.assertEqual(proxy.pow(6, 8), 6 ** 8)
    proxy("close")()
    # they should have all been handled by a single request handler
    self.assertEqual(len(self.RequestHandler.myRequests), 1)
    # check that we did at least two (the third may be pending append
    # due to thread scheduling)
    self.assertGreaterEqual(len(self.RequestHandler.myRequests[-1]), 2)
# test special attribute access on the serverproxy, through the __call__
# function.
def test_close(self):
    """Explicit close splits the requests across two request handlers."""
    proxy = xmlrpclib.ServerProxy(URL)
    # do some requests with close.
    for _ in range(3):
        self.assertEqual(proxy.pow(6, 8), 6 ** 8)
    proxy("close")()
    # this should trigger a new keep-alive request
    for _ in range(3):
        self.assertEqual(proxy.pow(6, 8), 6 ** 8)
    proxy("close")()
    # there should have been two request handlers, each having logged at
    # least two complete requests
    self.assertEqual(len(self.RequestHandler.myRequests), 2)
    self.assertGreaterEqual(len(self.RequestHandler.myRequests[-1]), 2)
    self.assertGreaterEqual(len(self.RequestHandler.myRequests[-2]), 2)
def test_basic(self):
    """Traceback-header flag defaults off; turning it on keeps calls working."""
    # check that flag is false by default
    self.assertEqual(
        xmlrpc.server.SimpleXMLRPCServer._send_traceback_header, False)
    # enable traceback reporting
    xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True
    # test a call that shouldn't fail just as a smoke test
    try:
        proxy = xmlrpclib.ServerProxy(URL)
        self.assertEqual(proxy.pow(6, 8), 6 ** 8)
    except (xmlrpclib.ProtocolError, socket.error) as e:
        # ignore failures due to non-blocking socket 'unavailable' errors
        if not is_unavailable_exception(e):
            # protocol error; provide additional information in test output
            self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
def test_fail_with_info(self):
    """Server errors carry X-exception / X-traceback headers when enabled."""
    # use the broken message class
    xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass
    # Check that errors in the server send back exception/traceback
    # info when flag is set
    xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True
    try:
        xmlrpclib.ServerProxy(URL).pow(6, 8)
    except (xmlrpclib.ProtocolError, socket.error) as e:
        # ignore failures due to non-blocking socket 'unavailable' errors
        if not is_unavailable_exception(e) and hasattr(e, "headers"):
            # We should get error info in the response
            expected_err = "invalid literal for int() with base 10: 'I am broken'"
            self.assertEqual(e.headers.get("X-exception"), expected_err)
            self.assertTrue(e.headers.get("X-traceback") is not None)
        else:
            self.fail('ProtocolError not raised')
def test_current_time(self):
    """Exercise minimal HTTP functionality in xmlrpclib via time.xmlrpc.com."""
    # skipTest raises SkipTest, so everything below is retained legacy code
    # that no longer runs.
    self.skipTest("time.xmlrpc.com is unreliable")
    server = xmlrpclib.ServerProxy("http://time.xmlrpc.com/RPC2")
    try:
        t0 = server.currentTime.getCurrentTime()
    except socket.error as e:
        self.skipTest("network error: %s" % e)
        return
    # Perform a minimal sanity check on the result, just to be sure
    # the request means what we think it means.
    t1 = xmlrpclib.DateTime()
    dt0 = xmlrpclib._datetime_type(t0.value)
    dt1 = xmlrpclib._datetime_type(t1.value)
    delta = dt0 - dt1 if dt0 > dt1 else dt1 - dt0
    # The difference between the system time here and the system
    # time on the server should not be too big.
    self.assertTrue(delta.days <= 1)
def delete_from_chunk_servers(self, file):
    """Recursively delete *file* (or a whole directory tree) from chunk servers.

    Directories are walked depth-first; for regular files every chunk replica
    is removed from each server holding it.  Deletion is best-effort: a
    failure on one replica is reported and the remaining ones are still tried.
    """
    if file.type == NodeType.directory:
        # Iterate over a copy: recursive deletion may mutate file.children.
        for child in list(file.children):
            self.delete_from_chunk_servers(child)
    else:
        print('Start delete file', file.get_full_path())
        for f_path, servers in file.chunks.items():
            for cs in servers:
                try:
                    cl = ServerProxy(cs)
                    print('Send delete', f_path, 'to', cs)
                    cl.delete_chunk(f_path)
                # Was a bare `except:`; narrowed so SystemExit and
                # KeyboardInterrupt are no longer swallowed, while keeping
                # the deliberate best-effort behaviour for RPC errors.
                except Exception:
                    print('Failed to delete', f_path, 'from', cs)
# get file\directory info by given path
# path format: /my_dir/index/some.file
# response format:
# { 'status': Status.ok
#   'type': NodeType.type
#   'path': '/my_dir/index/some.file' - full path for directory
#   'size': 2014 - size in bytes
#   'chunks': {
#     '/my_dir/index/some.file_0': cs-2
#   }
def get_file_content(self, path):
    """Fetch every chunk of *path* from its chunk servers and reassemble it."""
    info = self.ns.get_file_info(path)
    if info['status'] != Status.ok:
        return info['status'], None
    # Chunk names end in "_<index>"; collect each piece keyed by that index.
    pieces = {}
    for chunk, addr in info['chunks'].items():
        chunk_server = ServerProxy(addr)
        pieces[int(chunk.split("_")[-1])] = chunk_server.get_chunk(chunk)
    # Concatenate in index order 0..len-1 (same order the original loop used).
    content = "".join(pieces[i] for i in range(len(pieces)))
    return Status.ok, content
def get_pypi_info(pkg_name):
    """get version information from pypi. If <pkg_name> is not found seach
    pypi. if <pkg_name> matches search results case; use the new value of
    pkg_name"""
    client = xmlrpclib.ServerProxy('https://pypi.python.org/pypi')
    ver_list = client.package_releases(pkg_name)
    if not ver_list:
        # Name not found verbatim: search and adopt the canonical spelling of
        # a case-insensitive match, then retry.
        for info in client.search({'name': pkg_name}):
            if pkg_name.lower() == info['name'].lower():
                pkg_name = info['name']
                break
        ver_list = client.package_releases(pkg_name)
        if not ver_list:
            return pkg_name, 'not found', {}
    version = ver_list[0]
    xml_info = client.release_data(pkg_name, version)
    return pkg_name, version, xml_info
def test_multicall(self):
    """add/pow/div batched through MultiCall round-trip correctly."""
    try:
        proxy = xmlrpclib.ServerProxy(URL)
        batch = xmlrpclib.MultiCall(proxy)
        batch.add(2, 3)
        batch.pow(6, 8)
        batch.div(127, 42)
        add_result, pow_result, div_result = batch()
        self.assertEqual(add_result, 2 + 3)
        self.assertEqual(pow_result, 6 ** 8)
        self.assertEqual(div_result, 127 // 42)
    except (xmlrpclib.ProtocolError, OSError) as e:
        # ignore failures due to non-blocking socket 'unavailable' errors
        if not is_unavailable_exception(e):
            # protocol error; provide additional information in test output
            self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
def test_non_existing_multicall(self):
    """Calling a missing method via MultiCall yields a fault entry."""
    try:
        proxy = xmlrpclib.ServerProxy(URL)
        batch = xmlrpclib.MultiCall(proxy)
        batch.this_is_not_exists()
        result = batch()
        # result.results holds one fault dict per failed call, e.g.
        # [{'faultCode': 1, 'faultString': '...is not supported'}]
        fault = result.results[0]
        self.assertEqual(fault['faultCode'], 1)
        self.assertEqual(fault['faultString'],
                         '<class \'Exception\'>:method "this_is_not_exists" '
                         'is not supported')
    except (xmlrpclib.ProtocolError, OSError) as e:
        # ignore failures due to non-blocking socket 'unavailable' errors
        if not is_unavailable_exception(e):
            # protocol error; provide additional information in test output
            self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
def test_gzip_decode_limit(self):
    """gzip_decode enforces the 20 MiB cap unless max_decode=-1 disables it."""
    max_gzip_decode = 20 * 1024 * 1024
    # A payload exactly at the limit decodes fine.
    payload = b'\0' * max_gzip_decode
    decoded = xmlrpclib.gzip_decode(xmlrpclib.gzip_encode(payload))
    self.assertEqual(len(decoded), max_gzip_decode)
    # One byte over the limit raises; -1 lifts the cap.
    oversized = xmlrpclib.gzip_encode(b'\0' * (max_gzip_decode + 1))
    with self.assertRaisesRegex(ValueError,
                                "max gzipped payload length exceeded"):
        xmlrpclib.gzip_decode(oversized)
    xmlrpclib.gzip_decode(oversized, max_decode=-1)
# Test special attributes of the ServerProxy object
def test_basic(self):
    """Traceback-header flag defaults off; turning it on keeps calls working."""
    # check that flag is false by default
    self.assertEqual(
        xmlrpc.server.SimpleXMLRPCServer._send_traceback_header, False)
    # enable traceback reporting
    xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True
    # test a call that shouldn't fail just as a smoke test
    try:
        proxy = xmlrpclib.ServerProxy(URL)
        self.assertEqual(proxy.pow(6, 8), 6 ** 8)
    except (xmlrpclib.ProtocolError, OSError) as e:
        # ignore failures due to non-blocking socket 'unavailable' errors
        if not is_unavailable_exception(e):
            # protocol error; provide additional information in test output
            self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
def test_fail_with_info(self):
    """Server errors carry X-exception / X-traceback headers when enabled."""
    # use the broken message class
    xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass
    # Check that errors in the server send back exception/traceback
    # info when flag is set
    xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True
    try:
        xmlrpclib.ServerProxy(URL).pow(6, 8)
    except (xmlrpclib.ProtocolError, OSError) as e:
        # ignore failures due to non-blocking socket 'unavailable' errors
        if not is_unavailable_exception(e) and hasattr(e, "headers"):
            # We should get error info in the response
            expected_err = "invalid literal for int() with base 10: 'I am broken'"
            self.assertEqual(e.headers.get("X-exception"), expected_err)
            self.assertTrue(e.headers.get("X-traceback") is not None)
        else:
            self.fail('ProtocolError not raised')
def find_packages(self, name, constraint=None):
    """Return Package objects for every release of *name*, optionally
    filtered by a version *constraint* string."""
    if constraint is not None:
        constraint = VersionParser().parse_constraints(constraint)
    with ServerProxy(self._url) as client:
        versions = client.package_releases(name, True)
        if constraint:
            versions = constraint.select([Version.coerce(v) for v in versions])
        packages = []
        for version in versions:
            try:
                packages.append(Package(name, version))
            except ValueError:
                # Skip releases whose version string Package cannot parse.
                continue
    return packages
def search(self, query, mode=0):
    """Search the index for *query*; SEARCH_FULLTEXT also matches summaries."""
    criteria = {'name': query}
    if mode == self.SEARCH_FULLTEXT:
        criteria['summary'] = query
    client = ServerProxy(self._url)
    return [
        {
            'name': hit['name'],
            'description': hit['summary'],
            'version': hit['version'],
        }
        for hit in client.search(criteria, 'or')
    ]
def __init__(self, username, password, url, use_mod_auth_kerb=False):
    """Open a cookie-authenticated XML-RPC session and log in."""
    # Pick a cookie-aware transport matching the URL scheme.
    if url.startswith('https://'):
        self._transport = SafeCookieTransport()
    elif url.startswith('http://'):
        self._transport = CookieTransport()
    else:
        raise TCMSError("Unrecognized URL scheme")
    self._transport.cookiejar = CookieJar()
    self.server = xmlrpclib.ServerProxy(
        url,
        transport=self._transport,
        verbose=VERBOSE,
        allow_none=1,
    )
    # Login drops a session cookie into our cookie jar.
    self.server.Auth.login(username, password)
def __init__(self, url):
    """Open a Kerberos-authenticated XML-RPC session against *url*."""
    # Kerberos credentials must never travel over plain http.
    if url.startswith('https://'):
        self._transport = KerbTransport()
    elif url.startswith('http://'):
        raise TCMSError("Encrypted https communication required for "
                        "Kerberos authentication.\nURL provided: {0}".format(url))
    else:
        raise TCMSError("Unrecognized URL scheme: {0}".format(url))
    self._transport.cookiejar = CookieJar()
    self.server = xmlrpclib.ServerProxy(
        url,
        transport=self._transport,
        verbose=VERBOSE,
        allow_none=1,
    )
    # Login drops a session cookie into our cookie jar.
    self.server.Auth.login_krbv()
def _use_pypi_xml_rpc(self):
    """Schedule analyses of packages based on PyPI index using XML-RPC.

    https://wiki.python.org/moin/PyPIXmlRpc
    """
    client = xmlrpclib.ServerProxy('https://pypi.python.org/pypi')
    # get a sorted list of all package names, then slice the requested window
    all_packages = sorted(client.list_packages())
    for idx, package in enumerate(all_packages[self.count.min:self.count.max]):
        releases = client.package_releases(package, True)  # True: show_hidden
        self.log.debug("Scheduling #%d. (number versions: %d)",
                       self.count.min + idx, self.nversions)
        for version in releases[:self.nversions]:
            self.analyses_selinon_flow(package, version)
def __init__(self, uri, transport=None, encoding=None, verbose=0,
             allow_none=0):
    """Wrap xmlrpc ServerProxy and initialise Xen-API session bookkeeping."""
    xmlrpcclient.ServerProxy.__init__(self, uri, transport=transport,
                                      encoding=encoding, verbose=verbose,
                                      allow_none=allow_none)
    self.transport = transport
    # Opaque session handle; populated by the login helpers.
    self._session = None
    self.last_login_method = None
    self.last_login_params = None
    self.API_version = API_VERSION_1_1
def __getattr__(self, name):
    """Route attribute access to the session handle, login helpers or
    Xen-API dispatchers; anything else falls through to plain XML-RPC."""
    if name == 'handle':
        return self._session
    if name == 'xenapi':
        return _Dispatcher(self.API_version, self.xenapi_request, None)
    if name.startswith('login') or name.startswith('slave_local'):
        # Capture the method name so _login sees which variant was called.
        return lambda *params: self._login(name, params)
    if name == 'logout':
        return _Dispatcher(self.API_version, self.xenapi_request, "logout")
    return xmlrpcclient.ServerProxy.__getattr__(self, name)
def search_package(name):
    """search package.

    :param str name: package name
    :rtype: list
    :return: package name list
    """
    client = xmlrpc_client.ServerProxy(PYPI_URL)
    hits = client.search({'name': name})
    # Keep only exact name matches out of the fuzzy search results.
    return [pkg for pkg in hits if pkg.get('name') == name]
def __init__(self, nodes):
    """Connect to each node's netem RPC daemon and rebuild its qdisc tree."""
    for node in nodes:
        self.nodes_rpc[str(node["id"])] = Server(
            "http://{}:{}".format(node["address"], str(node["rpcPort"])))
    # Wipe leftover qdiscs, then create the root qdisc on every node.
    for node in nodes:
        rpc = self.nodes_rpc[str(node["id"])]
        rpc.clean_all_qdisc()
        rpc.create_root_qdisc()
    for node in nodes:
        if not self.nodes_rpc[str(node["id"])].init_qdisc():
            raise Exception("[{}] Error initializing qdiscs".format(NETEM_ERROR))
# modify connection from source to target using netem_command
def create_tasks(session, max_pkgs=MAX_PKGS):
    """Build one package-info task for each of the top-downloaded packages."""
    client = ServerProxy(PYPI_URL)
    tasks = []
    for pkg_name, downloads in client.top_packages(max_pkgs):
        tasks.append(get_package_info(session, pkg_name, downloads))
    return tasks
def get_pkg_info(pkg_name, downloads=0):
    """Look up *pkg_name* on PyPI and summarise its Python 2/3 support.

    Returns a ``pkg_info`` tuple; a package with no releases yields a
    placeholder entry instead of raising.
    """
    # multiple asyncio jobs can not share a client, so build one per call
    client = ServerProxy(PYPI_URL)
    try:
        release = client.package_releases(pkg_name)[0]
    except IndexError:
        # marionette-transport, ll-orasql, and similar
        print(pkg_name, 'has no releases in PyPI!!')
        return pkg_info(pkg_name, downloads, False, False, 'PyPI error!!', '')
    # Fetch the release metadata once (the original issued this network
    # round-trip twice: once for classifiers, once for package_url).
    data = client.release_data(pkg_name, release)
    troves = '\n'.join(data['classifiers'])
    py2only = py2_only_classifier in troves
    py3 = py3_classifier in troves
    return pkg_info(pkg_name, downloads, py2only, py3, release,
                    data['package_url'])
def async_main(max_pkgs=MAX_PKGS): # ~ 32 secs for 200 pkgs on my MacBookPro loop = asyncio.get_event_loop() client = ServerProxy(PYPI_URL) futures = [loop.run_in_executor(None, get_pkg_info, pkg_name, downloads) for pkg_name, downloads in client.top_packages(max_pkgs)] return [(yield from fut) for fut in futures]
def VimRpc(address=None):
    """Return a ServerProxy to the Vim RPC server, or -1 with no address."""
    if address is None:
        address = addr  # fall back to the module-level default
    if address is None:
        print('ERROR No Valid ADDRESS')
        return -1
    # NOTE(review): two %s placeholders imply *address* is a (host, port)
    # tuple — confirm against callers.
    endpoint = 'http://%s:%s' % (address)
    return ServerProxy(endpoint, allow_none=True)
def test_ssl_presence(self):
    """https proxying raises NotImplementedError only when ssl is missing."""
    try:
        import ssl
    except ImportError:
        has_ssl = False
    else:
        has_ssl = True
    try:
        xmlrpc.client.ServerProxy('https://localhost:9999').bad_function()
    except NotImplementedError:
        self.assertFalse(has_ssl, "xmlrpc client's error with SSL support")
    except socket.error:
        # A socket error means https was at least attempted, so ssl exists.
        self.assertTrue(has_ssl)
def make_request_and_skipIf(condition, reason):
    """Conditional skip decorator that still performs one request.

    If we skip the test, we have to make a request because the server
    created in setUp blocks expecting one to come in.  When *condition* is
    false the wrapped test is returned unchanged.
    """
    import functools

    if not condition:
        return lambda func: func

    def decorator(func):
        # functools.wraps keeps the wrapped test's name/docstring so
        # unittest reports the real test identity (the original lost it).
        @functools.wraps(func)
        def make_request_and_skip(self):
            try:
                xmlrpclib.ServerProxy(URL).my_function()
            except (xmlrpclib.ProtocolError, socket.error) as e:
                if not is_unavailable_exception(e):
                    raise
            raise unittest.SkipTest(reason)
        return make_request_and_skip
    return decorator
def test_simple1(self):
    """A single pow call round-trips through the server."""
    try:
        proxy = xmlrpclib.ServerProxy(URL)
        self.assertEqual(proxy.pow(6, 8), 6 ** 8)
    except (xmlrpclib.ProtocolError, socket.error) as e:
        # ignore failures due to non-blocking socket 'unavailable' errors
        if not is_unavailable_exception(e):
            # protocol error; provide additional information in test output
            self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
def test_nonascii(self):
    """Non-ASCII strings survive an XML-RPC round trip."""
    start_string = 'P\N{LATIN SMALL LETTER Y WITH CIRCUMFLEX}t'
    end_string = 'h\N{LATIN SMALL LETTER O WITH HORN}n'
    try:
        proxy = xmlrpclib.ServerProxy(URL)
        self.assertEqual(proxy.add(start_string, end_string),
                         start_string + end_string)
    except (xmlrpclib.ProtocolError, socket.error) as e:
        # ignore failures due to non-blocking socket 'unavailable' errors
        if not is_unavailable_exception(e):
            # protocol error; provide additional information in test output
            self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
# [ch] The test 404 is causing lots of false alarms.