我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 sqlalchemy.orm 模块。
def __init__(self, indicator=None, group='everyone', provider=None,
             firsttime=None, lasttime=None, tags=None):
    """Populate the record's fields, normalising list and string inputs.

    :param indicator: stored unchanged on ``self.indicator``.
    :param group: group name; when a list is passed only its first element
        is kept.
    :param provider: stored unchanged on ``self.provider``.
    :param firsttime: timestamp; a non-empty string is parsed with
        ``arrow.get`` and stored as a datetime.
    :param lasttime: timestamp; handled the same way as ``firsttime``.
    :param tags: when a list is passed it is stored as a sorted,
        comma-joined string; any other value is stored unchanged.
    """
    self.indicator = indicator
    self.group = group
    self.provider = provider
    self.firsttime = firsttime
    self.lasttime = lasttime
    self.tags = tags

    if isinstance(group, list):
        self.group = group[0]

    if isinstance(self.tags, list):
        # sorted() builds a new list, so the caller's list keeps its order
        # (the previous in-place .sort() reordered the argument itself).
        self.tags = ','.join(sorted(self.tags))

    if self.lasttime and isinstance(self.lasttime, basestring):
        self.lasttime = arrow.get(self.lasttime).datetime

    if self.firsttime and isinstance(self.firsttime, basestring):
        self.firsttime = arrow.get(self.firsttime).datetime


# http://www.pythoncentral.io/sqlalchemy-orm-examples/
def __init__(self, dbfile=DB_FILE, autocommit=False, dictrows=True, **kwargs):
    """Create the SQLite engine and scoped session factory for ``dbfile``.

    :param dbfile: database file used to build the ``sqlite:///`` URL.
    :param autocommit: stored on the instance; not otherwise used here.
    :param dictrows: stored on the instance; not otherwise used here.
    :param kwargs: accepted but not read by this constructor.
    """
    self.dbfile = dbfile
    self.autocommit = autocommit
    self.dictrows = dictrows
    self.path = "sqlite:///{0}".format(self.dbfile)

    # SQL statement echoing simply follows the module-level TRACE flag.
    # http://docs.sqlalchemy.org/en/latest/orm/contextual.html
    self.engine = create_engine(self.path, echo=bool(TRACE))
    self.handle = scoped_session(sessionmaker(bind=self.engine))

    self._session = None
    self._tx_count = 0

    # Create any tables declared on Base that are still missing.
    Base.metadata.create_all(self.engine)

    logger.debug('database path: {}'.format(self.path))

    self.clear_memcache()
def load_session(db_path):
    """Open a SQLite engine for ``db_path`` and return a session bound to it.

    Parameters
    ----------
    db_path : str
        Path to the desired database location; may be relative or start
        with a tilde for the user's $HOME.

    Returns
    -------
    session : sqlalchemy.orm.session.Session
        Session instance.
    engine : sqlalchemy.engine.Engine
        Engine instance.
    """
    absolute_path = path.abspath(path.expanduser(db_path))
    engine = create_engine("sqlite:///" + absolute_path, echo=False)

    # `autoflush` must stay False: otherwise, when "treatments" or
    # "measurements" entries precede "external_ids", the latter would insert
    # a null in the animal_id column.
    session_factory = sessionmaker(bind=engine, autoflush=False)
    session = session_factory()

    Base.metadata.create_all(engine)
    return session, engine
def add_to_db(session, engine, myobject):
    """Add an object to the session, commit, and return its ``.id`` value.

    An ``IntegrityError`` raised by the commit is not fatal: the session is
    rolled back so that it stays usable, and a notice is printed.

    Parameters
    ----------
    session : sqlalchemy.orm.session.Session
        Session instance, as created with labbookdb.db.add.load_session().
    engine : sqlalchemy.engine.Engine
        Engine instance corresponding to ``session``; not used by this
        function.
    myobject : object
        LabbookDB object with SQLAlchemy-compatible attributes (e.g. as found
        under labbookdb.db.common_classes).

    Returns
    -------
    object_id : int
        Value of ``myobject.id`` after the commit attempt. After a rolled-back
        commit this is whatever the object still holds (presumably unset).
    """
    session.add(myobject)
    try:
        session.commit()
    except sqlalchemy.exc.IntegrityError:
        # Without a rollback the session refuses further work after a failed
        # flush, which would break the attribute read below and later calls.
        session.rollback()
        print("Please make sure this was not a double entry:", myobject)
    return myobject.id
def commit_and_close(session, engine):
    """Commit and close the session, then dispose of the engine.

    An ``IntegrityError`` raised by the commit is not fatal; a notice is
    printed instead. The session is closed and the engine disposed even when
    the commit raises some other exception, which then propagates.

    Parameters
    ----------
    session : sqlalchemy.orm.session.Session
        Session instance, as created with labbookdb.db.add.load_session().
    engine : sqlalchemy.engine.Engine
        Engine instance corresponding to ``session``, as created with
        labbookdb.db.add.load_session().
    """
    try:
        session.commit()
    except sqlalchemy.exc.IntegrityError:
        print("Please make sure this was not a double entry.")
    finally:
        # Release the connection resources on every exit path.
        session.close()
        engine.dispose()
def getStories(self, query, active, sort_by='number'):
    """Return the stories matching a search, as a list.

    :param query: search expression; when truthy it is parsed with
        ``self.search.parse`` and applied as a filter.
    :param active: when truthy, only stories that are not hidden and whose
        status is ``'active'`` are returned.
    :param sort_by: ``'updated'`` or ``'last-seen'`` order by the matching
        column; any other value (including the default) orders by id.
    :return: list of ``Story`` rows; empty when nothing matches.
    """
    log = self.database.log
    # Arguments are handed to the logger unformatted so the query is only
    # rendered to SQL text when debug logging is actually enabled.
    # NOTE(review): assumes self.database.log is a standard logging.Logger.
    log.debug("Search query: %s sort: %s", query, sort_by)

    q = self.session().query(Story)
    if query:
        q = q.filter(self.search.parse(query))
    if active:
        # `== False` is intentional: it builds a SQL expression.
        q = q.filter(story_table.c.hidden == False,  # noqa: E712
                     story_table.c.status == 'active')

    if sort_by == 'updated':
        q = q.order_by(story_table.c.updated)
    elif sort_by == 'last-seen':
        q = q.order_by(story_table.c.last_seen)
    else:
        q = q.order_by(story_table.c.id)

    log.debug("Search SQL: %s", q)
    # .all() yields an empty list when there are no rows; it does not raise
    # NoResultFound, so no exception handler is needed here.
    return q.all()
def setup_class(cls):
    """Prepare shared fixtures for the datastore tests.

    Skips the test class when the datastore is unsupported; otherwise loads
    the ``datastore`` plugin, creates the standard test data, and stores the
    users, sample payload and a scoped session factory on the class.
    """
    if not tests.is_datastore_supported():
        raise nose.SkipTest("Datastore not supported")
    p.load('datastore')
    ctd.CreateTestData.create()

    cls.sysadmin_user = model.User.get('testsysadmin')
    cls.normal_user = model.User.get('annafan')

    first_resource = model.Package.get('annakarenina').resources[0]
    text_fields = [
        {'id': 'book', 'type': 'text'},
        {'id': 'author', 'type': 'text'},
        {'id': 'rating with %', 'type': 'text'},
    ]
    sample_records = [
        {'book': 'annakarenina', 'author': 'tolstoy',
         'rating with %': '90%'},
        {'book': 'warandpeace', 'author': 'tolstoy',
         'rating with %': '42%'},
    ]
    cls.data = {
        'resource_id': first_resource.id,
        'aliases': u'b\xfck2',
        'fields': text_fields,
        'records': sample_records,
    }

    # Sessions are bound to the datastore's write connection.
    write_url = config['ckan.datastore.write_url']
    write_engine = db._get_engine({'connection_url': write_url})
    cls.Session = orm.scoped_session(orm.sessionmaker(bind=write_engine))

    set_url_type(
        model.Package.get('annakarenina').resources, cls.sysadmin_user)
def create_scoped_session(self, options=None):
    """Create a :class:`~sqlalchemy.orm.scoping.scoped_session`
    on the factory from :meth:`create_session`.

    An extra key ``'scopefunc'`` can be set on the ``options`` dict to
    specify a custom scope function. If it's not provided, Flask's app
    context stack identity is used.

    The ``options`` mapping passed by the caller is not modified; a copy is
    taken before ``'scopefunc'`` is removed and ``'query_cls'`` defaulted.

    :param options: dict of keyword arguments passed to session class in
        ``create_session``
    """
    # Copy first: pop()/setdefault() below would otherwise alter the
    # caller's dict and make it unsafe to reuse.
    options = {} if options is None else dict(options)

    scopefunc = options.pop('scopefunc', _app_ctx_stack.__ident_func__)
    options.setdefault('query_cls', self.Query)
    return orm.scoped_session(
        self.create_session(options), scopefunc=scopefunc
    )
def reflect_hints_db(db_path):
    """
    Reflect the schema of the hints database and automap its existing tables.

    The engine is created with ``NullPool``. According to the original
    author's note this is done to avoid concurrency issues with luigi: sqlite
    does not really support pooling, so the effect is that the database is
    locked and other connections wait.

    :param db_path: path to hints sqlite database
    :return: the automapped classes ``speciesnames``, ``seqnames``, ``hints``
        and ``featuretypes``, followed by a new session bound to the engine
    """
    db_url = 'sqlite:///{}'.format(db_path)
    engine = sqlalchemy.create_engine(db_url, poolclass=NullPool)

    metadata = sqlalchemy.MetaData()
    metadata.reflect(bind=engine)

    Base = automap_base(metadata=metadata)
    Base.prepare()
    mapped = Base.classes
    speciesnames = mapped.speciesnames
    seqnames = mapped.seqnames
    hints = mapped.hints
    featuretypes = mapped.featuretypes

    session = sessionmaker(bind=engine)()
    return speciesnames, seqnames, hints, featuretypes, session
def test_get_object_type_multiple_objects(self):
    """
    Test that a sqlalchemy.orm.exc.MultipleResultsFound error is raised when
    getting the object type while multiple objects map to the same object ID.
    """
    kmip_engine = engine.KmipEngine()
    kmip_engine._data_store = self.engine
    kmip_engine._data_store_session_factory = self.session_factory
    kmip_engine._data_session = kmip_engine._data_store_session_factory()

    # Any query on the data session now fails with MultipleResultsFound.
    kmip_engine._data_session.query = mock.MagicMock(
        side_effect=exc.MultipleResultsFound()
    )
    kmip_engine._logger = mock.MagicMock()

    with self.assertRaises(exc.MultipleResultsFound):
        kmip_engine._get_object_type('1')

    kmip_engine._data_session.commit()
    kmip_engine._logger.warning.assert_called_once_with(
        "Multiple objects found for ID: 1"
    )
def run(self, args: argparse.Namespace) -> None:
    """Export the deck named by ``args.deck``.

    The deck is re-queried with its cards, card tags and card user answers
    loaded through subquery loading, then written with ``_export`` to the
    file at ``args.path`` or, when no path is given, to standard output.
    """
    deck_name: str = args.deck
    path: Optional[str] = args.path

    with db.session_scope() as session:
        found = db.get_deck_by_name(session, deck_name)

        load_tags = (
            sa.orm
            .subqueryload(db.Deck.cards)
            .subqueryload(db.Card.tags))
        load_answers = (
            sa.orm
            .subqueryload(db.Deck.cards)
            .subqueryload(db.Card.user_answers))
        deck = (
            session
            .query(db.Deck)
            .filter(db.Deck.id == found.id)
            .options(load_tags, load_answers)
            .one())

        # NOTE(review): the export is kept inside the session scope; confirm
        # against the original layout.
        if not path:
            _export(deck, sys.stdout)
            return
        with open(path, 'w') as handle:
            _export(deck, handle)
def get_account_id(s: sqlalchemy.orm.session.Session, name: str,
                   server: Server) -> int:
    """Get an account id, creating the account if needed.

    Note that player names are not case sensitive, so names are stored with
    their canonical capitalisation but we always compare the lowercase
    version.
    """
    player_id = get_player_id(s, name)

    lookup = s.query(Account.id).filter(
        func.lower(Account.name) == name.lower(),
        Account.server == server)
    existing = lookup.one_or_none()
    if existing:
        # The query selects only Account.id, so the row's single column is it.
        return existing[0]

    created = Account(name=name, server=server, player_id=player_id)
    s.add(created)
    s.commit()
    return created.id
def setup_achievements(s: sqlalchemy.orm.session.Session) -> None:
    """Load manual achievements into the database.

    Each entry of ``const.ACHIEVEMENTS`` whose name is not stored yet is
    added and awarded to the players listed on it.
    """
    for proto in const.ACHIEVEMENTS:
        already_stored = s.query(Achievement).filter(
            Achievement.name == proto.name).one_or_none()
        if already_stored:
            continue

        print("Adding achievement '%s'" % proto.name)
        achievement = Achievement()
        achievement.key = proto.key
        achievement.name = proto.name
        achievement.description = proto.description
        s.add(achievement)

        for player_name in proto.players:
            print("Awarding achievement '%s' to '%s'" %
                  (achievement.name, player_name))
            player = get_player(s, player_name)
            player.achievements.append(achievement)
            s.add(player)

    # NOTE(review): a single commit after all achievements are processed;
    # confirm against the original layout.
    s.commit()
def _highscores_helper(
        s: sqlalchemy.orm.session.Session,
        mapped_class: sqlalchemy.ext.declarative.api.DeclarativeMeta,
        game_column: sqlalchemy.orm.attributes.InstrumentedAttribute
) -> Sequence[Game]:
    """Generic function to find highscores against arbitrary foreign keys.

    For every playable row of ``mapped_class`` (ordered by name) the
    highest-scoring game referencing that row is looked up; rows without any
    game are skipped.

    Parameters:
        mapped_class: the foreign key table's class
        game_column: the foreign key's column in Games table

    Returns:
        Array of results
    """
    games = s.query(Game)
    # mypy reports: Type[Any] has no attribute "playable"
    playable_rows = s.query(mapped_class).filter(
        mapped_class.playable == sqlalchemy.true()).order_by(
            mapped_class.name).all()

    top_games = []
    for row in playable_rows:
        best = games.filter(game_column == row).order_by(
            Game.score.desc()).limit(1).first()
        if best:
            top_games.append(best)
    return top_games
def combo_highscores(s: sqlalchemy.orm.session.Session) -> Sequence[Game]:
    """Return the top score for each playable combo.

    A combo is a (species, background) pair where both are playable. Combos
    are visited by species name, then background name. Not every combo may
    have a game in the database; those combos contribute nothing.
    """
    q = s.query(Game).order_by(Game.score.desc())

    playable_species = s.query(Species).filter(
        Species.playable == sqlalchemy.true()).order_by('name').all()
    # The playable backgrounds do not depend on the species, so they are
    # fetched once instead of once per species.
    playable_backgrounds = s.query(Background).filter(
        Background.playable == sqlalchemy.true()).order_by('name').all()

    results = []
    for sp in playable_species:
        for bg in playable_backgrounds:
            result = q.filter(Game.species == sp,
                              Game.background == bg).first()
            if result:
                results.append(result)
    return results
def fastest_wins(s: sqlalchemy.orm.session.Session,
                 *,
                 limit: int=const.GLOBAL_TABLE_LENGTH,
                 exclude_bots: bool=True,
                 player: Optional[Player]=None) -> Sequence[Game]:
    """Return up to limit fastest wins.

    exclude_bots: If True, exclude known bot accounts and blacklisted bot
        games from the rankings.
    player: If given, only this player's games are considered.
    """
    winning = get_ktyp(s, 'winning')
    wins = s.query(Game).filter(Game.ktyp == winning).order_by('dur')

    if exclude_bots:
        # NOTE(review): both blacklists are applied only when exclude_bots
        # is set; confirm against the original layout.
        for bot_name in const.BLACKLISTS['bots']:
            wins = wins.filter(Game.player_id != get_player_id(s, bot_name))
        for bad_gid in const.BLACKLISTS['bot-games']:
            wins = wins.filter(Game.gid != bad_gid)

    if player is not None:
        wins = wins.filter(Game.player_id == player.id)

    return wins.limit(limit).all()
def combo_highscore_holders(s: sqlalchemy.orm.session.Session,
                            limit: int=const.GLOBAL_TABLE_LENGTH) \
        -> Sequence[Tuple[Player, Sequence[Game]]]:
    """Return the players with the most combo highscores.

    May return fewer than limit names.

    Returns a list of (player, games) tuples, most highscores first. The
    first element of each tuple is taken from ``game.account.player.name``.
    """
    games_by_player = {}  # type: dict
    for game in combo_highscores(s):
        holder = game.account.player.name
        if holder not in games_by_player:
            games_by_player[holder] = []
        games_by_player[holder].append(game)

    ranking = list(games_by_player.items())
    ranking.sort(key=lambda entry: len(entry[1]), reverse=True)
    return ranking[:limit]
def sessionmaker_factory(self):
    """Return the SQLAlchemy ``sessionmaker`` callable itself (not a call to it)."""
    factory = sqlalchemy.orm.sessionmaker
    return factory
def __call__(self):
    """
    Yield a session from ``self.sessionmaker()`` and call its ``remove()``
    once the generator is finished or closed.

    :return sqlalchemy.orm.session.Session:
    """
    active_session = self.sessionmaker()
    try:
        yield active_session
    finally:
        # Runs on normal exhaustion, on close(), and on errors thrown in.
        active_session.remove()
def create_sessionmaker(self, engine):
    """Return a zero-argument callable that builds a scoped session.

    Each call of the returned function creates a new ``scoped_session``
    around a ``sessionmaker`` bound to ``engine``, configured with
    ``self.sessionmaker_options`` as read at call time.
    """
    def sessionmaker():
        factory = sqlalchemy.orm.sessionmaker(
            bind=engine, **self.sessionmaker_options
        )
        return scoped_session(factory)

    return sessionmaker
def getFieldInfo(self):
    """
    Return a list of fields that are known and can be queried.

    The result is cached on ``self.fields`` once at least one field has been
    found; later calls return the cached list.

    :return: a list of known fields. Each entry is a dictionary with
        ``name`` and ``type`` keys; ``type`` is ``'unknown'`` when the
        column type cannot be rendered as a string.
    """
    if self.fields is not None:
        return self.fields

    db = self.connect()
    try:
        fields = []
        mapper = sqlalchemy.orm.class_mapper(self.tableClass)
        for column in mapper.iterate_properties:
            # Only plain single-column properties are reported.
            if (isinstance(column, sqlalchemy.orm.ColumnProperty) and
                    len(column.columns) == 1):
                try:
                    coltype = str(column.columns[0].type)
                except sqlalchemy.exc.CompileError:
                    coltype = 'unknown'
                fields.append({
                    'name': column.key,
                    'type': coltype
                })
    finally:
        # Always release the connection, even if mapper inspection fails.
        self.disconnect(db)

    if fields:
        self.fields = fields
    return fields
def _resolver(cls, prop):
    """Return a one-argument function that resolves ``arg`` for ``cls``/``prop``.

    The returned function delegates to ``_class_resolver``, passing a
    fallback namespace made of the ``sqlalchemy`` module's attributes plus
    ``foreign`` and ``remote`` from ``sqlalchemy.orm``.
    """
    import sqlalchemy
    from sqlalchemy.orm import foreign, remote

    namespace = dict(sqlalchemy.__dict__)
    namespace['foreign'] = foreign
    namespace['remote'] = remote

    def resolve_arg(arg):
        return _class_resolver(cls, prop, namespace, arg)

    return resolve_arg
def _include_sqlalchemy(obj):
    """Copy the public names of ``sqlalchemy`` and ``sqlalchemy.orm`` onto ``obj``.

    Names that ``obj`` already has are left alone. Afterwards ``Table`` is
    replaced, the relationship helpers are wrapped with
    ``_wrap_with_default_query_class``, and ``event`` is attached.
    """
    for module in (sqlalchemy, sqlalchemy.orm):
        for key in module.__all__:
            if hasattr(obj, key):
                continue
            setattr(obj, key, getattr(module, key))

    # Note: obj.Table does not attempt to be a SQLAlchemy Table class.
    obj.Table = _make_table(obj)
    for helper_name in ('relationship', 'relation', 'dynamic_loader'):
        wrapped = _wrap_with_default_query_class(getattr(obj, helper_name))
        setattr(obj, helper_name, wrapped)
    obj.event = event
def __get__(self, obj, type):
    """Descriptor hook: build a query object for the owning class.

    Returns ``type.query_class(mapper, session=...)`` for a mapped class, and
    ``None`` when ``type`` is not mapped (``UnmappedClassError``) or the
    mapper is falsy.
    """
    try:
        mapper = orm.class_mapper(type)
        if not mapper:
            return None
        return type.query_class(mapper, session=self.sa.session())
    except UnmappedClassError:
        return None
def create_scoped_session(self, options=None):
    """Helper factory method that creates a scoped session. It
    internally calls :meth:`create_session`.

    :param options: optional dict of session options. A ``'scopefunc'`` key,
        if present, is used as the scope function and is not forwarded to
        :meth:`create_session`. The caller's dict is left unmodified.
    """
    # Copy first: pop() below would otherwise remove 'scopefunc' from the
    # caller's dict.
    options = {} if options is None else dict(options)
    scopefunc = options.pop('scopefunc', None)
    return orm.scoped_session(partial(self.create_session, options),
                              scopefunc=scopefunc)
def recordAuthenticodeHashes(fileID, authenticode_hashes):
    """Record every Authenticode hash of one executable file.

    :param fileID: id of the ``ExecutableFiles`` row the hashes belong to.
    :param authenticode_hashes: mapping of hash type to hash value.
    :raises Exception: when no ``ExecutableFiles`` row has id ``fileID``.
    """
    # Find the executable file in the DB
    session = Session()
    try:
        exefile = session.query(ExecutableFiles) \
            .filter(ExecutableFiles.id == fileID).one()
    except sqlalchemy.orm.exc.NoResultFound:
        raise Exception("File ID not found in database")

    # items() is available on Python 2 and 3 dicts alike, whereas
    # iteritems() exists on Python 2 only.
    for hashType, hashValue in authenticode_hashes.items():
        recordAuthenticodeHash(session, exefile, hashType, hashValue)


################################################################################
# getSignatures looks for signature info
def create_session(self, options):
    """Return a ``sessionmaker`` producing ``VersionedSignallingSession`` objects.

    :param options: dict of extra keyword arguments forwarded to
        ``sessionmaker`` alongside ``class_`` and ``db=self``.
    """
    factory = sqlalchemy.orm.sessionmaker(
        class_=VersionedSignallingSession,
        db=self,
        **options
    )
    return factory
def _record(mapper, target, operation):
    """Note a change to ``target`` on the session that owns it.

    Nothing happens unless the owning session is a ``_SignallingSession``;
    in that case ``(target, operation)`` is stored in the session's
    ``_model_changes`` under the instance's primary-key tuple.
    """
    owning_session = orm.object_session(target)
    if not isinstance(owning_session, _SignallingSession):
        return
    identity = tuple(mapper.primary_key_from_instance(target))
    owning_session._model_changes[identity] = (target, operation)
def create_scoped_session(self, options=None):
    """Helper factory method that creates a scoped session.

    :param options: optional dict of keyword arguments for
        ``_SignallingSession``. A ``'scopefunc'`` key, if present, is used as
        the scope function and is not forwarded to the session class. The
        caller's dict is left unmodified.
    """
    # Copy first: pop() below would otherwise remove 'scopefunc' from the
    # caller's dict.
    options = {} if options is None else dict(options)
    scopefunc = options.pop('scopefunc', None)
    return orm.scoped_session(
        partial(_SignallingSession, self, **options), scopefunc=scopefunc
    )
def setUp(self):
    """Create an in-memory SQLite schema, a session factory and dummy credentials."""
    memory_engine = sqlalchemy.create_engine('sqlite://')
    Base.metadata.create_all(memory_engine)

    # Note: this stores the sessionmaker factory itself, not a session.
    self.session = sqlalchemy.orm.sessionmaker(bind=memory_engine)

    credential_fields = dict(
        access_token='token',
        client_id='client_id',
        client_secret='client_secret',
        refresh_token='refresh_token',
        token_expiry=datetime.datetime.utcnow(),
        token_uri=oauth2client.GOOGLE_TOKEN_URI,
        user_agent='DummyAgent',
    )
    self.credentials = oauth2client.client.OAuth2Credentials(
        **credential_fields)
def getProject(self, key):
    """Return the Project whose ``key`` matches, or None when no row is found."""
    lookup = self.session().query(Project).filter_by(key=key)
    try:
        project = lookup.one()
    except sqlalchemy.orm.exc.NoResultFound:
        project = None
    return project
def getProjectByName(self, name):
    """Return the Project whose ``name`` matches, or None when no row is found."""
    lookup = self.session().query(Project).filter_by(name=name)
    try:
        project = lookup.one()
    except sqlalchemy.orm.exc.NoResultFound:
        project = None
    return project
def getProjectByID(self, id):
    """Return the Project whose ``id`` matches, or None when no row is found."""
    lookup = self.session().query(Project).filter_by(id=id)
    try:
        project = lookup.one()
    except sqlalchemy.orm.exc.NoResultFound:
        project = None
    return project
def getTopic(self, key):
    """Return the Topic whose ``key`` matches, or None when no row is found."""
    lookup = self.session().query(Topic).filter_by(key=key)
    try:
        topic = lookup.one()
    except sqlalchemy.orm.exc.NoResultFound:
        topic = None
    return topic
def getTopicByName(self, name):
    """Return the Topic whose ``name`` matches, or None when no row is found."""
    lookup = self.session().query(Topic).filter_by(name=name)
    try:
        topic = lookup.one()
    except sqlalchemy.orm.exc.NoResultFound:
        topic = None
    return topic
def getStory(self, key):
    """Return the Story whose ``key`` matches, or None when no row is found."""
    lookup = self.session().query(Story).filter_by(key=key)
    try:
        story = lookup.one()
    except sqlalchemy.orm.exc.NoResultFound:
        story = None
    return story
def getStoryByID(self, id):
    """Return the Story whose ``id`` matches, or None when no row is found."""
    lookup = self.session().query(Story).filter_by(id=id)
    try:
        story = lookup.one()
    except sqlalchemy.orm.exc.NoResultFound:
        story = None
    return story
def getTag(self, name):
    """Return the Tag whose ``name`` matches, or None when no row is found."""
    lookup = self.session().query(Tag).filter_by(name=name)
    try:
        tag = lookup.one()
    except sqlalchemy.orm.exc.NoResultFound:
        tag = None
    return tag
def getTagByID(self, id):
    """Return the Tag whose ``id`` matches, or None when no row is found."""
    lookup = self.session().query(Tag).filter_by(id=id)
    try:
        tag = lookup.one()
    except sqlalchemy.orm.exc.NoResultFound:
        tag = None
    return tag
def getTaskByID(self, id):
    """Return the Task whose ``id`` matches, or None when no row is found."""
    lookup = self.session().query(Task).filter_by(id=id)
    try:
        task = lookup.one()
    except sqlalchemy.orm.exc.NoResultFound:
        task = None
    return task
def getComment(self, key):
    """Return the Comment whose ``key`` matches, or None when no row is found."""
    lookup = self.session().query(Comment).filter_by(key=key)
    try:
        comment = lookup.one()
    except sqlalchemy.orm.exc.NoResultFound:
        comment = None
    return comment
def getCommentByID(self, id):
    """Return the Comment whose ``id`` matches, or None when no row is found."""
    lookup = self.session().query(Comment).filter_by(id=id)
    try:
        comment = lookup.one()
    except sqlalchemy.orm.exc.NoResultFound:
        comment = None
    return comment