我们从 Python 开源项目中提取了以下 30 个代码示例,用于说明如何使用 sqlalchemy.engine.create_engine()。
def make_ursulas(how_many_ursulas: int, ursula_starting_port: int) -> list:
    """
    Create a batch of local Ursulas and bootstrap each one against all of them.

    :param how_many_ursulas: How many Ursulas to create.
    :param ursula_starting_port: The port of the first created Ursula; subsequent
        Ursulas will increment the port number by 1.
    :return: A list of created Ursulas
    """
    loop = asyncio.get_event_loop()

    created = []
    for offset in range(how_many_ursulas):
        # Each Ursula is backed by its own in-memory SQLite keystore.
        db_engine = create_engine('sqlite:///:memory:')
        Base.metadata.create_all(db_engine)
        # NOTE: the keyword spelling is kept exactly as the call site passes it.
        node = Ursula(urulsas_keystore=keystore.KeyStore(db_engine))
        node.attach_server()
        node.listen(ursula_starting_port + offset, "127.0.0.1")
        created.append(node)

    for node in created:
        # Every node is handed the addresses of all ports in the batch.
        peers = [("127.0.0.1", ursula_starting_port + i) for i in range(how_many_ursulas)]
        loop.run_until_complete(node.server.bootstrap(peers))
        node.publish_interface_information()

    return created
def __init__(self, db_url):
    """
    Set up the Peekaboo database handler and open a connection.

    Creates the schema via ``_init_db()`` when the ``_meta`` table is
    missing; otherwise calls ``clear_in_progress()``.

    :param db_url: An RFC 1738 URL that points to the database.
    :raises PeekabooDatabaseError: if connecting raises an SQLAlchemyError.
    """
    self.__engine = create_engine(db_url)
    self.__db_con = None
    self.__Session = scoped_session(sessionmaker(bind=self.__engine))
    self.__lock = threading.RLock()

    try:
        self.__db_con = self.__engine.connect()
    except SQLAlchemyError as e:
        raise PeekabooDatabaseError(
            'Unable to connect to the database: %s' % e
        )

    schema_present = self.__db_con.dialect.has_table(self.__engine, '_meta')
    if schema_present:
        self.clear_in_progress()
    else:
        self._init_db()
        logger.debug('Database schema created.')
def get_session(dbpath, scoped=False):
    """
    Build an SQLAlchemy session for database IO, creating all tables first.

    :param dbpath: the path to the database, e.g. sqlite:///path_to_my_dbase.sqlite
    :param scoped: boolean (False by default); when true a scoped session is
        returned instead of a plain session instance
    """
    db_engine = create_engine(dbpath)
    Base.metadata.create_all(db_engine)  # @UndefinedVariable

    # Enabling foreign keys for sqlite (e.g. through an event listener, see
    # http://stackoverflow.com/questions/13712381/how-to-turn-on-pragma-foreign-keys-on-in-sqlalchemy-migration-script-or-conf)
    # is NOT implemented YET. See models.py

    factory = sessionmaker(bind=db_engine)
    return scoped_session(factory) if scoped else factory()
def __init__(self, db_path: str, lang: str):
    """Open a session on the SQLite file at ``db_path`` and remember ``lang``."""
    sqlite_url = 'sqlite:///{}'.format(db_path)
    session_factory = sessionmaker(bind=create_engine(sqlite_url))
    self.session = session_factory()
    self.lang = lang
def connect(self, uri, opts=None, **kwargs):
    """Establish connection to a real engine.

    Engines are cached in ``self.storage.connection`` under a key built from
    the URI, ``opts`` and the keyword arguments, so an identical request
    reuses the engine created earlier.
    """
    cache_key = "%s(%s,%s)" % (uri, repr(opts), repr(kwargs))

    try:
        engines = self.storage.connection
    except AttributeError:
        # First use of this storage object: set up the cache.
        self.storage.connection = {}
        self.storage.engine = None
        engines = self.storage.connection

    if cache_key in engines:
        self.engine = engines[cache_key]
    else:
        engines[cache_key] = create_engine(uri, opts, **kwargs)
        self.storage.engine = engines[cache_key]
def test_empty_dense_state_table():
    """An empty dense (input) state table produces a useful error."""
    with testing.postgresql.Postgresql() as postgresql:
        engine = create_engine(postgresql.url())
        try:
            utils.create_dense_state_table(engine, 'states', ())  # no data
            table_generator = StateTableGenerator(
                engine,
                'exp_hash',
                dense_state_table='states'
            )
            with pytest.raises(ValueError):
                table_generator.generate_sparse_table([datetime(2016, 1, 1)])
        finally:
            # Dispose even when a step above fails, so the engine's
            # connections are released before the temporary server exits.
            engine.dispose()
def test_empty_sparse_state_table():
    """An empty sparse (generated) state table eagerly produces an error.

    (Rather than allowing execution to proceed.)
    """
    with testing.postgresql.Postgresql() as postgresql:
        engine = create_engine(postgresql.url())
        try:
            utils.create_dense_state_table(engine, 'states', (
                (5, 'permitted', datetime(2016, 1, 1), datetime(2016, 6, 1)),
                (6, 'permitted', datetime(2016, 2, 5), datetime(2016, 5, 5)),
                (1, 'injail', datetime(2014, 7, 7), datetime(2014, 7, 15)),
                (1, 'injail', datetime(2016, 3, 7), datetime(2016, 4, 2)),
            ))
            table_generator = StateTableGenerator(
                engine,
                'exp_hash',
                dense_state_table='states'
            )
            with pytest.raises(ValueError):
                # Request time outside of available intervals
                table_generator.generate_sparse_table([datetime(2015, 12, 31)])

            # The sparse table must contain no rows after the failed request.
            (state_count,) = engine.execute('''\
                select count(*) from {generator.sparse_table_name}
            '''.format(generator=table_generator)
            ).first()
            assert state_count == 0
        finally:
            # Dispose even when an assertion above fails.
            engine.dispose()
def create_engine(self):
    """Return an engine for the ``awsathena+rest`` URL built from ENV and SCHEMA.

    NOTE(review): the inner ``create_engine`` call presumably resolves to the
    module-level SQLAlchemy function rather than this method; confirm.
    """
    template = ('awsathena+rest://athena.{region_name}.amazonaws.com:443/'
                '{schema_name}?s3_staging_dir={s3_staging_dir}')
    url = template.format(
        region_name=ENV.region_name,
        schema_name=SCHEMA,
        s3_staging_dir=quote_plus(ENV.s3_staging_dir),
    )
    return create_engine(url)
def create_engine(self):
    """Return an engine for the ``awsathena+jdbc`` URL built from ENV and SCHEMA.

    NOTE(review): the inner ``create_engine`` call presumably resolves to the
    module-level SQLAlchemy function rather than this method; confirm.
    """
    template = ('awsathena+jdbc://athena.{region_name}.amazonaws.com:443/'
                '{schema_name}?s3_staging_dir={s3_staging_dir}')
    url = template.format(
        region_name=ENV.region_name,
        schema_name=SCHEMA,
        s3_staging_dir=quote_plus(ENV.s3_staging_dir),
    )
    return create_engine(url)
def __init__(self, config):
    """
    Log the runtime banner, connect to the configured database and store the
    configured season range.

    :param config: object providing ``database_uri``, ``START_YEAR`` and
        ``END_YEAR``.
    """
    logger.info("Marcotti-MLS v{0}: Python {1} on {2}".format(__version__, sys.version, sys.platform))
    self.engine = create_engine(config.database_uri)
    self.connection = self.engine.connect()
    # Report the connection only once connect() has succeeded, so the log
    # never claims an open connection that failed to open.
    logger.info("Opened connection to {0}".format(self._public_db_uri(config.database_uri)))
    self.start_year = config.START_YEAR
    self.end_year = config.END_YEAR
def setup(self):
    """Give the test a MetaData bound to an engine for the bare SQLite URL."""
    sqlite_engine = create_engine('sqlite:///')
    self.metadata = MetaData(sqlite_engine)
def main():
    """
    Command-line entry point: reflect a database and render model code.

    Prints the package version and returns when ``--version`` is given.
    Prints an error plus the help text and returns when no URL is supplied.
    Otherwise the reflected metadata is rendered to ``--outfile`` (UTF-8)
    or to stdout.
    """
    parser = argparse.ArgumentParser(description='Generates SQLAlchemy model code from an existing database.')
    parser.add_argument('url', nargs='?', help='SQLAlchemy url to the database')
    parser.add_argument('--version', action='store_true', help="print the version number and exit")
    parser.add_argument('--schema', help='load tables from an alternate schema')
    parser.add_argument('--tables', help='tables to process (comma-separated, default: all)')
    parser.add_argument('--noviews', action='store_true', help="ignore views")
    parser.add_argument('--noindexes', action='store_true', help='ignore indexes')
    parser.add_argument('--noconstraints', action='store_true', help='ignore constraints')
    parser.add_argument('--nojoined', action='store_true', help="don't autodetect joined table inheritance")
    parser.add_argument('--noinflect', action='store_true', help="don't try to convert tables names to singular form")
    parser.add_argument('--noclasses', action='store_true', help="don't generate classes, only tables")
    parser.add_argument('--outfile', help='file to write output to (default: stdout)')
    args = parser.parse_args()

    if args.version:
        version = pkg_resources.get_distribution('sqlacodegen').parsed_version
        print(version.public)
        return
    if not args.url:
        print('You must supply a url\n', file=sys.stderr)
        parser.print_help()
        return

    engine = create_engine(args.url)
    metadata = MetaData(engine)
    tables = args.tables.split(',') if args.tables else None
    metadata.reflect(engine, args.schema, not args.noviews, tables)

    if args.outfile:
        outfile = codecs.open(args.outfile, 'w', encoding='utf-8')
    else:
        outfile = sys.stdout
    try:
        generator = CodeGenerator(metadata, args.noindexes, args.noconstraints, args.nojoined, args.noinflect,
                                  args.noclasses)
        generator.render(outfile)
    finally:
        # Close (and thereby flush) a file we opened ourselves; never close
        # the process-wide stdout.
        if outfile is not sys.stdout:
            outfile.close()
def init_db(cls):
    """
    Create the database for ``cls.url()`` and then call ``create_all()`` on a
    freshly created store.

    The engine is disposed even if one of those steps raises.
    """
    engine = create_engine(cls.url())
    try:
        cls.create_db(engine)
        store = cls.create_store()
        store.create_all()
    finally:
        # Do not leak the engine's connections when setup fails midway.
        engine.dispose()
def drop_db(cls):
    """Drop the database addressed by ``cls.url()`` and dispose the engine."""
    db_engine = create_engine(cls.url())
    target_url = db_engine.url
    drop_database(target_url)
    db_engine.dispose()
def setUpClass(cls):
    """
    Class-level fixture: create the database if it does not exist, open a
    connection with a transaction begun on it, and build the store on that
    connection. ``cls.create`` records whether the database was created here.
    """
    cls.create = False
    cls.engine = create_engine(cls.url())

    db_missing = not database_exists(cls.engine.url)
    if db_missing:
        cls.create_db(cls.engine)
        cls.create = True

    cls.connection = cls.engine.connect()
    cls.transaction = cls.connection.begin()
    cls.store = cls.create_store(cls.connection)

    # Only a database created by this fixture needs its schema built.
    if cls.create:
        cls.store.create_all()

    super(StoreTestCase, cls).setUpClass()
def fx_session():
    """
    Yield a session bound to the test database with a freshly rebuilt schema.

    After the consumer resumes the generator, the session is rolled back and
    all tables are dropped again; the engine is always disposed.
    """
    engine = create_engine(TEST_DATABASE_URL)
    try:
        # Start from a clean slate: drop whatever is left, then recreate.
        Base.metadata.drop_all(bind=engine)
        Base.metadata.create_all(bind=engine)
        db_session = Session(bind=engine)
        yield db_session
        db_session.rollback()
        Base.metadata.drop_all(bind=engine)
    finally:
        engine.dispose()
def get_engine():
    """Return a new engine for the URL configured in ``settings.PRESTO_URL``."""
    presto_url = settings.PRESTO_URL
    return create_engine(presto_url)
def __init__(self, sqlurl_or_engine: Union[str, Engine], **kwargs):
    """
    Initialise from either a database URL or an existing engine.

    :param sqlurl_or_engine: a URL string (a new engine is created from it)
        or an ``Engine`` (used as-is; its ``url`` is recorded).
    :param kwargs: forwarded to ``AutoCloseSessionMaker``.
    :raises TypeError: if ``sqlurl_or_engine`` is neither a ``str`` nor an
        ``Engine``.
    """
    super().__init__()
    if isinstance(sqlurl_or_engine, str):
        self._sqlurl = sqlurl_or_engine
        self._engine = create_engine(sqlurl_or_engine)
    elif isinstance(sqlurl_or_engine, Engine):
        self._engine = sqlurl_or_engine
        self._sqlurl = self._engine.url
    else:
        # Fail with a clear message rather than an AttributeError on
        # self._engine further down.
        raise TypeError(
            'sqlurl_or_engine must be a str or an Engine, got {}'.format(
                type(sqlurl_or_engine).__name__))
    self._session_maker = AutoCloseSessionMaker(bind=self._engine, **kwargs)
def sqlalchemy_metadata(host, port, database, username, password):
    """
    Return a MetaData bound to a psycopg2 PostgreSQL engine for the given
    connection details, after checking that a connection can be made.
    """
    url_parts = {
        'drivername': 'postgresql+psycopg2',
        'username': username,
        'password': password,
        'host': host,
        'port': port,
        'database': database,
    }
    pg_engine = create_engine(
        URL(**url_parts),
        server_side_cursors=True,
        connect_args={'connect_timeout': 4},
    )

    # ensure that we can connect
    with pg_engine.begin():
        pass  # this will throw OperationalError if it fails

    return MetaData(pg_engine)
def metadata():
    """Return a new MetaData bound to an engine created from ``URL``."""
    bound = MetaData()
    bound.bind = create_engine(URL)
    return bound
def setUp(self):
    """Build SQLAlchemy-backed proxies when ``create_engine`` is truthy, fakes otherwise."""
    super(TestJsonifySQLAlchemyGenericEncoder, self).setUp()
    if create_engine:
        self.create_sa_proxies()
    else:
        self.create_fake_proxies()
def create_sa_proxies(self):
    """
    Populate ``sa_object``, ``result_proxy`` and ``row_proxy`` from a small
    ``user`` table held in an in-memory SQLite database.
    """
    # create the table and mapper
    meta = schema.MetaData()
    columns = [
        schema.Column('id', types.Integer, primary_key=True),
        schema.Column('first_name', types.Unicode(25)),
        schema.Column('last_name', types.Unicode(25)),
    ]
    users = schema.Table('user', meta, *columns)

    class User(object):
        pass

    orm.mapper(User, users)

    # create the session
    memory_engine = create_engine('sqlite:///:memory:')
    meta.bind = memory_engine
    meta.create_all()
    session_factory = orm.sessionmaker(bind=memory_engine)
    db_session = session_factory()

    # add some dummy data
    rows = [
        {'first_name': 'Jonathan', 'last_name': 'LaCour'},
        {'first_name': 'Yoann', 'last_name': 'Roman'},
    ]
    users.insert().execute(rows)

    # get the SA objects
    self.sa_object = db_session.query(User).first()
    all_users = users.select()
    self.result_proxy = all_users.execute()
    self.row_proxy = all_users.execute().fetchone()
def engine():
    """Return an engine for the ``bigquery://`` URL with statement echoing on."""
    return create_engine('bigquery://', echo=True)
def startDatabase(self):
    """
    Create the engine for the on-disk SQLite database and yield the result of
    executing ``CreateTable`` for ``self.telemetry``.
    """
    # Backends:
    #   sqlite in-memory:     "sqlite://"
    #   sqlite on filesystem: the URL used below
    #   mysql... todo
    database_url = "sqlite:////tmp/kotori.sqlite"
    self.engine = create_engine(database_url, reactor=reactor, strategy=TWISTED_STRATEGY)

    # Create the table
    creation = self.engine.execute(CreateTable(self.telemetry))
    yield creation
def test_sparse_state_table_generator():
    """The sparse table holds one row per entity and as-of date, with a
    boolean column per state, and is indexed on entity_id and as_of_date."""
    input_data = [
        (5, 'permitted', datetime(2016, 1, 1), datetime(2016, 6, 1)),
        (6, 'permitted', datetime(2016, 2, 5), datetime(2016, 5, 5)),
        (1, 'injail', datetime(2014, 7, 7), datetime(2014, 7, 15)),
        (1, 'injail', datetime(2016, 3, 7), datetime(2016, 4, 2)),
    ]

    with testing.postgresql.Postgresql() as postgresql:
        engine = create_engine(postgresql.url())
        try:
            utils.create_dense_state_table(engine, 'states', input_data)

            table_generator = StateTableGenerator(
                engine,
                'exp_hash',
                dense_state_table='states'
            )
            as_of_dates = [
                datetime(2016, 1, 1),
                datetime(2016, 2, 1),
                datetime(2016, 3, 1),
                datetime(2016, 4, 1),
                datetime(2016, 5, 1),
                datetime(2016, 6, 1),
            ]
            table_generator.generate_sparse_table(as_of_dates)
            results = list(engine.execute(
                'select entity_id, as_of_date, injail, permitted from {} order by entity_id, as_of_date'.format(
                    table_generator.sparse_table_name
                )))
            expected_output = [
                # entity_id, as_of_date, injail, permitted
                (1, datetime(2016, 4, 1), True, False),
                (5, datetime(2016, 1, 1), False, True),
                (5, datetime(2016, 2, 1), False, True),
                (5, datetime(2016, 3, 1), False, True),
                (5, datetime(2016, 4, 1), False, True),
                (5, datetime(2016, 5, 1), False, True),
                (6, datetime(2016, 3, 1), False, True),
                (6, datetime(2016, 4, 1), False, True),
                (6, datetime(2016, 5, 1), False, True),
            ]
            assert results == expected_output
            utils.assert_index(engine, table_generator.sparse_table_name, 'entity_id')
            utils.assert_index(engine, table_generator.sparse_table_name, 'as_of_date')
        finally:
            # Release the engine's connections before the temporary server
            # exits, as the other state-table tests do.
            engine.dispose()
def assert_compile(self, clause, result, params=None, checkparams=None,
                   dialect=None, checkpositional=None,
                   use_default_dialect=False, allow_dialect_select=False):
    """
    Compile ``clause`` and assert that its SQL text equals ``result``.

    :param clause: a compilable construct; an ``orm.Query`` is first reduced
        to its statement with ``use_labels`` switched on.
    :param result: expected SQL string, compared after newlines and tabs are
        stripped from the compiled text.
    :param params: optional mapping; its keys are passed as ``column_keys``
        and it is used when constructing parameters for the checks below.
    :param checkparams: if given, expected ``construct_params(params)``.
    :param dialect: dialect to compile with; see resolution rules below.
    :param checkpositional: if given, expected tuple of parameter values in
        ``positiontup`` order.
    :param use_default_dialect: force ``default.DefaultDialect()``.
    :param allow_dialect_select: when true, a ``None`` dialect is passed
        through to ``compile()`` unchanged.
    """
    if use_default_dialect:
        dialect = default.DefaultDialect()
    elif dialect is None and not allow_dialect_select:
        # Fall back to the test class's __dialect__ attribute, which may be
        # the string 'default', missing/None, or a dialect name.
        dialect = getattr(self, '__dialect__', None)
        if dialect == 'default':
            dialect = default.DefaultDialect()
        elif dialect is None:
            dialect = config.db.dialect
        elif isinstance(dialect, basestring):
            dialect = create_engine("%s://" % dialect).dialect

    kw = {}
    if params is not None:
        kw['column_keys'] = params.keys()

    if isinstance(clause, orm.Query):
        context = clause._compile_context()
        context.statement.use_labels = True
        clause = context.statement

    c = clause.compile(dialect=dialect, **kw)

    param_str = repr(getattr(c, 'params', {}))

    # Echo the compiled SQL and parameters for debugging.
    if util.py3k:
        param_str = param_str.encode('utf-8').decode('ascii', 'ignore')
        print(("\nSQL String:\n" + util.text_type(c) + param_str).encode('utf-8'))
    else:
        print(("\nSQL String:\n" + util.text_type(c).encode('utf-8') + param_str))

    cc = re.sub(r'[\n\t]', '', util.text_type(c))

    eq_(cc, result, "%r != %r on dialect %r" % (cc, result, dialect))

    if checkparams is not None:
        eq_(c.construct_params(params), checkparams)

    if checkpositional is not None:
        p = c.construct_params(params)
        eq_(tuple([p[x] for x in c.positiontup]), checkpositional)
def get_session(self):
    """Return the cached session, creating it from ``self.database`` on first use."""
    if self.session is not None:
        return self.session

    db_engine = create_engine(self.database)
    session_factory = sessionmaker(bind=db_engine)
    self.session = session_factory()
    return self.session
def persist_bundle(self):
    """
    Persist a ``DummyBundle`` and check that it reached the test database
    and the output directory, deleting the row and files again afterwards.
    """
    from madmex.persistence.driver import persist_bundle
    from sqlalchemy import create_engine
    from sqlalchemy.orm.session import sessionmaker
    from madmex.util import remove_file

    dummy = DummyBundle()
    persist_bundle(dummy)

    my_database = getattr(SETTINGS, 'ANTARES_TEST_DATABASE')
    klass = sessionmaker(bind=create_engine(my_database))
    session = klass()
    query = 'SELECT count(*) FROM product WHERE uuid=\'%s\';' % dummy.uuid_id
    # Call form prints the same text on Python 2 and is valid on Python 3.
    print(query)

    try:
        result_set = session.execute(query)
        for row in result_set:
            self.assertGreater(row['count'], 0)
        # Delete object from database.
        session.delete(dummy.get_database_object())
        session.commit()

        for file_name in dummy.get_files():
            full_path = os.path.join(dummy.get_output_directory(), os.path.basename(file_name))
            self.assertTrue(os.path.isfile(full_path))
            # Remove file from filesystem.
            remove_file(full_path)
    except:
        # Undo any partial database work, then let the failure propagate.
        session.rollback()
        raise
    finally:
        session.close()
def persist_bundle_sensor(self):
    """
    Persist a SPOT5 ``Bundle`` built from a fixed staging folder into the
    configured test folder, then check the database row and copied files
    exist, deleting both again afterwards.
    """
    from madmex.persistence.driver import persist_bundle
    folder = '/LUSTRE/MADMEX/staging/madmex_antares/test_ingest/556_297_041114_dim_img_spot'
    from sqlalchemy import create_engine
    from sqlalchemy.orm.session import sessionmaker
    from madmex.mapper.bundle.spot5 import Bundle
    #from madmex.configuration import SETTINGS

    dummy = Bundle(folder)
    #dummy.target = '/LUSTRE/MADMEX/staging/'
    target_url = getattr(SETTINGS, 'TEST_FOLDER')
    # Call form prints the same text on Python 2 and is valid on Python 3.
    print(target_url)
    #TODO please fix me, horrible hack
    dummy.target = target_url
    persist_bundle(dummy)

    my_database = getattr(SETTINGS, 'ANTARES_TEST_DATABASE')
    klass = sessionmaker(bind=create_engine(my_database))
    session = klass()
    query = 'SELECT count(*) FROM product WHERE uuid=\'%s\';' % dummy.uuid_id

    try:
        result_set = session.execute(query)
        for row in result_set:
            self.assertGreater(row['count'], 0)
        session.delete(dummy.get_database_object())
        session.commit()

        for file_name in dummy.get_files():
            full_path = os.path.join(target_url, os.path.basename(file_name))
            self.assertTrue(os.path.isfile(full_path))
            os.remove(full_path)
    except:
        # Undo any partial database work, then let the failure propagate.
        session.rollback()
        raise
    finally:
        session.close()