我们从Python开源项目中,提取了以下8个代码示例,用于说明如何使用settings.MONGO_PORT。
def test_expire_after(self): # create mongo client client = pymongo.MongoClient(host=settings.MONGO_HOST, port=settings.MONGO_PORT) # init MongoItemPool with Mongo client pool = MongoItemPool(client, DB="pycache", COLLECTION="pytest") item = CacheItem() # set expire item.set("mykey", "myval") item.expire_after(1) pool.save(item) ret_item = pool.get_item("mykey") assert ret_item.is_hit() == True time.sleep(2) ret_item = pool.get_item("mykey") assert ret_item.is_hit() == False
def test_Decorator_expire(self): client = pymongo.MongoClient(host=settings.MONGO_HOST, port=settings.MONGO_PORT) pool = MongoItemPool(client, DB="pycache", COLLECTION="pytest") # decorate myAdder function in common way @cached(CacheItemPool=pool, expire_after=0.5) def myAdder_expire(a, b): import time print 'Add %d + %d need 3 seconds!' % (a, b) time.sleep(3) return a + b assert myAdder_expire(5, 6) == 11 # wait for 3 seconds time.sleep(1) cur = time.time() assert myAdder_expire(5, 6) == 11 # need wait another 3 seconds assert time.time()-cur > 1
def __init__(self, collection_name: str) -> None: """Constructor for mongodb connection Args: collection_name: name of collection to operate on """ self._host = settings.MONGO_HOST self._port = settings.MONGO_PORT self._db_name = settings.MONGO_DB try: self.db = pymongo.MongoClient(self._host, self._port)[self._db_name] self.db = self.db.get_collection(collection_name) except ConnectionError: print('Error connecting to database!')
def test_simpleKeyVal(self): # create mongo client client = pymongo.MongoClient(host=settings.MONGO_HOST, port=settings.MONGO_PORT) # init MongoItemPool with Mongo client pool = MongoItemPool(client, DB="pycache", COLLECTION="pytest") item = CacheItem() item.set("mykey", "myval") pool.save(item) ret_item = pool.get_item("mykey") assert ret_item.get() == "myval" assert ret_item.get_key() == "mykey" assert ret_item.is_hit() == True
def test_Decorator(self): client = pymongo.MongoClient(host=settings.MONGO_HOST, port=settings.MONGO_PORT) pool = MongoItemPool(client, DB="pycache", COLLECTION="pytest") # decorate myAdder function in common way @cached(CacheItemPool=pool) def myAdder(a, b): import time print 'Add %d + %d need 3 seconds!' % (a, b) time.sleep(3) return a + b assert myAdder(5, 6) == 11 # wait for 3 seconds assert myAdder(5, 6) == 11 # no need wait
def connect_mongo(): client = MongoClient(settings.MONGO_HOST, settings.MONGO_PORT) return client[settings.MONGO_DATABASE]
def test_keywords(): client = MongoClient(settings.MONGO_HOST, settings.MONGO_PORT) db = client[settings.MONGO_DATABASE] cursor = db.arch.find().limit(10) for post in cursor: id = post['_id'] title = post['title'] tf_idf_dict = util.file2dict( settings.TFIDF_FILE_PATH, str(id) + '.txt') util.print_message(title.encode('gbk')) for item in sorted(tf_idf_dict.items(), key=operator.itemgetter(1), reverse=True)[:10]: util.print_message('{0}:{1}'.format(item[0].decode( 'utf8').encode('gbk'), item[1]))
def update_keywords(): util.print_message('Start updating keywords...', debug=True) client = MongoClient(settings.MONGO_HOST, settings.MONGO_PORT) db = client[settings.MONGO_DATABASE] cursor = db.arch.find() for post in cursor: id = post['_id'] tf_idf_dict = util.file2dict(settings.TFIDF_FILE_PATH, str(id) + '.txt') tags = [] for item in sorted(tf_idf_dict.items(), key=operator.itemgetter(1), reverse=True)[:20]: tags.append(item[0]) util.print_message(' '.join(tags)) db.arch.update_one({'_id': id}, {'$set': {'tags': tags}})