我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用urllib2.HTTPSHandler()。
def download_vcpython27(self):
    """
    Download vcpython27 since some Windows 7 boxes have it and some don't.

    :return: None
    """
    self._prepare_for_download()
    logger.info('Beginning download of vcpython27... this may take a few minutes...')
    target = os.path.join(DOWNLOADS_DIR, 'vcpython27.msi')
    with open(target, 'wb') as installer:
        if self.PROXY is not None:
            # Route both http and https through the configured proxy.
            proxies = {'http': self.PROXY, 'https': self.PROXY}
            urllib2.install_opener(urllib2.build_opener(
                urllib2.HTTPHandler(),
                urllib2.HTTPSHandler(),
                urllib2.ProxyHandler(proxies)))
        response = urllib2.urlopen(self.VCPYTHON27_DOWNLOAD_URL,
                                   timeout=self.DOWNLOAD_TIMEOUT)
        installer.write(response.read())
    logger.debug('Download of vcpython27 complete')
def download_python(self):
    """
    Download Python

    :return: None
    """
    self._prepare_for_download()
    logger.info('Beginning download of python')
    target = os.path.join(DOWNLOADS_DIR, 'python-installer.msi')
    with open(target, 'wb') as installer:
        if self.PROXY is not None:
            # Route both http and https through the configured proxy.
            proxies = {'http': self.PROXY, 'https': self.PROXY}
            urllib2.install_opener(urllib2.build_opener(
                urllib2.HTTPHandler(),
                urllib2.HTTPSHandler(),
                urllib2.ProxyHandler(proxies)))
        response = urllib2.urlopen(self.PYTHON_DOWNLOAD_URL,
                                   timeout=self.DOWNLOAD_TIMEOUT)
        installer.write(response.read())
    logger.debug('Download of python complete')
def __init__(self, timeout=None, proxy=None, cacert=None, sessions=False):
    """Set up a urllib2-based transport.

    :param timeout: socket timeout in seconds, or None; only honored when
        the transport reports the 'timeout' feature.
    :param proxy: not supported by this transport (raises RuntimeError).
    :param cacert: not supported by this transport (raises RuntimeError).
    :param sessions: when True, attach a cookie processor so cookies
        persist across requests.
    :raises RuntimeError: for any unsupported option.
    """
    if (timeout is not None) and not self.supports_feature('timeout'):
        raise RuntimeError('timeout is not supported with urllib2 transport')
    if proxy:
        raise RuntimeError('proxy is not supported with urllib2 transport')
    if cacert:
        # Fixed typo in the original message ("not support").
        raise RuntimeError('cacert is not supported with urllib2 transport')
    handlers = []
    # ssl.create_default_context exists on Python >= 2.7.9 / >= 3.2;
    # build an HTTPS handler with hostname/cert checks disabled there.
    if ((sys.version_info[0] == 2 and sys.version_info >= (2, 7, 9)) or
            (sys.version_info[0] == 3 and sys.version_info >= (3, 2, 0))):
        context = ssl.create_default_context()
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE
        handlers.append(urllib2.HTTPSHandler(context=context))
    if sessions:
        handlers.append(urllib2.HTTPCookieProcessor(CookieJar()))
    opener = urllib2.build_opener(*handlers)
    self.request_opener = opener.open
    self._timeout = timeout
def get_opener():
    """Build an opener that tunnels through the local GAE proxy."""
    autoproxy = '127.0.0.1:8087'
    import ssl
    proxy_handler = urllib2.ProxyHandler({'http': autoproxy, 'https': autoproxy})
    if getattr(ssl, "create_default_context", None):
        # Modern ssl module: verify against the bundled proxy CA when present.
        cafile = os.path.join(data_root, "gae_proxy", "CA.crt")
        if not os.path.isfile(cafile):
            cafile = None
        context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH,
                                             cafile=cafile)
        return urllib2.build_opener(proxy_handler,
                                    urllib2.HTTPSHandler(context=context))
    # Old ssl module without create_default_context: plain proxy opener.
    return urllib2.build_opener(proxy_handler)
def __openrequest__(self, req): # Opens the passed in HTTP request if self.debug: print "\n----- REQUEST -----" handler = urllib2.HTTPSHandler(debuglevel=self.debugLevel) opener = urllib2.build_opener(handler) urllib2.install_opener(opener) print "- API ENDPOINT: "+req.get_full_url() print "- REQUEST METHOD: "+req.get_method() print "- AUTHORIZATION HEADER: "+req.get_header("Authorization") print "\n----- REQUEST DATA -----" print req.get_data() res = urllib2.urlopen(req) out = res.read() if self.debug: print "\n----- REQUEST INFO -----" print res.info() print "\n----- RESPONSE -----" print out return out
def get_access_token(self, code, state=None):
    '''
    In callback url: http://host/callback?code=123&state=xyz

    use code and state to get an access token.
    '''
    params = dict(client_id=self._client_id,
                  client_secret=self._client_secret,
                  code=code)
    if self._redirect_uri:
        params['redirect_uri'] = self._redirect_uri
    if state:
        params['state'] = state
    request = Request('https://github.com/login/oauth/access_token',
                      data=_encode_params(params))
    request.get_method = _METHOD_MAP['POST']
    request.add_header('Accept', 'application/json')
    opener = build_opener(HTTPSHandler)
    try:
        response = opener.open(request, timeout=TIMEOUT)
        result = _parse_json(response.read())
        if 'error' in result:
            raise ApiAuthError(str(result.error))
        return str(result.access_token)
    except HTTPError as e:
        raise ApiAuthError('HTTPError when get access token')
def make_request(*args):
    # Windows needs an explicit TLS 1.2 context on the HTTPS handler.
    if platform.system() == "Windows":  # pragma: no cover
        sctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
        opener = urllib2.build_opener(
            urllib2.HTTPSHandler(debuglevel=0, context=sctx))
    else:
        opener = build_opener()
    # Randomized UA suffix to vary the client fingerprint.
    opener.addheaders = [('User-agent',
                          'Mozilla/5.0' + str(random.randrange(1000000)))]
    try:
        return opener.open(*args).read().strip()
    except Exception as e:
        # Prefer the HTTP error body when one is available.
        try:
            payload = e.read().strip()
        except:
            payload = e
        raise Exception(payload)
def _init_urllib(self, secure, debuglevel=0):
    """Install a global opener with cookies, no proxy, and optional TLS bypass."""
    jar = cookielib.CookieJar()
    ctx = None
    if not secure:
        self._logger.info('[WARNING] Skip certificate verification.')
        ctx = ssl.create_default_context()
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
    opener = urllib2.build_opener(
        urllib2.ProxyHandler({}),               # disable any system proxy
        urllib2.HTTPCookieProcessor(jar),
        urllib2.HTTPSHandler(debuglevel=debuglevel, context=ctx),
        MultipartPostHandler.MultipartPostHandler)
    opener.addheaders = [('User-agent', API_USER_AGENT)]
    urllib2.install_opener(opener)
def __init__(self, access_token_key, access_token_secret, consumer_key, consumer_secret):
    """Store the OAuth credentials and build the signing/transport helpers."""
    self.access_token_key = access_token_key
    self.access_token_secret = access_token_secret
    self.consumer_key = consumer_key
    self.consumer_secret = consumer_secret
    debuglevel = 0
    self.oauth_token = oauth.Token(key=access_token_key,
                                   secret=access_token_secret)
    self.oauth_consumer = oauth.Consumer(key=consumer_key,
                                         secret=consumer_secret)
    self.signature_method_hmac_sha1 = oauth.SignatureMethod_HMAC_SHA1()
    self.http_handler = urllib.HTTPHandler(debuglevel=debuglevel)
    self.https_handler = urllib.HTTPSHandler(debuglevel=debuglevel)
def __init__(self, proxy=None, debuglevel=0):
    # Remember the proxy ("host:port" or None) so do_open() can route
    # requests through it; then run the stock HTTPSHandler init.
    self.proxy = proxy
    urllib2.HTTPSHandler.__init__(self, debuglevel)
def do_open(self, http_class, req):
    # Attach the configured proxy (if any) to the request before opening.
    if self.proxy is not None:
        req.set_proxy(self.proxy, "https")
    # NOTE: the http_class argument is ignored — ProxyHTTPSConnection is
    # always substituted as the connection class.
    return urllib2.HTTPSHandler.do_open(self, ProxyHTTPSConnection, req)
def __init__(self, cert=None, verify=True):
    """Configure client cert and server-verification options.

    cert: None, a single path used for both cert and key, or a
    (certfile, keyfile) pair.  verify: True/False, or a CA-bundle path.
    """
    urllib2.HTTPSHandler.__init__(self)
    if cert is None:
        certfile = keyfile = None
    elif isinstance(cert, basestring):
        # A single path serves as both certificate and key file.
        certfile = keyfile = cert
    else:
        certfile, keyfile = cert
    if isinstance(verify, basestring):
        require_cert, ca_certs = True, verify
    elif verify is True:
        require_cert, ca_certs = True, None
    elif verify is False:
        require_cert, ca_certs = False, None
    else:
        raise TypeError("\"verify\" parameter must be a boolean or a string")
    self._certfile = certfile
    self._keyfile = keyfile
    self._require_cert = require_cert
    self._ca_certs = ca_certs
def __init__(self, ssl_config):  # pylint: disable=E1002
    """Initialize the handler and keep the SSL configuration for later use."""
    # Old-style super call on Python 2, zero-arg super() on Python 3.
    if PY2:
        urllib2.HTTPSHandler.__init__(self)
    else:
        super().__init__()  # pylint: disable=W0104
    self._ssl_config = ssl_config
def __init__(self):
    # Plain HTTPSHandler initialisation; no custom state is added.
    urllib2.HTTPSHandler.__init__(self)
def __init__(self):
    """Build an HTTPS opener."""
    # Based on pip 1.4.1's URLOpener
    # This verifies certs on only Python >=2.7.9.
    self._opener = build_opener(HTTPSHandler())
    # Strip out HTTPHandler to prevent MITM spoof.
    # Iterate over a snapshot: removing from the list while iterating it
    # (as the original did) skips the element after each removal.
    for handler in list(self._opener.handlers):
        if isinstance(handler, HTTPHandler):
            self._opener.handlers.remove(handler)
def hashed_download(url, temp, digest):
    """Download ``url`` to ``temp``, make sure it has the SHA-256 ``digest``,
    and return its path.

    :raises HashError: when the downloaded content's digest mismatches.
    """
    # Based on pip 1.4.1's URLOpener but with cert verification removed. Python
    # >=2.7.9 verifies HTTPS certs itself, and, in any case, the cert
    # authenticity has only privacy (not arbitrary code execution)
    # implications, since we're checking hashes.
    def opener():
        opener = build_opener(HTTPSHandler())
        # Strip out HTTPHandler to prevent MITM spoof.
        # Iterate over a snapshot: removing from the list while iterating it
        # (as the original did) skips the element after each removal.
        for handler in list(opener.handlers):
            if isinstance(handler, HTTPHandler):
                opener.handlers.remove(handler)
        return opener

    def read_chunks(response, chunk_size):
        # Yield the body in fixed-size pieces until EOF.
        while True:
            chunk = response.read(chunk_size)
            if not chunk:
                break
            yield chunk

    response = opener().open(url)
    # Name the local file after the last URL path segment.
    path = join(temp, urlparse(url).path.split('/')[-1])
    actual_hash = sha256()
    with open(path, 'wb') as file:
        for chunk in read_chunks(response, 4096):
            file.write(chunk)
            actual_hash.update(chunk)
    actual_digest = actual_hash.hexdigest()
    if actual_digest != digest:
        raise HashError(url, path, actual_digest, digest)
    return path
def https_request(self, req):
    # Make sure that if we're using an iterable object as the request
    # body, that we've also specified Content-Length
    if req.has_data():
        data = req.get_data()
        # File-like (.read) or iterator (.next) bodies have no implicit
        # length, so the caller must set the header explicitly.
        if hasattr(data, 'read') or hasattr(data, 'next'):
            if not req.has_header('Content-length'):
                raise ValueError(
                    "No Content-Length specified for iterable body")
    return urllib2.HTTPSHandler.do_request_(self, req)
def get_opener():
    """Build an opener that tunnels through the local proxy on LISTEN_PORT."""
    autoproxy = '127.0.0.1:%s' % config.LISTEN_PORT
    import ssl
    proxy_handler = urllib2.ProxyHandler({'http': autoproxy, 'https': autoproxy})
    if getattr(ssl, "create_default_context", None):
        # Modern ssl module: verify against the bundled proxy CA.
        cafile = os.path.join(data_root, "gae_proxy", "CA.crt")
        context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH,
                                             cafile=cafile)
        return urllib2.build_opener(proxy_handler,
                                    urllib2.HTTPSHandler(context=context))
    # Old ssl module without create_default_context: plain proxy opener.
    return urllib2.build_opener(proxy_handler)
def __init__(self, ssl_config):  #pylint: disable=E1002
    """Initialize the handler and keep the SSL configuration for later use."""
    # Old-style super call on Python 2, zero-arg super() on Python 3.
    if PY2:
        urllib2.HTTPSHandler.__init__(self)
    else:
        super().__init__()  # pylint: disable=W0104
    self._ssl_config = ssl_config
def __init__(self, connection_class=VerifiedHTTPSConnection):
    # Connection class to use when opening requests; defaults to the
    # certificate-verifying variant.
    self.specialized_conn_class = connection_class
    urllib2.HTTPSHandler.__init__(self)
def __init__(self, connection_class=UnverifiedHTTPSConnection):
    # Connection class to use when opening requests; defaults to the
    # non-verifying variant.
    self.specialized_conn_class = connection_class
    urllib2.HTTPSHandler.__init__(self)
def _additional_handlers(self):
    """Return extra urllib2 handlers: optional SOCKS proxy + lax HTTPS."""
    handlers = []
    if self.session.get('proxy'):
        protocol, host, port = self._get_proxy()
        # All three parts are required to build the SOCKS handler.
        if not (protocol and host and port):
            raise ChannelException(messages.channels.error_proxy_format)
        handlers.append(sockshandler.SocksiPyHandler(protocol, host, port))
    # Skip certificate checks
    ctx = ssl.create_default_context()
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE
    handlers.append(urllib2.HTTPSHandler(context=ctx))
    return handlers
def request(target, httpsproxy=None, useragent=None): global contenttype if not useragent: useragent = "Mozilla/5.0 (X11; Linux x86_64; rv:22.0) Gecko/20100101 Firefox/22.0 Iceweasel/22.0" else: print "["+ bc.G + "+" + bc.ENDC + "] User-Agent: " + useragent if httpsproxy: print "["+ bc.G + "+" + bc.ENDC + "] Proxy: " + httpsproxy + "\n" opener = urllib2.build_opener( urllib2.HTTPHandler(), urllib2.HTTPSHandler(), urllib2.ProxyHandler({'http': 'http://' + httpsproxy})) urllib2.install_opener(opener) postdata = [('remoteAddress',target),('key','')] postdata = urllib.urlencode(postdata) request = urllib2.Request(url, postdata) request.add_header("Content-type", contenttype) request.add_header("User-Agent", useragent) try: result = urllib2.urlopen(request).read() except urllib2.HTTPError, e: print "Error: " + e.code except urllib2.URLError, e: print "Error: " + e.args obj = json.loads(result) return obj
def _http(self, _method, _path, **kw):
    # Issue an HTTPS request against the API base _URL and return the
    # parsed JSON body; raises ApiNotFoundError/ApiError on HTTP errors.
    data = None
    params = None
    # GET parameters go into the query string; body methods get a form body.
    if _method=='GET' and kw:
        _path = '%s?%s' % (_path, _encode_params(kw))
    if _method in ['POST', 'PATCH', 'PUT']:
        data = bytes(_encode_json(kw), 'utf-8')
    url = '%s%s' % (_URL, _path)
    opener = build_opener(HTTPSHandler)
    request = Request(url, data=data)
    request.get_method = _METHOD_MAP[_method]
    if self._authorization:
        request.add_header('Authorization', self._authorization)
    if _method in ['POST', 'PATCH', 'PUT']:
        request.add_header('Content-Type', 'application/x-www-form-urlencoded')
    try:
        response = opener.open(request, timeout=TIMEOUT)
        is_json = self._process_resp(response.headers)
        if is_json:
            return _parse_json(response.read().decode('utf-8'))
    except HTTPError as e:
        # The error body may itself be JSON; decode accordingly before
        # wrapping it in the API exception types.
        is_json = self._process_resp(e.headers)
        if is_json:
            json = _parse_json(e.read().decode('utf-8'))
        else:
            json = e.read().decode('utf-8')
        req = JsonObject(method=_method, url=url)
        resp = JsonObject(code=e.code, json=json)
        if resp.code==404:
            raise ApiNotFoundError(url, req, resp)
        raise ApiError(url, req, resp)
def api_request_native(url, data=None, token=None, https_proxy=None, method=None):
    """Perform an authenticated JSON API request and return the decoded body."""
    request = urllib.Request(url)
    if method:
        # Override the HTTP verb (urllib picks GET/POST from data otherwise).
        request.get_method = lambda: method
    token = token if token != None else token_auth_string()
    request.add_header('Authorization', 'token ' + token)
    request.add_header('Accept', 'application/json')
    request.add_header('Content-Type', 'application/json')
    if data is not None:
        request.add_data(bytes(data.encode('utf8')))
    try:
        with contextlib.closing(urllib.urlopen(request)) as response:
            if response.code == 204:  # No Content
                return None
            return json.loads(response.read().decode('utf8', 'ignore'))
    except urllib.HTTPError as err:
        with contextlib.closing(err):
            raise SimpleHTTPError(err.code, err.read())
def getUrl(self,url, ischunkDownloading=False): try: post=None print 'url',url #openner = urllib2.build_opener(urllib2.HTTPHandler, urllib2.HTTPSHandler) cookie_handler = urllib2.HTTPCookieProcessor(self.cookieJar) openner = urllib2.build_opener(cookie_handler, urllib2.HTTPBasicAuthHandler(), urllib2.HTTPHandler()) if post: req = urllib2.Request(url, post) else: req = urllib2.Request(url) ua_header=False if self.clientHeader: for n,v in self.clientHeader: req.add_header(n,v) if n=='User-Agent': ua_header=True if not ua_header: req.add_header('User-Agent','Mozilla/5.0 (Windows NT 6.1; Win64; x64; Trident/7.0; rv:11.0) like Gecko') #response = urllib2.urlopen(req) if self.proxy and ( (not ischunkDownloading) or self.use_proxy_for_chunks ): req.set_proxy(self.proxy, 'http') response = openner.open(req) data=response.read() return data except: print 'Error in getUrl' traceback.print_exc() return None
def debug():
    """ Activate debugging on urllib2 """
    # Installing the opener globally makes every urlopen() dump traffic.
    install_opener(build_opener(HTTPSHandler(debuglevel=1)))
    # Store properties for all requests
def __init__(self, auth_file):
    # Standard HTTPSHandler init, plus the path to the credentials file
    # used elsewhere by this handler.
    urllib2.HTTPSHandler.__init__(self)
    self.auth_file = auth_file
def _GetHandlers(self):
    # Handlers used to build the urllib2 opener: just a stock HTTPS handler.
    return [urllib2.HTTPSHandler()]
def __init__(self, headers=None, debug=True, p=''):
    """Build a cookie/proxy-aware opener with optional debug tracing.

    :param headers: optional dict of extra request headers.
    :param debug: when True, urllib2 handlers dump wire traffic.
    :param p: http proxy string, or ''/'None'/None/'NULL' for no proxy.
    """
    # Mutable default fixed: a shared {} default dict leaks header state
    # across instances.
    if headers is None:
        headers = {}
    # timeout
    self.timeout = 10
    # cookie handler
    self.cookie_processor = urllib2.HTTPCookieProcessor(cookielib.LWPCookieJar())
    # debug handler
    self.debug = debug
    if self.debug:
        self.httpHandler = urllib2.HTTPHandler(debuglevel=1)
        self.httpsHandler = urllib2.HTTPSHandler(debuglevel=1)
    else:
        self.httpHandler = urllib2.HTTPHandler(debuglevel=0)
        self.httpsHandler = urllib2.HTTPSHandler(debuglevel=0)
    # proxy handler (http)
    if p != '' and p != 'None' and p is not None and p != 'NULL':
        self.proxy_handler = urllib2.ProxyHandler({'http': p})
    else:
        self.proxy_handler = urllib2.ProxyHandler({})
    # opener
    self.opener = urllib2.build_opener(
        self.cookie_processor, self.proxy_handler,
        self.httpHandler, self.httpsHandler)
    self.opener.addheaders = [('User-Agent', 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36'), ]
    # header: caller-supplied headers replace any existing one of same name
    for key in headers.keys():
        cur = self._replace(key)
        if cur != -1:
            self.opener.addheaders.pop(cur)
        self.opener.addheaders += [(key, headers[key]), ]
def __init__(self, *args, **kwargs):
    # Force an unverified SSL context so HTTPS works against hosts with
    # self-signed/invalid certificates.
    try:
        kwargs['context'] = ssl._create_unverified_context()
    except AttributeError:
        # Python prior to 2.7.9 doesn't have default-enabled certificate
        # verification
        pass
    urllib2.HTTPSHandler.__init__(self, *args, **kwargs)
def getUrl(self,url, ischunkDownloading=False): try: post=None print 'url',url openner = urllib2.build_opener(urllib2.HTTPHandler, urllib2.HTTPSHandler) #cookie_handler = urllib2.HTTPCookieProcessor(self.cookieJar) #openner = urllib2.build_opener(cookie_handler, urllib2.HTTPBasicAuthHandler(), urllib2.HTTPHandler()) if post: req = urllib2.Request(url, post) else: req = urllib2.Request(url) ua_header=False if self.clientHeader: for n,v in self.clientHeader: req.add_header(n,v) if n=='User-Agent': ua_header=True if not ua_header: req.add_header('User-Agent','Mozilla/5.0 (Windows NT 6.1; WOW64; rv:30.0) Gecko/20100101 Firefox/30.0') #response = urllib2.urlopen(req) if self.proxy and ( (not ischunkDownloading) or self.use_proxy_for_chunks ): req.set_proxy(self.proxy, 'http') response = openner.open(req) data=response.read() return data except: print 'Error in getUrl' traceback.print_exc() return None
def openUrl(self,url, ischunkDownloading=False): try: post=None openner = urllib2.build_opener(urllib2.HTTPHandler, urllib2.HTTPSHandler) if post: req = urllib2.Request(url, post) else: req = urllib2.Request(url) ua_header=False if self.clientHeader: for n,v in self.clientHeader: req.add_header(n,v) if n=='User-Agent': ua_header=True if not ua_header: req.add_header('User-Agent','Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/33.0.1750.154 Safari/537.36') #response = urllib2.urlopen(req) if self.proxy and ( (not ischunkDownloading) or self.use_proxy_for_chunks ): req.set_proxy(self.proxy, 'http') response = openner.open(req) return response except: print 'Error in getUrl' traceback.print_exc() return None
def getUrl(self,url, ischunkDownloading=False): try: post=None openner = urllib2.build_opener(urllib2.HTTPHandler, urllib2.HTTPSHandler) if post: req = urllib2.Request(url, post) else: req = urllib2.Request(url) ua_header=False if self.clientHeader: for n,v in self.clientHeader: req.add_header(n,v) if n=='User-Agent': ua_header=True if not ua_header: req.add_header('User-Agent','Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/33.0.1750.154 Safari/537.36') #response = urllib2.urlopen(req) if self.proxy and ( (not ischunkDownloading) or self.use_proxy_for_chunks ): req.set_proxy(self.proxy, 'http') response = openner.open(req) data=response.read() return data except: print 'Error in getUrl' traceback.print_exc() return None
def __init__(self):
    """Create a cookie-aware opener over plain HTTP/HTTPS handlers."""
    self.cookies = cookielib.LWPCookieJar()
    self.handlers = (
        urllib2.HTTPHandler(),
        urllib2.HTTPSHandler(),
        urllib2.HTTPCookieProcessor(self.cookies),
    )
    self.opener = urllib2.build_opener(*self.handlers)
def __init__(self, login, password):
    """ Start up... """
    self.login = login
    self.password = password
    # Simulate browser with cookies enabled
    self.cj = cookielib.MozillaCookieJar(cookie_filename)
    # Proxy settings: route plain-http traffic through the appspot relay.
    proxy_handler = urllib2.ProxyHandler({'http': 'notional-sign-110911.appspot.com'})
    if os.access(cookie_filename, os.F_OK):
        self.cj.load()
    self.opener = urllib2.build_opener(
        urllib2.HTTPRedirectHandler(),
        urllib2.HTTPHandler(debuglevel=0),
        urllib2.HTTPSHandler(debuglevel=0),
        proxy_handler,
        urllib2.HTTPCookieProcessor(self.cj))
    self.opener.addheaders = [
        ('User-agent', ('Mozilla/4.0 (compatible; MSIE 6.0; '
                        'Windows NT 5.2; .NET CLR 1.1.4322)'))
    ]
def performFullSearch(self, searchParams, dbHost, dbPort, dbName):
    """Run the full search pipeline and persist the result to the DB.

    Builds a cookie-backed opener, logs in, searches using the
    first-name/email in ``searchParams``, trims the result, and stores a
    record keyed by the email.  Returns 'Success' on completion; on any
    exception returns the first element of the exception's args.

    NOTE(review): dbName is accepted but never used in this body — confirm
    whether connect2DB should receive it.
    """
    print "inside Perform Search ... "
    try:
        # Simulate browser with cookies enabled
        self.cj = cookielib.MozillaCookieJar(cookie_filename)
        if os.access(cookie_filename, os.F_OK):
            self.cj.load()
        self.opener = urllib2.build_opener(
            urllib2.HTTPRedirectHandler(),
            urllib2.HTTPHandler(debuglevel=0),
            urllib2.HTTPSHandler(debuglevel=0),
            urllib2.HTTPCookieProcessor(self.cj)
        )
        self.opener.addheaders = [
            ('User-agent', ('Mozilla/4.0 (compatible; MSIE 6.0; '
                            'Windows NT 5.2; .NET CLR 1.1.4322)'))
        ]
        self.checkLogin(url1)
        fName = searchParams['firstName']
        mailId = searchParams['email']
        # Both search keys are required; 'EMPTY' marks a missing field.
        if fName == 'EMPTY' or mailId == 'EMPTY':
            raise Exception('Info: Search has to be performed from Search page only, Please try again', 'Info')
        fSrchURL = self.formSearchURL(searchParams)
        linkedJSON = self.loadSearch(fSrchURL, fName)
        recordJSON = self.formTrimmedJSON(linkedJSON)
        dbRecord = self.formDBRecord(recordJSON, mailId)
        client = self.connect2DB(dbHost, dbPort)
        print "Client details : "+client.__str__()
        self.store2DB(dbRecord, mailId, client)
        return 'Success'
    except Exception as e:
        # Exceptions here are expected to carry (message, tag) args;
        # only the message is returned to the caller.
        x, y = e.args
        return x