我们从Python开源项目中,提取了以下24个代码示例,用于说明如何使用django.conf.settings.TESTING。
def test_kuailexue_api2(self):
    """Register a student and a teacher through ``klx`` and link them.

    ``settings.TESTING`` is forced to ``False`` while the test runs
    (presumably so the ``klx`` helpers take their non-testing path --
    confirm against the helpers).  The previous value is restored
    afterwards so the override does not leak into other tests.
    """
    student_uid = 'test_student_1'
    student_name = '????1'
    teacher_uid = 'debug_210_7VQy5'
    teacher_name = '????1'
    teacher_password = '892673'

    previous_testing = getattr(settings, 'TESTING', False)
    settings.TESTING = False
    try:
        klx_stu = klx.klx_register(
            klx.KLX_ROLE_STUDENT, student_uid, student_name)
        _console.info(klx_stu)
        self.assertIsNotNone(klx_stu)

        klx_tea = klx.klx_register(
            klx.KLX_ROLE_TEACHER, teacher_uid, teacher_name,
            teacher_password)
        _console.info(klx_tea)
        self.assertIsNotNone(klx_tea)

        ok = klx.klx_relation(klx_tea, klx_stu)
        self.assertTrue(ok)
    finally:
        # Always undo the global override, even when an assertion fails.
        settings.TESTING = previous_testing
def clean_description(self):
    """Validate the ``description`` field against spam and word filters.

    Returns:
        The cleaned description, unchanged, when it passes both checks.

    Raises:
        forms.ValidationError: if Akismet flags the text or the word
            filter reports a blacklisted term.
    """
    desc = self.cleaned_data['description']

    if settings.TESTING:
        # No Akismet call is made while tests are running.
        is_spam = False
    else:
        akismet = Akismet(settings.AKISMET_KEY, blog="CC Search")
        # NOTE(review): the first positional argument is the request host;
        # confirm this is what the Akismet client expects here.
        is_spam = akismet.check(
            self.request.get_host(),
            # Django exposes request headers in META as HTTP_<NAME>; the
            # former 'user-agent' key never matched, so None was sent.
            user_agent=self.request.META.get('HTTP_USER_AGENT'),
            comment_author=self.request.user.username,
            comment_content=desc)

    has_blacklisted_words = Wordfilter().blacklisted(desc)

    if is_spam or has_blacklisted_words:
        raise forms.ValidationError("This description failed our spam or profanity check; the description has not been updated.")
    return desc
def create_profile(sender, instance, **kwargs):
    """Signal receiver that makes sure ``instance`` has a ``Profile``.

    A profile is created when the signal reports ``created`` or when no
    profile exists yet for the user.  For a newly created profile the
    confirmation code is refreshed and, outside of test runs, the signup
    e-mail is sent.
    """
    if kwargs['created']:
        profile = Profile.objects.create(user=instance)
        new_account = True
    else:
        existing = Profile.objects.filter(user=instance)
        new_account = not existing
        if new_account:
            profile = Profile.objects.create(user=instance)

    if not new_account:
        return

    profile.update_code()
    link_text = settings.HOSTNAME + profile.get_confirmation_link()
    if settings.TESTING:
        return
    send_signup_email(instance.email, link_text)
def save(self, *args, **kwargs):
    """Save the model after geocoding the supplied address.

    The address is geocoded with the Google geocoding service, which is
    limited to 2500 requests per day.  Nothing accounts for that quota,
    so models beyond the limit are saved without a geocoded address.

    Geocoding is skipped entirely while ``settings.TESTING`` is true.
    Network errors (including the request timeout) propagate to the
    caller, as they did before.
    """
    if not settings.TESTING:
        response = requests.get(
            'https://maps.googleapis.com/maps/api/geocode/json',
            params={'address': self.raw_address,
                    'key': settings.G_APPS_KEY},
            # Without a timeout a stalled geocoder would block save()
            # forever.
            timeout=10)
        if response.status_code == 200:
            address = Address()
            address.raw = response.text
            address.from_google_json(response.json())
            address.save()
            self.address_object = address
    super(Property, self).save(*args, **kwargs)
def article_save_callback(sender, **kwargs):
    """Notify the Baidu spider about a saved Article, Category or Tag.

    Args:
        sender: model class whose ``__name__`` selects the model to load.
        **kwargs: must contain ``id`` (primary key of the saved object)
            and ``is_update_views`` (when true, no notification is sent).

    Nothing is sent while ``settings.TESTING`` is true.  Notification
    failures are logged and printed, never raised.
    """
    obj_id = kwargs['id']
    is_update_views = kwargs['is_update_views']
    model_name = sender.__name__

    # Imported here, as in the original, rather than at module level.
    from blog.models import Article, Category, Tag
    models_by_name = {'Article': Article, 'Category': Category, 'Tag': Tag}

    model = models_by_name.get(model_name)
    if model is None:
        return
    obj = model.objects.get(id=obj_id)

    if settings.TESTING or is_update_views:
        return
    try:
        notify_url = obj.get_full_url()
        SpiderNotify.baidu_notify([notify_url])
    except Exception as ex:
        # The old call passed ``ex`` without a %s placeholder, which makes
        # the logging module report a formatting error instead of logging.
        logger.error("notify spider failed: %s", ex)
        print(ex)
def _get_image_url(self, image_gcs_filename):
    """Return the URL to use for an image stored in Cloud Storage.

    Takes the filename of an image stored in Cloud Storage and asks the
    images API for a serving URL.  No longer used.

    When ``settings.DEBUG`` is on and ``settings.TESTING`` is off the
    argument is returned untouched.  Otherwise a falsy filename yields
    ``None``.
    """
    if not settings.TESTING and settings.DEBUG:
        # Local dev hack: the Images API with GCS does not work locally,
        # so the image data URI is passed in and used directly.
        return image_gcs_filename

    if image_gcs_filename:
        return get_gcs_image_serving_url(image_gcs_filename)
    return None
def __basic_publish(self):
    """
    Publish message to exchange

    The JSON-encoded ``self.body`` is sent to ``self.exchange_name`` with
    ``self.routing_key``.  Nothing is published while
    ``django_settings.TESTING`` is true.

    :return:
    """
    if django_settings.TESTING:
        return

    payload = json.dumps(self.body)
    self.channel.basic_publish(
        exchange=self.exchange_name,
        routing_key=self.routing_key,
        body=payload,
    )
    print(" [x] Sent %r" % payload)
    # NOTE(review): the source formatting was lost; closing the connection
    # is assumed to belong to the non-testing path -- confirm.
    self.connection.close()
def add_region(apps, schema_editor):
    """Load the region tree from the bundled data file and insert it.

    Reads ``regions_for_test.json`` when ``settings.TESTING`` is true and
    ``regions.txt`` otherwise, both located next to this module.  The
    parsed JSON keeps key order through ``OrderedDict`` and is handed to
    ``dfs`` together with ``apps``.

    Args:
        apps: passed through to ``dfs``.
        schema_editor: unused.
    """
    data_file = "regions_for_test.json" if settings.TESTING else "regions.txt"
    data_path = os.path.join(os.path.dirname(__file__), data_file)
    # Use a context manager so the file handle is closed deterministically.
    with open(data_path) as region_file:
        regions = json.load(region_file, object_pairs_hook=OrderedDict)
    dfs(apps, regions, 1)
def get_one_subject_report(self, the_subject, parent, params):
    """Build the JSON study report for one subject of ``parent``.

    While ``settings.TESTING`` is true only ``subject_id`` is returned.
    Otherwise the latest paid order for the subject is looked up; a 404
    response is returned when there is none.  The report is then filled
    by the ``_get_*`` helpers, each of which receives the
    subject-specific URL and ``params``.
    """
    subject_name_en = klx.klx_subject_name(the_subject.name)
    url = klx.KLX_STUDY_URL_FMT.format(subject=subject_name_en)
    report = {'subject_id': the_subject.id}

    if settings.TESTING:
        return JsonResponse(report)

    # Most recent paid order of this parent for the subject.
    paid_orders = models.Order.objects.filter(
        parent=parent, status=models.Order.PAID, subject=the_subject)
    latest_order = paid_orders.order_by('-created_at').first()
    if not latest_order:
        # Have not joined the course.
        return HttpResponse(status=404)

    report['grade_id'] = latest_order.grade_id
    # The two "total" helpers return dicts that are merged into the report.
    report.update(self._get_total_nums(url, params))
    report.update(self._get_exercise_total_nums(url, params))
    # The remaining helpers each fill one report key.
    report['error_rates'] = self._get_error_rates(url, params)
    report['month_trend'] = self._get_month_trend(url, params)
    report['knowledges_accuracy'] = self._get_knowledges_accuracy(
        url, params)
    report['abilities'] = self._get_abilities(
        url, params, klx.KLX_MATH_ABILITY_KEYS)
    report['score_analyses'] = self._get_score_analyses(url, params)

    if settings.DEBUG:
        logger.debug(json.dumps(report))
    return JsonResponse(report)
def _get_auth_redirect_url(request, state):
    """Return the URL the client is redirected to for authorisation.

    While ``settings.TESTING`` is true the local ``wechat:phone_page``
    URL is returned with ``state`` as its query string.  Otherwise the
    URL is built from ``wx.WX_AUTH_URL``, the encoded parameters and the
    ``wechat:check_phone`` callback, which is appended as-is.
    """
    if settings.TESTING:
        return '{}?state={}'.format(reverse('wechat:phone_page'), str(state))

    callback_uri = get_server_host(request) + reverse('wechat:check_phone')
    # redirect_uri is deliberately left out of the encoded parameters and
    # appended separately below.
    query = urlencode({
        'response_type': "code",
        'scope': "snsapi_base",
        'state': state,
        'connect_redirect': "1",
    })
    return (wx.WX_AUTH_URL + '&' + query
            + '&redirect_uri=' + callback_uri + '#wechat_redirect')
def get_parent(self, request):
    """Return the parent resolved from ``request``.

    When no parent is found and ``settings.TESTING`` is true, the parent
    with primary key 3 is loaded and logged in as a testing fallback.
    Outside of testing the result may be ``None``.
    """
    parent = _get_parent(request)
    if parent is not None or not settings.TESTING:
        return parent

    # Testing-only fallback.
    # NOTE(review): the source formatting was lost; the login steps are
    # assumed to belong to this testing branch -- confirm.
    parent = models.Parent.objects.get(pk=3)
    parent.user.backend = _get_default_bankend_path()
    login(request, parent.user)
    return parent
def verify_order(self, request):
    """Confirm an order's payment state and mark the order as paid.

    Reads ``prepay_id`` and ``order_id`` from ``request.POST``.  While
    ``settings.TESTING`` is true the order is marked paid directly;
    otherwise the order is queried through ``wx.wx_pay_order_query``
    first.

    Returns:
        JsonResponse with ``ok``, ``msg`` and ``code``.  Codes used:
        0 success, 1 query failed, 2 payment error, 3 other trade state,
        4 order could not be marked paid, 5 unexpected error.  Every
        branch now returns a ``JsonResponse``; previously the query
        failure branches returned plain dicts, unlike the other paths.
    """
    prepay_id = request.POST.get('prepay_id')
    order_id = request.POST.get('order_id')

    if settings.TESTING:
        ret_code = set_order_paid(order_id=order_id)
        if ret_code == 1:
            return JsonResponse(
                {'ok': False, 'msg': FAIL_HINT_MSG, 'code': 4})
        return JsonResponse({'ok': True, 'msg': '', 'code': 0})

    query_ret = wx.wx_pay_order_query(order_id=order_id)
    if not query_ret['ok']:
        return JsonResponse(
            {'ok': False, 'msg': query_ret['msg'], 'code': 1})

    trade_state = query_ret['data']['trade_state']
    if trade_state == wx.WX_PAYERROR:
        return JsonResponse({'ok': False, 'msg': '????', 'code': 2})
    if trade_state != wx.WX_SUCCESS:
        return JsonResponse({'ok': False, 'msg': '???', 'code': 3})

    # The trade succeeded: record the payment on the order.
    try:
        ret_code = set_order_paid(
            prepay_id=prepay_id,
            order_id=query_ret['data']['out_trade_no'],
            open_id=query_ret['data']['openid'])
        if ret_code == 1:
            return JsonResponse(
                {'ok': False, 'msg': FAIL_HINT_MSG, 'code': 4})
    except (OrderStatusIncorrect, RefundError):
        return JsonResponse(
            {'ok': False, 'msg': FAIL_HINT_MSG, 'code': 4})
    except Exception as ex:
        logger.exception(ex)
        return JsonResponse(
            {'ok': False, 'msg': '????, ?????', 'code': 5})
    return JsonResponse({'ok': True, 'msg': '', 'code': 0})
def _get_wx_jsapi_ticket(access_token):
    """Return ``(jsapi_ticket, msg)``.

    The ticket stored in the database is preferred.  When none is stored
    a new one is requested with ``access_token``; on failure ``msg``
    carries the reported message.  While ``settings.TESTING`` is true two
    empty strings are returned.
    """
    if settings.TESTING:
        return "", ""

    ticket = _get_wx_jsapi_ticket_from_db()
    if ticket:
        return ticket, None

    result = wx.wx_get_jsapi_ticket(access_token)
    if result['ok']:
        return result['ticket'], None
    # On failure the (falsy) stored value is returned with the message.
    return ticket, result['msg']
def _get_wx_token():
    """Return ``(token, msg)``.

    The token stored in the database is preferred.  When none is stored a
    new one is requested through ``wx.wx_get_token``; on failure ``msg``
    carries the reported message.  While ``settings.TESTING`` is true two
    empty strings are returned.
    """
    if settings.TESTING:
        return "", ""

    stored_token = _get_wx_token_from_db()
    if stored_token:
        return stored_token, None

    result = wx.wx_get_token()
    if result['ok']:
        return result['token'], None
    # On failure the (falsy) stored value is returned with the message.
    return stored_token, result['msg']
def update_search_index(sender, instance, **kwargs):
    """Tell the search engine about a saved Image instance.

    Does nothing while ``settings.TESTING`` is true.
    """
    if settings.TESTING:
        return
    _update_search_index(instance)
def autocomplete_queries(self, query_string):
    """Provide autocomplete suggestions for ``query_string``.

    Runs the ``vestiging`` and ``mac`` queries against ``self.client``
    and collects their results.  A query that fails is logged and
    skipped, so the returned list may be shorter than the number of
    queries.

    Returns:
        list of the results of the queries that succeeded, in order.
    """
    analyzer = InputQAnalyzer(query_string)
    query_components = [
        vestiging_query(analyzer),
        mac_query(analyzer),
    ]

    # The cache is bypassed while tests are running.
    ignore_cache = settings.TESTING

    result_data = []
    for q in query_components:  # type: ElasticQueryWrapper
        search = q.to_elasticsearch_object(self.client)
        try:
            result = search.execute(ignore_cache=ignore_cache)
        except Exception:
            # ``except Exception`` instead of a bare ``except`` so that
            # KeyboardInterrupt / SystemExit are not swallowed.
            log.exception('FAILED ELK SEARCH: %s',
                          json.dumps(search.to_dict(), indent=2))
            continue
        result_data.append(result)

    return result_data
def setup_test_environment(self, **kwargs):
    """Run the parent setup, then raise the ``settings.TESTING`` flag."""
    parent_runner = super(TestRunner, self)
    parent_runner.setup_test_environment(**kwargs)
    # Application code reads this flag to detect a test run.
    settings.TESTING = True
def teardown_test_environment(self, **kwargs):
    """Run the parent teardown, then clear the ``settings.TESTING`` flag.

    The flag was previously set to ``True`` here as well, so it stayed
    raised after the test run had finished.
    """
    super(TestRunner, self).teardown_test_environment(**kwargs)
    settings.TESTING = False
def handler(self):
    """Answer the incoming message according to the admin-session state.

    Order of checks:

    * ``EXIT`` from an admin resets the session.
    * ``ADMIN`` marks the session as admin.
    * An admin without a verified password has the message checked as a
      password: its double MD5 is compared with ``settings.WXADMIN``
      (``'123'`` while ``settings.TESTING`` is true).  Once ``Count`` has
      reached 3 a further failure resets the session.
    * A verified admin can ask for help (``HELPME``), store a command, or
      confirm the stored command with ``Y`` to run it.
    * Anything else is forwarded to ``tuling.getdata``.
    """
    info = self.message.content
    keyword = info.upper()

    if self.userinfo.isAdmin and keyword == 'EXIT':
        self.userinfo = WxUserInfo()
        self.savesession()
        return "????"

    if keyword == 'ADMIN':
        self.userinfo.isAdmin = True
        self.savesession()
        return "???????"

    if self.userinfo.isAdmin and not self.userinfo.isPasswordSet:
        expected = '123' if settings.TESTING else settings.WXADMIN
        supplied = get_md5(get_md5(info))
        if expected.upper() == supplied.upper():
            self.userinfo.isPasswordSet = True
            self.savesession()
            return "????,???????????????:??helpme????"

        if self.userinfo.Count >= 3:
            # Too many failed attempts: start over with a fresh session.
            self.userinfo = WxUserInfo()
            self.savesession()
            return "??????"

        self.userinfo.Count += 1
        self.savesession()
        return "???????????????:"

    if self.userinfo.isAdmin and self.userinfo.isPasswordSet:
        if self.userinfo.Command != '' and keyword == 'Y':
            return cmdhandler.run(self.userinfo.Command)
        if keyword == 'HELPME':
            return cmdhandler.get_help()
        # Remember the command; it is run once the admin confirms with Y.
        self.userinfo.Command = info
        self.savesession()
        return "????: " + info + " ???"

    return tuling.getdata(info)
def fileupload(request):
    """Store every file of a POST request and respond with their URLs.

    Files whose name contains one of the known image extensions are saved
    under ``image`` and recompressed with PIL; all others go under
    ``files``.  While ``settings.TESTING`` is true files are written to
    ``<BASE_DIR>/uploads`` instead of ``/var/www/resource``.

    The name used on disk is reduced to its final path component so a
    crafted name such as ``../../x`` cannot escape the upload directory.

    Returns:
        HttpResponse built from the list of resulting URLs, or the text
        ``only for post`` for any other request method.
    """
    if request.method != 'POST':
        return HttpResponse("only for post")

    image_extensions = ('jpg', 'png', 'jpeg', 'bmp')
    response = []
    for filename in request.FILES:
        # The key comes from the client: never join it into a path as-is.
        safe_name = os.path.basename(str(filename))
        if safe_name in ('', '.', '..'):
            # No usable file name is left after sanitising; skip the entry.
            continue

        timestr = datetime.datetime.now().strftime('%Y/%m/%d')
        isimage = any(ext in safe_name for ext in image_extensions)
        kind = 'image' if isimage else 'files'

        basepath = r'/var/www/resource/{type}/{timestr}'.format(
            type=kind, timestr=timestr)
        if settings.TESTING:
            basepath = settings.BASE_DIR + '/uploads'
        url = 'https://resource.lylinux.net/{type}/{timestr}/{filename}'.format(
            type=kind, timestr=timestr, filename=safe_name)

        if not os.path.exists(basepath):
            os.makedirs(basepath)
        savepath = os.path.join(basepath, safe_name)
        with open(savepath, 'wb+') as wfile:
            for chunk in request.FILES[filename].chunks():
                wfile.write(chunk)

        if isimage:
            # Re-save the stored image with reduced quality.
            from PIL import Image
            image = Image.open(savepath)
            image.save(savepath, quality=20, optimize=True)
        response.append(url)
    return HttpResponse(response)
def mutate_and_get_payload(cls, input, context, info):
    """Create a ``Transaction`` for the current user and charge it.

    Raises:
        MutationException: if the user has no ``stripe_user_id``.

    The input is validated with ``CreateTransactionSerializer`` (which
    raises on invalid data).  While ``settings.TESTING`` is true the mock
    charge function is used instead of the real one.
    """
    user = context.user
    if not user.stripe_user_id:
        error_msg = 'User tried to create a transaction before they have a stripe account'
        logger.warning(error_msg, extra={'request': context})
        raise MutationException(error_msg)

    formatted_input = convert_input_global_ids_to_pks(input)
    serializer_context = {
        'user': context.user,
        'receiving_person_id': formatted_input.get('receiving_person_id'),
    }
    serializer = CreateTransactionSerializer(
        data=formatted_input, context=serializer_context)
    serializer.is_valid(raise_exception=True)

    product = Product.objects.get(pk=formatted_input.get('product_id'))
    associated_event = AssociatedEvent.objects.get(
        pk=formatted_input.get('associated_event_id'))

    transaction = Transaction(
        **formatted_input,
        associated_event_date=associated_event.event.next_date,
        cost_usd=product.cost_usd,
        user=context.user)
    transaction.save()

    # Pick the charge implementation once, then call it the same way.
    charge = mock_create_stripe_charge if settings.TESTING else create_stripe_charge
    charge(user=context.user, transaction=transaction, request=context)

    return CreateTransaction(transaction=transaction)
def __init__(self, **kwargs):
    """Read the query-logging options.

    Each option is taken from ``kwargs`` first, then from the named
    Django setting, then from a built-in fallback.
    """
    def option(kwarg_name, setting_name, fallback):
        # Keyword argument wins over the setting, which wins over fallback.
        return kwargs.get(kwarg_name, getattr(settings, setting_name, fallback))

    self.connection_name = option(
        'connection_name', 'LOG_QUERY_DATABASE_CONNECTION', 'default')
    self.log_duplicate_queries = option(
        'log_duplicate_queries', 'LOG_QUERY_DUPLICATE_QUERIES', True)
    self.log_tracebacks = option(
        'log_tracebacks', 'LOG_QUERY_TRACEBACKS', False)
    self.log_long_running_time = option(
        'log_long_running_time', 'LOG_QUERY_TIME_ABSOLUTE_LIMIT', 1000)
    self.logging_extras = kwargs.get('logging_extra_dict', {})

    # Internal testing hook: projects that unit-test the query debugging
    # mixin can define settings.TESTING as True during test runs.
    self.testing = getattr(settings, 'TESTING', False)
def get_subjects_summary(self, parent, params):
    """Return a JSON summary of the subjects relevant to ``parent``.

    While ``settings.TESTING`` is true an empty result list is returned.
    Otherwise one entry is produced per subject found in the parent's
    paid orders (newest order first, one entry per subject name),
    followed by one entry per subject in ``klx.KLX_REPORT_SUBJECTS`` that
    has not been purchased.
    """
    subjects_list = []
    if settings.TESTING:
        return JsonResponse({'results': subjects_list})

    seen_names = set()
    paid_orders = models.Order.objects.filter(
        parent=parent, status=models.Order.PAID)
    order_rows = paid_orders.values(
        'subject', 'grade', 'created_at').order_by('-created_at')

    for row in order_rows:
        # only support math presently
        subject = models.Subject.objects.get(id=row['subject'])
        name = subject.name
        if name in seen_names:
            # A newer order for this subject was already handled.
            continue
        seen_names.add(name)

        if name not in klx.KLX_REPORT_SUBJECTS:
            subjects_list.append({
                'subject_id': subject.id,
                'purchased': True,
                'grade_id': row['grade'],
                'supported': False,
            })
            continue

        url = klx.KLX_STUDY_URL_FMT.format(
            subject=klx.klx_subject_name(name))
        entry = {
            'subject_id': subject.id,
            'supported': True,
            'purchased': True,
            'grade_id': row['grade'],
        }
        # Merge the dict returned by the totals helper into the entry.
        entry.update(self._get_total_nums(url, params))
        subjects_list.append(entry)

    # subjects supported, but user did not purchase
    missing_names = [candidate for candidate in klx.KLX_REPORT_SUBJECTS
                     if candidate not in seen_names]
    if missing_names:
        for subject in models.Subject.objects.filter(name__in=missing_names):
            subjects_list.append({
                'subject_id': subject.id,
                'supported': True,
                'purchased': False,
            })

    if settings.DEBUG:
        logger.debug(json.dumps(subjects_list))
    return JsonResponse({'results': subjects_list})
def klx_register(role, uid, name, password=None, subject=None):
    """Register a user on the kuailexue server.

    :param role: role code sent to the server
        (NOTE(review): presumably one of the KLX_ROLE_* constants -- confirm)
    :param uid: models.User.id
    :param name: Student Name or Teacher Name
    :param password: optional; sent only when it is not None
    :param subject: optional; sent only when it is not None
    :return: the kuailexue username from the response ``data``, or None
        when the server cannot be reached, answers with a non-200 status,
        sends a body that is not valid JSON, or sends no ``data``.
        While ``settings.TESTING`` is true no request is made and
        ``'1'`` / ``'0'`` is returned depending on whether the built
        parameters pass ``klx_verify_sign``.
    """
    klx_url = settings.KUAILEXUE_SERVER + '/third-partner/register'
    params = {
        'role': role,
        'uid': uid,
        'name': name,
    }
    if password is not None:
        params['password'] = password
    if subject is not None:
        params['subject'] = subject
    params = klx_build_params(params, True)

    if settings.TESTING:
        return '1' if klx_verify_sign(params) else '0'

    try:
        resp = requests.post(klx_url, data=params, timeout=10)
    except Exception as err:
        _logger.error('cannot reach kuailexue server')
        _logger.exception(err)
        return None
    if resp.status_code != 200:
        _logger.error('cannot reach kuailexue server, http_status is %s' % (resp.status_code))
        return None

    if _get_is_cold_testing():
        _console.warning(klx_url+'?'+urllib.parse.urlencode(params))
    try:
        ret_json = json.loads(resp.content.decode('utf-8'))
    except ValueError as err:
        # A 200 response whose body is not UTF-8 JSON used to raise here,
        # although every other failure is reported by returning None.
        _logger.error('kuailexue response is not valid JSON')
        _logger.exception(err)
        return None
    if _get_is_cold_testing():
        _console.info(ret_json)

    ret_data = ret_json.get('data')
    if ret_data is not None:
        return ret_data.get('username')

    req_url = klx_url+'?'+urllib.parse.urlencode(params)
    _logger.error('kuailexue reponse data error, CODE: %s, MSG: %s. (URL=%s)' % (ret_json.get('code'), ret_json.get('message'), req_url))
    return None