我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用sqlalchemy.engine.url.make_url()。
def get_engine(self): with self._lock: uri = self.get_uri() echo = self._app.config['SQLALCHEMY_ECHO'] if (uri, echo) == self._connected_for: return self._engine info = make_url(uri) options = {'convert_unicode': True} self._sa.apply_pool_defaults(self._app, options) self._sa.apply_driver_hacks(self._app, info, options) if echo: options['echo'] = True self._engine = rv = sqlalchemy.create_engine(info, **options) if _record_queries(self._app): _EngineDebuggingSignalEvents(self._engine, self._app.import_name).register() self._connected_for = (uri, echo) return rv
def create(self, name_or_url, executor, **kwargs): # create url.URL object u = url.make_url(name_or_url) dialect_cls = u.get_dialect() dialect_args = {} # consume dialect arguments from kwargs for k in util.get_cls_kwargs(dialect_cls): if k in kwargs: dialect_args[k] = kwargs.pop(k) # create dialect dialect = dialect_cls(**dialect_args) return MockEngineStrategy.MockConnection(dialect, executor)
def kill_db_processes(database_url, db_name=None, db_user_name=None): url = make_url(database_url) if url.get_backend_name() == 'sqlite': return [], [] processes = get_db_processes(database_url) all_procs = [] killed_procs = [] with isolated_nullpool_engine(database_url) as engine: for process in processes: logger.debug( 'process: Id:%s User:%s db:%s Command:%s State:%s Info:%s', process.Id, process.User, process.db, process.Command, process.State, process.Info) all_procs.append(process) if process.db == db_name and process.User == db_user_name: if process.Info != 'SHOW PROCESSLIST': logger.debug('killing process %s on db %s owned by %s', process.Id, process.db, process.User) engine.execute('KILL %s' % process.Id) killed_procs.append(process) return all_procs, killed_procs
def _read_connection_has_correct_privileges(self): ''' Returns True if the right permissions are set for the read only user. A table is created by the write user to test the read only user. ''' write_connection = db._get_engine( {'connection_url': self.write_url}).connect() read_connection_user = sa_url.make_url(self.read_url).username drop_foo_sql = u'DROP TABLE IF EXISTS _foo' write_connection.execute(drop_foo_sql) try: write_connection.execute(u'CREATE TEMP TABLE _foo ()') for privilege in ['INSERT', 'UPDATE', 'DELETE']: test_privilege_sql = u"SELECT has_table_privilege(%s, '_foo', %s)" have_privilege = write_connection.execute( test_privilege_sql, (read_connection_user, privilege)).first()[0] if have_privilege: return False finally: write_connection.execute(drop_foo_sql) write_connection.close() return True
def __init__(self, url): self.table = Table('__tablename__', MetaData(), Column('taskid', String(64), primary_key=True, nullable=False), Column('url', String(1024)), Column('result', LargeBinary), Column('updatetime', Float(32)), mysql_engine='InnoDB', mysql_charset='utf8' ) self.url = make_url(url) if self.url.database: database = self.url.database self.url.database = None try: engine = create_engine(self.url, convert_unicode=True, pool_recycle=3600) engine.execute("CREATE DATABASE IF NOT EXISTS %s" % database) except sqlalchemy.exc.SQLAlchemyError: pass self.url.database = database self.engine = create_engine(url, convert_unicode=True, pool_recycle=3600) self._list_project()
def get_engine(self): with self._lock: uri = self.get_uri() echo = self._app.config['SQLALCHEMY_ECHO'] if (uri, echo) == self._connected_for: return self._engine info = make_url(uri) options = {'convert_unicode': True} self._sa.apply_pool_defaults(self._app, options) self._sa.apply_driver_hacks(self._app, info, options) if echo: options['echo'] = echo self._engine = rv = sqlalchemy.create_engine(info, **options) if _record_queries(self._app): _EngineDebuggingSignalEvents(self._engine, self._app.import_name).register() self._connected_for = (uri, echo) return rv
def utf8_engine(url=None, options=None): """Hook for dialects or drivers that don't handle utf8 by default.""" from sqlalchemy.engine import url as engine_url if config.db.dialect.name == 'mysql' and \ config.db.driver in ['mysqldb', 'pymysql', 'cymysql']: # note 1.2.1.gamma.6 or greater of MySQLdb # needed here url = url or config.db_url url = engine_url.make_url(url) url.query['charset'] = 'utf8' url.query['use_unicode'] = '0' url = str(url) return testing_engine(url, options)
def __init__(self, db_url=None): self.db_url = db_url self.url = make_url(self.db_url) if self.backend == 'postgres': self.loop = asyncio.get_event_loop() self.async_engine = self.loop.run_until_complete( create_engine( user=self.url.username, password=self.url.password, host=self.url.host, port=self.url.port, database=self.url.database)) self.engine = sa.create_engine(self.db_url) else: self.engine = sa.create_engine(self.db_url) self.metadata = metadata self.metadata.create_all(bind=self.engine) # db helper methods
def __call__(self, cfg, *arg): if isinstance(cfg, compat.string_types): url = sa_url.make_url(cfg) elif isinstance(cfg, sa_url.URL): url = cfg else: url = cfg.db.url backend = url.get_backend_name() if backend in self.fns: return self.fns[backend](cfg, *arg) else: return self.fns['*'](cfg, *arg)
def _follower_url_from_main(url, ident): url = sa_url.make_url(url) url.database = ident return url