我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用urllib.parse.urlunparse()。
def force_https(request):
    """Redirect plain-HTTP requests to their HTTPS equivalent.

    The scheme is taken from the ``x-forwarded-proto`` header when present
    (set by nginx/heroku-style proxies), otherwise inferred from whether the
    transport carries an SSL context.  Requests to ``localhost:*`` are left
    alone so local development keeps working.  Returns a redirect response,
    or None when no redirect is needed.
    """
    host = request.headers.get('Host', '')
    if request.transport.get_extra_info('sslcontext'):
        fallback_scheme = "https"
    else:
        fallback_scheme = "http"
    scheme = request.headers.get('x-forwarded-proto', fallback_scheme)
    # Guard clauses: nothing to do for localhost or already-secure requests.
    if host.startswith("localhost:") or scheme == "https":
        return
    secure_url = urlunparse(
        ("https", host, request.path, None, request.query_string, None))
    return redirect(secure_url)
def __init__(self, base_url, *args, **kwargs):
    """Create a session bound to *base_url*.

    If the URL embeds "user:password@" credentials, they are stripped out
    of ``base_url`` and installed as HTTP basic auth on the session instead.
    """
    requests.Session.__init__(self, *args, **kwargs)
    self.base_url = base_url
    # Check for basic authentication embedded in the URL.
    parts = urlparse(self.base_url)
    if parts.username and parts.password:
        host = parts.hostname
        if parts.port:
            host = "%s:%d" % (host, parts.port)
        # Rebuild the base URL without the credential portion of the netloc.
        self.base_url = urlunparse(
            (parts.scheme, host, parts.path,
             parts.params, parts.query, parts.fragment))
        # Configure requests to send the credentials via basic auth instead.
        self.auth = HTTPBasicAuth(parts.username, parts.password)
def make_next_param(login_url, current_url):
    """Reduce *current_url* to a relative URL when it is safe to do so.

    If *login_url* either omits a scheme/host or shares them with
    *current_url*, the scheme and host are dropped from the result so a
    shorter value can be passed to the login view; otherwise *current_url*
    is returned untouched.

    :param login_url: The login URL being redirected to.
    :type login_url: str
    :param current_url: The URL to reduce.
    :type current_url: str
    """
    login_parts = urlparse(login_url)
    current_parts = urlparse(current_url)
    scheme_ok = not login_parts.scheme or login_parts.scheme == current_parts.scheme
    host_ok = not login_parts.netloc or login_parts.netloc == current_parts.netloc
    if scheme_ok and host_ok:
        return urlunparse(
            ('', '', current_parts.path, current_parts.params,
             current_parts.query, ''))
    return current_url
def get_sendgrid_request_message(cfg, keyid, hex, user_email):
    """Build the SendGrid Mail that e-mails a key-revocation link.

    :param cfg: configuration object; reads megserver_hostname_url,
        meg_url_prefix and the sendgrid from_email/subject settings.
    :param keyid: identifier of the key to revoke.
    :param hex: revocation token (shadows the builtin ``hex``; presumably a
        hex-encoded string — confirm with callers).
    :param user_email: recipient address.
    :return: a populated SendGrid ``Mail`` object.
    """
    url_prefix = urljoin(
        cfg.config.megserver_hostname_url,
        os.path.join(cfg.config.meg_url_prefix, "revoke")
    )
    params = urlencode([("keyid", keyid), ("token", hex)])
    # Index 4 of the urlparse() 6-tuple is the query component.
    parsed = list(urlparse(url_prefix))
    parsed[4] = params
    revocation_link = urlunparse(parsed)
    message = Mail()
    message.add_to(user_email)
    message.set_from(cfg.config.sendgrid.from_email)
    message.set_subject(cfg.config.sendgrid.subject)
    message.set_html(EMAIL_HTML.format(keyid=keyid, link=revocation_link))
    return message
def handle_redirect_to_login(request, **kwargs):
    """Redirect an unauthenticated request to the login page.

    Recognised kwargs: ``login_url`` (view name or URL, defaulting to
    ``settings.ACCOUNT_LOGIN_URL``), ``redirect_field_name`` (querystring key
    that carries the post-login destination) and ``next_url`` (destination,
    defaulting to the current full path).
    """
    login_url = kwargs.get("login_url")
    redirect_field_name = kwargs.get("redirect_field_name")
    next_url = kwargs.get("next_url")
    if login_url is None:
        login_url = settings.ACCOUNT_LOGIN_URL
    if next_url is None:
        next_url = request.get_full_path()
    try:
        # First treat login_url as a named view.
        login_url = urlresolvers.reverse(login_url)
    except urlresolvers.NoReverseMatch:
        if callable(login_url):
            raise
        # A literal path contains "/" and a dotted view path contains ".";
        # anything else was a bad view name, so re-raise.
        if "/" not in login_url and "." not in login_url:
            raise
    url_bits = list(urlparse(login_url))
    if redirect_field_name:
        # Index 4 is the query component; merge the "next" parameter into it.
        querystring = QueryDict(url_bits[4], mutable=True)
        querystring[redirect_field_name] = next_url
        url_bits[4] = querystring.urlencode(safe="/")
    return HttpResponseRedirect(urlunparse(url_bits))
def get_websearch(self,query):
    """ HTTP GET of a websearch, then add any embedded links.

    Submits *query* to a randomly selected search engine and accumulates the
    links found on the result page onto this instance.

    :param query: search terms to submit
    :return: None (links are stored via self.add_url_links)
    """
    self.select_random_search_engine()
    # Rebuild the engine's search URL with our query substituted into the
    # query string; parameter names are engine-specific (self.SafeSearch).
    url = uprs.urlunparse(uprs.urlparse(self.SafeSearch.search_url)._replace(query='{}={}{}&{}'.format(
        self.SafeSearch.query_parameter,uprs.quote_plus(query),
        self.SafeSearch.additional_parameters,self.SafeSearch.safe_parameter)))
    if self.verbose:
        self.print_url(url)
    # The decorators wrap the selenium calls in timeouts so a hung
    # PhantomJS instance cannot stall the crawl indefinitely.
    @self.phantomjs_timeout
    def phantomjs_get():
        self.driver.get(url)  # selenium driver
    phantomjs_get()
    @self.phantomjs_short_timeout
    def phantomjs_page_source():
        # Track bandwidth consumed, approximated by page-source length.
        self.data_usage += len(self.driver.page_source)
    phantomjs_page_source()
    new_links = self.websearch_links()
    if self.link_count() < self.max_links_cached:
        self.add_url_links(new_links,url)
def get_referer_url(request):
    """Return a same-site relative URL built from the HTTP Referer header.

    Falls back to ``options.MULTILANGUAGE_FALLBACK_URL`` when the header is
    missing or points at a different domain.  (The original docstring was
    mojibake; this summary is reconstructed from the code.)
    """
    referer = request.META.get('HTTP_REFERER')
    if not referer:
        return resolve_url(options.MULTILANGUAGE_FALLBACK_URL)
    site = get_current_site(request)
    url_parts = list(urlparse(referer))
    # Index 1 is the netloc; reject referrers from foreign domains.
    # NOTE(review): this is an exact string compare, so an explicit port in
    # the referrer will not match a bare site.domain — confirm intent.
    if url_parts[1] != site.domain:
        return resolve_url(options.MULTILANGUAGE_FALLBACK_URL)
    # Blank scheme and host so the reassembled URL is relative.
    url_parts[0] = ''
    url_parts[1] = ''
    return urlunparse(url_parts)
def noredirect_url(url, forced_path=None):
    """Return *url* with the language-redirect suppression flag in its query.

    When *forced_path* is given, its path replaces the URL's path and its
    query parameters are merged in first.  (The original docstring was
    mojibake; this summary is reconstructed from the code.)
    """
    url_parts = list(urlparse(url))
    # Index 4 is the query component; index 2 the path.
    query = dict(parse_qsl(url_parts[4]))
    if forced_path:
        forced_path_parts = list(urlparse(forced_path))
        url_parts[2] = forced_path_parts[2]
        forced_path_query = dict(parse_qsl(forced_path_parts[4]))
        query.update(forced_path_query)
    # Always stamp the marker parameter, regardless of forced_path.
    query.update({
        options.MULTILANGUAGE_GET_PARAM: 1
    })
    url_parts[4] = urlencode(query)
    return urlunparse(url_parts)
def http_error_302(self, req, fp, code, msg, headers):
    """store "Location" HTTP response header

    Percent-quotes a non-ASCII Location header in place before delegating
    to the stock urllib redirect handler, so the follow-up request uses a
    legal ASCII URL.

    :return: http
    """
    self.location = headers.get('Location', '')
    uprint("headers['Location']=" + self.location)
    def squote(s):
        # Quote, but keep characters that are legal inside params/query.
        return urllib.parse.quote(s, ';/?:&=+,$[]%^')
    try:
        # A pure-ASCII location needs no re-quoting.
        self.location.encode('ascii')
    except UnicodeEncodeError:
        scheme, netloc, path, params, query, fragment = \
            urllib.parse.urlparse(self.location)
        # Quote each component with rules appropriate to its position,
        # then write the sanitized URL back into the response headers.
        self.location = urllib.parse.urlunparse((
            scheme, netloc, urllib.parse.quote(path),
            squote(params), squote(query), fragment))
        headers.replace_header('Location', self.location)
        uprint("pquoted headers['Location']=" + self.location)
    return urllib.request.HTTPRedirectHandler.http_error_302(
        self, req, fp, code, msg, headers)
def get_data(self, save_directory):
    """Download the resource this URL object points at into *save_directory*.

    The local file name is taken from the last component of the URL path.
    NOTE(review): ``self`` is passed straight to ``urlunparse()``, so this
    class is apparently a urlparse-style 6-component tuple subclass —
    confirm against the class definition.

    TODO (from original author): figure out how to handle local file paths;
    consider directory downloads from html pages with hyperlinks; custom URL
    schemes now need to be done in lasubway.py; decide how raw data fits in.
    """
    url = urlunparse(self)
    file_name = os.path.basename(os.path.normpath(self.path))
    save_path = os.path.join(save_directory, file_name)
    # closing() guarantees the HTTP response is released even if the
    # file write fails part-way through.
    with closing(urlopen(url)) as request:
        with open(save_path, 'wb') as sfile:
            shutil.copyfileobj(request, sfile)
def serve_websocket(request, port):
    """Start UWSGI websocket loop and proxy.

    :param request: incoming WSGI-style request whose URL is remapped to the
        upstream notebook instance.
    :param port: local port of the upstream Notebook websocket server.
    """
    env = request.environ
    # Send HTTP response 101 Switch Protocol downstream
    uwsgi.websocket_handshake(env['HTTP_SEC_WEBSOCKET_KEY'], env.get('HTTP_ORIGIN', ''))
    # Map the websocket URL to the upstream localhost:4000x Notebook instance
    parts = urlparse(request.url)
    parts = parts._replace(scheme="ws", netloc="localhost:{}".format(port))
    url = urlunparse(parts)
    # Proxy initial connection headers, but only the whitelisted ones.
    headers = [(header, value) for header, value in request.headers.items()
               if header.lower() in CAPTURE_CONNECT_HEADERS]
    logger.info("Connecting to upstream websockets: %s, headers: %s", url, headers)
    ws = ProxyClient(url, headers=headers)
    ws.connect()
    # TODO: Will complain loudly about already send headers - how to abort?
    return httpexceptions.HTTPOk()
def get_draft_url(url):
    """
    Return the given URL with a draft mode HMAC in its querystring.
    """
    if verify_draft_url(url):
        # Nothing to do. Already a valid draft URL.
        return url
    # Parse querystring and add draft mode HMAC.
    # NOTE(review): `urlparse` is used as a module here (urlparse.urlparse),
    # i.e. Python 2's module or an alias like `import urllib.parse as urlparse`.
    url = urlparse.urlparse(url)
    salt = get_random_string(5)
    # QueryDict requires a bytestring as its first argument
    query = QueryDict(force_bytes(url.query), mutable=True)
    # The HMAC covers the path; the random salt makes each link unique.
    query['edit'] = '%s:%s' % (salt, get_draft_hmac(salt, url.path))
    # Reconstruct URL.  Index 4 of the 6-tuple is the query component.
    parts = list(url)
    parts[4] = query.urlencode(safe=':')
    return urlparse.urlunparse(parts)
def open_stream_in_browser(self, event):
    """Open the chat URL for the current stream in the system browser.

    Twitch URLs get "/chat" appended; YouTube "watch" URLs become
    "live_chat" URLs.  On Windows the URL is opened via os2.startfile,
    elsewhere via the webbrowser module.
    """
    parsed = urlparse(self.stream.url)
    host = parsed.netloc
    chat_path = parsed.path
    if "twitch" in host:
        chat_path = parsed.path + "/chat"
    if "youtube" in host:
        chat_path = chat_path.replace("watch", "live_chat")
    url = urlunparse((parsed.scheme, host, chat_path,
                      parsed.params, parsed.query, parsed.fragment))
    # Avoid shadowing the `os` module name the way the original code did.
    system_name = platform.system()
    if system_name == OS.WINDOWS:
        os2.startfile(url)
    else:
        webbrowser.open(url)
def _BuildUrl(self, url, path_elements=None, extra_params=None): # Break url into constituent parts (scheme, netloc, path, params, query, fragment) = urlparse(url) # Add any additional path elements to the path if path_elements: # Filter out the path elements that have a value of None p = [i for i in path_elements if i] if not path.endswith('/'): path += '/' path += '/'.join(p) # Add any additional query parameters to the query string if extra_params and len(extra_params) > 0: extra_query = self._EncodeParameters(extra_params) # Add it to the existing query if query: query += '&' + extra_query else: query = extra_query # Return the rebuilt URL return urlunparse((scheme, netloc, path, params, query, fragment))
def url_item_arg_increment(partten, url, count):
    """Advance a paging parameter in *url* by *count* items.

    ``partten`` is "<keyword>=<begin_num>" (e.g. "start=0").  If *url*
    already carries <keyword>, its current value is incremented by *count*;
    otherwise counting starts from <begin_num>.  (Original docstring was
    mojibake; behavior reconstructed from the code.)

    >>> url_item_arg_increment("start=0", "http://www.ecco.com/abc?start=30", 30)
    'http://www.ecco.com/abc?start=60'
    """
    keyword, begin_num = partten.split("=")
    found = re.search(r"%s=(\d+)" % keyword, url)
    start = int(found.group(1)) if found else int(begin_num)
    parts = urlparse(url)
    if parts.query:
        # Re-parse the existing query pairs and bump the paging keyword.
        query = dict(pair.split("=") for pair in parts.query.split("&"))
        query[keyword] = start + count
    else:
        query = {keyword: count + start}
    return urlunparse(parts._replace(query=urlencode(query)))
def url_replace_param(url, name, value):
    """Return *url* with the GET parameter *name* set to *value*.

    Existing occurrences of the parameter are replaced; all other query
    parameters are preserved.
    """
    parts = urlparse(url)
    query_map = parse_qs(parts.query)
    query_map[name] = value
    new_query = urlencode(query_map, doseq=True)
    return urlunparse((
        parts.scheme,
        parts.netloc,
        parts.path,
        parts.params,
        new_query,
        parts.fragment,
    ))
def download(self, source, dest): """ Download an archive file. :param str source: URL pointing to an archive file. :param str dest: Local path location to download archive file to. """ # propogate all exceptions # URLError, OSError, etc proto, netloc, path, params, query, fragment = urlparse(source) if proto in ('http', 'https'): auth, barehost = splituser(netloc) if auth is not None: source = urlunparse((proto, barehost, path, params, query, fragment)) username, password = splitpasswd(auth) passman = HTTPPasswordMgrWithDefaultRealm() # Realm is set to None in add_password to force the username and password # to be used whatever the realm passman.add_password(None, source, username, password) authhandler = HTTPBasicAuthHandler(passman) opener = build_opener(authhandler) install_opener(opener) response = urlopen(source) try: with open(dest, 'wb') as dest_file: dest_file.write(response.read()) except Exception as e: if os.path.isfile(dest): os.unlink(dest) raise e # Mandatory file validation via Sha1 or MD5 hashing.
def base_url(self, url):
    """Return *url* stripped of its querystring and fragment."""
    components = list(self.parse_url(url))
    # Blank everything from the query component (index 4) onwards.
    tail_length = len(components) - 4
    components[4:] = [''] * tail_length
    return urlunparse(components)
def removeURLHash(url: str) -> str:
    """Remove the fragment ("#hash") portion of *url*.

    Bug fix: ``urldefrag()`` already returns the de-fragmented URL as a
    plain string (plus the fragment).  The previous code passed that string
    to ``urlunparse()``, which expects a 6-component sequence and raises
    ``ValueError`` for any real URL.  Returning the defragged URL directly
    is both correct and simpler.

    :param url: URL that may carry a "#fragment" suffix.
    :return: the URL without its fragment.
    """
    defragged, _ = urldefrag(url)
    return defragged
def _remove_md5_fragment(location): if not location: return '' parsed = urlparse(location) if parsed[-1].startswith('md5='): return urlunparse(parsed[:-1] + ('',)) return location
def login_url(login_view, next_url=None, next_field='next'):
    '''
    Creates a URL for redirecting to a login page. If only `login_view` is
    provided, this will just return the URL for it. If `next_url` is
    provided, however, this will append a ``next=URL`` parameter to the
    query string so that the login view can redirect back to that URL.

    :param login_view: The name of the login view. (Alternately, the actual
                       URL to the login view.)
    :type login_view: str
    :param next_url: The URL to give the login view for redirection.
    :type next_url: str
    :param next_field: What field to store the next URL in. (It defaults to
                       ``next``.)
    :type next_field: str
    '''
    # A value that already looks like a URL is used verbatim; otherwise it
    # is resolved as a view name.
    if login_view.startswith(('https://', 'http://', '/')):
        base = login_view
    else:
        base = url_for(login_view)
    if next_url is None:
        return base
    parts = list(urlparse(base))
    query_map = url_decode(parts[4])
    query_map[next_field] = make_next_param(base, next_url)
    parts[4] = url_encode(query_map, sort=True)
    return urlunparse(parts)
def _build_url(self, api_method): path = '{}/{}'.format(self.base_url.path, api_method.lstrip('/')) url = self.base_url._replace(path=path) return urlparse.urlunparse(url).rstrip('/')
def url(scheme='', netloc='', path='', params='', query=(), fragment=''):
    """Construct a URL from individual components.

    <scheme>://<netloc>/<path>;<params>?<query>#<fragment>

    Args:
        scheme: "http", "https", etc
        netloc: aka "host"
        path: "/path/to/resource". Leading slash is automatic.
        params: Rarely used; you probably want query.
        query: May be a dict or a sequence of 2-tuples.
        fragment: Position on page; usually used only by the browser.
    """
    encoded_query = urlencode(query)
    components = (scheme, netloc, path, params, encoded_query, fragment)
    return urlunparse(components)
def post(self, url: str, data: Mapping[str, str]) -> None:
    """
    POST the given URL while enforcing a 60 second timeout and handling error pages

    The browser driver cannot POST directly, so we navigate to the target's
    origin and inject a JS-built <form> that submits the data.

    :param url: full URL to POST to.
    :param data: form field name/value pairs.
    """
    def timeoutHandler(signum, frame):
        raise TimeoutException("Timeout!")

    signal.signal(signal.SIGALRM, timeoutHandler)
    signal.alarm(60)
    try:
        parsed = urlparse(url)
        # In order to POST data, we have to go to an HTML page on the same domain and then use
        # JS to kick off the post
        self.driver.get(parsed.scheme + '://' + parsed.netloc)
        js = ("function post(path) {\n"
              " var form = document.createElement('form');\n"
              " form.setAttribute('action', path);\n"
              " form.setAttribute('method', 'post');\n"
              "%s"
              " document.body.appendChild(form);\n"
              " form.submit();\n"
              "}\n")
        fields = ''
        # Build one hidden input per form field; single quotes are escaped
        # so the values survive embedding in the JS string literals.
        for index, (key, val) in zip(range(len(data.items())), data.items()):
            i = str(index)
            field = (" var a%s = document.createElement('input');\n"
                     " a%s.setAttribute('name', '%s');\n"
                     " a%s.setAttribute('value', '%s');\n"
                     " form.appendChild(a%s);\n") % (
                i, i, key.replace("'", "\\'"), i, val.replace("'", "\\'"), i)
            fields += field
        js = js % fields
        # Re-assemble the scheme-less, host-less remainder of the URL as the
        # form's action path.
        path = urlunparse(['', '', parsed.path, parsed.params, parsed.query,
                           parsed.fragment])
        js += "post('%s');\n" % path
        self.driver.execute_script(js)
    finally:
        # Bug fix: the original cleared the alarm only on the exception path,
        # so a successful POST left a live SIGALRM armed that could fire and
        # raise TimeoutException during unrelated later work.  Clearing it in
        # `finally` covers both the success and failure paths; exceptions
        # still propagate unchanged.
        signal.alarm(0)