The following 50 code examples, extracted from open-source Python projects, illustrate how to use urllib.error.HTTPError().
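Before the project examples, here is a minimal sketch of the usual pattern (the URL is a placeholder): catch HTTPError before URLError, since HTTPError is a subclass of URLError, and read the status code and reason from the exception object.

from urllib import request, error

def fetch(url, timeout=8):
    """Return the decoded response body, or None if the request fails."""
    try:
        with request.urlopen(url, timeout=timeout) as resp:
            return resp.read().decode('utf-8')
    except error.HTTPError as e:
        # HTTPError carries the HTTP status code and can be read like a response
        print('HTTP error:', e.code, e.reason)
    except error.URLError as e:
        # connection-level failure (DNS, refused connection, timeout, ...)
        print('URL error:', e.reason)
    return None

# usage (placeholder URL):
# body = fetch('http://example.com/')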
def requestData(url, user_agent):
    content = None  # so the function returns None if the request fails
    try:
        req = request.Request(url)
        req.add_header('User-Agent', user_agent)
        response = request.urlopen(req, timeout=8)
        content = response.read().decode('gbk')  # decode the bytes response
    except error.HTTPError as e:
        # HTTPError is a subclass of URLError, so it must be caught first
        if hasattr(e, 'code'):
            print(e.code)
        if hasattr(e, 'reason'):
            print(e.reason)
        print('HTTPError!!!')
    except error.URLError as e:
        if hasattr(e, 'code'):
            print(e.code)
        if hasattr(e, 'reason'):
            print(e.reason)
    return content
def getAbstractInfo(self):
    try:
        content = requestData(self.url, self.user_agent)
        self.getDetailList(content)
    except error.HTTPError as e:
        # HTTPError is a subclass of URLError, so it must be caught first
        if hasattr(e, 'code'):
            print(e.code)
        if hasattr(e, 'reason'):
            print(e.reason)
        print('HTTPError!!!')
    except error.URLError as e:
        if hasattr(e, 'code'):
            print(e.code)
        if hasattr(e, 'reason'):
            print(e.reason)
def requestData(self, url, user_agent):
    try:
        req = request.Request(url)
        req.add_header('User-Agent', user_agent)
        response = request.urlopen(req, timeout=8)
        content = response.read().decode('utf-8')  # decode the bytes response
        return content
    except error.HTTPError as e:
        # HTTPError is a subclass of URLError, so it must be caught first
        if hasattr(e, 'code'):
            print(e.code)
        if hasattr(e, 'reason'):
            print(e.reason)
        print('HTTPError!!!')
    except error.URLError as e:
        if hasattr(e, 'code'):
            print(e.code)
        if hasattr(e, 'reason'):
            print(e.reason)
def requestData(self, url, user_agent):
    try:
        req = request.Request(url)
        req.add_header('User-Agent', user_agent)
        response = request.urlopen(req, timeout=3)
        content = response.read().decode('gbk')  # decode the bytes response
        return content
    except error.HTTPError as e:
        # HTTPError is a subclass of URLError, so it must be caught first
        if hasattr(e, 'code'):
            print(e.code)
        if hasattr(e, 'reason'):
            print(e.reason)
        print('HTTPError!!!')
    except error.URLError as e:
        if hasattr(e, 'code'):
            print(e.code)
        if hasattr(e, 'reason'):
            print(e.reason)
def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('-q', '--term', dest='term', default=DEFAULT_TERM,
                        type=str, help='Search term (default: %(default)s)')
    parser.add_argument('-l', '--location', dest='location', default=DEFAULT_LOCATION,
                        type=str, help='Search location (default: %(default)s)')
    input_values = parser.parse_args()
    try:
        query_api(input_values.term, input_values.location)
    except HTTPError as error:
        sys.exit(
            'Encountered HTTP error {0} on {1}:\n {2}\nAbort program.'.format(
                error.code,
                error.url,
                error.read(),
            )
        )
def fetch_amazon_item(isbn):
    req_count = 0
    # retry the Amazon API up to 5 times
    while req_count < 5:
        try:
            items = item_search(isbn)
            if items:
                return items
            else:
                return
        except HTTPError as e:
            # the request failed (e.g. throttled), so back off and retry
            print(e)
            req_count += 1
            if req_count < 5:
                # exponential back-off before the next attempt
                sleep_time = 2 ** req_count
                print('retry after {} second'.format(sleep_time))
                sleep(sleep_time)
            else:
                # give up after 5 attempts and re-raise
                raise
    return
def put_status(self):
    host = os.getenv('HOST')
    url = "https://{host}/api/task/{id}".format(host=host, id=self.request.id)
    payload = {
        'status': self.status,
        'steps': self.steps,
        'file_name': self.zim_file_name,
        'time_stamp': {
            'started': self.start_time,
            'ended': self.ended_time
        }
    }
    headers = {
        'Content-Type': 'application/json; charset=utf-8',
        'token': self.token
    }
    request = urllib.request.Request(url, json.dumps(payload, cls=JSONEncoder).encode('utf-8'),
                                     headers, method='PUT')
    try:
        with urllib.request.urlopen(request) as response:
            code = response.code
    except HTTPError as error:
        code = error.code
def _woxikon_de_url_handler(target):
    ''' Query woxikon for synonyms '''
    time_out_choice = float(get_variable(
        'tq_online_backends_timeout', _timeout_period_default))
    try:
        response = urlopen(fixurl(u'http://synonyms.woxikon.com/de/{0}'.format(target)).decode('ASCII'),
                           timeout=time_out_choice)
        web_content = StringIO(unescape(decode_utf_8(response.read())))
        response.close()
    except HTTPError:
        return 1
    except URLError as err:
        if isinstance(err.reason, socket.timeout):  # timeout error?
            return 1
        return -1  # other error
    except socket.timeout:  # timeout error failed to be captured by URLError
        return 1
    return web_content
def _jeck_ru_url_handler(target):
    ''' Query jeck.ru for synonyms '''
    time_out_choice = float(get_variable(
        'tq_online_backends_timeout', _timeout_period_default))
    try:
        response = urlopen(fixurl(u'http://jeck.ru/tools/SynonymsDictionary/{0}'.format(target)).decode('ASCII'),
                           timeout=time_out_choice)
        web_content = StringIO(decode_utf_8(response.read()))
        response.close()
    except HTTPError:
        return 1
    except URLError as err:
        if isinstance(err.reason, socket.timeout):  # timeout error?
            return 1
        return -1  # any other error
    except socket.timeout:  # if timeout error not captured by URLError
        return 1
    return web_content
def get_sillok_element(url):
    try:
        html = urlopen(url)
    except HTTPError as e:
        pass
    try:
        bs_obj = BeautifulSoup(html.read(), "html.parser")
        sillok_id = url.split("/")[-1].split("_")[0]
        era_info = bs_obj.find("span", {"class": "tit_loc"})
        title = bs_obj.find("h3", {"class": "search_tit ins_view_tit"})
        source_info = bs_obj.find("ul", {"class": "ins_source"})
        footnote_info = bs_obj.find("ul", {"class": "ins_footnote"})
        kor_text = bs_obj.find("div", {"class": 'ins_view_in ins_left_in'})
        main_text = ' '.join([text.get_text() + '|'
                              for text in kor_text.findAll("p", {"class": "paragraph"})])  # delimiter '|'
    except:  # find crash sillok urls
        print("Error url is : " + url)
        pass
    return (sillok_id, era_info.get_text(), title.get_text(),
            source_info.get_text(), footnote_info.get_text(), main_text)
def request(cls, uri, params={}, client=None, wrapper=FreesoundObject, method='GET', data=False):
    p = params if params else {}
    url = '%s?%s' % (uri, urlencode(p)) if params else uri
    d = urllib.urlencode(data) if data else None
    headers = {'Authorization': client.header}
    req = Request(url, d, headers)
    try:
        f = urlopen(req)
    except HTTPError as e:
        resp = e.read()
        if e.code >= 200 and e.code < 300:
            return resp
        else:
            return FreesoundException(e.code, json.loads(resp.decode("utf-8")))
    resp = f.read()
    f.close()
    result = None
    try:
        result = json.loads(resp.decode("utf-8"))
    except:
        raise FreesoundException(0, "Couldn't parse response")
    if wrapper:
        return wrapper(result, client)
    return result
def kubectl_or_oc(server: str) -> str:
    """
    Return "kubectl" or "oc", the command-line tool we should use.

    :param server: The URL of the cluster API server.
    """
    if which("oc") is None:
        return "kubectl"
    # We've got oc, and possibly kubectl as well. We only want oc for OpenShift
    # servers, so check for an OpenShift API endpoint:
    ctx = ssl.create_default_context()
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE
    try:
        with urlopen(server + "/version/openshift", context=ctx) as u:
            u.read()
    except HTTPError:
        return "kubectl"
    else:
        return "oc"
def __fetch_json(self, path, query_params):
    # add API KEY to params
    query_params["uid"] = Nomisweb.KEY
    query_string = Nomisweb.URL + path + str(urlencode(query_params))
    # print(query_string)
    reply = {}
    try:
        response = request.urlopen(query_string, timeout=Nomisweb.Timeout)
    except (HTTPError, URLError) as error:
        print('ERROR: ', error, '\n', query_string)
    except timeout:
        print('ERROR: request timed out\n', query_string)
    else:
        reply = json.loads(response.read().decode("utf-8"))
    return reply

# save metadata as JSON for future reference
def fetch_decode(url, encoding=None):
    """ Fetch url and decode. """
    try:
        req = g.opener.open(url)
    except HTTPError as e:
        if e.getcode() == 503:
            time.sleep(.5)
            return fetch_decode(url, encoding)
        else:
            raise
    ct = req.headers['content-type']
    if encoding:
        return req.read().decode(encoding)
    elif "charset=" in ct:
        dbg("charset: %s", ct)
        encoding = re.search(r"charset=([\w-]+)\s*(:?;|$)", ct).group(1)
        return req.read().decode(encoding)
    else:
        dbg("encoding unknown")
        return req.read()
def call_gdata(api, qs):
    """Make a request to the youtube gdata api."""
    qs = dict(qs)
    qs['key'] = g.api_key
    url = g.urls['gdata'] + api + '?' + urlencode(qs)
    try:
        data = g.opener.open(url).read().decode('utf-8')
    except HTTPError as e:
        try:
            errdata = e.file.read().decode()
            error = json.loads(errdata)['error']['message']
            errmsg = 'Youtube Error %d: %s' % (e.getcode(), error)
        except:
            errmsg = str(e)
        raise GdataError(errmsg)
    return json.loads(data)
def getLinks(articleUrl):
    try:
        html = urlopen("http://en.wikipedia.org" + articleUrl)
    except HTTPError:
        ServerLog.writeLog("HTTPError")
        return None
    except URLError:
        ServerLog.writeLog("URLError")
        print("Sleeping!")
        time.sleep(URLERROR_SLEEP_TIME)
        html = urlopen("http://en.wikipedia.org" + articleUrl)
    bsObj = BeautifulSoup(html, "lxml")
    return bsObj.find("div", {"id": "bodyContent"}).findAll(
        "a", href=re.compile("^(/wiki/)((?!:).)*$"))

# collect editor IP addresses
def getHistoryIPs(pageUrl):
    pageUrl = pageUrl.replace("/wiki/", "")
    historyUrl = "http://en.wikipedia.org/w/index.php?title=" + pageUrl + "&action=history"
    print("history url:", historyUrl)
    time.sleep(SLEEP_TIME)
    try:
        html = urlopen(historyUrl)
    except HTTPError:
        return None
    except URLError:
        print("Sleeping!")
        time.sleep(URLERROR_SLEEP_TIME)
        html = urlopen(historyUrl)
    bsObj = BeautifulSoup(html, "lxml")
    ipAddresses = bsObj.findAll("a", {"class": "mw-anonuserlink"})
    addressList = set()
    for ipAddress in ipAddresses:
        print(pageUrl + ": " + ipAddress.get_text())
        addressList.add(ipAddress.get_text())
    return addressList

# find the editors' IP addresses
# look up the country for an IP address
def getLinks(articleUrl):
    ''' Collect article links from the given Wikipedia article '''
    try:
        html = urlopen("http://en.wikipedia.org" + articleUrl)
    except HTTPError:
        return None
    except URLError:
        print("Sleeping!")
        time.sleep(URLERROR_SLEEP_TIME)
        html = urlopen("http://en.wikipedia.org" + articleUrl)
    bsObj = BeautifulSoup(html, "lxml")
    return bsObj.find("div", {"id": "bodyContent"}).findAll(
        "a", href=re.compile("^(/wiki/)((?!:).)*$"))
def get_validate_spark_version(version, repo):
    if "." in version:
        version = version.replace("v", "")
        if version not in VALID_SPARK_VERSIONS:
            print("Don't know about Spark version: {v}".format(v=version), file=stderr)
            sys.exit(1)
        return version
    else:
        github_commit_url = "{repo}/commit/{commit_hash}".format(repo=repo, commit_hash=version)
        request = Request(github_commit_url)
        request.get_method = lambda: 'HEAD'
        try:
            response = urlopen(request)
        except HTTPError as e:
            print("Couldn't validate Spark commit: {url}".format(url=github_commit_url), file=stderr)
            print("Received HTTP response code of {code}.".format(code=e.code), file=stderr)
            sys.exit(1)
        return version

# Source: http://aws.amazon.com/amazon-linux-ami/instance-type-matrix/
# Last Updated: 2015-06-19
# For easy maintainability, please keep this manually-inputted dictionary sorted by key.
def _get(url):
    LOG.debug('GET {u!r}'.format(u=url))
    # TODO proper error handling - or none
    try:
        response = urlopen(url)
    except HTTPError:
        raise
    except ContentTooShortError:
        raise
    except URLError:
        raise
    except Exception:
        raise
    LOG.debug('{} {}'.format(response.status, response.reason))
    if response.status not in (200,):
        raise ValueError('{} {}'.format(response.status, response.reason))
    return response
def _requests(url, kwargs):
    encoding = kwargs.get('encoding')
    method = kwargs.get('method', 'get').lower()
    meth = getattr(requests, str(method))
    if method == 'get':
        url, data = _query(url, method, kwargs)
    kw = {}
    for k in allowed_args:
        if k in kwargs:
            kw[k] = kwargs[k]
    resp = meth(url=url, **kw)
    if not (200 <= resp.status_code < 300):
        raise HTTPError(resp.url, resp.status_code, resp.reason, resp.headers, None)
    if encoding:
        resp.encoding = encoding
    html = resp.text
    return html
def testPasswordProtectedSite(self):
    support.requires('network')
    with support.transient_internet('mueblesmoraleda.com'):
        url = 'http://mueblesmoraleda.com'
        robots_url = url + "/robots.txt"
        # First check the URL is usable for our purposes, since the
        # test site is a bit flaky.
        try:
            urlopen(robots_url)
        except HTTPError as e:
            if e.code not in {401, 403}:
                self.skipTest(
                    "%r should return a 401 or 403 HTTP error, not %r"
                    % (robots_url, e.code))
        else:
            self.skipTest(
                "%r should return a 401 or 403 HTTP error, not succeed"
                % (robots_url))
        parser = urllib.robotparser.RobotFileParser()
        parser.set_url(url)
        try:
            parser.read()
        except URLError:
            self.skipTest('%s is unavailable' % url)
        self.assertEqual(parser.can_fetch("*", robots_url), False)
def http_error_auth_reqed(self, authreq, host, req, headers):
    # host may be an authority (without userinfo) or a URL with an
    # authority
    # XXX could be multiple headers
    authreq = headers.get(authreq, None)
    if self.retried > 5:
        # retry sending the username:password 5 times before failing.
        raise HTTPError(req.get_full_url(), 401, "basic auth failed",
                        headers, None)
    else:
        self.retried += 1
    if authreq:
        mo = AbstractBasicAuthHandler.rx.search(authreq)
        if mo:
            scheme, quote, realm = mo.groups()
            if scheme.lower() == 'basic':
                response = self.retry_http_basic_auth(host, req, realm)
                if response and response.code != 401:
                    self.retried = 0
                return response
def obtain_geo_codes(self, place='New York'):
    """
    :return: Returns tuple (longitude, latitude) for given place.
             Default value for place is New York
    """
    data = {'address': place, 'language': 'en'}
    url = 'https://maps.googleapis.com/maps/api/geocode/json?'
    try:
        page = urlopen(url + urlencode(data))
    except HTTPError as e:
        print(e.code)
        return None, None
    else:
        json_obj = json.loads(str(page.read(), 'utf-8'))
        # print(json_obj)
        return [(result['geometry']['location']['lng'],
                 result['geometry']['location']['lat'])
                for result in json_obj['results']][0]
def __call__(self, env, start_response):
    try:
        if env["PATH_INFO"] == "/app":
            status, body, headers = self._serve_application()
        elif env["PATH_INFO"] == "/callback":
            status, body, headers = self._read_auth_token(env)
        else:
            status = "301 Moved"
            body = ""
            headers = {"Location": "/app"}
    except HTTPError as http_error:
        print("HTTPError occurred:")
        print(http_error.read())
        raise
    start_response(status, [(header, val) for header, val in list(headers.items())])
    return [body]
def main():
    config = get_config()
    try:
        urlopen(Request(index_url(), method='DELETE'))
    except HTTPError:
        pass
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect((config['address'], config['port']))
    per_second = list(range(0, 210, 10))
    per_second[0] = 1
    success_vals = []
    for step in per_second:
        print('Doing {} loglines per sec'.format(step))
        vals = run_step(sock, step, config['processor_spec']['to_dict'][0])
        if step == 1:
            time.sleep(2)
        flush_url = '{}/_flush'.format(index_url())
        resp = urlopen(Request(flush_url,
                               urlencode({'wait_if_ongoing': 'true'}).encode('utf-8'))).read()
        success_vals.append(check_step(config['indexer_config']['host'],
                                       config['indexer_config']['port'], vals))
    pyplot.plot(per_second, success_vals)
    pyplot.xlabel('Log lines per second')
    pyplot.ylabel('Successful')
    pyplot.savefig('lines_per_sec.png', bbox_inches='tight')
async def url_request(self, url, file_path, proxy_url=None):
    # declared async because the retry paths below use await
    try:
        urllib_request.urlretrieve(url, file_path)
        print(file_path)
    except HTTPError as e:
        print(e.code)
        if proxy_url is not None:
            print("Trying proxy URL")
            url = proxy_url
            await self.url_request(url, file_path)
        else:
            raise e
    except UnicodeEncodeError:
        # Special retry logic for IDN domains
        url = "http://" + url.replace("http://", "").encode("idna").decode("utf-8")
        await self.url_request(url, file_path)
def get(url, params={}):
    """Invoke an HTTP GET request on a url

    Args:
        url (string): URL endpoint to request
        params (dict): Dictionary of url parameters

    Returns:
        dict: JSON response as a dictionary
    """
    request_url = url
    if len(params) > 0:
        request_url = "{}?{}".format(url, urlencode(params))
    try:
        req = Request(request_url, headers={'User-Agent': 'Mozilla/5.0'})
        response = json.loads(urlopen(req).read().decode("utf-8"))
        return response
    except HTTPError as err:
        raise MtgException(err.read())
def fetch_decode(url, encoding=None):
    """ Fetch url and decode. """
    try:
        req = g.opener.open(url)
    except HTTPError as e:
        if e.getcode() == 503:
            time.sleep(.5)
            return fetch_decode(url, encoding)
        else:
            raise e
    ct = req.headers['content-type']
    if encoding:
        return req.read().decode(encoding)
    elif "charset=" in ct:
        dbg("charset: %s", ct)
        encoding = re.search(r"charset=([\w-]+)\s*(:?;|$)", ct).group(1)
        return req.read().decode(encoding)
    else:
        dbg("encoding unknown")
        return req.read()
def _make_request(self, opener, request):
    """Make the API call and return the response. This is separated into
    its own function, so we can mock it easily for testing.

    :param opener:
    :type opener:
    :param request: url payload to request
    :type request: urllib.Request object
    :return: urllib response
    """
    try:
        return opener.open(request)
    except HTTPError as err:
        exc = handle_error(err)
        exc.__cause__ = None
        raise exc
def extract_urls(args):
    page = 1
    is_articles = True
    urls = []
    while is_articles:
        try:
            html = request.urlopen("{}/archive?page={}".format(args.url, page))
        except error.HTTPError as e:
            # the server returned an HTTP error (e.g. 404, 403, 401)
            print(e.reason)
            break
        except error.URLError as e:
            # the URL is invalid or the server could not be reached
            print(e.reason)
            break
        soup = BeautifulSoup(html, "html.parser")
        articles = soup.find_all("a", class_="entry-title-link")
        for article in articles:
            urls.append(article.get("href"))
        if len(articles) == 0:
            # no more articles: stop paging
            is_articles = False
        page += 1
    return urls
def prepare_gpcontrol(self):
    try:
        response_raw = urllib.request.urlopen('http://10.5.5.9/gp/gpControl', timeout=5).read().decode('utf8')
        jsondata = json.loads(response_raw)
        response = jsondata["info"]["firmware_version"]
        if "HD5.03" in response or "HX" in response:  # Only session cameras.
            connectedStatus = False
            while connectedStatus == False:
                req = urllib.request.urlopen("http://10.5.5.9/gp/gpControl/status")
                data = req.read()
                encoding = req.info().get_content_charset('utf-8')
                json_data = json.loads(data.decode(encoding))
                # print(json_data["status"]["31"])
                if json_data["status"]["31"] >= 1:
                    connectedStatus = True
    except (HTTPError, URLError) as error:
        self.prepare_gpcontrol()
    except timeout:
        self.prepare_gpcontrol()
    print("Camera successfully connected!")
def getStatus(self, param, value=""):
    if self.whichCam() == "gpcontrol":
        try:
            req = urllib.request.urlopen("http://10.5.5.9/gp/gpControl/status", timeout=5)
            data = req.read()
            encoding = req.info().get_content_charset('utf-8')
            json_data = json.loads(data.decode(encoding))
            return json_data[param][value]
        except (HTTPError, URLError) as error:
            return ""
            print("Error code:" + str(error.code) + "\nMake sure the connection to the WiFi camera is still active.")
        except timeout:
            return ""
            print("HTTP Timeout\nMake sure the connection to the WiFi camera is still active.")
    else:
        response = urllib.request.urlopen("http://10.5.5.9/camera/sx?t=" + self.getPassword(), timeout=5).read()
        response_hex = str(bytes.decode(base64.b16encode(response), 'utf-8'))
        return str(response_hex[param[0]:param[1]])
def getStatusRaw(self):
    if self.whichCam() == "gpcontrol":
        try:
            return urllib.request.urlopen("http://10.5.5.9/gp/gpControl/status", timeout=5).read().decode('utf-8')
        except (HTTPError, URLError) as error:
            return ""
            print("Error code:" + str(error.code) + "\nMake sure the connection to the WiFi camera is still active.")
        except timeout:
            return ""
            print("HTTP Timeout\nMake sure the connection to the WiFi camera is still active.")
    elif self.whichCam() == "auth":
        try:
            return urllib.request.urlopen("http://10.5.5.9/camera/sx?t=" + self.getPassword(), timeout=5).read()
        except (HTTPError, URLError) as error:
            return ""
            print("Error code:" + str(error.code) + "\nMake sure the connection to the WiFi camera is still active.")
        except timeout:
            return ""
            print("HTTP Timeout\nMake sure the connection to the WiFi camera is still active.")
    else:
        print("Error, camera not defined.")
def getMediaInfo(self, option):
    folder = ""
    file = ""
    size = ""
    try:
        raw_data = urllib.request.urlopen('http://10.5.5.9:8080/gp/gpMediaList').read().decode('utf-8')
        json_parse = json.loads(raw_data)
        for i in json_parse['media']:
            folder = i['d']
        for i in json_parse['media']:
            for i2 in i['fs']:
                file = i2['n']
                size = i2['s']
        if option == "folder":
            return folder
        elif option == "file":
            return file
        elif option == "size":
            return self.parse_value("media_size", int(size))
    except (HTTPError, URLError) as error:
        return ""
        print("Error code:" + str(error.code) + "\nMake sure the connection to the WiFi camera is still active.")
    except timeout:
        return ""
        print("HTTP Timeout\nMake sure the connection to the WiFi camera is still active.")
def send(self, data_to_send):
    """ Override the default resend mechanism in SenderBase. Stop resend when it fails."""
    request_payload = json.dumps([a.write() for a in data_to_send])
    request = HTTPClient.Request(self._service_endpoint_uri,
                                 bytearray(request_payload, 'utf-8'),
                                 {'Accept': 'application/json',
                                  'Content-Type': 'application/json; charset=utf-8'})
    try:
        response = HTTPClient.urlopen(request)
        status_code = response.getcode()
        if 200 <= status_code < 300:
            return
    except HTTPError as e:
        if e.getcode() == 400:
            return
    except Exception:  # pylint: disable=broad-except
        if self.retry < 3:
            self.retry = self.retry + 1
        else:
            return

    # Add our unsent data back on to the queue
    for data in data_to_send:
        self._queue.put(data)
def test_exception_wrapper_wraps_http_error(self):
    fake_request = flexmock(
        prepare=flexmock(),
        path=flexmock(),
        query=flexmock(),
        body=flexmock()
    )
    fake_exception_fp = StringIO('{"error": "Bad Request"}')
    fake_exception_fp.seek(0)
    (flexmock(callfire_base)
        .should_receive('urlopen')
        .and_raise(
            HTTPError('url', 400, 'Bad Request', {}, fake_exception_fp)))
    with self.assertRaises(callfire_base.CallFireError) as cm:
        self.base._open_request(fake_request, 'GET')
    e = cm.exception
    self.assertIsInstance(e, callfire_base.CallFireError)
    self.assertIsInstance(e.wrapped_exc, HTTPError)
    self.assertEqual(
        str(e), 'HTTP Error 400: Bad Request: {"error": "Bad Request"}')
def icourses_download(url, merge=False, output_dir='.', **kwargs):
    icourses_parser = ICousesExactor(url=url)
    real_url = icourses_parser.icourses_cn_url_parser(**kwargs)
    title = icourses_parser.title
    if real_url is not None:
        for tries in range(0, 5):
            try:
                _, type_, size = url_info(real_url, faker=True)
                break
            except error.HTTPError:
                logging.warning('Failed to fetch the video file! Retrying...')
                sleep(random.Random().randint(0, 5))  # Prevent from blockage
                real_url = icourses_parser.icourses_cn_url_parser()
                title = icourses_parser.title
        print_info(site_info, title, type_, size)
        if not kwargs['info_only']:
            download_urls_chunked([real_url], title, 'flv',
                                  total_size=size, output_dir=output_dir,
                                  refer=url, merge=merge, faker=True,
                                  ignore_range=True, chunk_size=15000000,
                                  dyn_callback=icourses_parser.icourses_cn_url_parser)

# Why we are not using VideoExtractor: this site needs a special download method
def http_error_auth_reqed(self, auth_header, host, req, headers):
    authreq = headers.get(auth_header, None)
    if self.retried > 5:
        # Don't fail endlessly - if we failed once, we'll probably
        # fail a second time. Hm. Unless the Password Manager is
        # prompting for the information. Crap. This isn't great
        # but it's better than the current 'repeat until recursion
        # depth exceeded' approach <wink>
        raise HTTPError(req.full_url, 401, "digest auth failed",
                        headers, None)
    else:
        self.retried += 1
    if authreq:
        scheme = authreq.split()[0]
        if scheme.lower() == 'digest':
            return self.retry_http_digest_auth(req, authreq)
        elif scheme.lower() != 'basic':
            raise ValueError("AbstractDigestAuthHandler does not support"
                             " the following scheme: '%s'" % scheme)