我们从Python开源项目中,提取了以下8个代码示例,用于说明如何使用urllib3.exceptions.HTTPError()。
def _post(self, path, op='update', **kwargs): """ Issues a POST request to the MAAS REST API. """ try: response = self.client.post(path, **kwargs) payload = response.read() log.debug("Request %s results: [%s] %s", path, response.getcode(), payload) if response.getcode() == OK: return Response(True, yaml.load(payload)) else: return Response(False, payload) except HTTPError as e: log.error("Error encountered: %s for %s with params %s", str(e), path, str(kwargs)) return Response(False, None) except Exception as e: log.error("Post request raised exception: %s", e) return Response(False, None)
def _put(self, path, **kwargs): """ Issues a PUT request to the MAAS REST API. """ try: response = self.client.put(path, **kwargs) payload = response.read() log.debug("Request %s results: [%s] %s", path, response.getcode(), payload) if response.getcode() == OK: return Response(True, payload) else: return Response(False, payload) except HTTPError as e: log.error("Error encountered: %s with details: %s for %s with " "params %s", e, e.read(), path, str(kwargs)) return Response(False, None) except Exception as e: log.error("Put request raised exception: %s", e) return Response(False, None) ########################################################################### # DNS API - http://maas.ubuntu.com/docs2.0/api.html#dnsresource ###########################################################################
def call_api(api_fn, not_found=None): try: return api_fn() except ApiException as e: if e.status == 0: raise e elif e.status == 401: handle_error("You are not authorized!") elif e.status == 403: handle_http_error(e.body, e.status) elif e.status == 404 and not_found: not_found() else: handle_http_error(e.body, e.status) except LocationValueError as e: handle_error("RiseML is not configured! Please run 'riseml user login' first!") except HTTPError as e: handle_error('Could not connect to API ({host}:{port}{url}) — {exc_type}'.format( host=e.pool.host, port=e.pool.port, url=e.url, exc_type=e.__class__.__name__ ))
def test_broken_response(self): request = self.factory.get('/') urlopen_mock = MagicMock(side_effect=HTTPError()) with patch(URLOPEN, urlopen_mock), self.assertRaises(HTTPError): CustomProxyView.as_view()(request, path='/')
def check_api_config(api_url, api_key, timeout=180): print('Waiting %ss for successful login to %s with API key \'%s\' ...' % (timeout, api_url, api_key)) config = Configuration() old_api_host = config.host old_api_key = config.api_key['api_key'] config.host = api_url config.api_key['api_key'] = api_key api_client = ApiClient() client = AdminApi(api_client) start = time.time() while True: try: cluster_infos = client.get_cluster_infos() cluster_id = get_cluster_id(cluster_infos) print('Success! Cluster ID: %s' % cluster_id) config.api_key['api_key'] = old_api_key config.host = old_api_host return cluster_id except ApiException as exc: if exc.reason == 'UNAUTHORIZED': print(exc.status, 'Unauthorized - wrong api key?') sys.exit(1) elif time.time() - start < timeout: time.sleep(1) continue else: print(exc.status, exc.reason) sys.exit(1) except HTTPError as e: if time.time() - start < timeout: time.sleep(1) continue else: print('Unable to connecto to %s ' % api_url) # all uncaught http errors goes here print(e.reason) sys.exit(1)
def test_resync_http_error(self, m_sleep): self.driver._init_received.set() with patch.object(self.driver, "get_etcd_connection") as m_get: with patch("calico.etcddriver.driver.monotonic_time") as m_time: m_time.side_effect = iter([ 1, 10, RuntimeError() ]) m_get.side_effect = HTTPError() self.assertRaises(RuntimeError, self.driver._resync_and_merge)
def urlopen(self, method, path, **kwargs): """ Make a request using the next server according to the connection strategy, and retries up to max_retries attempts. Ultimately, if the request still failed, we reraise the HTTPError from urllib3. If at the start of the request, there are no known available hosts, we revive all dead connections and forcefully attempt to reconnect. """ # If we're trying to initiate a new connection, and # all connections are already dead, then we should flail # and attempt to connect to one of them if len(self.connections) == 0: self.force_revive() # We don't need strict host checking since our client is enforcing # the correct behavior anyways kwargs.setdefault('assert_same_host', False) try: for _ in xrange(self.max_retries): conn = self.strategy.next(self.connections) try: return conn.urlopen(method, path, **kwargs) except HTTPError: self.mark_dead(conn) if len(self.connections) == 0: raise finally: self.cleanup_dead()
def main(): parser = argparse.ArgumentParser() parser.add_argument('-v', help="show endpoints", action='store_const', const=True) parser.add_argument('--version', '-V', help="show version", action='version', version='RiseML CLI {}'.format(VERSION)) subparsers = parser.add_subparsers() # user ops add_whoami_parser(subparsers) add_user_parser(subparsers) # system ops add_system_parser(subparsers) add_account_parser(subparsers) # worklow ops add_init_parser(subparsers) add_train_parser(subparsers) #add_exec_parser(subparsers) add_monitor_parser(subparsers) #add_deploy_parser(subparsers) add_logs_parser(subparsers) add_kill_parser(subparsers) add_status_parser(subparsers) args = parser.parse_args(sys.argv[1:]) if args.v: print('api_url: %s' % get_api_url()) print('sync_url: %s' % get_sync_url()) print('stream_url: %s' % get_stream_url()) print('git_url: %s' % get_git_url()) if hasattr(args, 'run'): try: args.run(args) except HTTPError as e: # all uncaught http errors goes here handle_error(str(e)) except KeyboardInterrupt: print('\nAborting...') else: parser.print_usage()