Python requests 模块,exceptions() 实例源码

我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用requests.exceptions()

项目:Chainmail    作者:Chainmail-Project    | 项目源码 | 文件源码
def new_version_available(self) -> bool:
        """
        Check the remote manifest to see whether a newer version is published.

        Version strings are dotted integers. They are compared component by
        component starting from the most significant one; a missing component
        counts as 0, so "1.2" and "1.2.0" are treated as equal.

        :return: True when the remote version is strictly newer than the local
                 one. False when it is not, when no remote manifest URL is
                 configured, or when the connection to the remote fails.
        """
        remote_url = self.manifest.get("remote_manifest", "")
        if remote_url == "":
            return False
        self.logger.info("Checking for update...")
        try:
            manifest = requests.get(remote_url).json()
        except requests.exceptions.ConnectionError:
            self.logger.warning("Failed to check for update.")
            return False

        version_remote = [int(part) for part in manifest["version"].split(".")]
        version_local = [int(part) for part in self.manifest["version"].split(".")]

        # Pad the shorter version with zeros so that list comparison is a true
        # most-significant-first ordering (a newer local major version must not
        # be outvoted by an older remote minor version, and vice versa).
        width = max(len(version_remote), len(version_local))
        version_remote += [0] * (width - len(version_remote))
        version_local += [0] * (width - len(version_local))

        if version_local < version_remote:
            self.logger.info(f"An update is available. Current version is v{self.manifest['version']}, updated version is v{manifest['version']}.")
            return True
        self.logger.info("No update required.")
        return False
项目:ipwb    作者:oduwsdl    | 项目源码 | 文件源码
def isDaemonAlive(hostAndPort="{0}:{1}".format(IPFSAPI_IP, IPFSAPI_PORT)):
    """Ensure that the IPFS daemon is running via HTTP before proceeding.

    The daemon is probed by calling ``id()`` on an ipfsapi client built from
    IPFSAPI_IP / IPFSAPI_PORT. ``hostAndPort`` is only used in the error
    message; it does not change which daemon is contacted.

    Returns True when the probe succeeds and False when it fails. Exits the
    process when an OSError is raised (treated as "IPFS not installed").
    """
    client = ipfsapi.Client(IPFSAPI_IP, IPFSAPI_PORT)

    try:
        # ConnectionError/AttributeError if IPFS daemon not running
        client.id()
        return True
    except (ConnectionError, exceptions.AttributeError):
        logError("Daemon is not running at http://" + hostAndPort)
        return False
    except OSError:
        logError("IPFS is likely not installed. "
                 "See https://ipfs.io/docs/install/")
        sys.exit()
    except Exception:
        # Catch Exception rather than everything so that SystemExit and
        # KeyboardInterrupt are not swallowed, and report "not alive"
        # explicitly instead of falling through with an implicit None.
        logError('Unknown error in retrieving daemon status')
        logError(sys.exc_info()[0])
        return False
项目:fabric-sdk-py    作者:hyperledger    | 项目源码 | 文件源码
def enroll(self, enrollment_id, enrollment_secret):
        """Create a key pair and CSR for a registered user and enroll it.

        Args:
            enrollment_id (str): The registered ID to use for enrollment;
                                 it becomes the CSR's common name
            enrollment_secret (str): The secret associated with the
                                     enrollment ID

        Returns: An Enrollment built from the freshly generated private key
                 and whatever the CA client's enroll() call returned

        Raises:
            RequestException: errors in requests.exceptions
            ValueError: Failed response, json parse error, args missing

        """
        key = self._crypto.generate_private_key()

        # The subject carries only the common name, set to the enrollment ID.
        subject = x509.Name(
            [x509.NameAttribute(NameOID.COMMON_NAME, six.u(enrollment_id))])
        signing_request = self._crypto.generate_csr(key, subject)

        # The CA client receives the CSR as PEM text.
        pem_request = signing_request.public_bytes(Encoding.PEM).decode("utf-8")
        certificate = self._ca_client.enroll(
            enrollment_id, enrollment_secret, pem_request)

        return Enrollment(key, certificate)
项目:webinspectapi    作者:target    | 项目源码 | 文件源码
def __init__(self, host, username=None, password=None, verify_ssl=True, user_agent=None, cert=None):
        """Store connection settings and derive the user agent and auth type.

        A falsy ``user_agent`` is replaced by 'webinspectapi/' plus the
        module-level ``version``. When ``verify_ssl`` is falsy, urllib3's
        InsecureRequestWarning is disabled. ``auth_type`` is 'basic' when a
        username is given, otherwise 'certificate' when a cert is given,
        otherwise 'unauthenticated'.
        """
        self.host = host
        self.username = username
        self.password = password
        self.cert = cert
        self.verify_ssl = verify_ssl

        self.user_agent = user_agent if user_agent else 'webinspectapi/' + version

        if not self.verify_ssl:
            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

        # A username takes precedence over a certificate when both are given.
        if username is not None:
            chosen_auth = 'basic'
        elif cert is not None:
            chosen_auth = 'certificate'
        else:
            chosen_auth = 'unauthenticated'
        self.auth_type = chosen_auth
项目:metatab    作者:Metatab    | 项目源码 | 文件源码
def find_decl_doc(self, name):
        """Look up a declaration document by name; currently always fails.

        This method unconditionally raises, so no lookup takes place. The
        code that used to follow the raise was unreachable and has been
        removed; it issued a HEAD request for
        ``METATAB_ASSETS_URL + name + '.csv'`` and returned that URL when the
        response status was OK.

        :param name: name of the declaration document being requested
        :raises IncludeError: always, carrying ``name``
        """
        raise IncludeError(name)
项目:Dshield    作者:ywjt    | 项目源码 | 文件源码
def test_request_retry(self, mock_request):
        """Check that a write still goes through after two connection errors."""

        class FlakyEndpoint(object):
            # Raises ConnectionError on the first two calls, then answers 204.
            attempts = 0

            def connection_error(self, *args, **kwargs):
                self.attempts += 1

                if self.attempts >= 3:
                    response = requests.Response()
                    response.status_code = 204
                    return response
                raise requests.exceptions.ConnectionError

        endpoint = FlakyEndpoint()
        mock_request.side_effect = endpoint.connection_error

        client = InfluxDBClient(database='db')
        client.write_points(self.dummy_points)
项目:Dshield    作者:ywjt    | 项目源码 | 文件源码
def test_request_retry_raises(self, mock_request):
        """Check that three connection errors in a row reach the caller."""

        class FlakyEndpoint(object):
            # Raises ConnectionError on the first three calls, then answers 200.
            attempts = 0

            def connection_error(self, *args, **kwargs):
                self.attempts += 1

                if self.attempts >= 4:
                    response = requests.Response()
                    response.status_code = 200
                    return response
                raise requests.exceptions.ConnectionError

        endpoint = FlakyEndpoint()
        mock_request.side_effect = endpoint.connection_error

        client = InfluxDBClient(database='db')

        with self.assertRaises(requests.exceptions.ConnectionError):
            client.write_points(self.dummy_points)
项目:Dshield    作者:ywjt    | 项目源码 | 文件源码
def test_request_retry(self, mock_request):
        """Check that a write still goes through after two connection errors."""

        class FlakyEndpoint(object):
            # Raises ConnectionError on the first two calls, then answers 200.
            attempts = 0

            def connection_error(self, *args, **kwargs):
                self.attempts += 1

                if self.attempts >= 3:
                    response = requests.Response()
                    response.status_code = 200
                    return response
                raise requests.exceptions.ConnectionError

        endpoint = FlakyEndpoint()
        mock_request.side_effect = endpoint.connection_error

        client = InfluxDBClient(database='db')
        client.write_points(self.dummy_points)
项目:Dshield    作者:ywjt    | 项目源码 | 文件源码
def test_request_retry_raises(self, mock_request):
        """Check that three connection errors in a row reach the caller."""

        class FlakyEndpoint(object):
            # Raises ConnectionError on the first three calls, then answers 200.
            attempts = 0

            def connection_error(self, *args, **kwargs):
                self.attempts += 1

                if self.attempts >= 4:
                    response = requests.Response()
                    response.status_code = 200
                    return response
                raise requests.exceptions.ConnectionError

        endpoint = FlakyEndpoint()
        mock_request.side_effect = endpoint.connection_error

        client = InfluxDBClient(database='db')

        with self.assertRaises(requests.exceptions.ConnectionError):
            client.write_points(self.dummy_points)
项目:python-wsmanclient    作者:intelsdi-x    | 项目源码 | 文件源码
def __init__(self, endpoint, resource_uri, optimization=True,
                 max_elems=100, filter_query=None, filter_dialect=None):
        """Store the enumeration settings and resolve the optional filter.

        The filter attributes stay None unless ``filter_query`` is given. In
        that case ``filter_dialect`` is translated through FILTER_DIALECT_MAP.

        :raises exceptions.WSManInvalidFilterDialect: a filter query was given
            and ``filter_dialect`` is not a key of FILTER_DIALECT_MAP
        """
        self.endpoint = endpoint
        self.resource_uri = resource_uri
        self.filter_dialect = None
        self.filter_query = None
        self.optimization = optimization
        self.max_elems = max_elems

        # Without a query there is nothing to validate; the dialect is ignored.
        if filter_query is None:
            return

        try:
            dialect = FILTER_DIALECT_MAP[filter_dialect]
        except KeyError:
            raise exceptions.WSManInvalidFilterDialect(
                invalid_filter=filter_dialect,
                supported=', '.join(FILTER_DIALECT_MAP))

        self.filter_dialect = dialect
        self.filter_query = filter_query
项目:bpy_lambda    作者:bcongdon    | 项目源码 | 文件源码
def blender_id_server_validate(token) -> typing.Tuple[typing.Optional[str], typing.Optional[str]]:
    """Validate the auth token with the server.

    @param token: the authentication token
    @type token: str
    @returns: tuple (expiry, error).
        The expiry is the expiry date of the token if it is valid, else None.
        The error is None if the token is valid, or an error message when it's
        invalid or when the request to the server failed.
    """

    import requests
    import requests.exceptions

    try:
        r = requests.post(blender_id_endpoint('u/validate_token'),
                          data={'token': token}, verify=True)
    except requests.exceptions.RequestException as e:
        # The message belongs in the error slot (second element); returning
        # it first would make callers read it as a valid expiry date.
        return (None, str(e))

    if r.status_code != 200:
        return (None, 'Authentication token invalid')

    response = r.json()
    return (response['token_expires'], None)
项目:bpy_lambda    作者:bcongdon    | 项目源码 | 文件源码
def make_authenticated_call(method, url, auth_token, data):
    """Perform an HTTP call that carries the OAuth token as a Bearer header.

    ``url`` is resolved through blender_id_endpoint(). HTTPError and
    ConnectionError from requests are re-raised as BlenderIdCommError; the
    response object is returned otherwise.
    """

    import requests
    import requests.exceptions

    handled_errors = (requests.exceptions.HTTPError,
                      requests.exceptions.ConnectionError)

    try:
        response = requests.request(
            method,
            blender_id_endpoint(url),
            data=data,
            headers={'Authorization': 'Bearer %s' % auth_token},
            verify=True,
        )
    except handled_errors as err:
        raise BlenderIdCommError(str(err))

    return response
项目:flibusta_bot    作者:Kurbezz    | 项目源码 | 文件源码
def track(token, uid, message, name='Message'):
    """POST ``message`` as JSON to TRACK_URL and return the decoded reply.

    Returns False instead of raising when the request times out, fails in
    any other requests-level way, or a ValueError occurs; the latter two
    cases also print the exception.
    """
    query = {"token": token, "uid": uid, "name": name}
    json_headers = {'Content-type': 'application/json'}
    try:
        # Serialisation stays inside the try: a ValueError from json.dumps is
        # handled the same way as one from decoding the reply.
        reply = requests.post(
            TRACK_URL,
            params=query,
            data=json.dumps(message),
            headers=json_headers,
        )
        return reply.json()
    except requests.exceptions.Timeout:
        # Timeouts are reported silently.
        return False
    except (requests.exceptions.RequestException, ValueError) as err:
        print(err)
        return False
项目:python-matchlightsdk    作者:TerbiumLabs    | 项目源码 | 文件源码
def _request(self, method, url, data=None, **kwargs):
        """Send a request through the session and return only 200 responses.

        Any other status raises matchlight.error.APIError carrying the status
        code, the reason and the decoded JSON body (None when the body is not
        valid JSON). RetryError and ConnectionError from requests are
        translated into matchlight.error.ConnectionError.
        """
        try:
            response = self.session.request(method, url, data=data, **kwargs)
            if response.status_code != 200:
                try:
                    error_body = response.json()
                except ValueError:
                    error_body = None
                raise matchlight.error.APIError(
                    response.status_code, response.reason, error_body)
            return response
        except requests.exceptions.RetryError:
            raise matchlight.error.ConnectionError(
                'Matchlight API request failed with too many retries')
        except requests.exceptions.ConnectionError:
            raise matchlight.error.ConnectionError(
                'Matchlight API request failed with connection error')
项目:CommunityCellularManager    作者:facebookincubator    | 项目源码 | 文件源码
def register_update(eapi):
    """Report the BTS's VPN state to the cloud registration endpoint.

    Sends the BTS UUID, the VPN status ('up' when a VPN IP is available,
    'down' otherwise) and the VPN IP to '/bts/register'. If the reply
    contains 'bts_secret' it is stored in ``conf``. A RegistrationError is
    logged rather than propagated.
    """
    vpn_ip = system_utilities.get_vpn_ip()
    if vpn_ip:
        vpn_status = 'up'
    else:
        vpn_status = 'down'

    # This could fail when offline! Must handle connection exceptions.
    params = {
        'bts_uuid': _get_snowflake(),
        'vpn_status': vpn_status,
        'vpn_ip': vpn_ip,
        'federer_port': '80',
    }
    try:
        reply = _send_cloud_req(
            requests.get,
            '/bts/register',
            'BTS registration',
            params=params,
            headers=eapi.auth_header,
            timeout=11)
        if 'bts_secret' in reply:
            conf['bts_secret'] = reply['bts_secret']
    except RegistrationError as err:
        logger.error(str(err))
项目:aiolocust    作者:kpidata    | 项目源码 | 文件源码
def __exit__(self, exc, value, traceback):
        """Record the outcome of the request when the ``with`` block ends.

        Returns True to suppress an exception raised inside the block and
        False to let it propagate.
        """
        if self._is_reported:
            # The user already marked this response manually, so the status
            # code is not consulted; only a clean exit counts as handled.
            return exc is None

        if not exc:
            # No exception in the block: let the status code decide.
            try:
                self.raise_for_status()
            except requests.exceptions.RequestException as err:
                self.failure(err)
            else:
                self.success()
            return True

        # Only ResponseError is converted into a failure; anything else
        # propagates to the caller.
        if not isinstance(value, ResponseError):
            return False
        self.failure(value)
        return True
项目:telnet-iot-honeypot    作者:Phype    | 项目源码 | 文件源码
def put_session(self, session, retry=True):
        """PUT a session to the backend's /conns endpoint.

        Returns the decoded JSON reply on HTTP 200 and an empty list when the
        request itself fails. A non-200 reply is retried once; if the retry
        also fails, IOError is raised with the raw response body.
        """
        target = self.url + "/conns"
        try:
            reply = requests.put(target, auth=self.auth, json=session, timeout=20.0)
        except requests.exceptions.RequestException:
            dbg("Cannot connect to backend")
            return []

        if reply.status_code == 200:
            return reply.json()

        body = reply.raw.read()
        if not retry:
            raise IOError(body)

        dbg("Backend upload failed, retrying (" + str(body) + ")")
        return self.put_session(session, False)
项目:telnet-iot-honeypot    作者:Phype    | 项目源码 | 文件源码
def put_sample_info(self, f, retry=True):
        """PUT sample metadata to the backend under /sample/<sha256>.

        ``f`` must contain a "sha256" key. Returns the decoded JSON reply on
        HTTP 200 and None when the request itself fails. A non-200 reply is
        retried once; if the retry also fails, IOError is raised with the raw
        response body.
        """
        try:
            digest = f["sha256"]
            target = self.url + "/sample/" + digest
            reply = requests.put(target, auth=self.auth, json=f, timeout=20.0)
        except requests.exceptions.RequestException:
            dbg("Cannot connect to backend")
            return

        if reply.status_code == 200:
            return reply.json()

        body = reply.raw.read()
        if not retry:
            raise IOError(body)

        dbg("Backend upload failed, retrying (" + str(body) + ")")
        return self.put_sample_info(f, False)
项目:telnet-iot-honeypot    作者:Phype    | 项目源码 | 文件源码
def put_sample(self, data, retry=True):
        """POST raw sample data to the backend's /file endpoint.

        Returns None on HTTP 200 and when the request itself fails. A non-200
        reply is retried once with the same data; if the retry also fails,
        IOError is raised with the raw response body.

        :param data: request body to upload
        :param retry: whether a failed upload may be attempted once more
        :raises IOError: the backend answered with a non-200 status twice
        """

        try:
            r = requests.post(self.url + "/file", auth=self.auth, data=data, timeout=20.0)
        except requests.exceptions.RequestException:
            dbg("Cannot connect to backend")
            return

        if r.status_code == 200:
            return
        elif retry:
            msg = r.raw.read()
            dbg("Backend upload failed, retrying (" + str(msg) + ")")
            # Retry with this method's own arguments; the previous call
            # referenced undefined names and did not match the signature.
            return self.put_sample(data, False)
        else:
            msg = r.raw.read()
            raise IOError(msg)
项目:ezoutlet    作者:jtpereyda    | 项目源码 | 文件源码
def test_reset_no_response_get(self, mock_time, mock_requests, mock_get_url):
        """
        Given: Mock requests set up through configure_mock_requests (the
               ConnectTimeout-on-get scenario).
          and: EzOutlet initialized with an IP address and timeout.
        When: reset(post_reset_delay, ez_outlet_reset_interval) is called.
        Then: ez_outlet._get_url was called with the hostname and ez_outlet.RESET_URL_PATH.
         and: requests.get was called exactly once with the sample URL, the
              timeout and proxies=PROXY_SETTINGS_NONE.
        """
        _ = mock_time

        # Given
        self.configure_mock_requests(mock_requests=mock_requests)

        # When
        reset_args = dict(post_reset_delay=self.post_reset_delay,
                          ez_outlet_reset_interval=self.ez_outlet_reset_interval)
        try:
            self.uut.reset(**reset_args)
        except ezoutlet.exceptions.EzOutletError:
            pass  # exception tested elsewhere

        # Then
        mock_get_url.assert_called_with(
            self.hostname, ez_outlet.EzOutlet.RESET_URL_PATH)
        mock_requests.get.assert_called_once_with(
            sample_url, timeout=self.timeout, proxies=PROXY_SETTINGS_NONE)
项目:ezoutlet    作者:jtpereyda    | 项目源码 | 文件源码
def test_reset_no_response_raise(self, mock_time, mock_requests, mock_get_url):
        """
        Given: Mock requests set up through configure_mock_requests (the
               ConnectTimeout-on-get scenario).
          and: EzOutlet initialized with an IP address and timeout.
        When: reset(post_reset_delay, ez_outlet_reset_interval) is called.
        Then: reset() raises EzOutletError, e.
         and: str(e) equals ez_outlet.EzOutlet.NO_RESPONSE_MSG formatted with the timeout.
        """
        _ = mock_time
        _ = mock_get_url

        # Given
        self.configure_mock_requests(mock_requests=mock_requests)

        # When
        reset_args = dict(post_reset_delay=self.post_reset_delay,
                          ez_outlet_reset_interval=self.ez_outlet_reset_interval)
        with self.assertRaises(ezoutlet.exceptions.EzOutletError) as caught:
            self.uut.reset(**reset_args)

        # Then
        expected_message = ez_outlet.EzOutlet.NO_RESPONSE_MSG.format(self.timeout)
        self.assertEqual(str(caught.exception), expected_message)
项目:ezoutlet    作者:jtpereyda    | 项目源码 | 文件源码
def test_reset_no_response_no_sleep(self, mock_time, mock_requests, mock_get_url):
        """
        Given: Mock requests set up through configure_mock_requests (the
               ConnectTimeout-on-get scenario).
          and: EzOutlet initialized with an IP address and timeout.
        When: reset(post_reset_delay, ez_outlet_reset_interval) is called.
        Then: time.sleep is never called.
        """
        _ = mock_get_url

        # Given
        self.configure_mock_requests(mock_requests=mock_requests)

        # When
        reset_args = dict(post_reset_delay=self.post_reset_delay,
                          ez_outlet_reset_interval=self.ez_outlet_reset_interval)
        try:
            self.uut.reset(**reset_args)
        except ezoutlet.exceptions.EzOutletError:
            pass  # exception tested elsewhere

        # Then
        mock_time.sleep.assert_not_called()


# Suppress since PyCharm doesn't recognize @mock.patch.object
# noinspection PyUnresolvedReferences
项目:ezoutlet    作者:jtpereyda    | 项目源码 | 文件源码
def test_reset_unexpected_response_get(self, mock_time, mock_requests, mock_get_url):
        """
        Given: Mock requests set up through configure_mock_requests (the
               unexpected-response scenario).
          and: EzOutlet initialized with an IP address and timeout.
        When: reset(post_reset_delay, ez_outlet_reset_interval) is called.
        Then: ez_outlet._get_url was called with the hostname and ez_outlet.RESET_URL_PATH.
         and: requests.get was called exactly once with the sample URL, the
              timeout and proxies=PROXY_SETTINGS_NONE.
        """
        _ = mock_time

        # Given
        self.configure_mock_requests(mock_requests=mock_requests)

        # When
        reset_args = dict(post_reset_delay=self.post_reset_delay,
                          ez_outlet_reset_interval=self.ez_outlet_reset_interval)
        try:
            self.uut.reset(**reset_args)
        except ezoutlet.exceptions.EzOutletError:
            pass  # exception tested elsewhere

        # Then
        mock_get_url.assert_called_with(
            self.hostname, ez_outlet.EzOutlet.RESET_URL_PATH)
        mock_requests.get.assert_called_once_with(
            sample_url, timeout=self.timeout, proxies=PROXY_SETTINGS_NONE)
项目:ezoutlet    作者:jtpereyda    | 项目源码 | 文件源码
def test_reset_unexpected_response_no_sleep(self, mock_time, mock_requests, mock_get_url):
        """
        Given: Mock requests set up through configure_mock_requests (the
               unexpected-response scenario).
          and: EzOutlet initialized with an IP address and timeout.
        When: reset(post_reset_delay, ez_outlet_reset_interval) is called.
        Then: time.sleep is never called.
        """
        _ = mock_get_url

        # Given
        self.configure_mock_requests(mock_requests=mock_requests)

        # When
        reset_args = dict(post_reset_delay=self.post_reset_delay,
                          ez_outlet_reset_interval=self.ez_outlet_reset_interval)
        try:
            self.uut.reset(**reset_args)
        except ezoutlet.exceptions.EzOutletError:
            pass  # exception tested elsewhere

        # Then
        mock_time.sleep.assert_not_called()
项目:Facebook-Bot    作者:codelovin    | 项目源码 | 文件源码
def directions(self, proxies=None):
    """
    Fetch the list of supported translation directions.

    Queries the "langs" endpoint with the API key (optionally through
    ``proxies``) and returns the "dirs" entry of the JSON reply.
    >>> translate = YandexTranslate("trnsl.1.1.20130421T140201Z.323e508a33e9d84b.f1e0d9ca9bcd0a00b0ef71d82e6cf4158183d09e")
    >>> directions = translate.directions
    >>> len(directions) > 0
    True

    Raises YandexTranslateException on a connection error (using error code
    503) or when the reply carries a "code" other than 200.
    """
    query = {"key": self.api_key}
    try:
      reply = requests.get(self.url("langs"), params=query, proxies=proxies)
    except requests.exceptions.ConnectionError:
      raise YandexTranslateException(self.error_codes[503])

    payload = reply.json()
    # A reply without a "code" field is treated as success.
    reported_code = payload.get("code", 200)
    if reported_code != 200:
      raise YandexTranslateException(reported_code)
    return payload.get("dirs")
项目:qarnot-sdk-python    作者:qarnot    | 项目源码 | 文件源码
def retrieve_pool(self, uuid):
        """Fetch an existing :class:`qarnot.pool.Pool` by uuid.

        :param str uuid: Desired pool uuid
        :rtype: :class:`~qarnot.pool.Pool`
        :returns: Pool built from the API's JSON answer
        :raises qarnot.exceptions.MissingPoolException: pool does not exist
        :raises qarnot.exceptions.UnauthorizedException: invalid credentials
        :raises qarnot.exceptions.QarnotGenericException: API general error, see message for details
        """
        pool_url = get_url('pool update', uuid=uuid)
        answer = self._get(pool_url)

        # A 404 is reported with the server's own message before the generic check.
        if answer.status_code == 404:
            details = answer.json()
            raise MissingPoolException(details['message'])

        raise_on_error(answer)
        return Pool.from_json(self, answer.json())
项目:qarnot-sdk-python    作者:qarnot    | 项目源码 | 文件源码
def retrieve_task(self, uuid):
        """Fetch an existing :class:`qarnot.task.Task` by uuid.

        :param str uuid: Desired task uuid
        :rtype: :class:`~qarnot.task.Task`
        :returns: Task built from the API's JSON answer
        :raises qarnot.exceptions.MissingTaskException: task does not exist
        :raises qarnot.exceptions.UnauthorizedException: invalid credentials
        :raises qarnot.exceptions.QarnotGenericException: API general error, see message for details
        """
        task_url = get_url('task update', uuid=uuid)
        answer = self._get(task_url)

        # A 404 is reported with the server's own message before the generic check.
        if answer.status_code == 404:
            details = answer.json()
            raise MissingTaskException(details['message'])

        raise_on_error(answer)
        return Task.from_json(self, answer.json())
项目:qarnot-sdk-python    作者:qarnot    | 项目源码 | 文件源码
def retrieve_disk(self, uuid):
        """Fetch an existing :class:`~qarnot.disk.Disk` by uuid.

        :param str uuid: Desired disk uuid
        :rtype: :class:`~qarnot.disk.Disk`
        :returns: Disk built from the API's JSON answer
        :raises ValueError: no such disk
        :raises qarnot.exceptions.MissingDiskException: disk does not exist
        :raises qarnot.exceptions.UnauthorizedException: invalid credentials
        :raises qarnot.exceptions.QarnotGenericException: API general error, see message for details
        """
        # The 'disk info' URL takes the uuid under the ``name`` keyword.
        disk_url = get_url('disk info', name=uuid)
        answer = self._get(disk_url)

        if answer.status_code == 404:
            details = answer.json()
            raise MissingDiskException(details['message'])

        raise_on_error(answer)
        return Disk.from_json(self, answer.json())
项目:qarnot-sdk-python    作者:qarnot    | 项目源码 | 文件源码
def create_disk(self, description, lock=False, tags=None):
        """Build a :class:`~qarnot.disk.Disk`, call its create() and return it.

        :param str description: a short description of the disk
        :param bool lock: prevents the disk to be removed accidentally
        :param tags: custom tags
        :type tags: list(`str`)

        :rtype: :class:`qarnot.disk.Disk`
        :returns: The created :class:`~qarnot.disk.Disk`.

        :raises qarnot.exceptions.MaxDiskException: disk quota reached
        :raises qarnot.exceptions.QarnotGenericException: API general error, see message for details
        :raises qarnot.exceptions.UnauthorizedException: invalid credentials
        """
        new_disk = Disk(self, description, lock=lock, tags=tags)
        new_disk.create()
        return new_disk
项目:qarnot-sdk-python    作者:qarnot    | 项目源码 | 文件源码
def profiles(self):
        """List the profiles available on the cluster.

        The profile names are fetched first, then the details of each one;
        names whose detail request answers 404 are left out of the result.

        :rtype: List of :class:`Profile`

        :raises qarnot.exceptions.UnauthorizedException: invalid credentials
        :raises qarnot.exceptions.QarnotGenericException: API general error, see message for details
        """
        listing = self._get(get_url('profiles'))
        raise_on_error(listing)

        found = []
        for profile_name in listing.json():
            details = self._get(get_url('profile details', profile=profile_name))
            if details.status_code != 404:
                raise_on_error(details)
                found.append(Profile(details.json()))
        return found
项目:bonsai-cli    作者:BonsaiAI    | 项目源码 | 文件源码
def _handles_connection_error(func):
    """
    Decorator that turns a requests ConnectionError raised by the wrapped
    call into a BrainServerError naming the URL that could not be reached.

    :param func: the function being decorated; called as
                 func(self, url, *args, **kwargs)
    """
    @functools.wraps(func)
    def _handler(self, url, *args, **kwargs):
        try:
            result = func(self, url, *args, **kwargs)
        except requests.exceptions.ConnectionError:
            raise BrainServerError(
                "Unable to connect to domain: {}".format(url))
        return result

    return _handler
项目:bonsai-cli    作者:BonsaiAI    | 项目源码 | 文件源码
def _post(self, url, data=None):
        """
        Send a POST request with a JSON body and the access key as the
        Authorization header. Redirects are not followed.
        :param url: The URL being posted to.
        :param data: Any additional data to bundle with the POST, as a
                     dictionary. Defaults to None.
        :return: The result of _dict() applied to the response. An HTTPError
                 is passed to _handle_and_raise together with the response.
        """
        log.debug('POST to %s with data %s...', url, str(data))
        auth_headers = {'Authorization': self._access_key}
        reply = requests.post(url=url,
                              headers=auth_headers,
                              json=data,
                              allow_redirects=False)
        try:
            reply.raise_for_status()
            self._raise_on_redirect(reply)
            log.debug('POST %s results:\n%s', url, reply.text)
            return _dict(reply)
        except requests.exceptions.HTTPError as http_error:
            _handle_and_raise(reply, http_error)
项目:bonsai-cli    作者:BonsaiAI    | 项目源码 | 文件源码
def _post_raw_data(self, url, data=None, headers=None):
        """
        Send a POST request whose body is passed through unencoded.
        Redirects are not followed.
        :param url: The URL being posted to.
        :param data: Any additional data to bundle with the POST, as raw data
                     to be used as the body.
        :param headers: Optional extra headers; they are merged over the
                        Authorization header and may override it.
        :return: The result of _dict() applied to the response. An HTTPError
                 is passed to _handle_and_raise together with the response.
        """
        log.debug('POST raw data to %s ...', url)
        merged_headers = {'Authorization': self._access_key}
        if headers:
            merged_headers.update(headers)

        reply = requests.post(url=url,
                              headers=merged_headers,
                              data=data,
                              allow_redirects=False)

        try:
            reply.raise_for_status()
            self._raise_on_redirect(reply)
            log.debug('POST %s results:\n%s', url, reply.text)
            return _dict(reply)
        except requests.exceptions.HTTPError as http_error:
            _handle_and_raise(reply, http_error)
项目:bonsai-cli    作者:BonsaiAI    | 项目源码 | 文件源码
def _put_raw_data(self, url, data=None, headers=None):
        """
        Send a PUT request whose body is passed through unencoded.
        Redirects are not followed.
        :param url: The URL being PUT to.
        :param data: Any additional data to bundle with the PUT, as raw data
                     to be used as the body.
        :param headers: Optional extra headers; they are merged over the
                        Authorization header and may override it.
        :return: The result of _dict() applied to the response. An HTTPError
                 is passed to _handle_and_raise together with the response.
        """
        log.debug('PUT raw data to %s ...', url)
        merged_headers = {'Authorization': self._access_key}
        if headers:
            merged_headers.update(headers)

        reply = requests.put(url=url,
                             headers=merged_headers,
                             data=data,
                             allow_redirects=False)

        try:
            reply.raise_for_status()
            self._raise_on_redirect(reply)
            log.debug('PUT %s results:\n%s', url, reply.text)
            return _dict(reply)
        except requests.exceptions.HTTPError as http_error:
            _handle_and_raise(reply, http_error)
项目:bonsai-cli    作者:BonsaiAI    | 项目源码 | 文件源码
def _put(self, url, data=None):
        """
        Send a PUT request with a JSON body and the access key as the
        Authorization header. Redirects are not followed.
        :param url: The URL being PUT to.
        :param data: Any additional data to bundle with the PUT, as a
                     dictionary. Defaults to None.
        :return: The result of _dict() applied to the response. An HTTPError
                 is passed to _handle_and_raise together with the response.
        """
        log.debug('PUT to %s with data %s...', url, str(data))
        auth_headers = {'Authorization': self._access_key}
        reply = requests.put(url=url,
                             headers=auth_headers,
                             json=data,
                             allow_redirects=False)
        try:
            reply.raise_for_status()
            self._raise_on_redirect(reply)
            log.debug('PUT %s results:\n%s', url, reply.text)
            return _dict(reply)
        except requests.exceptions.HTTPError as http_error:
            _handle_and_raise(reply, http_error)
项目:bonsai-cli    作者:BonsaiAI    | 项目源码 | 文件源码
def _delete(self, url):
        """
        Send a DELETE request with the access key as the Authorization
        header. Redirects are not followed.
        :param url: The URL to DELETE.
        :return: The result of _dict() applied to the response. An HTTPError
                 is passed to _handle_and_raise together with the response.
        """
        log.debug('DELETE %s...', url)
        auth_headers = {'Authorization': self._access_key}
        reply = requests.delete(url=url,
                                headers=auth_headers,
                                allow_redirects=False)
        try:
            reply.raise_for_status()
            self._raise_on_redirect(reply)
            log.debug('DELETE %s results:\n%s', url, reply.text)
            return _dict(reply)
        except requests.exceptions.HTTPError as http_error:
            _handle_and_raise(reply, http_error)
项目:Genum    作者:la0rg    | 项目源码 | 文件源码
def directions(self):
        """
        Fetch the list of supported translation directions.

        Queries the "langs" endpoint with the API key and returns the "dirs"
        entry of the JSON reply.
        >>> translate = YandexTranslate("trnsl.1.1.20130421T140201Z.323e508a33e9d84b.f1e0d9ca9bcd0a00b0ef71d82e6cf4158183d09e")
        >>> directions = translate.directions
        >>> len(directions) > 0
        True

        Raises YandexTranslateException on a connection error (using error
        code 503) or when the reply carries a "code" other than 200.
        """
        query = {"key": self.api_key}
        try:
            reply = requests.get(self.url("langs"), params=query)
        except requests.exceptions.ConnectionError:
            raise YandexTranslateException(self.error_codes[503])

        payload = reply.json()
        # A reply without a "code" field is treated as success.
        reported_code = payload.get("code", 200)
        if reported_code != 200:
            raise YandexTranslateException(reported_code)
        return payload.get("dirs")
项目:cfawsinit    作者:mandarjog    | 项目源码 | 文件源码
def wait_for_opsman_ready(inst, timeout):
    """Wait until the instance's HTTPS endpoint answers without an error.

    The address comes from get_addr(inst). The endpoint is polled with HEAD
    requests through wait_util.wait_while, and ``timeout`` is handed to the
    waiter it returns.
    """
    addr = get_addr(inst)

    def should_wait():
        # Keep waiting while the endpoint is unreachable or answers with a
        # 4xx/5xx status. Certificate verification is deliberately off here.
        try:
            resp = requests.head(
                "https://{}/".format(addr),
                verify=False, timeout=1)
        except requests.exceptions.RequestException:
            # RequestException is the base of every requests error,
            # HTTPError included, so a separate HTTPError handler placed
            # after it could never run and has been dropped.
            return True
        return resp.status_code >= 400

    waitFor = wait_util.wait_while(should_wait)
    waitFor(timeout)


#
# Main deploy driver function
#
项目:cfawsinit    作者:mandarjog    | 项目源码 | 文件源码
def validate_creds(opts):
    """Check that both the AWS and the Pivnet credentials are usable.

    AWS is probed with an EC2 describe_id_format() call on a session built
    from opts['region'] and the optional 'profile_name'; Pivnet is probed by
    constructing a client with opts['PIVNET_TOKEN']. Returns False, after
    printing a hint, when either probe reports a credential problem, and
    True otherwise.
    """
    aws_session = Session(profile_name=opts.get('profile_name'),
                          region_name=opts['region'])
    ec2 = aws_session.resource("ec2")
    try:
        ec2.meta.client.describe_id_format()
    except botocore.exceptions.NoCredentialsError as aws_error:
        print("Missing ~/.aws/credentials ? missing profile_name from cfg file")
        print("http://boto3.readthedocs.org/en/latest/guide/configuration.html")
        print(aws_error)
        return False

    try:
        pivnet.Pivnet(token=opts['PIVNET_TOKEN'])
    except pivnet.AuthException as pivnet_error:
        print("Get API TOKEN from ")
        print("https://network.pivotal.io/users/dashboard/edit-profile")
        print(pivnet_error)
        return False

    return True
项目:fabric-sdk-py    作者:hyperledger    | 项目源码 | 文件源码
def enroll(self, enrollment_id, enrollment_secret, csr):
        """Request a signed X509 certificate for an already registered user.

        Args:
            enrollment_id (str): The registered ID to use for enrollment
            enrollment_secret (str): The secret associated with the
                                     enrollment ID
            csr (bytes or file-like object): PEM-encoded PKCS#10 certificate
                                             signing request

        Returns: the base64-decoded ``result.Cert`` field of the CA
            response (the PEM-encoded X509 certificate)

        Raises:
            RequestException: errors in requests.exceptions (presumably
                propagated from ``_send_ca_post`` -- confirm)
            ValueError: Failed response, json parse error, args missing

        """
        if not all((enrollment_id, enrollment_secret, csr)):
            raise ValueError("Missing required parameters. "
                             "'enrollmentID', 'enrollmentSecret' and 'csr'"
                             " are all required.")

        # The CA name is only sent when one is configured; it is inserted
        # first so the key order of the request body stays the same.
        req = {}
        if self._ca_name != "":
            req["caName"] = self._ca_name
        req["certificate_request"] = csr

        credentials = (enrollment_id, enrollment_secret)
        response = self._send_ca_post(path="enroll", json=req,
                                      auth=credentials,
                                      verify=self._ca_certs_path)

        _logger.debug("Raw response json {0}".format(response))

        if not response['success']:
            raise ValueError("Enrollment failed with errors {0}"
                             .format(response['errors']))
        return base64.b64decode(response['result']['Cert'])
项目:target-stitch    作者:singer-io    | 项目源码 | 文件源码
def send(self, data):
        '''POST ``data`` to the configured Stitch URL and return the response.

        ``raise_for_status()`` is called on the response before it is
        returned, so HTTP error statuses surface as exceptions.

        NOTE(review): the previous docstring mentioned retrying on
        exceptions; no retry logic is visible in this function -- presumably
        it is supplied by a decorator elsewhere; confirm.
        '''
        target = self.stitch_url
        request_headers = self.headers()
        reply = self.session.post(target, headers=request_headers, data=data)
        reply.raise_for_status()
        return reply
项目:sauna    作者:NicolasLM    | 项目源码 | 文件源码
def request_ovh_client(consumer_key=None):
    """Build an OVH API client, requesting a consumer key when none is given.

    Without ``consumer_key`` a new key is requested for the
    ``/paas/monitoring`` routes, its validation URL is printed and the
    function waits for the user to press Enter. The client is then probed
    with a GET on ``/paas/monitoring``; on a 403 the whole procedure restarts
    without a consumer key, any other HTTP error is re-raised.

    :param consumer_key: existing consumer key, or None to request a new one
    :returns: the ``Client`` instance that passed the probe
    """
    try:
        import requests
        import requests.exceptions
    except ImportError:
        for line in ('The requests library is needed to perform this command:',
                     '    pip install requests',
                     '    apt-get install python3-requests'):
            print(line)
        exit(1)

    # NOTE(review): application key/secret are hard-coded here; confirm they
    # are meant to be public application identifiers.
    client = Client(
        endpoint='ovh-eu',
        application_key='yVPsINnSdHOLTGHf',
        application_secret='5mVDkHaJw6rp5GYiYBUlgDv1vruRejkl',
        consumer_key=consumer_key
    )

    if not consumer_key:
        # Read access to the collection, full access to everything below it.
        rules = [{'method': 'GET', 'path': '/paas/monitoring'}]
        rules += [{'method': verb, 'path': '/paas/monitoring/*'}
                  for verb in ('GET', 'POST', 'PUT', 'DELETE')]
        key_info = client.request_consumerkey(access_rules=rules).json()
        print('New consumer key:', key_info['consumerKey'])
        print('Validate it at', key_info['validationUrl'])
        input('When you are ready, press Enter')

    try:
        client.get('/paas/monitoring')
    except requests.exceptions.HTTPError as e:
        if e.response.status_code == 403:
            # Key rejected: start over and request a fresh consumer key.
            return request_ovh_client()
        raise

    return client
项目:TCP-IP    作者:JackZ0    | 项目源码 | 文件源码
def test_requests_error_passthrough(self, mock_requests):
        """A RequestException raised by ``request`` escapes ``_send_request``."""
        expected = requests.exceptions.RequestException
        # Keep the real exception classes reachable through the mock.
        mock_requests.exceptions = requests.exceptions
        mock_requests.request.side_effect = expected
        with self.assertRaises(expected):
            # pylint: disable=protected-access
            self.net._send_request('GET', 'uri')
项目:TCP-IP    作者:JackZ0    | 项目源码 | 文件源码
def test_head_get_post_error_passthrough(self):
        """head, get and post all let a RequestException propagate."""
        error = requests.exceptions.RequestException
        self.send_request.side_effect = error
        for method in (self.net.head, self.net.get):
            with self.assertRaises(error):
                method('GET', 'uri')
        with self.assertRaises(error):
            self.net.post('uri', obj=self.obj)
项目:python-wsmanclient    作者:intelsdi-x    | 项目源码 | 文件源码
def _do_request(self, payload):
        """POST a WS-Man payload to the endpoint and return the HTTP response.

        :param payload: request object; the result of its ``build()`` method
            is sent as the request body
        :returns: the ``requests`` response object when ``resp.ok`` is true
        :raises: WSManRequestFailure when the HTTP request itself fails
        :raises: WSManInvalidResponse when the response status is not OK
        """
        payload = payload.build()
        LOG.debug('Sending request to %(endpoint)s: %(payload)s',
                  {'endpoint': self.endpoint, 'payload': payload})
        try:
            resp = requests.post(
                self.endpoint,
                auth=requests.auth.HTTPBasicAuth(self.username, self.password),
                data=payload,
                # TODO(ifarkas): enable cert verification
                verify=False)

        # Match on the exception classes themselves: comparing
        # ``e.__class__`` with a string is never true, which let the raw
        # requests exceptions escape instead of WSManRequestFailure.
        except requests.exceptions.ConnectionError:
            # Covers e.g. 'No route to host'; listed before its base class
            # so it gets the more specific log message.
            LOG.exception('Request failed (ConnectionError)')
            raise exceptions.WSManRequestFailure()
        except requests.exceptions.RequestException:
            LOG.exception('Request failed')
            raise exceptions.WSManRequestFailure()

        LOG.debug('Received response from %(endpoint)s: %(payload)s',
                  {'endpoint': self.endpoint, 'payload': resp.content})
        if not resp.ok:
            raise exceptions.WSManInvalidResponse(
                status_code=resp.status_code,
                reason=resp.reason)
        else:
            return resp
项目:python-wsmanclient    作者:intelsdi-x    | 项目源码 | 文件源码
def invoke(self, resource_uri, method, selectors=None, properties=None,
               expected_return_value=None):
        """Call a remote WS-Man method and validate its ReturnValue.

        :param resource_uri: URI of the resource
        :param method: name of the method to invoke
        :param selectors: dictionary of selectors; an empty dict is used
            when None
        :param properties: dictionary of properties; an empty dict is used
            when None
        :param expected_return_value: return value the caller expects to be
            reported back. When None, the comparison is skipped.
        :returns: the response produced by the parent class's ``invoke``
            (documented upstream as an lxml.etree.Element)
        :raises: WSManRequestFailure on request failures
        :raises: WSManInvalidResponse when receiving invalid response
        :raises: DRACOperationFailed when ReturnValue equals
                 ``utils.RET_ERROR``
        :raises: DRACUnexpectedReturnValue on return value mismatch
        """
        selectors = {} if selectors is None else selectors
        properties = {} if properties is None else properties

        resp = super(WSManClient, self).invoke(resource_uri, method, selectors,
                                               properties)

        return_value = utils.find_xml(resp, 'ReturnValue', resource_uri).text

        if return_value == utils.RET_ERROR:
            # Collect the text of every Message element for the error report.
            messages = [
                elem.text
                for elem in utils.find_xml(resp, 'Message', resource_uri, True)
            ]
            raise exceptions.DRACOperationFailed(drac_messages=messages)

        check_requested = expected_return_value is not None
        if check_requested and return_value != expected_return_value:
            raise exceptions.DRACUnexpectedReturnValue(
                expected_return_value=expected_return_value,
                actual_return_value=return_value)

        return resp
项目:bpy_lambda    作者:bcongdon    | 项目源码 | 文件源码
def send_token_to_subclient(webservice_endpoint: str, user_id: str,
                            subclient_token: str, subclient_id: str) -> str:
    """Post the subclient-specific token to the subclient's web service.

    The token is sent to ``blender_id/store_scst`` relative to
    ``webservice_endpoint``. According to the original documentation the
    subclient verifies the token with BlenderID and makes sure a matching
    user exists on its side.

    :returns: the ``subclient_user_id`` field of the JSON reply.
    :raises BlenderIdCommError: on an HTTP error status, a connection error,
        or a reply whose ``status`` is not ``'success'``.
    """

    import requests
    import urllib.parse

    store_url = urllib.parse.urljoin(webservice_endpoint, 'blender_id/store_scst')
    form = {'user_id': user_id,
            'subclient_id': subclient_id,
            'token': subclient_token}
    comm_errors = (requests.exceptions.HTTPError,
                   requests.exceptions.ConnectionError)

    try:
        reply = requests.post(store_url, data=form, verify=True)
        reply.raise_for_status()
    except comm_errors as ex:
        raise BlenderIdCommError(str(ex))

    payload = reply.json()
    if payload['status'] == 'success':
        return payload['subclient_user_id']

    raise BlenderIdCommError('Error sending subclient-specific token to %s, error is: %s'
                             % (webservice_endpoint, payload))
项目:flibusta_bot    作者:Kurbezz    | 项目源码 | 文件源码
def shorten_url(url, botan_token, user_id):
    """Return the shortener service's reply for ``url``, or ``url`` on failure.

    :param url: URL to shorten
    :param botan_token: token sent as the ``token`` query parameter
    :param user_id: sent, converted to ``str``, as ``user_ids``
    :returns: the text body of the GET response from ``SHORTENER_URL``; the
        unmodified ``url`` when the request raises a requests exception
    """
    try:
        return requests.get(SHORTENER_URL, params={
            'token': botan_token,
            'url': url,
            'user_ids': str(user_id),
        }).text
    except requests.exceptions.RequestException:
        # ``requests.exceptions`` is a module, not an exception class, so it
        # cannot be used in an except clause; RequestException is the common
        # base class of the errors requests raises.
        return url
项目:cli    作者:madcore-ai    | 项目源码 | 文件源码
def get_stack(self, stack_name, debug=True):
        """Return the first stack described for ``stack_name``, or None.

        :param stack_name: passed as ``StackName`` to ``describe_stacks``
        :param debug: when true, a failure is logged through
            ``self.logger.error``
        :returns: ``['Stacks'][0]`` of the describe_stacks response; None
            when any exception is raised while fetching it
        """
        try:
            described = self.cf_client.describe_stacks(StackName=stack_name)
            return described['Stacks'][0]
        except botocore.exceptions.ClientError as cf_error:
            failure = cf_error
        except Exception as error:
            failure = error

        # Only reached after a failure; both handlers report the same way.
        if debug:
            self.logger.error(failure)
        return None