我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用django.http.QueryDict()。
def test_change_password_sets_logout_reason(self): api.keystone.user_update_own_password(IsA(http.HttpRequest), 'oldpwd', 'normalpwd').AndReturn(None) self.mox.ReplayAll() formData = {'method': 'PasswordForm', 'current_password': 'oldpwd', 'new_password': 'normalpwd', 'confirm_password': 'normalpwd'} res = self.client.post(INDEX_URL, formData, follow=False) self.assertRedirectsNoFollow(res, settings.LOGOUT_URL) self.assertIn('logout_reason', res.cookies) self.assertEqual(res.cookies['logout_reason'].value, "Password changed. Please log in again to continue.") scheme, netloc, path, query, fragment = urlsplit(res.url) redirect_response = res.client.get(path, http.QueryDict(query)) self.assertRedirectsNoFollow(redirect_response, settings.LOGIN_URL)
def redirect_to_login(next, login_url=None, redirect_field_name=REDIRECT_FIELD_NAME): """ Redirects the user to the login page, passing the given 'next' page """ resolved_url = resolve_url(login_url or settings.LOGIN_URL) login_url_parts = list(urlparse(resolved_url)) if redirect_field_name: querystring = QueryDict(login_url_parts[4], mutable=True) querystring[redirect_field_name] = next login_url_parts[4] = querystring.urlencode(safe='/') return HttpResponseRedirect(urlunparse(login_url_parts)) # 4 views for password reset: # - password_reset sends the mail # - password_reset_done shows a success message for the above # - password_reset_confirm checks the link the user clicked and # prompts for a new password # - password_reset_complete shows a success message for the above
def reject_suggestion(request, unit, suggid): try: sugg = unit.suggestion_set.get(id=suggid) except ObjectDoesNotExist: raise Http404 # In order to be able to reject a suggestion, users have to either: # 1. Have `review` rights, or # 2. Be the author of the suggestion being rejected if (not check_permission('review', request) and (request.user.is_anonymous or request.user != sugg.user)): raise PermissionDenied(_('Insufficient rights to access review mode.')) unit.reject_suggestion(sugg, request.translation_project, request.user) r_data = QueryDict(request.body) if "comment" in r_data and r_data["comment"]: handle_suggestion_comment(request, sugg, unit, r_data["comment"], "rejected") json = { 'udbid': unit.id, 'sugid': suggid, 'user_score': request.user.public_score, } return JsonResponse(json)
def handle_redirect_to_login(request, **kwargs): login_url = kwargs.get("login_url") redirect_field_name = kwargs.get("redirect_field_name") next_url = kwargs.get("next_url") if login_url is None: login_url = settings.ACCOUNT_LOGIN_URL if next_url is None: next_url = request.get_full_path() try: login_url = urlresolvers.reverse(login_url) except urlresolvers.NoReverseMatch: if callable(login_url): raise if "/" not in login_url and "." not in login_url: raise url_bits = list(urlparse.urlparse(login_url)) if redirect_field_name: querystring = QueryDict(url_bits[4], mutable=True) querystring[redirect_field_name] = next_url url_bits[4] = querystring.urlencode(safe="/") return HttpResponseRedirect(urlparse.urlunparse(url_bits))
def parse(self, stream, media_type=None, parser_context=None): """ Parses the incoming bytestream as a multipart encoded form, and returns a DataAndFiles object. `.data` will be a `QueryDict` containing all the form parameters. `.files` will be a `QueryDict` containing all the form files. """ parser_context = parser_context or {} request = parser_context['request'] encoding = parser_context.get('encoding', settings.DEFAULT_CHARSET) meta = request.META.copy() meta['CONTENT_TYPE'] = media_type upload_handlers = request.upload_handlers try: parser = DjangoMultiPartParser(meta, stream, upload_handlers, encoding) data, files = parser.parse() return DataAndFiles(data, files) except MultiPartParserError as exc: raise ParseError('Multipart form parse error - %s' % six.text_type(exc))
def prepare_facet_data(self, aggregations: AggResponse, get_args: QueryDict) -> Dict[str, List[Dict[str, str]]]: resp: DataDict = {} for area, agg in aggregations.to_dict().items(): resp[area] = [] for item in aggregations[area].buckets: url_args, is_active = self.facet_url_args( url_args=deepcopy(get_args.dict()), field_name=area, field_value=str(item.key) ) resp[area].append({ 'url_args': urlencode(url_args), 'name': item.key, 'count': item.doc_count, 'is_active': is_active }) return resp
def handle_redirect_to_login(request, **kwargs): login_url = kwargs.get("login_url") redirect_field_name = kwargs.get("redirect_field_name") next_url = kwargs.get("next_url") if login_url is None: login_url = settings.ACCOUNT_LOGIN_URL if next_url is None: next_url = request.get_full_path() try: login_url = urlresolvers.reverse(login_url) except urlresolvers.NoReverseMatch: if callable(login_url): raise if "/" not in login_url and "." not in login_url: raise url_bits = list(urlparse(login_url)) if redirect_field_name: querystring = QueryDict(url_bits[4], mutable=True) querystring[redirect_field_name] = next_url url_bits[4] = querystring.urlencode(safe="/") return HttpResponseRedirect(urlunparse(url_bits))
def test_post_data(django_elasticapm_client): request = WSGIRequest(environ={ 'wsgi.input': compat.BytesIO(), 'REQUEST_METHOD': 'POST', 'SERVER_NAME': 'testserver', 'SERVER_PORT': '80', 'CONTENT_TYPE': 'application/json', 'ACCEPT': 'application/json', }) request.POST = QueryDict("x=1&y=2") django_elasticapm_client.capture('Message', message='foo', request=request) assert len(django_elasticapm_client.events) == 1 event = django_elasticapm_client.events.pop(0)['errors'][0] assert 'request' in event['context'] request = event['context']['request'] assert request['method'] == 'POST' assert request['body'] == {'x': '1', 'y': '2'}
def query_string(context, key, value): """ For adding/replacing a key=value pair to the GET string for a URL. eg, if we're viewing ?p=3 and we do {% url_replace order 'taken' %} then this returns "p=3&order=taken" And, if we're viewing ?p=3&order=uploaded and we do the same thing, we get the same result (ie, the existing "order=uploaded" is replaced). Expects the request object in context to do the above; otherwise it will just return a query string with the supplied key=value pair. """ try: request = context['request'] args = request.GET.copy() except KeyError: args = QueryDict('').copy() args[key] = value return args.urlencode()
def delete(self, request, imsi=None): """Handle the deletion of Pending Credit Updates.""" user_profile = UserProfile.objects.get(user=request.user) network = user_profile.network request_data = QueryDict(request.body) if "pending_id" not in request_data: return HttpResponseBadRequest() pending_id = request_data['pending_id'] try: update = PendingCreditUpdate.objects.get(uuid=pending_id) except PendingCreditUpdate.DoesNotExist: return HttpResponseBadRequest() if update.subscriber.network != network: return HttpResponseBadRequest() update.delete() return HttpResponse()
def export_menu(request, username, id_string): req = request.REQUEST export_type = req.get('type', None) if export_type: lang = req.get('lang', None) hierarchy_in_labels = req.get('hierarchy_in_labels') group_sep = req.get('group_sep', '/') q = QueryDict('', mutable=True) q['lang'] = req.get('lang') q['hierarchy_in_labels'] = req.get('hierarchy_in_labels') q['group_sep'] = req.get('group_sep', '/') if export_type == "xlsx": url = reverse('formpack_xlsx_export', args=(username, id_string)) return redirect(url + '?' + q.urlencode()) if export_type == "csv": url = reverse('formpack_csv_export', args=(username, id_string)) return redirect(url + '?' + q.urlencode()) context = build_export_context(request, username, id_string) return render(request, 'survey_report/export_menu.html', context)
def __init__(self, request): self.request = request self.user = request.user self.method = request.method self.POST = request.POST self.submit = 'task_management-action' in self.POST self.save = self.POST.get('task_management-save') if request.method == 'POST' and not self.task is None: if self.submit or self.save: post_data = self.POST.get('task_management-post_data', '') # QueryDict only reliably works with bytestrings, so we encode `post_data` again (see #2978). request.POST = QueryDict(post_data.encode('utf-8'), encoding='utf-8') # if there is no post_data, we pretend that the request is a GET request, so # forms in the view don't show errors if not post_data: request.method = 'GET'
def signup_view(request): if request.user.is_authenticated(): return redirect(reverse('home_projects')) if request.method == 'POST': post = request.POST form = UserCreationForm(post) if form.is_valid(): form.save() new_post = QueryDict(mutable=True) new_post.update(post) new_post['password'] = post['password1'] request.POST = new_post return login_view(request) else: log.warn(form.is_valid()) log.warn(form.errors) else: form = UserCreationForm() token = {} token.update(csrf(request)) token['form'] = form return render(request, 'signup.html', token)
def get_query_string(request, **kwargs): """ Conditional query string creator. Only useful for boolean parameters. """ qs = QueryDict('', mutable=True) for name, condition in iteritems(kwargs): try: value = request.GET[name] except KeyError: pass else: if condition and value: qs[name] = 1 return qs
def get_draft_url(url): """ Return the given URL with a draft mode HMAC in its querystring. """ if verify_draft_url(url): # Nothing to do. Already a valid draft URL. return url # Parse querystring and add draft mode HMAC. url = urlparse.urlparse(url) salt = get_random_string(5) # QueryDict requires a bytestring as its first argument query = QueryDict(force_bytes(url.query), mutable=True) query['edit'] = '%s:%s' % (salt, get_draft_hmac(salt, url.path)) # Reconstruct URL. parts = list(url) parts[4] = query.urlencode(safe=':') return urlparse.urlunparse(parts)
def post(self, request): data = request.data # Clean emoji for now if isinstance(data, dict) and data.get('Body'): if isinstance(data, QueryDict): data = data.dict() data['Body'] = clean_text(data['Body']) PhoneReceivedRaw.objects.create(data=data) if data.get('Direction') == 'inbound': if data.get('CallStatus') == 'ringing': # incoming voice call text = getattr(settings, 'UNIVERSAL_NOTIFICATIONS_TWILIO_CALL_RESPONSE_DEFAULT', '<?xml version="1.0" encoding="UTF-8"?>' + '<Response>' + '<Say>Hello, thanks for calling. ' + 'To leave a message wait for the tone.</Say>' + '<Record timeout="30" />' '</Response>') return Response(SafeString(text), content_type='text/xml') return Response({}, status=status.HTTP_202_ACCEPTED)
def test_missing_credentials(self): """ Test for missing credentials. """ #self.skipTest("Temporarily skipped") # Get user's credentials. username, password, email = self._prompt(need_email=True) # Setup request request = self.factory.get('django-pam:login') request.user = AnonymousUser() kwargs = {} data = kwargs.setdefault('data', QueryDict(mutable=True)) data.appendlist('username', username) data.appendlist('password', '') data.appendlist('email', email) form = AuthenticationForm(**kwargs) msg = "kwargs: {}, errors: {}".format(kwargs, form.errors.as_data()) self.assertFalse(form.is_valid(), msg) # Check that we have a password and __all__ error messages. self.assertTrue('password' in form.errors.as_data(), msg) self.assertTrue('__all__' in form.errors.as_data(), msg)
def _run_import_view(self, postfile): with factory.load_test_file(postfile) as poststring: POST = QueryDict(poststring.read()) # mocking redis and celery task, to only test the view itself with patch('main.views.redis.ScratchStorage') as MockStorage: with patch('main.views.import_table_task.delay') as mock_task: storage = MockStorage.return_value storage.save.return_value = 'randomkey' result = MagicMock() result.id = '00000000-0000-0000-0000-000000000001' mock_task.return_value = result # fake the request response = self.client.post(self._import_url(), data=POST) # assert calls to redis and celery storage.save.assert_called() mock_task.assert_called_with(self.target_study.pk, self.user.pk, 'randomkey') self.assertEqual(self._assay_count(), 0) # view does not change assays return response
def create_time_select_form(self, payload, **kwargs): """ Constructs a form to select the timepoint of data to export to SBML and the output filename. Depends on measurement forms already existing. :param payload: the QueryDict from POST attribute of a request :param kwargs: any additional kwargs to pass to ALL forms; see Django Forms documentation. :return: a SbmlExportSelectionForm """ # error if no range or if max < min if self._min is None or self._max is None or self._max < self._min: return None points = self._points t_range = Range(min=self._min, max=self._max) if points is not None: points = sorted(points) time_form = SbmlExportSelectionForm( t_range=t_range, points=points, line=self._selection.lines[0], data=payload, **kwargs ) time_form.sbml_warnings.extend(self._export_errors) return time_form
def update_bound_data_with_defaults(self): """ Forces data bound to the form to update to default values. """ super(SbmlExportOdForm, self).update_bound_data_with_defaults() if self.is_bound: # create mutable copy of QueryDict replace_data = QueryDict(mutable=True) replace_data.update(self.data) # set initial gcdw_conversion values cfield = self.fields['gcdw_conversion'] if cfield.initial: name = self.add_prefix('gcdw_conversion') for i, part in enumerate(cfield.widget.decompress(cfield.initial)): replace_data['%s_%s' % (name, i)] = part # set initial gcdw_default value dfield = self.fields['gcdw_default'] replace_data[self.add_prefix('gcdw_default')] = '%s' % dfield.initial self.data = replace_data
def clean(self): data = super(WorklistDefaultsForm, self).clean() # this is SUPER GROSS, but apparently the only way to change the form output from here is # to muck with the source data, by poking the undocumented _mutable property of QueryDict self.data._mutable = True # if no incoming data for field, fall back to default (initial) instead of empty string for name, field in self._created_fields.items(): key = self.add_prefix(name) value = field.widget.value_from_datadict(self.data, self.files, key) if not value: value = field.initial self.data[key] = data[key] = value self._lookup[name].default_value = value # flip back _mutable property self.data._mutable = False return data
def _init_options(self): # sometimes self.data is a plain dict instead of a QueryDict data = QueryDict(mutable=True) data.update(self.data) # update available choices based on instances in self._selection if self._selection and hasattr(self._selection, 'lines'): columns = self._selection.line_columns self.fields['line_meta'].choices = map(table.ColumnChoice.get_field_choice, columns) self.fields['line_meta'].coerce = table.ColumnChoice.coerce(columns) # set all _meta options if no list of options was passed in for meta in ['study_meta', 'line_meta', 'protocol_meta', 'assay_meta', 'measure_meta']: if self.initial.get(meta, None) == '__all__': self.initial.update({ meta: [choice[0] for choice in self.fields[meta].choices], }) # update incoming data with default initial if not already set if meta not in data and 'layout' not in data: data.setlist(meta, self.initial.get(meta, [])) self.data = data
def assertURLEqual(self, url, expected, parse_qs=False): """ Given two URLs, make sure all their components (the ones given by urlparse) are equal, only comparing components that are present in both URLs. If `parse_qs` is True, then the querystrings are parsed with QueryDict. This is useful if you don't want the order of parameters to matter. Otherwise, the query strings are compared as-is. """ fields = ParseResult._fields for attr, x, y in zip(fields, urlparse(url), urlparse(expected)): if parse_qs and attr == 'query': x, y = QueryDict(x), QueryDict(y) if x and y and x != y: self.fail("%r != %r (%s doesn't match)" % (url, expected, attr))
def get_context_from_bindings(code, recipient, bindings): """Finalizes the bindings and create a Context for templating """ if code not in settings.EMAIL_TEMPLATES: raise ImproperlyConfigured("Mail '%s' cannot be found") url = settings.EMAIL_TEMPLATES[code] res = dict(bindings) res['EMAIL'] = recipient qs = QueryDict(mutable=True) qs.update(res) # We first initialize the LINK_BROWSER variable as "#" (same page) qs['LINK_BROWSER'] = "#" # we generate the final browser link and add it to result dictionary res['LINK_BROWSER'] = f"{url}?{qs.urlencode()}" return res
def login(request): if request.method == 'GET': return render(request, 'core/login.html', {'alert': request.session.get('alert', False)}) elif request.method == 'POST': data = QueryDict(request.body) mysnu_username = data['username'] mysnu_password = data['password'] result = dict() try: result = crawlCourse(mysnu_username, mysnu_password) except Exception: request.session['alert'] = True return redirect('login') request.session['result'] = result request.session.set_expiry(3600) return redirect('courses')