我们从 Python 开源项目中提取了以下 42 个代码示例,用于说明如何使用 pymongo.database()。
def get_default_database(self):
    """Return the database named in the MongoDB connection URI.

    .. doctest::

      >>> uri = 'mongodb://localhost/my_database'
      >>> client = MotorClient(uri)
      >>> db = client.get_default_database()
      >>> assert db.name == 'my_database'

    Handy in scripts that pick the database to use purely from the URI
    stored in a configuration file.

    :Raises:
      - `pymongo.errors.ConfigurationError`: if the delegate reports no
        default database name (the stored value is ``None``).
    """
    # The default name lives on the delegate under the attribute name
    # produced by mangle_delegate_name().
    mangled = mangle_delegate_name(
        self.__class__, '__default_database_name')
    db_name = getattr(self.delegate, mangled)
    if db_name is not None:
        return self[db_name]

    raise pymongo.errors.ConfigurationError(
        'No default database defined')
def add_son_manipulator(self, manipulator):
    """Register a new son manipulator on this database.

    Newly added manipulators will be applied before existing ones.

    :Parameters:
      - `manipulator`: the manipulator to add
    """
    # An AutoReference may hold a MotorDatabase in its ``database``
    # attribute; the delegate must receive the unwrapped delegate database.
    if isinstance(manipulator, pymongo.son_manipulator.AutoReference):
        referenced_db = manipulator.database
        if isinstance(referenced_db, MotorDatabase):
            manipulator.database = referenced_db.delegate

    self.delegate.add_son_manipulator(manipulator)
def __init__(self, database, collection="fs"):
    """Create an instance of GridFS on top of a single database.

    :Parameters:
      - `database`: a :class:`MotorDatabase`
      - `collection` (optional): A string, name of root collection to use,
        such as "fs" or "my_files"

    :Raises:
      - `TypeError`: if `database` is not a :class:`MotorDatabase`.

    .. mongodoc:: gridfs

    .. versionchanged:: 0.2
       ``open`` method removed; no longer needed.
    """
    if not isinstance(database, MotorDatabase):
        # Format through a 1-tuple: with a bare ``% database`` a tuple
        # argument would be unpacked by the ``%`` operator and produce a
        # misleading "not all arguments converted" error instead.
        raise TypeError("First argument to MotorGridFS must be "
                        "MotorDatabase, not %r" % (database,))

    self.io_loop = database.get_io_loop()
    self.collection = database[collection]
    # The delegate class is built on the wrapped delegate database.
    self.delegate = self.__delegate_class__(
        database.delegate, collection, _connect=False)
def finish_startup(self):
    """Run the remaining start-up steps for this application object.

    In order: logs the configured MongoDB database name, calls
    ``_config_celery()``, lets the ``api``, ``web`` and ``authentication``
    modules and every registered extension set themselves up on this app,
    then calls the remaining ``_config_*`` hooks.
    """
    self.log.info('Using MongoDB database %r', self.config['MONGO_DBNAME'])

    self._config_celery()

    # Built-in sub-applications are set up before the extensions.
    for module in (api, web, authentication):
        module.setup_app(self)

    for extension in self.pillar_extensions.values():
        self.log.info('Setting up extension %s', extension.name)
        extension.setup_app(self)

    for configure in (self._config_jinja_env,
                      self._config_static_dirs,
                      self._config_user_roles,
                      self._config_user_caps):
        configure()

    # Only enable this when debugging.
    # self._list_routes()
def __init__(self, database, name, _delegate=None):
    """Wrap the collection called `name` of `database`.

    :Parameters:
      - `database`: an instance of the framework-specific class created
        from ``SanicMongoAgnosticDatabase``
      - `name`: the collection name
      - `_delegate` (optional): an existing delegate collection to wrap;
        when omitted (or falsy) a new ``Collection`` is created on
        ``database.delegate``

    :Raises:
      - `TypeError`: if `database` is not an instance of that class.
    """
    db_class = create_class_with_framework(
        SanicMongoAgnosticDatabase, self._framework, self.__module__)

    if not isinstance(database, db_class):
        # Format through a 1-tuple: with a bare ``% database`` a tuple
        # argument would be unpacked by the ``%`` operator and produce a
        # misleading "not all arguments converted" error instead.
        raise TypeError("First argument to SanicMongoCollection must be "
                        "SanicMongoDatabase, not %r" % (database,))

    delegate = _delegate or Collection(database.delegate, name)
    # super() is anchored at AgnosticCollection, so that class's own
    # __init__ is skipped and the next one in the MRO is called.
    super(AgnosticCollection, self).__init__(delegate)
    self.database = database
def __getitem__(self, name):
    """Return the sub-collection named ``<self.name>.<name>``.

    The result is an instance of the framework-specific class created from
    ``SanicMongoAgnosticCollection``, bound to the same database.
    """
    sub_collection_cls = create_class_with_framework(
        SanicMongoAgnosticCollection, self._framework, self.__module__)
    dotted_name = self.name + '.' + name
    return sub_collection_cls(self.database, dotted_name)
def __getattr__(self, name):
    """Resolve an attribute that normal lookup did not find.

    Names that do not start with an underscore are treated as collection
    names and resolved through ``self[name]``. Underscore-prefixed names
    are looked up on the delegate; if the delegate lacks them too, an
    ``AttributeError`` explaining the item-access alternative is raised.
    """
    if not name.startswith('_'):
        return self[name]

    # Private-looking name: try the delegate first.
    try:
        return getattr(self.delegate, name)
    except AttributeError:
        raise AttributeError(
            "%s has no attribute %r. To access the %s"
            " collection, use database['%s']." % (
                self.__class__.__name__, name, name, name))
def __getattr__(self, name):
    """Resolve an attribute that normal lookup did not find.

    Names that do not start with an underscore are treated as database
    names and resolved through ``self[name]``. Underscore-prefixed names
    are looked up on the delegate; if the delegate lacks them too, an
    ``AttributeError`` explaining the item-access alternative is raised.
    """
    if not name.startswith('_'):
        return self[name]

    # Private-looking name: try the delegate first.
    try:
        return getattr(self.delegate, name)
    except AttributeError:
        raise AttributeError(
            "%s has no attribute %r. To access the %s"
            " database, use client['%s']." % (
                self.__class__.__name__, name, name, name))
def __init__(self, database, name):
    """Wrap the collection called `name` of a :class:`MotorDatabase`.

    :Parameters:
      - `database`: a :class:`MotorDatabase`
      - `name`: the collection name

    :Raises:
      - `TypeError`: if `database` is not a :class:`MotorDatabase`.
    """
    if not isinstance(database, MotorDatabase):
        # Format through a 1-tuple: with a bare ``% database`` a tuple
        # argument would be unpacked by the ``%`` operator and produce a
        # misleading "not all arguments converted" error instead.
        raise TypeError("First argument to MotorCollection must be "
                        "MotorDatabase, not %r" % (database,))

    # The delegate collection is created on the wrapped delegate database.
    delegate = Collection(database.delegate, name)
    super(MotorCollection, self).__init__(delegate)
    self.database = database
def __getattr__(self, name):
    """Return the dotted sub-collection ``<self.name>.<name>``.

    For example, attribute ``bar`` on collection "foo" yields the
    collection "foo.bar" of the same database.
    """
    dotted_name = self.name + '.' + name
    return MotorCollection(self.database, dotted_name)
def wrap(self, obj):
    """Return the Motor counterpart of `obj`, or `obj` itself.

    Only exact class matches are converted:

    - ``Collection`` becomes ``self.database[obj.name]``
    - ``Cursor`` becomes ``MotorCursor(obj, self)``
    - ``CommandCursor`` becomes ``MotorCommandCursor(obj, self)``

    Anything else is returned unchanged.
    """
    obj_class = obj.__class__

    if obj_class is Collection:
        # Replace pymongo.collection.Collection with MotorCollection
        return self.database[obj.name]
    if obj_class is Cursor:
        return MotorCursor(obj, self)
    if obj_class is CommandCursor:
        return MotorCommandCursor(obj, self)
    return obj
def get_io_loop(self):
    """Return the I/O loop reported by the database this object belongs to."""
    owning_database = self.database
    return owning_database.get_io_loop()
def __init__(self, config, api):
    """
    Constructor.

    parameters
    ----------
    * config: DBConfig object; specifies database configs. When its
      ``body`` is non-empty it must provide the 'client', 'dbs' and
      'dbNames' keys.
    * api: PyApi object.

    raises
    ------
    * VNPAST_ConfigError: if ``config.body`` is non-empty but a required
      key is missing, or reading it fails for any other reason.
    """
    self._api = api  # Set Datayes PyApi.

    # Always define the flag: previously it was only assigned on a
    # successful configuration, so an empty ``config.body`` made the
    # ``if self._connected`` check below raise AttributeError.
    self._connected = False

    if config.body:
        try:
            self._config = config.body
            self._client = config.body['client']
            self._dbs = config.body['dbs']
            self._dbNames = config.body['dbNames']
            self._connected = True
        except KeyError:
            msg = '[MONGOD]: Unable to configure database; ' + \
                  'config file is incomplete.'
            raise VNPAST_ConfigError(msg)
        except Exception as e:
            msg = '[MONGOD]: Unable to configure database; ' + str(e)
            raise VNPAST_ConfigError(msg)

    if self._connected:
        #self._get_coll_names()
        #self._ensure_index()
        pass

#----------------------------------------------------------------------
# Get collection names methods.
def _ensure_index(self):
    """
    Ensure indices for all databases and collections.

    First reads the self._dbs config to get the index column name of each
    database; then takes that database's collection names from
    self._collNames and creates a unique, descending index on the index
    column of every collection.

    returns
    -------
    * 1 once all indices are set; None (nothing is done) when either
      self._collNames or self._dbs is empty.

    raises
    ------
    * VNPAST_DatabaseError: if a required config key is missing or index
      creation fails.
    """
    if self._collNames and self._dbs:
        try:
            for dbName in self._dbs:
                # Iterate over database configurations.
                db = self._dbs[dbName]
                dbSelf = db['self']
                index = db['index']
                collNames = self._collNames[db['collNames']]
                # db['self'] is the pymongo.Database object.
                for name in collNames:
                    coll = dbSelf[name]
                    # create_index() replaces ensure_index(), which was
                    # deprecated in PyMongo 3 and removed in PyMongo 4.
                    coll.create_index([(index, pymongo.DESCENDING)],
                                      unique=True)
            # Parenthesised single-argument form works as both the
            # Python 2 print statement and the Python 3 function.
            print('[MONGOD]: MongoDB index set.')
            return 1
        except KeyError:
            msg = '[MONGOD]: Unable to set collection indices; ' + \
                  'infomation in Config.body["dbs"] is incomplete.'
            raise VNPAST_DatabaseError(msg)
        except Exception as e:
            msg = '[MONGOD]: Unable to set collection indices; ' + str(e)
            raise VNPAST_DatabaseError(msg)

#----------------------------------------------------------------------
# Download method.
def __update(self, key, target1, target2, sessionNum):
    """
    Basic update method.
    Looks into the database specified by 'key', finds the latest
    record in the first ticker's collection, then updates the
    collections up to the current date.

    parameters
    ----------
    * key: string; a database alias (refer to the database config)
      e.g., 'EQU_D1'.
    * target1: method; pointer to the function with which controller
      obtain all tickers in the database. Concretely, target1 are
      self._all#Tickers methods.
    * target2: method; pointer to the api overlord requesting functions
      i.e. self._api.get_###_mongod methods.
    * sessionNum: integer; the number of threads.

    returns
    -------
    * the database object stored under self._dbs[key]['self'].

    raises
    ------
    * VNPAST_DatabaseError: on any failure, including an empty ticker
      list or an empty collection.
    """
    try:
        # get databases and tickers
        dbConf = self._dbs[key]
        db = dbConf['self']
        index = dbConf['index']
        allTickers = target1()
        # Report an empty ticker list explicitly instead of through an
        # opaque IndexError from allTickers[0].
        if len(allTickers) == 0:
            raise ValueError('no tickers found for %r' % (key,))
        coll = db[allTickers[0]]

        # find the latest timestamp in collection.
        latestDoc = coll.find_one(sort=[(index, pymongo.DESCENDING)])
        # find_one() yields None for an empty collection; say so instead
        # of failing with "'NoneType' object is not subscriptable".
        if latestDoc is None:
            raise ValueError(
                'collection %r is empty' % (allTickers[0],))
        latest = latestDoc[index]

        # Resume from the day after the latest stored record.
        start = (latest + timedelta(days=1)).strftime('%Y%m%d')
        end = datetime.now().strftime('%Y%m%d')

        # then download.
        target2(db, start, end, sessionNum)
        return db

    except Exception as e:
        msg = '[MONGOD]: Unable to update data; ' + str(e)
        raise VNPAST_DatabaseError(msg)
def fetch(self, dbName, ticker, start, end, output='list'):
    """
    Fetch the documents of one ticker whose index value lies in
    [start, end].

    parameters
    ----------
    * dbName: string; a database alias present in self._dbNames.
    * ticker: string; name of the collection inside that database.
    * start, end: strings; both 'yyyymmdd' or both 'yyyymmdd HH:MM'.
      Values in any other format are passed to the query unchanged.
    * output: string; one of 'df', 'list', 'json'.

    returns
    -------
    * for output='list': the matching documents (without '_id') in
      reverse of the order returned by the query.
    * for 'df' and 'json': None. NOTE(review): these values pass
      validation but no conversion is implemented here.
    * -1 if anything fails while querying.

    raises
    ------
    * ValueError: if output or dbName is not supported.
    """
    # check inputs' validity.
    if output not in ['df', 'list', 'json']:
        raise ValueError('[MONGOD]: Unsupported output type.')
    if dbName not in self._dbNames:
        raise ValueError('[MONGOD]: Unable to locate database name.')

    db = self._dbs[dbName]
    dbSelf = db['self']
    dbIndex = db['index']
    try:
        # Collections live on the database handle (db['self']), not on
        # the config dict itself; indexing the config dict by ticker
        # always failed and made this method return -1.
        coll = dbSelf[ticker]
        if len(start) == 8 and len(end) == 8:
            # yyyymmdd, len()=8
            start = datetime.strptime(start, '%Y%m%d')
            end = datetime.strptime(end, '%Y%m%d')
        elif len(start) == 14 and len(end) == 14:
            # yyyymmdd HH:MM, len()=14
            start = datetime.strptime(start, '%Y%m%d %H:%M')
            end = datetime.strptime(end, '%Y%m%d %H:%M')

        # find in MongoDB.
        docs = list(coll.find(filter={dbIndex: {'$lte': end,
                                                '$gte': start}},
                              projection={'_id': False}))
        if output == 'list':
            return docs[::-1]
    except Exception as e:
        # The message used to be built and then dropped; emit it so the
        # -1 return value is not a silent failure.
        msg = '[MONGOD]: Error encountered when fetching data ' + \
              'from MongoDB; ' + str(e)
        print(msg)
        return -1
def __init__(self, database, collection):
    """Remember the database and collection this object is bound to.

    ``name`` is copied from the collection's own ``name`` attribute.
    """
    self.collection = collection
    self.database = database
    self.name = self.collection.name
def _to_dict_inner(self, d):
    """Return a converted copy of the mapping `d`.

    Each value is converted by the first rule that matches:

    - ``DBRef``: dereferenced through ``self.collection.database`` and
      converted with the result's ``to_dict()``
    - ``dict``: converted recursively
    - ``ObjectId``: turned into its string form
    - ``datetime``: passed through ``datetime_to_stamp``
    - anything else: kept as is
    """
    def convert(value):
        # The order of the checks is significant; keep it.
        if isinstance(value, DBRef):
            return self.collection.database.dereference(value).to_dict()
        if isinstance(value, dict):
            return self._to_dict_inner(value)
        if isinstance(value, ObjectId):
            return str(value)
        if isinstance(value, datetime):
            return datetime_to_stamp(value)
        return value

    return {key: convert(value) for key, value in d.items()}
def get_ref(self, key):
    """Dereference the ``DBRef`` stored under `key`.

    Returns ``None`` when `key` is absent or its value is not a ``DBRef``;
    otherwise returns the result of dereferencing the value against
    ``self.collection.database``.
    """
    if not isinstance(self.get(key, None), DBRef):
        return None
    return self[key].dereference(self.collection.database)
def db(self, collection_name: str = None) \
        -> typing.Union[pymongo.collection.Collection, pymongo.database.Database]:
    """Return the MongoDB database, or one of its collections.

    When `collection_name` is given and non-empty, the collection of that
    name is returned; otherwise the database object itself is returned.
    """
    database = self.data.driver.db
    if not collection_name:
        return database
    return database[collection_name]
def __init__(self, database, name, _delegate=None):
    """Wrap the collection called `name` of `database`.

    :Parameters:
      - `database`: an instance of the framework-specific class created
        from ``MongoMotorAgnosticDatabase``
      - `name`: the collection name
      - `_delegate` (optional): an existing delegate collection to wrap;
        when omitted (or falsy) a new ``Collection`` is created on
        ``database.delegate``

    :Raises:
      - `TypeError`: if `database` is not an instance of that class.
    """
    db_class = create_class_with_framework(
        MongoMotorAgnosticDatabase, self._framework, self.__module__)

    if not isinstance(database, db_class):
        # Format through a 1-tuple: with a bare ``% database`` a tuple
        # argument would be unpacked by the ``%`` operator and produce a
        # misleading "not all arguments converted" error instead.
        raise TypeError("First argument to MongoMotorCollection must be "
                        "MongoMotorDatabase, not %r" % (database,))

    delegate = _delegate or Collection(database.delegate, name)
    # super() is anchored at AgnosticCollection, so that class's own
    # __init__ is skipped and the next one in the MRO is called.
    super(AgnosticCollection, self).__init__(delegate)
    self.database = database
def __getitem__(self, name):
    """Return the sub-collection named ``<self.name>.<name>``.

    The result is an instance of the framework-specific class created from
    ``MongoMotorAgnosticCollection``, bound to the same database.
    """
    sub_collection_cls = create_class_with_framework(
        MongoMotorAgnosticCollection, self._framework, self.__module__)
    dotted_name = self.name + '.' + name
    return sub_collection_cls(self.database, dotted_name)
def setup_db_indices(self):
    """Adds missing database indices.

    This does NOT drop and recreate existing indices,
    nor does it reconfigure existing indices.
    If you want that, drop them manually first.
    """
    self.log.debug('Adding any missing database indices.')

    import pymongo

    asc = pymongo.ASCENDING
    desc = pymongo.DESCENDING

    # (collection name, ((index keys, create_index options), ...)),
    # listed in the order the indices are created.
    index_plan = (
        ('tokens', (
            ([('user', asc)], {}),
            ([('token', asc)], {}),
            ([('token_hashed', asc)], {}),
        )),
        ('notifications', (
            ([('user', asc)], {}),
        )),
        ('activities-subscriptions', (
            ([('context_object', asc)], {}),
        )),
        ('nodes', (
            # This index is used for queries on project, and for queries on
            # the combination (project, node type).
            ([('project', asc), ('node_type', asc)], {}),
            ([('parent', asc)], {}),
            ([('short_code', asc)], {'sparse': True, 'unique': True}),
            # Used for latest assets & comments
            ([('properties.status', asc),
              ('node_type', asc),
              ('_created', desc)], {}),
        )),
        ('projects', (
            # This index is used for statistics, and for fetching public
            # projects.
            ([('is_private', asc)], {}),
            ([('category', asc)], {}),
        )),
    )

    db = self.data.driver.db
    for coll_name, index_specs in index_plan:
        coll = db[coll_name]
        for keys, options in index_specs:
            coll.create_index(keys, **options)