我们从Python开源项目中,提取了以下代码示例,用于说明如何使用 urllib.error 模块。
def dorequest(url, data="", method='GET'):
    """Issue an HTTP request against etcd and return the decoded JSON body.

    GET requests are sent as-is; any other verb (PUT/DELETE/POST) sends
    *data* encoded as ASCII bytes.  etcd may answer with an HTTP error
    status while still carrying a JSON payload, so HTTPError is caught
    and its body is used as the response.
    """
    try:
        if method == 'GET':
            raw = urllib.request.urlopen(url, timeout=10).read()
        else:
            req = urllib.request.Request(url, data=data.encode('ascii'), method=method)
            raw = urllib.request.urlopen(req, timeout=10).read()
    except urllib.error.HTTPError as e:
        # e.fp must be consumed inside this handler: once the except block
        # exits, the exception object is deleted and e.fp is closed.
        raw = e.fp.read()
    # The body arrives as bytes; decode UTF-8 before parsing the JSON.
    return json.loads(str(raw, encoding='utf-8'))

# client to use etcd
# not all APIs are implemented below. just implement what we want
def main():
    """Parse command-line options and run the search, aborting on HTTP errors."""
    parser = argparse.ArgumentParser()
    parser.add_argument('-q', '--term', dest='term', default=DEFAULT_TERM,
                        type=str, help='Search term (default: %(default)s)')
    parser.add_argument('-l', '--location', dest='location', default=DEFAULT_LOCATION,
                        type=str, help='Search location (default: %(default)s)')
    args = parser.parse_args()
    try:
        query_api(args.term, args.location)
    except HTTPError as error:
        sys.exit(
            'Encountered HTTP error {0} on {1}:\n {2}\nAbort program.'.format(
                error.code, error.url, error.read(),
            )
        )
def onFinished(self ):
    """Qt slot invoked when the underlying network request completes.

    Emits requestSucceeded on success, otherwise warns the user (when the
    error code is known) and emits requestFailed; always emits finished.
    """
    self.hasFinished = True
    if self.request.error()==self.request.NoError:
        self.requestSucceeded.emit( self )
    else:
        try:
            # Map the Qt error code to a human-readable description.
            errorDescr = NetworkErrorDescrs[ self.request.error() ]
        except: #Unknown error
            errorDescr = None
        if errorDescr:
            QtGui.QMessageBox.warning( None, "Networkrequest Failed", "The request to \"%s\" failed with: %s" % (self.url, errorDescr) )
        self.requestFailed.emit( self, self.request.error() )
    self.finished.emit( self )
    self.logger.debug("Request finished: %s", str(self) )
    # NOTE(review): assumes NetworkService.currentRequests tracks in-flight
    # requests — confirm this slot is connected after that bookkeeping.
    self.logger.debug("Remaining requests: %s", len(NetworkService.currentRequests) )
def get_pypi_src_download(package):
    """Return (md5_digest, url) of the sdist .tar.gz for *package* on PyPI.

    Queries the PyPI JSON API and scans the release files for an sdist
    tarball.  Raises RuntimeError when the metadata cannot be fetched or
    no suitable file is listed.
    """
    url = 'https://pypi.python.org/pypi/%s/json' % (package,)
    # BUG FIX: `urllib.urlopen` does not exist in Python 3 and
    # `except urllib.error:` tried to catch a *module*, which is invalid.
    # Use urllib.request.urlopen and catch urllib.error.URLError instead.
    try:
        fp = urllib.request.urlopen(url)
        try:
            data = fp.read()
        finally:
            fp.close()
    except urllib.error.URLError:
        raise RuntimeError("Cannot determine download link for %s" % (package,))
    pkgdata = json.loads(data.decode('utf-8'))
    if 'urls' not in pkgdata:
        raise RuntimeError("Cannot determine download link for %s" % (package,))
    for info in pkgdata['urls']:
        if info['packagetype'] == 'sdist' and info['url'].endswith('tar.gz'):
            return (info.get('md5_digest'), info['url'])
    # BUG FIX: typo "downlink link" in the original error message.
    raise RuntimeError("Cannot determine download link for %s" % (package,))
def get_img_and_text(self, plugin_config, cli_args):
    """Pick an image URL (for a custom or random keyword) and download it.

    Returns a dict with 'img' (the fetched image content) and an empty
    'text'.  Exits the process with status 1 when no image is found.
    """
    # Idiom fix: `True if x else False` is just bool(x).
    enable_safe_search = bool(cli_args.safe_search)
    self.logger.debug('setting image safe search to {}'.format(enable_safe_search))
    if cli_args.keyword:
        self.logger.info('using custom keyword {}'.format(cli_args.keyword))
        imgs = self._fetch_img_urls(cli_args.keyword, safe_search=enable_safe_search)
    else:
        imgs = self._get_images_for_random_keyword(safe_search=enable_safe_search)
    if not imgs:
        self.logger.error('no images found for given keyword')
        exit(1)
    if cli_args.keyword:
        img = random.choice(imgs)[2]
    else:
        img = imgs[0][2]  # always choose first img because search key is random anyway
    self.logger.info('choosing image {}'.format(img))
    return {
        'img': self._read_from_url(img),
        'text': ''
    }
def check_version():
    """Best-effort check that this PureSec CLI install is up to date.

    Silently returns on any network, decoding, or schema problem; warns
    on stderr when a newer release is available.
    """
    url = "http://cli.puresec.io/verify/version/{}".format(puresec_cli.__version__)
    try:
        raw = request.urlopen(url)
    except urllib.error.URLError:
        return
    try:
        payload = json.loads(raw.read().decode())
    except ValueError:
        return
    if not isinstance(payload, dict):
        return
    try:
        is_uptodate = payload['is_uptodate']
        last_version = payload['last_version']
    except KeyError:
        return
    if not is_uptodate:
        eprint("warn: you are using an outdated version of PureSec CLI (installed={}, latest={})".format(puresec_cli.__version__, last_version))
def TTSBaidu(self, tid, txt, lan, spd):
    '''Fetch Baidu TTS audio for *txt* and store it in self.results[tid].

    lan: language, 'en' for English or 'zh' for Chinese
    txt: the text to synthesize
    spd: the reading speed
    '''
    socket.setdefaulttimeout(34.0)
    try:
        #print('processing... ',tid, txt)
        ttsUrl = genTTSUrl(lan ,txt, spd)
        # NOTE(review): original comment here was mojibake; getpage()
        # presumably downloads the synthesized audio — confirm.
        c = getpage(ttsUrl)
        #print('processing...finished',tid)
        self.results[tid]=c
    except urllib.error.URLError as e:
        # NOTE(review): unbounded recursive retry — if the failure is
        # persistent this recurses until the stack overflows.
        print("error:URLError ",e," we will try again...tid:",tid)
        self.TTSBaidu(tid, txt, lan, spd)
    except socket.timeout:
        # Same unbounded-retry caveat as above.
        print("error: TTSBaidu time out!, we will try again...tid:",tid )
        self.TTSBaidu(tid, txt, lan, spd)
    finally:
        pass
def save_images(term, count):
    """Download up to *count* thumbnail images for *term* into SAVE_DIR.

    Skips results whose encoding format is unknown and silently ignores
    HTTP errors on individual downloads.  Returns the list of filenames
    actually written.
    """
    api_key = get_api_key()
    images = search_images(term, count, api_key)
    filenames = []
    if not os.path.exists(SAVE_DIR):
        os.makedirs(SAVE_DIR)
    for i, img in enumerate(images):
        if img['encodingFormat'] == 'unknown':
            continue
        # BUG FIX: the template contained a literal "(unknown)" instead of
        # the {filename} placeholder, so the passed filename= kwarg was
        # never used and every image got the same name.
        name = "{path}/{filename}.{ext}".format(
            path=SAVE_DIR,
            filename="_".join(term.split()) + str(i),
            ext=img['encodingFormat'])
        try:
            download_image(img['thumbnailUrl'], name)
            filenames.append(name)
        except urllib.error.HTTPError:
            pass
    return filenames
def log(self, message='', err=None, level='info'):
    """Log a message"""
    # Unknown levels fall back to info; an exception forces level=error.
    recognized = ('critical', 'debug', 'error', 'fatal', 'info', 'warning')
    if level.lower() not in recognized:
        level = 'info'
    if err:
        level = 'error'
        message += ' Threw exception:\n\t{}'.format(err)
    try:
        emit = getattr(self.logger, level.lower())
        emit(message)
    except Exception as log_err:
        # Last resort: the logger itself failed (e.g. no such level method).
        self.logger.critical(
            "Could not write to log. Threw exception:\n\t{}".format(log_err))
def __mkfile(self):
    """Create new file.

    Reads 'name' and 'current' (the parent directory hash) from the
    request; validates permissions and the name before creating an empty
    file, then refreshes the directory listing in the response.
    """
    name = current = None
    curDir = newFile = None
    if 'name' in self._request and 'current' in self._request:
        name = self._request['name']
        current = self._request['current']
        curDir = self.__findDir(current, None)
        newFile = os.path.join(curDir, name)
    # Validation order matters: parameters, write permission, name, collision.
    if not curDir or not name:
        self._response['error'] = 'Invalid parameters'
    elif not self.__isAllowed(curDir, 'write'):
        self._response['error'] = 'Access denied'
    elif not self.__checkName(name):
        self._response['error'] = 'Invalid name'
    elif os.path.exists(newFile):
        self._response['error'] = 'File or folder with the same name already exists'
    else:
        try:
            # Touch the file, select it in the client, refresh the listing.
            open(newFile, 'w').close()
            self._response['select'] = [self.__hash(newFile)]
            self.__content(curDir, False)
        except:
            self._response['error'] = 'Unable to create file'
def __rm(self):
    """Delete files and directories.

    Expects 'current' (directory hash) and 'targets[]' (hashes of entries
    to delete) in the request; removes each resolvable target and then
    refreshes the directory listing.
    """
    current = rmList = None
    curDir = rmFile = None
    if 'current' in self._request and 'targets[]' in self._request:
        current = self._request['current']
        rmList = self._request['targets[]']
        curDir = self.__findDir(current, None)
    if not rmList or not curDir:
        self._response['error'] = 'Invalid parameters'
        return False
    # A single target may arrive as a scalar; normalize to a list.
    if not isinstance(rmList, list):
        rmList = [rmList]
    for rm in rmList:
        rmFile = self.__find(rm, curDir)
        if not rmFile:
            continue  # silently skip hashes that no longer resolve
        self.__remove(rmFile)
    # TODO if errorData not empty return error
    self.__content(curDir, True)
def __duplicate(self):
    """Create copy of files/directories.

    Resolves 'current' (directory) and 'target' (entry to copy) from the
    request, checks read permission on the target and write permission on
    the directory, then copies under a unique name and refreshes the
    listing.
    """
    if 'current' in self._request and 'target' in self._request:
        curDir = self.__findDir(self._request['current'], None)
        target = self.__find(self._request['target'], curDir)
        if not curDir or not target:
            self._response['error'] = 'Invalid parameters'
            return
        if not self.__isAllowed(target, 'read') or not self.__isAllowed(curDir, 'write'):
            self._response['error'] = 'Access denied'
            # BUG FIX: the original fell through here and performed the
            # copy even though access had just been denied.
            return
        newName = self.__uniqueName(target)
        if not self.__copy(target, newName):
            self._response['error'] = 'Unable to create file copy'
            return
        self.__content(curDir, True)
    return
def __edit(self):
    """Save content in file.

    Expects 'current' (directory hash), 'target' (file hash) and
    'content' in the request; writes the content when permitted and puts
    the updated file info into the response.
    """
    error = ''
    if 'current' in self._request and 'target' in self._request and 'content' in self._request:
        curDir = self.__findDir(self._request['current'], None)
        curFile = self.__find(self._request['target'], curDir)
        # NOTE(review): `error` is assigned but never used afterwards —
        # looks like leftover debugging state; confirm before removing.
        error = curFile
        if curFile and curDir:
            if self.__isAllowed(curFile, 'write'):
                try:
                    # 'w+' truncates the file before writing the new content.
                    f = open(curFile, 'w+')
                    f.write(self._request['content'])
                    f.close()
                    self._response['target'] = self.__info(curFile)
                except:
                    self._response['error'] = 'Unable to write to file'
            else:
                self._response['error'] = 'Access denied'
            return
    self._response['error'] = 'Invalid parameters'
    return
def removeNoise (s):
    """Strip wiki-style markup from *s*.

    Removes, in order: [[...]] links, {{...}} templates, and any remaining
    [...] spans.  Matching is non-greedy, so bracket pairs are dropped
    individually rather than from the first opener to the last closer.
    """
    import re
    # Raw strings for the regexes: the originals relied on Python passing
    # unrecognized escapes (\[) through unchanged, which is deprecated.
    pattern_list = (r"\[\[(.*?)\]\]", r"{{(.*?)}}", r"\[(.*?)\]")
    clean = s
    for pattern in pattern_list:
        clean = re.sub(pattern, "", clean)
    return clean

#==============================================================================#
#==============================================================================#
# Get the html for a comic from the explainxkcd website
# Extract the transcript from the text
# Check if the transcript is mark as incomplete
# if yes, mark it as so locally (for later updates)
# Returns a dictionary (result)
# If an error occured the status is not 0 and the error is passed in the error
# field of the returned dictionary
def get_xkcd(number = 0):
    """Fetch metadata for an xkcd comic as a dict.

    number 0 (the default) fetches the latest comic.  The returned dict
    has 'status' (0 ok, -1 HTTP error, -2 I/O error, -3 anything else),
    'error', and 'comic' (the parsed JSON on success).
    """
    # BUG FIX: `number is 0` compared identity against an int literal,
    # which only works because CPython caches small ints; use ==.
    if number == 0:
        url = 'https://xkcd.com/info.0.json'
    else:
        url = 'https://xkcd.com/{}/info.0.json'.format(number)
    response = {'status': 0, 'error': '', 'comic': ""}
    try:
        online_comic = urlopen(url).read()
        response['comic'] = json.loads(online_comic.decode('utf-8'))
    except urllib.error.HTTPError:
        response['status'] = -1
    except IOError:
        response['status'] = -2
    except Exception:
        # Narrowed from a bare except: KeyboardInterrupt/SystemExit now
        # propagate instead of being swallowed into status -3.
        response['status'] = -3
    return response

#==============================================================================##==============================================================================#
def download_page(url, referer, maxretries, timeout, pause):
    """Fetch *url* with a Referer header, retrying up to *maxretries* times.

    Sleeps *pause* seconds after a successful fetch.  Returns
    (decoded_html, http_code) on success, or (None, last_code) when every
    attempt failed.
    """
    attempt = 0
    body = None
    while attempt < maxretries and body is None:
        try:
            code = 404
            req = request.Request(url)
            req.add_header('Referer', referer)
            req.add_header('User-agent', 'Mozilla/5.0 (X11; Linux i686) AppleWebKit/534.30 (KHTML, like Gecko) Ubuntu/11.04 Chromium/12.0.742.91 Chrome/12.0.742.91 Safari/534.30')
            with closing(request.urlopen(req, timeout=timeout)) as resp:
                code = resp.getcode()
                body = resp.read()
            sleep(pause)
        except (urlerror.URLError, socket.timeout, socket.error):
            attempt += 1
    if body:
        return body.decode('utf-8'), code
    return None, code
def test_short_content_raises_ContentTooShortError(self):
    """urlretrieve must raise ContentTooShortError when the body is shorter
    than the advertised Content-Length (100 bytes promised, 2 delivered)."""
    self.fakehttp(b'''HTTP/1.1 200 OK
Date: Wed, 02 Jan 2008 03:03:54 GMT
Server: Apache/1.3.33 (Debian GNU/Linux) mod_ssl/2.8.22 OpenSSL/0.9.7e
Connection: close
Content-Length: 100
Content-Type: text/html; charset=iso-8859-1

FF
''')
    def _reporthook(par1, par2, par3):
        pass
    with self.assertRaises(urllib.error.ContentTooShortError):
        try:
            urllib.request.urlretrieve('http://example.com/',
                                       reporthook=_reporthook)
        finally:
            # Always undo the fake HTTP layer, even when the assert trips.
            self.unfakehttp()
def handle(self, fn_name, action, *args, **kwds):
    """Mock handler dispatch: record the call, then act per *action*.

    *action* is a string script: None, "return self", "return response",
    "return request", "error <code>", or "raise" (raises URLError).
    """
    self.parent.calls.append((self, fn_name, args, kwds))
    if action is None:
        return None
    elif action == "return self":
        return self
    elif action == "return response":
        res = MockResponse(200, "OK", {}, "")
        return res
    elif action == "return request":
        return Request("http://blah/")
    elif action.startswith("error"):
        # The code after the last space may be an int or a symbolic name;
        # leave it as a string when int() fails.
        code = action[action.rfind(" ")+1:]
        try:
            code = int(code)
        except ValueError:
            pass
        res = MockResponse(200, "OK", {}, "")
        return self.parent.error("http", args[0], res, code, "", {})
    elif action == "raise":
        raise urllib.error.URLError("blah")
    # Unknown action strings are a bug in the test itself.
    assert False
def test_badly_named_methods(self):
    # test work-around for three methods that accidentally follow the
    # naming conventions for handler methods
    # (*_open() / *_request() / *_response())

    # These used to call the accidentally-named methods, causing a
    # TypeError in real code; here, returning self from these mock
    # methods would either cause no exception, or AttributeError.
    from urllib.error import URLError
    o = OpenerDirector()
    meth_spec = [
        [("do_open", "return self"), ("proxy_open", "return self")],
        [("redirect_request", "return self")],
    ]
    # The return value was never used; drop the dead `handlers` local
    # (the duplicate of this test elsewhere already discards it).
    add_ordered_mock_handlers(o, meth_spec)
    o.add_handler(urllib.request.UnknownHandler())
    for scheme in "do", "proxy", "redirect":
        self.assertRaises(URLError, o.open, scheme+"://example.com/")
def test_raise(self):
    # A handler that raises URLError aborts processing: the second
    # handler's http_open must never run.
    opener = OpenerDirector()
    spec = [
        [("http_open", "raise")],
        [("http_open", "return self")],
    ]
    handlers = add_ordered_mock_handlers(opener, spec)
    req = Request("http://example.com/")
    self.assertRaises(urllib.error.URLError, opener.open, req)
    # Only the raising handler was invoked.
    self.assertEqual(opener.calls, [(handlers[0], "http_open", (req,), {})])

##     def test_error(self):
##         # XXX this doesn't actually seem to be used in standard library,
##         # but should really be tested anyway...
def test_errors(self):
    """HTTPErrorProcessor: 2xx responses pass through unchanged; anything
    else is routed to the opener's error() hook."""
    proc = urllib.request.HTTPErrorProcessor()
    opener = proc.parent = MockOpener()
    url = "http://example.com/"
    req = Request(url)
    # Every 2xx status is returned as-is and o.error is never invoked.
    for status, reason in ((200, "OK"), (202, "Accepted"), (206, "Partial content")):
        resp = MockResponse(status, reason, {}, "", url)
        self.assertIs(resp, proc.http_response(req, resp))
        self.assertFalse(hasattr(opener, "proto"))  # o.error not called
    # A non-2xx status calls o.error (MockOpener returns None here).
    resp = MockResponse(502, "Bad gateway", {}, "", url)
    self.assertIsNone(proc.http_response(req, resp))
    self.assertEqual(opener.proto, "http")  # o.error called
    self.assertEqual(opener.args, (req, resp, 502, "Bad gateway", {}))
def test_invalid_redirect(self):
    """Redirects to non-http(s)/ftp schemes must raise HTTPError (a
    security loophole otherwise); valid schemes must be followed."""
    from_url = "http://example.com/a.html"
    schemeless_url = "example.com/b.html"
    handler = urllib.request.HTTPRedirectHandler()
    opener = handler.parent = MockOpener()
    req = Request(from_url)
    req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT
    for scheme in ('file', 'imap', 'ldap'):
        target = scheme + '://' + schemeless_url
        self.assertRaises(urllib.error.HTTPError, handler.http_error_302,
                          req, MockFile(), 302, "Security Loophole",
                          MockHeaders({"location": target}))
    for scheme in ('http', 'https', 'ftp'):
        target = scheme + '://' + schemeless_url
        handler.http_error_302(req, MockFile(), 302, "That's fine",
                               MockHeaders({"location": target}))
        self.assertEqual(opener.req.get_full_url(), target)
def batch_sign(request):
    """Render the batch-signing page for the next task in the sign session.

    Pops the first pending task, builds its sign payload, persists it, and
    renders the PDF-AS signing frame.  Redirects to the dashboard when no
    tasks remain.
    """
    tasks = request.sign_session['tasks']
    if not tasks:
        return redirect('ecs.dashboard.views.view_dashboard')
    task = _get_tasks(request.user).get(pk=tasks[0])
    # data_func is stored in the session and produces the sign payload for
    # this task; tag it with the session so the callback can resume.
    data = request.sign_session['data_func'](request, task)
    data['sign_session_id'] = request.sign_session.id
    sign_data = _store_sign_data(data)
    # Test hook: users named signing_fail* force the error path.
    if request.user.email.startswith('signing_fail'):
        return sign_error(request, pdf_id=sign_data.id, error='forced failure', cause='requested force_fail, so we failed')
    return render(request, 'signature/batch.html', {
        'sign_url': get_pdfas_url(request, sign_data),
        'pdf_id': sign_data.id,
    })
def get_pdfas_url(request, sign_data):
    """Build the PDF-AS 'Sign' URL carrying all invocation parameters.

    The callback endpoints (receive/error/send) are absolute URLs keyed by
    the stored sign_data id; all values are UTF-8 encoded into the query
    string.
    """
    def _absolute(view_name):
        return request.build_absolute_uri(
            reverse(view_name, kwargs={'pdf_id': sign_data.id}))

    values = {
        'connector': 'onlinebku',
        'invoke-app-url': _absolute('ecs.signature.views.sign_receive'),
        'invoke-app-url-target': '_top',
        'invoke-app-error-url': _absolute('ecs.signature.views.sign_error'),
        'locale': 'DE',
        'num-bytes': str(len(sign_data['pdf_data'])),
        'sig_type': 'SIGNATURBLOCK_DE',
        'pdf-url': _absolute('ecs.signature.views.sign_send'),
        # A signature check is performed, but without certificate validation.
        'verify-level': 'intOnly',
        'filename': sign_data['document_filename'],
        #'preview': 'false',
        #'mode': 'binary',
        #'inline': 'false',
        #'pdf-id': sign_data.id,
    }
    query = urllib.parse.urlencode({k: v.encode('utf-8') for k, v in values.items()})
    return '{0}Sign?{1}'.format(settings.PDFAS_SERVICE, query)
def stream_live_check(stream):
    """Return a human-readable status line for a Twitch stream.

    Queries the (legacy) Kraken API; reports offline/online with title,
    game and viewer count, or an error message on URLError.
    """
    url = "https://api.twitch.tv/kraken/streams/{}".format(stream.lower())
    try:
        contents = json.loads(urllib.request.urlopen(url).read().decode("utf-8"))
        # BUG FIX: compare to None with `is`, not `==`; also dropped the
        # unused `status` local.
        if contents["stream"] is None:
            bot_message = "{} is offline.".format(stream)
        else:
            name = contents["stream"]["channel"]["name"]
            title = contents["stream"]["channel"]["status"]
            game = contents["stream"]["channel"]["game"]
            viewers = contents["stream"]["viewers"]
            bot_message = "{0} is online.\n{0}'s title is: {1} \n{0} is playing {2} \nThere are {3} viewers \n".format(name,title,game,viewers)
    except urllib.error.URLError as e:
        # NOTE(review): e.reason is normally an exception or OS string;
        # these exact string comparisons look fragile — confirm against
        # what the API actually returns.
        if e.reason == "Not found" or e.reason == "Unprocessable Entity":
            bot_message = "That stream doesn't exist."
        else:
            bot_message = "There was an error proccessing your request."
    return bot_message
def test_badly_named_methods(self):
    # Work-around check for three methods whose names accidentally match
    # the handler-method conventions (*_open / *_request / *_response).
    # Historically these were invoked as handlers and raised TypeError in
    # real code; with the mocks here, "return self" would surface as no
    # exception at all or as AttributeError instead of URLError.
    from urllib.error import URLError
    opener = OpenerDirector()
    spec = [
        [("do_open", "return self"), ("proxy_open", "return self")],
        [("redirect_request", "return self")],
    ]
    add_ordered_mock_handlers(opener, spec)
    opener.add_handler(urllib.request.UnknownHandler())
    for scheme in ("do", "proxy", "redirect"):
        self.assertRaises(URLError, opener.open, scheme + "://example.com/")
def __data_parser__(self, data):
    """Extract goods entries from a Taobao search JSON payload.

    Returns a list of dicts (intro/price/delivery/sales/belong/url); an
    empty list when nothing was found or the payload shape is unexpected.
    """
    try:
        auctions = data['mods']['itemlist']['data']['auctions']
        if auctions:
            goods = []
            for item in auctions:
                fee = item["view_fee"]
                goods.append({
                    'intro': item["raw_title"],
                    'price': float(item["view_price"]),
                    # Non-free delivery fees are highlighted in red.
                    'delivery': colorful_text(fee, Fore.RED) if float(fee) > 0 else fee,
                    # view_sales looks like "123?..." — keep the leading count.
                    'sales': int(item["view_sales"].split('?')[0]),
                    'belong': colorful_text("??", Fore.CYAN) if item.get('shopcard', {}).get('isTmall', False) else "??",
                    'url': item["detail_url"]
                })
            return goods
        error('Ops, get no goods..')
        return []
    except KeyError:
        error('Ops, some key error happened..')
        return []
def get_base64_saver(loading, url):
    """Return a callback that stores the base64-encoded download for *url*.

    The callback records 404 in *loading* for HTTP 404s and for Windows'
    DNS failure (errno 11001), pops an error dialog for other URLErrors,
    and otherwise caches the base64 content.
    """
    def callback(content):
        if isinstance(content, urllib.error.HTTPError):
            if content.getcode() == 404:
                loading[url] = 404
                return
        elif isinstance(content, urllib.error.URLError):
            # Windows' "getaddrinfo failed" is treated like a missing image.
            reason = content.reason
            if (reason.errno == 11001
                    and reason.strerror == 'getaddrinfo failed'):
                loading[url] = 404
                return
            return sublime.error_message('An unexpected error has occured: ' + str(content))
        loading[url] = to_base64(content=content)
    return callback
def getTaskStatus(taskId):
    """Retrieve status of one or more long-running tasks.

    Args:
      taskId: ID of the task or a list of multiple IDs.

    Returns:
      List with one object per queried task, in input order, each holding:
          id (string) ID of the task.
          state (string) One of READY, RUNNING, COMPLETED, FAILED,
              CANCELLED; or UNKNOWN if no task with that ID exists.
          error_message (string) For a FAILED task, the error description.
    """
    # A single ID may be passed bare; normalize to a list before joining.
    if isinstance(taskId, six.string_types):
        taskId = [taskId]
    return send_('/taskstatus', {'q': ','.join(taskId)}, 'GET')
def __init__(self, http_response, response_body=None):
    """Capture the HTTP error details from a failed response.

    Args:
      http_response: The server response carrying the error information.
      response_body: string (optional) the body, if it has already been
          read from http_response.
    """
    body = response_body or http_response.read()
    self.status = http_response.status
    self.reason = http_response.reason
    self.body = body
    self.headers = atom.http_core.get_headers(http_response)
    # Default message; overridden below when the body is a JSON object
    # with an 'error' field.
    self.error_msg = 'Invalid response %s.' % self.status
    try:
        parsed = simplejson.loads(body)
    except (ValueError, JSONDecodeError):
        return
    if isinstance(parsed, dict):
        self.error_msg = parsed.get('error', self.error_msg)
def _download_file(self, uri, file_path, **kwargs): """Downloads a file to disk from the specified URI. Note: to download a file in memory, use the GetContent() method. Args: uri: str The full URL to download the file from. file_path: str The full path to save the file to. kwargs: Other parameters to pass to self.get_content(). Raises: gdata.client.RequestError: on error response from server. """ f = open(file_path, 'wb') try: f.write(self._get_content(uri, **kwargs)) except gdata.client.RequestError as e: f.close() raise e f.flush() f.close()
def UpgradeToSessionToken(self, token=None):
    """Upgrades a single use AuthSub token to a session token.

    Args:
      token: A gdata.auth.AuthSubToken or gdata.auth.SecureAuthSubToken
          (optional) which is good for a single use but can be upgraded
          to a session token.  If no token is passed in, the token
          is found by looking in the token_store by looking for a token
          for the current scope.

    Raises:
      NonAuthSubToken if the user's auth token is not an AuthSub token
      TokenUpgradeFailed if the server responded to the request with an
      error.
    """
    if token is None:
        # No explicit token: resolve one from the store for this service's
        # scope, falling back to the catch-all scope.
        scopes = lookup_scopes(self.service)
        if scopes:
            token = self.token_store.find_token(scopes[0])
        else:
            token = self.token_store.find_token(atom.token_store.SCOPE_ALL)
    if not isinstance(token, gdata.auth.AuthSubToken):
        raise NonAuthSubToken
    # Swap the stored single-use token for the upgraded session token.
    self.SetAuthSubToken(self.upgrade_to_session_token(token))
def change_cwd(path, quiet=False):
    """Return a context manager that changes the current working directory.

    Arguments:
      path: the directory to use as the temporary current working directory.
      quiet: if False (the default), the context manager raises an exception
        on error.  Otherwise, it issues only a warning and keeps the current
        working directory the same.
    """
    original_cwd = os.getcwd()
    try:
        os.chdir(path)
    except OSError:
        if not quiet:
            raise
        warnings.warn('tests may fail, unable to change CWD to: ' + path,
                      RuntimeWarning, stacklevel=3)
    try:
        # Yield whatever directory we actually ended up in.
        yield os.getcwd()
    finally:
        os.chdir(original_cwd)
def temp_cwd(name='tempcwd', quiet=False):
    """Context manager that temporarily creates and changes the CWD.

    Creates a temporary directory *name* in the current directory and
    switches into it (tempfile.mkdtemp is used when *name* is None).
    With quiet=False (the default) failures raise; with quiet=True only a
    warning is issued and the original CWD is kept.
    """
    # Combine directory creation and the chdir into one with-statement.
    with temp_dir(path=name, quiet=quiet) as tmp, \
            change_cwd(tmp, quiet=quiet) as cwd:
        yield cwd
def _download(self):
    """Download self.url to self.outputfile_origin.

    Prefers the Python 3 urllib.request API and falls back to the legacy
    urllib.urlretrieve on Python 2.  Any failure is logged and swallowed
    (best-effort download).
    """
    try:
        try:
            import urllib.request
            with urllib.request.urlopen(self.url) as response, \
                    open(self.outputfile_origin, 'wb') as outfile:
                shutil.copyfileobj(response, outfile)
        except (AttributeError, ImportError):
            import urllib
            urllib.urlretrieve(self.url, self.outputfile_origin)
    except Exception as e:
        # BUG FIX: the original caught (URLError, HTTPError, IOError,
        # Exception); on the Python-2 fallback path those urllib.error
        # names were never imported, so evaluating the tuple raised
        # NameError.  Exception alone already covers all of them.
        logger.debug("Unable to retrieve %s for %s", self.url, e)
def _buy(self, amount, price): """Create a buy limit order""" params = {"amount": amount, "price": price} response = self._send_request(self.buy_url, params) if "error" in response: raise TradeException(response["error"])
def _sell(self, amount, price): """Create a sell limit order""" params = {"amount": amount, "price": price} response = self._send_request(self.sell_url, params) if "error" in response: raise TradeException(response["error"])
def __fetch_epg(channel: Channel, epg_url: EPGURL):
    """Fetch and normalize the EPG schedule for *channel*.

    Downloads the schedule page, parses it with the channel's parser,
    normalizes the start/end times, and wraps the result in a response.
    Returns '' when the download fails with a URLError.
    """
    try:
        page = utils.get_response(epg_url.url, epg_url.data)
        schedule = channel.epg_parser.parse_schedule_page(page)
        epg_utils.normalize_times(schedule, channel.epg_data.get_normalization())
        return get_response(schedule, channel.channel_id)
    except urllib.error.URLError:
        return ''
def kegg_rest_request(query):
    """Run a KEGG REST query and return the raw response bytes.

    Prints a diagnostic and returns None when the request fails.
    """
    url = 'http://rest.kegg.jp/%s' % (query)
    print(url)
    # BUG FIX: `data` was unbound when urlopen raised, so the final
    # `return data` itself crashed with NameError.
    data = None
    try:
        data = urllib.request.urlopen(url).read()
    except urllib.error.HTTPError as e:
        print("HTTP error: %d" % e.code)
    except urllib.error.URLError as e:
        # BUG FIX: e.reason.args[1] assumed a two-element socket error and
        # could raise IndexError/AttributeError inside the handler.
        print("Network error: %s" % e.reason)
    return data
def check_errors(self):
    """Report every unsatisfiable package constraint, then abort.

    Walks self.constraints, logs a reason (or a conflict list) for each
    one without a resolved version, and calls self.fatal when any were
    found.
    """
    unsatisfied = False
    for constraint in self.constraints:
        if constraint.required_version:
            continue
        unsatisfied = True
        reasons = constraint.why()
        if len(reasons) == 1:
            Logs.error('%s but no matching package could be found in this repository' % reasons[0])
        else:
            Logs.error('Conflicts on package %r:' % constraint.pkgname)
            for reason in reasons:
                Logs.error(' %s' % reason)
    if unsatisfied:
        self.fatal('The package requirements cannot be satisfied!')
def load_constraints(self, pkgname, pkgver, requires=REQUIRES):
    """Return the parsed constraints for (pkgname, pkgver), memoized.

    On a cache miss the requires file is read from the distnet cache
    directory, parsed, and stored for subsequent calls.
    """
    key = (pkgname, pkgver)
    if key in self.cache_constraints:
        return self.cache_constraints[key]
    #Logs.error("no key %r" % (pkgname, pkgver))
    text = Utils.readf(os.path.join(get_distnet_cache(), pkgname, pkgver, requires))
    parsed = parse_constraints(text)
    self.cache_constraints[key] = parsed
    return parsed