Python django.http.response 模块,HttpResponse() 实例源码

我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 django.http.response.HttpResponse()。

项目:django-csv-export-view    作者:benkonrath    | 项目源码 | 文件源码
def get(self, request, *args, **kwargs):
    """Return the view's queryset as a CSV file attachment.

    The response is used directly as the file object for ``csv.writer``.
    An optional ``sep=`` line and an optional header row are written before
    the data rows, depending on ``self.specify_separator`` and
    ``self.header``.
    """
    rows = self.get_queryset()
    columns = self.get_fields(rows)

    response = HttpResponse(content_type='text/csv')
    response['Content-Disposition'] = 'attachment; filename="{}.csv"'.format(self.get_filename(rows))

    writer = csv.writer(response, **self.get_csv_writer_fmtparams())

    if self.specify_separator:
        # Written straight to the response, bypassing the csv writer.
        dialect = writer.dialect
        response.write('sep={}{}'.format(dialect.delimiter, dialect.lineterminator))

    if self.header:
        header_row = []
        for column in list(columns):
            header_row.append(self.get_header_name(rows.model, column))
        writer.writerow(header_row)

    writer.writerows(
        [self.get_field_value(record, column) for column in columns]
        for record in rows
    )

    return response
项目:ramlwrap    作者:jmons    | 项目源码 | 文件源码
def _validation_error_handler(e):
    """
    Handle validation errors.
    This can be overridden via the settings.

    :param e: the exception raised while parsing or validating the request.
    :return: an ``HttpResponse`` with status 422 whose body is a JSON object
        holding ``message`` and ``code``.
    :raises FatalException: when ``e`` is not a ``ValidationError``; the
        exception is built with the "Malformed JSON" message and 400.
    """
    import json  # local import so the block does not rely on file-level imports

    if not isinstance(e, ValidationError):
        raise FatalException("Malformed JSON in the request.", 400)

    message = "Validation failed. {}".format(e.message)
    logger.info(message)

    # HttpResponse iterates non-string content, so passing the dict itself
    # would send only its concatenated keys. Serialise it explicitly.
    payload = json.dumps({
        "message": message,
        "code": e.validator,
    })
    return HttpResponse(payload, status=422, content_type="application/json")
项目:djangocms-concurrent-users    作者:Blueshoe    | 项目源码 | 文件源码
def post(self, request, *args, **kwargs):
    """Refresh (or create) the requesting user's edit indicator for a page.

    The page is looked up from the ``page_id`` POST field (404 if missing).
    Always answers with an empty 200 response.
    """
    page = get_object_or_404(Page, pk=request.POST.get('page_id'))

    # Drop indicators for this user/page that are too old, so they do not
    # pile up without bound.
    stale = PageIndicator.objects.filter(editor=request.user, edited_on__lte=self.time_past, page=page)
    stale.delete()

    try:
        current = PageIndicator.objects.get(editor=request.user, edited_on__gte=self.time_past, page=page)
        current.edited_on = self.now
        current.save()
    except PageIndicator.DoesNotExist:
        # No recent indicator: the user has just started editing.
        PageIndicator.objects.create(editor=request.user, edited_on=self.now, started_editing=self.now, page=page)

    return HttpResponse(status=200)
项目:golem    作者:prihoda    | 项目源码 | 文件源码
def log_tests(request):
    """Render ``golem/log.html`` with one group per ``test_id`` found in the message log.

    Returns a plain error response when no Elasticsearch client is available.
    """
    es = get_elastic()
    if not es:
        return HttpResponse('not able to connect to elasticsearch')

    # Aggregation-only query: no hits, just the distinct test_id terms.
    query_body = {
        "size": 0,
        "aggs": {
            "test_ids": {
                "terms": {"field": "test_id", "size": 500},
            },
        },
    }
    res = es.search(index="message-log", doc_type='message', body=query_body)

    groups = [
        {'id': 'test_id_' + bucket['key'], 'name': bucket['key']}
        for bucket in res['aggregations']['test_ids']['buckets']
    ]

    template = loader.get_template('golem/log.html')
    return HttpResponse(template.render({'groups': groups}, request))
项目:pony    作者:Eastwu5788    | 项目源码 | 文件源码
def remove_home_recommend_handler(request):
    """Clear the home-page recommendation for the posted ``article_id``.

    The matching ``HomeRecommend`` row (looked up by ``share_id``) gets its
    ``status`` set to 0. The result dict from ``base_result()`` is always
    returned as a JSON-encoded ``HttpResponse``; ``code`` is 500 for GET
    requests and for unknown articles.
    """
    result = base_result()
    if request.method == "GET":
        result["code"] = 500
        result["message"] = "get method is not supported!"
        # A view must return an HttpResponse; the bare dict returned here
        # before was not a valid response object.
        return HttpResponse(json.dumps(result))

    article_id = request.POST.get("article_id")

    try:
        recommend = HomeRecommend.objects.get(share_id=article_id)
    except HomeRecommend.DoesNotExist:
        result["code"] = 500
        result["message"] = "article is not exists!"
    else:
        recommend.status = 0
        recommend.save()

    return HttpResponse(json.dumps(result))
项目:django-cache-headers    作者:praekelt    | 项目源码 | 文件源码
def handle(self, *args, **options):
    """Print an if / else-if chain with one branch per entry in ``rules``.

    Each rule's policy is applied to a single shared throwaway
    request/response/user, and the resulting ``X-Hash-Cookies`` header is
    printed inside that rule's branch. ``TEMPLATE_A`` and ``TEMPLATE_B``
    wrap the output.
    """
    request = RequestFactory().get("/")
    response = HttpResponse()
    user = User()

    print(TEMPLATE_A)
    for index, (pattern, age, cache_type, _dc) in enumerate(rules):
        POLICIES[cache_type](request, response, user, age)
        # Only the first rule opens the chain; the rest continue it.
        opener = "if" if index == 0 else "else if"
        print(opener, end="")
        print(""" (req.url ~ "%s") {""" % pattern.pattern)
        print("""set req.http.Hash-Cookies = "%s";""" % response["X-Hash-Cookies"])
        print("}")
    print(TEMPLATE_B)
项目:intake    作者:codeforamerica    | 项目源码 | 文件源码
def csv_download(request):
    """Build a CSV download of every application to the user's organization."""
    apps = get_all_applications_for_users_org(request.user)
    data = ApplicationCSVDownloadSerializer(apps, many=True).data

    # Use the widest field list as the CSV columns. This assumes a shorter
    # field list never contains a field missing from a longer one.
    fieldnames = max(
        (list(datum.keys()) for datum in data),
        key=len,
        default=[],
    )

    response = HttpResponse(content_type='text/csv')
    writer = csv.DictWriter(response, fieldnames=fieldnames)
    writer.writeheader()
    writer.writerows(data)

    organization_slug = request.user.profile.organization.slug
    date_stamp = timezone.now().strftime('%m-%d-%Y')
    download_name = 'all_applications_to_%s_%s.csv' % (organization_slug, date_stamp)
    response['Content-Disposition'] = 'attachment; filename="%s"' % download_name
    return response
项目:tumanov_castleoaks    作者:Roamdev    | 项目源码 | 文件源码
def get(self, request, *args, **kwargs):
    """Exchange an Instagram OAuth ``code`` for an access token and store it.

    Raises ``Http404`` when no ``code`` query parameter is given. On success
    the token is saved on ``SocialConfig`` and the user is redirected to the
    admin change page; otherwise the raw text of Instagram's reply is
    returned.
    """
    code = request.GET.get('code', '')
    if not code:
        raise Http404

    config = SocialConfig.get_solo()
    token_request = {
        'grant_type': 'authorization_code',
        'client_id': config.instagram_client_id,
        'client_secret': config.instagram_client_secret,
        'redirect_uri': self.request.build_absolute_uri(
            resolve_url('admin_social_networks:instagram_token')
        ),
        'code': code,
    }
    response = requests.post('https://api.instagram.com/oauth/access_token', data=token_request)

    answer = response.json()
    if not (answer and 'access_token' in answer):
        return HttpResponse(response.text)

    SocialConfig.objects.update(instagram_access_token=answer['access_token'])
    add_message(request, SUCCESS, _('Instagram access_token updated successfully!'))
    return redirect('admin:social_networks_socialconfig_change')
项目:heltour    作者:cyanfish    | 项目源码 | 文件源码
def fcm_register(request):
    """Register an FCM ``reg_id`` for the Slack user that owns ``slack_token``.

    The JSON request body supplies ``slack_token`` and ``reg_id``. The token
    is checked against Slack's ``auth.test`` endpoint; if no ``user_id``
    comes back the view answers 400, otherwise the subscription is upserted
    and 'ok' is returned.
    """
    payload = json.loads(request.body)
    slack_token = payload.get('slack_token')
    reg_id = payload.get('reg_id')

    auth_check = requests.get('https://slack.com/api/auth.test', params={'token': slack_token})
    slack_user_id = auth_check.json().get('user_id')

    if slack_user_id:
        FcmSub.objects.update_or_create(reg_id=reg_id, defaults={'slack_user_id': slack_user_id})
        logger.warning('FCM registration complete for %s %s' % (slack_user_id, reg_id))
        return HttpResponse('ok')

    logger.warning("Couldn't validate slack token for FCM registration")
    return HttpResponse('Could not validate slack token', status=400)
项目:WineHelper    作者:scorelabio    | 项目源码 | 文件源码
def slack_oauth(request):
    """Finish the Slack OAuth flow and record the team that installed the bot.

    The ``code`` query parameter is exchanged at Slack's ``oauth.access``
    endpoint, and a ``Team`` is fetched or created from the reply.
    """
    query = {
        'code': request.GET['code'],
        'client_id': settings.SLACK_CLIENT_ID,
        "client_secret": settings.SLACK_CLIENT_SECRET,
    }
    oauth_reply = requests.get('https://slack.com/api/oauth.access', query)
    data = json.loads(oauth_reply.text)

    team_fields = {
        'name': data['team_name'],
        'team_id': data['team_id'],
        'bot_user_id': data['bot']['bot_user_id'],
        'bot_access_token': data['bot']['bot_access_token'],
    }
    Team.objects.get_or_create(**team_fields)
    return HttpResponse('Bot added to your Slack team!')
项目:tpobot    作者:rishabhiitbhu    | 项目源码 | 文件源码
def post(self, request, *args, **kwargs):
    """Dispatch incoming webhook messages to the matching background task.

    For every message event in the JSON body, the sender's ``psid`` is
    looked up:

    * unknown sender    -> ``newUser``
    * invalid user      -> ``gotInactiveUser``
    * profile complete  -> ``analyseMessage``
    * otherwise         -> ``completeProfile``

    Always returns an empty 200 response. Any processing error is printed
    rather than raised.
    """
    try:
        incoming_message = json.loads(self.request.body.decode('utf-8'))
        for entry in incoming_message['entry']:
            for message in entry['messaging']:
                if 'message' not in message:
                    continue
                text = message['message']['text']
                psid = message['sender']['id']
                print(text, psid)
                try:
                    user = User.objects.get(psid=psid)
                except User.DoesNotExist:
                    # Only a missing row means "new user". The broad
                    # ``except Exception`` used before also registered a
                    # new user on unrelated failures (database errors,
                    # task dispatch errors, ...).
                    newUser.delay(psid)
                    continue
                if not user.valid:
                    gotInactiveUser.delay(psid)
                elif user.profile_completed:
                    analyseMessage.delay(psid, text)
                else:
                    # The user still has to complete their profile.
                    completeProfile.delay(psid, text)
    except Exception as e:
        # Best effort: acknowledge the webhook even when processing fails.
        print("Exception: ", e)
    return HttpResponse()
项目:vaultier    作者:Movile    | 项目源码 | 文件源码
def get(self, request):
    """
    Collect client configuration from settings and return it as JSON
    """
    vaultier = settings.VAULTIER

    payload = {'VERSION': pkg_resources.get_distribution("Vaultier").version}
    for key in ('raven_key', 'invitation_lifetime', 'registration_allow'):
        payload[key] = vaultier.get(key)

    # True only while no user exists yet.
    payload['registration_enforce'] = not bool(User.objects.all().count())

    # dev
    for key in ('dev_shared_key', 'dev_shared_key_private',
                'dev_shared_key_public', 'dev_show_token', 'dev_email'):
        payload[key] = vaultier.get(key)

    return HttpResponse(json.dumps(payload), content_type='application/json')
项目:django-db-storage    作者:derekkwok    | 项目源码 | 文件源码
def get(self, request, name):
    """Serve the stored file called ``name``, honouring ``If-Modified-Since``.

    Returns 404 when no such file exists and 304 when the client's copy is
    still current. Otherwise the content is returned with a guessed content
    type plus Last-Modified / Content-Length (and Content-Encoding when one
    was guessed).
    """
    # Load without the content column first; it is only needed on a full response.
    db_file = get_object_or_404(DBFile.objects.defer('content'), name=name)

    mtime = time.mktime(db_file.updated_on.timetuple())
    if not was_modified_since(
            header=self.request.META.get('HTTP_IF_MODIFIED_SINCE'),
            mtime=mtime,
            size=db_file.size):
        return HttpResponseNotModified()

    guessed_type, encoding = mimetypes.guess_type(db_file.name)

    response = HttpResponse(
        db_file.content,
        content_type=guessed_type or 'application/octet-stream',
    )
    response['Last-Modified'] = http_date(mtime)
    response['Content-Length'] = db_file.size
    if encoding:
        response['Content-Encoding'] = encoding
    return response
项目:lykops-old    作者:lykops    | 项目源码 | 文件源码
def uploadfile(file, filename, upload_dir):
    """Write an uploaded file's chunks to ``upload_dir + filename``.

    :param file: object exposing ``chunks()``, an iterable of byte strings.
    :param filename: name appended verbatim to ``upload_dir``.
    :param upload_dir: destination directory, created (with parents) if
        missing. It is joined to ``filename`` by plain concatenation, so it
        should end with a path separator.
    :return: the path the data was written to.
    :raises OSError: if the directory cannot be created or the file cannot
        be written.
    """
    # Create the directory unconditionally and tolerate "already exists".
    # This replaces an exists-check plus a bare ``except:`` and does not
    # fail when the directory appears between the check and the mkdir.
    try:
        os.makedirs(upload_dir)
    except OSError:
        if not os.path.isdir(upload_dir):
            raise

    # NOTE(review): no upload size limit is enforced here. A disabled 5M
    # check used to sit at this point; confirm whether callers validate size.
    destination = upload_dir + filename

    with open(destination, 'wb+') as dest:
        for chunk in file.chunks():
            dest.write(chunk)
    return destination
项目:series-tiempo-ar-api    作者:datosgobar    | 项目源码 | 文件源码
def run(self, query, query_args):
    """Generate a CSV response with columns
    (time index, series1, series2, ...) and one data point per row.
    """
    # Metadata is dropped: it is not used by the CSV format.
    query.set_metadata_config(constants.METADATA_NONE)

    header_mode = query_args.get(
        constants.PARAM_HEADER,
        constants.API_DEFAULT_VALUES[constants.PARAM_HEADER],
    )
    series_ids = query.get_series_ids(how=header_mode)
    rows = query.run()['data']

    response = HttpResponse(content_type='text/csv')
    response['Content-Disposition'] = 'attachment; filename="{}"'.format(constants.CSV_RESPONSE_FILENAME)

    writer = unicodecsv.writer(response)
    writer.writerow([settings.INDEX_COLUMN] + series_ids)
    writer.writerows(rows)

    return response
项目:django-facebook-messenger-bot-tutorial    作者:abhay1    | 项目源码 | 文件源码
def post(self, request, *args, **kwargs):
    """Reply to every text message contained in a webhook call."""
    # Parse the text payload into a python dictionary.
    incoming_message = json.loads(self.request.body.decode('utf-8'))

    # Facebook recommends going through every entry, since several messages
    # may arrive in a single call during high load.
    for entry in incoming_message['entry']:
        for message in entry['messaging']:
            # Skip events that are not message calls (delivery, optin,
            # postback, ...).
            if 'message' not in message:
                continue
            # Print the message to the terminal.
            pprint(message)
            # Assumes the sender only sends text. Non-text messages such as
            # stickers, audio or pictures arrive as attachments and would
            # need separate handling.
            sender_id = message['sender']['id']
            post_facebook_message(sender_id, message['message']['text'])
    return HttpResponse()
项目:dev-wine-helper    作者:WineHelperEnseirb    | 项目源码 | 文件源码
def slack_oauth(request):
    """Trade the OAuth ``code`` for bot credentials and remember the Slack team."""
    params = dict([
        ('code', request.GET['code']),
        ('client_id', settings.SLACK_CLIENT_ID),
        ("client_secret", settings.SLACK_CLIENT_SECRET),
    ])
    reply = requests.get('https://slack.com/api/oauth.access', params)
    data = json.loads(reply.text)

    # Every field takes part in the lookup; a row is created only when no
    # exact match exists.
    lookup = dict(
        name=data['team_name'],
        team_id=data['team_id'],
        bot_user_id=data['bot']['bot_user_id'],
        bot_access_token=data['bot']['bot_access_token'],
    )
    Team.objects.get_or_create(**lookup)

    return HttpResponse('Bot added to your Slack team!')
项目:quizwizard    作者:naveensaikiran    | 项目源码 | 文件源码
def add_question(request):
    """Create a ``Question`` from POSTed form fields.

    Reads ``question``, ``option1``..``option4``, ``answer`` and ``exam``
    (an ``Exam`` primary key) from ``request.POST``.

    :return: ``HttpResponse("success")`` after saving, or an empty 405
        response for any method other than POST.
    """
    if request.method != "POST":
        # The function used to fall through and return None for non-POST
        # requests, which is not a valid view response.
        not_allowed = HttpResponse(status=405)
        not_allowed['Allow'] = 'POST'
        return not_allowed

    form = request.POST
    q = Question()
    q.question = form.get("question")
    q.option1 = form.get("option1")
    q.option2 = form.get("option2")
    q.option3 = form.get("option3")
    q.option4 = form.get("option4")
    q.answer = form.get("answer")
    q.exam = Exam.objects.get(pk=int(form.get("exam")))
    q.save()
    return HttpResponse("success")

#viewsets for rest_framework
项目:retoohs    作者:youyaochi    | 项目源码 | 文件源码
def get_ss_qr(request):
    """Return a PNG QR code holding an ``ss://`` URI for the current user.

    The node is the one named by the ``nid`` query parameter, or the
    highest-weight node when ``nid`` is absent. Failures are reported as a
    JSON ``{'error': ...}`` body.
    """
    user = request.user
    if user.is_anonymous():
        return JsonResponse({'error': 'unauthorized'})
    if not hasattr(user, 'ss_user'):
        return JsonResponse({'error': 'no linked shadowsocks account'})
    ss_user = user.ss_user

    node_id = request.GET.get('nid')
    if node_id:
        try:
            node = Node.objects.get(pk=node_id)
        except Node.DoesNotExist:
            return JsonResponse({'error': 'node not exist'})
    else:
        candidates = Node.objects.all().order_by('-weight')
        if not candidates:
            return JsonResponse({'error': 'no node at all'})
        node = candidates[0]

    credentials = '{}:{}@{}:{}'.format(node.method, ss_user.password, node.server, ss_user.port)
    encoded = base64.b64encode(credentials.encode('utf8')).decode('ascii')
    img = qrcode.make('ss://{}'.format(encoded))

    # The image is written directly into the response.
    response = HttpResponse(content_type="image/png")
    img.save(response)
    return response
项目:TheaterWecker    作者:CodeforChemnitz    | 项目源码 | 文件源码
def device(request):
    """Subscribe the device whose id is sent as the raw request body.

    Answers 400 for ids that do not match ``check_uuid``, 200 with the
    current ``verified`` flag for a known device, and 201 for a newly
    created one. A verify notification is queued for new devices and for
    known devices that are not verified yet.
    """
    stats = statsd.StatsClient('localhost', 8125)
    stats.incr('subscribe.success')
    stats.gauge('total.subscribe.success', 1, delta=True)

    device_id = request.body.decode("utf-8")
    if check_uuid.match(device_id) is None:
        return HttpResponse("", status=400)

    user_device, created = UserDevice.objects.get_or_create(device_id=device_id)

    if created:
        user_device.verified = False
        user_device.verification_key = uuid4().hex
        user_device.save()

        # TODO send verification key via PUSH
        send_verify_notification.delay(device_id, 0)

        return HttpResponse(json.dumps({}), status=201, content_type="application/json")

    if not user_device.verified:
        send_verify_notification.delay(device_id, 0)
    body = json.dumps({'verified': user_device.verified})
    return HttpResponse(body, status=200, content_type="application/json")
项目:TheaterWecker    作者:CodeforChemnitz    | 项目源码 | 文件源码
def verify(request, key=None):
    """Mark the device owning ``key`` as verified.

    :param request: the incoming request (not inspected).
    :param key: verification key identifying a ``UserDevice``.
    :return: an empty 200 response on success; an empty 400 response when
        ``key`` is missing or matches no device. Success and failure are
        both counted through statsd.
    """
    c = statsd.StatsClient('localhost', 8125)

    user_device = None
    if key:
        try:
            user_device = UserDevice.objects.get(verification_key=key)
        except UserDevice.DoesNotExist:
            # The lookup is on UserDevice, so that model's DoesNotExist is
            # what gets raised. Catching UserEmail.DoesNotExist, as before,
            # left an unknown key unhandled.
            user_device = None

    if user_device is None:
        c.incr('verify.device.failed')
        c.gauge('total.verify.device.failed', 1, delta=True)
        return HttpResponse("", status=400)

    user_device.verified = True
    user_device.save()

    c.incr('verify.device.success')
    c.gauge('total.verify.device.success', 1, delta=True)
    return HttpResponse("")
项目:blogghar    作者:v1k45    | 项目源码 | 文件源码
def post(self, request, *args, **kwargs):
    """Convert the posted ``content`` with ``md2html``; empty body when no content is given."""
    content = request.POST.get('content')
    rendered = md2html(content) if content else b""
    return HttpResponse(rendered)
项目:lepo    作者:akx    | 项目源码 | 文件源码
def greet(request, greeting, greetee):
    """Always raise an ``ExceptionalResponse`` wrapping a 400 'oh no' response."""
    failure = HttpResponse('oh no', status=400)
    raise ExceptionalResponse(failure)
项目:odmtp-tpf    作者:benjimor    | 项目源码 | 文件源码
def twitter_tpf_server(request):
    """Answer a triple-pattern query against Twitter with a TriG fragment download."""
    params = request.GET
    tpq = TriplePatternQuery(
        params.get('page', '1'),
        params.get('subject'),
        params.get('predicate'),
        params.get('object'),
    )
    fragment = Fragment()

    engine = Odmtp(TrimmerXr2rmlTwitter(), Tp2QueryTwitter(), MapperTwitterXr2rml())
    engine.match(tpq, fragment, request)

    response = HttpResponse(fragment.serialize(), content_type='application/trig; charset=utf-8')
    extra_headers = (
        ('Content-Disposition', 'attachment; filename="fragment.trig"'),
        ('Access-Control-Allow-Origin', '*'),
    )
    for header, value in extra_headers:
        response[header] = value
    return response
项目:odmtp-tpf    作者:benjimor    | 项目源码 | 文件源码
def github_tpf_server(request):
    """Answer a triple-pattern query against GitHub with a TriG fragment download."""
    params = request.GET
    tpq = TriplePatternQuery(
        params.get('page', '1'),
        params.get('subject'),
        params.get('predicate'),
        params.get('object'),
    )
    fragment = Fragment()

    engine = Odmtp(TrimmerXr2rmlGithub(), Tp2QueryGithub(), MapperGithubXr2rml())
    engine.match(tpq, fragment, request)

    response = HttpResponse(fragment.serialize(), content_type='application/trig; charset=utf-8')
    extra_headers = (
        ('Content-Disposition', 'attachment; filename="fragment.trig"'),
        ('Access-Control-Allow-Origin', '*'),
    )
    for header, value in extra_headers:
        response[header] = value
    return response
项目:ramlwrap    作者:jmons    | 项目源码 | 文件源码
def custom_validation_response(e):
    """
    Custom validation handler that replaces the default one:
    it ignores the error and answers with HTTP 418 (I'm a teapot).
    """
    teapot = HttpResponse(status=418)
    return teapot
项目:ramlwrap    作者:jmons    | 项目源码 | 文件源码
def ExampleAPI(request, schema, example):
    """Return the example API result, wrapped in an HttpResponse if it is not one already."""
    result = _example_api(request, schema, example)
    return result if isinstance(result, HttpResponse) else HttpResponse(result)
项目:ramlwrap    作者:jmons    | 项目源码 | 文件源码
def ValidatedGETAPI(request, expected_params, target):
    """
    Validate the query string of a GET request, then call ``target``.

    A non-HttpResponse result from ``target`` is JSON-encoded into one.
    Returns None if the query check comes back falsy.
    """
    if not _is_valid_query(request.GET, expected_params):
        return None

    response = target(request)
    if not isinstance(response, HttpResponse):
        response = HttpResponse(json.dumps(response))
    return response
项目:ramlwrap    作者:jmons    | 项目源码 | 文件源码
def ValidatedPOSTAPI(request, schema, expected_params, target):
    """
    Validate a POST request's query string and JSON body, then call ``target``.

    When a schema is given, parse or validation failures go to the handler
    named by ``settings.RAMLWRAP_VALIDATION_ERROR_HANDLER`` if set, else to
    the default handler. On success the parsed body is attached to the
    request as ``validated_data``.
    """
    if expected_params:
        # Either passes through or raises an exception.
        _is_valid_query(request.GET, expected_params)

    error_response = None
    if not schema:
        data = json.loads(request.body.decode('utf-8'))
    else:
        # If there is a problem with the json data, build an error response.
        try:
            data = json.loads(request.body.decode("utf-8"))
            validate(data, schema)
        except Exception as e:
            # Use the custom handler only when the setting exists and is not None.
            if getattr(settings, 'RAMLWRAP_VALIDATION_ERROR_HANDLER', None):
                error_response = _call_custom_handler(e)
            else:
                error_response = _validation_error_handler(e)

    if error_response:
        response = error_response
    else:
        request.validated_data = data
        response = target(request)

    if not isinstance(response, HttpResponse):
        response = HttpResponse(json.dumps(response))
    return response
项目:micromasters    作者:mitodl    | 项目源码 | 文件源码
def test_redirect(browser, logged_in_staff, mocker, settings):
    """Check that a 401 from the dashboard API is handled properly by the redirect logic."""
    # Point ROOT_URLCONF at this module so the app uses this module's own
    # 'urlpatterns'.
    settings.ROOT_URLCONF = __name__

    unauthorized = HttpResponse(
        status=401,
        content=json.dumps({"error": "message"}).encode(),
    )
    dashboard_patch = mocker.patch('dashboard.views.UserDashboard.get', return_value=unauthorized)

    browser.get("/dashboard", ignore_errors=True)

    body_text = browser.driver.find_element_by_css_selector("body").text
    assert FAKE_RESPONSE in body_text
    assert dashboard_patch.called
项目:django-robokassa-merchant    作者:DirectlineDev    | 项目源码 | 文件源码
def get(self, request):
    """Return a fixed 'Main Page' response."""
    page = HttpResponse('Main Page')
    return page
项目:golem    作者:prihoda    | 项目源码 | 文件源码
def get(self, request, *args, **kwargs):
    """Echo ``hub.challenge`` when ``hub.verify_token`` equals the configured webhook token."""
    params = self.request.GET
    expected_token = settings.GOLEM_CONFIG.get('WEBHOOK_VERIFY_TOKEN')
    if params.get('hub.verify_token') != expected_token:
        return HttpResponse('Error, invalid token')
    return HttpResponse(params['hub.challenge'])
项目:golem    作者:prihoda    | 项目源码 | 文件源码
def post(self, request, *args, **kwargs):
    """Hand the decoded JSON request body to ``FacebookInterface``; reply with an empty 200."""
    raw_body = self.request.body.decode('utf-8')
    # Parse the text payload into a python dictionary.
    FacebookInterface.accept_request(json.loads(raw_body))
    return HttpResponse()
项目:golem    作者:prihoda    | 项目源码 | 文件源码
def get(self, request, *args, **kwargs):
    """Pretty-print the JSON request body and reply with an empty 200."""
    from pprint import pprint

    raw_body = self.request.body.decode('utf-8')
    pprint(json.loads(raw_body))
    return HttpResponse()
项目:golem    作者:prihoda    | 项目源码 | 文件源码
def post(self, request, *args, **kwargs):
    """Hand the decoded JSON request body to ``TelegramInterface``; reply with an empty 200."""
    raw_body = self.request.body.decode('utf-8')
    # Parse the text payload into a python dictionary.
    TelegramInterface.accept_request(json.loads(raw_body))
    return HttpResponse()
项目:golem    作者:prihoda    | 项目源码 | 文件源码
def test(request):
    """Render ``golem/test.html`` from the results stored in ``test/results.json``.

    The template receives the raw test list, timing figures averaged over
    the passed tests, and an overall status. The status is 'passed' unless
    some test did not pass, in which case it takes a non-passed test's
    status and stays at 'exception' once it has been set to that.
    """
    with open('test/results.json') as infile:
        tests = json.load(infile)

    report_keys = ('total', 'init', 'parsing', 'processing')
    avg = {'duration': 0, 'total': 0, 'init': 0, 'parsing': 0, 'processing': 0}
    status = 'passed'
    passed = 0

    for entry in tests:
        result = entry['result']
        if result['status'] != 'passed':
            if status != 'exception':
                status = result['status']
            continue

        passed += 1
        avg['duration'] += result['duration']
        report_avg = result['report']['avg']
        for key in report_keys:
            avg[key] += report_avg[key]

    # Turn the sums into means; skipped when nothing passed.
    if passed > 0:
        for key in avg:
            avg[key] /= passed

    template = loader.get_template('golem/test.html')
    context = {'tests': tests, 'avg': avg, 'status': status}
    return HttpResponse(template.render(context, request))
项目:golem    作者:prihoda    | 项目源码 | 文件源码
def debug(request):
    """Feed one hard-coded 'hi' message event to ``FacebookInterface`` and reply 'done'."""
    sample_event = {
        'message': {
            'seq': 356950,
            'mid': 'mid.$cAAPhQrFuNkFibcXMZ1cPICEB8YUn',
            'text': 'hi',
        },
        'recipient': {'id': '1092102107505462'},
        'timestamp': 1595663674471,
        'sender': {'id': '1046728978756975'},
    }
    FacebookInterface.accept_request({'entry': [{'messaging': [sample_event]}]})

    return HttpResponse('done')
项目:pony    作者:Eastwu5788    | 项目源码 | 文件源码
def user_info_api_handler(request):
    """Return the formatted user info for the ``user_id`` query parameter as JSON text."""
    user_info = UserInfo.query_format_info_by_user_id(request.GET.get("user_id"))
    body = json.dumps({"user_info": user_info})
    return HttpResponse(body)
项目:pony    作者:Eastwu5788    | 项目源码 | 文件源码
def user_info_api_by_ease_mob_handler(request):
    """Return the formatted user info for the ``ease_mob`` query parameter as JSON text."""
    user_info = UserInfo.query_format_info_by_ease_mob(request.GET.get("ease_mob"))
    body = json.dumps({"user_info": user_info})
    return HttpResponse(body)
项目:django-field-history    作者:grantmcconnaughey    | 项目源码 | 文件源码
def test_view(request):
    """Create a ``PizzaOrder`` with status 'ORDERED' and reply with an empty response."""
    order_fields = {'status': 'ORDERED'}
    PizzaOrder.objects.create(**order_fields)
    return HttpResponse()
项目:tumanov_castleoaks    作者:Roamdev    | 项目源码 | 文件源码
def preview_campaign(request, campaign_id):
    """Return a campaign's test HTML after passing it through Premailer."""
    from premailer import Premailer

    campaign = get_object_or_404(Campaign, pk=campaign_id)
    raw_html = campaign.render_html(request, test=True)
    transformed = Premailer(raw_html, strip_important=False).transform()
    return HttpResponse(transformed)
项目:tumanov_castleoaks    作者:Roamdev    | 项目源码 | 文件源码
def preview_campaign_plain(request, campaign_id):
    """Return a campaign's plain-text test rendering, stripped and wrapped in a ``<pre>`` tag."""
    campaign = get_object_or_404(Campaign, pk=campaign_id)
    plain_text = campaign.render_plain(request, test=True).strip()
    return HttpResponse('<pre>%s</pre>' % plain_text)
项目:tumanov_castleoaks    作者:Roamdev    | 项目源码 | 文件源码
def get(self, request, *args, **kwargs):
    """Finish the Twitter OAuth1 flow and store the resulting access token.

    Raises ``Http404`` when fetching the access token raises ``ValueError``.
    On success the token is saved on ``SocialConfig`` and the user is
    redirected to the admin change page; otherwise the fetched answer is
    returned as the response content.
    """
    from requests_oauthlib import OAuth1Session

    config = SocialConfig.get_solo()
    session_options = dict(
        client_key=config.twitter_client_id,
        client_secret=config.twitter_client_secret,
        resource_owner_key=request.GET.get('oauth_token'),
        resource_owner_secret=config.twitter_access_token_secret,
        verifier=request.GET.get('oauth_verifier'),
    )
    oauth_client = OAuth1Session(**session_options)

    try:
        answer = oauth_client.fetch_access_token('https://api.twitter.com/oauth/access_token')
    except ValueError:
        raise Http404

    if not (answer and 'oauth_token' in answer):
        # NOTE(review): if ``answer`` is a mapping, HttpResponse iterates it
        # and emits only its keys. Confirm that is the intended output.
        return HttpResponse(answer)

    SocialConfig.objects.update(twitter_access_token=answer['oauth_token'])
    add_message(request, SUCCESS, _('Twitter access_token updated successfully!'))
    return redirect('admin:social_networks_socialconfig_change')
项目:Rinzler    作者:feliphebueno    | 项目源码 | 文件源码
def render(self, indent=0):
    """
    Build the HttpResponse for the ongoing request
    :param indent int
    :rtype: HttpResponse
    """
    # Store the indent before the body is produced by __str__.
    self.__indent = indent
    body = self.__str__()
    return HttpResponse(
        body,
        content_type=self.__content_type,
        charset=self.__charset,
        **self.__kwargs
    )
项目:django-modern-rpc    作者:alorence    | 项目源码 | 文件源码
def get_invalid_result():
    """Return an object instance that cannot be serialized in json or xml"""
    from django.http.response import HttpResponse

    unserializable = HttpResponse(content='dummy')
    return unserializable
项目:django-modern-rpc    作者:alorence    | 项目源码 | 文件源码
def xml_http_response(data, http_response_cls=HttpResponse):
    """Wrap ``data`` in ``http_response_cls`` with Content-Type ``text/xml``."""
    xml_response = http_response_cls(data)
    xml_response['Content-Type'] = 'text/xml'
    return xml_response
项目:django-modern-rpc    作者:alorence    | 项目源码 | 文件源码
def result_error(self, exception, http_response_cls=HttpResponse):
    """Return an XML response holding a Fault built from the exception's code and message."""
    fault = xmlrpc_client.Fault(exception.code, exception.message)
    raw_response = ''.join([
        '<?xml version="1.0"?>',
        '<methodResponse>',
        self.dumps(fault),
        '</methodResponse>',
    ])

    return self.xml_http_response(raw_response, http_response_cls=http_response_cls)
项目:django-modern-rpc    作者:alorence    | 项目源码 | 文件源码
def json_http_response(data, http_response_cls=HttpResponse):
    """Wrap ``data`` in ``http_response_cls`` with Content-Type ``application/json``."""
    json_response = http_response_cls(data)
    json_response['Content-Type'] = 'application/json'
    return json_response
项目:django-modern-rpc    作者:alorence    | 项目源码 | 文件源码
def result_error(self, exception, http_response_cls=HttpResponse):
    """Serialize the JSON error payload for ``exception`` and return it as a JSON response."""
    error_payload = self.json_error_response(exception)
    serialized = self.dumps(error_payload)
    return self.json_http_response(serialized, http_response_cls=http_response_cls)
项目:heltour    作者:cyanfish    | 项目源码 | 文件源码
def lichess_api_call(request, path):
    """Queue a lichess API call on the worker and return the key it was queued under.

    ``priority`` (default 0) and ``max_retries`` (default 3) are removed
    from the query parameters and converted to int; the remaining
    parameters are passed on with the call.
    """
    params = request.GET.dict()
    priority = int(params.pop('priority', 0))
    max_retries = int(params.pop('max_retries', 3))
    redis_key = get_random_string(length=16)

    # Priority is passed twice: once to the queue, once to the job itself.
    job_args = (redis_key, path, request.method, request.body, params, priority, max_retries)
    worker.queue_work(priority, _do_lichess_api_call, *job_args)
    return HttpResponse(redis_key)