我们从 Python 开源项目中，提取了以下33个代码示例，用于说明如何使用 sqlalchemy_utils.database_exists()。
def app(request):
    """Build the 'testing' application on top of a freshly created database.

    A database already present at ``db.engine.url`` is dropped first, then
    the database and all tables are created.  A finalizer is registered on
    ``request`` that tears the session down and drops the database again.

    NOTE(review): presumably a pytest fixture -- the decorator is not
    visible in this excerpt.
    """
    application = ApplicationFactory.create_application('testing')
    application.app_context().push()

    # Start from a clean slate: remove leftovers from a previous run.
    already_there = database_exists(db.engine.url)
    if already_there:
        drop_database(db.engine.url)
    create_database(db.engine.url)
    db.create_all()

    def _drop_test_database():
        # Detach every object and release the session before the database
        # underneath it is removed.
        db.session.expunge_all()
        db.session.remove()
        drop_database(db.engine.url)
        db.engine.dispose()

    request.addfinalizer(_drop_test_database)
    return application
def init(config):
    """Create the database (when missing) and all model tables.

    The connection string is read from ``config.conf['db_connect']``.
    The process exits with an error message when the string is empty,
    when the engine cannot be created (``NoSuchModuleError``) or when the
    existence check / creation fails with ``OperationalError``.
    """
    connection_string = config.conf['db_connect']
    if not connection_string:
        sys.exit("Error: database connection string not "
                 "found in any of the configuration files")

    try:
        engine = create_engine(connection_string)
    except exc.NoSuchModuleError as err:
        sys.exit("Error: %s" % str(err))

    try:
        db_missing = not database_exists(engine.url)
        if db_missing:
            printv(config, "Creating database: 'openpassphrase'")
            create_database(engine.url)
    except exc.OperationalError as err:
        sys.exit("Error: %s" % str(err))

    printv(config, "Creating tables based on models")
    models.Base.metadata.create_all(engine)
def setUpClass(self):
    """Recreate the ``dbimport`` database and register shared records.

    Stores the URI, a reference-length mapping and the array name on
    ``self``, then registers a reference set, a workspace and an
    individual through a ``DBImport`` session.
    """
    uri = "postgresql+psycopg2://@:5432/dbimport"
    self.DBURI = uri

    # Always start from an empty database.
    if database_exists(uri):
        drop_database(uri)
    create_database(uri)
    models.bind_engine(create_engine(uri))

    self.references = {"1": 249250621, "2": 243199373, "3": 198022430}
    self.array = "test"

    def fresh_guid():
        return str(uuid.uuid4())

    # Reference set, workspace and individual registration are exercised
    # by earlier tests; here they only provide fixtures.
    with dbimport.DBImport(uri).getSession() as session:
        self.referenceset = session.registerReferenceSet(
            fresh_guid(), "testAssembly", references=self.references)
        self.workspace = session.registerWorkspace(
            fresh_guid(), "/test/dbimport/workspace")
        self.individual = session.registerIndividual(
            fresh_guid(), name="testIndividual")
def setUpClass(self):
    """Prepare the ``mafend2end`` database and rewrite the ICGC config.

    The database is created only when missing.  The example ICGC config
    file is loaded, its TileDB / variant config paths are pointed at the
    example files below ``realpath(sys.argv[-1])``, and the result is
    written back to the same file before being registered with the
    metadata database.
    """
    uri = "postgresql+psycopg2://@:5432/mafend2end"
    self.TESTDB_URI = uri
    if not database_exists(uri):
        create_database(uri)
    models.bind_engine(create_engine(uri))

    # Every example file is resolved against the last command-line argument.
    root = os.path.realpath(sys.argv[-1])

    def under_root(relative):
        return os.path.join(root, relative)

    self.config_path = under_root("utils/example_configs/icgc_config.json")
    with open(self.config_path, 'r') as readFP:
        config_json = json.load(readFP)

    config_json["TileDBConfig"] = under_root(
        "utils/example_configs/tiledb_config.json")
    config_json["TileDBAssembly"] = under_root(
        "utils/example_configs/hg19.json")
    config_json["VariantSetMap"]["VariantConfig"] = under_root(
        "utils/example_configs/icgc_variants.json")

    # The example config is overwritten in place with the resolved paths.
    serialized = json.dumps(config_json)
    with open(self.config_path, 'w') as writeFP:
        writeFP.write(serialized)

    config = ConfigReader(self.config_path)
    config.DB_URI = uri
    imp.helper.registerWithMetadb(config)
def setUpClass(self):
    """Recreate the ``vcfimport`` database and load the base test config.

    Reads the VCF header test file into ``self.header`` and the example
    import config into ``self.config``, overriding its ``"dburi"`` entry
    with the test database URI.
    """
    uri = "postgresql+psycopg2://@:5432/vcfimport"
    self.DBURI = uri

    # Always start from an empty database.
    if database_exists(uri):
        drop_database(uri)
    create_database(uri)
    models.bind_engine(create_engine(uri))

    self.assembly = "testAssembly"

    # Test files.
    with open("test/data/header.vcf", "r") as header_file:
        self.header = header_file.read()

    # Base config, pointed at the test database.
    self.config_path = os.path.abspath(
        "utils/example_configs/vcf_import.config")
    with open(self.config_path, 'r') as readFP:
        loaded = json.load(readFP)
    self.config = loaded
    self.config["dburi"] = self.DBURI
def init(db_url, db_log_flag=False, recycle_time=3600):
    """Create the ``DBManager`` engine and scoped session.

    Parameters:
        db_url: database URL handed to ``create_engine``.
        db_log_flag: forwarded as ``echo``.
        recycle_time: forwarded as ``pool_recycle``.

    When the database does not exist it is created and the engine is
    built again.  The scoped session is also published through the
    module-level name ``dao``.
    """
    global dao

    def _build_engine():
        # Identical settings are used before and after database creation.
        return create_engine(db_url,
                             pool_recycle=recycle_time,
                             echo=db_log_flag)

    # Create the database engine.
    DBManager.__engine = _build_engine()

    # Database does not exist yet: create it and rebuild the engine.
    if not database_exists(DBManager.__engine.url):
        create_database(DBManager.__engine.url)
        DBManager.__engine = _build_engine()

    session_factory = sessionmaker(autocommit=False,
                                   autoflush=False,
                                   bind=DBManager.__engine)
    DBManager.__session = scoped_session(session_factory)

    # Expose the session through the global ``dao`` name.
    dao = DBManager.__session
def write(self, if_exists:str=None):
    """Make sure the target database exists, then delegate to the parent.

    The database is created through the DBAPI module's own
    ``create_database`` using the credentials stored on ``self``.  A
    ``DatabaseError`` raised inside the check-and-create step triggers a
    (second) creation attempt.

    NOTE(review): presumably the existence check itself raises
    ``DatabaseError`` for a missing database on this backend -- confirm.
    """
    from sqlalchemy import exc as dbexceptions
    from sqlalchemy_utils import database_exists

    def _provision():
        dbapi = self._engine.dialect.dbapi
        return dbapi.create_database(
            user=self._username,
            password=self._password,
            host=self._server,
            database=self.uuid,
            page_size=self._pagesize
        )

    # The returned object stays bound until the method returns, exactly as
    # in the original implementation.
    try:
        if not database_exists(self._engine.url):
            created = _provision()
    except dbexceptions.DatabaseError:
        created = _provision()

    super().write(if_exists=if_exists)
def init():
    """Create database."""
    # NOTE(review): ``click.gecho`` is kept as found; confirm that the
    # ``click`` object in scope really provides it.
    click.gecho(f'Creating database {_db.engine.url}')

    # Nothing to do when the database is already there.
    if database_exists(str(_db.engine.url)):
        return
    create_database(str(_db.engine.url))
def handle(self, **options):
    """Create test databases for all db connections.

    Each entry of ``settings.DATABASES`` gets its ``'database'`` name
    prefixed with ``test_``; the matching database is created when it
    does not exist.  Afterwards the tables of the installed apps are
    created and pytest is run with ``options['unknown']``.
    """
    for alias in settings.DATABASES:
        db_settings = settings.DATABASES[alias]
        # The settings entry is renamed in place so the engine built below
        # points at the test database.
        db_settings['database'] = 'test_' + db_settings['database']

        url = get_engine(alias).url
        if not database_exists(url):
            create_database(url)

    create_tables(settings.INSTALLED_APPS, warn=False)
    pytest.main(options['unknown'])
def create():
    """ Create database and all tables

    When the database already exists nothing is created and a debug
    message asks the operator to drop it first.  Otherwise the database
    is created, stamped via ``alembic_stamp()`` and all tables are
    created.
    """
    if database_exists(db.engine.url):
        # Previously this message was also emitted right after a fresh
        # database had been created; it now only fires when it is true.
        app.logger.debug('Database already exists, please drop the database and try again!')
        return

    app.logger.debug('Creating a fresh database to work with')
    create_database(db.engine.url)
    alembic_stamp()
    db.create_all()
def drop():
    """ Drop the database if it exists

    Logs an error instead when there is no database to drop.

    :return: None
    """
    app.logger.debug('Dropping the database!')
    if database_exists(db.engine.url):
        drop_database(db.engine.url)
    else:
        # Previously this error was logged even after a successful drop;
        # it is now limited to the case it actually describes.
        app.logger.error('Database does not exists!')
def createdb():
    """ Create DB

    Creates the database named by ``SQLALCHEMY_DATABASE_URI`` when it does
    not exist yet and reports the creation on stdout.
    """
    if database_exists(app.config['SQLALCHEMY_DATABASE_URI']):
        return
    create_database(app.config['SQLALCHEMY_DATABASE_URI'])
    print("DB created.")
def db_connection(db_connection_string):
    """ Create one test database for all database tests.

    Yields an open connection; once the generator is resumed the
    connection is closed, the engine disposed and the database dropped.
    """
    test_engine = create_engine(db_connection_string)

    db_missing = not database_exists(test_engine.url)
    if db_missing:
        create_database(test_engine.url)

    conn = test_engine.connect()
    yield conn

    # Teardown: release everything before removing the database itself.
    conn.close()
    test_engine.dispose()
    drop_database(test_engine.url)
def init_db(app):
    """Attach an engine and a scoped session to ``app``.

    The engine is built from ``app.config['DATABASE_URL']``; the database
    is created when missing.  ``Base.query`` is wired to the session's
    query property and query logging is initialised for the app.
    """
    engine = create_engine(app.config['DATABASE_URL'], convert_unicode=True)
    app.db_engine = engine

    if not database_exists(app.db_engine.url):
        create_database(app.db_engine.url)

    session_factory = sessionmaker(
        autocommit=False, autoflush=False, bind=app.db_engine)
    app.db_session = scoped_session(session_factory)

    Base.query = app.db_session.query_property()
    init_query_logging(app)
def restore(config):  # pragma: no cover
    """Restore the database from the ``opp.db.tz`` backup archive.

    The connection string is read from ``config.conf['db_connect']``.
    The process exits with an error message when the string is empty,
    the engine cannot be created, the database already exists (it is
    never overwritten), the database cannot be created, or the archive
    cannot be extracted.

    After extraction every ``<table>.pickle`` file is replayed into its
    table row by row, and the extracted files are removed afterwards.
    """
    db_connect = config.conf['db_connect']
    if not db_connect:
        sys.exit("Error: database connection string not "
                 "found in any of the configuration files")

    try:
        engine = create_engine(db_connect)
    except exc.NoSuchModuleError as e:
        sys.exit("Error: %s" % str(e))

    if database_exists(engine.url):
        sys.exit("Error: database already exists, will not overwrite!")

    try:
        create_database(engine.url)
    except exc.OperationalError as e:
        sys.exit("Error: %s" % str(e))

    models.Base.metadata.create_all(engine)

    # The connection is managed by ``with`` so it is closed on every exit
    # path, including the ``sys.exit`` below (it was never closed before).
    with engine.connect() as conn:
        print("Restoring database from backup file: opp.db.tz")
        code, out, err = utils.execute("tar zxf opp.db.tz", propagate=False)
        if code != 0:
            sys.exit("Error extracting database archive file: %s" % err)

        for table in models.Base.metadata.sorted_tables:
            with open("%s.pickle" % table, "rb") as f:
                # Each file holds a sequence of serialized rows; reading
                # past the last one raises EOFError, which ends the loop.
                # NOTE(review): ``load`` is presumably ``pickle.load``;
                # unpickling can execute arbitrary code, so only restore
                # archives from a trusted source.
                while True:
                    try:
                        ins = table.insert().values(load(f))
                        conn.execute(ins)
                    except EOFError:
                        break

    files = ["%s.pickle" % x.name for x in models.Base.metadata.sorted_tables]
    code, out, err = utils.execute("rm %s" % " ".join(files))
    if code != 0:
        print("Unable to remove .pickle files: %s" % err)
def setUpClass(self):
    """Recreate the ``dbimport`` database and store basic test names.

    Sets ``DBURI``, ``assembly`` and ``workspace`` on ``self`` and binds
    the models to an engine for the fresh database.
    """
    uri = "postgresql+psycopg2://@:5432/dbimport"
    self.DBURI = uri

    # Always start from an empty database.
    if database_exists(uri):
        drop_database(uri)
    create_database(uri)
    models.bind_engine(create_engine(uri))

    self.assembly = "testAssembly"
    self.workspace = "/test/dbimport/workspace"
def setUpClass(self):
    """Recreate the ``dbimport`` database and register shared records.

    Registers one reference set, one workspace, two arrays, four variant
    sets, one individual and two samples, keeping each on ``self``.
    """
    uri = "postgresql+psycopg2://@:5432/dbimport"
    self.DBURI = uri

    # Always start from an empty database.
    if database_exists(uri):
        drop_database(uri)
    create_database(uri)
    models.bind_engine(create_engine(uri))

    def fresh_guid():
        return str(uuid.uuid4())

    # All registration functions used below are covered by earlier tests.
    with dbimport.DBImport(uri).getSession() as session:
        self.referenceset = session.registerReferenceSet(
            fresh_guid(), "testAssembly")
        self.workspace = session.registerWorkspace(
            fresh_guid(), "/test/dbimport/workspace")

        self.array = session.registerDBArray(
            fresh_guid(), self.referenceset.id, self.workspace.id, "test")
        self.array2 = session.registerDBArray(
            fresh_guid(), self.referenceset.id, self.workspace.id, "test2")

        self.variantset = session.registerVariantSet(
            fresh_guid(), self.referenceset.id, "Dataset")
        self.variantset2 = session.registerVariantSet(
            fresh_guid(), self.referenceset.id, "Dataset2")
        self.variantset3 = session.registerVariantSet(
            fresh_guid(), self.referenceset.id, "Dataset3")
        self.variantset4 = session.registerVariantSet(
            fresh_guid(), self.referenceset.id, "Dataset4")

        self.individual = session.registerIndividual(
            fresh_guid(), name="testIndividual")

        self.source = session.registerSample(
            fresh_guid(), self.individual.guid, name="source")
        self.target = session.registerSample(
            fresh_guid(), self.individual.guid, name="target")
def create_all():
    """Create the database when missing, then create all tables."""
    # Imported here rather than at module level, as in the original.
    from zou.app import db

    url = create_engine(get_db_uri()).url
    db_missing = not database_exists(url)
    if db_missing:
        create_database(url)

    db.create_all()
def __init__(self, problem=None, database="featurehub"):
    """Create the ORMManager and connect to DB.

    If problem name is given, load it.  A hint is printed when the
    database does not exist, and a warning when the named problem cannot
    be found.

    Parameters
    ----------
    problem : str, optional (default=None)
        Name of problem
    database : str, optional (default="featurehub")
        Name of database within DBMS.
    """
    self.__orm = ORMManager(database)

    # Without a database there is nothing to look the problem up in.
    if not database_exists(self.__orm.engine.url):
        print("database {} does not seem to exist.".format(database))
        print("You might want to create it by calling set_up method")
        return

    if not problem:
        return

    try:
        with self.__orm.session_scope() as session:
            query = session.query(Problem).filter(Problem.name == problem)
            problem = query.one()
            self.__problemid = problem.id
    except NoResultFound:
        print("WARNING: Problem {} does not exist!".format(problem))
        print("You might want to create it by calling create_problem"
              " method")
def set_up(self, drop=False):
    """Create a new DB and create the initial scheme.

    If the database exists and drop=True, the existing database is
    dropped and recreated.  Regardless, any tables defined by the schema
    that do not exist are created.

    Parameters
    ----------
    drop : bool, optional (default=False)
        Drop database if it already exists.
    """
    # todo extract database name from engine url and report for brevity
    engine = self.__orm.engine

    exists = database_exists(engine.url)
    if exists:
        print("Database {} already exists.".format(engine.url))

    if exists and drop:
        print("Dropping old database {}".format(engine.url))
        drop_database(engine.url)
        with possibly_talking_action("Re-creating database..."):
            create_database(engine.url)
    elif not exists:
        with possibly_talking_action("Creating database..."):
            create_database(engine.url)

    # Table creation runs in every case.
    with possibly_talking_action("Creating tables..."):
        Base.metadata.create_all(engine)

    print("Database {} created successfully".format(engine.url))
def init(self):
    """Create the storage directory, database and tables as needed.

    When ``self._local_path`` is set, its parent directory is created if
    it is missing.  The database at ``self.db.url`` is created when it
    does not exist, then the metadata is bound to ``self.db`` and all
    tables are created.
    """
    if self._local_path:
        parent = os.path.dirname(self._local_path)
        parent_missing = parent and not os.path.isdir(parent)
        if parent_missing:
            os.makedirs(parent)

    db_missing = not database_exists(self.db.url)
    if db_missing:
        create_database(self.db.url)

    # More on connection strings for sqlalchemy:
    # http://docs.sqlalchemy.org/en/latest/core/engines.html
    self.metadata.bind = self.db
    self.metadata.create_all()
def is_initialized(self):
    """Return the result of ``database_exists`` for ``self.db.url``."""
    url = self.db.url
    return database_exists(url)
def init_db():
    """
    Create database if doesn't exist and create all tables.
    """
    db_missing = not database_exists(engine.url)
    if db_missing:
        create_database(engine.url)

    BaseModel.metadata.create_all(bind=engine)
def test_create_and_drop(self, dsn):
    """The database is absent, then present after create, then absent again."""
    present_before = database_exists(dsn)
    assert not present_before

    create_database(dsn)
    present_after_create = database_exists(dsn)
    assert present_after_create

    drop_database(dsn)
    present_after_drop = database_exists(dsn)
    assert not present_after_drop
def test_exists_memory(self, dsn):
    """``database_exists`` is truthy for the ``dsn`` fixture."""
    exists = database_exists(dsn)
    assert exists
def test_exists_memory_none_database(self, sqlite_none_database_dsn):
    """``database_exists`` is truthy for the ``sqlite_none_database_dsn`` fixture."""
    exists = database_exists(sqlite_none_database_dsn)
    assert exists
def write(self, if_exists:str=None):
    """Load the JDBC driver class, ensure the database exists, then delegate.

    ``com.mysql.jdbc.Driver`` is loaded through the system class loader
    before the existence check; the actual write is done by the parent
    class.
    """
    from sqlalchemy_utils import database_exists, create_database

    system_loader = jp.java.lang.ClassLoader.getSystemClassLoader()
    jp.java.lang.Class.forName("com.mysql.jdbc.Driver", True, system_loader)

    db_missing = not database_exists(self._engine.url)
    if db_missing:
        create_database(self._engine.url)

    super().write(if_exists=if_exists)
def init_db():
    """Drop the database when it exists, create it anew and report on stdout."""
    url = engine.url

    already_there = sqlalchemy_utils.database_exists(url)
    if already_there:
        sqlalchemy_utils.drop_database(url)
    sqlalchemy_utils.create_database(url)

    # The message text is reproduced exactly as found in the source.
    print("DB ??? ??")
def connect(self):  # noqa
    """Create the database when missing, open a session and create tables."""
    db_missing = not database_exists(self.engine.url)
    if db_missing:
        create_database(self.engine.url)

    session_factory = sessionmaker(bind=self.engine)
    self.session = session_factory()

    _Base.metadata.create_all(self.engine)
def destroy_database(uri):
    """Destroy the database at ``uri``, if it exists.
    """
    if not database_exists(uri):
        # Nothing to destroy.
        return
    drop_database(uri)
def create_app(config_name):
    """Build and configure the Flask application.

    ``config_name`` selects the configuration object from ``app_config``.
    The function initialises CSRF protection, assets, ``Via``, the upload
    sets and security, creates the database when it does not exist,
    creates all tables, makes sure the 'admin' and 'end-user' roles are
    present, registers the index route and the 403/404/500 error handlers,
    and returns the application.
    """
    # The datastore is a module-level object; it is read below when the
    # security extension is set up and when the roles are created.
    global user_datastore
    app = Flask(__name__)
    app.config.from_object(app_config[config_name])

    csrf = CSRFProtect()
    csrf.init_app(app)

    assets = Environment(app)
    create_assets(assets)

    via = Via()
    via.init_app(app)

    # Demonstration of Flask uploads spread over several models.
    # The upload sets are imported inside the function, as in the original.
    from user import user_photo
    from restaurant import restaurant_photo
    from food import food_photo
    configure_uploads(app, (restaurant_photo, food_photo, user_photo))

    # Create the database itself when it is missing; tables follow below.
    engine = create_engine(app.config['SQLALCHEMY_DATABASE_URI'])
    if not database_exists(engine.url):
        create_database(engine.url)

    security = Security(app, user_datastore, register_form=SecurityRegisterForm)
    create_security_admin(app=app, path=os.path.join(os.path.dirname(__file__)))

    with app.app_context():
        db.init_app(app)
        db.create_all()
        # Each role is committed right after it is looked up or created.
        user_datastore.find_or_create_role(name='admin', description='Administrator')
        db.session.commit()
        user_datastore.find_or_create_role(name='end-user', description='End user')
        db.session.commit()

    # Landing page, reachable under both '/' and '/home'.
    @app.route('/', methods=['GET'])
    @app.route('/home', methods=['GET'])
    def index():
        return render_template('index.html')

    # Error pages: each handler renders its template with the matching status.
    @app.errorhandler(403)
    def forbidden(error):
        return render_template('error/403.html', title='Forbidden'), 403

    @app.errorhandler(404)
    def page_not_found(error):
        return render_template('error/404.html', title='Page Not Found'), 404

    @app.errorhandler(500)
    def internal_server_error(error):
        # Roll the session back before rendering the error page.
        db.session.rollback()
        return render_template('error/500.html', title='Server Error'), 500

    return app