Python six.moves.urllib.parse 模块,urlunsplit() 实例源码

我们从Python开源项目中,提取了以下22个代码示例,用于说明如何使用six.moves.urllib.parse.urlunsplit()

项目:edx-enterprise    作者:edx    | 项目源码 | 文件源码
def update_query_parameters(url, query_parameters):
    """
    Return url with updated query parameters.

    Arguments:
        url (str): Original url whose query parameters need to be updated.
        query_parameters (dict): A dictionary containing query parameters to be added to the url.
            A key that already exists in the url replaces the existing value(s).

    Returns:
        (str): The url with its query string rebuilt from the merged parameters. Scheme, netloc,
            path and fragment are carried over unchanged.

    """
    scheme, netloc, path, query_string, fragment = urlsplit(url)
    # keep_blank_values=True so that parameters already present with an empty
    # value (e.g. "?a=&b=1") are carried over instead of being silently dropped.
    url_params = parse_qs(query_string, keep_blank_values=True)

    # Update url query parameters
    url_params.update(query_parameters)

    return urlunsplit(
        (scheme, netloc, path, urlencode(url_params, doseq=True), fragment),
    )
项目:masakari    作者:openstack    | 项目源码 | 文件源码
def remove_trailing_version_from_href(href):
    """Strip a trailing api version segment from the href's path.

    Given: 'http://www.masakari.com/ha/v1.1'
    Returns: 'http://www.masakari.com/ha'

    Given: 'http://www.masakari.com/v1.1'
    Returns: 'http://www.masakari.com'

    Raises ValueError when the last path segment is not a version.
    """
    split_href = urlparse.urlsplit(href)
    leading_segments = split_href.path.rsplit('/', 1)
    last_segment = leading_segments.pop()

    # NOTE: this should match vX.X or vX
    version_match = re.match(r'^v([0-9]+|[0-9]+\.[0-9]+)(/.*|$)',
                             last_segment)
    if version_match is None:
        LOG.debug('href %s does not contain version', href)
        raise ValueError(_('href %s does not contain version') % href)

    # Rebuild the url with whatever preceded the version as the new path.
    components = list(split_href)
    components[2] = url_join(*leading_segments)
    return urlparse.urlunsplit(components)
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def remove_trailing_version_from_href(href):
    """Drop the api version that ends the href's path.

    Given: 'http://www.nova.com/compute/v1.1'
    Returns: 'http://www.nova.com/compute'

    Given: 'http://www.nova.com/v1.1'
    Returns: 'http://www.nova.com'

    A ValueError is raised if the final path segment is not a version.
    """
    split_result = urlparse.urlsplit(href)
    pieces = split_result.path.rsplit('/', 1)
    candidate = pieces.pop()

    # NOTE: this should match vX.X or vX
    version_re = re.compile(r'^v([0-9]+|[0-9]+\.[0-9]+)(/.*|$)')
    if version_re.match(candidate) is None:
        LOG.debug('href %s does not contain version', href)
        raise ValueError(_('href %s does not contain version') % href)

    # Only the path changes; every other component is carried over.
    trimmed = split_result._replace(path=url_join(*pieces))
    return urlparse.urlunsplit(trimmed)
项目:meteos    作者:openstack    | 项目源码 | 文件源码
def remove_version_from_href(href):
    """Removes the first api version from the href.

    Given: 'http://www.meteos.com/v1.1/123'
    Returns: 'http://www.meteos.com/123'

    Given: 'http://www.meteos.com/v1.1'
    Returns: 'http://www.meteos.com'

    :param href: the url to strip the version from.
    :returns: href with the version path segment removed.
    :raises ValueError: if the path does not change, i.e. the href contains
        no version in that position (including hrefs with an empty path).
    """
    parsed_url = parse.urlsplit(href)
    url_parts = parsed_url.path.split('/', 2)

    # NOTE: this should match vX.X or vX
    expression = re.compile(r'^v([0-9]+|[0-9]+\.[0-9]+)(/.*|$)')
    # A path without any '/' (e.g. the empty path of 'http://host') splits
    # into a single element, so check the length before indexing; such an
    # href then falls through to the ValueError below instead of IndexError.
    if len(url_parts) > 1 and expression.match(url_parts[1]):
        del url_parts[1]

    new_path = '/'.join(url_parts)

    if new_path == parsed_url.path:
        msg = 'href %s does not contain version' % href
        LOG.debug(msg)
        raise ValueError(msg)

    parsed_url = list(parsed_url)
    parsed_url[2] = new_path
    return parse.urlunsplit(parsed_url)
项目:meteos    作者:openstack    | 项目源码 | 文件源码
def _update_link_prefix(self, orig_url, prefix):
        """Return orig_url with its scheme and netloc taken from prefix.

        A falsy prefix leaves orig_url untouched. The path, query and
        fragment of orig_url are kept as they are.
        """
        if prefix:
            source = parse.urlsplit(orig_url)
            target = parse.urlsplit(prefix)
            rewritten = source._replace(scheme=target.scheme,
                                        netloc=target.netloc)
            return parse.urlunsplit(rewritten)
        return orig_url
项目:webapp2    作者:GoogleCloudPlatform    | 项目源码 | 文件源码
def _urlunsplit(scheme=None, netloc=None, path=None, query=None,
                fragment=None):
    """Like ``urlparse.urlunsplit``, but will escape values and urlencode and
    sort query arguments.

    Omitted (``None``) or otherwise empty components are treated as empty
    strings, so every argument is genuinely optional.

    :param scheme:
        URI scheme, e.g., `http` or `https`.
    :param netloc:
        Network location, e.g., `localhost:8080` or `www.google.com`.
    :param path:
        URI path.
    :param query:
        URI query as an escaped string, or a dictionary or list of key-values
        tuples to build a query.
    :param fragment:
        Fragment identifier, also known as "anchor".
    :returns:
        An assembled absolute or relative URI.
    """
    # An absolute URI needs both parts; with either one missing, drop both so
    # a relative URI is produced.
    if not scheme or not netloc:
        scheme = ''
        netloc = ''

    # urlunsplit concatenates its components, so a None left in place would
    # raise TypeError; normalize every empty component to ''.
    path = quote(_to_utf8(path)) if path else ''

    if not query:
        query = ''
    elif not isinstance(query, six.string_types):
        if isinstance(query, dict):
            query = six.iteritems(query)

        # Sort args: commonly needed to build signatures for services.
        query = urlencode(sorted(query))

    fragment = quote(_to_utf8(fragment)) if fragment else ''

    return urlunsplit((scheme, netloc, path, query, fragment))
项目:scrapy-cdr    作者:TeamHG-Memex    | 项目源码 | 文件源码
def get_path(url):
    """Return url without scheme and netloc; an empty path becomes '/'."""
    _, _, path, query, fragment = urlsplit(url)
    if not path:
        path = '/'
    return urlunsplit(('', '', path, query, fragment))
项目:reahl    作者:reahl    | 项目源码 | 文件源码
def __str__(self):
        """Assemble this object's five URL components into a URL string."""
        components = (
            self.scheme,
            self.netloc,
            self.path,
            self.query,
            self.fragment,
        )
        return urllib_parse.urlunsplit(components)
项目:deb-python-gabbi    作者:openstack    | 项目源码 | 文件源码
def _fully_qualify(environ, url):
        """Turn a URL path into a fully qualified URL.

        Scheme, host and port are read from the 'wsgi.url_scheme',
        'SERVER_NAME' and 'SERVER_PORT' keys of environ; path, query and
        fragment come from url.
        """
        pieces = urlparse.urlsplit(url)
        host = environ.get('SERVER_NAME')
        port = str(environ.get('SERVER_PORT'))
        scheme = environ.get('wsgi.url_scheme')

        # Ports 80 and 443 are left out of the netloc.
        netloc = host if port in ('80', '443') else '%s:%s' % (host, port)

        return urlparse.urlunsplit(
            (scheme, netloc, pieces.path, pieces.query, pieces.fragment))
项目:deb-python-gabbi    作者:openstack    | 项目源码 | 文件源码
def _parse_url(self, url):
        """Build the request url from test data.

        A url that already carries a scheme is used as given; otherwise a
        full url is created from the host, port, prefix and ssl setting.
        Query parameters from the test data, when present, are merged into
        the query string, and any fragment is dropped.

        The scheme and netloc are stored on self for later use in
        comparisons.
        """
        extra_params = self.test_data['query_parameters']
        use_ssl = self.test_data['ssl']

        parts = urlparse.urlsplit(url)
        if not parts.scheme:
            # Re-split the generated url so scheme and netloc are populated.
            parts = urlparse.urlsplit(
                utils.create_url(url, self.host, port=self.port,
                                 prefix=self.prefix, ssl=use_ssl))

        self.scheme = parts.scheme
        self.netloc = parts.netloc

        query = parts.query
        if extra_params:
            query = self._update_query_params(query, extra_params)

        return urlparse.urlunsplit(
            (parts.scheme, parts.netloc, parts.path, query, ''))
项目:deb-python-gabbi    作者:openstack    | 项目源码 | 文件源码
def create_url(base_url, host, port=None, prefix='', ssl=False):
    """Given pieces of a path-based url, return a fully qualified url.

    The path and query string are taken from base_url; any fragment is
    dropped. The scheme is 'https' when ssl is truthy, otherwise 'http'.
    """
    # A host with : in it at this stage is assumed to be an IPv6
    # address of some kind (they come in many forms). Port should
    # already have been stripped off.
    already_bracketed = host.startswith('[') and host.endswith(']')
    if ':' in host and not already_bracketed:
        host = '[%s]' % host

    netloc = host
    if port and not _port_follows_standard(port, ssl):
        netloc = '%s:%s' % (host, port)

    scheme = 'https' if ssl else 'http'

    pieces = urlparse.urlsplit(base_url)
    path = pieces.path

    # Guard against a prefix of None or the url already having the
    # prefix. Without the startswith check, the tests in prefix.yaml
    # fail. This is a pragmatic fix which does this for any URL in a
    # test request that does not have a scheme and does not
    # distinguish between URLs in a gabbi test file and those
    # generated by the server. Ideally we would not mutate nor need
    # to check URLs returned from the server. Doing that, however,
    # would require more complex data handling than we have now and
    # this covers most common cases and will be okay until someone
    # reports a bug.
    if prefix and not path.startswith(prefix):
        path = '%s/%s' % (prefix.rstrip('/'), path.lstrip('/'))

    return urlparse.urlunsplit((scheme, netloc, path, pieces.query, ''))
项目:masakari    作者:openstack    | 项目源码 | 文件源码
def _update_link_prefix(self, orig_url, prefix):
        """Re-root orig_url onto prefix.

        The scheme and netloc come from prefix, and the prefix path is put
        in front of the original path. Trailing slashes are stripped from
        the result. A falsy prefix returns orig_url unchanged.
        """
        if not prefix:
            return orig_url

        source = urlparse.urlsplit(orig_url)
        base = urlparse.urlsplit(prefix)
        combined = (
            base.scheme,
            base.netloc,
            base.path + source.path,
            source.query,
            source.fragment,
        )
        return urlparse.urlunsplit(combined).rstrip('/')
项目:domain-discovery-crawler    作者:TeamHG-Memex    | 项目源码 | 文件源码
def get_path(url):
    """Strip scheme and netloc from url, defaulting the path to '/'."""
    parts = urlsplit(url)
    relative = parts._replace(scheme='', netloc='', path=parts.path or '/')
    return urlunsplit(relative)
项目:deb-python-wsgi-intercept    作者:openstack    | 项目源码 | 文件源码
def _url_from_primitives(self):
        """Build a url from this object's host, port and script_name.

        Port 443 selects the 'https' scheme, anything else 'http'. The
        port is only written into the netloc when it is set and is
        neither 443 nor 80.
        """
        scheme = 'https' if self.port == 443 else 'http'

        port_suffix = ''
        if self.port and self.port not in (443, 80):
            port_suffix = ':%s' % self.port

        return urlparse.urlunsplit(
            (scheme, self.host + port_suffix, self.script_name, None, None))
项目:solaris-ips    作者:oracle    | 项目源码 | 文件源码
def __init__(self, bus, gconf):
                """Set up the state used to advertise this server.

                'bus' is the cherrypy engine and 'gconf' is a dictionary
                containing the CherryPy configuration.  When the name
                "pybonjour" is not among this module's globals, the
                start and exit hooks are replaced with no-ops and
                nothing else is initialised.
                """

                SimplePlugin.__init__(self, bus)

                if "pybonjour" not in globals():
                        self.start = lambda: None
                        self.exit = lambda: None
                        return

                hostname = socket.gethostname()
                self.name = "pkg(7) mirror on {0}".format(hostname)
                self.wanted_name = self.name
                self.regtype = "_pkg5._tcp"
                self.port = gconf["server.socket_port"]
                self.sd_hdl = None
                self.reg_ok = False

                # https is used only when both a certificate and a
                # private key are configured.
                uses_ssl = gconf["server.ssl_certificate"] and \
                    gconf["server.ssl_private_key"]
                proto = "https" if uses_ssl else "http"

                self.url = urlunsplit((
                    proto,
                    "{0}:{1}".format(socket.getfqdn(), self.port),
                    '', '', ''))
项目:satellite-populate    作者:SatelliteQE    | 项目源码 | 文件源码
def _get_url(self):
        """Return the base URL built from this object's server settings.

        The URL is assembled from these attributes:

        * ``scheme`` (falls back to 'https' when unset)
        * ``hostname``
        * ``port`` (omitted from the URL when unset)

        The scheme is never derived from the port: with ``scheme`` unset,
        the result uses 'https' whatever ``port`` is.

        :return: A URL.
        :rtype: str

        """
        scheme = self.scheme or 'https'
        # All anticipated error cases have been handled at this point.
        if self.port:
            netloc = '{0}:{1}'.format(self.hostname, self.port)
        else:
            netloc = self.hostname
        return urlunsplit((scheme, netloc, '', '', ''))
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def _update_link_prefix(self, orig_url, prefix):
        """Rewrite orig_url so that it lives under prefix.

        The result takes its scheme and netloc from prefix and its path
        from the prefix path followed by the original path; query and
        fragment are kept. Trailing slashes are stripped from the final
        url. With a falsy prefix, orig_url is returned as is.
        """
        if prefix:
            original = urlparse.urlsplit(orig_url)
            root = urlparse.urlsplit(prefix)
            moved = original._replace(scheme=root.scheme,
                                      netloc=root.netloc,
                                      path=root.path + original.path)
            return urlparse.urlunsplit(moved).rstrip('/')
        return orig_url
项目:django-anymail    作者:anymail    | 项目源码 | 文件源码
def get_request_uri(request):
    """Returns the "exact" url used to call request.

    Starts from ``request.build_absolute_uri()`` and, when
    ``get_request_basic_auth`` finds basic auth on the request, inlines it
    ahead of the host as ``auth@netloc``.
    """
    url = request.build_absolute_uri()
    basic_auth = get_request_basic_auth(request)
    if basic_auth is None:
        return url

    # must reassemble url with auth
    scheme, netloc, path, query, fragment = urlsplit(url)
    return urlunsplit(
        (scheme, basic_auth + '@' + netloc, path, query, fragment))
项目:chalktalk_docs    作者:loremIpsum1771    | 项目源码 | 文件源码
def encode_uri(uri):
    """Return uri as an ASCII-only url.

    The host is IDNA-encoded, the path is percent-encoded as UTF-8 (with
    '/' kept as is) and the query string is re-encoded from its parsed
    key/value pairs. Scheme and fragment are passed through unchanged.
    """
    def _as_text(value):
        # quote_plus/urlencode return text on Python 3 but a byte string on
        # Python 2; only the latter needs decoding.
        return value.decode('ascii') if isinstance(value, bytes) else value

    split = list(urlsplit(uri))
    split[1] = split[1].encode('idna').decode('ascii')
    split[2] = _as_text(quote_plus(split[2].encode('utf-8'), '/'))
    # urlencode does the quoting itself; quoting the values beforehand as
    # well would escape them twice ('a b' -> 'a+b' -> 'a%2Bb').
    query = [(q, v.encode('utf-8')) for (q, v) in parse_qsl(split[3])]
    split[3] = _as_text(urlencode(query))
    return urlunsplit(split)
项目:eclcli    作者:nttcom    | 项目源码 | 文件源码
def _cs_request(self, url, method, **kwargs):
        """Send an authenticated request, re-authenticating once on a 401.

        With url set to None the request goes to the management url with
        its trailing "v<N>/<tenant-id>" removed; otherwise url is appended
        to the service url (or to the management url when there is no
        service catalog). Returns the (resp, body) pair produced by
        ``_time_request``.
        """
        if not self.management_url:
            self.authenticate()

        if url is not None:
            if self.service_catalog:
                url = self.get_service_url(self.service_type) + url
            else:
                # NOTE(melwitt): The service catalog is not available
                #                when bypass_url is used.
                url = self.management_url + url
        else:
            # To get API version information, it is necessary to GET
            # a dh endpoint directly without "v2/<tenant-id>".
            endpoint = parse.urlsplit(self.management_url)
            scheme, netloc, path, query, frag = endpoint
            versionless_path = re.sub(r'v[1-9]/[a-z0-9]+$', '', path)
            url = parse.urlunsplit(
                (scheme, netloc, versionless_path, None, None))

        # Perform the request once. If we get a 401 back then it
        # might be because the auth token expired, so try to
        # re-authenticate and try again. If it still fails, bail.
        try:
            headers = kwargs.setdefault('headers', {})
            headers['X-Auth-Token'] = self.auth_token
            headers['Content-Type'] = 'application/json'
            if self.projectid:
                headers['X-Auth-Project-Id'] = self.projectid

            resp, body = self._time_request(url, method, **kwargs)
            return resp, body
        except exceptions.Unauthorized as first_failure:
            try:
                # first discard auth token, to avoid the possibly expired
                # token being re-used in the re-authentication attempt
                self.unauthenticate()
                # overwrite bad token
                self.keyring_saved = False
                self.authenticate()
                kwargs['headers']['X-Auth-Token'] = self.auth_token
                resp, body = self._time_request(url, method, **kwargs)
                return resp, body
            except exceptions.Unauthorized:
                # Report the original failure rather than the retry's.
                raise first_failure
项目:eclcli    作者:nttcom    | 项目源码 | 文件源码
def _cs_request(self, url, method, **kwargs):
        """Issue a request with auth headers, retrying once after a 401.

        A url of None targets the management url stripped of its trailing
        "v<N>/<tenant-id>"; any other url is appended to the service url,
        or to the management url when no service catalog is available.
        The (resp, body) pair from ``_time_request`` is returned.
        """
        if not self.management_url:
            self.authenticate()

        if url is None:
            # To get API version information, it is necessary to GET
            # a nova endpoint directly without "v2/<tenant-id>".
            split_url = parse.urlsplit(self.management_url)
            scheme, netloc, path, query, frag = split_url
            url = parse.urlunsplit((
                scheme,
                netloc,
                re.sub(r'v[1-9]/[a-z0-9]+$', '', path),
                None,
                None,
            ))
        elif self.service_catalog:
            url = self.get_service_url(self.service_type) + url
        else:
            # NOTE(melwitt): The service catalog is not available
            #                when bypass_url is used.
            url = self.management_url + url

        # Perform the request once. If we get a 401 back then it
        # might be because the auth token expired, so try to
        # re-authenticate and try again. If it still fails, bail.
        try:
            request_headers = kwargs.setdefault('headers', {})
            request_headers['X-Auth-Token'] = self.auth_token
            request_headers['Content-Type'] = 'application/json'
            if self.projectid:
                request_headers['X-Auth-Project-Id'] = self.projectid

            resp, body = self._time_request(url, method, **kwargs)
            return resp, body
        except exceptions.Unauthorized as original_error:
            try:
                # first discard auth token, to avoid the possibly expired
                # token being re-used in the re-authentication attempt
                self.unauthenticate()
                # overwrite bad token
                self.keyring_saved = False
                self.authenticate()
                kwargs['headers']['X-Auth-Token'] = self.auth_token
                resp, body = self._time_request(url, method, **kwargs)
                return resp, body
            except exceptions.Unauthorized:
                # The first 401 is the one surfaced to the caller.
                raise original_error
项目:eclcli    作者:nttcom    | 项目源码 | 文件源码
def _cs_request(self, url, method, **kwargs):
        """Perform an authenticated request, with one retry on a 401.

        When url is None, the management url minus its trailing
        "v<N>/<tenant-id>" is requested. Otherwise url is appended to the
        service url if a service catalog exists, else to the management
        url. Returns the (resp, body) pair from ``_time_request``.
        """
        if not self.management_url:
            self.authenticate()

        if url is None:
            # To get API version information, it is necessary to GET
            # a icgw endpoint directly without "v2/<tenant-id>".
            scheme, netloc, path, query, frag = parse.urlsplit(
                self.management_url)
            root_path = re.sub(r'v[1-9]/[a-z0-9]+$', '', path)
            url = parse.urlunsplit((scheme, netloc, root_path, None, None))
        else:
            if self.service_catalog:
                base_url = self.get_service_url(self.service_type)
            else:
                # NOTE(melwitt): The service catalog is not available
                #                when bypass_url is used.
                base_url = self.management_url
            url = base_url + url

        # Perform the request once. If we get a 401 back then it
        # might be because the auth token expired, so try to
        # re-authenticate and try again. If it still fails, bail.
        try:
            hdrs = kwargs.setdefault('headers', {})
            hdrs['X-Auth-Token'] = self.auth_token
            hdrs['Content-Type'] = 'application/json'
            if self.projectid:
                hdrs['X-Auth-Project-Id'] = self.projectid

            resp, body = self._time_request(url, method, **kwargs)
            return resp, body
        except exceptions.Unauthorized as unauthorized:
            try:
                # first discard auth token, to avoid the possibly expired
                # token being re-used in the re-authentication attempt
                self.unauthenticate()
                # overwrite bad token
                self.keyring_saved = False
                self.authenticate()
                kwargs['headers']['X-Auth-Token'] = self.auth_token
                resp, body = self._time_request(url, method, **kwargs)
                return resp, body
            except exceptions.Unauthorized:
                # Surface the first failure, not the one from the retry.
                raise unauthorized