我们从 Python 开源项目中，提取了以下 6 个代码示例，用于说明如何使用 pymongo.IndexModel()。
def initialize_indexes(database):
    """Create the indexes used on the ``submissions`` and ``comments`` collections.

    Both collections get a single-field index on ``reddit_id`` and a
    descending index on ``created``.  In addition, ``submissions`` gets a
    text index over ``title`` and ``body``, and ``comments`` gets a text
    index over ``body`` only.

    Args:
        database: Object supporting ``database[name]`` lookup that returns a
            collection exposing ``create_indexes``.
    """
    # Index models shared by both collections.
    by_reddit_id = pymongo.IndexModel('reddit_id')
    newest_first = pymongo.IndexModel([('created', pymongo.DESCENDING)])

    # Collection-specific full-text index, listed in creation order.
    text_index_by_collection = (
        ('submissions',
         pymongo.IndexModel([('title', pymongo.TEXT), ('body', pymongo.TEXT)])),
        ('comments',
         pymongo.IndexModel([('body', pymongo.TEXT)])),
    )

    for collection_name, text_index in text_index_by_collection:
        database[collection_name].create_indexes(
            [by_reddit_id, newest_first, text_index]
        )
def rebuild_people_indexes():
    """Create the named indexes on the ``mc[write_table]`` collection.

    Four single-field indexes (last name, first name, birth place and
    relatives' pid) are created, followed by one compound ascending index
    over the year, month and day of ``BirthDate``.
    """
    # (field, index name) pairs for the single-field indexes.
    # Indexes on 'pid' (name '_pid') and on the whole 'BirthDate' field were
    # commented out in an earlier version and are still not created here.
    single_field_specs = (
        ('PersonNameLastName', '_LastName'),
        ('PersonNameFirstName', '_FirstName'),
        ('BirthPlace.Place', '_BirthPlace'),
        ('relatives.pid', '_RelativesPid'),
    )
    models = [
        IndexModel(field, name=index_name)
        for field, index_name in single_field_specs
    ]

    birth_date_keys = [
        ('BirthDate.Year', ASCENDING),
        ('BirthDate.Month', ASCENDING),
        ('BirthDate.Day', ASCENDING),
    ]
    models.append(IndexModel(birth_date_keys, name="_BirthDate"))

    mc[write_table].create_indexes(models)
def parse_index(index, base_compound_field=None):
    """Normalise an index specification into an ``IndexModel``.

    Accepted forms of ``index``:

    * ``IndexModel`` -- its key pairs and remaining options are copied.
    * ``str`` -- a single key, passed through ``explicit_key``.
    * ``tuple`` / ``list`` -- a compound index; every element is passed
      through ``explicit_key``.
    * ``dict`` -- must contain an iterable ``key`` entry; every other entry
      is forwarded to ``IndexModel`` as a keyword option.

    Args:
        index: The index specification in one of the forms above.
        base_compound_field: Optional extra field; when truthy it is passed
            through ``explicit_key`` and appended as the last key.

    Returns:
        A new ``IndexModel`` built from the resulting keys and options.

    Raises:
        TypeError: If ``index`` is none of the supported types.
        AssertionError: If a ``dict`` spec lacks ``key`` or ``key`` is not
            iterable (only while assertions are enabled).
    """
    # NOTE(review): explicit_key is defined elsewhere; presumably it turns a
    # field spec into a (field, direction) pair -- confirm against its source.
    options = {}

    if isinstance(index, IndexModel):
        document = index.document
        key_pairs = list(document['key'].items())
        options = {
            name: value for name, value in document.items() if name != 'key'
        }
    elif isinstance(index, str):
        key_pairs = [explicit_key(index)]
    elif isinstance(index, (tuple, list)):
        # Compound index: one key per element.
        key_pairs = [explicit_key(element) for element in index]
    elif isinstance(index, dict):
        assert 'key' in index, 'Index passed as dict must have a `key` entry'
        assert hasattr(index['key'], '__iter__'), '`key` entry must be iterable'
        key_pairs = [explicit_key(element) for element in index['key']]
        options = {
            name: value for name, value in index.items() if name != 'key'
        }
    else:
        raise TypeError('Index type must be <str>, <list>, <dict> or <pymongo.IndexModel>')

    if base_compound_field:
        key_pairs.append(explicit_key(base_compound_field))

    return IndexModel(key_pairs, **options)
def __init__(self):
    """Connect to MongoDB on localhost:27017 and prepare ``self.PhRes``.

    ``self.PhRes`` is bound to the ``PhRes`` collection of the ``PornHub``
    database, and a unique ascending index on ``link_url`` is created on it.
    """
    client = pymongo.MongoClient("localhost", 27017)
    self.PhRes = client["PornHub"]["PhRes"]

    # If your existing DB has duplicate records, refer to:
    # https://stackoverflow.com/questions/35707496/remove-duplicate-in-mongodb/35711737
    unique_link_url = IndexModel([('link_url', ASCENDING)], unique=True)
    self.PhRes.create_indexes([unique_link_url])
async def startup(self):
    """Create the date and full-text indexes on ``self._db``.

    Two indexes are created in a single awaited call: a descending index
    on ``date`` and a text index on ``content``.

    This is a coroutine function: the body awaits ``create_indexes``, and
    ``await`` is only valid inside ``async def`` (a plain ``def`` here is a
    SyntaxError).  Callers must ``await`` it.
    """
    # NOTE(review): despite its name, ``self._db`` is used like a collection
    # (it must expose an awaitable ``create_indexes``) -- confirm at the
    # place where it is assigned.
    newest_first = IndexModel([('date', DESCENDING)])
    content_text = IndexModel([('content', TEXT)])
    await self._db.create_indexes([newest_first, content_text])
def _ensure_indexes(self):
    """Create the ascending indexes used on the ``self._jobs`` collection.

    Three indexes are created in one ``create_indexes`` call:

    * a unique index on ``job_id``;
    * a compound index on ``job_id`` and ``version``;
    * a compound index on ``tag``, ``status``, ``run_at`` and
      ``worker_heartbeat``.
    """
    # Each entry: (fields in key order, extra IndexModel keyword options).
    index_specs = (
        (('job_id',), {'unique': True}),
        (('job_id', 'version'), {}),
        (('tag', 'status', 'run_at', 'worker_heartbeat'), {}),
    )
    models = [
        pymongo.IndexModel(
            [(field_name, pymongo.ASCENDING) for field_name in fields],
            **options
        )
        for fields, options in index_specs
    ]
    self._jobs.create_indexes(models)