我们从Python开源项目中,提取了以下44个代码示例,用于说明如何使用django.utils.six.moves.urllib.parse.urlencode()。
def next_redirect(request, fallback, **get_kwargs): """ Handle the "where should I go next?" part of comment views. The next value could be a ``?next=...`` GET arg or the URL of a given view (``fallback``). See the view modules for examples. Returns an ``HttpResponseRedirect``. """ next = request.POST.get('next') if not is_safe_url(url=next, host=request.get_host()): next = resolve_url(fallback) if get_kwargs: if '#' in next: tmp = next.rsplit('#', 1) next = tmp[0] anchor = '#' + tmp[1] else: anchor = '' joiner = '&' if '?' in next else '?' next += joiner + urlencode(get_kwargs) + anchor return HttpResponseRedirect(next)
def ping_google(sitemap_url=None, ping_url=PING_URL): """ Alerts Google that the sitemap for the current site has been updated. If sitemap_url is provided, it should be an absolute path to the sitemap for this site -- e.g., '/sitemap.xml'. If sitemap_url is not provided, this function will attempt to deduce it by using urlresolvers.reverse(). """ if sitemap_url is None: try: # First, try to get the "index" sitemap URL. sitemap_url = urlresolvers.reverse('django.contrib.sitemaps.views.index') except urlresolvers.NoReverseMatch: try: # Next, try for the "global" sitemap URL. sitemap_url = urlresolvers.reverse('django.contrib.sitemaps.views.sitemap') except urlresolvers.NoReverseMatch: pass if sitemap_url is None: raise SitemapNotFound("You didn't provide a sitemap_url, and the sitemap URL couldn't be auto-detected.") if not django_apps.is_installed('django.contrib.sites'): raise ImproperlyConfigured("ping_google requires django.contrib.sites, which isn't installed.") Site = django_apps.get_model('sites.Site') current_site = Site.objects.get_current() url = "http://%s%s" % (current_site.domain, sitemap_url) params = urlencode({'sitemap': url}) urlopen("%s?%s" % (ping_url, params))
def ping_google(sitemap_url=None, ping_url=PING_URL): """ Alerts Google that the sitemap for the current site has been updated. If sitemap_url is provided, it should be an absolute path to the sitemap for this site -- e.g., '/sitemap.xml'. If sitemap_url is not provided, this function will attempt to deduce it by using urls.reverse(). """ if sitemap_url is None: try: # First, try to get the "index" sitemap URL. sitemap_url = reverse('django.contrib.sitemaps.views.index') except NoReverseMatch: try: # Next, try for the "global" sitemap URL. sitemap_url = reverse('django.contrib.sitemaps.views.sitemap') except NoReverseMatch: pass if sitemap_url is None: raise SitemapNotFound("You didn't provide a sitemap_url, and the sitemap URL couldn't be auto-detected.") if not django_apps.is_installed('django.contrib.sites'): raise ImproperlyConfigured("ping_google requires django.contrib.sites, which isn't installed.") Site = django_apps.get_model('sites.Site') current_site = Site.objects.get_current() url = "http://%s%s" % (current_site.domain, sitemap_url) params = urlencode({'sitemap': url}) urlopen("%s?%s" % (ping_url, params))
def ping_google(sitemap_url=None, ping_url=PING_URL): """ Alerts Google that the sitemap for the current site has been updated. If sitemap_url is provided, it should be an absolute path to the sitemap for this site -- e.g., '/sitemap.xml'. If sitemap_url is not provided, this function will attempt to deduce it by using urls.reverse(). """ sitemap_full_url = _get_sitemap_full_url(sitemap_url) params = urlencode({'sitemap': sitemap_full_url}) urlopen('%s?%s' % (ping_url, params))
def replace_query_param(url, key, val): """ Given a URL and a key/val pair, set or replace an item in the query parameters of the URL, and return the new URL. """ (scheme, netloc, path, query, fragment) = urlparse.urlsplit(url) query_dict = urlparse.parse_qs(query, keep_blank_values=True) query_dict[key] = [val] query = urlparse.urlencode(sorted(list(query_dict.items())), doseq=True) return urlparse.urlunsplit((scheme, netloc, path, query, fragment))
def remove_query_param(url, key): """ Given a URL and a key/val pair, remove an item in the query parameters of the URL, and return the new URL. """ (scheme, netloc, path, query, fragment) = urlparse.urlsplit(url) query_dict = urlparse.parse_qs(query, keep_blank_values=True) query_dict.pop(key, None) query = urlparse.urlencode(sorted(list(query_dict.items())), doseq=True) return urlparse.urlunsplit((scheme, netloc, path, query, fragment))
def encode_cursor(self, cursor): """ Given a Cursor instance, return an url with encoded cursor. """ tokens = {} if cursor.offset != 0: tokens['o'] = str(cursor.offset) if cursor.reverse: tokens['r'] = '1' if cursor.position is not None: tokens['p'] = cursor.position querystring = urlparse.urlencode(tokens, doseq=True) encoded = b64encode(querystring.encode('ascii')).decode('ascii') return replace_query_param(self.base_url, self.cursor_query_param, encoded)
def _created_proxy_response(self, request, path): request_payload = request.body self.log.debug("Request headers: %s", self.request_headers) path = quote_plus(path.encode('utf8'), QUOTE_SAFE) request_url = (self.upstream + '/' if path and self.upstream[-1] != '/' else self.upstream) + path self.log.debug("Request URL: %s", request_url) if request.GET: get_data = encode_items(request.GET.lists()) request_url += '?' + urlencode(get_data) self.log.debug("Request URL: %s", request_url) try: proxy_response = self.http.urlopen( request.method, request_url, redirect=False, retries=self.retries, headers=self.request_headers, body=request_payload, decode_content=False, preload_content=False ) self.log.debug("Proxy response header: %s", proxy_response.getheaders()) except urllib3.exceptions.HTTPError as error: self.log.exception(error) raise return proxy_response
def replace_query_param(url, key, val): """ Given a URL and a key/val pair, set or replace an item in the query parameters of the URL, and return the new URL. """ (scheme, netloc, path, query, fragment) = urlparse.urlsplit(url) query_dict = urlparse.parse_qs(query) query_dict[key] = [val] query = urlparse.urlencode(sorted(list(query_dict.items())), doseq=True) return urlparse.urlunsplit((scheme, netloc, path, query, fragment))
def remove_query_param(url, key): """ Given a URL and a key/val pair, remove an item in the query parameters of the URL, and return the new URL. """ (scheme, netloc, path, query, fragment) = urlparse.urlsplit(url) query_dict = urlparse.parse_qs(query) query_dict.pop(key, None) query = urlparse.urlencode(sorted(list(query_dict.items())), doseq=True) return urlparse.urlunsplit((scheme, netloc, path, query, fragment))
def get_login_url(**query): return reverse('login') + '?' + parse.urlencode(query)
def __str__(self): return reverse("graphite-render") + "?" + urlencode(self.args, True)
def raw_metric_query(query): """Runs a query for metric information against Graphite's REST API. :param query: A search string, e.g. "nav.devices.some-gw_example_org.*" :returns: A list of matching metrics, each represented by a dict. """ base = CONFIG.get("graphiteweb", "base") url = urljoin(base, "/metrics/find") query = urlencode({'query': query}) url = "%s?%s" % (url, query) req = Request(url) try: response = urlopen(req) return json.load(response) except URLError as err: raise errors.GraphiteUnreachableError( "{0} is unreachable".format(base), err) except ValueError: # response could not be decoded return [] finally: try: response.close() except NameError: pass
def ping_google(sitemap_url=None, ping_url=PING_URL): """ Alerts Google that the sitemap for the current site has been updated. If sitemap_url is provided, it should be an absolute path to the sitemap for this site -- e.g., '/sitemap.xml'. If sitemap_url is not provided, this function will attempt to deduce it by using urlresolvers.reverse(). """ if sitemap_url is None: try: # First, try to get the "index" sitemap URL. sitemap_url = urlresolvers.reverse('django.contrib.sitemaps.views.index') except urlresolvers.NoReverseMatch: try: # Next, try for the "global" sitemap URL. sitemap_url = urlresolvers.reverse('django.contrib.sitemaps.views.sitemap') except urlresolvers.NoReverseMatch: pass if sitemap_url is None: raise SitemapNotFound("You didn't provide a sitemap_url, and the sitemap URL couldn't be auto-detected.") from django.contrib.sites.models import Site current_site = Site.objects.get_current() url = "http://%s%s" % (current_site.domain, sitemap_url) params = urlencode({'sitemap':url}) urlopen("%s?%s" % (ping_url, params))
def add_pushbullet(request): if settings.PUSHBULLET_CLIENT_ID is None: raise Http404("pushbullet integration is not available") if "code" in request.GET: code = request.GET.get("code", "") if len(code) < 8: return HttpResponseBadRequest() result = requests.post("https://api.pushbullet.com/oauth2/token", { "client_id": settings.PUSHBULLET_CLIENT_ID, "client_secret": settings.PUSHBULLET_CLIENT_SECRET, "code": code, "grant_type": "authorization_code" }) doc = result.json() if "access_token" in doc: channel = Channel(kind="pushbullet") channel.user = request.team.user channel.value = doc["access_token"] channel.save() channel.assign_all_checks() messages.success(request, "The Pushbullet integration has been added!") else: messages.warning(request, "Something went wrong") return redirect("hc-channels") redirect_uri = settings.SITE_ROOT + reverse("hc-add-pushbullet") authorize_url = "https://www.pushbullet.com/authorize?" + urlencode({ "client_id": settings.PUSHBULLET_CLIENT_ID, "redirect_uri": redirect_uri, "response_type": "code" }) ctx = { "page": "channels", "authorize_url": authorize_url } return render(request, "integrations/add_pushbullet.html", ctx)
def add_pushover(request): if settings.PUSHOVER_API_TOKEN is None or settings.PUSHOVER_SUBSCRIPTION_URL is None: raise Http404("pushover integration is not available") if request.method == "POST": # Initiate the subscription nonce = get_random_string() request.session["po_nonce"] = nonce failure_url = settings.SITE_ROOT + reverse("hc-channels") success_url = settings.SITE_ROOT + reverse("hc-add-pushover") + "?" + urlencode({ "nonce": nonce, "prio": request.POST.get("po_priority", "0"), }) subscription_url = settings.PUSHOVER_SUBSCRIPTION_URL + "?" + urlencode({ "success": success_url, "failure": failure_url, }) return redirect(subscription_url) # Handle successful subscriptions if "pushover_user_key" in request.GET: if "nonce" not in request.GET or "prio" not in request.GET: return HttpResponseBadRequest() # Validate nonce if request.GET["nonce"] != request.session.get("po_nonce"): return HttpResponseForbidden() # Validate priority if request.GET["prio"] not in ("-2", "-1", "0", "1", "2"): return HttpResponseBadRequest() # All looks well-- del request.session["po_nonce"] if request.GET.get("pushover_unsubscribed") == "1": # Unsubscription: delete all Pushover channels for this user Channel.objects.filter(user=request.user, kind="po").delete() return redirect("hc-channels") else: # Subscription user_key = request.GET["pushover_user_key"] priority = int(request.GET["prio"]) return do_add_channel(request, { "kind": "po", "value": "%s|%d" % (user_key, priority), }) # Show Integration Settings form ctx = { "page": "channels", "po_retry_delay": td(seconds=settings.PUSHOVER_EMERGENCY_RETRY_DELAY), "po_expiration": td(seconds=settings.PUSHOVER_EMERGENCY_EXPIRATION), } return render(request, "integrations/add_pushover.html", ctx)
def get_metric_data(target, start="-5min", end="now"): """ Retrieves raw datapoints from a graphite target for a given period of time. :param target: A metric path string or a list of multiple metric paths :param start: A start time specification that Graphite will accept. :param end: An end time specification that Graphite will accept. :returns: A raw, response from Graphite. Normally a list of dicts that represent the names and datapoints of each matched target, like so:: [{'target': 'x', 'datapoints': [(value, timestamp), ...]}] """ base = CONFIG.get("graphiteweb", "base") url = urljoin(base, "/render/") # What does Graphite accept of formats? Lets check if the parameters are # datetime objects and try to force a format then if isinstance(start, datetime): start = start.strftime('%H:%M%Y%m%d') if isinstance(end, datetime): end = end.strftime('%H:%M%Y%m%d') query = { 'target': target, 'from': start, 'until': end, 'format': 'json', } query = urlencode(query, True) _logger.debug("get_metric_data%r", (target, start, end)) req = Request(url, data=query) try: response = urlopen(req) json_data = json.load(response) _logger.debug("get_metric_data: returning %d results", len(json_data)) return json_data except HTTPError as err: _logger.error("Got a 500 error from graphite-web when fetching %s" "with data %s", err.url, query) _logger.error("Graphite output: %s", err.fp.read()) raise errors.GraphiteUnreachableError( "{0} is unreachable".format(base), err) except URLError as err: raise errors.GraphiteUnreachableError( "{0} is unreachable".format(base), err) except ValueError: # response could not be decoded return [] finally: try: response.close() except NameError: pass