The following 50 code examples, extracted from open-source Python projects, illustrate how to use urllib.splittype().
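Before the extracted examples, here is a minimal usage sketch (not taken from any of the projects below; the URL is illustrative only) showing how urllib.splittype() is typically chained with urllib.splithost() and urllib.splitport() under Python 2:

# Minimal sketch (Python 2); the example URL is illustrative only.
import urllib

scheme, rest = urllib.splittype('http://example.com:8080/index.html')
# scheme == 'http', rest == '//example.com:8080/index.html'
host, path = urllib.splithost(rest)
# host == 'example.com:8080', path == '/index.html'
host, port = urllib.splitport(host)
# host == 'example.com', port == '8080' (a string, or None if no port was given)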
def __init__(self, uri, transport=None, encoding=None, verbose=0,
             allow_none=0, use_datetime=0):
    # establish a "logical" server connection

    # get the url
    import urllib
    type, uri = urllib.splittype(uri)
    if type not in ("http", "https"):
        raise IOError, "unsupported XML-RPC protocol"
    self.__host, self.__handler = urllib.splithost(uri)
    if not self.__handler:
        self.__handler = "/RPC2"

    if transport is None:
        if type == "https":
            transport = SafeTransport(use_datetime=use_datetime)
        else:
            transport = Transport(use_datetime=use_datetime)
    self.__transport = transport

    self.__encoding = encoding
    self.__verbose = verbose
    self.__allow_none = allow_none
def request(self, method, url, body=None, headers={}):
    # Request is called before connect, so can interpret url and get
    # real host/port to be used to make CONNECT request to proxy
    proto, rest = urllib.splittype(url)
    if proto is None:
        raise ValueError, "unknown URL type: %s" % url
    # Get host
    host, rest = urllib.splithost(rest)
    # Try to get port
    host, port = urllib.splitport(host)
    # If port is not defined try to get from proto
    if port is None:
        try:
            port = self._ports[proto]
        except KeyError:
            raise ValueError, "unknown protocol for: %s" % url
    self._real_host = host
    self._real_port = int(port)
    httplib.HTTPConnection.request(self, method, url, body, headers)
def __init__(self, uri, transport=None, encoding=None, verbose=0, version=None):
    self.location = uri             # server location (url)
    self.trace = verbose            # show debug messages
    self.exceptions = True          # raise errors? (JSONRPCError)
    self.timeout = None
    self.json_request = self.json_response = ''
    self.version = version          # '2.0' for jsonrpc2

    type, uri = urllib.splittype(uri)
    if type not in ("http", "https"):
        raise IOError("unsupported JSON-RPC protocol")
    self.__host, self.__handler = urllib.splithost(uri)

    if transport is None:
        if type == "https":
            transport = JSONSafeTransport()
        else:
            transport = JSONTransport()
    self.__transport = transport
    self.__encoding = encoding
    self.__verbose = verbose
def __init__(self, uri, transport=None, encoding=None, verbose=0,
             allow_none=0, use_datetime=0):
    # establish a "logical" server connection

    # get the url
    import urllib
    type, uri = urllib.splittype(uri)
    if type not in ("http", "https"):
        raise IOError("unsupported XML-RPC protocol")
    self.__host, self.__handler = urllib.splithost(uri)
    if not self.__handler:
        self.__handler = "/RPC2"

    if transport is None:
        if type == "https":
            transport = SafeTransport(use_datetime=use_datetime)
        else:
            transport = Transport(use_datetime=use_datetime)
    self.__transport = transport

    self.__encoding = encoding
    self.__verbose = verbose
    self.__allow_none = allow_none
def make_connection(self, host):
    self.realhost = host
    proxies = urllib.getproxies()
    proxyurl = None
    if 'http' in proxies:
        proxyurl = proxies['http']
    elif 'all' in proxies:
        proxyurl = proxies['all']
    if proxyurl:
        urltype, proxyhost = urllib.splittype(proxyurl)
        host, selector = urllib.splithost(proxyhost)
        h = httplib.HTTP(host)
        self.proxy_is_used = True
        return h
    else:
        self.proxy_is_used = False
        return Transport.make_connection(self, host)
def __init__(self, uri, transport=None, encoding=None, verbose=False,
             allow_none=False, use_datetime=False):
    type, uri = urllib.splittype(uri)
    if type not in ('scgi',):
        raise IOError('unsupported XML-RPC protocol')
    self.__host, self.__handler = urllib.splithost(uri)
    if not self.__handler:
        self.__handler = '/'

    if transport is None:
        transport = SCGITransport(use_datetime=use_datetime)
    self.__transport = transport

    self.__encoding = encoding
    self.__verbose = verbose
    self.__allow_none = allow_none
def __init__(self, uri, transport=None, verbose=False, binary=True,
             compressRequest=True, acceptCompressedResponse=True):
    """Establish a "logical" server connection."""
    # get the url
    import urllib
    typ, uri = urllib.splittype(uri)
    if typ not in ('http', 'https'):
        raise IOError('unsupported Pickle-RPC protocol')
    self._host, self._handler = urllib.splithost(uri)
    if not self._handler:
        self._handler = '/PickleRPC'
    if transport is None:
        transport = (SafeTransport if typ == 'https' else Transport)()
    self._transport = transport
    self._verbose = verbose
    self._binary = binary
    self._compressRequest = compressRequest
    self._acceptCompressedResponse = acceptCompressedResponse
def url2pathname(pathname):
    """OS-specific conversion from a relative URL of the 'file' scheme
    to a file system path; not recommended for general use."""
    #
    # XXXX The .. handling should be fixed...
    #
    tp = urllib.splittype(pathname)[0]
    if tp and tp != 'file':
        raise RuntimeError, 'Cannot convert non-local URL to pathname'
    # Turn starting /// into /, an empty hostname means current host
    if pathname[:3] == '///':
        pathname = pathname[2:]
    elif pathname[:2] == '//':
        raise RuntimeError, 'Cannot convert non-local URL to pathname'
    components = pathname.split('/')
    # Remove . and embedded ..
    i = 0
    while i < len(components):
        if components[i] == '.':
            del components[i]
        elif components[i] == '..' and i > 0 and \
                components[i-1] not in ('', '..'):
            del components[i-1:i+1]
            i = i-1
        elif components[i] == '' and i > 0 and components[i-1] != '':
            del components[i]
        else:
            i = i+1
    if not components[0]:
        # Absolute unix path, don't start with colon
        rv = ':'.join(components[1:])
    else:
        # relative unix path, start with colon. First replace
        # leading .. by empty strings (giving ::file)
        i = 0
        while i < len(components) and components[i] == '..':
            components[i] = ''
            i = i + 1
        rv = ':' + ':'.join(components)
    # and finally unquote slashes and other funny characters
    return urllib.unquote(rv)
def _spliturl(url):
    scheme, opaque = urllib.splittype(url)
    netloc, path = urllib.splithost(opaque)
    host, port = urllib.splitport(netloc)
    # Strip brackets if its an IPv6 address
    if host.startswith('[') and host.endswith(']'):
        host = host[1:-1]
    if port is None:
        port = DEFAULT_PORT
    return scheme, host, port, path

# Given an HTTP request handler, this wrapper objects provides a related
# family of convenience methods built using that handler.
def getpage(self, url_pair):
    # Incoming argument name is a (URL, fragment) pair.
    # The page may have been cached in the name_table variable.
    url, fragment = url_pair
    if self.name_table.has_key(url):
        return self.name_table[url]

    scheme, path = urllib.splittype(url)
    if scheme in ('mailto', 'news', 'javascript', 'telnet'):
        self.note(1, " Not checking %s URL" % scheme)
        return None
    isint = self.inroots(url)

    # Ensure that openpage gets the URL pair to
    # print out its error message and record the error pair
    # correctly.
    if not isint:
        if not self.checkext:
            self.note(1, " Not checking ext link")
            return None
        f = self.openpage(url_pair)
        if f:
            self.safeclose(f)
        return None
    text, nurl = self.readhtml(url_pair)

    if nurl != url:
        self.note(1, " Redirected to %s", nurl)
        url = nurl
    if text:
        return Page(text, url, maxpage=self.maxpage, checker=self)

# These next three functions take (URL, fragment) pairs as
# arguments, so that openpage() receives the appropriate tuple to
# record error messages.
def savefilename(self, url):
    type, rest = urllib.splittype(url)
    host, path = urllib.splithost(rest)
    path = path.lstrip("/")
    user, host = urllib.splituser(host)
    host, port = urllib.splitnport(host)
    host = host.lower()
    if not path or path[-1] == "/":
        path = path + "index.html"
    if os.sep != "/":
        path = os.sep.join(path.split("/"))
        if os.name == "mac":
            path = os.sep + path
    path = os.path.join(host, path)
    return path
def __init__(self, uri, transport=None, encoding=None, verbose=0,
             allow_none=0, use_datetime=0, context=None):
    # establish a "logical" server connection

    if isinstance(uri, unicode):
        uri = uri.encode('ISO-8859-1')

    # get the url
    import urllib
    type, uri = urllib.splittype(uri)
    if type not in ("http", "https"):
        raise IOError, "unsupported XML-RPC protocol"
    self.__host, self.__handler = urllib.splithost(uri)
    if not self.__handler:
        self.__handler = "/RPC2"

    if transport is None:
        if type == "https":
            transport = SafeTransport(use_datetime=use_datetime, context=context)
        else:
            transport = Transport(use_datetime=use_datetime)
    self.__transport = transport

    self.__encoding = encoding
    self.__verbose = verbose
    self.__allow_none = allow_none
def __init__(self, url, config=Config):
    proto, uri = urllib.splittype(url)

    # apply some defaults
    if uri[0:2] != '//':
        if proto != None:
            uri = proto + ':' + uri
        uri = '//' + uri
        proto = 'http'

    host, path = urllib.splithost(uri)

    try:
        int(host)
        host = 'localhost:' + host
    except:
        pass

    if not path:
        path = '/'

    if proto not in ('http', 'https', 'httpg'):
        raise IOError, "unsupported SOAP protocol"
    if proto == 'httpg' and not config.GSIclient:
        raise AttributeError, \
            "GSI client not supported by this Python installation"
    if proto == 'https' and not config.SSLclient:
        raise AttributeError, \
            "SSL client not supported by this Python installation"

    self.user, host = urllib.splituser(host)
    self.proto = proto
    self.host = host
    self.path = path
def parse(self, response):
    def getdomain(url):
        proto, rest = urllib.splittype(url)
        host, rest = urllib.splithost(rest)
        return "http://" + host

    sel = scrapy.Selector(response)
    links_in_a_page = sel.xpath('//a[@href]')

    for link_sel in links_in_a_page:
        item = QqurlItem()
        link = str(link_sel.re('href="(.*?)"')[0])

        if link:
            if not link.startswith('http'):
                if link.startswith('javascript'):
                    continue
                if link.startswith('//support'):
                    continue
                link = getdomain(response.url) + link
            if re.match('.*comment.*', link):
                continue
            yield scrapy.Request(link, callback=self.parse)
            if not re.match('.*comment.*', link):
                if re.match('^http.*qq.com.*\.s?html?$', link):
                    item['link'] = link
                    yield item
def parse(self, response):
    def getdomain(url):
        #proto, rest = urllib.splittype(url)
        #host, rest = urllib.splithost(rest)
        return "http:"

    sel = scrapy.Selector(response)
    links_in_a_page = sel.xpath('//a[@href]')

    for link_sel in links_in_a_page:
        item = SohuItem()
        link = str(link_sel.re('href="(.*?)"')[0])

        if link:
            if not link.startswith('http'):
                link = getdomain(response.url) + link
            yield scrapy.Request(link, callback=self.parse)

            p1 = re.compile(r'.*/a/.*')
            p2 = re.compile(r'.*#comment_area$')
            p3 = re.compile(r'.*news.sohu.com.*s?html?$')

            if (re.match(p3, link) or re.match(p1, link)) and (not re.match(p2, link)):
                #print ('T: '+link)
                item['link'] = link
                yield item
            else:
                pass
                #print ('F: '+link)
def parse_host(self):
    proto, rest = urllib.splittype(self.get_host())
    host, rest = urllib.splithost(rest)
    host, port = urllib.splitport(host)
    return host
def test_splittype(self):
    splittype = urllib.splittype
    self.assertEqual(splittype('type:opaquestring'), ('type', 'opaquestring'))
    self.assertEqual(splittype('opaquestring'), (None, 'opaquestring'))
    self.assertEqual(splittype(':opaquestring'), (None, ':opaquestring'))
    self.assertEqual(splittype('type:'), ('type', ''))
    self.assertEqual(splittype('type:opaque:string'), ('type', 'opaque:string'))
def __init__(self, uri, transport=None, encoding=None, verbose=0,
             allow_none=0, use_datetime=0, context=None):
    # establish a "logical" server connection

    if unicode and isinstance(uri, unicode):
        uri = uri.encode('ISO-8859-1')

    # get the url
    import urllib
    type, uri = urllib.splittype(uri)
    if type not in ("http", "https"):
        raise IOError, "unsupported XML-RPC protocol"
    self.__host, self.__handler = urllib.splithost(uri)
    if not self.__handler:
        self.__handler = "/RPC2"

    if transport is None:
        if type == "https":
            transport = SafeTransport(use_datetime=use_datetime, context=context)
        else:
            transport = Transport(use_datetime=use_datetime)
    self.__transport = transport

    self.__encoding = encoding
    self.__verbose = verbose
    self.__allow_none = allow_none
def url2pathname(url):
    """OS-specific conversion from a relative URL of the 'file' scheme
    to a file system path; not recommended for general use."""
    tp = urllib.splittype(url)[0]
    if tp and tp != 'file':
        raise RuntimeError, 'Cannot convert non-local URL to pathname'
    # Turn starting /// into /, an empty hostname means current host
    if url[:3] == '///':
        url = url[2:]
    elif url[:2] == '//':
        raise RuntimeError, 'Cannot convert non-local URL to pathname'
    components = string.split(url, '/')
    if not components[0]:
        if '$' in components:
            del components[0]
        else:
            components[0] = '$'
    # Remove . and embedded ..
    i = 0
    while i < len(components):
        if components[i] == '.':
            del components[i]
        elif components[i] == '..' and i > 0 and \
                components[i-1] not in ('', '..'):
            del components[i-1:i+1]
            i -= 1
        elif components[i] == '..':
            components[i] = '^'
            i += 1
        elif components[i] == '' and i > 0 and components[i-1] != '':
            del components[i]
        else:
            i += 1
    components = map(lambda x: urllib.unquote(x).translate(__slash_dot), components)
    return '.'.join(components)
def savefilename(self, url):
    type, rest = urllib.splittype(url)
    host, path = urllib.splithost(rest)
    path = path.lstrip("/")
    user, host = urllib.splituser(host)
    host, port = urllib.splitnport(host)
    host = host.lower()
    if not path or path[-1] == "/":
        path = path + "index.html"
    if os.sep != "/":
        path = os.sep.join(path.split("/"))
    path = os.path.join(host, path)
    return path
def reference_url(self, reference_url):
    """Referrer URL.

    Keep only the host part of the referrer, e.g.:
    input:  http://www.ttmark.com/diannao/2014/11/04/470.html
    output: www.ttmark.com
    """
    proto, rest = urllib.splittype(reference_url)
    res, rest = urllib.splithost(rest)
    if not res:
        self._reference_url = '-'
    else:
        self._reference_url = res
def request_url(self, request_url):
    """Requested URL.

    Strip the query string from the requested URL and keep only the path, e.g.:
    input:  /wp-admin/admin-ajax.php?postviews_id=1348
    output: /wp-admin/admin-ajax.php
    """
    proto, rest = urllib.splittype(request_url)
    url_path, url_param = urllib.splitquery(rest)
    if url_path.startswith('/tag/'):
        url_path = '/tag/'
    self._request_url = url_path
def __init__(self, uri, transport=None, encoding=None, verbose=0, version=None):
    import urllib
    if not version:
        version = config.version
    self.__version = version
    schema, uri = urllib.splittype(uri)
    if schema not in ('http', 'https', 'unix'):
        raise IOError('Unsupported JSON-RPC protocol.')
    if schema == 'unix':
        if not USE_UNIX_SOCKETS:
            # Don't like the "generic" Exception...
            raise UnixSocketMissing("Unix sockets not available.")
        self.__host = uri
        self.__handler = '/'
    else:
        self.__host, self.__handler = urllib.splithost(uri)
        if not self.__handler:
            # Not sure if this is in the JSON spec?
            self.__handler = '/'
    if transport is None:
        if schema == 'unix':
            transport = UnixTransport()
        elif schema == 'https':
            transport = SafeTransport()
        else:
            transport = Transport()
    self.__transport = transport
    self.__encoding = encoding
    self.__verbose = verbose
def url_permutations(url):
    """Try all permutations of hostname and path which can be applied
    to blacklisted URLs"""
    def url_host_permutations(host):
        if re.match(r'\d+\.\d+\.\d+\.\d+', host):
            yield host
            return
        parts = host.split('.')
        l = min(len(parts), 5)
        if l > 4:
            yield host
        for i in xrange(l - 1):
            yield '.'.join(parts[i - l:])

    def url_path_permutations(path):
        if path != '/':
            yield path
        query = None
        if '?' in path:
            path, query = path.split('?', 1)
        if query is not None:
            yield path
        path_parts = path.split('/')[0:-1]
        curr_path = ''
        for i in xrange(min(4, len(path_parts))):
            curr_path = curr_path + path_parts[i] + '/'
            yield curr_path

    protocol, address_str = urllib.splittype(url)
    host, path = urllib.splithost(address_str)
    user, host = urllib.splituser(host)
    host, port = urllib.splitport(host)
    host = host.strip('/')
    for h in url_host_permutations(host):
        for p in url_path_permutations(path):
            yield '%s%s' % (h, p)