我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用django.http.response.HttpResponse()。
def get(self, request, *args, **kwargs): queryset = self.get_queryset() field_names = self.get_fields(queryset) response = HttpResponse(content_type='text/csv') filename = self.get_filename(queryset) response['Content-Disposition'] = 'attachment; filename="{}.csv"'.format(filename) writer = csv.writer(response, **self.get_csv_writer_fmtparams()) if self.specify_separator: response.write('sep={}{}'.format(writer.dialect.delimiter, writer.dialect.lineterminator)) if self.header: writer.writerow([self.get_header_name(queryset.model, field_name) for field_name in list(field_names)]) for obj in queryset: writer.writerow([self.get_field_value(obj, field) for field in field_names]) return response
def _validation_error_handler(e): """ Handle validation errors. This can be overridden via the settings. """ if isinstance(e, ValidationError): message = "Validation failed. {}".format(e.message) error_response = { "message": message, "code": e.validator } logger.info(message) error_resp = HttpResponse(error_response, status=422) else: raise FatalException("Malformed JSON in the request.", 400) return error_resp
def post(self, request, *args, **kwargs): _page = get_object_or_404(Page, pk=request.POST.get('page_id')) def update_indicator(): indicator = PageIndicator.objects.get(editor=request.user, edited_on__gte=self.time_past, page=_page) indicator.edited_on = self.now indicator.save() # To avoid uncontrolled creation of PageIndicators delete any for this user/page combination which # is too old. PageIndicator.objects.filter(editor=request.user, edited_on__lte=self.time_past, page=_page).delete() try: update_indicator() except PageIndicator.DoesNotExist: PageIndicator.objects.create(editor=request.user, edited_on=self.now, started_editing=self.now, page=_page) return HttpResponse(status=200)
def log_tests(request): es = get_elastic() if not es: return HttpResponse('not able to connect to elasticsearch') res = es.search(index="message-log", doc_type='message', body={ "size": 0, "aggs" : { "test_ids" : { "terms" : { "field" : "test_id", "size" : 500 } } }}) test_ids = [] for bucket in res['aggregations']['test_ids']['buckets']: test_id = bucket['key'] test_ids.append({'id':'test_id_'+test_id, 'name':test_id}) context = { 'groups' : test_ids } template = loader.get_template('golem/log.html') return HttpResponse(template.render(context,request))
def remove_home_recommend_handler(request): result = base_result() if request.method == "GET": result["code"] = 500 result["message"] = "get method is not supported!" return result article_id = request.POST.get("article_id") try: recommend = HomeRecommend.objects.get(share_id=article_id) recommend.status = 0 recommend.save() except HomeRecommend.DoesNotExist: result["code"] = 500 result["message"] = "article is not exists!" return HttpResponse(json.dumps(result))
def handle(self, *args, **options): request = RequestFactory().get("/") response = HttpResponse() user = User() n = 0 print(TEMPLATE_A) for pattern, age, cache_type, dc in rules: policy = POLICIES[cache_type] policy(request, response, user, age) if n == 0: print("if", end="") else: print("else if", end="") print(""" (req.url ~ "%s") {""" % pattern.pattern) print("""set req.http.Hash-Cookies = "%s";""" % response["X-Hash-Cookies"]) print("}") n += 1 print(TEMPLATE_B)
def csv_download(request): """ Creates a CSV file using all of the applications to the users organization. """ apps = get_all_applications_for_users_org(request.user) data = ApplicationCSVDownloadSerializer(apps, many=True).data fields = [] for datum in data: these_fields = list(datum.keys()) # Finds the largest set of fields and uses it # There should not be a case where a smaller set of fields would have # a field not in a larger one. if len(these_fields) > len(fields): fields = these_fields response = HttpResponse(content_type='text/csv') csv_writer = csv.DictWriter(response, fieldnames=fields) csv_writer.writeheader() csv_writer.writerows(data) file = 'all_applications_to_%s_%s.csv' % ( request.user.profile.organization.slug, timezone.now().strftime('%m-%d-%Y'), ) response['Content-Disposition'] = 'attachment; filename="%s"' % file return response
def get(self, request, *args, **kwargs): code = request.GET.get('code', '') if not code: raise Http404 config = SocialConfig.get_solo() redirect_uri = self.request.build_absolute_uri(resolve_url('admin_social_networks:instagram_token')) response = requests.post( 'https://api.instagram.com/oauth/access_token', data={ 'grant_type': 'authorization_code', 'client_id': config.instagram_client_id, 'client_secret': config.instagram_client_secret, 'redirect_uri': redirect_uri, 'code': code, } ) answer = response.json() if answer and 'access_token' in answer: SocialConfig.objects.update(instagram_access_token=answer['access_token']) add_message(request, SUCCESS, _('Instagram access_token updated successfully!')) return redirect('admin:social_networks_socialconfig_change') else: return HttpResponse(response.text)
def fcm_register(request): args = json.loads(request.body) slack_token = args.get('slack_token') reg_id = args.get('reg_id') url = 'https://slack.com/api/auth.test' r = requests.get(url, params={'token': slack_token}) slack_user_id = r.json().get('user_id') if not slack_user_id: logger.warning('Couldn\'t validate slack token for FCM registration') return HttpResponse('Could not validate slack token', status=400) FcmSub.objects.update_or_create(reg_id=reg_id, defaults={'slack_user_id': slack_user_id}) logger.warning('FCM registration complete for %s %s' % (slack_user_id, reg_id)) return HttpResponse('ok')
def slack_oauth(request): code = request.GET['code'] params = { 'code': code, 'client_id': settings.SLACK_CLIENT_ID, "client_secret": settings.SLACK_CLIENT_SECRET } url = 'https://slack.com/api/oauth.access' json_response = requests.get(url, params) data = json.loads(json_response.text) Team.objects.get_or_create( name=data['team_name'], team_id=data['team_id'], bot_user_id=data['bot']['bot_user_id'], bot_access_token=data['bot']['bot_access_token'] ) return HttpResponse('Bot added to your Slack team!')
def post(self, request, *args, **kwargs): try: incoming_message = json.loads(self.request.body.decode('utf-8')) for entry in incoming_message['entry']: for message in entry['messaging']: if 'message' in message: text = message['message']['text'] psid = message['sender']['id'] print(text, psid) try: user = User.objects.get(psid=psid) if user.valid: if user.profile_completed: analyseMessage.delay(psid, text) else:#get user profile completed completeProfile.delay(psid, text) else: gotInactiveUser.delay(psid) except Exception as e: # print('\nnew user,, yaaayyye') newUser.delay(psid) except Exception as e: # print(incoming_message) print("Exception: ", e) return HttpResponse()
def get(self, request): """ Get configuration from settings, format it and return """ # get settings and transform it to json config = json.dumps({ 'VERSION': pkg_resources.get_distribution("Vaultier").version, 'raven_key': settings.VAULTIER.get('raven_key'), 'invitation_lifetime': settings.VAULTIER.get( 'invitation_lifetime'), 'registration_allow': settings.VAULTIER.get('registration_allow'), 'registration_enforce': not bool(User.objects.all().count()), # dev 'dev_shared_key': settings.VAULTIER.get('dev_shared_key'), 'dev_shared_key_private': settings.VAULTIER.get('dev_shared_key_private'), 'dev_shared_key_public': settings.VAULTIER.get('dev_shared_key_public'), 'dev_show_token': settings.VAULTIER.get('dev_show_token'), 'dev_email': settings.VAULTIER.get('dev_email') }) return HttpResponse(config, content_type='application/json')
def get(self, request, name): db_file = get_object_or_404(DBFile.objects.defer('content'), name=name) mtime = time.mktime(db_file.updated_on.timetuple()) modified = was_modified_since( header=self.request.META.get('HTTP_IF_MODIFIED_SINCE'), mtime=mtime, size=db_file.size) if not modified: return HttpResponseNotModified() content_type, encoding = mimetypes.guess_type(db_file.name) content_type = content_type or 'application/octet-stream' response = HttpResponse(db_file.content, content_type=content_type) response['Last-Modified'] = http_date(mtime) response['Content-Length'] = db_file.size if encoding: response['Content-Encoding'] = encoding return response
def uploadfile(file, filename, upload_dir): if not os.path.exists(upload_dir): try : os.mkdir(upload_dir) except : os.makedirs(upload_dir) ''' if self.file.size > 5 * 1024 * 1024 : return HttpResponse('?????????5M') ''' filename = upload_dir + filename with open(filename, 'wb+') as dest: for chunk in file.chunks(): dest.write(chunk) return filename
def run(self, query, query_args): """Genera una respuesta CSV, con columnas (indice tiempo, serie1, serie2, ...) y un dato por fila """ # Saco metadatos, no se usan para el formato CSV query.set_metadata_config(constants.METADATA_NONE) header = query_args.get(constants.PARAM_HEADER, constants.API_DEFAULT_VALUES[constants.PARAM_HEADER]) series_ids = query.get_series_ids(how=header) data = query.run()['data'] response = HttpResponse(content_type='text/csv') content = 'attachment; filename="{}"' response['Content-Disposition'] = content.format(constants.CSV_RESPONSE_FILENAME) writer = unicodecsv.writer(response) header = [settings.INDEX_COLUMN] + series_ids writer.writerow(header) for row in data: writer.writerow(row) return response
def post(self, request, *args, **kwargs): # Converts the text payload into a python dictionary incoming_message = json.loads(self.request.body.decode('utf-8')) # Facebook recommends going through every entry since they might send # multiple messages in a single call during high load for entry in incoming_message['entry']: for message in entry['messaging']: # Check to make sure the received call is a message call # This might be delivery, optin, postback for other events if 'message' in message: # Print the message to the terminal pprint(message) # Assuming the sender only sends text. Non-text messages like stickers, audio, pictures # are sent as attachments and must be handled accordingly. post_facebook_message(message['sender']['id'], message['message']['text']) return HttpResponse()
def add_question(request): if request.method == "POST": question = request.POST.get("question") option1 = request.POST.get("option1") option2 = request.POST.get("option2") option3 = request.POST.get("option3") option4 = request.POST.get("option4") answer = request.POST.get("answer") exam = request.POST.get("exam") q = Question() q.question = question q.option1 = option1 q.option2 = option2 q.option3 = option3 q.option4 = option4 q.answer = answer q.exam = Exam.objects.get(pk=int(exam)) q.save() return HttpResponse("success") #viewsets for rest_framework
def get_ss_qr(request): if request.user.is_anonymous(): return JsonResponse({'error': 'unauthorized'}) if not hasattr(request.user, 'ss_user'): return JsonResponse({'error': 'no linked shadowsocks account'}) ss_user = request.user.ss_user if request.GET.get('nid'): try: node = Node.objects.get(pk=request.GET.get('nid')) except Node.DoesNotExist: return JsonResponse({'error': 'node not exist'}) else: node = Node.objects.all().order_by('-weight') if node: node = node[0] else: return JsonResponse({'error': 'no node at all'}) password = '{}:{}@{}:{}'.format(node.method, ss_user.password, node.server, ss_user.port) img = qrcode.make('ss://{}'.format(base64.b64encode(bytes(password, 'utf8')).decode('ascii'))) response = HttpResponse(content_type="image/png") img.save(response) return response
def device(request): c = statsd.StatsClient('localhost', 8125) c.incr('subscribe.success') c.gauge('total.subscribe.success', 1, delta=True) device_id = request.body.decode("utf-8") if check_uuid.match(device_id) is None: return HttpResponse("", status=400) user_device, created = UserDevice.objects.get_or_create(device_id=device_id) if not created: if not user_device.verified: send_verify_notification.delay(device_id, 0) return HttpResponse(json.dumps({'verified': user_device.verified}), status=200, content_type="application/json") user_device.verified = False user_device.verification_key = uuid4().hex user_device.save() # TODO send verification key via PUSH send_verify_notification.delay(device_id, 0) return HttpResponse(json.dumps({}), status=201, content_type="application/json")
def verify(request, key=None): c = statsd.StatsClient('localhost', 8125) if not key: c.incr('verify.device.failed') c.gauge('total.verify.device.failed', 1, delta=True) return HttpResponse("", status=400) try: user_device = UserDevice.objects.get(verification_key=key) user_device.verified = True user_device.save() except UserEmail.DoesNotExist: c.incr('verify.device.failed') c.gauge('total.verify.device.failed', 1, delta=True) return HttpResponse("", status=400) c.incr('verify.device.success') c.gauge('total.verify.device.success', 1, delta=True) return HttpResponse("")
def post(self, request, *args, **kwargs): content = request.POST.get('content') if content: response = md2html(content) else: response = b"" return HttpResponse(response)
def greet(request, greeting, greetee): raise ExceptionalResponse(HttpResponse('oh no', status=400))
def twitter_tpf_server(request): tpq = TriplePatternQuery(request.GET.get('page', '1'), request.GET.get('subject'), request.GET.get('predicate'), request.GET.get('object')) fragment = Fragment() Odmtp(TrimmerXr2rmlTwitter(), Tp2QueryTwitter(), MapperTwitterXr2rml()).match(tpq, fragment, request) response = HttpResponse( fragment.serialize(), content_type='application/trig; charset=utf-8') response['Content-Disposition'] = 'attachment; filename="fragment.trig"' response['Access-Control-Allow-Origin'] = '*' return response
def github_tpf_server(request): tpq = TriplePatternQuery(request.GET.get('page', '1'), request.GET.get('subject'), request.GET.get('predicate'), request.GET.get('object')) fragment = Fragment() Odmtp(TrimmerXr2rmlGithub(), Tp2QueryGithub(), MapperGithubXr2rml()).match(tpq, fragment, request) response = HttpResponse( fragment.serialize(), content_type='application/trig; charset=utf-8') response['Content-Disposition'] = 'attachment; filename="fragment.trig"' response['Access-Control-Allow-Origin'] = '*' return response
def custom_validation_response(e): """ Custom validation handler to override the default and return a HttpResponse I'm a teapot code. """ return HttpResponse(status=418)
def ExampleAPI(request, schema, example): response = _example_api(request, schema, example) if isinstance(response, HttpResponse): return response else: return HttpResponse(response)
def ValidatedGETAPI(request, expected_params, target): """ Validate GET APIs. """ if _is_valid_query(request.GET, expected_params): response = target(request) if isinstance(response, HttpResponse): return response else: return HttpResponse(json.dumps(response))
def ValidatedPOSTAPI(request, schema, expected_params, target): """ Validate POST APIs. """ error_response = None if expected_params: _is_valid_query(request.GET, expected_params) # Either passes through or raises an exception. if schema: # If there is a problem with the json data, return a 400. try: data = json.loads(request.body.decode("utf-8")) validate(data, schema) except Exception as e: # Check the value is in settings, and that it is not None if hasattr(settings, 'RAMLWRAP_VALIDATION_ERROR_HANDLER') and settings.RAMLWRAP_VALIDATION_ERROR_HANDLER: error_response = _call_custom_handler(e) else: error_response = _validation_error_handler(e) else: data = json.loads(request.body.decode('utf-8')) if error_response: response = error_response else: request.validated_data = data response = target(request) if isinstance(response, HttpResponse): return response else: return HttpResponse(json.dumps(response))
def test_redirect(browser, logged_in_staff, mocker, settings): """Test the redirect behavior. If the dashboard API returns a 401 it should handle it properly.""" # Set ROOT_URLCONF to this modules path. This will cause the app to use the 'urlpatterns' value defined above # at the module level. settings.ROOT_URLCONF = __name__ dashboard_patch = mocker.patch('dashboard.views.UserDashboard.get', return_value=HttpResponse( status=401, content=json.dumps({"error": "message"}).encode() )) browser.get("/dashboard", ignore_errors=True) assert FAKE_RESPONSE in browser.driver.find_element_by_css_selector("body").text assert dashboard_patch.called
def get(self, request): return HttpResponse('Main Page')
def get(self, request, *args, **kwargs): if self.request.GET.get('hub.verify_token') == settings.GOLEM_CONFIG.get('WEBHOOK_VERIFY_TOKEN'): return HttpResponse(self.request.GET['hub.challenge']) else: return HttpResponse('Error, invalid token')
def post(self, request, *args, **kwargs): # Converts the text payload into a python dictionary request_body = json.loads(self.request.body.decode('utf-8')) FacebookInterface.accept_request(request_body) return HttpResponse()
def get(self, request, *args, **kwargs): request_body = json.loads(self.request.body.decode('utf-8')) from pprint import pprint pprint(request_body) return HttpResponse()
def post(self, request, *args, **kwargs): # Converts the text payload into a python dictionary request_body = json.loads(self.request.body.decode('utf-8')) TelegramInterface.accept_request(request_body) return HttpResponse()
def test(request): with open('test/results.json') as infile: tests = json.load(infile) status = 'passed' avg = {'duration':0, 'total':0, 'init':0, 'parsing':0, 'processing':0} passed = 0 for test in tests: result = test['result'] if result['status'] == 'passed': passed += 1 avg['duration'] += result['duration'] avg['total'] += result['report']['avg']['total'] avg['init'] += result['report']['avg']['init'] avg['parsing'] += result['report']['avg']['parsing'] avg['processing'] += result['report']['avg']['processing'] elif status != 'exception': status = result['status'] if passed > 0: for key in avg: avg[key] = avg[key] / passed context = {'tests':tests, 'avg':avg, 'status':status} template = loader.get_template('golem/test.html') return HttpResponse(template.render(context, request))
def debug(request): FacebookInterface.accept_request({'entry':[{'messaging':[{'message': {'seq': 356950, 'mid': 'mid.$cAAPhQrFuNkFibcXMZ1cPICEB8YUn', 'text': 'hi'}, 'recipient': {'id': '1092102107505462'}, 'timestamp': 1595663674471, 'sender': {'id': '1046728978756975'}}]}]}) return HttpResponse('done')
def user_info_api_handler(request): user_id = request.GET.get("user_id") user_info = UserInfo.query_format_info_by_user_id(user_id) return HttpResponse(json.dumps({"user_info": user_info}))
def user_info_api_by_ease_mob_handler(request): ease_mob = request.GET.get("ease_mob") user_info = UserInfo.query_format_info_by_ease_mob(ease_mob) return HttpResponse(json.dumps({"user_info": user_info}))
def test_view(request): PizzaOrder.objects.create(status='ORDERED') return HttpResponse()
def preview_campaign(request, campaign_id): from premailer import Premailer campaign = get_object_or_404(Campaign, pk=campaign_id) content = campaign.render_html(request, test=True) content = Premailer(content, strip_important=False).transform() return HttpResponse(content)
def preview_campaign_plain(request, campaign_id): campaign = get_object_or_404(Campaign, pk=campaign_id) content = campaign.render_plain(request, test=True) return HttpResponse('<pre>%s</pre>' % content.strip())
def get(self, request, *args, **kwargs): from requests_oauthlib import OAuth1Session config = SocialConfig.get_solo() oauth_token = request.GET.get('oauth_token') oauth_verifier = request.GET.get('oauth_verifier') oauth_client = OAuth1Session( client_key=config.twitter_client_id, client_secret=config.twitter_client_secret, resource_owner_key=oauth_token, resource_owner_secret=config.twitter_access_token_secret, verifier=oauth_verifier ) try: answer = oauth_client.fetch_access_token('https://api.twitter.com/oauth/access_token') except ValueError: raise Http404 if answer and 'oauth_token' in answer: SocialConfig.objects.update(twitter_access_token=answer['oauth_token']) add_message(request, SUCCESS, _('Twitter access_token updated successfully!')) return redirect('admin:social_networks_socialconfig_change') else: return HttpResponse(answer)
def render(self, indent=0): """ Renders a HttpResponse for the ongoing request :param indent int :rtype: HttpResponse """ self.__indent = indent return HttpResponse( self.__str__(), content_type=self.__content_type, charset=self.__charset, **self.__kwargs )
def get_invalid_result(): """Return an object instance that cannot be serialized in json or xml""" from django.http.response import HttpResponse return HttpResponse(content='dummy')
def xml_http_response(data, http_response_cls=HttpResponse): response = http_response_cls(data) response['Content-Type'] = 'text/xml' return response
def result_error(self, exception, http_response_cls=HttpResponse): raw_response = '<?xml version="1.0"?>' raw_response += '<methodResponse>' raw_response += self.dumps(xmlrpc_client.Fault(exception.code, exception.message)) raw_response += '</methodResponse>' return self.xml_http_response(raw_response, http_response_cls=http_response_cls)
def json_http_response(data, http_response_cls=HttpResponse): response = http_response_cls(data) response['Content-Type'] = 'application/json' return response
def result_error(self, exception, http_response_cls=HttpResponse): result = self.json_error_response(exception) return self.json_http_response(self.dumps(result), http_response_cls=http_response_cls)
def lichess_api_call(request, path): params = request.GET.dict() priority = int(params.pop('priority', 0)) max_retries = int(params.pop('max_retries', 3)) redis_key = get_random_string(length=16) worker.queue_work(priority, _do_lichess_api_call, redis_key, path, request.method, request.body, params, priority, max_retries) return HttpResponse(redis_key)