The following 49 code examples, extracted from open source Python projects, illustrate how to use urlparse.urlunsplit().
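Before the project examples, a minimal sketch of the basic call (Python 2; the URL is only an illustration): urlunsplit() is the inverse of urlsplit() and reassembles a (scheme, netloc, path, query, fragment) 5-tuple into a URL string.

from urlparse import urlsplit, urlunsplit

parts = urlsplit('http://example.com/path?q=1#frag')
# Rebuild the URL, dropping the query string and fragment.
print urlunsplit((parts.scheme, parts.netloc, parts.path, '', ''))
# -> http://example.com/path
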
def remove_trailing_version_from_href(href):
    """Removes the api version from the href.

    Given: 'http://www.nova.com/compute/v1.1'
    Returns: 'http://www.nova.com/compute'

    Given: 'http://www.nova.com/v1.1'
    Returns: 'http://www.nova.com'

    """
    parsed_url = urlparse.urlsplit(href)
    url_parts = parsed_url.path.rsplit('/', 1)

    # NOTE: this should match vX.X or vX
    expression = re.compile(r'^v([0-9]+|[0-9]+\.[0-9]+)(/.*|$)')
    if not expression.match(url_parts.pop()):
        raise ValueError('URL %s does not contain version' % href)

    new_path = url_join(*url_parts)
    parsed_url = list(parsed_url)
    parsed_url[2] = new_path

    return urlparse.urlunsplit(parsed_url)

def put(self, uri, data, content_type=''):
    dbname, dburi = self._get_dburi(uri)
    if not dbname or not dburi:
        raise DAV_Forbidden
    pool = Pool(Transaction().cursor.database_name)
    Collection = pool.get('webdav.collection')
    try:
        res = Collection.put(dburi, data, content_type, cache=CACHE)
        Transaction().cursor.commit()
    except (DAV_Error, DAV_NotFound, DAV_Secret, DAV_Forbidden), exception:
        self._log_exception(exception)
        Transaction().cursor.rollback()
        raise
    except Exception, exception:
        self._log_exception(exception)
        Transaction().cursor.rollback()
        raise DAV_Error(500)
    if res:
        uparts = list(urlparse.urlsplit(uri))
        uparts[2] = res
        res = urlparse.urlunsplit(uparts)
    return res

def _convert_to_idn(url):
    """Convert a URL to IDN notation"""
    # this function should only be called with a unicode string
    # strategy: if the host cannot be encoded in ascii, then
    # it'll be necessary to encode it in idn form
    parts = list(urlparse.urlsplit(url))
    try:
        parts[1].encode('ascii')
    except UnicodeEncodeError:
        # the url needs to be converted to idn notation
        host = parts[1].rsplit(':', 1)
        newhost = []
        port = u''
        if len(host) == 2:
            port = host.pop()
        for h in host[0].split('.'):
            newhost.append(h.encode('idna').decode('utf-8'))
        parts[1] = '.'.join(newhost)
        if port:
            parts[1] += ':' + port
        return urlparse.urlunsplit(parts)
    else:
        return url

def join_url(base_url, path):
    """Joins base url and path removing extra slashes.

    Removes trailing slashes. Joins queries. E.g.: See unit tests.

    :param base_url: Base url.
    :param path: Path.
    :return: Joined url.
    """
    # Example of usages see in unittests
    base_url = urlparse.urlsplit(base_url, allow_fragments=False)
    path = urlparse.urlsplit(path, allow_fragments=False)
    full_path = _join_paths(base_url.path, path.path)
    full_query = _join_queries(base_url.query, path.query)
    return urlparse.urlunsplit(
        (base_url.scheme, base_url.netloc, full_path, full_query,
         base_url.fragment))

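The _join_paths and _join_queries helpers used above are not part of this excerpt. A rough, hypothetical sketch of what they might look like, inferred only from the docstring (drop extra and trailing slashes, concatenate query strings), could be:

def _join_paths(*paths):
    # Hypothetical helper: strip redundant slashes and re-join the segments.
    parts = [p.strip('/') for p in paths if p and p.strip('/')]
    return '/' + '/'.join(parts) if parts else ''

def _join_queries(*queries):
    # Hypothetical helper: concatenate non-empty query strings with '&'.
    return '&'.join(q for q in queries if q)
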
def url_join(*parts, **kwargs):
    """
    Normalize url parts and join them with a slash.
    adapted from: http://codereview.stackexchange.com/q/13027
    """
    def concat_paths(sequence):
        result = []
        for path in sequence:
            result.append(path)
            if path.startswith('/'):
                break
        return '/'.join(reversed(result))

    schemes, netlocs, paths, queries, fragments = zip(
        *(urlsplit(part) for part in reversed(parts)))
    scheme = next((x for x in schemes if x), kwargs.get('scheme', 'http'))
    netloc = next((x for x in netlocs if x), '')
    path = concat_paths(paths)
    query = queries[0]
    fragment = fragments[0]
    return urlunsplit((scheme, netloc, path, query, fragment))

def _get_env_info(self, script_url):
    script_folder = ModuleExec('system_info', [
        '-info', 'script_folder'
    ]).load_result_or_run('script_folder')

    if not script_folder:
        return

    script_url_splitted = urlparse.urlsplit(script_url)
    script_url_path_folder, script_url_path_filename = os.path.split(
        script_url_splitted.path)

    url_folder_pieces = script_url_path_folder.split(os.sep)
    folder_pieces = script_folder.split(os.sep)

    for pieceurl, piecefolder in zip(reversed(url_folder_pieces),
                                     reversed(folder_pieces)):
        if pieceurl == piecefolder:
            folder_pieces.pop()
            url_folder_pieces.pop()
        else:
            break

    base_url_path_folder = os.sep.join(url_folder_pieces)
    self.base_folder_url = urlparse.urlunsplit(
        script_url_splitted[:2] +
        (base_url_path_folder, ) +
        script_url_splitted[3:])
    self.base_folder_path = os.sep.join(folder_pieces)

# NOTE: identifier names and string literals in this example are machine-obfuscated
# in the source project; decode() appears to be its string-deobfuscation helper.
def audit(arg):
    Ii1iI = arg
    Oo = urlparse.urlparse(Ii1iI)
    I1Ii11I1Ii1i = urlparse.urlunsplit(
        (Oo.scheme, Oo.netloc, Oo.path, decode(''), decode('')))
    Oo0Ooo = urlparse.parse_qsl(Oo.query)
    if 0:
        iiI1iIiI.ooo0Oo0 * i1 - Oooo0000 * i1IIi11111i / o000o0o00o0Oo
    oo = [decode('\xb0\xee\xa7\xb8\xcd[\xa7y\xe8\x81\xf1'),
          decode('\xaa\xd9\x8f\x84\xcd|\x9b_\xc6\xea\xc6'),
          decode('\xaa\xd9\x8f\x84\xcd|\x9b_\xc6\xea\xc2')]
    for O0O0OO0O0O0, iiiii in Oo0Ooo:
        if O0O0OO0O0O0 in oo:
            continue
        debug(decode('\xa0\xfb\xad\xb5\xce%\xddE\x8c\xe7\xcb'),
              O0O0OO0O0O0, I1Ii11I1Ii1i)
        IiII1I1i1i1ii = iI1(I1Ii11I1Ii1i, Oo0Ooo, O0O0OO0O0O0, iiiii)
        if IiII1I1i1i1ii:
            security_info(IiII1I1i1i1ii[1])
            return
    if 0:
        OOo0o0 / OOoOoo00oo - iI1OoOooOOOO + i1iiIII111ii + i1iIIi1

# NOTE: like the previous example, this snippet is machine-obfuscated in the
# source project; decode() appears to be its string-deobfuscation helper.
def audit(arg):
    ooO0oooOoO0 = arg
    II11i = urlparse.urlparse(ooO0oooOoO0)
    i1oOOoo00O0O = urlparse.urlunsplit(
        (II11i.scheme, II11i.netloc, II11i.path, decode(''), decode('')))
    Oo0Ooo = urlparse.parse_qsl(II11i.query)
    i1111 = [decode('\x19\xd8C\xe3UG<\xeb\xc4\x8ft'),
             decode('<\xc7k\xdaUY\x05\xf7\xf8\xe1k'),
             decode('<\xc7k\xdaUY\x05\xf7\xf8\xe1{')]
    i11 = [decode('\x1f\xfec'), decode('*\xdcR\xf4')]
    for I11 in i11:
        for O0O0OO0O0O0, iiiii in Oo0Ooo:
            if O0O0OO0O0O0 in i1111:
                continue
            debug(decode('\x18\xe9R\xc5S:G\xb7\xe8\xe5-\x94\xc9\xdd\xa9\x14'),
                  I11, O0O0OO0O0O0, i1oOOoo00O0O)
            Oo0o0000o0o0 = iI1(I11, i1oOOoo00O0O, Oo0Ooo, O0O0OO0O0O0, iiiii)
            if Oo0o0000o0o0:
                security_info(decode('a\xb6Z\x9e\x0c+4') % (I11, Oo0o0000o0o0[1]))
                return
    if 0:
        iiiii11iII1 % O0o

def generate_docservice_url(request, doc_id, temporary=True, prefix=None):
    docservice_key = getattr(request.registry, 'docservice_key', None)
    parsed_url = urlparse(request.registry.docservice_url)
    query = {}
    if temporary:
        expires = int(ttime()) + 300  # EXPIRES
        mess = "{}\0{}".format(doc_id, expires)
        query['Expires'] = expires
    else:
        mess = doc_id
    if prefix:
        mess = '{}/{}'.format(prefix, mess)
        query['Prefix'] = prefix
    query['Signature'] = quote(b64encode(docservice_key.signature(mess.encode("utf-8"))))
    query['KeyID'] = docservice_key.hex_vk()[:8]
    return urlunsplit((parsed_url.scheme, parsed_url.netloc,
                       '/get/{}'.format(doc_id), urlencode(query), ''))

def request_url(secure, hostname, port, path, query=None):
    '''
    Combines url components into a url passable into the request function.

    Args:
        secure (boolean): Whether or not to use HTTPS.
        hostname (str): The host name for the url.
        port (int): The port number, as an integer.
        path (str): The hierarchical path.
        query (dict): A dict of query parameters.

    Returns:
        (str) A complete url made up of the arguments.
    '''
    encoded_query = urlencode(query) if query else ''
    scheme = 'https' if secure else 'http'
    netloc = '{0}:{1}'.format(hostname, port)
    return urlunsplit((scheme, netloc, path, encoded_query, ''))

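A quick usage check of request_url (assuming urlencode comes from urllib and urlunsplit from urlparse on Python 2; the host names are placeholders):

print request_url(True, 'example.com', 8443, '/api/v1/items', {'q': 'cats'})
# -> https://example.com:8443/api/v1/items?q=cats
print request_url(False, 'localhost', 80, '/status')
# -> http://localhost:80/status
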
def validate_(self, value, context=None):
    url = self.valid_url(value)
    if not url:
        raise StopValidationError(self.messages['invalid_url'])
    if self.verify_exists:
        url_string = urlquote(urlunsplit((
            url['scheme'],
            (url['host6'] or url['host4'] or url['hostn_enc']) + ':' + (url['port'] or ''),
            url['path'],
            url['query'],
            url['frag'])
            ).encode('utf-8'), safe=VALID_CHAR_STRING)
        try:
            urlopen(url_string)
        except URLError:
            raise StopValidationError(self.messages['not_found'])

def parse_url(self, url):
    """
    Remove the list parameter from the URL as we currently don't
    support conversion of an entire playlist.
    """
    qs = parse_qs(urlparse(url).query)
    if qs.get('list', None):
        del(qs['list'])
        parts = urlsplit(url)
        return urlunsplit([
            parts.scheme,
            parts.netloc,
            parts.path,
            urllib.urlencode(qs, True),
            parts.fragment
        ])
    return url

def normalize_url(url):
    """
    :param url:
    :return:
    """
    # only hostname
    if not '/' in url:
        return 'http://{}'.format(url)

    p = urlparse.urlsplit(url)
    # www.test.com/index.php
    # exclude /xxxxx/index.php
    if not p.netloc:
        if url.startswith('/'):
            # /xxxxx/index.php
            return ''
        else:
            # www.test.com/index.php
            return 'http://{}'.format(url)

    # //www.test.com/index.php
    if not p.scheme:
        url = urlparse.urlunsplit(('http', p.netloc, p.path or '/', p.query, p.fragment))
    return url

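For illustration, using the same host the source comments mention, normalize_url fills in a default http scheme for schemeless and protocol-relative URLs:

print normalize_url('www.test.com/index.php')
# -> http://www.test.com/index.php
print normalize_url('//www.test.com/index.php')
# -> http://www.test.com/index.php
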
def _generate_url(self, path, query=None, frag=None):
    '''_generate_url

    :param path:
    :param query:
    :param frag:
    :return: url
    '''
    if CONF.vrm_ssl:
        scheme = 'https'
    else:
        scheme = 'http'
    fc_ip = FC_DRIVER_CONF.fc_ip
    netloc = str(fc_ip) + ':' + str(CONF.vrm_port)

    if path.startswith(self.BASIC_URI):
        url = urlparse.urlunsplit((scheme, netloc, path, query, frag))
    else:
        url = urlparse.urlunsplit(
            (scheme, netloc, self.BASIC_URI + str(path), query, frag))
    return url

def _generate_url(self, path, query=None, frag=None):
    '''_generate_url

    :param path:
    :param query:
    :param frag:
    :return:
    '''
    if CONF.vrm_ssl:
        scheme = 'https'
    else:
        scheme = 'http'
    fc_ip = FC_DRIVER_CONF.fc_ip
    netloc = str(fc_ip) + ':' + str(CONF.vrm_port)

    if path.startswith(self.BASIC_URI):
        url = urlparse.urlunsplit((scheme, netloc, path, query, frag))
    else:
        url = urlparse.urlunsplit(
            (scheme, netloc, self.BASIC_URI + str(path), query, frag))
    return url

def serialize(url='', data={}):
    """ Returns a URL with a query string of the given data.
    """
    p = urlparse.urlsplit(url)
    q = urlparse.parse_qsl(p.query)
    q.extend((b(k), b(v)) for k, v in sorted(data.items()))
    q = urlencode(q, doseq=True)
    p = p.scheme, p.netloc, p.path, q, p.fragment
    s = urlparse.urlunsplit(p)
    s = s.lstrip('?')
    return s

# print(serialize('http://www.google.com', {'q': 'cats'}))
# http://www.google.com?q=cats

#---- REQUESTS & STREAMS --------------------------------------------------------------------------
# The download(url) function returns the HTML (JSON, image data, ...) at the given url.
# If this fails it will raise NotFound (404), Forbidden (403) or TooManyRequests (420).

def redirect(self, url, **kwargs):
    if url.startswith('/'):
        spliturl = urlparse.urlsplit(self.request.url)
        # Redirect to the www.dancedeets.com domain if they requested the raw hostname
        domain = self._get_full_hostname() if spliturl.netloc == 'dancedeets.com' else spliturl.netloc
        # Redirect to https on prod, as relying on url.scheme would send it back to http,
        # due to the nginx http-based proxy
        scheme = 'https' if self.request.app.prod_mode else 'http'
        new_url = urlparse.urlunsplit([
            scheme,
            domain,
            spliturl.path,
            spliturl.query,
            spliturl.fragment,
        ])
        url = str(urlparse.urljoin(new_url, url))
    return super(BaseRequestHandler, self).redirect(url, **kwargs)

def validate_url(in_url):
    """ Take some value provided by the user and attempt to produce
    a meaningful url from it. """
    parts = list(urlsplit(in_url))
    scheme = parts[0]
    netloc = parts[1]
    path = parts[2]
    if not netloc:
        tld_regex = r'^\S+\.\S+$'
        if re.match(tld_regex, in_url):
            return validate_url("https://%s" % in_url)
        elif in_url == "about:blank":
            return in_url
        else:
            return validate_url("https://en.wikipedia.org/wiki/%s" % in_url)
    return urlunsplit(parts)

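For illustration, the recursion in validate_url upgrades a bare hostname to an https URL and leaves an already-complete URL untouched (hostnames below are placeholders):

print validate_url('example.com')
# -> https://example.com
print validate_url('https://example.com/wiki?q=1')
# -> https://example.com/wiki?q=1
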
def __call__(self, value):
    try:
        super(URLValidator, self).__call__(value)
    except ValidationError as e:
        # Trivial case failed. Try for possible IDN domain
        if value:
            value = text_type(value)
            scheme, netloc, path, query, fragment = urlsplit(value)
            try:
                # IDN -> ACE
                netloc = netloc.encode('idna').decode('ascii')
            except UnicodeError:
                # invalid domain part
                raise ValidationError(self.message.format(value), code=self.code)
            url = urlunsplit((scheme, netloc, path, query, fragment))
            return super(URLValidator, self).__call__(url)
        else:
            raise ValidationError(self.message.format(value), code=self.code)
    return value

def _convert_to_idn(url):
    """Convert a URL to IDN notation"""
    # this function should only be called with a unicode string
    # strategy: if the host cannot be encoded in ascii, then
    # it'll be necessary to encode it in idn form
    parts = list(urlparse.urlsplit(url))
    try:
        parts[1].encode('ascii')
    except UnicodeEncodeError:
        # the url needs to be converted to idn notation
        host = parts[1].rsplit(':', 1)
        newhost = []
        port = ''
        if len(host) == 2:
            port = host.pop()
        for h in host[0].split('.'):
            newhost.append(h.encode('idna').decode('utf-8'))
        parts[1] = '.'.join(newhost)
        if port:
            parts[1] += ':' + port
        return urlparse.urlunsplit(parts)
    else:
        return url

def url(self):
    """ Full URL as requested by the client (computed).

        This value is constructed out of different environment variables and
        includes scheme, host, port, scriptname, path and query string.
    """
    scheme = self.environ.get('wsgi.url_scheme', 'http')
    host = self.environ.get('HTTP_X_FORWARDED_HOST')
    host = host or self.environ.get('HTTP_HOST', None)
    if not host:
        host = self.environ.get('SERVER_NAME')
        port = self.environ.get('SERVER_PORT', '80')
        if (scheme, port) not in (('https', '443'), ('http', '80')):
            host += ':' + port
    parts = (scheme, host, urlquote(self.fullpath), self.query_string, '')
    return urlunsplit(parts)

def _get_db(dburl, create=True, reset_db=False):
    """ Get a db handle. Optionally reset / create. """
    sres = urlparse.urlsplit(dburl)
    dbname = sres.path.lstrip('/')
    srv = couchdb.Server(urlparse.urlunsplit(sres._replace(path='', query='')))
    srv.version()  # throws if can't connect to server

    if reset_db:
        if dbname in srv:
            del srv[dbname]
        return srv.create(dbname)

    if dbname in srv:
        return srv[dbname]
    else:
        if create:
            return srv.create(dbname)
        else:
            return None

def iri2uri(uri):
    """Convert an IRI to a URI. Note that IRIs must be
    passed in a unicode strings. That is, do not utf-8 encode
    the IRI before passing it into the function."""
    if isinstance(uri, unicode):
        (scheme, authority, path, query, fragment) = urlparse.urlsplit(uri)
        authority = authority.encode('idna')
        # For each character in 'ucschar' or 'iprivate'
        #  1. encode as utf-8
        #  2. then %-encode each octet of that utf-8
        uri = urlparse.urlunsplit((scheme, authority, path, query, fragment))
        uri = "".join([encode(c) for c in uri])
    return uri

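The encode() helper referenced above is defined elsewhere in the source module and is not shown here. A simplified stand-in, only for experimenting with the example (it percent-encodes the UTF-8 octets of any non-ASCII character rather than checking the exact 'ucschar'/'iprivate' ranges), could be:

def encode(c):
    # Simplified, hypothetical stand-in for the module's real helper.
    if ord(c) < 128:
        return c
    return "".join("%%%02X" % ord(o) for o in c.encode('utf-8'))
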
def _get_caldav_calendar_home_set(self, uri):
    dbname, dburi = self._get_dburi(uri)
    if not dbname:
        raise DAV_NotFound
    pool = Pool(Transaction().cursor.database_name)
    try:
        Collection = pool.get('webdav.collection')
    except KeyError:
        raise DAV_NotFound
    if not getattr(Collection, 'get_calendar_home_set', None):
        raise DAV_NotFound
    try:
        res = Collection.get_calendar_home_set(dburi, cache=CACHE)
    except DAV_Error, exception:
        self._log_exception(exception)
        raise
    except Exception, exception:
        self._log_exception(exception)
        raise DAV_Error(500)
    uparts = list(urlparse.urlsplit(uri))
    uparts[2] = urllib.quote(dbname + res)

    doc = domimpl.createDocument(None, 'href', None)
    href = doc.documentElement
    href.tagName = 'D:href'
    # iPhone doesn't handle "http" in href
    # huri = doc.createTextNode(urlparse.urlunsplit(uparts))
    huri = doc.createTextNode(urllib.quote('/' + dbname + res))
    href.appendChild(huri)
    return href

def _get_caldav_schedule_inbox_URL(self, uri):
    dbname, dburi = self._get_dburi(uri)
    if not dbname:
        raise DAV_NotFound
    pool = Pool(Transaction().cursor.database_name)
    try:
        Collection = pool.get('webdav.collection')
    except KeyError:
        raise DAV_NotFound
    if not getattr(Collection, 'get_schedule_inbox_URL', None):
        raise DAV_NotFound
    try:
        res = Collection.get_schedule_inbox_URL(dburi, cache=CACHE)
    except DAV_Error, exception:
        self._log_exception(exception)
        raise
    except Exception, exception:
        self._log_exception(exception)
        raise DAV_Error(500)
    uparts = list(urlparse.urlsplit(uri))
    uparts[2] = urllib.quote(dbname + res)

    doc = domimpl.createDocument(None, 'href', None)
    href = doc.documentElement
    href.tagName = 'D:href'
    huri = doc.createTextNode(urlparse.urlunsplit(uparts))
    href.appendChild(huri)
    return href

def _get_caldav_schedule_outbox_URL(self, uri):
    dbname, dburi = self._get_dburi(uri)
    if not dbname:
        raise DAV_NotFound
    pool = Pool(Transaction().cursor.database_name)
    try:
        Collection = pool.get('webdav.collection')
    except KeyError:
        raise DAV_NotFound
    if not getattr(Collection, 'get_schedule_outbox_URL', None):
        raise DAV_NotFound
    try:
        res = Collection.get_schedule_outbox_URL(dburi, cache=CACHE)
    except DAV_Error, exception:
        self._log_exception(exception)
        raise
    except Exception, exception:
        self._log_exception(exception)
        raise DAV_Error(500)
    uparts = list(urlparse.urlsplit(uri))
    uparts[2] = urllib.quote(dbname + res)

    doc = domimpl.createDocument(None, 'href', None)
    href = doc.documentElement
    href.tagName = 'D:href'
    huri = doc.createTextNode(urlparse.urlunsplit(uparts))
    href.appendChild(huri)
    return href

def _get_dav_principal_collection_set(self, uri):
    dbname, dburi = self._get_dburi(uri)
    if dburi.startswith('Calendars'):
        uparts = list(urlparse.urlsplit(uri))
        uparts[2] = urllib.quote(dbname + '/Calendars/')

        doc = domimpl.createDocument(None, 'href', None)
        href = doc.documentElement
        href.tagName = 'D:href'
        huri = doc.createTextNode(urlparse.urlunsplit(uparts))
        href.appendChild(huri)
        return href
    if _prev_get_dav_principal_collection_set:
        return _prev_get_dav_principal_collection_set(self, uri)
    raise DAV_NotFound