Python django.db.transaction 模块,commit_on_success() 实例源码

我们从Python开源项目中,提取了以下34个代码示例,用于说明如何使用django.db.transaction.commit_on_success()

项目:pyconjp-website    作者:pyconjp    | 项目源码 | 文件源码
def file_create(request):
    if not can_upload(request.user):
        raise Http404

    if request.method == "POST":
        form = FileUploadForm(request.POST, request.FILES)
        if form.is_valid():
            with transaction.commit_on_success():
                kwargs = {
                    "file": form.cleaned_data["file"],
                }
                File.objects.create(**kwargs)
            return redirect("file_index")
    else:
        form = FileUploadForm()

    ctx = {
        "form": form,
    }
    return render(request, "cms/file_create.html", ctx)
项目:intel-manager-for-lustre    作者:intel-hpdd    | 项目源码 | 文件源码
def nested_commit_on_success(func):
    """Like commit_on_success, but doesn't commit existing transactions.

    This decorator is used to run a function within the scope of a
    database transaction, committing the transaction on success and
    rolling it back if an exception occurs.

    Unlike the standard transaction.commit_on_success decorator, this
    version first checks whether a transaction is already active.  If so
    then it doesn't perform any commits or rollbacks, leaving that up to
    whoever is managing the active transaction.
    """
    from django.db import transaction
    commit_on_success = transaction.commit_on_success(func)

    def _nested_commit_on_success(*args, **kwds):
        if transaction.is_managed():
            return func(*args, **kwds)
        else:
            return commit_on_success(*args, **kwds)
    return transaction.wraps(func)(_nested_commit_on_success)
项目:intel-manager-for-lustre    作者:intel-hpdd    | 项目源码 | 文件源码
def update(self, stat_name, stat_properties, stat_data):
        from chroma_core.lib.storage_plugin.api import statistics

        if isinstance(stat_properties, statistics.BytesHistogram):
            # Histograms
            for dp in stat_data:
                ts = dp['timestamp']
                bin_vals = dp['value']
                from django.db import transaction
                with transaction.commit_on_success():
                    time = SimpleHistoStoreTime.objects.create(time = ts, storage_resource_statistic = self)
                    for i in range(0, len(stat_properties.bins)):
                        SimpleHistoStoreBin.objects.create(bin_idx = i, value = bin_vals[i], histo_store_time = time)
                    # Only keep latest time
                    SimpleHistoStoreTime.objects.filter(~Q(id = time.id), storage_resource_statistic = self).delete()
            return []
        for i in stat_data:
            i['value'] = float(i['value'])
        return self.metrics.serialize(stat_name, stat_properties, stat_data)
项目:intel-manager-for-lustre    作者:intel-hpdd    | 项目源码 | 文件源码
def update_corosync_configuration(self, corosync_configuration_id, mcast_port, network_interface_ids):
        with self._lock:
            with transaction.commit_on_success():
                # For now we only support 1 or 2 network configurations, jobs aren't so helpful at supporting lists
                corosync_configuration = CorosyncConfiguration.objects.get(id=corosync_configuration_id)

                assert len(network_interface_ids) == 1 or len(network_interface_ids) == 2
                network_interface_0 = NetworkInterface.objects.get(id = network_interface_ids[0])
                network_interface_1 = None if len(network_interface_ids) == 1 else NetworkInterface.objects.get(id = network_interface_ids[1])

                command_id = CommandPlan(self._lock_cache, self._job_collection).command_run_jobs_preserve_states(
                    [{"class_name": corosync_configuration.configure_job_name,
                      "args": {"corosync_configuration": corosync_configuration,
                               "mcast_port": mcast_port,
                               "network_interface_0": network_interface_0,
                               "network_interface_1": network_interface_1}}],
                    [corosync_configuration, corosync_configuration.host.pacemaker_configuration],
                    "Update Corosync Configuration on host %s" % corosync_configuration.host.fqdn)

        self.progress.advance()

        return command_id
项目:intel-manager-for-lustre    作者:intel-hpdd    | 项目源码 | 文件源码
def _create_client_mount(self, host, filesystem, mountpoint):
        # Used for intra-JobScheduler calls
        log.debug("Creating client mount for %s as %s:%s" % (filesystem, host, mountpoint))

        with self._lock:
            from django.db import transaction
            with transaction.commit_on_success():
                mount, created = LustreClientMount.objects.get_or_create(
                    host = host,
                    filesystem = filesystem)
                mount.mountpoint = mountpoint
                mount.save()

            ObjectCache.add(LustreClientMount, mount)

        if created:
            log.info("Created client mount: %s" % mount)

        return mount
项目:django-twilio-tfa    作者:rtindru    | 项目源码 | 文件源码
def change(self, request, new_email, confirm=True):
        """
        Given a new email address, change self and re-confirm.
        """
        try:
            atomic_transaction = transaction.atomic
        except AttributeError:
            atomic_transaction = transaction.commit_on_success

        with atomic_transaction():
            user_email(self.user, new_email)
            self.user.save()
            self.email = new_email
            self.verified = False
            self.save()
            if confirm:
                self.send_confirmation(request)
项目:Provo-Housing-Database    作者:marcopete5    | 项目源码 | 文件源码
def change(self, request, new_email, confirm=True):
        """
        Given a new email address, change self and re-confirm.
        """
        try:
            atomic_transaction = transaction.atomic
        except AttributeError:
            atomic_transaction = transaction.commit_on_success

        with atomic_transaction():
            user_email(self.user, new_email)
            self.user.save()
            self.email = new_email
            self.verified = False
            self.save()
            if confirm:
                self.send_confirmation(request)
项目:nrp    作者:django-rea    | 项目源码 | 文件源码
def change(self, new_email, confirm=True):
        """
        Given a new email address, change self and re-confirm.
        """
        #with transaction.commit_on_success():
        #todo: no longer exists as of django 1.6
        #commented out for now
        #fix later, see https://docs.djangoproject.com/en/1.8/topics/db/transactions/
        self.user.email = new_email
        self.user.save()
        self.email = new_email
        self.verified = False
        self.save()
        if confirm:
            self.send_confirmation()
项目:acacia_main    作者:AcaciaTrading    | 项目源码 | 文件源码
def save(self, must_create=False):
        """
        Saves the current session data to the database. If 'must_create' is
        True, a database error will be raised if the saving operation doesn't
        create a *new* entry (as opposed to possibly updating an existing
        entry).
        """
        obj = Session(
            session_key=self._get_or_create_session_key(),
            session_data=self.encode(self._get_session(no_load=must_create)),
            expire_date=self.get_expiry_date(),
            user_agent=self.user_agent,
            user_id=self.user_id,
            ip=self.ip,
        )
        using = router.db_for_write(Session, instance=obj)
        try:
            if django.VERSION >= (1, 6):
                with transaction.atomic(using):
                    obj.save(force_insert=must_create, using=using)
            else:
                with transaction.commit_on_success(using):
                    obj.save(force_insert=must_create, using=using)
        except IntegrityError as e:
            if must_create and 'session_key' in str(e):
                raise CreateError
            raise
项目:django-seven    作者:iwoca    | 项目源码 | 文件源码
def view_with_context_manager(request):
    with commit_on_success():
        return HttpResponse('Request in a transaction')
项目:django-seven    作者:iwoca    | 项目源码 | 文件源码
def managed_transaction(func):
        """ This decorator wraps a function so that all sql executions in the function are atomic

            It's used instead of django.db.transaction.commit_on_success in cases where reporting exceptions is necessary
            as commit_on_success swallows exceptions
        """
        @wraps(func)
        @transaction.commit_manually
        def _inner(*args, **kwargs):
            try:
                ret = func(*args, **kwargs)
            except Exception:
                transaction.rollback()
                raise
            else:
                transaction.commit()
                return ret

        return _inner
项目:intel-manager-for-lustre    作者:intel-hpdd    | 项目源码 | 文件源码
def validate_token(key, credits=1):
    """
    Validate that a token is valid to authorize a setup/register operation:
     * Check it's not expired
     * Check it has some credits

    :param credits: number of credits to decrement if valid
    :return 2-tuple (<http response if error, else None>, <registration token if valid, else None>)
    """
    try:
        with transaction.commit_on_success():
            token = RegistrationToken.objects.get(secret = key)
            if not token.credits:
                log.warning("Attempt to register with exhausted token %s" % key)
                return HttpForbidden(), None
            else:
                # Decrement .credits
                RegistrationToken.objects.filter(secret = key).update(credits = token.credits - credits)
    except RegistrationToken.DoesNotExist:
        log.warning("Attempt to register with non-existent token %s" % key)
        return HttpForbidden(), None
    else:
        now = IMLDateTime.utcnow()

        if token.expiry < now:
            log.warning("Attempt to register with expired token %s (now %s, expired at %s)" % (key, now, token.expiry))
            return HttpForbidden(), None
        elif token.cancelled:
            log.warning("Attempt to register with cancelled token %s" % key)
            return HttpForbidden(), None

    return None, token
项目:intel-manager-for-lustre    作者:intel-hpdd    | 项目源码 | 文件源码
def get_steps(self):
        from chroma_core.models.registration_token import RegistrationToken

        # Commit token so that registration request handler will see it
        with transaction.commit_on_success():
            token = RegistrationToken.objects.create(credits=1, profile=self.managed_host.server_profile)

        return [
            (DeployStep, {
                'token': token,
                'host': self.managed_host,
                'profile_name': self.managed_host.server_profile.name,
                '__auth_args': self.auth_args},)
        ]
项目:intel-manager-for-lustre    作者:intel-hpdd    | 项目源码 | 文件源码
def run(self, kwargs):
        from chroma_core.models import ManagedHost
        from chroma_core.lib.detection import DetectScan

        # Get all the host data
        host_data = {}
        threads = []
        host_target_devices = defaultdict(list)

        for host in ManagedHost.objects.filter(id__in = kwargs['host_ids']):
            volume_nodes = VolumeNode.objects.filter(host = host)

            for volume_node in volume_nodes:
                resource = volume_node.volume.storage_resource.to_resource()
                try:
                    uuid = resource.uuid
                except AttributeError:
                    uuid = None

                host_target_devices[host].append({"path": volume_node.path,
                                                  "type": resource.device_type(),
                                                  "uuid": uuid})

            with transaction.commit_on_success():
                self.log("Scanning server %s..." % host)

            thread = ExceptionThrowingThread(target=self.detect_scan,
                                             args=(host, host_data, host_target_devices[host]))
            thread.start()
            threads.append(thread)

        ExceptionThrowingThread.wait_for_threads(threads)               # This will raise an exception if any of the threads raise an exception

        with transaction.commit_on_success():
            DetectScan(self).run(host_data)
项目:intel-manager-for-lustre    作者:intel-hpdd    | 项目源码 | 文件源码
def run(self, kwargs):
        job_log.info("%s passed pre-format check, allowing subsequent reformats" % kwargs['target'])
        with transaction.commit_on_success():
            kwargs['target'].reformat = True
            kwargs['target'].save()
项目:intel-manager-for-lustre    作者:intel-hpdd    | 项目源码 | 文件源码
def global_remove_resource(self, resource_id):
        with self._instance_lock:
            with transaction.commit_manually():
                # Be extra-sure to see a fresh view (HYD-1301)
                transaction.commit()
            with transaction.commit_on_success():
                log.debug("global_remove_resource: %s" % resource_id)
                try:
                    record = StorageResourceRecord.objects.get(pk = resource_id)
                except StorageResourceRecord.DoesNotExist:
                    log.error("ResourceManager received invalid request to remove non-existent resource %s" % resource_id)
                    return

                self._delete_resource(record)
项目:intel-manager-for-lustre    作者:intel-hpdd    | 项目源码 | 文件源码
def add_jobs(self, jobs, command):
        """Add a job, and any others which are required in order to reach its prerequisite state"""
        # Important: the Job must not be committed until all
        # its dependencies and locks are in.
        assert transaction.is_managed()

        for job in jobs:
            for dependency in self._dep_cache.get(job).all():
                if not dependency.satisfied():
                    log.info("add_jobs: setting required dependency %s %s" % (dependency.stateful_object, dependency.preferred_state))
                    self._set_state(dependency.get_stateful_object(), dependency.preferred_state, command)
            log.info("add_jobs: done checking dependencies")
            locks = self._create_locks(job)
            job.locks_json = json.dumps([l.to_dict() for l in locks])
            self._create_dependencies(job, locks)
            with transaction.commit_on_success():
                job.save()

            log.info("add_jobs: created Job %s (%s)" % (job.pk, job.description()))

            for l in locks:
                self._lock_cache.add(l)

            command.jobs.add(job)

        self._job_collection.add_command(command, jobs)
项目:intel-manager-for-lustre    作者:intel-hpdd    | 项目源码 | 文件源码
def _handle(self, msg):
        fn = getattr(self, "_%s" % msg[0])
        # Commit after each message to ensure the next message handler
        # doesn't see a stale transaction
        with transaction.commit_on_success():
            fn(*msg[1], **msg[2])
项目:intel-manager-for-lustre    作者:intel-hpdd    | 项目源码 | 文件源码
def _start_step(self, job_id, **kwargs):
        with transaction.commit_on_success():
            result = StepResult(job_id=job_id, **kwargs)
            result.save()
        self._job_to_result[job_id] = result
项目:intel-manager-for-lustre    作者:intel-hpdd    | 项目源码 | 文件源码
def _console(self, job_id, log_string):
        result = self._job_to_result[job_id]
        with transaction.commit_on_success():
            result.console += log_string
            result.save()
项目:intel-manager-for-lustre    作者:intel-hpdd    | 项目源码 | 文件源码
def _step_failure(self, job_id, backtrace):
        result = self._job_to_result[job_id]
        with transaction.commit_on_success():
            result.state = 'failed'
            result.backtrace = backtrace
            result.save()
项目:intel-manager-for-lustre    作者:intel-hpdd    | 项目源码 | 文件源码
def _step_success(self, job_id, step_result):
        result = self._job_to_result[job_id]
        with transaction.commit_on_success():
            result.state = 'success'
            result.result = json.dumps(step_result)
            result.save()
项目:intel-manager-for-lustre    作者:intel-hpdd    | 项目源码 | 文件源码
def set_state(self, object_ids, message, run):
        with self._lock:
            with transaction.commit_on_success():
                command = self.CommandPlan.command_set_state(object_ids, message)
            if run:
                self.progress.advance()
        return command.id
项目:intel-manager-for-lustre    作者:intel-hpdd    | 项目源码 | 文件源码
def create_copytool(self, copytool_data):
        from django.db import transaction
        log.debug("Creating copytool from: %s" % copytool_data)
        with self._lock:
            host = ObjectCache.get_by_id(ManagedHost, int(copytool_data['host']))
            copytool_data['host'] = host
            filesystem = ObjectCache.get_by_id(ManagedFilesystem, int(copytool_data['filesystem']))
            copytool_data['filesystem'] = filesystem

            with transaction.commit_on_success():
                copytool = Copytool.objects.create(**copytool_data)

            # Add the copytool after the transaction commits
            ObjectCache.add(Copytool, copytool)

        log.debug("Created copytool: %s" % copytool)

        mount = self._create_client_mount(host, filesystem, copytool_data['mountpoint'])

        # Make the association between the copytool and client mount
        with self._lock:
            copytool.client_mount = mount

            with transaction.commit_on_success():
                copytool.save()

            ObjectCache.update(copytool)

        self.progress.advance()
        return copytool.id
项目:intel-manager-for-lustre    作者:intel-hpdd    | 项目源码 | 文件源码
def register_copytool(self, copytool_id, uuid):
        from django.db import transaction
        with self._lock:
            copytool = ObjectCache.get_by_id(Copytool, int(copytool_id))
            log.debug("Registering copytool %s with uuid %s" % (copytool, uuid))

            with transaction.commit_on_success():
                copytool.register(uuid)

            ObjectCache.update(copytool)

        self.progress.advance()
项目:intel-manager-for-lustre    作者:intel-hpdd    | 项目源码 | 文件源码
def unregister_copytool(self, copytool_id):
        from django.db import transaction

        with self._lock:
            copytool = ObjectCache.get_by_id(Copytool, int(copytool_id))
            log.debug("Unregistering copytool %s" % copytool)

            with transaction.commit_on_success():
                copytool.unregister()

            ObjectCache.update(copytool)

        self.progress.advance()
项目:intel-manager-for-lustre    作者:intel-hpdd    | 项目源码 | 文件源码
def set_host_profile(self, host_id, server_profile_id):
        '''
        Set the profile for the given host to the given profile.

        :param host_id:
        :param server_profile_id:
        :return: Command for the host job or None if no commands were created.
        '''

        with self._lock:
            with transaction.commit_on_success():
                server_profile = ServerProfile.objects.get(pk=server_profile_id)
                host = ObjectCache.get_one(ManagedHost, lambda mh: mh.id == host_id)

                commands_required = host.set_profile(server_profile_id)

                if commands_required:
                    command = self.CommandPlan.command_run_jobs(commands_required,
                                                                help_text['change_host_profile'] % (host.fqdn, server_profile.ui_name))
                else:
                    command = None

        if command:
            self.progress.advance()

        return command
项目:intel-manager-for-lustre    作者:intel-hpdd    | 项目源码 | 文件源码
def create_host(self, fqdn, nodename, address, server_profile_id):
        """
        Create a new host, or update a host in the process of being deployed.
        """
        server_profile = ServerProfile.objects.get(pk=server_profile_id)

        with self._lock:
            with transaction.commit_on_success():
                try:
                    # If there is already a host record (SSH-assisted host addition) then
                    # update it
                    host = ManagedHost.objects.get(fqdn=fqdn, state='undeployed')
                    # host.fqdn = fqdn
                    # host.nodename = nodename
                    # host.save()
                    job = DeployHostJob.objects.filter(~Q(state='complete'), managed_host=host)
                    command = Command.objects.filter(jobs=job)[0]

                except ManagedHost.DoesNotExist:
                    # Else create a new one
                    host = ManagedHost.objects.create(
                        fqdn=fqdn,
                        nodename=nodename,
                        immutable_state=not server_profile.managed,
                        address=address,
                        server_profile=server_profile,
                        install_method = ManagedHost.INSTALL_MANUAL)
                    lnet_configuration = LNetConfiguration.objects.create(host=host)

                    ObjectCache.add(LNetConfiguration, lnet_configuration)
                    ObjectCache.add(ManagedHost, host)

                    with transaction.commit_on_success():
                        command = self.CommandPlan.command_set_state(
                            [(ContentType.objects.get_for_model(host).natural_key(), host.id, server_profile.initial_state)],
                            help_text["deploying_host"] % host)

        self.progress.advance()

        return host.id, command.id
项目:intel-manager-for-lustre    作者:intel-hpdd    | 项目源码 | 文件源码
def update_nids(self, nid_list):
        # Although this is creating/deleting a NID it actually rewrites the whole NID configuration for the node
        # this is all in here for now, but as we move to dynamic lnet it will probably get it's own file.
        with self._lock:
            lnet_configurations = set()
            lnet_nid_data = defaultdict(lambda: {'nid_updates': {}, 'nid_deletes': {}})

            for nid_data in nid_list:
                network_interface = NetworkInterface.objects.get(id = nid_data['network_interface'])
                lnet_configuration = LNetConfiguration.objects.get(host = network_interface.host_id)
                lnet_configurations.add(lnet_configuration)

                if str(nid_data['lnd_network']) == '-1':
                    lnet_nid_data[lnet_configuration]['nid_deletes'][network_interface.id] = nid_data
                else:
                    lnet_nid_data[lnet_configuration]['nid_updates'][network_interface.id] = nid_data

            jobs = []
            for lnet_configuration in lnet_configurations:
                jobs.append(ConfigureLNetJob(lnet_configuration = lnet_configuration,
                                             config_changes = json.dumps(lnet_nid_data[lnet_configuration])))

            with transaction.commit_on_success():
                command = Command.objects.create(message = "Configuring NIDS for hosts")
                self.CommandPlan.add_jobs(jobs, command)

        self.progress.advance()

        return command.id
项目:intel-manager-for-lustre    作者:intel-hpdd    | 项目源码 | 文件源码
def trigger_plugin_update(self, include_host_ids, exclude_host_ids, plugin_names):
        """
        Cause the plugins on the hosts passed to send an update irrespective of whether any
        changes have occurred.

        :param include_host_ids: List of host ids to include in the trigger update.
        :param exclude_host_ids: List of host ids to exclude from the include list (makes for usage easy)
        :param plugin_names: list of plugins to trigger update on - empty list means all.
        :return: command id that caused updates to be sent.
        """

        host_ids = [host.id for host in ManagedHost.objects.all()] if include_host_ids is None else include_host_ids
        host_ids = host_ids if exclude_host_ids is None else list(set(host_ids) - set(exclude_host_ids))

        if host_ids:
            with self._lock:
                jobs = [TriggerPluginUpdatesJob(host_ids=json.dumps(host_ids),
                                                plugin_names_json=json.dumps(plugin_names))]

                with transaction.commit_on_success():
                    command = Command.objects.create(message="%s triggering updates from agents" % ManagedHost.objects.get(id=exclude_host_ids[0]).fqdn)
                    self.CommandPlan.add_jobs(jobs, command)

            self.progress.advance()

            return command.id
        else:
            return None
项目:intel-manager-for-lustre    作者:intel-hpdd    | 项目源码 | 文件源码
def _check_size(self):
        """Apply a size limit to the table of log messages"""
        MAX_ROWS_PER_TRANSACTION = 10000
        removed_num_entries = 0
        overflow_filename = os.path.join(settings.LOG_PATH, "db_log")

        if self._table_size > settings.DBLOG_HW:
            remove_num_entries = self._table_size - settings.DBLOG_LW

            trans_size = min(MAX_ROWS_PER_TRANSACTION, remove_num_entries)
            with transaction.commit_on_success():
                while remove_num_entries > 0:
                    removed_entries = LogMessage.objects.all().order_by('id')[0:trans_size]
                    self.log.debug("writing %s batch of entries" % trans_size)
                    try:
                        f = open(overflow_filename, "a")
                        for line in removed_entries:
                            f.write("%s\n" % line.__str__())
                        LogMessage.objects.filter(id__lte = removed_entries[-1].id).delete()
                    except Exception, e:
                        self.log.error("error opening/writing/closing the db_log: %s" % e)
                    finally:
                        f.close()

                    remove_num_entries -= trans_size
                    removed_num_entries += trans_size
                    if remove_num_entries < trans_size:
                        trans_size = remove_num_entries

        self._table_size -= removed_num_entries
        self.log.info("Wrote %s DB log entries to %s" % (removed_num_entries, overflow_filename))

        return removed_num_entries
项目:intel-manager-for-lustre    作者:intel-hpdd    | 项目源码 | 文件源码
def remove_host(self, fqdn):
        log.info("remove_host: %s" % fqdn)

        self.sessions.remove_host(fqdn)
        self.queues.remove_host(fqdn)
        self.hosts.remove_host(fqdn)

        with transaction.commit_on_success():
            for cert in ClientCertificate.objects.filter(host__fqdn = fqdn, revoked = False):
                log.info("Revoking %s:%s" % (fqdn, cert.serial))
                self.valid_certs.pop(cert.serial, None)
            ClientCertificate.objects.filter(host__fqdn = fqdn, revoked = False).update(revoked = True)

        # TODO: ensure there are no GETs left in progress after this completes
        # TODO: drain plugin_rx_queue so that anything we will send to AMQP has been sent before this returns
项目:mes    作者:osess    | 项目源码 | 文件源码
def change(self, new_email, confirm=True):
        """
        Given a new email address, change self and re-confirm.
        """
        with transaction.commit_on_success():
            self.user.email = new_email
            self.user.save()
            self.email = new_email
            self.verified = False
            self.save()
            if confirm:
                self.send_confirmation()
项目:intel-manager-for-lustre    作者:intel-hpdd    | 项目源码 | 文件源码
def complete_job(self, job_id, errored = False, cancelled = False):
        # TODO: document the rules here: jobs may only modify objects that they
        # have taken out a writelock on, and they may only modify instances obtained
        # via ObjectCache, or via their stateful_object attribute.  Jobs may not
        # modify objects via .update() calls, all changes must be done on loaded instances.
        # They do not have to .save() their stateful_object, but they do have to .save()
        # any other objects that they modify (having obtained their from ObjectCache and
        # held a writelock on them)

        job = self._job_collection.get(job_id)
        with self._lock:
            with transaction.commit_on_success():
                if not errored and not cancelled:
                    try:
                        job.on_success()
                    except Exception:
                        log.error("Error in Job %s on_success:%s" % (job.id, traceback.format_exc()))
                        errored = True

                log.info("Job %d: complete_job: Updating cache" % job.pk)
                # Freshen cached information about anything that this job held a writelock on
                for lock in self._lock_cache.get_by_job(job):
                    if lock.write:
                        if hasattr(lock.locked_item, 'not_deleted'):
                            log.info("Job %d: locked_item %s %s %s %s" % (
                                job.id,
                                id(lock.locked_item),
                                lock.locked_item.__class__,
                                isinstance(lock.locked_item, DeletableStatefulObject),
                                lock.locked_item.not_deleted
                            ))
                        if hasattr(lock.locked_item, 'not_deleted') and lock.locked_item.not_deleted is None:
                            log.debug("Job %d: purging %s/%s" %
                                      (job.id, lock.locked_item.__class__, lock.locked_item.id))
                            ObjectCache.purge(lock.locked_item.__class__, lambda o: o.id == lock.locked_item.id)
                        else:
                            log.debug("Job %d: updating write-locked %s/%s" %
                                      (job.id, lock.locked_item.__class__, lock.locked_item.id))

                            # Ensure that any notifications prior to release of the writelock are not
                            # applied
                            if hasattr(lock.locked_item, 'state_modified_at'):
                                lock.locked_item.__class__.objects.filter(pk=lock.locked_item.pk).update(
                                    state_modified_at=django.utils.timezone.now())

                            ObjectCache.update(lock.locked_item)

                if job.state != 'tasked':
                    # This happens if a Job is cancelled while it's calling this
                    log.info("Job %s has state %s in complete_job" % (job.id, job.state))
                    return

                self._complete_job(job, errored, cancelled)

            with transaction.commit_on_success():
                self._drain_notification_buffer()
                self._run_next()