Python six.moves.urllib.parse 模块,urlsplit() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用six.moves.urllib.parse.urlsplit()

项目:deb-oslo.utils    作者:openstack    | 项目源码 | 文件源码
def urlsplit(url, scheme='', allow_fragments=True):
    """Parse a URL using urlparse.urlsplit(), splitting query and fragments.
    This function papers over Python issue9374_ when needed.

    .. _issue9374: http://bugs.python.org/issue9374

    The parameters are the same as urlparse.urlsplit.
    """
    scheme, netloc, path, query, fragment = parse.urlsplit(
        url, scheme, allow_fragments)
    # Affected interpreters leave the fragment and/or query glued onto
    # the path (issue9374); peel them off by hand when that happens.
    if allow_fragments and '#' in path:
        path, fragment = path.split('#', 1)
    if '?' in path:
        path, query = path.split('?', 1)
    return _ModifiedSplitResult(scheme, netloc, path, query, fragment)
项目:eclcli    作者:nttcom    | 项目源码 | 文件源码
def _get_session(self, url):
        """Return a requests session for *url*, reusing pooled connections."""
        if not self._connection_pool:
            # No pooling configured: hand back the cached session, if any.
            return self._session if self._session else None
        scheme, netloc = parse.urlsplit(url)[:2]
        service_url = '%s://%s' % (scheme, netloc)
        if service_url != self._current_url:
            # Invalidate Session object in case the url is somehow changed
            if self._session:
                self._session.close()
            self._current_url = service_url
            self._logger.debug(
                "New session created for: (%s)" % service_url)
            self._session = requests.Session()
            self._session.mount(service_url,
                                self._connection_pool.get(service_url))
        return self._session
    # @set_headers_param
项目:eclcli    作者:nttcom    | 项目源码 | 文件源码
def _get_session(self, url):
        """Return a requests session for *url*, reusing pooled connections."""
        if not self._connection_pool:
            # No pooling configured: hand back the cached session, if any.
            return self._session if self._session else None
        scheme, netloc = parse.urlsplit(url)[:2]
        service_url = '%s://%s' % (scheme, netloc)
        if service_url != self._current_url:
            # Invalidate Session object in case the url is somehow changed
            if self._session:
                self._session.close()
            self._current_url = service_url
            self._logger.debug(
                "New session created for: (%s)" % service_url)
            self._session = requests.Session()
            self._session.mount(service_url,
                                self._connection_pool.get(service_url))
        return self._session
项目:eclcli    作者:nttcom    | 项目源码 | 文件源码
def _get_session(self, url):
        """Return a requests session for *url*, reusing pooled connections."""
        if not self._connection_pool:
            # No pooling configured: hand back the cached session, if any.
            return self._session if self._session else None
        scheme, netloc = parse.urlsplit(url)[:2]
        service_url = '%s://%s' % (scheme, netloc)
        if service_url != self._current_url:
            # Invalidate Session object in case the url is somehow changed
            if self._session:
                self._session.close()
            self._current_url = service_url
            self._logger.debug(
                "New session created for: (%s)" % service_url)
            self._session = requests.Session()
            self._session.mount(service_url,
                                self._connection_pool.get(service_url))
        return self._session
项目:gnocchi    作者:gnocchixyz    | 项目源码 | 文件源码
def load_tests(loader, tests, pattern):
    """Provide a TestSuite to the discovery process."""
    gnocchi_url = os.getenv('GNOCCHI_ENDPOINT')
    if not gnocchi_url:
        # A live run without an endpoint is a configuration error.
        if os.getenv("GABBI_LIVE"):
            raise RuntimeError('"GNOCCHI_ENDPOINT" is not set')
        return None

    parsed_url = urlparse.urlsplit(gnocchi_url)
    # Strip a trailing slash so the path can serve as a gabbi prefix.
    prefix = parsed_url.path.rstrip('/')

    # NOTE(chdent): gabbi requires a port be passed or it will
    # default to 8001, so we must dance a little dance to get
    # the right ports. Probably gabbi needs to change.
    # https://github.com/cdent/gabbi/issues/50
    port = parsed_url.port or (443 if parsed_url.scheme == 'https' else 80)

    test_dir = os.path.join(os.path.dirname(__file__), TESTS_DIR)
    return driver.build_tests(test_dir, loader,
                              host=parsed_url.hostname,
                              port=port,
                              prefix=prefix)
项目:edx-enterprise    作者:edx    | 项目源码 | 文件源码
def update_query_parameters(url, query_parameters):
    """
    Return url with updated query parameters.

    Arguments:
        url (str): Original url whose query parameters need to be updated.
        query_parameters (dict): A dictionary containing query parameters to be added to course selection url.

    Returns:
        (str): Url with the given query parameters merged in. Values from
            ``query_parameters`` replace existing parameters with the same key.

    """
    scheme, netloc, path, query_string, fragment = urlsplit(url)
    # parse_qs yields {key: [values]}; doseq below re-expands the lists.
    url_params = parse_qs(query_string)

    # Update url query parameters
    url_params.update(query_parameters)

    return urlunsplit(
        (scheme, netloc, path, urlencode(url_params, doseq=True), fragment),
    )
项目:masakari    作者:openstack    | 项目源码 | 文件源码
def remove_trailing_version_from_href(href):
    """Removes the api version from the href.

    Given: 'http://www.masakari.com/ha/v1.1'
    Returns: 'http://www.masakari.com/ha'

    Given: 'http://www.masakari.com/v1.1'
    Returns: 'http://www.masakari.com'

    """
    parsed_url = urlparse.urlsplit(href)
    url_parts = parsed_url.path.rsplit('/', 1)

    # The trailing path component must look like vX or vX.X.
    version_re = re.compile(r'^v([0-9]+|[0-9]+\.[0-9]+)(/.*|$)')
    tail = url_parts.pop()
    if not version_re.match(tail):
        LOG.debug('href %s does not contain version', href)
        raise ValueError(_('href %s does not contain version') % href)

    rebuilt = list(parsed_url)
    rebuilt[2] = url_join(*url_parts)
    return urlparse.urlunsplit(rebuilt)
项目:deb-python-wsgi-intercept    作者:openstack    | 项目源码 | 文件源码
def _init_from_url(self, url):
        """Initialize host, port and script_name from *url*.

        Uses the parsed ``hostname``/``port`` attributes instead of
        splitting the netloc on ':', which would break for IPv6
        literals and URLs containing userinfo (user:pass@host).
        """
        parsed_url = urlparse.urlsplit(url)
        host = parsed_url.hostname
        port = parsed_url.port
        if not port:
            # Fall back to the scheme's default port.
            port = 443 if parsed_url.scheme == 'https' else 80
        path = parsed_url.path
        # A bare "/" (or empty) path means no script prefix.
        if path == '/' or not path:
            self.script_name = ''
        else:
            self.script_name = path
        self.host = host
        self.port = int(port)
项目:wsgiprox    作者:webrecorder    | 项目源码 | 文件源码
def resolve(self, url, env, hostname):
        """Populate proxy-related WSGI environ keys for *url*."""
        if hostname in self.proxy_apps.keys():
            parts = urlsplit(url)
            request_uri = parts.path
            if parts.query:
                request_uri = request_uri + '?' + parts.query

            env['REQUEST_URI'] = request_uri
            env['wsgiprox.matched_proxy_host'] = hostname
            env['wsgiprox.proxy_host'] = hostname
        else:
            env['REQUEST_URI'] = self.prefix_resolver(url, env)
            env['wsgiprox.proxy_host'] = self.proxy_host

        # Split REQUEST_URI into PATH_INFO / QUERY_STRING.
        path, _, query = env['REQUEST_URI'].partition('?')
        env['PATH_INFO'] = path
        env['QUERY_STRING'] = query
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def remove_trailing_version_from_href(href):
    """Removes the api version from the href.

    Given: 'http://www.nova.com/compute/v1.1'
    Returns: 'http://www.nova.com/compute'

    Given: 'http://www.nova.com/v1.1'
    Returns: 'http://www.nova.com'

    """
    parsed_url = urlparse.urlsplit(href)
    url_parts = parsed_url.path.rsplit('/', 1)

    # The trailing path component must look like vX or vX.X.
    version_re = re.compile(r'^v([0-9]+|[0-9]+\.[0-9]+)(/.*|$)')
    tail = url_parts.pop()
    if not version_re.match(tail):
        LOG.debug('href %s does not contain version', href)
        raise ValueError(_('href %s does not contain version') % href)

    rebuilt = list(parsed_url)
    rebuilt[2] = url_join(*url_parts)
    return urlparse.urlunsplit(rebuilt)
项目:caom2tools    作者:opencadc    | 项目源码 | 文件源码
def get_plane_uri(cls, observation_uri, product_id):
        """
        Initializes an Plane URI instance

        Arguments:
        observation_uri : the uri of the observation
        product_id : ID of the product
        """
        caom_util.type_check(observation_uri, ObservationURI,
                             "observation_uri",
                             override=False)
        # Fix: the name reported on failure was "observation_uri"
        # (copy-paste error); this check validates product_id.
        caom_util.type_check(product_id, str, "product_id",
                             override=False)
        caom_util.validate_path_component(cls, "product_id", product_id)

        # Build the plane URI by appending the product id to the
        # observation's path under the CAOM scheme.
        path = urlsplit(observation_uri.uri).path
        uri = SplitResult(ObservationURI._SCHEME, "", path + "/" +
                          product_id, "", "").geturl()
        return cls(uri)

    # Properties
项目:caom2tools    作者:opencadc    | 项目源码 | 文件源码
def uri(self, value):
        """Set the URI after validating its scheme and structure.

        Raises ValueError when the scheme is wrong, the value does not
        round-trip through urlsplit, or the path does not have exactly
        collection/observation_id/product_id components.
        """
        caom_util.type_check(value, str, "uri", override=False)
        tmp = urlsplit(value)

        if tmp.scheme != ObservationURI._SCHEME:
            raise ValueError("{} doesn't have an allowed scheme".format(value))
        if tmp.geturl() != value:
            raise ValueError("Failed to parse uri correctly: {}".format(value))

        # str.split never yields None, so the original
        # "if product_id is None" check was dead code; a malformed path
        # raised a bare unpack ValueError instead.  Catch it and raise
        # the intended message (typo "Faield" also fixed).
        try:
            (collection, observation_id, product_id) = tmp.path.split("/")
        except ValueError:
            raise ValueError("Failed to get product ID from uri: {}"
                             .format(value))

        self._product_id = product_id
        self._observation_uri = \
            ObservationURI.get_observation_uri(collection, observation_id)
        self._uri = value
项目:icrawler    作者:hellock    | 项目源码 | 文件源码
def _url_scheme(self, url):
        # Extract just the scheme component (e.g. "http", "https").
        return urlsplit(url)[0]
项目:mos-horizon    作者:Mirantis    | 项目源码 | 文件源码
def stack_output(output):
    """Render a Heat stack output value as safe HTML."""
    if not output:
        return u''
    if isinstance(output, six.string_types):
        parts = urlparse.urlsplit(output)
        # Only fully-qualified http(s) URLs become clickable links.
        if parts.netloc and parts.scheme in ('http', 'https'):
            url = html.escape(output)
            link = u'<a href="%s" target="_blank">%s</a>' % (url, url)
            return safestring.mark_safe(link)
    if isinstance(output, (dict, list)):
        # Pretty-print structured outputs before escaping them.
        output = json.dumps(output, indent=2)
    return safestring.mark_safe(u'<pre>%s</pre>' % html.escape(output))
项目:meteos    作者:openstack    | 项目源码 | 文件源码
def remove_version_from_href(href):
    """Removes the first api version from the href.

    Given: 'http://www.meteos.com/v1.1/123'
    Returns: 'http://www.meteos.com/123'

    Given: 'http://www.meteos.com/v1.1'
    Returns: 'http://www.meteos.com'

    """
    parsed_url = parse.urlsplit(href)
    url_parts = parsed_url.path.split('/', 2)

    # NOTE: this should match vX.X or vX
    version_re = re.compile(r'^v([0-9]+|[0-9]+\.[0-9]+)(/.*|$)')
    if version_re.match(url_parts[1]):
        del url_parts[1]

    new_path = '/'.join(url_parts)

    # If nothing was removed, the href had no version component.
    if new_path == parsed_url.path:
        msg = 'href %s does not contain version' % href
        LOG.debug(msg)
        raise ValueError(msg)

    rebuilt = list(parsed_url)
    rebuilt[2] = new_path
    return parse.urlunsplit(rebuilt)
项目:meteos    作者:openstack    | 项目源码 | 文件源码
def _update_link_prefix(self, orig_url, prefix):
        """Replace orig_url's scheme and netloc with those of *prefix*."""
        if not prefix:
            # Nothing to rewrite.
            return orig_url
        url_parts = list(parse.urlsplit(orig_url))
        prefix_parts = list(parse.urlsplit(prefix))
        url_parts[:2] = prefix_parts[:2]
        return parse.urlunsplit(url_parts)
项目:callisto-core    作者:project-callisto    | 项目源码 | 文件源码
def _get_url_parts(url):
    # Normalize the URL first, then split it into its components.
    return urlsplit(_clean_url(url))
项目:fuel-ccp    作者:openstack    | 项目源码 | 文件源码
def get_host(path):
    """Return the network location (host[:port]) of *path*."""
    split_result = urlparse.urlsplit(path)
    return split_result.netloc
项目:Url    作者:beiruan    | 项目源码 | 文件源码
def can_fetch(self, user_agent, url):
        """Decide from robots.txt whether *user_agent* may fetch *url*.

        Tornado coroutine: the boolean verdict is delivered via
        ``gen.Return``.  Parsed robots.txt objects are cached per domain
        in ``self.robots_txt_cache`` and refreshed once older than
        ``self.robot_txt_age`` seconds.
        """
        parsed = urlsplit(url)
        domain = parsed.netloc
        if domain in self.robots_txt_cache:
            robot_txt = self.robots_txt_cache[domain]
            # Expire cache entries older than robot_txt_age seconds.
            if time.time() - robot_txt.mtime() > self.robot_txt_age:
                robot_txt = None
        else:
            robot_txt = None

        if robot_txt is None:
            robot_txt = RobotFileParser()
            try:
                # Fetch /robots.txt at the site root relative to *url*.
                response = yield gen.maybe_future(self.http_client.fetch(
                    urljoin(url, '/robots.txt'), connect_timeout=10, request_timeout=30))
                content = response.body
            except tornado.httpclient.HTTPError as e:
                logger.error('load robots.txt from %s error: %r', domain, e)
                # A failed fetch is treated as an empty (allow-all) file.
                content = ''

            try:
                content = content.decode('utf8', 'ignore')
            except UnicodeDecodeError:
                content = ''

            robot_txt.parse(content.splitlines())
            self.robots_txt_cache[domain] = robot_txt

        raise gen.Return(robot_txt.can_fetch(user_agent, url))
项目:frontera-google-docker    作者:casertap    | 项目源码 | 文件源码
def _get_domain_bucket(self, url):
        # Buckets are keyed by bare hostname; strip any :port suffix.
        netloc = urlparse.urlsplit(url).netloc
        hostname = netloc.partition(':')[0]
        return self.domain_cache.setdefault(hostname, {})
项目:scrapy-cdr    作者:TeamHG-Memex    | 项目源码 | 文件源码
def get_path(url):
    """Return path?query#fragment of *url*; path defaults to '/'."""
    parts = urlsplit(url)
    path = parts.path if parts.path else '/'
    return urlunsplit(('', '', path, parts.query, parts.fragment))
项目:scrapy-cdr    作者:TeamHG-Memex    | 项目源码 | 文件源码
def _reverse_domain_storage(item, media_root):
    """Relocate stored objects into directories named by reversed domain parts."""
    for obj in item.get('objects', []):
        stored_url = obj['obj_stored_url']
        # Stored names are expected to be flat (no path separators).
        assert '/' not in stored_url
        domain = urlsplit(obj['obj_original_url']).netloc
        if ':' in domain:
            domain, _ = domain.split(':', 1)
        # "sub.example.com" -> ["com", "example", "sub"]
        parents = [part for part in reversed(domain.split('.')) if part]
        os.makedirs(os.path.join(media_root, *parents), exist_ok=True)
        stem, _ = os.path.splitext(stored_url)
        new_stored_url = os.path.sep.join(parents + [stem])
        dest = os.path.join(media_root, new_stored_url)
        if not os.path.exists(dest):
            shutil.copy(os.path.join(media_root, stored_url), dest)
        obj['obj_stored_url'] = new_stored_url
项目:aws-vapor    作者:ohtomi    | 项目源码 | 文件源码
def take_action(self, args):
        """Download a recipe from remote URL and save it to a local file under contrib directory.

        Args:
            args (:obj:`dict`): Parsed command line arguments.
                "url" is an URL where a recipe will be downloaded from.
        """
        file_url = args.url
        # The recipe filename is the last path segment of the URL.
        filename = parse.urlsplit(file_url).path.rsplit('/', 1)[-1]
        contrib = utils.get_property_from_config_file('defaults', 'contrib')
        self._download_recipe(file_url, filename, contrib)
项目:cti-taxii-client    作者:oasis-open    | 项目源码 | 文件源码
def canonicalize_url(api_root_url):
    """Normalize an API root URL and guarantee a trailing slash."""
    normalized = urlparse.urlsplit(api_root_url).geturl()
    if normalized.endswith("/"):
        return normalized
    return normalized + "/"
项目:targets-python    作者:targets-fs    | 项目源码 | 文件源码
def open(self, target_uri, **kwargs):
        """Open target uri.

        :param target_uri: Uri to open
        :type target_uri: string

        :returns: Target object

        """
        # Fall back to the default opener's scheme when none is given.
        split = urlsplit(target_uri, scheme=self.default_opener)

        opener = self.get_opener(split.scheme)
        query = opener.conform_query(split.query)

        target = opener.get_target(
            split.scheme,
            split.path,
            split.fragment,
            split.username,
            split.password,
            split.hostname,
            split.port,
            query,
            **kwargs
        )
        # Remember the original URI this target was opened with.
        target.opener_path = target_uri
        return target
项目:reahl    作者:reahl    | 项目源码 | 文件源码
def __init__(self, url_string):
        """Parse *url_string* and expose each URL component as an attribute."""
        parts = urllib_parse.urlsplit(url_string)
        self.scheme = parts.scheme     #:
        self.username = parts.username #:
        self.password = parts.password #:
        self.hostname = parts.hostname #:
        self.port = parts.port         #:
        self.path = parts.path         #:
        self.query = parts.query       #:
        self.fragment = parts.fragment #:
项目:gnocchi    作者:gnocchixyz    | 项目源码 | 文件源码
def get_driver(conf):
    """Return the configured driver."""
    # The indexer URL's scheme selects which driver plugin to load.
    scheme = parse.urlsplit(conf.indexer.url).scheme
    manager = driver.DriverManager('gnocchi.indexer', scheme)
    return manager.driver(conf)
项目:deb-python-gabbi    作者:openstack    | 项目源码 | 文件源码
def _fully_qualify(environ, url):
        """Turn a URL path into a fully qualified URL."""
        split_url = urlparse.urlsplit(url)
        server_name = environ.get('SERVER_NAME')
        server_port = str(environ.get('SERVER_PORT'))
        server_scheme = environ.get('wsgi.url_scheme')
        # Only non-default ports appear in the netloc.
        if server_port in ('80', '443'):
            netloc = server_name
        else:
            netloc = '%s:%s' % (server_name, server_port)

        return urlparse.urlunsplit((server_scheme, netloc, split_url.path,
                                    split_url.query, split_url.fragment))
项目:deb-python-gabbi    作者:openstack    | 项目源码 | 文件源码
def _parse_url(self, url):
        """Create a url from test data.

        If provided with a full URL, just return that. If SSL is requested
        set the scheme appropriately.

        Scheme and netloc are saved for later use in comparisons.
        """
        query_params = self.test_data['query_parameters']
        ssl = self.test_data['ssl']

        parsed_url = urlparse.urlsplit(url)
        if not parsed_url.scheme:
            # Relative URL: qualify it against the configured host/port.
            full_url = utils.create_url(url, self.host, port=self.port,
                                        prefix=self.prefix, ssl=ssl)
            # parse again to set updated netloc and scheme
            parsed_url = urlparse.urlsplit(full_url)

        self.scheme = parsed_url.scheme
        self.netloc = parsed_url.netloc

        query_string = parsed_url.query
        if query_params:
            query_string = self._update_query_params(parsed_url.query,
                                                     query_params)

        # The fragment is deliberately dropped from the rebuilt URL.
        return urlparse.urlunsplit((parsed_url.scheme, parsed_url.netloc,
                                    parsed_url.path, query_string, ''))
项目:deb-python-gabbi    作者:openstack    | 项目源码 | 文件源码
def create_url(base_url, host, port=None, prefix='', ssl=False):
    """Given pieces of a path-based url, return a fully qualified url."""
    scheme = 'https' if ssl else 'http'

    # A host with : in it at this stage is assumed to be an IPv6
    # address of some kind (they come in many forms). Port should
    # already have been stripped off.
    if ':' in host and not (host.startswith('[') and host.endswith(']')):
        host = '[%s]' % host

    netloc = host
    if port and not _port_follows_standard(port, ssl):
        netloc = '%s:%s' % (host, port)

    parsed_url = urlparse.urlsplit(base_url)
    query_string = parsed_url.query
    path = parsed_url.path

    # Guard against a prefix of None or the url already having the
    # prefix. Without the startswith check, the tests in prefix.yaml
    # fail. This is a pragmatic fix which does this for any URL in a
    # test request that does not have a scheme and does not
    # distinguish between URLs in a gabbi test file and those
    # generated by the server. Idealy we would not mutate nor need
    # to check URLs returned from the server. Doing that, however,
    # would require more complex data handling than we have now and
    # this covers most common cases and will be okay until someone
    # reports a bug.
    if prefix and not path.startswith(prefix):
        path = '%s/%s' % (prefix.rstrip('/'), path.lstrip('/'))

    return urlparse.urlunsplit((scheme, netloc, path, query_string, ''))
项目:contentful.py    作者:contentful    | 项目源码 | 文件源码
def _get_sync_token(self):
        # next_sync_url wins over next_page_url when both are present.
        next_url = self.next_sync_url or self.next_page_url
        querystring = parse_qs(urlsplit(next_url).query)
        return querystring['sync_token'][0]
项目:autologin-middleware    作者:TeamHG-Memex    | 项目源码 | 文件源码
def parse(self, response):
        """Record the visit and request every extracted non-logout link."""
        self.responses.append(response)
        parts = urlsplit(response.url)
        relative = urlunsplit(['', '', parts.path, parts.query, parts.fragment])
        self.visited_urls.append(relative or '/')
        # Deduplicate link targets before scheduling requests.
        pending = set()
        for link in self.link_extractor.extract_links(response):
            if not self._looks_like_logout(link, response):
                pending.add(link.url)
        for url in pending:
            yield self.make_request(url)
项目:autologin-middleware    作者:TeamHG-Memex    | 项目源码 | 文件源码
def test_login(settings, extra_settings=None):
    """ No logout links, just one page after login.
    """
    crawler = make_crawler(settings, **AL_SETTINGS)
    with MockServer(Login) as s:
        yield crawler.crawl(url=s.root_url)
    spider = crawler.spider
    # Exactly the root page and the hidden page should be visited.
    assert len(spider.visited_urls) == 2
    assert {'/', '/hidden'} == set(spider.visited_urls)
    first_response = spider.responses[0]
    assert urlsplit(first_response.url).path.rstrip('/') == ''
    assert first_response.meta['autologin_active']
    assert first_response.meta['autologin_response']['status'] == 'solved'
项目:autologin-middleware    作者:TeamHG-Memex    | 项目源码 | 文件源码
def test_login_error(settings, extra_settings=None):
    """ Trying to login with wrong credentials
    """
    al_settings = dict(AL_SETTINGS)
    al_settings['AUTOLOGIN_PASSWORD'] = 'wrong'
    crawler = make_crawler(settings, **al_settings)
    with MockServer(Login) as s:
        yield crawler.crawl(url=s.root_url)
    spider = crawler.spider
    # Only the root page and the login page should be visited.
    assert len(spider.visited_urls) == 2
    assert {'/', '/login'} == set(spider.visited_urls)
    first_response = spider.responses[0]
    assert urlsplit(first_response.url).path.rstrip('/') == ''
    assert not first_response.meta['autologin_active']
    assert first_response.meta['autologin_response']['status'] == 'error'
项目:deb-python-pysaml2    作者:openstack    | 项目源码 | 文件源码
def create_return_url(base, query, **kwargs):
    """
    Add a query string plus extra parameters to a base URL which may contain
    a query part already.

    :param base: redirect_uri may contain a query part, no fragment allowed.
    :param query: Old query part as a string
    :param kwargs: extra query parameters
    :return: the combined URL as a string
    """
    def _merge_query(query_string, params):
        # Merge values parsed from *query_string* into *params* in place,
        # promoting scalar values to lists before extending them.
        # (The original duplicated this loop verbatim for both sources.)
        for key, values in parse_qs(query_string).items():
            if key in params:
                if isinstance(params[key], six.string_types):
                    params[key] = [params[key]]
                params[key].extend(values)
            else:
                params[key] = values

    part = urlsplit(base)
    if part.fragment:
        raise ValueError("Base URL contained parts it shouldn't")

    _merge_query(query, kwargs)

    if part.query:
        _merge_query(part.query, kwargs)
        _pre = base.split("?")[0]
    else:
        _pre = base

    logger.debug("kwargs: %s" % kwargs)

    return "%s?%s" % (_pre, url_encode_params(kwargs))
项目:cloak-server    作者:encryptme    | 项目源码 | 文件源码
def _fetch_crl(self, config, url, out, fmt):
        # type: (ConfigParser, str, str, str) -> bool
        """Download a CRL if it changed (ETag-aware); return True on update."""
        updated = False

        url_hash = sha1(url.encode('utf-8')).hexdigest()
        headers = {}  # type: Dict[str, str]

        try:
            etag = config.get(CONFIG_SECTION, url_hash)
        except NoOptionError:
            # No cached ETag yet: fetch unconditionally.
            pass
        else:
            headers = {'If-None-Match': etag}

        response = requests.get(url, headers=headers)

        if response.status_code == 200:
            crl_name = os.path.basename(urlsplit(url).path)
            crl_name, content = self._format_crl(crl_name, response.content, fmt)
            crl_path = os.path.join(out, crl_name)
            with open(crl_path, 'wb') as f:
                f.write(content)
            print(crl_path, file=self.stdout)
            updated = True

            # Remember the new ETag for conditional requests next time.
            if 'ETag' in response.headers:
                config.set(CONFIG_SECTION, url_hash, response.headers['ETag'])
        elif response.status_code != 304:
            # 304 Not Modified is expected and silent; report anything else.
            print("Error {} downloading {}: {}".format(
                response.status_code, url, response.content
            ), file=self.stderr)

        return updated
项目:oejia_wx    作者:JoneXiong    | 项目源码 | 文件源码
def get_querystring(uri):
    """Return the query string of *uri* parsed into a dict of value lists."""
    parts = urlparse.urlsplit(uri)
    if sys.version_info[:2] != (2, 6):
        query = parts.query
    else:
        # Python 2.6's urlsplit left "?query" attached to the path.
        query = parts.path
        if query.startswith('?'):
            query = query[1:]
    return urlparse.parse_qs(query)
项目:masakari    作者:openstack    | 项目源码 | 文件源码
def _update_link_prefix(self, orig_url, prefix):
        """Graft *prefix*'s scheme, netloc and leading path onto *orig_url*."""
        if not prefix:
            # Nothing to rewrite.
            return orig_url
        url_parts = list(urlparse.urlsplit(orig_url))
        prefix_parts = list(urlparse.urlsplit(prefix))
        url_parts[:2] = prefix_parts[:2]
        # Prepend the prefix path in front of the original path.
        url_parts[2] = prefix_parts[2] + url_parts[2]
        return urlparse.urlunsplit(url_parts).rstrip('/')
项目:domain-discovery-crawler    作者:TeamHG-Memex    | 项目源码 | 文件源码
def get_path(url):
    """Return path + query + fragment of *url*; '/' when the path is empty."""
    split = urlsplit(url)
    return urlunsplit(('', '', split.path or '/', split.query, split.fragment))
项目:my_utils    作者:aploium    | 项目源码 | 文件源码
def get_session(domain_or_url):
    """Return a keep-alive ``requests.Session`` for a domain or URL.

    Sessions are pooled per domain in the module-level ``pool`` and the
    checked-out ones are tracked on the thread-local ``locked_session``.
    NOTE(review): the original docstring and comments were mojibake;
    they appear to say sessions are returned to the pool later via a
    ``release_lock()`` helper defined elsewhere -- TODO confirm.

    :param domain_or_url: bare domain or full URL
    :type domain_or_url: str
    :rtype: requests.Session
    """
    # A full URL yields its netloc; a bare domain falls through unchanged.
    domain = urllib_parse.urlsplit(domain_or_url).netloc or domain_or_url

    if domain not in pool:
        pool[domain] = []

    if not hasattr(locked_session, "sessdicts"):
        # First use on this thread: start its checked-out-session list.
        locked_session.sessdicts = []

    if not pool[domain]:
        # Pool is empty for this domain: create a fresh session.
        sessdict = {
            "domain": domain,
            "sessobj": requests.Session(),
        }
    else:
        # Reuse a previously returned session for this domain.
        sessdict = pool[domain].pop()

    # Record last-use time; used by TTL-based cleanup below.
    sessdict["active"] = time.time()

    locked_session.sessdicts.append(sessdict)

    if _gc_checkpoint < time.time() - SESSION_TTL:
        # Periodically purge expired sessions (see clear()).
        with cleaning_lock:
            clear()

    return sessdict["sessobj"]  # type: requests.Session
项目:inspire-schemas    作者:inspirehep    | 项目源码 | 文件源码
def get_license_from_url(url):
    """Get the license abbreviation from an URL.

    Args:
        url(str): canonical url of the license.

    Returns:
        str: the corresponding license abbreviation.

    Raises:
        ValueError: when the url is not recognized
    """
    if not url:
        return

    split_url = urlsplit(url, scheme='http')
    netloc = split_url.netloc

    if netloc.lower() == 'creativecommons.org':
        # e.g. CC BY-SA 4.0 style abbreviations, uppercased.
        parts = ['CC']
        match = _RE_LICENSE_URL.match(split_url.path)
        parts.extend(piece.upper() for piece in match.groups() if piece)
    elif netloc == 'arxiv.org':
        parts = ['arXiv']
        match = _RE_LICENSE_URL.match(split_url.path)
        parts.extend(piece for piece in match.groups() if piece)
    else:
        raise ValueError('Unknown license URL')

    return u' '.join(parts)
项目:TornadoWeb    作者:VxCoder    | 项目源码 | 文件源码
def get_querystring(uri):
    """Get Qeruystring information from uri.

    :param uri: uri
    :return: querystring info or {}
    """
    parts = urlparse.urlsplit(uri)
    if sys.version_info[:2] == (2, 6):
        # Python 2.6's urlsplit left "?query" attached to the path.
        query = parts.path
        if query.startswith('?'):
            query = query[1:]
    else:
        query = parts.query
    return urlparse.parse_qs(query)
项目:solaris-ips    作者:oracle    | 项目源码 | 文件源码
def __init__(self, repouri):
                """Initialize a RepoStats object.  Pass a TransportRepoURI
                object in repouri to configure an object for a particular
                repository URI."""

                # Normalized URL (no trailing slash) and its scheme.
                self.__url = repouri.uri.rstrip("/")
                self.__scheme = urlsplit(self.__url)[0]
                self.__priority = repouri.priority

                self.__proxy = repouri.proxy
                self.__system = repouri.system

                # Error counters; "decayable" errors fade over time via
                # _err_decay.
                self._err_decay = 0
                self.__failed_tx = 0
                self.__content_err = 0
                self.__decayable_err = 0
                self.__timeout_err = 0
                self.__total_tx = 0
                self.__consecutive_errors = 0

                # Connection accounting.
                self.__connections = 0
                self.__connect_time = 0.0

                # True once this repo has been used for a transaction.
                self.__used = False

                # Transfer statistics used for origin speed estimation.
                self.__bytes_xfr = 0.0
                self.__seconds_xfr = 0.0
                self.origin_speed = 0.0
                self.origin_cspeed = 0.0
                self.origin_count = 1
                self.origin_factor = 1
                self.origin_decay = 1
项目:solaris-ips    作者:oracle    | 项目源码 | 文件源码
def __str__(self):
                """Render an error message listing URIs with unsupported schemes."""
                illegals = []

                # NOTE(review): every URI in self.uris is reported;
                # presumably callers only populate this with offenders.
                for u in self.uris:
                        assert isinstance(u, six.string_types)
                        scheme = urlsplit(u,
                            allow_fragments=0)[0]
                        illegals.append((u, scheme))

                if len(illegals) > 1:
                        # Typo fix in user-facing text: "follwing".
                        msg = _("The following URIs use unsupported "
                            "schemes.  Supported schemes are "
                            "file://, http://, and https://.")
                        for i, s in illegals:
                                msg += _("\n  {uri} (scheme: "
                                    "{scheme})").format(uri=i, scheme=s)
                        return msg
                elif len(illegals) == 1:
                        i, s = illegals[0]
                        return _("The URI '{uri}' uses the unsupported "
                            "scheme '{scheme}'.  Supported schemes are "
                            "file://, http://, and https://.").format(
                            uri=i, scheme=s)
                return _("The specified URI uses an unsupported scheme."
                    "  Supported schemes are: file://, http://, and "
                    "https://.")
项目:solaris-ips    作者:oracle    | 项目源码 | 文件源码
def __str__(self):
                """Render an error message for an unsupported proxy scheme."""
                if not self.data:
                        return _("The specified proxy URI uses an unsupported scheme."
                            " Currently the only supported scheme is: http://.")
                scheme = urlsplit(self.data,
                    allow_fragments=0)[0]
                return _("The proxy URI '{uri}' uses the unsupported "
                    "scheme '{scheme}'. Currently the only supported "
                    "scheme is http://.").format(
                    uri=self.data, scheme=scheme)
项目:solaris-ips    作者:oracle    | 项目源码 | 文件源码
def valid_pub_url(url, proxy=False):
        """Verify that the publisher URL contains only valid characters.
        If 'proxy' is set to True, some checks are relaxed."""

        if not url:
                return False

        # First split the URL and check if the scheme is one we support
        o = urlsplit(url)

        if not o[0] in _valid_proto:
                return False

        if o[0] == "file":
                path = urlparse(url, "file", allow_fragments=0)[2]
                path = url2pathname(path)
                if not os.path.abspath(path):
                        return False
                # No further validation to be done.
                return True

        # Next verify that the network location is valid
        if six.PY3:
                host = urllib.parse.splitport(o[1])[0]
        else:
                host = urllib.splitport(o[1])[0]

        if proxy:
                # We may have authentication details in the proxy URI, which
                # we must ignore when checking for hostname validity.
                host_parts = host.split("@")
                if len(host_parts) == 2:
                        # BUG FIX: previously "host = host[1]", which kept
                        # only the second *character* of the host string;
                        # the intent is the part after the userinfo.
                        host = host_parts[1]

        if not host or _invalid_host_chars.match(host):
                return False

        if _hostname_re.match(host):
                return True

        return False
项目:scrapy-rotating-proxies    作者:TeamHG-Memex    | 项目源码 | 文件源码
def get_proxy_slot(self, proxy):
        """
        Return downloader slot for a proxy.
        By default it doesn't take port in account, i.e. all proxies with
        the same hostname / ip address share the same slot.
        """
        # FIXME: an option to use website address as a part of slot as well?
        slot = urlsplit(proxy).hostname
        return slot
项目:wsgiprox    作者:webrecorder    | 项目源码 | 文件源码
def convert_env(self):
        """Rewrite the WSGI environ for the proxied request."""
        full_uri = self.environ['REQUEST_URI']
        split = urlsplit(full_uri)

        # Resolve against the bare hostname (port stripped).
        self.resolve(full_uri, self.environ, split.netloc.split(':')[0])

        # Drop filtered request headers from the environ.
        for header in list(self.environ.keys()):
            if header in self.FILTER_REQ_HEADERS:
                self.environ.pop(header, '')
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def get_id_from_href(href):
    """Return the id or uuid portion of a url.

    Given: 'http://www.foo.com/bar/123?q=4'
    Returns: '123'

    Given: 'http://www.foo.com/bar/abc123?q=4'
    Returns: 'abc123'

    """
    path = urlparse.urlsplit("%s" % href).path
    return path.rsplit('/', 1)[-1]
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def _update_link_prefix(self, orig_url, prefix):
        """Graft *prefix*'s scheme, netloc and leading path onto *orig_url*."""
        if not prefix:
            # Nothing to rewrite.
            return orig_url
        url_parts = list(urlparse.urlsplit(orig_url))
        prefix_parts = list(urlparse.urlsplit(prefix))
        url_parts[:2] = prefix_parts[:2]
        # Prepend the prefix path in front of the original path.
        url_parts[2] = prefix_parts[2] + url_parts[2]
        return urlparse.urlunsplit(url_parts).rstrip('/')