我们从Python开源项目中,提取了以下3个代码示例,用于说明如何使用six.moves.urllib.request.build_opener()。
def __init__(self, proxy=None, secure_proxy=None, retries_num=0, retry_interval=0): """Initialises. :param proxy: the url of proxy for http-connections :param secure_proxy: the url of proxy for https-connections :param retries_num: the number of allowed retries :param retry_interval: the minimal time between retries (in seconds) """ if proxy: proxies = { "http": proxy, "https": secure_proxy or proxy, } else: proxies = None self.retries_num = retries_num self.retry_interval = retry_interval self.opener = urllib.build_opener( RetryHandler(), urllib.ProxyHandler(proxies) )
def _get_opener(self): if not self.opener: if (CONF.dashboard.disable_ssl_certificate_validation and self._ssl_default_context_supported()): ctx = ssl.create_default_context() ctx.check_hostname = False ctx.verify_mode = ssl.CERT_NONE self.opener = request.build_opener( request.HTTPSHandler(context=ctx), request.HTTPCookieProcessor()) else: self.opener = request.build_opener( request.HTTPCookieProcessor()) return self.opener
def _follow_redirects(uri_list, http_timeout): """ Follow HTTP redirects from servers. Needed so that we can create RewriteRules for all repository URLs that pkg clients may encounter. We return a sorted list of URIs that were found having followed all redirects in 'uri_list'. We also return a boolean, True if we timed out when following any of the URIs. """ ret_uris = set(uri_list) timed_out = False class SysrepoRedirectHandler(HTTPRedirectHandler): """ A HTTPRedirectHandler that saves URIs we've been redirected to along the path to our eventual destination.""" def __init__(self): self.redirects = set() def redirect_request(self, req, fp, code, msg, hdrs, newurl): self.redirects.add(newurl) return HTTPRedirectHandler.redirect_request( self, req, fp, code, msg, hdrs, newurl) for uri in uri_list: handler = SysrepoRedirectHandler() opener = build_opener(handler) if not uri.startswith("http:"): ret_uris.update([uri]) continue # otherwise, open a known url to check for redirects try: opener.open("{0}/versions/0".format(uri), None, http_timeout) ret_uris.update(set( [item.replace("/versions/0", "").rstrip("/") for item in handler.redirects])) except URLError as err: # We need to log this, and carry on - the url # could become available at a later date. msg(_("WARNING: unable to access {uri} when checking " "for redirects: {err}").format(**locals())) timed_out = True return sorted(list(ret_uris)), timed_out