Python django.db.models.signals.pre_delete 模块,connect() 实例源码

我们从Python开源项目中,提取了以下22个代码示例,用于说明如何使用django.db.models.signals.pre_delete.connect()

项目:django-chemtrails    作者:inonit    | 项目源码 | 文件源码
def test_delete_object_is_deleted(self):
        pre_delete.disconnect(pre_delete_handler, dispatch_uid='chemtrails.signals.handlers.pre_delete_handler')
        pre_delete.connect(pre_delete_handler, dispatch_uid='pre_delete_handler.test')
        try:
            book = BookFixture(Book).create_one()
            klass = get_node_class_for_model(Book)
            pk = book.pk
            try:
                book.delete()
                klass.nodes.get(pk=pk)
                self.fail('Did not raise when trying to get non-existent book node.')
            except klass.DoesNotExist as e:
                self.assertEqual(str(e), "{'pk': %d}" % pk)
        finally:
            pre_delete.connect(pre_delete_handler, dispatch_uid='chemtrails.signals.handlers.pre_delete_handler')
            pre_delete.disconnect(pre_delete_handler, dispatch_uid='pre_delete_handler.test')
项目:django-chemtrails    作者:inonit    | 项目源码 | 文件源码
def test_null_foreignkey_is_disconnected(self):
        post_save.disconnect(post_save_handler, dispatch_uid='chemtrails.signals.handlers.post_save_handler')
        post_save.connect(post_save_handler, dispatch_uid='post_save_handler.test')
        try:
            store = StoreFixture(Store, generate_m2m=False).create_one()
            klass = get_node_class_for_model(Store)

            self.assertEqual(store.bestseller.pk,
                             get_node_class_for_model(Book).nodes.get(pk=store.bestseller.pk).pk)
            self.assertEqual(1, len(klass.nodes.has(bestseller=True)))

            store.bestseller = None
            store.save()

            self.assertEqual(0, len(klass.nodes.has(bestseller=True)))
        finally:
            post_save.connect(post_save_handler, dispatch_uid='chemtrails.signals.handlers.post_save_handler')
            post_save.disconnect(post_save_handler, dispatch_uid='post_save_handler.test')
项目:django-chemtrails    作者:inonit    | 项目源码 | 文件源码
def ready(self):
        from .signals.handlers import (
            m2m_changed_handler, post_migrate_handler,
            post_save_handler, pre_delete_handler
        )

        m2m_changed.connect(receiver=m2m_changed_handler,
                            dispatch_uid='chemtrails.signals.handlers.m2m_changed_handler')
        post_save.connect(receiver=post_save_handler,
                          dispatch_uid='chemtrails.signals.handlers.post_save_handler')
        pre_delete.connect(receiver=pre_delete_handler,
                           dispatch_uid='chemtrails.signals.handlers.pre_delete_handler')

        post_migrate.connect(receiver=post_migrate_handler,
                             dispatch_uid='neomodel.core.post_migrate_handler')

        # Neo4j config
        config.DATABASE_URL = getattr(settings, 'NEO4J_BOLT_URL',
                                      os.environ.get('NEO4J_BOLT_URL', config.DATABASE_URL))
        config.FORCE_TIMEZONE = getattr(settings, 'NEO4J_FORCE_TIMEZONE',
                                        get_environment_variable('NEO4J_FORCE_TIMEZONE', False))
        config.ENCRYPTED_CONNECTION = getattr(settings, 'NEO4J_ENCRYPTED_CONNECTION',
                                              get_environment_variable('NEO4J_ENCRYPTED_CONNECTION', True))
        config.MAX_POOL_SIZE = getattr(settings, 'NEO4J_MAX_POOL_SIZE',
                                       get_environment_variable('NEO4J_MAX_POOL_SIZE', 50))
项目:telemetry-analysis-service    作者:mozilla    | 项目源码 | 文件源码
def ready(self):
        # The app is now ready. Include any monkey patches here.

        # Monkey patch CSRF to switch to session based CSRF. Session
        # based CSRF will prevent attacks from apps under the same
        # domain. If you're planning to host your app under it's own
        # domain you can remove session_csrf and use Django's CSRF
        # library. See also
        # https://github.com/mozilla/sugardough/issues/38
        session_csrf.monkeypatch()

        # Connect signals.
        from atmo.jobs.models import SparkJob
        from atmo.jobs.signals import assign_group_perm, remove_group_perm

        post_save.connect(
            assign_group_perm,
            sender=SparkJob,
            dispatch_uid='sparkjob_post_save_assign_perm',
        )
        pre_delete.connect(
            remove_group_perm,
            sender=SparkJob,
            dispatch_uid='sparkjob_pre_delete_remove_perm',
        )
项目:wagtailtrans    作者:LUKKIEN    | 项目源码 | 文件源码
def register_signal_handlers():
    """Registers signal handlers.

    To create a signal for TranslatablePage we have to use wagtails
    get_page_model.

    """
    post_save.connect(create_language_permissions_and_group, sender=Language)
    init_new_page.connect(force_parent_language)
    if get_wagtailtrans_setting('SYNC_TREE'):
        if get_wagtailtrans_setting('LANGUAGES_PER_SITE'):
            m2m_changed.connect(
                update_language_trees_for_site,
                sender=SiteLanguages.other_languages.through)
        else:
            post_save.connect(create_new_language_tree, sender=Language)

        for model in get_page_models():
            if hasattr(model, 'create_translation'):
                post_save.connect(synchronize_trees, sender=model)

            if hasattr(model, 'get_translations'):
                pre_delete.connect(synchronize_deletions, sender=model)
项目:talkativot    作者:lablup    | 项目源码 | 文件源码
def create_user_profile(sender, instance, created, **kwargs):
    """ Create profile and preference upon user creation (``post_save.connect``).

    :param sender: sender object
    :param instance: user instance
    :param created: user creation flag
    :param kwargs: dictionary argument
    :return: None
    """
    if created:
        user = instance
        # Create profile / preference objects for user
        profile, created = UserProfile.objects.get_or_create(user=instance)
        preference, created = UserPreference.objects.get_or_create(user=instance,
                                                                   site=Site.objects.get_current())
        # User automatically belongs to meta_all_members
        g, c = Group.objects.get_or_create(name=PermissionAlias.all)
        user.groups.add(g)
项目:talkativot    作者:lablup    | 项目源码 | 文件源码
def remove_obj_perms_connected_with_user(sender, instance, **kwargs):
    """ Remove user's permissions upon user deletion (``pre_delete.connect``).

    :param sender: sender object
    :param instance: user instance
    :param kwargs: dictionary argument
    :return: None
    """
    filters = Q(content_type=ContentType.objects.get_for_model(instance),
                object_pk=instance.pk)
    UserObjectPermission.objects.filter(filters).delete()
    GroupObjectPermission.objects.filter(filters).delete()

    if instance.profile:
        instance.profile.delete()
    if instance.preference:
        for i in instance.preference.all():
            i.delete()
项目:mes    作者:osess    | 项目源码 | 文件源码
def register(self, model, adapter_cls=VersionAdapter, **field_overrides):
        """Registers a model with this revision manager."""
        # Prevent multiple registration.
        if self.is_registered(model):
            raise RegistrationError("{model} has already been registered with django-reversion".format(
                model = model,
            ))
        # Prevent proxy models being registered.
        if model._meta.proxy:
            raise RegistrationError("Proxy models cannot be used with django-reversion, register the parent class instead")
        # Perform any customization.
        if field_overrides:
            adapter_cls = type(adapter_cls.__name__, (adapter_cls,), field_overrides)
        # Perform the registration.
        adapter_obj = adapter_cls(model)
        self._registered_models[model] = adapter_obj
        # Connect to the post save signal of the model.
        post_save.connect(self._post_save_receiver, model)
        pre_delete.connect(self._pre_delete_receiver, model)
项目:django-audit-tools    作者:PeRDy    | 项目源码 | 文件源码
def register(model):
    """Register a model to the audit code.

    :param model: Model to register.
    :type model: object
    """
    try:
        pre_save.connect(_pre_save, sender=model, dispatch_uid=str(model))
        post_save.connect(_post_save, sender=model, dispatch_uid=str(model))
        pre_delete.connect(_pre_delete, sender=model, dispatch_uid=str(model))
    except Exception as e:
        logger.error("<Register> %s", e.message)
项目:fieldsight-kobocat    作者:awemulya    | 项目源码 | 文件源码
def remove_xform(xform):
    # disconnect parsed instance pre delete signal
    pre_delete.disconnect(_remove_from_mongo, sender=ParsedInstance)

    # delete instances from mongo db
    query = {
        ParsedInstance.USERFORM_ID:
        "%s_%s" % (xform.user.username, xform.id_string)}
    xform_instances.remove(query, j=True)

    # delete xform, and all related models
    xform.delete()

    # reconnect parsed instance pre delete signal?
    pre_delete.connect(_remove_from_mongo, sender=ParsedInstance)
项目:fieldsight-kobocat    作者:awemulya    | 项目源码 | 文件源码
def remove_xform(xform):
    # disconnect parsed instance pre delete signal
    pre_delete.disconnect(_remove_from_mongo, sender=ParsedInstance)

    # delete instances from mongo db
    query = {
        ParsedInstance.USERFORM_ID:
        "%s_%s" % (xform.user.username, xform.id_string)}
    xform_instances.remove(query, j=True)

    # delete xform, and all related models
    xform.delete()

    # reconnect parsed instance pre delete signal?
    pre_delete.connect(_remove_from_mongo, sender=ParsedInstance)
项目:django-chemtrails    作者:inonit    | 项目源码 | 文件源码
def test_create_new_object_is_synced(self):
        post_save.disconnect(post_save_handler, dispatch_uid='chemtrails.signals.handlers.post_save_handler')
        post_save.connect(post_save_handler, dispatch_uid='post_save_handler.test')
        try:
            book = BookFixture(Book).create_one()
            klass = get_node_class_for_model(Book)

            self.assertEqual(book.pk, klass.nodes.get(pk=book.pk).pk)
            self.assertEqual(1, len(klass.nodes.has(authors=True, publisher=True)))
        finally:
            post_save.connect(post_save_handler, dispatch_uid='chemtrails.signals.handlers.post_save_handler')
            post_save.disconnect(post_save_handler, dispatch_uid='post_save_handler.test')
项目:django-chemtrails    作者:inonit    | 项目源码 | 文件源码
def test_m2m_changed_post_add(self):
        m2m_changed.disconnect(m2m_changed_handler, dispatch_uid='chemtrails.signals.handlers.m2m_changed_handler')
        m2m_changed.connect(m2m_changed_handler, dispatch_uid='m2m_changed_handler.test')
        try:
            book = BookFixture(Book, generate_m2m=False, field_values={'authors': []}).create_one()
            self.assertEqual(0, len(get_node_class_for_model(Book).nodes.has(authors=True)))

            author = AuthorFixture(Author).create_one()
            book.authors.add(author)
            self.assertEqual(1, len(get_node_class_for_model(Book).nodes.has(authors=True)))
        finally:
            m2m_changed.connect(m2m_changed_handler, dispatch_uid='chemtrails.signals.handlers.m2m_changed_handler')
            m2m_changed.disconnect(m2m_changed_handler, dispatch_uid='m2m_changed_handler.test')
项目:django-chemtrails    作者:inonit    | 项目源码 | 文件源码
def test_m2m_changed_post_add_reverse(self):
        m2m_changed.disconnect(m2m_changed_handler, dispatch_uid='chemtrails.signals.handlers.m2m_changed_handler')
        m2m_changed.connect(m2m_changed_handler, dispatch_uid='m2m_changed_handler.test')
        try:
            author = AuthorFixture(Author).create_one()
            self.assertEqual(0, len(get_node_class_for_model(Author).nodes.has(book_set=True)))

            book = BookFixture(Book, follow_m2m=False, field_values={'authors': []}).create_one()
            author.book_set.add(book)
            self.assertEqual(1, len(get_node_class_for_model(Author).nodes.has(book_set=True)))
        finally:
            m2m_changed.connect(m2m_changed_handler, dispatch_uid='chemtrails.signals.handlers.m2m_changed_handler')
            m2m_changed.disconnect(m2m_changed_handler, dispatch_uid='m2m_changed_handler.test')
项目:django-chemtrails    作者:inonit    | 项目源码 | 文件源码
def test_m2m_changed_post_clear_reverse(self):
        m2m_changed.disconnect(m2m_changed_handler, dispatch_uid='chemtrails.signals.handlers.m2m_changed_handler')
        m2m_changed.connect(m2m_changed_handler, dispatch_uid='m2m_changed_handler.test')
        try:
            book = BookFixture(Book, generate_m2m={'authors': (1, 1)}).create_one()
            self.assertEqual(1, len(get_node_class_for_model(Book).nodes.has(authors=True)))

            author = book.authors.get()
            author.book_set.clear()
            self.assertEqual(0, len(get_node_class_for_model(Book).nodes.has(authors=True)))
            self.assertEqual(0, len(get_node_class_for_model(Author).nodes.has(book_set=True)))
        finally:
            m2m_changed.connect(m2m_changed_handler, dispatch_uid='chemtrails.signals.handlers.m2m_changed_handler')
            m2m_changed.disconnect(m2m_changed_handler, dispatch_uid='m2m_changed_handler.test')
项目:django-chemtrails    作者:inonit    | 项目源码 | 文件源码
def test_m2m_changed_post_remove(self):
        m2m_changed.disconnect(m2m_changed_handler, dispatch_uid='chemtrails.signals.handlers.m2m_changed_handler')
        m2m_changed.connect(m2m_changed_handler, dispatch_uid='m2m_changed_handler.test')
        try:
            book = BookFixture(Book, generate_m2m={'authors': (1, 1)}).create_one()
            self.assertEqual(1, len(get_node_class_for_model(Book).nodes.has(authors=True)))

            author = book.authors.get()
            book.authors.remove(author)
            self.assertEqual(0, len(get_node_class_for_model(Book).nodes.has(authors=True)))
            self.assertEqual(0, len(get_node_class_for_model(Author).nodes.has(book_set=True)))
        finally:
            m2m_changed.connect(m2m_changed_handler, dispatch_uid='chemtrails.signals.handlers.m2m_changed_handler')
            m2m_changed.disconnect(m2m_changed_handler, dispatch_uid='m2m_changed_handler.test')
项目:django-chemtrails    作者:inonit    | 项目源码 | 文件源码
def test_m2m_changed_post_remove_reverse(self):
        m2m_changed.disconnect(m2m_changed_handler, dispatch_uid='chemtrails.signals.handlers.m2m_changed_handler')
        m2m_changed.connect(m2m_changed_handler, dispatch_uid='m2m_changed_handler.test')
        try:
            book = BookFixture(Book, generate_m2m={'authors': (1, 1)}).create_one()
            self.assertEqual(1, len(get_node_class_for_model(Book).nodes.has(authors=True)))

            author = book.authors.get()
            author.book_set.remove(book)
            self.assertEqual(0, len(get_node_class_for_model(Book).nodes.has(authors=True)))
            self.assertEqual(0, len(get_node_class_for_model(Author).nodes.has(book_set=True)))
        finally:
            m2m_changed.connect(m2m_changed_handler, dispatch_uid='chemtrails.signals.handlers.m2m_changed_handler')
            m2m_changed.disconnect(m2m_changed_handler, dispatch_uid='m2m_changed_handler.test')
项目:djfiles    作者:luminousmen    | 项目源码 | 文件源码
def cleanup(cls):
    uid = cls.__name__
    pre_delete.connect(receiver=on_delete, sender=cls, dispatch_uid=uid)
    pre_save.connect(receiver=on_save, sender=cls, dispatch_uid=uid)
    return cls
项目:mes    作者:osess    | 项目源码 | 文件源码
def __init__(self):
        """Initializes the revision state."""
        self.clear()
        # Connect to the request finished signal.
        request_finished.connect(self._request_finished_receiver)
项目:Kiwi    作者:kiwitcms    | 项目源码 | 文件源码
def _listen():
    post_save.connect(plan_watchers.on_plan_save, TestPlan)
    pre_delete.connect(plan_watchers.on_plan_delete, TestPlan)
    pre_save.connect(plan_watchers.pre_save_clean, TestPlan)
项目:FormShare    作者:qlands    | 项目源码 | 文件源码
def remove_xform(xform):
    # disconnect parsed instance pre delete signal
    pre_delete.disconnect(_remove_from_mongo, sender=ParsedInstance)

    # delete instances from mongo db
    query = {
        ParsedInstance.USERFORM_ID:
        "%s_%s" % (xform.user.username, xform.id_string)}
    xform_instances.remove(query, j=True)

    # delete xform, and all related models
    xform.delete()

    # reconnect parsed instance pre delete signal?
    pre_delete.connect(_remove_from_mongo, sender=ParsedInstance)
项目:django_postgres_extensions    作者:primal100    | 项目源码 | 文件源码
def ready(self):
        query.QuerySet.format = format
        query.QuerySet.update = update
        query.QuerySet._update = _update
        if getattr(settings, 'ENABLE_ARRAY_M2M', False):
            datastructures.Join.as_sql = as_sql
            query.prefetch_one_level = prefetch_one_level
            pre_delete.connect(delete_reverse_related)