我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用django.utils.module_loading.import_string()。
def _create_cache(backend, **kwargs): try: # Try to get the CACHES entry for the given backend name first try: conf = settings.CACHES[backend] except KeyError: try: # Trying to import the given backend, in case it's a dotted path import_string(backend) except ImportError as e: raise InvalidCacheBackendError("Could not find backend '%s': %s" % ( backend, e)) location = kwargs.pop('LOCATION', '') params = kwargs else: params = conf.copy() params.update(kwargs) backend = params.pop('BACKEND') location = params.pop('LOCATION', '') backend_cls = import_string(backend) except ImportError as e: raise InvalidCacheBackendError( "Could not find backend '%s': %s" % (backend, e)) return backend_cls(location, params)
def find_template_loader(self, loader): if isinstance(loader, (tuple, list)): args = list(loader[1:]) loader = loader[0] else: args = [] if isinstance(loader, six.string_types): loader_class = import_string(loader) if getattr(loader_class, '_accepts_engine_in_init', False): args.insert(0, self) else: warnings.warn( "%s inherits from django.template.loader.BaseLoader " "instead of django.template.loaders.base.Loader. " % loader, RemovedInDjango110Warning, stacklevel=2) return loader_class(*args) else: raise ImproperlyConfigured( "Invalid value in template loaders configuration: %r" % loader)
def __getitem__(self, alias): try: return self._engines[alias] except KeyError: try: params = self.templates[alias] except KeyError: raise InvalidTemplateEngineError( "Could not find config for '{}' " "in settings.TEMPLATES".format(alias)) # If importing or initializing the backend raises an exception, # self._engines[alias] isn't set and this code may get executed # again, so we must preserve the original params. See #24265. params = params.copy() backend = params.pop('BACKEND') engine_cls = import_string(backend) engine = engine_cls(params) self._engines[alias] = engine return engine
def configure_logging(logging_config, logging_settings): if not sys.warnoptions: # Route warnings through python logging logging.captureWarnings(True) # RemovedInNextVersionWarning is a subclass of DeprecationWarning which # is hidden by default, hence we force the "default" behavior warnings.simplefilter("default", RemovedInNextVersionWarning) if logging_config: # First find the logging configuration function ... logging_config_func = import_string(logging_config) logging.config.dictConfig(DEFAULT_LOGGING) # ... then invoke it with the logging settings if logging_settings: logging_config_func(logging_settings)
def import_backend(dotted_path): """ Theres two formats for the dotted_path. One with the backend class (old) and one without (new) eg: old: wagtail.wagtailsearch.backends.elasticsearch.ElasticsearchSearchBackend new: wagtail.wagtailsearch.backends.elasticsearch If a new style dotted path was specified, this function would look for a backend class from the "SearchBackend" attribute. """ try: # New backend_module = import_module(dotted_path) return backend_module.SearchBackend except ImportError as e: try: # Old return import_string(dotted_path) except ImportError: six.reraise(ImportError, e, sys.exc_info()[2])
def get_callback_function(setting_name, default=None): func = getattr(settings, setting_name, None) if not func: return default if callable(func): return func if isinstance(func, six.string_types): func = import_string(func) if not callable(func): raise ImproperlyConfigured( '{name} must be callable.'.format(name=setting_name) ) return func
def __init__(self, sitekey=None, secretkey=None, timeout=None, pass_on_error=None, **kwargs): """ :param sitekey: site key (public) :param secretkey: secret key (private) :param timeout: connection to recaptcha service timeout :param pass_on_error: do not raise exception if recaptcha service is not working. """ self.sitekey = sitekey or getattr(settings, 'RECAPTCHA_SITEKEY') self.secretkey = secretkey or getattr(settings, 'RECAPTCHA_SECRETKEY') if timeout is None: timeout = getattr(settings, 'RECAPTCHA_TIMEOUT', default_settings.RECAPTCHA_TIMEOUT) self.timeout = timeout if pass_on_error is None: pass_on_error = getattr(settings, 'RECAPTCHA_PASS_ON_ERROR', default_settings.RECAPTCHA_PASS_ON_ERROR) self.pass_on_error = pass_on_error if not 'widget' in kwargs: recaptcha_widget = import_string(getattr(settings, 'RECAPTCHA_WIDGET', default_settings.RECAPTCHA_WIDGET)) kwargs['widget'] = recaptcha_widget(sitekey=self.sitekey) elif isinstance(kwargs['widget'], type): kwargs['widget'] = kwargs['widget'](sitekey=self.sitekey) super().__init__(**kwargs)
def init(run_woker=True): from django.utils.module_loading import import_string global _runners configs = getattr(settings, "EASY_JOB", {}) logger = logging.getLogger(configs.get('logger', None)) runners = {} for worker_name, worker_options in configs['workers'].items(): # noinspection PyPep8Naming try: if 'result_backend' not in worker_options or worker_options['result_backend'] is None: worker_options['result_backend'] = "easy_job.result_backends.dummy.DummyBackend" WorkerInitializerClass = import_string(worker_options.pop('initializer')) worker = WorkerInitializerClass(**worker_options) runners[worker_name] = worker.start(not run_woker) except (AttributeError, ValueError): logger.fatal("invalid initializer specified for worker with name {}".format(worker_name)) if len(runners) == 0: logger.warning("No worker is available") _runners = runners
def get_backend(service_name): """ Gets the backend with the given service settings. It checks for a BACKEND key and later passes in these OPTIONS in the backend. :param service_settings: dict like object :returns: base.BaseBackend inherited object """ backend_import = DEFAULT_SERVICE service_settings = settings.USER_VERIFICATION.get(service_name, None) if service_settings is None: raise ValueError("service with {} key not found".format(service_name)) if service_settings.get('BACKEND', None): backend_import = service_settings.get('BACKEND', None) backend_cls = import_string(backend_import) return backend_cls(identifier=service_name, **service_settings.get('OPTIONS', {}))
def run_check(id, request=None, fail_silently=True, fail_status=500): status = 200 try: v = config.checks[id] if isinstance(v, six.string_types): c = import_string(v) ret, status = c(request) elif callable(v): ret, status = v(request) else: ret = v except Exception as e: ret = "ERROR" status = fail_status logger.exception(e) if settings.DEBUG: ret = str(e) if not fail_silently: raise return ret, status
def get_extra(config, request=None): extras = {} for k, v in config.extra.items(): try: if isinstance(v, six.string_types): c = import_string(v) extras[k] = c(request) elif callable(v): extras[k] = v(request) else: extras[k] = v except Exception as e: logger.exception(e) if settings.DEBUG: extras[k] = str(e) return extras
def _to_cls(self, dotted_path): return import_string(dotted_path)
def get_offset_backend(): default = 'logpipe.backend.kafka.ModelOffsetStore' backend_path = settings.get('OFFSET_BACKEND', default) return import_string(backend_path)()
def get_consumer_backend(topic_name, **kwargs): default = 'logpipe.backend.kafka.Consumer' backend_path = settings.get('CONSUMER_BACKEND', default) return import_string(backend_path)(topic_name, **kwargs)
def get_producer_backend(): default = 'logpipe.backend.kafka.Producer' backend_path = settings.get('PRODUCER_BACKEND', default) return import_string(backend_path)()
def usage_for_periods(*args, **kwargs): name = getattr(settings, 'SOUVENIRS_USAGE_REPORTS_FUNCTION', None) func = import_string(name) if name else _usage_for_periods return func(*args, **kwargs)
def wamp_dispatch(message): func = import_string(message.content['func_path']) return func(*message.content['args'], **message.content['kwargs'])
def call_processors(self, purpose, **kwargs): for proc in self.get_processors(purpose): if isinstance(proc, str): proc = getattr(self, proc, None) or import_string(proc) kwargs['purpose'] = purpose proc(**kwargs)
def load_handler(path, *args, **kwargs): """ Given a path to a handler, return an instance of that handler. E.g.:: >>> from django.http import HttpRequest >>> request = HttpRequest() >>> load_handler('django.core.files.uploadhandler.TemporaryFileUploadHandler', request) <TemporaryFileUploadHandler object at 0x...> """ return import_string(path)(*args, **kwargs)
def get_storage_class(import_path=None): return import_string(import_path or settings.DEFAULT_FILE_STORAGE)
def get_cookie_signer(salt='django.core.signing.get_cookie_signer'): Signer = import_string(settings.SIGNING_BACKEND) key = force_bytes(settings.SECRET_KEY) return Signer(b'django.http.cookies' + key, salt=salt)
def load_middleware(self): """ Populate middleware lists from settings.MIDDLEWARE_CLASSES. Must be called after the environment is fixed (see __call__ in subclasses). """ self._view_middleware = [] self._template_response_middleware = [] self._response_middleware = [] self._exception_middleware = [] request_middleware = [] for middleware_path in settings.MIDDLEWARE_CLASSES: mw_class = import_string(middleware_path) try: mw_instance = mw_class() except MiddlewareNotUsed as exc: if settings.DEBUG: if six.text_type(exc): logger.debug('MiddlewareNotUsed(%r): %s', middleware_path, exc) else: logger.debug('MiddlewareNotUsed: %r', middleware_path) continue if hasattr(mw_instance, 'process_request'): request_middleware.append(mw_instance.process_request) if hasattr(mw_instance, 'process_view'): self._view_middleware.append(mw_instance.process_view) if hasattr(mw_instance, 'process_template_response'): self._template_response_middleware.insert(0, mw_instance.process_template_response) if hasattr(mw_instance, 'process_response'): self._response_middleware.insert(0, mw_instance.process_response) if hasattr(mw_instance, 'process_exception'): self._exception_middleware.insert(0, mw_instance.process_exception) # We only assign to this when initialization is complete as it is used # as a flag for initialization being complete. self._request_middleware = request_middleware
def get_internal_wsgi_application(): """ Loads and returns the WSGI application as configured by the user in ``settings.WSGI_APPLICATION``. With the default ``startproject`` layout, this will be the ``application`` object in ``projectname/wsgi.py``. This function, and the ``WSGI_APPLICATION`` setting itself, are only useful for Django's internal server (runserver); external WSGI servers should just be configured to point to the correct application object directly. If settings.WSGI_APPLICATION is not set (is ``None``), we just return whatever ``django.core.wsgi.get_wsgi_application`` returns. """ from django.conf import settings app_path = getattr(settings, 'WSGI_APPLICATION') if app_path is None: return get_wsgi_application() try: return import_string(app_path) except ImportError as e: msg = ( "WSGI application '%(app_path)s' could not be loaded; " "Error importing module: '%(exception)s'" % ({ 'app_path': app_path, 'exception': e, }) ) six.reraise(ImproperlyConfigured, ImproperlyConfigured(msg), sys.exc_info()[2])
def get_connection(backend=None, fail_silently=False, **kwds): """Load an email backend and return an instance of it. If backend is None (default) settings.EMAIL_BACKEND is used. Both fail_silently and other keyword arguments are used in the constructor of the backend. """ klass = import_string(backend or settings.EMAIL_BACKEND) return klass(fail_silently=fail_silently, **kwds)
def get_password_validators(validator_config): validators = [] for validator in validator_config: try: klass = import_string(validator['NAME']) except ImportError: msg = "The module in NAME could not be imported: %s. Check your AUTH_PASSWORD_VALIDATORS setting." raise ImproperlyConfigured(msg % validator['NAME']) validators.append(klass(**validator.get('OPTIONS', {}))) return validators
def get_hashers(): hashers = [] for hasher_path in settings.PASSWORD_HASHERS: hasher_cls = import_string(hasher_path) hasher = hasher_cls() if not getattr(hasher, 'algorithm'): raise ImproperlyConfigured("hasher doesn't specify an " "algorithm name: %s" % hasher_path) hashers.append(hasher) return hashers
def load_backend(path): return import_string(path)()
def get_finder(import_path): """ Imports the staticfiles finder class described by import_path, where import_path is the full Python path to the class. """ Finder = import_string(import_path) if not issubclass(Finder, BaseFinder): raise ImproperlyConfigured('Finder "%s" is not a subclass of "%s"' % (Finder, BaseFinder)) return Finder()
def default_storage(request): """ Callable with the same interface as the storage classes. This isn't just default_storage = import_string(settings.MESSAGE_STORAGE) to avoid accessing the settings at the module level. """ return import_string(settings.MESSAGE_STORAGE)(request)
def setUpClass(cls): if not os.environ.get('DJANGO_SELENIUM_TESTS', False): raise SkipTest('Selenium tests not requested') try: cls.selenium = import_string(cls.webdriver_class)() except Exception as e: raise SkipTest('Selenium webdriver "%s" not installed or not ' 'operational: %s' % (cls.webdriver_class, str(e))) # This has to be last to ensure that resources are cleaned up properly! super(AdminSeleniumWebDriverTestCase, cls).setUpClass()
def construct_managers(self): "Deep-clone the managers using deconstruction" # Sort all managers by their creation counter sorted_managers = sorted(self.managers, key=lambda v: v[1].creation_counter) for mgr_name, manager in sorted_managers: mgr_name = force_text(mgr_name) as_manager, manager_path, qs_path, args, kwargs = manager.deconstruct() if as_manager: qs_class = import_string(qs_path) yield mgr_name, qs_class.as_manager() else: manager_class = import_string(manager_path) yield mgr_name, manager_class(*args, **kwargs)
def routers(self): if self._routers is None: self._routers = settings.DATABASE_ROUTERS routers = [] for r in self._routers: if isinstance(r, six.string_types): router = import_string(r)() else: router = r routers.append(router) return routers
def template_context_processors(self): context_processors = _builtin_context_processors context_processors += tuple(self.context_processors) return tuple(import_string(path) for path in context_processors)
def get_default_exception_reporter_filter(): # Instantiate the default filter for the first time and cache it. return import_string(settings.DEFAULT_EXCEPTION_REPORTER_FILTER)()
def get_key_func(key_func): """ Function to decide which key function to use. Defaults to ``default_key_func``. """ if key_func is not None: if callable(key_func): return key_func else: return import_string(key_func) return default_key_func