我们从Python开源项目中,提取了以下22个代码示例,用于说明如何使用six.moves.urllib.parse.urlunsplit()。
def update_query_parameters(url, query_parameters): """ Return url with updated query parameters. Arguments: url (str): Original url whose query parameters need to be updated. query_parameters (dict): A dictionary containing query parameters to be added to course selection url. Returns: (slug): slug identifier for the identity provider that can be used for identity verification of users associated the enterprise customer of the given user. """ scheme, netloc, path, query_string, fragment = urlsplit(url) url_params = parse_qs(query_string) # Update url query parameters url_params.update(query_parameters) return urlunsplit( (scheme, netloc, path, urlencode(url_params, doseq=True), fragment), )
def remove_trailing_version_from_href(href): """Removes the api version from the href. Given: 'http://www.masakari.com/ha/v1.1' Returns: 'http://www.masakari.com/ha' Given: 'http://www.masakari.com/v1.1' Returns: 'http://www.masakari.com' """ parsed_url = urlparse.urlsplit(href) url_parts = parsed_url.path.rsplit('/', 1) # NOTE: this should match vX.X or vX expression = re.compile(r'^v([0-9]+|[0-9]+\.[0-9]+)(/.*|$)') if not expression.match(url_parts.pop()): LOG.debug('href %s does not contain version', href) raise ValueError(_('href %s does not contain version') % href) new_path = url_join(*url_parts) parsed_url = list(parsed_url) parsed_url[2] = new_path return urlparse.urlunsplit(parsed_url)
def remove_trailing_version_from_href(href): """Removes the api version from the href. Given: 'http://www.nova.com/compute/v1.1' Returns: 'http://www.nova.com/compute' Given: 'http://www.nova.com/v1.1' Returns: 'http://www.nova.com' """ parsed_url = urlparse.urlsplit(href) url_parts = parsed_url.path.rsplit('/', 1) # NOTE: this should match vX.X or vX expression = re.compile(r'^v([0-9]+|[0-9]+\.[0-9]+)(/.*|$)') if not expression.match(url_parts.pop()): LOG.debug('href %s does not contain version', href) raise ValueError(_('href %s does not contain version') % href) new_path = url_join(*url_parts) parsed_url = list(parsed_url) parsed_url[2] = new_path return urlparse.urlunsplit(parsed_url)
def remove_version_from_href(href): """Removes the first api version from the href. Given: 'http://www.meteos.com/v1.1/123' Returns: 'http://www.meteos.com/123' Given: 'http://www.meteos.com/v1.1' Returns: 'http://www.meteos.com' """ parsed_url = parse.urlsplit(href) url_parts = parsed_url.path.split('/', 2) # NOTE: this should match vX.X or vX expression = re.compile(r'^v([0-9]+|[0-9]+\.[0-9]+)(/.*|$)') if expression.match(url_parts[1]): del url_parts[1] new_path = '/'.join(url_parts) if new_path == parsed_url.path: msg = 'href %s does not contain version' % href LOG.debug(msg) raise ValueError(msg) parsed_url = list(parsed_url) parsed_url[2] = new_path return parse.urlunsplit(parsed_url)
def _update_link_prefix(self, orig_url, prefix): if not prefix: return orig_url url_parts = list(parse.urlsplit(orig_url)) prefix_parts = list(parse.urlsplit(prefix)) url_parts[0:2] = prefix_parts[0:2] return parse.urlunsplit(url_parts)
def _urlunsplit(scheme=None, netloc=None, path=None, query=None, fragment=None): """Like ``urlparse.urlunsplit``, but will escape values and urlencode and sort query arguments. :param scheme: URI scheme, e.g., `http` or `https`. :param netloc: Network location, e.g., `localhost:8080` or `www.google.com`. :param path: URI path. :param query: URI query as an escaped string, or a dictionary or list of key-values tuples to build a query. :param fragment: Fragment identifier, also known as "anchor". :returns: An assembled absolute or relative URI. """ if not scheme or not netloc: scheme = '' netloc = None if path: path = quote(_to_utf8(path)) if query and not isinstance(query, six.string_types): if isinstance(query, dict): query = six.iteritems(query) # Sort args: commonly needed to build signatures for services. query = urlencode(sorted(query)) if fragment: fragment = quote(_to_utf8(fragment)) return urlunsplit((scheme, netloc, path, query, fragment))
def get_path(url): p = urlsplit(url) return urlunsplit(['', '', p.path or '/', p.query, p.fragment])
def __str__(self): return urllib_parse.urlunsplit((self.scheme, self.netloc, self.path, self.query, self.fragment))
def _fully_qualify(environ, url): """Turn a URL path into a fully qualified URL.""" split_url = urlparse.urlsplit(url) server_name = environ.get('SERVER_NAME') server_port = str(environ.get('SERVER_PORT')) server_scheme = environ.get('wsgi.url_scheme') if server_port not in ['80', '443']: netloc = '%s:%s' % (server_name, server_port) else: netloc = server_name return urlparse.urlunsplit((server_scheme, netloc, split_url.path, split_url.query, split_url.fragment))
def _parse_url(self, url): """Create a url from test data. If provided with a full URL, just return that. If SSL is requested set the scheme appropriately. Scheme and netloc are saved for later use in comparisons. """ query_params = self.test_data['query_parameters'] ssl = self.test_data['ssl'] parsed_url = urlparse.urlsplit(url) if not parsed_url.scheme: full_url = utils.create_url(url, self.host, port=self.port, prefix=self.prefix, ssl=ssl) # parse again to set updated netloc and scheme parsed_url = urlparse.urlsplit(full_url) self.scheme = parsed_url.scheme self.netloc = parsed_url.netloc if query_params: query_string = self._update_query_params(parsed_url.query, query_params) else: query_string = parsed_url.query return urlparse.urlunsplit((parsed_url.scheme, parsed_url.netloc, parsed_url.path, query_string, ''))
def create_url(base_url, host, port=None, prefix='', ssl=False): """Given pieces of a path-based url, return a fully qualified url.""" scheme = 'http' # A host with : in it at this stage is assumed to be an IPv6 # address of some kind (they come in many forms). Port should # already have been stripped off. if ':' in host and not (host.startswith('[') and host.endswith(']')): host = '[%s]' % host if port and not _port_follows_standard(port, ssl): netloc = '%s:%s' % (host, port) else: netloc = host if ssl: scheme = 'https' parsed_url = urlparse.urlsplit(base_url) query_string = parsed_url.query path = parsed_url.path # Guard against a prefix of None or the url already having the # prefix. Without the startswith check, the tests in prefix.yaml # fail. This is a pragmatic fix which does this for any URL in a # test request that does not have a scheme and does not # distinguish between URLs in a gabbi test file and those # generated by the server. Idealy we would not mutate nor need # to check URLs returned from the server. Doing that, however, # would require more complex data handling than we have now and # this covers most common cases and will be okay until someone # reports a bug. if prefix and not path.startswith(prefix): prefix = prefix.rstrip('/') path = path.lstrip('/') path = '%s/%s' % (prefix, path) return urlparse.urlunsplit((scheme, netloc, path, query_string, ''))
def _update_link_prefix(self, orig_url, prefix): if not prefix: return orig_url url_parts = list(urlparse.urlsplit(orig_url)) prefix_parts = list(urlparse.urlsplit(prefix)) url_parts[0:2] = prefix_parts[0:2] url_parts[2] = prefix_parts[2] + url_parts[2] return urlparse.urlunsplit(url_parts).rstrip('/')
def _url_from_primitives(self): if self.port == 443: scheme = 'https' else: scheme = 'http' if self.port and self.port not in [443, 80]: port = ':%s' % self.port else: port = '' netloc = self.host + port return urlparse.urlunsplit((scheme, netloc, self.script_name, None, None))
def __init__(self, bus, gconf): """Bus is the cherrypy engine and gconf is a dictionary containing the CherryPy configuration. """ SimplePlugin.__init__(self, bus) if "pybonjour" not in globals(): self.start = lambda: None self.exit = lambda: None return self.name = "pkg(7) mirror on {0}".format(socket.gethostname()) self.wanted_name = self.name self.regtype = "_pkg5._tcp" self.port = gconf["server.socket_port"] self.sd_hdl = None self.reg_ok = False if gconf["server.ssl_certificate"] and \ gconf["server.ssl_private_key"]: proto = "https" else: proto = "http" netloc = "{0}:{1}".format(socket.getfqdn(), self.port) self.url = urlunsplit((proto, netloc, '', '', ''))
def _get_url(self): """Return the base URL of the Foreman deployment being tested. The following values from the config file are used to build the URL: * ``[server] scheme`` (default: https) * ``[server] hostname`` (required) * ``[server] port`` (default: none) Setting ``port`` to 80 does *not* imply that ``scheme`` is 'https'. If ``port`` is 80 and ``scheme`` is unset, ``scheme`` will still default to 'https'. :return: A URL. :rtype: str """ if not self.scheme: scheme = 'https' else: scheme = self.scheme # All anticipated error cases have been handled at this point. if not self.port: return urlunsplit((scheme, self.hostname, '', '', '')) else: return urlunsplit(( scheme, '{0}:{1}'.format(self.hostname, self.port), '', '', '' ))
def get_request_uri(request): """Returns the "exact" url used to call request. Like :func:`django.http.request.HTTPRequest.build_absolute_uri`, but also inlines HTTP basic auth, if present. """ url = request.build_absolute_uri() basic_auth = get_request_basic_auth(request) if basic_auth is not None: # must reassemble url with auth parts = urlsplit(url) url = urlunsplit((parts.scheme, basic_auth + '@' + parts.netloc, parts.path, parts.query, parts.fragment)) return url
def encode_uri(uri): split = list(urlsplit(uri)) split[1] = split[1].encode('idna').decode('ascii') split[2] = quote_plus(split[2].encode('utf-8'), '/').decode('ascii') query = list((q, quote_plus(v.encode('utf-8'))) for (q, v) in parse_qsl(split[3])) split[3] = urlencode(query).decode('ascii') return urlunsplit(split)
def _cs_request(self, url, method, **kwargs): if not self.management_url: self.authenticate() if url is None: # To get API version information, it is necessary to GET # a dh endpoint directly without "v2/<tenant-id>". magic_tuple = parse.urlsplit(self.management_url) scheme, netloc, path, query, frag = magic_tuple path = re.sub(r'v[1-9]/[a-z0-9]+$', '', path) url = parse.urlunsplit((scheme, netloc, path, None, None)) else: if self.service_catalog: url = self.get_service_url(self.service_type) + url else: # NOTE(melwitt): The service catalog is not available # when bypass_url is used. url = self.management_url + url # Perform the request once. If we get a 401 back then it # might be because the auth token expired, so try to # re-authenticate and try again. If it still fails, bail. try: kwargs.setdefault('headers', {})['X-Auth-Token'] = self.auth_token kwargs['headers']['Content-Type'] = 'application/json' if self.projectid: kwargs['headers']['X-Auth-Project-Id'] = self.projectid resp, body = self._time_request(url, method, **kwargs) return resp, body except exceptions.Unauthorized as e: try: # first discard auth token, to avoid the possibly expired # token being re-used in the re-authentication attempt self.unauthenticate() # overwrite bad token self.keyring_saved = False self.authenticate() kwargs['headers']['X-Auth-Token'] = self.auth_token resp, body = self._time_request(url, method, **kwargs) return resp, body except exceptions.Unauthorized: raise e
def _cs_request(self, url, method, **kwargs): if not self.management_url: self.authenticate() if url is None: # To get API version information, it is necessary to GET # a nova endpoint directly without "v2/<tenant-id>". magic_tuple = parse.urlsplit(self.management_url) scheme, netloc, path, query, frag = magic_tuple path = re.sub(r'v[1-9]/[a-z0-9]+$', '', path) url = parse.urlunsplit((scheme, netloc, path, None, None)) else: if self.service_catalog: url = self.get_service_url(self.service_type) + url else: # NOTE(melwitt): The service catalog is not available # when bypass_url is used. url = self.management_url + url # Perform the request once. If we get a 401 back then it # might be because the auth token expired, so try to # re-authenticate and try again. If it still fails, bail. try: kwargs.setdefault('headers', {})['X-Auth-Token'] = self.auth_token kwargs['headers']['Content-Type'] = 'application/json' if self.projectid: kwargs['headers']['X-Auth-Project-Id'] = self.projectid resp, body = self._time_request(url, method, **kwargs) return resp, body except exceptions.Unauthorized as e: try: # first discard auth token, to avoid the possibly expired # token being re-used in the re-authentication attempt self.unauthenticate() # overwrite bad token self.keyring_saved = False self.authenticate() kwargs['headers']['X-Auth-Token'] = self.auth_token resp, body = self._time_request(url, method, **kwargs) return resp, body except exceptions.Unauthorized: raise e
def _cs_request(self, url, method, **kwargs): if not self.management_url: self.authenticate() if url is None: # To get API version information, it is necessary to GET # a icgw endpoint directly without "v2/<tenant-id>". magic_tuple = parse.urlsplit(self.management_url) scheme, netloc, path, query, frag = magic_tuple path = re.sub(r'v[1-9]/[a-z0-9]+$', '', path) url = parse.urlunsplit((scheme, netloc, path, None, None)) else: if self.service_catalog: url = self.get_service_url(self.service_type) + url else: # NOTE(melwitt): The service catalog is not available # when bypass_url is used. url = self.management_url + url # Perform the request once. If we get a 401 back then it # might be because the auth token expired, so try to # re-authenticate and try again. If it still fails, bail. try: kwargs.setdefault('headers', {})['X-Auth-Token'] = self.auth_token kwargs['headers']['Content-Type'] = 'application/json' if self.projectid: kwargs['headers']['X-Auth-Project-Id'] = self.projectid resp, body = self._time_request(url, method, **kwargs) return resp, body except exceptions.Unauthorized as e: try: # first discard auth token, to avoid the possibly expired # token being re-used in the re-authentication attempt self.unauthenticate() # overwrite bad token self.keyring_saved = False self.authenticate() kwargs['headers']['X-Auth-Token'] = self.auth_token resp, body = self._time_request(url, method, **kwargs) return resp, body except exceptions.Unauthorized: raise e