我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用requests.Timeout()。
def _do_healthcheck(self, containers, config):
    """Probe each container's health-check URL and raise on any failure.

    Raises exceptions.HealthcheckException when no etcd client is
    available, when a container cannot be reached, or when a container
    answers with a non-200 status.
    """
    path = config.get('HEALTHCHECK_URL', '/')
    timeout = int(config.get('HEALTHCHECK_TIMEOUT', 1))
    # Container addresses are resolved through etcd; without a client we
    # cannot locate any container, so fail fast.
    if not _etcd_client:
        raise exceptions.HealthcheckException('no etcd client available')
    for container in containers:
        try:
            # NOTE(review): "{self}" interpolates self's repr via
            # format(**locals()) — presumably the app name; confirm.
            key = "/deis/services/{self}/{container.job_id}".format(**locals())
            # etcd stores the container's "host:port" under this key.
            url = "http://{}{}".format(_etcd_client.get(key).value, path)
            response = requests.get(url, timeout=timeout)
            if response.status_code != requests.codes.OK:
                raise exceptions.HealthcheckException(
                    "app failed health check (got '{}', expected: '200')".format(
                        response.status_code))
        # KeyError covers a missing etcd key for the container.
        except (requests.Timeout, requests.ConnectionError, KeyError) as e:
            raise exceptions.HealthcheckException(
                'failed to connect to container ({})'.format(e))
def download(self, url, retry_count=3, headers=None, proxies=None, data=None):
    """Fetch *url* and return the response body, retrying on network errors.

    :param url: URL to download
    :param retry_count: number of additional attempts after a
        ConnectionError/Timeout
    :param headers: optional header dict merged into the session headers,
        e.g. {'X': 'x'}
    :param proxies: optional proxy mapping,
        e.g. {"https": "http://12.112.122.12:3212"}
    :param data: optional urlencoded payload; when given a POST is issued
        instead of a GET
    :return: response content bytes, or None on failure
    """
    if headers:
        self.request_session.headers.update(headers)
    try:
        if data:
            content = self.request_session.post(url, data, proxies=proxies).content
        else:
            content = self.request_session.get(url, proxies=proxies).content
    except (ConnectionError, Timeout) as e:
        print('Downloader download ConnectionError or Timeout:' + str(e))
        content = None
        if retry_count > 0:
            # BUG FIX: the original discarded the recursive call's return
            # value, so a successful retry still returned None.
            content = self.download(url, retry_count - 1, headers, proxies, data)
    except Exception as e:
        print('Downloader download Exception:' + str(e))
        content = None
    return content
def state_destroy_model(self):
    """Delete the Hanlon model named by the module's uuid parameter.

    Exits the Ansible module via exit_json/fail_json in every path.
    """
    params = self.module.params
    destroy_uri = "{0}/model/{1}".format(params['base_url'], params['uuid'])
    try:
        response = requests.delete(destroy_uri)
        if response.status_code == 200:
            self.module.exit_json(changed=True)
    except requests.ConnectionError as connect_error:
        self.module.fail_json(msg="Connection Error; confirm Hanlon base_url.",
                              apierror=str(connect_error))
    except requests.Timeout as timeout_error:
        self.module.fail_json(msg="Timeout Error; confirm status of Hanlon server",
                              apierror=str(timeout_error))
    except requests.RequestException as request_exception:
        self.module.fail_json(msg="Unknown Request library failure",
                              apierror=str(request_exception))
    # Non-200 response: nothing was deleted.
    self.module.exit_json(changed=False)
def test_requests_timeout():
    """A requests.Timeout during the HTTP call surfaces as EaterTimeoutError."""
    class GetPersonAPI(HTTPEater):
        request_cls = Model
        response_cls = Model
        url = 'http://example.com/'

    def timeout(*args, **kwargs):  # pylint: disable=unused-argument
        # requests_mock invokes this as the response-text callback;
        # raising here simulates the underlying request timing out.
        raise requests.Timeout()

    api = GetPersonAPI()
    with requests_mock.Mocker() as mock:
        mock.get(
            'http://example.com/',
            text=timeout
        )
        with pytest.raises(EaterTimeoutError):
            api()
def test_authentication(self):
    """Probe the MaaS account endpoint and map failures onto driver errors.

    Returns True when the API answers 200; otherwise raises a
    Transient/PersistentDriverError describing the failure class.
    """
    try:
        resp = self.get('account/', op='list_authorisation_tokens')
    except requests.Timeout as ex:
        raise errors.TransientDriverError("Timeout connection to MaaS")
    except Exception as ex:
        raise errors.PersistentDriverError(
            "Error accessing MaaS: %s" % str(ex))

    status = resp.status_code
    if status in [401, 403]:
        raise errors.PersistentDriverError(
            "MaaS API Authentication Failed")
    if status in [500, 503]:
        raise errors.TransientDriverError("Received 50x error from MaaS")
    if status != 200:
        raise errors.PersistentDriverError(
            "Received unexpected error from MaaS")
    return True
def api_delete(server_name, api, session_id):
    """Invoke a Tintri DELETE API call and return the raw Response.

    :param server_name: Tintri server host name
    :param api: API path appended to the common API prefix
    :param session_id: JSESSIONID value from login
    :raises TintriRequestsException: on any transport failure
    """
    # Header and URL for delete call
    headers = {'content-type': 'application/json',
               'cookie': 'JSESSIONID=' + session_id}
    url = 'https://' + server_name + API + api
    try:
        # Invoke the API.
        r = requests.delete(url, headers=headers, verify=False)
    except requests.ConnectionError:
        # BUG FIX: original raised the misspelled, undefined name
        # 'TintrRequestsiApiException' (NameError) here.
        raise TintriRequestsException("API Connection error occurred.")
    except requests.HTTPError:
        raise TintriRequestsException("HTTP error occurred.")
    except requests.Timeout:
        raise TintriRequestsException("Request timed out.")
    except Exception:
        # BUG FIX: sys.exc_info()[0] is a class; concatenating it to a str
        # raised TypeError in the original. Stringify it instead.
        raise TintriRequestsException(
            "An unexpected error " + str(sys.exc_info()[0]) + " occurred.")
    return r

# PUT
def api_put(server_name, api, payload, session_id):
    """Invoke a Tintri PUT API call with a JSON payload; return the Response.

    :param server_name: Tintri server host name
    :param api: API path appended to the common API prefix
    :param payload: object serialized to JSON as the request body
    :param session_id: JSESSIONID value from login
    :raises TintriRequestsException: on any transport failure
    """
    headers = {'content-type': 'application/json',
               'cookie': 'JSESSIONID=' + session_id}
    url = 'https://' + server_name + API + api
    try:
        # Invoke the API.
        r = requests.put(url, data=json.dumps(payload), headers=headers,
                         verify=False)
    except requests.ConnectionError:
        raise TintriRequestsException("API Connection error occurred.")
    except requests.HTTPError:
        raise TintriRequestsException("HTTP error occurred.")
    except requests.Timeout:
        raise TintriRequestsException("Request timed out.")
    except Exception:
        # BUG FIX: sys.exc_info()[0] is a class; concatenating it to a str
        # raised TypeError in the original. Stringify it instead.
        raise TintriRequestsException(
            "An unexpected error " + str(sys.exc_info()[0]) + " occurred.")
    return r

# POST
def api_post(server_name, api, payload, session_id):
    """Invoke a Tintri POST API call with a JSON payload; return the Response.

    :param server_name: Tintri server host name
    :param api: API path appended to the common API prefix
    :param payload: object serialized to JSON as the request body
    :param session_id: JSESSIONID value from login
    :raises TintriRequestsException: on any transport failure
    """
    headers = {'content-type': 'application/json',
               'cookie': 'JSESSIONID=' + session_id}
    url = 'https://' + server_name + API + api
    try:
        # Invoke the API.
        r = requests.post(url, data=json.dumps(payload), headers=headers,
                          verify=False)
    except requests.ConnectionError:
        raise TintriRequestsException("API Connection error occurred.")
    except requests.HTTPError:
        raise TintriRequestsException("HTTP error occurred.")
    except requests.Timeout:
        raise TintriRequestsException("Request timed out.")
    except Exception:
        # BUG FIX: sys.exc_info()[0] is a class; concatenating it to a str
        # raised TypeError in the original. Stringify it instead.
        raise TintriRequestsException(
            "An unexpected error " + str(sys.exc_info()[0]) + " occurred.")
    return r

# Login.
def download_file(server_name, report_url, session_id, file_name):
    """Stream the report at *report_url* into the local file *file_name*.

    :raises TintriRequestsException: on transport failures or unexpected
        errors (including a wrapped TintriApiException for non-200 answers,
        which the generic handler converts — preserved original behavior).
    """
    headers = {'content-type': 'application/json'}
    try:
        r = requests.get(report_url, headers=headers, verify=False, stream=True)
        # if HTTP Response is not 200 then raise an exception
        if r.status_code != 200:
            message = "The HTTP response for get call to the server is not 200."
            raise TintriApiException(message, r.status_code, report_url,
                                     "No Payload", r.text)
        # BUG FIX: iter_content yields bytes, so the file must be opened in
        # binary mode; text mode ('w') raises TypeError on Python 3.
        with open(file_name, 'wb') as file_h:
            for block in r.iter_content(4096):
                file_h.write(block)
    except requests.ConnectionError:
        raise TintriRequestsException("API Connection error occurred.")
    except requests.HTTPError:
        raise TintriRequestsException("HTTP error occurred.")
    except requests.Timeout:
        raise TintriRequestsException("Request timed out.")
    except Exception as e:
        raise TintriRequestsException("An unexpected error: " + e.__str__())
def test_get_podm_status_Offline_by_http_exception(self, mock_get):
    """pod_status reports OFFLINE for connection, SSL and timeout failures."""
    mock_get.side_effect = requests.ConnectionError
    self.assertEqual(redfish.pod_status('url', 'username', 'password'),
                     constants.PODM_STATUS_OFFLINE)
    # NOTE(review): 'asset_called_once_with' is a typo for
    # 'assert_called_once_with' and is a silent no-op on a Mock, so this
    # line verifies nothing. Confirm the intended call signature before
    # fixing, since a real assertion might fail.
    mock_get.asset_called_once_with('url',
                                    auth=auth.HTTPBasicAuth('username', 'password'))
    # SSL Error
    mock_get.side_effect = requests.exceptions.SSLError
    self.assertEqual(redfish.pod_status('url', 'username', 'password'),
                     constants.PODM_STATUS_OFFLINE)
    self.assertEqual(mock_get.call_count, 2)
    # Timeout
    mock_get.side_effect = requests.Timeout
    self.assertEqual(redfish.pod_status('url', 'username', 'password'),
                     constants.PODM_STATUS_OFFLINE)
    self.assertEqual(mock_get.call_count, 3)
def setUp(self):
    """Monkey-patch logging and requests at module level before each test."""
    def getLogger(name):
        # Capture the logger so tests can assert on logging calls.
        self.mock_logger = mock.Mock()
        return self.mock_logger
    sys.modules['logging'].getLogger = getLogger

    def get(url, headers):
        # Fake a successful Cachet API response: {'data': {'status': 1}}.
        get_return = mock.Mock()
        get_return.ok = True
        get_return.json = mock.Mock()
        get_return.json.return_value = {'data': {'status': 1}}
        return get_return
    sys.modules['requests'].get = get

    self.env = EnvironmentVarGuard()
    self.env.set('CACHET_TOKEN', 'token2')
    self.configuration = Configuration('config.yml')
    # Re-attach the real exception classes the code under test catches,
    # since the requests module object is being patched above.
    sys.modules['requests'].Timeout = Timeout
    sys.modules['requests'].ConnectionError = ConnectionError
    sys.modules['requests'].HTTPError = HTTPError
def _call(self, verb, url, data={}, params={}, headers={}):
    """Issue an HTTP request on the shared session.

    Returns (response, status_code) on success or HTTP error status;
    returns (False, status_code) for the timeout/request-error branches.
    NOTE(review): the mutable {} defaults are shared between calls — safe
    only while they are never mutated here.
    """
    if headers:
        self.session.headers.update(headers)
    log.info('Call %s with data %s', url, data)
    resp = self.session.request(verb, url, json=data, params=params)
    status_code = resp.status_code
    try:
        resp.raise_for_status()
    except requests.HTTPError as exc:
        # HTTP error statuses still return the response for inspection.
        log.debug('Error occured, endpoint : %s, apikey : %s', url, self.apikey)
        return resp, status_code
    # NOTE(review): raise_for_status() only raises HTTPError, so the two
    # handlers below appear unreachable — a Timeout/RequestException from
    # the request itself would propagate from session.request() above.
    # (Also note the '%si' typo in the log format below.)
    except requests.Timeout:
        log.error('Request Timeout to %si', url)
        return False, status_code
    except requests.RequestException:
        log.error('Requests Error')
        return False, status_code
    else:
        return resp, status_code
def _search_md(url='http://169.254.169.254/latest/meta-data/iam/'):
    """Read the EC2 instance metadata tree rooted at *url* into a dict.

    Directory entries (trailing '/') are recursed via get_iam_role();
    leaf values that look like JSON are parsed. Returns {} when the
    metadata service is unreachable.
    """
    d = {}
    try:
        # Very short timeout: on non-EC2 hosts this address never answers.
        r = requests.get(url, timeout=.1)
        if r.content:
            fields = r.content.split('\n')
            for field in fields:
                if field.endswith('/'):
                    # Directory entry: recurse one level down.
                    d[field[0:-1]] = get_iam_role(url + field)
                else:
                    val = requests.get(url + field).content
                    if val[0] == '{':
                        # Leaf looks like a JSON blob (e.g. credentials).
                        val = json.loads(val)
                    else:
                        p = val.find('\n')
                        if p > 0:
                            # NOTE(review): this splits the parent listing
                            # (r.content) rather than the field value (val);
                            # presumably val.split('\n') was intended —
                            # confirm before changing.
                            val = r.content.split('\n')
                    d[field] = val
    except (requests.Timeout, requests.ConnectionError):
        # Best-effort: unreachable metadata service yields an empty dict.
        pass
    return d
def state_destroy_image(self):
    """Ansible state handler: delete a Hanlon image by uuid.

    Exits the module via exit_json/fail_json (which raise SystemExit)
    on every path.
    """
    base_url = self.module.params['base_url']
    uuid = self.module.params['uuid']
    uri = "%s/image/%s" % (base_url, uuid)
    try:
        # In check mode the DELETE is skipped and "changed" is reported
        # optimistically by the exit_json below.
        if not self.module.check_mode:
            req = requests.delete(uri)
            if req.status_code == 200:
                self.module.exit_json(changed=True)
            else:
                self.module.fail_json(msg="Unknown Hanlon API error", apierror=req.text)
        self.module.exit_json(changed=True)
    except requests.ConnectionError as connect_error:
        self.module.fail_json(msg="Connection Error; confirm Hanlon base_url.", apierror=str(connect_error))
    except requests.Timeout as timeout_error:
        self.module.fail_json(msg="Timeout Error; confirm status of Hanlon server", apierror=str(timeout_error))
    except requests.RequestException as request_exception:
        self.module.fail_json(msg="Unknown Request library failure", apierror=str(request_exception))
    # NOTE(review): normally unreachable, since exit_json exits above;
    # kept for safety if exit_json is stubbed in tests.
    self.module.exit_json(changed=False)
def state_destroy_policy(self):
    """Ansible state handler: delete a Hanlon policy by uuid.

    Exits the module via exit_json/fail_json (which raise SystemExit)
    on every path.
    """
    base_url = self.module.params['base_url']
    uuid = self.module.params['uuid']
    uri = "%s/policy/%s" % (base_url, uuid)
    try:
        # In check mode the DELETE is skipped; "changed" is still reported.
        if not self.module.check_mode:
            req = requests.delete(uri)
            if req.status_code == 200:
                self.module.exit_json(changed=True)
            else:
                self.module.fail_json(msg="Unknown error", apierror=req.text)
        self.module.exit_json(changed=True)
    except requests.ConnectionError as connect_error:
        self.module.fail_json(msg="Connection Error; confirm Hanlon base_url.", apierror=str(connect_error))
    except requests.Timeout as timeout_error:
        self.module.fail_json(msg="Timeout Error; confirm status of Hanlon server", apierror=str(timeout_error))
    except requests.RequestException as request_exception:
        self.module.fail_json(msg="Unknown Request library failure", apierror=str(request_exception))
def state_destroy_active_model(self):
    """Delete the Hanlon active_model bound to this module's uuid.

    Exits the Ansible module via exit_json/fail_json on every path.
    """
    destroy_uri = "{0}/active_model/{1}".format(self.base_url, self.uuid)
    try:
        if not self.module.check_mode:
            response = requests.delete(destroy_uri)
            if response.status_code == 200:
                self.module.exit_json(changed=True)
            else:
                self.module.fail_json(msg="Unknown error", apierror=response.text)
        # check mode (or a stubbed exit_json): report "changed".
        self.module.exit_json(changed=True)
    except requests.ConnectionError as connect_error:
        self.module.fail_json(msg="Connection Error; confirm Hanlon base_url.",
                              apierror=str(connect_error))
    except requests.Timeout as timeout_error:
        self.module.fail_json(msg="Timeout Error; confirm status of Hanlon server",
                              apierror=str(timeout_error))
    except requests.RequestException as request_exception:
        self.module.fail_json(msg="Unknown Request library failure",
                              apierror=str(request_exception))
def check_model_state(self):
    """Return ('present', uuid) when a model labelled params['label'] exists.

    Walks the Hanlon /model collection, fetching each entry's detail
    record to compare its @label. Returns ('absent', None) otherwise;
    transport failures abort the module via fail_json.
    """
    base_url = self.module.params['base_url']
    model_name = self.module.params['label']
    uri = "%s/model" % base_url
    try:
        json_result, http_success = hanlon_get_request(uri)
        for response in json_result['response']:
            # Each collection entry carries the URI of its detail record.
            uri = response['@uri']
            model, http_success = hanlon_get_request(uri)
            if http_success:
                model_response = model['response']
                if model_response['@label'] == model_name:
                    return 'present', model_response['@uuid']
    except requests.ConnectionError as connect_error:
        self.module.fail_json(msg="Connection Error; confirm Hanlon base_url.", apierror=str(connect_error))
    except requests.Timeout as timeout_error:
        self.module.fail_json(msg="Timeout Error; confirm status of Hanlon server", apierror=str(timeout_error))
    except requests.RequestException as request_exception:
        self.module.fail_json(msg="Unknown Request library failure", apierror=str(request_exception))
    return 'absent', None
def test_convert_request_exception(self): """ Converts known request exceptions into Globus NetworkErrors, confirms expected values. """ # NetworkError conv = convert_request_exception(self.exc) self.assertIsInstance(conv, NetworkError) self.assertEqual(conv.underlying_exception.args, self.exc.args) # Timeout Error conv = convert_request_exception(self.timeout_exc) self.assertIsInstance(conv, GlobusTimeoutError) self.assertEqual(conv.underlying_exception.args, self.timeout_exc.args) # Connection Error conv = convert_request_exception(self.connection_exc) self.assertIsInstance(conv, GlobusConnectionError) self.assertEqual(conv.underlying_exception.args, self.connection_exc.args)
def parse_proxy(proxies_url):
    """Validate a candidate HTTP proxy by fetching baidu.com through it.

    Mutates the module-level `proxies` dict, persists a working proxy via
    model.save_proxy(), and returns True iff the probe answered HTTP 200.
    (Log message text is garbled by a past encoding loss; left untouched
    because it is runtime output.)
    """
    proxies['http'] = proxies_url
    check = False
    try:
        r = requests.get('http://www.baidu.com', proxies=proxies, timeout=5,
                         headers=header_info)
        if r and r.status_code == 200:
            logging.info('===========Successful===============')
            # Logs the round-trip time and the proxy that served it.
            logging.info('|| ???? ||----> ??: (%f)s ??IP: (%s) '
                         % (r.elapsed.total_seconds(), proxies_url))
            logging.info('====================================')
            # can_be_use.append(proxies['http'])
            model.save_proxy(proxies_url)
            check = True
    except (requests.ConnectionError, requests.Timeout):
        # Proxy unusable: log and fall through with check == False.
        logging.info(u'|| ?? or ???? ?? ||----> ??IP: (%s) ' % proxies_url)
        pass
    except Exception as e:
        logging.warn(e)
        pass
    return check
def watch_services_node(url, file):
    """Long-poll an etcd services node forever and react to changes.

    Each iteration issues a blocking etcd watch (wait=true,
    recursive=true); when a change arrives, a worker thread rewrites the
    nginx config at *file*. Timeouts are expected with long-polling and
    simply trigger another poll.
    """
    payload = {'recursive': 'true', 'wait': 'true'}
    while True:
        try:
            r = requests.get(url=url, params=payload)
            return_body = r.json()
            # Checking that return dict is empty
            if return_body:
                watch_thread = watchetcdutils.ChangeNginxThread(confloc=file,
                                                               thread_id=watchthreadid,
                                                               payload=return_body,
                                                               lock=thread_lock)
                watch_thread.start()
                # confutils = nginxconfutils.NginxConfUtils(confloc=file, thread_id=watchthreadid)
                # confutils.load_conf()
        except requests.Timeout:
            logger.info("Timeout happened")
def http_request(self, method, url, **kwargs):
    """Send an authenticated HTTP request and return the Response.

    Raises TimeoutError on timeouts, ApiError on connection/unknown
    failures, and ObjectNotFoundError / UnauthorizedError / ServerError
    for 404 / 401 / other >=400 status codes.
    """
    method = method.upper()
    # Per-call overrides fall back to the client-level defaults.
    verify_ssl = kwargs.pop('verify', None) or self.ssl_verify
    proxies = kwargs.pop('proxies', None) or self.proxies
    new_headers = kwargs.pop('headers', None)
    if new_headers:
        # Copy so the shared auth-token header dict is never mutated.
        headers = self.token_header.copy()
        headers.update(new_headers)
    else:
        headers = self.token_header
    uri = self.server + url
    try:
        raw_data = kwargs.get("data", None)
        if raw_data:
            log.debug("Sending HTTP {0} {1} with {2}".format(method, url, raw_data))
        r = self.session.request(method, uri, headers=headers, verify=verify_ssl,
                                 proxies=proxies, timeout=self._timeout, **kwargs)
        log.debug('HTTP {0:s} {1:s} took {2:.3f}s (response {3:d})'.format(
            method, url, calculate_elapsed_time(r.elapsed), r.status_code))
    except requests.Timeout as timeout_error:
        raise TimeoutError(uri=uri, original_exception=timeout_error)
    except requests.ConnectionError as connection_error:
        raise ApiError("Received a network connection error from {0:s}: {1:s}".format(
            self.server, str(connection_error)),
            original_exception=connection_error)
    except Exception as e:
        raise ApiError("Unknown exception when connecting to server: {0:s}".format(str(e)),
                       original_exception=e)
    else:
        # Map common error status codes onto typed exceptions.
        if r.status_code == 404:
            raise ObjectNotFoundError(uri=uri, message=r.text)
        elif r.status_code == 401:
            raise UnauthorizedError(uri=uri, action=method, message=r.text)
        elif r.status_code >= 400:
            raise ServerError(error_code=r.status_code, message=r.text)
    return r
def test_remote_status_timeout():
    """A timeout on the status endpoint yields an 'Unreachable' remote status."""
    with mock.patch("umapi_client.connection.requests.Session.get") as patched_get:
        patched_get.side_effect = requests.Timeout
        conn = Connection(**mock_connection_params)
        _, remote_status = conn.status(remote=True)
        assert remote_status["status"].startswith("Unreachable")
def test_get_timeout():
    """make_call retries a timing-out GET retry_max_attempts times, then raises."""
    params = dict(mock_connection_params, retry_max_attempts=7)
    with mock.patch("umapi_client.connection.requests.Session.get") as patched_get:
        patched_get.side_effect = requests.Timeout
        conn = Connection(**params)
        pytest.raises(UnavailableError, conn.make_call, "")
        assert patched_get.call_count == 7
def test_post_timeout():
    """make_call retries a timing-out POST retry_max_attempts times, then raises."""
    params = dict(mock_connection_params, retry_max_attempts=2)
    with mock.patch("umapi_client.connection.requests.Session.post") as patched_post:
        patched_post.side_effect = requests.Timeout
        conn = Connection(**params)
        pytest.raises(UnavailableError, conn.make_call, "", [3, 5])
        assert patched_post.call_count == 2
def _make_request(self, url, protocol='https'):
    """GET the given host/path over *protocol*.

    Returns the Response on success, or False on timeout or connection
    failure (connection failures are logged at debug level).
    """
    target = '{}://{}'.format(protocol, url)
    try:
        return self.session.get(target, timeout=5, verify=False)
    except requests.Timeout:
        return False
    except requests.ConnectionError as exc:
        logging.debug('Connection Error: {}'.format(exc))
        return False
def get_service(self, service, decrypt_blind=False): """Get a service's metadata and secrets.""" # Return a dict, always with an attribute that specifies whether or not # the function was able to successfully get a result. ret = {'result': False} try: # Make a request to confidant with the provided url, to fetch the # service providing the service name and base64 encoded # token for authentication. response = self.request_session.get( '{0}/v1/services/{1}'.format(self.config['url'], service), auth=(self._get_username(), self._get_token()), allow_redirects=False, timeout=2 ) except requests.ConnectionError: logging.error('Failed to connect to confidant.') return ret except requests.Timeout: logging.error('Confidant request timed out.') return ret if not self._check_response_code(response, expected=[200, 404]): return ret if response.status_code == 404: logging.debug('Service not found in confidant.') ret['result'] = True return ret try: data = response.json() if decrypt_blind: data['blind_credentials'] = self._decrypt_blind_credentials( data['blind_credentials'] ) except ValueError: logging.exception( 'Received badly formatted json data from confidant.' ) return ret ret['service'] = data ret['result'] = True return ret
def get_blind_credential(self, id, decrypt_blind=False): """Get a blind credential from ID.""" # Return a dict, always with an attribute that specifies whether or not # the function was able to successfully get a result. ret = {'result': False} try: # Make a request to confidant with the provided url, to fetch the # service providing the service name and base64 encoded # token for authentication. response = self.request_session.get( '{0}/v1/blind_credentials/{1}'.format(self.config['url'], id), auth=(self._get_username(), self._get_token()), allow_redirects=False, timeout=2 ) except requests.ConnectionError: logging.error('Failed to connect to confidant.') return ret except requests.Timeout: logging.error('Confidant request timed out.') return ret if not self._check_response_code(response, expected=[200, 404]): return ret if response.status_code == 404: logging.debug('Blind credential not found in confidant.') ret['result'] = False return ret try: data = response.json() if decrypt_blind: data['decrypted_credential_pairs'] = self._get_decrypted_pairs( data ) except ValueError: logging.error('Received badly formatted json data from confidant.') return ret ret['blind_credential'] = data ret['result'] = True return ret
def list_blind_credentials(self): """Get a list of blind credentials.""" # Return a dict, always with an attribute that specifies whether or not # the function was able to successfully get a result. ret = {'result': False} try: # Make a request to confidant with the provided url, to fetch the # service providing the service name and base64 encoded # token for authentication. response = self.request_session.get( '{0}/v1/blind_credentials'.format(self.config['url']), auth=(self._get_username(), self._get_token()), allow_redirects=False, timeout=2 ) except requests.ConnectionError: logging.error('Failed to connect to confidant.') return ret except requests.Timeout: logging.error('Confidant request timed out.') return ret if not self._check_response_code(response, expected=[200]): return ret try: data = response.json() except ValueError: logging.error('Received badly formatted json data from confidant.') return ret ret['blind_credentials'] = data['blind_credentials'] ret['result'] = True return ret
def scrape_page_for_open_location(self, my_webpage):
    """Scrape a webpage for a fulltext link; record an OpenLocation if found.

    Failures never propagate: each exception class appends a descriptive
    message to self.error and logs it. (Python 2 code: uses the
    `except X, e` syntax and unicode()/e.message.)
    """
    # logger.info(u"scraping", url)
    try:
        my_webpage.scrape_for_fulltext_link()
        # Propagate any scrape-level error text onto this object.
        if my_webpage.error:
            self.error += my_webpage.error
        if my_webpage.is_open:
            my_open_location = my_webpage.mint_open_location()
            self.open_locations.append(my_open_location)
            # logger.info(u"found open version at", webpage.url)
        else:
            # logger.info(u"didn't find open version at", webpage.url)
            pass
    except requests.Timeout, e:
        self.error += "Timeout in scrape_page_for_open_location on {}: {}".format(my_webpage, unicode(e.message).encode("utf-8"))
        logger.info(self.error)
    except requests.exceptions.ConnectionError, e:
        self.error += "ConnectionError in scrape_page_for_open_location on {}: {}".format(my_webpage, unicode(e.message).encode("utf-8"))
        logger.info(self.error)
    except requests.exceptions.ChunkedEncodingError, e:
        self.error += "ChunkedEncodingError in scrape_page_for_open_location on {}: {}".format(my_webpage, unicode(e.message).encode("utf-8"))
        logger.info(self.error)
    except requests.exceptions.RequestException, e:
        self.error += "RequestException in scrape_page_for_open_location on {}: {}".format(my_webpage, unicode(e.message).encode("utf-8"))
        logger.info(self.error)
    except etree.XMLSyntaxError, e:
        self.error += "XMLSyntaxError in scrape_page_for_open_location on {}: {}".format(my_webpage, unicode(e.message).encode("utf-8"))
        logger.info(self.error)
    except Exception, e:
        self.error += "Exception in scrape_page_for_open_location on {}: {}".format(my_webpage, unicode(e.message).encode("utf-8"))
        logger.info(self.error)
def req(url, hdr):
    """GET a BASE_URL-relative path and return the Response.

    Raises RequestTimeoutError on timeout or connection failure (both
    mapped to the same error type), and StatusCodeError for any non-200
    status.
    """
    try:
        res = requests.get(urljoin(BASE_URL, url), headers=hdr, timeout=10.0)
    except (requests.Timeout, requests.ConnectionError):
        # Connection failures are deliberately reported with the same
        # error type as timeouts.
        raise RequestTimeoutError(url)
    if res.status_code == 200:
        return res
    raise StatusCodeError(url, res.status_code)
def download_file(self, file_name, sub_url):
    """Download a subtitle archive from subhd.

    Resolves the real download link via the down_ajax endpoint, streams
    the archive with a progress bar, and returns
    (datatype, archive_bytes, status) where status is 'success' or
    'false'. (Original docstring was mojibake from an encoding loss.)
    """
    # The subtitle id is the last path component of the page URL.
    sid = sub_url.split('/')[-1]
    r = requests.post('http://subhd.com/ajax/down_ajax', data={'sub_id': sid}, headers=self.headers)
    content = r.content.decode('unicode-escape')
    if json.loads(content)['success'] is False:
        return None, None, 'false'
    # Pull the escaped http URL out of the ajax response and unescape it.
    res = re.search('http:.*(?=")', r.content.decode('unicode-escape'))
    download_link = res.group(0).replace('\\/', '/')
    try:
        with closing(requests.get(download_link, stream=True)) as response:
            chunk_size = 1024  # stream in 1 KiB chunks
            # Total size drives the progress bar.
            content_size = int(response.headers['content-length'])
            bar = ProgressBar(prefix + ' Get', file_name.strip(), content_size)
            sub_data_bytes = b''
            for data in response.iter_content(chunk_size=chunk_size):
                sub_data_bytes += data
                bar.refresh(len(sub_data_bytes))
            # sub_data_bytes = requests.get(download_link, timeout=10).content
    except requests.Timeout:
        return None, None, 'false'
    # Guess the archive type from the link text.
    if 'rar' in download_link:
        datatype = '.rar'
    elif 'zip' in download_link:
        datatype = '.zip'
    elif '7z' in download_link:
        datatype = '.7z'
    else:
        datatype = 'Unknown'
    return datatype, sub_data_bytes, 'success'
def download_file(self, file_name, sub_url):
    """Download a subtitle archive from a subtitle page.

    Scrapes the page's 'subtitle-links' anchor for the download URL,
    streams the archive with a progress bar, and returns
    (datatype, archive_bytes), or (None, None) on timeout.
    (Original docstring was mojibake from an encoding loss.)
    """
    s = requests.session()
    r = s.get(sub_url, headers=self.headers)
    bs_obj = BeautifulSoup(r.text, 'html.parser')
    # First anchor under the subtitle-links div holds the download URL.
    a = bs_obj.find('div', {'class': 'subtitle-links'}).a
    download_link = a.attrs['href']
    try:
        with closing(requests.get(download_link, stream=True)) as response:
            chunk_size = 1024  # stream in 1 KiB chunks
            # Total size drives the progress bar.
            content_size = int(response.headers['content-length'])
            bar = ProgressBar(prefix + ' Get', file_name.strip(), content_size)
            sub_data_bytes = b''
            for data in response.iter_content(chunk_size=chunk_size):
                sub_data_bytes += data
                bar.refresh(len(sub_data_bytes))
            # sub_data_bytes = requests.get(download_link, timeout=10).content
    except requests.Timeout:
        return None, None
    # Guess the archive type from the link text.
    if 'rar' in download_link:
        datatype = '.rar'
    elif 'zip' in download_link:
        datatype = '.zip'
    elif '7z' in download_link:
        datatype = '.7z'
    else:
        datatype = 'Unknown'
    return datatype, sub_data_bytes
def request(self, **kwargs) -> Model:
    """
    Make a HTTP request of of type method.

    You should generally leave this method alone. If you need to
    customise the behaviour use the methods that this method uses.
    """
    kwargs = self.get_request_kwargs(request_model=self.request_model, **kwargs)
    # get_request_kwargs can permanently alter the url, method and session
    self.url = kwargs.pop('url', self.url)
    self.method = kwargs.pop('method', self.method)
    self.session = kwargs.pop('session', self.session)
    send = getattr(self.session, self.method)
    try:
        response = send(self.url, **kwargs)
        return self.create_response_model(response, self.request_model)
    except requests.Timeout:
        raise EaterTimeoutError("%s.%s for URL '%s' timed out." % (
            type(self).__name__, self.method, self.url
        ))
    except requests.RequestException as exc_info:
        raise EaterConnectError(
            "Exception raised for URL '%s'." % self.url) from exc_info
def download_file(self, report_url, file_name):
    """Downloads the file pointed by URL.

    Args:
        report_url (str): URL returned from API from which file can be downloaded
        file_name (str): Name to be used for downloaded file

    Raises:
        TintriServerError: when the server answers with a non-200 status.
        TintriError: on connection/HTTP/timeout or unexpected failures.
    """
    headers = {'content-type': 'application/json'}
    try:
        r = requests.get(report_url, headers=headers, verify=False, stream=True)
        if r.status_code != 200:
            message = "The HTTP response for get call on: %s is %s" % (report_url, r.status_code)
            raise TintriServerError(r.status_code, message=message)
        # BUG FIX: iter_content yields bytes, so the file must be opened in
        # binary mode; text mode ('w') raises TypeError on Python 3.
        with open(file_name, 'wb') as file_h:
            for block in r.iter_content(4096):
                file_h.write(block)
    except TintriServerError:
        # Re-raise our own status error untouched.
        raise
    except requests.ConnectionError:
        raise TintriError("API Connection error occurred.")
    except requests.HTTPError:
        raise TintriError("HTTP error occurred.")
    except requests.Timeout:
        raise TintriError("Request timed out.")
    except Exception as e:
        raise TintriError("An unexpected error occurred: " + e.__str__())