Python bson 模块,ObjectId() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用bson.ObjectId()

项目:Ushio    作者:Hanaasagi    | 项目源码 | 文件源码
def post(self):
    """Delete the current user's account after confirming the password.

    Reads ``password`` from the request body. When it matches the stored
    hash, the session cookie is cleared, the user document is removed, the
    server-side session is deleted and the client is redirected to ``/``.
    ``custom_error`` is invoked at the end of every path.
    """
    supplied = self.get_body_argument('password', '')
    if supplied:
        account = yield self.db.user.find_one({
            'username': self.current_user['username']
        })
        digest = md5(supplied + self.settings['salt']).hexdigest()
        if account['password'] == digest:
            if self.get_cookie('TORNADOSESSION'):
                self.clear_cookie('TORNADOSESSION')
            self.db.user.remove({'_id': ObjectId(self.current_user['_id'])})
            self.session.delete('user_session')
            self.redirect('/')
    # NOTE(review): this also runs after a successful redirect -- confirm
    # that custom_error tolerates an already-finished request.
    self.custom_error('????????')
项目:Ushio    作者:Hanaasagi    | 项目源码 | 文件源码
def get(self, topic_id):
    '''
        Render the detail page of a single topic.

        The topic body is converted with ``markdown.markdown`` and the
        template is told whether the topic is among the current user's
        favorites.
    '''
    topic = yield self.db.topic.find_one({'_id': ObjectId(topic_id)})
    if topic is None:
        # NOTE(review): execution continues after this call unless
        # custom_error raises -- confirm.
        self.custom_error()
    viewer = self.current_user

    topic['content'] = markdown.markdown(topic['content'])

    isfavorite = False
    if viewer:
        profile = yield self.db.user.find_one({
            '_id': ObjectId(viewer['_id'])
        })
        isfavorite = topic['_id'] in profile['favorite']
    self.render('topic/template/topic-detail.html',
                topic=topic, isfavorite=isfavorite)
项目:Ushio    作者:Hanaasagi    | 项目源码 | 文件源码
def get(self):
    """Render one page (20 per page) of the current user's favorite topics.

    The page number comes from the ``page`` query argument (default ``1``).
    """
    limit = 20
    uid = self.current_user['_id']
    page = int(self.get_query_argument('page', '1'))

    # Only the 'favorite' field of the user document is fetched.
    owner = yield self.db.user.find_one(
        {'_id': ObjectId(uid)},
        {'favorite': 1},
    )

    cursor = self.db.topic.find({'_id': {'$in': owner['favorite']}})
    total = yield cursor.count()

    offset = (page - 1) * limit
    cursor.limit(limit).skip(offset)
    topics = yield cursor.to_list(length=limit)

    self.render('topic/template/topic-favorite.html',
                topics=topics, page=page, limit=limit, total=total)
项目:Ushio    作者:Hanaasagi    | 项目源码 | 文件源码
def post(self, tid):
    """Update the title, content and price of an existing topic.

    The submitted fields are validated with ``TopicModel``. When validation
    passes, ``tid`` is converted to an ``ObjectId`` and the topic document
    is updated with ``$set``; the client is then redirected to the topic
    page. Validation failure reports an error through ``custom_error``.

    :param tid: string form of the topic's ObjectId (from the URL).
    """
    topic = {
        'title': self.get_body_argument('title', ''),
        'content': self.get_body_argument('content', ''),
        'price': self.get_body_argument('price', 0)
    }
    model = TopicModel()
    if model(topic):
        try:
            tid = ObjectId(tid)
        except Exception:
            # Narrowed from a bare ``except:`` so KeyboardInterrupt and
            # SystemExit are no longer swallowed. The draft is cached
            # before the error is reported.
            self.cache['topic'] = topic
            self.custom_error('????????????')
        yield self.db.topic.update({
            '_id': tid
        }, {
            '$set': topic
        })
        self.redirect('/topics/{}'.format(tid))
    else:
        self.custom_error()
项目:Ushio    作者:Hanaasagi    | 项目源码 | 文件源码
def user_action(self, *args, **kwargs):
    """Apply profile edits submitted from the user-management form.

    Reads ``uid`` plus the editable profile fields from the request body,
    optionally a new password, writes them with ``$set`` and redirects to
    the user's detail page.
    """
    uid = self.get_body_argument('uid')
    changes = {
        field: self.get_body_argument(field)
        for field in ('email', 'website', 'qq', 'address')
    }
    new_password = self.get_body_argument('password', '')
    # The password is only touched when a non-empty value was submitted.
    if new_password:
        # NOTE(review): md5 is called with two positional arguments here and
        # the result is stored directly -- verify this matches the md5
        # helper in scope.
        changes['password'] = md5(new_password, self.settings['salt'])
    yield self.db.user.find_and_modify(
        {'_id': ObjectId(uid)},
        {'$set': changes},
    )
    self.redirect('/manage/userdetail/{}'.format(uid))
项目:Ushio    作者:Hanaasagi    | 项目源码 | 文件源码
def post(self):
    """Remove the tag whose id is given in the ``id`` body argument.

    Writes ``{"success":true}`` when the removal returns a truthy result;
    otherwise reports an error through ``custom_error``.
    """
    id_ = self.get_body_argument('id', None)
    if not id_:
        self.custom_error()
    outcome = yield self.db.tag.remove({'_id': ObjectId(id_)})
    if not outcome:
        self.custom_error()
    else:
        self.write('{"success":true}')
        self.finish()


#
# ???????
#
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def handle_aggregation(self, callback):
    """Build the completion handler for an aggregation call.

    The returned function expects ``(response, error)`` as its first two
    positional arguments. A truthy error raises ``RuntimeError``; otherwise
    each item of ``response['result']`` is passed through ``self.fill_ids``,
    wrapped with ``edict`` and the collected list is handed to ``callback``.

    :param callback: callable receiving the list of wrapped result items.
    :return: the handler function.
    """
    def handle(*arguments, **kw):
        failure = arguments[1]
        if failure:
            raise RuntimeError('Aggregation failed due to: %s' % str(failure))

        wrapped = []
        for entry in arguments[0]['result']:
            self.fill_ids(entry)
            wrapped.append(edict(entry))

        callback(wrapped)

    return handle
项目:aiomongo    作者:ZeoAlliance    | 项目源码 | 文件源码
async def test_id_can_be_anything(self, test_db):
    """Check that ``_id`` may be auto-generated, numeric or a sub-document.

    Declared ``async`` because the body uses ``await``, ``async with`` and
    ``async for``, which are syntax errors inside a plain ``def``.
    """
    await test_db.test.delete_many({})

    # No explicit _id: the driver fills in an ObjectId on the passed dict.
    auto_id = {'hello': 'world'}
    await test_db.test.insert_one(auto_id)
    assert isinstance(auto_id['_id'], ObjectId)

    # Integer _id is kept as given.
    numeric = {'_id': 240, 'hello': 'world'}
    await test_db.test.insert_one(numeric)
    assert numeric['_id'] == 240

    # A whole document used as _id is kept as given.
    obj = {'_id': numeric, 'hello': 'world'}
    await test_db.test.insert_one(obj)
    assert obj['_id'] == numeric

    async with test_db.test.find() as cursor:
        async for x in cursor:
            assert x['hello'] == 'world'
            assert '_id' in x
项目:aiomongo    作者:ZeoAlliance    | 项目源码 | 文件源码
async def test_find_one(self, test_db):
    """Exercise ``find_one`` with different filters and projections.

    Declared ``async`` because the body uses ``await``, which is a syntax
    error inside a plain ``def``.
    """
    _id = (await test_db.test.insert_one({'hello': 'world', 'foo': 'bar'})).inserted_id

    # Every way of selecting the single stored document returns the same thing.
    assert 'world' == (await test_db.test.find_one())['hello']
    assert await test_db.test.find_one(_id) == await test_db.test.find_one()
    assert await test_db.test.find_one(None) == await test_db.test.find_one()
    assert await test_db.test.find_one({}) == await test_db.test.find_one()
    assert await test_db.test.find_one({'hello': 'world'}) == await test_db.test.find_one()

    # Projection controls which fields come back; _id is always included.
    assert 'hello' in await test_db.test.find_one(projection=['hello'])
    assert 'hello' not in await test_db.test.find_one(projection=['foo'])
    assert ['_id'] == list(await test_db.test.find_one(projection=[]))

    # Non-matching filters give None.
    assert await test_db.test.find_one({'hello': 'foo'}) is None
    assert await test_db.test.find_one(ObjectId()) is None
项目:aiomongo    作者:ZeoAlliance    | 项目源码 | 文件源码
async def test_deref(self, test_db):
    """Exercise ``dereference`` with invalid input and with DBRef values.

    Declared ``async`` because the body uses ``await``, which is a syntax
    error inside a plain ``def``.
    """
    # Anything that is not a DBRef is rejected.
    with pytest.raises(TypeError):
        await test_db.dereference(5)
    with pytest.raises(TypeError):
        await test_db.dereference('hello')
    with pytest.raises(TypeError):
        await test_db.dereference(None)

    assert await test_db.dereference(DBRef("test", ObjectId())) is None
    obj = {'x': True}
    key = (await test_db.test.insert_one(obj)).inserted_id
    assert await test_db.dereference(DBRef('test', key)) == obj
    assert await test_db.dereference(DBRef('test', key, 'aiomongo_test')) == obj

    # A DBRef naming the database 'foo' is expected to be refused.
    with pytest.raises(ValueError):
        await test_db.dereference(DBRef('test', key, 'foo'))

    # Non-ObjectId ids are dereferenced as well.
    assert await test_db.dereference(DBRef('test', 4)) is None
    obj = {'_id': 4}
    await test_db.test.insert_one(obj)
    assert await test_db.dereference(DBRef('test', 4)) == obj
项目:aiomongo    作者:ZeoAlliance    | 项目源码 | 文件源码
async def test_insert_find_one(self, test_db):
    """Insert a document, read it back, replace it and re-read it.

    Declared ``async`` because the body uses ``await``, ``async with`` and
    ``async for``, which are syntax errors inside a plain ``def``.
    """
    a_doc = SON({'hello': 'world'})
    a_key = (await test_db.test.insert_one(a_doc)).inserted_id
    assert isinstance(a_doc['_id'], ObjectId)
    assert a_doc['_id'] == a_key
    assert a_doc == await test_db.test.find_one({'_id': a_doc['_id']})
    assert a_doc == await test_db.test.find_one(a_key)
    assert await test_db.test.find_one(ObjectId()) is None
    assert a_doc == await test_db.test.find_one({'hello': 'world'})
    assert await test_db.test.find_one({'hello': 'test'}) is None

    # Replace the stored document and confirm the old content is gone.
    b = await test_db.test.find_one()
    b['hello'] = 'mike'
    await test_db.test.replace_one({'_id': b['_id']}, b)

    assert a_doc != await test_db.test.find_one(a_key)
    assert b == await test_db.test.find_one(a_key)
    assert b == await test_db.test.find_one()

    # The replacement must not have created a second document.
    count = 0

    async with test_db.test.find() as cursor:
        async for _ in cursor:
            count += 1
    assert count == 1
项目:CaptsLog    作者:jaehoonhwang    | 项目源码 | 文件源码
def search_entries_by_id(self, id):
    """Search the Entries_Table for the entry with the given ObjectId.

    Args:
        id(string): the objectID you are searching for

    Return:
        result: the cursor returned by ``find`` on success, or False when
            the server could not be reached (the same failure value that
            update_entries and delete_entries use).

    """

    entries_table = self.db["Entries_Table"]
    try:
        return entries_table.find(
            {"_id": ObjectId(id)})
    except errors.ServerSelectionTimeoutError:
        print('ERROR : No connection could be made because'
              ' the target machine actively refused it')
        # Was True, which made a failed lookup look truthy to callers.
        return False
项目:CaptsLog    作者:jaehoonhwang    | 项目源码 | 文件源码
def update_entries(self, _id, vals):
    """Apply new values to one entry of the Entries_Table.

    ``vals`` is stamped in place with a ``Last_Modified`` timestamp before
    the lookup happens.

    Args:
        _id(ObjectId):  ObjectID of the entry you want to change
        vals(collection): New values

    Return:
        result(bool): True if the update was issued. False if the entry
            does not exist or the server could not be reached.
    """

    entries_table = self.db["Entries_Table"]
    try:
        vals["Last_Modified"] = datetime.now()
        target = ObjectId(_id)
        existing = entries_table.find_one({"_id": target})
        if existing:
            entries_table.update_one({"_id": target}, {"$set": vals})
            return True
        return False
    except errors.ServerSelectionTimeoutError:
        print('ERROR : No connection could be made because'
              ' the target machine actively refused it')
        return False
项目:CaptsLog    作者:jaehoonhwang    | 项目源码 | 文件源码
def delete_entries(self, _id):
    """Remove one entry from the Entries_Table.

    Args:
        _id(str):  Object ID of the entry you want to delete

    Return:
        result(bool): True if the delete was issued. False if the entry
            does not exist or the server could not be reached.
    """

    entries_table = self.db["Entries_Table"]
    try:
        target = ObjectId(_id)
        existing = entries_table.find_one({"_id": target})
        if existing:
            entries_table.delete_one({"_id": target})
            return True
        # The specified entry does not exist.
        return False
    except errors.ServerSelectionTimeoutError:
        print('ERROR : No connection could be made because'
              ' the target machine actively refused it')
        return False
项目:sacredboard    作者:chovanecm    | 项目源码 | 文件源码
def get_run(self, run_id):
    """
    Fetch one run document by its ID.

    The ID is tried as an integer first; when it cannot be parsed as one,
    it is treated as an ObjectId string.

    :param run_id: The ID of the run.
    :return: The last document yielded by the query, or None if there is none.
    """
    try:
        collection = getattr(self._db, self._collection_name)
        cursor = collection.find({"_id": int(run_id)})
    except ValueError:
        # Probably not a number.
        collection = getattr(self._db, self._collection_name)
        cursor = collection.find({"_id": bson.ObjectId(run_id)})

    run = None
    for run in cursor:
        pass
    return run
项目:sacredboard    作者:chovanecm    | 项目源码 | 文件源码
def test_find_record(mongo_client):
    """Check GenericDAO.find_record for a miss, an ObjectId hit and a field hit."""
    dao = GenericDAO(mongo_client, "testdb")

    # An id that is not in the collection gives None.
    missing = dao.find_record("runs", {"_id": "NON_EXISTING_ID"})
    assert missing is None

    # Lookup by ObjectId.
    known_id = bson.ObjectId("58163443b1758523257c69ca")
    by_id = dao.find_record("runs", {"_id": known_id})
    assert by_id is not None
    assert by_id["config"] is not None
    assert by_id["config"]["seed"] == 185616783

    # Lookup by a nested config field.
    by_field = dao.find_record("runs", {"config.learning_rate": 0.0001})
    assert by_field is not None
    assert by_field["config"] is not None
    assert by_field["config"]["seed"] == 144363069
项目:pymongo-schema    作者:pajachiet    | 项目源码 | 文件源码
def test04_write_json_with_mongo_objects():
    """Round-trip a schema holding an ObjectId and a datetime through JsonOutput.

    The schema is written to a JSON file under TEST_DIR, loaded back with
    bson's ``json_util`` object hook and compared with the original. The
    output file is removed even when the write or the comparison fails.
    """
    schema = {
        'db': {'coll': {'object': {'_id': {'type': 'oid', 'anonymized': ObjectId()},
                                   'date': {'type': 'date', 'anonymized': datetime(2015, 1, 1)}}}}
    }

    output = os.path.join(TEST_DIR, 'output_data_dict.json')
    output_maker = JsonOutput({})
    output_maker.data = schema
    try:
        with open(output, 'w') as out_fd:
            output_maker.write_data(out_fd)
        with open(output, 'r') as out_fd:
            # custom json_options - DEFAULT_JSON_OPTIONS sets tzinfo when loading datetime (!= schema)
            json_options = json_util.JSONOptions(tz_aware=False)
            # object_hook allows to interpret $oid, $date, etc from json and convert them to objects
            object_hook = partial(json_util.object_hook, json_options=json_options)
            assert json.load(out_fd, object_hook=object_hook) == schema
    finally:
        # Clean up on failure too, so a failed run leaves no stale file.
        if os.path.exists(output):
            os.remove(output)
项目:OmMongo    作者:bapakode    | 项目源码 | 文件源码
def test_unwrap():
    """Check the types returned by RefField.unwrap and SRefField.unwrap."""
    class A(Document):
        x = IntField()

    session = get_session()
    doc = A(x=5)
    session.save(doc)

    # Plain-dict form of the reference; built but not asserted on.
    aref = {'$id': doc.mongo_id, '$ref': 'A'}
    dbaref = DBRef(db='unit-testing', collection='A', id=doc.mongo_id)

    unwrapped_ref = RefField(DocumentField(A)).unwrap(dbaref)
    assert isinstance(unwrapped_ref, DBRef), unwrapped_ref

    unwrapped_id = SRefField(A).unwrap(doc.mongo_id)
    assert isinstance(unwrapped_id, ObjectId), unwrapped_id
项目:dynamic-bus-scheduling    作者:pinac0099    | 项目源码 | 文件源码
def delete_address_document(self, object_id=None, name=None):
    """
    Remove a single address_document, selected by ObjectId or by name.

    ``object_id`` takes precedence when both selectors are given.

    :param object_id: ObjectId
    :param name: string
    :return: True if exactly one document was deleted, otherwise False.
    """
    if object_id is not None:
        selector = {'_id': ObjectId(object_id)}
    elif name is not None:
        selector = {'name': name}
    else:
        # No selector supplied: nothing to delete.
        return False

    outcome = self.address_documents_collection.delete_one(selector)
    return outcome.deleted_count == 1
项目:dynamic-bus-scheduling    作者:pinac0099    | 项目源码 | 文件源码
def delete_address_documents(self, object_ids=None, names=None):
    """
    Remove several address_documents, selected by ObjectIds or by names.

    ``object_ids`` takes precedence when both selectors are given.

    :param object_ids: [ObjectId]
    :param names: [string]
    :return: The number of deleted documents (0 when no selector is given).
    """
    if object_ids is not None:
        selector = {'_id': {'$in': [ObjectId(oid) for oid in object_ids]}}
    elif names is not None:
        selector = {'name': {'$in': names}}
    else:
        return 0

    outcome = self.address_documents_collection.delete_many(selector)
    return outcome.deleted_count
项目:dynamic-bus-scheduling    作者:pinac0099    | 项目源码 | 文件源码
def delete_bus_line_documents(self, object_ids=None, bus_line_ids=None):
    """
    Remove several bus_line_documents, selected by ObjectIds or by bus_line_ids.

    ``object_ids`` takes precedence when both selectors are given.

    :param object_ids: [ObjectId]
    :param bus_line_ids: [int]
    :return: The number of deleted documents (0 when no selector is given).
    """
    if object_ids is not None:
        selector = {'_id': {'$in': [ObjectId(oid) for oid in object_ids]}}
    elif bus_line_ids is not None:
        selector = {'bus_line_id': {'$in': bus_line_ids}}
    else:
        return 0

    outcome = self.bus_line_documents_collection.delete_many(selector)
    return outcome.deleted_count
项目:dynamic-bus-scheduling    作者:pinac0099    | 项目源码 | 文件源码
def delete_bus_stop_document(self, object_id=None, osm_id=None):
    """
    Remove a single bus_stop_document, selected by ObjectId or by osm_id.

    ``object_id`` takes precedence when both selectors are given.

    :param object_id: ObjectId
    :param osm_id: int
    :return: True if exactly one document was deleted, otherwise False.
    """
    if object_id is not None:
        selector = {'_id': ObjectId(object_id)}
    elif osm_id is not None:
        selector = {'osm_id': osm_id}
    else:
        return False

    outcome = self.bus_stop_documents_collection.delete_one(selector)
    return outcome.deleted_count == 1
项目:dynamic-bus-scheduling    作者:pinac0099    | 项目源码 | 文件源码
def delete_bus_stop_documents(self, object_ids=None, osm_ids=None):
    """
    Remove several bus_stop_documents, selected by ObjectIds or by osm_ids.

    ``object_ids`` takes precedence when both selectors are given.

    :param object_ids: [ObjectId]
    :param osm_ids: [int]
    :return: The number of deleted documents (0 when no selector is given).
    """
    if object_ids is not None:
        selector = {'_id': {'$in': [ObjectId(oid) for oid in object_ids]}}
    elif osm_ids is not None:
        selector = {'osm_id': {'$in': osm_ids}}
    else:
        return 0

    outcome = self.bus_stop_documents_collection.delete_many(selector)
    return outcome.deleted_count
项目:dynamic-bus-scheduling    作者:pinac0099    | 项目源码 | 文件源码
def delete_bus_vehicle_documents(self, object_ids=None, bus_vehicle_ids=None):
    """
    Remove several bus_vehicle_documents, selected by ObjectIds or by bus_vehicle_ids.

    ``object_ids`` takes precedence when both selectors are given.

    :param object_ids: [ObjectId]
    :param bus_vehicle_ids: [int]
    :return: The number of deleted documents (0 when no selector is given).
    """
    if object_ids is not None:
        selector = {'_id': {'$in': [ObjectId(oid) for oid in object_ids]}}
    elif bus_vehicle_ids is not None:
        selector = {'bus_vehicle_id': {'$in': bus_vehicle_ids}}
    else:
        return 0

    outcome = self.bus_vehicle_documents_collection.delete_many(selector)
    return outcome.deleted_count
项目:dynamic-bus-scheduling    作者:pinac0099    | 项目源码 | 文件源码
def delete_edge_document(self, object_id=None, starting_node_osm_id=None, ending_node_osm_id=None):
    """
    Remove a single edge_document, selected by ObjectId or by its end nodes.

    ``object_id`` takes precedence; otherwise both node osm_ids are required.

    :param object_id: ObjectId
    :param starting_node_osm_id: int
    :param ending_node_osm_id: int
    :return: True if exactly one document was deleted, otherwise False.
    """
    if object_id is not None:
        selector = {'_id': ObjectId(object_id)}
    elif starting_node_osm_id is not None and ending_node_osm_id is not None:
        selector = {
            'starting_node.osm_id': starting_node_osm_id,
            'ending_node.osm_id': ending_node_osm_id,
        }
    else:
        return False

    outcome = self.edge_documents_collection.delete_one(selector)
    return outcome.deleted_count == 1
项目:dynamic-bus-scheduling    作者:pinac0099    | 项目源码 | 文件源码
def delete_node_document(self, object_id=None, osm_id=None):
    """
    Remove a single node_document, selected by ObjectId or by osm_id.

    ``object_id`` takes precedence when both selectors are given.

    :param object_id: ObjectId
    :param osm_id: int
    :return: True if exactly one document was deleted, otherwise False.
    """
    if object_id is not None:
        selector = {'_id': ObjectId(object_id)}
    elif osm_id is not None:
        selector = {'osm_id': osm_id}
    else:
        return False

    outcome = self.node_documents_collection.delete_one(selector)
    return outcome.deleted_count == 1
项目:dynamic-bus-scheduling    作者:pinac0099    | 项目源码 | 文件源码
def delete_node_documents(self, object_ids=None, osm_ids=None):
    """
    Remove several node_documents, selected by ObjectIds or by osm_ids.

    ``object_ids`` takes precedence when both selectors are given.

    :param object_ids: [ObjectId]
    :param osm_ids: [int]
    :return: The number of deleted documents (0 when no selector is given).
    """
    if object_ids is not None:
        selector = {'_id': {'$in': [ObjectId(oid) for oid in object_ids]}}
    elif osm_ids is not None:
        selector = {'osm_id': {'$in': osm_ids}}
    else:
        return 0

    outcome = self.node_documents_collection.delete_many(selector)
    return outcome.deleted_count
项目:dynamic-bus-scheduling    作者:pinac0099    | 项目源码 | 文件源码
def delete_point_documents(self, object_ids=None, osm_ids=None):
    """
    Remove several point_documents, selected by ObjectIds or by osm_ids.

    ``object_ids`` takes precedence when both selectors are given.

    :param object_ids: [ObjectId]
    :param osm_ids: [int]
    :return: The number of deleted documents (0 when no selector is given).
    """
    if object_ids is not None:
        selector = {'_id': {'$in': [ObjectId(oid) for oid in object_ids]}}
    elif osm_ids is not None:
        selector = {'osm_id': {'$in': osm_ids}}
    else:
        return 0

    outcome = self.point_documents_collection.delete_many(selector)
    return outcome.deleted_count
项目:dynamic-bus-scheduling    作者:pinac0099    | 项目源码 | 文件源码
def delete_timetable_document(self, object_id=None, timetable_id=None):
    """
    Remove a single timetable_document, selected by ObjectId or by timetable_id.

    ``object_id`` takes precedence when both selectors are given.

    :param object_id: ObjectId
    :param timetable_id: int
    :return: True if exactly one document was deleted, otherwise False.
    """
    if object_id is not None:
        selector = {'_id': ObjectId(object_id)}
    elif timetable_id is not None:
        selector = {'timetable_id': timetable_id}
    else:
        return False

    outcome = self.timetable_documents_collection.delete_one(selector)
    return outcome.deleted_count == 1
项目:dynamic-bus-scheduling    作者:pinac0099    | 项目源码 | 文件源码
def delete_traffic_event_document(self, object_id=None, event_id=None):
    """
    Remove a single traffic_event_document, selected by ObjectId or by event_id.

    ``object_id`` takes precedence when both selectors are given.

    :param object_id: ObjectId
    :param event_id: string
    :return: True if exactly one document was deleted, otherwise False.
    """
    if object_id is not None:
        selector = {'_id': ObjectId(object_id)}
    elif event_id is not None:
        selector = {'event_id': event_id}
    else:
        return False

    outcome = self.traffic_event_documents_collection.delete_one(selector)
    return outcome.deleted_count == 1
项目:dynamic-bus-scheduling    作者:pinac0099    | 项目源码 | 文件源码
def delete_traffic_event_documents(self, object_ids=None, event_ids=None):
    """
    Remove several traffic_event_documents, selected by ObjectIds or by event_ids.

    ``object_ids`` takes precedence when both selectors are given.

    :param object_ids: [ObjectId]
    :param event_ids: [string]
    :return: The number of deleted documents (0 when no selector is given).
    """
    if object_ids is not None:
        selector = {'_id': {'$in': [ObjectId(oid) for oid in object_ids]}}
    elif event_ids is not None:
        selector = {'event_id': {'$in': event_ids}}
    else:
        return 0

    outcome = self.traffic_event_documents_collection.delete_many(selector)
    return outcome.deleted_count
项目:dynamic-bus-scheduling    作者:pinac0099    | 项目源码 | 文件源码
def delete_way_document(self, object_id=None, osm_id=None):
    """
    Remove a single way_document, selected by ObjectId or by osm_id.

    ``object_id`` takes precedence when both selectors are given.

    :param object_id: ObjectId
    :param osm_id: int
    :return: True if exactly one document was deleted, otherwise False.
    """
    if object_id is not None:
        selector = {'_id': ObjectId(object_id)}
    elif osm_id is not None:
        selector = {'osm_id': osm_id}
    else:
        return False

    outcome = self.way_documents_collection.delete_one(selector)
    return outcome.deleted_count == 1
项目:dynamic-bus-scheduling    作者:pinac0099    | 项目源码 | 文件源码
def delete_way_documents(self, object_ids=None, osm_ids=None):
    """
    Remove several way_documents, selected by ObjectIds or by osm_ids.

    ``object_ids`` takes precedence when both selectors are given.

    :param object_ids: [ObjectId]
    :param osm_ids: [int]
    :return: The number of deleted documents (0 when no selector is given).
    """
    if object_ids is not None:
        selector = {'_id': {'$in': [ObjectId(oid) for oid in object_ids]}}
    elif osm_ids is not None:
        selector = {'osm_id': {'$in': osm_ids}}
    else:
        return 0

    outcome = self.way_documents_collection.delete_many(selector)
    return outcome.deleted_count
项目:dynamic-bus-scheduling    作者:pinac0099    | 项目源码 | 文件源码
def find_bus_vehicle_document(self, object_id=None, bus_vehicle_id=None):
    """
    Fetch a bus_vehicle_document by ObjectId or by bus_vehicle_id.

    ``object_id`` takes precedence when both selectors are given.

    :param object_id: ObjectId
    :param bus_vehicle_id: int
    :return: bus_vehicle_document, or None when no selector is given.
    """
    if object_id is not None:
        criteria = {'_id': ObjectId(object_id)}
    elif bus_vehicle_id is not None:
        criteria = {'bus_vehicle_id': bus_vehicle_id}
    else:
        return None

    return self.bus_vehicle_documents_collection.find_one(criteria)
项目:dynamic-bus-scheduling    作者:pinac0099    | 项目源码 | 文件源码
def find_edge_document(self, object_id=None, starting_node_osm_id=None, ending_node_osm_id=None):
    """
    Retrieve an edge_document.

    The document is looked up by ``_id`` when ``object_id`` is given;
    otherwise both node osm_ids are required and the lookup uses the
    ``starting_node.osm_id`` / ``ending_node.osm_id`` fields.

    :param object_id: ObjectId
    :param starting_node_osm_id: int
    :param ending_node_osm_id: int
    :return: edge_document, or None when no usable selector is given.
    """
    if object_id is not None:
        criteria = {'_id': ObjectId(object_id)}
    elif starting_node_osm_id is not None and ending_node_osm_id is not None:
        criteria = {
            'starting_node.osm_id': starting_node_osm_id,
            # Field name corrected from the misspelled 'ending_node.oms_id',
            # matching the key used by delete_edge_document.
            'ending_node.osm_id': ending_node_osm_id,
        }
    else:
        return None

    return self.edge_documents_collection.find_one(criteria)
项目:dynamic-bus-scheduling    作者:pinac0099    | 项目源码 | 文件源码
def find_timetable_document(self, object_id=None, timetable_id=None):
    """
    Retrieve a timetable_document.

    The document is looked up by ``_id`` when ``object_id`` is given,
    otherwise by its ``timetable_id`` field.

    :param object_id: ObjectId
    :param timetable_id: int
    :return: timetable_document, or None when no selector is given.
    """
    if object_id is not None:
        criteria = {'_id': ObjectId(object_id)}
    elif timetable_id is not None:
        # Filter on the timetable_id field (as delete_timetable_document
        # does). Querying '_id' here could never match, because object_id
        # is None in this branch.
        criteria = {'timetable_id': timetable_id}
    else:
        return None

    return self.timetable_documents_collection.find_one(criteria)
项目:dynamic-bus-scheduling    作者:pinac0099    | 项目源码 | 文件源码
def find_traffic_event_document(self, object_id=None, event_id=None):
    """
    Fetch a traffic_event_document by ObjectId or by event_id.

    ``object_id`` takes precedence when both selectors are given.

    :param object_id: ObjectId
    :param event_id: string
    :return: traffic_event_document, or None when no selector is given.
    """
    if object_id is not None:
        criteria = {'_id': ObjectId(object_id)}
    elif event_id is not None:
        criteria = {'event_id': event_id}
    else:
        return None

    return self.traffic_event_documents_collection.find_one(criteria)
项目:dynamic-bus-scheduling    作者:pinac0099    | 项目源码 | 文件源码
def find_way_document(self, object_id=None, osm_id=None):
    """
    Fetch a way_document by ObjectId or by osm_id.

    ``object_id`` takes precedence when both selectors are given.

    :param object_id: ObjectId
    :param osm_id: int
    :return: way_document, or None when no selector is given.
    """
    if object_id is not None:
        criteria = {'_id': ObjectId(object_id)}
    elif osm_id is not None:
        criteria = {'osm_id': osm_id}
    else:
        return None

    return self.way_documents_collection.find_one(criteria)
项目:dynamic-bus-scheduling    作者:pinac0099    | 项目源码 | 文件源码
def insert_address_document(self, address_document=None, name=None, node_id=None, point=None):
    """
    Store an address_document.

    A ready-made ``address_document`` is inserted as is; otherwise one is
    assembled from ``name``, ``node_id`` and ``point``.

    :param address_document
    :param name: string
    :param node_id: int
    :param point: Point
    :return: new_object_id: ObjectId
    """
    if address_document is None:
        coordinates = {
            'longitude': point.longitude,
            'latitude': point.latitude,
        }
        address_document = {'name': name, 'node_id': node_id, 'point': coordinates}

    return self.address_documents_collection.insert_one(address_document).inserted_id
项目:dynamic-bus-scheduling    作者:pinac0099    | 项目源码 | 文件源码
def insert_bus_stop_document(self, bus_stop_document=None, osm_id=None, name=None, point=None):
    """
    Store a bus_stop_document.

    A ready-made ``bus_stop_document`` is inserted as is; otherwise one is
    assembled from ``osm_id``, ``name`` and ``point``.

    :param bus_stop_document
    :param osm_id: int
    :param name: string
    :param point: Point
    :return: new_object_id: ObjectId
    """
    if bus_stop_document is None:
        coordinates = {
            'longitude': point.longitude,
            'latitude': point.latitude,
        }
        bus_stop_document = {'osm_id': osm_id, 'name': name, 'point': coordinates}

    return self.bus_stop_documents_collection.insert_one(bus_stop_document).inserted_id
项目:dynamic-bus-scheduling    作者:pinac0099    | 项目源码 | 文件源码
def insert_node_document(self, node_document=None, osm_id=None, tags=None, point=None):
    """
    Store a node_document.

    A ready-made ``node_document`` is inserted as is; otherwise one is
    assembled from ``osm_id``, ``tags`` and ``point``.

    :param node_document
    :param osm_id: int
    :param tags: dict
    :param point: Point
    :return: new_object_id: ObjectId
    """
    if node_document is None:
        coordinates = {
            'longitude': point.longitude,
            'latitude': point.latitude,
        }
        node_document = {'osm_id': osm_id, 'tags': tags, 'point': coordinates}

    return self.node_documents_collection.insert_one(node_document).inserted_id
项目:dynamic-bus-scheduling    作者:pinac0099    | 项目源码 | 文件源码
def insert_point_document(self, point_document=None, osm_id=None, point=None):
        """
        Save a point_document in the point collection.

        Without a ready-made ``point_document``, one is created from ``osm_id``
        and the ``longitude``/``latitude`` attributes of ``point``.

        :param point_document: complete document, or None to build one
        :param osm_id: int
        :param point: Point (only read when the document has to be built)
        :return: new_object_id: ObjectId (the ``inserted_id`` of the insert result)
        """
        to_insert = point_document
        if to_insert is None:
            to_insert = {
                'osm_id': osm_id,
                'point': {'longitude': point.longitude, 'latitude': point.latitude}
            }

        outcome = self.point_documents_collection.insert_one(to_insert)
        return outcome.inserted_id
项目:dynamic-bus-scheduling    作者:pinac0099    | 项目源码 | 文件源码
def insert_timetable_document(self, timetable):
        """
        Upsert a timetable_document, matched on its ``_id``.

        The ``bus_line_id``, ``timetable_entries`` and ``travel_requests`` values
        read from ``timetable`` are written with ``$set``; ``upsert=True`` is
        passed so that a document which does not exist yet gets created.

        :param timetable: timetable_document
        :return: new_object_id: the ``upserted_id`` of the update result
                 (NOTE(review): PyMongo documents this as None when no upsert
                 took place, i.e. an existing document was updated - confirm
                 that callers expect this)
        """
        timetable_id = ObjectId(timetable.get('_id'))
        fields = {
            'bus_line_id': timetable.get('bus_line_id'),
            'timetable_entries': timetable.get('timetable_entries'),
            'travel_requests': timetable.get('travel_requests')
        }
        update_result = self.timetable_documents_collection.update_one(
            {'_id': timetable_id},
            {'$set': fields},
            upsert=True
        )
        return update_result.upserted_id
项目:dynamic-bus-scheduling    作者:pinac0099    | 项目源码 | 文件源码
def insert_traffic_event_document(self, traffic_event_document):
        """
        Upsert a traffic_event_document, matched on its ``event_id``.

        The ``event_type``, ``event_level``, ``point`` and ``datetime`` values of
        the given document are written with ``$set``; ``upsert=True`` is passed
        so that an event which is not stored yet gets created.

        :param traffic_event_document
        :return: new_object_id: the ``upserted_id`` of the update result
        """
        selector = {'event_id': traffic_event_document.get('event_id')}
        updated_fields = ('event_type', 'event_level', 'point', 'datetime')
        changes = {
            '$set': dict(
                (field, traffic_event_document.get(field)) for field in updated_fields
            )
        }
        outcome = self.traffic_event_documents_collection.update_one(
            selector, changes, upsert=True
        )
        return outcome.upserted_id
项目:dynamic-bus-scheduling    作者:pinac0099    | 项目源码 | 文件源码
def insert_traffic_event_documents(self, traffic_event_documents):
        """
        Upsert several traffic_event_documents, one after another.

        Each document is handed to ``insert_traffic_event_document``; the values
        it returns are collected in input order.

        :param traffic_event_documents: [traffic_event_document]
        :return: new_object_ids: [ObjectId]
        """
        return [
            self.insert_traffic_event_document(traffic_event_document=document)
            for document in traffic_event_documents
        ]
项目:dynamic-bus-scheduling    作者:pinac0099    | 项目源码 | 文件源码
def insert_way_document(self, way_document=None, osm_id=None, tags=None, references=None):
        """
        Add a way_document to the way collection.

        A caller-supplied ``way_document`` is inserted unchanged; when it is None
        the document is built from ``osm_id``, ``tags`` and ``references``.

        :param way_document: complete document, or None to build one
        :param osm_id: int
        :param tags: dict
        :param references: [osm_id]
        :return: new_object_id: ObjectId (the ``inserted_id`` of the insert result)
        """
        if way_document is not None:
            record = way_document
        else:
            record = {'osm_id': osm_id, 'tags': tags, 'references': references}

        stored = self.way_documents_collection.insert_one(record)
        return stored.inserted_id
项目:dynamic-bus-scheduling    作者:pinac0099    | 项目源码 | 文件源码
def print_address_document(self, object_id=None, name=None, node_id=None, longitude=None, latitude=None):
        """
        Print an address_document.

        The document is looked up through ``find_address_document`` with the
        given search criteria and whatever that call returns is written to
        standard output.

        :param object_id: ObjectId
        :param name: string
        :param node_id: int
        :param longitude: float
        :param latitude: float
        :return: None
        """
        address_document = self.find_address_document(
            object_id=object_id,
            name=name,
            node_id=node_id,
            longitude=longitude,
            latitude=latitude
        )
        # The parenthesised single-argument form produces the same output as a
        # Python 2 print statement and is also a valid Python 3 print() call.
        print(address_document)
项目:dynamic-bus-scheduling    作者:pinac0099    | 项目源码 | 文件源码
def print_bus_stop_document(self, object_id=None, osm_id=None, name=None, longitude=None, latitude=None):
        """
        Print a bus_stop_document.

        The document is looked up through ``find_bus_stop_document`` with the
        given search criteria and whatever that call returns is written to
        standard output.

        :param object_id: ObjectId
        :param osm_id: int
        :param name: string
        :param longitude: float
        :param latitude: float
        :return: None
        """
        bus_stop_document = self.find_bus_stop_document(
            object_id=object_id,
            osm_id=osm_id,
            name=name,
            longitude=longitude,
            latitude=latitude
        )
        # The parenthesised single-argument form produces the same output as a
        # Python 2 print statement and is also a valid Python 3 print() call.
        print(bus_stop_document)
项目:dynamic-bus-scheduling    作者:pinac0099    | 项目源码 | 文件源码
def print_bus_vehicle_documents(self, object_ids=None, bus_vehicle_ids=None, counter=None):
        """
        Print multiple bus_vehicle_documents.

        The documents are fetched through ``find_bus_vehicle_documents``. All of
        them are printed when ``counter`` is None or not smaller than the number
        of documents found; otherwise only the first ``counter`` documents are
        printed. The total number of documents found is printed last.

        :param object_ids: [ObjectId]
        :param bus_vehicle_ids: [int]
        :param counter: int, maximum number of documents to print (None = all)
        :return: None
        """
        bus_vehicle_documents = self.find_bus_vehicle_documents(
            object_ids=object_ids,
            bus_vehicle_ids=bus_vehicle_ids
        )
        number_of_bus_vehicle_documents = len(bus_vehicle_documents)

        if counter is None or number_of_bus_vehicle_documents <= counter:
            documents_to_print = bus_vehicle_documents
        else:
            # Index access keeps the original semantics, including printing
            # nothing for a negative counter (the range is empty).
            documents_to_print = (bus_vehicle_documents[i] for i in range(counter))

        for bus_vehicle_document in documents_to_print:
            print(bus_vehicle_document)

        # A single pre-formatted argument prints the same text under a Python 2
        # print statement and a Python 3 print() call; print('a:', b) would
        # print a tuple on Python 2.
        print('number_of_bus_vehicle_documents: %s' % number_of_bus_vehicle_documents)
项目:dynamic-bus-scheduling    作者:pinac0099    | 项目源码 | 文件源码
def print_node_document(self, object_id=None, osm_id=None, longitude=None, latitude=None):
        """
        Print a node_document.

        The document is looked up through ``find_node_document`` with the given
        search criteria and whatever that call returns is written to standard
        output.

        :param object_id: ObjectId
        :param osm_id: int
        :param longitude: float
        :param latitude: float
        :return: None
        """
        node_document = self.find_node_document(
            object_id=object_id,
            osm_id=osm_id,
            longitude=longitude,
            latitude=latitude
        )
        # The parenthesised single-argument form produces the same output as a
        # Python 2 print statement and is also a valid Python 3 print() call.
        print(node_document)