Python six.moves.urllib.parse 模块,urljoin() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用six.moves.urllib.parse.urljoin()

项目:iotronic    作者:openstack    | 项目源码 | 文件源码
def get_keystone_url(auth_url, auth_version):
    """Build the versioned URL at which keystone can be reached.

    The version segment is 'v3' when ``_is_apiv3()`` reports a v3 API for
    the given arguments and 'v2.0' otherwise.

    :param auth_url: a http or https url to be inspected (like
        'http://127.0.0.1:9898/').
    :param auth_version: a string containing the version (like v2, v3.0, etc)
    :returns: a string containing the keystone url
    """
    if _is_apiv3(auth_url, auth_version):
        version_segment = 'v3'
    else:
        version_segment = 'v2.0'
    # NOTE(lucasagomes): The trailing '/' has to be removed first,
    #   otherwise urljoin() fails to override the version in the URL.
    trimmed_url = auth_url.rstrip('/')
    return parse.urljoin(trimmed_url, version_segment)
项目:storj-python-sdk    作者:Storj    | 项目源码 | 文件源码
def _prepare_request(self, **kwargs):
        """Build a prepared HTTP request from the given keyword arguments.

        Authentication data is added through ``_add_ecdsa_signature()`` when
        ``self.private_key`` is a ``SigningKey``, otherwise through
        ``_add_basic_auth()`` when both ``self.email`` and ``self.password``
        are set. The ``path`` entry is then replaced by a ``url`` entry
        joined onto ``self.api_url``.

        Args:
            kwargs (dict): keyword arguments for the authentication function
                (``_add_ecdsa_signature()`` or ``_add_basic_auth()``) and
                :py:class:`requests.Request` class. Must contain ``path``.
        Returns:
            The result of ``requests.Request(**kwargs).prepare()``.
        Raises:
            AssertionError: in case ``kwargs['path']`` doesn't start with ``/``.
        """
        kwargs.setdefault('headers', {})

        # The authentication helpers receive kwargs while it still holds
        # the ``path`` entry.
        uses_ecdsa = isinstance(self.private_key, SigningKey)
        if uses_ecdsa:
            self._add_ecdsa_signature(kwargs)
        elif self.email and self.password:
            self._add_basic_auth(kwargs)

        # Swap the relative path for the absolute URL.
        request_path = kwargs.pop('path')
        assert request_path.startswith('/')
        kwargs['url'] = urljoin(self.api_url, request_path)

        request = requests.Request(**kwargs)
        return request.prepare()
项目:python-kingbirdclient    作者:openstack    | 项目源码 | 文件源码
def get_contents_if_file(contents_or_file_name):
    """Get the contents of a file.

    If the value passed in is a file name or file URI, return the
    contents. If not, or there is an error reading the file contents,
    return the value passed in as the contents.

    For example, a workflow definition will be returned if either the
    workflow definition file name, or file URI are passed in, or the
    actual workflow definition itself is passed in.

    :param contents_or_file_name: a URL (anything ``urlparse`` finds a
        scheme in), a local file name, or the contents themselves.
    :returns: the resource decoded as UTF-8, or ``contents_or_file_name``
        unchanged when it cannot be read.
    """
    import contextlib

    try:
        if parse.urlparse(contents_or_file_name).scheme:
            definition_url = contents_or_file_name
        else:
            # No scheme: treat the value as a local path and turn it into
            # a file: URL so that urlopen() can read it.
            path = os.path.abspath(contents_or_file_name)
            definition_url = parse.urljoin(
                'file:',
                request.pathname2url(path)
            )
        # closing() releases the handle even when read()/decode() raises.
        with contextlib.closing(request.urlopen(definition_url)) as handle:
            return handle.read().decode('utf8')
    except Exception:
        # Deliberate best effort: anything unreadable is assumed to be the
        # contents themselves.
        return contents_or_file_name
项目:conda-concourse-ci    作者:conda    | 项目源码 | 文件源码
def get_upload_channels(upload_config_dir, subdir, channels=None):
    """Collect one channel per upload configuration in upload_config_dir.

    The thought here was to provide whatever channel is set as an output
    also as an input. This was killed in favor of setting channels in
    condarc in the docker image.

    Per configuration: the 'user' value when a 'token' is present; an
    'http://' URL built from 'server' and the 'destination_path' (formatted
    with subdir) when a 'server' is present; otherwise the 'channel' value.

    A non-empty ``channels`` list is extended in place and returned.
    """
    channels = channels or []

    for entry in load_yaml_config_dir(upload_config_dir):
        if 'token' in entry:
            channel = entry['user']
        elif 'server' in entry:
            server_url = 'http://' + entry['server']
            destination = entry['destination_path'].format(subdir=subdir)
            channel = parse.urljoin(server_url, destination)
        else:
            channel = entry['channel']
        channels.append(channel)
    return channels
项目:deb-oslo.vmware    作者:openstack    | 项目源码 | 文件源码
def test_get_pbm_wsdl_location(self):
        # No version given: no WSDL location.
        self.assertIsNone(pbm.get_pbm_wsdl_location(None))

        def wsdl_url_for(version_dir):
            # file: URL of <pbm package dir>/wsdl/<version_dir>/pbmService.wsdl
            pbm_dir = os.path.abspath(os.path.dirname(pbm.__file__))
            wsdl_file = os.path.join(pbm_dir, 'wsdl', version_dir,
                                     'pbmService.wsdl')
            return urlparse.urljoin('file:', urllib.pathname2url(wsdl_file))

        with mock.patch('os.path.exists') as exists_mock:
            exists_mock.return_value = True
            # (requested VC version, WSDL directory expected to be used)
            for vc_version, version_dir in (('5', '5'),
                                            ('5.5', '5.5'),
                                            ('5.5.1', '5.5')):
                actual = pbm.get_pbm_wsdl_location(vc_version)
                self.assertEqual(wsdl_url_for(version_dir), actual)

            # With the WSDL file reported missing, nothing is returned.
            exists_mock.return_value = False
            self.assertIsNone(pbm.get_pbm_wsdl_location('5.5'))
项目:deb-oslo.vmware    作者:openstack    | 项目源码 | 文件源码
def get_pbm_wsdl_location(vc_version):
    """Return PBM WSDL file location corresponding to VC version.

    The WSDL is looked up next to this module, in
    ``wsdl/<major[.minor]>/pbmService.wsdl``.

    :param vc_version: a dot-separated version string. For example, "1.2".
    :return: the pbm wsdl file location as a ``file:`` URL, or None when
             vc_version is empty or the WSDL file does not exist.
    """
    if not vc_version:
        return None
    # Keep at most "major.minor"; further components are ignored.
    major_minor = '.'.join(vc_version.split('.')[:2])
    module_dir = os.path.abspath(os.path.dirname(__file__))
    wsdl_path = os.path.join(module_dir, 'wsdl', major_minor,
                             'pbmService.wsdl')
    if os.path.exists(wsdl_path):
        wsdl_url = urlparse.urljoin('file:', urllib.pathname2url(wsdl_path))
        LOG.debug("Using PBM WSDL location: %s.", wsdl_url)
        return wsdl_url
    LOG.warning(_LW("PBM WSDL file %s not found."), wsdl_path)
    return None
项目:morango    作者:learningequality    | 项目源码 | 文件源码
def _request(self, endpoint, method="GET", lookup=None, data={}, params={}, userargs=None, password=None):
        """
        Send a request to a morango endpoint and return the response.

        :param endpoint: constant representing which morango endpoint we are querying
        :param method: HTTP verb/method for request
        :param lookup: the pk value for the specific object we are querying;
            when truthy, it is appended to the endpoint URL followed by '/'
        :param data: dict sent as the JSON body of the request (passed as ``json=``)
        :param params: dict to be sent as part of URL's query string
        :param userargs: Authorization credentials; a dict is flattened into
            a ``key=val&key=val`` string before being used as the auth user
        :param password: second element of the ``auth`` tuple, used only
            when ``userargs`` is truthy
        :return: ``Response`` object from request
        :raises: whatever ``Response.raise_for_status()`` raises
        """
        # NOTE: the ``data``/``params`` defaults are shared dict objects;
        # they are only read here and must not be mutated.

        # the auth layer expects user arguments as a query string
        if isinstance(userargs, dict):
            userargs = "&".join(
                "{}={}".format(name, value) for name, value in iteritems(userargs)
            )

        # resolve endpoint (and optional object lookup) against the base url
        if lookup:
            lookup = lookup + '/'
        endpoint_url = urljoin(self.base_url, endpoint)
        url = urljoin(endpoint_url, lookup)

        auth = None
        if userargs:
            auth = (userargs, password)

        response = requests.request(method, url, json=data, params=params, auth=auth)
        response.raise_for_status()
        return response
项目:python-zunclient    作者:openstack    | 项目源码 | 文件源码
def get_file_contents(from_data, files, base_url=None,
                      ignore_if=None):
    """Resolve the file references in a dict and load their contents.

    Each value of ``from_data`` is joined onto ``base_url`` (which gets a
    trailing '/' first) and replaced by the resulting URL. URLs not yet in
    ``files`` are read through ``utils.read_url_content`` and stored there;
    content recognised by ``is_template`` is stored as the JSON dump of the
    template returned by ``get_template_contents``.

    Nothing happens when ``from_data`` is not a dict. Both ``from_data``
    and ``files`` are modified in place; the function returns None.

    :param ignore_if: optional ``callable(key, value)``; entries for which
        it returns a true value are left untouched.
    """
    if not isinstance(from_data, dict):
        return

    for name, reference in from_data.items():
        if ignore_if and ignore_if(name, reference):
            continue

        if base_url and not base_url.endswith('/'):
            base_url += '/'

        absolute_url = parse.urljoin(base_url, reference)
        if absolute_url not in files:
            content = utils.read_url_content(absolute_url)
            if is_template(content):
                nested = get_template_contents(
                    template_url=absolute_url, files=files)[1]
                content = jsonutils.dumps(nested)
            files[absolute_url] = content
        # the entry now points at the normalised absolute URL
        from_data[name] = absolute_url
项目:pymonzo    作者:pawelad    | 项目源码 | 文件源码
def test_class_get_oauth_token_method(self, mocker, mocked_monzo):
        """Test that `_get_oauth_token` fetches the token via `OAuth2Session`"""
        fetch_token_mock = mocker.MagicMock()
        session_cls_mock = mocker.patch('pymonzo.monzo_api.OAuth2Session')
        session_cls_mock.return_value.fetch_token = fetch_token_mock

        returned_token = mocked_monzo._get_oauth_token()

        # The method hands back whatever `fetch_token` returned.
        assert returned_token == fetch_token_mock.return_value

        session_cls_mock.assert_called_once_with(
            client_id=mocked_monzo._client_id,
            redirect_uri=config.PYMONZO_REDIRECT_URI,
        )
        expected_token_url = urljoin(mocked_monzo.api_url, '/oauth2/token')
        fetch_token_mock.assert_called_once_with(
            token_url=expected_token_url,
            code=mocked_monzo._auth_code,
            client_secret=mocked_monzo._client_secret,
        )
项目:pymonzo    作者:pawelad    | 项目源码 | 文件源码
def _get_oauth_token(self):
        """
        Fetch a Monzo access token using the OAuth2 `authorization code` grant.

        The token endpoint is `/oauth2/token`, resolved against `self.api_url`.

        Official docs:
            https://monzo.com/docs/#acquire-an-access-token

        :returns: OAuth 2 access token
        :rtype: dict
        """
        token_url = urljoin(self.api_url, '/oauth2/token')

        session = OAuth2Session(
            client_id=self._client_id,
            redirect_uri=config.PYMONZO_REDIRECT_URI,
        )

        return session.fetch_token(
            token_url=token_url,
            code=self._auth_code,
            client_secret=self._client_secret,
        )
项目:webkivy    作者:miohtama    | 项目源码 | 文件源码
def crawl(self, url, base_url):
        """Crawl an .html page and fetch every linked URL we think is part of the application.

        Links are resolved against the final URL of the response (after
        redirects) and kept when ``is_likely_app_part(link, base_url)``
        accepts them. Downloads are parallelized using threads.
        """
        response = requests.get(url)

        # The response URL already reflects any redirects that were followed
        resolved_base = response.url

        document = lxml.html.fromstring(response.content)
        hrefs = [anchor.attrib.get("href", "") for anchor in document.cssselect("a")]
        absolute_links = [urljoin(resolved_base, href) for href in hrefs]
        app_links = [
            candidate for candidate in absolute_links
            if is_likely_app_part(candidate, base_url)
        ]

        # Fetch all links in parallel
        with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
            pending = [
                executor.submit(self.fetch_file, candidate, base_url)
                for candidate in app_links
            ]
            for finished in concurrent.futures.as_completed(pending):
                # Re-raise, in the calling thread, anything a worker raised
                finished.result()
项目:deb-python-proliantutils    作者:openstack    | 项目源码 | 文件源码
def _get_sushy_system(self, system_id):
        """Look up the sushy system identified by system_id

        The system URL is system_id joined onto the sushy system
        collection path.

        :param system_id: The identity of the System resource
        :returns: the Sushy system instance
        :raises: IloError, when sushy raises a SushyError
        """
        collection_path = self._sushy.get_system_collection_path()
        system_url = parse.urljoin(collection_path, system_id)
        try:
            return self._sushy.get_system(system_url)
        except sushy.exceptions.SushyError as e:
            template = self._('The Redfish System "%(system)s" was not found. '
                              'Error %(error)s')
            msg = template % {'system': system_id, 'error': str(e)}
            LOG.debug(msg)
            raise exception.IloError(msg)
项目:deb-python-proliantutils    作者:openstack    | 项目源码 | 文件源码
def _get_sushy_manager(self, manager_id):
        """Look up the sushy Manager identified by manager_id

        The manager URL is manager_id joined onto the sushy manager
        collection path.

        :param manager_id: The identity of the Manager resource
        :returns: the Sushy Manager instance
        :raises: IloError, when sushy raises a SushyError
        """
        collection_path = self._sushy.get_manager_collection_path()
        manager_url = parse.urljoin(collection_path, manager_id)
        try:
            return self._sushy.get_manager(manager_url)
        except sushy.exceptions.SushyError as e:
            template = self._('The Redfish Manager "%(manager)s" was not found. '
                              'Error %(error)s')
            msg = template % {'manager': manager_id, 'error': str(e)}
            LOG.debug(msg)
            raise exception.IloError(msg)
项目:python-distilclient    作者:openstack    | 项目源码 | 文件源码
def collect_usage(self):
        """POST to the "collect_usage" endpoint and return the decoded JSON reply.

        :raises AttributeError: when the response status is not 200
        :returns: the JSON body of the response; None after a
            ConnectionError, which is only printed
        """
        url = urlparse.urljoin(self.endpoint, "collect_usage")
        headers = {
            "Content-Type": "application/json",
            "X-Auth-Token": self.auth_token,
        }

        try:
            response = requests.post(url, headers=headers,
                                     verify=not self.insecure)
            if response.status_code == 200:
                return response.json()
            raise AttributeError("Usage cycle failed: %s  code: %s" %
                                 (response.text, response.status_code))
        except ConnectionError as e:
            print(e)
项目:python-distilclient    作者:openstack    | 项目源码 | 文件源码
def last_collected(self):
        """GET the "last_collected" endpoint and return the decoded JSON reply.

        :raises AttributeError: when the response status is not 200
        :returns: the JSON body of the response; None after a
            ConnectionError, which is only printed
        """
        url = urlparse.urljoin(self.endpoint, "last_collected")
        headers = {
            "Content-Type": "application/json",
            "X-Auth-Token": self.auth_token,
        }

        try:
            response = requests.get(url, headers=headers,
                                    verify=not self.insecure)
            if response.status_code == 200:
                return response.json()
            raise AttributeError("Get last collected failed: %s code: %s" %
                                 (response.text, response.status_code))
        except ConnectionError as e:
            print(e)
项目:python-distilclient    作者:openstack    | 项目源码 | 文件源码
def _query_usage(self, tenant, start, end, endpoint):
        """GET a usage endpoint for one tenant and time window.

        :param tenant: sent as the "tenant" query parameter
        :param start: sent as the "start" query parameter
        :param end: sent as the "end" query parameter
        :param endpoint: path joined onto ``self.endpoint``
        :raises AttributeError: when the response status is not 200
        :returns: the JSON body of the response; None after a
            ConnectionError, which is only printed
        """
        url = urlparse.urljoin(self.endpoint, endpoint)
        headers = {"X-Auth-Token": self.auth_token}
        query = {
            "tenant": tenant,
            "start": start,
            "end": end,
        }

        try:
            response = requests.get(url, headers=headers,
                                    params=query,
                                    verify=not self.insecure)
            if response.status_code == 200:
                return response.json()
            raise AttributeError("Get usage failed: %s code: %s" %
                                 (response.text, response.status_code))
        except ConnectionError as e:
            print(e)
项目:httplib2shim    作者:GoogleCloudPlatform    | 项目源码 | 文件源码
def testGet300WithLocation(self):
        # A 300 response that carries a Location: header must be
        # followed automatically.
        uri = urllib_parse.urljoin(base, "300/with-location-header.asis")

        # Two identical round trips: the second one confirms that the
        # intermediate 300 is not cached.
        for _ in range(2):
            resp, body = self.http.request(uri, "GET")
            self.assertEqual(resp.status, 200)
            self.assertEqual(body, b"This is the final destination.\n")
            self.assertEqual(resp.previous.status, 300)
            self.assertEqual(resp.previous.fromcache, False)
项目:httplib2shim    作者:GoogleCloudPlatform    | 项目源码 | 文件源码
def testGet301(self):
        # 301 redirects must be followed automatically and the 301
        # response itself must be cached.
        uri = urllib_parse.urljoin(base, "301/onestep.asis")
        final_uri = urllib_parse.urljoin(base, "302/final-destination.txt")

        # First request: the redirect comes from the server.
        resp, body = self.http.request(uri, "GET")
        self.assertEqual(resp.status, 200)
        self.assertTrue('content-location' in resp)
        self.assertEqual(resp['content-location'], final_uri)
        self.assertEqual(body, b"This is the final destination.\n")
        self.assertEqual(resp.previous.status, 301)
        self.assertEqual(resp.previous.fromcache, False)

        # Second request: the redirect is served from the cache.
        resp, body = self.http.request(uri, "GET")
        self.assertEqual(resp.status, 200)
        self.assertEqual(resp['content-location'], final_uri)
        self.assertEqual(body, b"This is the final destination.\n")
        self.assertEqual(resp.previous.status, 301)
        self.assertEqual(resp.previous.fromcache, True)
项目:httplib2shim    作者:GoogleCloudPlatform    | 项目源码 | 文件源码
def testGet302RedirectionLimit(self):
        # A lowered redirection limit must be honoured: exceeding it
        # raises RedirectLimit, or yields a 500 response when exceptions
        # are converted to status codes.
        self.http.force_exception_to_status_code = False

        uri = urllib_parse.urljoin(base, "302/twostep.asis")
        try:
            resp, body = self.http.request(uri, "GET", redirections=1)
            self.fail("This should not happen")
        except httplib2.RedirectLimit:
            pass
        except Exception:
            self.fail("Threw wrong kind of exception ")

        # Same request again, with the exception mapped to a status code
        self.http.force_exception_to_status_code = True

        resp, body = self.http.request(uri, "GET", redirections=1)
        self.assertEqual(resp.status, 500)
        self.assertTrue(resp.reason.startswith("Redirected more"))
        self.assertEqual("302", resp['status'])
        self.assertTrue(body.startswith(b"<html>"))
        self.assertTrue(resp.previous is not None)
项目:httplib2shim    作者:GoogleCloudPlatform    | 项目源码 | 文件源码
def testGetIgnoreEtag(self):
        # ETags can be forcibly ignored through http.ignore_etag
        uri = urllib_parse.urljoin(base, "reflector/reflector.cgi")
        resp, body = self.http.request(
            uri, "GET", headers={'accept-encoding': 'identity'})
        self.assertNotEqual(resp['etag'], "")

        # By default the revalidation request carries If-None-Match
        resp, body = self.http.request(
            uri, "GET", headers={'accept-encoding': 'identity',
                                 'cache-control': 'max-age=0'})
        reflected = self.reflector(body)
        self.assertTrue('HTTP_IF_NONE_MATCH' in reflected)

        # With ignore_etag set the header is no longer sent
        self.http.ignore_etag = True
        resp, body = self.http.request(
            uri, "GET", headers={'accept-encoding': 'identity',
                                 'cache-control': 'max-age=0'})
        reflected = self.reflector(body)
        self.assertEqual(resp.fromcache, False)
        self.assertFalse('HTTP_IF_NONE_MATCH' in reflected)
项目:httplib2shim    作者:GoogleCloudPlatform    | 项目源码 | 文件源码
def testGet307(self):
        # 307 redirects must be followed, but the 307 itself must not
        # be cached.
        uri = urllib_parse.urljoin(base, "307/onestep.asis")

        resp, body = self.http.request(uri, "GET")
        self.assertEqual(resp.status, 200)
        self.assertEqual(body, b"This is the final destination.\n")
        self.assertEqual(resp.previous.status, 307)
        self.assertEqual(resp.previous.fromcache, False)

        # Second request: the final response is cached, the 307 is not
        resp, body = self.http.request(uri, "GET")
        self.assertEqual(resp.status, 200)
        self.assertEqual(resp.fromcache, True)
        self.assertEqual(body, b"This is the final destination.\n")
        self.assertEqual(resp.previous.status, 307)
        self.assertEqual(resp.previous.fromcache, False)
项目:httplib2shim    作者:GoogleCloudPlatform    | 项目源码 | 文件源码
def testGetGZipFailure(self):
        # A failing gzip decompression must surface as
        # FailedToDecompressContent, or as a 500 response when exceptions
        # are converted to status codes.
        self.http.force_exception_to_status_code = False
        uri = urllib_parse.urljoin(base, "gzip/failed-compression.asis")
        try:
            resp, body = self.http.request(uri, "GET")
            self.fail("Should never reach here")
        except httplib2.FailedToDecompressContent:
            pass
        except Exception:
            self.fail("Threw wrong kind of exception")

        # Same request again, with the exception mapped to a status code
        self.http.force_exception_to_status_code = True

        resp, body = self.http.request(uri, "GET")
        self.assertEqual(resp.status, 500)
        self.assertTrue(resp.reason.startswith("Content purported"))
项目:httplib2shim    作者:GoogleCloudPlatform    | 项目源码 | 文件源码
def testGetDeflateFailure(self):
        # A failing deflate decompression must surface as
        # FailedToDecompressContent, or as a 500 response when exceptions
        # are converted to status codes.
        self.http.force_exception_to_status_code = False

        uri = urllib_parse.urljoin(base, "deflate/failed-compression.asis")
        try:
            resp, body = self.http.request(uri, "GET")
            self.fail("Should never reach here")
        except httplib2.FailedToDecompressContent:
            pass
        except Exception:
            self.fail("Threw wrong kind of exception")

        # Same request again, with the exception mapped to a status code
        self.http.force_exception_to_status_code = True

        resp, body = self.http.request(uri, "GET")
        self.assertEqual(resp.status, 500)
        self.assertTrue(resp.reason.startswith("Content purported"))
项目:httplib2shim    作者:GoogleCloudPlatform    | 项目源码 | 文件源码
def testGetCacheControlNoCache(self):
        # A request sent with Cache-Control: no-cache must bypass the cache
        uri = urllib_parse.urljoin(base, "304/test_etag.txt")

        # Prime the cache ...
        resp, body = self.http.request(
            uri, "GET", headers={'accept-encoding': 'identity'})
        self.assertNotEqual(resp['etag'], "")

        # ... check that a plain repeat is answered from it ...
        resp, body = self.http.request(
            uri, "GET", headers={'accept-encoding': 'identity'})
        self.assertEqual(resp.status, 200)
        self.assertEqual(resp.fromcache, True)

        # ... and that no-cache is not.
        resp, body = self.http.request(
            uri, "GET", headers={
                'accept-encoding': 'identity',
                'Cache-Control': 'no-cache'})
        self.assertEqual(resp.status, 200)
        self.assertEqual(resp.fromcache, False)
项目:httplib2shim    作者:GoogleCloudPlatform    | 项目源码 | 文件源码
def testGetCacheControlPragmaNoCache(self):
        # A request sent with Pragma: no-cache must bypass the cache
        uri = urllib_parse.urljoin(base, "304/test_etag.txt")

        # Prime the cache ...
        resp, body = self.http.request(
            uri, "GET", headers={'accept-encoding': 'identity'})
        self.assertNotEqual(resp['etag'], "")

        # ... check that a plain repeat is answered from it ...
        resp, body = self.http.request(
            uri, "GET", headers={'accept-encoding': 'identity'})
        self.assertEqual(resp.status, 200)
        self.assertEqual(resp.fromcache, True)

        # ... and that Pragma: no-cache is not.
        resp, body = self.http.request(
            uri, "GET", headers={
                'accept-encoding': 'identity',
                'Pragma': 'no-cache'})
        self.assertEqual(resp.status, 200)
        self.assertEqual(resp.fromcache, False)
项目:httplib2shim    作者:GoogleCloudPlatform    | 项目源码 | 文件源码
def testBasicAuthTwoDifferentCredentials(self):
        # Basic Authentication with multiple sets of credentials
        file_uri = urllib_parse.urljoin(base, "basic2/file.txt")
        resp, _ = self.http.request(file_uri, "GET")
        self.assertEqual(resp.status, 401)

        dir_uri = urllib_parse.urljoin(base, "basic2/")
        resp, _ = self.http.request(dir_uri, "GET")
        self.assertEqual(resp.status, 401)

        # Once credentials are registered, both resources become reachable
        self.http.add_credentials('fred', 'barney')
        resp, _ = self.http.request(dir_uri, "GET")
        self.assertEqual(resp.status, 200)

        file_uri = urllib_parse.urljoin(base, "basic2/file.txt")
        resp, _ = self.http.request(file_uri, "GET")
        self.assertEqual(resp.status, 200)
项目:httplib2shim    作者:GoogleCloudPlatform    | 项目源码 | 文件源码
def testDigestAuthNextNonceAndNC(self):
        # When the server sets nextnonce, the nonce count must be reset
        # back to 1
        uri = urllib_parse.urljoin(base, "digest/file.txt")
        self.http.add_credentials('joe', 'password')

        resp, _ = self.http.request(
            uri, "GET", headers={"cache-control": "no-cache"})
        first_info = httplib2._parse_www_authenticate(
            resp, 'authentication-info')
        self.assertEqual(resp.status, 200)

        resp, _ = self.http.request(
            uri, "GET", headers={"cache-control": "no-cache"})
        second_info = httplib2._parse_www_authenticate(
            resp, 'authentication-info')
        self.assertEqual(resp.status, 200)

        if 'nextnonce' in first_info:
            self.assertEqual(second_info['nc'], 1)
项目:httplib2shim    作者:GoogleCloudPlatform    | 项目源码 | 文件源码
def testReflector(self):
        # The reflector CGI must report the User-Agent header it received
        uri = urllib_parse.urljoin(base, "reflector/reflector.cgi")
        resp, body = self.http.request(uri, "GET")
        reflected = self.reflector(body)
        self.assertTrue('HTTP_USER_AGENT' in reflected)

    # NOTE: disabled because this isn't relevant to the shim.
    # def testConnectionClose(self):
    #     uri = "http://www.google.com/"
    #     (response, content) = self.http.request(uri, "GET")
    #     for c in self.http.connections.values():
    #         self.assertNotEqual(None, c.sock)
    #     (response, content) = self.http.request(
    #         uri, "GET", headers={"connection": "close"})
    #     for c in self.http.connections.values():
    #         self.assertEqual(None, c.sock)
项目:bottlecap    作者:foxx    | 项目源码 | 文件源码
def get_full_url(self, routename, **kwargs):
        """
        Build the absolute URL of a named route.

        The route URL comes from the app's get_url() and is resolved
        against self.base_url.

        For example:
        https://example.com/hello?world=1

        XXX: Needs UT
        """
        route_url = self.app.get_url(routename, **kwargs)
        full_url = urljoin(self.base_url, route_url)
        return full_url


############################################################
# CBVs (class based views)
############################################################
项目:PyBloqs    作者:manahl    | 项目源码 | 文件源码
def show(self, fmt="html", header_block=None, footer_block=None):
        """
        Publish the block to the temporary HTML directory and open it in a browser tab.

        :param fmt: The format of the saved block. Supports the same output as `Block.save`
        :param header_block: Forwarded unchanged to `publish`.
        :param footer_block: Forwarded unchanged to `publish`.
        :return: What was opened: the published file path, or a URL under
                 `user_config["public_dir"]` when that setting exists.
        """
        file_name = str_base(hash(self._id)) + "." + fmt
        target = os.path.expanduser(os.path.join(user_config["tmp_html_dir"], file_name))
        file_path = self.publish(target, header_block=header_block, footer_block=footer_block)

        try:
            url_base = user_config["public_dir"]
        except KeyError:
            # No public location configured: open the local file
            path = os.path.expanduser(file_path)
        else:
            relative = os.path.expanduser(user_config["tmp_html_dir"] + "/" + file_name)
            path = urljoin(url_base, relative)

        webbrowser.open_new_tab(path)

        return path
项目:python-percy    作者:teeberg    | 项目源码 | 文件源码
def request(self, method, path, allowed_statuses=None, **kwargs):
        """Send an authenticated request and return the response.

        :param method: HTTP method handed to ``requests.request``
        :param path: path resolved against ``self.base_url``
        :param allowed_statuses: status codes accepted without further
            checks; when empty or when the response status is not among
            them, ``_check_response_error`` runs and the status must be
            below 300
        :param kwargs: forwarded to ``requests.request``; its ``headers``
            dict is created if missing and gets the ``Authorization`` and
            ``User-Agent`` entries
        :returns: the response object
        :raises AssertionError: for an unchecked status of 300 or above
        """
        from . import __version__

        # Only base and path are joined: urljoin()'s optional third
        # argument is the allow_fragments flag, not a path separator.
        url = urljoin(self.base_url, path)
        headers = kwargs.setdefault('headers', {})
        headers['Authorization'] = 'Token token="{}"'.format(self.access_token)
        headers['User-Agent'] = 'python-percy/{}'.format(__version__)
        try:
            response = requests.request(method, url, **kwargs)
            self._debug_response(response)
        except Exception as ex:
            # Log for diagnosis only; the exception still propagates.
            l.debug('%s %s -> Exception: %s: %s', method, url, ex.__class__.__name__, ex.args)
            raise

        if not allowed_statuses or response.status_code not in allowed_statuses:
            self._check_response_error(response)
            assert response.status_code < 300, (response.status_code, response.content)
        return response
项目:tempest-horizon    作者:openstack    | 项目源码 | 文件源码
def user_login(self, username, password):
        """Log in to the dashboard by submitting its login form.

        The dashboard page is fetched and fed to ``HorizonHTMLParser``,
        whose ``login``, ``region`` and ``csrf_token`` attributes are then
        used to build the form submission.
        """
        dashboard_url = CONF.dashboard.dashboard_url
        landing_page = self._get_opener().open(dashboard_url).read()

        # Grab the CSRF token and default region
        parser = HorizonHTMLParser()
        parser.feed(landing_page)

        # construct login url for dashboard, discovery accommodates non-/ web
        # root for dashboard
        login_url = parse.urljoin(dashboard_url, parser.login)

        # Prepare login form request
        login_request = request.Request(login_url)
        login_request.add_header('Content-type',
                                 'application/x-www-form-urlencoded')
        login_request.add_header('Referer', dashboard_url)

        # Pass the default domain name regardless of the auth version in order
        # to test the scenario of when horizon is running with keystone v3
        form_fields = {
            'username': username,
            'password': password,
            'region': parser.region,
            'domain': CONF.auth.default_credentials_domain_name,
            'csrfmiddlewaretoken': parser.csrf_token,
        }
        self._get_opener().open(login_request, parse.urlencode(form_fields))
项目:dci-ansible-agent    作者:redhat-cip    | 项目源码 | 文件源码
def get_index(self):
        '''Fetch the index page of the periodic job and return links to its jobs.

        Returns at most NLINKS links, each the index URL joined with a
        matching href by "/"; an empty list when the page cannot be
        fetched or holds no matching link.
        '''
        url = urljoin(self.config.log_url, self.args.job)
        res = get_html(url)
        if res is None or not res.ok:
            return []

        body = res.content.decode() if res.content else ''

        # First group of the HREF pattern, for every line that matches
        hrefs = []
        for line in body.splitlines():
            found = HREF.search(line)
            if found:
                hrefs.append(found.group(1))

        links = ["/".join((url, href)) for href in hrefs if JOBRE.match(href)]
        # Number of links to return
        return links[:NLINKS] if links else []
项目:dci-ansible-agent    作者:redhat-cip    | 项目源码 | 文件源码
def get_index(self):
        '''Fetch the index page of the periodic job and return links to its jobs.

        Returns at most NLINKS links, each the index URL joined with a
        matching href by "/"; an empty list when the page cannot be
        fetched or holds no matching link.
        '''
        url = urljoin(self.config.log_url, self.args.job)
        res = get_html(url)
        if res is None or not res.ok:
            return []

        body = res.content.decode() if res.content else ''

        # First group of the HREF pattern, for every line that matches
        hrefs = []
        for line in body.splitlines():
            found = HREF.search(line)
            if found:
                hrefs.append(found.group(1))

        links = ["/".join((url, href)) for href in hrefs if JOBRE.match(href)]
        # Number of links to return
        return links[:NLINKS] if links else []
项目:dci-ansible-agent    作者:redhat-cip    | 项目源码 | 文件源码
def get_index(self):
        '''Fetch the index page of the periodic job and return links to its jobs.

        Returns at most NLINKS links, each the index URL joined with a
        matching href by "/"; an empty list when the page cannot be
        fetched or holds no matching link.
        '''
        url = urljoin(self.config.log_url, self.args.job)
        res = get_html(url)
        if res is None or not res.ok:
            return []

        body = res.content.decode() if res.content else ''

        # First group of the HREF pattern, for every line that matches
        hrefs = []
        for line in body.splitlines():
            found = HREF.search(line)
            if found:
                hrefs.append(found.group(1))

        links = ["/".join((url, href)) for href in hrefs if JOBRE.match(href)]
        # Number of links to return
        return links[:NLINKS] if links else []
项目:cloak-server    作者:encryptme    | 项目源码 | 文件源码
def _call(method, path, api_version=None, **kwargs):
    # type: (str, str, str, **Any) -> requests.Response
    """Send a GET or POST to a path below '/api/server/' of base_url.

    :param method: 'GET' or 'POST'
    :param path: path resolved against '<base_url>/api/server/'
    :param api_version: value for the X-Cloak-API-Version header; when
        None, default_api_version is used, and when that is None too the
        header is not set
    :param kwargs: forwarded to the session call; its 'headers' dict is
        created if missing
    :returns: the response, for status codes from 200 to 399
    :raises NotImplementedError: for any other method
    :raises ServerApiError: for status codes outside 200-399
    """
    url = urljoin(base_url, '/api/server/')
    url = urljoin(url, path)

    headers = kwargs.setdefault('headers', {})
    if api_version is not None:
        headers['X-Cloak-API-Version'] = api_version
    elif default_api_version is not None:
        headers['X-Cloak-API-Version'] = default_api_version

    if method == 'GET':
        response = session.get(url, **kwargs)
    elif method == 'POST':
        response = session.post(url, **kwargs)
    else:
        raise NotImplementedError()

    # Chained comparison instead of membership in xrange(200, 400): same
    # result for integer status codes, and no Python-2-only builtin.
    if not 200 <= response.status_code < 400:
        raise ServerApiError(response)

    return response
项目:python-iotronicclient    作者:openstack    | 项目源码 | 文件源码
def log_curl_request(self, method, url, kwargs):
        """Log, at debug level, a curl command line equivalent to the request.

        :param method: HTTP method placed after ``-X``
        :param url: request URL, joined onto ``self.endpoint_trimmed``
        :param kwargs: request keyword arguments; ``headers`` is required,
            ``body`` is optional and goes through
            ``strutils.mask_password`` before being logged
        """
        command = ['curl -i -X %s' % method]

        command.extend(
            '-H \'%s: %s\'' % self._process_header(name, content)
            for name, content in kwargs['headers'].items())

        # TLS verification: disabled (-k) or a CA bundle given as a string
        verify = self.session.verify
        if not verify:
            command.append('-k')
        elif isinstance(verify, six.string_types):
            command.append('--cacert %s' % verify)

        # Client certificate: (certificate, key) pair
        client_cert = self.session.cert
        if client_cert:
            command.append('--cert %s' % client_cert[0])
            command.append('--key %s' % client_cert[1])

        if 'body' in kwargs:
            masked_body = strutils.mask_password(kwargs['body'])
            command.append('-d \'%s\'' % masked_body)

        command.append(urlparse.urljoin(self.endpoint_trimmed, url))
        LOG.debug(' '.join(command))
项目:django-scim2    作者:15five    | 项目源码 | 文件源码
def resource_type_dict(cls, request=None):
        """
        Build the ResourceType metadata ``dict`` describing the user object.

        The ``meta.location`` entry is the ``scim:resource-types`` route for
        ``cls.resource_type`` joined onto the base SCIM location computed
        for ``request``.
        """
        resource_id = cls.resource_type
        resource_path = reverse('scim:resource-types',
                                kwargs={'uuid': resource_id})
        base_location = get_base_scim_location_getter()(request)
        meta = {
            'location': urljoin(base_location, resource_path),
            'resourceType': 'ResourceType'
        }
        return {
            'schemas': [constants.SchemaURI.RESOURCE_TYPE],
            'id': resource_id,
            'name': 'User',
            'endpoint': reverse('scim:users'),
            'description': 'User Account',
            'schema': constants.SchemaURI.USER,
            'meta': meta
        }
项目:django-scim2    作者:15five    | 项目源码 | 文件源码
def resource_type_dict(cls, request=None):
        """
        Build the ResourceType metadata ``dict`` describing the group object.

        The ``meta.location`` entry is the ``scim:resource-types`` route for
        ``cls.resource_type`` joined onto the base SCIM location computed
        for ``request``.
        """
        resource_id = cls.resource_type
        resource_path = reverse('scim:resource-types',
                                kwargs={'uuid': resource_id})
        base_location = get_base_scim_location_getter()(request)
        meta = {
            'location': urljoin(base_location, resource_path),
            'resourceType': 'ResourceType'
        }
        return {
            'schemas': [constants.SchemaURI.RESOURCE_TYPE],
            'id': resource_id,
            'name': 'Group',
            'endpoint': reverse('scim:groups'),
            'description': 'Group',
            'schema': constants.SchemaURI.GROUP,
            'meta': meta
        }
项目:software-factory    作者:softwarefactory-project    | 项目源码 | 文件源码
def __init__(self, url, api_key, project_group, board):
        """Connect to the storyboard API and load the project group's data.

        A cookie is obtained with ``get_cookie`` and used to build an
        ``SFStoryboard`` client for ``storyboard_api`` joined onto ``url``.
        The project group is then looked up by name, its stories are
        fetched, and when ``board`` is truthy the board id is resolved by
        title. Worklists that are not archived and whose title is in
        ``LANES`` are stored by title in ``self.board_lanes``.

        Raises ``Exception`` when the project group or the board cannot be
        found.
        """
        self.url = url
        self.cookie = get_cookie(url, api_key=api_key)
        api_endpoint = urljoin(url, "storyboard_api")
        self.client = SFStoryboard(api_endpoint, self.cookie)

        try:
            self.project_group = self.client.project_groups.find(
                name=project_group)
        except exceptions.NotFound:
            raise Exception('projects group not found')

        group_id = self.project_group.id
        self.stories = self.client.stories.get_all(project_group_id=group_id)

        # Stays None unless a board was requested and found.
        self.board_id = None
        if board:
            try:
                self.board_id = self.client.boards.find(title=board).id
            except exceptions.NotFound:
                raise Exception('board not found')

        self.board_lanes = {}
        for worklist in self.client.worklists.get_all(board_id=self.board_id):
            if worklist.archived or worklist.title not in LANES:
                continue
            self.board_lanes[worklist.title] = worklist
项目:solaris-ips    作者:oracle    | 项目源码 | 文件源码
def __network_ping(self):
                """Report whether the depot answers a ``versions/0`` request.

                Returns True when the request succeeds or fails with an
                HTTP NOT_MODIFIED status; returns False for any other
                HTTPError and for URLError."""
                try:
                        ping_url = urljoin(self.get_depot_url(),
                            "versions/0")
                        # Disable SSL peer verification, we just want to check
                        # if the depot is running.
                        unverified = ssl._create_unverified_context()
                        response = urlopen(ping_url, context=unverified)
                        response.close()
                except HTTPError as err:
                        # Server returns NOT_MODIFIED if catalog is up
                        # to date
                        return err.code == http_client.NOT_MODIFIED
                except URLError:
                        return False
                else:
                        return True
项目:solaris-ips    作者:oracle    | 项目源码 | 文件源码
def test_bug_5366(self):
                """Publish a package with slashes in the name, then check
                that the depot info and manifest operations answer for both
                the raw and the URL-quoted form of the package name."""
                depot_url = self.dc.get_depot_url()
                plist = self.pkgsend_bulk(depot_url, self.system10)

                def as_published(name):
                        return name

                # First pass requests the name un-encoded, second pass
                # requests it quoted; each pass hits info, then manifest.
                for encode in (as_published, quote):
                        for template in ("info/0/{0}", "manifest/0/{0}"):
                                repourl = urljoin(depot_url,
                                    template.format(encode(plist[0])))
                                urlopen(repourl)
项目:solaris-ips    作者:oracle    | 项目源码 | 文件源码
def test_bug_15482(self):
                """Test to make sure BUI search doesn't trigger a traceback."""

                # Update the repository configuration while the depot is
                # stopped so changes won't be overwritten on exit, then
                # start the depot.
                self.__update_repo_config()
                self.dc.start()

                # Publish some packages we can abuse for testing.
                durl = self.dc.get_depot_url()
                self.pkgsend_bulk(durl, self.quux10, refresh_index=True)

                # Each search page is fetched and read in full, in this order.
                search_pages = (
                    "en/search.shtml?action=Search&token=*",
                    "en/advanced_search.shtml?action=Search&token=*",
                    "en/advanced_search.shtml?token=*&show=a&rpp=50&"
                    "action=Advanced+Search",
                )
                for page in search_pages:
                        urlopen(urljoin(durl, page)).read()
项目:solaris-ips    作者:oracle    | 项目源码 | 文件源码
def _network_ping(self):
                """Report whether the depot at ``self.url`` is responding.

                Returns True when the ``versions/0`` request succeeds or
                fails with an HTTP FORBIDDEN status; returns False for any
                other HTTPError and for URLError."""
                try:
                        # Ping the versions URL, rather than the default /
                        # so that we don't initialize the BUI code yet.
                        repourl = urljoin(self.url, "versions/0")
                        # Disable SSL peer verification, we just want to check
                        # if the depot is running.
                        response = urlopen(repourl,
                            context=ssl._create_unverified_context())
                        # Only reachability matters here: release the
                        # connection right away instead of leaving the open
                        # response for the garbage collector.
                        response.close()
                except HTTPError as e:
                        # A FORBIDDEN reply still counts as a running depot.
                        return e.code == http_client.FORBIDDEN
                except URLError:
                        return False
                return True
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def _lookup_torrent_url_fn():
        """Load a "fetcher" func to get the right torrent URL.

        Returns a one-argument callable that joins ``<image_id>.torrent``
        onto ``CONF.xenserver.torrent_base_url``. A warning is logged when
        the configured base URL contains no slash. Raises ``RuntimeError``
        when the option is not set.
        """
        if not CONF.xenserver.torrent_base_url:
            raise RuntimeError(_('Cannot create default bittorrent URL'
                                 ' without xenserver.torrent_base_url'
                                 ' configuration option set.'))

        if '/' not in CONF.xenserver.torrent_base_url:
            LOG.warning(_LW('Value specified in conf file for'
                         ' xenserver.torrent_base_url does not contain a'
                         ' slash character, therefore it will not be used'
                         ' as part of the torrent URL. Specify a valid'
                         ' base URL as defined by RFC 1808 (see step 6).'))

        def _default_torrent_url_fn(image_id):
            # The option is read at call time, not captured here.
            base_url = CONF.xenserver.torrent_base_url
            return urlparse.urljoin(base_url, "%s.torrent" % image_id)

        return _default_torrent_url_fn
项目:django-anymail    作者:anymail    | 项目源码 | 文件源码
def get_request_params(self, api_url):
        """Build the requests.request keyword arguments for sending the payload.

        The request URL is ``api_url`` itself when ``get_api_endpoint()``
        returns None, otherwise the endpoint joined onto ``api_url``.

        :param api_url: the base api_url for the backend
        :return: dict
        """
        endpoint = self.get_api_endpoint()
        target_url = api_url if endpoint is None else urljoin(api_url, endpoint)

        # json= is not here, because we prefer to do our own serialization
        #       to provide extra context in error messages
        return {
            'method': self.method,
            'url': target_url,
            'params': self.params,
            'data': self.serialize_data(),
            'headers': self.headers,
            'files': self.files,
            'auth': self.auth,
        }
项目:htsget    作者:jeromekelleher    | 项目源码 | 文件源码
def do_GET(self):
        """Serve the ticket, one test instance's data, or an error.

        A request for ``self.ticket_path`` gets a 200 response whose body
        is a JSON ticket listing every test instance's URL (joined onto
        ``SERVER_URL``) and headers. A request for a test instance's URL
        gets that instance's ``error_code`` if it has one, otherwise its
        data with a Content-Length header; when the instance is marked
        ``truncate`` the final byte is withheld. Anything else gets a 404.
        """
        by_url = dict(
            (instance.url, instance)
            for instance in self.server.test_instances)

        if self.path == self.ticket_path:
            self.send_response(200)
            self.end_headers()
            ticket_urls = []
            for test_instance in self.server.test_instances:
                ticket_urls.append({
                    "url": urljoin(SERVER_URL, test_instance.url),
                    "headers": test_instance.headers
                })
            payload = json.dumps({"htsget": {"urls": ticket_urls}})
            self.wfile.write(payload.encode())
            return

        if self.path not in by_url:
            self.send_error(404)
            return

        instance = by_url[self.path]
        if instance.error_code is not None:
            self.send_error(instance.error_code)
            return

        self.send_response(200)
        # The advertised length is always the full length, even when the
        # body written below is one byte short.
        self.send_header("Content-Length", len(instance.data))
        self.end_headers()
        if instance.truncate:
            self.wfile.write(instance.data[:-1])
            self.wfile.flush()
        else:
            self.wfile.write(instance.data)
项目:mos-horizon    作者:Mirantis    | 项目源码 | 文件源码
def _get_endpoint_url(request, endpoint_type, catalog=None):
    """Return the identity endpoint URL for the active API version.

    When the request's user has a truthy ``service_catalog`` the URL comes
    from ``base.url_for``; otherwise it is the session's
    ``region_endpoint``, falling back to
    ``settings.OPENSTACK_KEYSTONE_URL``. ``catalog`` is accepted but not
    used in this function.
    """
    if getattr(request.user, "service_catalog", None):
        endpoint = base.url_for(request,
                                service_type='identity',
                                endpoint_type=endpoint_type)
    else:
        fallback_url = settings.OPENSTACK_KEYSTONE_URL
        endpoint = request.session.get('region_endpoint', fallback_url)

    # TODO(gabriel): When the Service Catalog no longer contains API versions
    # in the endpoints this can be removed.
    return urlparse.urljoin(endpoint.rstrip('/'), 'v%s' % VERSIONS.active)
项目:ovirtcmd    作者:fbacchella    | 项目源码 | 文件源码
def resolve_service_href(self, href):
        """Return the API service addressed by ``href``.

        ``href`` is resolved against ``self.api.url``; the API root and the
        first ``/`` are then stripped from the result and what remains is
        handed to ``self.api.service``.
        """
        api_root = self.api.url
        absolute = urljoin(api_root, href)

        # Drop the root of the ovirt location from the absolute href.
        below_root = absolute.replace(api_root, "")
        # Drop the first / of the remaining path.
        relative = below_root.replace("/", "", 1)
        # str() ensures a str, not a unicode, is passed on in python 2.
        return self.api.service(str(relative))
项目:threatstack-python-client    作者:MyPureCloud    | 项目源码 | 文件源码
def http_request(self, method, path, data=None, params=None):
        """Wrap an HTTP call to the Threat Stack API.

        ``path`` is joined onto ``self.BASE_URL``. The request carries
        ``self.api_key`` in the ``Authorization`` header and, when
        ``self.org_id`` is truthy, the organization id under
        ``self.org_id_header``. It is sent with ``self.timeout``.

        Returns whatever ``self.handle_response`` returns for the response.
        Raises ``errors.APIRateLimitError`` when the API answers with
        status 429.
        """
        url = urljoin(self.BASE_URL, path)
        headers = {"Authorization": self.api_key}
        if self.org_id:
            headers[self.org_id_header] = self.org_id

        req = Request(
            method,
            url,
            headers=headers,
            data=data,
            params=params
        )
        prepped = req.prepare()

        s = Session()
        try:
            resp = s.send(prepped, timeout=self.timeout)
        finally:
            # A new session is created for every call; close it so its
            # pooled connections are released instead of leaking. The
            # response is not streamed, so its body has already been read.
            s.close()

        if resp.status_code == 429:
            raise errors.APIRateLimitError("Threat Stack API rate limit exceeded")
        return self.handle_response(resp)