我们从 Python 开源项目中提取了以下 25 个代码示例，用于说明如何使用 pymongo.version_tuple（注意：version_tuple 是模块级的版本号元组属性，而不是可调用的函数）。
def test_uri_prioritised_over_host_and_port(self):
    """MONGO_URI must win over the separate host, port and dbname settings."""
    settings = self.app.config
    settings['MONGO_URI'] = 'mongodb://localhost:27017/database_name'
    settings['MONGO_HOST'] = 'some_other_host'
    settings['MONGO_PORT'] = 27018
    settings['MONGO_DBNAME'] = 'not_the_correct_db_name'

    with warnings.catch_warnings():
        # A URI without username/password still connects, but a warning
        # asks for auth to be supplied; silence it here.
        warnings.simplefilter('ignore')
        ext = flask.ext.pymongo.PyMongo(self.app)

    if pymongo.version_tuple[0] <= 2:
        assert ext.cx.host == 'localhost'
        assert ext.cx.port == 27017
    else:
        # NOTE(review): the short pause presumably lets the client finish
        # connecting before ``address`` is read -- confirm.
        time.sleep(0.2)
        assert ('localhost', 27017) == ext.cx.address
    assert ext.db.name == 'database_name'
def test_create_with_document_class(self):
    """Documents read back must be instances of the configured document class.

    ``self.mongo`` is not used because the configuration has to change. A
    second PyMongo instance is created under the ``CUSTOM`` prefix, which
    avoids the duplicate config_prefix exception. It shares the standard
    DBNAME so that tearDown drops this database too, even on failure.
    """
    settings = self.app.config
    # Same DBNAME as the standard connection, so tearDown deletes it as well.
    settings['CUSTOM_DBNAME'] = settings['MONGO_DBNAME']
    settings['CUSTOM_DOCUMENT_CLASS'] = CustomDict

    custom_mongo = flask.ext.pymongo.PyMongo(self.app, 'CUSTOM')
    assert custom_mongo.db.things.find_one() is None

    # Store one document and fetch it again to inspect the returned type.
    document = {'_id': 'thing', 'val': 'foo'}
    if pymongo.version_tuple[0] > 2:
        # Write Concern is set to w=1 by default in pymongo > 3.0
        custom_mongo.db.things.insert_one(document)
    else:
        custom_mongo.db.things.insert(document, w=1)

    fetched = custom_mongo.db.things.find_one()
    assert type(fetched) is CustomDict
def _get_connection(self, host, port):
    """Return the client cached for ``host:port``, creating it on first use.

    New clients are built with a pool size of ``self.MAX_POOL`` and stored in
    ``self._conns`` under the ``"host:port"`` key.
    """
    cache_key = "{}:{}".format(host, port)
    cached = self._conns.get(cache_key)
    if cached is not None:
        return cached

    # The pool-size keyword is spelled differently by pymongo 2.
    if pymongo.version_tuple[0] == 2:
        pool_option = 'max_pool_size'
    else:
        pool_option = 'maxPoolSize'

    client = pymongo.MongoClient(host, port, **{pool_option: self.MAX_POOL})
    self._conns[cache_key] = client
    return client
def mongo_database():
    """Yield the ``elasticapm_test`` database, then drop it and close the client.

    The server location is read from the ``MONGODB_HOST`` and ``MONGODB_PORT``
    environment variables, defaulting to ``localhost`` and ``27017``.

    Cleanup is wrapped in ``try``/``finally`` so the client is closed even if
    an exception is thrown into the generator at the ``yield`` or if dropping
    the database fails.
    """
    connection_params = {
        'host': os.environ.get('MONGODB_HOST', 'localhost'),
        'port': int(os.environ.get('MONGODB_PORT', 27017)),
    }
    if pymongo.version_tuple < (3, 0):
        # Only passed to pymongo versions older than 3.0.
        connection_params['safe'] = True

    mongo = pymongo.MongoClient(**connection_params)
    try:
        yield mongo.elasticapm_test
    finally:
        try:
            mongo.drop_database('elasticapm_test')
        finally:
            # Always release the connection, even when the drop fails.
            mongo.close()
def unique(self, drop_dups=False):
    """Mark this index as unique and return the index itself.

    :param drop_dups: Whether duplicate objects should be dropped while the
        unique index is created. Defaults to ``False``.
    :raises BadIndexException: if ``drop_dups`` is requested while running
        on pymongo >= 2.7.5.
    """
    self.__unique = True
    # The version is only consulted when dropping duplicates was requested.
    rejected = drop_dups and pymongo.version_tuple >= (2, 7, 5)
    if rejected:  # pragma: nocover
        raise BadIndexException("drop_dups is not supported on pymongo >= 2.7.5")
    self.__drop_dups = drop_dups
    return self
def test_find_one_or_404(self):
    """``find_one_or_404`` raises a 404 for a miss and returns the document for a hit.

    The scenario runs twice: once against a plain collection and once against
    a dotted-named sub-collection.

    Unlike a bare ``try``/``except``, the ``else`` branch makes the test fail
    when no exception is raised for the missing document.
    """
    modern_driver = pymongo.version_tuple[0] > 2
    collections = (
        self.mongo.db.things,
        # also test with dotted-named collections
        self.mongo.db.things.morethings,
    )

    for collection in collections:
        # Start from an empty collection. ``remove`` is the legacy call;
        # newer drivers provide ``delete_many`` instead.
        if modern_driver:
            collection.delete_many({})
        else:
            collection.remove()

        try:
            collection.find_one_or_404({'_id': 'thing'})
        except HTTPException as notfound:
            assert notfound.code == 404, "raised wrong exception"
        else:
            raise AssertionError(
                "find_one_or_404 did not raise for a missing document")

        if modern_driver:
            # Write Concern is set to w=1 by default in pymongo > 3.0
            collection.insert_one({'_id': 'thing', 'val': 'foo'})
        else:
            collection.insert({'_id': 'thing', 'val': 'foo'}, w=1)

        # now it should not raise
        thing = collection.find_one_or_404({'_id': 'thing'})
        assert thing['val'] == 'foo', 'got wrong thing'
def test_default_config_prefix(self):
    """Without an explicit prefix, PyMongo is configured from the MONGO_* keys."""
    for option, value in (
        ('MONGO_DBNAME', 'flask_pymongo_test_db'),
        ('MONGO_HOST', 'localhost'),
        ('MONGO_PORT', 27017),
    ):
        self.app.config[option] = value

    mongo = flask.ext.pymongo.PyMongo(self.app)
    assert mongo.db.name == 'flask_pymongo_test_db', 'wrong dbname: %s' % mongo.db.name

    if pymongo.version_tuple[0] <= 2:
        assert mongo.cx.host == 'localhost'
        assert mongo.cx.port == 27017
    else:
        # NOTE(review): the pause presumably gives the client time to
        # connect before ``address`` is read -- confirm.
        time.sleep(0.2)
        assert ('localhost', 27017) == mongo.cx.address
def test_custom_config_prefix(self):
    """A PyMongo created with the ``CUSTOM`` prefix reads the CUSTOM_* keys."""
    for option, value in (
        ('CUSTOM_DBNAME', 'flask_pymongo_test_db'),
        ('CUSTOM_HOST', 'localhost'),
        ('CUSTOM_PORT', 27017),
    ):
        self.app.config[option] = value

    mongo = flask.ext.pymongo.PyMongo(self.app, 'CUSTOM')
    assert mongo.db.name == 'flask_pymongo_test_db', 'wrong dbname: %s' % mongo.db.name

    if pymongo.version_tuple[0] <= 2:
        assert mongo.cx.host == 'localhost'
        assert mongo.cx.port == 27017
    else:
        # NOTE(review): the pause presumably gives the client time to
        # connect before ``address`` is read -- confirm.
        time.sleep(0.2)
        assert ('localhost', 27017) == mongo.cx.address
def test_converts_str_to_int(self):
    """A port given as the string ``'27017'`` ends up as the integer 27017."""
    settings = self.app.config
    settings['MONGO_DBNAME'] = 'flask_pymongo_test_db'
    settings['MONGO_HOST'] = 'localhost'
    # Deliberately a string: the assertions below expect an int port.
    settings['MONGO_PORT'] = '27017'

    mongo = flask.ext.pymongo.PyMongo(self.app)
    assert mongo.db.name == 'flask_pymongo_test_db', 'wrong dbname: %s' % mongo.db.name

    if pymongo.version_tuple[0] <= 2:
        assert mongo.cx.host == 'localhost'
        assert mongo.cx.port == 27017
    else:
        # NOTE(review): the pause presumably gives the client time to
        # connect before ``address`` is read -- confirm.
        time.sleep(0.2)
        assert ('localhost', 27017) == mongo.cx.address
def test_config_with_uri(self):
    """Host, port and database name are all taken from MONGO_URI."""
    self.app.config['MONGO_URI'] = 'mongodb://localhost:27017/flask_pymongo_test_db'

    with warnings.catch_warnings():
        # A URI without username/password still connects, but a warning
        # asks for auth to be supplied; silence it here.
        warnings.simplefilter('ignore')
        mongo = flask.ext.pymongo.PyMongo(self.app)

    assert mongo.db.name == 'flask_pymongo_test_db', 'wrong dbname: %s' % mongo.db.name

    if pymongo.version_tuple[0] <= 2:
        assert mongo.cx.host == 'localhost'
        assert mongo.cx.port == 27017
    else:
        # NOTE(review): the pause presumably gives the client time to
        # connect before ``address`` is read -- confirm.
        time.sleep(0.2)
        assert ('localhost', 27017) == mongo.cx.address
def test_config_with_document_class(self):
    """The MONGO_DOCUMENT_CLASS setting is reflected on the client."""
    self.app.config['MONGO_DOCUMENT_CLASS'] = CustomDict
    mongo = flask.ext.pymongo.PyMongo(self.app)

    # pymongo 2 keeps the document class on the client itself; later
    # versions keep it on the client's codec options.
    if pymongo.version_tuple[0] <= 2:
        configured = mongo.cx.document_class
    else:
        configured = mongo.cx.codec_options.document_class
    assert configured == CustomDict
def test_config_without_document_class(self):
    """With no document class configured, the client uses plain ``dict``."""
    mongo = flask.ext.pymongo.PyMongo(self.app)

    # pymongo 2 keeps the document class on the client itself; later
    # versions keep it on the client's codec options.
    if pymongo.version_tuple[0] <= 2:
        configured = mongo.cx.document_class
    else:
        configured = mongo.cx.codec_options.document_class
    assert configured == dict
def test_host_with_port_does_not_get_overridden_by_separate_port_config_value(self):
    """A port embedded in MONGO_HOST takes precedence over MONGO_PORT."""
    settings = self.app.config
    settings['MONGO_HOST'] = 'localhost:27017'
    settings['MONGO_PORT'] = 27018

    with warnings.catch_warnings():
        # A URI without username/password still connects, but a warning
        # asks for auth to be supplied; silence it here.
        warnings.simplefilter('ignore')
        mongo = flask.ext.pymongo.PyMongo(self.app)

    if pymongo.version_tuple[0] <= 2:
        assert mongo.cx.host == 'localhost'
        assert mongo.cx.port == 27017
    else:
        # NOTE(review): the pause presumably gives the client time to
        # connect before ``address`` is read -- confirm.
        time.sleep(0.2)
        assert ('localhost', 27017) == mongo.cx.address
def connectMongoEngine(pmcol, conn_uri=None):
    """Connect mongoengine to the database that holds the collection *pmcol*.

    :param pmcol: collection whose ``database`` supplies the database name
        and, when no URI is given, the server to connect to.
    :param conn_uri: optional connection URI. When truthy it is passed to
        mongoengine as the host (it can carry credentials) and the server
        details of *pmcol* are not consulted at all.
    :returns: the result of ``meng.connect``.
    """
    db_name = pmcol.database.name

    # Can just use the connection uri, which has credentials
    if conn_uri:
        return meng.connect(db_name, host=conn_uri)

    if pymongo.version_tuple[0] == 2:
        # The lowercase attributes are used on purpose; the original author
        # had tried and abandoned the uppercase HOST / PORT here.
        connection = pmcol.database.connection
        host, port = connection.host, connection.port
    else:
        client = pmcol.database.client
        # On pymongo >= 3, HOST / PORT are class-level defaults rather than
        # the endpoint this client talks to, so ask the client for its
        # actual address and fall back to the old values if unavailable.
        try:
            address = client.address
        except pymongo.errors.PyMongoError:
            address = None
        if address is None:
            host, port = client.HOST, client.PORT
        else:
            host, port = address

    return meng.connect(db_name, host=host, port=port)
def _open_chunk_cursor(src_col, spec, sort):
    """Return a sorted cursor over *src_col* for *spec* with the timeout disabled."""
    # Make cursor not timeout, using version-appropriate parameter
    major = pymongo.version_tuple[0]
    if major == 2:
        cursor = src_col.find(spec, timeout=False)
    elif major == 3:
        cursor = src_col.find(spec, no_cursor_timeout=True)
    else:
        raise Exception("Unknown pymongo version")

    # A leading "-" on the sort field requests descending order.
    if sort[0] == "-":
        return cursor.sort(sort[1:], pymongo.DESCENDING)
    return cursor.sort(sort, pymongo.ASCENDING)


def do_chunks(init, proc, src_col, dest_col, query, key, sort, verbose, sleep=60):
    """Claim and process housekeeping chunks until every chunk is 'done'.

    Each pass atomically flips one 'open' housekeeping record to 'working',
    runs ``_process`` over the source documents in that chunk's key range and
    marks the record 'done'. When no record is open, it waits and retries.

    :param init: passed through to ``_process``.
    :param proc: passed through to ``_process``.
    :param src_col: collection the chunk's documents are read from.
    :param dest_col: passed through to ``_process``.
    :param query: base query, ANDed with the chunk's range on *key*.
    :param key: field compared against the chunk's ``start`` and ``end``.
    :param sort: field to sort on; prefix with "-" for descending order.
    :param verbose: bit flags; bit 2 prints the size of each chunk.
    :param sleep: seconds to wait when no chunk is open for processing.
    :raises Exception: if the pymongo major version is neither 2 nor 3.
    """
    while housekeep.objects(state='done').count() < housekeep.objects.count():
        tnow = datetime.datetime.utcnow()
        raw = housekeep._collection.find_and_modify(
            {'state': 'open'},
            {
                '$set': {
                    'state': 'working',
                    'tstart': tnow,
                    'procname': procname(),
                }
            }
        )

        # if raw is None, someone scooped us
        if raw is None:
            # Not all done, but none were open for processing; thus, wait to
            # see if one re-opens
            print('Standing by for reopening of "working" job...')
            sys.stdout.flush()
            time.sleep(sleep)
            continue

        raw_id = raw['_id']
        # reload as mongoengine object -- _id is .start (because set as primary_key)
        hko = housekeep.objects(start=raw_id)[0]

        # get data pointed to by housekeep
        qq = {'$and': [query, {key: {'$gte': hko.start}}, {key: {'$lte': hko.end}}]}
        cursor = _open_chunk_cursor(src_col, qq, sort)
        try:
            if verbose & 2:
                print("mongo_process: %d elements in chunk %s-%s" % (cursor.count(), hko.start, hko.end))
                sys.stdout.flush()
            # This is where processing happens
            hko.good = _process(init, proc, cursor, dest_col, verbose, hkstart=raw_id)
        finally:
            # The cursor was opened without a timeout, so close it explicitly
            # instead of relying on it being exhausted.
            # NOTE(review): assumes _process is done with the cursor once it
            # returns -- confirm.
            cursor.close()

        # Check if another job finished it while this one was plugging away
        hko_later = housekeep.objects(start=raw_id).only('state')[0]
        if hko.good == -1:
            # Early exit signal
            print("Chunk at %s lost to another process; not updating" % raw_id)
            sys.stdout.flush()
        elif hko_later.state == 'done':
            print("Chunk at %s had already finished; not updating" % raw_id)
            sys.stdout.flush()
        else:
            hko.state = 'done'
            hko.procname = 'none'
            hko.time = datetime.datetime.utcnow()
            hko.save()