我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 flask.current_app.extensions。
def init_app(self, app):
    """Build the mail state from the application settings and register it.

    Use this when the Mail instance should be set up at configuration
    time.

    :param app: Flask application instance
    :return: the ``_Mail`` state stored under ``app.extensions['mail']``
    """
    settings = app.config

    # Values are read in the positional order expected by _Mail.
    server = settings.get('MAIL_SERVER', '127.0.0.1')
    username = settings.get('MAIL_USERNAME')
    password = settings.get('MAIL_PASSWORD')
    port = settings.get('MAIL_PORT', 25)
    use_tls = settings.get('MAIL_USE_TLS', False)
    use_ssl = settings.get('MAIL_USE_SSL', False)
    default_sender = settings.get('MAIL_DEFAULT_SENDER')
    # MAIL_DEBUG falls back to the application's debug flag.
    debug = int(settings.get('MAIL_DEBUG', app.debug))
    max_emails = settings.get('MAIL_MAX_EMAILS')
    # MAIL_SUPPRESS_SEND falls back to the application's testing flag.
    suppress = settings.get('MAIL_SUPPRESS_SEND', app.testing)

    mail_state = _Mail(server, username, password, port, use_tls, use_ssl,
                       default_sender, debug, max_emails, suppress)

    # register extension with app
    registry = getattr(app, 'extensions', {})
    app.extensions = registry
    registry['mail'] = mail_state
    return mail_state
def init_app(self, app):
    """Configure ``app`` for JWT handling and register this extension on it.

    Fills any missing configuration keys from ``CONFIG_DEFAULTS``, adds the
    authentication URL rule when ``JWT_AUTH_URL_RULE`` is set, installs the
    ``JWTError`` error handler and stores this object under
    ``app.extensions['jwt']``.

    :param app: Flask application instance
    :raises AssertionError: if the built-in authentication resource is used
        without an authentication handler being defined
    """
    for key, default in CONFIG_DEFAULTS.items():
        app.config.setdefault(key, default)

    auth_url_rule = app.config.get('JWT_AUTH_URL_RULE', None)

    if auth_url_rule:
        if self.auth_request_callback == _default_auth_request_handler:
            msg = ("an authentication_handler function must be defined "
                   "when using the built in authentication resource")
            assert self.authentication_callback is not None, (msg)

        # Work on a copy: setdefault() below would otherwise write
        # 'view_func' into the dict object held by app.config, so the
        # callback of this instance would leak into every later consumer
        # of that same options dict.
        auth_url_options = dict(
            app.config.get('JWT_AUTH_URL_OPTIONS', {'methods': ['POST']}))
        auth_url_options.setdefault('view_func', self.auth_request_callback)
        app.add_url_rule(auth_url_rule, **auth_url_options)

    app.errorhandler(JWTError)(self._jwt_error_callback)

    if not hasattr(app, 'extensions'):  # pragma: no cover
        app.extensions = {}

    app.extensions['jwt'] = self
def run_migrations_online():
    """Run migrations in 'online' mode.

    In this scenario we need to create an Engine
    and associate a connection with the context.
    """
    engine = engine_from_config(config.get_section(config.config_ini_section),
                                prefix='sqlalchemy.',
                                poolclass=pool.NullPool)

    connection = engine.connect()
    try:
        # Configure inside the try block so the connection is closed even
        # when configure() or the 'migrate' extension lookup raises.
        context.configure(connection=connection,
                          target_metadata=target_metadata,
                          **current_app.extensions['migrate'].configure_args)

        with context.begin_transaction():
            context.run_migrations()
    finally:
        connection.close()
def run_migrations_online():
    """Run migrations in 'online' mode.

    In this scenario we need to create an Engine
    and associate a connection with the context.
    """

    # this callback is used to prevent an auto-migration from being generated
    # when there are no changes to the schema
    # reference: http://alembic.readthedocs.org/en/latest/cookbook.html
    def process_revision_directives(context, revision, directives):
        if getattr(config.cmd_opts, 'autogenerate', False):
            script = directives[0]
            if script.upgrade_ops.is_empty():
                # Emptying the list in place drops the generated revision.
                directives[:] = []
                logger.info('No changes in schema detected.')

    engine = engine_from_config(config.get_section(config.config_ini_section),
                                prefix='sqlalchemy.',
                                poolclass=pool.NullPool)

    connection = engine.connect()
    try:
        # Configure inside the try block so the connection is closed even
        # when configure() or the 'migrate' extension lookup raises.
        context.configure(connection=connection,
                          target_metadata=target_metadata,
                          process_revision_directives=process_revision_directives,
                          **current_app.extensions['migrate'].configure_args)

        with context.begin_transaction():
            context.run_migrations()
    finally:
        connection.close()
def run_migrations_online():
    """Run migrations in 'online' mode.

    In this scenario we need to create an Engine
    and associate a connection with the context.
    """

    # this callback is used to prevent an auto-migration from being generated
    # when there are no changes to the schema
    # reference: http://alembic.zzzcomputing.com/en/latest/cookbook.html
    def process_revision_directives(context, revision, directives):
        if getattr(config.cmd_opts, 'autogenerate', False):
            script = directives[0]
            if script.upgrade_ops.is_empty():
                # Emptying the list in place drops the generated revision.
                directives[:] = []
                logger.info('No changes in schema detected.')

    engine = engine_from_config(config.get_section(config.config_ini_section),
                                prefix='sqlalchemy.',
                                poolclass=pool.NullPool)

    connection = engine.connect()
    try:
        # Configure inside the try block so the connection is closed even
        # when configure() or the 'migrate' extension lookup raises.
        context.configure(connection=connection,
                          target_metadata=target_metadata,
                          process_revision_directives=process_revision_directives,
                          **current_app.extensions['migrate'].configure_args)

        with context.begin_transaction():
            context.run_migrations()
    finally:
        connection.close()
def __init__(self, subject='', recipients=None, body=None, html=None,
             sender=None, cc=None, bcc=None, attachments=None,
             reply_to=None, date=None, charset=None, extra_headers=None,
             mail_options=None, rcpt_options=None):
    """Store the message fields, filling in defaults for omitted ones.

    A falsy ``sender`` is replaced by the ``default_sender`` of the current
    application's ``'mail'`` extension; a tuple sender is rendered as
    ``"name <address>"``. Omitted list-like arguments become new empty
    lists.
    """
    # Resolve the default first so a tuple default is formatted as well.
    if not sender:
        sender = current_app.extensions['mail'].default_sender
    if isinstance(sender, tuple):
        sender = "%s <%s>" % sender

    self.recipients = recipients if recipients else []
    self.subject = subject
    self.sender = sender
    self.reply_to = reply_to
    self.cc = cc if cc else []
    self.bcc = bcc if bcc else []
    self.body = body
    self.html = html
    self.date = date
    self.msgId = make_msgid()
    self.charset = charset
    self.extra_headers = extra_headers
    self.mail_options = mail_options if mail_options else []
    self.rcpt_options = rcpt_options if rcpt_options else []
    self.attachments = attachments if attachments else []
def connect(self):
    """Open a connection to the mail host.

    Uses ``self.app`` when it is set and truthy, otherwise ``current_app``.

    :return: a ``Connection`` built from the application's mail state
    :raises RuntimeError: if the application has no ``'mail'`` extension
    """
    app = getattr(self, "app", None) or current_app
    # Only the lookup lives in the try block, so a KeyError raised while
    # constructing the Connection is not mistaken for a missing extension.
    try:
        state = app.extensions['mail']
    except KeyError:
        raise RuntimeError("The current application was not configured with Flask-Mail")
    return Connection(state)
def init_app(self, app):
    """Create the mail state for ``app`` and register it as an extension.

    Use this when the Mail instance should be set up at configuration
    time.

    :param app: Flask application instance
    :return: the state returned by ``self.init_mail`` and stored under
        ``app.extensions['mail']``
    """
    mail_state = self.init_mail(app.config, app.debug, app.testing)

    # register extension with app
    registry = getattr(app, 'extensions', {})
    app.extensions = registry
    registry['mail'] = mail_state
    return mail_state
def metadata(self):
    """Return the ``metadata`` attribute of the wrapped ``db`` object.

    Kept for backwards compatibility: in old releases
    app.extensions['migrate'] was set to db, and env.py accessed
    app.extensions['migrate'].metadata
    """
    database = self.db
    return database.metadata
def init(directory=None, multidb=False):
    """Generates a new migration"""
    # Fall back to the directory recorded on the 'migrate' extension.
    if directory is None:
        directory = current_app.extensions['migrate'].directory

    alembic_cfg = Config()
    alembic_cfg.set_main_option('script_location', directory)
    alembic_cfg.config_file_name = os.path.join(directory, 'alembic.ini')

    # Let registered configure callbacks adjust (or replace) the config.
    migrate_ext = current_app.extensions['migrate'].migrate
    alembic_cfg = migrate_ext.call_configure_callbacks(alembic_cfg)

    # The template name depends on whether multiple databases are used.
    template = 'flask-multidb' if multidb else 'flask'
    command.init(alembic_cfg, directory, template)
def revision(directory=None, message=None, autogenerate=False, sql=False,
             head='head', splice=False, branch_label=None, version_path=None,
             rev_id=None):
    """Create a new revision file."""
    config = current_app.extensions['migrate'].migrate.get_config(directory)

    # Options passed for every Alembic version.
    options = {'autogenerate': autogenerate, 'sql': sql}
    # The remaining options are only passed for Alembic 0.7.0 and newer.
    if alembic_version >= (0, 7, 0):
        options.update(head=head, splice=splice, branch_label=branch_label,
                       version_path=version_path, rev_id=rev_id)

    command.revision(config, message, **options)
def migrate(directory=None, message=None, sql=False, head='head',
            splice=False, branch_label=None, version_path=None, rev_id=None):
    """Alias for 'revision --autogenerate'"""
    migrate_ext = current_app.extensions['migrate'].migrate
    config = migrate_ext.get_config(directory, opts=['autogenerate'])

    # Options passed for every Alembic version.
    options = {'autogenerate': True, 'sql': sql}
    # The remaining options are only passed for Alembic 0.7.0 and newer.
    if alembic_version >= (0, 7, 0):
        options.update(head=head, splice=splice, branch_label=branch_label,
                       version_path=version_path, rev_id=rev_id)

    command.revision(config, message, **options)
def edit(revision='current', directory=None):
    """Edit current revision."""
    # Guard clause: reject Alembic versions older than 0.8.0 up front.
    if alembic_version < (0, 8, 0):
        raise RuntimeError('Alembic 0.8.0 or greater is required')

    migrate_ext = current_app.extensions['migrate'].migrate
    config = migrate_ext.get_config(directory)
    command.edit(config, revision)
def merge(directory=None, revisions='', message=None, branch_label=None,
          rev_id=None):
    """Merge two revisions together.  Creates a new migration file"""
    # Guard clause: reject Alembic versions older than 0.7.0 up front.
    if alembic_version < (0, 7, 0):
        raise RuntimeError('Alembic 0.7.0 or greater is required')

    migrate_ext = current_app.extensions['migrate'].migrate
    config = migrate_ext.get_config(directory)
    command.merge(config, revisions, message=message,
                  branch_label=branch_label, rev_id=rev_id)
def downgrade(directory=None, revision='-1', sql=False, tag=None, x_arg=None):
    """Revert to a previous version"""
    migrate_ext = current_app.extensions['migrate'].migrate
    config = migrate_ext.get_config(directory, x_arg=x_arg)

    # With sql requested, the default '-1' is rewritten as the explicit
    # range 'head:-1'.
    target = 'head:-1' if (sql and revision == '-1') else revision
    command.downgrade(config, target, sql=sql, tag=tag)
def show(directory=None, revision='head'):
    """Show the revision denoted by the given symbol."""
    # Guard clause: reject Alembic versions older than 0.7.0 up front.
    if alembic_version < (0, 7, 0):
        raise RuntimeError('Alembic 0.7.0 or greater is required')

    migrate_ext = current_app.extensions['migrate'].migrate
    config = migrate_ext.get_config(directory)
    command.show(config, revision)
def history(directory=None, rev_range=None, verbose=False):
    """List changeset scripts in chronological order."""
    config = current_app.extensions['migrate'].migrate.get_config(directory)

    # 'verbose' is only passed for Alembic 0.7.0 and newer.
    extras = {'verbose': verbose} if alembic_version >= (0, 7, 0) else {}
    command.history(config, rev_range, **extras)
def heads(directory=None, verbose=False, resolve_dependencies=False):
    """Show current available heads in the script directory"""
    # Guard clause: reject Alembic versions older than 0.7.0 up front.
    if alembic_version < (0, 7, 0):
        raise RuntimeError('Alembic 0.7.0 or greater is required')

    migrate_ext = current_app.extensions['migrate'].migrate
    config = migrate_ext.get_config(directory)
    command.heads(config, verbose=verbose,
                  resolve_dependencies=resolve_dependencies)
def branches(directory=None, verbose=False):
    """Show current branch points"""
    config = current_app.extensions['migrate'].migrate.get_config(directory)

    # 'verbose' is only passed for Alembic 0.7.0 and newer.
    extras = {'verbose': verbose} if alembic_version >= (0, 7, 0) else {}
    command.branches(config, **extras)
def stamp(directory=None, revision='head', sql=False, tag=None):
    """'stamp' the revision table with the given revision; don't run any
    migrations"""
    # Build the Alembic config through the registered 'migrate' extension.
    migrate_ext = current_app.extensions['migrate'].migrate
    alembic_cfg = migrate_ext.get_config(directory)
    command.stamp(alembic_cfg, revision, sql=sql, tag=tag)
def init_app(self, app):
    """Register a ``_pagedown`` instance on ``app`` and hook up the
    template context processor.

    :param app: Flask application instance
    """
    # Create the extensions registry when the app does not have one yet.
    if not hasattr(app, 'extensions'):
        setattr(app, 'extensions', {})

    widget = _pagedown()
    app.extensions['pagedown'] = widget
    app.context_processor(self.context_processor)
def context_processor():
    """Return a dict mapping ``'pagedown'`` to the current application's
    registered ``'pagedown'`` extension."""
    registered = current_app.extensions['pagedown']
    return dict(pagedown=registered)
def __init__(self, subject=None, recipients=None, body=None, html=None,
             sender=None, cc=None, bcc=None, attachments=None,
             reply_to=None, date=None, charset=None, extra_headers=None):
    """Store the message fields, filling in defaults for omitted ones.

    A tuple ``sender`` is rendered as ``"name <address>"``; a falsy sender
    is replaced by the ``default_sender`` of the current application's
    ``'mail'`` extension (which is stored as-is, without tuple formatting).
    Omitted list-like arguments become new empty lists.
    """
    # Only an explicitly passed tuple is formatted here.
    if isinstance(sender, tuple):
        sender = "%s <%s>" % sender

    self.recipients = recipients if recipients else []
    self.subject = subject
    if sender:
        self.sender = sender
    else:
        self.sender = current_app.extensions['mail'].default_sender
    self.reply_to = reply_to
    self.cc = cc if cc else []
    self.bcc = bcc if bcc else []
    self.body = body
    self.html = html
    self.date = date
    self.msgId = make_msgid()
    self.charset = charset
    self.extra_headers = extra_headers
    self.attachments = attachments if attachments else []
def init_app(self, app):
    """Register ``_moment`` on ``app`` and hook up the template context
    processor.

    :param app: Flask application instance
    """
    # Create the extensions registry when the app does not have one yet.
    if not hasattr(app, 'extensions'):
        setattr(app, 'extensions', {})

    # Note that _moment itself is stored, not the result of calling it.
    app.extensions['moment'] = _moment
    app.context_processor(self.context_processor)
def context_processor():
    """Return a dict mapping ``'moment'`` to the current application's
    registered ``'moment'`` extension."""
    registered = current_app.extensions['moment']
    return dict(moment=registered)
def create(self, timestamp=None):
    """Call the current application's ``'moment'`` extension with
    ``timestamp`` and return the result."""
    factory = current_app.extensions['moment']
    return factory(timestamp)
def metadata(self):
    """Return the ``metadata`` attribute of the wrapped ``db`` object.

    Kept for backwards compatibility: in old releases
    app.extensions['migrate'] was set to db, and env.py accessed
    app.extensions['migrate'].metadata
    """
    database = self.db
    return database.metadata
def init_app(self, app, db, directory='migrations'):
    """Register a ``_MigrateConfig`` for ``db`` and ``directory`` under
    ``app.extensions['migrate']``.

    :param app: Flask application instance
    :param db: database object handed to ``_MigrateConfig``
    :param directory: migrations directory handed to ``_MigrateConfig``
    """
    # Create the extensions registry when the app does not have one yet.
    if not hasattr(app, 'extensions'):
        setattr(app, 'extensions', {})

    migrate_config = _MigrateConfig(db, directory)
    app.extensions['migrate'] = migrate_config
def init(directory=None):
    """Generates a new migration"""
    # Fall back to the directory recorded on the 'migrate' extension.
    if directory is None:
        directory = current_app.extensions['migrate'].directory

    alembic_cfg = Config()
    alembic_cfg.set_main_option('script_location', directory)
    ini_path = os.path.join(directory, 'alembic.ini')
    alembic_cfg.config_file_name = ini_path
    command.init(alembic_cfg, directory, 'flask')