我们从Python开源项目中，提取了以下49个代码示例，用于说明如何使用requests.exceptions模块。
def new_version_available(self) -> bool:
    """Check the remote manifest to see whether a newer version is published.

    Versions are dotted integer strings (e.g. ``"1.4.2"``).  They are compared
    component by component, most significant first; a missing component is
    treated as ``0`` so ``"1.0"`` and ``"1.0.0"`` are equal.

    :return: ``True`` when the remote version is strictly greater than the
        local one.  ``False`` when no remote manifest URL is configured, when
        the local version is equal or newer, or when the connection fails.
    """
    remote_url = self.manifest.get("remote_manifest", "")
    if not remote_url:
        return False

    self.logger.info("Checking for update...")
    try:
        manifest = requests.get(remote_url).json()
    except requests.exceptions.ConnectionError:
        self.logger.warning("Failed to check for update.")
        return False

    remote_parts = [int(part) for part in manifest["version"].split(".")]
    local_parts = [int(part) for part in self.manifest["version"].split(".")]

    # Pad the shorter version with zeros so the list comparison below is a
    # true most-significant-first comparison.  Comparing each index in
    # isolation would wrongly report 2.0.0 -> 1.5.0 as an update.
    width = max(len(remote_parts), len(local_parts))
    remote_parts += [0] * (width - len(remote_parts))
    local_parts += [0] * (width - len(local_parts))

    if local_parts < remote_parts:
        self.logger.info(f"An update is available. Current version is v{self.manifest['version']}, updated version is v{manifest['version']}.")
        return True

    self.logger.info("No update required.")
    return False
def isDaemonAlive(hostAndPort="{0}:{1}".format(IPFSAPI_IP, IPFSAPI_PORT)):
    """Ensure that the IPFS daemon is running via HTTP before proceeding.

    :param hostAndPort: ``host:port`` string used only in the error message.
    :return: ``True`` when ``client.id()`` succeeds, ``False`` when the daemon
        cannot be reached or an unexpected error occurs.  The process exits
        when an ``OSError`` is raised (IPFS is likely not installed).
    """
    client = ipfsapi.Client(IPFSAPI_IP, IPFSAPI_PORT)

    try:
        # ConnectionError/AttributeError if IPFS daemon not running
        client.id()
        return True
    except (ConnectionError, AttributeError):
        logError("Daemon is not running at http://" + hostAndPort)
        return False
    except OSError:
        logError("IPFS is likely not installed. "
                 "See https://ipfs.io/docs/install/")
        sys.exit()
    except Exception:
        # Catch-all for anything unexpected.  Report it and answer False
        # explicitly so callers never receive an implicit None.
        logError('Unknown error in retrieving daemon status')
        logError(sys.exc_info()[0])
        return False
def enroll(self, enrollment_id, enrollment_secret):
    """Enroll a registered user and obtain a signed X509 certificate.

    A fresh private key is generated, a CSR whose common name is the
    enrollment ID is built from it, and the PEM-encoded CSR is submitted to
    the CA client.

    Args:
        enrollment_id (str): The registered ID to use for enrollment
        enrollment_secret (str): The secret associated with the enrollment ID

    Returns:
        An ``Enrollment`` built from the generated private key and the
        certificate returned by the CA client.

    Raises:
        RequestException: errors in requests.exceptions
        ValueError: Failed response, json parse error, args missing
    """
    key = self._crypto.generate_private_key()

    subject = x509.Name(
        [x509.NameAttribute(NameOID.COMMON_NAME, six.u(enrollment_id))])
    signing_request = self._crypto.generate_csr(key, subject)
    signing_request_pem = signing_request.public_bytes(
        Encoding.PEM).decode("utf-8")

    certificate = self._ca_client.enroll(
        enrollment_id, enrollment_secret, signing_request_pem)
    return Enrollment(key, certificate)
def __init__(self, host, username=None, password=None, verify_ssl=True, user_agent=None, cert=None):
    """Store the connection settings and derive the authentication mode.

    :param host: stored as ``self.host``.
    :param username: when not ``None``, selects ``'basic'`` authentication.
    :param password: stored as ``self.password``.
    :param verify_ssl: when falsy, urllib3's ``InsecureRequestWarning`` is
        disabled.
    :param user_agent: falls back to ``'webinspectapi/' + version`` when
        empty or ``None``.
    :param cert: when not ``None`` (and no username is given), selects
        ``'certificate'`` authentication.
    """
    self.host = host
    self.username = username
    self.password = password
    self.cert = cert
    self.verify_ssl = verify_ssl
    self.user_agent = user_agent if user_agent else 'webinspectapi/' + version

    if not self.verify_ssl:
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

    # Set auth_type based on what's been provided; a username wins over a
    # certificate.
    if username is not None:
        auth_type = 'basic'
    elif cert is not None:
        auth_type = 'certificate'
    else:
        auth_type = 'unauthenticated'
    self.auth_type = auth_type
def find_decl_doc(self, name):
    """Raise ``IncludeError`` for *name*.

    NOTE(review): the unconditional raise makes the online lookup below
    unreachable.  It looks deliberately disabled; confirm before deleting or
    re-enabling it.
    """
    raise IncludeError(name)

    # --- unreachable from here on ---
    import requests
    from requests.exceptions import InvalidSchema

    candidate = METATAB_ASSETS_URL + name + '.csv'

    try:
        # See if it exists online in the official repo
        probe = requests.head(candidate, allow_redirects=False)
        if probe.status_code == requests.codes.ok:
            return candidate
    except InvalidSchema:
        pass  # It's probably FTP
def test_request_retry(self, mock_request):
    """Two consecutive connection errors are tolerated by write_points."""

    class FlakyTransport(object):
        # Number of calls made so far.
        i = 0

        def connection_error(self, *args, **kwargs):
            # Fail on the first two calls, then answer 204.
            self.i += 1
            if self.i >= 3:
                reply = requests.Response()
                reply.status_code = 204
                return reply
            raise requests.exceptions.ConnectionError

    mock_request.side_effect = FlakyTransport().connection_error

    client = InfluxDBClient(database='db')
    client.write_points(self.dummy_points)
def test_request_retry_raises(self, mock_request):
    """Three consecutive connection errors propagate out of write_points."""

    class FlakyTransport(object):
        # Number of calls made so far.
        i = 0

        def connection_error(self, *args, **kwargs):
            # Fail on the first three calls, then answer 200.
            self.i += 1
            if self.i >= 4:
                reply = requests.Response()
                reply.status_code = 200
                return reply
            raise requests.exceptions.ConnectionError

    mock_request.side_effect = FlakyTransport().connection_error

    client = InfluxDBClient(database='db')

    with self.assertRaises(requests.exceptions.ConnectionError):
        client.write_points(self.dummy_points)
def test_request_retry(self, mock_request):
    """Two consecutive connection errors are tolerated by write_points."""

    class FlakyTransport(object):
        # Number of calls made so far.
        i = 0

        def connection_error(self, *args, **kwargs):
            # Fail on the first two calls, then answer 200.
            self.i += 1
            if self.i >= 3:
                reply = requests.Response()
                reply.status_code = 200
                return reply
            raise requests.exceptions.ConnectionError

    mock_request.side_effect = FlakyTransport().connection_error

    client = InfluxDBClient(database='db')
    client.write_points(self.dummy_points)
def __init__(self, endpoint, resource_uri, optimization=True, max_elems=100, filter_query=None, filter_dialect=None):
    """Store the enumeration settings and validate the optional filter.

    :param endpoint: stored as ``self.endpoint``.
    :param resource_uri: stored as ``self.resource_uri``.
    :param optimization: stored as ``self.optimization``.
    :param max_elems: stored as ``self.max_elems``.
    :param filter_query: when not ``None``, the filter dialect is resolved
        and the query is stored; otherwise both filter attributes stay
        ``None``.
    :param filter_dialect: key looked up in ``FILTER_DIALECT_MAP``.
    :raises exceptions.WSManInvalidFilterDialect: when a filter query is
        given and ``filter_dialect`` is not a key of ``FILTER_DIALECT_MAP``.
    """
    self.endpoint = endpoint
    self.resource_uri = resource_uri
    self.filter_dialect = None
    self.filter_query = None
    self.optimization = optimization
    self.max_elems = max_elems

    if filter_query is None:
        return

    try:
        self.filter_dialect = FILTER_DIALECT_MAP[filter_dialect]
    except KeyError:
        supported_dialects = ', '.join(FILTER_DIALECT_MAP)
        raise exceptions.WSManInvalidFilterDialect(
            invalid_filter=filter_dialect, supported=supported_dialects)

    self.filter_query = filter_query
def blender_id_server_validate(token) -> typing.Tuple[typing.Optional[str], typing.Optional[str]]:
    """Validate the auth token with the server.

    @param token: the authentication token
    @type token: str
    @returns: tuple (expiry, error).
        The expiry is the expiry date of the token if it is valid, else None.
        The error is None if the token is valid, or an error message when
        it's invalid or when the request itself failed.
    """
    import requests
    import requests.exceptions

    try:
        r = requests.post(blender_id_endpoint('u/validate_token'),
                          data={'token': token}, verify=True)
    except requests.exceptions.RequestException as e:
        # The message belongs in the *error* slot.  Returning it first would
        # make callers treat it as a valid expiry date.
        return (None, str(e))

    if r.status_code != 200:
        return (None, 'Authentication token invalid')

    response = r.json()
    return (response['token_expires'], None)
def make_authenticated_call(method, url, auth_token, data):
    """Perform an HTTP call carrying the OAuth bearer token.

    :param method: HTTP method name handed to ``requests.request``.
    :param url: path passed through ``blender_id_endpoint`` to build the URL.
    :param auth_token: token placed in the ``Authorization: Bearer`` header.
    :param data: request body forwarded as ``data=``.
    :returns: the response object returned by ``requests.request``.
    :raises BlenderIdCommError: on ``HTTPError`` or ``ConnectionError``.
    """
    import requests
    import requests.exceptions

    comm_failures = (requests.exceptions.HTTPError,
                     requests.exceptions.ConnectionError)

    try:
        reply = requests.request(
            method,
            blender_id_endpoint(url),
            data=data,
            headers={'Authorization': 'Bearer %s' % auth_token},
            verify=True,
        )
    except comm_failures as e:
        raise BlenderIdCommError(str(e))

    return reply
def track(token, uid, message, name='Message'):
    """POST *message* as JSON to ``TRACK_URL`` and return the decoded reply.

    :param token: sent as the ``token`` query parameter.
    :param uid: sent as the ``uid`` query parameter.
    :param message: object serialised with ``json.dumps`` as the body.
    :param name: sent as the ``name`` query parameter.
    :returns: the decoded JSON reply, or ``False`` on a timeout, any other
        requests error, or a ``ValueError``.
    """
    query = {"token": token, "uid": uid, "name": name}
    try:
        # json.dumps stays inside the try: a ValueError it raises is handled
        # below exactly like an undecodable reply.
        reply = requests.post(
            TRACK_URL,
            params=query,
            data=json.dumps(message),
            headers={'Content-type': 'application/json'},
        )
        return reply.json()
    except requests.exceptions.Timeout:
        # set up for a retry, or continue in a retry loop
        return False
    except (requests.exceptions.RequestException, ValueError) as e:
        # catastrophic error
        print(e)
        return False
def _request(self, method, url, data=None, **kwargs):
    """Send a request through ``self.session`` and return a 200 response.

    :param method: HTTP method name.
    :param url: target URL.
    :param data: request body forwarded as ``data=``.
    :param kwargs: extra keyword arguments forwarded to ``session.request``.
    :returns: the response when its status code is 200.
    :raises matchlight.error.APIError: for any other status code; carries
        the decoded JSON body, or ``None`` when the body is not JSON.
    :raises matchlight.error.ConnectionError: on ``RetryError`` or
        ``ConnectionError`` from requests.
    """
    try:
        response = self.session.request(method, url, data=data, **kwargs)
        if response.status_code == 200:
            return response

        try:
            error_body = response.json()
        except ValueError:
            error_body = None
        raise matchlight.error.APIError(
            response.status_code, response.reason, error_body)
    except requests.exceptions.RetryError:
        raise matchlight.error.ConnectionError(
            'Matchlight API request failed with too many retries')
    except requests.exceptions.ConnectionError:
        raise matchlight.error.ConnectionError(
            'Matchlight API request failed with connection error')
def register_update(eapi):
    """Ensures the inbound URL for the BTS is up to date.

    Sends the BTS UUID and VPN state to ``/bts/register``.  When the reply
    contains ``bts_secret`` it is saved into ``conf``.  A
    ``RegistrationError`` is logged and not re-raised.

    :param eapi: object whose ``auth_header`` is sent as the request headers.
    """
    vpn_ip = system_utilities.get_vpn_ip()

    # This could fail when offline!  Must handle connection exceptions.
    params = {
        'bts_uuid': _get_snowflake(),
        'vpn_status': 'up' if vpn_ip else 'down',
        'vpn_ip': vpn_ip,
        'federer_port': '80',
    }

    try:
        reply = _send_cloud_req(
            requests.get,
            '/bts/register',
            'BTS registration',
            params=params,
            headers=eapi.auth_header,
            timeout=11)
        if 'bts_secret' in reply:
            conf['bts_secret'] = reply['bts_secret']
    except RegistrationError as ex:
        logger.error(str(ex))
def __exit__(self, exc, value, traceback):
    """Report the outcome of the response when the ``with`` block ends.

    :returns: ``exc is None`` when the outcome was already reported
        manually.  ``False`` (exception propagates) when the block raised
        something other than ``ResponseError``.  ``True`` otherwise.
    """
    if self._is_reported:
        # The user already marked this response as failure or success, so
        # the status code no longer decides the outcome.
        return exc is None

    if not exc:
        # No exception in the block: let the status code decide.
        try:
            self.raise_for_status()
        except requests.exceptions.RequestException as e:
            self.failure(e)
        else:
            self.success()
        return True

    if not isinstance(value, ResponseError):
        return False

    self.failure(value)
    return True
def put_session(self, session, retry=True):
    """PUT *session* as JSON to ``<url>/conns``.

    :param session: JSON-serialisable payload.
    :param retry: when true, one further attempt is made after a non-200
        reply.
    :returns: the decoded JSON reply on status 200, or ``[]`` when the
        request itself fails.
    :raises IOError: when the final attempt gets a non-200 reply; carries
        the raw response body.
    """
    try:
        r = requests.put(self.url + "/conns", auth=self.auth,
                         json=session, timeout=20.0)
    except requests.exceptions.RequestException:
        dbg("Cannot connect to backend")
        return []

    if r.status_code == 200:
        return r.json()

    msg = r.raw.read()
    if not retry:
        raise IOError(msg)

    dbg("Backend upload failed, retrying (" + str(msg) + ")")
    return self.put_session(session, False)
def put_sample_info(self, f, retry=True):
    """PUT sample metadata *f* as JSON to ``<url>/sample/<sha256>``.

    :param f: mapping with at least a ``"sha256"`` key.
    :param retry: when true, one further attempt is made after a non-200
        reply.
    :returns: the decoded JSON reply on status 200, or ``None`` when the
        request itself fails.
    :raises IOError: when the final attempt gets a non-200 reply; carries
        the raw response body.
    """
    try:
        digest = f["sha256"]
        r = requests.put(self.url + "/sample/" + digest, auth=self.auth,
                         json=f, timeout=20.0)
    except requests.exceptions.RequestException:
        dbg("Cannot connect to backend")
        return

    if r.status_code == 200:
        return r.json()

    msg = r.raw.read()
    if not retry:
        raise IOError(msg)

    dbg("Backend upload failed, retrying (" + str(msg) + ")")
    return self.put_sample_info(f, False)
def put_sample(self, data, retry=True):
    """POST raw sample *data* to ``<url>/file``.

    :param data: request body forwarded as ``data=``.
    :param retry: when true, one further attempt is made after a non-200
        reply.
    :returns: ``None`` on success and when the request itself fails.
    :raises IOError: when the final attempt gets a non-200 reply; carries
        the raw response body.
    """
    try:
        r = requests.post(self.url + "/file", auth=self.auth,
                          data=data, timeout=20.0)
    except requests.exceptions.RequestException:
        dbg("Cannot connect to backend")
        return

    if r.status_code == 200:
        return

    msg = r.raw.read()
    if not retry:
        raise IOError(msg)

    dbg("Backend upload failed, retrying (" + str(msg) + ")")
    # Retry with the same payload.  The undefined sha256/filename arguments
    # used before raised NameError instead of retrying.
    return self.put_sample(data, False)
def test_reset_no_response_get(self, mock_time, mock_requests, mock_get_url):
    """
    Given: Mock requests set up through configure_mock_requests (intended to
           raise requests.exceptions.ConnectTimeout on get).
      and: EzOutlet initialized with an IP address and timeout.
    When: Calling reset(post_reset_delay, ez_outlet_reset_interval).
    Then: ez_outlet._get_url is called using the IP address with
          ez_outlet.RESET_URL_PATH.
     and: requests.get(ez_outlet._get_url's result, timeout,
          proxies=PROXY_SETTINGS_NONE) is called.
    """
    _ = mock_time  # unused in this test

    # Given
    self.configure_mock_requests(mock_requests=mock_requests)

    # When
    reset_kwargs = dict(post_reset_delay=self.post_reset_delay,
                        ez_outlet_reset_interval=self.ez_outlet_reset_interval)
    try:
        self.uut.reset(**reset_kwargs)
    except ezoutlet.exceptions.EzOutletError:
        pass  # exception tested elsewhere

    # Then
    mock_get_url.assert_called_with(
        self.hostname, ez_outlet.EzOutlet.RESET_URL_PATH)
    mock_requests.get.assert_called_once_with(
        sample_url, timeout=self.timeout, proxies=PROXY_SETTINGS_NONE)
def test_reset_no_response_raise(self, mock_time, mock_requests, mock_get_url):
    """
    Given: Mock requests set up through configure_mock_requests (intended to
           raise requests.exceptions.ConnectTimeout on get).
      and: EzOutlet initialized with an IP address and timeout.
    When: Calling reset(post_reset_delay, ez_outlet_reset_interval).
    Then: reset() raises ez_outlet.EzOutletError, e.
     and: str(e) == ez_outlet.EzOutlet.NO_RESPONSE_MSG.format(timeout).
    """
    _ = mock_time  # unused in this test
    _ = mock_get_url  # unused in this test

    # Given
    self.configure_mock_requests(mock_requests=mock_requests)

    # When
    reset_kwargs = dict(post_reset_delay=self.post_reset_delay,
                        ez_outlet_reset_interval=self.ez_outlet_reset_interval)
    with self.assertRaises(ezoutlet.exceptions.EzOutletError) as e:
        self.uut.reset(**reset_kwargs)

    # Then
    expected_message = ez_outlet.EzOutlet.NO_RESPONSE_MSG.format(self.timeout)
    self.assertEqual(str(e.exception), expected_message)
def test_reset_no_response_no_sleep(self, mock_time, mock_requests, mock_get_url):
    """
    Given: Mock requests set up through configure_mock_requests (intended to
           raise requests.exceptions.ConnectTimeout on get).
      and: EzOutlet initialized with an IP address and timeout.
    When: Calling reset(post_reset_delay, ez_outlet_reset_interval).
    Then: time.sleep(post_reset_delay + ez_outlet_reset_interval) is _not_
          called.
    """
    _ = mock_get_url  # unused in this test

    # Given
    self.configure_mock_requests(mock_requests=mock_requests)

    # When
    reset_kwargs = dict(post_reset_delay=self.post_reset_delay,
                        ez_outlet_reset_interval=self.ez_outlet_reset_interval)
    try:
        self.uut.reset(**reset_kwargs)
    except ezoutlet.exceptions.EzOutletError:
        pass  # exception tested elsewhere

    # Then
    mock_time.sleep.assert_not_called()


# Suppress since PyCharm doesn't recognize @mock.patch.object
# noinspection PyUnresolvedReferences
def test_reset_unexpected_response_get(self, mock_time, mock_requests, mock_get_url):
    """
    Given: Mock requests module set up through configure_mock_requests
           (intended to give unexpected_response_contents).
      and: EzOutlet initialized with an IP address and timeout.
    When: Calling reset(post_reset_delay, ez_outlet_reset_interval).
    Then: ez_outlet._get_url is called using the IP address with
          ez_outlet.RESET_URL_PATH.
     and: requests.get(ez_outlet._get_url's result, timeout,
          proxies=PROXY_SETTINGS_NONE) is called.
    """
    _ = mock_time  # unused in this test

    # Given
    self.configure_mock_requests(mock_requests=mock_requests)

    # When
    reset_kwargs = dict(post_reset_delay=self.post_reset_delay,
                        ez_outlet_reset_interval=self.ez_outlet_reset_interval)
    try:
        self.uut.reset(**reset_kwargs)
    except ezoutlet.exceptions.EzOutletError:
        pass  # exception tested elsewhere

    # Then
    mock_get_url.assert_called_with(
        self.hostname, ez_outlet.EzOutlet.RESET_URL_PATH)
    mock_requests.get.assert_called_once_with(
        sample_url, timeout=self.timeout, proxies=PROXY_SETTINGS_NONE)
def test_reset_unexpected_response_no_sleep(self, mock_time, mock_requests, mock_get_url):
    """
    Given: Mock requests module set up through configure_mock_requests
           (intended to give unexpected_response_contents).
      and: EzOutlet initialized with an IP address and timeout.
    When: Calling reset(post_reset_delay, ez_outlet_reset_interval).
    Then: time.sleep(post_reset_delay + ez_outlet_reset_interval) is _not_
          called.
    """
    _ = mock_get_url  # unused in this test

    # Given
    self.configure_mock_requests(mock_requests=mock_requests)

    # When
    reset_kwargs = dict(post_reset_delay=self.post_reset_delay,
                        ez_outlet_reset_interval=self.ez_outlet_reset_interval)
    try:
        self.uut.reset(**reset_kwargs)
    except ezoutlet.exceptions.EzOutletError:
        pass  # exception tested elsewhere

    # Then
    mock_time.sleep.assert_not_called()
def directions(self, proxies=None):
    """
    Returns list with translate directions

    Queries the ``langs`` endpoint with the API key.  A connection error is
    reported as ``YandexTranslateException`` built from ``error_codes[503]``;
    a reply whose ``code`` is not 200 raises ``YandexTranslateException``
    with that code.

    >>> translate = YandexTranslate("trnsl.1.1.20130421T140201Z.323e508a33e9d84b.f1e0d9ca9bcd0a00b0ef71d82e6cf4158183d09e")
    >>> directions = translate.directions
    >>> len(directions) > 0
    True
    """
    try:
        reply = requests.get(self.url("langs"),
                             params={"key": self.api_key},
                             proxies=proxies)
    except requests.exceptions.ConnectionError:
        raise YandexTranslateException(self.error_codes[503])

    payload = reply.json()
    status_code = payload.get("code", 200)
    if status_code != 200:
        raise YandexTranslateException(status_code)
    return payload.get("dirs")
def retrieve_pool(self, uuid):
    """Fetch the :class:`qarnot.pool.Pool` identified by *uuid*.

    :param str uuid: Desired pool uuid
    :rtype: :class:`~qarnot.pool.Pool`
    :returns: Pool built from the JSON reply via ``Pool.from_json``.

    :raises qarnot.exceptions.MissingPoolException: the API answered 404
    :raises qarnot.exceptions.UnauthorizedException: invalid credentials
      (presumably raised by ``raise_on_error`` - confirm)
    :raises qarnot.exceptions.QarnotGenericException: API general error
      (presumably raised by ``raise_on_error`` - confirm)
    """
    reply = self._get(get_url('pool update', uuid=uuid))

    not_found = reply.status_code == 404
    if not_found:
        raise MissingPoolException(reply.json()['message'])

    raise_on_error(reply)
    return Pool.from_json(self, reply.json())
def retrieve_task(self, uuid):
    """Fetch the :class:`qarnot.task.Task` identified by *uuid*.

    :param str uuid: Desired task uuid
    :rtype: :class:`~qarnot.task.Task`
    :returns: Task built from the JSON reply via ``Task.from_json``.

    :raises qarnot.exceptions.MissingTaskException: the API answered 404
    :raises qarnot.exceptions.UnauthorizedException: invalid credentials
      (presumably raised by ``raise_on_error`` - confirm)
    :raises qarnot.exceptions.QarnotGenericException: API general error
      (presumably raised by ``raise_on_error`` - confirm)
    """
    reply = self._get(get_url('task update', uuid=uuid))

    not_found = reply.status_code == 404
    if not_found:
        raise MissingTaskException(reply.json()['message'])

    raise_on_error(reply)
    return Task.from_json(self, reply.json())
def retrieve_disk(self, uuid):
    """Fetch the :class:`~qarnot.disk.Disk` identified by *uuid*.

    :param str uuid: Desired disk uuid (sent as the ``name`` URL field)
    :rtype: :class:`~qarnot.disk.Disk`
    :returns: Disk built from the JSON reply via ``Disk.from_json``.

    :raises qarnot.exceptions.MissingDiskException: the API answered 404
    :raises qarnot.exceptions.UnauthorizedException: invalid credentials
      (presumably raised by ``raise_on_error`` - confirm)
    :raises qarnot.exceptions.QarnotGenericException: API general error
      (presumably raised by ``raise_on_error`` - confirm)
    """
    reply = self._get(get_url('disk info', name=uuid))

    not_found = reply.status_code == 404
    if not_found:
        raise MissingDiskException(reply.json()['message'])

    raise_on_error(reply)
    return Disk.from_json(self, reply.json())
def create_disk(self, description, lock=False, tags=None):
    """Build a :class:`~qarnot.disk.Disk` and create it on the server.

    :param str description: a short description of the disk
    :param bool lock: prevents the disk to be removed accidentally
    :param tags: custom tags
    :type tags: list(`str`)

    :rtype: :class:`qarnot.disk.Disk`
    :returns: The :class:`~qarnot.disk.Disk` after ``create()`` was called.

    NOTE(review): any exception comes from ``Disk.create()``; upstream docs
    list MaxDiskException, QarnotGenericException and UnauthorizedException.
    """
    new_disk = Disk(self, description, lock=lock, tags=tags)
    new_disk.create()
    return new_disk
def profiles(self):
    """Get list of profiles available on the cluster.

    The profile names are listed first, then the details of each one are
    fetched.  A profile whose detail request answers 404 is skipped.

    :rtype: List of :class:`Profile`

    :raises qarnot.exceptions.UnauthorizedException: invalid credentials
      (presumably raised by ``raise_on_error`` - confirm)
    :raises qarnot.exceptions.QarnotGenericException: API general error
      (presumably raised by ``raise_on_error`` - confirm)
    """
    listing = self._get(get_url('profiles'))
    raise_on_error(listing)

    available = []
    for profile_name in listing.json():
        details = self._get(get_url('profile details', profile=profile_name))
        if details.status_code == 404:
            continue
        raise_on_error(details)
        available.append(Profile(details.json()))
    return available
def _handles_connection_error(func):
    """
    Decorator that converts a requests ``ConnectionError`` raised by the
    wrapped method into a ``BrainServerError`` naming the URL.

    :param func: the function being decorated; called as
        ``func(self, url, *args, **kwargs)``.
    """
    @functools.wraps(func)
    def _wrapped(self, url, *args, **kwargs):
        try:
            outcome = func(self, url, *args, **kwargs)
        except requests.exceptions.ConnectionError:
            raise BrainServerError(
                "Unable to connect to domain: {}".format(url))
        return outcome

    return _wrapped
def _post(self, url, data=None):
    """
    Issues a POST request with a JSON body and returns the reply as a dict.

    Redirects are not followed; the reply is handed to
    ``self._raise_on_redirect``.  An ``HTTPError`` is passed to
    ``_handle_and_raise`` together with the reply.

    :param url: The URL being posted to.
    :param data: Any additional data to bundle with the POST, as a
                 dictionary. Defaults to None.
    """
    log.debug('POST to %s with data %s...', url, str(data))
    auth_headers = {'Authorization': self._access_key}
    reply = requests.post(url=url,
                          headers=auth_headers,
                          json=data,
                          allow_redirects=False)
    try:
        reply.raise_for_status()
        self._raise_on_redirect(reply)
        log.debug('POST %s results:\n%s', url, reply.text)
        return _dict(reply)
    except requests.exceptions.HTTPError as http_err:
        _handle_and_raise(reply, http_err)
def _post_raw_data(self, url, data=None, headers=None):
    """
    Issues a POST request whose body is sent as-is (no JSON encoding) and
    returns the reply as a dict.

    Redirects are not followed; the reply is handed to
    ``self._raise_on_redirect``.  An ``HTTPError`` is passed to
    ``_handle_and_raise`` together with the reply.

    :param url: The URL being posted to.
    :param data: Any additional data to bundle with the POST, as raw data to
                 be used as the body.
    :param headers: Optional extra headers merged over the Authorization
                    header.
    """
    log.debug('POST raw data to %s ...', url)
    merged_headers = {'Authorization': self._access_key}
    if headers:
        merged_headers.update(headers)

    reply = requests.post(url=url,
                          headers=merged_headers,
                          data=data,
                          allow_redirects=False)
    try:
        reply.raise_for_status()
        self._raise_on_redirect(reply)
        log.debug('POST %s results:\n%s', url, reply.text)
        return _dict(reply)
    except requests.exceptions.HTTPError as http_err:
        _handle_and_raise(reply, http_err)
def _put_raw_data(self, url, data=None, headers=None):
    """
    Issues a PUT request whose body is sent as-is (no JSON encoding) and
    returns the reply as a dict.

    Redirects are not followed; the reply is handed to
    ``self._raise_on_redirect``.  An ``HTTPError`` is passed to
    ``_handle_and_raise`` together with the reply.

    :param url: The URL being PUT to.
    :param data: Any additional data to bundle with the PUT, as raw data to
                 be used as the body.
    :param headers: Optional extra headers merged over the Authorization
                    header.
    """
    log.debug('PUT raw data to %s ...', url)
    merged_headers = {'Authorization': self._access_key}
    if headers:
        merged_headers.update(headers)

    reply = requests.put(url=url,
                         headers=merged_headers,
                         data=data,
                         allow_redirects=False)
    try:
        reply.raise_for_status()
        self._raise_on_redirect(reply)
        log.debug('PUT %s results:\n%s', url, reply.text)
        return _dict(reply)
    except requests.exceptions.HTTPError as http_err:
        _handle_and_raise(reply, http_err)
def _put(self, url, data=None):
    """
    Issues a PUT request with a JSON body and returns the reply as a dict.

    Redirects are not followed; the reply is handed to
    ``self._raise_on_redirect``.  An ``HTTPError`` is passed to
    ``_handle_and_raise`` together with the reply.

    :param url: The URL being PUT to.
    :param data: Any additional data to bundle with the PUT, as a
                 dictionary. Defaults to None.
    """
    log.debug('PUT to %s with data %s...', url, str(data))
    auth_headers = {'Authorization': self._access_key}
    reply = requests.put(url=url,
                         headers=auth_headers,
                         json=data,
                         allow_redirects=False)
    try:
        reply.raise_for_status()
        self._raise_on_redirect(reply)
        log.debug('PUT %s results:\n%s', url, reply.text)
        return _dict(reply)
    except requests.exceptions.HTTPError as http_err:
        _handle_and_raise(reply, http_err)
def _delete(self, url):
    """
    Issues a DELETE request and returns the reply as a dict.

    Redirects are not followed; the reply is handed to
    ``self._raise_on_redirect``.  An ``HTTPError`` is passed to
    ``_handle_and_raise`` together with the reply.

    :param url: The URL to DELETE.
    """
    log.debug('DELETE %s...', url)
    auth_headers = {'Authorization': self._access_key}
    reply = requests.delete(url=url,
                            headers=auth_headers,
                            allow_redirects=False)
    try:
        reply.raise_for_status()
        self._raise_on_redirect(reply)
        log.debug('DELETE %s results:\n%s', url, reply.text)
        return _dict(reply)
    except requests.exceptions.HTTPError as http_err:
        _handle_and_raise(reply, http_err)
def directions(self):
    """
    Returns list with translate directions

    Queries the ``langs`` endpoint with the API key.  A connection error is
    reported as ``YandexTranslateException`` built from ``error_codes[503]``;
    a reply whose ``code`` is not 200 raises ``YandexTranslateException``
    with that code.

    >>> translate = YandexTranslate("trnsl.1.1.20130421T140201Z.323e508a33e9d84b.f1e0d9ca9bcd0a00b0ef71d82e6cf4158183d09e")
    >>> directions = translate.directions
    >>> len(directions) > 0
    True
    """
    try:
        reply = requests.get(self.url("langs"), params={"key": self.api_key})
    except requests.exceptions.ConnectionError:
        raise YandexTranslateException(self.error_codes[503])

    payload = reply.json()
    status_code = payload.get("code", 200)
    if status_code != 200:
        raise YandexTranslateException(status_code)
    return payload.get("dirs")
def wait_for_opsman_ready(inst, timeout):
    """Block until the Ops Manager at *inst* answers HTTPS with a status < 400.

    :param inst: instance handed to ``get_addr`` to obtain the address.
    :param timeout: value passed to the waiter returned by
        ``wait_util.wait_while``.
    """
    addr = get_addr(inst)

    def should_wait():
        # Keep waiting while the endpoint is unreachable or answers with an
        # error status.  RequestException is the base of every requests
        # error (HTTPError included), so one handler covers them all.
        try:
            resp = requests.head(
                "https://{}/".format(addr), verify=False, timeout=1)
        except requests.exceptions.RequestException:
            return True
        return resp.status_code >= 400

    waitFor = wait_util.wait_while(should_wait)
    waitFor(timeout)


#
# Main deploy driver function
#
def validate_creds(opts):
    """Check that both the AWS and the Pivnet credentials are usable.

    :param opts: mapping providing ``region`` and ``PIVNET_TOKEN`` and,
        optionally, ``profile_name``.
    :returns: ``True`` when both checks pass.  ``False`` (after printing a
        hint) when boto reports missing credentials or Pivnet rejects the
        token.
    """
    session = Session(profile_name=opts.get('profile_name'),
                      region_name=opts['region'])
    ec2 = session.resource("ec2")

    try:
        ec2.meta.client.describe_id_format()
    except botocore.exceptions.NoCredentialsError as ex:
        print("Missing ~/.aws/credentials ? missing profile_name from cfg file")
        print("http://boto3.readthedocs.org/en/latest/guide/configuration.html")
        print(ex)
        return False

    try:
        pivnet.Pivnet(token=opts['PIVNET_TOKEN'])
    except pivnet.AuthException as ex:
        print("Get API TOKEN from ")
        print("https://network.pivotal.io/users/dashboard/edit-profile")
        print(ex)
        return False

    return True
def enroll(self, enrollment_id, enrollment_secret, csr):
    """Enroll a registered user in order to receive a signed X509 certificate

    The CSR is posted to the ``enroll`` path using the enrollment ID and
    secret as credentials.  The CA name is included in the request only
    when one is configured.

    Args:
        enrollment_id (str): The registered ID to use for enrollment
        enrollment_secret (str): The secret associated with the
                                 enrollment ID
        csr (bytes or file-like object): PEM-encoded PKCS#10 certificate
                                         signing request

    Returns:
        The base64-decoded ``Cert`` field of the response result.

    Raises:
        RequestException: errors in requests.exceptions
        ValueError: Failed response, json parse error, args missing
    """
    if not (enrollment_id and enrollment_secret and csr):
        raise ValueError("Missing required parameters. "
                         "'enrollmentID', 'enrollmentSecret' and 'csr'"
                         " are all required.")

    if self._ca_name == "":
        req = {"certificate_request": csr}
    else:
        req = {
            "caName": self._ca_name,
            "certificate_request": csr
        }

    response = self._send_ca_post(path="enroll",
                                  json=req,
                                  auth=(enrollment_id, enrollment_secret),
                                  verify=self._ca_certs_path)
    _logger.debug("Raw response json {0}".format(response))

    if not response['success']:
        raise ValueError("Enrollment failed with errors {0}"
                         .format(response['errors']))
    return base64.b64decode(response['result']['Cert'])
def send(self, data):
    '''POST *data* to the Stitch URL and return the response.

    ``raise_for_status()`` is called on the response before it is returned,
    so HTTP error statuses surface as exceptions.

    NOTE(review): the original summary said "retrying on exceptions"; no
    retry happens in this body - presumably a decorator supplies it.
    '''
    target = self.stitch_url
    request_headers = self.headers()
    reply = self.session.post(target, headers=request_headers, data=data)
    reply.raise_for_status()
    return reply
def request_ovh_client(consumer_key=None):
    """Return an OVH ``Client`` able to reach ``/paas/monitoring``.

    When no consumer key is given, a new one is requested, its validation
    URL is printed and the user is prompted to confirm.  A 403 from the
    probe request restarts the whole procedure without a consumer key.

    :param consumer_key: existing consumer key, or ``None`` to request one.
    :returns: the configured ``Client``.
    """
    try:
        import requests
        import requests.exceptions
    except ImportError:
        print('The requests library is needed to perform this command:')
        print(' pip install requests')
        print(' apt-get install python3-requests')
        exit(1)

    # NOTE(review): application key and secret are hard-coded here; consider
    # loading them from configuration instead.
    client = Client(
        endpoint='ovh-eu',
        application_key='yVPsINnSdHOLTGHf',
        application_secret='5mVDkHaJw6rp5GYiYBUlgDv1vruRejkl',
        consumer_key=consumer_key
    )

    if not consumer_key:
        rules = [{'method': 'GET', 'path': '/paas/monitoring'}]
        rules.extend(
            {'method': verb, 'path': '/paas/monitoring/*'}
            for verb in ('GET', 'POST', 'PUT', 'DELETE')
        )
        response = client.request_consumerkey(access_rules=rules)
        response = response.json()
        print('New consumer key:', response['consumerKey'])
        print('Validate it at', response['validationUrl'])
        input('When you are ready, press Enter')

    try:
        client.get('/paas/monitoring')
    except requests.exceptions.HTTPError as e:
        if e.response.status_code != 403:
            raise
        return request_ovh_client()

    return client
def test_requests_error_passthrough(self, mock_requests):
    """A RequestException raised by requests.request escapes _send_request."""
    expected_error = requests.exceptions.RequestException

    # The patched module needs the real exception classes for matching.
    mock_requests.exceptions = requests.exceptions
    mock_requests.request.side_effect = expected_error

    # pylint: disable=protected-access
    with self.assertRaises(expected_error):
        self.net._send_request('GET', 'uri')
def test_head_get_post_error_passthrough(self):
    """RequestException from send_request escapes head(), get() and post()."""
    expected_error = requests.exceptions.RequestException
    self.send_request.side_effect = expected_error

    for method in (self.net.head, self.net.get):
        with self.assertRaises(expected_error):
            method('GET', 'uri')

    with self.assertRaises(expected_error):
        self.net.post('uri', obj=self.obj)
def _do_request(self, payload):
    """POST the built payload to the endpoint and return the response.

    :param payload: object whose ``build()`` result is sent as the body.
    :returns: the response object when ``resp.ok`` is true.
    :raises exceptions.WSManRequestFailure: when requests raises a
        ``ConnectionError`` or any other ``RequestException``.
    :raises exceptions.WSManInvalidResponse: when the response is not OK.
    """
    payload = payload.build()
    LOG.debug('Sending request to %(endpoint)s: %(payload)s',
              {'endpoint': self.endpoint, 'payload': payload})
    try:
        resp = requests.post(
            self.endpoint,
            auth=requests.auth.HTTPBasicAuth(self.username, self.password),
            data=payload,
            # TODO(ifarkas): enable cert verification
            verify=False)
    except requests.exceptions.ConnectionError:
        # Covers e.g. 'No route to host'.  Match on the exception class;
        # comparing e.__class__ with a string never matched, so the
        # original error escaped untranslated.
        LOG.exception('Request failed (ConnectionError)')
        raise exceptions.WSManRequestFailure()
    except requests.exceptions.RequestException:
        LOG.exception('Request failed')
        raise exceptions.WSManRequestFailure()

    LOG.debug('Received response from %(endpoint)s: %(payload)s',
              {'endpoint': self.endpoint, 'payload': resp.content})

    if not resp.ok:
        raise exceptions.WSManInvalidResponse(
            status_code=resp.status_code,
            reason=resp.reason)
    else:
        return resp
def invoke(self, resource_uri, method, selectors=None, properties=None, expected_return_value=None):
    """Invokes a remote WS-Man method and validates its ReturnValue

    :param resource_uri: URI of the resource
    :param method: name of the method to invoke
    :param selectors: dictionary of selectors; ``None`` means empty
    :param properties: dictionary of properties; ``None`` means empty
    :param expected_return_value: expected return value reported back by
        the DRAC card. For return value codes check the profile
        documentation of the resource used in the method call. If not set,
        return value checking is skipped.
    :returns: the response returned by the parent class' ``invoke``
        (documented upstream as an lxml.etree.Element)
    :raises: WSManRequestFailure on request failures
    :raises: WSManInvalidResponse when receiving invalid response
    :raises: DRACOperationFailed when ReturnValue equals
        ``utils.RET_ERROR``
    :raises: DRACUnexpectedReturnValue on return value mismatch
    """
    selectors = {} if selectors is None else selectors
    properties = {} if properties is None else properties

    resp = super(WSManClient, self).invoke(resource_uri, method, selectors,
                                           properties)

    return_value = utils.find_xml(resp, 'ReturnValue', resource_uri).text

    if return_value == utils.RET_ERROR:
        message_elems = utils.find_xml(resp, 'Message', resource_uri, True)
        raise exceptions.DRACOperationFailed(
            drac_messages=[elem.text for elem in message_elems])

    mismatch = (expected_return_value is not None and
                return_value != expected_return_value)
    if mismatch:
        raise exceptions.DRACUnexpectedReturnValue(
            expected_return_value=expected_return_value,
            actual_return_value=return_value)

    return resp
def send_token_to_subclient(webservice_endpoint: str, user_id: str, subclient_token: str, subclient_id: str) -> str:
    """Sends the subclient-specific token to the subclient.

    The token is POSTed to ``blender_id/store_scst`` under the webservice
    endpoint.  The subclient is expected to verify the token with BlenderID
    and make sure a matching user exists on its side.

    :returns: the ``subclient_user_id`` field of the reply.
    :raises BlenderIdCommError: on an HTTP error status, a connection
        error, or a reply whose ``status`` is not ``'success'``.
    """
    import requests
    import urllib.parse

    url = urllib.parse.urljoin(webservice_endpoint, 'blender_id/store_scst')
    form = {'user_id': user_id,
            'subclient_id': subclient_id,
            'token': subclient_token}

    try:
        r = requests.post(url, data=form, verify=True)
        r.raise_for_status()
    except (requests.exceptions.HTTPError,
            requests.exceptions.ConnectionError) as e:
        raise BlenderIdCommError(str(e))

    resp = r.json()
    if resp['status'] == 'success':
        return resp['subclient_user_id']

    raise BlenderIdCommError('Error sending subclient-specific token to %s, error is: %s'
                             % (webservice_endpoint, resp))
def shorten_url(url, botan_token, user_id):
    """Return a shortened form of *url*, or *url* itself when shortening fails.

    :param url: the URL to shorten.
    :param botan_token: sent as the ``token`` query parameter.
    :param user_id: sent (as a string) as the ``user_ids`` query parameter.
    :returns: the text of the shortener's reply, or the original *url* when
        the request raises any requests exception.
    """
    try:
        return requests.get(SHORTENER_URL, params={
            'token': botan_token,
            'url': url,
            'user_ids': str(user_id),
        }).text
    except requests.exceptions.RequestException:
        # Catch the base exception class.  Naming the ``requests.exceptions``
        # module here raised TypeError instead of falling back to the URL.
        return url
def get_stack(self, stack_name, debug=True):
    """Return the first stack described for *stack_name*, or ``None``.

    :param stack_name: passed as ``StackName`` to ``describe_stacks``.
    :param debug: when true, a failure is logged through
        ``self.logger.error``.
    :returns: ``stack['Stacks'][0]`` on success; ``None`` when a botocore
        ``ClientError`` or any other exception occurs.
    """
    try:
        described = self.cf_client.describe_stacks(StackName=stack_name)
        return described['Stacks'][0]
    except botocore.exceptions.ClientError as cf_error:
        failure = cf_error
    except Exception as error:
        failure = error

    if debug:
        self.logger.error(failure)
    return None