我们从Python开源项目中,提取了以下16个代码示例,用于说明如何使用requests.ReadTimeout()。
def update_status(): ''' -< True : Version is test; no update needed -> False : Check for update failed -> str : Latest version to update to ''' try: r = requests.get(constants.UPDATES, timeout=5) except (requests.ConnectionError, requests.ConnectTimeout, requests.ReadTimeout): return False else: latestVersion = r.json()['latest_version'] if constants.VERSION == latestVersion: return True else: return latestVersion
def get_params(self, container_id: str) -> Optional[Dict[str, Any]]: if self._config.cache_params and container_id in self._params_cache: logger.debug("Returning cached params for container {0}".format(container_id)) return self._params_cache[container_id] logger.debug("[{0}] Starting to fetch params for {1}".format(threading.current_thread().name, container_id)) try: params = self._client.inspect_container(container_id) except NotFound as e: logger.warning("Container {0} not found - {1}.".format(container_id, e)) return None except (ReadTimeout, ProtocolError, JSONDecodeError) as e: logger.error("Communication error when fetching params for container {0}: {1}".format(container_id, e)) return {} except Exception as e: logger.error("Unexpected error when fetching params for container {0}: {1}".format(container_id, e)) return {} logger.debug("[{0}] Params fetched for {1}".format(threading.current_thread().name, container_id)) if not self._config.cache_params: return params logger.debug("[{0}] Storing params of {1} in cache".format(threading.current_thread().name, container_id)) self._params_cache[container_id] = params return params
def _run_once(self): try: if self.settings.DO_INBOX or self.settings.DO_FALSEPOS: await self._process_messages() asyncio.sleep(5) for multi in settings.MULTIREDDITS: if self.settings.DO_OC: asyncio.sleep(5) await self._process_oc_stream(multi) for multi in settings.MULTIREDDITS + [settings.PARENT_SUB]: if self.settings.DO_MODLOG: asyncio.sleep(5) await self._process_network_modlog(multi) except (HTTPException, requests.ReadTimeout, requests.ConnectionError) as ex: LOG.error('%s: %s', type(ex), ex) else: LOG.debug('All tasks processed.') # ======================================================
def main(url, timeout=30, redirect_unknown=True, debug=False): """Actual monitoring execution""" logging.basicConfig(level=logging.WARNING) logger = logging.getLogger(__name__) if debug: # pragma: no cover logger.setLevel(logging.DEBUG) logger.info('debug logging enabled') # Check if URL is valid logger.debug('perform URL validation check') if not valid_http_url(url): nagios.plugin_exit(nagios.Codes.UNKNOWN, 'provided URL is not valid') # Send a HEAD request logger.debug('send HEAD request') try: response = requests.head(url, timeout=timeout) except requests.ConnectTimeout: nagios.plugin_exit(nagios.Codes.CRITICAL, 'connection timeout') except requests.ReadTimeout: nagios.plugin_exit(nagios.Codes.CRITICAL, 'no response received before' 'timeout') except requests.ConnectionError: nagios.plugin_exit(nagios.Codes.UNKNOWN, 'connection error') else: logger.debug('response received') if response.status_code == requests.codes.ok: # Response is OK nagios.plugin_exit(nagios.Codes.OK, 'status code is %d' % response.status_code) elif redirect_unknown and response.status_code == requests.codes.found: # Redirect considered as UNKNOWN nagios.plugin_exit(nagios.Codes.UNKNOWN, 'redirection with code %d' % response.status_code) else: # Other code, considered not working nagios.plugin_exit(nagios.Codes.CRITICAL, 'status code is %d' % response.status_code)
def __request_data(self): if self.__is_cache_valid(): return True # requesting data try: # setting 5 sec timeouts for connect and read r = requests.get(self.data_url, verify=False, allow_redirects=True, timeout=(5, 5)) except requests.ConnectionError as e: print("Unable to connect to ", self.data_url, " error is ", e, file=sys.stderr) return False except requests.ConnectTimeout as e: print("Timed out connection to ", self.data_url, " error is ", e, file=sys.stderr) return False except requests.ReadTimeout as e: print("Timed out while reading data from ", self.data_url, " error is ", e, file=sys.stderr) return False if r.status_code == 200: # got HTTP/200 for request - storing it in cache try: open(self.temp_file_name, mode="w").write(json.dumps(r.json())) except IOError as e: print("IO error while trying to store cache into file ", self.temp_file_name, " error is ", e, file=sys.stderr) return False return True else: return False
def _download(self, request, spider): def _retry(): if self.retry_on_download_timeout: self.logger.debug('Read timed out, retry request {}'.format(request)) self.crawl(request, spider) try: self._process_request(request, spider) if request is None: return method = request.method.upper() resp = None kw_params = { 'timeout': self.download_timeout, 'cookies': request.cookies, 'headers': request.headers, 'proxies': { 'http': request.proxy, 'https': request.proxy } } self.logger.debug('[{}]<{} {}>'.format(spider.name, method, request.url)) if method == 'GET': resp = requests.get(request.url, **kw_params) elif method == 'POST': resp = requests.post(request.url, request.data, **kw_params) self._responses_queue.put((Response(resp.url, resp.status_code, resp.content, request, resp.cookies), spider)) except (requests.ReadTimeout, requests.ConnectTimeout, requests.ConnectionError): _retry() except Exception as err: self.logger.error(err, exc_info=True)
def check_container(self, container_id: str, check_source: CheckSource, remove_from_cache: bool=False) \ -> Optional[Container]: try: if remove_from_cache: self.remove_from_cache(container_id) if not self._config.disable_params: params = self.get_params(container_id) else: params = {} if not self._config.disable_metrics: logger.debug("[{0}] Starting to fetch metrics for {1}".format(threading.current_thread().name, container_id)) metrics = self._client.stats(container=container_id, decode=True, stream=False) else: metrics = {} logger.debug("[{0}] Fetched data for container {1}".format(threading.current_thread().name, container_id)) except NotFound as e: logger.warning("Container {0} not found - {1}.".format(container_id, e)) return None except (ReadTimeout, ProtocolError, JSONDecodeError) as e: logger.error("Communication error when fetching info about container {0}: {1}".format(container_id, e)) return None except Exception as e: logger.error("Unexpected error when fetching info about container {0}: {1}".format(container_id, e)) return None if params is None or metrics is None: logger.warning("Params or metrics were not fetched for container {}. Not returning container." .format(container_id)) return None return Container(container_id, params, metrics, 0, check_source)
def check_containers(self, check_source: CheckSource) -> Iterable[Container]: with self._padlock: if self._check_in_progress: logger.warning("[{0}] Previous check did not yet complete, consider increasing CHECK_INTERVAL_S" .format(threading.current_thread().name)) return self._check_in_progress = True logger.debug("Periodic check start: connecting to get the list of containers") self.last_check_containers_run_start_timestamp = datetime.datetime.utcnow() try: containers = self._client.containers(quiet=True) logger.debug("[{0}] Fetched containers list from docker daemon".format(threading.current_thread().name)) except (ReadTimeout, ProtocolError, JSONDecodeError) as e: logger.error("Timeout while trying to get list of containers from docker: {0}".format(e)) with self._padlock: self._check_in_progress = False self.last_periodic_run_ok = False return except Exception as e: logger.error("Unexpected error while trying to get list of containers from docker: {0}".format(e)) with self._padlock: self._check_in_progress = False self.last_periodic_run_ok = False return ids = [container['Id'] for container in containers] for container_id in ids: container = self.check_container(container_id, check_source) if container is None: continue yield container logger.debug("Containers checked") if self._config.cache_params: logger.debug("Purging cache") self.purge_cache(ids) self.last_periodic_run_ok = True self.last_check_containers_run_end_timestamp = datetime.datetime.utcnow() self.last_check_containers_run_time = self.last_check_containers_run_end_timestamp \ - self.last_check_containers_run_start_timestamp logger.debug("Periodic check done") with self._padlock: self._check_in_progress = False
def get_events_observable(self) -> Iterable[Any]: successful = False ev = None while not successful: try: ev = self._client.events(decode=True) except (ReadTimeout, ProtocolError, JSONDecodeError) as e: logger.error("Communication error when subscribing for container events, retrying in 5s: {0}".format(e)) time.sleep(5) except Exception as e: logger.error("Unexpected error when subscribing for container events, retrying in 5s: {0}".format(e)) time.sleep(5) successful = True return ev
def kill_container(self, container: Container) -> None: try: self._client.stop(container.params['Id']) except (ReadTimeout, ProtocolError) as e: logger.error("Communication error when stopping container {0}: {1}".format(container.cid, e)) except Exception as e: logger.error("Unexpected error when stopping container {0}: {1}".format(container.cid, e))
def run(self): while True: stream = submission_stream(self.r, 'all', verbosity=0) try: for post in stream: self._do_post(post) except (HTTPException, requests.ReadTimeout, requests.ConnectionError) as e: LOG.error('{}: {}'.format(type(e), e)) else: LOG.error('Stream ended.') LOG.info('Sleeping for {} minutes.'.format(RETRY_MINUTES)) sleep(60 * RETRY_MINUTES)
def _get_remote_svg_tile(hass, host, port, prefix, name, width_tiles, c1, c2): """Get remote SVG file.""" url_tile = URL_TILE_MASK.format(host, port, prefix, name, width_tiles) ok, r_svg, status = False, None, -1 try: r_svg = yield from hass.async_add_job( partial(requests.get, url_tile, timeout=15)) status = r_svg.status_code ok = r_svg.ok except (requests.ReadTimeout, requests.ConnectionError): pass if ok: # yield from asyncio.sleep(0) color1 = ', '.join([str(x) for x in c1]) color2 = ', '.join([str(x) for x in c2]) mask_bg = ('background-image: radial-gradient(' 'farthest-corner at 70% 70%, rgba({}), rgba({}));' .format(color1, color2)) svg_text_sub = RG_TILE_BACKGROUND.sub( '{}{}'.format(MASK_SVG_STYLE, mask_bg), r_svg.content.decode(), count=1) # svg_text_sub = RG_TILE_SIZE.sub( # # 'viewBox="0 0 600 400"', # 'viewBox="0 0 300 250"', # svg_text_sub, count=1) # LOGGER.warning(svg_text_sub[:300]) return svg_text_sub.encode() LOGGER.info('TILE REQUEST ERROR [code:{}]: {} - {}' .format(status, r_svg, url_tile)) return None
def execute(self, method, *args): payload = dumps(args, methodname=method, allow_none=True) body = gzip.compress(payload.encode('utf8')) try: res = await self.loop.run_in_executor(None, self.__request, body) data, _ = loads(res.text, use_datetime=True) if isinstance(data, (tuple, list)) and len(data) > 0 and len(data[0]) > 0: if isinstance(data[0][0], dict) and 'faultCode' in data[0][0]: raise DedimaniaFault(faultCode=data[0][0]['faultCode'], faultString=data[0][0]['faultString']) self.retries = 0 return data[0] raise DedimaniaTransportException('Invalid response from dedimania!') except (ConnectionError, ReadTimeout, ConnectionRefusedError) as e: raise DedimaniaTransportException(e) from e except ConnectTimeout as e: raise DedimaniaTransportException(e) from e except DedimaniaTransportException: # Try to setup new session. self.retries += 1 if self.retries > 5: raise DedimaniaTransportException('Dedimania didn\'t gave the right answer after few retries!') self.client = requests.session() try: await self.authenticate() return await self.execute(method, *args) except Exception as e: logger.error('XML-RPC Fault retrieved from Dedimania: {}'.format(str(e))) handle_exception(e, __name__, 'execute') raise DedimaniaTransportException('Could not retrieve data from dedimania!') except DedimaniaFault as e: if 'Bad SessionId' in e.faultString or ('SessionId' in e.faultString and 'not found' in e.faultString): try: self.retries += 1 if self.retries > 5: raise DedimaniaTransportException('Max retries reached for reauthenticating with dedimania!') await self.authenticate() return await self.execute(method, *args) except: return logger.error('XML-RPC Fault retrieved from Dedimania: {}'.format(str(e))) handle_exception(e, __name__, 'execute', extra_data={ 'dedimania_retries': self.retries, }) raise DedimaniaTransportException('Could not retrieve data from dedimania!')
def oembed_json(url, cache_failures=True): """oembed_json(url, *, cache_failures=True) Asks Noembed_ for the embedding HTML code for arbitrary URLs. Sites supported include Youtube, Vimeo, Twitter and many others. Successful embeds are always cached for 30 days. Failures are cached if ``cache_failures`` is ``True`` (the default). The durations are as follows: - Connection errors are cached 60 seconds with the hope that the connection failure is only transient. - HTTP errors codes and responses in an unexpected format (no JSON) are cached for 24 hours. The return value is always a dictionary, but it may be empty. """ # Thundering herd problem etc... key = 'oembed-url-%s-data' % md5(url.encode('utf-8')).hexdigest() data = cache.get(key) if data is not None: return data try: data = requests.get( 'https://noembed.com/embed', params={ 'url': url, 'nowrap': 'on', 'maxwidth': 1200, 'maxheight': 800, }, timeout=2, ).json() except (requests.ConnectionError, requests.ReadTimeout): # Connection failed? Hopefully temporary, try again soon. timeout = 60 except (ValueError, requests.HTTPError): # Oof... HTTP error code, or no JSON? Try again tomorrow, # and we should really log this. timeout = 86400 else: # Perfect, cache for 30 days cache.set(key, data, timeout=30 * 86400) return data if cache_failures: cache.set(key, {}, timeout=timeout) return {}
def get_data(self): self.get_url_and_html() for html in self.list_html: try: soup = BeautifulSoup(str(html), 'lxml') # soup?????????????? if soup.find("tr").findAll("td"): ip_and_port = ( soup.find("tr").findAll("td")[1].get_text() + ":" + soup.find("tr").findAll("td")[2].get_text() ) proxies = { "http": ip_and_port, "https": ip_and_port } # ??ip?????????2? response = requests.get( "http://1212.ip138.com/ic.asp", headers=headers, proxies=proxies, timeout=2 ) if response.status_code == 200: self.ip_and_port = ip_and_port print "ip???????" + self.ip_and_port print "??????:{},?????:{},?????:{},?????:{},?????:{},?????:{}".format( str(soup.find("tr").findAll("td")[3].get_text()).replace("\n", ""), soup.find("tr").findAll("td")[4].get_text(), soup.find("tr").findAll("td")[5].get_text(), soup.find("tr").findAll("td")[6].find({"div", "title"}).attrs["title"], soup.find("tr").findAll("td")[7].find({"div", "title"}).attrs["title"], soup.find("tr").findAll("td")[8].get_text(), soup.find("tr").findAll("td")[9].get_text() ) break else: print "http????200" raise requests.ConnectionError except requests.ReadTimeout: print "?ip??????????????ip" except requests.ConnectionError: print "?ip????" except Exception as e: print "??????????:%(errorName)s\n?????:\n%(detailInfo)s" % { "errorName": e, "detailInfo": traceback.format_exc()}
def checkin(cred, code): ''' -> 0: Successful check-in -> 1: No internet connection -> 2: Invalid credentials -> 3: Not connected to SunwayEdu Wi-Fi -> 4: Invalid code -> 5: Wrong class -> 6: Already checked-in ''' # Start a session session = requests.Session() # Login to iZone payload = { 'form_action': 'submitted', 'student_uid': cred[0], 'password': cred[1], } try: r = session.post(constants.LOGIN, data=payload) except requests.ConnectionError: return 1 if not r.history: return 2 # Check for SunwayEdu Wi-Fi try: r = requests.get(constants.WIFI, timeout=2) except requests.ConnectTimeout: return 3 except requests.ConnectionError: return 1 # Check-in with code try: r = session.post(constants.CHECKIN, data={'checkin_code': code}, timeout=2) except (requests.ReadTimeout, requests.ConnectionError): return 1 if 'Checkin code not valid.' in r.text or \ 'The specified URL cannot be found.' in r.text: return 4 if 'You cannot check in to a class you are not a part of.' in r.text: return 5 if 'You have already checked in' in r.text: return 6 return 0