我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用six.moves.urllib_parse.urlparse()。
def wsgi_app(similarities, fragments, pharmacophores, external_url='http://localhost:8084/kripo'):
    """Build the connexion/Flask WSGI application for the Kripo webservice.

    Args:
        similarities (SimilarityMatrix): Similarity matrix to use in webservice
        fragments (FragmentsDb): Fragment database filename
        pharmacophores: Filename of pharmacophores hdf5 file
        external_url (str): URL which should be used in Swagger spec

    Returns:
        connexion.App
    """
    parsed = urlparse(external_url)
    swagger_file = resource_filename(__name__, 'swagger.yaml')

    app = connexion.App(__name__)
    app.app.json_encoder = KripodbJSONEncoder

    # Expose the data sources through the Flask application config.
    app.app.config['similarities'] = similarities
    app.app.config['fragments'] = fragments
    app.app.config['pharmacophores'] = pharmacophores

    # The external URL components are substituted into the Swagger spec.
    arguments = {
        'hostport': parsed.netloc,
        'scheme': parsed.scheme,
        'version': __version__,
    }
    # Keep validate_responses turned off, because of conflict with connexion.problem
    # see https://github.com/zalando/connexion/issues/266
    app.add_api(swagger_file, base_path=parsed.path, arguments=arguments)
    return app
def _idna_encode(self, value):
    """IDNA-encode the hostname component of *value* and rebuild the URL.

    Fix: a value without a hostname (e.g. a relative or mailto-style
    reference) previously crashed in ``idna.encode(None)``.  It now maps
    to an empty netloc, matching the guard used by the sibling
    ``UniformResourceIdentifier.__init__`` implementation.

    Args:
        value (str): URL whose hostname may contain non-ASCII characters.

    Returns:
        str: URL rebuilt from the parsed components with an ASCII netloc.
    """
    parsed = urllib_parse.urlparse(value)
    if not parsed.hostname:
        # Nothing to IDNA-encode; keep an empty network location.
        netloc = ""
    elif parsed.port:
        netloc = (
            idna.encode(parsed.hostname) +
            ":{0}".format(parsed.port).encode("ascii")
        ).decode("ascii")
    else:
        netloc = idna.encode(parsed.hostname).decode("ascii")
    # Note that building a URL in this fashion means it should be
    # semantically indistinguishable from the original but is not
    # guaranteed to be exactly the same.
    return urllib_parse.urlunparse((
        parsed.scheme,
        netloc,
        parsed.path,
        parsed.params,
        parsed.query,
        parsed.fragment
    ))
def do_start(self):
    """Run the backend's SAML start/complete round trip against the
    intercepted (fake) Identity Provider and return the auth result."""
    # Normalise the start URL so the SAML request is identical from
    # test to test.
    start_url = self.modify_start_url(self.backend.start().url)
    # A recognised user is redirected back to the backend's redirect_uri.
    return_url = self.backend.redirect_uri
    self.install_http_intercepts(start_url, return_url)

    response = requests.get(start_url)
    self.assertTrue(response.url.startswith(return_url))
    self.assertEqual(response.text, 'foobar')

    parsed_query = parse_qs(urlparse(response.url).query)
    query_values = {key: values[0] for key, values in parsed_query.items()}
    self.assertNotIn(' ', query_values['SAMLResponse'])
    self.strategy.set_request_data(query_values, self.backend)
    return self.backend.complete()
def modify_start_url(self, start_url):
    """
    Given a SAML redirect URL, parse it and change the ID to a consistent
    value, so the request is always identical.
    """
    # Decode the SAMLRequest XML carried in the query string.
    parts = urlparse(start_url)
    query = {key: values[0] for key, values in parse_qs(parts.query).items()}
    xml = OneLogin_Saml2_Utils.decode_base64_and_inflate(
        query['SAMLRequest']
    )
    # Replace the per-request ID with a fixed value; exactly one
    # substitution must occur.
    xml = xml.decode()
    xml, substitutions = re.subn(r'ID="[^"]+"', 'ID="TEST_ID"', xml)
    self.assertEqual(substitutions, 1)
    # Re-encode the modified XML and rebuild the URL's query string.
    query['SAMLRequest'] = OneLogin_Saml2_Utils.deflate_and_base64_encode(
        xml
    )
    rebuilt = list(parts)
    rebuilt[4] = urlencode(query)  # index 4 is the query component
    return urlunparse(rebuilt)
def handle_state(self, start_url, target_url):
    """Propagate OAuth state parameters from *start_url* (and from its
    embedded redirect_uri) onto *target_url*, when the backend uses them."""
    start_query = parse_qs(urlparse(start_url).query)
    redirect_uri = start_query.get('redirect_uri')

    uses_state = getattr(self.backend, 'STATE_PARAMETER', False)
    if uses_state and start_query.get('state'):
        target_url = url_add_parameters(target_url, {
            'state': start_query['state']
        })

    if redirect_uri and getattr(self.backend, 'REDIRECT_STATE', False):
        redirect_query = parse_qs(urlparse(redirect_uri).query)
        if redirect_query.get('redirect_state'):
            target_url = url_add_parameters(target_url, {
                'redirect_state': redirect_query['redirect_state']
            })
    return target_url
def __init__(self, raw_json):
    """Build a request object from the raw JSON payload.

    Copies the transport fields verbatim from *raw_json* and splits its
    ``url`` value into the individual URL components via urlparse.
    """
    self.headers = raw_json['headers']
    self.method = raw_json['method']
    self.body = raw_json['body']
    self.url = raw_json['url']
    self.ip = raw_json['remote_addr']

    parsed = urlparse(self.url)
    self.path = parsed.path
    self.host = parsed.hostname
    self.scheme = parsed.scheme
    self.query = parsed.query
    self.port = parsed.port
    self.fragment = parsed.fragment
    self.params = parsed.params
    self.netloc = parsed.netloc
def allowed_token(self, token, token_type):
    """Sanitize the attributes of *token* and return it.

    Attributes not in ``self.allowed_attributes`` are dropped; attributes
    whose value is a URI are additionally checked against
    ``self.allowed_protocols`` (and, for data: URIs, against
    ``self.allowed_content_types``).  SVG url(...) references and local
    xlink:href values are stripped, and inline CSS is sanitized.
    """
    if "data" in token:
        # token["data"] is iterated reversed so that for duplicated
        # attribute names the first occurrence wins in the dict.
        attrs = dict([(name, val) for name, val in
                      token["data"][::-1]
                      if name in self.allowed_attributes])
        for attr in self.attr_val_is_uri:
            if attr not in attrs:
                continue
            # Strip control characters / whitespace before inspecting the
            # scheme, so obfuscated values like "java\0script:" are caught.
            val_unescaped = re.sub("[`\000-\040\177-\240\s]+", '',
                                   unescape(attrs[attr])).lower()
            # remove replacement characters from unescaped characters
            val_unescaped = val_unescaped.replace("\ufffd", "")
            try:
                uri = urlparse.urlparse(val_unescaped)
            except ValueError:
                # Unparseable URI: drop the attribute entirely.
                uri = None
                del attrs[attr]
            if uri and uri.scheme:
                if uri.scheme not in self.allowed_protocols:
                    del attrs[attr]
                if uri.scheme == 'data':
                    # Only allow data: URIs with an allowed content type.
                    m = content_type_rgx.match(uri.path)
                    if not m:
                        del attrs[attr]
                    elif m.group('content_type') not in self.allowed_content_types:
                        del attrs[attr]
        # Blank out url(...) references in SVG attributes that allow them.
        for attr in self.svg_attr_val_allows_ref:
            if attr in attrs:
                attrs[attr] = re.sub(r'url\s*\(\s*[^#\s][^)]+?\)',
                                     ' ',
                                     unescape(attrs[attr]))
        # Drop non-fragment xlink:href values on href-restricted elements.
        if (token["name"] in self.svg_allow_local_href and
                'xlink:href' in attrs and
                re.search('^\s*[^#\s].*', attrs['xlink:href'])):
            del attrs['xlink:href']
        if 'style' in attrs:
            attrs['style'] = self.sanitize_css(attrs['style'])
        token["data"] = [[name, val] for name, val in list(attrs.items())]
    return token
def __init__(self, value):
    """Validate *value* and store it together with its ASCII (IDNA) form.

    Raises:
        TypeError: if *value* is not a unicode string.
    """
    if not isinstance(value, six.text_type):
        raise TypeError("value must be a unicode string")

    parsed = urllib_parse.urlparse(value)
    if not parsed.hostname:
        # No hostname component, so nothing to IDNA-encode.
        netloc = ""
    elif parsed.port:
        port_suffix = ":{0}".format(parsed.port).encode("ascii")
        netloc = (idna.encode(parsed.hostname) + port_suffix).decode("ascii")
    else:
        netloc = idna.encode(parsed.hostname).decode("ascii")

    # Note that building a URL in this fashion means it should be
    # semantically indistinguishable from the original but is not
    # guaranteed to be exactly the same.
    components = (parsed.scheme, netloc, parsed.path, parsed.params,
                  parsed.query, parsed.fragment)
    self._value = value
    self._encoded = urllib_parse.urlunparse(components).encode("ascii")
def skip_format_url(format_, url):
    """
    Checks whether a give format/url should be skipped and not downloaded.

    @param format_: Filename format (extension).
    @type format_: str (e.g. html, txt, zip, pdf)

    @param url: URL.
    @type url: str

    @return: True if format/url should be skipped, False otherwise.
    @rtype bool
    """
    # Empty formats carry no downloadable content.
    if format_ == '':
        return True

    # Email addresses are not downloadable resources.
    if 'mailto:' in url and '@' in url:
        return True

    parsed = urlparse(url)
    # Links to localhost are never useful.
    if parsed.hostname == 'localhost':
        return True

    # Trusted, manually added formats are always downloaded.
    if RE_VALID_FORMATS.match(format_):
        return False

    # Simple formats only contain letters, numbers, "_" and "-";
    # anything non-simple is skipped.
    if RE_NON_SIMPLE_FORMAT.match(format_):
        return True

    # Links to the site root carry no content either.
    if parsed.path in ('', '/'):
        return True

    # Do not skip.
    return False
def setup(self):
    """Discover active ngrok tunnels and configure them as the GRIP origin."""
    http_host = http_port = None
    ssl_host = ssl_port = None

    # The local ngrok agent exposes its tunnels on port 4040.
    resp = requests.get('http://localhost:4040/api/tunnels')
    for tunnel in resp.json()['tunnels']:
        proto = tunnel['proto']
        if proto not in ('http', 'https'):
            continue
        parsed = urlparse(tunnel['public_url'])
        if proto == 'http':
            http_host = parsed.hostname
            http_port = 80 if parsed.port is None else parsed.port
        else:
            ssl_host = parsed.hostname
            ssl_port = 443 if parsed.port is None else parsed.port

    if http_host is None and ssl_host is None:
        self.stderr.write('Error: no ngrok tunnels found')
        return

    pub = get_pubcontrol()
    if len(pub.clients) == 0:
        self.stderr.write('Error: no GRIP proxy configured')
        return

    pub.set_origin(
        host=http_host,
        port=http_port,
        ssl_host=ssl_host,
        ssl_port=ssl_port,
        rewrite_host=True)
    self.stdout.write(
        'Setting ngrok tunnel %s as GRIP origin' % (http_host or ssl_host))
def parse_url(cls, url):
    """Create a proxy description from a proxy *url*.

    The URL's scheme, host, port and credentials map onto the
    corresponding constructor keywords of *cls*.
    """
    components = urlparse(url)
    return cls(
        proxy_type=components.scheme,
        proxy_address=components.hostname,
        proxy_port=components.port,
        proxy_login=components.username,
        proxy_password=components.password,
    )
def allowed_token(self, token, token_type):
    """Sanitize the attributes of *token* and return it.

    Attributes not in ``self.allowed_attributes`` are dropped; URI-valued
    attributes are checked against ``self.allowed_protocols`` (and, for
    data: URIs, ``self.allowed_content_types``).  SVG url(...) references
    and local xlink:href values are stripped; inline CSS is sanitized.

    Fixes versus the previous revision (aligned with the corrected
    sibling implementation in this file):
      * ``urlparse`` can raise ValueError on malformed input — now caught,
        and the attribute is dropped.
      * ``m.group('content_type')`` was dereferenced even when the data-URI
        regex did not match (``m is None`` -> AttributeError), and the
        attribute could be deleted twice (KeyError) — now an ``elif``.
    """
    if "data" in token:
        # Reversed iteration means the first occurrence of a duplicated
        # attribute name wins in the resulting dict.
        attrs = dict([(name, val) for name, val in
                      token["data"][::-1]
                      if name in self.allowed_attributes])
        for attr in self.attr_val_is_uri:
            if attr not in attrs:
                continue
            # Strip control characters / whitespace before inspecting the
            # scheme, so obfuscated values like "java\0script:" are caught.
            val_unescaped = re.sub("[`\000-\040\177-\240\s]+", '',
                                   unescape(attrs[attr])).lower()
            # remove replacement characters from unescaped characters
            val_unescaped = val_unescaped.replace("\ufffd", "")
            try:
                uri = urlparse.urlparse(val_unescaped)
            except ValueError:
                # Unparseable URI: drop the attribute entirely.
                uri = None
                del attrs[attr]
            if uri and uri.scheme:
                if uri.scheme not in self.allowed_protocols:
                    del attrs[attr]
                if uri.scheme == 'data':
                    # Only allow data: URIs with an allowed content type.
                    m = content_type_rgx.match(uri.path)
                    if not m:
                        del attrs[attr]
                    elif m.group('content_type') not in self.allowed_content_types:
                        del attrs[attr]
        # Blank out url(...) references in SVG attributes that allow them.
        for attr in self.svg_attr_val_allows_ref:
            if attr in attrs:
                attrs[attr] = re.sub(r'url\s*\(\s*[^#\s][^)]+?\)',
                                     ' ',
                                     unescape(attrs[attr]))
        # Drop non-fragment xlink:href values on href-restricted elements.
        if (token["name"] in self.svg_allow_local_href and
                'xlink:href' in attrs and
                re.search('^\s*[^#\s].*', attrs['xlink:href'])):
            del attrs['xlink:href']
        if 'style' in attrs:
            attrs['style'] = self.sanitize_css(attrs['style'])
        token["data"] = [[name, val] for name, val in list(attrs.items())]
    return token
def __call__(self, r):
    """Sign the outgoing request *r* with AWS Signature V4 and return it."""
    parsed = urlparse(r.url)
    # Normalise the URL for signing: default path, optional query string,
    # and a netloc stripped of any explicit port.
    path = parsed.path or '/'
    qs = parsed.query and '?%s' % parsed.query or ''
    safe_url = parsed.scheme + '://' + parsed.netloc.split(':')[0] + path + qs

    request = AWSRequest(method=r.method.upper(), url=safe_url, data=r.body)
    SigV4Auth(self.credentials, self.service, self.region).add_auth(request)
    # Copy the signing headers (Authorization, X-Amz-Date, ...) back onto r.
    r.headers.update(dict(request.headers.items()))
    return r
def parse_url(url):
    """Return the catalog slug, i.e. the second path segment of *url*.

    E.g. ``http://host/catalogs/<slug>/csw`` -> ``<slug>``.
    """
    path_segments = urlparse(url).path.split('/')
    return path_segments[2]
def __init__(self, *args, **kwargs):
    """Initialise the repository: derive the catalog slug from the request
    URL (if any) and probe the Elasticsearch backend."""
    self.catalog = None
    if args and hasattr(args[0], 'url'):
        url = args[0].url
        # '/csw' is the catalog-less endpoint; any other path carries a slug.
        if urlparse(url).path != '/csw':
            self.catalog = parse_url(url)
    try:
        self.es, self.version = es_connect(url=REGISTRY_SEARCH_URL)
        self.es_status = 200
    except requests.exceptions.ConnectionError:
        self.es_status = 404
    database = PYCSW['repository']['database']
    return super(RegistryRepository, self).__init__(database, context=config.StaticContext())
def check_netloc(layer):
    """Tally the network location of *layer*'s source URL in the module-level
    ``netlocs_dic`` counter and return it.

    Args:
        layer: Object whose ``source`` attribute is a URL string.

    Returns:
        str: the netloc (host[:port]) component of ``layer.source``.
    """
    netloc = urlparse(layer.source).netloc
    # Membership test directly on the dict (was: `in netlocs_dic.keys()`).
    if netloc in netlocs_dic:
        netlocs_dic[netloc]['counter'] += 1
    else:
        netlocs_dic[netloc] = {'counter': 1}
    return netloc