我们从Python开源项目中，提取了以下29个代码示例，用于说明如何使用django.db.models.signals模块。
def list_pubs():
    """Return a printable list of the ``publish_rdf`` receivers on ``post_save``.

    Walks ``signals.post_save.receivers`` and, for every entry whose resolved
    receiver is named ``publish_rdf``, records ``"<key>.<name>"`` where
    ``<key>`` is the first element of that entry's lookup key.

    Returns:
        ``str()`` of the list of collected ``"<key>.<name>"`` strings.
    """
    import weakref

    msg = []
    for entry in signals.post_save.receivers:
        # Index rather than unpack a strict 2-tuple: Django versions with
        # async-capable signals store extra fields after the receiver, which
        # would make ``(key, _), receiver = entry`` raise ValueError.
        lookup_key, receiver = entry[0], entry[1]
        receiver_class_name = lookup_key[0]
        # django.contrib.contenttypes.generic.GenericForeignKey.instance_pre_init
        # is not a weakref, so only dereference real weak references.
        if isinstance(receiver, weakref.ReferenceType):
            receiver = receiver()
        # Prefer the wrapped callable when the receiver exposes one.
        receiver = getattr(receiver, '__wraps__', receiver)
        receiver_name = getattr(receiver, '__name__', str(receiver))
        if receiver_name == 'publish_rdf':
            msg.append("%s.%s" % (receiver_class_name, receiver_name))
    return str(msg)
def ready(self):
    """Connect the devour producer receivers to Django's model signals.

    ``post_save`` is connected to ``produce_post_save`` and ``post_delete`` to
    ``produce_post_delete``, except for signal names listed under
    ``settings.KAFKA_CONFIG['producers']['exclude']``.
    """
    from django.db.models.signals import post_save, post_delete
    from devour.django.receivers import produce_post_save, produce_post_delete
    from django.conf import settings

    # Signal name -> (signal object, receiver). The locally imported signal
    # objects are used directly instead of looking them up on a module-level
    # ``signals`` name.
    signal_types = {
        'post_save': (post_save, produce_post_save),
        'post_delete': (post_delete, produce_post_delete),
    }

    excluded = []
    if getattr(settings, 'KAFKA_CONFIG', None):
        try:
            # ``.get`` yields None when no 'exclude' key is configured; treat
            # that as "nothing excluded" instead of iterating over None.
            excluded = settings.KAFKA_CONFIG['producers'].get('exclude') or []
        except (KeyError, AttributeError, TypeError):
            # TODO: Error handling
            excluded = []

    # NOTE(review): the flattened original does not show whether connecting
    # was nested under the KAFKA_CONFIG check above; receivers are connected
    # regardless here -- confirm against the project's layout.
    for name, (signal, receiver) in signal_types.items():
        # Skip instead of deleting from the dict mid-iteration, which raises
        # RuntimeError on Python 3.
        if any(name == ex for ex in excluded):
            continue
        signal.connect(receiver)
def check_model_signals(app_configs=None, **kwargs):
    """
    Report model-signal receivers whose lazily referenced sender is missing.

    Inspects every ``ModelSignal`` attribute of ``django.db.models.signals``
    and returns a list with one ``signals.E001`` error per receiver found in
    that signal's ``unresolved_references``.
    """
    # Imported here to avoid a circular import.
    from django.db import models

    signal_module = models.signals
    problems = []
    for signal_name in dir(signal_module):
        signal = getattr(signal_module, signal_name)
        if not isinstance(signal, signal_module.ModelSignal):
            continue
        for sender_ref, pending in signal.unresolved_references.items():
            for handler, _, _ in pending:
                # A receiver is either a plain function or an instance of a
                # class that defines ``__call__``.
                if isinstance(handler, types.FunctionType):
                    subject = "The '%s' function" % handler.__name__
                else:
                    subject = "An instance of the '%s' class" % handler.__class__.__name__
                message = (
                    "%s was connected to the '%s' signal "
                    "with a lazy reference to the '%s' sender, "
                    "which has not been installed."
                ) % (subject, signal_name, '.'.join(sender_ref))
                problems.append(
                    Error(
                        message,
                        obj=handler.__module__,
                        hint=None,
                        id='signals.E001',
                    )
                )
    return problems
def ready(self):
    """Attach this app's handlers to ``pre_save``, ``post_save`` and ``post_publish``.

    ``self.ensure_initial`` and ``self.record_plugin_history`` are connected
    for all senders; ``self.record_history`` only for ``Page``.
    """
    # CMSPlugin and DiffToolbar are not referenced below; the imports are kept
    # exactly as in the original (any import side effects are not visible here).
    from cms.models import CMSPlugin, Page
    from cms.signals import post_publish
    from .cms_toolbar import DiffToolbar

    hookups = (
        (signals.pre_save, self.ensure_initial, {}),
        (signals.post_save, self.record_plugin_history, {}),
        (post_publish, self.record_history, {'sender': Page}),
    )
    for signal, handler, options in hookups:
        signal.connect(handler, **options)
def post_delete(sender, instance, using, **kwargs):
    """Record a DELETE ``CRUDEvent`` for *instance*.

    See https://docs.djangoproject.com/es/1.10/ref/signals/#post-delete

    Runs inside ``transaction.atomic()``. Any exception is logged and
    swallowed, so auditing never breaks the delete itself.

    Returns:
        ``False`` when ``should_audit(instance)`` is falsy, otherwise ``None``.
    """
    try:
        with transaction.atomic():
            if not should_audit(instance):
                return False

            object_json_repr = serializers.serialize("json", [instance])

            # user
            try:
                user = get_current_user()
                # validate that the user still exists
                user = get_user_model().objects.get(pk=user.pk)
            except Exception:
                # Best-effort lookup: fall back to "no user". ``Exception``
                # (not a bare except) lets SystemExit/KeyboardInterrupt
                # propagate.
                user = None

            if isinstance(user, AnonymousUser):
                user = None

            # crud event
            CRUDEvent.objects.create(
                event_type=CRUDEvent.DELETE,
                object_repr=str(instance),
                object_json_repr=object_json_repr,
                content_type=ContentType.objects.get_for_model(instance),
                object_id=instance.pk,
                user=user,
                datetime=timezone.now(),
                user_pk_as_string=str(user.pk) if user else user
            )
    except Exception:
        logger.exception('easy audit had a post-delete exception.')
def patch_all():
    """Apply all patches once; later calls do nothing.

    The module-level ``done_patch_all`` flag is set after the first complete
    run and short-circuits every subsequent call.
    """
    global done_patch_all
    if not done_patch_all:
        patch_python()
        patch_app_engine()
        # Add signals: post_save_committed, post_delete_committed
        from appenginepatcher import transactions
        setup_logging()
        done_patch_all = True
def sync_signals():
    """Run ``_setup`` on every ``ObjectMapping`` and return a status message."""
    for mapping in ObjectMapping.objects.all():
        _setup(mapping)
    return "synced RDf publishing signals"
def _setup(objmapping) : try: if objmapping.auto_push : ct = ContentType.objects.get(id = objmapping.content_type_id).model_class() signals.post_save.connect(publish_rdf, sender=ct, dispatch_uid=str(ct.__name__)) print "RDF publishing configured for model {}".format((ct)) logger.info( "RDF publishing configured for model {}".format((ct))) except Exception as e: print "Error trying to set up auto-publish for object mapping.. %s " % e pass
def clear_signals():
    """Run ``_clear`` on every ``ObjectMapping`` and return a status message."""
    for mapping in ObjectMapping.objects.all():
        _clear(mapping)
    # NOTE(review): the text says "synced" although this function disconnects;
    # kept unchanged because callers receive this exact string.
    return "synced RDf publishing signals"
def _clear(objmapping) : try: if objmapping.auto_push : ct = ContentType.objects.get(id = objmapping.content_type_id).model_class() signals.post_save.disconnect(publish_rdf, sender=ct, dispatch_uid=str(ct.__name__)) print "RDF publishing un-configured for model {}".format((ct)) logger.info( "RDF publishing un-configured for model {}".format((ct))) except Exception as e: print "Error trying to clear auto-publish for object mapping.. %s " % e pass
def auto_on():
    """Connect ``setup_signals`` to ``ObjectMapping`` saves and list publishers.

    Returns whatever ``list_pubs()`` reports after the connection is made.
    """
    from rdf_io.signals import setup_signals, list_pubs

    post_save = signals.post_save
    post_save.connect(setup_signals, sender=ObjectMapping)
    publishers = list_pubs()
    return publishers
def auto_off():
    """Delegate to ``rdf_io.signals.clear_signals`` and return its result."""
    from rdf_io.signals import clear_signals as _clear_all

    outcome = _clear_all()
    return outcome
def ready(self):
    """Run base setup, validate the config, then wire up sync signals.

    Signals are connected only when the 'auto_sync' setting is truthy;
    otherwise a debug message is logged instead.
    """
    super(ElasticAppConfig, self).ready()
    strict = settings.get_setting('strict_validation')
    _validate_config(strict)
    if not settings.get_setting('auto_sync'):
        logger.debug("SEARCH_AUTO_SYNC has been disabled.")
        return
    _connect_signals()
def _connect_signals():
    """Attach the save/delete sync handlers to every configured index model.

    For each model, ``post_save`` is connected to ``_on_model_save`` and
    ``post_delete`` to ``_on_model_delete``, each with a ``dispatch_uid`` of
    ``"<model_name>.<signal name>"``.
    """
    hooks = (
        ('post_save', signals.post_save, _on_model_save),
        ('post_delete', signals.post_delete, _on_model_delete),
    )
    for index_name in settings.get_index_names():
        for model_cls in settings.get_index_models(index_name):
            for suffix, signal, handler in hooks:
                uid = '%s.%s' % (model_cls._meta.model_name, suffix)
                logger.debug("Connecting search index model sync signal: %s", uid)
                signal.connect(handler, sender=model_cls, dispatch_uid=uid)
def post_save(sender, instance, created, raw, using, update_fields, **kwargs):
    """Record a CREATE or UPDATE ``CRUDEvent`` for *instance*.

    See https://docs.djangoproject.com/es/1.10/ref/signals/#post-save

    Runs inside ``transaction.atomic()``. The event is only written when every
    callable in ``CRUD_DIFFERENCE_CALLBACKS`` returns a truthy value. Any
    exception is logged and swallowed, so auditing never breaks the save.

    Returns:
        ``False`` when ``should_audit(instance)`` is falsy, otherwise ``None``.
    """
    try:
        with transaction.atomic():
            if not should_audit(instance):
                return False

            object_json_repr = serializers.serialize("json", [instance])

            # created or updated?
            event_type = CRUDEvent.CREATE if created else CRUDEvent.UPDATE

            # user
            try:
                user = get_current_user()
                # validate that the user still exists
                user = get_user_model().objects.get(pk=user.pk)
            except Exception:
                # Best-effort lookup: fall back to "no user". ``Exception``
                # (not a bare except) lets SystemExit/KeyboardInterrupt
                # propagate.
                user = None

            if isinstance(user, AnonymousUser):
                user = None

            # callbacks
            kwargs['request'] = get_current_request()  # make request available for callbacks
            # ``all`` short-circuits: callbacks after the first falsy result
            # are not invoked.
            create_crud_event = all(
                callback(instance, object_json_repr, created, raw, using, update_fields, **kwargs)
                for callback in CRUD_DIFFERENCE_CALLBACKS
                if callable(callback)
            )

            # create crud event only if all callbacks returned True
            if create_crud_event:
                CRUDEvent.objects.create(
                    event_type=event_type,
                    object_repr=str(instance),
                    object_json_repr=object_json_repr,
                    content_type=ContentType.objects.get_for_model(instance),
                    object_id=instance.pk,
                    user=user,
                    datetime=timezone.now(),
                    user_pk_as_string=str(user.pk) if user else user
                )
    except Exception:
        logger.exception('easy audit had a post-save exception.')
def m2m_changed(sender, instance, action, reverse, model, pk_set, using, **kwargs):
    """Record a ``CRUDEvent`` for a many-to-many change on *instance*.

    See https://docs.djangoproject.com/es/1.10/ref/signals/#m2m-changed

    Only the ``post_add``, ``post_remove`` and ``post_clear`` actions are
    recorded. For reverse changes the serialized representation is extended
    with ``m2m_rev_model``, ``m2m_rev_pks`` and ``m2m_rev_action``. Runs inside
    ``transaction.atomic()``; any exception is logged and swallowed.

    Returns:
        ``False`` when the instance should not be audited or the action is
        ignored, otherwise ``None``.
    """
    try:
        with transaction.atomic():
            if not should_audit(instance):
                return False

            if action not in ("post_add", "post_remove", "post_clear"):
                return False

            object_json_repr = serializers.serialize("json", [instance])

            if reverse:
                event_type = CRUDEvent.M2M_CHANGE_REV
                # add reverse M2M changes to event. must use json lib because
                # django serializers ignore extra fields.
                tmp_repr = json.loads(object_json_repr)

                m2m_rev_field = _m2m_rev_field_name(instance._meta.concrete_model, model)
                related_instances = getattr(instance, m2m_rev_field).all()
                related_ids = [r.pk for r in related_instances]

                tmp_repr[0]['m2m_rev_model'] = force_text(model._meta)
                tmp_repr[0]['m2m_rev_pks'] = related_ids
                tmp_repr[0]['m2m_rev_action'] = action
                object_json_repr = json.dumps(tmp_repr)
            else:
                event_type = CRUDEvent.M2M_CHANGE

            # user
            try:
                user = get_current_user()
                # validate that the user still exists
                user = get_user_model().objects.get(pk=user.pk)
            except Exception:
                # Best-effort lookup: fall back to "no user". ``Exception``
                # (not a bare except) lets SystemExit/KeyboardInterrupt
                # propagate.
                user = None

            if isinstance(user, AnonymousUser):
                user = None

            CRUDEvent.objects.create(
                event_type=event_type,
                object_repr=str(instance),
                object_json_repr=object_json_repr,
                content_type=ContentType.objects.get_for_model(instance),
                object_id=instance.pk,
                user=user,
                datetime=timezone.now(),
                user_pk_as_string=str(user.pk) if user else user
            )
    except Exception:
        logger.exception('easy audit had an m2m-changed exception.')