我们从Python开源项目中,提取了以下22个代码示例,用于说明如何使用django.db.models.signals.pre_delete.connect()。
def test_delete_object_is_deleted(self): pre_delete.disconnect(pre_delete_handler, dispatch_uid='chemtrails.signals.handlers.pre_delete_handler') pre_delete.connect(pre_delete_handler, dispatch_uid='pre_delete_handler.test') try: book = BookFixture(Book).create_one() klass = get_node_class_for_model(Book) pk = book.pk try: book.delete() klass.nodes.get(pk=pk) self.fail('Did not raise when trying to get non-existent book node.') except klass.DoesNotExist as e: self.assertEqual(str(e), "{'pk': %d}" % pk) finally: pre_delete.connect(pre_delete_handler, dispatch_uid='chemtrails.signals.handlers.pre_delete_handler') pre_delete.disconnect(pre_delete_handler, dispatch_uid='pre_delete_handler.test')
def test_null_foreignkey_is_disconnected(self): post_save.disconnect(post_save_handler, dispatch_uid='chemtrails.signals.handlers.post_save_handler') post_save.connect(post_save_handler, dispatch_uid='post_save_handler.test') try: store = StoreFixture(Store, generate_m2m=False).create_one() klass = get_node_class_for_model(Store) self.assertEqual(store.bestseller.pk, get_node_class_for_model(Book).nodes.get(pk=store.bestseller.pk).pk) self.assertEqual(1, len(klass.nodes.has(bestseller=True))) store.bestseller = None store.save() self.assertEqual(0, len(klass.nodes.has(bestseller=True))) finally: post_save.connect(post_save_handler, dispatch_uid='chemtrails.signals.handlers.post_save_handler') post_save.disconnect(post_save_handler, dispatch_uid='post_save_handler.test')
def ready(self): from .signals.handlers import ( m2m_changed_handler, post_migrate_handler, post_save_handler, pre_delete_handler ) m2m_changed.connect(receiver=m2m_changed_handler, dispatch_uid='chemtrails.signals.handlers.m2m_changed_handler') post_save.connect(receiver=post_save_handler, dispatch_uid='chemtrails.signals.handlers.post_save_handler') pre_delete.connect(receiver=pre_delete_handler, dispatch_uid='chemtrails.signals.handlers.pre_delete_handler') post_migrate.connect(receiver=post_migrate_handler, dispatch_uid='neomodel.core.post_migrate_handler') # Neo4j config config.DATABASE_URL = getattr(settings, 'NEO4J_BOLT_URL', os.environ.get('NEO4J_BOLT_URL', config.DATABASE_URL)) config.FORCE_TIMEZONE = getattr(settings, 'NEO4J_FORCE_TIMEZONE', get_environment_variable('NEO4J_FORCE_TIMEZONE', False)) config.ENCRYPTED_CONNECTION = getattr(settings, 'NEO4J_ENCRYPTED_CONNECTION', get_environment_variable('NEO4J_ENCRYPTED_CONNECTION', True)) config.MAX_POOL_SIZE = getattr(settings, 'NEO4J_MAX_POOL_SIZE', get_environment_variable('NEO4J_MAX_POOL_SIZE', 50))
def ready(self): # The app is now ready. Include any monkey patches here. # Monkey patch CSRF to switch to session based CSRF. Session # based CSRF will prevent attacks from apps under the same # domain. If you're planning to host your app under it's own # domain you can remove session_csrf and use Django's CSRF # library. See also # https://github.com/mozilla/sugardough/issues/38 session_csrf.monkeypatch() # Connect signals. from atmo.jobs.models import SparkJob from atmo.jobs.signals import assign_group_perm, remove_group_perm post_save.connect( assign_group_perm, sender=SparkJob, dispatch_uid='sparkjob_post_save_assign_perm', ) pre_delete.connect( remove_group_perm, sender=SparkJob, dispatch_uid='sparkjob_pre_delete_remove_perm', )
def register_signal_handlers(): """Registers signal handlers. To create a signal for TranslatablePage we have to use wagtails get_page_model. """ post_save.connect(create_language_permissions_and_group, sender=Language) init_new_page.connect(force_parent_language) if get_wagtailtrans_setting('SYNC_TREE'): if get_wagtailtrans_setting('LANGUAGES_PER_SITE'): m2m_changed.connect( update_language_trees_for_site, sender=SiteLanguages.other_languages.through) else: post_save.connect(create_new_language_tree, sender=Language) for model in get_page_models(): if hasattr(model, 'create_translation'): post_save.connect(synchronize_trees, sender=model) if hasattr(model, 'get_translations'): pre_delete.connect(synchronize_deletions, sender=model)
def create_user_profile(sender, instance, created, **kwargs): """ Create profile and preference upon user creation (``post_save.connect``). :param sender: sender object :param instance: user instance :param created: user creation flag :param kwargs: dictionary argument :return: None """ if created: user = instance # Create profile / preference objects for user profile, created = UserProfile.objects.get_or_create(user=instance) preference, created = UserPreference.objects.get_or_create(user=instance, site=Site.objects.get_current()) # User automatically belongs to meta_all_members g, c = Group.objects.get_or_create(name=PermissionAlias.all) user.groups.add(g)
def remove_obj_perms_connected_with_user(sender, instance, **kwargs): """ Remove user's permissions upon user deletion (``pre_delete.connect``). :param sender: sender object :param instance: user instance :param kwargs: dictionary argument :return: None """ filters = Q(content_type=ContentType.objects.get_for_model(instance), object_pk=instance.pk) UserObjectPermission.objects.filter(filters).delete() GroupObjectPermission.objects.filter(filters).delete() if instance.profile: instance.profile.delete() if instance.preference: for i in instance.preference.all(): i.delete()
def register(self, model, adapter_cls=VersionAdapter, **field_overrides): """Registers a model with this revision manager.""" # Prevent multiple registration. if self.is_registered(model): raise RegistrationError("{model} has already been registered with django-reversion".format( model = model, )) # Prevent proxy models being registered. if model._meta.proxy: raise RegistrationError("Proxy models cannot be used with django-reversion, register the parent class instead") # Perform any customization. if field_overrides: adapter_cls = type(adapter_cls.__name__, (adapter_cls,), field_overrides) # Perform the registration. adapter_obj = adapter_cls(model) self._registered_models[model] = adapter_obj # Connect to the post save signal of the model. post_save.connect(self._post_save_receiver, model) pre_delete.connect(self._pre_delete_receiver, model)
def register(model): """Register a model to the audit code. :param model: Model to register. :type model: object """ try: pre_save.connect(_pre_save, sender=model, dispatch_uid=str(model)) post_save.connect(_post_save, sender=model, dispatch_uid=str(model)) pre_delete.connect(_pre_delete, sender=model, dispatch_uid=str(model)) except Exception as e: logger.error("<Register> %s", e.message)
def remove_xform(xform): # disconnect parsed instance pre delete signal pre_delete.disconnect(_remove_from_mongo, sender=ParsedInstance) # delete instances from mongo db query = { ParsedInstance.USERFORM_ID: "%s_%s" % (xform.user.username, xform.id_string)} xform_instances.remove(query, j=True) # delete xform, and all related models xform.delete() # reconnect parsed instance pre delete signal? pre_delete.connect(_remove_from_mongo, sender=ParsedInstance)
def test_create_new_object_is_synced(self): post_save.disconnect(post_save_handler, dispatch_uid='chemtrails.signals.handlers.post_save_handler') post_save.connect(post_save_handler, dispatch_uid='post_save_handler.test') try: book = BookFixture(Book).create_one() klass = get_node_class_for_model(Book) self.assertEqual(book.pk, klass.nodes.get(pk=book.pk).pk) self.assertEqual(1, len(klass.nodes.has(authors=True, publisher=True))) finally: post_save.connect(post_save_handler, dispatch_uid='chemtrails.signals.handlers.post_save_handler') post_save.disconnect(post_save_handler, dispatch_uid='post_save_handler.test')
def test_m2m_changed_post_add(self): m2m_changed.disconnect(m2m_changed_handler, dispatch_uid='chemtrails.signals.handlers.m2m_changed_handler') m2m_changed.connect(m2m_changed_handler, dispatch_uid='m2m_changed_handler.test') try: book = BookFixture(Book, generate_m2m=False, field_values={'authors': []}).create_one() self.assertEqual(0, len(get_node_class_for_model(Book).nodes.has(authors=True))) author = AuthorFixture(Author).create_one() book.authors.add(author) self.assertEqual(1, len(get_node_class_for_model(Book).nodes.has(authors=True))) finally: m2m_changed.connect(m2m_changed_handler, dispatch_uid='chemtrails.signals.handlers.m2m_changed_handler') m2m_changed.disconnect(m2m_changed_handler, dispatch_uid='m2m_changed_handler.test')
def test_m2m_changed_post_add_reverse(self): m2m_changed.disconnect(m2m_changed_handler, dispatch_uid='chemtrails.signals.handlers.m2m_changed_handler') m2m_changed.connect(m2m_changed_handler, dispatch_uid='m2m_changed_handler.test') try: author = AuthorFixture(Author).create_one() self.assertEqual(0, len(get_node_class_for_model(Author).nodes.has(book_set=True))) book = BookFixture(Book, follow_m2m=False, field_values={'authors': []}).create_one() author.book_set.add(book) self.assertEqual(1, len(get_node_class_for_model(Author).nodes.has(book_set=True))) finally: m2m_changed.connect(m2m_changed_handler, dispatch_uid='chemtrails.signals.handlers.m2m_changed_handler') m2m_changed.disconnect(m2m_changed_handler, dispatch_uid='m2m_changed_handler.test')
def test_m2m_changed_post_clear_reverse(self): m2m_changed.disconnect(m2m_changed_handler, dispatch_uid='chemtrails.signals.handlers.m2m_changed_handler') m2m_changed.connect(m2m_changed_handler, dispatch_uid='m2m_changed_handler.test') try: book = BookFixture(Book, generate_m2m={'authors': (1, 1)}).create_one() self.assertEqual(1, len(get_node_class_for_model(Book).nodes.has(authors=True))) author = book.authors.get() author.book_set.clear() self.assertEqual(0, len(get_node_class_for_model(Book).nodes.has(authors=True))) self.assertEqual(0, len(get_node_class_for_model(Author).nodes.has(book_set=True))) finally: m2m_changed.connect(m2m_changed_handler, dispatch_uid='chemtrails.signals.handlers.m2m_changed_handler') m2m_changed.disconnect(m2m_changed_handler, dispatch_uid='m2m_changed_handler.test')
def test_m2m_changed_post_remove(self): m2m_changed.disconnect(m2m_changed_handler, dispatch_uid='chemtrails.signals.handlers.m2m_changed_handler') m2m_changed.connect(m2m_changed_handler, dispatch_uid='m2m_changed_handler.test') try: book = BookFixture(Book, generate_m2m={'authors': (1, 1)}).create_one() self.assertEqual(1, len(get_node_class_for_model(Book).nodes.has(authors=True))) author = book.authors.get() book.authors.remove(author) self.assertEqual(0, len(get_node_class_for_model(Book).nodes.has(authors=True))) self.assertEqual(0, len(get_node_class_for_model(Author).nodes.has(book_set=True))) finally: m2m_changed.connect(m2m_changed_handler, dispatch_uid='chemtrails.signals.handlers.m2m_changed_handler') m2m_changed.disconnect(m2m_changed_handler, dispatch_uid='m2m_changed_handler.test')
def test_m2m_changed_post_remove_reverse(self): m2m_changed.disconnect(m2m_changed_handler, dispatch_uid='chemtrails.signals.handlers.m2m_changed_handler') m2m_changed.connect(m2m_changed_handler, dispatch_uid='m2m_changed_handler.test') try: book = BookFixture(Book, generate_m2m={'authors': (1, 1)}).create_one() self.assertEqual(1, len(get_node_class_for_model(Book).nodes.has(authors=True))) author = book.authors.get() author.book_set.remove(book) self.assertEqual(0, len(get_node_class_for_model(Book).nodes.has(authors=True))) self.assertEqual(0, len(get_node_class_for_model(Author).nodes.has(book_set=True))) finally: m2m_changed.connect(m2m_changed_handler, dispatch_uid='chemtrails.signals.handlers.m2m_changed_handler') m2m_changed.disconnect(m2m_changed_handler, dispatch_uid='m2m_changed_handler.test')
def cleanup(cls): uid = cls.__name__ pre_delete.connect(receiver=on_delete, sender=cls, dispatch_uid=uid) pre_save.connect(receiver=on_save, sender=cls, dispatch_uid=uid) return cls
def __init__(self): """Initializes the revision state.""" self.clear() # Connect to the request finished signal. request_finished.connect(self._request_finished_receiver)
def _listen(): post_save.connect(plan_watchers.on_plan_save, TestPlan) pre_delete.connect(plan_watchers.on_plan_delete, TestPlan) pre_save.connect(plan_watchers.pre_save_clean, TestPlan)
def ready(self): query.QuerySet.format = format query.QuerySet.update = update query.QuerySet._update = _update if getattr(settings, 'ENABLE_ARRAY_M2M', False): datastructures.Join.as_sql = as_sql query.prefetch_one_level = prefetch_one_level pre_delete.connect(delete_reverse_related)