我们从Python开源项目中，提取了以下49个代码示例，用于说明如何使用urllib.parse.urlunsplit()。
async def generate_path(self, endpoint_desc, session, request_params):
    """Build the request URL for an endpoint and run the ``prepare_path`` hooks.

    The endpoint's ``path`` entry is appended to the path of ``self.base_path``
    (exactly one slash between them); scheme, netloc, query and fragment of the
    base are kept in their own positions.  Every plugin in ``self._plugins``
    that has a ``prepare_path`` attribute is then awaited in order and may
    replace the URL.

    :param endpoint_desc: mapping describing the endpoint; only ``'path'`` is read here.
    :param session: passed through to the plugin hooks.
    :param request_params: passed through to the plugin hooks.
    :return: the final URL string.
    :raises Exception: whatever a hook raises, after it has been logged.
    """
    # Local import: the surrounding file is only known to provide urlparse/urlunsplit.
    from urllib.parse import urlsplit

    path = endpoint_desc.get('path', '')
    # urlsplit() yields exactly the five fields urlunsplit() expects.  Dropping
    # the last field of a six-field urlparse() result instead shifts params and
    # query one slot to the right, turning the base query into a fragment.
    base = urlsplit(self.base_path)
    joined_path = '/'.join([base.path.rstrip('/'), path.lstrip('/')])
    path = urlunsplit(base._replace(path=joined_path))

    hooks = [getattr(plugin, 'prepare_path')
             for plugin in self._plugins if hasattr(plugin, 'prepare_path')]
    self.logger.debug("Calling {0} plugin hooks...".format('prepare_path'))
    for func in hooks:
        try:
            path = await func(endpoint_desc=endpoint_desc, session=session,
                              request_params=request_params, path=path)
        except Exception as ex:  # pragma: no cover
            self.logger.error("Exception executing {0}".format(repr(func)))
            self.logger.exception(ex)
            raise
    return path
def url_join(*parts, **kwargs):
    """
    Join several URL fragments into one URL, separating paths with a slash.

    The scheme and netloc are taken from the right-most part that defines
    them (scheme falls back to the ``scheme`` keyword, default ``'http'``).
    Paths are collected from right to left up to and including the first
    absolute one.  Query and fragment come from the last part only.

    adapted from: http://codereview.stackexchange.com/q/13027
    """
    def concat_paths(sequence):
        # `sequence` runs right-to-left; prepend so the result reads left-to-right.
        collected = []
        for segment in sequence:
            collected.insert(0, segment)
            if segment.startswith('/'):
                break
        return '/'.join(collected)

    split_parts = [urlsplit(piece) for piece in reversed(parts)]
    schemes, netlocs, paths, queries, fragments = zip(*split_parts)

    fallback_scheme = kwargs.get('scheme', 'http')
    scheme = next((candidate for candidate in schemes if candidate), fallback_scheme)
    netloc = next((candidate for candidate in netlocs if candidate), '')
    return urlunsplit((scheme, netloc, concat_paths(paths), queries[0], fragments[0]))
def append_query_params(self, url, **kwargs):
    """Return ``url`` with the keyword arguments added to its query string.

    A keyword whose name already occurs in the query is appended as an extra
    value for that name; a new name is stored with the given value as-is.
    """
    parts = urlsplit(url)
    params = parse_qs(parts.query)
    for name, value in kwargs.items():
        if name in params:
            params[name].append(value)
        else:
            params[name] = value
    return urlunsplit(parts._replace(query=urlencode(params, doseq=True)))
def request_url(secure, hostname, port, path, query=None):
    '''
    Assemble a URL string from its individual components.

    Args:
        secure (boolean): Use the ``https`` scheme when true, ``http`` otherwise.
        hostname (str): Host part of the URL.
        port (int): Port number, always written into the URL.
        path (str): Hierarchical path.
        query (dict): Optional query parameters; omitted when empty or None.

    Returns:
        (str) The URL built from the arguments, with an empty fragment.
    '''
    if secure:
        scheme = 'https'
    else:
        scheme = 'http'
    host_and_port = '{}:{}'.format(hostname, port)
    query_string = '' if not query else urlencode(query)
    components = (scheme, host_and_port, path, query_string, '')
    return urlunsplit(components)
def validate_(self, value, context=None):
    """Validate ``value`` as a URL and, if configured, check that it can be opened.

    Raises ``StopValidationError`` with the ``invalid_url`` message when
    ``self.valid_url`` rejects the value, and with the ``not_found`` message
    when ``self.verify_exists`` is set and ``urlopen`` raises ``URLError``.
    """
    url = self.valid_url(value)
    if not url:
        raise StopValidationError(self.messages['invalid_url'])
    if not self.verify_exists:
        return

    scheme = url['scheme']
    # First non-empty host form wins; the ':' separator is always emitted,
    # even when there is no port.
    host = url['host6'] or url['host4'] or url['hostn_enc']
    netloc = host + ':' + (url['port'] or '')
    rebuilt = urlunsplit((scheme, netloc, url['path'], url['query'], url['frag']))
    url_string = urlquote(rebuilt.encode('utf-8'), safe=VALID_CHAR_STRING)
    try:
        urlopen(url_string)
    except URLError:
        raise StopValidationError(self.messages['not_found'])
def launch(self):
    """Start the built-in ZEO server and wait until it reports readiness.

    A SIGUSR1 handler is installed before forking; the wait loop ends once
    that signal has been seen (presumably sent by the child when it is ready --
    confirm against ``fork_zeo``).  While waiting, the child is polled once per
    second and the process exits with status 1 if the child has terminated.

    On success ``settings.DB_URI`` is pointed at ``self.zeo_path`` using the
    ``zeo`` scheme.

    :return: the pid returned by ``self.fork_zeo()``.
    """
    self.check()
    received = False

    def set_received(*_):
        nonlocal received
        received = True

    signal.signal(signal.SIGUSR1, set_received)
    pid = self.fork_zeo()
    if not received:
        # sigtimedwait() returns None when the one-second timeout expires.
        while not signal.sigtimedwait([signal.SIGUSR1], 1):
            # With WNOHANG, waitpid() reports pid 0 while the child is still
            # running.  Keep that result in its own variable so `pid` keeps
            # naming the child for the next poll and for the return value.
            exited_pid, _status = os.waitpid(pid, os.WNOHANG)
            if exited_pid:
                log.error('Database server failed to start (check log above).')
                sys.exit(1)
    settings.DB_URI = urlunsplit(('zeo', '', self.zeo_path, '', ''))
    log.debug('Launched built-in ZEO')
    return pid
def merge_url_qs(url: str, **kw) -> str:
    """Merge the query string elements of a URL with the ones in ``kw``.

    If any query string element exists in ``url`` that also exists in ``kw``,
    replace it.  The ``kw`` items come first, sorted by name, followed by the
    remaining elements of ``url`` in their original order (blank values kept).

    :param url: An URL.
    :param kw: Dictionary with keyword arguments.
    :return: An URL with keyword arguments merged into the query string.
    """
    segments = urlsplit(url)
    untouched = [
        (k, v)
        for (k, v) in parse_qsl(segments.query, keep_blank_values=True)
        if k not in kw
    ]
    # Encode both groups in one pass so that an empty ``kw`` cannot leave a
    # stray leading '&' in front of the surviving elements.
    qs = urlencode(sorted(kw.items()) + untouched)
    return urlunsplit(segments._replace(query=qs))
def serialize(url='', data={}):
    """ Returns the given URL with the items of ``data`` (sorted by key)
        appended to its query string; a leading "?" is stripped from the result.
    """
    parts = urlparse.urlsplit(url)
    pairs = urlparse.parse_qsl(parts.query)
    for key, value in sorted(data.items()):
        pairs.append((b(key), b(value)))
    query = urlencode(pairs, doseq=True)
    rebuilt = urlparse.urlunsplit((parts.scheme, parts.netloc, parts.path, query, parts.fragment))
    return rebuilt.lstrip('?')

# print(serialize('http://www.google.com', {'q': 'cats'}))
# http://www.google.com?q=cats

#---- REQUESTS & STREAMS --------------------------------------------------------------------------
# The download(url) function returns the HTML (JSON, image data, ...) at the given url.
# If this fails it will raise NotFound (404), Forbidden (403) or TooManyRequests (420).
def safe_url(url, remove_empty_query=True):
    """Return ``url`` with its query parameters sorted by key.

    A URL without a query is returned with trailing slashes stripped.  If any
    ``&``-separated query element contains no ``=``, the URL is returned
    unchanged.  Otherwise the query is rebuilt from the sorted ``key=value``
    pairs (re-encoded with ``urlencode``) and trailing slashes are stripped
    from the result.

    :param url: URL to normalise.
    :param remove_empty_query: drop parameters whose value is empty.
    :return: the normalised URL string.
    """
    scheme, netloc, path, query, fragment = urlsplit(url)
    if not query:
        return url.rstrip('/')

    # Sort all the queries
    queries = []
    for q in query.split('&'):
        if '=' not in q:
            return url
        # Split on the first '=' only: values may themselves contain '='
        # (e.g. base64 padding), which a plain split('=') cannot unpack.
        key, _, value = q.partition('=')
        if remove_empty_query and not value:
            continue
        queries.append((key, value))
    queries.sort(key=lambda pair: pair[0])
    query = urlencode(queries)
    return urlunsplit((scheme, netloc, path, query, fragment)).rstrip('/')
def __call__(self, value):
    """Validate ``value``, retrying with an IDNA-encoded host if the first attempt fails.

    Returns ``value`` when the parent validator accepts it directly, otherwise
    the result of the parent validator for the converted URL.  Raises
    ``ValidationError`` for an empty value or a host that cannot be encoded.
    """
    try:
        super(URLValidator, self).__call__(value)
    except ValidationError:
        # Trivial case failed. Try for possible IDN domain
        if not value:
            raise ValidationError(self.message.format(value), code=self.code)
        value = text_type(value)
        scheme, netloc, path, query, fragment = urlsplit(value)
        try:
            # IDN -> ACE
            ace_netloc = netloc.encode('idna').decode('ascii')
        except UnicodeError:
            # invalid domain part
            raise ValidationError(self.message.format(value), code=self.code)
        converted = urlunsplit((scheme, ace_netloc, path, query, fragment))
        return super(URLValidator, self).__call__(converted)
    return value
def _convert_to_idn(url):
    """Return ``url`` with a non-ASCII host rewritten in IDN notation.

    Intended for unicode input.  A URL whose netloc is already ASCII is
    returned unchanged; otherwise every dot-separated host label is encoded
    with the ``idna`` codec and a non-empty port is appended again.
    """
    parts = list(urlparse.urlsplit(url))
    netloc = parts[1]
    try:
        netloc.encode('ascii')
        return url
    except UnicodeEncodeError:
        pass

    # Separate an optional port (text after the last ':') from the host name.
    if ':' in netloc:
        hostname, port = netloc.rsplit(':', 1)
    else:
        hostname, port = netloc, ''
    encoded = '.'.join(
        label.encode('idna').decode('utf-8') for label in hostname.split('.')
    )
    parts[1] = encoded + ':' + port if port else encoded
    return urlparse.urlunsplit(parts)
def geturl(self):
    """Return the URL string obtained by passing this object to ``urlunsplit``."""
    rebuilt = urlunsplit(self)
    return rebuilt
def embed_real_url_to_embedded_url(real_url_raw, url_mime, escape_slash=False):
    """
    Move the query string of a URL into its path, producing a query-less URL.

    The query is base64 (urlsafe) encoded -- zlib-compressed first when it is
    longer than 128 bytes, marked by a ``z`` after the salt -- and appended to
    the path between ``_url_salt`` markers, followed by the extension looked up
    in ``mime_to_use_cdn[url_mime]``.  A URL without a query is returned as
    given.  With ``escape_slash`` the input has ``\\/`` un-escaped before
    parsing and the result is passed through ``s_esc``.

    NOTE(review): the original docstring names extract_real_url_from_embedded_url()
    as the counterpart of this function -- confirm.

    :rtype: str
    """
    real_url = real_url_raw.replace(r'\/', '/') if escape_slash else real_url_raw
    url_sp = urlsplit(real_url)
    if not url_sp.query:
        # no query, needn't rewrite
        return real_url_raw

    byte_query = url_sp.query.encode()
    gzip_label = ''
    if len(byte_query) > 128:
        # long query: compress it and flag the compression with a 'z'
        gzip_label = 'z'
        byte_query = zlib.compress(byte_query)
    b64_query = base64.urlsafe_b64encode(byte_query).decode()

    mixed_path = ''.join((
        url_sp.path, '_', _url_salt, gzip_label, '_.',
        b64_query,
        '._', _url_salt, '_.', mime_to_use_cdn[url_mime],
    ))
    result = urlunsplit((url_sp.scheme, url_sp.netloc, mixed_path, '', ''))
    return s_esc(result) if escape_slash else result
def setup(app, host, version, host_prefix='', host_scheme='https', login_route='/login', logout_route='/logout', on_success='/', on_logout='/'):
    """Configure CAS authentication on an application object.

    Stores the CAS settings under ``app[APP_KEY]`` and registers GET routes
    for the login and logout handlers.

    :param app: aiohttp app.
    :param str host: CAS host to authenticate against
    :param str version: Version of CAS to use
    :param str host_prefix: Server prefix that CAS runs under
    :param str host_scheme: Scheme to access the CAS host under
    :param str login_route: Route for local login handler
    :param str logout_route: Route for local logout handler
    :param str on_success: Default route for redirect after a successful login
    :param str on_logout: Route for redirect after logout
    :return: the same ``app`` object.
    """
    # The root URL always ends with a slash.
    prefix = host_prefix if host_prefix.endswith('/') else host_prefix + '/'
    root_components = (host_scheme, host, prefix, None, None)
    cas_root_url = parse.urlunsplit(root_components)

    app[APP_KEY] = dict(
        VERSION=version,
        ROOT_URL=cas_root_url,
        LOGIN_ROUTE=login_route,
        LOGOUT_ROUTE=logout_route,
        ON_SUCCESS=on_success,
        ON_LOGOUT=on_logout,
    )
    router = app.router
    router.add_route('GET', login_route, login_handler)
    router.add_route('GET', logout_route, logout_handler)
    return app
def make_abs_url(url):
    """Return ``url`` with its path passed through ``path.abspath``; query and fragment are dropped."""
    pieces = parse.urlsplit(url)
    absolute = pieces._replace(path=path.abspath(pieces.path), query='', fragment='')
    return parse.urlunsplit(absolute)
def add_query_param(url, param_name, param_value):
    """Return ``url`` with ``param_name=param_value`` appended to its query string.

    Existing parameters are kept in order (as parsed by ``parse_qsl``) and the
    fragment is preserved.  The value is percent-encoded exactly once, by
    ``urlencode``.

    :param url: URL to extend.
    :param param_name: name of the parameter to append.
    :param param_value: unencoded value of the parameter.
    :return: the new URL string.
    """
    scheme, netloc, path, query_string, fragment = urlsplit(url)
    query_params = parse_qsl(query_string)
    # Pass the raw value: urlencode() already applies quote_plus(), so quoting
    # here as well would double-encode it ('a b' -> 'a+b' -> 'a%2Bb').
    query_params.append((param_name, param_value))
    new_query_string = urlencode(query_params, doseq=True)
    return urlunsplit((scheme, netloc, path, new_query_string, fragment))
def build_return_url(self):
    '''
    Build the return URL with any LTI message fields added to its query.

    Returns None when no ``launch_presentation_return_url`` is set.  Parameters
    already present in the return URL take precedence over the message fields.
    '''
    return_url = self.launch_presentation_return_url
    if not return_url:
        return None

    lti_message_fields = ('lti_errormsg', 'lti_errorlog', 'lti_msg', 'lti_log')
    messages = {
        field: getattr(self, field)
        for field in lti_message_fields
        if getattr(self, field, None)
    }

    # Take the original return URL apart and put it back together with the
    # merged query.
    original = urlsplit(return_url)
    combined = {**messages, **dict(parse_qsl(original.query))}
    return urlunsplit(original._replace(query=urlencode(combined)))
def EncodeUrl(self,url):
    """Return ``url`` with its path component percent-encoded by ``parse.quote``."""
    pieces = parse.urlsplit(url)
    quoted = pieces._replace(path=parse.quote(pieces.path))
    return parse.urlunsplit(quoted)
def _build_api_url(url, query):
    """Return ``url`` with ``query`` as its query string, appended after any query it already has."""
    pieces = parse.urlsplit(url)
    if pieces.query:
        query = '{0}&{1}'.format(pieces.query, query)
    return parse.urlunsplit(pieces._replace(query=query))
def decode_url_idna(value):
    """Return ``value`` with its netloc IDNA-encoded (IDN -> ACE).

    When the netloc cannot be encoded (``UnicodeError``), ``value`` is
    returned unchanged.
    """
    try:
        pieces = urlparse.urlsplit(value)
        ace_netloc = pieces.netloc.encode('idna').decode('ascii')
    except UnicodeError:
        # invalid domain part: fall back to the input
        return value
    return urlparse.urlunsplit(pieces._replace(netloc=ace_netloc))
def process_request(self, request):
    """Redirect an insecure request to the same URI under ``https``.

    Returns None (no redirect) for secure requests and whenever ``runserver``
    or ``test`` appears in ``sys.argv``.
    """
    if request.is_secure():
        return None
    if "runserver" in sys.argv or "test" in sys.argv:
        return None
    # Keep everything but the scheme of the raw request URI.
    remainder = urlsplit(request.get_raw_uri())[1:]
    return HttpResponseRedirect(urlunsplit(["https"] + list(remainder)))
def urldefrag(url):
    """Split ``url`` into ``(url_without_fragment, fragment)``.

    A URL containing no ``#`` is returned as given, paired with an empty
    fragment.
    """
    if "#" not in url:
        return url, ''
    scheme, netloc, path, query, fragment = urlsplit(url)
    stripped = urlunsplit((scheme, netloc, path, query, ''))
    return stripped, fragment
def get_path(url):
    """Return the path, query and fragment of ``url`` without scheme and netloc.

    An empty path is replaced by ``'/'``.
    """
    pieces = urlsplit(url)
    local_path = pieces.path if pieces.path else '/'
    return urlunsplit(['', '', local_path, pieces.query, pieces.fragment])
def _set_db_uri():
    """Point ``settings.DB_URI`` at the built-in ZEO socket when ``settings.BUILTIN_ZEO`` is set.

    Does nothing otherwise.  The URI uses the ``zeo`` scheme, the address
    returned by ``get_socket_addr('zeo')`` as path and ``wait_timeout=5`` as
    query.
    """
    if settings.BUILTIN_ZEO:
        from .daemon.utils import get_socket_addr
        socket_addr = get_socket_addr('zeo')
        settings.DB_URI = urlunsplit(('zeo', '', socket_addr, 'wait_timeout=5', ''))
        log.debug('Real DB_URI (at daemon) is %r', settings.DB_URI)
def send(self, url, method='GET', data=None, headers={}):
    """Dispatch a JSON request through the test application and return its response.

    Only the path, query string and fragment of ``url`` are used.  The
    ``Authorization`` header is set from ``self.auth`` and JSON content/accept
    headers are added to a copy of ``headers``.  Truthy ``data`` is serialised
    with ``json.dumps``.  The result is ``response(status_code, headers, body)``
    with the body decoded as UTF-8 JSON.
    """
    # for testing, URLs just need to have the path and query string
    pieces = urlsplit(url)
    url = urlunsplit(('', '', pieces.path, pieces.query, pieces.fragment))

    # every request carries the authentication and JSON headers
    headers = headers.copy()
    headers.update({
        'Authorization': self.auth,
        'Content-Type': 'application/json',
        'Accept': 'application/json',
    })

    # JSON payloads travel as strings
    data = json.dumps(data) if data else data

    # run the request through the app's request pipeline
    with self.app.test_request_context(url, method=method, data=data,
                                       headers=headers):
        rv = self.app.preprocess_request()
        if rv is None:
            rv = self.app.dispatch_request()
        rv = self.app.make_response(rv)
        rv = self.app.process_response(rv)
        body = json.loads(rv.data.decode('utf-8'))
        return response(rv.status_code, rv.headers, body)
def absolute_url(self, path):
    """Return an ``https`` URL on the request's ``Host`` for ``path`` below ``self.stage``."""
    host = self.headers['Host']
    staged_path = '/'.join((self.stage, path))
    return urlunsplit(('https', host, staged_path, '', ''))
def geturl_nofragment(self):
    """
    Return a ``URL`` rebuilt from the parsed parts with an empty fragment.

    Only scheme, netloc, path and query of the six-field ``self._parsed`` are
    passed on; its ``params`` field is not included in the result.
    """
    scheme, netloc, path, _params, query, _fragment = self._parsed
    components = (scheme, netloc, path, query, "")
    return URL(urlunsplit(components))
def replace(self, **kwargs):
    """Return a new ``URL`` with the given components replaced.

    Accepted keywords are ``scheme``, ``netloc``, ``path``, ``query`` and
    ``fragment``; components not given are taken from ``self._parsed``.
    """
    current = self._parsed
    field_names = ('scheme', 'netloc', 'path', 'query', 'fragment')
    components = [kwargs.get(name, getattr(current, name)) for name in field_names]
    return URL(urlunsplit(components))
def safe_url(url, remove_empty_query=True):
    """Return a normalised form of ``url`` with its query parameters sorted by key.

    A URL parsed without netloc has its path used as netloc instead, and a
    missing scheme defaults to ``http``.  Query elements are collected up to
    the first one that contains no ``=``; the rest are discarded.  Trailing
    slashes are stripped from the result.

    :param url: URL to normalise.
    :param remove_empty_query: drop parameters whose value is empty.
    :return: the normalised URL string.
    :raises InvalidURLError: if ``url`` is empty.
    """
    if not url:
        raise InvalidURLError()
    try:
        scheme, netloc, path, params, query, fragment = urlparse(url)
        if not netloc and path:
            # e.g. "example.com/x" parses as a bare path; treat it as the host part
            path, netloc = netloc, path
        queries = []
        for q in query.split('&'):
            if '=' not in q:
                break
            # Split on the first '=' only: values may themselves contain '='
            # (e.g. base64 padding), which a plain split('=') cannot unpack.
            key, _, value = q.partition('=')
            if remove_empty_query and not value:
                continue
            queries.append((key, value))
        queries.sort(key=lambda pair: pair[0])
        query = urlencode(queries)
        url = urlunsplit((scheme or 'http', netloc, path, query, fragment)).rstrip('/')
        return url
    except URLError:
        return url.rstrip('/')
def __init__(self,
             login_url: str,
             username: str = None,
             password: str = None,
             auth_type: str = 'digest',
             user_agent: str = 'rets-python/0.3',
             user_agent_password: str = None,
             rets_version: str = '1.7.2',
             capability_urls: str = None,
             cookie_dict: dict = None
             ):
    """Set up the client state for the given login URL.

    The scheme and netloc of ``login_url`` become the base URL; its path is
    used as the ``Login`` capability unless ``capability_urls`` is supplied.
    HTTP auth is only configured when no ``user_agent_password`` is given.
    Cookies from ``cookie_dict`` are preloaded into the session.
    """
    self._user_agent = user_agent
    self._user_agent_password = user_agent_password
    self._rets_version = rets_version

    login_parts = urlsplit(login_url)
    self._base_url = urlunsplit((login_parts.scheme, login_parts.netloc, '', '', ''))
    self._capabilities = capability_urls or {
        'Login': login_parts.path,
    }

    # Either the user agent auth header or (basic or digest) HTTP auth is used.
    if user_agent_password:
        self._http_auth = None
    else:
        self._http_auth = _get_http_auth(username, password, auth_type)

    # A session keeps track of the cookies that certain MLSes require.
    self._session = requests.Session()

    # Optional cookies for the first login.  Sending an existing session_id
    # cookie to the login url returns the same cookie, which (most likely)
    # means no additional login is created.
    for name, value in (cookie_dict or {}).items():
        self._session.cookies.set(name, value=value)

    # This session id is part of the rets standard for use with a user agent password.
    self._rets_session_id = ''
def uri(self):
    """The uniform resource identifier.

    Built from ``self._iri``: a non-empty netloc is IDNA-encoded and path,
    query and fragment are each passed through ``quote`` with its default
    safe characters.
    """
    parts = urlsplit(self._iri)
    netloc = parts.netloc
    if netloc:
        netloc = netloc.encode('idna').decode()
    path, query, fragment = (quote(piece) for piece in parts[2:])
    return urlunsplit((parts.scheme, netloc, path, query, fragment))
def uri(self, value):
    """Store ``value`` in ``self._iri`` after decoding it.

    A non-empty netloc is decoded with the ``idna`` codec and path, query and
    fragment are each passed through ``unquote``.
    """
    parts = urlsplit(value)
    netloc = parts.netloc
    if netloc:
        netloc = netloc.encode().decode('idna')
    path, query, fragment = (unquote(piece) for piece in parts[2:])
    self._iri = urlunsplit((parts.scheme, netloc, path, query, fragment))
def __str__(self) -> str:
    """Return the URL text: ``self.splitted`` with its query replaced by the encoded ``self.query``."""
    encoded_query = urlencode(self.query, doseq=True)
    components = self.splitted._replace(query=encoded_query)
    return urlunsplit(components)
def replace_query_param(url, key, val):
    """
    Return the path, query and fragment of ``url`` with query parameter
    ``key`` set to ``val``.  Parameters are emitted sorted by name; scheme and
    netloc are dropped.
    """
    pieces = urlparse.urlsplit(url)
    params = urlparse.parse_qs(pieces.query, keep_blank_values=True)
    params[key] = [val]
    encoded = urlparse.urlencode(sorted(params.items()), doseq=True)
    return urlparse.urlunsplit(('', '', pieces.path, encoded, pieces.fragment))
def remove_query_param(url, key):
    """
    Return the path, query and fragment of ``url`` with query parameter
    ``key`` removed.  Remaining parameters are emitted sorted by name; scheme
    and netloc are dropped.
    """
    pieces = urlparse.urlsplit(url)
    params = urlparse.parse_qs(pieces.query, keep_blank_values=True)
    params.pop(key, None)
    encoded = urlparse.urlencode(sorted(params.items()), doseq=True)
    return urlparse.urlunsplit(('', '', pieces.path, encoded, pieces.fragment))
def url_fix(s, charset='utf-8'):
    """Return ``s`` with its path and query string percent-encoded.

    The path is encoded as UTF-8 and quoted; the query is quoted with
    ``:&=`` kept as-is.  The rebuilt URL is decoded with ``charset``.
    """
    parts = urlsplit(s)
    quoted_path = urllib.quote(parts.path.encode("utf-8"))
    quoted_qs = urllib.quote_plus(parts.query, ':&=')
    rebuilt = urlunsplit((parts.scheme, parts.netloc, quoted_path, quoted_qs, parts.fragment))
    return rebuilt.decode(charset)
def url_fix(s):
    """Return ``s`` as ``unicode`` with its path and query string percent-encoded.

    ``/`` and ``%`` are left untouched in the path, ``:&=`` in the query.
    """
    parts = urlsplit(s)
    quoted_path = quote(parts.path, '/%')
    quoted_qs = quote_plus(parts.query, ':&=')
    rebuilt = urlunsplit((parts.scheme, parts.netloc, quoted_path, quoted_qs, parts.fragment))
    return unicode(rebuilt)
def add_params_to_urls(url, params):
    """Return ``url`` with the items of the ``params`` mapping appended to its query string."""
    pieces = urlsplit(url)
    pairs = parse_qsl(pieces.query) + list(params.items())
    return urlunsplit(pieces._replace(query=urlencode(pairs)))
def url(self):
    """Return a ``Url`` built from scheme, netloc, path and query string, with an empty fragment."""
    components = (self.scheme, self.netloc, self.path, self.query_string, "")
    return Url(parse.urlunsplit(components))
def url_no_query(self):
    """
    Return the URL without its query string and fragment.

    Examples:
        "http://foo.com:88/cat.html"
        "https://foo.com/dog.php"
    """
    components = (self.scheme, self.netloc, self.path, "", "")
    return parse.urlunsplit(components)
def url_no_path(self):
    """
    Return the URL reduced to scheme and netloc (no path, query or fragment).

    Examples:
        "http://foo.com:88"
        "https://bar.com"
    """
    components = (self.scheme, self.netloc, "", "", "")
    return parse.urlunsplit(components)
def without_query(self):
    """
    Return the URL without its query string and fragment.

    Examples:
        "http://foo.com:88/cat.html"
        "https://foo.com/dog.php"
    """
    components = (self.scheme, self.netloc, self.path, "", "")
    return parse.urlunsplit(components)