我们从Python开源项目中，提取了以下35个代码示例，用于说明如何使用google.appengine.ext.ndb.put_multi()。
def init_zoneinfo():
    """Load every member of the bundled ``zoneinfo.zip`` into the datastore.

    Each archive member becomes a ``Zoneinfo`` entity whose key id is the
    member's filename (in ``NDB_NAMESPACE``) and whose ``data`` is the member's
    raw bytes. Entities already stored under the same key are overwritten.
    This must be called before the AppengineTimezoneLoader will work.
    """
    import logging
    import os
    from zipfile import ZipFile

    module_dir = os.path.dirname(__file__)
    archive_path = os.path.abspath(os.path.join(module_dir, 'zoneinfo.zip'))

    with ZipFile(archive_path) as archive:
        entities = [
            Zoneinfo(
                key=ndb.Key('Zoneinfo', member.filename,
                            namespace=NDB_NAMESPACE),
                data=archive.read(member),
            )
            for member in archive.filelist
        ]

    logging.info(
        "Adding %d timezones to the pytz-appengine database" % len(entities))
    ndb.put_multi(entities)
def store_specs_from_subscription(subscription_key, week_start, specs):
    """Store this week's meeting specs unless some are already stored.

    Queries ``MeetingSpec`` for entities belonging to ``subscription_key``
    whose ``datetime`` is after ``week_start``. When any are found nothing is
    written and ``None`` is returned, so calling this repeatedly does not
    write the specs twice. Otherwise ``specs`` is written with
    ``ndb.put_multi`` and returned. A ``NeedIndexError`` raised by the lookup
    is treated the same as "nothing stored yet".
    """
    try:
        lookup = MeetingSpec.query(
            MeetingSpec.meeting_subscription == subscription_key,
            MeetingSpec.datetime > week_start,
        )
        already_stored = lookup.fetch()
    except NeedIndexError:
        already_stored = []

    if not already_stored:
        ndb.put_multi(specs)
        return specs
    return None
def IncrementAccountsWithTaskExperiment():
    """Build the "Increment Accounts With Task" experiment.

    Returns:
        A ``(title, callable)`` tuple. Calling the callable adds 10 to the
        ``balance`` of every ``Account``, walking the query in pages of 100.
        Each page is handled by the ``@task``-decorated ``ProcessOnePage``,
        which invokes itself for the following page while more remain.
    """
    def Go():
        def AddFreeCredit(creditamount):
            @task
            def ProcessOnePage(cursor):
                page, next_cursor, has_more = Account.query().fetch_page(
                    100, start_cursor=cursor
                )
                for entity in page:
                    entity.balance += creditamount
                ndb.put_multi(page)
                if not has_more:
                    return
                # Continue with the page that starts where this one ended.
                ProcessOnePage(next_cursor)

            ProcessOnePage(None)

        AddFreeCredit(10)

    return "Increment Accounts With Task", Go
def MakeAccountsExperiment():
    """Build the "Make Accounts" experiment.

    Returns:
        A ``(title, callable)`` tuple. Calling the callable invokes the
        ``@task``-decorated ``MakeAccounts`` with 1000. A request for ten or
        fewer accounts creates them directly with a zero balance; a larger
        request is split into chunks of one tenth of the request and each
        chunk is handed to ``MakeAccounts`` again.
    """
    def Go():
        # NOTE(review): callers pass only ``numaccounts``; ``headers`` is
        # presumably injected by the decorator (includeheaders=True) - confirm.
        @task(includeheaders=True, logname="MakeAccountsExperiment")
        def MakeAccounts(numaccounts, headers):
            logging.debug(headers)
            logging.debug(numaccounts)

            if numaccounts > 10:
                remaining = numaccounts
                while remaining > 0:
                    # One tenth of the request, or whatever is left if less.
                    chunk = min(numaccounts / 10, remaining)
                    MakeAccounts(chunk)
                    remaining -= chunk
                return

            fresh_accounts = []
            for _ in range(numaccounts):
                new_account = Account()
                new_account.balance = 0
                fresh_accounts.append(new_account)
            ndb.put_multi(fresh_accounts)

        MakeAccounts(1000)

    return "Make Accounts", Go
def init_zoneinfo():
    """Populate the datastore with one ``Zoneinfo`` entity per zip member.

    Reads ``zoneinfo.zip`` from this module's directory. Every member is
    stored under a key built from its filename in ``NDB_NAMESPACE``, so zones
    that already exist are overwritten. This must be called before the
    AppengineTimezoneLoader will work.
    """
    import logging
    import os
    from zipfile import ZipFile

    zip_location = os.path.abspath(
        os.path.join(os.path.dirname(__file__), 'zoneinfo.zip'))

    pending = []
    with ZipFile(zip_location) as bundle:
        for info in bundle.filelist:
            zone_key = ndb.Key(
                'Zoneinfo', info.filename, namespace=NDB_NAMESPACE)
            pending.append(Zoneinfo(key=zone_key, data=bundle.read(info)))

    message = "Adding %d timezones to the pytz-appengine database" % len(
        pending)
    logging.info(message)
    ndb.put_multi(pending)
def post(self):
    """Store two example ``Picture`` entities (v1 model) and confirm in the response."""
    # Re-loading the module forces ndb to use v1 of the model.
    reload(models_v1)

    # Example data to save.
    sample_pictures = [
        models_v1.Picture(author='Alice', name='Sunset'),
        models_v1.Picture(author='Bob', name='Sunrise'),
    ]
    ndb.put_multi(sample_pictures)

    confirmation = """ Entities created. <a href="/">View entities</a>. """
    self.response.write(confirmation)
# [END add_entities]

# [START update_schema]
def testEventuallyConsistentGlobalQueryResult(self):
    """Compare a global query with an ancestor query right after a write.

    Two ``TestModel`` entities are stored under the same parent key. The test
    then expects the global query to count none of them and the ancestor
    query to count both.
    NOTE(review): the zero global count relies on how the test datastore is
    configured outside this block - confirm against the test's setup.
    """
    class TestModel(ndb.Model):
        pass

    parent_key = ndb.Key('User', 'ryan')

    # Put two entities
    children = [TestModel(parent=parent_key) for _ in range(2)]
    ndb.put_multi(children)

    # Global query doesn't see the data.
    global_count = TestModel.query().count(3)
    self.assertEqual(0, global_count)

    # Ancestor query does see the data.
    ancestor_count = TestModel.query(ancestor=parent_key).count(3)
    self.assertEqual(2, ancestor_count)
    # [END HRD_example_1]

# [START HRD_example_2]
def create_forums( cls ):
    """Create one ``EnkiModelForum`` entity per entry of ``settings.FORUMS``.

    Each entry is read positionally as ``( group, title, description )``.
    Entries are numbered in steps of 10: ``group_order`` advances whenever the
    group name differs from the previous entry's, and ``forum_order`` restarts
    at 10 inside each new group. All entities are written with a single
    ``ndb.put_multi`` call.

    The entities are constructed directly instead of formatting Python source
    text and running it through ``exec``, so quotes or backslashes in a group,
    title or description can no longer break or alter the generated code.
    """
    increment = 10
    group_order = 0
    forum_order = 0
    current_group = ''
    forums = []
    for item in settings.FORUMS:
        if item[ 0 ] != current_group:
            # new group: increment the group order index and reset the forum order index
            current_group = item[ 0 ]
            group_order += increment
            forum_order = increment
        else:
            forum_order += increment
        forums.append( enki.modelforum.EnkiModelForum( group_order = group_order,
                                                       forum_order = forum_order,
                                                       group = current_group,
                                                       title = item[ 1 ],
                                                       description = item[ 2 ] ))
    ndb.put_multi( forums )
def get(self, d):
    """Seed starter data for the first ``User`` found and report via JSON.

    When a user exists, a monthly goal and a yearly goal are written together,
    then a habit, a project and one task are written individually, and the
    response message is "OK". Without a user the message is "No user".
    """
    # Run after admin user logs in
    first_user = User.query().get()
    if not first_user:
        self.json_out({'message': "No user"})
        return

    now = datetime.today()
    monthly_goal = Goal.CreateMonthly(first_user, date=now)
    monthly_goal.Update(text=["Get it done"])
    yearly_goal = Goal.Create(first_user, str(now.year))
    yearly_goal.Update(text=["Make progress"])
    ndb.put_multi([monthly_goal, yearly_goal])

    habit = Habit.Create(first_user)
    habit.Update(name="Run")
    habit.put()

    project = Project.Create(first_user)
    project.Update(title="Blog post", subhead="How Flow works")
    project.put()

    Task.Create(first_user, "Get this done").put()

    # NOTE(review): these two tasks are created but never put() in this
    # block - confirm whether they were meant to be saved.
    think_hard = Task.Create(first_user, "Think hard", due=datetime.today())
    think_harder = Task.Create(
        first_user, "Think even harder", due=datetime.today())

    self.json_out({'message': "OK"})
def get(self):
    """Sync readables from Pocket and Goodreads for each sync-active user.

    For a user with a Pocket access token, Pocket is synced and the returned
    timestamp is stored in the ``pocket_last_timestamp`` integration property.
    The Goodreads "currently-reading" shelf is fetched for every user. Only
    users whose integration property was updated are written back, in one
    ``ndb.put_multi`` call.
    """
    from services import pocket, goodreads

    logging.debug("Running SyncReadables cron...")
    TS_KEY = 'pocket_last_timestamp'
    changed_users = []
    for user in User.SyncActive(['pocket', 'goodreads']):
        # Pocket
        token = user.get_integration_prop('pocket_access_token')
        pocket_synced = bool(token)
        if pocket_synced:
            success, readables, latest_timestamp = pocket.sync(user, token)
            logging.debug("Got %d readables from pocket" % len(readables))
            user.set_integration_prop(TS_KEY, latest_timestamp)

        # Goodreads
        success, readables = goodreads.get_books_on_shelf(
            user, shelf='currently-reading')
        logging.debug("Got %d readables from good reads" % len(readables))

        if pocket_synced:
            changed_users.append(user)
    ndb.put_multi(changed_users)
def CreateFromText(user, text):
    """Return the ``JournalTag`` for every @mention and #hashtag in ``text``.

    Mentions and hashtags are runs of 3-30 ASCII letters following ``@`` or
    ``#``. Tags already in the datastore are reused. Missing ones are created
    under ``user.key``, typed by the first character of their key id, and
    saved in one batch.

    Returns:
        List of tags, mentions first then hashtags, in order of appearance.
    """
    mention_names = re.findall(r'@([a-zA-Z]{3,30})', text)
    hashtag_names = re.findall(r'#([a-zA-Z]{3,30})', text)

    tag_keys = [JournalTag.Key(user, name) for name in mention_names]
    tag_keys += [JournalTag.Key(user, name, prefix='#')
                 for name in hashtag_names]

    created = []
    resolved = []
    for found, key in zip(ndb.get_multi(tag_keys), tag_keys):
        if found:
            resolved.append(found)
            continue
        tag_id = key.id()
        if tag_id[0] == '#':
            tag_type = JOURNALTAG.HASHTAG
        else:
            tag_type = JOURNALTAG.PERSON
        # The display name is the id without its one-character prefix.
        fresh = JournalTag(id=tag_id, name=tag_id[1:], type=tag_type,
                           parent=user.key)
        created.append(fresh)
        resolved.append(fresh)

    ndb.put_multi(created)
    return resolved
def merge_user_dbs(user_db, deprecated_keys):
    """Deactivate the user entities identified by ``deprecated_keys``.

    Each deprecated user loses its auth ids, is marked inactive and
    unverified, and gets an underscore prefixed to its username unless one is
    already there. ``user_db`` is not used yet (see the TODO below).

    Args:
        user_db: the surviving user entity.
        deprecated_keys: keys of the user entities to deactivate.
    """
    # TODO: Merge possible user data before handling deprecated users
    # ndb.get_multi yields None for keys with no stored entity; skip those so
    # a stale key cannot raise AttributeError below.
    deprecated_dbs = [
        deprecated_db
        for deprecated_db in ndb.get_multi(deprecated_keys)
        if deprecated_db is not None
    ]
    for deprecated_db in deprecated_dbs:
        deprecated_db.auth_ids = []
        deprecated_db.active = False
        deprecated_db.verified = False
        if not deprecated_db.username.startswith('_'):
            deprecated_db.username = '_%s' % deprecated_db.username
    ndb.put_multi(deprecated_dbs)
def create_new_employees_from_list(new_employees):
    """Create and store a ``User`` for every employee dict in ``new_employees``.

    Each dict must provide ``email``, ``first_name``, ``last_name``,
    ``photo_url`` and ``metadata``. Every new user starts with an empty
    ``subscription_preferences`` list. All users are written in one batch.
    """
    created_users = [
        User(
            email=record['email'],
            first_name=record['first_name'],
            last_name=record['last_name'],
            photo_url=record['photo_url'],
            metadata=record['metadata'],
            subscription_preferences=[],
        )
        for record in new_employees
    ]
    ndb.put_multi(created_users)
def update_current_employees(local_data_employee, remote_data_employee):
    """Refresh locally stored employees from their remote records.

    Args:
        local_data_employee: mapping of identifier -> stored employee entity.
        remote_data_employee: mapping of the same identifiers -> dict with
            ``first_name``, ``last_name``, ``photo_url`` and ``metadata``.

    Every local entity receives the four remote fields, is marked as not
    terminated, and all of them are then written in one batch. An identifier
    missing from ``remote_data_employee`` raises ``KeyError``.
    """
    mirrored_fields = ('first_name', 'last_name', 'photo_url', 'metadata')
    for identifier in set(local_data_employee.keys()):
        stored = local_data_employee[identifier]
        incoming = remote_data_employee[identifier]
        for field in mirrored_fields:
            setattr(stored, field, incoming[field])
        stored.terminated = False

    ndb.put_multi(local_data_employee.values())
def mark_termed_employees(termed_employees):
    """Flag each given employee entity as terminated and store them in one batch."""
    for departed in termed_employees:
        setattr(departed, 'terminated', True)
    ndb.put_multi(termed_employees)
def IncrementAccountsExperimentNaive():
    """Build the "Increment Accounts (Naive)" experiment.

    Returns:
        A ``(title, callable)`` tuple. Calling the callable adds 10 to the
        ``balance`` of every ``Account``. Pages of 100 are processed by a
        plain function that calls itself for each following page.
    """
    def Go():
        def AddFreeCredit(creditamount):
            def ProcessOnePage(cursor):
                batch, following, more_pages = Account.query().fetch_page(
                    100, start_cursor=cursor
                )
                for acct in batch:
                    acct.balance += creditamount
                ndb.put_multi(batch)
                if not more_pages:
                    return
                ProcessOnePage(following)

            ProcessOnePage(None)

        AddFreeCredit(10)

    return "Increment Accounts (Naive)", Go
def operate_on_multiple_keys_at_once(list_of_entities):
    """Demonstrate the batch calls: store the entities, read them back by key, then delete them."""
    stored_keys = ndb.put_multi(list_of_entities)
    reloaded_entities = ndb.get_multi(stored_keys)
    ndb.delete_multi(stored_keys)
def test_data():
    """Store two ``Note`` entities under a shared ancestor key, then yield.

    NOTE(review): presumably used as a generator-style test fixture; the
    decorator is not visible here - confirm.
    """
    from main import Note

    ancestor_key = ndb.Key(Note, '123')
    notes = [Note(parent=ancestor_key, message=text) for text in ('1', '2')]
    ndb.put_multi(notes)
    yield
def resave_display_events(db_events):
    """Rebuild the ``DisplayEvent`` for each event and store the ones that built.

    ``search.DisplayEvent.build`` may return a falsy value; those results are
    dropped before the batch write.
    """
    built = [search.DisplayEvent.build(event) for event in db_events]
    ndb.put_multi(list(filter(None, built)))
def _save_events(db_events, disable_updates=None):
    """Store events with their display events, then refresh the search index.

    Args:
        db_events: events to store.
        disable_updates: optional collection of update names to skip. When it
            contains ``'index'`` the full-text index update is not run.
    """
    events = list(db_events)
    display_events = [search.DisplayEvent.build(event) for event in db_events]

    # Because some DisplayEvent.build() calls return None (from errors, or from inability)
    to_store = [entity for entity in events + display_events if entity]
    ndb.put_multi(to_store)

    skipped_updates = disable_updates or []
    if 'index' in skipped_updates:
        return
    search.update_fulltext_search_index_batch(db_events)
def run():
    """Import a downloaded datastore backup folder into the local datastore.

    Every regular file directly inside ``mypath`` is read with
    ``records.RecordsReader``. Each record is decoded into an entity of
    ``cls`` whose key is re-homed to ``appname``, and entities are written
    with ``ndb.put_multi`` in batches of 100 plus a final partial batch per
    file. A file that raises ``ProtocolBufferDecodeError`` is abandoned at
    that point and the next file is processed.
    """
    # Set your downloaded folder's path here (must be readable by dev_appserver)
    mypath = '/Users/lambert/Dropbox/dancedeets/data/datastore_backup_datastore_backup_2016_11_19_DBEvent/15700286559371541387849311E815D'
    # Set the class of the objects here
    cls = DBEvent
    # Set your app's name here
    appname = "dev~None"

    # Do the harlem shake
    onlyfiles = [f for f in listdir(mypath) if isfile(join(mypath, f))]
    for filename in onlyfiles:
        i = 0
        try:
            # The records are serialized protocol buffers, so read bytes
            # ('rb'); the with-block closes the handle even when decoding
            # fails part-way through the file.
            with open(join(mypath, filename), 'rb') as raw:
                reader = records.RecordsReader(raw)
                to_put = []
                for record in reader:
                    entity_proto = entity_pb.EntityProto(contents=record)
                    entity_proto.key_.app_ = appname
                    to_put.append(cls._from_pb(entity_proto))
                    i += 1
                    if i % 100 == 0:
                        print("Saved %d %ss" % (i, ''))  # entity.kind())
                        ndb.put_multi(to_put)  # use_memcache=False)
                        to_put = []
                ndb.put_multi(to_put)  # use_memcache=False)
                print("Saved %d" % i)
        except ProtocolBufferDecodeError:
            # Deliberately best-effort ("All good" in the original).
            # NOTE(review): presumably the folder also holds non-entity
            # files that are expected to fail decoding - confirm.
            pass
def _flush_ndb_puts(self, items, options):
    """Write ``items`` to the datastore through NDB in a single batch.

    The put configuration comes from ``self._create_config(options)``.
    """
    assert ndb is not None
    put_config = self._create_config(options)
    ndb.put_multi(items, config=put_config)
def get(self):
    """Record recent GitHub commit counts for every github-sync-active user.

    The reference date is the ``date`` request parameter (ISO format) or, if
    absent, yesterday. For each user whose client can run, commit counts are
    fetched for ``GH_COMMIT_OVERLAP`` consecutive days ending at the reference
    date, and one ``TrackingDay`` per returned date is collected. All
    collected days are written in one batch; the JSON response is empty.
    """
    from services.github import GithubClient

    requested_date = self.request.get('date')
    if requested_date:
        last_date = tools.fromISODate(requested_date)
    else:
        last_date = datetime.today() - timedelta(days=1)

    res = {}
    tracking_days = []
    for user in User.SyncActive('github'):
        gh_client = GithubClient(user)
        if not gh_client._can_run():
            logging.debug("Github updater can't run")
            continue

        date_range = [
            (last_date - timedelta(days=back)).date()
            for back in range(self.GH_COMMIT_OVERLAP)
        ]
        logging.debug(
            "Running SyncGithub cron for %s on %s..." % (user, date_range))
        commits_by_date = gh_client.get_contributions_on_date_range(date_range)
        if commits_by_date is None:
            continue
        for day, n_commits in commits_by_date.items():
            tracking_day = TrackingDay.Create(user, day)
            tracking_day.set_properties({'commits': n_commits})
            tracking_days.append(tracking_day)

    if tracking_days:
        ndb.put_multi(tracking_days)
    self.json_out(res)
def action(self, d):
    """Run the bulk task operation named by the ``action`` request parameter.

    ``create_common`` creates one task per titled entry of the user's
    ``tasks.common_tasks`` setting and returns them under ``tasks``.
    ``archive_complete`` archives the done, not-yet-archived tasks among the
    20 most recent and returns their ids under ``archived_ids``.
    Any other value sets the message to "Unknown action".
    """
    requested = self.request.get('action')
    res = {}

    if requested == 'create_common':
        common_tasks = self.user.get_setting_prop(['tasks', 'common_tasks'])
        if not common_tasks:
            self.message = "You haven't configured any common tasks"
        else:
            titles = (entry.get('title') for entry in common_tasks)
            created = [Task.Create(self.user, title)
                       for title in titles if title]
            if created:
                ndb.put_multi(created)
                self.message = "Created %d task(s)" % len(created)
                res['tasks'] = [task.json() for task in created]
                self.success = True
    elif requested == 'archive_complete':
        archived = []
        for task in Task.Recent(self.user, limit=20):
            if task.archived or not task.is_done():
                continue
            task.archive()
            archived.append(task)
        if not archived:
            self.message = "No completed tasks to archive"
        else:
            ndb.put_multi(archived)
            res['archived_ids'] = [task.key.id() for task in archived]
            self.message = "Archived %d %s" % (
                len(archived),
                tools.pluralize('task', count=len(archived)))
        self.success = True
    else:
        self.message = "Unknown action"

    self.set_response(res)
def batch_create(self, d):
    """Create or update one ``Readable`` per entry of the ``readings`` JSON parameter.

    A truthy ``type`` string in an entry is lower-cased and replaced by its
    ``READABLE.LOOKUP`` value before the entry's fields are passed on as
    keyword arguments. ``source`` defaults to ``'form'``. The resulting
    entities are written in one batch, and success is reported only when at
    least one was produced.
    """
    readings = json.loads(self.request.get('readings'))
    source = self.request.get('source', default_value='form')

    entities = []
    for reading in readings:
        type_string = reading.get('type')
        if type_string:
            reading['type'] = READABLE.LOOKUP.get(type_string.lower())
        entities.append(
            Readable.CreateOrUpdate(
                self.user, None, source=source, read=True, **reading))

    if entities:
        ndb.put_multi(entities)
        self.success = True
        self.message = "Putting %d" % len(entities)
    self.set_response()
def batch_create(self, d):
    """Create one ``Quote`` per entry of the ``quotes`` JSON request parameter.

    A string ``dt_added`` value is parsed with ``tools.fromISODate`` before
    the entry's fields are passed to ``Quote.Create`` as keyword arguments.
    The new quotes are written in one batch, and success is reported only
    when at least one was produced.
    """
    quotes = json.loads(self.request.get('quotes'))

    entities = []
    for fields in quotes:
        added = fields.get('dt_added')
        if isinstance(added, basestring):
            fields['dt_added'] = tools.fromISODate(added)
        entities.append(Quote.Create(self.user, **fields))

    if entities:
        ndb.put_multi(entities)
        self.success = True
        self.message = "Putting %d" % len(entities)
    self.set_response()
def submit(self, d):
    """Submit today's journal (yesterday if 00:00 - 04:00).

    Requires a ``data`` parameter; without it only an error message is set.
    With it, a ``MiniJournal`` is created for the optional ISO ``date``,
    updated from the request parameters, tag-parsed and saved. Non-empty
    entries of the optional ``tasks`` JSON list are then saved as new tasks.
    The response carries the journal's JSON, or ``None`` if none was created.
    """
    raw_date = self.request.get('date')
    date = tools.fromISODate(raw_date) if raw_date else None
    task_json = tools.getJson(self.request.get('tasks'))  # JSON
    params = tools.gets(
        self,
        strings=['lat', 'lon', 'tags_from_text'],
        json=['data'],
        lists=['tags'],
    )

    jrnl = None
    if not params.get('data'):
        self.message = "Malformed request - data param required"
    else:
        if not params.get('tags'):
            params['tags'] = []
        jrnl = MiniJournal.Create(self.user, date)
        jrnl.Update(**params)
        jrnl.parse_tags()
        jrnl.put()

        if task_json:
            # Save new tasks for tomorrow
            new_tasks = [
                Task.Create(self.user, title) for title in task_json if title
            ]
            ndb.put_multi(new_tasks)

        self.success = True
        self.message = "Journal submitted!"

    journal_json = jrnl.json() if jrnl else None
    self.set_response({'journal': journal_json})
def bulk_insert(self, table, items):
    """Insert several rows into ``table`` with one batch write.

    Args:
        table: table whose ``_tableobj`` is called to build each entity.
        items: iterable of rows, each an iterable of ``(field, value)`` pairs.

    Returns:
        Always ``True``.
    """
    entities = [
        table._tableobj(
            **{field.name: self.represent(value, field.type)
               for field, value in row})
        for row in items
    ]
    if not self.use_ndb:
        gae.put(entities)
    else:
        ndb.put_multi(entities)
    return True
def _update_employees(employee_dicts):
    """Synchronise ``Employee`` records with a list of employee dicts.

    Args:
        employee_dicts: iterable of dicts, one per current employee. Each
            must contain ``'username'``; the remaining keys are consumed by
            ``Employee.create_from_dict`` / ``update_from_dict``.

    Employees not yet stored are created, existing ones are updated and
    marked as not terminated, and all of them are written in one batch.
    Afterwards every stored employee whose username is absent from
    ``employee_dicts`` is marked terminated and written in a second batch.
    """
    logging.info('Updating employees...')

    all_employees = []
    current_usernames = set()
    for d in employee_dicts:
        existing_employee = Employee.query(
            Employee.username == d['username']).get()
        if existing_employee is None:
            all_employees.append(Employee.create_from_dict(d, persist=False))
        else:
            existing_employee.update_from_dict(d)
            # If the user is in the S3 dump, then the user is no longer
            # terminated.
            existing_employee.terminated = False
            all_employees.append(existing_employee)
        current_usernames.add(d['username'])
    ndb.put_multi(all_employees)

    # Figure out if there are any employees in the DB that aren't in the S3
    # dump. These are terminated employees, and we need to mark them as such.
    usernames_to_employees = dict(
        (employee.username, employee) for employee in Employee.query()
    )
    terminated_employees = []
    for username in set(usernames_to_employees) - current_usernames:
        employee = usernames_to_employees[username]
        employee.terminated = True
        terminated_employees.append(employee)
    ndb.put_multi(terminated_employees)

    logging.info('Done.')
def update_schema_task(cursor=None, num_updated=0, batch_size=100):
    """Task that handles updating the models' schema.

    This is started by UpdateSchemaHandler. It processes one page of
    ``Picture`` entities, re-saving each so that it has the new schema
    fields, and re-queues itself for the next page until none remain.

    Args:
        cursor: query cursor marking where this page starts; ``None`` for
            the first page.
        num_updated: running total of entities re-saved by earlier pages.
        batch_size: number of entities to fetch per page. It is forwarded to
            the re-queued task so a non-default page size is kept for the
            whole run instead of reverting to 100 after the first page.
    """
    # Force ndb to use v2 of the model by re-loading it.
    reload(models_v2)

    # Get all of the entities for this Model.
    query = models_v2.Picture.query()
    pictures, next_cursor, more = query.fetch_page(
        batch_size, start_cursor=cursor)

    to_put = []
    for picture in pictures:
        # Give the new fields default values.
        # If you added new fields and were okay with the default values, you
        # would not need to do this.
        picture.num_votes = 1
        picture.avg_rating = 5
        to_put.append(picture)

    # Save the updated entities.
    if to_put:
        ndb.put_multi(to_put)
        num_updated += len(to_put)
        logging.info(
            'Put {} entities to Datastore for a total of {}'.format(
                len(to_put), num_updated))

    # If there are more entities, re-queue this task for the next page.
    if more:
        deferred.defer(
            update_schema_task,
            cursor=next_cursor,
            num_updated=num_updated,
            batch_size=batch_size)
    else:
        logging.debug(
            'update_schema_task complete with {0} updates!'.format(
                num_updated))
# [END update_schema]
def get(self, d):
    """Run the one-off maintenance routine named by the ``hack_id`` parameter.

    ``index_quotes_readables`` rebuilds search documents for one page (50
    items per kind, selected by the ``page`` parameter) of quotes and
    readables, and puts them into their search indexes grouped by index.
    ``normalize_key_props`` rewrites parentless ``HabitDay.habit`` keys and
    ``MiniJournal.tags`` keys so they are parented by the owning user.
    Any other value reports ``'hack_id not found'``. The outcome is returned
    as JSON.
    """
    hack_id = self.request.get('hack_id')
    res = {}
    if hack_id == 'index_quotes_readables':
        page = self.request.get_range('page')
        PAGE_SIZE = 50
        index_lookup = {}  # index_name -> (index, list of items)
        for model in (Quote, Readable):
            entities = model.query().fetch(
                limit=PAGE_SIZE, offset=page * PAGE_SIZE)
            for entity in entities:
                sd, index = entity.update_sd(index_put=False)
                if not index:
                    # No index returned for this entity: skip it instead of
                    # dereferencing index.name on a missing index.
                    continue
                index_lookup.setdefault(index.name, (index, []))[1].append(sd)
        if index_lookup:
            n = 0
            for index, items in index_lookup.values():
                index.put(items)
                n += len(items)
            # Report the number of distinct indexes, not the tuple length.
            res['result'] = "Put %d items in %d indexes" % (
                n, len(index_lookup))
            res['page'] = page

    elif hack_id == 'normalize_key_props':
        dbp = []
        for hd in HabitDay.query().iter():
            habit_key = hd.habit
            if habit_key.parent() is None:
                # Need to update
                hd.habit = ndb.Key('User', hd.key.parent().id(),
                                   'Habit', int(habit_key.id()))
                dbp.append(hd)
        res['habitdays'] = len(dbp)
        ndb.put_multi(dbp)

        dbp = []
        for jrnl in MiniJournal.query().iter():
            changes = False
            for i, tag_key in enumerate(jrnl.tags):
                if tag_key.parent() is None:
                    # Need to update
                    jrnl.tags[i] = ndb.Key('User', jrnl.key.parent().id(),
                                           'JournalTag', tag_key.id())
                    changes = True
            if changes:
                dbp.append(jrnl)
        res['journals'] = len(dbp)
        ndb.put_multi(dbp)

    else:
        res['result'] = 'hack_id not found'
    self.json_out(res)
def get_books_on_shelf(user, shelf='currently-reading'):
    """Fetch a Goodreads shelf and store its books as ``Readable`` entities.

    Args:
        user: user whose ``goodreads_user_id`` integration property selects
            the Goodreads account. Nothing is fetched when it is not set.
        shelf: name of the Goodreads shelf to list.

    Returns:
        ``(success, readables)``. ``success`` is ``True`` only when the
        request returned HTTP 200; ``readables`` holds the entities created
        or updated from the response (empty otherwise).
    """
    user_id = user.get_integration_prop('goodreads_user_id')
    readables = []
    success = False
    if user_id:
        params = urllib.urlencode({
            'shelf': shelf,
            'key': GR_API_KEY,
            'v': 2
        })
        url = "https://www.goodreads.com/review/list/%s.xml?%s" % (user_id, params)
        logging.debug("Fetching %s for %s" % (url, user))
        res = urlfetch.fetch(
            url=url,
            method=urlfetch.GET,
            validate_certificate=True)
        logging.debug(res.status_code)
        if res.status_code == 200:
            tree = etree.parse(StringIO(res.content))
            for review in tree.getroot().find('reviews').findall('review'):
                book = review.find('book')
                isbn = book.find('isbn13').text
                image_url = book.find('image_url').text
                title = book.find('title').text
                authors = book.find('authors')
                link = book.find('link').text
                # Reset per book so a review without an author name neither
                # raises NameError nor reuses the previous book's author.
                # NOTE(review): assumes CreateOrUpdate accepts author=None -
                # confirm.
                author = None
                first_author = None
                if authors is not None:
                    first_author = authors.find('author')
                if first_author is not None:
                    name = first_author.find('name')
                    if name is not None:
                        author = name.text
                readable = Readable.CreateOrUpdate(
                    user, isbn, title=title, url=link, source='goodreads',
                    image_url=image_url, author=author, type=READABLE.BOOK,
                    read=False)
                readables.append(readable)
            success = True
        logging.debug("Putting %d readable(s)" % len(readables))
        ndb.put_multi(readables)
        Readable.put_sd_batch(readables)
    return (success, readables)