我们从 Python 开源项目中提取了以下 49 个代码示例，用于说明如何使用 django.conf.settings。
def add_sharing_banner(page, response):
    """Insert the sharing banner right after the opening ``<body>`` tag.

    Nothing happens when the ``WAGTAILSHARING_BANNER`` setting is falsy (it
    defaults to enabled) or when the content contains no ``<body>`` tag.
    ``response.content`` is replaced in place; the function returns ``None``.
    """
    if not getattr(settings, 'WAGTAILSHARING_BANNER', True):
        return

    # Responses that expose a callable ``render`` are rendered before their
    # content is read.
    render = getattr(response, 'render', None)
    if callable(render):
        render()

    html = force_text(response.content)
    body_tag = re.search(r'(?i)<body.*?>', html)
    if not body_tag:
        return

    split_at = body_tag.end()
    template = loader.get_template('wagtailsharing/banner.html')
    banner = force_text(template.render())
    response.content = html[:split_at] + banner + html[split_at:]
def __init__(self, testcases_to_run, nr_of_tests, all_should_pass=True,
             print_bad=True, runner_options=None):
    """Build a suite and a test runner for the given test case classes.

    The Django runner class comes from ``get_runner(settings)`` and is
    instantiated with ``runner_options`` (an empty dict when omitted). The
    resulting text runner writes to an in-memory stream.
    """
    options = runner_options or {}
    self.nr_of_tests = nr_of_tests
    self.all_should_pass = all_should_pass
    self.print_bad = print_bad

    runner = get_runner(settings)(**options)
    self.suite = runner.test_suite()
    for case_cls in testcases_to_run:
        loaded = runner.test_loader.loadTestsFromTestCase(case_cls)
        self.suite.addTests(loaded)

    self.test_runner = runner.test_runner(
        resultclass=runner.get_resultclass(),
        stream=six.StringIO(),
    )
def create_notification(**kwargs):
    """Signal receiver that creates a notification and dispatches it.

    ``signal`` and ``sender`` must be present in ``kwargs`` (a ``KeyError``
    is raised otherwise); ``silent`` is optional. The untouched ``kwargs``
    are what each configured adapter receives.
    """
    # Work on a copy so the adapters below still see the original kwargs.
    params = dict(kwargs)
    for signal_key in ('signal', 'sender'):
        del params[signal_key]
    params.pop('silent', None)

    # A silent notification is instantiated but not saved.
    if kwargs.get('silent', False):
        notification = Notification(**params)
    else:
        notification = Notification.objects.create(**params)

    # Send through every custom adapter listed in the settings.
    for dotted_path in getattr(settings, 'NOTIFICATION_ADAPTERS', []):
        adapter_cls = import_attr(dotted_path)
        adapter_cls(**kwargs).notify()

    if getattr(settings, 'NOTIFICATIONS_WEBSOCKET', False):
        send_to_queue(notification)
def post_stats(request, response, data):
    """POST the stats payload to the configured Elasticsearch URL.

    Returns immediately when ``CAVALRY_ELASTICSEARCH_URL_TEMPLATE`` is unset
    or empty. The URL template is formatted with the payload fields plus
    ``ymd`` (the current UTC date). Any failure is logged as a warning and
    never raised.
    """
    url_template = getattr(settings, 'CAVALRY_ELASTICSEARCH_URL_TEMPLATE', None)
    if not url_template:
        return

    payload = build_payload(data, request, response)
    url_fields = dict(payload, ymd=datetime.utcnow().strftime('%Y-%m-%d'))
    es_url = url_template.format_map(url_fields)
    body = force_bytes(json.dumps(payload, cls=PayloadJSONEncoder))
    headers = {'Content-Type': 'application/json'}

    try:
        resp = sess.post(es_url, data=body, headers=headers, timeout=0.5)
        if resp.status_code != 201:
            log.warning(
                'Unable to post data to %s (error %s): %s',
                es_url, resp.status_code, resp.text,
            )
    except Exception as e:
        # Best effort: posting stats must not break the request.
        log.warning('Unable to post data to %s: %s', es_url, e)
def test_server_list_pagination_more(self):
    """``server_list`` reports ``has_more`` when an extra server comes back.

    The stubbed client returns ``page_size + 1`` servers; the API is
    expected to return exactly ``page_size`` of them and flag more results.
    """
    page_size = getattr(settings, 'API_RESULT_PAGE_SIZE', 1)
    limit = page_size + 1
    servers = self.servers.list()

    novaclient = self.stub_novaclient()
    novaclient.servers = self.mox.CreateMockAnything()
    expected_opts = {'all_tenants': True, 'marker': None, 'limit': limit}
    novaclient.servers.list(True, expected_opts).AndReturn(servers[:limit])
    self.mox.ReplayAll()

    search_opts = {'marker': None, 'paginate': True}
    ret_val, has_more = api.nova.server_list(
        self.request, search_opts, all_tenants=True)

    for server in ret_val:
        self.assertIsInstance(server, api.nova.Server)
    self.assertEqual(page_size, len(ret_val))
    self.assertTrue(has_more)
def heatclient(request, password=None):
    """Return a Heat client bound to the requesting user's token.

    SSL verification options come from ``OPENSTACK_SSL_NO_VERIFY`` and
    ``OPENSTACK_SSL_CACERT``; the endpoint is looked up for the
    ``orchestration`` service. ``format_parameters`` is attached to the
    client before it is returned.
    """
    insecure = getattr(settings, 'OPENSTACK_SSL_NO_VERIFY', False)
    cacert = getattr(settings, 'OPENSTACK_SSL_CACERT', None)
    endpoint = base.url_for(request, 'orchestration')

    user = request.user
    client = heat_client.Client(
        "1",
        endpoint,
        token=user.token.id,
        insecure=insecure,
        ca_file=cacert,
        username=user.username,
        password=password,
    )
    client.format_parameters = format_parameters
    return client
def get_active_version(self):
    """Return the supported-version entry for the active API version.

    The version key is read from the settings dict named by
    ``SETTINGS_KEY`` (indexed by ``service_type``), falling back to
    ``self.preferred``. The resolved key is cached in ``self._active``.

    Raises:
        exceptions.ConfigurationError: if the key is a string or is not
            one of the supported versions.
    """
    if self._active is not None:
        return self.supported[self._active]

    configured = getattr(settings, self.SETTINGS_KEY, {})
    key = configured.get(self.service_type)
    if key is None:
        # TODO(gabriel): support API version discovery here; we'll leave
        # the setting in as a way of overriding the latest available
        # version.
        key = self.preferred

    # The key is looked up in the ``supported`` dict, so its type matters:
    # reject strings explicitly so the mistake is easy to spot.
    if isinstance(key, six.string_types):
        raise exceptions.ConfigurationError(
            'The version "%s" specified for the %s service should be '
            'either an integer or a float, not a string.' %
            (key, self.service_type))

    # Give a helpful message when the version is not in the supported list.
    if key not in self.supported:
        choices = ", ".join(str(k) for k in six.iterkeys(self.supported))
        raise exceptions.ConfigurationError(
            '%s is not a supported API version for the %s service, '
            ' choices are: %s' % (key, self.service_type, choices))

    self._active = key
    return self.supported[self._active]
def url_for(request, service_type, endpoint_type=None, region=None):
    """Return the endpoint URL for ``service_type`` from the user's catalog.

    ``endpoint_type`` defaults to the ``OPENSTACK_ENDPOINT_TYPE`` setting
    (``'publicURL'`` when unset); ``SECONDARY_ENDPOINT_TYPE`` is tried when
    the first lookup yields nothing. ``region`` defaults to the user's
    services region.

    Raises:
        exceptions.ServiceCatalogException: if the service is missing from
            the catalog or no URL could be resolved.
    """
    primary = endpoint_type or getattr(
        settings, 'OPENSTACK_ENDPOINT_TYPE', 'publicURL')
    secondary = getattr(settings, 'SECONDARY_ENDPOINT_TYPE', None)

    service = get_service_from_catalog(
        request.user.service_catalog, service_type)
    if not service:
        raise exceptions.ServiceCatalogException(service_type)

    target_region = region or request.user.services_region
    url = get_url_for_service(service, target_region, primary)
    if not url and secondary:
        url = get_url_for_service(service, target_region, secondary)
    if url:
        return url
    raise exceptions.ServiceCatalogException(service_type)
def get_context_data(self, **kwargs):
    """Add permission, quota and launch-related flags to the context.

    Router creation additionally requires ``enable_router`` in the
    ``OPENSTACK_NEUTRON_NETWORK`` setting (treated as enabled when absent).
    """
    context = super(NetworkTopologyView, self).get_context_data(**kwargs)
    neutron_config = getattr(settings, 'OPENSTACK_NEUTRON_NETWORK', {})
    allowed = self._has_permission
    exceeded = self._quota_exceeded

    context['launch_instance_allowed'] = allowed(
        (("compute", "compute:create"),))
    context['instance_quota_exceeded'] = exceeded('instances')

    context['create_network_allowed'] = allowed(
        (("network", "create_network"),))
    context['network_quota_exceeded'] = exceeded('networks')

    # The permission check only runs when routers are enabled.
    context['create_router_allowed'] = (
        neutron_config.get('enable_router', True) and
        allowed((("network", "create_router"),)))
    context['router_quota_exceeded'] = exceeded('routers')

    context['console_type'] = getattr(settings, 'CONSOLE_TYPE', 'AUTO')
    context['show_ng_launch'] = getattr(
        settings, 'LAUNCH_INSTANCE_NG_ENABLED', True)
    context['show_legacy_launch'] = getattr(
        settings, 'LAUNCH_INSTANCE_LEGACY_ENABLED', False)
    return context
def get_subnetpool_choices(self, request):
    """Return ``(value, label)`` choices for the subnet pool selector.

    The list starts with a placeholder, then an entry per configured
    default pool label (IPv6 before IPv4, both with an empty value), then
    one entry per pool returned by ``api.neutron.subnetpool_list``.
    """
    choices = [('', _('Select a pool'))]

    neutron_config = getattr(settings, 'OPENSTACK_NEUTRON_NETWORK', {})
    default_labels = [
        (6, neutron_config.get('default_ipv6_subnet_pool_label', None)),
        (4, neutron_config.get('default_ipv4_subnet_pool_label', None)),
    ]
    for ip_version, label in default_labels:
        if label:
            pool = api.neutron.SubnetPool(
                {'ip_version': ip_version, 'name': label})
            choices.append(('', pool))

    choices.extend(
        (pool.id, pool) for pool in api.neutron.subnetpool_list(request))
    return choices
def test_settings(self):
    """The gateway picks up ``DEBUG`` and ``AMF_TIME_OFFSET`` from settings.

    ``conf.settings`` is temporarily replaced by a ``Settings`` object built
    from the test settings module and always restored afterwards.
    """
    from django import conf

    settings_mod = sys.modules[self.mod_name]
    settings_mod.DEBUG = True
    settings_mod.AMF_TIME_OFFSET = 1000

    old_settings = conf.settings
    conf.settings = conf.Settings(self.mod_name)
    try:
        # Build the gateway inside the ``try`` so that a failing constructor
        # cannot leave the swapped settings in place for later tests.
        gw = django.DjangoGateway()
        self.assertTrue(gw.debug)
        self.assertEqual(gw.timezone_offset, 1000)
    finally:
        conf.settings = old_settings
def save(self, *args, **kwargs):
    """Fill in ``date_taken`` when missing, then save the image.

    The EXIF ``DateTimeOriginal`` value (``"Y:M:D H:M:S"``) is used when it
    can be parsed; it is made timezone-aware when ``USE_TZ`` is enabled.
    Any parsing problem is ignored and the current time is used instead.
    """
    if self.date_taken is None:
        try:
            exif_date = self.exif.get('DateTimeOriginal', None)
            if exif_date is not None:
                date_part, time_part = exif_date.split(" ")
                year, month, day = (int(v) for v in date_part.split(':'))
                hour, minute, second = (int(v) for v in time_part.split(':'))
                taken = datetime(year, month, day, hour, minute, second)
                if getattr(settings, "USE_TZ", False):
                    taken = make_aware(taken, get_current_timezone())
                self.date_taken = taken
        except Exception:
            # Deliberately best effort: malformed EXIF data must not
            # prevent the image from being saved.
            pass

    if self.date_taken is None:
        self.date_taken = now()
    super(Image, self).save(*args, **kwargs)
def swappable_first_key(self, item):
    """
    Sorting key that puts potentially swappable models first in lists of
    created models (the only real way to solve #22783).

    ``item`` is indexed as ``(app_label, model_name)``. Matching items get
    both parts prefixed with ``"___"``; anything else, including items
    whose lookup raises ``LookupError``, is returned unchanged.
    """
    try:
        # Indexing stays inside the ``try``: an ``IndexError`` is a
        # ``LookupError`` and must fall through to ``return item`` as well.
        app_label, model_name = item[0], item[1]
        model = self.new_apps.get_model(app_label, model_name)
        base_names = [base.__name__ for base in model.__bases__]
        dotted_name = "%s.%s" % (app_label, model_name)
        looks_swappable = (
            model._meta.swappable or
            "AbstractUser" in base_names or
            "AbstractBaseUser" in base_names or
            settings.AUTH_USER_MODEL.lower() == dotted_name.lower()
        )
        if looks_swappable:
            return ("___" + app_label, "___" + model_name)
    except LookupError:
        pass
    return item
def initialize():
    """Initialize the client from Django settings once.

    Does nothing when ``infusionsoft.is_initialized`` is already true.
    ``INFUSIONSOFT_APP_NAME`` takes precedence over
    ``INFUSIONSOFT_API_URL``.

    Raises:
        ValueError: if the API key, or both the app name and the API URL,
            are missing from the settings.
    """
    import infusionsoft

    if infusionsoft.is_initialized:
        return

    api_key = getattr(settings, 'INFUSIONSOFT_API_KEY', None)
    app_name_or_api_url = (
        getattr(settings, 'INFUSIONSOFT_APP_NAME', None) or
        getattr(settings, 'INFUSIONSOFT_API_URL', None)
    )
    if not (api_key and app_name_or_api_url):
        raise ValueError(
            'Please set INFUSIONSOFT_APP_NAME or INFUSIONSOFT_API_URL, '
            'and INFUSIONSOFT_API_KEY in your settings')

    client_options = getattr(settings, 'INFUSIONSOFT_CLIENT_OPTIONS', {})
    infusionsoft.initialize(app_name_or_api_url, api_key, **client_options)
def __init__(self, *args, **kwargs):
    """Locate the SpatiaLite library, then set up the database wrapper.

    Raises:
        ImproperlyConfigured: if pysqlite is older than 2.5 or the
            SpatiaLite library cannot be located.
    """
    # Fail early when pysqlite is too old.
    if Database.version_info < (2, 5, 0):
        raise ImproperlyConfigured('Only versions of pysqlite 2.5+ are '
                                   'compatible with SpatiaLite and GeoDjango.')

    # The library (`libspatialite`) is looked up on the system library path
    # via `find_library`; the `SPATIALITE_LIBRARY_PATH` setting, when
    # present, overrides whatever that lookup returned.
    system_lib = find_library('spatialite')
    self.spatialite_lib = getattr(
        settings, 'SPATIALITE_LIBRARY_PATH', system_lib)
    if not self.spatialite_lib:
        raise ImproperlyConfigured('Unable to locate the SpatiaLite library. '
                                   'Make sure it is in your library path, or set '
                                   'SPATIALITE_LIBRARY_PATH in your settings.'
                                   )

    super(DatabaseWrapper, self).__init__(*args, **kwargs)
    self.features = DatabaseFeatures(self)
    self.ops = SpatiaLiteOperations(self)
    self.client = SpatiaLiteClient(self)
    self.introspection = SpatiaLiteIntrospection(self)
def render(self, name, value, attrs=None):
    """Render the widget with the ``markedit/ui.html`` template.

    Editor options default to the ``MARKEDIT_DEFAULT_SETTINGS`` setting; an
    ``options`` entry in the widget attributes replaces them and is removed
    from the attributes before rendering.
    """
    # Prepare values
    final_attrs = self.build_attrs(attrs, name=name)
    text = value if value else ''

    options = getattr(settings, 'MARKEDIT_DEFAULT_SETTINGS', {})
    if 'options' in final_attrs:
        options = self._eval_value(final_attrs['options'], {})
        del final_attrs['options']

    # Render widget to HTML
    template = loader.get_template('markedit/ui.html')
    context = Context({
        'attributes': self._render_attrs(final_attrs),
        'value': conditional_escape(force_unicode(text)),
        'id': final_attrs['id'],
        'options': options,
    })
    return template.render(context)
def add_arguments(self, parser):
    """Register the test command's arguments on ``parser``.

    After the built-in arguments are added, the runner class resolved from
    the settings gets a chance to add its own via ``add_arguments``.
    """
    argument_specs = [
        (('args',), {
            'metavar': 'test_label',
            'nargs': '*',
            'help': 'Module paths to test; can be modulename, modulename.TestCase or modulename.TestCase.test_method',
        }),
        (('--noinput', '--no-input'), {
            'action': 'store_false',
            'dest': 'interactive',
            'default': True,
            'help': 'Tells Django to NOT prompt the user for input of any kind.',
        }),
        (('--failfast',), {
            'action': 'store_true',
            'dest': 'failfast',
            'default': False,
            'help': 'Tells Django to stop running the test suite after first failed test.',
        }),
        (('--testrunner',), {
            'action': 'store',
            'dest': 'testrunner',
            'help': 'Tells Django to use specified test runner class instead of '
                    'the one specified by the TEST_RUNNER setting.',
        }),
    ]
    for flags, options in argument_specs:
        parser.add_argument(*flags, **options)

    runner_cls = get_runner(settings, self.test_runner)
    if hasattr(runner_cls, 'add_arguments'):
        runner_cls.add_arguments(parser)
def check_url_namespaces_unique(app_configs, **kwargs):
    """
    Warn if URL namespaces used in applications aren't unique.

    Returns a list with one ``urls.W005`` warning per namespace that occurs
    more than once; the list is empty when ``ROOT_URLCONF`` is not set.
    """
    if not getattr(settings, 'ROOT_URLCONF', None):
        return []

    from django.urls import get_resolver
    namespace_counts = Counter(_load_all_namespaces(get_resolver()))
    duplicated = [
        name for name, seen in namespace_counts.items() if seen > 1
    ]
    return [
        Warning(
            "URL namespace '{}' isn't unique. You may not be able to reverse "
            "all URLs in this namespace".format(name),
            id="urls.W005",
        )
        for name in duplicated
    ]
def __init__(self, *args, **kwargs):
    """Locate the SpatiaLite library, then initialise the base wrapper.

    Raises:
        ImproperlyConfigured: if pysqlite is older than 2.5 or the
            SpatiaLite library cannot be located.
    """
    # Fail early when pysqlite is too old.
    if Database.version_info < (2, 5, 0):
        raise ImproperlyConfigured('Only versions of pysqlite 2.5+ are '
                                   'compatible with SpatiaLite and GeoDjango.')

    # The library (`libspatialite`) is looked up on the system library path
    # via `find_library`; the `SPATIALITE_LIBRARY_PATH` setting, when
    # present, overrides whatever that lookup returned.
    system_lib = find_library('spatialite')
    self.spatialite_lib = getattr(
        settings, 'SPATIALITE_LIBRARY_PATH', system_lib)
    if not self.spatialite_lib:
        raise ImproperlyConfigured('Unable to locate the SpatiaLite library. '
                                   'Make sure it is in your library path, or set '
                                   'SPATIALITE_LIBRARY_PATH in your settings.'
                                   )

    super(DatabaseWrapper, self).__init__(*args, **kwargs)
def get_dirs(self):
    """Return the template directories, adjusted for the active theme.

    Without a theme the parent loader's directories are returned as-is.
    With a theme, each directory gets the theme name appended, preceded by
    ``WAGTAIL_THEME_PATH`` when that setting is set.
    """
    base_dirs = super(ThemeLoader, self).get_dirs()
    theme = get_theme()
    theme_path = getattr(settings, 'WAGTAIL_THEME_PATH', None)
    if not theme:
        return base_dirs

    # Path components appended to every directory of the TEMPLATES setting.
    suffix = (theme_path, theme) if theme_path else (theme,)
    return [os.path.join(base, *suffix) for base in base_dirs]
def __init__(self):
    """Set up the XMPP client from the ``ASYNC_XMPP_*`` settings.

    ``server_configured`` ends up ``False`` when the JID or password is
    missing, or when connecting or authenticating fails; otherwise ``True``.
    An explicit ``ASYNC_XMPP_SERVER`` disables SRV lookup.
    """
    super(NotificationMethod, self).__init__()
    self.messages = []
    self.jid = getattr(settings, 'ASYNC_XMPP_JID', None)
    self.password = getattr(settings, 'ASYNC_XMPP_PASSWORD', None)
    if self.jid is None or self.password is None:
        self.server_configured = False
        return

    self.server = getattr(settings, 'ASYNC_XMPP_SERVER', None)
    self.port = getattr(settings, 'ASYNC_XMPP_SERVER_PORT', 5222)
    explicit_server = self.server is not None
    self.connection_tuple = (
        (self.server, self.port) if explicit_server else None)
    self.use_srv = not explicit_server
    self.jid = xmpp.JID(self.jid)

    self.client = Client(self.jid.getDomain())
    connected = self.client.connect(
        server=self.connection_tuple, use_srv=self.use_srv)
    # Authentication is only attempted once the connection succeeded.
    if not connected or not self.client.auth(
            self.jid.getNode(), self.password,
            resource=self.jid.getResource()):
        self.server_configured = False
        return

    self.client.disconnected()
    self.server_configured = True
def get_callback_function(setting_name, default=None):
    """Resolve the callable configured under ``setting_name``.

    A missing or falsy setting yields ``default``. A string value is
    imported with ``import_string``; a callable value is returned as-is.

    Raises:
        ImproperlyConfigured: if the resolved value is not callable.
    """
    configured = getattr(settings, setting_name, None)
    if not configured:
        return default

    if not callable(configured) and isinstance(configured, six.string_types):
        configured = import_string(configured)

    if callable(configured):
        return configured
    raise ImproperlyConfigured(
        '{name} must be callable.'.format(name=setting_name)
    )
def ready(self):
    """Build the ``Swagger`` object from the ``SWAGGER_*`` settings.

    Raises:
        ImproperlyConfigured: if ``SWAGGER_SCHEMA`` is missing or empty.
    """
    self.schema = getattr(settings, 'SWAGGER_SCHEMA', None)
    if not self.schema:
        raise ImproperlyConfigured('You have to provide SWAGGER_SCHEMA setting pointing to desired schema')

    # ``SWAGGER_MODULE`` is optional here and defaults to ``None``.
    self.module = getattr(settings, 'SWAGGER_MODULE', None)
    self.swagger = Swagger(self.schema, self.module)
def handle(self, *args, **options):
    """List the controllers to generate and optionally write them out.

    With ``options['generate']`` set, the handlers are rendered from
    ``view.jinja`` into ``<last part of SWAGGER_MODULE>.py`` in the current
    directory; otherwise only a hint about ``--generate`` is printed.

    Raises:
        ImproperlyConfigured: if ``SWAGGER_SCHEMA`` or ``SWAGGER_MODULE``
            is missing from the settings.
    """
    schema = getattr(settings, 'SWAGGER_SCHEMA', None)
    module = getattr(settings, 'SWAGGER_MODULE', None)
    if not schema:
        raise ImproperlyConfigured('You have to provide SWAGGER_SCHEMA setting pointing to desired schema')
    if not module:
        raise ImproperlyConfigured('You have to specify desired controller module name in SWAGGER_MODULE setting')

    router = SwaggerRouter()
    print('Inspecting available controllers...')
    router.update(True)
    router.process()

    print()
    print('Following classes and methods are going to be generated:')
    enum = router.get_enum()
    for name in enum:
        methods = [entry['method'] for entry in enum[name]['methods']]
        print("{} : {}".format(name, methods))

    if not options['generate']:
        print()
        print('Use --generate option to create them')
        return

    template = Template()
    filename = module.rsplit('.', 1)[-1] + '.py'
    structure = [
        {'name': name, 'data': data}
        for name, data in six.iteritems(enum)
    ]
    print('Generating handlers ({})...'.format(filename))
    with codecs.open(filename, 'w', 'utf-8') as out:
        out.write(template.render(template_name='view.jinja', names=structure))
    print('Done.')
def is_open_for_signup(self, request):
    """Return the ``ACCOUNT_ALLOW_REGISTRATION`` setting (default ``True``)."""
    allow_registration = getattr(settings, 'ACCOUNT_ALLOW_REGISTRATION', True)
    return allow_registration
def is_open_for_signup(self, request, sociallogin):
    """Return the ``ACCOUNT_ALLOW_REGISTRATION`` setting (default ``True``).

    ``request`` and ``sociallogin`` are accepted but not inspected.
    """
    allow_registration = getattr(settings, 'ACCOUNT_ALLOW_REGISTRATION', True)
    return allow_registration
def get_context_data(self, **kwargs):
    """Return the parent context with the ``settings`` object added."""
    ctx = super(MochaView, self).get_context_data(**kwargs)
    ctx["settings"] = settings
    return ctx
def dotted_paths_for_init(self):
    """Return the setting named ``self.settings_name``, or ``self.defaults``.

    The defaults are used only when the setting is not defined at all.
    """
    name = self.settings_name
    try:
        return getattr(settings, name)
    except AttributeError:
        return self.defaults
def run(self, *a, **kw):
    """Run the tests with a data-file writer active around the run.

    ``stopTestRun`` fires before the actual results are printed, so
    ``run()`` is overridden to print the hint after everything else. The
    hint is written unless ``DJPT_PRINT_WORST_REPORT`` is falsy.
    """
    writer = Writer(get_datafile_path())
    self.djpt_writer = writer
    writer.start()
    retval = super(DjptTestRunnerMixin, self).run(*a, **kw)
    self.djpt_writer.end()

    show_hint = getattr(settings, 'DJPT_PRINT_WORST_REPORT', True)
    if show_hint:
        self.stream.write(
            'To see the Worst Performing Items report, '
            'run manage.py djpt_worst_report')
    return retval
def _validate_data(self):
    """Check that a settings-based instance is configured consistently.

    Instances that are not settings based are always accepted.

    Raises:
        TypeError: if explicit data was given as well, or if
            ``is_anonymous()`` is true.
    """
    if self.settings_based:
        if self._data:
            raise TypeError(
                'Either provide data (kwargs) or settings_based, '
                'not both.')
        if self.is_anonymous():
            raise TypeError(
                'Can only be settings based when collector_id is provided.')
def data(self):
    """Return the stored data, or the limits configured in the settings.

    For settings-based instances the value is looked up in
    ``PERFORMANCE_LIMITS[collector_id][type_name]``; every missing level
    falls back to an empty dict.
    """
    if self.settings_based:
        all_limits = getattr(settings, 'PERFORMANCE_LIMITS', {})
        collector_limits = all_limits.get(self.collector_id, {})
        return collector_limits.get(self.type_name, {})
    return self._data
def default_checkers():
    """Return the ``HEARTBEAT`` config with a ``checkers`` list guaranteed.

    When no checkers are configured the built-in list is stored under
    ``'checkers'`` on the dict itself, so a dict defined in the settings is
    modified in place. ``prepare_redis`` validates the result.
    """
    config = getattr(settings, 'HEARTBEAT', {})
    if not config.get('checkers'):
        config['checkers'] = [
            'heartbeat.checkers.distribution_list',
            'heartbeat.checkers.debug_mode',
            'heartbeat.checkers.python',
            'heartbeat.checkers.host',
        ]
    prepare_redis(config)
    return config
def prepare_redis(heartbeat):
    """Require ``CACHEOPS_REDIS`` when the redis checker is enabled.

    Raises:
        ImproperlyConfigured: if ``heartbeat['checkers']`` contains the
            redis status checker but ``CACHEOPS_REDIS`` is unset or ``None``.
    """
    if 'heartbeat.checkers.redis_status' not in heartbeat['checkers']:
        return
    if getattr(settings, 'CACHEOPS_REDIS', None) is None:
        raise ImproperlyConfigured(
            'Missing CACHEOPS_REDIS in project settings')
def get_google_maps_key():
    """Return the ``GOOGLE_MAPS_KEY`` setting, or ``""`` when it is unset."""
    maps_key = getattr(settings, 'GOOGLE_MAPS_KEY', "")
    return maps_key
def run_tests(*test_args):
    """Run the given test labels (default ``tests``) and exit the process.

    Django is set up against ``tests.settings``. The exit status is 1 when
    there were failures and 0 otherwise.
    """
    labels = test_args or ['tests']
    os.environ['DJANGO_SETTINGS_MODULE'] = 'tests.settings'
    django.setup()

    runner_cls = get_runner(settings)
    failures = runner_cls().run_tests(labels)
    sys.exit(bool(failures))
def post(self, request):  # noqa
    """Create a user and return an auth token for the given device.

    Responds with 400 when the username or password is missing and with
    201 plus the token key on success. The lower-cased username is used as
    both username and email.

    Raises:
        InvalidEmailUsernameAPIException: if ``CHECK_USERNAME_IS_EMAIL`` is
            enabled and the username does not match the email pattern.
        UsernameAlreadyTakenAPIException: if user creation hits an
            ``IntegrityError``.
    """
    payload = request.data
    username = payload.get('username')
    password = payload.get('password')
    device_id = payload.get('device_id') or ''

    if not (username and password):
        return Response(
            {'error': 'Missing username or password'},
            status=status.HTTP_400_BAD_REQUEST
        )

    # Optionally require the username to be an email address.
    must_be_email = getattr(settings, 'CHECK_USERNAME_IS_EMAIL', False)
    if must_be_email and not AuthSignup.EMAIL_REGEX.match(username):
        raise InvalidEmailUsernameAPIException()

    try:
        normalized = username.lower()
        user = get_user_model().objects.create_user(
            normalized,
            email=normalized,
            password=password
        )
    except IntegrityError:
        raise UsernameAlreadyTakenAPIException()

    auth_token, _ = AuthToken.objects.get_or_create(
        user=user, device_id=device_id
    )
    return Response({'token': auth_token.key}, status=status.HTTP_201_CREATED)
def get_addon_key():
    """Return the add-on key from settings or derive it from the host ident.

    Without ``HIPCHAT_SENTRY_AC_KEY`` the key is the dot-separated host
    ident with its parts reversed, followed by ``.hipchat-ac``.
    """
    configured = getattr(settings, 'HIPCHAT_SENTRY_AC_KEY', None)
    if configured is not None:
        return configured

    ident_parts = get_addon_host_ident().split('.')
    return '.'.join(reversed(ident_parts)) + '.hipchat-ac'
def __getattr__(self, attr):
    """Resolve an app setting: cache, then ``POWER_PAGES``, then defaults.

    The ``POWER_PAGES`` setting is read on first use and kept in
    ``self.user_settings``; resolved values are stored in ``self.cache``.

    Raises:
        AttributeError: if ``attr`` is not a key of ``self.defaults``.
    """
    if attr not in self.defaults:
        raise AttributeError('Invalid app setting "{0}"'.format(attr))

    try:
        return self.cache[attr]
    except KeyError:
        pass

    if self.user_settings is None:
        self.user_settings = getattr(settings, 'POWER_PAGES', {})
    try:
        resolved = self.user_settings[attr]
    except KeyError:
        resolved = self.defaults[attr]
    self.cache[attr] = resolved
    return resolved
def generate_thumbnails_async(sender, fieldfile, **kwargs):
    """Queue thumbnail generation for ``fieldfile`` as an async task.

    Skipped when ``FILER_USE_CELERY`` is falsy (it defaults to enabled). The
    task receives the model label, the instance primary key and the field
    name.
    """
    if not getattr(settings, 'FILER_USE_CELERY', True):
        return

    meta = sender._meta
    model_label = '{}.{}'.format(meta.app_label, meta.model_name)
    task_args = (model_label, fieldfile.instance.pk, fieldfile.field.name)
    generate_thumbnails.apply_async(task_args)
def create_iam_account(self, username, group=None):
    """Create an IAM account for the given username.

    The user is created under the ``/openbare/`` path, optionally added to
    ``group``, and given a login profile and an access key pair. If any of
    those follow-up steps fails, the user is cleaned up and deleted before
    the exception is re-raised.

    Returns:
        The required credentials, as an ordered mapping.
    """
    self.log("creating IAM user '%s'" % username, INFO)

    alias = getattr(settings, 'AWS_ACCOUNT_ID_ALIAS', None)
    url = (
        'https://{}.signin.aws.amazon.com/console'.format(alias)
        if alias else
        'https://signin.aws.amazon.com/console'
    )
    credentials = collections.OrderedDict([
        ('Web Console URL', url),
        ('Username', username),
        ('Password', self._make_password()),
    ])

    iam_user = self._get_iam_resource().User(username).create(
        Path='/openbare/')
    try:
        if group:
            iam_user.add_group(GroupName=group)
        iam_user.create_login_profile(
            Password=credentials['Password'],
            PasswordResetRequired=False
        )
        key_pair = iam_user.create_access_key_pair()
    except Exception:
        # Do not leave a half-configured user behind.
        self._cleanup_iam_user(iam_user)
        iam_user.delete()
        raise

    credentials.update([
        ('Access Key ID', key_pair.access_key_id),
        ('Secret Access Key', key_pair.secret_access_key),
    ])
    return credentials
def run_tests(self):
    """Run the ``tests`` suite against ``tests.test_settings`` and exit.

    The exit status is 1 when there were failures and 0 otherwise.
    """
    # Imported here because the eggs are not loaded outside this method.
    import django
    from django.conf import settings
    from django.test.utils import get_runner

    os.environ['DJANGO_SETTINGS_MODULE'] = 'tests.test_settings'
    django.setup()

    runner_cls = get_runner(settings)
    failures = runner_cls().run_tests(['tests'])
    sys.exit(bool(failures))
def __init__(self, auth=None):
    """Store ``auth`` and pick the sandbox URL when PAYDIREKT is configured.

    Nothing is set when the ``PAYDIREKT`` setting is missing or falsy.
    """
    super(PaydirektWrapper, self).__init__()
    if not getattr(settings, 'PAYDIREKT', False):
        return

    self.auth = auth
    # NOTE(review): the sandbox check is assumed to be nested under the
    # PAYDIREKT check - confirm against the original indentation.
    if django_paydirekt_settings.PAYDIREKT_SANDBOX:
        self.api_url = self.sandbox_url
def main():
    """Run the ``tests`` suite with the configured runner and exit.

    The process exit status is 1 when any test failed and 0 otherwise.
    """
    # NOTE(review): ``test_settings`` is never referenced below. The import
    # is kept for any import-time effect; confirm whether a call that
    # configures Django with these settings is missing here.
    from tests import settings as test_settings  # noqa
    django.setup()

    TestRunner = get_runner(settings)
    test_runner = TestRunner()
    failures = test_runner.run_tests(['tests'])
    # Normalise to 0/1: exit statuses are truncated to 8 bits, so passing the
    # raw failure count could report success for e.g. 256 failures.
    sys.exit(bool(failures))
def runtests():
    """Run the tests named on the command line (default ``tests``) and exit.

    Command-line arguments are ignored when the script name is
    ``setup.py``. The exit status is 1 when there were failures and 0
    otherwise.
    """
    os.environ['DJANGO_SETTINGS_MODULE'] = 'tests.settings'
    django.setup()
    runner_cls = get_runner(settings)

    invoked_directly = sys.argv[0] != 'setup.py'
    tests = (
        sys.argv[1:]
        if invoked_directly and len(sys.argv) > 1
        else ['tests']
    )
    failures = runner_cls().run_tests(tests)
    sys.exit(bool(failures))
def default(self, obj):
    """Serialize ``Stack`` objects; defer everything else to the parent.

    A stack becomes its lines joined by newlines, or an empty string when
    ``CAVALRY_POST_STACKS`` is falsy (it defaults to enabled).
    """
    if not isinstance(obj, Stack):
        return super().default(obj)
    if not getattr(settings, 'CAVALRY_POST_STACKS', True):
        return ''
    return '\n'.join(obj.as_lines())
def _process(request, get_response):
    """Handle the request while collecting timing and database statistics.

    The stats are injected into the response and then posted, on a
    separate non-daemon thread when ``CAVALRY_THREADED_POST`` is enabled
    and inline otherwise. The response is returned.
    """
    # NOTE(review): the ``with`` body is assumed to end after ``end_time``
    # is recorded - confirm against the original indentation.
    record_stacks = getattr(settings, 'CAVALRY_DB_RECORD_STACKS', True)
    with force_debug_cursor(), managed(db_record_stacks=record_stacks) as data:
        data['start_time'] = get_time()
        response = get_response(request)
        if isinstance(response, SimpleTemplateResponse):
            response.render()
        data['end_time'] = get_time()

    data['duration'] = data['end_time'] - data['start_time']
    databases = data['databases'] = {}
    for conn in connections.all():
        queries = conn.queries
        total_time = (
            sum(q.get('hrtime', 0) * 1000 for q in queries) if queries else 0)
        databases[conn.alias] = {
            'queries': queries,
            'n_queries': len(queries),
            'time': total_time,
        }

    inject_stats(request, response, data)

    post_stats_kwargs = {'request': request, 'response': response, 'data': data}
    if getattr(settings, 'CAVALRY_THREADED_POST', False):
        poster = Thread(
            name='cavalry poster',
            target=post_stats,
            kwargs=post_stats_kwargs,
            daemon=False,
        )
        poster.start()
    else:
        post_stats(**post_stats_kwargs)
    return response