Python sqlalchemy 模块,orm() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用sqlalchemy.orm()

项目:csirtg-smrt-py    作者:csirtgadgets    | 项目源码 | 文件源码
def __init__(self, indicator=None, group='everyone', provider=None, firsttime=None, lasttime=None, tags=None):
        """Populate the record, normalising list and string inputs.

        :param indicator: stored unchanged on ``self.indicator``.
        :param group: group name; when a list is passed only its first
            element is kept.
        :param provider: stored unchanged on ``self.provider``.
        :param firsttime: timestamp; a non-empty string is parsed with
            ``arrow.get`` and replaced by the resulting ``.datetime``.
        :param lasttime: timestamp; handled exactly like ``firsttime``.
        :param tags: a list is stored as a sorted, comma-joined string; any
            other value is stored unchanged.
        """

        self.indicator = indicator
        self.group = group
        self.provider = provider
        self.firsttime = firsttime
        self.lasttime = lasttime
        self.tags = tags

        if isinstance(group, list):
            self.group = group[0]

        if isinstance(self.tags, list):
            # sorted() builds a new list, so the list object handed in by the
            # caller is not reordered as a side effect of constructing a record.
            self.tags = ','.join(sorted(self.tags))

        # NOTE(review): ``basestring`` is the Python 2 name; presumably it is
        # aliased elsewhere in the module when running on Python 3 -- confirm.
        if self.lasttime and isinstance(self.lasttime, basestring):
            self.lasttime = arrow.get(self.lasttime).datetime

        if self.firsttime and isinstance(self.firsttime, basestring):
            self.firsttime = arrow.get(self.firsttime).datetime


# http://www.pythoncentral.io/sqlalchemy-orm-examples/
项目:csirtg-smrt-py    作者:csirtgadgets    | 项目源码 | 文件源码
def __init__(self, dbfile=DB_FILE, autocommit=False, dictrows=True, **kwargs):
        """Set up the SQLite engine, the scoped session registry and the schema.

        :param dbfile: path of the SQLite file used to build the engine URL.
        :param autocommit: stored on the instance; not otherwise used here.
        :param dictrows: stored on the instance; not otherwise used here.
        :param kwargs: accepted but not used by this constructor.
        """
        self.dbfile, self.autocommit, self.dictrows = dbfile, autocommit, dictrows
        self.path = "sqlite:///{0}".format(self.dbfile)

        # SQL statement echoing follows the module-level TRACE flag.
        echo = True if TRACE else False

        # http://docs.sqlalchemy.org/en/latest/orm/contextual.html
        self.engine = create_engine(self.path, echo=echo)
        session_factory = sessionmaker(bind=self.engine)
        self.handle = scoped_session(session_factory)
        self._session = None
        self._tx_count = 0

        # Create every table declared on Base that does not exist yet.
        Base.metadata.create_all(self.engine)

        logger.debug('database path: {}'.format(self.path))

        self.clear_memcache()
项目:LabbookDB    作者:TheChymera    | 项目源码 | 文件源码
def load_session(db_path):
    """Create an engine and a session for a SQLite database file.

    All tables declared on ``Base.metadata`` are created on the engine before
    returning.

    Parameters
    ----------
    db_path : str
        Path to the database file; it may be relative or start with a tilde
        for the user's home directory (it is expanded and made absolute).

    Returns
    -------
    session : sqlalchemy.orm.session.Session
        Session instance bound to the engine, with autoflush disabled.
    engine : sqlalchemy.engine.Engine
        Engine instance.
    """

    sqlite_url = "sqlite:///" + path.abspath(path.expanduser(db_path))
    engine = create_engine(sqlite_url, echo=False)
    # autoflush must stay disabled: otherwise, if "treatments" or
    # "measurements" entries precede "external_ids", the latter will insert a
    # null in the animal_id column.
    session = sessionmaker(bind=engine, autoflush=False)()
    Base.metadata.create_all(engine)
    return session, engine
项目:LabbookDB    作者:TheChymera    | 项目源码 | 文件源码
def add_to_db(session, engine, myobject):
    """Add an object to the session, commit, and return its .id attribute.

    An ``sqlalchemy.exc.IntegrityError`` raised by the commit is not fatal: a
    notice is printed and the session is rolled back so that it can be used
    again by the caller.

    Parameters
    ----------
    session : sqlalchemy.orm.session.Session
        Session instance, as created with labbookdb.db.add.load_session().
    engine : sqlalchemy.engine.Engine
        Engine instance corresponding to the Session instance under session, as created with labbookdb.db.add.load_session().
        It is not used by this function.
    myobject : object
        LabbookDB object with SQLAlchemy-compatible attributes (e.g. as found under labbookdb.db.common_classes).

    Returns
    -------
    object_id : int
        Value of myobject.id attribute, read after the commit attempt.
    """

    session.add(myobject)
    try:
        session.commit()
    except sqlalchemy.exc.IntegrityError:
        print("Please make sure this was not a double entry:", myobject)
        # A failed flush leaves the session in a state where further work is
        # refused until rollback() is called; without it every later use of
        # this session would raise.
        session.rollback()
    return myobject.id
项目:LabbookDB    作者:TheChymera    | 项目源码 | 文件源码
def commit_and_close(session, engine):
    """Commit and close session and dispose of engine.

    Nonfatal for sqlalchemy.exc.IntegrityError with print notification.
    Any other exception raised by the commit propagates, but the session is
    still closed and the engine still disposed before it does.

    Parameters
    ----------
    session : sqlalchemy.orm.session.Session
        Session instance, as created with labbookdb.db.add.load_session().
    engine : sqlalchemy.engine.Engine
        Engine instance corresponding to the Session instance under session, as created with labbookdb.db.add.load_session().
    """

    try:
        session.commit()
    except sqlalchemy.exc.IntegrityError:
        print("Please make sure this was not a double entry.")
    finally:
        # Release the session and the engine on every path, including
        # unexpected commit failures, so they are not leaked.
        session.close()
        engine.dispose()
项目:boartty    作者:openstack    | 项目源码 | 文件源码
def getStories(self, query, active, sort_by='number'):
        """Return the stories matching a search, as a list.

        :param query: search expression; when truthy it is parsed with
            ``self.search.parse`` and applied as a filter.
        :param active: when truthy, only stories that are not hidden and whose
            status is ``'active'`` are returned.
        :param sort_by: ``'updated'`` or ``'last-seen'`` order by the matching
            column; any other value (including the default) orders by id.
        :return: list of Story objects, empty when nothing matches.
        """
        self.database.log.debug("Search query: %s sort: %s" % (query, sort_by))
        q = self.session().query(Story)
        if query:
            q = q.filter(self.search.parse(query))
        if active:
            q = q.filter(story_table.c.hidden==False, story_table.c.status=='active')
        if sort_by == 'updated':
            q = q.order_by(story_table.c.updated)
        elif sort_by == 'last-seen':
            q = q.order_by(story_table.c.last_seen)
        else:
            q = q.order_by(story_table.c.id)
        self.database.log.debug("Search SQL: %s" % q)
        # Query.all() returns an empty list when no row matches; it does not
        # raise NoResultFound, so no exception handler is needed here.
        return q.all()
项目:dati-ckan-docker    作者:italia    | 项目源码 | 文件源码
def setup_class(cls):
        """Prepare the datastore fixtures shared by the tests of this class.

        Skips the tests when the datastore is not supported. Otherwise loads
        the ``datastore`` plugin, creates the standard test data, stores the
        two test users, the request payload (``cls.data``) and a scoped
        session bound to the datastore write URL on the class.
        """
        if not tests.is_datastore_supported():
            raise nose.SkipTest("Datastore not supported")
        p.load('datastore')
        ctd.CreateTestData.create()
        cls.sysadmin_user = model.User.get('testsysadmin')
        cls.normal_user = model.User.get('annafan')
        resource = model.Package.get('annakarenina').resources[0]

        # Every column is of type text; each row lists its values in column order.
        text_columns = ('book', 'author', 'rating with %')
        rows = (
            ('annakarenina', 'tolstoy', '90%'),
            ('warandpeace', 'tolstoy', '42%'),
        )
        cls.data = {
            'resource_id': resource.id,
            'aliases': u'b\xfck2',
            'fields': [{'id': column, 'type': 'text'} for column in text_columns],
            'records': [dict(zip(text_columns, row)) for row in rows],
        }

        write_url = config['ckan.datastore.write_url']
        engine = db._get_engine({'connection_url': write_url})
        session_factory = orm.sessionmaker(bind=engine)
        cls.Session = orm.scoped_session(session_factory)
        package_resources = model.Package.get('annakarenina').resources
        set_url_type(package_resources, cls.sysadmin_user)
项目:ShelbySearch    作者:Agentscreech    | 项目源码 | 文件源码
def create_scoped_session(self, options=None):
        """Build a :class:`~sqlalchemy.orm.scoping.scoped_session` around the
        factory returned by :meth:`create_session`.

        The ``options`` dict may carry a ``'scopefunc'`` entry naming a custom
        scope function; it is removed from the dict before the remaining
        options are handed to :meth:`create_session`.  Without it, Flask's app
        context stack identity is used, so that sessions are created and
        removed with the request/response cycle.  ``'query_cls'`` defaults to
        ``self.Query``.  Note that a dict passed in is modified in place.

        :param options: dict of keyword arguments passed to session class  in
            ``create_session``
        """
        options = {} if options is None else options

        scope = options.pop('scopefunc', _app_ctx_stack.__ident_func__)
        options.setdefault('query_cls', self.Query)
        factory = self.create_session(options)
        return orm.scoped_session(factory, scopefunc=scope)
项目:Comparative-Annotation-Toolkit    作者:ComparativeGenomicsToolkit    | 项目源码 | 文件源码
def reflect_hints_db(db_path):
    """
    Reflect the schema of the hints database and automap its existing tables.

    The engine is created with NullPool to avoid concurrency issues with luigi.
    Per the original author's note, the practical effect with sqlite is that
    the database is locked and other connections wait.

    :param db_path: path to hints sqlite database
    :return: tuple ``(speciesnames, seqnames, hints, featuretypes, session)``:
             the four automapped classes followed by a session bound to the engine
    """
    engine = sqlalchemy.create_engine('sqlite:///{}'.format(db_path), poolclass=NullPool)
    metadata = sqlalchemy.MetaData()
    metadata.reflect(bind=engine)
    Base = automap_base(metadata=metadata)
    Base.prepare()
    classes = Base.classes
    mapped = (classes.speciesnames, classes.seqnames, classes.hints, classes.featuretypes)
    session = sessionmaker(bind=engine)()
    return mapped + (session,)
项目:deb-python-pykmip    作者:openstack    | 项目源码 | 文件源码
def test_get_object_type_multiple_objects(self):
        """
        Verify that sqlalchemy.orm.exc.MultipleResultsFound propagates out of
        _get_object_type, and that a warning is logged, when the data session
        query for an object ID raises it.
        """
        kmip_engine = engine.KmipEngine()
        kmip_engine._data_store = self.engine
        kmip_engine._data_store_session_factory = self.session_factory
        kmip_engine._data_session = kmip_engine._data_store_session_factory()
        # Any query issued on the data session raises MultipleResultsFound.
        kmip_engine._data_session.query = mock.MagicMock(
            side_effect=exc.MultipleResultsFound()
        )
        kmip_engine._logger = mock.MagicMock()

        with self.assertRaises(exc.MultipleResultsFound):
            kmip_engine._get_object_type('1')
        kmip_engine._data_session.commit()
        kmip_engine._logger.warning.assert_called_once_with(
            "Multiple objects found for ID: 1"
        )
项目:deb-python-kmip    作者:openstack    | 项目源码 | 文件源码
def test_get_object_type_multiple_objects(self):
        """
        Check that _get_object_type lets sqlalchemy.orm.exc.MultipleResultsFound
        escape and logs a warning when the session query for the object ID
        raises that error.
        """
        kmip = engine.KmipEngine()
        kmip._data_store = self.engine
        kmip._data_store_session_factory = self.session_factory
        kmip._data_session = kmip._data_store_session_factory()
        # Replace the session's query so that every lookup raises the error.
        failing_query = mock.MagicMock(side_effect=exc.MultipleResultsFound())
        kmip._data_session.query = failing_query
        kmip._logger = mock.MagicMock()

        with self.assertRaises(exc.MultipleResultsFound):
            kmip._get_object_type('1')
        kmip._data_session.commit()
        kmip._logger.warning.assert_called_once_with(
            "Multiple objects found for ID: 1"
        )
项目:QualquerMerdaAPI    作者:tiagovizoto    | 项目源码 | 文件源码
def create_scoped_session(self, options=None):
        """Build a :class:`~sqlalchemy.orm.scoping.scoped_session` around the
        factory returned by :meth:`create_session`.

        The ``options`` dict may carry a ``'scopefunc'`` entry naming a custom
        scope function; it is removed from the dict before the remaining
        options are handed to :meth:`create_session`.  Without it, Flask's app
        context stack identity is used, so that sessions are created and
        removed with the request/response cycle.  ``'query_cls'`` defaults to
        ``self.Query``.  Note that a dict passed in is modified in place.

        :param options: dict of keyword arguments passed to session class  in
            ``create_session``
        """
        options = {} if options is None else options

        scope = options.pop('scopefunc', _app_ctx_stack.__ident_func__)
        options.setdefault('query_cls', self.Query)
        factory = self.create_session(options)
        return orm.scoped_session(factory, scopefunc=scope)
项目:gardenbot    作者:GoestaO    | 项目源码 | 文件源码
def create_scoped_session(self, options=None):
        """Build a :class:`~sqlalchemy.orm.scoping.scoped_session` around the
        factory returned by :meth:`create_session`.

        The ``options`` dict may carry a ``'scopefunc'`` entry naming a custom
        scope function; it is removed from the dict before the remaining
        options are handed to :meth:`create_session`.  Without it, Flask's app
        context stack identity is used, so that sessions are created and
        removed with the request/response cycle.  ``'query_cls'`` defaults to
        ``self.Query``.  Note that a dict passed in is modified in place.

        :param options: dict of keyword arguments passed to session class  in
            ``create_session``
        """
        options = {} if options is None else options

        scope = options.pop('scopefunc', _app_ctx_stack.__ident_func__)
        options.setdefault('query_cls', self.Query)
        factory = self.create_session(options)
        return orm.scoped_session(factory, scopefunc=scope)
项目:drill    作者:rr-    | 项目源码 | 文件源码
def run(self, args: argparse.Namespace) -> None:
        """Export the deck named by ``args.deck``.

        The deck is re-queried with its cards, the cards' tags and the cards'
        user answers loaded through subquery loading, then written with
        ``_export`` to ``args.path`` when given, otherwise to standard output.
        """
        deck_name: str = args.deck
        path: Optional[str] = args.path

        with db.session_scope() as session:
            found = db.get_deck_by_name(session, deck_name)

            # Two independent loader chains, both starting from Deck.cards.
            with_tags = (
                sa.orm
                .subqueryload(db.Deck.cards)
                .subqueryload(db.Card.tags))
            with_answers = (
                sa.orm
                .subqueryload(db.Deck.cards)
                .subqueryload(db.Card.user_answers))
            deck_query = session.query(db.Deck).filter(db.Deck.id == found.id)
            deck = deck_query.options(with_tags, with_answers).one()

            if not path:
                _export(deck, sys.stdout)
            else:
                with open(path, 'w') as handle:
                    _export(deck, handle)
项目:flask    作者:bobohope    | 项目源码 | 文件源码
def create_scoped_session(self, options=None):
        """Build a :class:`~sqlalchemy.orm.scoping.scoped_session` around the
        factory returned by :meth:`create_session`.

        The ``options`` dict may carry a ``'scopefunc'`` entry naming a custom
        scope function; it is removed from the dict before the remaining
        options are handed to :meth:`create_session`.  Without it, Flask's app
        context stack identity is used, so that sessions are created and
        removed with the request/response cycle.  ``'query_cls'`` defaults to
        ``self.Query``.  Note that a dict passed in is modified in place.

        :param options: dict of keyword arguments passed to session class  in
            ``create_session``
        """
        options = {} if options is None else options

        scope = options.pop('scopefunc', _app_ctx_stack.__ident_func__)
        options.setdefault('query_cls', self.Query)
        factory = self.create_session(options)
        return orm.scoped_session(factory, scopefunc=scope)
项目:dcss-scoreboard    作者:zxc23    | 项目源码 | 文件源码
def get_account_id(s: sqlalchemy.orm.session.Session,
                   name: str,
                   server: Server) -> int:
    """Return the id of the account ``name`` on ``server``, creating it if absent.

    Player names are not case sensitive: they are stored with their canonical
    capitalisation, but lookups always compare the lowercased forms.  A newly
    created account is committed before its id is returned.
    """
    player_id = get_player_id(s, name)
    lookup = s.query(Account.id).filter(
        func.lower(Account.name) == name.lower(),
        Account.server == server)
    row = lookup.one_or_none()
    if not row:
        account = Account(name=name, server=server, player_id=player_id)
        s.add(account)
        s.commit()
        return account.id
    # The query selects a single column, so the id is the row's first element.
    return row[0]
项目:dcss-scoreboard    作者:zxc23    | 项目源码 | 文件源码
def setup_achievements(s: sqlalchemy.orm.session.Session) -> None:
    """Load manual achievements into the database.

    Each entry of ``const.ACHIEVEMENTS`` whose name is not stored yet is added
    and awarded to the players it lists.  A single commit is issued at the end.
    """
    for proto in const.ACHIEVEMENTS:
        already_stored = s.query(Achievement).filter(
            Achievement.name == proto.name).one_or_none()
        if already_stored:
            continue

        print("Adding achievement '%s'" % proto.name)
        achievement = Achievement()
        achievement.key = proto.key
        achievement.name = proto.name
        achievement.description = proto.description
        s.add(achievement)

        for player_name in proto.players:
            print("Awarding achievement '%s' to '%s'" %
                  (achievement.name, player_name))
            player = get_player(s, player_name)
            player.achievements.append(achievement)
            s.add(player)
    s.commit()
项目:dcss-scoreboard    作者:zxc23    | 项目源码 | 文件源码
def _highscores_helper(
        s: sqlalchemy.orm.session.Session,
        mapped_class: sqlalchemy.ext.declarative.api.DeclarativeMeta,
        game_column: sqlalchemy.orm.attributes.InstrumentedAttribute
) -> Sequence[Game]:
    """Find the highest-scoring game for each playable row of a related table.

    Parameters:
        mapped_class: the foreign key table's class
        game_column: the foreign key's column in Games table

    Returns:
        List of games, one per playable row that has at least one game,
        in order of the row's name.
    """
    games = s.query(Game)
    playable_rows = s.query(mapped_class).filter(
        # error: Type[Any] has no attribute "playable"
        mapped_class.playable ==
        sqlalchemy.true()).order_by(mapped_class.name).all()
    # One query per playable row; rows without any game yield a falsy result
    # and are dropped.
    best_per_row = (
        games.filter(
            game_column == row).order_by(Game.score.desc()).limit(1).first()
        for row in playable_rows
    )
    return [game for game in best_per_row if game]
项目:dcss-scoreboard    作者:zxc23    | 项目源码 | 文件源码
def combo_highscores(s: sqlalchemy.orm.session.Session) -> Sequence[Game]:
    """Return the highest-scoring game of every playable species/background combo.

    Combos without any game in the database contribute nothing to the result.
    """
    ranked = s.query(Game).order_by(Game.score.desc())
    playable_species = s.query(Species).filter(
        Species.playable == sqlalchemy.true()).order_by('name').all()

    top_games = []
    for species in playable_species:
        # The background list is fetched again for each species.
        playable_backgrounds = s.query(Background).filter(
            Background.playable ==
            sqlalchemy.true()).order_by('name').all()
        for background in playable_backgrounds:
            best = ranked.filter(
                Game.species == species, Game.background == background).first()
            if best:
                top_games.append(best)

    return top_games
项目:dcss-scoreboard    作者:zxc23    | 项目源码 | 文件源码
def fastest_wins(s: sqlalchemy.orm.session.Session,
                 *,
                 limit: int=const.GLOBAL_TABLE_LENGTH,
                 exclude_bots: bool=True,
                 player: Optional[Player]=None) -> Sequence[Game]:
    """Return at most ``limit`` winning games, ordered by duration.

    exclude_bots: If True, games by the accounts listed in
        const.BLACKLISTS['bots'] and the games listed in
        const.BLACKLISTS['bot-games'] are left out of the rankings.
    player: If given, only that player's games are considered.
    """
    winning = get_ktyp(s, 'winning')
    wins = s.query(Game).filter(Game.ktyp == winning).order_by('dur')

    if exclude_bots:
        for bot_name in const.BLACKLISTS['bots']:
            wins = wins.filter(Game.player_id != get_player_id(s, bot_name))
        for bad_gid in const.BLACKLISTS['bot-games']:
            wins = wins.filter(Game.gid != bad_gid)

    if player is not None:
        wins = wins.filter(Game.player_id == player.id)

    return wins.limit(limit).all()
项目:dcss-scoreboard    作者:zxc23    | 项目源码 | 文件源码
def combo_highscore_holders(s: sqlalchemy.orm.session.Session,
                            limit: int=const.GLOBAL_TABLE_LENGTH) \
        -> Sequence[Tuple[Player, Sequence[Game]]]:
    """Rank players by how many combo highscores they hold.

    The result may be shorter than ``limit``.

    Returns a list of (player, games) tuples, most highscores first; the
    first element of each tuple is the player's name as used for grouping.
    """
    games_by_player = {}  # type: dict
    for game in combo_highscores(s):
        holder = game.account.player.name
        games_by_player.setdefault(holder, []).append(game)

    ranking = sorted(
        games_by_player.items(),
        key=lambda entry: len(entry[1]),
        reverse=True)
    return ranking[:limit]
项目:windflow    作者:hartym    | 项目源码 | 文件源码
def sessionmaker_factory(self):
        """Return ``sqlalchemy.orm.sessionmaker`` itself (the class, not an instance)."""
        factory = sqlalchemy.orm.sessionmaker
        return factory
项目:windflow    作者:hartym    | 项目源码 | 文件源码
def __call__(self):
        """
        Yield one session obtained from ``self.sessionmaker()`` and call its
        ``remove()`` when the generator finishes, is closed, or an exception
        is thrown into it.

        :return sqlalchemy.orm.session.Session:
        """
        scoped = self.sessionmaker()
        try:
            yield scoped
        finally:
            # Runs on normal completion as well as on errors raised by the consumer.
            scoped.remove()
项目:windflow    作者:hartym    | 项目源码 | 文件源码
def create_sessionmaker(self, engine):
        """Return a zero-argument callable producing scoped sessions for ``engine``.

        Each call of the returned function builds a new ``scoped_session``
        around a new ``sqlalchemy.orm.sessionmaker`` bound to ``engine``,
        using ``self.sessionmaker_options`` as read at call time.
        """
        def sessionmaker():
            factory = sqlalchemy.orm.sessionmaker(bind=engine, **self.sessionmaker_options)
            return scoped_session(factory)

        return sessionmaker
项目:database_assetstore    作者:OpenGeoscience    | 项目源码 | 文件源码
def getFieldInfo(self):
        """
        Return a list of fields that are known and can be queried.

        The result is cached on ``self.fields`` once at least one field has
        been found; an empty result is not cached.

        :return: a list of known fields.  Each entry is a dictionary with name
                 and type (the column type as a string, or 'unknown' when the
                 type cannot be compiled).
        """
        if self.fields is not None:
            return self.fields
        db = self.connect()
        fields = []
        try:
            for column in sqlalchemy.orm.class_mapper(
                    self.tableClass).iterate_properties:
                # Only plain single-column properties are reported.
                if (isinstance(column, sqlalchemy.orm.ColumnProperty) and
                        len(column.columns) == 1):
                    try:
                        coltype = str(column.columns[0].type)
                    except sqlalchemy.exc.CompileError:
                        coltype = 'unknown'
                    fields.append({
                        'name': column.key,
                        'type': coltype
                    })
        finally:
            # Release the connection even when inspecting the mapper fails.
            self.disconnect(db)
        if fields:
            self.fields = fields
        return fields
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def _resolver(cls, prop):
    """Return a function resolving one argument of ``prop`` on ``cls``.

    Lookups fall back to a copy of the ``sqlalchemy`` package namespace
    extended with ``foreign`` and ``remote`` from ``sqlalchemy.orm``.
    """
    import sqlalchemy
    from sqlalchemy.orm import foreign, remote

    fallback = dict(sqlalchemy.__dict__, foreign=foreign, remote=remote)

    def resolve_arg(arg):
        return _class_resolver(cls, prop, fallback, arg)
    return resolve_arg
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def _include_sqlalchemy(obj):
    """Expose the public names of ``sqlalchemy`` and ``sqlalchemy.orm`` on ``obj``.

    Names already present on ``obj`` are left alone.  Afterwards ``Table`` is
    replaced by the result of ``_make_table``, the relationship helpers are
    wrapped with ``_wrap_with_default_query_class`` and ``event`` is attached.
    """
    for module in (sqlalchemy, sqlalchemy.orm):
        # The generator is lazy, so each hasattr check sees earlier setattr calls.
        missing = (key for key in module.__all__ if not hasattr(obj, key))
        for key in missing:
            setattr(obj, key, getattr(module, key))
    # Note: obj.Table does not attempt to be a SQLAlchemy Table class.
    obj.Table = _make_table(obj)
    for name in ('relationship', 'relation', 'dynamic_loader'):
        setattr(obj, name, _wrap_with_default_query_class(getattr(obj, name)))
    obj.event = event
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def __get__(self, obj, type):
        """Return a query object for the mapped class ``type``.

        The query is built with ``type.query_class`` and the session from
        ``self.sa.session()``.  ``None`` is returned when ``type`` is not
        mapped or its mapper is falsy.
        """
        try:
            mapper = orm.class_mapper(type)
            if not mapper:
                return None
            return type.query_class(mapper, session=self.sa.session())
        except UnmappedClassError:
            return None
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def create_scoped_session(self, options=None):
        """Helper factory method that creates a scoped session.

        Sessions are produced by calling :meth:`create_session` with
        ``options``.  A ``'scopefunc'`` entry, if present, is removed from
        ``options`` (the dict passed in is modified) and used as the scope
        function of the scoped session.
        """
        options = {} if options is None else options
        scope = options.pop('scopefunc', None)
        session_factory = partial(self.create_session, options)
        return orm.scoped_session(session_factory, scopefunc=scope)
项目:srepp_server    作者:SummitRoute    | 项目源码 | 文件源码
def recordAuthenticodeHashes(fileID, authenticode_hashes):
    """Record every Authenticode hash of one executable file.

    :param fileID: id of the ``ExecutableFiles`` row the hashes belong to.
    :param authenticode_hashes: mapping of hash type to hash value; each pair
        is passed to ``recordAuthenticodeHash``.
    :raises Exception: when no file with ``fileID`` exists in the database.
    """
    # Find the executable file in the DB
    # NOTE(review): the session is neither committed nor closed here --
    # presumably recordAuthenticodeHash or the caller handles that; confirm.
    session = Session()
    try:
        exefile = session.query(ExecutableFiles) \
            .filter(ExecutableFiles.id == fileID).one()
    except sqlalchemy.orm.exc.NoResultFound:
        raise Exception("File ID not found in database")

    # items() exists on both Python 2 and Python 3; iteritems() is Python 2 only.
    for hashType, hashValue in authenticode_hashes.items():
        recordAuthenticodeHash(session, exefile, hashType, hashValue)


################################################################################
# getSignatures looks for signature info
项目:QXSConsolas    作者:qxsch    | 项目源码 | 文件源码
def _resolver(cls, prop):
    """Return a function resolving one argument of ``prop`` on ``cls``.

    Lookups fall back to a copy of the ``sqlalchemy`` package namespace
    extended with ``foreign`` and ``remote`` from ``sqlalchemy.orm``.
    """
    import sqlalchemy
    from sqlalchemy.orm import foreign, remote

    fallback = dict(sqlalchemy.__dict__, foreign=foreign, remote=remote)

    def resolve_arg(arg):
        return _class_resolver(cls, prop, fallback, arg)
    return resolve_arg
项目:chrononaut    作者:onecodex    | 项目源码 | 文件源码
def create_session(self, options):
        """Return a ``sessionmaker`` that builds ``VersionedSignallingSession``
        objects, passing ``db=self`` and the entries of ``options`` as keyword
        arguments.
        """
        return sqlalchemy.orm.sessionmaker(
            class_=VersionedSignallingSession,
            db=self,
            **options
        )
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def _resolver(cls, prop):
    """Return a function resolving one argument of ``prop`` on ``cls``.

    Lookups fall back to a copy of the ``sqlalchemy`` package namespace
    extended with ``foreign`` and ``remote`` from ``sqlalchemy.orm``.
    """
    import sqlalchemy
    from sqlalchemy.orm import foreign, remote

    fallback = dict(sqlalchemy.__dict__, foreign=foreign, remote=remote)

    def resolve_arg(arg):
        return _class_resolver(cls, prop, fallback, arg)
    return resolve_arg
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def _include_sqlalchemy(obj):
    """Expose the public names of ``sqlalchemy`` and ``sqlalchemy.orm`` on ``obj``.

    Names already present on ``obj`` are left alone.  Afterwards ``Table`` is
    replaced by the result of ``_make_table``, the relationship helpers are
    wrapped with ``_wrap_with_default_query_class`` and ``event`` is attached.
    """
    for module in (sqlalchemy, sqlalchemy.orm):
        # The generator is lazy, so each hasattr check sees earlier setattr calls.
        missing = (key for key in module.__all__ if not hasattr(obj, key))
        for key in missing:
            setattr(obj, key, getattr(module, key))
    # Note: obj.Table does not attempt to be a SQLAlchemy Table class.
    obj.Table = _make_table(obj)
    for name in ('relationship', 'relation', 'dynamic_loader'):
        setattr(obj, name, _wrap_with_default_query_class(getattr(obj, name)))
    obj.event = event
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def _record(mapper, target, operation):
        """Note ``operation`` on ``target`` in its session's ``_model_changes``.

        Nothing is recorded unless the object's session is a
        ``_SignallingSession``.  Entries are keyed by the primary key tuple of
        ``target`` and hold ``(target, operation)``.
        """
        session = orm.object_session(target)
        if not isinstance(session, _SignallingSession):
            return
        identity = tuple(mapper.primary_key_from_instance(target))
        session._model_changes[identity] = (target, operation)
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def __get__(self, obj, type):
        """Return a query object for the mapped class ``type``.

        The query is built with ``type.query_class`` and the session from
        ``self.sa.session()``.  ``None`` is returned when ``type`` is not
        mapped or its mapper is falsy.
        """
        try:
            mapper = orm.class_mapper(type)
            if not mapper:
                return None
            return type.query_class(mapper, session=self.sa.session())
        except UnmappedClassError:
            return None
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def create_scoped_session(self, options=None):
        """Helper factory method that creates a scoped session.

        Sessions are ``_SignallingSession`` objects built with ``self`` and
        the entries of ``options`` as keyword arguments.  A ``'scopefunc'``
        entry, if present, is removed from ``options`` (the dict passed in is
        modified) and used as the scope function.
        """
        options = {} if options is None else options
        scope = options.pop('scopefunc', None)
        make_session = partial(_SignallingSession, self, **options)
        return orm.scoped_session(make_session, scopefunc=scope)
项目:deb-python-oauth2client    作者:openstack    | 项目源码 | 文件源码
def setUp(self):
        """Create an in-memory SQLite schema, a session factory and sample credentials."""
        engine = sqlalchemy.create_engine('sqlite://')
        Base.metadata.create_all(engine)
        self.session = sqlalchemy.orm.sessionmaker(bind=engine)

        credential_fields = dict(
            access_token='token',
            client_id='client_id',
            client_secret='client_secret',
            refresh_token='refresh_token',
            token_expiry=datetime.datetime.utcnow(),
            token_uri=oauth2client.GOOGLE_TOKEN_URI,
            user_agent='DummyAgent',
        )
        self.credentials = oauth2client.client.OAuth2Credentials(
            **credential_fields)
项目:boartty    作者:openstack    | 项目源码 | 文件源码
def getProject(self, key):
        """Return the Project with the given ``key``, or None when no row matches.

        Only ``NoResultFound`` is handled; other errors propagate.
        """
        lookup = self.session().query(Project).filter_by(key=key)
        try:
            project = lookup.one()
        except sqlalchemy.orm.exc.NoResultFound:
            project = None
        return project
项目:boartty    作者:openstack    | 项目源码 | 文件源码
def getProjectByName(self, name):
        """Return the Project with the given ``name``, or None when no row matches.

        Only ``NoResultFound`` is handled; other errors propagate.
        """
        lookup = self.session().query(Project).filter_by(name=name)
        try:
            project = lookup.one()
        except sqlalchemy.orm.exc.NoResultFound:
            project = None
        return project
项目:boartty    作者:openstack    | 项目源码 | 文件源码
def getProjectByID(self, id):
        """Return the Project with the given ``id``, or None when no row matches.

        Only ``NoResultFound`` is handled; other errors propagate.
        """
        lookup = self.session().query(Project).filter_by(id=id)
        try:
            project = lookup.one()
        except sqlalchemy.orm.exc.NoResultFound:
            project = None
        return project
项目:boartty    作者:openstack    | 项目源码 | 文件源码
def getTopic(self, key):
        """Return the Topic with the given ``key``, or None when no row matches.

        Only ``NoResultFound`` is handled; other errors propagate.
        """
        lookup = self.session().query(Topic).filter_by(key=key)
        try:
            topic = lookup.one()
        except sqlalchemy.orm.exc.NoResultFound:
            topic = None
        return topic
项目:boartty    作者:openstack    | 项目源码 | 文件源码
def getTopicByName(self, name):
        """Return the Topic with the given ``name``, or None when no row matches.

        Only ``NoResultFound`` is handled; other errors propagate.
        """
        lookup = self.session().query(Topic).filter_by(name=name)
        try:
            topic = lookup.one()
        except sqlalchemy.orm.exc.NoResultFound:
            topic = None
        return topic
项目:boartty    作者:openstack    | 项目源码 | 文件源码
def getStory(self, key):
        """Return the Story with the given ``key``, or None when no row matches.

        Only ``NoResultFound`` is handled; other errors propagate.
        """
        lookup = self.session().query(Story).filter_by(key=key)
        try:
            story = lookup.one()
        except sqlalchemy.orm.exc.NoResultFound:
            story = None
        return story
项目:boartty    作者:openstack    | 项目源码 | 文件源码
def getStoryByID(self, id):
        """Return the Story with the given ``id``, or None when no row matches.

        Only ``NoResultFound`` is handled; other errors propagate.
        """
        lookup = self.session().query(Story).filter_by(id=id)
        try:
            story = lookup.one()
        except sqlalchemy.orm.exc.NoResultFound:
            story = None
        return story
项目:boartty    作者:openstack    | 项目源码 | 文件源码
def getTag(self, name):
        """Return the Tag with the given ``name``, or None when no row matches.

        Only ``NoResultFound`` is handled; other errors propagate.
        """
        lookup = self.session().query(Tag).filter_by(name=name)
        try:
            tag = lookup.one()
        except sqlalchemy.orm.exc.NoResultFound:
            tag = None
        return tag
项目:boartty    作者:openstack    | 项目源码 | 文件源码
def getTagByID(self, id):
        """Return the Tag with the given ``id``, or None when no row matches.

        Only ``NoResultFound`` is handled; other errors propagate.
        """
        lookup = self.session().query(Tag).filter_by(id=id)
        try:
            tag = lookup.one()
        except sqlalchemy.orm.exc.NoResultFound:
            tag = None
        return tag
项目:boartty    作者:openstack    | 项目源码 | 文件源码
def getTaskByID(self, id):
        """Return the Task with the given ``id``, or None when no row matches.

        Only ``NoResultFound`` is handled; other errors propagate.
        """
        lookup = self.session().query(Task).filter_by(id=id)
        try:
            task = lookup.one()
        except sqlalchemy.orm.exc.NoResultFound:
            task = None
        return task
项目:boartty    作者:openstack    | 项目源码 | 文件源码
def getComment(self, key):
        """Return the Comment with the given ``key``, or None when no row matches.

        Only ``NoResultFound`` is handled; other errors propagate.
        """
        lookup = self.session().query(Comment).filter_by(key=key)
        try:
            comment = lookup.one()
        except sqlalchemy.orm.exc.NoResultFound:
            comment = None
        return comment
项目:boartty    作者:openstack    | 项目源码 | 文件源码
def getCommentByID(self, id):
        """Return the Comment with the given ``id``, or None when no row matches.

        Only ``NoResultFound`` is handled; other errors propagate.
        """
        lookup = self.session().query(Comment).filter_by(id=id)
        try:
            comment = lookup.one()
        except sqlalchemy.orm.exc.NoResultFound:
            comment = None
        return comment