我们从Python开源项目中，提取了以下6个代码示例，用于说明如何使用settings.DB_HOST。
def connect(self):
    """
    Create and open the shared database connection if none exists yet.

    The connection uses the Qt "QPSQL" driver and is stored on
    ``Database.database``. Host, port, user, password and database name are
    each taken from an environment variable (``DATABASE_HOST``,
    ``DATABASE_PORT``, ``DATABASE_USER``, ``DATABASE_PASSWORD``,
    ``DATABASE_NAME``), falling back to the matching ``settings`` value.

    If the connection cannot be opened, the failure is reported (in a dialog
    when ``rapi.utils.check_x11()`` is true, and always on stdout) and the
    process exits with status 1.
    """
    # NOTE(review): the original formatting was flattened; the open() check
    # is assumed to run only when the connection is first created - confirm.
    if Database.database is not None:
        return

    connection = QtSql.QSqlDatabase("QPSQL")
    Database.database = connection
    env = os.environ

    host = env.get("DATABASE_HOST", settings.DB_HOST)
    connection.setHostName(host)

    # The environment yields a string, so the port is always coerced to int.
    port = int(env.get("DATABASE_PORT", settings.DB_PORT))
    connection.setPort(port)

    user = env.get("DATABASE_USER", settings.USERNAME)
    connection.setUserName(user)

    password = env.get("DATABASE_PASSWORD", settings.PASSWORD)
    connection.setPassword(password)

    name = env.get("DATABASE_NAME", settings.DBNAME)
    connection.setDatabaseName(name)

    if connection.open():
        return

    if rapi.utils.check_x11():
        # An application object has to exist before a window can be opened.
        import gui.utils
        self.tmp = QtWidgets.QApplication(sys.argv)
        gui.utils.error("Error", "Can't join database")
    print("Can't join database")
    sys.exit(1)
def migrate_tables():
    """
    Upgrade the database schema by running the pending migration scripts.

    After asking the user for confirmation, this collects the files named
    ``v<N>.sql`` in ``SQL_DIR/migration`` whose version ``N`` is greater than
    the version stored in ``model.Version`` and not greater than
    ``db_version``. The scripts are executed in ascending version order over
    a dedicated psycopg2 connection; after each one the stored version is
    updated and committed through ``db.session``.

    If no ``model.Version`` row exists, one is created with version 0.
    """
    import psycopg2

    print('This will migrate database to latest schema, you are advised to backup database before running this command')
    if not prompt_bool('Do you want to continue?'):
        return

    mdir = os.path.join(SQL_DIR, 'migration')
    version_obj = model.Version.query.one_or_none()
    if not version_obj:
        version_obj = model.Version(version=0, version_id=1)
        db.session.add(version_obj)
    old_version = version_obj.version
    if old_version == db_version:
        print('DB is at correct version %d' % old_version)

    scripts = []
    for name in os.listdir(mdir):
        # fullmatch so that leftovers such as "v3.sql.bak" or "v3.sql~" are
        # not mistaken for migration scripts and executed.
        m = re.fullmatch(r'v(\d+)\.sql', name)
        if m:
            version = int(m.group(1))
            if old_version < version <= db_version:
                scripts.append((version, os.path.join(mdir, name)))
    scripts.sort()

    connection = psycopg2.connect(
        database=settings.DB_NAME,
        user=settings.DB_USER,
        password=settings.DB_PASSWORD,
        host=settings.DB_HOST,
        port=settings.DB_PORT,
    )
    # Autocommit is enabled, so no explicit connection.commit() is issued
    # after running a script.
    connection.autocommit = True
    try:
        # The cursor and every script file are closed deterministically.
        with connection.cursor() as cursor:
            for v, fname in scripts:
                with open(fname, 'rt', encoding='utf-8-sig') as script_file:
                    sql = script_file.read()
                print('Upgrading database to version %d' % v)
                cursor.execute(sql)
                # Record progress after each script so a later failure does
                # not lose the versions already applied.
                version_obj.version = v
                db.session.commit()
    finally:
        connection.close()
def _datebase_cursor(self):
    """
    Open a new MySQL connection and create a cursor on it.

    The connection is made with the module-level ``DB_HOST``, ``DB_USER``
    and ``DB_PASS`` values; no database is selected here.

    Returns:
        A ``(connection, cursor)`` tuple. Neither object is closed by this
        method, so the caller has to do that.
    """
    connection = MySQLdb.connect(host=DB_HOST, user=DB_USER, passwd=DB_PASS)
    return connection, connection.cursor()
def __init__(self):
    """
    Create the ``adbapi`` connection pool used by this object.

    The pool is built for the ``MySQLdb`` driver from values in
    ``settings`` and stored on ``self.dbpool``. Rows come back as
    dictionaries (``DictCursor``); the connection charset is ``utf8`` with
    ``use_unicode`` switched off.
    """
    connection_options = {
        "host": settings.DB_HOST,
        "db": settings.DB,
        # NOTE(review): the user name is read from settings.DB_NAME -
        # confirm this is the intended setting.
        "user": settings.DB_NAME,
        "passwd": settings.DB_PASSWD,
        "cursorclass": MySQLdb.cursors.DictCursor,
        "charset": "utf8",
        "use_unicode": False,
    }
    self.dbpool = adbapi.ConnectionPool(dbapiName="MySQLdb", **connection_options)
def mysql_process(cnx):
    """
    Collect the operations for every table and run them in priority order.

    Operations are gathered per table through
    ``utils.get_operation_for_table``; a table that raises
    ``OperationError`` is logged and skipped. The collected operations are
    then executed from highest to lowest ``priority`` with foreign key
    checks disabled. Unless ``settings.GLOBAL_DRY_RUN`` is set, the
    underlying connection is committed. Foreign key checks are re-enabled
    and the connection is closed before returning.

    Args:
        cnx: connection wrapper; it must provide ``execute(sql, dry_run=...)``,
            ``close()`` and a ``_connection`` attribute with ``commit()``.
    """
    logger.info("Setting up connection to {}@{}".format(settings.DB_DATABASE, settings.DB_HOST))
    dry_run = settings.GLOBAL_DRY_RUN

    all_ops = []
    for table_name in utils.get_all_tables(cnx):
        try:
            all_ops.extend(utils.get_operation_for_table(cnx, table_name))
        except OperationError as e:
            # "except X as e" is valid on Python 2.6+ and Python 3, unlike
            # the comma form, which is a syntax error on Python 3.
            logger.error("A Table lacks operations: \033[91m{}\033[00m".format(e.msg))

    # Sort and execute
    logger.info("Executing Operations")
    cnx.execute("SET FOREIGN_KEY_CHECKS=0", dry_run=dry_run)
    all_ops.sort(key=operator.attrgetter('priority'), reverse=True)
    for op in all_ops:
        # Calling the operation executes it; its return value is logged.
        logger.info("{} => result: {}".format(op, op()))

    if not dry_run:
        cnx._connection.commit()

    cnx.execute("SET FOREIGN_KEY_CHECKS=1", dry_run=dry_run)

    logger.info("Closing connection to DB")
    cnx.close()