Python google.appengine.ext.db module: put() code examples

We have extracted the following 29 code examples from open-source Python projects to illustrate how to use google.appengine.ext.db.put().
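
Before the project examples, a minimal sketch of the two call styles (the Greeting model below is hypothetical, used only for illustration): put() on a model instance saves a single entity, while the module-level db.put() accepts a single entity or a list of entities and writes them in one batch RPC, returning the corresponding key(s).

from google.appengine.ext import db

class Greeting(db.Model):
    # hypothetical model used only for this sketch
    content = db.StringProperty()

entity = Greeting(content='hello')
entity.put()  # instance method: writes one entity, returns its key

# module-level function: writes a batch in a single RPC, returns the keys
keys = db.put([Greeting(content='a'), Greeting(content='b')])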

Project: beg-django-e-commerce    Author: Apress
def test_signals(self):
        global received_pre_delete
        global received_post_save
        received_pre_delete = False
        received_post_save = False
        def handle_pre_delete(**kwargs):
            global received_pre_delete
            received_pre_delete = True
        signals.pre_delete.connect(handle_pre_delete, sender=TestC)
        def handle_post_save(**kwargs):
            global received_post_save
            received_post_save = True
        signals.post_save.connect(handle_post_save, sender=TestC)
        a = TestC()
        a.put()
        a.delete()
        self.assertTrue(received_pre_delete)
        self.assertTrue(received_post_save)
Project: beg-django-e-commerce    Author: Apress
def test_batch_signals(self):
        global received_pre_delete
        global received_post_save
        received_pre_delete = False
        received_post_save = False
        def handle_pre_delete(**kwargs):
            global received_pre_delete
            received_pre_delete = True
        signals.pre_delete.connect(handle_pre_delete, sender=TestC)
        def handle_post_save(**kwargs):
            global received_post_save
            received_post_save = True
        signals.post_save.connect(handle_post_save, sender=TestC)
        a = TestC()
        db.put([a])
        db.delete([a])
        self.assertTrue(received_pre_delete)
        self.assertTrue(received_post_save)

# Test serialization
Project: beg-django-e-commerce    Author: Apress
def test_cleanup(self):
        signals.pre_delete.connect(cleanup_relations, sender=TestC)
        c1 = TestC()
        c2 = TestC()
        db.put((c1, c2))
        TestModelRel(modelrel=c1).put()
        child = SigChild(owner=c1, rel=c2)
        child.put()
        self.assertEqual(TestC.all().count(), 2)
        self.assertEqual(SigChild.all().count(), 1)
        self.assertEqual(TestModelRel.all().count(), 1)
        c1.delete()
        signals.pre_delete.disconnect(cleanup_relations, sender=TestC)
        self.assertEqual(SigChild.all().count(), 0)
        self.assertEqual(TestC.all().count(), 0)
        self.assertEqual(TestModelRel.all().count(), 0)
Project: t2-crash-reporter    Author: tessel
def update(cls, cursor=None):
        logging.info('Upgrading schema for Crash Reports (Cursor = %s)' % unicode(cursor))
        query = CrashReport.all()
        if cursor:
            query.with_cursor(cursor)

        crash_reports = list()
        for crash_report in query.fetch(limit=BATCH_SIZE):
            crash_report.version = '2'
            crash_report.state = 'unresolved'
            crash_reports.append(crash_report)

        if crash_reports:
            updated = len(crash_reports)
            logging.info('Updating %s entities', updated)
            # update
            db.put(crash_reports)
            Search.add_crash_reports(crash_reports)
            # schedule next request
            deferred.defer(SchemaUpdater.update, cursor=query.cursor())
Project: Deploy_XXNET_Server    Author: jzp820927
def _perform_backup_complete(
    operation, job_id, kind, backup_info_pk, gcs_path_prefix, filenames, queue):
  backup_info = BackupInformation.get(backup_info_pk)
  if backup_info:
    if job_id in backup_info.active_jobs:
      backup_info.active_jobs.remove(job_id)
      backup_info.completed_jobs = list(
          set(backup_info.completed_jobs + [job_id]))

    filenames = [GCSUtil.add_gs_prefix_if_missing(name) for name in filenames]
    kind_backup_files = backup_info.get_kind_backup_files([kind])[0]
    if kind_backup_files:
      kind_backup_files.files = list(set(kind_backup_files.files + filenames))
    else:
      kind_backup_files = backup_info.create_kind_backup_files(kind, filenames)
    db.put((backup_info, kind_backup_files), force_writes=True)
    if operation.status == utils.DatastoreAdminOperation.STATUS_COMPLETED:
      deferred.defer(finalize_backup_info, backup_info.key(),
                     gcs_path_prefix,
                     _url=config.DEFERRED_PATH,
                     _queue=queue,
                     _transactional=True)
  else:
    logging.warn('BackupInfo was not found for %s', backup_info_pk)
Project: Deploy_XXNET_Server    Author: jzp820927
def _save_states(self, state, serialized_readers_entity):
    """Run transaction to save state.

    Args:
      state: a model.MapreduceState entity.
      serialized_readers_entity: a model._HugeTaskPayload entity containing
        json serialized input readers.

    Returns:
      False if a fatal error is encountered and this task should be dropped
      immediately. True if transaction is successful. None if a previous
      attempt of this same transaction has already succeeded.
    """
    mr_id = state.key().id_or_name()
    fresh_state = model.MapreduceState.get_by_job_id(mr_id)
    if not self._check_mr_state(fresh_state, mr_id):
      return False
    if fresh_state.active_shards != 0:
      logging.warning(
          "Mapreduce %s already has active shards. Looks like spurious task "
          "execution.", mr_id)
      return None
    config = util.create_datastore_write_config(state.mapreduce_spec)
    db.put([state, serialized_readers_entity], config=config)
    return True
Project: Deploy_XXNET_Server    Author: jzp820927
def _create_and_save_state(cls, mapreduce_spec, _app):
    """Save mapreduce state to datastore.

    Save state to datastore so that UI can see it immediately.

    Args:
      mapreduce_spec: a model.MapreduceSpec instance.
      _app: app id if specified. None otherwise.

    Returns:
      The saved Mapreduce state.
    """
    state = model.MapreduceState.create_new(mapreduce_spec.mapreduce_id)
    state.mapreduce_spec = mapreduce_spec
    state.active = True
    state.active_shards = 0
    if _app:
      state.app_id = _app
    config = util.create_datastore_write_config(mapreduce_spec)
    state.put(config=config)
    return state
Project: spc    Author: whbrewer
def update(self,tablename,query,update_fields):
        # self.db['_lastsql'] = self._update(tablename,query,update_fields)
        (items, tablename, fields) = self.select_raw(query)
        counter = 0
        for item in items:
            for field, value in update_fields:
                setattr(item, field.name, self.represent(value,field.type))
            item.put()
            counter += 1
        LOGGER.info(str(counter))
        return counter
Project: spc    Author: whbrewer
def insert(self,table,fields):
        dfields=dict((f.name,self.represent(v,f.type)) for f,v in fields)
        # table._db['_lastsql'] = self._insert(table,fields)
        tmp = table._tableobj(**dfields)
        tmp.put()
        rid = Reference(tmp.key().id())
        (rid._table, rid._record, rid._gaekey) = (table, None, tmp.key())
        return rid
Project: spc    Author: whbrewer
def bulk_insert(self,table,items):
        parsed_items = []
        for item in items:
            dfields=dict((f.name,self.represent(v,f.type)) for f,v in item)
            parsed_items.append(table._tableobj(**dfields))
        gae.put(parsed_items)
        return True
Project: beg-django-e-commerce    Author: Apress
def test_serializer(self, format='json'):
        from django.core import serializers
        created = datetime.now()
        x = SerializeModel(key_name='blue_key', name='blue', count=4)
        x.put()
        SerializeModel(name='green', count=1, created=created).put()
        data = serializers.serialize(format, SerializeModel.all())
        db.delete(SerializeModel.all().fetch(100))
        for obj in serializers.deserialize(format, data):
            obj.save()
        self.validate_state(
            ('key.name', 'name',  'count', 'created'),
            (None,       'green', 1, created),
            ('blue_key', 'blue',  4, None),
        )
Project: beg-django-e-commerce    Author: Apress
def test_fake_model_property(self):
        value = {'bla': [1, 2, {'blub': 'bla'*1000}]}
        FM(data=FakeModel(value=value)).put()
        self.assertEqual(FM.all()[0].data.value, value)
Project: beg-django-e-commerce    Author: Apress
def db_add(model, key_name, parent=None, **kwargs):
    """
    This function creates an object transactionally if it does not exist in
    the datastore. Otherwise it returns None.
    """
    existing = model.get_by_key_name(key_name, parent=parent)
    if not existing:
        new_entity = model(parent=parent, key_name=key_name, **kwargs)
        new_entity.put()
        return new_entity
    return None
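
Note that get_by_key_name() followed by put() is only atomic when the whole function runs inside a transaction, so a caller would presumably wrap it with db.run_in_transaction (MyModel and the arguments below are hypothetical):

    created = db.run_in_transaction(db_add, MyModel, 'unique_name', count=0)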
Project: beg-django-e-commerce    Author: Apress
def cleanup_relations(instance, **kwargs):
    if getattr(instance, '__handling_delete', False):
        return
    rels_seen, to_delete, to_put = get_cleanup_entities(instance)
    _get_included_cleanup_entities((instance,), rels_seen, to_delete, to_put)
    for entity in [instance] + to_delete:
        entity.__handling_delete = True
    if to_delete:
        db.delete(to_delete)
    for entity in [instance] + to_delete:
        del entity.__handling_delete
    if to_put:
        db.put(to_put)
Project: t2-crash-reporter    Author: tessel
def update(cls, property_name, property_value):
        key_name = GlobalPreferences.key_name(property_name)
        preference = GlobalPreferences.get_or_insert(key_name, property_name=property_name)
        preference.property_value = property_value
        # update
        db.put(preference)
        return preference
Project: t2-crash-reporter    Author: tessel
def add_or_remove(cls, fingerprint, crash, argv=None, labels=None, is_add=True, delta=1):
        # use an issue if one already exists
        issue = CrashReport.most_recent_issue(CrashReport.key_name(fingerprint))
        key_name = CrashReport.key_name(fingerprint)
        config = ShardedCounterConfig.get_sharded_config(key_name)
        shards = config.shards
        shard_to_use = random.randint(0, shards-1)
        shard_key_name = key_name + '_' + str(shard_to_use)
        if not argv:
            argv = []
        crash_report = CrashReport \
            .get_or_insert(shard_key_name,
                           name=key_name,
                           crash=crash,
                           fingerprint=fingerprint,
                           argv=argv,
                           labels=labels,
                           issue=issue)
        if is_add:
            crash_report.count += delta
            crash_report.put()
            # update caches
            memcache.incr(CrashReport.count_cache_key(key_name), delta, initial_value=0)
        else:
            crash_report.count -= delta
            crash_report.put()
            memcache.decr(CrashReport.count_cache_key(key_name), delta)

        # clear properties cache
        CrashReport.clear_properties_cache(key_name)
        return crash_report
Project: t2-crash-reporter    Author: tessel
def update_crash_report(cls, fingerprint, delta_state):
        name = CrashReport.key_name(fingerprint)
        to_update = list()
        q = CrashReport.all()
        q.filter('name = ', name)
        for crash_report in q.run():
            # Update state. Only allow *mutable* properties of crash reports
            # to be updated.

            # Properties have to be set on the entity by hand this way, as
            # Expando entities do not provide a way to update a property via
            # its name :(
            if 'argv' in delta_state:
                crash_report.argv = delta_state.get('argv')
            if 'labels' in delta_state:
                crash_report.labels = delta_state.get('labels')
            if 'date_time' in delta_state:
                crash_report.date_time = delta_state.get('date_time')
            if 'count' in delta_state:
                crash_report.count = delta_state.get('count')
            if 'issue' in delta_state:
                crash_report.issue = delta_state.get('issue')
            if 'state' in delta_state:
                crash_report.state = delta_state.get('state')

            to_update.append(crash_report)

        # update datastore and search indexes
        db.put(to_update)
        Search.add_crash_reports(to_update)
        # clear memcache
        CrashReport.clear_properties_cache(name)
        # return crash report
        return CrashReport.get_crash(fingerprint)
Project: dancedeets-monorepo    Author: mikelambert
def save_objects(self, keys_to_objects):
        cache_keys = [self.key_to_cache_key(object_key) for object_key, this_object in keys_to_objects.iteritems()]
        if cache_keys:
            existing_objects = FacebookCachedObject.get_by_key_name(cache_keys)
        else:
            existing_objects = []

        db_objects_to_put = []
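        # NOTE: get_by_key_name() returns entities in the same order as
        # cache_keys, and re-iterating the unmodified dict yields the same
        # order again, so this zip pairs each entity with its original key.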
        for obj, (object_key, this_object) in zip(existing_objects, keys_to_objects.iteritems()):
            if not self._is_cacheable(object_key, this_object):
                #TODO(lambert): cache the fact that it's a private-unshowable event somehow? same as deleted events?
                logging.warning("BatchLookup: Looked up id %s but is not cacheable.", object_key)
                continue
            cache_key = self.key_to_cache_key(object_key)
            if not obj:
                obj = FacebookCachedObject(key_name=cache_key)
            old_json_data = obj.json_data
            obj.encode_data(this_object)
            if old_json_data != obj.json_data:
                self.db_updates += 1
                db_objects_to_put.append(obj)
        if db_objects_to_put:
            try:
                db.put(db_objects_to_put)
            except apiproxy_errors.CapabilityDisabledError as e:
                logging.warning('CapabilityDisabledError: %s', e)
Project: Deploy_XXNET_Server    Author: jzp820927
def _run_map_jobs(job_operation_key, backup_info_key, kinds, job_name,
                  backup_handler, input_reader, output_writer, mapper_params,
                  mapreduce_params, queue):
  """Creates backup/restore MR jobs for the given operation.

  Args:
    job_operation_key: a key of utils.DatastoreAdminOperation entity.
    backup_info_key: a key of BackupInformation entity.
    kinds: a list of kinds to run the M/R for.
    job_name: the M/R job name prefix.
    backup_handler: M/R job completion handler.
    input_reader: M/R input reader.
    output_writer: M/R output writer.
    mapper_params: custom parameters to pass to mapper.
    mapreduce_params: dictionary parameters relevant to the whole job.
    queue: the name of the queue that will be used by the M/R.

  Returns:
    Ids of all started mapper jobs as list of strings.
  """
  backup_info = BackupInformation.get(backup_info_key)
  if not backup_info:
    return []
  jobs = utils.RunMapForKinds(
      job_operation_key,
      kinds,
      job_name,
      backup_handler,
      input_reader,
      output_writer,
      mapper_params,
      mapreduce_params,
      queue_name=queue)
  backup_info.active_jobs = jobs
  backup_info.put(force_writes=True)
  return jobs
Project: Deploy_XXNET_Server    Author: jzp820927
def finalize_backup_info(backup_info_pk, gcs_path_prefix):
  """Finalize the state of BackupInformation and creates info file for GS."""

  def get_backup_info():
    return BackupInformation.get(backup_info_pk)

  backup_info = db.run_in_transaction(get_backup_info)
  if backup_info:
    complete_time = datetime.datetime.now()
    backup_info.complete_time = complete_time
    gs_handle = None
    if backup_info.filesystem == FILES_API_GS_FILESYSTEM:
      gs_handle = BackupInfoWriter(gcs_path_prefix).write(backup_info)[0]

    def set_backup_info_with_finalize_info():
      backup_info = get_backup_info()
      backup_info.complete_time = complete_time
      backup_info.gs_handle = gs_handle
      backup_info.put(force_writes=True)
    db.run_in_transaction(set_backup_info_with_finalize_info)
    logging.info('Backup %s completed', backup_info.name)
  else:
    logging.warn('Backup %s could not be found', backup_info_pk)
Project: Deploy_XXNET_Server    Author: jzp820927
def flush(self):
    """Save aggregated type information to the datastore if changed."""
    if self.__needs_save:

      def update_aggregation_tx():
        aggregation = SchemaAggregationResult.load(
            self.__backup_id, self.__kind, self.__shard_id)
        if aggregation:
          if aggregation.merge(self.__aggregation):
            aggregation.put(force_writes=True)
          self.__aggregation = aggregation
        else:
          self.__aggregation.put(force_writes=True)

      def mark_aggregation_as_partial_tx():
        aggregation = SchemaAggregationResult.load(
            self.__backup_id, self.__kind, self.__shard_id)
        if aggregation is None:
          aggregation = SchemaAggregationResult.create(
              self.__backup_id, self.__kind, self.__shard_id)
        aggregation.is_partial = True
        aggregation.put(force_writes=True)
        self.__aggregation = aggregation

      try:
        db.run_in_transaction(update_aggregation_tx)
      except apiproxy_errors.RequestTooLargeError:
        db.run_in_transaction(mark_aggregation_as_partial_tx)
      self.__needs_save = False
Project: Deploy_XXNET_Server    Author: jzp820927
def _drop_gracefully(self):
    """Gracefully drop controller task.

    This method is called when decoding controller task payload failed.
    Upon this we mark ShardState and MapreduceState as failed so all
    tasks can stop.

    Writing to the datastore is forced (read-only mode is ignored) because
    we badly want the tasks to stop; if force_writes were False, the job
    would never have been started in the first place.
    """
    mr_id = self.request.headers[util._MR_ID_TASK_HEADER]
    state = model.MapreduceState.get_by_job_id(mr_id)
    if not state or not state.active:
      return

    state.active = False
    state.result_status = model.MapreduceState.RESULT_FAILED
    config = util.create_datastore_write_config(state.mapreduce_spec)
    puts = []
    for ss in model.ShardState.find_all_by_mapreduce_state(state):
      if ss.active:
        ss.set_for_failure()
        puts.append(ss)

        if len(puts) > model.ShardState._MAX_STATES_IN_MEMORY:
          db.put(puts, config=config)
          puts = []
    db.put(puts, config=config)

    self._finalize_job(state.mapreduce_spec, state)
Project: cloud-memory    Author: onejgordon
def get(self, d):
        BATCH_SIZE = 100
        cursor = self.request.get('cursor')
        q = Question.all()
        if cursor:
            q.with_cursor(cursor)
        logging.debug("Query count: %d" % q.count())
        batch = q.fetch(BATCH_SIZE)
        identifiers = []
        if batch:
            new_cursor = q.cursor()
            next_url = '/admin/schema/do_schema?cursor=%s' % (new_cursor)

            changed = []
            for item in batch:
                edited = False
                try:
                    if item.correct_response:
                        item.correct_responses = [item.correct_response]
                        item.correct_response = None
                        edited = True
                    if edited:
                        changed.append(item)
                except Exception, e:
                    logging.error("Error: %s" % e)
                identifiers.append("Question: %s, Changed: %s" % (item.correct_responses, edited))
            db.put(changed)

            context = {
                'identifiers': identifiers,
                'next_url': next_url,
                }
            self.render_template("schema_update.html", **context)
        else:
            self.redirect("/admin")
Project: StuffShare    Author: StuffShare
def update(self, tablename, query, update_fields):
        # self.db['_lastsql'] = self._update(tablename, query, update_fields)
        (items, tablename, fields) = self.select_raw(query)
        counter = 0
        for item in items:
            for field, value in update_fields:
                setattr(item, field.name, self.represent(value, field.type))
            item.put()
            counter += 1
        LOGGER.info(str(counter))
        return counter
Project: StuffShare    Author: StuffShare
def insert(self, table, fields):
        dfields = dict((f.name, self.represent(v, f.type)) for f, v in fields)
        # table._db['_lastsql'] = self._insert(table, fields)
        tmp = table._tableobj(**dfields)
        tmp.put()
        key = tmp.key if self.use_ndb else tmp.key()
        rid = Reference(key.id())
        (rid._table, rid._record, rid._gaekey) = (table, None, key)
        return rid
Project: StuffShare    Author: StuffShare
def bulk_insert(self, table, items):
        parsed_items = []
        for item in items:
            dfields = dict((f.name, self.represent(v, f.type)) for f, v in item)
            parsed_items.append(table._tableobj(**dfields))
        if self.use_ndb:
            ndb.put_multi(parsed_items)
        else:
            gae.put(parsed_items)
        return True
Project: Deploy_XXNET_Server    Author: jzp820927
def post(self):
    """Handler for post requests to datastore_admin/import_backup.do.

    Import is executed and user is redirected to the base-path handler.
    """
    gs_handle = self.request.get('gs_handle')
    token = self.request.get('xsrf_token')
    error = None
    if gs_handle and utils.ValidateXsrfToken(token, XSRF_ACTION):
      try:
        bucket_name, path = parse_gs_handle(gs_handle)
        file_content = get_gs_object(bucket_name, path)
        entities = parse_backup_info_file(file_content)
        original_backup_info = entities.next()
        entity = datastore.Entity(BackupInformation.kind())
        entity.update(original_backup_info)
        backup_info = BackupInformation.from_entity(entity)
        if original_backup_info.key().app() != os.getenv('APPLICATION_ID'):
          backup_info.original_app = original_backup_info.key().app()

        def tx():
          backup_info.put(force_writes=True)
          kind_files_models = []
          for entity in entities:
            kind_files = backup_info.create_kind_backup_files(
                entity.key().name(), entity['files'])
            kind_files_models.append(kind_files)
          db.put(kind_files_models, force_writes=True)
        db.run_in_transaction(tx)
        backup_id = str(backup_info.key())
      except Exception, e:
        logging.exception('Failed to Import datastore backup information.')
        error = e.message

    if error:
      self.SendRedirect(params=[('error', error)])
    elif self.request.get('Restore'):
      ConfirmRestoreFromBackupHandler.Render(
          self, default_backup_id=backup_id,
          default_delete_backup_after_restore=True)
    else:
      self.SendRedirect()
Project: Deploy_XXNET_Server    Author: jzp820927
def _try_free_lease(self, shard_state, slice_retry=False):
    """Try to free lease.

    A lightweight transaction to update shard_state and unset
    slice_start_time to allow the next retry to happen without blocking.
    We don't care if this fails or not because the lease will expire
    anyway.

    Under normal execution, _save_state_and_schedule_next is the exit point.
    It updates/saves shard state and schedules the next slice or returns.
    Other exit points are:
    1. _are_states_consistent: at the beginning of handle, checks
      if datastore states and the task are in sync.
      If not, raise or return.
    2. _attempt_slice_retry: may raise exception to taskqueue.
    3. _save_state_and_schedule_next: may raise exception when taskqueue/db
       unreachable.

    This handler should try to free the lease on every exceptional exit point.

    Args:
      shard_state: model.ShardState.
      slice_retry: whether to count this as a failed slice execution.
    """
    @db.transactional
    def _tx():
      fresh_state = model.ShardState.get_by_shard_id(shard_state.shard_id)
      if fresh_state and fresh_state.active:

        fresh_state.slice_start_time = None
        fresh_state.slice_request_id = None
        if slice_retry:
          fresh_state.slice_retries += 1
        fresh_state.put()
    try:
      _tx()

    except Exception, e:
      logging.warning(e)
      logging.warning(
          "Release lock for shard %s failed. Wait for lease to expire.",
          shard_state.shard_id)
Project: Deploy_XXNET_Server    Author: jzp820927
def _finalize_job(cls, mapreduce_spec, mapreduce_state):
    """Finalize job execution.

    Invokes done callback and save mapreduce state in a transaction,
    and schedule necessary clean ups. This method is idempotent.

    Args:
      mapreduce_spec: an instance of MapreduceSpec
      mapreduce_state: an instance of MapreduceState
    """
    config = util.create_datastore_write_config(mapreduce_spec)
    queue_name = util.get_queue_name(mapreduce_spec.params.get(
        model.MapreduceSpec.PARAM_DONE_CALLBACK_QUEUE))
    done_callback = mapreduce_spec.params.get(
        model.MapreduceSpec.PARAM_DONE_CALLBACK)
    done_callback_target = mapreduce_spec.params.get(
        model.MapreduceSpec.PARAM_DONE_CALLBACK_TARGET)

    done_task = None
    if done_callback:
      headers = util._get_task_headers(
          mapreduce_spec.mapreduce_id,
          util.CALLBACK_MR_ID_TASK_HEADER,
          set_host_header=(done_callback_target is None))
      done_task = taskqueue.Task(
          url=done_callback,
          target=done_callback_target,
          headers=headers,
          method=mapreduce_spec.params.get("done_callback_method", "POST"))

    @db.transactional(retries=5)
    def _put_state():
      """Helper to store state."""
      fresh_state = model.MapreduceState.get_by_job_id(
          mapreduce_spec.mapreduce_id)
      if not fresh_state.active:
        logging.warning(
            "Job %s is not active. Looks like spurious task execution. "
            "Dropping task.", mapreduce_spec.mapreduce_id)
        return
      mapreduce_state.put(config=config)

      if done_task and not _run_task_hook(
          mapreduce_spec.get_hooks(),
          "enqueue_done_task",
          done_task,
          queue_name,
          transactional=True):
        done_task.add(queue_name, transactional=True)

    _put_state()
    logging.info("Final result for job '%s' is '%s'",
                 mapreduce_spec.mapreduce_id, mapreduce_state.result_status)
    cls._clean_up_mr(mapreduce_spec)