The following 50 code examples, extracted from open source Python projects, show how to use six.moves.urllib.parse.urlsplit().
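Before the examples, here is a minimal sketch of what urlsplit() returns; the URL below is invented purely for illustration:

    from six.moves.urllib.parse import urlsplit

    # urlsplit() breaks a URL into five components: scheme, netloc, path,
    # query and fragment (unlike urlparse(), there is no separate "params" field).
    parts = urlsplit('https://example.com:8080/path/to/page?name=val#section')
    print(parts.scheme)    # 'https'
    print(parts.netloc)    # 'example.com:8080'
    print(parts.hostname)  # 'example.com'
    print(parts.port)      # 8080
    print(parts.path)      # '/path/to/page'
    print(parts.query)     # 'name=val'
    print(parts.fragment)  # 'section'
    print(parts.geturl())  # reassembles the URL from its parts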
def urlsplit(url, scheme='', allow_fragments=True):
    """Parse a URL using urlparse.urlsplit(), splitting query and fragments.

    This function papers over Python issue9374_ when needed.

    .. _issue9374: http://bugs.python.org/issue9374

    The parameters are the same as urlparse.urlsplit.
    """
    scheme, netloc, path, query, fragment = parse.urlsplit(
        url, scheme, allow_fragments)
    if allow_fragments and '#' in path:
        path, fragment = path.split('#', 1)
    if '?' in path:
        path, query = path.split('?', 1)
    return _ModifiedSplitResult(scheme, netloc, path, query, fragment)
def _get_session(self, url):
    if self._connection_pool:
        magic_tuple = parse.urlsplit(url)
        scheme, netloc, path, query, frag = magic_tuple
        service_url = '%s://%s' % (scheme, netloc)
        if self._current_url != service_url:
            # Invalidate Session object in case the url is somehow changed
            if self._session:
                self._session.close()
            self._current_url = service_url
            self._logger.debug(
                "New session created for: (%s)" % service_url)
            self._session = requests.Session()
            self._session.mount(service_url,
                                self._connection_pool.get(service_url))
        return self._session
    elif self._session:
        return self._session

# @set_headers_param
def _get_session(self, url):
    if self._connection_pool:
        magic_tuple = parse.urlsplit(url)
        scheme, netloc, path, query, frag = magic_tuple
        service_url = '%s://%s' % (scheme, netloc)
        if self._current_url != service_url:
            # Invalidate Session object in case the url is somehow changed
            if self._session:
                self._session.close()
            self._current_url = service_url
            self._logger.debug(
                "New session created for: (%s)" % service_url)
            self._session = requests.Session()
            self._session.mount(service_url,
                                self._connection_pool.get(service_url))
        return self._session
    elif self._session:
        return self._session
def load_tests(loader, tests, pattern):
    """Provide a TestSuite to the discovery process."""
    gnocchi_url = os.getenv('GNOCCHI_ENDPOINT')
    if gnocchi_url:
        parsed_url = urlparse.urlsplit(gnocchi_url)
        prefix = parsed_url.path.rstrip('/')  # turn it into a prefix

        # NOTE(chdent): gabbi requires a port be passed or it will
        # default to 8001, so we must dance a little dance to get
        # the right ports. Probably gabbi needs to change.
        # https://github.com/cdent/gabbi/issues/50
        port = 443 if parsed_url.scheme == 'https' else 80
        if parsed_url.port:
            port = parsed_url.port

        test_dir = os.path.join(os.path.dirname(__file__), TESTS_DIR)
        return driver.build_tests(test_dir,
                                  loader,
                                  host=parsed_url.hostname,
                                  port=port,
                                  prefix=prefix)
    elif os.getenv("GABBI_LIVE"):
        raise RuntimeError('"GNOCCHI_ENDPOINT" is not set')
def update_query_parameters(url, query_parameters):
    """
    Return url with updated query parameters.

    Arguments:
        url (str): Original url whose query parameters need to be updated.
        query_parameters (dict): A dictionary containing query parameters to be
            added to the url.

    Returns:
        (str): URL with the updated query parameters.
    """
    scheme, netloc, path, query_string, fragment = urlsplit(url)
    url_params = parse_qs(query_string)

    # Update url query parameters
    url_params.update(query_parameters)

    return urlunsplit(
        (scheme, netloc, path, urlencode(url_params, doseq=True), fragment),
    )
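A quick, hypothetical illustration of how the helper above behaves, assuming the six.moves urlsplit/parse_qs/urlencode/urlunsplit imports it relies on:

    # Hypothetical call; the URL is made up. Existing parameters are kept and
    # the new one is appended (parameter order may differ across Python versions).
    url = 'https://example.com/courses?page=2'
    print(update_query_parameters(url, {'audit': ['true']}))
    # -> 'https://example.com/courses?page=2&audit=true'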
def remove_trailing_version_from_href(href):
    """Removes the api version from the href.

    Given: 'http://www.masakari.com/ha/v1.1'
    Returns: 'http://www.masakari.com/ha'

    Given: 'http://www.masakari.com/v1.1'
    Returns: 'http://www.masakari.com'

    """
    parsed_url = urlparse.urlsplit(href)
    url_parts = parsed_url.path.rsplit('/', 1)

    # NOTE: this should match vX.X or vX
    expression = re.compile(r'^v([0-9]+|[0-9]+\.[0-9]+)(/.*|$)')
    if not expression.match(url_parts.pop()):
        LOG.debug('href %s does not contain version', href)
        raise ValueError(_('href %s does not contain version') % href)

    new_path = url_join(*url_parts)
    parsed_url = list(parsed_url)
    parsed_url[2] = new_path
    return urlparse.urlunsplit(parsed_url)
def _init_from_url(self, url):
    port = None
    parsed_url = urlparse.urlsplit(url)
    if ':' in parsed_url.netloc:
        host, port = parsed_url.netloc.split(':')
    else:
        host = parsed_url.netloc
    if not port:
        if parsed_url.scheme == 'https':
            port = 443
        else:
            port = 80
    path = parsed_url.path
    if path == '/' or not path:
        self.script_name = ''
    else:
        self.script_name = path
    self.host = host
    self.port = int(port)
def resolve(self, url, env, hostname):
    if hostname in self.proxy_apps.keys():
        parts = urlsplit(url)
        full = parts.path
        if parts.query:
            full += '?' + parts.query

        env['REQUEST_URI'] = full
        env['wsgiprox.matched_proxy_host'] = hostname
        env['wsgiprox.proxy_host'] = hostname
    else:
        env['REQUEST_URI'] = self.prefix_resolver(url, env)
        env['wsgiprox.proxy_host'] = self.proxy_host

    queryparts = env['REQUEST_URI'].split('?', 1)
    env['PATH_INFO'] = queryparts[0]
    env['QUERY_STRING'] = queryparts[1] if len(queryparts) > 1 else ''
def remove_trailing_version_from_href(href):
    """Removes the api version from the href.

    Given: 'http://www.nova.com/compute/v1.1'
    Returns: 'http://www.nova.com/compute'

    Given: 'http://www.nova.com/v1.1'
    Returns: 'http://www.nova.com'

    """
    parsed_url = urlparse.urlsplit(href)
    url_parts = parsed_url.path.rsplit('/', 1)

    # NOTE: this should match vX.X or vX
    expression = re.compile(r'^v([0-9]+|[0-9]+\.[0-9]+)(/.*|$)')
    if not expression.match(url_parts.pop()):
        LOG.debug('href %s does not contain version', href)
        raise ValueError(_('href %s does not contain version') % href)

    new_path = url_join(*url_parts)
    parsed_url = list(parsed_url)
    parsed_url[2] = new_path
    return urlparse.urlunsplit(parsed_url)
def get_plane_uri(cls, observation_uri, product_id):
    """
    Initializes a Plane URI instance

    Arguments:
    observation_uri : the uri of the observation
    product_id : ID of the product
    """
    caom_util.type_check(observation_uri, ObservationURI, "observation_uri",
                         override=False)
    caom_util.type_check(product_id, str, "observation_uri", override=False)
    caom_util.validate_path_component(cls, "product_id", product_id)

    path = urlsplit(observation_uri.uri).path
    uri = SplitResult(ObservationURI._SCHEME, "",
                      path + "/" + product_id, "", "").geturl()
    return cls(uri)

# Properties
def uri(self, value):
    caom_util.type_check(value, str, "uri", override=False)
    tmp = urlsplit(value)

    if tmp.scheme != ObservationURI._SCHEME:
        raise ValueError("{} doesn't have an allowed scheme".format(value))
    if tmp.geturl() != value:
        raise ValueError("Failed to parse uri correctly: {}".format(value))

    (collection, observation_id, product_id) = tmp.path.split("/")
    if product_id is None:
        raise ValueError("Failed to get product ID from uri: {}"
                         .format(value))

    self._product_id = product_id
    self._observation_uri = \
        ObservationURI.get_observation_uri(collection, observation_id)
    self._uri = value
def _url_scheme(self, url):
    return urlsplit(url).scheme
def stack_output(output):
    if not output:
        return u''

    if isinstance(output, six.string_types):
        parts = urlparse.urlsplit(output)
        if parts.netloc and parts.scheme in ('http', 'https'):
            url = html.escape(output)
            safe_link = u'<a href="%s" target="_blank">%s</a>' % (url, url)
            return safestring.mark_safe(safe_link)

    if isinstance(output, dict) or isinstance(output, list):
        output = json.dumps(output, indent=2)

    return safestring.mark_safe(u'<pre>%s</pre>' % html.escape(output))
def remove_version_from_href(href):
    """Removes the first api version from the href.

    Given: 'http://www.meteos.com/v1.1/123'
    Returns: 'http://www.meteos.com/123'

    Given: 'http://www.meteos.com/v1.1'
    Returns: 'http://www.meteos.com'

    """
    parsed_url = parse.urlsplit(href)
    url_parts = parsed_url.path.split('/', 2)

    # NOTE: this should match vX.X or vX
    expression = re.compile(r'^v([0-9]+|[0-9]+\.[0-9]+)(/.*|$)')
    if expression.match(url_parts[1]):
        del url_parts[1]

    new_path = '/'.join(url_parts)

    if new_path == parsed_url.path:
        msg = 'href %s does not contain version' % href
        LOG.debug(msg)
        raise ValueError(msg)

    parsed_url = list(parsed_url)
    parsed_url[2] = new_path
    return parse.urlunsplit(parsed_url)
def _update_link_prefix(self, orig_url, prefix):
    if not prefix:
        return orig_url
    url_parts = list(parse.urlsplit(orig_url))
    prefix_parts = list(parse.urlsplit(prefix))
    url_parts[0:2] = prefix_parts[0:2]
    return parse.urlunsplit(url_parts)
def _get_url_parts(url):
    url = _clean_url(url)
    return urlsplit(url)
def get_host(path):
    return urlparse.urlsplit(path).netloc
def can_fetch(self, user_agent, url):
    parsed = urlsplit(url)
    domain = parsed.netloc
    if domain in self.robots_txt_cache:
        robot_txt = self.robots_txt_cache[domain]
        if time.time() - robot_txt.mtime() > self.robot_txt_age:
            robot_txt = None
    else:
        robot_txt = None

    if robot_txt is None:
        robot_txt = RobotFileParser()
        try:
            response = yield gen.maybe_future(self.http_client.fetch(
                urljoin(url, '/robots.txt'), connect_timeout=10,
                request_timeout=30))
            content = response.body
        except tornado.httpclient.HTTPError as e:
            logger.error('load robots.txt from %s error: %r', domain, e)
            content = ''

        try:
            content = content.decode('utf8', 'ignore')
        except UnicodeDecodeError:
            content = ''

        robot_txt.parse(content.splitlines())
        self.robots_txt_cache[domain] = robot_txt

    raise gen.Return(robot_txt.can_fetch(user_agent, url))
def _get_domain_bucket(self, url):
    parsed = urlparse.urlsplit(url)
    hostname, _, _ = parsed.netloc.partition(':')
    return self.domain_cache.setdefault(hostname, {})
def get_path(url):
    p = urlsplit(url)
    return urlunsplit(['', '', p.path or '/', p.query, p.fragment])
def _reverse_domain_storage(item, media_root):
    for obj in item.get('objects', []):
        stored_url = obj['obj_stored_url']
        assert '/' not in stored_url
        domain = urlsplit(obj['obj_original_url']).netloc
        if ':' in domain:
            domain, _ = domain.split(':', 1)
        parents = [p for p in reversed(domain.split('.')) if p]
        os.makedirs(os.path.join(media_root, *parents), exist_ok=True)
        stored_url_noext, _ = os.path.splitext(stored_url)
        new_stored_url = os.path.sep.join(parents + [stored_url_noext])
        dest = os.path.join(media_root, new_stored_url)
        if not os.path.exists(dest):
            shutil.copy(os.path.join(media_root, stored_url), dest)
        obj['obj_stored_url'] = new_stored_url
def take_action(self, args):
    """Download a recipe from a remote URL and save it to a local file
    under the contrib directory.

    Args:
        args (:obj:`dict`): Parsed command line arguments. "url" is a URL
            where a recipe will be downloaded from.
    """
    file_url = args.url
    filename = parse.urlsplit(file_url).path.split('/')[-1:][0]
    contrib = utils.get_property_from_config_file('defaults', 'contrib')
    self._download_recipe(file_url, filename, contrib)
def canonicalize_url(api_root_url):
    api_root_url = urlparse.urlsplit(api_root_url).geturl()
    if not api_root_url.endswith("/"):
        api_root_url += "/"
    return api_root_url
def open(self, target_uri, **kwargs):
    """Open target uri.

    :param target_uri: Uri to open
    :type target_uri: string

    :returns: Target object
    """
    target = urlsplit(target_uri, scheme=self.default_opener)
    opener = self.get_opener(target.scheme)
    query = opener.conform_query(target.query)

    target = opener.get_target(
        target.scheme,
        target.path,
        target.fragment,
        target.username,
        target.password,
        target.hostname,
        target.port,
        query,
        **kwargs
    )
    target.opener_path = target_uri

    return target
def __init__(self, url_string):
    split_url = urllib_parse.urlsplit(url_string)
    self.scheme = split_url.scheme  #:
    self.username = split_url.username  #:
    self.password = split_url.password  #:
    self.hostname = split_url.hostname  #:
    self.port = split_url.port  #:
    self.path = split_url.path  #:
    self.query = split_url.query  #:
    self.fragment = split_url.fragment  #:
def get_driver(conf):
    """Return the configured driver."""
    split = parse.urlsplit(conf.indexer.url)
    d = driver.DriverManager('gnocchi.indexer', split.scheme).driver
    return d(conf)
def _fully_qualify(environ, url):
    """Turn a URL path into a fully qualified URL."""
    split_url = urlparse.urlsplit(url)
    server_name = environ.get('SERVER_NAME')
    server_port = str(environ.get('SERVER_PORT'))
    server_scheme = environ.get('wsgi.url_scheme')
    if server_port not in ['80', '443']:
        netloc = '%s:%s' % (server_name, server_port)
    else:
        netloc = server_name

    return urlparse.urlunsplit((server_scheme, netloc, split_url.path,
                                split_url.query, split_url.fragment))
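A small illustration of the helper above with an invented WSGI environ; a non-standard port ends up in the netloc:

    # Hypothetical environ values for illustration only.
    environ = {'SERVER_NAME': 'api.example.org',
               'SERVER_PORT': '8080',
               'wsgi.url_scheme': 'http'}
    print(_fully_qualify(environ, '/v1/resources?limit=5'))
    # -> 'http://api.example.org:8080/v1/resources?limit=5'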
def _parse_url(self, url):
    """Create a url from test data.

    If provided with a full URL, just return that. If SSL is requested
    set the scheme appropriately.

    Scheme and netloc are saved for later use in comparisons.
    """
    query_params = self.test_data['query_parameters']
    ssl = self.test_data['ssl']
    parsed_url = urlparse.urlsplit(url)
    if not parsed_url.scheme:
        full_url = utils.create_url(url, self.host, port=self.port,
                                    prefix=self.prefix, ssl=ssl)
        # parse again to set updated netloc and scheme
        parsed_url = urlparse.urlsplit(full_url)

    self.scheme = parsed_url.scheme
    self.netloc = parsed_url.netloc

    if query_params:
        query_string = self._update_query_params(parsed_url.query,
                                                 query_params)
    else:
        query_string = parsed_url.query

    return urlparse.urlunsplit((parsed_url.scheme, parsed_url.netloc,
                                parsed_url.path, query_string, ''))
def create_url(base_url, host, port=None, prefix='', ssl=False):
    """Given pieces of a path-based url, return a fully qualified url."""
    scheme = 'http'

    # A host with : in it at this stage is assumed to be an IPv6
    # address of some kind (they come in many forms). Port should
    # already have been stripped off.
    if ':' in host and not (host.startswith('[') and host.endswith(']')):
        host = '[%s]' % host

    if port and not _port_follows_standard(port, ssl):
        netloc = '%s:%s' % (host, port)
    else:
        netloc = host

    if ssl:
        scheme = 'https'

    parsed_url = urlparse.urlsplit(base_url)
    query_string = parsed_url.query
    path = parsed_url.path

    # Guard against a prefix of None or the url already having the
    # prefix. Without the startswith check, the tests in prefix.yaml
    # fail. This is a pragmatic fix which does this for any URL in a
    # test request that does not have a scheme and does not
    # distinguish between URLs in a gabbi test file and those
    # generated by the server. Ideally we would not mutate nor need
    # to check URLs returned from the server. Doing that, however,
    # would require more complex data handling than we have now and
    # this covers most common cases and will be okay until someone
    # reports a bug.
    if prefix and not path.startswith(prefix):
        prefix = prefix.rstrip('/')
        path = path.lstrip('/')
        path = '%s/%s' % (prefix, path)

    return urlparse.urlunsplit((scheme, netloc, path, query_string, ''))
def _get_sync_token(self):
    url_parts = urlsplit(self.next_sync_url or self.next_page_url)
    querystring = parse_qs(url_parts.query)
    return querystring['sync_token'][0]
def parse(self, response):
    self.responses.append(response)
    p = urlsplit(response.url)
    self.visited_urls.append(
        urlunsplit(['', '', p.path, p.query, p.fragment]) or '/')
    urls = {link.url for link in self.link_extractor.extract_links(response)
            if not self._looks_like_logout(link, response)}
    for url in urls:
        yield self.make_request(url)
def test_login(settings, extra_settings=None):
    """ No logout links, just one page after login. """
    crawler = make_crawler(settings, **AL_SETTINGS)
    with MockServer(Login) as s:
        yield crawler.crawl(url=s.root_url)
    spider = crawler.spider
    assert len(spider.visited_urls) == 2
    assert set(spider.visited_urls) == {'/', '/hidden'}
    response = spider.responses[0]
    assert urlsplit(response.url).path.rstrip('/') == ''
    assert response.meta['autologin_active']
    assert response.meta['autologin_response']['status'] == 'solved'
def test_login_error(settings, extra_settings=None):
    """ Trying to login with wrong credentials """
    al_settings = dict(AL_SETTINGS)
    al_settings['AUTOLOGIN_PASSWORD'] = 'wrong'
    crawler = make_crawler(settings, **al_settings)
    with MockServer(Login) as s:
        yield crawler.crawl(url=s.root_url)
    spider = crawler.spider
    assert len(spider.visited_urls) == 2
    assert set(spider.visited_urls) == {'/', '/login'}
    response = spider.responses[0]
    assert urlsplit(response.url).path.rstrip('/') == ''
    assert not response.meta['autologin_active']
    assert response.meta['autologin_response']['status'] == 'error'
def create_return_url(base, query, **kwargs):
    """
    Add a query string plus extra parameters to a base URL which may
    contain a query part already.

    :param base: redirect_uri may contain a query part, no fragment allowed.
    :param query: Old query part as a string
    :param kwargs: extra query parameters
    :return:
    """
    part = urlsplit(base)
    if part.fragment:
        raise ValueError("Base URL contained parts it shouldn't")

    for key, values in parse_qs(query).items():
        if key in kwargs:
            if isinstance(kwargs[key], six.string_types):
                kwargs[key] = [kwargs[key]]
            kwargs[key].extend(values)
        else:
            kwargs[key] = values

    if part.query:
        for key, values in parse_qs(part.query).items():
            if key in kwargs:
                if isinstance(kwargs[key], six.string_types):
                    kwargs[key] = [kwargs[key]]
                kwargs[key].extend(values)
            else:
                kwargs[key] = values

        _pre = base.split("?")[0]
    else:
        _pre = base

    logger.debug("kwargs: %s" % kwargs)

    return "%s?%s" % (_pre, url_encode_params(kwargs))
def _fetch_crl(self, config, url, out, fmt):
    # type: (ConfigParser, str, str, str) -> bool
    updated = False
    url_hash = sha1(url.encode('utf-8')).hexdigest()

    headers = {}  # type: Dict[str, str]
    try:
        etag = config.get(CONFIG_SECTION, url_hash)
    except NoOptionError:
        pass
    else:
        headers = {'If-None-Match': etag}

    response = requests.get(url, headers=headers)
    if response.status_code == 200:
        crl_name = os.path.basename(urlsplit(url).path)
        crl_name, content = self._format_crl(crl_name, response.content, fmt)
        crl_path = os.path.join(out, crl_name)
        with open(crl_path, 'wb') as f:
            f.write(content)
        print(crl_path, file=self.stdout)
        updated = True
        if 'ETag' in response.headers:
            config.set(CONFIG_SECTION, url_hash, response.headers['ETag'])
    elif response.status_code == 304:
        pass
    else:
        print("Error {} downloading {}: {}".format(
            response.status_code, url, response.content
        ), file=self.stderr)
    return updated
def get_querystring(uri):
    parts = urlparse.urlsplit(uri)
    if sys.version_info[:2] == (2, 6):
        query = parts.path
        if query.startswith('?'):
            query = query[1:]
    else:
        query = parts.query
    return urlparse.parse_qs(query)
def _update_link_prefix(self, orig_url, prefix):
    if not prefix:
        return orig_url
    url_parts = list(urlparse.urlsplit(orig_url))
    prefix_parts = list(urlparse.urlsplit(prefix))
    url_parts[0:2] = prefix_parts[0:2]
    url_parts[2] = prefix_parts[2] + url_parts[2]
    return urlparse.urlunsplit(url_parts).rstrip('/')
def get_session(domain_or_url):
    """
    Get a keep-alive session for the given domain.

    :param domain_or_url: domain name or full URL
    :type domain_or_url: str
    :rtype: requests.Session
    """
    domain = urllib_parse.urlsplit(domain_or_url).netloc or domain_or_url

    if domain not in pool:
        pool[domain] = []

    if not hasattr(locked_session, "sessdicts"):
        # Sessions currently locked by this thread. A locked session is
        # taken out of the pool and is not visible to other threads until
        # release_lock() puts it back, so each thread works with its own
        # session objects.
        locked_session.sessdicts = []

    if not pool[domain]:
        # The pool is empty, so create a new session.
        sessdict = {
            "domain": domain,
            "sessobj": requests.Session(),
        }
    else:
        # Otherwise reuse an existing session from the pool.
        sessdict = pool[domain].pop()

    sessdict["active"] = time.time()
    locked_session.sessdicts.append(sessdict)

    if _gc_checkpoint < time.time() - SESSION_TTL:
        with cleaning_lock:
            clear()

    return sessdict["sessobj"]  # type: requests.Session
def get_license_from_url(url):
    """Get the license abbreviation from an URL.

    Args:
        url(str): canonical url of the license.

    Returns:
        str: the corresponding license abbreviation.

    Raises:
        ValueError: when the url is not recognized
    """
    if not url:
        return

    split_url = urlsplit(url, scheme='http')

    if split_url.netloc.lower() == 'creativecommons.org':
        license = ['CC']
        match = _RE_LICENSE_URL.match(split_url.path)
        license.extend(part.upper() for part in match.groups() if part)
    elif split_url.netloc == 'arxiv.org':
        license = ['arXiv']
        match = _RE_LICENSE_URL.match(split_url.path)
        license.extend(part for part in match.groups() if part)
    else:
        raise ValueError('Unknown license URL')

    return u' '.join(license)
def get_querystring(uri):
    """Get querystring information from uri.

    :param uri: uri
    :return: querystring info or {}
    """
    parts = urlparse.urlsplit(uri)
    if sys.version_info[:2] == (2, 6):
        query = parts.path
        if query.startswith('?'):
            query = query[1:]
    else:
        query = parts.query
    return urlparse.parse_qs(query)
def __init__(self, repouri):
    """Initialize a RepoStats object. Pass a TransportRepoURI object
    in repouri to configure an object for a particular repository URI."""

    self.__url = repouri.uri.rstrip("/")
    self.__scheme = urlsplit(self.__url)[0]
    self.__priority = repouri.priority
    self.__proxy = repouri.proxy
    self.__system = repouri.system

    self._err_decay = 0
    self.__failed_tx = 0
    self.__content_err = 0
    self.__decayable_err = 0
    self.__timeout_err = 0
    self.__total_tx = 0
    self.__consecutive_errors = 0

    self.__connections = 0
    self.__connect_time = 0.0

    self.__used = False

    self.__bytes_xfr = 0.0
    self.__seconds_xfr = 0.0

    self.origin_speed = 0.0
    self.origin_cspeed = 0.0
    self.origin_count = 1
    self.origin_factor = 1
    self.origin_decay = 1
def __str__(self):
    illegals = []
    for u in self.uris:
        assert isinstance(u, six.string_types)
        scheme = urlsplit(u, allow_fragments=0)[0]
        illegals.append((u, scheme))

    if len(illegals) > 1:
        msg = _("The following URIs use unsupported "
                "schemes. Supported schemes are "
                "file://, http://, and https://.")
        for i, s in illegals:
            msg += _("\n  {uri} (scheme: "
                     "{scheme})").format(uri=i, scheme=s)
        return msg
    elif len(illegals) == 1:
        i, s = illegals[0]
        return _("The URI '{uri}' uses the unsupported "
                 "scheme '{scheme}'. Supported schemes are "
                 "file://, http://, and https://.").format(
                     uri=i, scheme=s)
    return _("The specified URI uses an unsupported scheme."
             " Supported schemes are: file://, http://, and "
             "https://.")
def __str__(self):
    if self.data:
        scheme = urlsplit(self.data, allow_fragments=0)[0]
        return _("The proxy URI '{uri}' uses the unsupported "
                 "scheme '{scheme}'. Currently the only supported "
                 "scheme is http://.").format(
                     uri=self.data, scheme=scheme)
    return _("The specified proxy URI uses an unsupported scheme."
             " Currently the only supported scheme is: http://.")
def valid_pub_url(url, proxy=False):
    """Verify that the publisher URL contains only valid characters.
    If 'proxy' is set to True, some checks are relaxed."""

    if not url:
        return False

    # First split the URL and check if the scheme is one we support
    o = urlsplit(url)
    if not o[0] in _valid_proto:
        return False

    if o[0] == "file":
        path = urlparse(url, "file", allow_fragments=0)[2]
        path = url2pathname(path)
        if not os.path.abspath(path):
            return False
        # No further validation to be done.
        return True

    # Next verify that the network location is valid
    if six.PY3:
        host = urllib.parse.splitport(o[1])[0]
    else:
        host = urllib.splitport(o[1])[0]

    if proxy:
        # We may have authentication details in the proxy URI, which
        # we must ignore when checking for hostname validity.
        host_parts = host.split("@")
        if len(host_parts) == 2:
            host = host_parts[1]

    if not host or _invalid_host_chars.match(host):
        return False

    if _hostname_re.match(host):
        return True
    return False
def get_proxy_slot(self, proxy):
    """
    Return downloader slot for a proxy.
    By default it doesn't take port in account, i.e. all proxies with
    the same hostname / ip address share the same slot.
    """
    # FIXME: an option to use website address as a part of slot as well?
    return urlsplit(proxy).hostname
def convert_env(self):
    full_uri = self.environ['REQUEST_URI']
    parts = urlsplit(full_uri)

    self.resolve(full_uri, self.environ, parts.netloc.split(':')[0])

    for header in list(self.environ.keys()):
        if header in self.FILTER_REQ_HEADERS:
            self.environ.pop(header, '')
def get_id_from_href(href):
    """Return the id or uuid portion of a url.

    Given: 'http://www.foo.com/bar/123?q=4'
    Returns: '123'

    Given: 'http://www.foo.com/bar/abc123?q=4'
    Returns: 'abc123'

    """
    return urlparse.urlsplit("%s" % href).path.split('/')[-1]