我们从Python开源项目中,提取了以下38个代码示例,用于说明如何使用urllib.request.host()。
def request_host(request): """Return request-host, as defined by RFC 2965. Variation from RFC: returned value is lowercased, for convenient comparison. """ url = request.full_url host = urlparse(url)[1] if host == "": host = request.get_header("Host", "") # remove port, if present host = _cut_port_re.sub("", host, 1) return host.lower()
def __init__(self, url, data=None, headers={}, origin_req_host=None, unverifiable=False, method=None): # unwrap('<URL:type://host/path>') --> 'type://host/path' self.full_url = unwrap(url) self.full_url, self.fragment = splittag(self.full_url) self.data = data self.headers = {} self._tunnel_host = None for key, value in headers.items(): self.add_header(key, value) self.unredirected_hdrs = {} if origin_req_host is None: origin_req_host = request_host(self) self.origin_req_host = origin_req_host self.unverifiable = unverifiable self.method = method self._parse()
def http_error_auth_reqed(self, auth_header, host, req, headers): authreq = headers.get(auth_header, None) if self.retried > 5: # Don't fail endlessly - if we failed once, we'll probably # fail a second time. Hm. Unless the Password Manager is # prompting for the information. Crap. This isn't great # but it's better than the current 'repeat until recursion # depth exceeded' approach <wink> raise HTTPError(req.full_url, 401, "digest auth failed", headers, None) else: self.retried += 1 if authreq: scheme = authreq.split()[0] if scheme.lower() == 'digest': return self.retry_http_digest_auth(req, authreq) elif scheme.lower() != 'basic': raise ValueError("AbstractDigestAuthHandler does not support" " the following scheme: '%s'" % scheme)
def proxy_bypass_environment(host): """Test if proxies should not be used for a particular host. Checks the environment for a variable named no_proxy, which should be a list of DNS suffixes separated by commas, or '*' for all hosts. """ no_proxy = os.environ.get('no_proxy', '') or os.environ.get('NO_PROXY', '') # '*' is special case for always bypass if no_proxy == '*': return 1 # strip port off host hostonly, port = splitport(host) # check if the host ends with any of the DNS suffixes no_proxy_list = [proxy.strip() for proxy in no_proxy.split(',')] for name in no_proxy_list: if name and (hostonly.endswith(name) or host.endswith(name)): return 1 # otherwise, don't bypass return 0 # This code tests an OSX specific data structure but is testable on all # platforms
def __init__(self, url, data=None, headers={}, origin_req_host=None, unverifiable=False): # unwrap('<URL:type://host/path>') --> 'type://host/path' self.full_url = unwrap(url) self.full_url, self.fragment = splittag(self.full_url) self.data = data self.headers = {} self._tunnel_host = None for key, value in headers.items(): self.add_header(key, value) self.unredirected_hdrs = {} if origin_req_host is None: origin_req_host = request_host(self) self.origin_req_host = origin_req_host self.unverifiable = unverifiable self._parse()
def http_error_auth_reqed(self, authreq, host, req, headers): # host may be an authority (without userinfo) or a URL with an # authority # XXX could be multiple headers authreq = headers.get(authreq, None) if self.retried > 5: # retry sending the username:password 5 times before failing. raise HTTPError(req.get_full_url(), 401, "basic auth failed", headers, None) else: self.retried += 1 if authreq: mo = AbstractBasicAuthHandler.rx.search(authreq) if mo: scheme, quote, realm = mo.groups() if scheme.lower() == 'basic': response = self.retry_http_basic_auth(host, req, realm) if response and response.code != 401: self.retried = 0 return response
def http_error_auth_reqed(self, auth_header, host, req, headers): authreq = headers.get(auth_header, None) if self.retried > 5: # Don't fail endlessly - if we failed once, we'll probably # fail a second time. Hm. Unless the Password Manager is # prompting for the information. Crap. This isn't great # but it's better than the current 'repeat until recursion # depth exceeded' approach <wink> raise HTTPError(req.full_url, 401, "digest auth failed", headers, None) else: self.retried += 1 if authreq: scheme = authreq.split()[0] if scheme.lower() == 'digest': return self.retry_http_digest_auth(req, authreq)
def _parse(self): self.type, rest = splittype(self.full_url) if self.type is None: raise ValueError("unknown url type: %r" % self.full_url) self.host, self.selector = splithost(rest) if self.host: self.host = unquote(self.host)
def set_proxy(self, host, type): if self.type == 'https' and not self._tunnel_host: self._tunnel_host = self.host else: self.type= type self.selector = self.full_url self.host = host
def proxy_open(self, req, proxy, type): orig_type = req.type proxy_type, user, password, hostport = _parse_proxy(proxy) if proxy_type is None: proxy_type = orig_type if req.host and proxy_bypass(req.host): return None if user and password: user_pass = '%s:%s' % (unquote(user), unquote(password)) creds = base64.b64encode(user_pass.encode()).decode("ascii") req.add_header('Proxy-authorization', 'Basic ' + creds) hostport = unquote(hostport) req.set_proxy(hostport, proxy_type) if orig_type == proxy_type or orig_type == 'https': # let other handlers take care of it return None else: # need to start over, because the other handlers don't # grok the proxy's URL type # e.g. if we have a constructor arg proxies like so: # {'http': 'ftp://proxy.example.com'}, we may end up turning # a request for http://acme.example.com/a into one for # ftp://proxy.example.com/a return self.parent.open(req, timeout=req.timeout)
def http_error_auth_reqed(self, authreq, host, req, headers): # host may be an authority (without userinfo) or a URL with an # authority # XXX could be multiple headers authreq = headers.get(authreq, None) if self.retried > 5: # retry sending the username:password 5 times before failing. raise HTTPError(req.get_full_url(), 401, "basic auth failed", headers, None) else: self.retried += 1 if authreq: scheme = authreq.split()[0] if scheme.lower() != 'basic': raise ValueError("AbstractBasicAuthHandler does not" " support the following scheme: '%s'" % scheme) else: mo = AbstractBasicAuthHandler.rx.search(authreq) if mo: scheme, quote, realm = mo.groups() if quote not in ['"',"'"]: warnings.warn("Basic Auth Realm was unquoted", UserWarning, 2) if scheme.lower() == 'basic': response = self.retry_http_basic_auth(host, req, realm) if response and response.code != 401: self.retried = 0 return response
def retry_http_basic_auth(self, host, req, realm): user, pw = self.passwd.find_user_password(realm, host) if pw is not None: raw = "%s:%s" % (user, pw) auth = "Basic " + base64.b64encode(raw.encode()).decode("ascii") if req.headers.get(self.auth_header, None) == auth: return None req.add_unredirected_header(self.auth_header, auth) return self.parent.open(req, timeout=req.timeout) else: return None
def http_error_407(self, req, fp, code, msg, headers): # http_error_auth_reqed requires that there is no userinfo component in # authority. Assume there isn't one, since urllib.request does not (and # should not, RFC 3986 s. 3.2.1) support requests for URLs containing # userinfo. authority = req.host response = self.http_error_auth_reqed('proxy-authenticate', authority, req, headers) self.reset_retry_count() return response # Return n random bytes.
def http_error_401(self, req, fp, code, msg, headers): host = urlparse(req.full_url)[1] retry = self.http_error_auth_reqed('www-authenticate', host, req, headers) self.reset_retry_count() return retry
def file_open(self, req): url = req.selector if url[:2] == '//' and url[2:3] != '/' and (req.host and req.host != 'localhost'): if not req.host is self.get_names(): raise URLError("file:// scheme is supported only on localhost") else: return self.open_local_file(req) # names for the localhost
def open_local_file(self, req): import future.backports.email.utils as email_utils import mimetypes host = req.host filename = req.selector localfile = url2pathname(filename) try: stats = os.stat(localfile) size = stats.st_size modified = email_utils.formatdate(stats.st_mtime, usegmt=True) mtype = mimetypes.guess_type(filename)[0] headers = email.message_from_string( 'Content-type: %s\nContent-length: %d\nLast-modified: %s\n' % (mtype or 'text/plain', size, modified)) if host: host, port = splitport(host) if not host or \ (not port and _safe_gethostbyname(host) in self.get_names()): if host: origurl = 'file://' + host + filename else: origurl = 'file://' + filename return addinfourl(open(localfile, 'rb'), headers, origurl) except OSError as exp: # users shouldn't expect OSErrors coming from urlopen() raise URLError(exp) raise URLError('file not on local host')
def _safe_gethostbyname(host): try: return socket.gethostbyname(host) except socket.gaierror: return None
def connect_ftp(self, user, passwd, host, port, dirs, timeout): return ftpwrapper(user, passwd, host, port, dirs, timeout, persistent=False)
def connect_ftp(self, user, passwd, host, port, dirs, timeout): key = user, host, port, '/'.join(dirs), timeout if key in self.cache: self.timeout[key] = time.time() + self.delay else: self.cache[key] = ftpwrapper(user, passwd, host, port, dirs, timeout) self.timeout[key] = time.time() + self.delay self.check_cache() return self.cache[key]