The following 50 code examples, extracted from open-source Python projects, illustrate how to use urlparse.urlsplit().
def reduce_uri(self, uri, default_port=True): """Accept authority or URI and extract only the authority and path.""" # note HTTP URLs do not have a userinfo component parts = urlparse.urlsplit(uri) if parts[1]: # URI scheme = parts[0] authority = parts[1] path = parts[2] or '/' else: # host or host:port scheme = None authority = uri path = '/' host, port = splitport(authority) if default_port and port is None and scheme is not None: dport = {"http": 80, "https": 443, }.get(scheme) if dport is not None: authority = "%s:%d" % (host, dport) return authority, path
def getOtherRecipeLinks(self):
    """Collect links to other recipes found on the page.

    Single recipe pages carry no such links, but the category pages at
    http://www.williams-sonoma.com/recipe/ do. For example,
    http://www.williams-sonoma.com/search/results.html?activeTab=recipes&words=winter_weeknight_dinners
    lists many individual recipes; their URLs are returned here.
    """
    found = []
    for anchor in self.tree.xpath('//ul[@class="recipe-list"]/li/a'):
        # skip <a> elements without an href attribute
        if 'href' not in anchor.keys():
            continue
        pieces = urlsplit(anchor.get('href'))
        # only hrefs tagged as coming from the recipe search are recipes
        if pieces.query == 'cm_src=RECIPESEARCH':
            found.append(pieces.scheme + '://' + pieces.netloc + pieces.path)
    return found
def run(self):
    # Worker thread body: fetch one paginated listing page, scan it for
    # recipe links, and queue (url, filename) pairs for the downloader.
    # NOTE(review): nesting reconstructed from a collapsed source line —
    # confirm the loop/queue boundaries against the original project.
    ind = self.qu.get()  # page index to process
    url = self.url + str(ind)
    soup = bs.BeautifulSoup(''.join(ul.urlopen(url).readlines()))
    bu = up.urlsplit(self.url)  # base scheme/netloc for resolving hrefs
    print 'started with the ', str(url).split('/')[-1],
    for i in soup.find_all(attrs={"class": "recipe-title"}):
        sp = up.urlsplit(i.a.get('href'))
        path = sp.path
        print path
        if re.search(pat, path):
            # rebuild an absolute URL from the base scheme/host
            path = bu.scheme + '://' + bu.netloc + path
            filename = str(path).split('/')[-2]
            filename = op.join(op.abspath(op.curdir), filename + '.py')
            # recipe will be stored in given location
            # filename = op.join(op.abspath(op.curdir),filename+'.html')
            # uncomment the above line if downloading the web page for teh recipe
            print path
            self.q.put((path, filename))
            self.fetch_data()
            time.sleep(1)  # throttle requests
    self.qu.task_done()
    self.q.join()
    print 'done with the ', str(url).split('/')[-1],
def get_version_from_url(url):
    """Return the first path segment of *url*, typically the API version.

    E.g. 'http://host/v2/servers' -> 'v2'. An empty path yields ''.
    """
    path = urlparse.urlsplit(url).path
    # drop at most one leading slash, then cut at the next one
    if path.startswith('/'):
        path = path[1:]
    cut = path.find('/')
    return path if cut < 0 else path[:cut]
def remove_trailing_version_from_href(href):
    """Removes the api version from the href.

    Given: 'http://www.nova.com/compute/v1.1'
    Returns: 'http://www.nova.com/compute'

    Given: 'http://www.nova.com/v1.1'
    Returns: 'http://www.nova.com'
    """
    split = urlparse.urlsplit(href)
    pieces = split.path.rsplit('/', 1)
    # the last path segment must look like vX or vX.X
    if re.match(r'^v([0-9]+|[0-9]+\.[0-9]+)(/.*|$)', pieces.pop()) is None:
        raise ValueError('URL %s does not contain version' % href)
    rebuilt = list(split)
    rebuilt[2] = url_join(*pieces)
    return urlparse.urlunsplit(rebuilt)
def click(self, st):
    """Return a path which is the URL where a browser would presumably
    take you if you clicked on a link with an HREF as given.
    """
    # NOTE(review): nesting reconstructed from a collapsed source line;
    # it matches Twisted's URLPath.click — confirm against upstream.
    scheme, netloc, path, query, fragment = urlparse.urlsplit(st)
    if not scheme:
        # no scheme given: inherit progressively from this URL
        scheme = self.scheme
        if not netloc:
            netloc = self.netloc
            if not path:
                # same document: keep our path, and our query if absent
                path = self.path
                if not query:
                    query = self.query
            elif path[0] != '/':
                # relative path: resolve against our last path segment
                l = self.pathList()
                l[-1] = path
                path = '/'.join(l)
    return URLPath(scheme, netloc, path, query, fragment)
def _checkFrom(self, pyobj):
    '''WS-Address From, XXX currently not checking the hostname,
    not forwarding messages.

    pyobj -- From server returned.
    '''
    if pyobj is None:
        return
    value = pyobj._Address
    if value != self._addressTo:
        # addresses differ textually: accept only when every URL part
        # except the netloc matches and the hosts resolve compatibly
        scheme, netloc, path, query, fragment = urlparse.urlsplit(value)
        schemeF, netlocF, pathF, queryF, fragmentF = urlparse.urlsplit(self._addressTo)
        if scheme == schemeF and path == pathF and query == queryF and fragment == fragmentF:
            # compare host:port with the port defaulted to 80
            netloc = netloc.split(':') + ['80']
            netlocF = netlocF.split(':') + ['80']
            # accept when ports match and the expected host resolves to
            # localhost or to the same address as the returned host
            if netloc[1] == netlocF[1] and (socket.gethostbyname(netlocF[0]) in ('127.0.0.1', socket.gethostbyname(netloc[0]))):
                return
        raise WSActionException('wrong WS-Address From(%s), expecting %s' % (value, self._addressTo))
def change_locale(request):
    """
    Redirect to a given url while changing the locale in the path
    The url and the locale code need to be specified in the request
    parameters.
    """
    # NOTE(review): request.REQUEST (merged GET+POST) is pre-Django-1.9 API.
    next = request.REQUEST.get('next', None)
    if not next:
        # fall back to the referring page's path component
        referrer = request.META.get('HTTP_REFERER', None)
        if referrer:
            next = urlsplit(referrer)[2]
    if not next:
        next = '/'
    # strip any existing locale prefix from the target path
    _, path = utils.strip_path(next)
    if request.method == 'POST':
        locale = request.POST.get('locale', None)
        if locale and check_for_language(locale):
            if localeurl_settings.USE_SESSION:
                request.session['django_language'] = locale
            # prefix the path with the newly selected locale
            path = utils.locale_path(path, locale)
    response = http.HttpResponseRedirect(path)
    return response
def serial_class_for_url(url):
    """Parse an alt:// URL and return (port_name, Serial_class).

    The optional ?class= query option selects which class from the
    ``serial`` module is used (default: Serial).
    """
    parts = urlparse.urlsplit(url)
    if parts.scheme != 'alt':
        raise serial.SerialException('expected a string in the form "alt://port[?option[=value][&option[=value]]]": not starting with alt:// (%r)' % (parts.scheme,))
    chosen = 'Serial'
    try:
        for key, vals in urlparse.parse_qs(parts.query, True).items():
            if key != 'class':
                raise ValueError('unknown option: %r' % (key,))
            chosen = vals[0]
    except ValueError as e:
        raise serial.SerialException('expected a string in the form "alt://port[?option[=value][&option[=value]]]": %s' % e)
    # netloc + path together form the underlying port name
    return (''.join([parts.netloc, parts.path]), getattr(serial, chosen))
def from_url(self, url):
    """extract host and port from an URL string"""
    parts = urlparse.urlsplit(url)
    if parts.scheme != "socket":
        raise SerialException('expected a string in the form "socket://<host>:<port>[?logging={debug|info|warning|error}]": not starting with socket:// (%r)' % (parts.scheme,))
    try:
        # process options now, directly altering self
        for option, values in urlparse.parse_qs(parts.query, True).items():
            if option == 'logging':
                logging.basicConfig()   # XXX is that good to call it here?
                self.logger = logging.getLogger('pySerial.socket')
                self.logger.setLevel(LOGGER_LEVELS[values[0]])
                self.logger.debug('enabled logging')
            else:
                raise ValueError('unknown option: %r' % (option,))
        # get host and port
        host, port = parts.hostname, parts.port
        # NOTE(review): when the URL has no port, parts.port is None;
        # under Python 2 the comparison below is simply False and raises
        # ValueError as intended — under Python 3 it would raise TypeError
        # instead, escaping the except clause. Confirm target version.
        if not 0 <= port < 65536:
            raise ValueError("port not in range 0...65535")
    except ValueError as e:
        raise SerialException('expected a string in the form "socket://<host>:<port>[?logging={debug|info|warning|error}]": %s' % e)
    return (host, port)
def from_url(self, url):
    """extract host and port from an URL string"""
    # loop:// URLs carry no host/port; only the ?logging= option is
    # parsed, mutating self as a side effect. Nothing is returned.
    parts = urlparse.urlsplit(url)
    if parts.scheme != "loop":
        raise SerialException('expected a string in the form "loop://[?logging={debug|info|warning|error}]": not starting with loop:// (%r)' % (parts.scheme,))
    try:
        # process options now, directly altering self
        for option, values in urlparse.parse_qs(parts.query, True).items():
            if option == 'logging':
                logging.basicConfig()   # XXX is that good to call it here?
                self.logger = logging.getLogger('pySerial.loop')
                self.logger.setLevel(LOGGER_LEVELS[values[0]])
                self.logger.debug('enabled logging')
            else:
                raise ValueError('unknown option: %r' % (option,))
    except ValueError as e:
        raise SerialException('expected a string in the form "loop://[?logging={debug|info|warning|error}]": %s' % e)
def from_url(self, url):
    """extract host and port from an URL string"""
    parts = urlparse.urlsplit(url)
    if parts.scheme != 'spy':
        raise serial.SerialException('expected a string in the form "spy://port[?option[=value][&option[=value]]]": not starting with spy:// (%r)' % (parts.scheme,))
    # process options now, directly altering self
    formatter = FormatHexdump  # default: hexdump formatting
    color = False
    output = sys.stderr        # default sink when no file= option given
    try:
        for option, values in urlparse.parse_qs(parts.query, True).items():
            if option == 'file':
                # NOTE(review): file handle is never closed here —
                # presumably kept open for the object's lifetime.
                output = open(values[0], 'w')
            elif option == 'color':
                color = True
            elif option == 'raw':
                formatter = FormatRaw
            elif option == 'all':
                self.show_all = True
            else:
                raise ValueError('unknown option: %r' % (option,))
    except ValueError as e:
        raise serial.SerialException('expected a string in the form "spy://port[?option[=value][&option[=value]]]": %s' % e)
    self.formatter = formatter(output, color)
    # netloc + path together form the wrapped port name
    return ''.join([parts.netloc, parts.path])
def get_base_page_info(self, page_data):
    """Find the reverse-ip info for the base page"""
    domain = urlparse.urlsplit(page_data['final_url']).hostname
    try:
        import socket
        addr = socket.gethostbyname(domain)
        host = str(socket.gethostbyaddr(addr)[0])
        page_data['base_page_ip_ptr'] = host
    except Exception:
        # best-effort: reverse-lookup failures are silently ignored
        pass
    # keep moving up the domain until we can get a NS record
    # NOTE(review): the loop condition tests 'base_page_dns_soa' but only
    # 'base_page_dns_ns' is ever set in this method, so the early exit
    # never fires from here — confirm whether the SOA key is written
    # elsewhere or the key name is a bug.
    while domain is not None and 'base_page_dns_soa' not in page_data:
        try:
            import dns.resolver
            dns_servers = dns.resolver.query(domain, "NS")
            dns_server = str(dns_servers[0].target).strip('. ')
            page_data['base_page_dns_ns'] = dns_server
        except Exception:
            pass
        # strip the leftmost label and retry on the parent domain
        pos = domain.find('.')
        if pos > 0:
            domain = domain[pos + 1:]
        else:
            domain = None
def can_view_parent_source(self, url_data):
    """Determine if parent URL source can be retrieved."""
    if not url_data.valid:
        return False
    parent = url_data.parent_url
    if not parent:
        return False
    if parent.startswith(u"file:"):
        # Directory contents are dynamically generated, so it makes
        # no sense in viewing/editing them.
        return not os.path.isdir(get_os_filename(urlparse.urlsplit(parent)[2]))
    if parent.startswith((u"ftp:", u"ftps:")):
        # viewable only when pointing at a file, not a directory listing
        ftp_path = urlparse.urlsplit(parent)[2]
        return bool(ftp_path) and not ftp_path.endswith(u'/')
    # Only HTTP left
    return parent.startswith((u"http:", u"https:"))
def _convert_to_idn(url): """Convert a URL to IDN notation""" # this function should only be called with a unicode string # strategy: if the host cannot be encoded in ascii, then # it'll be necessary to encode it in idn form parts = list(urlparse.urlsplit(url)) try: parts[1].encode('ascii') except UnicodeEncodeError: # the url needs to be converted to idn notation host = parts[1].rsplit(':', 1) newhost = [] port = u'' if len(host) == 2: port = host.pop() for h in host[0].split('.'): newhost.append(h.encode('idna').decode('utf-8')) parts[1] = '.'.join(newhost) if port: parts[1] += ':' + port return urlparse.urlunsplit(parts) else: return url
def download_file(my_URL, my_outfile=''):
    """Download my_URL to my_outfile in 4 KiB chunks.

    !! This will overwrite the output file.
    Adapted from https://gist.github.com/hughdbrown/c145b8385a2afa6570e2
    """
    import urllib2
    import urlparse
    import os
    URL_basename = os.path.basename(urlparse.urlsplit(my_URL).path)
    # if no output file specified, save to URL filename in current dir
    if my_outfile == '':
        my_outfile = URL_basename
    my_URL = urllib2.urlopen(my_URL)
    with open(my_outfile, 'wb') as output:
        while True:
            data = my_URL.read(4096)  # download in chunks
            if data:
                output.write(data)
            else:
                break
def create_http_request(self, method, url, headers, body, timeout, **kwargs):
    # Issue a plain HTTP/HTTPS request via httplib and return the response.
    scheme, netloc, path, query, _ = urlparse.urlsplit(url)
    # A ':' after the last ']' means an explicit port; this also keeps
    # bracketed IPv6 hosts ('[::1]') intact since their colons precede ']'.
    if netloc.rfind(':') <= netloc.rfind(']'):
        # no port number
        host = netloc
        port = 443 if scheme == 'https' else 80
    else:
        host, _, port = netloc.rpartition(':')
        port = int(port)
    if query:
        path += '?' + query
    if 'Host' not in headers:
        headers['Host'] = host
    if body and 'Content-Length' not in headers:
        headers['Content-Length'] = str(len(body))
    # NOTE(review): host/port are computed above but the connection is
    # opened with the full netloc — httplib parses the port itself.
    ConnectionType = httplib.HTTPSConnection if scheme == 'https' else httplib.HTTPConnection
    connection = ConnectionType(netloc, timeout=timeout)
    connection.request(method, path, body=body, headers=headers)
    response = connection.getresponse()
    return response
def filter(self, handler):
    # Serve local files/directory listings as mocked HTTP responses.
    # NOTE(review): nesting reconstructed from a collapsed source line;
    # a non-existent path falls through and returns None implicitly.
    path = urlparse.urlsplit(handler.path).path
    if path.startswith('/'):
        # treat the URL path as a relative filesystem path
        path = urllib.unquote_plus(path.lstrip('/') or '.').decode('utf8')
    if os.path.isdir(path):
        index_file = os.path.join(path, self.index_file)
        if not os.path.isfile(index_file):
            # no index file: render a generated directory listing
            content = self.format_index_html(path).encode('UTF-8')
            headers = {'Content-Type': 'text/html; charset=utf-8', 'Connection': 'close'}
            return 'mock', {'status': 200, 'headers': headers, 'body': content}
        else:
            path = index_file
    if os.path.isfile(path):
        content_type = 'application/octet-stream'
        try:
            import mimetypes
            content_type = mimetypes.types_map.get(os.path.splitext(path)[1])
            if os.path.splitext(path)[1].endswith(('crt', 'pem')):
                content_type = 'application/x-x509-ca-cert'
        except StandardError as e:
            logging.error('import mimetypes failed: %r', e)
        with open(path, 'rb') as fp:
            content = fp.read()
            headers = {'Connection': 'close', 'Content-Type': content_type}
            return 'mock', {'status': 200, 'headers': headers, 'body': content}
def _CalculateRequestSize(self, req): """Calculates the request size. Args: req: A tuple of (uri, method name, request body, header map) Returns: the size of the request, in bytes. """ uri, method, body, headers = req (unused_scheme, unused_host_port, url_path, unused_query, unused_fragment) = urlparse.urlsplit(uri) size = len('%s %s HTTP/1.1\n' % (method, url_path)) size += self._CalculateHeaderSize(headers) if body: size += len(body) return size
def _parse_relative_url(relative_url): """Parses a relative URL and splits it into its path and query string. Args: relative_url: The relative URL, starting with a '/'. Returns: Tuple (path, query) where: path: The path in the relative URL. query: The query string in the URL without the '?' character. Raises: _RelativeUrlError if the relative_url is invalid for whatever reason. """ if not relative_url: raise _RelativeUrlError('Relative URL is empty') (scheme, netloc, path, query, fragment) = urlparse.urlsplit(relative_url) if scheme or netloc: raise _RelativeUrlError('Relative URL may not have a scheme or location') if fragment: raise _RelativeUrlError('Relative URL may not specify a fragment') if not path or path[0] != '/': raise _RelativeUrlError('Relative URL path must start with "/"') return path, query
def _CalculateRequestSize(self, req): """Calculates the request size. May be overriden to support different types of requests. Args: req: A urllib2.Request. Returns: the size of the request, in bytes. """ (unused_scheme, unused_host_port, url_path, unused_query, unused_fragment) = urlparse.urlsplit(req.get_full_url()) size = len('%s %s HTTP/1.1\n' % (req.get_method(), url_path)) size += self._CalculateHeaderSize(req.headers) size += self._CalculateHeaderSize(req.unredirected_hdrs) data = req.get_data() if data: size += len(data) return size
def join_url(base_url, path):
    """Joins base url and path removing extra slashes.

    Removes trailing slashes and merges query strings; the scheme,
    netloc and fragment always come from base_url.

    :param base_url: Base url.
    :param path: Path.
    :return: Joined url.
    """
    base = urlparse.urlsplit(base_url, allow_fragments=False)
    extra = urlparse.urlsplit(path, allow_fragments=False)
    return urlparse.urlunsplit((
        base.scheme,
        base.netloc,
        _join_paths(base.path, extra.path),
        _join_queries(base.query, extra.query),
        base.fragment,
    ))
def check_registry_status(url=DEFAULT_IMAGES_URL, _v2=False):
    """
    Performs api check for registry health status.

    :params url: registry url
    :raises RegistryError: if registry is not available
    """
    # probe /v2/ (API v2) or /v1/_ping (API v1) on the registry host
    url = urlsplit(url)._replace(path='/v2/' if _v2 else '/v1/_ping').geturl()
    with raise_registry_error(url):
        response = requests.get(url, timeout=PING_REQUEST_TIMEOUT, verify=False)
        # a v1 ping answered by a v2-only registry: 404 plus the API
        # version header -> retry the check against the v2 endpoint
        need_v2 = not _v2 and response.status_code == 404 and \
            response.headers.get(API_VERSION_HEADER) == 'registry/2.0'
        if need_v2:
            check_registry_status(url, _v2=True)
        elif response.status_code == 401:
            return  # user is not authorized, but registry is available
        else:
            response.raise_for_status()
def url_join(*parts, **kwargs):
    """
    Normalize url parts and join them with a slash.
    adapted from: http://codereview.stackexchange.com/q/13027

    Scheme and netloc come from the last part that defines them
    (scheme falls back to kwargs['scheme'] or 'http'); query and
    fragment come from the final part only.
    """
    def concat_paths(sequence):
        # walking from the last part backwards, keep segments up to
        # (and including) the first absolute one, then rejoin in order
        kept = []
        for segment in sequence:
            kept.append(segment)
            if segment.startswith('/'):
                break
        return '/'.join(reversed(kept))

    pieces = [urlsplit(part) for part in reversed(parts)]
    schemes, netlocs, paths, queries, fragments = zip(*pieces)
    scheme = kwargs.get('scheme', 'http')
    for candidate in schemes:
        if candidate:
            scheme = candidate
            break
    netloc = ''
    for candidate in netlocs:
        if candidate:
            netloc = candidate
            break
    return urlunsplit((scheme, netloc, concat_paths(paths), queries[0], fragments[0]))
def url_to_path(self, url):
    """Map a URL to the file-system path used to cache it."""
    parts = urlparse.urlsplit(url)
    # empty and directory-style paths default to an index.html document
    path = parts.path or '/index.html'
    if path.endswith('/'):
        path += 'index.html'
    raw = parts.netloc + path + parts.query
    # replace characters that are invalid in file names
    safe = re.sub('[^/0-9a-zA-Z\-.,;_ ]', '_', raw)
    # clamp each path component to 255 characters
    clipped = '/'.join(chunk[:255] for chunk in safe.split('/'))
    return os.path.join(self.cache_dir, clipped)
def resource_dictize(res, context):
    """Convert a resource model object into a plain dict, fixing up its url."""
    model = context['model']  # NOTE(review): unused here — confirm before removing
    resource = d.table_dictize(res, context)
    # flatten the serialized extras into the top-level dict
    extras = resource.pop("extras", None)
    if extras:
        resource.update(extras)
    # some urls do not have the protocol this adds http:// to these url
    url = resource['url']
    ## for_edit is only called at the times when the dataset is to be edited
    ## in the frontend. Without for_edit the whole qualified url is returned.
    if resource.get('url_type') == 'upload' and not context.get('for_edit'):
        cleaned_name = munge.munge_filename(url)
        resource['url'] = h.url_for(controller='package',
                                    action='resource_download',
                                    id=resource['package_id'],
                                    resource_id=res.id,
                                    filename=cleaned_name,
                                    qualified=True)
    elif resource['url'] and not urlparse.urlsplit(url).scheme and not context.get('for_edit'):
        resource['url'] = u'http://' + url.lstrip('/')
    return resource
def do_GET(self):
    # OAuth callback handler. Example request path:
    # /?oauth_token=72157630789362986-5405f8542b549e95&oauth_verifier=fe4eac402339100e
    # Stores token/verifier on the server object and answers with a
    # static "authorization OK" page.
    qs = urllib_parse.urlsplit(self.path).query
    url_vars = urllib_parse.parse_qs(qs)
    oauth_token = url_vars['oauth_token'][0]
    oauth_verifier = url_vars['oauth_verifier'][0]
    if six.PY2:
        # parse_qs yields byte strings on Python 2; normalize to unicode
        self.server.oauth_token = oauth_token.decode('utf-8')
        self.server.oauth_verifier = oauth_verifier.decode('utf-8')
    else:
        self.server.oauth_token = oauth_token
        self.server.oauth_verifier = oauth_verifier
    assert (isinstance(self.server.oauth_token, six.string_types))
    assert (isinstance(self.server.oauth_verifier, six.string_types))
    self.send_response(200)
    self.send_header('Content-type', 'text/html')
    self.end_headers()
    self.wfile.write(html.auth_okay_html)
def from_url(self, url):
    """extract host and port from an URL string"""
    # loop:// URLs carry no host/port; only the ?logging= option is
    # honored, mutating self as a side effect. Nothing is returned.
    parts = urlparse.urlsplit(url)
    if parts.scheme != "loop":
        raise SerialException(
            'expected a string in the form '
            '"loop://[?logging={debug|info|warning|error}]": not starting '
            'with loop:// ({!r})'.format(parts.scheme))
    try:
        # process options now, directly altering self
        for option, values in urlparse.parse_qs(parts.query, True).items():
            if option == 'logging':
                logging.basicConfig()   # XXX is that good to call it here?
                self.logger = logging.getLogger('pySerial.loop')
                self.logger.setLevel(LOGGER_LEVELS[values[0]])
                self.logger.debug('enabled logging')
            else:
                raise ValueError('unknown option: {!r}'.format(option))
    except ValueError as e:
        raise SerialException(
            'expected a string in the form '
            '"loop://[?logging={debug|info|warning|error}]": {}'.format(e))
def _get_env_info(self, script_url):
    # Derive the remote web root (self.base_folder_url) and matching
    # file-system root (self.base_folder_path) by stripping the longest
    # common trailing folder sequence shared by the script URL path and
    # the script's folder on disk.
    # NOTE(review): URL paths are split on os.sep, which is only correct
    # on POSIX targets ('/') — confirm intended platforms.
    script_folder = ModuleExec('system_info', ['-info', 'script_folder']).load_result_or_run('script_folder')
    if not script_folder:
        return
    script_url_splitted = urlparse.urlsplit(script_url)
    script_url_path_folder, script_url_path_filename = os.path.split(script_url_splitted.path)
    url_folder_pieces = script_url_path_folder.split(os.sep)
    folder_pieces = script_folder.split(os.sep)
    for pieceurl, piecefolder in zip(reversed(url_folder_pieces), reversed(folder_pieces)):
        if pieceurl == piecefolder:
            # shared trailing folder: drop it from both sides
            folder_pieces.pop()
            url_folder_pieces.pop()
        else:
            break
    base_url_path_folder = os.sep.join(url_folder_pieces)
    self.base_folder_url = urlparse.urlunsplit(script_url_splitted[:2] + (base_url_path_folder,) + script_url_splitted[3:])
    self.base_folder_path = os.sep.join(folder_pieces)
def gethtml(url):
    # Fetch a Crunchyroll page, reusing pickled cookies and optionally
    # routing through crunblocker to appear US-based. Relies on the
    # module globals forceusa, localizecookies and lang.
    with open('cookies') as f:
        cookies = requests.utils.cookiejar_from_dict(pickle.load(f))
    session = requests.session()
    session.cookies = cookies
    del session.cookies['c_visitor']
    if not forceusa and localizecookies:
        # map the UI language name to Crunchyroll's locale cookie value
        # NOTE(review): the u'???????' key looks like mojibake (likely
        # Arabic lost in extraction) — verify against the original source.
        session.cookies['c_locale'] = {u'Español (Espana)': 'esES', u'Français (France)': 'frFR',
                                       u'Português (Brasil)': 'ptBR', u'English': 'enUS',
                                       u'Español': 'esLA', u'Türkçe': 'enUS', u'Italiano': 'itIT',
                                       u'???????': 'arME', u'Deutsch': 'deDE'}[lang]
    if forceusa:
        try:
            session.cookies['sess_id'] = requests.get('http://www.crunblocker.com/sess_id.php').text
        except:
            sleep(10)  # sleep so we don't overload crunblocker
            session.cookies['sess_id'] = requests.get('http://www.crunblocker.com/sess_id.php').text
    parts = urlparse.urlsplit(url)
    if not parts.scheme or not parts.netloc:
        print 'Apparently not a URL'
        sys.exit()
    data = {'Referer': 'http://crunchyroll.com/', 'Host': 'www.crunchyroll.com',
            'User-Agent': 'Mozilla/5.0 Windows NT 6.1; rv:26.0 Gecko/20100101 Firefox/26.0'}
    res = session.get(url, params=data)
    res.encoding = 'UTF-8'
    return res.text
def __init__(self, base_url, login, api_key):
    # API client initializer: normalize the base URL, store credentials,
    # and pre-register the entity commands.
    if not base_url.endswith("/"):
        base_url += "/"
    self.__base_url = base_url          # always slash-terminated
    self.__api_key = api_key
    self.__login = login
    self._api_version = "api/v1/"
    self.__unique_code = self.get_unique_code()
    # split once so scheme, server and base path are reusable later
    self._scheme, self._server, self._api_base, _, _ = urlparse.urlsplit(base_url)
    self.__sign_code = None
    self.__entity_list = []
    self.__general_doc_dict = None
    self.__logger = None
    # self.function_list = Command(self, "console/FunctionList", [])
    entity_list_params = [
        {"attr": "entity", "type": "list", "need": False}
    ]
    self._entities_detail = Command(self, "console/entity", entity_list_params)
    self.__init_entities()
def from_html(self, cr, uid, model, field, element, context=None):
    # Inverse of rendering: recover the image field value referenced by
    # the <img> src — from the database, static module files, or a
    # remote URL, depending on the src form.
    url = element.find('img').get('src')
    url_object = urlparse.urlsplit(url)
    if url_object.path.startswith('/website/image'):
        # url might be /website/image/<model>/<id>[_<checksum>]/<field>[/<width>x<height>]
        fragments = url_object.path.split('/')
        query = dict(urlparse.parse_qsl(url_object.query))
        # query parameters override the positional path fragments
        model = query.get('model', fragments[3])
        oid = query.get('id', fragments[4].split('_')[0])
        field = query.get('field', fragments[5])
        item = self.pool[model].browse(cr, uid, int(oid), context=context)
        return item[field]
    if self.local_url_re.match(url_object.path):
        return self.load_local_url(url)
    return self.load_remote_url(url)
def load_local_url(self, url):
    """Load an image served from a module's static/ directory.

    Maps the URL path back to a module resource, validates that the file
    is a loadable image, and returns its base64-encoded bytes (or None on
    any failure, which is logged).
    """
    match = self.local_url_re.match(urlparse.urlsplit(url).path)
    rest = match.group('rest')
    # normalize platform path separators to '/' before splitting.
    # FIX: str.replace returns a new string (strings are immutable);
    # the original discarded the result, making this a no-op.
    for sep in os.sep, os.altsep:
        if sep and sep != '/':
            rest = rest.replace(sep, '/')
    path = openerp.modules.get_module_resource(
        match.group('module'), 'static', *(rest.split('/')))
    if not path:
        return None
    try:
        with open(path, 'rb') as f:
            # force complete image load to ensure it's valid image data
            image = I.open(f)
            image.load()
            f.seek(0)
            return f.read().encode('base64')
    except Exception:
        logger.exception("Failed to load local image %r", url)
        return None