我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 django.utils 模块。
def setActiveUser(self, id=None, token=None, username=None, tenant_id=None,
                  service_catalog=None, tenant_name=None, roles=None,
                  authorized_tenants=None, enabled=True, domain_id=None,
                  user_domain_name=None):
    """Point ``utils.get_user`` at a stub that returns a fixed user.

    The stub ignores the request it receives and builds a ``user.User``
    from the values given here.  ``tenant_name`` is accepted but is not
    forwarded to ``user.User``.
    """
    def get_user(request):
        # The endpoint is looked up on every call, so it reflects the
        # settings in force when the stub runs, not when it was installed.
        user_kwargs = dict(
            id=id,
            token=token,
            user=username,
            domain_id=domain_id,
            user_domain_name=user_domain_name,
            tenant_id=tenant_id,
            service_catalog=service_catalog,
            roles=roles,
            enabled=enabled,
            authorized_tenants=authorized_tenants,
            endpoint=settings.OPENSTACK_KEYSTONE_URL,
        )
        return user.User(**user_kwargs)

    utils.get_user = get_user
def setUp(self):
    """Load test data, install a fixed active user and start the patches."""
    super(SeleniumTestCase, self).setUp()
    test_utils.load_test_data(self)
    self.mox = mox.Mox()

    # Keep a handle on the current implementation so it can be put back
    # once the test is over.
    self._real_get_user = utils.get_user
    self.setActiveUser(
        id=self.user.id,
        token=self.token,
        username=self.user.name,
        tenant_id=self.tenant.id,
        service_catalog=self.service_catalog,
        authorized_tenants=self.tenants.list(),
    )

    # Force the aggregates panel's access check to report True.
    self.patchers = {}
    aggregates_patcher = mock.patch(
        'openstack_dashboard.dashboards.admin'
        '.aggregates.panel.Aggregates.can_access',
        mock.Mock(return_value=True))
    self.patchers['aggregates'] = aggregates_patcher
    aggregates_patcher.start()

    os.environ["HORIZON_TEST_RUN"] = "True"
def preprocess(self):
    """
    Rewrite a templatized file into a form the xgettext utility can read.

    Nothing is written unless ``self.is_templatized`` is true.  Otherwise
    the file at ``self.path`` is read with ``settings.FILE_CHARSET``,
    converted according to ``self.domain`` and saved as UTF-8 at
    ``self.work_path``.
    """
    from django.utils.translation import templatize

    if self.is_templatized:
        with io.open(self.path, 'r', encoding=settings.FILE_CHARSET) as source:
            raw_text = source.read()

        if self.domain == 'djangojs':
            content = prepare_js_for_gettext(raw_text)
        elif self.domain == 'django':
            # The first two characters of the path are dropped before it
            # is handed over (presumably a leading './' -- TODO confirm).
            content = templatize(raw_text, self.path[2:])

        with io.open(self.work_path, 'w', encoding='utf-8') as target:
            target.write(content)
def _execute_wrapper(self, method, query, args):
    """Call ``method(query, args)`` and re-raise driver errors as Django ones.

    ``method`` is expected to be a cursor's ``execute`` or ``executemany``.
    Its return value is passed straight through.  Driver exceptions are
    re-raised as the matching class from ``utils`` with the original
    traceback attached.
    """
    try:
        return method(query, args)
    except PyMysqlPool.mysql.connector.ProgrammingError as exc:
        tb = sys.exc_info()[2]
        six.reraise(utils.ProgrammingError,
                    utils.ProgrammingError(exc.msg), tb)
    except PyMysqlPool.mysql.connector.IntegrityError as exc:
        tb = sys.exc_info()[2]
        six.reraise(utils.IntegrityError,
                    utils.IntegrityError(exc.msg), tb)
    except PyMysqlPool.mysql.connector.OperationalError as exc:
        tb = sys.exc_info()[2]
        # Some operational error codes are really integrity problems;
        # report those as IntegrityError, everything else as DatabaseError.
        if exc.args[0] in self.codes_for_integrityerror:
            target = utils.IntegrityError
        else:
            target = utils.DatabaseError
        six.reraise(target, target(exc.msg), tb)
    except PyMysqlPool.mysql.connector.DatabaseError as exc:
        tb = sys.exc_info()[2]
        six.reraise(utils.DatabaseError,
                    utils.DatabaseError(exc.msg), tb)
def _execute_wrapper(self, method, query, args):
    """Call ``method(query, args)`` and re-raise driver errors as Django ones.

    ``method`` is expected to be a cursor's ``execute`` or ``executemany``.
    Its return value is passed straight through.  ``mysql.connector``
    exceptions are re-raised as the matching class from ``utils`` with the
    original traceback attached.
    """
    try:
        return method(query, args)
    except mysql.connector.ProgrammingError as exc:
        tb = sys.exc_info()[2]
        six.reraise(utils.ProgrammingError,
                    utils.ProgrammingError(exc.msg), tb)
    except mysql.connector.IntegrityError as exc:
        tb = sys.exc_info()[2]
        six.reraise(utils.IntegrityError,
                    utils.IntegrityError(exc.msg), tb)
    except mysql.connector.OperationalError as exc:
        tb = sys.exc_info()[2]
        # Some operational error codes are really integrity problems;
        # report those as IntegrityError, everything else as DatabaseError.
        if exc.args[0] in self.codes_for_integrityerror:
            target = utils.IntegrityError
        else:
            target = utils.DatabaseError
        six.reraise(target, target(exc.msg), tb)
    except mysql.connector.DatabaseError as exc:
        tb = sys.exc_info()[2]
        six.reraise(utils.DatabaseError,
                    utils.DatabaseError(exc.msg), tb)
def get(self, *args, **kwargs):
    """Build a GET request and attach a user, a session and message storage."""
    request = super(RequestFactoryWithMessages, self).get(*args, **kwargs)
    request.user = utils.get_user(request)
    # An empty list is used as the session object on the request.
    request.session = []
    request._messages = default_storage(request)
    return request
def post(self, *args, **kwargs):
    """Build a POST request and attach a user, a session and message storage."""
    request = super(RequestFactoryWithMessages, self).post(*args, **kwargs)
    request.user = utils.get_user(request)
    # An empty list is used as the session object on the request.
    request.session = []
    request._messages = default_storage(request)
    return request
def _setup_user(self):
    """Save the current ``utils.get_user`` and activate the test user.

    The active user is described by this test case's ``user``, ``token``,
    ``domain``, ``tenant`` and ``service_catalog`` attributes plus the
    ``authorized_tenants`` entry of ``self.context``.
    """
    # Remember the present implementation so it can be restored later.
    self._real_get_user = utils.get_user

    allowed_tenants = self.context['authorized_tenants']
    self.setActiveUser(
        id=self.user.id,
        token=self.token,
        username=self.user.name,
        domain_id=self.domain.id,
        user_domain_name=self.domain.name,
        tenant_id=self.tenant.id,
        service_catalog=self.service_catalog,
        authorized_tenants=allowed_tenants,
    )
def tearDown(self):
    """Put saved originals back, stop patches and flag unmocked API calls.

    Raises:
        AssertionError: if ``self.missing_mocks`` is truthy once the
            parent ``tearDown`` has run.
    """
    # Reinstate the objects that were saved on this test case.
    HTTPConnection.connect = self._real_conn_request
    context_processors.openstack = self._real_context_processor
    utils.get_user = self._real_get_user

    mock.patch.stopall()
    super(TestCase, self).tearDown()

    # Fail the test when an API call slipped through without a mock.
    unmocked_call_made = self.missing_mocks
    if unmocked_call_made:
        raise AssertionError("An unmocked API call was made.")
def tearDown(self):
    """Remove stubs and patches, verify mox and clear the test-run flag."""
    self.mox.UnsetStubs()
    # Put back the implementation that was saved on this test case.
    utils.get_user = self._real_get_user
    mock.patch.stopall()
    self.mox.VerifyAll()

    # Raises KeyError if the variable is not set, like ``del`` would.
    os.environ.pop("HORIZON_TEST_RUN")
def setActiveUser(self, id=None, token=None, username=None, tenant_id=None,
                  service_catalog=None, tenant_name=None, roles=None,
                  authorized_tenants=None, enabled=True):
    """Point ``utils.get_user`` at a stub that returns a fixed user.

    The stub ignores the request it receives and builds a ``user.User``
    from the values given here.  ``tenant_name`` is accepted but is not
    forwarded to ``user.User``.
    """
    def get_user(request):
        # The endpoint is looked up on every call, so it reflects the
        # settings in force when the stub runs, not when it was installed.
        user_kwargs = dict(
            id=id,
            token=token,
            user=username,
            tenant_id=tenant_id,
            service_catalog=service_catalog,
            roles=roles,
            enabled=enabled,
            authorized_tenants=authorized_tenants,
            endpoint=settings.OPENSTACK_KEYSTONE_URL,
        )
        return user.User(**user_kwargs)

    utils.get_user = get_user
def invalidate(offboard_days=31):
    """Clear stored social-auth credentials of users who have gone quiet.

    A ``UserSocialAuth`` row is selected when its ``extra_data`` is not
    null and its user either last logged in on or before the cutoff, or
    has never logged in and joined on or before the cutoff.  For every
    selected row ``extra_data`` is set to ``None`` and the row is saved.

    Args:
        offboard_days: Number of days before now that marks the cutoff.
            Defaults to 31, the value that used to be hard-coded.
    """
    # Local imports, as in the original, so Django is only needed when
    # the function actually runs.
    from datetime import timedelta

    from django.db.models import Q
    from django.utils import timezone
    from social.apps.django_app.default.models import UserSocialAuth

    # Use datetime.timedelta directly instead of depending on the name
    # happening to be importable from django.utils.timezone.
    invalidate_delta = timezone.now() - timedelta(days=offboard_days)
    # Parenthesised single-argument print works on Python 2 and Python 3.
    print("Running invalidate process with date: %s" % (invalidate_delta))

    # has auth credentials (non-null)
    criteria = Q(extra_data__isnull=False)
    # also is past timeframe
    inactive_since_login = Q(user__last_login__lte=invalidate_delta)
    never_logged_in = (Q(user__last_login__isnull=True) &
                       Q(user__date_joined__lte=invalidate_delta))
    criteria = criteria & (inactive_since_login | never_logged_in)

    for social_auth in UserSocialAuth.objects.filter(criteria):
        try:
            social_auth.extra_data = None
            social_auth.save()
            print("\tInvalidate user %s (last_login=%s, date_joined=%s)" % (
                social_auth.user,
                social_auth.user.last_login,
                social_auth.user.date_joined))
        except UserSocialAuth.DoesNotExist:
            # no record exists; no need to clear anything
            pass
def download(self, url):
    """
    Retrieve ``url`` into a new temporary directory and return the path of
    the downloaded file.

    The directory is appended to ``self.paths_to_remove``.  When the
    response headers suggest a better file name or extension, the file is
    renamed inside the directory and the new path is returned.

    Raises ``CommandError`` if retrieving the URL fails with ``IOError``.
    """
    tempdir = tempfile.mkdtemp(
        prefix='django_%s_template_' % self.app_or_project,
        suffix='_download')
    self.paths_to_remove.append(tempdir)

    # The last path segment names the local file; the displayed URL keeps
    # at most one trailing slash.
    stripped = url.rstrip('/')
    filename = stripped.split('/')[-1]
    display_url = stripped + '/' if url.endswith('/') else url

    if self.verbosity >= 2:
        self.stdout.write("Downloading %s\n" % display_url)
    try:
        the_path, info = urlretrieve(url, path.join(tempdir, filename))
    except IOError as e:
        raise CommandError("couldn't download URL %s to %s: %s" %
                           (url, filename, e))

    used_name = the_path.split('/')[-1]

    # Prefer the file name announced in the Content-Disposition header.
    guessed_filename = used_name
    content_disposition = info.get('content-disposition')
    if content_disposition:
        _, params = cgi.parse_header(content_disposition)
        guessed_filename = params.get('filename') or used_name

    # Without an extension, try to derive one from the content type.
    ext = self.splitext(guessed_filename)[1]
    content_type = info.get('content-type')
    if not ext and content_type:
        ext = mimetypes.guess_extension(content_type)
        if ext:
            guessed_filename += ext

    # No better name was found: keep the file where it is.
    if used_name == guessed_filename:
        return the_path

    # Rename so that the archive utilities are more likely to recognise it.
    guessed_path = path.join(tempdir, guessed_filename)
    shutil.move(the_path, guessed_path)
    return guessed_path
def execute(self, *args, **options):
    """
    Run this command: apply the output options, run system checks unless
    they are disabled or skipped, call ``handle()`` and write its output
    to ``self.stdout``.

    When ``leave_locale_alone`` is false, translations are deactivated for
    the duration of the call and the previous language is re-activated
    afterwards.
    """
    if options.get('no_color'):
        self.style = no_style()
        self.stderr.style_func = None

    custom_stdout = options.get('stdout')
    if custom_stdout:
        self.stdout = OutputWrapper(custom_stdout)
    custom_stderr = options.get('stderr')
    if custom_stderr:
        self.stderr = OutputWrapper(custom_stderr, self.stderr.style_func)

    saved_locale = None
    if not self.leave_locale_alone:
        # Locales are only touched when a working settings file can be
        # assumed, because django.utils.translation requires settings.
        # (Whether the i18n machinery is active is ultimately decided by
        # the USE_I18N setting.)
        if not self.can_import_settings:
            raise CommandError("Incompatible values of 'leave_locale_alone' "
                               "(%s) and 'can_import_settings' (%s) command "
                               "options." % (self.leave_locale_alone,
                                             self.can_import_settings))
        # django-admin creates database content such as permissions, and
        # that content must not be translated.
        from django.utils import translation
        saved_locale = translation.get_language()
        translation.deactivate_all()

    try:
        # `skip_validation` is the deprecated spelling of `skip_checks`;
        # either one turns the checks off.
        if self.requires_system_checks and not (
                options.get('skip_validation') or options.get('skip_checks')):
            self.check()

        output = self.handle(*args, **options)
        if output:
            if self.output_transaction:
                # Imported here because it relies on settings.
                from django.db import connections, DEFAULT_DB_ALIAS
                connection = connections[options.get('database', DEFAULT_DB_ALIAS)]
                if connection.ops.start_transaction_sql():
                    self.stdout.write(
                        self.style.SQL_KEYWORD(connection.ops.start_transaction_sql()))
            self.stdout.write(output)
            if self.output_transaction:
                self.stdout.write(
                    '\n' + self.style.SQL_KEYWORD(connection.ops.end_transaction_sql()))
    finally:
        if saved_locale is not None:
            translation.activate(saved_locale)
def execute(self, *args, **options):
    """
    Run this command: apply the output options, run system and migration
    checks when required, call ``handle()``, write its output to
    ``self.stdout`` and return it.

    When ``leave_locale_alone`` is false, translations are deactivated for
    the duration of the call and the previous language is re-activated
    afterwards.
    """
    if options['no_color']:
        self.style = no_style()
        self.stderr.style_func = None

    custom_stdout = options.get('stdout')
    if custom_stdout:
        self.stdout = OutputWrapper(custom_stdout)
    custom_stderr = options.get('stderr')
    if custom_stderr:
        self.stderr = OutputWrapper(custom_stderr, self.stderr.style_func)

    saved_locale = None
    if not self.leave_locale_alone:
        # Locales are only touched when a working settings file can be
        # assumed, because django.utils.translation requires settings.
        # (Whether the i18n machinery is active is ultimately decided by
        # the USE_I18N setting.)
        if not self.can_import_settings:
            raise CommandError("Incompatible values of 'leave_locale_alone' "
                               "(%s) and 'can_import_settings' (%s) command "
                               "options." % (self.leave_locale_alone,
                                             self.can_import_settings))
        # django-admin creates database content such as permissions, and
        # that content must not be translated.
        from django.utils import translation
        saved_locale = translation.get_language()
        translation.deactivate_all()

    try:
        if self.requires_system_checks and not options.get('skip_checks'):
            self.check()
        if self.requires_migrations_checks:
            self.check_migrations()

        output = self.handle(*args, **options)
        if output:
            if self.output_transaction:
                # Surround the output with the backend's transaction
                # start and end statements.
                connection = connections[options.get('database', DEFAULT_DB_ALIAS)]
                opening_sql = self.style.SQL_KEYWORD(
                    connection.ops.start_transaction_sql())
                closing_sql = self.style.SQL_KEYWORD(
                    connection.ops.end_transaction_sql())
                output = '%s\n%s\n%s' % (opening_sql, output, closing_sql)
            self.stdout.write(output)
    finally:
        if saved_locale is not None:
            translation.activate(saved_locale)
    return output
def execute(self, *args, **options):
    """
    Run this command: apply the output options, run system and migration
    checks when required, call ``handle()``, write its output to
    ``self.stdout`` and return it.

    When ``leave_locale_alone`` is false, translations are deactivated for
    the duration of the call and the previous language is re-activated
    afterwards.
    """
    if options['no_color']:
        self.style = no_style()
        self.stderr.style_func = None

    custom_stdout = options.get('stdout')
    if custom_stdout:
        self.stdout = OutputWrapper(custom_stdout)
    custom_stderr = options.get('stderr')
    if custom_stderr:
        self.stderr = OutputWrapper(custom_stderr, self.stderr.style_func)

    saved_locale = None
    if not self.leave_locale_alone:
        # django-admin creates database content such as permissions, and
        # that content must not be translated.
        from django.utils import translation
        saved_locale = translation.get_language()
        translation.deactivate_all()

    try:
        if self.requires_system_checks and not options.get('skip_checks'):
            self.check()
        if self.requires_migrations_checks:
            self.check_migrations()

        output = self.handle(*args, **options)
        if output:
            if self.output_transaction:
                # Surround the output with the backend's transaction
                # start and end statements.
                connection = connections[options.get('database', DEFAULT_DB_ALIAS)]
                opening_sql = self.style.SQL_KEYWORD(
                    connection.ops.start_transaction_sql())
                closing_sql = self.style.SQL_KEYWORD(
                    connection.ops.end_transaction_sql())
                output = '%s\n%s\n%s' % (opening_sql, output, closing_sql)
            self.stdout.write(output)
    finally:
        if saved_locale is not None:
            translation.activate(saved_locale)
    return output
def check_constraints(self, table_names=None):
    """Look for rows whose foreign keys point at missing rows.

    Each table in `table_names` (all tables reported by introspection when
    it is None) is examined, one foreign key column at a time.  Tables for
    which no primary key column is reported are skipped.

    Meant to be used together with `disable_constraint_checking()` and
    `enable_constraint_checking()`, to find out whether rows with invalid
    references were entered while constraint checks were off.

    Raises an IntegrityError for the first invalid reference found; the
    message names the table, the primary key, the offending column and
    value, and the referenced table and column.

    Backends can override this method if they can apply constraint
    checking more directly (e.g. via "SET CONSTRAINTS ALL IMMEDIATE").
    """
    ref_query = (
        " SELECT REFERRING.`{0}`, REFERRING.`{1}` FROM `{2}` as REFERRING"
        " LEFT JOIN `{3}` as REFERRED"
        " ON (REFERRING.`{4}` = REFERRED.`{5}`)"
        " WHERE REFERRING.`{6}` IS NOT NULL AND REFERRED.`{7}` IS NULL"
    )
    cursor = self.cursor()
    introspection = self.introspection
    if table_names is None:
        table_names = introspection.table_names(cursor)

    for table in table_names:
        pk_column = introspection.get_primary_key_column(cursor, table)
        if not pk_column:
            continue
        for fk_column, ref_table, ref_column in \
                introspection.get_key_columns(cursor, table):
            cursor.execute(ref_query.format(
                pk_column, fk_column, table, ref_table,
                fk_column, ref_column, fk_column, ref_column))
            # Only the first offending row matters: it aborts the check.
            bad_row = next(iter(cursor.fetchall()), None)
            if bad_row is None:
                continue
            msg = ("The row in table '{0}' with primary key '{1}' has "
                   "an invalid foreign key: {2}.{3} contains a value "
                   "'{4}' that does not have a corresponding value in "
                   "{5}.{6}.".format(table, bad_row[0], table, fk_column,
                                     bad_row[1], ref_table, ref_column))
            raise utils.IntegrityError(msg)
def execute(self, *args, **options):
    """
    Run this command: apply the output options, run system checks unless
    ``requires_system_checks`` is false or the checks are skipped through
    the options, call ``handle()`` and write its output to ``self.stdout``.

    When ``leave_locale_alone`` is false, translations are deactivated for
    the duration of the call and the previous language is re-activated
    afterwards.
    """
    if options.get('no_color'):
        self.style = no_style()
        self.stderr.style_func = None

    stdout_option = options.get('stdout')
    if stdout_option:
        self.stdout = OutputWrapper(stdout_option)
    stderr_option = options.get('stderr')
    if stderr_option:
        self.stderr = OutputWrapper(stderr_option, self.stderr.style_func)

    saved_locale = None
    if not self.leave_locale_alone:
        # Locales are only touched when a working settings file can be
        # assumed, because django.utils.translation requires settings.
        # (Whether the i18n machinery is active is ultimately decided by
        # the USE_I18N setting.)
        if not self.can_import_settings:
            raise CommandError("Incompatible values of 'leave_locale_alone' "
                               "(%s) and 'can_import_settings' (%s) command "
                               "options." % (self.leave_locale_alone,
                                             self.can_import_settings))
        # django-admin creates database content such as permissions, and
        # that content must not be translated.
        from django.utils import translation
        saved_locale = translation.get_language()
        translation.deactivate_all()

    try:
        # `skip_validation` is the deprecated spelling of `skip_checks`;
        # either one turns the checks off.
        if self.requires_system_checks and not (
                options.get('skip_validation') or options.get('skip_checks')):
            self.check()

        output = self.handle(*args, **options)
        if output:
            if self.output_transaction:
                # Imported here because it relies on settings.
                from django.db import connections, DEFAULT_DB_ALIAS
                connection = connections[options.get('database', DEFAULT_DB_ALIAS)]
                if connection.ops.start_transaction_sql():
                    self.stdout.write(
                        self.style.SQL_KEYWORD(connection.ops.start_transaction_sql()))
            self.stdout.write(output)
            if self.output_transaction:
                self.stdout.write(
                    '\n' + self.style.SQL_KEYWORD(connection.ops.end_transaction_sql()))
    finally:
        if saved_locale is not None:
            translation.activate(saved_locale)