我们从Python开源项目中，提取了以下50个代码示例，用于说明如何使用six.moves.urllib.parse.urlparse()。
def validate_config(self, organization, config, actor=None):
    """Resolve repository details for ``config`` from its ``url`` entry.

    When ``config`` has a truthy ``url``, the instance (URL netloc) and the
    repository name (text after the last ``_git/`` in the path) are taken
    from it, the repository is fetched through the client returned by
    ``self.get_client(actor)``, and ``config`` is updated in place with the
    values from that response. Without a ``url`` the config is returned
    untouched.
    """
    if not config.get('url'):
        return config

    client = self.get_client(actor)

    # parse out the repo name and the instance
    url_parts = urlparse(config['url'])
    instance = url_parts.netloc
    name = url_parts.path.rsplit('_git/', 1)[-1]
    project = config.get('project') or name

    try:
        repo = client.get_repo(instance, name, project)
    except Exception as exc:
        # Error reporting is delegated to the integration's own handler.
        self.raise_error(exc, identity=client.auth)

    resolved = {
        'instance': instance,
        'project': project,
        'name': repo['name'],
        'external_id': six.text_type(repo['id']),
        'url': repo['_links']['web']['href'],
    }
    config.update(resolved)
    return config
def _connect_request(self, vdi_ref):
    """Open the VDI export request and return the HTTP response.

    Creates an export task through ``self.session``, connects to the host
    named by ``self.host_url`` over http or https and issues a GET for the
    export path of ``vdi_ref``. Any failure is logged and re-raised.
    """
    # request connection to xapi url service for VDI export
    try:
        # create task for VDI export
        instance_name = self.instance['name']
        task_label = 'VDI_EXPORT_for_' + instance_name
        task_desc = 'Exporting VDI for instance: %s' % instance_name
        self.task_ref = self.session.task.create(task_label, task_desc)
        LOG.debug("task_ref is %s" % self.task_ref)

        # connect to XS
        host = urlparse.urlparse(self.host_url)
        if host.scheme == 'http':
            conn = httplib.HTTPConnection(host.netloc)
            LOG.debug("using http")
        elif host.scheme == 'https':
            conn = httplib.HTTPSConnection(host.netloc)
            LOG.debug("using https")
        # NOTE(review): any other scheme leaves ``conn`` unbound, so the
        # request below fails and is reported by the handler underneath.

        export_path = utils.get_vdi_export_path(
            self.session, self.task_ref, vdi_ref)
        conn.request('GET', export_path)
        return conn.getresponse()
    except Exception:
        LOG.debug('request connect for vdi export failed')
        raise
def make_temp_file_path_from_url(temp_dir_path, url):
    """Build a temp-file path under ``temp_dir_path`` named after ``url``.

    The file name is the last component of the URL path. Raises
    ``InvalidFilePathError`` when the URL or the directory path is not a
    string, or when no usable name can be derived from the URL.
    """
    try:
        parsed_path = urlparse(url).path
    except AttributeError:
        raise InvalidFilePathError("url must be a string")

    if typepy.is_null_string(parsed_path):
        raise InvalidFilePathError("invalid URL path: {}".format(parsed_path))

    # Trailing slashes are dropped so a directory-like URL still yields a name.
    file_name = os.path.basename(parsed_path.rstrip("/"))
    if typepy.is_null_string(file_name):
        file_name = pathvalidate.replace_symbol(
            file_name, replacement_text="_")

    if typepy.is_null_string(file_name):
        raise InvalidFilePathError("invalid URL: {}".format(url))

    try:
        return posixpath.join(temp_dir_path, file_name)
    except (TypeError, AttributeError):
        raise InvalidFilePathError("temp_dir_path must be a string")
def get_filename(self, task, default_ext):
    """Set the path where the image will be saved.

    The default strategy is to use an increasing 6-digit number as the
    filename. You can override this method if you want to set custom naming
    rules. The file extension is kept if it can be obtained from the url,
    otherwise ``default_ext`` is used as extension.

    Args:
        task (dict): The task dict got from ``task_queue``; its
            ``file_url`` entry is the URL being saved.
        default_ext (str): Extension used when the URL has none.

    Output:
        Filename with extension.
    """
    url_path = urlparse(task['file_url'])[2]
    # Only the last path segment may carry the extension: a dot in a
    # directory name (e.g. '/v1.0/image') must not leak a '/' into the
    # generated filename.
    last_segment = url_path.rsplit('/', 1)[-1]
    if '.' in last_segment:
        extension = last_segment.rsplit('.', 1)[-1]
    else:
        extension = default_ext
    file_idx = self.fetched_num + self.file_idx_offset
    return '{:06d}.{}'.format(file_idx, extension)
def do_GET(self):
    """Answer a GET with a short text body and record the query string.

    The parsed query parameters are stored on
    ``self.server.callback_result``: a parameter given once maps to its
    single value, a repeated parameter maps to the list of its values.
    """
    self.send_response(200)
    self.send_header('Content-Type', 'text/plain')
    self.end_headers()

    query = parse_qs(urlparse(self.path).query)
    # One value -> the value itself, no values -> None, several -> the list.
    self.server.callback_result = {
        key: (values[0] if len(values) == 1
              else None if len(values) == 0
              else values)
        for key, values in query.items()
    }
    self.wfile.write(b'You can close this window now')
def sort_url_by_query_keys(url):
    """Return ``url`` with its query-string parameters ordered by key.

    For example, '/v2/tasks?sort_key=id&sort_dir=asc&limit=10' becomes
    '/v2/tasks?limit=10&sort_dir=asc&sort_key=id'. This keeps unit tests
    independent of the order in which a query string was assembled.

    :param url: url which will be ordered by query keys
    :returns url: url with ordered query keys
    """
    parsed = urlparse.urlparse(url)
    pairs = urlparse.parse_qsl(parsed.query, True)
    # list.sort is stable, so repeated keys keep their relative order.
    pairs.sort(key=lambda pair: pair[0])
    ordered_query = urlparse.urlencode(pairs, True)
    return urlparse.urlunparse(parsed._replace(query=ordered_query))
def strip_version(endpoint):
    """Strip version from the last component of endpoint if present.

    :param endpoint: endpoint URL as a string
    :returns: tuple ``(endpoint, version)``; ``version`` is a float such as
        ``2.0`` when the URL path starts with a version (``v1``, ``v2.0``),
        in which case the returned endpoint is reduced to
        ``scheme://netloc``. Otherwise ``version`` is None and the endpoint
        only loses its trailing slashes.
    :raises ValueError: if ``endpoint`` is not a string
    """
    # NOTE(flaper87): This shouldn't be necessary if
    # we make endpoint the first argument. However, we
    # can't do that just yet because we need to keep
    # backwards compatibility.
    if not isinstance(endpoint, six.string_types):
        raise ValueError("Expected endpoint")

    version = None
    # Get rid of trailing '/' if present
    endpoint = endpoint.rstrip('/')
    url_parts = parse.urlparse(endpoint)
    (scheme, netloc, path, __, __, __) = url_parts
    path = path.lstrip('/')
    # regex to match 'v1' or 'v2.0' etc; a raw string keeps '\d' from being
    # read as an (invalid) string escape sequence.
    if re.match(r'v\d+\.?\d*', path):
        version = float(path.lstrip('v'))
        endpoint = scheme + '://' + netloc
    return endpoint, version
def validate_link(self, link, bookmark=False):
    """Check whether ``link`` can be fetched successfully.

    The scheme and network location are dropped and the remaining relative
    path is requested through ``self.get_json``. Returns True when that
    request succeeds, False when it raises, and False for a bookmark link
    whose path starts with ``PATH_PREFIX``.
    """
    parsed = urlparse.urlparse(link)

    # bookmark link should not have the version in the URL
    if bookmark and parsed.path.startswith(PATH_PREFIX):
        return False

    # removes the scheme and net location parts of the link
    relative_path = urlparse.urlunparse(['', ''] + list(parsed[2:]))
    try:
        self.get_json(relative_path, path_prefix='')
    except Exception:
        return False
    return True
def get_contents_if_file(contents_or_file_name):
    """Get the contents of a file.

    If the value passed in is a file name or file URI, return the contents.
    If not, or there is an error reading the file contents, return the value
    passed in as the contents.

    For example, a workflow definition will be returned if either the
    workflow definition file name, or file URI are passed in, or the actual
    workflow definition itself is passed in.

    :param contents_or_file_name: file name, URL, or the contents themselves
    :returns: the decoded (UTF-8) contents read from the file/URL, or the
        argument unchanged when it cannot be read
    """
    try:
        if parse.urlparse(contents_or_file_name).scheme:
            definition_url = contents_or_file_name
        else:
            path = os.path.abspath(contents_or_file_name)
            definition_url = parse.urljoin(
                'file:', request.pathname2url(path)
            )
        response = request.urlopen(definition_url)
        try:
            return response.read().decode('utf8')
        finally:
            # Release the underlying file/socket even if reading fails.
            response.close()
    except Exception:
        # Deliberate best effort: anything unreadable is treated as the
        # contents themselves.
        return contents_or_file_name
def __init__(self, requirement_string):
    """Parse ``requirement_string`` and populate the requirement fields.

    Sets ``name``, ``url``, ``extras``, ``specifier`` and ``marker``.
    Raises ``InvalidRequirement`` when the string does not parse or when a
    URL is given without both a scheme and a network location.
    """
    try:
        req = REQUIREMENT.parseString(requirement_string)
    except ParseException as e:
        # Report the few characters following the failure position.
        snippet = requirement_string[e.loc:e.loc + 8]
        raise InvalidRequirement(
            "Invalid requirement, parse error at \"{0!r}\"".format(snippet))

    self.name = req.name

    self.url = None
    if req.url:
        parsed_url = urlparse.urlparse(req.url)
        # Both a scheme and a netloc are required for the URL to be valid.
        if not parsed_url.scheme or not parsed_url.netloc:
            raise InvalidRequirement("Invalid URL given")
        self.url = req.url

    self.extras = set(req.extras.asList()) if req.extras else set()
    self.specifier = SpecifierSet(req.specifier)
    self.marker = req.marker or None
def __init__(self, url, protocols=None, agent=None):
    """Store the connection target derived from ``url``.

    Sets ``scheme``, ``host``, ``port`` (443 for ``wss`` and 80 otherwise
    when the URL has no port) and ``resource`` (path plus query, '/' when
    the path is empty). Raises ``ValueError`` for a non-numeric port.
    """
    self.url = url
    self.protocols = protocols or []
    self.agent = agent or constants.USER_AGENT
    self._headers = []

    parsed = urlparse(url)
    self.scheme = parsed.scheme

    host, _sep, port_text = parsed.netloc.partition(':')
    if not port_text:
        # Fall back to the default port of the scheme.
        port_text = '443' if self.scheme == 'wss' else '80'
    if not port_text.isdigit():
        raise ValueError('illegal port value')

    self.host = host
    self.port = int(port_text)
    self._host_port = "{}:{}".format(self.host, self.port)

    resource = parsed.path or '/'
    if parsed.query:
        resource = "{}?{}".format(resource, parsed.query)
    self.resource = resource

    self.state = self.State()
def test_redirects_without_credentials(self):
    """A view wrapped in ``oauth_required`` redirects to the authorize URL.

    The redirect must point at '/oauth2/oauth2authorize/' and carry the
    original path in its ``return_url`` query parameter.
    """
    request = self.factory.get('/test')
    request.session = self.session

    @decorators.oauth_required
    def test_view(request):
        return http.HttpResponse('test')  # pragma: NO COVER

    response = test_view(request)
    self.assertIsInstance(response, http.HttpResponseRedirect)

    location = parse.urlparse(response['Location'])
    self.assertEqual(location.path, '/oauth2/oauth2authorize/')
    self.assertIn('return_url=%2Ftest', location.query)
    self.assertEqual(
        response.status_code, http.HttpResponseRedirect.status_code)
def extract_image_url(text):
    """Extract an image URL from ``text``.

    ``text`` is first cleaned with ``_strip_url``. If it is CSS style
    content, the URL captured by ``_CSS_IMAGERE`` is used. The URL path is
    then narrowed to the part matched by ``_IMAGE_PATH_RE`` (or, when the
    URL has a query string, by ``_GENERIC_PATH_RE``); when nothing matches,
    the cleaned text itself is returned.
    """
    text = _strip_url(text)
    imgurl = None
    if text:
        # check if the text is style content
        css_match = _CSS_IMAGERE.search(text)
        if css_match:
            text = css_match.groups()[0]

        parsed = urlparse(text)
        path = None
        image_match = _IMAGE_PATH_RE.search(parsed.path)
        if image_match:
            path = image_match.group()
        elif parsed.query:
            generic_match = _GENERIC_PATH_RE.search(parsed.path)
            if generic_match:
                path = generic_match.group()

        if path is not None:
            # Rebuild the URL with the narrowed path.
            components = list(parsed)
            components[2] = path
            imgurl = urlunparse(components)
        if not imgurl:
            imgurl = text
    return imgurl
def _fix_esx_url(url, host, port):
    """Fix netloc in the case of an ESX host.

    In the case of an ESX host, the netloc is set to '*' in the URL
    returned in HttpNfcLeaseInfo. It is replaced with ``host:port`` (the
    host is bracketed when it is an IPv6 address). Any other URL is
    returned unchanged.
    """
    parsed = urlparse.urlparse(url)
    if parsed.netloc != '*':
        return url

    if netutils.is_valid_ipv6(host):
        fixed_netloc = '[%s]:%d' % (host, port)
    else:
        fixed_netloc = "%s:%d" % (host, port)
    return urlparse.urlunparse(parsed._replace(netloc=fixed_netloc))
def _decorate_request(self, filters, method, url, headers=None, body=None,
                      auth_data=None):
    """Return ``(url, headers, body)`` for an authenticated request.

    A copy of ``headers`` gets the auth token added as ``X-Auth-Token``;
    the caller's headers, the method and the body are left untouched.
    ``auth_data`` defaults to ``self.auth_data``.
    """
    token, _ = self.auth_data if auth_data is None else auth_data

    # build authenticated request
    # returns new request, it does not touch the original values
    request_headers = {} if headers is None else copy.deepcopy(headers)
    request_headers['X-Auth-Token'] = str(token)

    if url is None or url == "":
        return str(url), request_headers, body

    # NOTE(review): the base URL is the request url itself here (the
    # service-catalog lookup is disabled), so the url is joined with
    # itself - confirm this is intended.
    # Join base URL and url, and remove multiple contiguous slashes
    joined = "/".join([url, url])
    parts = list(urlparse.urlparse(joined))
    parts[2] = re.sub("/{2,}", "/", parts[2])

    # no change to method or body
    return str(urlparse.urlunparse(parts)), request_headers, body
def visit_ls(self, node, children):
    """List the entries under the path of the current context URL.

    The path segments of ``self.context_override.url`` are looked up via
    ``self.context.root.ls`` and the entry names are written to
    ``self.output``: colorized and column-formatted on a tty, one plain
    name per line otherwise.

    Returns the ``node`` argument in both output modes. (The tty loop used
    to reuse the name ``node`` for its loop variable, so the last listed
    entry was returned instead.)
    """
    url_path = urlparse(self.context_override.url).path
    segments = filter(None, url_path.split('/'))
    entries = self.context.root.ls(*segments)
    if self.output.isatty():
        names = []
        for entry in entries:
            token_type = String if entry.data.get('type') == 'dir' else Name
            names.append(self._colorize(entry.name, token_type))
        lines = list(colformat(list(names)))
    else:
        lines = [entry.name for entry in entries]
    if lines:
        self.output.write('\n'.join(lines))
    return node
def _drop_db(self):
    """Drop and re-create the MySQL database named in ``self.url``.

    User, optional password, host and database name are taken from the URL
    and passed to the ``mysql`` command line client via ``execute_cmd``.
    """
    addr = urlparse.urlparse(self.url)
    database = addr.path.strip('/')

    # netloc is expected to look like 'user[:password]@host'
    netloc_parts = addr.netloc.split('@')
    host = netloc_parts[1]
    credentials = netloc_parts[0].split(':')
    user = credentials[0]

    # Only pass a password option when a non-blank password is present.
    password = ""
    if len(credentials) > 1 and credentials[1].strip():
        password = "-p\"%s\"" % credentials[1]

    sql = ("drop database if exists %(database)s; create "
           "database %(database)s;") % {'database': database}
    cmd = ("mysql -u \"%(user)s\" %(password)s -h %(host)s "
           "-e \"%(sql)s\"") % {'user': user, 'password': password,
                                'host': host, 'sql': sql}
    execute_cmd(cmd)
def get_connection_params(endpoint, **kwargs):
    """Return ``(connection_class, args, kwargs)`` for ``endpoint``.

    ``args`` is ``(hostname, port, path)`` where ``path`` is the endpoint
    path without its trailing slash and trailing API version. ``kwargs``
    always carries ``timeout`` (600 unless a truthy ``timeout`` is given)
    and, for https endpoints, the ``ca_file``, ``cert_file``, ``key_file``
    and ``insecure`` options.

    Raises ``exceptions.EndpointException`` for a scheme other than http
    or https.
    """
    parts = urlparse.urlparse(endpoint)

    # trim API version and trailing slash from endpoint
    # str.rstrip() takes a set of characters, not a suffix, so it would
    # also eat trailing characters of the real path ('/dev/v1' -> '/de').
    # Remove the version only when it is the actual suffix.
    path = parts.path.rstrip('/')
    if API_VERSION and path.endswith(API_VERSION):
        path = path[:-len(API_VERSION)]

    _args = (parts.hostname, parts.port, path)
    _kwargs = {'timeout': (float(kwargs.get('timeout'))
                           if kwargs.get('timeout') else 600)}

    if parts.scheme == 'https':
        _class = VerifiedHTTPSConnection
        _kwargs['ca_file'] = kwargs.get('ca_file', None)
        _kwargs['cert_file'] = kwargs.get('cert_file', None)
        _kwargs['key_file'] = kwargs.get('key_file', None)
        _kwargs['insecure'] = kwargs.get('insecure', False)
    elif parts.scheme == 'http':
        _class = six.moves.http_client.HTTPConnection
    else:
        msg = 'Unsupported scheme: %s' % parts.scheme
        raise exceptions.EndpointException(msg)

    return (_class, _args, _kwargs)
def test_absolute_url(self):
    """Check that the server returns absolute URIs.

    Relies on the root returning a ``self`` object (its ``href``).
    """
    response = requests.get(self.root_url)
    payload = response.json()

    root_href = payload.get('href')
    assert root_href is not None
    root = urlparse(root_href)

    # Test in decreasing order of likely correctness:
    # Path => Domain => Scheme
    assert root.path == '/'
    # Get the origin name, minus the scheme.
    assert root.netloc == urlparse(self.origin).netloc
    assert root.scheme == 'http'

    bucket_href = response.json().get('buckets').get('1.0')
    assert urlparse(bucket_href).path.endswith('buckets/1.0')
def add_capability(self, name, uri):
    """
    Add a capability of the RETS board

    A relative ``uri`` is turned into an absolute URL using the scheme,
    host and (when present) explicit port of the stored ``Login``
    capability.

    :param name: The name of the capability
    :param uri: The capability URI given by the RETS board
    :return: None
    :raises ValueError: if ``uri`` is relative and no Login URL is stored
    """
    parse_results = urlparse(uri)
    if parse_results.hostname is None:
        # relative URL given, so build this into an absolute URL
        login_url = self.capabilities.get('Login')
        if not login_url:
            logger.error("There is no login URL stored, so additional capabilities cannot be added.")
            raise ValueError("Cannot automatically determine absolute path for {0!s} given.".format(uri))

        parts = urlparse(login_url)
        # Keep an explicit port from the login URL; without it a server on
        # a non-default port would get capability URLs on the wrong port.
        port = ':{}'.format(parts.port) if parts.port else ''
        uri = parts.scheme + '://' + parts.hostname + port + '/' + uri.lstrip('/')

    self.capabilities[name] = uri
def _get_proxy_config(self):
    """Return ``(proxy_server, proxy_port)`` for this connection.

    The proxy is read from the ``https_proxy`` environment variable when
    ``self._ssl_enabled`` is set and from ``http_proxy`` otherwise. Both
    values are None when no proxy is configured or when ``self._server``
    appears in the ``no_proxy`` environment variable.
    """
    env_name = "https_proxy" if self._ssl_enabled else "http_proxy"
    proxy = os.environ.get(env_name)

    no_proxy = os.environ.get("no_proxy")
    bypass = no_proxy and self._server in no_proxy

    if not proxy or bypass:
        return None, None

    parsed = urlparse(proxy)
    return parsed.hostname, parsed.port
def __init__(self, base_url='https://hydro1.gesdisc.eosdis.nasa.gov/data/NLDAS/', **layout_kwargs):
    """Build and display the first selection row for ``base_url``.

    The row is a label showing the URL path next to a selection widget
    listing the links found under ``base_url``. ``layout_kwargs`` are
    forwarded to the label ``Layout``; ``min_width`` defaults to '30%'.
    """
    self.base_url = base_url
    self.selected_url = None

    layout_kwargs.setdefault('min_width', '30%')
    self.label_layout = Layout(**layout_kwargs)

    links = self.get_links(base_url, href_filter=self.dir_and_not_data)
    selector = widgets.Select(options=links, description='')
    # Changing the selection descends one level (see on_value_change).
    selector.observe(
        partial(self.on_value_change, url=self.base_url), names='value')

    label = widgets.Label(
        urlparse(self.base_url).path, layout=self.label_layout)
    row = widgets.HBox([label, selector])
    self.elts = [row, label, selector]
    display(row)
def on_value_change(self, change, url):
    """React to a new selection by descending into the chosen URL.

    A ``None`` selection is ignored. A '.grb' file is handed to
    ``self.select_url`` and its result returned. Otherwise the current
    widgets are closed and, when the chosen URL has links, a new label and
    selection row for it is displayed.
    """
    selected = change['new']
    if selected is None:
        # 'Select...' chosen
        return
    if selected.endswith('.grb'):
        # File reached
        return self.select_url(selected)

    for widget in self.elts:
        widget.close()

    if selected == self.base_url:
        href_filter = self.dir_and_not_data
    else:
        href_filter = self.dir_or_grib
    links = self.get_links(selected, href_filter=href_filter)
    if not links:
        return

    selector = widgets.Select(options=links, description='')
    selector.observe(
        partial(self.on_value_change, url=selected), names='value')
    label = widgets.Label(urlparse(selected).path, layout=self.label_layout)
    row = widgets.HBox([label, selector])
    self.elts = [row, label, selector]
    display(row)
def validate_href(self, image_href):
    """Validate a swift image reference by issuing a HEAD on the object.

    The reference path is split into container and object name at the
    first '/'. Returns ``(container, object_name, headers)``. Raises
    ``ImageRefValidationFailed`` when the reference has no path or when the
    swift client reports an error.
    """
    path = urlparse.urlparse(image_href).path.lstrip('/')
    if not path:
        raise exception.ImageRefValidationFailed(
            _("No path specified in swift resource reference: %s. "
              "Reference must be like swift:container/path")
            % str(image_href))

    container, _sep, object_name = path.partition('/')
    try:
        headers = self.client.head_object(container, object_name)
    except exception.SwiftOperationError as e:
        raise exception.ImageRefValidationFailed(
            _("Cannot fetch %(url)s resource. %(exc)s") %
            {'url': str(image_href), 'exc': str(e)})
    return (container, object_name, headers)
def validate_href(self, image_href):
    """Validate an rsync image reference with an rsync dry run.

    Returns ``(path, stdout, stderr)`` of the dry run. Raises
    ``InvalidParameterValue`` when the reference has no path and
    ``ImageRefValidationFailed`` when rsync cannot be executed or fails.
    """
    path = urlparse.urlparse(image_href).path.lstrip('/')
    if not path:
        raise exception.InvalidParameterValue(
            _("No path specified in rsync resource reference: %s. "
              "Reference must be like rsync:host::module/path")
            % str(image_href))

    try:
        # Dry run: nothing is transferred, only reachability is checked.
        stdout, stderr = utils.execute(
            'rsync', '--stats', '--dry-run',
            path,
            ".",
            check_exit_code=[0],
            log_errors=processutils.LOG_ALL_ERRORS)
    except (processutils.ProcessExecutionError, OSError) as ex:
        raise exception.ImageRefValidationFailed(
            _("Cannot fetch %(url)s resource. %(exc)s") %
            {'url': str(image_href), 'exc': str(ex)})
    else:
        return path, stdout, stderr
def get_glance_image_uuid_name(task, url):
    """Resolve a glance image reference to its ``(uuid, name)`` pair.

    Accepted reference forms are ``glance:name``, ``glance://name``,
    ``glance:uuid``, ``glance://uuid`` and a bare name or uuid. Raises
    ``InvalidImageRef`` for any scheme other than ``glance``.
    """
    parsed = urlparse.urlparse(url)
    scheme = parsed.scheme
    if scheme and scheme != 'glance':
        raise exception.InvalidImageRef("Only glance images are supported.")

    # 'glance:ref' puts the reference in the path, 'glance://ref' in netloc.
    image_ref = parsed.path or parsed.netloc
    image = _get_glanceclient(task.context).show(image_ref)
    return image['id'], image['name']
def url_download_raw_secured(context, node, url):
    """Download raw contents of the URL bypassing cache and temp files.

    Only the 'glance' and 'swift' schemes are accepted; any other scheme
    raises ``UnsafeUrlError``. Returns None when ``url`` is empty.
    """
    if not url:
        return

    url = append_storage_prefix(node, url.rstrip('/'))

    sources_with_tenant_isolation = ('glance', 'swift')
    if parse.urlparse(url).scheme not in sources_with_tenant_isolation:
        raise bareon_exception.UnsafeUrlError(
            url=url,
            details="Use one of the following storages "
                    "for this resource: %s"
                    % str(sources_with_tenant_isolation))

    resource_storage = image_service.get_image_service(
        url, context=context)
    buf = StringIO.StringIO()
    try:
        resource_storage.download(url, buf)
        return buf.getvalue()
    finally:
        buf.close()
def __init__(self, **kw):
    """Set up the spider for its fixed start URL.

    The allowed domain is the URL's hostname without a leading 'www.';
    the item and cookie bookkeeping is reset and the start time recorded.
    """
    super(FollowAllSpider, self).__init__(**kw)

    url = 'http://localhost/books.toscrape.com/index.html'
    # Treat a value without a scheme as a bare host name.
    if not url.startswith(('http://', 'https://')):
        url = 'http://%s/' % url
    self.url = url

    domain = re.sub(r'^www\.', '', urlparse(url).hostname)
    self.allowed_domains = [domain]
    self.link_extractor = LinkExtractor()
    self.cookies_seen = set()
    self.previtem = 0
    self.items = 0
    self.timesec = datetime.datetime.utcnow()
def params(arg):
    """
    Parse `arg` and return a dict describing the TPDA arguments.

    Resolution order:
    - a string `arg` is parsed as a URL and its query string is used;
      ``exceptions.IncompleteURI`` is raised when the URL has no query
    - an `arg` with an items() method is parsed (works for multidicts, ..)
    - an iterable `arg` is parsed as ((name, value), (name, value), ..)

    Any other `arg` yields None.
    """
    if isinstance(arg, str):
        query = parse.urlparse(arg).query
        if not query:
            raise exceptions.IncompleteURI
        return Params(parse.parse_qsl(query)).params()

    if hasattr(arg, 'items'):
        return Params(arg.items()).params()

    if hasattr(arg, '__iter__'):
        return Params(arg).params()

    return None
def media_for(hash_key, scheme=None, style=None): if not hash_key: return None url = hash_key.split('!')[0] up = urlparse(url) hash_domain = up.hostname if hash_domain and hash_domain not in Config.QINIU_DOMAINS: if hash_domain == 'wx.qlogo.cn': hash_key = hash_key.replace('http://', 'https://') return hash_key _hash = up.path if len(_hash) != 0 and _hash[0] == '/': _hash = _hash[1:] media_host = Config.QINIU_HOST url = '%s/%s' % (media_host, _hash) if url.endswith('.amr'): url = '%s.mp3' % url[:-4] if url and style: url = '%s!%s' % (url, style) return url
def create_temp_url(swift_client, name, timeout, container=None):
    """Create a swift temp URL that allows a PUT to a new object.

    The container (``<name>-<uuid>`` unless given) is created, the account
    temp-url key is set when missing, and an empty placeholder object is
    uploaded. ``timeout`` is multiplied by 60 before being passed to the
    temp URL generator. Returns the full PUT URL.
    """
    if not container:
        container = '%(name)s-%(uuid)s' % {
            'name': name, 'uuid': uuid.uuid4()}
    object_name = str(uuid.uuid4())

    swift_client.put_container(container)

    # Make sure the account has a key to sign temp URLs with.
    key_header = 'x-account-meta-temp-url-key'
    if key_header not in swift_client.head_account():
        new_key = six.text_type(uuid.uuid4())[:32]
        swift_client.post_account({key_header: new_key})
    key = swift_client.head_account()[key_header]

    project_path = swift_client.url.split('/')[-1]
    path = '/v1/%s/%s/%s' % (project_path, container, object_name)
    tempurl = swiftclient_utils.generate_temp_url(
        path, timeout * 60, key, 'PUT')

    sw_url = urlparse.urlparse(swift_client.url)
    put_url = '%s://%s%s' % (sw_url.scheme, sw_url.netloc, tempurl)

    swift_client.put_object(container, object_name, '')
    return put_url
def _pagination(self, collection, path, **params):
    """Yield each page of ``collection``, following its pagination links.

    After every page, the ``<collection>_links`` entry is searched for a
    link whose ``rel`` is 'next' ('previous' when ``page_reverse`` is
    set); the query string of that link becomes the parameters of the next
    request. Iteration stops when no such link exists.
    """
    linkrel = 'previous' if params.get('page_reverse', False) else 'next'

    while True:
        res = self.get(path, params=params)
        yield res

        more_pages = False
        try:
            for link in res['%s_links' % collection]:
                if link['rel'] != linkrel:
                    continue
                query_str = urlparse.urlparse(link['href']).query
                params = urlparse.parse_qs(query_str)
                more_pages = True
                break
        except KeyError:
            # No links section (or a malformed link): this was the last page.
            return
        if not more_pages:
            return
def _get_node_ip_heartbeat(task):
    """Return the host part of the node's ``agent_url``.

    The URL is read from the node's ``driver_internal_info``; an empty
    string is returned when it is not set.
    """
    agent_url = task.node.driver_internal_info.get('agent_url', '')
    netloc = urlparse.urlparse(agent_url).netloc
    # Everything before the first ':' is the host; the rest is the port.
    host, _sep, _port = netloc.partition(':')
    return host
def _parse_root_device_hints(node):
    """Convert the node's root device hints string to a dict.

    String hints using the ``== `` operator become integers, hints using
    ``s== `` become URL-unquoted strings, and non-string values are kept
    as they are. Returns an empty dict when the node has no
    ``root_device`` property. Raises ``InvalidParameterValue`` when the
    hints fail validation or use any other operator.
    """
    root_device = node.properties.get('root_device')
    if not root_device:
        return {}

    try:
        parsed_hints = irlib_utils.parse_root_device_hints(root_device)
    except ValueError as e:
        raise exception.InvalidParameterValue(
            _('Failed to validate the root device hints for node %(node)s. '
              'Error: %(error)s') % {'node': node.uuid, 'error': e})

    root_device_hints = {}
    advanced = {}
    for hint, value in parsed_hints.items():
        if not isinstance(value, six.string_types):
            root_device_hints[hint] = value
        elif value.startswith('== '):
            root_device_hints[hint] = int(value[3:])
        elif value.startswith('s== '):
            root_device_hints[hint] = urlparse.unquote(value[4:])
        else:
            # Any other operator is collected and rejected below.
            advanced[hint] = value

    if advanced:
        raise exception.InvalidParameterValue(
            _('Ansible-deploy does not support advanced root device hints '
              'based on oslo.utils operators. '
              'Present advanced hints for node %(node)s are %(hints)s.') % {
                  'node': node.uuid, 'hints': advanced})
    return root_device_hints
def _prepare_variables(task):
    """Build the variables dict for the node of ``task``.

    The result always has an ``image`` entry (the ``image_*`` keys of the
    node's instance_info without their prefix, plus ``mem_req``), and gets
    ``configdrive`` and ``root_device_hints`` entries when those are set.
    A configdrive that is not an http(s) URL is written to a local file.
    """
    node = task.node
    i_info = node.instance_info

    prefix_len = 6  # len('image_')
    image = {key[prefix_len:]: value
             for key, value in i_info.items()
             if key.startswith('image_')}
    image['mem_req'] = _calculate_memory_req(task)

    checksum = image.get('checksum')
    # NOTE(pas-ha) checksum can be in <algo>:<checksum> format
    # as supported by various Ansible modules, mostly good for
    # standalone Ironic case when instance_info is populated manually.
    # With no <algo> we take that instance_info is populated from Glance,
    # where API reports checksum as MD5 always.
    if checksum and ':' not in checksum:
        image['checksum'] = 'md5:%s' % checksum
    _add_ssl_image_options(image)

    variables = {'image': image}

    configdrive = i_info.get('configdrive')
    if configdrive:
        if urlparse.urlparse(configdrive).scheme in ('http', 'https'):
            drive = {'type': 'url', 'location': configdrive}
        else:
            # Inline configdrive contents are stored in a per-node file.
            location = _get_configdrive_path(node.uuid)
            with open(location, 'w') as f:
                f.write(configdrive)
            drive = {'type': 'file', 'location': location}
        variables['configdrive'] = drive

    root_device_hints = _parse_root_device_hints(node)
    if root_device_hints:
        variables['root_device_hints'] = root_device_hints

    return variables
def get_addon_host_ident():
    """Return the hostname of the configured ``system.url-prefix``.

    A missing, empty or local (localhost / 127.0.0.1) hostname is replaced
    by 'app.dev.getsentry.com'.
    """
    hostname = urlparse(options.get('system.url-prefix')).hostname
    is_local = hostname in ('localhost', '127.0.0.1', None, '')
    return 'app.dev.getsentry.com' if is_local else hostname
def base_url(url):
    """Return the ``scheme://netloc`` prefix of ``url``."""
    scheme, netloc = urlparse(url)[:2]
    return '%s://%s' % (scheme, netloc)
def from_request(cls, request):
    """Build an instance of ``cls`` naming the unreachable host.

    The host is the network location of ``request.url``.
    """
    message = 'Unable to reach host: {}'.format(urlparse(request.url).netloc)
    return cls(message)
def get_decorator(placebo):
    """Return an httmock decorator that serves ``placebo``'s response.

    Requests are matched on the non-empty parts of the placebo URL
    (scheme, netloc, path, query) and on its method. Status code, content
    and headers of the mocked response come from the placebo.
    """
    url = placebo._get_url()
    if isinstance(url, six.string_types):
        url = parse.urlparse(url)

    # TODO should we remove empty parts of url?
    candidates = {
        'scheme': url.scheme,
        'netloc': url.netloc,
        'path': url.path,
        'method': placebo._get_method(),
        'query': url.query,
    }
    match_kwargs = {key: value for key, value in candidates.items() if value}

    @httmock.urlmatch(**match_kwargs)
    def mock_response(url, request):
        # Convert parse result type from SplitResult to ParseResult
        url = parse.urlparse(url.geturl())
        # if body is empty httmock returns None
        # but we want it to be always string.
        body = request.body or ''
        headers = request.headers
        status_code = placebo._get_status(url, headers, body)
        content = placebo._get_body(url, headers, body)
        response_headers = placebo._get_headers(url, headers, body)
        return {'status_code': status_code,
                'content': content,
                'headers': response_headers}

    return httmock.with_httmock(mock_response)
def __init__(self, url, headers, body):
    """Store the request URL (raw and parsed), headers and body.

    ``url`` may be an already parsed URL (``ParseResult`` or
    ``SplitResult``) or a string. For any other type neither ``url`` nor
    ``parsed_url`` is set.
    """
    if isinstance(url, (parse.ParseResult, parse.SplitResult)):
        self.url, self.parsed_url = url.geturl(), url
    elif isinstance(url, six.string_types):
        self.url, self.parsed_url = url, parse.urlparse(url)

    self.headers = headers
    self.body = body