Python sqlalchemy.engine module: example source code for create_engine()

The following 30 code examples, extracted from open-source Python projects, illustrate how to use sqlalchemy.engine.create_engine().
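
As a quick orientation before the project snippets, here is a minimal, self-contained sketch of the typical pattern; the in-memory SQLite URL is used purely to keep the example dependency-free:

from sqlalchemy import text
from sqlalchemy.engine import create_engine

engine = create_engine('sqlite:///:memory:')   # in-memory SQLite, no external server needed
with engine.connect() as connection:
    result = connection.execute(text('SELECT 1'))
    print(result.scalar())  # -> 1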

Project: nucypher-kms    Author: nucypher    | Project source | File source
def make_ursulas(how_many_ursulas: int, ursula_starting_port: int) -> list:
    """
    :param how_many_ursulas: How many Ursulas to create.
    :param ursula_starting_port: The port of the first created Ursula; subsequent Ursulas will increment the port number by 1.
    :return: A list of created Ursulas
    """
    event_loop = asyncio.get_event_loop()

    URSULAS = []
    for _u in range(how_many_ursulas):
        engine = create_engine('sqlite:///:memory:')
        Base.metadata.create_all(engine)
        ursulas_keystore = keystore.KeyStore(engine)
        _URSULA = Ursula(urulsas_keystore=ursulas_keystore)
        _URSULA.attach_server()
        _URSULA.listen(ursula_starting_port + _u, "127.0.0.1")

        URSULAS.append(_URSULA)

    for _counter, ursula in enumerate(URSULAS):
        event_loop.run_until_complete(
            ursula.server.bootstrap([("127.0.0.1", ursula_starting_port + _c) for _c in range(how_many_ursulas)]))
        ursula.publish_interface_information()

    return URSULAS
Project: PeekabooAV    Author: scVENUS    | Project source | File source
def __init__(self, db_url):
        """
        Initialize the Peekaboo database handler.

        :param db_url: An RFC 1738 URL that points to the database.
        """
        self.__engine = create_engine(db_url)
        self.__db_con = None
        session_factory = sessionmaker(bind=self.__engine)
        self.__Session = scoped_session(session_factory)
        self.__lock = threading.RLock()
        try:
            self.__db_con = self.__engine.connect()
        except SQLAlchemyError as e:
            raise PeekabooDatabaseError(
                'Unable to connect to the database: %s' % e
            )
        if not self.__db_con.dialect.has_table(self.__engine, '_meta'):
            self._init_db()
            logger.debug('Database schema created.')
        else:
            self.clear_in_progress()
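
Handler methods would then pull thread-local sessions from the scoped_session registry created above; a minimal sketch of that pattern inside the same class ('sample' stands in for any mapped object and is not part of the snippet):

session = self.__Session()        # thread-local Session from the scoped_session registry
try:
    session.add(sample)           # 'sample' is a placeholder mapped object
    session.commit()
except SQLAlchemyError:
    session.rollback()
    raise
finally:
    self.__Session.remove()       # discard the thread-local session when done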
Project: stream2segment    Author: rizac    | Project source | File source
def get_session(dbpath, scoped=False):  # , enable_fk_if_sqlite=True):
    """
    Create an sql alchemy session for IO db operations
    :param dbpath: the path to the database, e.g. sqlite:///path_to_my_dbase.sqlite
    :param scoped: boolean (False by default) if the session must be scoped session
    """
    # init the session:
    engine = create_engine(dbpath)
    Base.metadata.create_all(engine)  # @UndefinedVariable

    # enable fkeys if sqlite. This can be added also as event listener as outlined here:
    # http://stackoverflow.com/questions/13712381/how-to-turn-on-pragma-foreign-keys-on-in-sqlalchemy-migration-script-or-conf
    # NOT implemented YET. See models.py

    if not scoped:
        # create a configured "Session" class
        session = sessionmaker(bind=engine)
        # create a Session
        return session()
    # return session
    else:
        session_factory = sessionmaker(bind=engine)
        return scoped_session(session_factory)
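
The SQLite foreign-key pragma mentioned in the comment above could be wired up with a connection event listener; a minimal sketch of that approach (not part of the project, which defers the feature to models.py):

from sqlalchemy import create_engine, event

engine = create_engine('sqlite:///path_to_my_dbase.sqlite')

@event.listens_for(engine, 'connect')
def _enable_sqlite_foreign_keys(dbapi_connection, connection_record):
    # SQLite disables foreign-key enforcement by default; turn it on for every new connection.
    cursor = dbapi_connection.cursor()
    cursor.execute('PRAGMA foreign_keys=ON')
    cursor.close()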
Project: trf    Author: aistairc    | Project source | File source
def __init__(self, db_path: str, lang: str):

        engine = create_engine('sqlite:///{}'.format(db_path))
        SessionMaker = sessionmaker(bind=engine)
        self.session = SessionMaker()
        self.lang = lang
Project: annotated-py-sqlalchemy    Author: hhstore    | Project source | File source
def connect(self, uri, opts=None, **kwargs):
        """Establish connection to a real engine.
        """
        key = "%s(%s,%s)" % (uri, repr(opts), repr(kwargs))
        try:
            map = self.storage.connection
        except AttributeError:
            self.storage.connection = {}
            self.storage.engine = None
            map = self.storage.connection
        try:
            self.engine = map[key]
        except KeyError:
            map[key] = create_engine(uri, opts, **kwargs)
            self.storage.engine = map[key]
Project: triage    Author: dssg    | Project source | File source
def test_empty_dense_state_table():
    """An empty dense (input) state table produces a useful error."""
    with testing.postgresql.Postgresql() as postgresql:
        engine = create_engine(postgresql.url())
        utils.create_dense_state_table(engine, 'states', ())  # no data
        table_generator = StateTableGenerator(
            engine,
            'exp_hash',
            dense_state_table='states'
        )

        with pytest.raises(ValueError):
            table_generator.generate_sparse_table([datetime(2016, 1, 1)])

        engine.dispose()
Project: triage    Author: dssg    | Project source | File source
def test_empty_sparse_state_table():
    """An empty sparse (generated) state table eagerly produces an
    error.

    (Rather than allowing execution to proceed.)

    """
    with testing.postgresql.Postgresql() as postgresql:
        engine = create_engine(postgresql.url())
        utils.create_dense_state_table(engine, 'states', (
            (5, 'permitted', datetime(2016, 1, 1), datetime(2016, 6, 1)),
            (6, 'permitted', datetime(2016, 2, 5), datetime(2016, 5, 5)),
            (1, 'injail', datetime(2014, 7, 7), datetime(2014, 7, 15)),
            (1, 'injail', datetime(2016, 3, 7), datetime(2016, 4, 2)),
        ))
        table_generator = StateTableGenerator(
            engine,
            'exp_hash',
            dense_state_table='states'
        )

        with pytest.raises(ValueError):
            # Request time outside of available intervals
            table_generator.generate_sparse_table([datetime(2015, 12, 31)])

        (state_count,) = engine.execute('''\
            select count(*) from {generator.sparse_table_name}
        '''.format(generator=table_generator)
        ).first()

        assert state_count == 0

        engine.dispose()
Project: PyAthena    Author: laughingman7743    | Project source | File source
def create_engine(self):
        conn_str = 'awsathena+rest://athena.{region_name}.amazonaws.com:443/' + \
                   '{schema_name}?s3_staging_dir={s3_staging_dir}'
        return create_engine(conn_str.format(region_name=ENV.region_name,
                                             schema_name=SCHEMA,
                                             s3_staging_dir=quote_plus(ENV.s3_staging_dir)))
Project: PyAthenaJDBC    Author: laughingman7743    | Project source | File source
def create_engine(self):
        conn_str = 'awsathena+jdbc://athena.{region_name}.amazonaws.com:443/' + \
                   '{schema_name}?s3_staging_dir={s3_staging_dir}'
        return create_engine(conn_str.format(region_name=ENV.region_name,
                                             schema_name=SCHEMA,
                                             s3_staging_dir=quote_plus(ENV.s3_staging_dir)))
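
Substituting concrete values, the two Athena snippets above produce URLs like the following; the region, schema, and staging bucket here are placeholders, not values from either project:

from urllib.parse import quote_plus
from sqlalchemy.engine import create_engine

region_name = 'us-east-1'                                       # placeholder region
schema_name = 'default'                                         # placeholder schema
s3_staging_dir = quote_plus('s3://my-athena-results/staging/')  # the staging dir must be URL-encoded

engine = create_engine(
    'awsathena+rest://athena.{0}.amazonaws.com:443/{1}?s3_staging_dir={2}'.format(
        region_name, schema_name, s3_staging_dir))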
Project: marcotti-mls    Author: soccermetrics    | Project source | File source
def __init__(self, config):
        logger.info("Marcotti-MLS v{0}: Python {1} on {2}".format(__version__, sys.version, sys.platform))
        logger.info("Opened connection to {0}".format(self._public_db_uri(config.database_uri)))
        self.engine = create_engine(config.database_uri)
        self.connection = self.engine.connect()
        self.start_year = config.START_YEAR
        self.end_year = config.END_YEAR
Project: sqlacodegen    Author: agronholm    | Project source | File source
def setup(self):
        self.metadata = MetaData(create_engine('sqlite:///'))
Project: sqlacodegen    Author: agronholm    | Project source | File source
def main():
    parser = argparse.ArgumentParser(description='Generates SQLAlchemy model code from an existing database.')
    parser.add_argument('url', nargs='?', help='SQLAlchemy url to the database')
    parser.add_argument('--version', action='store_true', help="print the version number and exit")
    parser.add_argument('--schema', help='load tables from an alternate schema')
    parser.add_argument('--tables', help='tables to process (comma-separated, default: all)')
    parser.add_argument('--noviews', action='store_true', help="ignore views")
    parser.add_argument('--noindexes', action='store_true', help='ignore indexes')
    parser.add_argument('--noconstraints', action='store_true', help='ignore constraints')
    parser.add_argument('--nojoined', action='store_true', help="don't autodetect joined table inheritance")
    parser.add_argument('--noinflect', action='store_true', help="don't try to convert tables names to singular form")
    parser.add_argument('--noclasses', action='store_true', help="don't generate classes, only tables")
    parser.add_argument('--outfile', help='file to write output to (default: stdout)')
    args = parser.parse_args()

    if args.version:
        version = pkg_resources.get_distribution('sqlacodegen').parsed_version
        print(version.public)
        return
    if not args.url:
        print('You must supply a url\n', file=sys.stderr)
        parser.print_help()
        return

    engine = create_engine(args.url)
    metadata = MetaData(engine)
    tables = args.tables.split(',') if args.tables else None
    metadata.reflect(engine, args.schema, not args.noviews, tables)
    outfile = codecs.open(args.outfile, 'w', encoding='utf-8') if args.outfile else sys.stdout
    generator = CodeGenerator(metadata, args.noindexes, args.noconstraints, args.nojoined, args.noinflect,
                              args.noclasses)
    generator.render(outfile)
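
The same pipeline can also be driven without argparse; a minimal sketch reusing the names from main() above, with illustrative flag values (all False) and a placeholder SQLite URL:

engine = create_engine('sqlite:///existing.db')   # any reachable database URL (placeholder)
metadata = MetaData(engine)
metadata.reflect(engine)                          # reflect every table in the default schema
generator = CodeGenerator(metadata, False, False, False, False, False)
generator.render(sys.stdout)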
Project: lymph-sqlalchemy    Author: deliveryhero    | Project source | File source
def init_db(cls):
        engine = create_engine(cls.url())
        cls.create_db(engine)
        store = cls.create_store()
        store.create_all()
        engine.dispose()
Project: lymph-sqlalchemy    Author: deliveryhero    | Project source | File source
def drop_db(cls):
        engine = create_engine(cls.url())
        drop_database(engine.url)
        engine.dispose()
Project: lymph-sqlalchemy    Author: deliveryhero    | Project source | File source
def setUpClass(cls):
        cls.create = False
        cls.engine = create_engine(cls.url())
        if not database_exists(cls.engine.url):
            cls.create_db(cls.engine)
            cls.create = True
        cls.connection = cls.engine.connect()
        cls.transaction = cls.connection.begin()
        cls.store = cls.create_store(cls.connection)
        if cls.create:
            cls.store.create_all()
        super(StoreTestCase, cls).setUpClass()
Project: dodotable    Author: spoqa    | Project source | File source
def fx_session():
    engine = create_engine(TEST_DATABASE_URL)
    try:
        metadata = Base.metadata
        metadata.drop_all(bind=engine)
        metadata.create_all(bind=engine)
        session = Session(bind=engine)
        yield session
        session.rollback()
        metadata.drop_all(bind=engine)
    finally:
        engine.dispose()
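
A hypothetical test consuming the fx_session fixture above might look like this; SomeModel is a placeholder mapped class, not part of the project:

def test_starts_with_empty_tables(fx_session):
    # The fixture yields a Session bound to a freshly created (and later dropped) schema.
    assert fx_session.query(SomeModel).count() == 0  # SomeModel is hypothetical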
Project: missioncontrol    Author: mozilla    | Project source | File source
def get_engine():
    return create_engine(settings.PRESTO_URL)
Project: falcon-sqlalchemy    Author: vixcess    | Project source | File source
def __init__(self, sqlurl_or_engine: Union[str, Engine], **kwargs):
        super().__init__()
        if isinstance(sqlurl_or_engine, str):
            self._sqlurl = sqlurl_or_engine
            self._engine = create_engine(sqlurl_or_engine)
        elif isinstance(sqlurl_or_engine, Engine):
            self._engine = sqlurl_or_engine
            self._sqlurl = self._engine.url
        self._session_maker = AutoCloseSessionMaker(bind=self._engine, **kwargs)
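
Both accepted argument types could be exercised as shown below; the enclosing class is not named in the snippet, so DatabaseMiddleware is hypothetical:

from sqlalchemy.engine import create_engine

middleware_from_url = DatabaseMiddleware('sqlite:///:memory:')                     # plain URL string
middleware_from_engine = DatabaseMiddleware(create_engine('sqlite:///:memory:'))   # pre-built Engine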
Project: lang2program    Author: kelvinguu    | Project source | File source
def sqlalchemy_metadata(host, port, database, username, password):
    url = URL(drivername='postgresql+psycopg2', username=username,
              password=password, host=host, port=port, database=database)
    engine = create_engine(url, server_side_cursors=True, connect_args={'connect_timeout': 4})
    # ensure that we can connect
    with engine.begin():
        pass  # this will throw OperationalError if it fails
    return MetaData(engine)
Project: lang2program    Author: kelvinguu    | Project source | File source
def sqlalchemy_metadata(host, port, database, username, password):
    url = URL(drivername='postgresql+psycopg2', username=username,
              password=password, host=host, port=port, database=database)
    engine = create_engine(url, server_side_cursors=True, connect_args={'connect_timeout': 4})
    # ensure that we can connect
    with engine.begin():
        pass  # this will throw OperationalError if it fails
    return MetaData(engine)
Project: asyncpgsa    Author: CanopyTax    | Project source | File source
def metadata():
    metadata = MetaData()
    metadata.bind = create_engine(URL)
    return metadata
Project: deb-python-pecan    Author: openstack    | Project source | File source
def setUp(self):
        super(TestJsonifySQLAlchemyGenericEncoder, self).setUp()
        if not create_engine:
            self.create_fake_proxies()
        else:
            self.create_sa_proxies()
Project: deb-python-pecan    Author: openstack    | Project source | File source
def create_sa_proxies(self):

        # create the table and mapper
        metadata = schema.MetaData()
        user_table = schema.Table(
            'user',
            metadata,
            schema.Column('id', types.Integer, primary_key=True),
            schema.Column('first_name', types.Unicode(25)),
            schema.Column('last_name', types.Unicode(25))
        )

        class User(object):
            pass
        orm.mapper(User, user_table)

        # create the session
        engine = create_engine('sqlite:///:memory:')
        metadata.bind = engine
        metadata.create_all()
        session = orm.sessionmaker(bind=engine)()

        # add some dummy data
        user_table.insert().execute([
            {'first_name': 'Jonathan', 'last_name': 'LaCour'},
            {'first_name': 'Yoann', 'last_name': 'Roman'}
        ])

        # get the SA objects
        self.sa_object = session.query(User).first()
        select = user_table.select()
        self.result_proxy = select.execute()
        self.row_proxy = select.execute().fetchone()
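
Note that metadata.bind, orm.mapper(), and table-level insert().execute() rely on SQLAlchemy's legacy classical-mapping and implicit-execution APIs, which were removed in SQLAlchemy 2.0. A rough 1.4+-style equivalent of the setup and insert would be (a sketch, not the project's code):

engine = create_engine('sqlite:///:memory:')
metadata.create_all(engine)
with engine.begin() as conn:
    # executemany-style insert of the same dummy rows
    conn.execute(user_table.insert(), [
        {'first_name': 'Jonathan', 'last_name': 'LaCour'},
        {'first_name': 'Yoann', 'last_name': 'Roman'},
    ])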
Project: pybigquery    Author: mxmzdlv    | Project source | File source
def engine():
    engine = create_engine('bigquery://', echo=True)
    return engine
Project: kotori    Author: daq-tools    | Project source | File source
def startDatabase(self):
        self.engine = create_engine(

            # sqlite in-memory
            #"sqlite://", reactor=reactor, strategy=TWISTED_STRATEGY

            # sqlite on filesystem
            "sqlite:////tmp/kotori.sqlite", reactor=reactor, strategy=TWISTED_STRATEGY

            # mysql... todo
        )

        # Create the table
        yield self.engine.execute(CreateTable(self.telemetry))
        #yield self.engine
Project: triage    Author: dssg    | Project source | File source
def test_sparse_state_table_generator():
    input_data = [
        (5, 'permitted', datetime(2016, 1, 1), datetime(2016, 6, 1)),
        (6, 'permitted', datetime(2016, 2, 5), datetime(2016, 5, 5)),
        (1, 'injail', datetime(2014, 7, 7), datetime(2014, 7, 15)),
        (1, 'injail', datetime(2016, 3, 7), datetime(2016, 4, 2)),
    ]

    with testing.postgresql.Postgresql() as postgresql:
        engine = create_engine(postgresql.url())
        utils.create_dense_state_table(engine, 'states', input_data)

        table_generator = StateTableGenerator(
            engine,
            'exp_hash',
            dense_state_table='states'
        )
        as_of_dates = [
            datetime(2016, 1, 1),
            datetime(2016, 2, 1),
            datetime(2016, 3, 1),
            datetime(2016, 4, 1),
            datetime(2016, 5, 1),
            datetime(2016, 6, 1),
        ]
        table_generator.generate_sparse_table(as_of_dates)
        results = [row for row in engine.execute(
            'select entity_id, as_of_date, injail, permitted from {} order by entity_id, as_of_date'.format(
                table_generator.sparse_table_name
            ))]
        expected_output = [
            # entity_id, as_of_date, injail, permitted
            (1, datetime(2016, 4, 1), True, False),
            (5, datetime(2016, 1, 1), False, True),
            (5, datetime(2016, 2, 1), False, True),
            (5, datetime(2016, 3, 1), False, True),
            (5, datetime(2016, 4, 1), False, True),
            (5, datetime(2016, 5, 1), False, True),
            (6, datetime(2016, 3, 1), False, True),
            (6, datetime(2016, 4, 1), False, True),
            (6, datetime(2016, 5, 1), False, True),
        ]
        assert results == expected_output
        utils.assert_index(engine, table_generator.sparse_table_name, 'entity_id')
        utils.assert_index(engine, table_generator.sparse_table_name, 'as_of_date')
Project: pyetje    Author: rorlika    | Project source | File source
def assert_compile(self, clause, result, params=None,
                        checkparams=None, dialect=None,
                        checkpositional=None,
                        use_default_dialect=False,
                        allow_dialect_select=False):
        if use_default_dialect:
            dialect = default.DefaultDialect()
        elif dialect == None and not allow_dialect_select:
            dialect = getattr(self, '__dialect__', None)
            if dialect == 'default':
                dialect = default.DefaultDialect()
            elif dialect is None:
                dialect = config.db.dialect
            elif isinstance(dialect, basestring):
                dialect = create_engine("%s://" % dialect).dialect

        kw = {}
        if params is not None:
            kw['column_keys'] = params.keys()

        if isinstance(clause, orm.Query):
            context = clause._compile_context()
            context.statement.use_labels = True
            clause = context.statement

        c = clause.compile(dialect=dialect, **kw)

        param_str = repr(getattr(c, 'params', {}))

        if util.py3k:
            param_str = param_str.encode('utf-8').decode('ascii', 'ignore')
            print(("\nSQL String:\n" + util.text_type(c) + param_str).encode('utf-8'))
        else:
            print(("\nSQL String:\n" + util.text_type(c).encode('utf-8') + param_str))

        cc = re.sub(r'[\n\t]', '', util.text_type(c))

        eq_(cc, result, "%r != %r on dialect %r" % (cc, result, dialect))

        if checkparams is not None:
            eq_(c.construct_params(params), checkparams)
        if checkpositional is not None:
            p = c.construct_params(params)
            eq_(tuple([p[x] for x in c.positiontup]), checkpositional)
Project: antares    Author: CONABIO    | Project source | File source
def get_session(self):
        if self.session is None:
            klass = sessionmaker(bind=create_engine(self.database))
            self.session = klass()
        return self.session
Project: antares    Author: CONABIO    | Project source | File source
def persist_bundle(self):
        from madmex.persistence.driver import persist_bundle
        from sqlalchemy import create_engine
        from sqlalchemy.orm.session import sessionmaker
        from madmex.util import remove_file
        dummy = DummyBundle()
        persist_bundle(dummy)


        my_database = getattr(SETTINGS, 'ANTARES_TEST_DATABASE')
        klass = sessionmaker(bind=create_engine(my_database))
        session = klass()
        query = 'SELECT count(*) FROM product WHERE uuid=\'%s\';' % dummy.uuid_id
        print(query)
        try:
            result_set = session.execute(query)
            for row in result_set:
                self.assertGreater(row['count'], 0)
            # Delete object from database.
            session.delete(dummy.get_database_object())
            session.commit()
            for file_name in dummy.get_files():
                full_path = os.path.join(dummy.get_output_directory(), os.path.basename(file_name))
                self.assertTrue(os.path.isfile(full_path))
                # Remove file from filesystem.
                remove_file(full_path)
        except:
            session.rollback()
            raise
        finally:
            session.close()
Project: antares    Author: CONABIO    | Project source | File source
def persist_bundle_sensor(self):
        from madmex.persistence.driver import persist_bundle
        folder = '/LUSTRE/MADMEX/staging/madmex_antares/test_ingest/556_297_041114_dim_img_spot'
        from sqlalchemy import create_engine
        from sqlalchemy.orm.session import sessionmaker
        from madmex.mapper.bundle.spot5 import Bundle
        #from madmex.configuration import SETTINGS

        dummy = Bundle(folder)
        #dummy.target = '/LUSTRE/MADMEX/staging/'
        target_url = getattr(SETTINGS, 'TEST_FOLDER')
        print(target_url)
        #TODO please fix me, horrible hack
        dummy.target = target_url
        persist_bundle(dummy)
        my_database = getattr(SETTINGS, 'ANTARES_TEST_DATABASE')
        klass = sessionmaker(bind=create_engine(my_database))
        session = klass()
        query = 'SELECT count(*) FROM product WHERE uuid=\'%s\';' % dummy.uuid_id

        try:
            result_set = session.execute(query)
            for row in result_set:
                self.assertGreater(row['count'], 0)
            session.delete(dummy.get_database_object())
            session.commit()
            for file_name in dummy.get_files():
                full_path = os.path.join(target_url, os.path.basename(file_name))
                self.assertTrue(os.path.isfile(full_path))
                os.remove(full_path)
        except:
            session.rollback()
            raise
        finally:
            session.close()