The following 26 code examples, extracted from open-source Python projects, illustrate how to use requests.ConnectTimeout().
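Before the project examples, here is a minimal, hypothetical sketch of the pattern most of them share: requests.ConnectTimeout is raised when the connection cannot be established within the timeout, and is usually handled alongside requests.ReadTimeout and requests.ConnectionError. The helper name, URL, and timeout value below are illustrative assumptions, not taken from any of the projects.

import requests

def fetch_or_none(url='https://example.org', timeout=5):
    # Hypothetical helper: return the response body, or None on any network failure.
    try:
        # a single timeout value applies to both the connect and the read phase
        resp = requests.get(url, timeout=timeout)
    except requests.ConnectTimeout:
        # the connection could not be established before the timeout expired;
        # caught before ConnectionError because it is a subclass of it
        return None
    except (requests.ReadTimeout, requests.ConnectionError):
        # the server accepted the connection but never answered, or refused outright
        return None
    return resp.text
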
def update_status():
    '''
    -> True : Version is latest; no update needed
    -> False : Check for update failed
    -> str : Latest version to update to
    '''
    try:
        r = requests.get(constants.UPDATES, timeout=5)
    except (requests.ConnectionError, requests.ConnectTimeout, requests.ReadTimeout):
        return False
    else:
        latestVersion = r.json()['latest_version']
        if constants.VERSION == latestVersion:
            return True
        else:
            return latestVersion

def get_title_by_url(url, timeout=5, pattern='<title>(.*?)</title>'):
    """Return {url: title}; if no title is found, return {url: None}."""
    try:
        raw_http = requests.get(url, timeout=timeout)
        raw_http.encoding = raw_http.apparent_encoding
    # a tuple is needed here; `except A or B` would only ever catch A
    except (requests.ConnectionError, requests.ConnectTimeout):
        logger_util.log_debug('Connect failed to %s ' % url)
        return
    title = re.findall(pattern, raw_http.text)
    if not title:
        logger_util.log_debug('This page does not have a title %s' % url)
        return {url: None}
    else:
        return {url: title[0]}

# ----------------------------------------------------------------------

def test_raises_error_with_any_other_requests_exception(self, mock):
    exceptions = [
        requests.exceptions.HTTPError,
        requests.exceptions.ConnectionError,
        requests.exceptions.ProxyError,
        requests.exceptions.SSLError,
        requests.exceptions.Timeout,
        requests.exceptions.ConnectTimeout,
        requests.exceptions.ReadTimeout,
        requests.exceptions.URLRequired,
        requests.exceptions.TooManyRedirects,
        requests.exceptions.MissingSchema,
        requests.exceptions.InvalidSchema,
        requests.exceptions.InvalidURL,
        requests.exceptions.InvalidHeader,
        requests.exceptions.ChunkedEncodingError,
        requests.exceptions.ContentDecodingError,
        requests.exceptions.StreamConsumedError,
        requests.exceptions.RetryError,
        requests.exceptions.UnrewindableBodyError,
    ]
    mock.side_effect = exceptions
    for _ in exceptions:
        with self.assertRaises(HTTPError):
            request('GET', 'foo')

def crawl(self, job: Job):
    try:
        worker = Worker(job)
        worker.start()
        worker.join()
    except ValueError as e:
        print("Couldn't parse url: ", job.url, e)
    except (requests.ConnectionError, requests.ConnectTimeout, requests.exceptions.SSLError) as e:
        print("Couldn't parse url: ", job.url, e.strerror)
    else:
        while not linkparser.links.empty():
            job = linkparser.links.get()
            if job.priority < self._depth:
                self.crawl(job)
            linkparser.links.task_done()

def crawl(self, job: Job):
    try:
        r = requests.get(job.url)
        link_parser = LinkParser(job)
        link_parser.parse(r.text)
        links = link_parser.get_links()
    except ValueError as e:
        print("Couldn't parse url: ", job.url, e)
    except (requests.ConnectionError, requests.ConnectTimeout, requests.exceptions.SSLError) as e:
        print("Couldn't parse url: ", job.url, e.strerror)
    else:
        while not links.empty():
            job = links.get()
            self.print_status(job, links)
            if job.priority < self._depth:
                self.crawl(job)
            links.task_done()

def __call__(self):
    try:
        req = requests.get(self.url, headers=self.header, timeout=10, proxies=self.proxies)
    # a tuple is needed here; the original `except A or B` form only caught the first exception
    except (requests.ConnectTimeout, requests.exceptions.ReadTimeout):
        print(f"request to {self.url} timed out")
        self.status = False
        return {"status": self.status, 'html': ''}
    try:
        encoding = chardet.detect(req.content)['encoding']
        html = req.content.decode(encoding, errors='replace')
    except Exception as e:
        print(e)
        print("failed to decode the page content, skipping......")
        self.status = False
        return {"status": self.status, 'html': ''}
    return {"status": self.status, 'html': html}

def test_on_exception(self, catch_signal):
    with patch('requests.Session.request', side_effect=ConnectTimeout):
        with catch_signal(on_exception) as handler:
            send_mail(fail_silently=True, **SEND_KWARGS)
    assert handler.called
    kwargs = handler.call_args[1]
    assert kwargs['sender'] == EmailBackend
    assert kwargs['signal'] == on_exception
    assert isinstance(kwargs['exception'], ConnectTimeout)
    assert len(kwargs['raw_messages']) == 1

def request(self, api_name=None, pk=None, method='get', use_auth=True,
            data=None, params=None, content_type='application/json'):
    if api_name in self.api_url_mapping:
        path = self.api_url_mapping.get(api_name)
        if pk and '%s' in path:
            path = path % pk
    else:
        path = '/'
    url = self.endpoint.rstrip('/') + path
    self.req = req = Request(url, method=method, data=data, params=params,
                             content_type=content_type, app_name=self.app_name)
    if use_auth:
        if not self._auth:
            raise RequestError('Authentication required')
        else:
            self._auth.sign_request(req)
    try:
        result = req.request()
        if result.status_code > 500:
            logging.warning('Server internal error')
    except (requests.ConnectionError, requests.ConnectTimeout):
        result = FakeResponse()
        logging.warning('Connect endpoint: {} error'.format(self.endpoint))
    return self.parse_result(result)

def main(url, timeout=30, redirect_unknown=True, debug=False):
    """Actual monitoring execution"""
    logging.basicConfig(level=logging.WARNING)
    logger = logging.getLogger(__name__)
    if debug:  # pragma: no cover
        logger.setLevel(logging.DEBUG)
        logger.info('debug logging enabled')
    # Check if URL is valid
    logger.debug('perform URL validation check')
    if not valid_http_url(url):
        nagios.plugin_exit(nagios.Codes.UNKNOWN, 'provided URL is not valid')
    # Send a HEAD request
    logger.debug('send HEAD request')
    try:
        response = requests.head(url, timeout=timeout)
    except requests.ConnectTimeout:
        nagios.plugin_exit(nagios.Codes.CRITICAL, 'connection timeout')
    except requests.ReadTimeout:
        nagios.plugin_exit(nagios.Codes.CRITICAL,
                           'no response received before timeout')
    except requests.ConnectionError:
        nagios.plugin_exit(nagios.Codes.UNKNOWN, 'connection error')
    else:
        logger.debug('response received')
        if response.status_code == requests.codes.ok:
            # Response is OK
            nagios.plugin_exit(nagios.Codes.OK,
                               'status code is %d' % response.status_code)
        elif redirect_unknown and response.status_code == requests.codes.found:
            # Redirect considered as UNKNOWN
            nagios.plugin_exit(nagios.Codes.UNKNOWN,
                               'redirection with code %d' % response.status_code)
        else:
            # Other code, considered not working
            nagios.plugin_exit(nagios.Codes.CRITICAL,
                               'status code is %d' % response.status_code)

def __request_data(self):
    if self.__is_cache_valid():
        return True
    # requesting data
    try:
        # setting 5 sec timeouts for connect and read
        r = requests.get(self.data_url, verify=False, allow_redirects=True, timeout=(5, 5))
    # ConnectTimeout is a subclass of ConnectionError, so it has to be caught first
    except requests.ConnectTimeout as e:
        print("Timed out connection to ", self.data_url, " error is ", e, file=sys.stderr)
        return False
    except requests.ConnectionError as e:
        print("Unable to connect to ", self.data_url, " error is ", e, file=sys.stderr)
        return False
    except requests.ReadTimeout as e:
        print("Timed out while reading data from ", self.data_url, " error is ", e, file=sys.stderr)
        return False
    if r.status_code == 200:
        # got HTTP/200 for request - storing it in cache
        try:
            open(self.temp_file_name, mode="w").write(json.dumps(r.json()))
        except IOError as e:
            print("IO error while trying to store cache into file ", self.temp_file_name,
                  " error is ", e, file=sys.stderr)
            return False
        return True
    else:
        return False

def isActiveLink(link):
    try:
        request = requests.get(link)
        return request.status_code
    except (requests.ConnectionError, requests.ConnectTimeout):
        return False

def downloadFile(url):
    tries = 0
    while tries < 3:
        try:
            r = requests.get(BASE_URL + url)
            r.encoding = "utf-8"
            fileContent = r.text
            return fileContent.strip(), len(fileContent)
        except (requests.ConnectionError, requests.ConnectTimeout) as e:
            tries += 1
            if tries >= 3:
                raise e

# Decorator

def _download(self, request, spider):
    def _retry():
        if self.retry_on_download_timeout:
            self.logger.debug('Read timed out, retry request {}'.format(request))
            self.crawl(request, spider)

    try:
        self._process_request(request, spider)
        if request is None:
            return
        method = request.method.upper()
        resp = None
        kw_params = {
            'timeout': self.download_timeout,
            'cookies': request.cookies,
            'headers': request.headers,
            'proxies': {
                'http': request.proxy,
                'https': request.proxy
            }
        }
        self.logger.debug('[{}]<{} {}>'.format(spider.name, method, request.url))
        if method == 'GET':
            resp = requests.get(request.url, **kw_params)
        elif method == 'POST':
            resp = requests.post(request.url, request.data, **kw_params)
        self._responses_queue.put(
            (Response(resp.url, resp.status_code, resp.content, request, resp.cookies), spider))
    except (requests.ReadTimeout, requests.ConnectTimeout, requests.ConnectionError):
        _retry()
    except Exception as err:
        self.logger.error(err, exc_info=True)

def search_by_baidu(self, url, index):
    # print 'baidu', GetNowTime()
    if url is None:
        url = r'https://www.baidu.com/s?&wd=site%3A' + self.domain
    try:
        baidu_rsp = requests.get(url=url, headers=headers, timeout=5)
    except (requests.ConnectTimeout, requests.ConnectionError):
        # print e.message, "baidu request failed or timed out"
        return url, index

def is_https1(self):
    url = self.PREFIX_URL2 + self.raw_domain
    try:
        requests.get(url, timeout=4)
    except (requests.ConnectionError, requests.ConnectTimeout):
        return False

def is_https2(self):
    url = self.PREFIX_URL1 + self.raw_domain
    try:
        requests.get(url, timeout=4)
    except (requests.ConnectionError, requests.ConnectTimeout):
        return False

def get_html(url):
    for i in range(3):
        try:
            head = headers
            head['Referer'] = 'https://www.bing.com'
            rsp = requests.get(url, headers=head, timeout=5)
        except (requests.ConnectionError, requests.ConnectTimeout):
            # print e.message, 'bing request failed, retrying after 5 seconds'
            time.sleep(5)
            continue
        if rsp.status_code == 200:
            root = etree.HTML(rsp.text)
            return root
        else:
            time.sleep(5)

def acme_enabled(url):
    "if given url can be hit and it looks like the acme hidden dir exists, return True."
    url = 'http://' + url + "/.well-known/acme-challenge/"  # ll: http://lax.elifesciences.org/.well-known/acme-challenge
    try:
        resp = requests.head(url, allow_redirects=False)
        if 'crm.elifesciences' in url:
            return resp.status_code == 404  # apache behaves differently to nginx
        return resp.status_code == 403  # forbidden rather than not found.
    except (requests.ConnectionError, requests.ConnectTimeout):
        # couldn't connect for whatever reason
        return False

def _download(self, url='http://zt.bdinfo.net/speedtest/wo3G.rar', filepath="/work/test/tempdown"):
    '''
    Download file from website to local file
    @param url: download url
    @param filepath: the file location u want to save file
    @return: None
    '''
    if not url:
        return
    print('downloading:%s' % url)
    # r = requests.get(url, stream=True, timeout=5)
    filename = os.path.join(filepath, '.' + urlparse.urlsplit(url).path)
    chunk_size = 1024 * 1024
    print filename
    if not os.path.exists(os.path.dirname(filename)):
        os.makedirs(os.path.dirname(filename))
    try:
        r = requests.get(url, stream=True, timeout=5)
        with open(filename, 'wb') as f:
            for data in r.iter_content(chunk_size=chunk_size):
                # data = temp.read(1024*1024)
                f.write(data)
        self._add_downloaded(url)
    except (requests.ConnectTimeout, requests.ReadTimeout):
        print("Download %s timeout, this will redownload later.\n" % (url))
        if self._redis_enable:
            self._r.lpush(self._download_list, url)

def _get_response(self, url, method='get', headers={}, files=[], data=None,
                  cookies=None, cert=None, timeout=30, **kwargs):
    method = method.upper()
    # self._s.headers.update(headers)
    pre = requests.Request(method=method, url=url, data=data, files=files)
    prepped = self._s.prepare_request(pre)
    try:
        with self._s.send(prepped, stream=True, cert=cert, timeout=timeout) as resp:
            # self._header.parse_header(dict(resp.headers))
            # self._s.headers.update(self._header.get_default_header())
            # content_type = resp.headers.get('content-type')
            # encoding = self._get_content_encoding(content_type)
            # regx = re.compile('.*(text\/html|text\/xml).*')
            if resp.status_code == requests.codes.OK:
                # Don't handle redirect url for now
                '''
                with open('temp.txt','wb') as f:
                    f.write(resp.content)
                '''
                self._handler.handle_data(resp)
            elif resp.status_code != requests.codes.OK:
                print("Connected url %s \t %d" % (url, resp.status_code))
            else:
                # If the response is not html or xml, save the url to redis
                pass
    except (requests.ConnectTimeout, requests.ReadTimeout):
        print('Connect %s timeout .\n' % (url))
        if self._redis_enable:
            self._r.lpush(self._start, url)
        else:
            task.url_task.append(url)

def request(method, url, *args, sign=True, **kwargs):
    custom_ca_certs_file = get_custom_ca_certs_file()
    if custom_ca_certs_file is not None and 'verify' not in kwargs:
        kwargs['verify'] = custom_ca_certs_file
    try:
        if sign:
            response = Request(url, method, *args, **kwargs).send()
        else:
            response = requests.request(method, url, *args, timeout=30, **kwargs)
    except HTTPError as error:
        raise error
    except (requests.exceptions.MissingSchema,
            requests.exceptions.InvalidSchema,
            requests.exceptions.URLRequired,
            requests.exceptions.InvalidURL):
        raise HTTPError('You have provided an invalid server URL.')
    except requests.ConnectTimeout:
        raise HTTPError('Connection timed out. Try again later.')
    except requests.ConnectionError:
        raise HTTPError('Server is not available. Try again later.')
    except requests.RequestException:
        raise HTTPError(UNKNOWN_ERROR)
    if response.status_code == 401:
        raise HTTPError('Unauthorized. Did you set your credentials?')
    elif not response.ok:
        raise HTTPError(format_server_error(response))
    return response

def test_raises_error_when_server_is_unavailable(self, mock):
    exceptions = [requests.ConnectionError, requests.ConnectTimeout]
    mock.side_effect = exceptions
    for _ in exceptions:
        with self.assertRaises(HTTPError):
            request('GET', 'foo')

async def execute(self, method, *args):
    payload = dumps(args, methodname=method, allow_none=True)
    body = gzip.compress(payload.encode('utf8'))
    try:
        res = await self.loop.run_in_executor(None, self.__request, body)
        data, _ = loads(res.text, use_datetime=True)
        if isinstance(data, (tuple, list)) and len(data) > 0 and len(data[0]) > 0:
            if isinstance(data[0][0], dict) and 'faultCode' in data[0][0]:
                raise DedimaniaFault(faultCode=data[0][0]['faultCode'], faultString=data[0][0]['faultString'])
            self.retries = 0
            return data[0]
        raise DedimaniaTransportException('Invalid response from dedimania!')
    except (ConnectionError, ReadTimeout, ConnectionRefusedError) as e:
        raise DedimaniaTransportException(e) from e
    except ConnectTimeout as e:
        raise DedimaniaTransportException(e) from e
    except DedimaniaTransportException:
        # Try to setup new session.
        self.retries += 1
        if self.retries > 5:
            raise DedimaniaTransportException('Dedimania didn\'t gave the right answer after few retries!')
        self.client = requests.session()
        try:
            await self.authenticate()
            return await self.execute(method, *args)
        except Exception as e:
            logger.error('XML-RPC Fault retrieved from Dedimania: {}'.format(str(e)))
            handle_exception(e, __name__, 'execute')
            raise DedimaniaTransportException('Could not retrieve data from dedimania!')
    except DedimaniaFault as e:
        if 'Bad SessionId' in e.faultString or ('SessionId' in e.faultString and 'not found' in e.faultString):
            try:
                self.retries += 1
                if self.retries > 5:
                    raise DedimaniaTransportException('Max retries reached for reauthenticating with dedimania!')
                await self.authenticate()
                return await self.execute(method, *args)
            except:
                return
        logger.error('XML-RPC Fault retrieved from Dedimania: {}'.format(str(e)))
        handle_exception(e, __name__, 'execute', extra_data={
            'dedimania_retries': self.retries,
        })
        raise DedimaniaTransportException('Could not retrieve data from dedimania!')

def get_urls(self, get_proxie_or_not=False):
    """
    :type get_proxie_or_not: bool
    :param get_proxie_or_not: whether to request through a proxy IP
    :return: the list of urls extracted from the start page
    """
    list_url = []
    try:
        if get_proxie_or_not:
            p = Proxies()
            p.get_ip_and_port()
            self.session.proxies = {
                "http": p.ip_and_port,
                "https": p.ip_and_port
            }
        response = self.session.get(self.start_url, timeout=30)
        if response.status_code == 200:
            html = response.content
        else:
            # fall back to selenium + phantomjs to render the page
            # set custom request headers for phantomjs
            desired_capabilities = DesiredCapabilities.PHANTOMJS.copy()
            headers = self.headers
            for key, value in headers.iteritems():
                desired_capabilities['phantomjs.page.customHeaders.{}'.format(key)] = value
            driver = webdriver.PhantomJS(
                desired_capabilities=desired_capabilities
            )
            driver.get(self.start_url)
            html = driver.page_source
            driver.quit()
        soup = BeautifulSoup(html, 'lxml')
        # locate the link elements with BeautifulSoup (selector omitted in this excerpt)
        urls = soup.find()
        assert urls is not None
        repeat_num = 0
        for url in urls:
            if url['href'] not in list_url:
                list_url.append(url['href'])
            else:
                repeat_num += 1
        print "skipped %d duplicate urls" % repeat_num
    except requests.ConnectTimeout:
        print "request for the start url timed out"
    if list_url:
        return list_url
    else:
        print "failed to get any urls"
        raise ValueError

def get_htmls(self, urls, get_proxie_or_not=False):
    """
    :type urls: list
    :type get_proxie_or_not: bool
    :param urls: the list of urls to fetch
    :param get_proxie_or_not: whether to request through a proxy IP
    :return: the list of parsed html documents
    """
    list_html = []
    for url in urls:
        try:
            if get_proxie_or_not:
                p = Proxies()
                p.get_ip_and_port()
                self.session.proxies = {
                    "http": p.ip_and_port,
                    "https": p.ip_and_port
                }
            response = self.session.get(url, timeout=30)
            if response.status_code == 200:
                html = response.content
            else:
                # fall back to selenium + phantomjs to render the current url
                # set custom request headers for phantomjs
                desired_capabilities = DesiredCapabilities.PHANTOMJS.copy()
                headers = self.headers
                for key, value in headers.iteritems():
                    desired_capabilities['phantomjs.page.customHeaders.{}'.format(key)] = value
                driver = webdriver.PhantomJS(
                    desired_capabilities=desired_capabilities
                )
                driver.get(url)
                html = driver.page_source
                driver.quit()
            assert html is not None
            list_html.append(BeautifulSoup(html, 'lxml'))
        except requests.ConnectTimeout:
            print "request timed out for url"
    if list_html:
        return list_html
    else:
        print "failed to get any html documents"
        raise ValueError

def checkin(cred, code):
    '''
    -> 0: Successful check-in
    -> 1: No internet connection
    -> 2: Invalid credentials
    -> 3: Not connected to SunwayEdu Wi-Fi
    -> 4: Invalid code
    -> 5: Wrong class
    -> 6: Already checked-in
    '''
    # Start a session
    session = requests.Session()

    # Login to iZone
    payload = {
        'form_action': 'submitted',
        'student_uid': cred[0],
        'password': cred[1],
    }
    try:
        r = session.post(constants.LOGIN, data=payload)
    except requests.ConnectionError:
        return 1
    if not r.history:
        return 2

    # Check for SunwayEdu Wi-Fi
    try:
        r = requests.get(constants.WIFI, timeout=2)
    except requests.ConnectTimeout:
        return 3
    except requests.ConnectionError:
        return 1

    # Check-in with code
    try:
        r = session.post(constants.CHECKIN, data={'checkin_code': code}, timeout=2)
    except (requests.ReadTimeout, requests.ConnectionError):
        return 1
    if 'Checkin code not valid.' in r.text or \
            'The specified URL cannot be found.' in r.text:
        return 4
    if 'You cannot check in to a class you are not a part of.' in r.text:
        return 5
    if 'You have already checked in' in r.text:
        return 6
    return 0