The following 50 code examples, extracted from open-source Python projects, illustrate how to use urllib2.build_opener().
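All of these examples follow the same basic pattern: build an opener from zero or more handler instances, optionally set default headers on it, then either call its open() method directly or install it globally so that urllib2.urlopen() routes through it. A minimal sketch of that pattern (the URL is a placeholder):

import urllib2

# With no arguments, build_opener() returns an opener wired with the default
# handlers; pass handlers such as ProxyHandler or HTTPCookieProcessor to
# customize its behavior.
opener = urllib2.build_opener(urllib2.HTTPCookieProcessor())
opener.addheaders = [('User-Agent', 'Mozilla/5.0')]  # sent with every request

# Use the opener directly...
response = opener.open('http://example.com')  # placeholder URL
print response.read()

# ...or install it so module-level urllib2.urlopen() goes through it too.
urllib2.install_opener(opener)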
def get(self, url, proxy=None):
    if proxy:
        proxy = urllib2.ProxyHandler({'http': proxy})
        opener = urllib2.build_opener(proxy)
        urllib2.install_opener(opener)
    try:
        response = urllib2.urlopen(url)
    except HTTPError, e:
        resp = e.read()
        self.status_code = e.code
    except URLError, e:
        resp = e.read()
        self.status_code = e.code
    else:
        self.status_code = response.code
        resp = response.read()
    return resp
def download_from_url(url):
    proxy = env_server.get_proxy()
    if proxy['enabled']:
        server = proxy['server'].replace('http://', '')
        proxy_dict = {
            'http': 'http://{login}:{pass}@{0}'.format(server, **proxy)
        }
        proxy_handler = urllib2.ProxyHandler(proxy_dict)
        auth = urllib2.HTTPBasicAuthHandler()
        opener = urllib2.build_opener(proxy_handler, auth, urllib2.HTTPHandler)
        urllib2.install_opener(opener)

    run_thread = tc.ServerThread(env_inst.ui_main)
    run_thread.kwargs = dict(url=url, timeout=1)
    run_thread.routine = urllib2.urlopen
    run_thread.run()
    result_thread = tc.treat_result(run_thread, silent=True)

    if result_thread.isFailed():
        return False
    else:
        return result_thread.result
def pContent(url):
    try:
        request_web = urllib2.Request(url)
        agent = 'Mozilla/5.0 (Windows; U; Windows NT 6.0; en-US; rv:1.9.0.6)'
        request_web.add_header('User-Agent', agent)
        opener_web = urllib2.build_opener()
        text = opener_web.open(request_web).read()
        strreg = re.compile('(?<=href=")(.*?)(?=")')
        names = strreg.findall(text)
        opener_web.close()
        for name in names:
            if site in name or '=' in name or name.startswith('/'):
                global collected
                collected.append(name)
            elif site in name and EXT in name:
                collected.append(name)
            elif 'http://' in name:
                collected.append(name)
    except:
        pass
def ipcheck(proxy):
    try:
        pxhandle = urllib2.ProxyHandler({"http": proxy})
        opener = urllib2.build_opener(pxhandle)
        urllib2.install_opener(opener)
        myip = urllib2.urlopen('http://www.whatismyip.com/automation/n09230945.asp').read()
        xs = re.findall(('\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3}'), StripTags(myip))
        if xs[0] == myipadress or myipadress == myip:
            trans_list.append(proxy)
            print proxy[:-1], "\t- ALIVE -", timer(), "- TRANSPARENT"
        elif xs == None:
            pass
        else:
            anon_list.append(proxy)
            print proxy[:-1], "\t- ALIVE -", timer(), "- EXT-iP :", xs[0]
    except KeyboardInterrupt:
        print "\n\nCTRL+C - check temporary proxylist file\n\n"
        sys.exit(0)
    except:
        pass
def getsamairdotru():
    counter = 1
    pxycnt = 0
    maxpages = 10
    urls = []
    pfile = file(output, 'a')
    while counter <= maxpages:
        if counter < 10:  # workaround for page-01 to page-09
            opener = urllib2.build_opener()
            opener.addheaders = [('User-agent', 'Mozilla/5.0')]
            url = opener.open('http://www.samair.ru/proxy/proxy-0' + repr(counter) + '.htm').read()
        else:
            opener = urllib2.build_opener()
            opener.addheaders = [('User-agent', 'Mozilla/5.0')]
            url = opener.open('http://www.samair.ru/proxy/proxy-' + repr(counter) + '.htm').read()
        strings = re.findall(('\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3}:\d{1,5}'), StripTags(url))
        for string in strings:
            pfile.write(string + "\n")
            pxycnt = pxycnt + 1
        counter = counter + 1
        opener.close()
    print pxycnt, "\t: Proxies received from : http://www.samair.ru/proxy/"
    pfile.close()
def geturls(query, num):
    print "[+] getting urls"
    counter = 10
    urls = []
    while counter < int(num):
        url = 'http://www.google.com/search?hl=en&q=' + query + '&hl=en&lr=&start=' + repr(counter) + '&sa=N'
        #url = "http://search.lycos.com/?query="+query+"&page="+repr(counter)
        opener = urllib2.build_opener()
        opener.addheaders = [('User-agent', 'Mozilla/4.0 (compatible; MSIE 5.5; Windows NT)')]
        data = opener.open(url).read()
        print data
        hosts = re.findall(('\w+\.[\w\.\-/]*\.\w+'), StripTags(data))
        #hosts = re.findall('<span class=\"?grnLnk small\"?>http:\/\/(.+?)\/',data)
        for x in hosts:
            if x.find('www') != -1:
                x = x[x.find('www'):]
            if x not in urls and re.search("google", x) == None:
                urls.append(x)
        counter += 10
    for url in urls:
        print url
    return urls
def run(self):
    password = getword()
    try:
        print "-" * 12
        print "User:", username, "Password:", password
        req = urllib2.Request(sys.argv[1])
        passman = urllib2.HTTPPasswordMgrWithDefaultRealm()
        passman.add_password(None, sys.argv[1], username, password)
        authhandler = urllib2.HTTPBasicAuthHandler(passman)
        opener = urllib2.build_opener(authhandler)
        fd = opener.open(req)
        print "\t\n\n[+] Login successful: Username:", username, "Password:", password, "\n"
        print "[+] Retrieved", fd.geturl()
        info = fd.info()
        for key, value in info.items():
            print "%s = %s" % (key, value)
        sys.exit(2)
    except (urllib2.HTTPError, socket.error):
        pass
def run(self):
    username, password = getword()
    try:
        print "-" * 12
        print "User:", username, "Password:", password
        req = urllib2.Request(sys.argv[1])
        passman = urllib2.HTTPPasswordMgrWithDefaultRealm()
        passman.add_password(None, sys.argv[1], username, password)
        authhandler = urllib2.HTTPBasicAuthHandler(passman)
        opener = urllib2.build_opener(authhandler)
        fd = opener.open(req)
        print "\t\n\nUsername:", username, "Password:", password, "----- Login successful!!!\n\n"
        print "Retrieved", fd.geturl()
        info = fd.info()
        for key, value in info.items():
            print "%s = %s" % (key, value)
        sys.exit(2)
    except (urllib2.HTTPError, httplib.BadStatusLine, socket.error), msg:
        print "An error occurred:", msg
def callServiceApi(path, params=None, headers=None, base_url=baseUrl, timeout=60):
    if not params:
        params = {}
    if not headers:
        headers = []
    opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cookie_jar))
    headers.append(('User-Agent', userAgent))
    opener.addheaders = headers
    common.log('HEADERS')
    common.log(path)
    common.log(headers)
    if params:
        data_encoded = urllib.urlencode(params)
        response = opener.open(base_url + path, data_encoded, timeout=timeout)
    else:
        response = opener.open(base_url + path, timeout=timeout)
    return response.read()
def urlopen(url, data=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT,
            cafile=None, capath=None, cadefault=False, context=None):
    global _opener
    if cafile or capath or cadefault:
        if context is not None:
            raise ValueError(
                "You can't pass both context and any of cafile, capath, and "
                "cadefault"
            )
        if not _have_ssl:
            raise ValueError('SSL support not available')
        context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH,
                                             cafile=cafile,
                                             capath=capath)
        https_handler = HTTPSHandler(context=context)
        opener = build_opener(https_handler)
    elif context:
        https_handler = HTTPSHandler(context=context)
        opener = build_opener(https_handler)
    elif _opener is None:
        _opener = opener = build_opener()
    else:
        opener = _opener
    return opener.open(url, data, timeout)
def download_vcpython27(self):
    """
    Download vcpython27 since some Windows 7 boxes have it and some don't.
    :return: None
    """
    self._prepare_for_download()
    logger.info('Beginning download of vcpython27... this may take a few minutes...')

    with open(os.path.join(DOWNLOADS_DIR, 'vcpython27.msi'), 'wb') as f:
        if self.PROXY is not None:
            opener = urllib2.build_opener(
                urllib2.HTTPHandler(),
                urllib2.HTTPSHandler(),
                urllib2.ProxyHandler({'http': self.PROXY, 'https': self.PROXY})
            )
            urllib2.install_opener(opener)

        f.write(urllib2.urlopen(self.VCPYTHON27_DOWNLOAD_URL, timeout=self.DOWNLOAD_TIMEOUT).read())

    logger.debug('Download of vcpython27 complete')
def download_python(self):
    """
    Download Python
    :return: None
    """
    self._prepare_for_download()
    logger.info('Beginning download of python')

    with open(os.path.join(DOWNLOADS_DIR, 'python-installer.msi'), 'wb') as f:
        if self.PROXY is not None:
            opener = urllib2.build_opener(
                urllib2.HTTPHandler(),
                urllib2.HTTPSHandler(),
                urllib2.ProxyHandler({'http': self.PROXY, 'https': self.PROXY})
            )
            urllib2.install_opener(opener)

        f.write(urllib2.urlopen(self.PYTHON_DOWNLOAD_URL, timeout=self.DOWNLOAD_TIMEOUT).read())

    logger.debug('Download of python complete')
def __init__(self, timeout=None, proxy=None, cacert=None, sessions=False):
    if (timeout is not None) and not self.supports_feature('timeout'):
        raise RuntimeError('timeout is not supported with urllib2 transport')
    if proxy:
        raise RuntimeError('proxy is not supported with urllib2 transport')
    if cacert:
        raise RuntimeError('cacert is not supported with urllib2 transport')

    handlers = []

    if ((sys.version_info[0] == 2 and sys.version_info >= (2, 7, 9)) or
            (sys.version_info[0] == 3 and sys.version_info >= (3, 2, 0))):
        context = ssl.create_default_context()
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE
        handlers.append(urllib2.HTTPSHandler(context=context))

    if sessions:
        handlers.append(urllib2.HTTPCookieProcessor(CookieJar()))

    opener = urllib2.build_opener(*handlers)
    self.request_opener = opener.open
    self._timeout = timeout
def _install_socks_proxy_opener(proxytype, proxyaddr, proxyport=None):
    """Install a socks proxy handler so that all urllib2 requests are
    routed through the socks proxy.
    """
    try:
        import socks
        from sockshandler import SocksiPyHandler
    except ImportError:
        warn('WARNING: Failed to load PySocks module. Try installing it with '
             '`pip install PySocks`.')
        return

    if proxytype == 4:
        proxytype = socks.SOCKS4
    elif proxytype == 5:
        proxytype = socks.SOCKS5
    else:
        abort("Unknown Socks Proxy type {0}".format(proxytype))

    opener = urllib2.build_opener(SocksiPyHandler(proxytype, proxyaddr, proxyport))
    urllib2.install_opener(opener)
def wait_xxnet_exit():

    def http_request(url, method="GET"):
        proxy_handler = urllib2.ProxyHandler({})
        opener = urllib2.build_opener(proxy_handler)
        try:
            req = opener.open(url)
            return req
        except Exception as e:
            #logging.exception("web_control http_request:%s fail:%s", url, e)
            return False

    for i in range(20):
        host_port = config.get(["modules", "launcher", "control_port"], 8085)
        req_url = "http://127.0.0.1:{port}/quit".format(port=host_port)
        if http_request(req_url) == False:
            return True
        time.sleep(1)
    return False
def get_opener():
    autoproxy = '127.0.0.1:8087'

    import ssl
    if getattr(ssl, "create_default_context", None):
        cafile = os.path.join(data_root, "gae_proxy", "CA.crt")
        if not os.path.isfile(cafile):
            cafile = None
        context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH,
                                             cafile=cafile)
        https_handler = urllib2.HTTPSHandler(context=context)
        opener = urllib2.build_opener(
            urllib2.ProxyHandler({'http': autoproxy, 'https': autoproxy}),
            https_handler)
    else:
        opener = urllib2.build_opener(
            urllib2.ProxyHandler({'http': autoproxy, 'https': autoproxy}))
    return opener
def get_page(self, url, data=None):
    handlers = [PoolHTTPHandler]
    opener = urllib2.build_opener(*handlers)
    if data:
        data = urllib.urlencode(data)
    request = urllib2.Request(url, data, self.headers)
    try:
        response = opener.open(request)
        return response.read()
    except (urllib2.HTTPError, urllib2.URLError), e:
        raise BrowserError(url, str(e))
    except (socket.error, socket.sslerror), msg:
        raise BrowserError(url, msg)
    except socket.timeout, e:
        raise BrowserError(url, "timeout")
    except KeyboardInterrupt:
        raise
    except:
        raise BrowserError(url, "unknown error")
def follow_redirects(link, sites=None):
    """Follow redirects for the link as long as the redirects are on the
    given sites and return the resolved link."""
    def follow(url):
        return sites == None or urlparse.urlparse(url).hostname in sites

    class RedirectHandler(urllib2.HTTPRedirectHandler):
        def __init__(self):
            self.last_url = None

        def redirect_request(self, req, fp, code, msg, hdrs, newurl):
            self.last_url = newurl
            if not follow(newurl):
                return None
            r = urllib2.HTTPRedirectHandler.redirect_request(
                self, req, fp, code, msg, hdrs, newurl)
            r.get_method = lambda: 'HEAD'
            return r

    if not follow(link):
        return link
    redirect_handler = RedirectHandler()
    opener = urllib2.build_opener(redirect_handler)
    req = urllib2.Request(link)
    req.get_method = lambda: 'HEAD'
    try:
        with contextlib.closing(opener.open(req, timeout=1)) as site:
            return site.url
    except:
        return redirect_handler.last_url if redirect_handler.last_url else link
def get_pdf(pdf_link):
    # check whether value already existing in permanent storage:
    pdf_name = pdf_link.rsplit('/', 1)[-1]  # set filename according to last element of link
    if not check_db(pdf_name) and not check_db(pdf_link):
        # print 'Downloading: {}'.format(pdf_link)
        try:
            opener = urllib2.build_opener()
            opener.addheaders = [('User-agent', USER_AGENT)]
            r = opener.open(pdf_link)
            path = tmp_dir + pdf_name
            with open(path, "wb") as code:  # 'w'
                code.write(r.read())
            # log successful download:
            log_download('DOWNLOADED: {}'.format(pdf_link))
        except Exception as e:
            log_download('FAILURE: {} | {}'.format(pdf_link, e))
    else:
        log_download('File already downloaded: {}'.format(pdf_name))
def _do_put_request(self, resource, param_dict):
    req_url = urlparse.urlunparse(["http", self.host,
                                   "api/v%s/%s" % (self.api_version, resource),
                                   "", "", ""])
    print "req_url=%s" % (req_url)

    opener = urllib2.build_opener(urllib2.HTTPHandler)
    req = urllib2.Request(req_url, data=json.dumps(param_dict))
    req.add_header('Content-Type', 'application/json')
    req.get_method = lambda: 'PUT'
    try:
        return eval(opener.open(req).read())
    except urllib2.HTTPError, err:
        return parse_errors(err)

#---------------------------------------------
# error parsing
#---------------------------------------------
def music(m):
    banlist = redis.sismember('banlist', '{}'.format(m.from_user.id))
    if str(banlist) == 'False':
        text = m.text.replace("/song ", "")
        opener = urllib2.build_opener()
        f = opener.open('https://api.spotify.com/v1/search?limit=1&type=track&q={}'.format(text))
        parsed_json = json.loads(f.read())
        Artist = parsed_json['tracks']['items'][0]['artists'][0]['name']
        name = parsed_json['tracks']['items'][0]['name']
        music = parsed_json['tracks']['items'][0]['preview_url']
        urllib.urlretrieve("{}".format(music), "song.ogg")
        image = parsed_json['tracks']['items'][0]['album']['images'][0]['url']
        urllib.urlretrieve("{}".format(image), "song.png")
        bot.send_message(m.chat.id,
                         "*Artist* : ```{}``` \n *Name* : ```{}```".format(Artist, name),
                         parse_mode="Markdown")
        bot.send_sticker(m.chat.id, open('song.png'))
        bot.send_document(m.chat.id, open('song.ogg'), caption=" @OffLiNeTeam")
def music(m):
    text = m.text.replace("/music ", "")
    req = urllib2.Request("http://api.gpmod.ir/music.search/?v=2&q={}&count=30".format(text))
    opener = urllib2.build_opener()
    f = opener.open(req)
    parsed_json = json.loads(f.read())
    Artist = parsed_json['response'][0]['title']
    Artist1 = parsed_json['response'][1]['title']
    Artist2 = parsed_json['response'][2]['title']
    Artist3 = parsed_json['response'][3]['title']
    Artist4 = parsed_json['response'][4]['title']
    Artist5 = parsed_json['response'][5]['title']
    link = parsed_json['response'][0]['link']
    link1 = parsed_json['response'][1]['link']
    link2 = parsed_json['response'][2]['link']
    link3 = parsed_json['response'][3]['link']
    link4 = parsed_json['response'][4]['link']
    link5 = parsed_json['response'][5]['link']
    bot.send_message(m.chat.id,
                     "*Title* : `{}` \n\n [Link]({}) \n\n *Title* : `{}` \n\n [Link]({}) ".format(Artist, link, Artist1, link1),
                     parse_mode="Markdown")
def request(self, url):
    """
    Send request to the http server.
    """
    from core.shell import user_agent
    opener = urllib2.build_opener()
    opener.addheaders = [('User-Agent', user_agent),
                         ("Accept", "text/html, application/xml;q=0.9, application/xhtml+xml, image/png, image/webp, image/jpeg, image/gif, image/x-xbitmap, */*;q=0.1"),
                         #("Accept-Language","en-US,en;q=0.9,en;q=0.8"),
                         #("Accept-Encoding", "gzip;q=0,deflate,sdch"),
                         #("Accept-Charset", "ISO-8859-2,utf-8;q=0.7,*;q=0.7"),
                         ("Keep-Alive", "115"),
                         ("Connection", "keep-alive"),
                         ("DNT", "1")]
    return opener.open(self.url).read()
def __init__(self, url, proxy, cafile):
    self.url = url
    self.proxy = proxy
    if proxy:
        logging.info("Using HTTPS proxy: " + proxy)
        proxy_handler = urllib2.ProxyHandler({'https': proxy})
        opener = urllib2.build_opener(proxy_handler)
        urllib2.install_opener(opener)
    self.kwargs = {}
    if cafile and hasattr(ssl, "create_default_context"):
        logging.info("Using CA file: " + cafile)
        ctx = ssl.create_default_context()
        ctx.load_verify_locations(cafile=cafile)
        self.kwargs['context'] = ctx

# given an infoMap returned by the local node, call up the home server
def download(url, headers, proxy, num_retries, data=None):
    print 'Downloading:', url
    request = urllib2.Request(url, data, headers)
    opener = urllib2.build_opener()
    if proxy:
        proxy_params = {urlparse.urlparse(url).scheme: proxy}
        opener.add_handler(urllib2.ProxyHandler(proxy_params))
    try:
        response = opener.open(request)
        html = response.read()
        code = response.code
    except urllib2.URLError as e:
        print 'Download error:', e.reason
        html = ''
        if hasattr(e, 'code'):
            code = e.code
            if num_retries > 0 and 500 <= code < 600:
                # retry 5XX HTTP errors
                html = download(url, headers, proxy, num_retries-1, data)
        else:
            code = None
    return html
def download(url, headers, proxy, num_retries, data=None):
    print 'Downloading:', url
    request = urllib2.Request(url, data, headers)
    opener = urllib2.build_opener()
    if proxy:
        proxy_params = {urlparse.urlparse(url).scheme: proxy}
        opener.add_handler(urllib2.ProxyHandler(proxy_params))
    try:
        response = opener.open(request)
        html = response.read()
        code = response.code
    except urllib2.URLError as e:
        print 'Download error:', e.reason
        html = ''
        if hasattr(e, 'code'):
            code = e.code
            if num_retries > 0 and 500 <= code < 600:
                # retry 5XX HTTP errors
                return download(url, headers, proxy, num_retries-1, data)
        else:
            code = None
    return html
def download5(url, user_agent='wswp', proxy=None, num_retries=2):
    """Download function with support for proxies"""
    print 'Downloading:', url
    headers = {'User-agent': user_agent}
    request = urllib2.Request(url, headers=headers)
    opener = urllib2.build_opener()
    if proxy:
        proxy_params = {urlparse.urlparse(url).scheme: proxy}
        opener.add_handler(urllib2.ProxyHandler(proxy_params))
    try:
        html = opener.open(request).read()
    except urllib2.URLError as e:
        print 'Download error:', e.reason
        html = None
        if num_retries > 0:
            if hasattr(e, 'code') and 500 <= e.code < 600:
                # retry 5XX HTTP errors
                html = download5(url, user_agent, proxy, num_retries-1)
    return html
def register(first_name, last_name, email, password, captcha_fn):
    cj = cookielib.CookieJar()
    opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))
    html = opener.open(REGISTER_URL).read()
    form = parse_form(html)
    form['first_name'] = first_name
    form['last_name'] = last_name
    form['email'] = email
    form['password'] = form['password_two'] = password
    img = extract_image(html)
    captcha = captcha_fn(img)
    form['recaptcha_response_field'] = captcha
    encoded_data = urllib.urlencode(form)
    request = urllib2.Request(REGISTER_URL, encoded_data)
    response = opener.open(request)
    success = '/user/register' not in response.geturl()
    return success
def download(self, url, headers, proxy, num_retries, data=None):
    print 'Downloading:', url
    request = urllib2.Request(url, data, headers or {})
    opener = self.opener or urllib2.build_opener()
    if proxy:
        proxy_params = {urlparse.urlparse(url).scheme: proxy}
        opener.add_handler(urllib2.ProxyHandler(proxy_params))
    try:
        response = opener.open(request)
        html = response.read()
        code = response.code
    except Exception as e:
        print 'Download error:', str(e)
        html = ''
        if hasattr(e, 'code'):
            code = e.code
            if num_retries > 0 and 500 <= code < 600:
                # retry 5XX HTTP errors
                return self._get(url, headers, proxy, num_retries-1, data)
        else:
            code = None
    return {'html': html, 'code': code}
def __openrequest__(self, req):
    # Opens the passed in HTTP request
    if self.debug:
        print "\n----- REQUEST -----"
        handler = urllib2.HTTPSHandler(debuglevel=self.debugLevel)
        opener = urllib2.build_opener(handler)
        urllib2.install_opener(opener)
        print "- API ENDPOINT: " + req.get_full_url()
        print "- REQUEST METHOD: " + req.get_method()
        print "- AUTHORIZATION HEADER: " + req.get_header("Authorization")
        print "\n----- REQUEST DATA -----"
        print req.get_data()

    res = urllib2.urlopen(req)
    out = res.read()

    if self.debug:
        print "\n----- REQUEST INFO -----"
        print res.info()
        print "\n----- RESPONSE -----"
        print out

    return out
def __init__(self):
    self.articles = []
    self.query = None
    self.cjar = MozillaCookieJar()

    # If we have a cookie file, load it:
    if ScholarConf.COOKIE_JAR_FILE and \
       os.path.exists(ScholarConf.COOKIE_JAR_FILE):
        try:
            self.cjar.load(ScholarConf.COOKIE_JAR_FILE, ignore_discard=True)
            ScholarUtils.log('info', 'loaded cookies file')
        except Exception as msg:
            ScholarUtils.log('warn', 'could not load cookies file: %s' % msg)
            self.cjar = MozillaCookieJar()  # Just to be safe

    self.opener = build_opener(HTTPCookieProcessor(self.cjar))
    self.settings = None  # Last settings object, if any
def get_access_token(self, code, state=None):
    '''
    In callback url: http://host/callback?code=123&state=xyz
    use code and state to get an access token.
    '''
    kw = dict(client_id=self._client_id, client_secret=self._client_secret, code=code)
    if self._redirect_uri:
        kw['redirect_uri'] = self._redirect_uri
    if state:
        kw['state'] = state
    opener = build_opener(HTTPSHandler)
    request = Request('https://github.com/login/oauth/access_token', data=_encode_params(kw))
    request.get_method = _METHOD_MAP['POST']
    request.add_header('Accept', 'application/json')
    try:
        response = opener.open(request, timeout=TIMEOUT)
        r = _parse_json(response.read())
        if 'error' in r:
            raise ApiAuthError(str(r.error))
        return str(r.access_token)
    except HTTPError as e:
        raise ApiAuthError('HTTPError when get access token')
def check_single_proxy_status(self, proxy_address, domain_check):
    try:
        parse = urlparse(proxy_address)
        proxy_scheme = parse.scheme
        proxy = str(parse.hostname) + ':' + str(parse.port)
        proxy_handler = urllib2.ProxyHandler({proxy_scheme: proxy})
        opener = urllib2.build_opener(proxy_handler)
        opener.addheaders = [('User-agent', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.87 Safari/537.36')]
        urllib2.install_opener(opener)
        req = urllib2.Request(domain_check)
        start_time = time.time()
        sock = urllib2.urlopen(req)
        end_time = time.time()
        diff_time = round(end_time - start_time, 3)
        log.console_log(Y + "{}[+] {} OK! Response Time : {}s".format(Y, proxy_address, str(diff_time), W))
        return 'ok'
    except urllib2.HTTPError, e:
        print('Error code: ' + str(e.code))
        return e.code
    except Exception, detail:
        print('ERROR ' + str(detail))
        return 1
def get(url):
    # Build and open the URL
    opener = urllib2.build_opener()
    opener.addheaders = [('User-Agent', 'Mozilla/5.0')]
    response = opener.open(url)

    # HLTV redirects to a .rar or .zip file
    final_url = response.geturl()

    # Gets the filename (everything after the last trailing /)
    filename = final_url.rsplit('/', 1)[-1]

    # Gets the Content-Length from the metadata from final_url
    filesize = (int(urllib.urlopen(final_url).info().getheaders("Content-Length")[0]) / 1024) / 1024

    # Tell user we are downloading filesize
    print "Starting %s: %s MB." % (filename, filesize)

    # Downloads the file to the directory the user enters
    urllib.urlretrieve(final_url, directory + "/" + filename)

    # Tell user the current status and file information
    print "Completed %s: %s MB." % (filename, filesize)
    return filesize
def getRankList(params):
    url = 'http://www.newrank.cn/xdnphb/list/month/rank'
    datas = getsign('/xdnphb/list/month/rank', params)
    time.sleep(np.random.rand() * 5)
    try:
        data = urllib.urlencode(datas)
        request = urllib2.Request(url)
        opener = urllib2.build_opener(urllib2.HTTPCookieProcessor())
        result = opener.open(request, data).read()
        #print params['rank_name'] + str(result)
        if result:
            return json.loads(result)
        else:
            return {'value': []}
    except (urllib2.HTTPError, urllib2.URLError), e:
        print e
        return {'value': []}
def _update_opener(self):
    '''
    Builds and installs a new opener to be used by all future calls to
    :func:`urllib2.urlopen`.
    '''
    if self._http_debug:
        http = urllib2.HTTPHandler(debuglevel=1)
    else:
        http = urllib2.HTTPHandler()

    if self._proxy:
        opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(self._cj),
                                      urllib2.ProxyHandler({'http': self._proxy}),
                                      urllib2.HTTPBasicAuthHandler(),
                                      http)
    else:
        opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(self._cj),
                                      urllib2.HTTPBasicAuthHandler(),
                                      http)
    urllib2.install_opener(opener)
def download_file_chunk(url, buf):
    opener = urllib2.build_opener()
    opener.addheaders = [('User-Agent', "DropboxLinuxDownloader/1.6.2")]
    sock = opener.open(url)

    size = int(sock.info()['content-length'])
    bufsize = max(size / 200, 4096)
    progress = 0

    with closing(sock) as f:
        yield (0, True)
        while True:
            try:
                chunk = f.read(bufsize)
                progress += len(chunk)
                buf.write(chunk)
                yield (float(progress) / size, True)
                if progress == size:
                    break
            except OSError as e:
                if hasattr(e, 'errno') and e.errno == errno.EAGAIN:
                    # nothing left to read
                    yield (float(progress) / size, False)
                else:
                    raise
def check_gn_proxy(proxy, protocal_type='HTTP'):
    url = 'http://icanhazip.com'
    proxy_handler = urllib2.ProxyHandler({
        'http': 'http://' + proxy,
        'https': 'https://' + proxy,
    })
    if protocal_type == 'HTTPS':
        url = 'https://icanhazip.com'

    opener = urllib2.build_opener(proxy_handler, urllib2.HTTPHandler)
    try:
        response = opener.open(url, timeout=3)
        res_ip = response.read().strip()
        return response.code == 200 and res_ip == proxy.split(':')[0]
    except Exception:
        return False
def urlopen(url, data=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
    global _opener
    if _opener is None:
        _opener = build_opener()
    return _opener.open(url, data, timeout)
def build_opener(*handlers):
    """Create an opener object from a list of handlers.

    The opener will use several default handlers, including support
    for HTTP, FTP and when applicable, HTTPS.

    If any of the handlers passed as arguments are subclasses of the
    default handlers, the default handlers will not be used.
    """
    import types
    def isclass(obj):
        return isinstance(obj, (types.ClassType, type))

    opener = OpenerDirector()
    default_classes = [ProxyHandler, UnknownHandler, HTTPHandler,
                       HTTPDefaultErrorHandler, HTTPRedirectHandler,
                       FTPHandler, FileHandler, HTTPErrorProcessor]
    if hasattr(httplib, 'HTTPS'):
        default_classes.append(HTTPSHandler)
    skip = set()
    for klass in default_classes:
        for check in handlers:
            if isclass(check):
                if issubclass(check, klass):
                    skip.add(klass)
            elif isinstance(check, klass):
                skip.add(klass)
    for klass in skip:
        default_classes.remove(klass)

    for klass in default_classes:
        opener.add_handler(klass())

    for h in handlers:
        if isclass(h):
            h = h()
        opener.add_handler(h)
    return opener
def _create_opener(self):
    import urllib2
    return urllib2.build_opener()