我们从Python开源项目中提取了以下50个代码示例,用于说明如何使用six.moves.urllib.parse.urljoin()。
def get_keystone_url(auth_url, auth_version):
    """Build the http/https URL at which keystone can be contacted.

    :param auth_url: http or https URL to inspect
                     (like 'http://127.0.0.1:9898/').
    :param auth_version: version string (like v2, v3.0, etc).
    :returns: a string containing the keystone URL.
    """
    # Select the API path segment for this deployment's keystone version.
    version_segment = 'v3' if _is_apiv3(auth_url, auth_version) else 'v2.0'
    # NOTE(lucasagomes): drop any trailing '/' so urljoin() overrides the
    # version component instead of appending below it.
    return parse.urljoin(auth_url.rstrip('/'), version_segment)
def _prepare_request(self, **kwargs):
    """Build and prepare an authenticated HTTP request.

    Args:
        kwargs (dict): keyword arguments forwarded both to the
            authentication helpers (``_add_ecdsa_signature()`` or
            ``_add_basic_auth()``) and to :py:class:`requests.Request`.

    Raises:
        AssertionError: in case ``kwargs['path']`` doesn't start with ``/``.
    """
    kwargs.setdefault('headers', {})
    # Prefer ECDSA signing when a key is configured; otherwise fall back
    # to HTTP basic auth when email/password credentials are present.
    if isinstance(self.private_key, SigningKey):
        self._add_ecdsa_signature(kwargs)
    elif self.email and self.password:
        self._add_basic_auth(kwargs)
    # Resolve the request path against the API base URL.
    request_path = kwargs.pop('path')
    assert request_path.startswith('/')
    kwargs['url'] = urljoin(self.api_url, request_path)
    return requests.Request(**kwargs).prepare()
def get_contents_if_file(contents_or_file_name):
    """Return the contents of a file when given its name or URI.

    If the value passed in is a file name or file URI, return its decoded
    contents. On any failure (or if the value is not a path/URI at all),
    return the value itself, on the assumption that it already *is* the
    content — e.g. an inline workflow definition.
    """
    try:
        if parse.urlparse(contents_or_file_name).scheme:
            # Already a URI (http://, file://, ...): fetch it directly.
            definition_url = contents_or_file_name
        else:
            # Treat the value as a local path; convert to a file:// URL.
            absolute_path = os.path.abspath(contents_or_file_name)
            definition_url = parse.urljoin(
                'file:', request.pathname2url(absolute_path))
        return request.urlopen(definition_url).read().decode('utf8')
    except Exception:
        # Best effort: anything unreadable is assumed to be raw content.
        return contents_or_file_name
def get_upload_channels(upload_config_dir, subdir, channels=None):
    """Collect the channels that uploads would be published to.

    Historically the output channel was also provided as an input channel;
    that behaviour was dropped in favour of setting channels via condarc
    in the docker image.
    """
    configurations = load_yaml_config_dir(upload_config_dir)
    result = channels or []
    for cfg in configurations:
        if 'token' in cfg:
            # anaconda.org-style upload: the channel is the user name.
            result.append(cfg['user'])
        elif 'server' in cfg:
            # Custom server upload: build the destination URL.
            result.append(parse.urljoin(
                'http://' + cfg['server'],
                cfg['destination_path'].format(subdir=subdir)))
        else:
            result.append(cfg['channel'])
    return result
def test_get_pbm_wsdl_location(self):
    # No version given -> no WSDL location.
    self.assertIsNone(pbm.get_pbm_wsdl_location(None))

    def expected_wsdl(version):
        # file:// URL pointing at the bundled WSDL for this version.
        base_dir = os.path.abspath(os.path.dirname(pbm.__file__))
        wsdl_path = os.path.join(base_dir, 'wsdl', version,
                                 'pbmService.wsdl')
        return urlparse.urljoin('file:', urllib.pathname2url(wsdl_path))

    with mock.patch('os.path.exists') as path_exists:
        path_exists.return_value = True
        # Only major.minor selects the directory; patch releases map to
        # their minor version.
        self.assertEqual(expected_wsdl('5'),
                         pbm.get_pbm_wsdl_location('5'))
        self.assertEqual(expected_wsdl('5.5'),
                         pbm.get_pbm_wsdl_location('5.5'))
        self.assertEqual(expected_wsdl('5.5'),
                         pbm.get_pbm_wsdl_location('5.5.1'))
        # Missing WSDL file on disk -> None.
        path_exists.return_value = False
        self.assertIsNone(pbm.get_pbm_wsdl_location('5.5'))
def get_pbm_wsdl_location(vc_version):
    """Return the PBM WSDL file location for a VC version.

    :param vc_version: a dot-separated version string, e.g. "1.2".
    :return: file: URL of the pbm WSDL, or None when the version is empty
             or the WSDL file does not exist on disk.
    """
    if not vc_version:
        return
    # Only the major.minor part of the version selects the WSDL directory.
    parts = vc_version.split('.')
    major_minor = parts[0] if len(parts) < 2 else '.'.join(parts[:2])
    curr_dir = os.path.abspath(os.path.dirname(__file__))
    pbm_service_wsdl = os.path.join(curr_dir, 'wsdl', major_minor,
                                    'pbmService.wsdl')
    if not os.path.exists(pbm_service_wsdl):
        LOG.warning(_LW("PBM WSDL file %s not found."), pbm_service_wsdl)
        return
    pbm_wsdl = urlparse.urljoin('file:',
                                urllib.pathname2url(pbm_service_wsdl))
    LOG.debug("Using PBM WSDL location: %s.", pbm_wsdl)
    return pbm_wsdl
def _request(self, endpoint, method="GET", lookup=None, data={}, params={}, userargs=None, password=None):
    """Send a request to a morango endpoint and return the response.

    :param endpoint: constant representing which morango endpoint to query
    :param method: HTTP verb/method for the request
    :param lookup: the pk value for the specific object being queried
    :param data: dict that will be form-encoded in the request
    :param params: dict sent as part of the URL's query string
    :param userargs: authorization credentials
    :param password: password paired with ``userargs``
    :return: ``Response`` object from request
    """
    # Flatten dict credentials into a query string for the auth layer.
    if isinstance(userargs, dict):
        userargs = "&".join(
            "{}={}".format(key, val) for (key, val) in iteritems(userargs))
    # Object lookups are addressed as <endpoint>/<pk>/.
    if lookup:
        lookup = lookup + '/'
    url = urljoin(urljoin(self.base_url, endpoint), lookup)
    auth = (userargs, password) if userargs else None
    response = requests.request(method, url, json=data, params=params,
                                auth=auth)
    # Surface HTTP errors to the caller instead of returning them.
    response.raise_for_status()
    return response
def get_file_contents(from_data, files, base_url=None, ignore_if=None):
    """Normalise file references in ``from_data`` to absolute URLs.

    For each (key, value) pair not skipped by ``ignore_if``, the value is
    resolved against ``base_url``, its contents are fetched into ``files``
    (recursing into nested templates), and the value is replaced by the
    absolute URL. Non-dict input is left untouched.
    """
    if not isinstance(from_data, dict):
        return
    for key, value in from_data.items():
        if ignore_if and ignore_if(key, value):
            continue
        # Ensure the base ends with '/' so urljoin keeps its last segment.
        if base_url and not base_url.endswith('/'):
            base_url = base_url + '/'
        str_url = parse.urljoin(base_url, value)
        if str_url not in files:
            file_content = utils.read_url_content(str_url)
            if is_template(file_content):
                # Nested template: normalise it (and its files) as well.
                template = get_template_contents(
                    template_url=str_url, files=files)[1]
                file_content = jsonutils.dumps(template)
            files[str_url] = file_content
        # Point the data at the normalised absolute URL.
        from_data[key] = str_url
def test_class_get_oauth_token_method(self, mocker, mocked_monzo):
    """Test class `_get_oauth_token` method"""
    fetch_token_mock = mocker.MagicMock()
    session_mock = mocker.patch('pymonzo.monzo_api.OAuth2Session')
    session_mock.return_value.fetch_token = fetch_token_mock

    result = mocked_monzo._get_oauth_token()
    assert result == fetch_token_mock.return_value

    # The OAuth session must be built with the client id and redirect URI,
    # and the token fetched with the auth code and client secret.
    session_mock.assert_called_once_with(
        client_id=mocked_monzo._client_id,
        redirect_uri=config.PYMONZO_REDIRECT_URI,
    )
    fetch_token_mock.assert_called_once_with(
        token_url=urljoin(mocked_monzo.api_url, '/oauth2/token'),
        code=mocked_monzo._auth_code,
        client_secret=mocked_monzo._client_secret,
    )
def _get_oauth_token(self):
    """Get Monzo access token via OAuth2 `authorization code` grant type.

    Official docs: https://monzo.com/docs/#acquire-an-access-token

    :returns: OAuth 2 access token
    :rtype: dict
    """
    oauth_session = OAuth2Session(
        client_id=self._client_id,
        redirect_uri=config.PYMONZO_REDIRECT_URI,
    )
    # Exchange the authorization code for an access token.
    return oauth_session.fetch_token(
        token_url=urljoin(self.api_url, '/oauth2/token'),
        code=self._auth_code,
        client_secret=self._client_secret,
    )
def crawl(self, url, base_url):
    """Crawl an .html page and extract all URLs that look like part of
    the application, downloading them in parallel worker threads.
    """
    resp = requests.get(url)
    # See through redirects: resolve links against the final URL.
    final_base_url = resp.url
    tree = lxml.html.fromstring(resp.content)
    candidates = [
        urljoin(final_base_url, anchor.attrib.get("href", ""))
        for anchor in tree.cssselect("a")
    ]
    app_links = [link for link in candidates
                 if is_likely_app_part(link, base_url)]
    # Fetch everything concurrently; re-raise worker exceptions here.
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        futures = {executor.submit(self.fetch_file, link, base_url): link
                   for link in app_links}
        for future in concurrent.futures.as_completed(futures):
            future.result()
def _get_sushy_system(self, system_id):
    """Get the sushy system for system_id

    :param system_id: The identity of the System resource
    :returns: the Sushy system instance
    :raises: IloError
    """
    system_url = parse.urljoin(self._sushy.get_system_collection_path(),
                               system_id)
    try:
        return self._sushy.get_system(system_url)
    except sushy.exceptions.SushyError as err:
        # Wrap the sushy failure in the driver's own exception type.
        message = self._('The Redfish System "%(system)s" was not found. '
                         'Error %(error)s') % {'system': system_id,
                                               'error': str(err)}
        LOG.debug(message)
        raise exception.IloError(message)
def _get_sushy_manager(self, manager_id):
    """Get the sushy Manager for manager_id

    :param manager_id: The identity of the Manager resource
    :returns: the Sushy Manager instance
    :raises: IloError
    """
    manager_url = parse.urljoin(self._sushy.get_manager_collection_path(),
                                manager_id)
    try:
        return self._sushy.get_manager(manager_url)
    except sushy.exceptions.SushyError as err:
        # Wrap the sushy failure in the driver's own exception type.
        message = self._('The Redfish Manager "%(manager)s" was not found. '
                         'Error %(error)s') % {'manager': manager_id,
                                               'error': str(err)}
        LOG.debug(message)
        raise exception.IloError(message)
def collect_usage(self):
    """POST a collect_usage request to the service endpoint.

    :returns: parsed JSON response on success, or None when the
              connection fails (the error is printed).
    :raises AttributeError: when the service responds non-200.
    """
    url = urlparse.urljoin(self.endpoint, "collect_usage")
    headers = {"Content-Type": "application/json",
               "X-Auth-Token": self.auth_token}
    try:
        response = requests.post(url, headers=headers,
                                 verify=not self.insecure)
    except ConnectionError as e:
        # Best effort: report the connectivity problem, return None.
        print(e)
        return
    if response.status_code != 200:
        raise AttributeError("Usage cycle failed: %s code: %s" %
                             (response.text, response.status_code))
    return response.json()
def last_collected(self):
    """GET the last_collected timestamp from the service endpoint.

    :returns: parsed JSON response on success, or None when the
              connection fails (the error is printed).
    :raises AttributeError: when the service responds non-200.
    """
    url = urlparse.urljoin(self.endpoint, "last_collected")
    headers = {"Content-Type": "application/json",
               "X-Auth-Token": self.auth_token}
    try:
        response = requests.get(url, headers=headers,
                                verify=not self.insecure)
    except ConnectionError as e:
        # Best effort: report the connectivity problem, return None.
        print(e)
        return
    if response.status_code != 200:
        raise AttributeError("Get last collected failed: %s code: %s" %
                             (response.text, response.status_code))
    return response.json()
def _query_usage(self, tenant, start, end, endpoint):
    """GET usage for ``tenant`` over [start, end] from ``endpoint``.

    :returns: parsed JSON response on success, or None when the
              connection fails (the error is printed).
    :raises AttributeError: when the service responds non-200.
    """
    url = urlparse.urljoin(self.endpoint, endpoint)
    headers = {"X-Auth-Token": self.auth_token}
    params = {"tenant": tenant,
              "start": start,
              "end": end}
    try:
        response = requests.get(url, headers=headers, params=params,
                                verify=not self.insecure)
    except ConnectionError as e:
        # Best effort: report the connectivity problem, return None.
        print(e)
        return
    if response.status_code != 200:
        raise AttributeError("Get usage failed: %s code: %s" %
                             (response.text, response.status_code))
    return response.json()
def testGet300WithLocation(self):
    # A 300 carrying a Location: header must be followed automatically,
    # and the intermediate 300 must never be cached — so the exact same
    # expectations hold on a second, identical request.
    uri = urllib_parse.urljoin(base, "300/with-location-header.asis")
    for _ in range(2):
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)
        self.assertEqual(content, b"This is the final destination.\n")
        self.assertEqual(response.previous.status, 300)
        self.assertEqual(response.previous.fromcache, False)
def testGet301(self):
    # Test that we automatically follow 301 redirects
    # and that we cache the 301 response
    uri = urllib_parse.urljoin(base, "301/onestep.asis")
    destination = urllib_parse.urljoin(base, "302/final-destination.txt")
    # First request: redirect followed, content-location points at the
    # final resource, and the 301 itself was fetched from the network.
    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 200)
    self.assertTrue('content-location' in response)
    self.assertEqual(response['content-location'], destination)
    self.assertEqual(content, b"This is the final destination.\n")
    self.assertEqual(response.previous.status, 301)
    self.assertEqual(response.previous.fromcache, False)
    # Second, identical request: the 301 must now be served from cache.
    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 200)
    self.assertEqual(response['content-location'], destination)
    self.assertEqual(content, b"This is the final destination.\n")
    self.assertEqual(response.previous.status, 301)
    self.assertEqual(response.previous.fromcache, True)
def testGet302RedirectionLimit(self):
    # A redirection limit lower than the redirect chain length must
    # raise RedirectLimit when exceptions are enabled.
    self.http.force_exception_to_status_code = False
    uri = urllib_parse.urljoin(base, "302/twostep.asis")
    try:
        (response, content) = self.http.request(uri, "GET", redirections=1)
        self.fail("This should not happen")
    except httplib2.RedirectLimit:
        pass
    except Exception:
        self.fail("Threw wrong kind of exception ")
    # With exceptions converted to status codes, the same request must
    # surface the failure as a synthetic 500 response instead.
    self.http.force_exception_to_status_code = True
    (response, content) = self.http.request(uri, "GET", redirections=1)
    self.assertEqual(response.status, 500)
    self.assertTrue(response.reason.startswith("Redirected more"))
    self.assertEqual("302", response['status'])
    self.assertTrue(content.startswith(b"<html>"))
    self.assertTrue(response.previous is not None)
def testGetIgnoreEtag(self):
    # ETags can be forcibly ignored via Http.ignore_etag.
    uri = urllib_parse.urljoin(base, "reflector/reflector.cgi")

    # Prime: the resource must carry an ETag.
    (response, content) = self.http.request(uri, "GET", headers={
        'accept-encoding': 'identity'})
    self.assertNotEqual(response['etag'], "")

    # Default behaviour: revalidation sends If-None-Match.
    (response, content) = self.http.request(uri, "GET", headers={
        'accept-encoding': 'identity', 'cache-control': 'max-age=0'})
    d = self.reflector(content)
    self.assertTrue('HTTP_IF_NONE_MATCH' in d)

    # With ignore_etag set, the conditional header must be dropped.
    self.http.ignore_etag = True
    (response, content) = self.http.request(uri, "GET", headers={
        'accept-encoding': 'identity', 'cache-control': 'max-age=0'})
    d = self.reflector(content)
    self.assertEqual(response.fromcache, False)
    self.assertFalse('HTTP_IF_NONE_MATCH' in d)
def testGet307(self):
    # Test that we do follow 307 redirects but
    # do not cache the 307
    uri = urllib_parse.urljoin(base, "307/onestep.asis")
    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 200)
    self.assertEqual(content, b"This is the final destination.\n")
    self.assertEqual(response.previous.status, 307)
    self.assertEqual(response.previous.fromcache, False)
    # Second request: the final 200 is served from cache, but the 307
    # itself must have been re-fetched (never cached).
    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 200)
    self.assertEqual(response.fromcache, True)
    self.assertEqual(content, b"This is the final destination.\n")
    self.assertEqual(response.previous.status, 307)
    self.assertEqual(response.previous.fromcache, False)
def testGetGZipFailure(self):
    # Test that we raise a good exception when the gzip fails
    self.http.force_exception_to_status_code = False
    uri = urllib_parse.urljoin(base, "gzip/failed-compression.asis")
    try:
        (response, content) = self.http.request(uri, "GET")
        self.fail("Should never reach here")
    except httplib2.FailedToDecompressContent:
        # Expected: the corrupt gzip body surfaces as this exception.
        pass
    except Exception:
        self.fail("Threw wrong kind of exception")
    # Re-run with exceptions converted to status codes: the failure must
    # surface as a synthetic 500 response instead.
    self.http.force_exception_to_status_code = True
    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 500)
    self.assertTrue(response.reason.startswith("Content purported"))
def testGetDeflateFailure(self):
    # Test that we raise a good exception when the deflate fails
    self.http.force_exception_to_status_code = False
    uri = urllib_parse.urljoin(base, "deflate/failed-compression.asis")
    try:
        (response, content) = self.http.request(uri, "GET")
        self.fail("Should never reach here")
    except httplib2.FailedToDecompressContent:
        # Expected: the corrupt deflate body surfaces as this exception.
        pass
    except Exception:
        self.fail("Threw wrong kind of exception")
    # Re-run with exceptions converted to status codes: the failure must
    # surface as a synthetic 500 response instead.
    self.http.force_exception_to_status_code = True
    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 500)
    self.assertTrue(response.reason.startswith("Content purported"))
def testGetCacheControlNoCache(self):
    # Test Cache-Control: no-cache on requests
    uri = urllib_parse.urljoin(base, "304/test_etag.txt")
    # Prime the cache; the resource must carry an ETag.
    (response, content) = self.http.request(
        uri, "GET", headers={'accept-encoding': 'identity'})
    self.assertNotEqual(response['etag'], "")
    # A plain repeat request is served from cache.
    (response, content) = self.http.request(
        uri, "GET", headers={'accept-encoding': 'identity'})
    self.assertEqual(response.status, 200)
    self.assertEqual(response.fromcache, True)
    # Cache-Control: no-cache forces revalidation, bypassing the cache.
    (response, content) = self.http.request(
        uri, "GET", headers={
            'accept-encoding': 'identity',
            'Cache-Control': 'no-cache'})
    self.assertEqual(response.status, 200)
    self.assertEqual(response.fromcache, False)
def testGetCacheControlPragmaNoCache(self):
    # Test Pragma: no-cache on requests
    uri = urllib_parse.urljoin(base, "304/test_etag.txt")
    # Prime the cache; the resource must carry an ETag.
    (response, content) = self.http.request(
        uri, "GET", headers={'accept-encoding': 'identity'})
    self.assertNotEqual(response['etag'], "")
    # A plain repeat request is served from cache.
    (response, content) = self.http.request(
        uri, "GET", headers={'accept-encoding': 'identity'})
    self.assertEqual(response.status, 200)
    self.assertEqual(response.fromcache, True)
    # The HTTP/1.0 Pragma: no-cache header must bypass the cache too.
    (response, content) = self.http.request(
        uri, "GET", headers={
            'accept-encoding': 'identity',
            'Pragma': 'no-cache'})
    self.assertEqual(response.status, 200)
    self.assertEqual(response.fromcache, False)
def testBasicAuthTwoDifferentCredentials(self):
    # Before credentials are registered both the file and the directory
    # must be protected; afterwards both must be readable.
    file_uri = urllib_parse.urljoin(base, "basic2/file.txt")
    dir_uri = urllib_parse.urljoin(base, "basic2/")

    (response, content) = self.http.request(file_uri, "GET")
    self.assertEqual(response.status, 401)
    (response, content) = self.http.request(dir_uri, "GET")
    self.assertEqual(response.status, 401)

    self.http.add_credentials('fred', 'barney')
    (response, content) = self.http.request(dir_uri, "GET")
    self.assertEqual(response.status, 200)
    (response, content) = self.http.request(file_uri, "GET")
    self.assertEqual(response.status, 200)
def testDigestAuthNextNonceAndNC(self):
    # Test that if the server sets nextnonce that we reset
    # the nonce count back to 1
    uri = urllib_parse.urljoin(base, "digest/file.txt")
    self.http.add_credentials('joe', 'password')
    # First authenticated request; capture the Authentication-Info header.
    (response, content) = self.http.request(
        uri, "GET", headers={"cache-control": "no-cache"})
    info = httplib2._parse_www_authenticate(
        response, 'authentication-info')
    self.assertEqual(response.status, 200)
    # Second request; capture the header again to inspect the nonce count.
    (response, content) = self.http.request(
        uri, "GET", headers={"cache-control": "no-cache"})
    info2 = httplib2._parse_www_authenticate(
        response, 'authentication-info')
    self.assertEqual(response.status, 200)
    # Only check the reset when the server actually sent nextnonce.
    if 'nextnonce' in info:
        self.assertEqual(info2['nc'], 1)
def testReflector(self):
    # The reflector CGI echoes request headers back; a User-Agent header
    # must always be present on our requests.
    uri = urllib_parse.urljoin(base, "reflector/reflector.cgi")
    (response, content) = self.http.request(uri, "GET")
    reflected = self.reflector(content)
    self.assertTrue('HTTP_USER_AGENT' in reflected)

# NOTE: disabled because this isn't relevant to the shim.
# def testConnectionClose(self):
#     uri = "http://www.google.com/"
#     (response, content) = self.http.request(uri, "GET")
#     for c in self.http.connections.values():
#         self.assertNotEqual(None, c.sock)
#     (response, content) = self.http.request(
#         uri, "GET", headers={"connection": "close"})
#     for c in self.http.connections.values():
#         self.assertEqual(None, c.sock)
def get_full_url(self, routename, **kwargs):
    """
    Construct full URL using components from current bottle request,
    merged with get_url()

    For example: https://example.com/hello?world=1

    XXX: Needs UT
    """
    relative_url = self.app.get_url(routename, **kwargs)
    return urljoin(self.base_url, relative_url)


############################################################
# CBVs (class based views)
############################################################
def show(self, fmt="html", header_block=None, footer_block=None):
    """
    Show the block in a browser.

    :param fmt: The format of the saved block. Supports the same output as `Block.save`
    :return: Path to the block file.
    """
    file_name = str_base(hash(self._id)) + "." + fmt
    tmp_target = os.path.expanduser(
        os.path.join(user_config["tmp_html_dir"], file_name))
    file_path = self.publish(tmp_target,
                             header_block=header_block,
                             footer_block=footer_block)
    try:
        url_base = user_config["public_dir"]
    except KeyError:
        # No public dir configured: open the local file directly.
        path = os.path.expanduser(file_path)
    else:
        # Serve from the configured public location instead.
        path = urljoin(url_base, os.path.expanduser(
            user_config["tmp_html_dir"] + "/" + file_name))
    webbrowser.open_new_tab(path)
    return path
def request(self, method, path, allowed_statuses=None, **kwargs):
    """Send an authenticated request to the Percy API.

    :param method: HTTP verb for the request.
    :param path: path joined onto ``self.base_url``.
    :param allowed_statuses: iterable of status codes that skip the
        error-response check (a <300 status is still asserted).
    :returns: the ``requests`` response object.
    """
    from . import __version__
    # BUG FIX: the third positional argument of urljoin() is
    # ``allow_fragments``; the original passed '/' there, which is a
    # truthy no-op (same as the default), so it has been dropped.
    url = urljoin(self.base_url, path)
    headers = kwargs.setdefault('headers', {})
    headers['Authorization'] = 'Token token="{}"'.format(self.access_token)
    headers['User-Agent'] = 'python-percy/{}'.format(__version__)
    try:
        response = requests.request(method, url, **kwargs)
        self._debug_response(response)
    except Exception as ex:
        # Log the failing request before propagating the error.
        l.debug('%s %s -> Exception: %s: %s',
                method, url, ex.__class__.__name__, ex.args)
        raise
    if not allowed_statuses or response.status_code not in allowed_statuses:
        self._check_response_error(response)
    assert response.status_code < 300, (response.status_code,
                                        response.content)
    return response
def user_login(self, username, password):
    """Log into the horizon dashboard through its login form."""
    # Fetch the login page to harvest the CSRF token and default region.
    response = self._get_opener().open(CONF.dashboard.dashboard_url).read()
    parser = HorizonHTMLParser()
    parser.feed(response)

    # Build the login URL; urljoin accommodates dashboards served from a
    # non-/ web root.
    login_url = parse.urljoin(CONF.dashboard.dashboard_url, parser.login)

    # Prepare the login form request.
    req = request.Request(login_url)
    req.add_header('Content-type', 'application/x-www-form-urlencoded')
    req.add_header('Referer', CONF.dashboard.dashboard_url)

    # The default domain name is always passed so the flow also works
    # when horizon runs against keystone v3.
    form = {'username': username,
            'password': password,
            'region': parser.region,
            'domain': CONF.auth.default_credentials_domain_name,
            'csrfmiddlewaretoken': parser.csrf_token}
    self._get_opener().open(req, parse.urlencode(form))
def get_index(self):
    '''Get the index page of a periodic job and return links to jobs.

    :returns: up to NLINKS job links, or an empty list when the index
              page cannot be fetched.
    '''
    url = urljoin(self.config.log_url, self.args.job)
    res = get_html(url)
    if res is None or not res.ok:
        return []
    body = res.content.decode() if res.content else ''
    # PERF FIX: run the HREF regex once per line — the original called
    # HREF.search() twice (once in the filter, once for extraction).
    matches = (HREF.search(line) for line in body.splitlines())
    hrefs = [m.group(1) for m in matches if m]
    links = ["/".join((url, link)) for link in hrefs if JOBRE.match(link)]
    # Cap the number of links returned.
    return links[:NLINKS] if links else []
def _call(method, path, api_version=None, **kwargs):
    # type: (str, str, str, **Any) -> requests.Response
    """Issue a request against the server API and return the response.

    :param method: 'GET' or 'POST' (anything else raises).
    :param path: path resolved against the '/api/server/' root.
    :param api_version: per-call API version; falls back to the module
        default when None.
    :raises ServerApiError: when the response status is not 2xx/3xx.
    :raises NotImplementedError: for unsupported HTTP methods.
    """
    url = urljoin(base_url, '/api/server/')
    url = urljoin(url, path)

    headers = kwargs.setdefault('headers', {})
    # An explicit per-call version wins over the module-level default.
    if api_version is not None:
        headers['X-Cloak-API-Version'] = api_version
    elif default_api_version is not None:
        headers['X-Cloak-API-Version'] = default_api_version

    if method == 'GET':
        response = session.get(url, **kwargs)
    elif method == 'POST':
        response = session.post(url, **kwargs)
    else:
        raise NotImplementedError()

    # PERF FIX: direct range comparison instead of membership in
    # xrange(200, 400) — xrange has no __contains__ on Python 2, so the
    # original test was an O(n) scan.
    if not 200 <= response.status_code < 400:
        raise ServerApiError(response)

    return response
def log_curl_request(self, method, url, kwargs):
    """Log the outgoing request as an equivalent curl command line."""
    parts = ['curl -i -X %s' % method]
    for (key, value) in kwargs['headers'].items():
        parts.append('-H \'%s: %s\'' % self._process_header(key, value))
    # Mirror the session's TLS configuration onto the curl command.
    if not self.session.verify:
        parts.append('-k')
    elif isinstance(self.session.verify, six.string_types):
        parts.append('--cacert %s' % self.session.verify)
    if self.session.cert:
        parts.append('--cert %s' % self.session.cert[0])
        parts.append('--key %s' % self.session.cert[1])
    if 'body' in kwargs:
        # Never log credentials that may be embedded in the body.
        parts.append('-d \'%s\'' % strutils.mask_password(kwargs['body']))
    parts.append(urlparse.urljoin(self.endpoint_trimmed, url))
    LOG.debug(' '.join(parts))
def resource_type_dict(cls, request=None):
    """
    Return a ``dict`` containing ResourceType metadata for the user object.
    """
    type_id = cls.resource_type
    path = reverse('scim:resource-types', kwargs={'uuid': type_id})
    # Absolute location of this resource type document.
    location = urljoin(get_base_scim_location_getter()(request), path)
    meta = {'location': location, 'resourceType': 'ResourceType'}
    return {
        'schemas': [constants.SchemaURI.RESOURCE_TYPE],
        'id': type_id,
        'name': 'User',
        'endpoint': reverse('scim:users'),
        'description': 'User Account',
        'schema': constants.SchemaURI.USER,
        'meta': meta,
    }
def resource_type_dict(cls, request=None):
    """
    Return a ``dict`` containing ResourceType metadata for the group object.
    """
    type_id = cls.resource_type
    path = reverse('scim:resource-types', kwargs={'uuid': type_id})
    # Absolute location of this resource type document.
    location = urljoin(get_base_scim_location_getter()(request), path)
    meta = {'location': location, 'resourceType': 'ResourceType'}
    return {
        'schemas': [constants.SchemaURI.RESOURCE_TYPE],
        'id': type_id,
        'name': 'Group',
        'endpoint': reverse('scim:groups'),
        'description': 'Group',
        'schema': constants.SchemaURI.GROUP,
        'meta': meta,
    }
def __init__(self, url, api_key, project_group, board):
    """Connect to storyboard and resolve the project group and board."""
    self.url = url
    self.cookie = get_cookie(url, api_key=api_key)
    self.client = SFStoryboard(urljoin(url, "storyboard_api"), self.cookie)
    # Resolve the project group; a missing group is a hard error.
    try:
        self.project_group = self.client.project_groups.find(
            name=project_group)
    except exceptions.NotFound:
        raise Exception('projects group not found')
    self.stories = self.client.stories.get_all(
        project_group_id=self.project_group.id)
    # The board is optional; resolve it only when given.
    self.board_id = None
    if board:
        try:
            self.board_id = self.client.boards.find(title=board).id
        except exceptions.NotFound:
            raise Exception('board not found')
    # Index the board's active lanes of interest by title.
    self.board_lanes = {
        lane.title: lane
        for lane in self.client.worklists.get_all(board_id=self.board_id)
        if not lane.archived and lane.title in LANES
    }
def __network_ping(self):
    """Check whether the depot server is reachable.

    :returns: True when the depot answers (NOT_MODIFIED also counts as
              up-to-date/alive), False on any other HTTP or connection
              error.
    """
    try:
        repourl = urljoin(self.get_depot_url(), "versions/0")
        # Disable SSL peer verification; we only want to know whether
        # the depot is running.
        conn = urlopen(repourl, context=ssl._create_unverified_context())
        conn.close()
    except HTTPError as e:
        # NOT_MODIFIED means the catalog is current — the server is up,
        # so that still counts as a successful ping.
        return e.code == http_client.NOT_MODIFIED
    except URLError:
        # FIX: dropped the unused `as e` binding from the original.
        return False
    return True
def test_bug_5366(self):
    """Publish a package with slashes in the name, and then verify that
    the depot manifest and info operations work regardless of the
    encoding."""
    depot_url = self.dc.get_depot_url()
    plist = self.pkgsend_bulk(depot_url, self.system10)
    fmri = plist[0]
    # Both operations must work with the raw and the quoted FMRI.
    for name in (fmri, quote(fmri)):
        urlopen(urljoin(depot_url, "info/0/{0}".format(name)))
        urlopen(urljoin(depot_url, "manifest/0/{0}".format(name)))
def test_bug_15482(self):
    """Test to make sure BUI search doesn't trigger a traceback."""
    # Update the repository configuration while the depot is stopped so
    # the changes won't be overwritten on exit, then start it.
    self.__update_repo_config()
    self.dc.start()

    # Publish a package we can abuse for testing.
    durl = self.dc.get_depot_url()
    self.pkgsend_bulk(durl, self.quux10, refresh_index=True)

    # Every search variant must return without a traceback.
    search_paths = (
        "en/search.shtml?action=Search&token=*",
        "en/advanced_search.shtml?action=Search&token=*",
        "en/advanced_search.shtml?token=*&show=a&rpp=50&"
        "action=Advanced+Search",
    )
    for rel_path in search_paths:
        urlopen(urljoin(durl, rel_path)).read()
def _network_ping(self):
    """Check whether the depot is reachable.

    Pings the versions URL rather than the default / so the BUI code is
    not initialized by the probe.

    :returns: True when the depot answers (FORBIDDEN also counts as
              alive), False on any other HTTP or connection error.
    """
    try:
        repourl = urljoin(self.url, "versions/0")
        # Disable SSL peer verification; we only want to know whether
        # the depot is running.
        conn = urlopen(repourl, context=ssl._create_unverified_context())
        # BUG FIX: close the response instead of leaking the connection
        # (the sibling __network_ping implementation already does this).
        conn.close()
    except HTTPError as e:
        if e.code == http_client.FORBIDDEN:
            return True
        return False
    except URLError:
        return False
    return True
def _lookup_torrent_url_fn():
    """Load a "fetcher" func to get the right torrent URL.

    :raises RuntimeError: when xenserver.torrent_base_url is unset.
    """
    # Without a configured base URL no default fetcher can be built.
    if not CONF.xenserver.torrent_base_url:
        raise RuntimeError(_('Cannot create default bittorrent URL'
                             ' without xenserver.torrent_base_url'
                             ' configuration option set.'))
    if '/' not in CONF.xenserver.torrent_base_url:
        LOG.warning(_LW('Value specified in conf file for'
                        ' xenserver.torrent_base_url does not contain a'
                        ' slash character, therefore it will not be used'
                        ' as part of the torrent URL. Specify a valid'
                        ' base URL as defined by RFC 1808 (see step 6).'))

    def _default_torrent_url_fn(image_id):
        # <base>/<image_id>.torrent, subject to urljoin semantics; the
        # config value is re-read on every call.
        return urlparse.urljoin(CONF.xenserver.torrent_base_url,
                                "%s.torrent" % image_id)

    return _default_torrent_url_fn
def get_request_params(self, api_url):
    """Returns a dict of requests.request params that will send payload
    to the ESP.

    :param api_url: the base api_url for the backend
    :return: dict
    """
    endpoint = self.get_api_endpoint()
    # Fall back to the bare api_url when no endpoint is defined.
    target_url = api_url if endpoint is None else urljoin(api_url, endpoint)
    return {
        'method': self.method,
        'url': target_url,
        'params': self.params,
        'data': self.serialize_data(),
        'headers': self.headers,
        'files': self.files,
        'auth': self.auth,
        # json= is deliberately omitted: serialization is done in
        # serialize_data() to provide extra context in error messages.
    }
def do_GET(self):
    """Serve the htsget ticket and per-instance data for the test server.

    The ticket path returns a JSON htsget ticket listing one URL per test
    instance; each instance URL serves that instance's data (or its
    configured error / truncated body). Anything else is a 404.
    """
    # Map each instance's URL path to the instance for dispatch below.
    url_map = {instance.url: instance
               for instance in self.server.test_instances}
    if self.path == self.ticket_path:
        self.send_response(200)
        self.end_headers()
        urls = [
            {
                "url": urljoin(SERVER_URL, test_instance.url),
                "headers": test_instance.headers
            }
            for test_instance in self.server.test_instances]
        ticket = {"htsget": {"urls": urls}}
        self.wfile.write(json.dumps(ticket).encode())
    elif self.path in url_map:
        instance = url_map[self.path]
        if instance.error_code is not None:
            self.send_error(instance.error_code)
        else:
            self.send_response(200)
            # NOTE: Content-Length reflects the full payload even when
            # the body is truncated below — that mismatch is the point
            # of the truncate fixture.
            self.send_header("Content-Length", len(instance.data))
            if instance.truncate:
                self.end_headers()
                # Drop the final byte to simulate a cut-off transfer.
                self.wfile.write(instance.data[:-1])
                self.wfile.flush()
            else:
                self.end_headers()
                self.wfile.write(instance.data)
    else:
        self.send_error(404)
def _get_endpoint_url(request, endpoint_type, catalog=None):
    """Return the identity endpoint URL, pinned to the active API version."""
    if getattr(request.user, "service_catalog", None):
        url = base.url_for(request,
                           service_type='identity',
                           endpoint_type=endpoint_type)
    else:
        # No catalog available: fall back to the session's region
        # endpoint, then to the configured keystone URL.
        fallback = getattr(settings, 'OPENSTACK_KEYSTONE_URL')
        url = request.session.get('region_endpoint', fallback)

    # TODO(gabriel): When the Service Catalog no longer contains API
    # versions in the endpoints this can be removed.
    return urlparse.urljoin(url.rstrip('/'), 'v%s' % VERSIONS.active)
def resolve_service_href(self, href):
    """Turn an oVirt ``href`` into the corresponding API service.

    The href is made absolute against the API root, the root prefix is
    stripped, then the leading '/' is removed; ``str()`` guards against
    the path being a unicode object on Python 2.
    """
    absolute_href = urljoin(self.api.url, href)
    relative_path = absolute_href.replace(self.api.url, "").replace("/", "", 1)
    return self.api.service(str(relative_path))
def http_request(self, method, path, data=None, params=None):
    """Wrap an HTTP call to the Threat Stack API.

    :raises errors.APIRateLimitError: on a 429 response.
    :returns: result of ``self.handle_response`` for the response.
    """
    url = urljoin(self.BASE_URL, path)
    headers = {"Authorization": self.api_key}
    # Scope the request to an organization when one is configured.
    if self.org_id:
        headers[self.org_id_header] = self.org_id
    prepared = Request(method, url,
                       headers=headers,
                       data=data,
                       params=params).prepare()
    session = Session()
    resp = session.send(prepared, timeout=self.timeout)
    # 429 is surfaced as a dedicated rate-limit error.
    if resp.status_code == 429:
        raise errors.APIRateLimitError(
            "Threat Stack API rate limit exceeded")
    return self.handle_response(resp)