Python requests 模块,Timeout() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用requests.Timeout()

项目:paas-tools    作者:imperodesign    | 项目源码 | 文件源码
def _do_healthcheck(self, containers, config):
        """Probe each container over HTTP and raise on the first failure.

        The probe path is read from ``HEALTHCHECK_URL`` (default ``/``) and
        the per-request timeout from ``HEALTHCHECK_TIMEOUT`` (default 1).
        Each container's address is looked up in etcd under
        ``/deis/services/<self>/<container.job_id>``.

        Raises:
            exceptions.HealthcheckException: if no etcd client is available,
                if a container answers with a status other than 200, or if
                the etcd lookup / HTTP request raises Timeout,
                ConnectionError or KeyError.
        """
        probe_path = config.get('HEALTHCHECK_URL', '/')
        probe_timeout = int(config.get('HEALTHCHECK_TIMEOUT', 1))
        if not _etcd_client:
            raise exceptions.HealthcheckException('no etcd client available')
        for container in containers:
            try:
                etcd_key = "/deis/services/{self}/{container.job_id}".format(
                    self=self, container=container)
                address = _etcd_client.get(etcd_key).value
                reply = requests.get("http://{}{}".format(address, probe_path),
                                     timeout=probe_timeout)
                if reply.status_code != requests.codes.OK:
                    failure = "app failed health check (got '{}', expected: '200')".format(
                        reply.status_code)
                    raise exceptions.HealthcheckException(failure)
            except (requests.Timeout, requests.ConnectionError, KeyError) as e:
                raise exceptions.HealthcheckException(
                    'failed to connect to container ({})'.format(e))
项目:SmallReptileTraining    作者:yanbober    | 项目源码 | 文件源码
def download(self, url, retry_count=3, headers=None, proxies=None, data=None):
        """Fetch ``url`` through the shared session and return the raw body.

        :param url: URL to request
        :param retry_count: how many further attempts to make after a
            connection error or timeout
        :param headers: extra HTTP headers, e.g. {'X': 'x'}; merged into the
            session headers when given
        :param proxies: proxy mapping, e.g. {"https": "http://12.112.122.12:3212"}
        :param data: urlencode(post_data) payload; when truthy the request is
            sent as a POST, otherwise as a GET
        :return: the response content, or None when the download failed
        """
        if headers:
            self.request_session.headers.update(headers)
        try:
            if data:
                content = self.request_session.post(url, data, proxies=proxies).content
            else:
                content = self.request_session.get(url, proxies=proxies).content
        except (ConnectionError, Timeout) as e:
            print('Downloader download ConnectionError or Timeout:' + str(e))
            content = None
            if retry_count > 0:
                # Keep the retry's result: it used to be discarded, so a
                # successful retry still returned None to the caller.
                content = self.download(url, retry_count - 1, headers, proxies, data)
        except Exception as e:
            print('Downloader download Exception:' + str(e))
            content = None
        return content
项目:dcaf    作者:dxc-technology    | 项目源码 | 文件源码
def state_destroy_model(self):
        """Delete the Hanlon model named by the module's ``uuid`` parameter.

        In check mode no request is sent and ``exit_json(changed=True)`` is
        called. Otherwise a DELETE is issued to ``<base_url>/model/<uuid>``:
        HTTP 200 calls ``exit_json(changed=True)``, any other status falls
        through to ``exit_json(changed=False)``. ``requests`` connection,
        timeout and generic errors are reported through ``fail_json``.
        """
        base_url = self.module.params['base_url']
        uuid = self.module.params['uuid']

        uri = "%s/model/%s" % (base_url, uuid)

        # Check mode must never issue the destructive DELETE; report the
        # pending change the same way the other destroy states do.
        if self.module.check_mode:
            self.module.exit_json(changed=True)
            return

        try:
            req = requests.delete(uri)
            if req.status_code == 200:
                self.module.exit_json(changed=True)
        except requests.ConnectionError as connect_error:
            self.module.fail_json(msg="Connection Error; confirm Hanlon base_url.", apierror=str(connect_error))
        except requests.Timeout as timeout_error:
            self.module.fail_json(msg="Timeout Error; confirm status of Hanlon server", apierror=str(timeout_error))
        except requests.RequestException as request_exception:
            self.module.fail_json(msg="Unknown Request library failure", apierror=str(request_exception))

        self.module.exit_json(changed=False)
项目:eater    作者:alexhayes    | 项目源码 | 文件源码
def test_requests_timeout():
    """A ``requests.Timeout`` raised while calling the API surfaces as ``EaterTimeoutError``."""

    class GetPersonAPI(HTTPEater):
        request_cls = Model
        response_cls = Model
        url = 'http://example.com/'

    def raise_timeout(*args, **kwargs):  # pylint: disable=unused-argument
        # Used as the mocked response body callback so the GET itself fails.
        raise requests.Timeout()

    api = GetPersonAPI()

    with requests_mock.Mocker() as mocker:
        mocker.get('http://example.com/', text=raise_timeout)
        with pytest.raises(EaterTimeoutError):
            api()
项目:drydock    作者:att-comdev    | 项目源码 | 文件源码
def test_authentication(self):
        """Check that the MaaS API accepts the current credentials.

        Returns True when listing authorisation tokens answers HTTP 200.

        Raises:
            errors.TransientDriverError: on a request timeout or a 500/503
                response.
            errors.PersistentDriverError: on any other request failure, on a
                401/403 response, or on any other non-200 status.
        """
        try:
            resp = self.get('account/', op='list_authorisation_tokens')
        except requests.Timeout:
            raise errors.TransientDriverError("Timeout connection to MaaS")
        except Exception as ex:
            raise errors.PersistentDriverError(
                "Error accessing MaaS: %s" % str(ex))

        status = resp.status_code

        if status in (401, 403):
            raise errors.PersistentDriverError(
                "MaaS API Authentication Failed")

        if status in (500, 503):
            raise errors.TransientDriverError("Received 50x error from MaaS")

        if status != 200:
            raise errors.PersistentDriverError(
                "Received unexpected error from MaaS")

        return True
项目:paas-tools    作者:imperodesign    | 项目源码 | 文件源码
def _do_healthcheck(self, containers, config):
        """Probe each container over HTTP and raise on the first failure.

        The probe path is read from ``HEALTHCHECK_URL`` (default ``/``) and
        the per-request timeout from ``HEALTHCHECK_TIMEOUT`` (default 1).
        Each container's address is looked up in etcd under
        ``/deis/services/<self>/<container.job_id>``.

        Raises:
            exceptions.HealthcheckException: if no etcd client is available,
                if a container answers with a status other than 200, or if
                the etcd lookup / HTTP request raises Timeout,
                ConnectionError or KeyError.
        """
        probe_path = config.get('HEALTHCHECK_URL', '/')
        probe_timeout = int(config.get('HEALTHCHECK_TIMEOUT', 1))
        if not _etcd_client:
            raise exceptions.HealthcheckException('no etcd client available')
        for container in containers:
            try:
                etcd_key = "/deis/services/{self}/{container.job_id}".format(
                    self=self, container=container)
                address = _etcd_client.get(etcd_key).value
                reply = requests.get("http://{}{}".format(address, probe_path),
                                     timeout=probe_timeout)
                if reply.status_code != requests.codes.OK:
                    failure = "app failed health check (got '{}', expected: '200')".format(
                        reply.status_code)
                    raise exceptions.HealthcheckException(failure)
            except (requests.Timeout, requests.ConnectionError, KeyError) as e:
                raise exceptions.HealthcheckException(
                    'failed to connect to container ({})'.format(e))
项目:tintri-stats    作者:genebean    | 项目源码 | 文件源码
def api_delete(server_name, api, session_id):
    """Send an HTTP DELETE to ``https://<server_name><API><api>``.

    Args:
        server_name: host part of the URL.
        api: endpoint path appended after the module-level ``API`` prefix.
        session_id: value sent as the ``JSESSIONID`` cookie.

    Returns:
        The response object returned by ``requests.delete``.

    Raises:
        TintriRequestsException: if sending the request fails.
    """
    # Header and URL for delete call
    headers = {'content-type': 'application/json',
               'cookie': 'JSESSIONID=' + session_id}

    url = 'https://' + server_name + API + api

    try:
        # Invoke the API.
        # NOTE(review): verify=False disables TLS certificate validation.
        r = requests.delete(url, headers=headers, verify=False)
    except requests.ConnectionError:
        # Was the misspelt ``TintrRequestsiApiException`` (a NameError).
        raise TintriRequestsException("API Connection error occurred.")
    except requests.HTTPError:
        raise TintriRequestsException("HTTP error occurred.")
    except requests.Timeout:
        raise TintriRequestsException("Request timed out.")
    except Exception as e:
        # sys.exc_info()[0] is an exception class; concatenating it to a str
        # raised TypeError, so format the exception explicitly instead.
        raise TintriRequestsException("An unexpected error " + repr(e) + " occurred.")

    return r


# PUT
项目:tintri-stats    作者:genebean    | 项目源码 | 文件源码
def api_put(server_name, api, payload, session_id):
    """Send an HTTP PUT with a JSON body to ``https://<server_name><API><api>``.

    Args:
        server_name: host part of the URL.
        api: endpoint path appended after the module-level ``API`` prefix.
        payload: object serialised with ``json.dumps`` as the request body.
        session_id: value sent as the ``JSESSIONID`` cookie.

    Returns:
        The response object returned by ``requests.put``.

    Raises:
        TintriRequestsException: if sending the request fails.
    """
    headers = {'content-type': 'application/json',
               'cookie': 'JSESSIONID=' + session_id}

    url = 'https://' + server_name + API + api

    try:
        # Invoke the API.
        # NOTE(review): verify=False disables TLS certificate validation.
        r = requests.put(url, data=json.dumps(payload),
                         headers=headers, verify=False)
    except requests.ConnectionError:
        raise TintriRequestsException("API Connection error occurred.")
    except requests.HTTPError:
        raise TintriRequestsException("HTTP error occurred.")
    except requests.Timeout:
        raise TintriRequestsException("Request timed out.")
    except Exception as e:
        # sys.exc_info()[0] is an exception class; concatenating it to a str
        # raised TypeError, so format the exception explicitly instead.
        raise TintriRequestsException("An unexpected error " + repr(e) + " occurred.")

    return r


# POST
项目:tintri-stats    作者:genebean    | 项目源码 | 文件源码
def api_post(server_name, api, payload, session_id):
    """Send an HTTP POST with a JSON body to ``https://<server_name><API><api>``.

    Args:
        server_name: host part of the URL.
        api: endpoint path appended after the module-level ``API`` prefix.
        payload: object serialised with ``json.dumps`` as the request body.
        session_id: value sent as the ``JSESSIONID`` cookie.

    Returns:
        The response object returned by ``requests.post``.

    Raises:
        TintriRequestsException: if sending the request fails.
    """
    headers = {'content-type': 'application/json',
               'cookie': 'JSESSIONID=' + session_id}

    url = 'https://' + server_name + API + api

    try:
        # Invoke the API.
        # NOTE(review): verify=False disables TLS certificate validation.
        r = requests.post(url, data=json.dumps(payload),
                          headers=headers, verify=False)
    except requests.ConnectionError:
        raise TintriRequestsException("API Connection error occurred.")
    except requests.HTTPError:
        raise TintriRequestsException("HTTP error occurred.")
    except requests.Timeout:
        raise TintriRequestsException("Request timed out.")
    except Exception as e:
        # sys.exc_info()[0] is an exception class; concatenating it to a str
        # raised TypeError, so format the exception explicitly instead.
        raise TintriRequestsException("An unexpected error " + repr(e) + " occurred.")

    return r


# Login.
项目:tintri-stats    作者:genebean    | 项目源码 | 文件源码
def download_file(server_name, report_url, session_id, file_name):
    """Stream the resource at ``report_url`` into the local file ``file_name``.

    Args:
        server_name: not used by this function.
        report_url: URL to fetch.
        session_id: not used by this function.
        file_name: path of the file to write.

    Raises:
        TintriRequestsException: on any failure. This includes a non-200
            response, because the ``TintriApiException`` raised for it is
            caught by the generic handler below and re-wrapped.
    """
    headers = {'content-type': 'application/json'}

    try:
        # NOTE(review): verify=False disables TLS certificate validation.
        r = requests.get(report_url, headers=headers, verify=False, stream=True)
        # if HTTP Response is not 200 then raise an exception
        if r.status_code != 200:
            message = "The HTTP response for get call to the server is not 200."
            raise TintriApiException(message, r.status_code, report_url, "No Payload", r.text)

        # iter_content yields raw bytes, so the file must be opened in binary
        # mode; text mode fails on Python 3 and can corrupt data elsewhere.
        with open(file_name, 'wb') as file_h:
            for block in r.iter_content(4096):
                file_h.write(block)

    except requests.ConnectionError:
        raise TintriRequestsException("API Connection error occurred.")
    except requests.HTTPError:
        raise TintriRequestsException("HTTP error occurred.")
    except requests.Timeout:
        raise TintriRequestsException("Request timed out.")
    except Exception as e:
        raise TintriRequestsException("An unexpected error: " + e.__str__())
项目:valence    作者:openstack    | 项目源码 | 文件源码
def test_get_podm_status_Offline_by_http_exception(self, mock_get):
        """``pod_status`` reports OFFLINE whenever the HTTP call raises.

        Exercises ConnectionError, SSLError and Timeout as ``mock_get`` side
        effects and checks the accumulated call count after each attempt.
        """
        mock_get.side_effect = requests.ConnectionError
        self.assertEqual(redfish.pod_status('url', 'username', 'password'),
                         constants.PODM_STATUS_OFFLINE)
        # Was ``asset_called_once_with``: a misspelt name on a Mock is just an
        # auto-created attribute, so the call arguments were never verified.
        mock_get.assert_called_once_with('url',
                                         auth=auth.HTTPBasicAuth('username',
                                                                 'password'))
        # SSL Error
        mock_get.side_effect = requests.exceptions.SSLError
        self.assertEqual(redfish.pod_status('url', 'username', 'password'),
                         constants.PODM_STATUS_OFFLINE)
        self.assertEqual(mock_get.call_count, 2)
        # Timeout
        mock_get.side_effect = requests.Timeout
        self.assertEqual(redfish.pod_status('url', 'username', 'password'),
                         constants.PODM_STATUS_OFFLINE)
        self.assertEqual(mock_get.call_count, 3)
项目:cachet-url-monitor    作者:mtakaki    | 项目源码 | 文件源码
def setUp(self):
        """Stub out ``logging.getLogger`` and ``requests.get``, then build the Configuration.

        The stubs are installed on the modules found in ``sys.modules``; the
        most recently created logger mock is kept on ``self.mock_logger``.
        """
        def getLogger(name):
            logger = mock.Mock()
            self.mock_logger = logger
            return logger

        sys.modules['logging'].getLogger = getLogger

        def get(url, headers):
            # Canned response: ok, with a JSON body of {'data': {'status': 1}}.
            response = mock.Mock()
            response.ok = True
            response.json = mock.Mock()
            response.json.return_value = {'data': {'status': 1}}
            return response

        sys.modules['requests'].get = get

        env = EnvironmentVarGuard()
        self.env = env
        env.set('CACHET_TOKEN', 'token2')

        self.configuration = Configuration('config.yml')

        # Exception classes are installed only after the Configuration exists,
        # in the same order as before.
        requests_module = sys.modules['requests']
        requests_module.Timeout = Timeout
        requests_module.ConnectionError = ConnectionError
        requests_module.HTTPError = HTTPError
项目:pypdns    作者:fousti    | 项目源码 | 文件源码
def _call(self, verb, url, data={}, params={}, headers={}):
        if headers:
            self.session.headers.update(headers)

        log.info('Call %s with data %s', url, data)
        resp = self.session.request(verb, url, json=data, params=params)
        status_code = resp.status_code
        try:
            resp.raise_for_status()
        except requests.HTTPError as exc:
            log.debug('Error occured, endpoint : %s, apikey : %s',
                      url, self.apikey)
            return resp, status_code

        except requests.Timeout:
            log.error('Request Timeout to %si', url)
            return False, status_code
        except requests.RequestException:
            log.error('Requests Error')
            return False, status_code
        else:
            return resp, status_code
项目:tintrimon    作者:silveraid    | 项目源码 | 文件源码
def api_delete(server_name, api, session_id):
    """Send an HTTP DELETE to ``https://<server_name><API><api>``.

    Args:
        server_name: host part of the URL.
        api: endpoint path appended after the module-level ``API`` prefix.
        session_id: value sent as the ``JSESSIONID`` cookie.

    Returns:
        The response object returned by ``requests.delete``.

    Raises:
        TintriRequestsException: if sending the request fails.
    """
    # Header and URL for delete call
    headers = {'content-type': 'application/json',
               'cookie': 'JSESSIONID=' + session_id}

    url = 'https://' + server_name + API + api

    try:
        # Invoke the API.
        # NOTE(review): verify=False disables TLS certificate validation.
        r = requests.delete(url, headers=headers, verify=False)
    except requests.ConnectionError:
        # Was the misspelt ``TintrRequestsiApiException`` (a NameError).
        raise TintriRequestsException("API Connection error occurred.")
    except requests.HTTPError:
        raise TintriRequestsException("HTTP error occurred.")
    except requests.Timeout:
        raise TintriRequestsException("Request timed out.")
    except Exception as e:
        # sys.exc_info()[0] is an exception class; concatenating it to a str
        # raised TypeError, so format the exception explicitly instead.
        raise TintriRequestsException("An unexpected error " + repr(e) + " occurred.")

    return r


# PUT
项目:tintrimon    作者:silveraid    | 项目源码 | 文件源码
def api_put(server_name, api, payload, session_id):
    """Send an HTTP PUT with a JSON body to ``https://<server_name><API><api>``.

    Args:
        server_name: host part of the URL.
        api: endpoint path appended after the module-level ``API`` prefix.
        payload: object serialised with ``json.dumps`` as the request body.
        session_id: value sent as the ``JSESSIONID`` cookie.

    Returns:
        The response object returned by ``requests.put``.

    Raises:
        TintriRequestsException: if sending the request fails.
    """
    headers = {'content-type': 'application/json',
               'cookie': 'JSESSIONID=' + session_id}

    url = 'https://' + server_name + API + api

    try:
        # Invoke the API.
        # NOTE(review): verify=False disables TLS certificate validation.
        r = requests.put(url, data=json.dumps(payload),
                         headers=headers, verify=False)
    except requests.ConnectionError:
        raise TintriRequestsException("API Connection error occurred.")
    except requests.HTTPError:
        raise TintriRequestsException("HTTP error occurred.")
    except requests.Timeout:
        raise TintriRequestsException("Request timed out.")
    except Exception as e:
        # sys.exc_info()[0] is an exception class; concatenating it to a str
        # raised TypeError, so format the exception explicitly instead.
        raise TintriRequestsException("An unexpected error " + repr(e) + " occurred.")

    return r


# POST
项目:tintrimon    作者:silveraid    | 项目源码 | 文件源码
def api_post(server_name, api, payload, session_id):
    """Send an HTTP POST with a JSON body to ``https://<server_name><API><api>``.

    Args:
        server_name: host part of the URL.
        api: endpoint path appended after the module-level ``API`` prefix.
        payload: object serialised with ``json.dumps`` as the request body.
        session_id: value sent as the ``JSESSIONID`` cookie.

    Returns:
        The response object returned by ``requests.post``.

    Raises:
        TintriRequestsException: if sending the request fails.
    """
    headers = {'content-type': 'application/json',
               'cookie': 'JSESSIONID=' + session_id}

    url = 'https://' + server_name + API + api

    try:
        # Invoke the API.
        # NOTE(review): verify=False disables TLS certificate validation.
        r = requests.post(url, data=json.dumps(payload),
                          headers=headers, verify=False)
    except requests.ConnectionError:
        raise TintriRequestsException("API Connection error occurred.")
    except requests.HTTPError:
        raise TintriRequestsException("HTTP error occurred.")
    except requests.Timeout:
        raise TintriRequestsException("Request timed out.")
    except Exception as e:
        # sys.exc_info()[0] is an exception class; concatenating it to a str
        # raised TypeError, so format the exception explicitly instead.
        raise TintriRequestsException("An unexpected error " + repr(e) + " occurred.")

    return r


# Login.
项目:tintrimon    作者:silveraid    | 项目源码 | 文件源码
def download_file(server_name, report_url, session_id, file_name):
    """Stream the resource at ``report_url`` into the local file ``file_name``.

    Args:
        server_name: not used by this function.
        report_url: URL to fetch.
        session_id: not used by this function.
        file_name: path of the file to write.

    Raises:
        TintriRequestsException: on any failure. This includes a non-200
            response, because the ``TintriApiException`` raised for it is
            caught by the generic handler below and re-wrapped.
    """
    headers = {'content-type': 'application/json'}

    try:
        # NOTE(review): verify=False disables TLS certificate validation.
        r = requests.get(report_url, headers=headers, verify=False, stream=True)
        # if HTTP Response is not 200 then raise an exception
        if r.status_code != 200:
            message = "The HTTP response for get call to the server is not 200."
            raise TintriApiException(message, r.status_code, report_url, "No Payload", r.text)

        # iter_content yields raw bytes, so the file must be opened in binary
        # mode; text mode fails on Python 3 and can corrupt data elsewhere.
        with open(file_name, 'wb') as file_h:
            for block in r.iter_content(4096):
                file_h.write(block)

    except requests.ConnectionError:
        raise TintriRequestsException("API Connection error occurred.")
    except requests.HTTPError:
        raise TintriRequestsException("HTTP error occurred.")
    except requests.Timeout:
        raise TintriRequestsException("Request timed out.")
    except Exception as e:
        raise TintriRequestsException("An unexpected error: " + e.__str__())
项目:Chromium_DepotTools    作者:p07r0457    | 项目源码 | 文件源码
def _search_md(url='http://169.254.169.254/latest/meta-data/iam/'):
    """Collect the metadata entries listed under ``url`` into a dict.

    The listing at ``url`` is split on newlines. Entries ending in ``/`` are
    resolved with ``get_iam_role`` and stored without the trailing slash.
    Every other entry is fetched and stored as parsed JSON when its body
    starts with ``{``, as a list of lines when it contains a newline after
    the first character, or as the raw body otherwise.

    Returns:
        The collected dict; whatever was gathered so far (possibly empty) if
        a request times out or cannot connect.
    """
    d = {}
    try:
        r = requests.get(url, timeout=.1)
        if r.content:
            for field in r.content.split('\n'):
                if field.endswith('/'):
                    d[field[0:-1]] = get_iam_role(url + field)
                else:
                    val = requests.get(url + field).content
                    # Slice instead of indexing so an empty body cannot
                    # raise IndexError.
                    if val[:1] == '{':
                        val = json.loads(val)
                    elif val.find('\n') > 0:
                        # Split the fetched value itself; the parent listing
                        # (r.content) was being split here by mistake.
                        val = val.split('\n')
                    d[field] = val
    except (requests.Timeout, requests.ConnectionError):
        # Best effort: an unreachable metadata service is not an error here.
        pass
    return d
项目:node-gn    作者:Shouqun    | 项目源码 | 文件源码
def _search_md(url='http://169.254.169.254/latest/meta-data/iam/'):
    """Collect the metadata entries listed under ``url`` into a dict.

    The listing at ``url`` is split on newlines. Entries ending in ``/`` are
    resolved with ``get_iam_role`` and stored without the trailing slash.
    Every other entry is fetched and stored as parsed JSON when its body
    starts with ``{``, as a list of lines when it contains a newline after
    the first character, or as the raw body otherwise.

    Returns:
        The collected dict; whatever was gathered so far (possibly empty) if
        a request times out or cannot connect.
    """
    d = {}
    try:
        r = requests.get(url, timeout=.1)
        if r.content:
            for field in r.content.split('\n'):
                if field.endswith('/'):
                    d[field[0:-1]] = get_iam_role(url + field)
                else:
                    val = requests.get(url + field).content
                    # Slice instead of indexing so an empty body cannot
                    # raise IndexError.
                    if val[:1] == '{':
                        val = json.loads(val)
                    elif val.find('\n') > 0:
                        # Split the fetched value itself; the parent listing
                        # (r.content) was being split here by mistake.
                        val = val.split('\n')
                    d[field] = val
    except (requests.Timeout, requests.ConnectionError):
        # Best effort: an unreachable metadata service is not an error here.
        pass
    return d
项目:dcaf    作者:dxc-technology    | 项目源码 | 文件源码
def state_destroy_image(self):
        """Delete the Hanlon image named by the module's ``uuid`` parameter.

        Outside check mode a DELETE is sent to ``<base_url>/image/<uuid>``:
        HTTP 200 calls ``exit_json(changed=True)`` and any other status calls
        ``fail_json`` with the response text. In check mode no request is
        sent. ``requests`` connection, timeout and generic errors are
        reported through ``fail_json``.
        """
        module = self.module
        uri = "%s/image/%s" % (module.params['base_url'], module.params['uuid'])

        try:
            if not module.check_mode:
                req = requests.delete(uri)
                if req.status_code != 200:
                    module.fail_json(msg="Unknown Hanlon API error", apierror=req.text)
                else:
                    module.exit_json(changed=True)
            module.exit_json(changed=True)
        except requests.ConnectionError as err:
            module.fail_json(msg="Connection Error; confirm Hanlon base_url.", apierror=str(err))
        except requests.Timeout as err:
            module.fail_json(msg="Timeout Error; confirm status of Hanlon server", apierror=str(err))
        except requests.RequestException as err:
            module.fail_json(msg="Unknown Request library failure", apierror=str(err))

        module.exit_json(changed=False)
项目:dcaf    作者:dxc-technology    | 项目源码 | 文件源码
def state_destroy_policy(self):
        """Delete the Hanlon policy named by the module's ``uuid`` parameter.

        Outside check mode a DELETE is sent to ``<base_url>/policy/<uuid>``:
        HTTP 200 calls ``exit_json(changed=True)`` and any other status calls
        ``fail_json`` with the response text. In check mode no request is
        sent. ``requests`` connection, timeout and generic errors are
        reported through ``fail_json``.
        """
        module = self.module
        uri = "%s/policy/%s" % (module.params['base_url'], module.params['uuid'])

        try:
            if not module.check_mode:
                req = requests.delete(uri)
                if req.status_code != 200:
                    module.fail_json(msg="Unknown error", apierror=req.text)
                else:
                    module.exit_json(changed=True)
            module.exit_json(changed=True)
        except requests.ConnectionError as err:
            module.fail_json(msg="Connection Error; confirm Hanlon base_url.", apierror=str(err))
        except requests.Timeout as err:
            module.fail_json(msg="Timeout Error; confirm status of Hanlon server", apierror=str(err))
        except requests.RequestException as err:
            module.fail_json(msg="Unknown Request library failure", apierror=str(err))
项目:dcaf    作者:dxc-technology    | 项目源码 | 文件源码
def state_destroy_active_model(self):
        """Delete the Hanlon active model at ``<self.base_url>/active_model/<self.uuid>``.

        Outside check mode a DELETE is sent: HTTP 200 calls
        ``exit_json(changed=True)`` and any other status calls ``fail_json``
        with the response text. In check mode no request is sent.
        ``requests`` connection, timeout and generic errors are reported
        through ``fail_json``.
        """
        module = self.module
        uri = "%s/active_model/%s" % (self.base_url, self.uuid)

        try:
            if not module.check_mode:
                req = requests.delete(uri)
                if req.status_code != 200:
                    module.fail_json(msg="Unknown error", apierror=req.text)
                else:
                    module.exit_json(changed=True)
            module.exit_json(changed=True)
        except requests.ConnectionError as err:
            module.fail_json(msg="Connection Error; confirm Hanlon base_url.", apierror=str(err))
        except requests.Timeout as err:
            module.fail_json(msg="Timeout Error; confirm status of Hanlon server", apierror=str(err))
        except requests.RequestException as err:
            module.fail_json(msg="Unknown Request library failure", apierror=str(err))
项目:dcaf    作者:dxc-technology    | 项目源码 | 文件源码
def check_model_state(self):
        """Look for a Hanlon model whose ``@label`` equals the module's ``label`` parameter.

        Fetches ``<base_url>/model`` with ``hanlon_get_request``, then fetches
        each listed ``@uri`` in turn and compares its label.

        Returns:
            ``('present', uuid)`` for the first match, otherwise
            ``('absent', None)``. ``requests`` connection, timeout and
            generic errors are reported through ``fail_json``.
        """
        params = self.module.params
        base_url = params['base_url']
        wanted_label = params['label']

        try:
            listing, _ = hanlon_get_request("%s/model" % base_url)

            for entry in listing['response']:
                model, fetched = hanlon_get_request(entry['@uri'])
                if not fetched:
                    continue
                details = model['response']
                if details['@label'] == wanted_label:
                    return 'present', details['@uuid']
        except requests.ConnectionError as err:
            self.module.fail_json(msg="Connection Error; confirm Hanlon base_url.", apierror=str(err))
        except requests.Timeout as err:
            self.module.fail_json(msg="Timeout Error; confirm status of Hanlon server", apierror=str(err))
        except requests.RequestException as err:
            self.module.fail_json(msg="Unknown Request library failure", apierror=str(err))

        return 'absent', None
项目:dcaf    作者:dxc-technology    | 项目源码 | 文件源码
def state_destroy_image(self):
        """Delete the Hanlon image named by the module's ``uuid`` parameter.

        Outside check mode a DELETE is sent to ``<base_url>/image/<uuid>``:
        HTTP 200 calls ``exit_json(changed=True)`` and any other status calls
        ``fail_json`` with the response text. In check mode no request is
        sent. ``requests`` connection, timeout and generic errors are
        reported through ``fail_json``.
        """
        module = self.module
        uri = "%s/image/%s" % (module.params['base_url'], module.params['uuid'])

        try:
            if not module.check_mode:
                req = requests.delete(uri)
                if req.status_code != 200:
                    module.fail_json(msg="Unknown Hanlon API error", apierror=req.text)
                else:
                    module.exit_json(changed=True)
            module.exit_json(changed=True)
        except requests.ConnectionError as err:
            module.fail_json(msg="Connection Error; confirm Hanlon base_url.", apierror=str(err))
        except requests.Timeout as err:
            module.fail_json(msg="Timeout Error; confirm status of Hanlon server", apierror=str(err))
        except requests.RequestException as err:
            module.fail_json(msg="Unknown Request library failure", apierror=str(err))

        module.exit_json(changed=False)
项目:dcaf    作者:dxc-technology    | 项目源码 | 文件源码
def state_destroy_active_model(self):
        """Delete the Hanlon active model at ``<self.base_url>/active_model/<self.uuid>``.

        Outside check mode a DELETE is sent: HTTP 200 calls
        ``exit_json(changed=True)`` and any other status calls ``fail_json``
        with the response text. In check mode no request is sent.
        ``requests`` connection, timeout and generic errors are reported
        through ``fail_json``.
        """
        module = self.module
        uri = "%s/active_model/%s" % (self.base_url, self.uuid)

        try:
            if not module.check_mode:
                req = requests.delete(uri)
                if req.status_code != 200:
                    module.fail_json(msg="Unknown error", apierror=req.text)
                else:
                    module.exit_json(changed=True)
            module.exit_json(changed=True)
        except requests.ConnectionError as err:
            module.fail_json(msg="Connection Error; confirm Hanlon base_url.", apierror=str(err))
        except requests.Timeout as err:
            module.fail_json(msg="Timeout Error; confirm status of Hanlon server", apierror=str(err))
        except requests.RequestException as err:
            module.fail_json(msg="Unknown Request library failure", apierror=str(err))
项目:dcaf    作者:dxc-technology    | 项目源码 | 文件源码
def check_model_state(self):
        """Look for a Hanlon model whose ``@label`` equals the module's ``label`` parameter.

        Fetches ``<base_url>/model`` with ``hanlon_get_request``, then fetches
        each listed ``@uri`` in turn and compares its label.

        Returns:
            ``('present', uuid)`` for the first match, otherwise
            ``('absent', None)``. ``requests`` connection, timeout and
            generic errors are reported through ``fail_json``.
        """
        params = self.module.params
        base_url = params['base_url']
        wanted_label = params['label']

        try:
            listing, _ = hanlon_get_request("%s/model" % base_url)

            for entry in listing['response']:
                model, fetched = hanlon_get_request(entry['@uri'])
                if not fetched:
                    continue
                details = model['response']
                if details['@label'] == wanted_label:
                    return 'present', details['@uuid']
        except requests.ConnectionError as err:
            self.module.fail_json(msg="Connection Error; confirm Hanlon base_url.", apierror=str(err))
        except requests.Timeout as err:
            self.module.fail_json(msg="Timeout Error; confirm status of Hanlon server", apierror=str(err))
        except requests.RequestException as err:
            self.module.fail_json(msg="Unknown Request library failure", apierror=str(err))

        return 'absent', None
项目:dcaf    作者:dxc-technology    | 项目源码 | 文件源码
def state_destroy_model(self):
        """Delete the Hanlon model named by the module's ``uuid`` parameter.

        In check mode no request is sent and ``exit_json(changed=True)`` is
        called. Otherwise a DELETE is issued to ``<base_url>/model/<uuid>``:
        HTTP 200 calls ``exit_json(changed=True)``, any other status falls
        through to ``exit_json(changed=False)``. ``requests`` connection,
        timeout and generic errors are reported through ``fail_json``.
        """
        base_url = self.module.params['base_url']
        uuid = self.module.params['uuid']

        uri = "%s/model/%s" % (base_url, uuid)

        # Check mode must never issue the destructive DELETE; report the
        # pending change the same way the other destroy states do.
        if self.module.check_mode:
            self.module.exit_json(changed=True)
            return

        try:
            req = requests.delete(uri)
            if req.status_code == 200:
                self.module.exit_json(changed=True)
        except requests.ConnectionError as connect_error:
            self.module.fail_json(msg="Connection Error; confirm Hanlon base_url.", apierror=str(connect_error))
        except requests.Timeout as timeout_error:
            self.module.fail_json(msg="Timeout Error; confirm status of Hanlon server", apierror=str(timeout_error))
        except requests.RequestException as request_exception:
            self.module.fail_json(msg="Unknown Request library failure", apierror=str(request_exception))

        self.module.exit_json(changed=False)
项目:dcaf    作者:dxc-technology    | 项目源码 | 文件源码
def state_destroy_image(self):
        """Delete the Hanlon image named by the module's ``uuid`` parameter.

        Outside check mode a DELETE is sent to ``<base_url>/image/<uuid>``:
        HTTP 200 calls ``exit_json(changed=True)`` and any other status calls
        ``fail_json`` with the response text. In check mode no request is
        sent. ``requests`` connection, timeout and generic errors are
        reported through ``fail_json``.
        """
        module = self.module
        uri = "%s/image/%s" % (module.params['base_url'], module.params['uuid'])

        try:
            if not module.check_mode:
                req = requests.delete(uri)
                if req.status_code != 200:
                    module.fail_json(msg="Unknown Hanlon API error", apierror=req.text)
                else:
                    module.exit_json(changed=True)
            module.exit_json(changed=True)
        except requests.ConnectionError as err:
            module.fail_json(msg="Connection Error; confirm Hanlon base_url.", apierror=str(err))
        except requests.Timeout as err:
            module.fail_json(msg="Timeout Error; confirm status of Hanlon server", apierror=str(err))
        except requests.RequestException as err:
            module.fail_json(msg="Unknown Request library failure", apierror=str(err))

        module.exit_json(changed=False)
项目:dcaf    作者:dxc-technology    | 项目源码 | 文件源码
def state_destroy_policy(self):
        """Delete the Hanlon policy named by the module's ``uuid`` parameter.

        Outside check mode a DELETE is sent to ``<base_url>/policy/<uuid>``:
        HTTP 200 calls ``exit_json(changed=True)`` and any other status calls
        ``fail_json`` with the response text. In check mode no request is
        sent. ``requests`` connection, timeout and generic errors are
        reported through ``fail_json``.
        """
        module = self.module
        uri = "%s/policy/%s" % (module.params['base_url'], module.params['uuid'])

        try:
            if not module.check_mode:
                req = requests.delete(uri)
                if req.status_code != 200:
                    module.fail_json(msg="Unknown error", apierror=req.text)
                else:
                    module.exit_json(changed=True)
            module.exit_json(changed=True)
        except requests.ConnectionError as err:
            module.fail_json(msg="Connection Error; confirm Hanlon base_url.", apierror=str(err))
        except requests.Timeout as err:
            module.fail_json(msg="Timeout Error; confirm status of Hanlon server", apierror=str(err))
        except requests.RequestException as err:
            module.fail_json(msg="Unknown Request library failure", apierror=str(err))
项目:dcaf    作者:dxc-technology    | 项目源码 | 文件源码
def state_destroy_active_model(self):
        """Delete the Hanlon active model at ``<self.base_url>/active_model/<self.uuid>``.

        Outside check mode a DELETE is sent: HTTP 200 calls
        ``exit_json(changed=True)`` and any other status calls ``fail_json``
        with the response text. In check mode no request is sent.
        ``requests`` connection, timeout and generic errors are reported
        through ``fail_json``.
        """
        module = self.module
        uri = "%s/active_model/%s" % (self.base_url, self.uuid)

        try:
            if not module.check_mode:
                req = requests.delete(uri)
                if req.status_code != 200:
                    module.fail_json(msg="Unknown error", apierror=req.text)
                else:
                    module.exit_json(changed=True)
            module.exit_json(changed=True)
        except requests.ConnectionError as err:
            module.fail_json(msg="Connection Error; confirm Hanlon base_url.", apierror=str(err))
        except requests.Timeout as err:
            module.fail_json(msg="Timeout Error; confirm status of Hanlon server", apierror=str(err))
        except requests.RequestException as err:
            module.fail_json(msg="Unknown Request library failure", apierror=str(err))
项目:dcaf    作者:dxc-technology    | 项目源码 | 文件源码
def check_model_state(self):
        """Look for a Hanlon model whose ``@label`` equals the module's ``label`` parameter.

        Fetches ``<base_url>/model`` with ``hanlon_get_request``, then fetches
        each listed ``@uri`` in turn and compares its label.

        Returns:
            ``('present', uuid)`` for the first match, otherwise
            ``('absent', None)``. ``requests`` connection, timeout and
            generic errors are reported through ``fail_json``.
        """
        params = self.module.params
        base_url = params['base_url']
        wanted_label = params['label']

        try:
            listing, _ = hanlon_get_request("%s/model" % base_url)

            for entry in listing['response']:
                model, fetched = hanlon_get_request(entry['@uri'])
                if not fetched:
                    continue
                details = model['response']
                if details['@label'] == wanted_label:
                    return 'present', details['@uuid']
        except requests.ConnectionError as err:
            self.module.fail_json(msg="Connection Error; confirm Hanlon base_url.", apierror=str(err))
        except requests.Timeout as err:
            self.module.fail_json(msg="Timeout Error; confirm status of Hanlon server", apierror=str(err))
        except requests.RequestException as err:
            self.module.fail_json(msg="Unknown Request library failure", apierror=str(err))

        return 'absent', None
项目:dcaf    作者:dxc-technology    | 项目源码 | 文件源码
def state_destroy_model(self):
        """Delete the Hanlon model named by the module's ``uuid`` parameter.

        In check mode no request is sent and ``exit_json(changed=True)`` is
        called. Otherwise a DELETE is issued to ``<base_url>/model/<uuid>``:
        HTTP 200 calls ``exit_json(changed=True)``, any other status falls
        through to ``exit_json(changed=False)``. ``requests`` connection,
        timeout and generic errors are reported through ``fail_json``.
        """
        base_url = self.module.params['base_url']
        uuid = self.module.params['uuid']

        uri = "%s/model/%s" % (base_url, uuid)

        # Check mode must never issue the destructive DELETE; report the
        # pending change the same way the other destroy states do.
        if self.module.check_mode:
            self.module.exit_json(changed=True)
            return

        try:
            req = requests.delete(uri)
            if req.status_code == 200:
                self.module.exit_json(changed=True)
        except requests.ConnectionError as connect_error:
            self.module.fail_json(msg="Connection Error; confirm Hanlon base_url.", apierror=str(connect_error))
        except requests.Timeout as timeout_error:
            self.module.fail_json(msg="Timeout Error; confirm status of Hanlon server", apierror=str(timeout_error))
        except requests.RequestException as request_exception:
            self.module.fail_json(msg="Unknown Request library failure", apierror=str(request_exception))

        self.module.exit_json(changed=False)
项目:globus-sdk-python    作者:globus    | 项目源码 | 文件源码
def test_convert_request_exception(self):
        """
        Each known request exception converts to the matching Globus
        network error type and keeps the original exception's args.
        """
        # (source exception, expected converted type), checked in this order:
        # generic network error, timeout, connection error.
        cases = (
            (self.exc, NetworkError),
            (self.timeout_exc, GlobusTimeoutError),
            (self.connection_exc, GlobusConnectionError),
        )
        for source_exc, expected_type in cases:
            conv = convert_request_exception(source_exc)
            self.assertIsInstance(conv, expected_type)
            self.assertEqual(conv.underlying_exception.args,
                             source_exc.args)
项目:depot_tools    作者:webrtc-uwp    | 项目源码 | 文件源码
def _search_md(url='http://169.254.169.254/latest/meta-data/iam/'):
    """Collect the metadata entries listed at *url* into a dict.

    The listing returned by *url* is split on newlines.  Entries ending in
    ``/`` are resolved through ``get_iam_role`` and stored under the name
    without the trailing slash.  Every other entry is fetched individually:
    a value starting with ``{`` is parsed as JSON, a multi-line value becomes
    a list of lines, anything else is stored unchanged.

    :param url: base URL of the metadata listing (must end with ``/``).
    :return: dict of the entries gathered so far; empty or partial when the
        endpoint times out or cannot be reached.
    """
    d = {}
    try:
        r = requests.get(url, timeout=.1)
        if r.content:
            for field in r.content.split('\n'):
                if field.endswith('/'):
                    d[field[:-1]] = get_iam_role(url + field)
                    continue
                # NOTE(review): this nested request has no timeout, unlike
                # the listing request above -- confirm whether intended.
                val = requests.get(url + field).content
                # Slice rather than index so an empty body does not raise
                # IndexError.
                if val[:1] == '{':
                    val = json.loads(val)
                elif val.find('\n') > 0:
                    # Split the fetched value itself; the previous code split
                    # the parent listing (r.content) by mistake.
                    val = val.split('\n')
                d[field] = val
    except (requests.Timeout, requests.ConnectionError):
        # Deliberate best effort: an unreachable endpoint just yields
        # whatever was gathered so far.
        pass
    return d
项目:Leancloud_imxie_cloud    作者:xcc3641    | 项目源码 | 文件源码
def parse_proxy(proxies_url):
    """Check whether *proxies_url* works as an HTTP proxy and save it if so.

    The module-level ``proxies`` mapping is updated in place with the
    candidate, then a test request is sent through it.  A 200 answer is
    logged, the proxy is stored via ``model.save_proxy`` and ``True`` is
    returned.

    :param proxies_url: proxy URL to probe.
    :return: ``True`` when the probe succeeded, ``False`` otherwise
        (connection error, timeout, non-200 answer or any other exception).
    """
    proxies['http'] = proxies_url
    check = False
    try:
        r = requests.get('http://www.baidu.com', proxies=proxies, timeout=5, headers=header_info)
        if r and r.status_code == 200:
            logging.info('===========Successful===============')
            logging.info('|| ???? ||----> ??: (%f)s  ??IP: (%s) ' % (r.elapsed.total_seconds(), proxies_url))
            logging.info('====================================')
            model.save_proxy(proxies_url)
            check = True
    except (requests.ConnectionError, requests.Timeout):
        logging.info(u'|| ?? or ???? ?? ||----> ??IP: (%s) ' % proxies_url)
    except Exception as e:
        # Probing must never abort the caller, so unexpected failures are
        # only logged.  logging.warn is a deprecated alias (removed in
        # Python 3.13); logging.warning is the supported spelling.
        logging.warning(e)
    return check
项目:Service-Descovery-Stack    作者:Tinhorn    | 项目源码 | 文件源码
def watch_services_node(url, file):
    """Poll *url* forever and start a worker thread for every non-empty reply.

    Each iteration requests *url* with ``recursive=true&wait=true`` and
    decodes the JSON body.  A non-empty body is handed to a new
    ``watchetcdutils.ChangeNginxThread`` configured with *file* as its
    ``confloc``.  A request timeout is logged and the loop continues.  This
    function never returns normally.
    """
    query = {'recursive': 'true', 'wait': 'true'}
    while True:
        try:
            body = requests.get(url=url, params=query).json()

            # Only act when the decoded body is non-empty.
            if not body:
                continue

            worker = watchetcdutils.ChangeNginxThread(
                confloc=file,
                thread_id=watchthreadid,
                payload=body,
                lock=thread_lock,
            )
            worker.start()
        except requests.Timeout:
            logger.info("Timeout happened")
项目:cbapi-python    作者:carbonblack    | 项目源码 | 文件源码
def http_request(self, method, url, **kwargs):
        """Send an HTTP request to the server and return the response.

        :param method: HTTP verb; upper-cased before use.
        :param url: path appended to ``self.server``.
        :param kwargs: passed through to ``self.session.request``.  ``verify``,
            ``proxies`` and ``headers`` are intercepted: ``verify`` and
            ``proxies`` fall back to the connection defaults, ``headers`` are
            merged on top of ``self.token_header``.
        :return: the response object for any status below 400 other than 401.
        :raises TimeoutError: the request timed out.
        :raises ApiError: connection failure or any other exception while
            sending the request.
        :raises ObjectNotFoundError: the server answered 404.
        :raises UnauthorizedError: the server answered 401.
        :raises ServerError: the server answered with any other status >= 400.
        """
        method = method.upper()

        # An explicit verify=False must be honoured; "x or default" would
        # silently replace it with the connection default.  Only an absent
        # (None) value falls back.
        verify_ssl = kwargs.pop('verify', None)
        if verify_ssl is None:
            verify_ssl = self.ssl_verify
        proxies = kwargs.pop('proxies', None) or self.proxies

        new_headers = kwargs.pop('headers', None)
        if new_headers:
            # Copy so per-request headers never leak into the shared token
            # header.
            headers = self.token_header.copy()
            headers.update(new_headers)
        else:
            headers = self.token_header

        uri = self.server + url

        try:
            raw_data = kwargs.get("data", None)
            if raw_data:
                log.debug("Sending HTTP {0} {1} with {2}".format(method, url, raw_data))
            r = self.session.request(method, uri, headers=headers, verify=verify_ssl, proxies=proxies,
                                     timeout=self._timeout, **kwargs)
            log.debug('HTTP {0:s} {1:s} took {2:.3f}s (response {3:d})'.format(method, url,
                                                                               calculate_elapsed_time(r.elapsed),
                                                                               r.status_code))
        except requests.Timeout as timeout_error:
            raise TimeoutError(uri=uri, original_exception=timeout_error)
        except requests.ConnectionError as connection_error:
            raise ApiError("Received a network connection error from {0:s}: {1:s}".format(self.server,
                                                                                          str(connection_error)),
                           original_exception=connection_error)
        except Exception as e:
            raise ApiError("Unknown exception when connecting to server: {0:s}".format(str(e)),
                           original_exception=e)
        else:
            # Translate HTTP error statuses into the matching exception type.
            if r.status_code == 404:
                raise ObjectNotFoundError(uri=uri, message=r.text)
            elif r.status_code == 401:
                raise UnauthorizedError(uri=uri, action=method, message=r.text)
            elif r.status_code >= 400:
                raise ServerError(error_code=r.status_code, message=r.text)
            return r
项目:umapi-client.py    作者:adobe-apiplatform    | 项目源码 | 文件源码
def test_remote_status_timeout():
    """A timed-out GET must make the remote status start with "Unreachable"."""
    target = "umapi_client.connection.requests.Session.get"
    with mock.patch(target) as session_get:
        session_get.side_effect = requests.Timeout
        connection = Connection(**mock_connection_params)
        _local, remote = connection.status(remote=True)
        assert remote["status"].startswith("Unreachable")
项目:umapi-client.py    作者:adobe-apiplatform    | 项目源码 | 文件源码
def test_get_timeout():
    """A GET that always times out is tried 7 times, then UnavailableError."""
    params = dict(mock_connection_params, retry_max_attempts=7)
    target = "umapi_client.connection.requests.Session.get"
    with mock.patch(target) as session_get:
        session_get.side_effect = requests.Timeout
        connection = Connection(**params)
        with pytest.raises(UnavailableError):
            connection.make_call("")
        assert session_get.call_count == 7
项目:umapi-client.py    作者:adobe-apiplatform    | 项目源码 | 文件源码
def test_post_timeout():
    """A POST that always times out is tried twice, then UnavailableError."""
    params = dict(mock_connection_params, retry_max_attempts=2)
    target = "umapi_client.connection.requests.Session.post"
    with mock.patch(target) as session_post:
        session_post.side_effect = requests.Timeout
        connection = Connection(**params)
        with pytest.raises(UnavailableError):
            connection.make_call("", [3, 5])
        assert session_post.call_count == 2
项目:db_wlan_manager    作者:sistason    | 项目源码 | 文件源码
def _make_request(self, url, protocol='https'):
        """GET ``<protocol>://<url>`` through the session.

        Uses a 5 second timeout with certificate verification disabled.

        :return: the response object, or ``False`` when the request timed out
            or the connection failed (connection errors are logged at debug
            level, timeouts are not logged).
        """
        target = '{}://{}'.format(protocol, url)
        try:
            response = self.session.get(target, timeout=5, verify=False)
        except requests.Timeout:
            return False
        except requests.ConnectionError as err:
            logging.debug('Connection Error: {}'.format(err))
            return False
        return response
项目:python-confidant-client    作者:lyft    | 项目源码 | 文件源码
def get_service(self, service, decrypt_blind=False):
        """Fetch a service's metadata and secrets from confidant.

        Always returns a dict whose ``'result'`` key says whether the lookup
        succeeded.  On a 200 response the decoded body is stored under
        ``'service'``.  A 404 counts as success but carries no ``'service'``
        key.  When *decrypt_blind* is true, the body's ``blind_credentials``
        entry is replaced by its decrypted form.
        """
        outcome = {'result': False}
        endpoint = '{0}/v1/services/{1}'.format(self.config['url'], service)
        try:
            # Authenticated GET; redirects are not followed.
            response = self.request_session.get(
                endpoint,
                auth=(self._get_username(), self._get_token()),
                allow_redirects=False,
                timeout=2
            )
        except requests.ConnectionError:
            logging.error('Failed to connect to confidant.')
            return outcome
        except requests.Timeout:
            logging.error('Confidant request timed out.')
            return outcome

        if not self._check_response_code(response, expected=[200, 404]):
            return outcome

        if response.status_code == 404:
            # A missing service is still reported as a successful lookup.
            logging.debug('Service not found in confidant.')
            outcome['result'] = True
            return outcome

        try:
            payload = response.json()
            if decrypt_blind:
                encrypted = payload['blind_credentials']
                payload['blind_credentials'] = self._decrypt_blind_credentials(
                    encrypted
                )
        except ValueError:
            logging.exception(
                'Received badly formatted json data from confidant.'
            )
            return outcome

        outcome.update(service=payload, result=True)
        return outcome
项目:python-confidant-client    作者:lyft    | 项目源码 | 文件源码
def get_blind_credential(self, id, decrypt_blind=False):
        """Fetch a single blind credential from confidant by its ID.

        Always returns a dict whose ``'result'`` key says whether the lookup
        succeeded.  On a 200 response the decoded body is stored under
        ``'blind_credential'``.  A 404 is reported as a failure.  When
        *decrypt_blind* is true, the decrypted pairs are added to the body
        under ``'decrypted_credential_pairs'``.
        """
        outcome = {'result': False}
        endpoint = '{0}/v1/blind_credentials/{1}'.format(self.config['url'], id)
        try:
            # Authenticated GET; redirects are not followed.
            response = self.request_session.get(
                endpoint,
                auth=(self._get_username(), self._get_token()),
                allow_redirects=False,
                timeout=2
            )
        except requests.ConnectionError:
            logging.error('Failed to connect to confidant.')
            return outcome
        except requests.Timeout:
            logging.error('Confidant request timed out.')
            return outcome

        if not self._check_response_code(response, expected=[200, 404]):
            return outcome

        if response.status_code == 404:
            logging.debug('Blind credential not found in confidant.')
            outcome['result'] = False
            return outcome

        try:
            payload = response.json()
            if decrypt_blind:
                pairs = self._get_decrypted_pairs(payload)
                payload['decrypted_credential_pairs'] = pairs
        except ValueError:
            logging.error('Received badly formatted json data from confidant.')
            return outcome

        outcome.update(blind_credential=payload, result=True)
        return outcome
项目:python-confidant-client    作者:lyft    | 项目源码 | 文件源码
def list_blind_credentials(self):
        """Fetch the list of blind credentials from confidant.

        Always returns a dict whose ``'result'`` key says whether the lookup
        succeeded.  On success the body's ``blind_credentials`` entry is
        stored under the key of the same name.
        """
        outcome = {'result': False}
        endpoint = '{0}/v1/blind_credentials'.format(self.config['url'])
        try:
            # Authenticated GET; redirects are not followed.
            response = self.request_session.get(
                endpoint,
                auth=(self._get_username(), self._get_token()),
                allow_redirects=False,
                timeout=2
            )
        except requests.ConnectionError:
            logging.error('Failed to connect to confidant.')
            return outcome
        except requests.Timeout:
            logging.error('Confidant request timed out.')
            return outcome

        if not self._check_response_code(response, expected=[200]):
            return outcome

        try:
            payload = response.json()
        except ValueError:
            logging.error('Received badly formatted json data from confidant.')
            return outcome

        outcome.update(
            blind_credentials=payload['blind_credentials'],
            result=True,
        )
        return outcome
项目:oadoi    作者:Impactstory    | 项目源码 | 文件源码
def scrape_page_for_open_location(self, my_webpage):
        """Scrape *my_webpage* and record an open location if one is found.

        Calls ``my_webpage.scrape_for_fulltext_link()``, appends any error the
        page reports to ``self.error`` and, when the page is open, appends the
        location minted by the page to ``self.open_locations``.

        No exception escapes: each failure is appended to ``self.error`` with
        a label naming the kind of failure, and the accumulated error text is
        logged at info level.
        """
        # "except X as e" replaces the Python-2-only "except X, e" spelling;
        # it is accepted by Python 2.6+ and Python 3 alike.
        try:
            my_webpage.scrape_for_fulltext_link()

            if my_webpage.error:
                self.error += my_webpage.error

            if my_webpage.is_open:
                my_open_location = my_webpage.mint_open_location()
                self.open_locations.append(my_open_location)

        # Handlers run from most to least specific so the label names the
        # first matching failure kind.
        except requests.Timeout as e:
            self.error += "Timeout in scrape_page_for_open_location on {}: {}".format(my_webpage, unicode(e.message).encode("utf-8"))
            logger.info(self.error)
        except requests.exceptions.ConnectionError as e:
            self.error += "ConnectionError in scrape_page_for_open_location on {}: {}".format(my_webpage, unicode(e.message).encode("utf-8"))
            logger.info(self.error)
        except requests.exceptions.ChunkedEncodingError as e:
            self.error += "ChunkedEncodingError in scrape_page_for_open_location on {}: {}".format(my_webpage, unicode(e.message).encode("utf-8"))
            logger.info(self.error)
        except requests.exceptions.RequestException as e:
            self.error += "RequestException in scrape_page_for_open_location on {}: {}".format(my_webpage, unicode(e.message).encode("utf-8"))
            logger.info(self.error)
        except etree.XMLSyntaxError as e:
            self.error += "XMLSyntaxError in scrape_page_for_open_location on {}: {}".format(my_webpage, unicode(e.message).encode("utf-8"))
            logger.info(self.error)
        except Exception as e:
            self.error += "Exception in scrape_page_for_open_location on {}: {}".format(my_webpage, unicode(e.message).encode("utf-8"))
            logger.info(self.error)
项目:GitHub-Repo-Recommender    作者:frankyjuang    | 项目源码 | 文件源码
def req(url, hdr):
    """GET *url* (resolved against ``BASE_URL``) with the headers *hdr*.

    :return: the response object when the status code is 200.
    :raises RequestTimeoutError: on a timeout or a connection error.
    :raises StatusCodeError: when the status code is anything but 200.
    """
    full_url = urljoin(BASE_URL, url)
    try:
        res = requests.get(full_url, headers=hdr, timeout=10.0)
    except (requests.Timeout, requests.ConnectionError):
        # Both failure kinds are reported to the caller the same way.
        raise RequestTimeoutError(url)
    if res.status_code == 200:
        return res
    raise StatusCodeError(url, res.status_code)
项目:GetSubtitles    作者:gyh1621    | 项目源码 | 文件源码
def download_file(self, file_name, sub_url):

        """Download the subtitle archive referenced by *sub_url*.

        The last path segment of *sub_url* is posted as ``sub_id`` to the
        download endpoint; the download link is extracted from the reply and
        streamed while a progress bar labelled with *file_name* is refreshed.

        Returns a ``(datatype, data_bytes, status)`` tuple.  ``status`` is
        ``'success'`` on success; on failure the tuple is
        ``(None, None, 'false')``.  ``datatype`` is ``'.rar'``, ``'.zip'``,
        ``'.7z'`` or ``'Unknown'``, chosen by substring match on the link.
        """

        sid = sub_url.split('/')[-1]

        r = requests.post('http://subhd.com/ajax/down_ajax',
                          data={'sub_id': sid},
                          headers=self.headers)

        # Decode once and reuse the text for both the JSON check and the
        # link extraction.
        content = r.content.decode('unicode-escape')
        if json.loads(content)['success'] is False:
            return None, None, 'false'
        res = re.search('http:.*(?=")', content)
        if res is None:
            # No link in the reply: report it like any other failed download
            # instead of crashing on None.group().
            return None, None, 'false'
        download_link = res.group(0).replace('\\/', '/')
        try:
            with closing(requests.get(download_link, stream=True)) as response:
                chunk_size = 1024  # bytes requested per iteration
                content_size = int(response.headers['content-length'])
                bar = ProgressBar(prefix + ' Get',
                                  file_name.strip(), content_size)
                # Collect chunks and join once; repeated bytes += re-copies
                # the whole buffer on every iteration.
                chunks = []
                received = 0
                for data in response.iter_content(chunk_size=chunk_size):
                    chunks.append(data)
                    received += len(data)
                    bar.refresh(received)
                sub_data_bytes = b''.join(chunks)
        except requests.Timeout:
            return None, None, 'false'
        if 'rar' in download_link:
            datatype = '.rar'
        elif 'zip' in download_link:
            datatype = '.zip'
        elif '7z' in download_link:
            datatype = '.7z'
        else:
            datatype = 'Unknown'

        return datatype, sub_data_bytes, 'success'
项目:GetSubtitles    作者:gyh1621    | 项目源码 | 文件源码
def download_file(self, file_name, sub_url):

        """Download the subtitle archive linked from the page at *sub_url*.

        The page is fetched and parsed, the link inside the
        ``subtitle-links`` block is extracted, and the target is streamed
        while a progress bar labelled with *file_name* is refreshed.

        Returns a ``(datatype, data_bytes)`` tuple, or ``(None, None)`` when
        no download link is found or the download times out.  ``datatype`` is
        ``'.rar'``, ``'.zip'``, ``'.7z'`` or ``'Unknown'``, chosen by
        substring match on the link.
        """

        # Close the session once the page has been fetched.
        with requests.session() as s:
            r = s.get(sub_url, headers=self.headers)
        bs_obj = BeautifulSoup(r.text, 'html.parser')
        links = bs_obj.find('div', {'class': 'subtitle-links'})
        a = links.a if links is not None else None
        if a is None or 'href' not in a.attrs:
            # Page layout not as expected: report failure like the timeout
            # path does instead of raising AttributeError/KeyError.
            return None, None
        download_link = a.attrs['href']

        try:
            with closing(requests.get(download_link, stream=True)) as response:
                chunk_size = 1024  # bytes requested per iteration
                content_size = int(response.headers['content-length'])
                bar = ProgressBar(prefix + ' Get',
                                  file_name.strip(), content_size)
                # Collect chunks and join once; repeated bytes += re-copies
                # the whole buffer on every iteration.
                chunks = []
                received = 0
                for data in response.iter_content(chunk_size=chunk_size):
                    chunks.append(data)
                    received += len(data)
                    bar.refresh(received)
                sub_data_bytes = b''.join(chunks)
        except requests.Timeout:
            return None, None
        if 'rar' in download_link:
            datatype = '.rar'
        elif 'zip' in download_link:
            datatype = '.zip'
        elif '7z' in download_link:
            datatype = '.7z'
        else:
            datatype = 'Unknown'

        return datatype, sub_data_bytes
项目:eater    作者:alexhayes    | 项目源码 | 文件源码
def request(self, **kwargs) -> Model:
        """
        Make an HTTP request using the configured method.

        This method is normally left untouched; customise behaviour through
        the hooks it calls (``get_request_kwargs`` and
        ``create_response_model``).

        Raises ``EaterTimeoutError`` when the request times out and
        ``EaterConnectError`` for any other requests failure.
        """
        kwargs = self.get_request_kwargs(request_model=self.request_model, **kwargs)

        # get_request_kwargs may override url, method and session; overrides
        # are stored on the instance and removed from the call arguments.
        for attr in ('url', 'method', 'session'):
            setattr(self, attr, kwargs.pop(attr, getattr(self, attr)))

        try:
            send = getattr(self.session, self.method)
            response = send(self.url, **kwargs)
            return self.create_response_model(response, self.request_model)

        except requests.Timeout:
            details = (type(self).__name__, self.method, self.url)
            raise EaterTimeoutError("%s.%s for URL '%s' timed out." % details)

        except requests.RequestException as exc_info:
            message = "Exception raised for URL '%s'." % self.url
            raise EaterConnectError(message) from exc_info
项目:tintri-python-sdk    作者:Tintri    | 项目源码 | 文件源码
def download_file(self, report_url, file_name):
        """
        Downloads the file pointed by URL.

        Args:

            report_url (str): URL returned from API from which file can be downloaded
            file_name (str): Name to be used for downloaded file

        Raises:

            TintriServerError: the server answered with a status other than 200
            TintriError: connection error, HTTP error, timeout or any other
                failure while downloading or writing the file
        """
        headers = {'content-type': 'application/json'}

        try:
            # NOTE(review): certificate verification is disabled for this
            # download (verify=False) -- confirm this is intended.
            r = requests.get(report_url, headers=headers, verify=False, stream=True)
            try:
                if r.status_code != 200:
                    message = "The HTTP response for get call on: %s is %s" % (report_url, r.status_code)
                    raise TintriServerError(r.status_code, message=message)

                # iter_content yields raw bytes, so the file must be opened
                # in binary mode; text mode fails on Python 3 and can corrupt
                # data through newline translation.
                with open(file_name, 'wb') as file_h:
                    for block in r.iter_content(4096):
                        file_h.write(block)
            finally:
                # A streamed response keeps its connection until closed.
                r.close()

        except TintriServerError:
            raise
        except requests.ConnectionError:
            raise TintriError("API Connection error occurred.")
        except requests.HTTPError:
            raise TintriError("HTTP error occurred.")
        except requests.Timeout:
            raise TintriError("Request timed out.")
        except Exception as e:
            raise TintriError("An unexpected error occurred: " + str(e))