我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用urllib2.URLError()。
def get(self, url, proxy=None):
    """Fetch ``url`` and return the response body.

    ``self.status_code`` is set to the HTTP status of the reply, or to
    ``None`` when no HTTP reply was received at all.

    Args:
        url: address to open with ``urllib2.urlopen``.
        proxy: optional HTTP proxy address; when given, an opener using it is
            installed as the process-wide default opener.

    Returns:
        The body of the response (also for HTTP error replies), or an empty
        string when the request failed before any reply arrived.
    """
    if proxy:
        # NOTE: install_opener() replaces the global default opener, so the
        # proxy stays in effect for later urlopen() calls as well.
        handler = urllib2.ProxyHandler({'http': proxy})
        urllib2.install_opener(urllib2.build_opener(handler))
    try:
        response = urllib2.urlopen(url)
    except HTTPError as e:
        # An HTTP error still carries a status code and a readable body.
        resp = e.read()
        self.status_code = e.code
    except URLError:
        # A plain URLError (DNS failure, refused connection, ...) has neither
        # read() nor code, so there is nothing to read and no status.
        resp = ''
        self.status_code = None
    else:
        self.status_code = response.code
        resp = response.read()
    return resp
def run(self):
    """Send the observable to the Hippocampe service and report its answer.

    The value from ``self.getData()`` is posted as JSON, keyed by the
    observable and tagged with ``self.data_type``. The decoded reply goes to
    ``self.report``; failures go to ``self.error`` or ``self.unexpectedError``.
    """
    observable = self.getData()
    payload = json.dumps({observable: {"type": self.data_type}}).encode('utf-8')
    request_headers = {'Content-Type': 'application/json'}
    try:
        endpoint = '{}/hippocampe/api/v1.0/{}'.format(self.url, self.service)
        reply = urllib2.urlopen(urllib2.Request(endpoint, payload, request_headers))
        self.report(json.loads(reply.read()))
    except urllib2.HTTPError as http_err:
        # HTTPError must be handled before its base class URLError.
        self.error("Hippocampe: " + str(http_err))
    except urllib2.URLError:
        self.error("Hippocampe: service is not available")
    except Exception as e:
        self.unexpectedError(e)
def downloadFilesSave(links, fileFormat):
    """Download every link into the current working directory.

    Each download is stored as ``<random number>.<fileFormat>``. Links that
    cannot be fetched are skipped.

    Args:
        links: iterable of URLs, or the sentinel string ``'EMPTY'``.
        fileFormat: file extension (without the dot) for the saved files.

    Returns:
        A status message. For a non-empty run it reports the number of
        entries currently in the working directory.
    """
    if links == 'EMPTY':  # sentinel meaning that no links were found
        return ' NO LINKS FOUND !'
    for link in links:
        # Fetch first, so a failed download never leaves an empty file behind.
        try:
            payload = urllib2.urlopen(link).read()
        except urllib2.URLError:
            continue
        # Pick a random file name that is not taken yet.
        target = '{}.{}'.format(random.randint(0, 10000001), fileFormat)
        while os.path.exists(target):
            target = '{}.{}'.format(random.randint(0, 10000001), fileFormat)
        # The payload is raw bytes: always write in binary mode so the data
        # is stored verbatim regardless of the requested format.
        with open(target, 'wb') as saveFile:
            saveFile.write(payload)
    return ' {} DOWNLOADS SUCCESSFULL YET !'.format(len(os.listdir(os.getcwd())))
def send_result(email, result, title, urn):
    """Post the results to the results endpoint as JSON.

    Args:
        email (str): address to send the results
        result (obj): results to send
        title (str): title submitted along with the results
        urn (str): uniform resource name

    Returns:
        str: response from endpoint, or an apology message containing the
        error text when the request fails
    """
    payload = {
        'email': email,
        'result': result,
        'title': title,
        'urn': urn,
        'date': get_date(),
    }
    request_headers = {'Content-type': 'application/json', 'Accept': 'application/json'}
    try:
        request = urllib2.Request('https://mongoaud.it/results', json.dumps(payload), request_headers)
        return urllib2.urlopen(request).read()
    except (urllib2.HTTPError, urllib2.URLError) as exc:
        apology = "Sadly enough, we are having technical difficulties at the moment, " \
                  "please try again later.\n\n%s"
        return apology % str(exc)
def check_version(version):
    """Check GitHub for a newer release when running as a frozen binary.

    Nothing happens unless ``sys.frozen`` is set. When the latest release tag
    is newer than ``version``, both versions are printed and ``_upgrade`` is
    called with the release description.

    Args:
        version (str): version of the running application.
    """
    import re

    def _numeric_parts(tag):
        # "v0.10.2" -> (0, 10, 2); an empty tuple means "no digits found".
        return tuple(int(part) for part in re.findall(r'\d+', tag))

    # if application is binary then check for latest version
    if not getattr(sys, 'frozen', False):
        return
    try:
        url = "https://api.github.com/repos/stampery/mongoaudit/releases/latest"
        releases = json.loads(urllib2.urlopen(url).read())
        latest = releases["tag_name"]
        current_parts = _numeric_parts(version)
        latest_parts = _numeric_parts(latest)
        if current_parts and latest_parts:
            # Compare numerically: as plain strings "0.10.0" < "0.9.0".
            outdated = current_parts < latest_parts
        else:
            # No digits to compare: fall back to the plain string ordering.
            outdated = version < latest
        if outdated:
            print("mongoaudit version " + version)
            print("There's a new version " + latest)
            _upgrade(releases)
    except (urllib2.HTTPError, urllib2.URLError):
        print("Couldn't check for upgrades")
    except os.error:
        print("Couldn't write mongoaudit binary")
def TestSite(url):
    """Probe ``url`` and report how the server answers.

    An HTTP 405 reply is reported as a found endpoint. An HTTP 404 reply, a
    URL error, or a socket timeout prints a message and exits the process.
    """
    protocheck(url)
    print("Trying: " + url)
    try:
        urllib2.urlopen(url, timeout=3)
    except urllib2.HTTPError as http_err:
        status = http_err.code
        if status == 405:
            print(url + " found!")
            print("Now the brute force will begin! >:)")
        elif status == 404:
            printout(str(http_err), YELLOW)
            print(" - XMLRPC has been moved, removed, or blocked")
            sys.exit()
    except urllib2.URLError:
        printout("Could not identify XMLRPC. Please verify the domain.\n", YELLOW)
        sys.exit()
    except socket.timeout as timeout_err:
        print(type(timeout_err))
        printout("The socket timed out, try it again.", YELLOW)
        sys.exit()
def compute_dependencies(self, filename=REQUIRES):
    """Resolve the constraints listed in ``filename``.

    The file text is sent to the resolve URL unless ``--offline`` is on the
    command line. When offline, or when the server cannot be reached, the
    text is resolved locally. ``self.check_errors()`` runs in every case.
    """
    text = Utils.readf(filename)
    data = safe_urlencode([('text', text)])

    response = None
    if '--offline' not in sys.argv:
        req = Request(get_resolve_url(), data)
        try:
            response = urlopen(req, timeout=TIMEOUT)
        except URLError as e:
            Logs.warn('The package server is down! %r' % e)

    if response is None:
        # offline mode, or the server was unreachable
        self.constraints = self.local_resolve(text)
    else:
        answer = response.read()
        try:
            answer = answer.decode('utf-8')
        except Exception:
            # keep the undecoded answer when it cannot be decoded
            pass
        self.trace(answer)
        self.constraints = parse_constraints(answer)
    self.check_errors()
def search(self, url, offset=1, maxoffset=0, title=""):
    """Fetch ``url`` once per offset step and return the concatenated bodies.

    The ``[[OFFSET]]`` placeholder in ``url`` is replaced by the current
    offset, which runs from 0 up to ``maxoffset`` in steps of ``offset``.
    A ``URLError`` stops the search and returns what was collected so far;
    any other exception is printed and the loop moves on.
    """
    collected = ""
    position = 0
    self.p.reset(title=title)
    while position <= maxoffset:
        self.p.rotate()
        page_url = re.sub(r'\[\[OFFSET\]\]', str(position), url)
        try:
            request = urllib2.Request(page_url, None, {'User-Agent': self.user_agent})
            collected += urllib2.urlopen(request).read()
        except urllib2.URLError:
            self.display.error("Could not access [%s]" % (title))
            return collected
        except Exception as e:
            print(e)
        position += offset
    self.p.done()
    return collected
def getURLContents(self, url, data=None):
    "Returns a (success, contents) pair with the contents of the given URL as an Unicode string"
    body = ""
    fetched = False
    request = Request(url, data, {'User-agent': self.useragent})
    try:
        handle = urlopen(request)
        body = handle.read()
        handle.close()
        fetched = True
    except HTTPError as e:
        print('Server error:  ' + str(e.code))
        # in verbose mode also show the standard text for known status codes
        if self.verbose and e.code in BaseHTTPRequestHandler.responses:
            title, msg = BaseHTTPRequestHandler.responses[e.code]
            print(title + ": " + msg)
    except URLError as e:
        print('Connection error:  ' + str(e.reason))

    # body is still empty here when the request failed
    return (fetched, UnicodeDammit(body).unicode)
def getCookie(self):
    """
    First method to call when a Google dorking object is initialized
    through this library. It opens the Google "no country redirect" page
    with ``self.opener`` so the session cookie needed for later searches
    can be obtained.

    Raises sqlmapConnectionException when Google cannot be reached.
    """
    try:
        connection = self.opener.open("http://www.google.com/ncr")
        headers = connection.info()
    except urllib2.HTTPError as http_err:
        # an HTTP error reply still carries headers
        headers = http_err.info()
    except urllib2.URLError:
        raise sqlmapConnectionException("unable to connect to Google")
def _api_call(url, opener):
    """
    Makes a REST call against the Couchbase API.

    Args:
        url (str): The URL to get, including endpoint
        opener: opener installed as the default before the call is made

    Returns:
        The decoded JSON response, or None when the call or the JSON
        parsing fails (the failure is logged through collectd).
    """
    def _log_failure(template, err):
        collectd.error(template % (err, url))

    try:
        urllib2.install_opener(opener)
        resp = urllib2.urlopen(url, timeout=http_timeout)
    except (urllib2.HTTPError, urllib2.URLError) as e:
        _log_failure("Error making API call (%s) %s", e)
        return None

    try:
        return json.load(resp)
    except ValueError as e:
        _log_failure("Error parsing JSON for API call (%s) %s", e)
        return None
def _woxikon_de_url_handler(target):
    '''
    Query synonyms.woxikon.com (German section) for synonyms of ``target``.

    Returns a StringIO with the page content (passed through decode_utf_8
    and unescape) on success, 1 on an HTTP error or a timeout, and -1 on
    any other URL error.
    '''
    timeout_seconds = float(get_variable(
        'tq_online_backends_timeout', _timeout_period_default))
    try:
        query_url = fixurl(u'http://synonyms.woxikon.com/de/{0}'.format(target)).decode('ASCII')
        response = urlopen(query_url, timeout = timeout_seconds)
        page = StringIO(unescape(decode_utf_8(response.read())))
        response.close()
    except HTTPError:
        return 1
    except URLError as err:
        # a timeout can arrive wrapped inside the URLError
        return 1 if isinstance(err.reason, socket.timeout) else -1
    except socket.timeout:
        # timeout that was not wrapped in a URLError
        return 1
    return page
def _jeck_ru_url_handler(target):
    '''
    Query the jeck.ru synonyms dictionary for synonyms of ``target``.

    Returns a StringIO with the page content (passed through decode_utf_8)
    on success, 1 on an HTTP error or a timeout, and -1 on any other URL
    error.
    '''
    wait_limit = float(get_variable(
        'tq_online_backends_timeout', _timeout_period_default))
    try:
        address = fixurl(u'http://jeck.ru/tools/SynonymsDictionary/{0}'.format(target)).decode('ASCII')
        reply = urlopen(address, timeout = wait_limit)
        content = StringIO(decode_utf_8(reply.read()))
        reply.close()
    except HTTPError:
        return 1
    except URLError as err:
        if not isinstance(err.reason, socket.timeout):
            return -1  # any error other than a timeout
        return 1  # timeout wrapped inside the URLError
    except socket.timeout:
        # timeout that was not wrapped in a URLError
        return 1
    return content
def get_target():
    """Fetch every stored domain once and record which CMS it appears to run.

    Documents of the ``Shodita`` collection with ``bot == "Shizuka"`` are
    walked. Domains already recorded (per ``check_domain_mongodb``) are
    skipped. For the others the front page is fetched and passed to the
    WordPress / Joomla / Drupal detectors, and every match is stored with
    ``insert_mongodb``. Any failure while handling a domain skips it.
    """
    global client, db
    cursor = db.Shodita.find({"bot": "Shizuka"})
    for document in cursor:
        domain = document["dominio"]
        if check_domain_mongodb(document["ip"], domain):
            print(colores.verde + "[INFO] Domain: " + domain + " already scanned" + colores.normal)
            continue

        request = Request("http://" + domain, None, {'User-Agent': 'Mozilla 5.10'})
        try:
            response = urlopen(request, timeout=10)
            try:
                # response.code is the numeric status; only 200 is analysed
                html = response.read() if response.code == 200 else None
            finally:
                response.close()
            if html is None:
                continue
            if detect_wp(html, domain) == True:
                insert_mongodb("WordPress", domain, document["ip"])
                print(colores.verde + "[+][INFO] " + domain + " is WordPress" + colores.normal)
            if detect_joomla(html):
                insert_mongodb("Joomla", domain, document["ip"])
                print(colores.verde + "[+][INFO] " + domain + " is Joomla" + colores.normal)
            if detect_drupal(html):
                insert_mongodb("Drupal", domain, document["ip"])
                print(colores.verde + "[+][INFO] " + domain + " is Drupal" + colores.normal)
        except Exception:
            # Best effort: URLError, httplib.BadStatusLine and anything else
            # skip the domain. Unlike a bare "except:", this still lets
            # KeyboardInterrupt and SystemExit stop the run.
            continue
def mm_heartbeat(self):
    """Send one heartbeat request and schedule the next one.

    Does nothing once ``self.shutdown`` is set. Otherwise a timer re-arms
    this method after ``self.hb_timer`` and a request is sent to the
    ``alexapi`` endpoint on ``self.mm_host``:``self.mm_port``.
    """
    # Check if stop or set next timer
    if self.shutdown:
        return
    threading.Timer(self.hb_timer, self.mm_heartbeat).start()

    address = "".join(["http://", self.mm_host, ":", self.mm_port, "/alexapi?action=AVSHB"])
    logger.debug("Sending MM Heatbeat")

    try:
        reply = urlopen(address).read()
    except URLError as err:
        logger.error("URLError: %s", err.reason)
        return

    logger.debug("Response: " + reply)
def send_remote_shutdown_command(self):
    """Request shutdown of the local service and wait for it to go away.

    A request is sent to ``/shutdown`` on ``127.0.0.1:self.port``. If that
    request fails with a URL error the method returns at once. Otherwise
    it polls ``self.is_connectable()`` once per second, for at most 30
    seconds.
    """
    # urllib.request on Python 3, urllib2 on Python 2
    try:
        from urllib import request as url_request
    except ImportError:
        import urllib2 as url_request
    URLError = url_request.URLError

    try:
        url_request.urlopen("http://127.0.0.1:%d/shutdown" % self.port)
    except URLError:
        return

    waited = 0
    while self.is_connectable() and waited != 30:
        waited += 1
        time.sleep(1)
def request_url(url, referer='http://www.google.com'):
    """Fetch ``url`` with a browser-like User-Agent and the given Referer.

    Args:
        url: address to fetch.
        referer: value sent in the ``Referer`` header.

    Returns:
        The response body, or ``None`` when the request fails (the failure
        is reported through ``common.plugin.log_error``).
    """
    common.plugin.log('request_url : %s' % url)
    req = urllib2.Request(url)
    # Request objects take headers through add_header(). Assigning an
    # "addheaders" list (an opener attribute) would silently send nothing.
    req.add_header('Referer', referer)
    req.add_header('User-Agent', 'Mozilla/5.0 (Windows; U; Windows NT 6.1; en-US; rv:1.9.2.3) Gecko/20100101 Firefox/11.0 ( .NET CLR 3.5.30729)')

    try:
        response = urllib2.urlopen(req)
        try:
            data = response.read()
        finally:
            response.close()
    except (urllib2.URLError, socket.timeout) as e:
        common.plugin.log_error("Remote request error for URL %s: %r" % (url, e))
        return

    return data
def basic_auth(server="http://127.0.0.1"):
    """
    to use basic login with a different server
    from gluon.contrib.login_methods.basic_auth import basic_auth
    auth.settings.login_methods.append(basic_auth('http://server'))

    Returns a function ``(username, password) -> bool`` that reports whether
    a request to ``server`` with those HTTP Basic credentials succeeds.
    """

    def basic_login_aux(username, password, server=server):
        credentials = base64.b64encode(username + ':' + password)
        request = urllib2.Request(server, None, {'Authorization': 'Basic ' + credentials})
        try:
            urllib2.urlopen(request)
        except (urllib2.URLError, urllib2.HTTPError):
            # rejected credentials and unreachable servers both count as failure
            return False
        return True

    return basic_login_aux
def block_until_http_ready(urlstring, wait_time=10, timeout=240):
    """Blocks until server at urlstring can respond to http requests

    One dot is written to stdout per attempt.

    Args:
        urlstring: URL to probe.
        wait_time: seconds to sleep after a failed attempt.
        timeout: total sleep budget in seconds before giving up.

    Returns:
        True as soon as a request succeeds, False when the budget runs out.
    """
    t_elapsed = 0
    while t_elapsed < timeout:
        sys.stdout.write('.')
        sys.stdout.flush()
        try:
            response = urllib2.urlopen(urllib2.Request(urlstring))
        except urllib2.URLError:
            # not ready yet: wait before the next attempt
            time.sleep(wait_time)
            t_elapsed += wait_time
        else:
            # Ready: return at once instead of sleeping one more interval.
            response.close()
            return True
    return False
def run(self):
    """Run the configured update jobs in a loop until the stop event is set.

    Each entry of ``self.__update_rates`` holds a rate at index 0 and the
    argument for ``self.start_calculation`` at index 1. Before each job the
    thread waits on the stop event; a set event ends the method.
    """
    if len(self.__update_rates) == 0:
        return

    stop_event = self.__stop_event
    # wait up to 120 seconds, to get some distortion
    stop_event.wait(randint(0, 120))

    log = logging.getLogger(__name__)
    while not stop_event.is_set():
        cycle_start = time.time()
        for entry in self.__update_rates:
            rate = entry[0]
            # rate is divided by 1000 before being added to a time in
            # seconds - presumably it is given in milliseconds (confirm).
            delay = round(cycle_start - time.time() + rate / 1000, 0)
            if stop_event.wait(delay):
                return
            try:
                self.start_calculation(entry[1])
            except URLError as e:
                log.error("Could not connect to InfluxDB: " + str(e))
            except:
                log.error("Job execution failed", exc_info=True)
def download_and_import(self, repo):
    """Download the repository archive of ``repo`` and import it as a deck.

    The archive is unpacked into the temporary directory, the unpacked
    ``<name>-<BRANCH_NAME>`` directory is renamed to ``<name>``, and that
    directory is handed to ``AnkiJsonImporter.import_deck``.

    Args:
        repo: repository path; its last ``/``-separated part names the deck.

    Raises:
        urllib2.URLError, urllib2.HTTPError, OSError: re-raised after a
        warning has been shown to the user.
    """
    try:
        # tempfile.tempdir stays None until gettempdir() has been called
        # once, so ask gettempdir() instead of reading the attribute.
        temp_root = tempfile.gettempdir()

        response = urllib2.urlopen(GITHUB_LINK.format(repo))
        response_sio = StringIO.StringIO(response.read())
        with zipfile.ZipFile(response_sio) as repo_zip:
            repo_zip.extractall(temp_root)

        deck_base_name = repo.split("/")[-1]
        deck_directory_wb = Path(temp_root).joinpath(deck_base_name + "-" + BRANCH_NAME)
        deck_directory = Path(temp_root).joinpath(deck_base_name)
        # remove any earlier copy so the rename below can succeed
        utils.fs_remove(deck_directory)
        deck_directory_wb.rename(deck_directory)
        # Todo progressbar on download

        AnkiJsonImporter.import_deck(self.collection, deck_directory)
    except (urllib2.URLError, urllib2.HTTPError, OSError) as error:
        aqt.utils.showWarning("Error while trying to get deck from Github: {}".format(error))
        raise
def check_for_update():
    """Refresh the cached version information at most once per UTC day.

    When ``FILE_UPDATE`` was already modified today nothing happens.
    Otherwise the file is touched, the current version is posted to
    ``CORE_VERSION_URL``, and the reply is written to ``FILE_UPDATE``.
    HTTP and URL errors are ignored.
    """
    day_format = '%Y-%m-%d'
    if os.path.exists(FILE_UPDATE):
        modified = datetime.utcfromtimestamp(os.path.getmtime(FILE_UPDATE))
        if modified.strftime(day_format) == datetime.utcnow().strftime(day_format):
            return

    try:
        # touch the file first, so a failed request is not retried today
        with open(FILE_UPDATE, 'a'):
            os.utime(FILE_UPDATE, None)
        payload = urllib.urlencode({'version': main.__version__})
        reply = urllib2.urlopen(urllib2.Request(CORE_VERSION_URL, payload))
        with open(FILE_UPDATE, 'w') as update_json:
            update_json.write(reply.read())
    except (urllib2.HTTPError, urllib2.URLError):
        pass
def get_page(self, url, data=None):
    """Fetch ``url`` and return the response body.

    Args:
        url: address to request.
        data: optional mapping, url-encoded and sent as the request body.

    Returns:
        The body of the response.

    Raises:
        BrowserError: for HTTP/URL errors, timeouts, socket errors and any
            other unexpected exception.
    """
    opener = urllib2.build_opener(PoolHTTPHandler)
    if data:
        data = urllib.urlencode(data)
    request = urllib2.Request(url, data, self.headers)
    try:
        response = opener.open(request)
        return response.read()
    except (urllib2.HTTPError, urllib2.URLError) as e:
        raise BrowserError(url, str(e))
    except socket.timeout:
        # socket.timeout is a subclass of socket.error, so it has to be
        # tested first or this branch can never run.
        raise BrowserError(url, "timeout")
    except (socket.error, socket.sslerror) as msg:
        raise BrowserError(url, msg)
    except Exception:
        # KeyboardInterrupt and SystemExit are not Exception subclasses and
        # therefore still propagate untouched.
        raise BrowserError(url, "unknown error")
def postInfo(self, requestParams):
    """POST ``requestParams`` as JSON to ``self.url``.

    Extra keyword arguments stored in ``self.kwargs`` are passed on to
    ``urllib2.urlopen``. Errors are logged, never raised.

    Args:
        requestParams: JSON-serialisable payload.

    Returns:
        The response body, or ``None`` when the request or the read failed.
    """
    logging.info("About to phone home to [%s].", self.url)

    req = urllib2.Request(self.url)
    req.add_header('Content-Type', 'application/json')
    body = None

    try:
        response = urllib2.urlopen(req, json.dumps(requestParams), timeout = 30, **self.kwargs)
        try:
            # Kept in its own variable so a failing read() yields None
            # instead of handing the response object back to the caller.
            body = response.read()
        finally:
            response.close()
    except urllib2.HTTPError as e:
        logging.error("HTTPError: %s", str(e.code))
    except urllib2.URLError as e:
        logging.error("URLError: %s", str(e.reason))
    except httplib.HTTPException as e:
        logging.error("HTTPException: %s", str(e))
    except Exception as e:
        logging.exception("Unexpected error: %s", str(e))

    return body
def download(url, headers, proxy, num_retries, data=None):
    """Download ``url`` and return its body, retrying on 5XX HTTP errors.

    Args:
        url: address to fetch.
        headers: request headers.
        proxy: optional proxy address used for the URL's scheme.
        num_retries: how many more times a 5XX reply may be retried.
        data: optional request body.

    Returns:
        The response body, or an empty string when the download failed.
    """
    print('Downloading: ' + url)
    request = urllib2.Request(url, data, headers)
    opener = urllib2.build_opener()
    if proxy:
        scheme = urlparse.urlparse(url).scheme
        opener.add_handler(urllib2.ProxyHandler({scheme: proxy}))

    try:
        html = opener.open(request).read()
    except urllib2.URLError as e:
        print('Download error: ' + str(e.reason))
        html = ''
        # only HTTP errors carry a status code
        if hasattr(e, 'code'):
            server_side = 500 <= e.code < 600
            if num_retries > 0 and server_side:
                # retry 5XX HTTP errors
                html = download(url, headers, proxy, num_retries-1, data)
    return html
def download(url, headers, proxy, num_retries, data=None):
    """Fetch ``url`` and return the body; 5XX replies are retried.

    Args:
        url: address to fetch.
        headers: request headers.
        proxy: optional proxy address used for the URL's scheme.
        num_retries: remaining retries for 5XX HTTP errors.
        data: optional request body.

    Returns:
        The response body, or an empty string when the download failed and
        was not retried.
    """
    print('Downloading: ' + url)
    opener = urllib2.build_opener()
    request = urllib2.Request(url, data, headers)
    if proxy:
        proxy_params = {urlparse.urlparse(url).scheme: proxy}
        opener.add_handler(urllib2.ProxyHandler(proxy_params))

    try:
        response = opener.open(request)
        return response.read()
    except urllib2.URLError as e:
        print('Download error: ' + str(e.reason))
        status = getattr(e, 'code', None)  # plain URLErrors carry no status
        if status is not None and num_retries > 0 and 500 <= status < 600:
            # retry 5XX HTTP errors
            return download(url, headers, proxy, num_retries-1, data)
        return ''
def download5(url, user_agent='wswp', proxy=None, num_retries=2):
    """Download function with support for proxies

    Returns the response body, or None when the download failed. HTTP 5XX
    errors are retried while ``num_retries`` is positive.
    """
    print('Downloading: ' + url)
    request = urllib2.Request(url, headers={'User-agent': user_agent})
    opener = urllib2.build_opener()
    if proxy:
        scheme = urlparse.urlparse(url).scheme
        opener.add_handler(urllib2.ProxyHandler({scheme: proxy}))

    try:
        return opener.open(request).read()
    except urllib2.URLError as e:
        print('Download error: ' + str(e.reason))
        retryable = num_retries > 0 and hasattr(e, 'code') and 500 <= e.code < 600
        if retryable:
            # retry 5XX HTTP errors
            return download5(url, user_agent, proxy, num_retries-1)
        return None
def getpage(self, pagenum):
    """Fetch page ``pagenum`` and return it parsed with BeautifulSoup.

    The URL is built from ``self.baseurl``, ``self.seeLZ`` and the page
    number. Returns None when the request fails with a URL error.
    """
    try:
        url = self.baseurl + self.seeLZ + '&pn=' + str(pagenum)
        response = urllib2.urlopen(urllib2.Request(url))
        return BeautifulSoup(response, "html5lib")
    except urllib2.URLError as e:
        if hasattr(e, 'reason'):
            print(u" ".join([u"?????????????", u"%s" % (e.reason,)]))
        return None
def sub_app_get(self, offset):
    """Read ``offset`` from the sub-application on localhost:5050.

    While the connection fails with a socket error the request is retried
    once per second; the fifth such failure trips an assertion. Any other
    URL error fails an assertion immediately.
    """
    attempts = 0
    while True:
        try:
            f = urllib2.urlopen('http://localhost:5050%s' % offset)
        except urllib2.URLError as e:
            socket_failure = hasattr(e, 'reason') and type(e.reason) == urllib2.socket.error
            if not socket_failure:
                print('Error opening url: %s' % offset)
                assert 0, e  # Print exception
            # i.e. process not started up yet
            attempts += 1
            time.sleep(1)
            assert attempts < 5, '%s: %r; %r' % (offset, e, e.args)
            continue
        return f.read()
def _get_content(self, url):
    """Fetch ``url`` and return the response body.

    When the configuration has an ``api_key`` it is sent in the
    ``Authorization`` header.

    Raises:
        ContentNotFoundError: for an HTTP 404 reply.
        ContentFetchError: for every other failure while opening the URL.
    """
    http_request = urllib2.Request(url=url)

    api_key = self.config.get('api_key')
    if api_key:
        http_request.add_header('Authorization', api_key)

    try:
        http_response = urllib2.urlopen(http_request)
    except urllib2.HTTPError as e:
        error_type = ContentNotFoundError if e.getcode() == 404 else ContentFetchError
        raise error_type('HTTP error: %s' % e.code)
    except urllib2.URLError as e:
        raise ContentFetchError('URL error: %s' % e.reason)
    except httplib.HTTPException as e:
        raise ContentFetchError('HTTP Exception: %s' % e)
    except socket.error as e:
        raise ContentFetchError('HTTP socket error: %s' % e)
    except Exception as e:
        raise ContentFetchError('HTTP general exception: %s' % e)
    return http_response.read()
def browse_amazon_url(self):
    '''Opens Amazon page for current book's ASIN using user's local store'''
    # Try to use the nearest Amazon store to the user.
    # If this fails we'll default to .com, the user will have to manually
    # edit the preferences file to fix it (it is a simple text file).
    if not prefs['tld']:
        import json
        from collections import defaultdict
        from urllib2 import urlopen, URLError

        try:
            country = json.loads(urlopen('http://ipinfo.io/json').read())['country']
        except (URLError, KeyError, ValueError, TypeError):
            # ValueError: the reply was not valid JSON.
            # TypeError: the JSON was not an object.
            # Both fall back to the default store, like a failed lookup.
            country = 'unknown'
        country_tld = defaultdict(lambda: 'com', {'AU': 'com.au', 'BR': 'com.br', 'CA': 'ca', 'CN': 'cn', 'FR': 'fr',
                                                  'DE': 'de', 'IN': 'in', 'IT': 'it', 'JP': 'co.jp', 'MX': 'com.mx',
                                                  'NL': 'nl', 'ES': 'es', 'GB': 'co.uk', 'US': 'com'})
        prefs['tld'] = country_tld[country]
    webbrowser.open('https://www.amazon.{0}/gp/product/{1}/'.format(prefs['tld'], self._asin_edit.text()))
def check_for_update():
    """Fetch fresh version information unless that already happened today.

    ``FILE_UPDATE`` acts as a daily marker: when its modification date (UTC)
    is today, nothing is done. Otherwise the file is touched, the current
    ``__version__`` is posted to ``CORE_VERSION_URL`` and the reply replaces
    the file's content. HTTP and URL errors are silently ignored.
    """
    if os.path.exists(FILE_UPDATE):
        last_check = datetime.utcfromtimestamp(os.path.getmtime(FILE_UPDATE))
        checked_today = last_check.strftime('%Y-%m-%d') == datetime.utcnow().strftime('%Y-%m-%d')
        if checked_today:
            return

    try:
        # mark today's attempt before contacting the server
        with open(FILE_UPDATE, 'a'):
            os.utime(FILE_UPDATE, None)
        form = urllib.urlencode({'version': __version__})
        request = urllib2.Request(CORE_VERSION_URL, form)
        fetched = urllib2.urlopen(request)
        with open(FILE_UPDATE, 'w') as update_json:
            update_json.write(fetched.read())
    except (urllib2.HTTPError, urllib2.URLError):
        pass
def http_download(url, target_path):
    """Download file to local

    Args:
        - url(string): url request path
        - target_path(string): download destination

    Returns:
        target_path, once the content has been written.

    Raises:
        IOError: the server answered with a status other than 200
        urllib2.URLError: the request failed without an HTTP status
    """
    try:
        reply = urllib2.urlopen(url)
    except urllib2.URLError as e:
        if not hasattr(e, 'code'):
            raise
        # an HTTP error carries a status code; check it like a normal reply
        reply = e

    status = reply.code
    if status != 200:
        raise IOError("Request url(%s) expect 200 but got %d" %(url, status))

    with open(target_path, 'wb') as destination:
        shutil.copyfileobj(reply, destination)
    return target_path
def test_create_session_invalid_form(self, mock_validate_url_form):
    """ApiCalls must raise URLError when the base URL fails form validation."""
    base_URL = "invalidForm.com/"
    mock_validate_url_form.side_effect = [False]
    constructor_args = dict(
        client_id="",
        client_secret="",
        base_URL=base_URL,
        username="",
        password="",
    )

    with self.assertRaises(URLError) as err:
        API.apiCalls.ApiCalls(**constructor_args)

    message = str(err.exception)
    self.assertTrue("not a valid URL" in message)
    mock_validate_url_form.assert_called_with(base_URL)
def create_session(self):
    """
    create session to be re-used until expiry for get and post calls

    A trailing "/" is appended to self.base_URL when missing. The session
    (OAuth2Session object) is stored in self._session.

    Raises URLError when the base URL is not well formed, and Exception
    when the URL cannot be reached with the new session.
    """
    if not self.base_URL.endswith("/"):
        self.base_URL += "/"

    if not validate_URL_form(self.base_URL):
        raise URLError(self.base_URL + " is not a valid URL")

    oauth_service = self.get_oauth_service()
    access_token = self.get_access_token(oauth_service)
    self._session = oauth_service.get_session(access_token)

    reachable = self.validate_URL_existence(self.base_URL, use_session=True)
    if reachable is False:
        raise Exception("Cannot create session. " +
                        "Verify your credentials are correct.")
def linksExtractor(url, fileFormat='png'):
    """Collect the links on ``url`` that end with ``fileFormat``.

    For image formats the ``src`` of ``<img>`` tags is inspected, for every
    other format the ``href`` of ``<a>`` tags.

    Args:
        url: page to scan.
        fileFormat: file extension (without the dot) to look for.

    Returns:
        A list of matching links, the string ``'EMPTY'`` when none matched,
        or an error message string when the page could not be fetched.
    """
    if fileFormat in ['png', 'jpg', 'jpeg', 'tiff', 'bmp', 'svg', 'gif']:
        tag, attr = 'img', 'src'
    else:
        tag, attr = 'a', 'href'

    try:
        headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64)'}
        req = urllib2.Request(url, None, headers)
        htmlDoc = urllib2.urlopen(req).read()
    except urllib2.HTTPError as err:
        # err.code is an integer attribute, not a method
        print("Server Response : " + str(err.code))
        return "Server refused to connect!"
    except urllib2.URLError:
        return 'Invalid URL!'

    page = BeautifulSoup(htmlDoc, 'html.parser')
    res = []
    for link in page.find_all(tag):
        pre = link.get(attr)
        if pre is None:
            # the tag has no such attribute: nothing to compare
            continue
        pre = str(pre)
        if pre[-len(fileFormat):] == fileFormat:
            res.append(pre)

    if len(res) < 1:
        return 'EMPTY'
    return res
def update_plex():
    """Ask the Plex server to refresh all library sections.

    The outcome is only logged; HTTP and URL errors are not raised.
    """
    Logger.info("plex - sending request to update Plex")
    refresh_url = 'http://%s/library/sections/all/refresh?X-Plex-Token=%s' % (PLEX_IP, PLEX_TOKEN)

    try:
        urllib2.urlopen(refresh_url).read()
    except urllib2.HTTPError as e:
        Logger.warning("plex - unable to make request to Plex - HTTP Error %s", str(e.code))
        return
    except urllib2.URLError as e:
        Logger.warning("plex - unable to make request to Plex - URL Error %s", e.reason)
        return

    Logger.info("plex - update successful")
def goglib_get_banner(game_name, icon_path, banner_path):
    """Download the banner of a GOG game and store two versions of it.

    The banner URL is read from the ``og:image`` entry of the game's page on
    gog.com. The image is saved as ``<banner_path>/<game_name>.jpg`` scaled
    to a height of 240, and a greyscale copy is saved under
    ``<banner_path>/unavailable/``.

    Args:
        game_name: name of the game as used in the gog.com URL.
        icon_path: not used by this function.
        banner_path: directory that receives the banner files.
    """
    req = urllib2.Request('https://www.gog.com/game/' + game_name)
    banner_file_path = banner_path + '/' + game_name + '.jpg'

    try:
        game_page_content = urllib2.urlopen(req).read()
        soup = BeautifulSoup(game_page_content, 'lxml')
        raw_data = soup.findAll(attrs={'name': 'og:image'})
        banner_url = raw_data[0]['content'].encode('utf-8')

        # a banner URL without an "http" prefix gets "https:" prepended
        if banner_url.startswith('http'):
            banner_req = urllib2.Request(banner_url)
        else:
            banner_req = urllib2.Request('https:' + banner_url)

        banner_data = urllib2.urlopen(banner_req).read()
        with open(banner_file_path, 'wb') as banner_file:
            banner_file.write(banner_data)

        # scale to a height of 240 while keeping the aspect ratio
        pic_src = Image.open(banner_file_path)
        scale_lvl = 240/float(pic_src.size[1])
        scaled_width = int(float(pic_src.size[0])*scale_lvl)
        pic = pic_src.resize((scaled_width, 240), PIL.Image.ANTIALIAS)
        pic.save(banner_file_path)

        #~ if banner_url.startswith('http'):
            #~ goglib_recreate_banner.goglib_recreate_banner(game_name, icon_path, banner_path)

        new_pic = Image.open(banner_file_path)
        pic_grey = new_pic.convert('L')
        pic_grey.save(banner_path + '/unavailable/' + game_name + '.jpg')

    except urllib2.HTTPError as e:
        # HTTPError is a subclass of URLError and has to be tested first,
        # otherwise this branch can never run.
        print(e.code)
        print(e.read())
    except urllib2.URLError as e:
        print(e.reason)
def get_banner(game_name, url, banner_path, lib):
    """Download a banner image and store it resized to 518x240.

    The image is saved as ``<banner_path>/<game_name>.jpg``. When ``lib`` is
    ``'goglib'`` a greyscale copy is also saved under
    ``<banner_path>/unavailable/``. Missing directories are created.

    Args:
        game_name: base name for the saved files.
        url: address of the banner image.
        banner_path: directory that receives the banner files.
        lib: library identifier; only ``'goglib'`` triggers the grey copy.
    """
    banner_req = urllib2.Request(url)
    banner_file_path = banner_path + '/' + game_name + '.jpg'

    try:
        if not os.path.exists(banner_path):
            os.makedirs(banner_path)

        banner_data = urllib2.urlopen(banner_req).read()
        with open(banner_file_path, 'wb') as banner_file:
            banner_file.write(banner_data)

        pic_src = Image.open(banner_file_path)
        pic = pic_src.resize((518, 240), PIL.Image.ANTIALIAS)
        pic.save(banner_file_path)

        if lib == 'goglib':
            if not os.path.exists(banner_path + '/unavailable/'):
                os.makedirs(banner_path + '/unavailable/')

            new_pic = Image.open(banner_file_path)
            pic_grey = new_pic.convert('L')
            pic_grey.save(banner_path + '/unavailable/' + game_name + '.jpg')

    except urllib2.HTTPError as e:
        # HTTPError is a subclass of URLError and has to be tested first,
        # otherwise this branch can never run.
        print(e.code)
        print(e.read())
    except urllib2.URLError as e:
        print(e.reason)
def httpcall(url):
    """Send one randomised GET request to ``url`` and return a status marker.

    A random query parameter is appended to the URL, and User-Agent and
    Referer are picked at random from the module-level lists. Returns 500
    when the server answers with any HTTP error, otherwise 0. A URL error
    exits the process.

    NOTE(review): this is a traffic generator; run it only against systems
    you are authorised to load-test.
    """
    useragent_list()
    referer_list()
    code=0
    # append with "&" when the URL already carries a query string
    if url.count("?")>0:
        param_joiner="&"
    else:
        param_joiner="?"
    request = urllib2.Request(url + param_joiner + buildblock(random.randint(3,10)) + '=' + buildblock(random.randint(3,10)))
    request.add_header('User-Agent', random.choice(headers_useragents))
    request.add_header('Cache-Control', 'no-cache')
    request.add_header('Accept-Charset', 'ISO-8859-1,utf-8;q=0.7,*;q=0.7')
    request.add_header('Referer', random.choice(headers_referers) + buildblock(random.randint(5,10)))
    request.add_header('Keep-Alive', random.randint(110,120))
    request.add_header('Connection', 'keep-alive')
    request.add_header('Host',host)
    try:
        urllib2.urlopen(request)
    except urllib2.HTTPError, e:
        #print e.code
        # every HTTP error is reported as 500, whatever the real status
        set_flag(1)
        print 'Response Code 500'
        code=500
    except urllib2.URLError, e:
        #print e.reason
        sys.exit()
    else:
        # the request succeeded: count it and send it once more
        inc_counter()
        urllib2.urlopen(request)
    return(code)

#http caller thread
def __http_get(self, url, timeout = 3):
    """Fetch ``url`` and return its decoded JSON content.

    Returns None when the request fails with a URL error; the failure is
    written to the instance logger.
    """
    try:
        reply = urllib2.urlopen(url, timeout = timeout)
        return json.load(reply)
    except urllib2.URLError as e:
        self.__logger.error("Error fetching %s - %s", url, e.reason)
    return None
def get_real_url(url, loaded_urls):
    """Resolve ``url`` to the address reported by the response.

    Args:
        url: address to open.
        loaded_urls: collection of addresses that were handled before.

    Returns:
        The response's final URL, or ``None`` when the request failed or the
        final URL is already in ``loaded_urls``.
    """
    real_url = None
    response = None
    try:
        req = Request(url, headers={"User-Agent": "Mozilla/5.0 (X11; Linux i686) AppleWebKit/537.17 (KHTML, like Gecko) Chrome/24.0.1312.27 Safari/537.17"})
        response = urlopen(req)
        real_url = response.geturl()
        print('Real_url is: ' + str(real_url))
        if real_url in loaded_urls:
            print('URL had been downloaded in previous ')
            real_url = None
    # HTTPError derives from URLError, which derives from IOError, so the
    # handlers must go from most to least specific or only the IOError
    # branch can ever run.
    except HTTPError as e:  # If there is any HTTPError
        print("HTTPError on url " + str(url))
        print(e)
    except URLError as e:
        print("URLError on url " + str(url))
        print(e)
    except IOError as e:  # If there is any IOError
        print("IOError on url " + str(url))
        print(e)
    finally:
        # close the connection on both the success and the failure path
        if response is not None:
            response.close()
    return real_url
def download_image(url, save_dir, loaded_urls=None):
    """Download the image behind ``url`` into ``save_dir``.

    The file is named after the MD5 hex digest of the response's final URL,
    with the extension from ``CONFIGS[u'search_file_type']``. Errors are
    printed, not raised.

    Args:
        url: address of the image.
        save_dir: directory that receives the file.
        loaded_urls: optional collection of final URLs that were downloaded
            before; such URLs are skipped.

    Returns:
        A ``(real_url, save_image_name)`` tuple. ``real_url`` is ``None``
        when the request failed or the URL was already downloaded;
        ``save_image_name`` is ``None`` when no target file was chosen.
    """
    real_url = None
    response = None
    save_image_name = None
    try:
        req = Request(url, headers={"User-Agent": "Mozilla/5.0 (X11; Linux i686) AppleWebKit/537.17 (KHTML, like Gecko) Chrome/24.0.1312.27 Safari/537.17"})
        response = urlopen(req)
        real_url = response.geturl()

        if loaded_urls and real_url in loaded_urls:
            print('URL had been downloaded in previous searching')
            real_url = None
        else:
            img_name = hashlib.md5(real_url).hexdigest()
            save_image_name = save_dir + '/' + img_name + '.' + CONFIGS[u'search_file_type']
            print('Try to save image ' + real_url + ' into file: ' + save_image_name)
            # Read before opening the target, so a failed read does not
            # leave an empty file; "with" closes the file in every case.
            data = response.read()
            with open(save_image_name, 'wb') as output_file:
                output_file.write(data)
    # HTTPError derives from URLError, which derives from IOError, so the
    # handlers must go from most to least specific or only the IOError
    # branch can ever run.
    except HTTPError as e:  # If there is any HTTPError
        print("HTTPError on url " + str(url))
        print(e)
    except URLError as e:
        print("URLError on url " + str(url))
        print(e)
    except IOError as e:  # If there is any IOError
        print("IOError on url " + str(url))
        print(e)
    finally:
        # close the connection on both the success and the failure path
        if response is not None:
            response.close()

    return real_url, save_image_name

############## End of Functions to get real urls and download images ############

############## Main Program ############
def test_download_failed_URLError(self, mock_urlopen):
    """A URLError from urlopen must surface as glance.RetryableError."""
    mock_urlopen.side_effect = URLError(None)
    request = urllib2.Request('http://fakeurl.com')

    with self.assertRaises(self.glance.RetryableError):
        self.glance._download_tarball_and_verify(request, 'fake_staging_path')