我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用sqlalchemy.event.listens_for()。
def instrument_class(self, mapper, class_): """Receive a class when the mapper is first constructed, before instrumentation is applied to the mapped class. This event is the earliest phase of mapper construction. Most attributes of the mapper are not yet initialized. This listener can either be applied to the :class:`.Mapper` class overall, or to any un-mapped class which serves as a base for classes that will be mapped (using the ``propagate=True`` flag):: Base = declarative_base() @event.listens_for(Base, "instrument_class", propagate=True) def on_new_class(mapper, cls_): " ... " :param mapper: the :class:`.Mapper` which is the target of this event. :param class\_: the mapped class. """
def before_compile(self, query): """Receive the :class:`.Query` object before it is composed into a core :class:`.Select` object. This event is intended to allow changes to the query given:: @event.listens_for(Query, "before_compile", retval=True) def no_deleted(query): for desc in query.column_descriptions: if desc['type'] is User: entity = desc['entity'] query = query.filter(entity.deleted == False) return query The event should normally be listened with the ``retval=True`` parameter set, so that the modified query may be returned. """
def init_app(self, app): """Flask application initialization.""" self.init_config(app) app.extensions['invenio-github'] = self @app.before_first_request def connect_signals(): """Connect OAuthClient signals.""" from invenio_oauthclient.models import RemoteAccount from invenio_oauthclient.signals import account_setup_committed from .api import GitHubAPI from .handlers import account_post_init account_setup_committed.connect( account_post_init, sender=GitHubAPI.remote._get_current_object() ) @event.listens_for(RemoteAccount, 'before_delete') def receive_before_delete(mapper, connection, target): """Listen for the 'before_delete' event.""" # TODO remove hooks
def register_to_celery(celery_broker, celery_config, async_task, max_retries=12, DBSession=None): def send_after_commit_tasks(session): if not hasattr(async_ctx, 'reged_tasks'): return for task in async_ctx.reged_tasks: task.send(async_api) delattr(async_ctx, 'reged_tasks') broker = 'amqp://{user}:{password}@{host}:{port}/{vhost}'.\ format(**celery_broker) app = Celery(broker=broker) app.conf.update(**celery_config) async_api = app.task(max_retries=max_retries, bind=True)(async_task) signals.setup_logging.connect(init_celery_log) if DBSession: if event: event.listens_for(DBSession, 'after_commit')(send_after_commit_tasks) else: raise ImportError('You must install sqlalchemy first.') return app, async_api
def init_app(app: t.Any) -> None: db.init_app(app) if app.config['_USING_SQLITE']: # pragma: no cover with app.app_context(): @event.listens_for(db.engine, "connect") def do_connect( dbapi_connection: t.Any, connection_record: t.Any ) -> None: # disable pysqlite's emitting of the BEGIN statement entirely. # also stops it from emitting COMMIT before any DDL. dbapi_connection.isolation_level = None dbapi_connection.execute('pragma foreign_keys=ON') @event.listens_for(db.engine, "begin") def do_begin(conn: t.Any) -> None: # emit our own BEGIN conn.execute("BEGIN")
def instrument_class(self, mapper, class_): r"""Receive a class when the mapper is first constructed, before instrumentation is applied to the mapped class. This event is the earliest phase of mapper construction. Most attributes of the mapper are not yet initialized. This listener can either be applied to the :class:`.Mapper` class overall, or to any un-mapped class which serves as a base for classes that will be mapped (using the ``propagate=True`` flag):: Base = declarative_base() @event.listens_for(Base, "instrument_class", propagate=True) def on_new_class(mapper, cls_): " ... " :param mapper: the :class:`.Mapper` which is the target of this event. :param class\_: the mapped class. """
def listens_for(target, identifier, *args, **kw): """Decorate a function as a listener for the given target + identifier. e.g.:: from sqlalchemy import event from sqlalchemy.schema import UniqueConstraint @event.listens_for(UniqueConstraint, "after_parent_attach") def unique_constraint_name(const, table): const.name = "uq_%s_%s" % ( table.name, list(const.columns)[0].name ) """ def decorate(fn): listen(target, identifier, fn, *args, **kw) return fn return decorate
def _fk_pragma_on_connect(dbapi_con, connection_record): ''' Code to execute every time the database connection is opened. Ref: http://docs.sqlalchemy.org/en/rel_0_9/core/event.html#sqlalchemy.event.listens_for ''' # Support for foreign keys must be explicitly turned on # every time the database is opened. dbapi_con.execute('PRAGMA foreign_keys=ON') # Only uncomment these if you know what you are doing. # See the SQLite documentation for details. #dbapi_con.execute("PRAGMA journal_mode = MEMORY") #dbapi_con.execute("PRAGMA synchronous = OFF") #dbapi_con.execute("PRAGMA temp_store = MEMORY") #dbapi_con.execute("PRAGMA cache_size = 500000") # This allows the file to be 'import'ed any number of times, but attempts to # connect to the database only once.
def before_compile(self, query): """Receive the :class:`.Query` object before it is composed into a core :class:`.Select` object. This event is intended to allow changes to the query given:: @event.listens_for(Query, "before_compile", retval=True) def no_deleted(query): for desc in query.column_descriptions: if desc['type'] is User: entity = desc['expr'] query = query.filter(entity.deleted == False) return query The event should normally be listened with the ``retval=True`` parameter set, so that the modified query may be returned. """
def configure_db(app): from sqlalchemy import event from sqlalchemy.orm import mapper from sqlalchemy.inspection import inspect alembic.init_app(app) db.init_app(app) nplusone.init_app(app) @event.listens_for(mapper, "init") def instant_defaults_listener(target, args, kwargs): for key, column in inspect(type(target)).columns.items(): if column.default is not None: if callable(column.default.arg): setattr(target, key, column.default.arg(target)) else: setattr(target, key, column.default.arg) event.listen(mapper, 'init', instant_defaults_listener)
def setup_default_value_handling(cls): """A SQLAlchemy model class decorator that ensures JSON/JSONB default values are correctly handled. Settings default values for JSON fields have a bunch of issues, because dict and list instances need to be wrapped in ``NestedMutationDict`` and ``NestedMutationList`` that correclty set the parent object dirty state if the container content is modified. This class decorator sets up hooks, so that default values are automatically converted to their wrapped versions. .. note :: We are only concerned about initial values. When values are loaded from the database, ``Mutable`` base class of ``NestedMutationDict`` correctly sets up the parent pointers. .. note :: This must be applid to the class directly, as SQLAlchemy does not seem ot pick it up if it's applied to an (abstract) parent class. This might be addressed in future SQLAlchemy / Websauna versions so that we can get rid of this ugly little decorator. """ @event.listens_for(cls, "init") def init(target, args, kwargs): for c in target.__table__.columns: if is_json_like_column(c): default =_get_column_default(target, c) default = wrap_as_nested(c.name, default, target) setattr(target, c.name, default) return cls
def after_soft_rollback(self, session, previous_transaction): """Execute after any rollback has occurred, including "soft" rollbacks that don't actually emit at the DBAPI level. This corresponds to both nested and outer rollbacks, i.e. the innermost rollback that calls the DBAPI's rollback() method, as well as the enclosing rollback calls that only pop themselves from the transaction stack. The given :class:`.Session` can be used to invoke SQL and :meth:`.Session.query` operations after an outermost rollback by first checking the :attr:`.Session.is_active` flag:: @event.listens_for(Session, "after_soft_rollback") def do_something(session, previous_transaction): if session.is_active: session.execute("select * from some_table") :param session: The target :class:`.Session`. :param previous_transaction: The :class:`.SessionTransaction` transactional marker object which was just closed. The current :class:`.SessionTransaction` for the given :class:`.Session` is available via the :attr:`.Session.transaction` attribute. .. versionadded:: 0.7.3 """
def listens_for(target, identifier, *args, **kw): """Decorate a function as a listener for the given target + identifier. e.g.:: from sqlalchemy import event from sqlalchemy.schema import UniqueConstraint @event.listens_for(UniqueConstraint, "after_parent_attach") def unique_constraint_name(const, table): const.name = "uq_%s_%s" % ( table.name, list(const.columns)[0].name ) A given function can also be invoked for only the first invocation of the event using the ``once`` argument:: @event.listens_for(Mapper, "before_configure", once=True) def on_config(): do_config() .. versionadded:: 0.9.4 Added ``once=True`` to :func:`.event.listen` and :func:`.event.listens_for`. .. seealso:: :func:`.listen` - general description of event listening """ def decorate(fn): listen(target, identifier, fn, *args, **kw) return fn return decorate
def before_execute(self, conn, clauseelement, multiparams, params): """Intercept high level execute() events, receiving uncompiled SQL constructs and other objects prior to rendering into SQL. This event is good for debugging SQL compilation issues as well as early manipulation of the parameters being sent to the database, as the parameter lists will be in a consistent format here. This event can be optionally established with the ``retval=True`` flag. The ``clauseelement``, ``multiparams``, and ``params`` arguments should be returned as a three-tuple in this case:: @event.listens_for(Engine, "before_execute", retval=True) def before_execute(conn, conn, clauseelement, multiparams, params): # do something with clauseelement, multiparams, params return clauseelement, multiparams, params :param conn: :class:`.Connection` object :param clauseelement: SQL expression construct, :class:`.Compiled` instance, or string statement passed to :meth:`.Connection.execute`. :param multiparams: Multiple parameter sets, a list of dictionaries. :param params: Single parameter set, a single dictionary. See also: :meth:`.before_cursor_execute` """
def before_cursor_execute(self, conn, cursor, statement, parameters, context, executemany): """Intercept low-level cursor execute() events before execution, receiving the string SQL statement and DBAPI-specific parameter list to be invoked against a cursor. This event is a good choice for logging as well as late modifications to the SQL string. It's less ideal for parameter modifications except for those which are specific to a target backend. This event can be optionally established with the ``retval=True`` flag. The ``statement`` and ``parameters`` arguments should be returned as a two-tuple in this case:: @event.listens_for(Engine, "before_cursor_execute", retval=True) def before_cursor_execute(conn, cursor, statement, parameters, context, executemany): # do something with statement, parameters return statement, parameters See the example at :class:`.ConnectionEvents`. :param conn: :class:`.Connection` object :param cursor: DBAPI cursor object :param statement: string SQL statement, as to be passed to the DBAPI :param parameters: Dictionary, tuple, or list of parameters being passed to the ``execute()`` or ``executemany()`` method of the DBAPI ``cursor``. In some cases may be ``None``. :param context: :class:`.ExecutionContext` object in use. May be ``None``. :param executemany: boolean, if ``True``, this is an ``executemany()`` call, if ``False``, this is an ``execute()`` call. See also: :meth:`.before_execute` :meth:`.after_cursor_execute` """
def _sqlite_post_configure_engine(url, engine, follower_ident): from sqlalchemy import event @event.listens_for(engine, "connect") def connect(dbapi_connection, connection_record): # use file DBs in all cases, memory acts kind of strangely # as an attached if not follower_ident: dbapi_connection.execute( 'ATTACH DATABASE "test_schema.db" AS test_schema') else: dbapi_connection.execute( 'ATTACH DATABASE "%s_test_schema.db" AS test_schema' % follower_ident)