We extracted the following 49 code examples from open-source Python projects to illustrate how to use urllib.error.URLError().
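Before the project examples, here is a minimal, self-contained sketch of the pattern most of them share: catch urllib.error.HTTPError before urllib.error.URLError (HTTPError is a subclass of URLError) and inspect e.code / e.reason. The fetch() helper name and the placeholder URL below are illustrative, not taken from any of the projects.

from urllib import request, error

def fetch(url, timeout=5):
    """Minimal sketch: return the decoded response body, or None on failure."""
    try:
        req = request.Request(url, headers={'User-Agent': 'Mozilla/5.0'})
        with request.urlopen(req, timeout=timeout) as response:
            return response.read().decode('utf-8')
    except error.HTTPError as e:   # subclass of URLError; catch it first
        print('HTTP error:', e.code, e.reason)
    except error.URLError as e:    # DNS failure, refused connection, timeout, ...
        print('URL error:', e.reason)
    return None

# Usage (placeholder URL):
# body = fetch('http://example.com/')
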
def download(self, url, retry_count=3, headers=None, proxy=None, data=None):
    if url is None:
        return None
    try:
        req = request.Request(url, headers=headers, data=data)
        cookie = cookiejar.CookieJar()
        cookie_process = request.HTTPCookieProcessor(cookie)
        opener = request.build_opener()
        if proxy:
            proxies = {urlparse(url).scheme: proxy}
            opener.add_handler(request.ProxyHandler(proxies))
        content = opener.open(req).read()
    except error.URLError as e:
        print('HtmlDownLoader download error:', e.reason)
        content = None
        if retry_count > 0:
            if hasattr(e, 'code') and 500 <= e.code < 600:
                # retry only for HTTPError with a 5xx status code (server-side error)
                return self.download(url, retry_count - 1, headers, proxy, data)
    return content

def requestData(url, user_agent):
    try:
        req = request.Request(url)
        req.add_header('User-Agent', user_agent)
        response = request.urlopen(req, timeout=8)
        # decode the raw bytes into a str
        content = response.read().decode('gbk')
    except error.HTTPError as e:  # HTTPError is a subclass of URLError, so catch it first
        if hasattr(e, 'code'):
            print(e.code)
        if hasattr(e, 'reason'):
            print(e.reason)
        print('HTTPError!!!')
    except error.URLError as e:
        if hasattr(e, 'code'):
            print(e.code)
        if hasattr(e, 'reason'):
            print(e.reason)
    return content

def getAbstractInfo(self):
    try:
        content = requestData(self.url, self.user_agent)
        self.getDetailList(content)
    except error.HTTPError as e:  # HTTPError is a subclass of URLError, so catch it first
        if hasattr(e, 'code'):
            print(e.code)
        if hasattr(e, 'reason'):
            print(e.reason)
        print('HTTPError!!!')
    except error.URLError as e:
        if hasattr(e, 'code'):
            print(e.code)
        if hasattr(e, 'reason'):
            print(e.reason)

def requestData(self, url, user_agent):
    try:
        req = request.Request(url)
        req.add_header('User-Agent', user_agent)
        response = request.urlopen(req, timeout=8)
        # decode the raw bytes into a str
        content = response.read().decode('utf-8')
        return content
    except error.HTTPError as e:  # HTTPError is a subclass of URLError, so catch it first
        if hasattr(e, 'code'):
            print(e.code)
        if hasattr(e, 'reason'):
            print(e.reason)
        print('HTTPError!!!')
    except error.URLError as e:
        if hasattr(e, 'code'):
            print(e.code)
        if hasattr(e, 'reason'):
            print(e.reason)

def requestData(self, url, user_agent):
    try:
        req = request.Request(url)
        req.add_header('User-Agent', user_agent)
        response = request.urlopen(req, timeout=3)
        # decode the raw bytes into a str
        content = response.read().decode('gbk')
        return content
    except error.HTTPError as e:  # HTTPError is a subclass of URLError, so catch it first
        if hasattr(e, 'code'):
            print(e.code)
        if hasattr(e, 'reason'):
            print(e.reason)
        print('HTTPError!!!')
    except error.URLError as e:
        if hasattr(e, 'code'):
            print(e.code)
        if hasattr(e, 'reason'):
            print(e.reason)

def download(url, filename):
    """Download .su3 file, return True on success"""
    USER_AGENT = "Wget/1.11.4"
    url = "{}i2pseeds.su3".format(url)
    req = urllib.request.Request(url, headers={"User-Agent": USER_AGENT})
    try:
        with urllib.request.urlopen(req) as resp:
            with open(filename, 'wb') as f:
                f.write(resp.read())
        if os.stat(filename).st_size > 0:
            return True
        else:
            return False
    except URLError as e:
        return False

def compute_dependencies(self, filename=REQUIRES):
    text = Utils.readf(filename)
    data = safe_urlencode([('text', text)])
    if '--offline' in sys.argv:
        self.constraints = self.local_resolve(text)
    else:
        req = Request(get_resolve_url(), data)
        try:
            response = urlopen(req, timeout=TIMEOUT)
        except URLError as e:
            Logs.warn('The package server is down! %r' % e)
            self.constraints = self.local_resolve(text)
        else:
            ret = response.read()
            try:
                ret = ret.decode('utf-8')
            except Exception:
                pass
            self.trace(ret)
            self.constraints = parse_constraints(ret)
    self.check_errors()

def _woxikon_de_url_handler(target):
    ''' Query woxikon for synonyms '''
    time_out_choice = float(get_variable(
        'tq_online_backends_timeout', _timeout_period_default))
    try:
        response = urlopen(fixurl(u'http://synonyms.woxikon.com/de/{0}'.format(target)).decode('ASCII'),
                           timeout=time_out_choice)
        web_content = StringIO(unescape(decode_utf_8(response.read())))
        response.close()
    except HTTPError:
        return 1
    except URLError as err:
        if isinstance(err.reason, socket.timeout):  # timeout error?
            return 1
        return -1  # other error
    except socket.timeout:  # timeout error failed to be captured by URLError
        return 1
    return web_content

def _jeck_ru_url_handler(target):
    ''' Query jiport for synonyms '''
    time_out_choice = float(get_variable(
        'tq_online_backends_timeout', _timeout_period_default))
    try:
        response = urlopen(fixurl(u'http://jeck.ru/tools/SynonymsDictionary/{0}'.format(target)).decode('ASCII'),
                           timeout=time_out_choice)
        web_content = StringIO(decode_utf_8(response.read()))
        response.close()
    except HTTPError:
        return 1
    except URLError as err:
        if isinstance(err.reason, socket.timeout):  # timeout error?
            return 1
        return -1  # any other error
    except socket.timeout:  # if timeout error not captured by URLError
        return 1
    return web_content

def sendResponse(reportID: str, actionName: str, respURL: str) -> None:
    if actionName == 'body':
        text = getBody(reportID)
    elif actionName == 'metadata':
        text = getMetadata(reportID)
    else:
        raise ValueError("Button %s not defined!" % actionName)
    ephemeralJson = {'response_type': 'ephemeral', 'replace_original': False, 'text': text}
    # Even if an attacker gets the verification token, we will still refuse to post the data to non-slack.com URLs
    if urlparse(respURL).hostname.endswith('slack.com'):
        requests.post(respURL, json.dumps(ephemeralJson).encode('utf-8'))
    else:
        ephemeralJson['text'] = (('Failed URL check, respURL=%s which is not on the slack.com domain name! This check '
                                  'is theoretically not required (since we verify the verification token), but done as '
                                  'an extra defensive step. To disable this, edit slackServer.py in the project root.')
                                 % respURL)
        requests.post(respURL, json.dumps(ephemeralJson).encode('utf-8'))
        raise URLError("respURL=%s not on slack.com domain!" % respURL)

def testGETOpenRedirect(url: str, cookies: Mapping[str, str]) -> Optional[str]:
    """
    If the given URL redirects when accessed with the given cookies via GET, return the new URL,
    otherwise return None
    """
    driver = SeleniumDrivers.getFirefoxDriver()
    driver.setCookies(url, cookies)
    try:
        driver.get(url)
        time.sleep(config.timeout)
        if driver.current_url == url:
            driver.reset()
            return None
        else:
            url = driver.current_url
            driver.reset()
            return url
    except (TimeoutException, URLError):
        driver.reset()
        return None

def testPOSTOpenRedirect(url: str, cookies: Mapping[str, str], data: Mapping[str, str]) -> Optional[str]:
    """
    If the given URL redirects when accessed with the given cookies via POST, return the new URL,
    otherwise return None
    """
    driver = SeleniumDrivers.getFirefoxDriver()
    driver.setCookies(url, cookies)
    try:
        driver.post(url, data)
        time.sleep(config.timeout)
        if driver.current_url == url:
            driver.reset()
            return None
        else:
            url = driver.current_url
            driver.reset()
            return url
    except (TimeoutException, URLError):
        driver.reset()
        return None

def testPOSTXSSDriver(url: str, cookies: Mapping[str, str], data: Mapping[str, str], driver: webdriver) -> \
        Optional[str]:
    """
    If the given URL pops an alert box when accessed with the given cookies, return the contents of the alert box,
    otherwise return None
    """
    driver.setCookies(url, cookies)
    try:
        driver.post(url, data)
        WebDriverWait(driver, config.timeout).until(expected_conditions.alert_is_present())
        # Note that despite the name switch_to_alert also handles prompt:
        # - http://selenium-python.readthedocs.io/navigating.html#popup-dialogs
        alert = driver.switch_to_alert()
        text = alert.text
        driver.reset()
        return text
    except (TimeoutException, URLError):
        driver.reset()
        return None

def query(location, cty_codes, query_method, fuzzy):
    results = []
    try:
        base_url = get_geonames_base_url()
        username = get_geonames_user_name()
        query_string = base_url + 'username={user}&{query_method}={name}&' \
                                  'style=FULL&orderby={order}&startRow=0&maxRows=5&fuzzy={fuzzy}' \
            .format(user=username, query_method=query_method, name=quote(location), order='relevance', fuzzy=fuzzy)
        if cty_codes and len(cty_codes) > 0:
            query_string = query_string + '&' + '&'.join([('country={}'.format(c)) for c in cty_codes])
        json_decode = json.JSONDecoder()  # used to parse json response
        response = urlopen(query_string)
        response_string = response.read().decode('utf-8')
        parsed_response = json_decode.decode(response_string)
        if parsed_response.get('geonames') and len(parsed_response.get('geonames')) > 0:
            for item in parsed_response['geonames']:
                results.append(parse(item))
    except URLError as e:
        logger.info("Oops! something didn't go well")
        logger.info(e)
    return results

def __init__(self, url=None):
    self.url = url
    self.html = None
    self.links = []
    self.soup = None
    self.text = None
    self.title = None
    req = Request(self.url, headers={'User-Agent': "Magic Browser"})
    try:
        self.html = urlopen(req)
    except URLError as e:
        if hasattr(e, 'reason'):
            print('We failed to reach a server.')
            print('Reason: ', e.reason)
        elif hasattr(e, 'code'):
            print('The server couldn\'t fulfill the request.')
            print('Error code: ', e.code)

def load_url(url, timeout):
    # Build URL query to email signup page
    urlquery = "http://" + url + "/m-users-a-email_list-job-add-email-" + targetEmail + "-source-2.htm"
    print_out(Style.BRIGHT + Fore.WHITE + "Sending request to: " + url)
    # Build the request
    req = urllib.request.Request(
        urlquery,
        data=None,
        headers={
            'User-Agent': random.choice(useragents),
            'Host': url
        }
    )
    # Send
    try:
        f = urllib.request.urlopen(req)
        print_out(Style.BRIGHT + Fore.GREEN + "Successfully sent!")
        f.close()
    except urllib.error.URLError as e:
        print_out(Style.BRIGHT + Fore.RED + e.reason)

def run(self):
    if len(self.__update_rates) == 0:
        return
    # wait up to 120 seconds, to get some distortion
    self.__stop_event.wait(randint(0, 120))
    while not self.__stop_event.is_set():
        start = time.time()
        for update in self.__update_rates:
            rate = update[0]
            now = time.time()
            time_to_wait = round(start - now + rate / 1000, 0)
            interrupt = self.__stop_event.wait(time_to_wait)
            if interrupt:
                return
            try:
                self.start_calculation(update[1])
            except URLError as e:
                logging.getLogger(__name__).error("Could not connect to InfluxDB: " + str(e))
            except:
                logging.getLogger(__name__).error("Job execution failed", exc_info=True)

def __fetch_json(self, path, query_params):
    # add API KEY to params
    query_params["uid"] = Nomisweb.KEY
    query_string = Nomisweb.URL + path + str(urlencode(query_params))
    #print(query_string)
    reply = {}
    try:
        response = request.urlopen(query_string, timeout=Nomisweb.Timeout)
    except (HTTPError, URLError) as error:
        print('ERROR: ', error, '\n', query_string)
    except timeout:
        print('ERROR: request timed out\n', query_string)
    else:
        reply = json.loads(response.read().decode("utf-8"))
    return reply

# save metadata as JSON for future reference

def getLinks(articleUrl):
    try:
        html = urlopen("http://en.wikipedia.org" + articleUrl)
    except HTTPError:
        ServerLog.writeLog("HTTPError")
        return None
    except URLError:
        ServerLog.writeLog("URLError")
        print("Sleeping!")
        time.sleep(URLERROR_SLEEP_TIME)
        html = urlopen("http://en.wikipedia.org" + articleUrl)
    bsObj = BeautifulSoup(html, "lxml")
    return bsObj.find("div", {"id": "bodyContent"}).findAll("a", href=re.compile("^(/wiki/)((?!:).)*$"))

def getHistoryIPs(pageUrl):
    pageUrl = pageUrl.replace("/wiki/", "")
    historyUrl = "http://en.wikipedia.org/w/index.php?title=" + pageUrl + "&action=history"
    print("history url:", historyUrl)
    time.sleep(SLEEP_TIME)
    try:
        html = urlopen(historyUrl)
    except HTTPError:
        return None
    except URLError:
        print("Sleeping!")
        time.sleep(URLERROR_SLEEP_TIME)
        html = urlopen(historyUrl)
    bsObj = BeautifulSoup(html, "lxml")
    ipAddresses = bsObj.findAll("a", {"class": "mw-anonuserlink"})
    addressList = set()
    for ipAddress in ipAddresses:
        print(pageUrl + ": " + ipAddress.get_text())
        addressList.add(ipAddress.get_text())
    return addressList

def getCountry(ipAddress):
    '''
    Look up the country code for a given IP address
    '''
    try:
        response = urlopen("http://freegeoip.net/json/" + ipAddress).read().decode('utf-8')
    except URLError:
        print("Sleeping!")
        time.sleep(URLERROR_SLEEP_TIME)
        response = urlopen("http://freegeoip.net/json/" + ipAddress).read().decode('utf-8')
    except:
        return 'Unknown'
    responseJson = json.loads(response)
    return responseJson.get("country_code")

def getLinks(articleUrl):
    '''
    Collect the internal article links from the given page
    '''
    try:
        html = urlopen("http://en.wikipedia.org" + articleUrl)
    except HTTPError:
        return None
    except URLError:
        print("Sleeping!")
        time.sleep(URLERROR_SLEEP_TIME)
        html = urlopen("http://en.wikipedia.org" + articleUrl)
    bsObj = BeautifulSoup(html, "lxml")
    return bsObj.find("div", {"id": "bodyContent"}).findAll("a", href=re.compile("^(/wiki/)((?!:).)*$"))

def refresh_posts(posts):
    if not posts:
        return posts
    t = get_twitter_for_acc(posts[0].author)
    if not t:
        return
    try:
        tweets = t.statuses.lookup(
            _id=",".join((post.twitter_id for post in posts)),
            trim_user=True, tweet_mode='extended')
    except (URLError, TwitterError) as e:
        handle_error(e)
    refreshed_posts = list()
    for post in posts:
        tweet = next(
            (tweet for tweet in tweets if tweet['id_str'] == post.twitter_id),
            None)
        if not tweet:
            db.session.delete(post)
        else:
            post = db.session.merge(post_from_api_tweet_object(tweet))
            refreshed_posts.append(post)
    return refreshed_posts

def give_a_summary(self):
    self.assistant.say("What would you like to know about?")
    text = self.assistant.listen().decipher()
    text = text.strip().replace(" ", "%20")
    request = Request(
        'https://en.wikipedia.org/w/api.php?'
        'format=json&action=query&prop=extracts&exintro=&explaintext=&titles=' + text
    )
    try:
        response = urlopen(request)
        data = json.loads(
            response.read().decode(
                response.info().get_param('charset') or 'utf-8'
            )
        )
        output = data["query"]["pages"]
        final = output[list(output.keys())[0]]["extract"]
        return final
    except URLError:
        return "Unable to search your given query."

def download_page(url, referer, maxretries, timeout, pause):
    tries = 0
    htmlpage = None
    while tries < maxretries and htmlpage is None:
        try:
            code = 404
            req = request.Request(url)
            req.add_header('Referer', referer)
            req.add_header('User-agent', 'Mozilla/5.0 (X11; Linux i686) AppleWebKit/534.30 (KHTML, like Gecko) Ubuntu/11.04 Chromium/12.0.742.91 Chrome/12.0.742.91 Safari/534.30')
            with closing(request.urlopen(req, timeout=timeout)) as f:
                code = f.getcode()
                htmlpage = f.read()
                sleep(pause)
        except (urlerror.URLError, socket.timeout, socket.error):
            tries += 1
    if htmlpage:
        return htmlpage.decode('utf-8'), code
    else:
        return None, code

def _get(url):
    LOG.debug('GET {u!r}'.format(u=url))
    # TODO proper error handling - or none
    try:
        response = urlopen(url)
    except HTTPError:
        raise
    except ContentTooShortError:
        raise
    except URLError:
        raise
    except Exception:
        raise
    LOG.debug('{} {}'.format(response.status, response.reason))
    if response.status not in (200,):
        raise ValueError('{} {}'.format(response.status, response.reason))
    return response

def geocode(address, timeout=5.0):
    """
    Return a (longitude, latitude) tuple for the given address string
    """
    params = parse.urlencode({'sensor': False, 'address': address})
    try:
        response = request.urlopen(conf.GEOCODE_URL + params, timeout=timeout)
    except error.URLError:
        return None
    try:
        dom = xml.dom.minidom.parseString(response.read())
        location_elem = dom.getElementsByTagName('location')[0]
        lng = location_elem.getElementsByTagName('lng')[0]
        lat = location_elem.getElementsByTagName('lat')[0]
    except IndexError:
        return None
    return lng.firstChild.data, lat.firstChild.data

def get_info(cls, video_key):
    req = request.Request('http://rutube.ru/api/video/%s/?format=xml' % video_key, method='GET')
    try:
        logger.debug('{0.method} {0.full_url}'.format(req))
        response = request.urlopen(req, timeout=3)
    except error.URLError:
        return None
    if response.status != 200:
        return None
    dom = minidom.parseString(response.read())
    title = dom.getElementsByTagName('title').item(0)
    description = dom.getElementsByTagName('description').item(0)
    thumbnail = dom.getElementsByTagName('thumbnail_url').item(0)
    embed = dom.getElementsByTagName('html').item(0)
    return {
        'title': title.firstChild.data,
        'description': description.firstChild.data,
        'preview_url': thumbnail.firstChild.data,
        'embed': embed.firstChild.data
    }

def testPasswordProtectedSite(self):
    support.requires('network')
    with support.transient_internet('mueblesmoraleda.com'):
        url = 'http://mueblesmoraleda.com'
        robots_url = url + "/robots.txt"
        # First check the URL is usable for our purposes, since the
        # test site is a bit flaky.
        try:
            urlopen(robots_url)
        except HTTPError as e:
            if e.code not in {401, 403}:
                self.skipTest(
                    "%r should return a 401 or 403 HTTP error, not %r"
                    % (robots_url, e.code))
        else:
            self.skipTest(
                "%r should return a 401 or 403 HTTP error, not succeed"
                % (robots_url))
        parser = urllib.robotparser.RobotFileParser()
        parser.set_url(url)
        try:
            parser.read()
        except URLError:
            self.skipTest('%s is unavailable' % url)
        self.assertEqual(parser.can_fetch("*", robots_url), False)

def handle(self, fn_name, action, *args, **kwds):
    self.parent.calls.append((self, fn_name, args, kwds))
    if action is None:
        return None
    elif action == "return self":
        return self
    elif action == "return response":
        res = MockResponse(200, "OK", {}, "")
        return res
    elif action == "return request":
        return Request("http://blah/")
    elif action.startswith("error"):
        code = action[action.rfind(" ") + 1:]
        try:
            code = int(code)
        except ValueError:
            pass
        res = MockResponse(200, "OK", {}, "")
        return self.parent.error("http", args[0], res, code, "", {})
    elif action == "raise":
        raise urllib.error.URLError("blah")
    assert False

def test_badly_named_methods(self):
    # test work-around for three methods that accidentally follow the
    # naming conventions for handler methods
    # (*_open() / *_request() / *_response())

    # These used to call the accidentally-named methods, causing a
    # TypeError in real code; here, returning self from these mock
    # methods would either cause no exception, or AttributeError.

    from urllib.error import URLError

    o = OpenerDirector()
    meth_spec = [
        [("do_open", "return self"), ("proxy_open", "return self")],
        [("redirect_request", "return self")],
    ]
    handlers = add_ordered_mock_handlers(o, meth_spec)
    o.add_handler(urllib.request.UnknownHandler())
    for scheme in "do", "proxy", "redirect":
        self.assertRaises(URLError, o.open, scheme + "://example.com/")

def test_raise(self):
    # raising URLError stops processing of request
    o = OpenerDirector()
    meth_spec = [
        [("http_open", "raise")],
        [("http_open", "return self")],
    ]
    handlers = add_ordered_mock_handlers(o, meth_spec)

    req = Request("http://example.com/")
    self.assertRaises(urllib.error.URLError, o.open, req)
    self.assertEqual(o.calls, [(handlers[0], "http_open", (req,), {})])

##     def test_error(self):
##         # XXX this doesn't actually seem to be used in standard library,
##         # but should really be tested anyway...

def download_file(url, path, binary=True):
    if sys.version_info < (3,):
        import urllib2
        request = urllib2
        error = urllib2
    else:
        import urllib.request
        import urllib.error
        request = urllib.request
        error = urllib.error

    if os.path.exists(path):
        return True

    try:
        data = request.urlopen(url, timeout=15).read()
        with open(path, 'wb' if binary else 'w') as f:
            f.write(data)
        return True
    except error.URLError as e:
        return False

def is_server_live(server_path: str) -> bool:
    # Ignore the server check if we're in production
    if settings.DEBUG:
        try:
            resp = request.urlopen(server_path)
            if resp.status == 200:
                logging.info('CRA liveserver is running')
                return True
            else:
                logging.warning('CRA liveserver is up but not serving bundle.js')
                return False
        except url_error.URLError as err:
            logging.warning('CRA liveserver is not running')
            return False
    else:
        return False

def pcl_put(options, source, target):
    """
    Upload a file to the storage, retrying on recoverable errors (wraps pcl_put_retry)
    """
    pcl_verbose("Transfer: {0} ({1}) -> {2}".format(source, pcl_human(os.path.getsize(source)), target), options.verbose)

    retry = 0
    while True:
        try:
            pcl_put_retry(options, source, target)
            break
        except (pclURLError, pclBadStatusLine, pclCannotSendRequest, ssl.SSLError, socket.error, pclError) as e:
            pcl_can_query_retry(e)
            retry += 1
            pcl_debug("Retry {0}/{1}: {2}".format(retry, options.retries, e), options.debug)
            if retry >= options.retries:
                raise pclError(1, e)
            time.sleep(options.delay)

def pcl_get(options, source, target):
    """
    Download a file from the storage, retrying on recoverable errors (wraps pcl_get_retry)
    """
    pcl_verbose("Transfer: {0} -> {1}".format(source, target), options.verbose)

    retry = 0
    while True:
        try:
            pcl_get_retry(options, source, target)
            break
        except (pclURLError, pclBadStatusLine, pclCannotSendRequest, ssl.SSLError, socket.error, pclError) as e:
            pcl_can_query_retry(e)
            retry += 1
            pcl_debug("Retry {0}/{1}: {2}".format(retry, options.retries, e), options.debug)
            if retry >= options.retries:
                raise pclError(1, e)
            time.sleep(options.delay)

def download(self, source, dest):
    """
    Download an archive file.

    :param str source: URL pointing to an archive file.
    :param str dest: Local path location to download archive file to.
    """
    # propagate all exceptions
    # URLError, OSError, etc
    proto, netloc, path, params, query, fragment = urlparse(source)
    if proto in ('http', 'https'):
        auth, barehost = splituser(netloc)
        if auth is not None:
            source = urlunparse((proto, barehost, path, params, query, fragment))
            username, password = splitpasswd(auth)
            passman = HTTPPasswordMgrWithDefaultRealm()
            # Realm is set to None in add_password to force the username and password
            # to be used whatever the realm
            passman.add_password(None, source, username, password)
            authhandler = HTTPBasicAuthHandler(passman)
            opener = build_opener(authhandler)
            install_opener(opener)
    response = urlopen(source)
    try:
        with open(dest, 'wb') as dest_file:
            dest_file.write(response.read())
    except Exception as e:
        if os.path.isfile(dest):
            os.unlink(dest)
        raise e

# Mandatory file validation via Sha1 or MD5 hashing.

def _download(self):
    try:
        try:
            import urllib.request
            from urllib.error import URLError, HTTPError
            with urllib.request.urlopen(self.url) as response, \
                    open(self.outputfile_origin, 'wb') as outfile:
                shutil.copyfileobj(response, outfile)
        except (AttributeError, ImportError):
            import urllib
            urllib.urlretrieve(self.url, self.outputfile_origin)
    except (URLError, HTTPError, IOError, Exception) as e:
        logger.debug("Unable to retrieve %s for %s", self.url, e)

def test_download_failed_URLError(self, mock_urlopen):
    mock_urlopen.side_effect = URLError(None)
    fake_request = urllib2.Request('http://fakeurl.com')

    self.assertRaises(
        self.glance.RetryableError,
        self.glance._download_tarball_and_verify,
        fake_request, 'fake_staging_path')