我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用django.http.response.JsonResponse()。
def nodeinfo_view(request): """Generate a NodeInfo document.""" site = Site.objects.get_current() usage = {"users": {}} if settings.SOCIALHOME_STATISTICS: usage = { "users": { "total": User.objects.count(), "activeHalfyear": User.objects.filter(last_login__gte=now() - datetime.timedelta(days=180)).count(), "activeMonth": User.objects.filter(last_login__gte=now() - datetime.timedelta(days=30)).count(), }, "localPosts": Content.objects.filter(author__user__isnull=False, content_type=ContentType.CONTENT).count(), "localComments": Content.objects.filter(author__user__isnull=False, content_type=ContentType.REPLY).count(), } nodeinfo = NodeInfo( software={"name": "socialhome", "version": version}, protocols={"inbound": ["diaspora"], "outbound": ["diaspora"]}, services={"inbound": [], "outbound": []}, open_registrations=settings.ACCOUNT_ALLOW_REGISTRATION, usage=usage, metadata={"nodeName": site.name} ) return JsonResponse(nodeinfo.doc)
def add_product_to_wishlist(request, wishlist_id, product_id): response = {'created': False} if request.method != 'POST': return JsonResponse({'err': 'invalid request'}, status=405) if not getattr(request, 'customer', None): return JsonResponse({'err': 'unauthorized request'}, status=403) if wishlist_id != 'default': wishlist = Wishlist.objects.filter(customer=request.customer, id=int(wishlist_id)).first() else: wishlist = Wishlist.objects.filter(customer=request.customer, shop=request.shop).first() if wishlist: created = not wishlist.products.filter(id=product_id).exists() response['created'] = created if created: wishlist.products.add(product_id) response['product_name'] = wishlist.products.get(id=product_id).name elif wishlist_id == 'default': return JsonResponse({'err': 'no wishlists exist'}, status=200) else: return JsonResponse({'err': 'invalid wishlist'}, status=400) return JsonResponse(response)
def business_area_handler(request, pk): if request.method == 'POST': data = json.loads(request.body) if pk is None: # New business area BusinessArea.objects.create(name=data['name']) return JsonResponse(ReturnStatus(True, 'OK').to_dict()) else: # existing business area update try: ba = BusinessArea.objects.get(pk=pk) ba.name = data['name'] ba.save() except BusinessArea.DoesNotExist: return JsonResponse(ReturnStatus(False, 'Key does not exist').to_dict()) return JsonResponse(ReturnStatus(True, 'OK').to_dict()) elif request.method == 'DELETE': try: ba = BusinessArea.objects.get(pk=pk) if ba.mission_set.all().count() != 0: return JsonResponse(ReturnStatus(False, 'Business Areas can not be deleted while missions are still associated with them.').to_dict()) ba.delete() except BusinessArea.DoesNotExist: return JsonResponse(ReturnStatus(False, 'Key does not exist').to_dict()) return JsonResponse(ReturnStatus(True, 'OK').to_dict())
def get(self, request, *args, **kwargs): _page = get_object_or_404(Page, pk=request.GET.get('page_id')) # release page for all users which did not update during the last timeframe PageIndicator.objects.filter(edited_on__lte=self.time_past, page=_page).delete() page_indicators = PageIndicator.objects.filter(edited_on__gte=self.time_past, page=_page)\ .exclude(editor=request.user) response = {} if page_indicators.exists(): response['conflict'] = True editing_user = page_indicators.first() response['concurrent_user'] = [editing_user.editor.username] # default behavior for concurrent users is blocking response['block_editing'] = getattr(settings, 'CONCURRENT_BLOCK_EDITING', BLOCK_EDITING_DEFAULT) response['message'] = _(u'Unfortunately you cannot edit this page at the moment: ' u'user {user} is blocking it since {time}').format( user=editing_user.editor, time=_date(editing_user.started_editing, 'D, d. b, H:i')) else: response['conflict'] = False return JsonResponse(json.dumps(response), safe=False)
def _run_test_actions(name, actions): test = ConversationTest(name, actions) start_time = time.time() report = None try: report = test.run() except Exception as e: log = TestLog.get() fatal = not isinstance(e, ConversationTestException) if fatal: trace = traceback.format_exc() print(trace) log.append(trace) return JsonResponse(data={'status': 'exception' if fatal else 'failed', 'log':log, 'message':str(e), 'report':report}) elapsed_time = time.time() - start_time return JsonResponse(data={'status': 'passed', 'log':TestLog.get(), 'duration':elapsed_time, 'report':report})
def root(self, _): """ Implements the `root <http://docs.annotatorjs.org/en/v1.2.x/storage.html#root>`_ endpoint. :param _: :class:`rest_framework.request.Request` object—ignored here. :return: API information :class:`rest_framework.response.Response`. """ return JsonResponse( { "name": getattr(settings, "ANNOTATOR_NAME", "django-annotator-store"), "version": annotator.__version__ })
def test_uploading_image(self, mock_get_thumbnail): """Test upload single image file""" mock_get_thumbnail.return_value = "" with open(settings.TEST_DATA_DIR+'/test.png', 'rb') as att: form_data = {'file': SimpleUploadedFile(att.name, att.read()), 'media_type': 'images'} resp = self.client.post(reverse('multiuploader'), data=form_data) self.assertEqual(resp.status_code, 200) data = {"files": [{"id": "1", "name": "test.png", "size": 180, "type": "text/plain", "url": "/multiuploader/1/", "thumbnailUrl": "", "deleteUrl": "/multiuploader/1/", "deleteType": "DELETE" }] } self.assertEqual(resp.content, JsonResponse(data).content) resp = self.client.get(reverse('multiuploader', args=[1]), HTTP_USER_AGENT='Mozilla/5.0') self.assertEqual(resp.status_code, 200) resp = self.client.delete(reverse('multiuploader', args=[1])) self.assertEqual(resp.status_code, 200)
def test_uploading_mp3(self, mock_get_thumbnail): """Test upload single mp3 file""" mock_get_thumbnail.return_value = "" with open(settings.TEST_DATA_DIR + '/test.mp3', 'rb') as att: form_data = {'file': SimpleUploadedFile(att.name, att.read()), 'media_type': 'audio'} resp = self.client.post(reverse('multiuploader'), data=form_data) self.assertEqual(resp.status_code, 200) data = {"files": [{"id": "2", "name": "test.mp3", "size": 3742720, 'type': "text/plain", "url": "/multiuploader/2/", "thumbnailUrl": "", "deleteUrl": "/multiuploader/2/", "deleteType": "DELETE", }] } self.assertEqual(resp.content, JsonResponse(data).content) resp = self.client.get(reverse('multiuploader', args=[2]), HTTP_USER_AGENT='Mozilla/5.0') self.assertEqual(resp.status_code, 200) resp = self.client.delete(reverse('multiuploader', args=[2])) self.assertEqual(resp.status_code, 200)
def test_uploading_pdf(self, mock_get_thumbnail): """Test upload single pdf file""" mock_get_thumbnail.return_value = "" with open(settings.TEST_DATA_DIR + '/test.pdf', 'rb') as att: form_data = {'file': SimpleUploadedFile(att.name, att.read()), 'media_type': 'default'} resp = self.client.post(reverse('multiuploader'), data=form_data) self.assertEqual(resp.status_code, 200) data = {"files": [{"id": "3", "name": "test.pdf", "size": 6763, 'type': "text/plain", "url": "/multiuploader/3/", "thumbnailUrl": "", "deleteUrl": "/multiuploader/3/", "deleteType": "DELETE", }] } self.assertEqual(resp.content, JsonResponse(data).content) resp = self.client.get(reverse('multiuploader', args=[3]), HTTP_USER_AGENT='Mozilla/5.0') self.assertEqual(resp.status_code, 200) resp = self.client.delete(reverse('multiuploader', args=[3])) self.assertEqual(resp.status_code, 200)
def test_uploading_thumbnail_quality(self, mock_get_thumbnail): """Test upload single image file with quality""" mock_get_thumbnail.return_value = "" with open(settings.TEST_DATA_DIR + '/test.png', 'rb') as att: form_data = {'file': SimpleUploadedFile(att.name, att.read()), 'media_type': 'images', 'quality': 70} resp = self.client.post(reverse('multiuploader'), data=form_data) self.assertEqual(resp.status_code, 200) data = {"files": [{"id": "4", "name": "test.png", "size": 180, 'type': "text/plain", "url": "/multiuploader/4/", "thumbnailUrl": "", "deleteUrl": "/multiuploader/4/", "deleteType": "DELETE", }] } self.assertEqual(resp.content, JsonResponse(data).content) resp = self.client.get(reverse('multiuploader', args=[4]), HTTP_USER_AGENT='Mozilla/5.0') self.assertEqual(resp.status_code, 200) resp = self.client.delete(reverse('multiuploader', args=[4])) self.assertEqual(resp.status_code, 200)
def test_uploading_thumbnail_size(self, mock_get_thumbnail): """Test upload single image file with size""" mock_get_thumbnail.return_value = "" with open(settings.TEST_DATA_DIR+'/test.png', 'rb') as att: form_data = {'file': SimpleUploadedFile(att.name, att.read()), 'media_type': 'images', 'size': '280x80'} resp = self.client.post(reverse('multiuploader'), data=form_data) self.assertEqual(resp.status_code, 200) data = {"files": [{"id": "5", "name": "test.png", "size": 180, "type": "text/plain", "url": "/multiuploader/5/", "thumbnailUrl": "", "deleteUrl": "/multiuploader/5/", "deleteType": "DELETE" }] } self.assertEqual(resp.content, JsonResponse(data).content) resp = self.client.get(reverse('multiuploader', args=[5]), HTTP_USER_AGENT='Mozilla/5.0') self.assertEqual(resp.status_code, 200) resp = self.client.delete(reverse('multiuploader', args=[5])) self.assertEqual(resp.status_code, 200)
def view(self): league_tag = self.request.GET.get('league') if league_tag == 'all': league = None elif league_tag is not None: league = League.objects.filter(tag=league_tag).first() else: league = self.league try: board = int(self.request.GET.get('board', '')) except ValueError: board = None try: team = int(self.request.GET.get('team', '')) except ValueError: team = None return JsonResponse(_tv_json(league, board, team))
def test_async_workflow(self, mock_request, mock_http_task): mock_response = JsonResponse({"balance": 257}) mock_request.return_value = mock_response ussd_client = self.ussd_client() ussd_client.send('') # check http_task is called mock_http_task.delay.assert_called_once_with( request_conf=dict( method='get', url="https://localhost:8000/mock/submission", params={'phone_number': '200', 'session_id': ussd_client.session_id} ) )
def post(self, request): errors = self._validate_request(request) if errors: return JsonResponse(dict(errors=errors), status=400) file_name = request.FILES['file']. name callback = request.GET['callback'] result = run_abacus.delay( file_name, callback, *self._prepare_output_file(file_name) ) return JsonResponse({ 'task_id': result.task_id, 'status': request.build_absolute_uri( reverse('query', args=(result.task_id,)) ) })
def search(request): if not request.method == 'POST': raise Http404 req = json.loads(request.body) tmp = Application.objects.search_text(req['keyword']) apkList = map(lambda x:{ "name":x.name, "version":x.version, "md5":x.md5, "modifyDate":x.modifyDate, "publicationDate":x.publicationDate, "id":str(x.id) }, tmp) return JsonResponse({ "success":True, "data":apkList, "msg":'' })
def process_request(self,request): url=request.get_full_path() if not url in ['/report/basic','/report/analysis']: return None apkId=request.session.get('apkId') if not apkId: if request.method=='POST': return JsonResponse({ "success":False, "data":None, "msg":'????????' }) else: raise Http404 else: return None
def get_ss_qr(request): if request.user.is_anonymous(): return JsonResponse({'error': 'unauthorized'}) if not hasattr(request.user, 'ss_user'): return JsonResponse({'error': 'no linked shadowsocks account'}) ss_user = request.user.ss_user if request.GET.get('nid'): try: node = Node.objects.get(pk=request.GET.get('nid')) except Node.DoesNotExist: return JsonResponse({'error': 'node not exist'}) else: node = Node.objects.all().order_by('-weight') if node: node = node[0] else: return JsonResponse({'error': 'no node at all'}) password = '{}:{}@{}:{}'.format(node.method, ss_user.password, node.server, ss_user.port) img = qrcode.make('ss://{}'.format(base64.b64encode(bytes(password, 'utf8')).decode('ascii'))) response = HttpResponse(content_type="image/png") img.save(response) return response
def render_to_response(self, context, **response_kwargs): try: scope_name = self.kwargs['scope'] range_start = self.request.GET.get('start', None) or None range_end = self.request.GET.get('end', None) or None if scope_name == "custom": if range_start is None: return HttpResponseBadRequest() range_start = int(range_start) if range_end is not None: range_end = int(range_end) data, start, end, resolution = D3GraphDataGenerator(self.node, self.object, scope_name, range_start, range_end).generate() return JsonResponse(data) except: logger.exception("Error rendering graph data for %s on %s", self.object, self.node) raise
def nodeinfo_well_known_view(request): """Generate .well-known/nodeinfo.""" wellknown = get_nodeinfo_well_known_document(settings.SOCIALHOME_URL) return JsonResponse(wellknown)
def social_relay_view(request): """Generate a .well-known/x-social-relay document.""" relay = SocialRelayWellKnown(subscribe=True) return JsonResponse(relay.doc)
def form_invalid(self, form): return JsonResponse({'success': False})
def form_valid(self, form): form_data = form.save(commit=False) form_data.uploader = self.request.user form_data.save() return JsonResponse({'success': True, 'imgURL': form_data.image.url})
def form_valid(self, form): instance = form.save() ctx = {'success': True, 'msg': "Comment was created successfully."} ctx['data'] = self.comment_dict(instance) return JsonResponse(ctx)
def form_invalid(self, form): ctx = {'success': False, 'msg': "Failed to post a comment", 'errors': json.loads(form.errors.as_json())} return JsonResponse(ctx)
def form_valid(self, form): shop = self.request.shop customer = self.request.customer product_id = self.request.POST.get('product_id') response = {} wishlist, created = Wishlist.objects.get_or_create(shop=shop, customer=customer, **form.cleaned_data) response['id'] = wishlist.id response['name'] = wishlist.name if created and product_id: wishlist.products.add(product_id) response["product_id"] = product_id response['product_name'] = wishlist.products.get(pk=product_id).name response['created'] = created return JsonResponse(response)
def form_invalid(self, form): return JsonResponse(form.errors, status=400)
def done(self, form_list, **kwargs): """ Wizard finish handler. The idea is to persist your data here """ return JsonResponse({'next_url': '/next-page/'})
def _get_response_data(self, response): assert isinstance(response, JsonResponse) return json.loads(response.content.decode('utf-8'))
def render_state(self, step=None, form=None, form_data=None, form_files=None, done=False, status_code=200): valid = self.is_valid() current_step = self.get_current_step(step=step) data = { 'current_step': current_step if not done else None, 'done': done, 'valid': valid, 'structure': self.get_structure(), 'steps': {} } for step in self.steps.all: current_form = None current_form_data = None current_form_files = None if form is not None and step == current_step: current_form = form current_form_data = form_data current_form_files = form_files data['steps'][step] = self.get_step_data( step=step, form=current_form, form_data=current_form_data, form_files=current_form_files) # Allow for manipulating state data before returning data = self.clean_state_data(data) return JsonResponse(data, status=status_code, encoder=self.json_encoder_class)
def render_response(self, data=None, status_code=200): data = data or {} return JsonResponse(data, status=status_code, encoder=self.json_encoder_class)
def form_valid(self, form): """ If the form is valid, redirect to the supplied URL. """ log.info('received POST to main multiuploader view') file_obj = self.request.FILES['file'] wrapped_file = UploadedFile(file_obj) filename = wrapped_file.name file_size = wrapped_file.file.size log.info('Got file: "%s"' % filename) fl = self.model(filename=filename, file=file_obj) fl.save() log.info('File saving done') thumb_url = "" size = self.request.POST.get('size') size = size if size else '180x80' quality = self.request.POST.get('quality') quality = quality if quality else 50 try: im = get_thumbnail(fl.file, size, quality=quality) thumb_url = im.url except Exception as e: log.error(e) # generating json response array result = {"files": [{"id": str(fl.id), "name": filename, "size": file_size, 'type': file_obj.content_type, "url": reverse('multiuploader', args=[fl.pk]), "thumbnailUrl": thumb_url, "deleteUrl": reverse('multiuploader', args=[fl.pk]), "deleteType": "DELETE", }] } return JsonResponse(result)
def sendtest(self, request, campaign_id): from premailer import Premailer from django.core.mail import send_mail, BadHeaderError receiver = request.POST.get('receiver') if not receiver: raise Http404 try: campaign = self.model.objects.get(pk=campaign_id) except self.model.DoesNotExist: pass else: content = campaign.render_html(request, scheme='http://', test=True) content = Premailer(content, strip_important=False).transform() plain = campaign.render_plain(request, test=True) try: send_mail( campaign.subject, plain, settings.DEFAULT_FROM_EMAIL, recipient_list=[receiver], html_message=content ) except BadHeaderError: pass response = JsonResponse({}) set_cookie(response, 'last_receiver', receiver) return response
def submit_view(self, request, object_id): try: obj = self.model._default_manager.get(pk=object_id) except self.model.DoesNotExist: raise Http404 form = self.get_autopost_form(request, obj) if form.is_valid(): obj_ct = ContentType.objects.get_for_model(obj) text = form.cleaned_data.get('text') networks = form.cleaned_data.get('networks') for network in networks: try: post = FeedPost.objects.get( network=network, content_type=obj_ct, object_id=obj.pk, ) except FeedPost.DoesNotExist: FeedPost.objects.create( network=network, url=request.build_absolute_uri(self.get_autopost_url(obj)), text=text, content_type=obj_ct, object_id=obj.pk, ) else: if post.scheduled: # ????????? ?????? post.url = request.build_absolute_uri(self.get_autopost_url(obj)) post.text = text post.save() return JsonResponse({}) else: return JsonResponse({ 'errors': form.errors }, status=400)
def json_response(data=None, **kwargs): if data is None: data = {} defaults = { 'encoder': LazyJSONEncoder } defaults.update(kwargs) return JsonResponse(data, **defaults)
def watch(request): game_ids = request.body.split(',') result = worker.watch_games(game_ids) return JsonResponse({'result': result})
def watch_add(request): game_id = request.body worker.add_watch(game_id) return JsonResponse({'ok': True})
def dict_to_response(response, response_format): if response_format == 'msgpack': return HttpResponse(msgpack.packb(response)) else: return JsonResponse(response)
def open_task(request, contest_id, task_id, participant_id): contest = get_object_or_404(models.TaskBasedContest, pk=contest_id) task = get_object_or_404(tasks_models.Task, pk=task_id) if not contest.has_task(task): return HttpResponseNotFound() if not is_manual_task_opening_available_in_contest(contest): messages.error(request, 'Manual task opening is forbidden for this contest') return redirect(urlresolvers.reverse('contests:task_opens', args=[contest.id, task.id])) participant = get_object_or_404(models.AbstractParticipant, pk=participant_id) if participant.contest_id != contest.id: return HttpResponseNotFound() qs = tasks_models.ManualOpenedTask.objects.filter( contest=contest, task=task, participant=participant ) # Toggle opens state: close if it's open, open otherwise if qs.exists(): qs.delete() if is_task_open(contest, task, participant): messages.warning(request, 'Task is opened for this participant not manually, you can\'t close it') else: messages.success(request, 'Task is closed for %s' % participant.name) else: tasks_models.ManualOpenedTask( contest=contest, task=task, participant=participant ).save() messages.success(request, 'Task is opened for %s' % participant.name) return JsonResponse({'done': 'ok'})
def recommend(req): recs=list(map(lambda x:getattr(x, "articleId"),Recommendation.objects(userId=req.POST["uid"]))) articles=Article.objects(eid__in=recs).exclude("content") articles=list(map(lambda x:x.__data,articles)) count=0 for word in WordBag.objects(eid__in=recs): articles[count]["keyWord"]=word.wordList count+=1 return JsonResponse({ "success":True, "data":articles })
def history(req): aids=Record.getArticleForUser(req.POST["uid"]) articles=list(lambda x:x.__data,Article.objects(eid__in=aids).exclude("content")) count=0 for word in WordBag.objects(eid__in=aids): articles[count]["keyWord"]=word.wordList count+=1 return JsonResponse({ "success":True, "data":articles })
def list(req): pageIndex=req.POST["pageIndex"] if req.POST["pageIndex"] else 0 pageNum=req.POST["pageNum"] if req.POST["pageNum"] else 20 articles=list(map(lambda x:x.__data,Article.objects[pageIndex*pageNum:(pageIndex+1)*pageNum])) words=WordBag.objects(eid__in=list(map(lambda x:x["eid"],articles))) for i in range(len(articles)): articles[i]["keyWord"]=words[i].wordList return JsonResponse({ "success":True, "data":articles })
def detail(req,articleId): article=Article.objects(eid=articleId).first() article=article.__data article["keyWord"]=WordBag.objects(eid=articleId).first().wordList return JsonResponse({ "success":True, "data":article })
def form_invalid(self, form): response = super(GeoTarget, self).form_invalid(form) if self.request.is_ajax(): return JsonResponse(form.errors.as_json(), safe=False, status=400) else: return response
def form_valid(self, form): # We make sure to call the parent's form_valid() method because # it might do some processing (in the case of CreateView, it will # call form.save() for example). response = super(GeoTarget, self).form_valid(form) if self.request.is_ajax(): cons_ids = [] kwargs = {'state_cd': form.cleaned_data['state']} if form.cleaned_data['primary_only']: kwargs['is_primary'] = True # Get all constituent addresses in the state cons_addrs = ConstituentAddress.objects.filter(**kwargs) geojson = json.loads(form.cleaned_data['geojson']) if geojson['type'] == 'FeatureCollection': # todo: fetch number, but stick to 1st for now logger.debug('is FeatureCollection') geojson = geojson['features'][0]['geometry'] # elif geojson['type'] not ['MultiPolygon', 'Polygon']: poly = GEOSGeometry(json.dumps(geojson)) for con in cons_addrs: point = Point(y=con.latitude, x=con.longitude) if poly.contains(point): cons_ids.append(con.cons_id) return JsonResponse(cons_ids, safe=False) else: return response
def trending(self, request, object_id, **kwargs): # a custom view that shows trending dogs if request.method == 'GET': obj = get_object_or_404(self.model, id=object_id) # We could filter by dogs specific to this post here, if needed trending_dogs = Dog.trending.all()[:3] dogs = DogSerializer(trending_dogs, many=True).data return JsonResponse({'results': dogs}) else: return JsonResponse({'results': []})
def post(self, request, *args, **kwargs): with PyTessBaseAPI() as api: with Image.open(request.FILES['image']) as image: sharpened_image = image.filter(ImageFilter.SHARPEN) api.SetImage(sharpened_image) utf8_text = api.GetUTF8Text() return JsonResponse({'utf8_text': utf8_text})
def device_token_receive(request): if request.method != 'PUT': return HttpResponse(status=405) if request.body is b'': return JsonResponse({'error': 'Bad Request'}, status=400) query_dict = request.body.decode('utf-8') body = json.loads(query_dict) error = False if 'device_token' not in body: error = True if 'uuid' not in body: error = True if 'sandbox' not in body: error = True if error: return JsonResponse({'error': 'Bad Request'}, status=400) device_token = body['device_token'] uuid = body['uuid'] sandbox = body['sandbox'] if DeviceToken.objects.filter(uuid=uuid).count() != 0: token = DeviceToken.objects.get(uuid=uuid) token.device_token = device_token token.use_sandbox = sandbox token.save() else: token = DeviceToken() token.device_token = device_token token.uuid = uuid token.use_sandbox = sandbox token.save() return JsonResponse({'result': 'success'}, status=200)
def post(self, request): tree = request.session.get('tree', None) node_title = request.data.get('node_title', None) if tree is None or node_title is None: return HttpResponseBadRequest() node = tree.find(node_title) if node is None: return HttpResponseBadRequest() convert_list = { 'title__icontains': 'title', 'code__contains': 'code', 'credit': 'credit', 'category': 'category', 'area': 'area', 'subarea': 'subarea', 'college': 'college', 'dept': 'dept', } query_list = {key: request.data.get(value, None) for key, value in convert_list} # Delete non-parameter keys from query for key, value in query_list: if value is None: del query_list[key] # TODO: add functions to get these variables query_list['year'] = '2017' query_list['semester'] = '1' retrieved = Course.objects.filter(**query_list) node.filter_true(retrieved) retrieved_list = [] for item in retrieved: course = {value: item.get(value, None) for key, value in convert_list} retrieved_list.append(course) json_root = {'data': retrieved_list} return JsonResponse(json.dumps(json_root))
def test(self, mock_request): mock_response = JsonResponse({"balance": 250}) mock_request.return_value = mock_response ussd_client = self.ussd_client() self.assertEqual( "Testing response is being saved in " "session status code is 200 and " "balance is 250 and full content {'balance': 250}.\n", ussd_client.send('') ) expected_calls = [ mock.call( method='get', url="http://localhost:8000/mock/balance", params=dict( phone_number="200", session_id=ussd_client.session_id ), verify=False, headers={"content-type": "application/json", "user-agent": "my-app/0.0.1"} ), mock.call( method="get", url="http://localhost:8000/mock/balance/200/" ), mock.call( method='post', url='http://localhost:8000/mock/balance', params=dict( phone_numbers=[200, 201, 202], session_id=ussd_client.session_id ), verify=True, timeout=30, headers={"content-type": "application/json"} ) ] # test requests that were made mock_request.assert_has_calls(expected_calls)
def test_retry(self, mock_retry, mock_request): mock_response = JsonResponse({"balance": 250}, status=400 ) mock_request.return_value = mock_response ussd_client = self.get_ussd_client() ussd_client.send('mwas') # check posted flag has been set to false self.assertFalse( ussd_session(ussd_client.session_id)['posted'], ) self.assertTrue(mock_retry.called)