我们从Python开源项目中,提取了以下48个代码示例,用于说明如何使用django.db.models.signals.post_save.connect()。
def _do_healthcheck(self, containers, config): path = config.get('HEALTHCHECK_URL', '/') timeout = int(config.get('HEALTHCHECK_TIMEOUT', 1)) if not _etcd_client: raise exceptions.HealthcheckException('no etcd client available') for container in containers: try: key = "/deis/services/{self}/{container.job_id}".format(**locals()) url = "http://{}{}".format(_etcd_client.get(key).value, path) response = requests.get(url, timeout=timeout) if response.status_code != requests.codes.OK: raise exceptions.HealthcheckException( "app failed health check (got '{}', expected: '200')".format( response.status_code)) except (requests.Timeout, requests.ConnectionError, KeyError) as e: raise exceptions.HealthcheckException( 'failed to connect to container ({})'.format(e))
def test_add_vars(self): # add value handler def handler(vars_data, **kwarg): vars_data.append({ 'name': 'test_var', 'description': 'Is a test!', 'value': 'Hello!', }) make_template_vars.connect(handler) # test value var_data = self.get('test_var') self.assertIsNotNone(var_data) self.assertEqual(var_data['value'], 'Hello!') self.assertEqual(var_data['description'], 'Is a test!') # test replace content = replace_template_vars('{{ test_var }}') self.assertEqual(content, 'Hello!') # clean self.assertTrue(make_template_vars.disconnect(handler))
def test_edit_vars(self): # add value handler def handler(vars_data, **kwarg): var_data = self._search_name(vars_data, 'email') self.assertIsNotNone(var_data) var_data['value'] = 'Hello Word!' make_template_vars.connect(handler) # test value var_data = self.get('email') self.assertIsNotNone(var_data) self.assertEqual(var_data['value'], 'Hello Word!') # test replace content = replace_template_vars('{{ email }}') self.assertEqual(content, 'Hello Word!') # clean self.assertTrue(make_template_vars.disconnect(handler))
def test_delete_vars(self): # add value handler def handler(vars_data, **kwarg): var_data = self._search_name(vars_data, 'email') self.assertIsNotNone(var_data) vars_data.remove(var_data) make_template_vars.connect(handler) # test value var_data = self.get('email') self.assertIsNone(var_data) # test replace content = replace_template_vars('{{ email }}') self.assertEqual(content, '{{ email }}') # clean self.assertTrue(make_template_vars.disconnect(handler))
def test_add_item(self): # add value handler def handler(urls, **kwarg): urls.append({ 'name': 'test', 'url': 'http://google.fr', }) make_menu.connect(handler) # test item = self.get('test') self.assertIsNotNone(item) self.assertEqual(item['url'], 'http://google.fr') # clean self.assertTrue(make_menu.disconnect(handler))
def test_attachment_executed(self): def handler(instance, **kwarg): # get post on landing page event if instance.target_tracker.key != TRACKER_ATTACHMENT_EXECUTED: # ignore other target event return self.assertEqual(instance.ip, '127.0.0.1') self.assertEqual(instance.user_agent, 'Outlook') raise SuccessException() post_save.connect(handler, sender=TrackerInfos) # call tracker self.send_campaign() attachment = json.loads(mail.outbox[-1].attachments[0][1].decode()) tracker_url = attachment['tracker_url'] # test if handler has call with self.assertRaises(SuccessException): self.client.defaults['HTTP_USER_AGENT'] = 'Outlook' self.client.get(tracker_url) # clean self.assertTrue(post_save.disconnect(handler, sender=TrackerInfos))
def test_email_open(self): def handler(instance, **kwarg): # get email open event if instance.target_tracker.key != TRACKER_EMAIL_OPEN: # ignore other target event return self.assertEqual(instance.ip, '127.0.0.1') self.assertEqual(instance.user_agent, 'Outlook') raise SuccessException() post_save.connect(handler, sender=TrackerInfos) # call tracker self.send_campaign() mail_html = mail.outbox[-1].alternatives[0][0] tracker_url = mail_html.split('src="')[-1].split('"')[0] # test if handler has call with self.assertRaises(SuccessException): self.client.defaults['HTTP_USER_AGENT'] = 'Outlook' self.client.get(tracker_url) # clean self.assertTrue(post_save.disconnect(handler, sender=TrackerInfos))
def test_edit_campaign_report_item(self): def handler(context, **kwarg): context['tabs_layout']['Test'] = context['tabs_layout']. \ pop('Other') make_campaign_report.connect(handler) # test content = self.get() self.assertIn('<a href="#test" aria-controls="test" role="tab" ' 'data-toggle="tab">Test</a>', content) self.assertIn('id="test"', content) self.assertNotIn('<a href="#other" aria-controls="other" ' 'role="tab" data-toggle="tab">Other</a>', content) self.assertNotIn('id="other"', content) # clean self.assertTrue(make_campaign_report.disconnect(handler))
def create_auth(sender, instance, created, **kwargs): """ A hook that run when we create a new network that creates an auth user and token that BTSs on the network use to authenticate. """ if not instance.auth_group or not instance.auth_user: instance.auth_group, created_group = Group.objects.get_or_create(name='network_%s' % instance.pk) if created_group: assign_perm('view_network', instance.auth_group, instance) post_save.disconnect(UserProfile.new_user_hook, sender=User) instance.auth_user, created_user = User.objects.get_or_create(username='network_%s' % instance.pk) if created_user: Token.objects.create(user=instance.auth_user) instance.auth_group.user_set.add(instance.auth_user) post_save.connect(UserProfile.new_user_hook, sender=User) instance.save()
def test_delete_object_is_deleted(self): pre_delete.disconnect(pre_delete_handler, dispatch_uid='chemtrails.signals.handlers.pre_delete_handler') pre_delete.connect(pre_delete_handler, dispatch_uid='pre_delete_handler.test') try: book = BookFixture(Book).create_one() klass = get_node_class_for_model(Book) pk = book.pk try: book.delete() klass.nodes.get(pk=pk) self.fail('Did not raise when trying to get non-existent book node.') except klass.DoesNotExist as e: self.assertEqual(str(e), "{'pk': %d}" % pk) finally: pre_delete.connect(pre_delete_handler, dispatch_uid='chemtrails.signals.handlers.pre_delete_handler') pre_delete.disconnect(pre_delete_handler, dispatch_uid='pre_delete_handler.test')
def test_null_foreignkey_is_disconnected(self): post_save.disconnect(post_save_handler, dispatch_uid='chemtrails.signals.handlers.post_save_handler') post_save.connect(post_save_handler, dispatch_uid='post_save_handler.test') try: store = StoreFixture(Store, generate_m2m=False).create_one() klass = get_node_class_for_model(Store) self.assertEqual(store.bestseller.pk, get_node_class_for_model(Book).nodes.get(pk=store.bestseller.pk).pk) self.assertEqual(1, len(klass.nodes.has(bestseller=True))) store.bestseller = None store.save() self.assertEqual(0, len(klass.nodes.has(bestseller=True))) finally: post_save.connect(post_save_handler, dispatch_uid='chemtrails.signals.handlers.post_save_handler') post_save.disconnect(post_save_handler, dispatch_uid='post_save_handler.test')
def ready(self): from .signals.handlers import ( m2m_changed_handler, post_migrate_handler, post_save_handler, pre_delete_handler ) m2m_changed.connect(receiver=m2m_changed_handler, dispatch_uid='chemtrails.signals.handlers.m2m_changed_handler') post_save.connect(receiver=post_save_handler, dispatch_uid='chemtrails.signals.handlers.post_save_handler') pre_delete.connect(receiver=pre_delete_handler, dispatch_uid='chemtrails.signals.handlers.pre_delete_handler') post_migrate.connect(receiver=post_migrate_handler, dispatch_uid='neomodel.core.post_migrate_handler') # Neo4j config config.DATABASE_URL = getattr(settings, 'NEO4J_BOLT_URL', os.environ.get('NEO4J_BOLT_URL', config.DATABASE_URL)) config.FORCE_TIMEZONE = getattr(settings, 'NEO4J_FORCE_TIMEZONE', get_environment_variable('NEO4J_FORCE_TIMEZONE', False)) config.ENCRYPTED_CONNECTION = getattr(settings, 'NEO4J_ENCRYPTED_CONNECTION', get_environment_variable('NEO4J_ENCRYPTED_CONNECTION', True)) config.MAX_POOL_SIZE = getattr(settings, 'NEO4J_MAX_POOL_SIZE', get_environment_variable('NEO4J_MAX_POOL_SIZE', 50))
def test_previous_status(self): from django.db.models.signals import post_save def check_different(sender, instance, **kwargs): check_different.is_different = (instance._status != instance.status) check_different.is_different = None post_save.connect(check_different) s = Service(name="service", description="test", status=0) s.save() self.assertFalse(check_different.is_different) s.status = 0 s.save() self.assertFalse(check_different.is_different) s.status = 1 s.save() self.assertIsNotNone(check_different.is_different) self.assertTrue(check_different.is_different) post_save.disconnect(check_different)
def _fire_alerts(self): # Re-enable signals to enable testing of them post_save.connect(receiver=TriggerSet._fire_triggersets, dispatch_uid='Fire Trigger Sets') self._asJaneDoe() updated_address = {"city": "London", "country": "England"} response = self._client.patch("/addresses/%d/" % self._janeDoeAddress.id, updated_address, format='json') self.assertEqual(response.status_code, status.HTTP_200_OK) self._asJoeBloggs() updated_address = {"country": "England"} response = self._client.patch("/addresses/%d/" % self._joeBloggsAddress.id, updated_address, format='json') self.assertEqual(response.status_code, status.HTTP_200_OK) # Disable signals to avoid affecting other tests post_save.disconnect(receiver=TriggerSet._fire_triggersets, dispatch_uid='Fire Trigger Sets')
def ready(self): # The app is now ready. Include any monkey patches here. # Monkey patch CSRF to switch to session based CSRF. Session # based CSRF will prevent attacks from apps under the same # domain. If you're planning to host your app under it's own # domain you can remove session_csrf and use Django's CSRF # library. See also # https://github.com/mozilla/sugardough/issues/38 session_csrf.monkeypatch() # Connect signals. from atmo.jobs.models import SparkJob from atmo.jobs.signals import assign_group_perm, remove_group_perm post_save.connect( assign_group_perm, sender=SparkJob, dispatch_uid='sparkjob_post_save_assign_perm', ) pre_delete.connect( remove_group_perm, sender=SparkJob, dispatch_uid='sparkjob_pre_delete_remove_perm', )
def register_signal_handlers(): """Registers signal handlers. To create a signal for TranslatablePage we have to use wagtails get_page_model. """ post_save.connect(create_language_permissions_and_group, sender=Language) init_new_page.connect(force_parent_language) if get_wagtailtrans_setting('SYNC_TREE'): if get_wagtailtrans_setting('LANGUAGES_PER_SITE'): m2m_changed.connect( update_language_trees_for_site, sender=SiteLanguages.other_languages.through) else: post_save.connect(create_new_language_tree, sender=Language) for model in get_page_models(): if hasattr(model, 'create_translation'): post_save.connect(synchronize_trees, sender=model) if hasattr(model, 'get_translations'): pre_delete.connect(synchronize_deletions, sender=model)
def create_user_profile(sender, instance, created, **kwargs): """ Create profile and preference upon user creation (``post_save.connect``). :param sender: sender object :param instance: user instance :param created: user creation flag :param kwargs: dictionary argument :return: None """ if created: user = instance # Create profile / preference objects for user profile, created = UserProfile.objects.get_or_create(user=instance) preference, created = UserPreference.objects.get_or_create(user=instance, site=Site.objects.get_current()) # User automatically belongs to meta_all_members g, c = Group.objects.get_or_create(name=PermissionAlias.all) user.groups.add(g)
def remove_obj_perms_connected_with_user(sender, instance, **kwargs): """ Remove user's permissions upon user deletion (``pre_delete.connect``). :param sender: sender object :param instance: user instance :param kwargs: dictionary argument :return: None """ filters = Q(content_type=ContentType.objects.get_for_model(instance), object_pk=instance.pk) UserObjectPermission.objects.filter(filters).delete() GroupObjectPermission.objects.filter(filters).delete() if instance.profile: instance.profile.delete() if instance.preference: for i in instance.preference.all(): i.delete()
def register(self, model, adapter_cls=VersionAdapter, **field_overrides): """Registers a model with this revision manager.""" # Prevent multiple registration. if self.is_registered(model): raise RegistrationError("{model} has already been registered with django-reversion".format( model = model, )) # Prevent proxy models being registered. if model._meta.proxy: raise RegistrationError("Proxy models cannot be used with django-reversion, register the parent class instead") # Perform any customization. if field_overrides: adapter_cls = type(adapter_cls.__name__, (adapter_cls,), field_overrides) # Perform the registration. adapter_obj = adapter_cls(model) self._registered_models[model] = adapter_obj # Connect to the post save signal of the model. post_save.connect(self._post_save_receiver, model) pre_delete.connect(self._pre_delete_receiver, model)
def __str__(self): return '%s: %s' % self.user_id,self.user.username # def create_user_profile(sender, instance, created, **kwargs): # if created: # profile = Profile() # profile.user = instance # profile.save() # # post_save.connect(create_user_profile, sender=User)
def __init__(self, *args, **kwargs): super(DirtyFieldsMixin, self).__init__(*args, **kwargs) post_save.connect( reset_state, sender=self.__class__, dispatch_uid='%s-DirtyFieldsMixin-sweeper' % self.__class__.__name__, ) reset_state(sender=self.__class__, instance=self)
def setup(self): for model in self.register_models: post_save.connect(self.handle_save, sender=model) post_delete.connect(self.handle_delete, sender=model)
def register(model): """Register a model to the audit code. :param model: Model to register. :type model: object """ try: pre_save.connect(_pre_save, sender=model, dispatch_uid=str(model)) post_save.connect(_post_save, sender=model, dispatch_uid=str(model)) pre_delete.connect(_pre_delete, sender=model, dispatch_uid=str(model)) except Exception as e: logger.error("<Register> %s", e.message)
def get_etcd_client(): if not hasattr(get_etcd_client, "client"): # wire up etcd publishing if we can connect try: get_etcd_client.client = etcd.Client( host=settings.ETCD_HOST, port=int(settings.ETCD_PORT)) get_etcd_client.client.get('/deis') except etcd.EtcdException: logger.log(logging.WARNING, 'Cannot synchronize with etcd cluster') get_etcd_client.client = None return get_etcd_client.client
def test_delete_item(self): # add value handler def handler(urls, **kwarg): item = self._search_name(urls, 'Campaigns') self.assertIsNotNone(item) urls.remove(item) make_menu.connect(handler) # test item = self.get('Campaigns') self.assertIsNone(item) # clean self.assertTrue(make_menu.disconnect(handler))
def test_attachment_executed_with_data(self): def handler(instance, **kwarg): # get post on landing page event if instance.target_tracker.key != TRACKER_ATTACHMENT_EXECUTED: # ignore other target event return self.assertEqual(instance.ip, '127.0.0.1') self.assertEqual(instance.user_agent, 'Thunderbird') self.assertEqual(instance.raw, '{"hello": "world!"}') raise SuccessException() post_save.connect(handler, sender=TrackerInfos) # call tracker self.send_campaign() attachment = json.loads(mail.outbox[-1].attachments[0][1].decode()) tracker_url = attachment['tracker_url'] # test if handler has call with self.assertRaises(SuccessException): self.client.defaults['HTTP_USER_AGENT'] = 'Thunderbird' self.client.post(tracker_url, {'hello': 'world!'}) # clean self.assertTrue(post_save.disconnect(handler, sender=TrackerInfos))
def test_email_send(self): def handler(instance, **kwarg): # get email open event if instance.key != TRACKER_EMAIL_SEND: # ignore other target event return self.assertEqual(instance.value, 'success') # add entry on db for set test in success => SuccessException make # infinite loop TrackerInfos.objects.create( target_tracker=instance, raw='handler_%s_test_ok' % TRACKER_EMAIL_SEND ) post_save.connect(handler, sender=Tracker) # send mail campaign = self.send_campaign() # test if handler has call raw = 'handler_%s_test_ok' % TRACKER_EMAIL_SEND test_result_tracker = TrackerInfos.objects.filter(raw=raw).first() self.assertIsNotNone(test_result_tracker) tracker = test_result_tracker.target_tracker self.assertEqual(campaign.pk, tracker.campaign.pk) self.assertEqual(TRACKER_EMAIL_SEND, tracker.key) # clean self.assertTrue(post_save.disconnect(handler, sender=Tracker))
def test_landing_page_post(self): def handler(instance, **kwarg): # get post on landing page event if instance.target_tracker.key != TRACKER_LANDING_PAGE_POST: # ignore other target event return self.assertEqual(instance.ip, '127.0.0.1') self.assertEqual(instance.user_agent, 'Firefox') self.assertEqual(instance.referer, 'https://webmail.com') self.assertIn('"login": "Admin"', instance.raw) raise SuccessException() post_save.connect(handler, sender=TrackerInfos) # call tracker self.send_campaign() tracker_url = mail.outbox[-1].body # call landing page response = self.client.get(tracker_url) self.assertEqual(response.status_code, 200) # get form infos html = response.content.decode() form = BeautifulSoup(html, 'html.parser').find('form') post_url = form.get('action') post_data = {i.get('name'): i.get('value') for i in form.find_all('input')} post_data['login'] = 'Admin' # test if handler has call with self.assertRaises(SuccessException): self.client.defaults['HTTP_USER_AGENT'] = 'Firefox' self.client.defaults['HTTP_REFERER'] = 'https://webmail.com' self.client.post(post_url, post_data) # clean self.assertTrue(post_save.disconnect(handler, sender=TrackerInfos))
def test_add_campaign_report_item(self): def handler(context, **kwarg): context['tabs_layout']['Test'] = [] make_campaign_report.connect(handler) # test content = self.get() self.assertIn('<a href="#test" aria-controls="test" role="tab" ' 'data-toggle="tab">Test</a>', content) self.assertIn('id="test"', content) # clean self.assertTrue(make_campaign_report.disconnect(handler))
def test_delete_campaign_report_item(self): def handler(context, **kwarg): del(context['tabs_layout']['Other']) make_campaign_report.connect(handler) # test content = self.get() self.assertNotIn('<a href="#other" aria-controls="other" ' 'role="tab" data-toggle="tab">Other</a>', content) self.assertNotIn('id="other"', content) # clean self.assertTrue(make_campaign_report.disconnect(handler))
def setup(): from .models import TextFormat, TextFilter post_save.connect(invalidate_cache, sender=TextFormat) post_save.connect(invalidate_cache, sender=TextFilter)
def register(indexer_class): """ Registers an indexer class. """ if indexer_class not in _REGISTERED_CLASSES: _REGISTERED_CLASSES.append(indexer_class) # register signal handlers model = indexer_class.serializer_class.Meta.model post_delete.connect(_instance_changed, sender=model) post_save.connect(_instance_changed, sender=model)
def register_post_save_functions(): # Disconnect first in case this function is called twice logger.debug('Thumbnail: Registering post_save functions.') post_save.disconnect(generate_thumbnail, sender=Layer) post_save.connect(generate_thumbnail, sender=Layer, weak=False) post_save.disconnect(generate_thumbnail, sender=Map) post_save.connect(generate_thumbnail, sender=Map, weak=False)
def ready(self): from .models import Action post_save.connect(self.on_action_save, sender=Action)
def __init__(self, sender, subject_prefix, host): super(MailAlerts, self).__init__() self.sender = sender self.subject_prefix = subject_prefix self.host = host self.change_event = threading.Event() self.exit = False post_save.connect(self._table_changed)
def register_signals(): pre_save.connect(LostKey.objects.on_pre_save, sender=LostKey) post_save.connect(LostKey.objects.send_notification, sender=LostKey) post_save.connect( Member.objects.send_transfer_workspace_key_info, sender=Member)
def save(self, *args, **kwargs): super(Comment, self).save(*args, **kwargs) # recipient_list = [self.post.author.email] # title = '{} ?? ??? ?????'.format(self.post.title) # content = '{}? {}??? ????'.format( # self.created_date.strftime('%Y.%m.%d %H:%M'), # self.content # ) # send_mail(title, content) # post_save.connect(send_comment_mail, sender=Comment)
def __init__(self): # Iterate over existing configurations that are moderated config_qs = SocialFeedConfiguration.objects.filter(moderated=True) for config in config_qs: self.config_menu_items[config.id] = \ self._create_moderate_menu_item(config) self._registered_menu_items = list(self.config_menu_items.values()) self.construct_hook_name = None post_save.connect(self._update_menu, sender=SocialFeedConfiguration)
def get_niming_user_info_url(self, url): url = 'https://open.weixin.qq.com/connect/oauth2/authorize?appid=%s&redirect_uri=%s&response_type=code&scope=snsapi_userinfo&state=STATE#wechat_redirect'\ %(self.appid, quote(url)) return url
def test_no_knock(self, handler): posts = [] post_save.connect(handler, NoKnockPost, dispatch_uid='test_knocker_mock') posts.append(NoKnockPost.objects.create( title='first post', slug='first-post', )) posts.append(NoKnockPost.objects.create( title='second post', slug='second-post', )) self.assertEqual(handler.call_count, 2)
def _connect(self): """ Connect signal to current model """ post_save.connect( notify_items, sender=self.__class__, dispatch_uid='knocker_{0}'.format(self.__class__.__name__) )
def test_create_new_object_is_synced(self): post_save.disconnect(post_save_handler, dispatch_uid='chemtrails.signals.handlers.post_save_handler') post_save.connect(post_save_handler, dispatch_uid='post_save_handler.test') try: book = BookFixture(Book).create_one() klass = get_node_class_for_model(Book) self.assertEqual(book.pk, klass.nodes.get(pk=book.pk).pk) self.assertEqual(1, len(klass.nodes.has(authors=True, publisher=True))) finally: post_save.connect(post_save_handler, dispatch_uid='chemtrails.signals.handlers.post_save_handler') post_save.disconnect(post_save_handler, dispatch_uid='post_save_handler.test')
def test_m2m_changed_post_add(self): m2m_changed.disconnect(m2m_changed_handler, dispatch_uid='chemtrails.signals.handlers.m2m_changed_handler') m2m_changed.connect(m2m_changed_handler, dispatch_uid='m2m_changed_handler.test') try: book = BookFixture(Book, generate_m2m=False, field_values={'authors': []}).create_one() self.assertEqual(0, len(get_node_class_for_model(Book).nodes.has(authors=True))) author = AuthorFixture(Author).create_one() book.authors.add(author) self.assertEqual(1, len(get_node_class_for_model(Book).nodes.has(authors=True))) finally: m2m_changed.connect(m2m_changed_handler, dispatch_uid='chemtrails.signals.handlers.m2m_changed_handler') m2m_changed.disconnect(m2m_changed_handler, dispatch_uid='m2m_changed_handler.test')