Python django.db.models.signals.post_save 模块,connect() 实例源码

我们从Python开源项目中,提取了以下48个代码示例,用于说明如何使用django.db.models.signals.post_save.connect()

项目:paas-tools    作者:imperodesign    | 项目源码 | 文件源码
def _do_healthcheck(self, containers, config):
        path = config.get('HEALTHCHECK_URL', '/')
        timeout = int(config.get('HEALTHCHECK_TIMEOUT', 1))
        if not _etcd_client:
            raise exceptions.HealthcheckException('no etcd client available')
        for container in containers:
            try:
                key = "/deis/services/{self}/{container.job_id}".format(**locals())
                url = "http://{}{}".format(_etcd_client.get(key).value, path)
                response = requests.get(url, timeout=timeout)
                if response.status_code != requests.codes.OK:
                    raise exceptions.HealthcheckException(
                        "app failed health check (got '{}', expected: '200')".format(
                            response.status_code))
            except (requests.Timeout, requests.ConnectionError, KeyError) as e:
                raise exceptions.HealthcheckException(
                    'failed to connect to container ({})'.format(e))
项目:paas-tools    作者:imperodesign    | 项目源码 | 文件源码
def _do_healthcheck(self, containers, config):
        path = config.get('HEALTHCHECK_URL', '/')
        timeout = int(config.get('HEALTHCHECK_TIMEOUT', 1))
        if not _etcd_client:
            raise exceptions.HealthcheckException('no etcd client available')
        for container in containers:
            try:
                key = "/deis/services/{self}/{container.job_id}".format(**locals())
                url = "http://{}{}".format(_etcd_client.get(key).value, path)
                response = requests.get(url, timeout=timeout)
                if response.status_code != requests.codes.OK:
                    raise exceptions.HealthcheckException(
                        "app failed health check (got '{}', expected: '200')".format(
                            response.status_code))
            except (requests.Timeout, requests.ConnectionError, KeyError) as e:
                raise exceptions.HealthcheckException(
                    'failed to connect to container ({})'.format(e))
项目:mercure    作者:synhack    | 项目源码 | 文件源码
def test_add_vars(self):
        # add value handler
        def handler(vars_data, **kwarg):
            vars_data.append({
                'name': 'test_var',
                'description': 'Is a test!',
                'value': 'Hello!',
            })

        make_template_vars.connect(handler)

        # test value
        var_data = self.get('test_var')
        self.assertIsNotNone(var_data)
        self.assertEqual(var_data['value'], 'Hello!')
        self.assertEqual(var_data['description'], 'Is a test!')

        # test replace
        content = replace_template_vars('{{ test_var }}')
        self.assertEqual(content, 'Hello!')

        # clean
        self.assertTrue(make_template_vars.disconnect(handler))
项目:mercure    作者:synhack    | 项目源码 | 文件源码
def test_edit_vars(self):
        # add value handler
        def handler(vars_data, **kwarg):
            var_data = self._search_name(vars_data, 'email')
            self.assertIsNotNone(var_data)
            var_data['value'] = 'Hello Word!'

        make_template_vars.connect(handler)

        # test value
        var_data = self.get('email')
        self.assertIsNotNone(var_data)
        self.assertEqual(var_data['value'], 'Hello Word!')

        # test replace
        content = replace_template_vars('{{ email }}')
        self.assertEqual(content, 'Hello Word!')

        # clean
        self.assertTrue(make_template_vars.disconnect(handler))
项目:mercure    作者:synhack    | 项目源码 | 文件源码
def test_delete_vars(self):
        # add value handler
        def handler(vars_data, **kwarg):
            var_data = self._search_name(vars_data, 'email')
            self.assertIsNotNone(var_data)
            vars_data.remove(var_data)

        make_template_vars.connect(handler)

        # test value
        var_data = self.get('email')
        self.assertIsNone(var_data)

        # test replace
        content = replace_template_vars('{{ email }}')
        self.assertEqual(content, '{{ email }}')

        # clean
        self.assertTrue(make_template_vars.disconnect(handler))
项目:mercure    作者:synhack    | 项目源码 | 文件源码
def test_add_item(self):
        # add value handler
        def handler(urls, **kwarg):
            urls.append({
                'name': 'test',
                'url': 'http://google.fr',
            })

        make_menu.connect(handler)

        # test
        item = self.get('test')
        self.assertIsNotNone(item)
        self.assertEqual(item['url'], 'http://google.fr')

        # clean
        self.assertTrue(make_menu.disconnect(handler))
项目:mercure    作者:synhack    | 项目源码 | 文件源码
def test_attachment_executed(self):
        def handler(instance, **kwarg):
            # get post on landing page event
            if instance.target_tracker.key != TRACKER_ATTACHMENT_EXECUTED:
                # ignore other target event
                return

            self.assertEqual(instance.ip, '127.0.0.1')
            self.assertEqual(instance.user_agent, 'Outlook')
            raise SuccessException()

        post_save.connect(handler, sender=TrackerInfos)

        # call tracker
        self.send_campaign()
        attachment = json.loads(mail.outbox[-1].attachments[0][1].decode())
        tracker_url = attachment['tracker_url']

        # test if handler has call
        with self.assertRaises(SuccessException):
            self.client.defaults['HTTP_USER_AGENT'] = 'Outlook'
            self.client.get(tracker_url)

        # clean
        self.assertTrue(post_save.disconnect(handler, sender=TrackerInfos))
项目:mercure    作者:synhack    | 项目源码 | 文件源码
def test_email_open(self):
        def handler(instance, **kwarg):
            # get email open event
            if instance.target_tracker.key != TRACKER_EMAIL_OPEN:
                # ignore other target event
                return

            self.assertEqual(instance.ip, '127.0.0.1')
            self.assertEqual(instance.user_agent, 'Outlook')
            raise SuccessException()

        post_save.connect(handler, sender=TrackerInfos)

        # call tracker
        self.send_campaign()
        mail_html = mail.outbox[-1].alternatives[0][0]
        tracker_url = mail_html.split('src="')[-1].split('"')[0]

        # test if handler has call
        with self.assertRaises(SuccessException):
            self.client.defaults['HTTP_USER_AGENT'] = 'Outlook'
            self.client.get(tracker_url)

        # clean
        self.assertTrue(post_save.disconnect(handler, sender=TrackerInfos))
项目:mercure    作者:synhack    | 项目源码 | 文件源码
def test_edit_campaign_report_item(self):
        def handler(context, **kwarg):
            context['tabs_layout']['Test'] = context['tabs_layout']. \
                pop('Other')

        make_campaign_report.connect(handler)

        # test
        content = self.get()
        self.assertIn('<a href="#test" aria-controls="test" role="tab" '
                      'data-toggle="tab">Test</a>', content)
        self.assertIn('id="test"', content)
        self.assertNotIn('<a href="#other" aria-controls="other" '
                         'role="tab" data-toggle="tab">Other</a>', content)
        self.assertNotIn('id="other"', content)

        # clean
        self.assertTrue(make_campaign_report.disconnect(handler))
项目:CommunityCellularManager    作者:facebookincubator    | 项目源码 | 文件源码
def create_auth(sender, instance, created, **kwargs):
        """
        A hook that run when we create a new network that creates
        an auth user and token that BTSs on the network use to
        authenticate.
        """
        if not instance.auth_group or not instance.auth_user:
            instance.auth_group, created_group = Group.objects.get_or_create(name='network_%s'
                % instance.pk)
            if created_group:
                assign_perm('view_network', instance.auth_group, instance)

            post_save.disconnect(UserProfile.new_user_hook, sender=User)
            instance.auth_user, created_user = User.objects.get_or_create(username='network_%s'
                % instance.pk)
            if created_user:
                Token.objects.create(user=instance.auth_user)
                instance.auth_group.user_set.add(instance.auth_user)
            post_save.connect(UserProfile.new_user_hook, sender=User)
            instance.save()
项目:django-chemtrails    作者:inonit    | 项目源码 | 文件源码
def test_delete_object_is_deleted(self):
        pre_delete.disconnect(pre_delete_handler, dispatch_uid='chemtrails.signals.handlers.pre_delete_handler')
        pre_delete.connect(pre_delete_handler, dispatch_uid='pre_delete_handler.test')
        try:
            book = BookFixture(Book).create_one()
            klass = get_node_class_for_model(Book)
            pk = book.pk
            try:
                book.delete()
                klass.nodes.get(pk=pk)
                self.fail('Did not raise when trying to get non-existent book node.')
            except klass.DoesNotExist as e:
                self.assertEqual(str(e), "{'pk': %d}" % pk)
        finally:
            pre_delete.connect(pre_delete_handler, dispatch_uid='chemtrails.signals.handlers.pre_delete_handler')
            pre_delete.disconnect(pre_delete_handler, dispatch_uid='pre_delete_handler.test')
项目:django-chemtrails    作者:inonit    | 项目源码 | 文件源码
def test_null_foreignkey_is_disconnected(self):
        post_save.disconnect(post_save_handler, dispatch_uid='chemtrails.signals.handlers.post_save_handler')
        post_save.connect(post_save_handler, dispatch_uid='post_save_handler.test')
        try:
            store = StoreFixture(Store, generate_m2m=False).create_one()
            klass = get_node_class_for_model(Store)

            self.assertEqual(store.bestseller.pk,
                             get_node_class_for_model(Book).nodes.get(pk=store.bestseller.pk).pk)
            self.assertEqual(1, len(klass.nodes.has(bestseller=True)))

            store.bestseller = None
            store.save()

            self.assertEqual(0, len(klass.nodes.has(bestseller=True)))
        finally:
            post_save.connect(post_save_handler, dispatch_uid='chemtrails.signals.handlers.post_save_handler')
            post_save.disconnect(post_save_handler, dispatch_uid='post_save_handler.test')
项目:django-chemtrails    作者:inonit    | 项目源码 | 文件源码
def ready(self):
        from .signals.handlers import (
            m2m_changed_handler, post_migrate_handler,
            post_save_handler, pre_delete_handler
        )

        m2m_changed.connect(receiver=m2m_changed_handler,
                            dispatch_uid='chemtrails.signals.handlers.m2m_changed_handler')
        post_save.connect(receiver=post_save_handler,
                          dispatch_uid='chemtrails.signals.handlers.post_save_handler')
        pre_delete.connect(receiver=pre_delete_handler,
                           dispatch_uid='chemtrails.signals.handlers.pre_delete_handler')

        post_migrate.connect(receiver=post_migrate_handler,
                             dispatch_uid='neomodel.core.post_migrate_handler')

        # Neo4j config
        config.DATABASE_URL = getattr(settings, 'NEO4J_BOLT_URL',
                                      os.environ.get('NEO4J_BOLT_URL', config.DATABASE_URL))
        config.FORCE_TIMEZONE = getattr(settings, 'NEO4J_FORCE_TIMEZONE',
                                        get_environment_variable('NEO4J_FORCE_TIMEZONE', False))
        config.ENCRYPTED_CONNECTION = getattr(settings, 'NEO4J_ENCRYPTED_CONNECTION',
                                              get_environment_variable('NEO4J_ENCRYPTED_CONNECTION', True))
        config.MAX_POOL_SIZE = getattr(settings, 'NEO4J_MAX_POOL_SIZE',
                                       get_environment_variable('NEO4J_MAX_POOL_SIZE', 50))
项目:django-statusboard    作者:edigiacomo    | 项目源码 | 文件源码
def test_previous_status(self):
        from django.db.models.signals import post_save

        def check_different(sender, instance, **kwargs):
            check_different.is_different = (instance._status != instance.status)

        check_different.is_different = None

        post_save.connect(check_different)

        s = Service(name="service", description="test", status=0)
        s.save()
        self.assertFalse(check_different.is_different)
        s.status = 0
        s.save()
        self.assertFalse(check_different.is_different)
        s.status = 1
        s.save()
        self.assertIsNotNone(check_different.is_different)
        self.assertTrue(check_different.is_different)

        post_save.disconnect(check_different)
项目:LIMS-Backend    作者:LeafLIMS    | 项目源码 | 文件源码
def _fire_alerts(self):
        # Re-enable signals to enable testing of them
        post_save.connect(receiver=TriggerSet._fire_triggersets, dispatch_uid='Fire Trigger Sets')
        self._asJaneDoe()
        updated_address = {"city": "London", "country": "England"}
        response = self._client.patch("/addresses/%d/" % self._janeDoeAddress.id,
                                      updated_address, format='json')
        self.assertEqual(response.status_code, status.HTTP_200_OK)
        self._asJoeBloggs()
        updated_address = {"country": "England"}
        response = self._client.patch("/addresses/%d/" % self._joeBloggsAddress.id,
                                      updated_address, format='json')
        self.assertEqual(response.status_code, status.HTTP_200_OK)
        # Disable signals to avoid affecting other tests
        post_save.disconnect(receiver=TriggerSet._fire_triggersets,
                             dispatch_uid='Fire Trigger Sets')
项目:telemetry-analysis-service    作者:mozilla    | 项目源码 | 文件源码
def ready(self):
        # The app is now ready. Include any monkey patches here.

        # Monkey patch CSRF to switch to session based CSRF. Session
        # based CSRF will prevent attacks from apps under the same
        # domain. If you're planning to host your app under it's own
        # domain you can remove session_csrf and use Django's CSRF
        # library. See also
        # https://github.com/mozilla/sugardough/issues/38
        session_csrf.monkeypatch()

        # Connect signals.
        from atmo.jobs.models import SparkJob
        from atmo.jobs.signals import assign_group_perm, remove_group_perm

        post_save.connect(
            assign_group_perm,
            sender=SparkJob,
            dispatch_uid='sparkjob_post_save_assign_perm',
        )
        pre_delete.connect(
            remove_group_perm,
            sender=SparkJob,
            dispatch_uid='sparkjob_pre_delete_remove_perm',
        )
项目:wagtailtrans    作者:LUKKIEN    | 项目源码 | 文件源码
def register_signal_handlers():
    """Registers signal handlers.

    To create a signal for TranslatablePage we have to use wagtails
    get_page_model.

    """
    post_save.connect(create_language_permissions_and_group, sender=Language)
    init_new_page.connect(force_parent_language)
    if get_wagtailtrans_setting('SYNC_TREE'):
        if get_wagtailtrans_setting('LANGUAGES_PER_SITE'):
            m2m_changed.connect(
                update_language_trees_for_site,
                sender=SiteLanguages.other_languages.through)
        else:
            post_save.connect(create_new_language_tree, sender=Language)

        for model in get_page_models():
            if hasattr(model, 'create_translation'):
                post_save.connect(synchronize_trees, sender=model)

            if hasattr(model, 'get_translations'):
                pre_delete.connect(synchronize_deletions, sender=model)
项目:talkativot    作者:lablup    | 项目源码 | 文件源码
def create_user_profile(sender, instance, created, **kwargs):
    """ Create profile and preference upon user creation (``post_save.connect``).

    :param sender: sender object
    :param instance: user instance
    :param created: user creation flag
    :param kwargs: dictionary argument
    :return: None
    """
    if created:
        user = instance
        # Create profile / preference objects for user
        profile, created = UserProfile.objects.get_or_create(user=instance)
        preference, created = UserPreference.objects.get_or_create(user=instance,
                                                                   site=Site.objects.get_current())
        # User automatically belongs to meta_all_members
        g, c = Group.objects.get_or_create(name=PermissionAlias.all)
        user.groups.add(g)
项目:talkativot    作者:lablup    | 项目源码 | 文件源码
def remove_obj_perms_connected_with_user(sender, instance, **kwargs):
    """ Remove user's permissions upon user deletion (``pre_delete.connect``).

    :param sender: sender object
    :param instance: user instance
    :param kwargs: dictionary argument
    :return: None
    """
    filters = Q(content_type=ContentType.objects.get_for_model(instance),
                object_pk=instance.pk)
    UserObjectPermission.objects.filter(filters).delete()
    GroupObjectPermission.objects.filter(filters).delete()

    if instance.profile:
        instance.profile.delete()
    if instance.preference:
        for i in instance.preference.all():
            i.delete()
项目:mes    作者:osess    | 项目源码 | 文件源码
def register(self, model, adapter_cls=VersionAdapter, **field_overrides):
        """Registers a model with this revision manager."""
        # Prevent multiple registration.
        if self.is_registered(model):
            raise RegistrationError("{model} has already been registered with django-reversion".format(
                model = model,
            ))
        # Prevent proxy models being registered.
        if model._meta.proxy:
            raise RegistrationError("Proxy models cannot be used with django-reversion, register the parent class instead")
        # Perform any customization.
        if field_overrides:
            adapter_cls = type(adapter_cls.__name__, (adapter_cls,), field_overrides)
        # Perform the registration.
        adapter_obj = adapter_cls(model)
        self._registered_models[model] = adapter_obj
        # Connect to the post save signal of the model.
        post_save.connect(self._post_save_receiver, model)
        pre_delete.connect(self._pre_delete_receiver, model)
项目:TaskApp    作者:isheng5    | 项目源码 | 文件源码
def __str__(self):
        return '%s: %s' % self.user_id,self.user.username

# def create_user_profile(sender, instance, created, **kwargs):
#     if created:
#         profile = Profile()
#         profile.user = instance
#         profile.save()
#
# post_save.connect(create_user_profile, sender=User)
项目:zing    作者:evernote    | 项目源码 | 文件源码
def __init__(self, *args, **kwargs):
        super(DirtyFieldsMixin, self).__init__(*args, **kwargs)
        post_save.connect(
            reset_state, sender=self.__class__,
            dispatch_uid='%s-DirtyFieldsMixin-sweeper' %
                         self.__class__.__name__,
        )
        reset_state(sender=self.__class__, instance=self)
项目:alexandriadocs    作者:srtab    | 项目源码 | 文件源码
def setup(self):
        for model in self.register_models:
            post_save.connect(self.handle_save, sender=model)
            post_delete.connect(self.handle_delete, sender=model)
项目:django-audit-tools    作者:PeRDy    | 项目源码 | 文件源码
def register(model):
    """Register a model to the audit code.

    :param model: Model to register.
    :type model: object
    """
    try:
        pre_save.connect(_pre_save, sender=model, dispatch_uid=str(model))
        post_save.connect(_post_save, sender=model, dispatch_uid=str(model))
        pre_delete.connect(_pre_delete, sender=model, dispatch_uid=str(model))
    except Exception as e:
        logger.error("<Register> %s", e.message)
项目:paas-tools    作者:imperodesign    | 项目源码 | 文件源码
def get_etcd_client():
    if not hasattr(get_etcd_client, "client"):
        # wire up etcd publishing if we can connect
        try:
            get_etcd_client.client = etcd.Client(
                host=settings.ETCD_HOST,
                port=int(settings.ETCD_PORT))
            get_etcd_client.client.get('/deis')
        except etcd.EtcdException:
            logger.log(logging.WARNING, 'Cannot synchronize with etcd cluster')
            get_etcd_client.client = None
    return get_etcd_client.client
项目:paas-tools    作者:imperodesign    | 项目源码 | 文件源码
def get_etcd_client():
    if not hasattr(get_etcd_client, "client"):
        # wire up etcd publishing if we can connect
        try:
            get_etcd_client.client = etcd.Client(
                host=settings.ETCD_HOST,
                port=int(settings.ETCD_PORT))
            get_etcd_client.client.get('/deis')
        except etcd.EtcdException:
            logger.log(logging.WARNING, 'Cannot synchronize with etcd cluster')
            get_etcd_client.client = None
    return get_etcd_client.client
项目:mercure    作者:synhack    | 项目源码 | 文件源码
def test_delete_item(self):
        # add value handler
        def handler(urls, **kwarg):
            item = self._search_name(urls, 'Campaigns')
            self.assertIsNotNone(item)
            urls.remove(item)

        make_menu.connect(handler)

        # test
        item = self.get('Campaigns')
        self.assertIsNone(item)

        # clean
        self.assertTrue(make_menu.disconnect(handler))
项目:mercure    作者:synhack    | 项目源码 | 文件源码
def test_attachment_executed_with_data(self):
        def handler(instance, **kwarg):
            # get post on landing page event
            if instance.target_tracker.key != TRACKER_ATTACHMENT_EXECUTED:
                # ignore other target event
                return

            self.assertEqual(instance.ip, '127.0.0.1')
            self.assertEqual(instance.user_agent, 'Thunderbird')
            self.assertEqual(instance.raw, '{"hello": "world!"}')
            raise SuccessException()

        post_save.connect(handler, sender=TrackerInfos)

        # call tracker
        self.send_campaign()
        attachment = json.loads(mail.outbox[-1].attachments[0][1].decode())
        tracker_url = attachment['tracker_url']

        # test if handler has call
        with self.assertRaises(SuccessException):
            self.client.defaults['HTTP_USER_AGENT'] = 'Thunderbird'
            self.client.post(tracker_url, {'hello': 'world!'})

        # clean
        self.assertTrue(post_save.disconnect(handler, sender=TrackerInfos))
项目:mercure    作者:synhack    | 项目源码 | 文件源码
def test_email_send(self):
        def handler(instance, **kwarg):
            # get email open event
            if instance.key != TRACKER_EMAIL_SEND:
                # ignore other target event
                return

            self.assertEqual(instance.value, 'success')

            # add entry on db for set test in success => SuccessException make
            # infinite loop
            TrackerInfos.objects.create(
                target_tracker=instance,
                raw='handler_%s_test_ok' % TRACKER_EMAIL_SEND
            )

        post_save.connect(handler, sender=Tracker)

        # send mail
        campaign = self.send_campaign()

        # test if handler has call
        raw = 'handler_%s_test_ok' % TRACKER_EMAIL_SEND
        test_result_tracker = TrackerInfos.objects.filter(raw=raw).first()
        self.assertIsNotNone(test_result_tracker)
        tracker = test_result_tracker.target_tracker
        self.assertEqual(campaign.pk, tracker.campaign.pk)
        self.assertEqual(TRACKER_EMAIL_SEND, tracker.key)

        # clean
        self.assertTrue(post_save.disconnect(handler, sender=Tracker))
项目:mercure    作者:synhack    | 项目源码 | 文件源码
def test_landing_page_post(self):
        def handler(instance, **kwarg):
            # get post on landing page event
            if instance.target_tracker.key != TRACKER_LANDING_PAGE_POST:
                # ignore other target event
                return

            self.assertEqual(instance.ip, '127.0.0.1')
            self.assertEqual(instance.user_agent, 'Firefox')
            self.assertEqual(instance.referer, 'https://webmail.com')
            self.assertIn('"login": "Admin"', instance.raw)
            raise SuccessException()

        post_save.connect(handler, sender=TrackerInfos)

        # call tracker
        self.send_campaign()
        tracker_url = mail.outbox[-1].body

        # call landing page
        response = self.client.get(tracker_url)
        self.assertEqual(response.status_code, 200)

        # get form infos
        html = response.content.decode()
        form = BeautifulSoup(html, 'html.parser').find('form')
        post_url = form.get('action')
        post_data = {i.get('name'): i.get('value')
                     for i in form.find_all('input')}
        post_data['login'] = 'Admin'

        # test if handler has call
        with self.assertRaises(SuccessException):
            self.client.defaults['HTTP_USER_AGENT'] = 'Firefox'
            self.client.defaults['HTTP_REFERER'] = 'https://webmail.com'
            self.client.post(post_url, post_data)

        # clean
        self.assertTrue(post_save.disconnect(handler, sender=TrackerInfos))
项目:mercure    作者:synhack    | 项目源码 | 文件源码
def test_add_campaign_report_item(self):
        def handler(context, **kwarg):
            context['tabs_layout']['Test'] = []

        make_campaign_report.connect(handler)

        # test
        content = self.get()
        self.assertIn('<a href="#test" aria-controls="test" role="tab" '
                      'data-toggle="tab">Test</a>', content)
        self.assertIn('id="test"', content)

        # clean
        self.assertTrue(make_campaign_report.disconnect(handler))
项目:mercure    作者:synhack    | 项目源码 | 文件源码
def test_delete_campaign_report_item(self):
        def handler(context, **kwarg):
            del(context['tabs_layout']['Other'])

        make_campaign_report.connect(handler)

        # test
        content = self.get()
        self.assertNotIn('<a href="#other" aria-controls="other" '
                         'role="tab" data-toggle="tab">Other</a>', content)
        self.assertNotIn('id="other"', content)

        # clean
        self.assertTrue(make_campaign_report.disconnect(handler))
项目:django_textformat    作者:team23    | 项目源码 | 文件源码
def setup():
    from .models import TextFormat, TextFilter

    post_save.connect(invalidate_cache, sender=TextFormat)
    post_save.connect(invalidate_cache, sender=TextFilter)
项目:django-rest-search    作者:wemap    | 项目源码 | 文件源码
def register(indexer_class):
    """
    Registers an indexer class.
    """
    if indexer_class not in _REGISTERED_CLASSES:
        _REGISTERED_CLASSES.append(indexer_class)

        # register signal handlers
        model = indexer_class.serializer_class.Meta.model
        post_delete.connect(_instance_changed, sender=model)
        post_save.connect(_instance_changed, sender=model)
项目:exchange    作者:boundlessgeo    | 项目源码 | 文件源码
def register_post_save_functions():
    # Disconnect first in case this function is called twice
    logger.debug('Thumbnail: Registering post_save functions.')
    post_save.disconnect(generate_thumbnail, sender=Layer)
    post_save.connect(generate_thumbnail, sender=Layer, weak=False)
    post_save.disconnect(generate_thumbnail, sender=Map)
    post_save.connect(generate_thumbnail, sender=Map, weak=False)
项目:buggy    作者:fusionbox    | 项目源码 | 文件源码
def ready(self):
        from .models import Action
        post_save.connect(self.on_action_save, sender=Action)
项目:intel-manager-for-lustre    作者:intel-hpdd    | 项目源码 | 文件源码
def __init__(self, sender, subject_prefix, host):
        super(MailAlerts, self).__init__()

        self.sender = sender
        self.subject_prefix = subject_prefix
        self.host = host
        self.change_event = threading.Event()
        self.exit = False

        post_save.connect(self._table_changed)
项目:vaultier    作者:Movile    | 项目源码 | 文件源码
def register_signals():
    pre_save.connect(LostKey.objects.on_pre_save, sender=LostKey)
    post_save.connect(LostKey.objects.send_notification, sender=LostKey)
    post_save.connect(
        Member.objects.send_transfer_workspace_key_info, sender=Member)
项目:WPS-3rd    作者:Fastcampus-WPS    | 项目源码 | 文件源码
def save(self, *args, **kwargs):
        super(Comment, self).save(*args, **kwargs)
        # recipient_list = [self.post.author.email]
        # title = '{} ?? ??? ?????'.format(self.post.title)
        # content = '{}? {}??? ????'.format(
        #     self.created_date.strftime('%Y.%m.%d %H:%M'),
        #     self.content
        # )
        # send_mail(title, content)



# post_save.connect(send_comment_mail, sender=Comment)
项目:WPS-3rd    作者:Fastcampus-WPS    | 项目源码 | 文件源码
def save(self, *args, **kwargs):
        super(Comment, self).save(*args, **kwargs)
        # recipient_list = [self.post.author.email]
        # title = '{} ?? ??? ?????'.format(self.post.title)
        # content = '{}? {}??? ????'.format(
        #     self.created_date.strftime('%Y.%m.%d %H:%M'),
        #     self.content
        # )
        # send_mail(title, content)



# post_save.connect(send_comment_mail, sender=Comment)
项目:WPS-3rd    作者:Fastcampus-WPS    | 项目源码 | 文件源码
def save(self, *args, **kwargs):
        super(Comment, self).save(*args, **kwargs)
        # recipient_list = [self.post.author.email]
        # title = '{} ?? ??? ?????'.format(self.post.title)
        # content = '{}? {}??? ????'.format(
        #     self.created_date.strftime('%Y.%m.%d %H:%M'),
        #     self.content
        # )
        # send_mail(title, content)



# post_save.connect(send_comment_mail, sender=Comment)
项目:WPS-3rd    作者:Fastcampus-WPS    | 项目源码 | 文件源码
def save(self, *args, **kwargs):
        super(Comment, self).save(*args, **kwargs)
        # recipient_list = [self.post.author.email]
        # title = '{} ?? ??? ?????'.format(self.post.title)
        # content = '{}? {}??? ????'.format(
        #     self.created_date.strftime('%Y.%m.%d %H:%M'),
        #     self.content
        # )
        # send_mail(title, content)



# post_save.connect(send_comment_mail, sender=Comment)
项目:wagtailsocialfeed    作者:LUKKIEN    | 项目源码 | 文件源码
def __init__(self):
        # Iterate over existing configurations that are moderated
        config_qs = SocialFeedConfiguration.objects.filter(moderated=True)
        for config in config_qs:
            self.config_menu_items[config.id] = \
                self._create_moderate_menu_item(config)

        self._registered_menu_items = list(self.config_menu_items.values())
        self.construct_hook_name = None

        post_save.connect(self._update_menu, sender=SocialFeedConfiguration)
项目:python-django-wechat    作者:JingRanCor    | 项目源码 | 文件源码
def get_niming_user_info_url(self, url):
        url = 'https://open.weixin.qq.com/connect/oauth2/authorize?appid=%s&redirect_uri=%s&response_type=code&scope=snsapi_userinfo&state=STATE#wechat_redirect'\
        %(self.appid, quote(url))
        return url
项目:django-knocker    作者:nephila    | 项目源码 | 文件源码
def test_no_knock(self, handler):
        posts = []
        post_save.connect(handler, NoKnockPost, dispatch_uid='test_knocker_mock')
        posts.append(NoKnockPost.objects.create(
            title='first post',
            slug='first-post',
        ))
        posts.append(NoKnockPost.objects.create(
            title='second post',
            slug='second-post',
        ))
        self.assertEqual(handler.call_count, 2)
项目:django-knocker    作者:nephila    | 项目源码 | 文件源码
def _connect(self):
        """
        Connect signal to current model
        """
        post_save.connect(
            notify_items, sender=self.__class__,
            dispatch_uid='knocker_{0}'.format(self.__class__.__name__)
        )
项目:django-chemtrails    作者:inonit    | 项目源码 | 文件源码
def test_create_new_object_is_synced(self):
        post_save.disconnect(post_save_handler, dispatch_uid='chemtrails.signals.handlers.post_save_handler')
        post_save.connect(post_save_handler, dispatch_uid='post_save_handler.test')
        try:
            book = BookFixture(Book).create_one()
            klass = get_node_class_for_model(Book)

            self.assertEqual(book.pk, klass.nodes.get(pk=book.pk).pk)
            self.assertEqual(1, len(klass.nodes.has(authors=True, publisher=True)))
        finally:
            post_save.connect(post_save_handler, dispatch_uid='chemtrails.signals.handlers.post_save_handler')
            post_save.disconnect(post_save_handler, dispatch_uid='post_save_handler.test')
项目:django-chemtrails    作者:inonit    | 项目源码 | 文件源码
def test_m2m_changed_post_add(self):
        m2m_changed.disconnect(m2m_changed_handler, dispatch_uid='chemtrails.signals.handlers.m2m_changed_handler')
        m2m_changed.connect(m2m_changed_handler, dispatch_uid='m2m_changed_handler.test')
        try:
            book = BookFixture(Book, generate_m2m=False, field_values={'authors': []}).create_one()
            self.assertEqual(0, len(get_node_class_for_model(Book).nodes.has(authors=True)))

            author = AuthorFixture(Author).create_one()
            book.authors.add(author)
            self.assertEqual(1, len(get_node_class_for_model(Book).nodes.has(authors=True)))
        finally:
            m2m_changed.connect(m2m_changed_handler, dispatch_uid='chemtrails.signals.handlers.m2m_changed_handler')
            m2m_changed.disconnect(m2m_changed_handler, dispatch_uid='m2m_changed_handler.test')