Python flask_sqlalchemy 模块,SQLAlchemy() 实例源码

我们从Python开源项目中,提取了以下31个代码示例,用于说明如何使用flask_sqlalchemy.SQLAlchemy()

项目:flask-msearch    作者:honmaple    | 项目源码 | 文件源码
def setUp(self):
        class TestConfig(object):
            SQLALCHEMY_TRACK_MODIFICATIONS = True
            SQLALCHEMY_DATABASE_URI = 'sqlite://'
            DEBUG = True
            TESTING = True
            MSEARCH_INDEX_NAME = mkdtemp()
            # MSEARCH_BACKEND = 'whoosh'

        self.app = Flask(__name__)
        self.app.config.from_object(TestConfig())
        # we need this instance to be:
        #  a) global for all objects we share and
        #  b) fresh for every test run
        global db
        db = SQLAlchemy(self.app)
        self.search = Search(self.app, db=db)
        self.Post = None
项目:repocribro    作者:MarekSuchanek    | 项目源码 | 文件源码
def github_callback_get_account(db, gh_api):
    """Processing GitHub callback action

    :param db: Database for storing GitHub user info
    :type db: ``flask_sqlalchemy.SQLAlchemy``
    :param gh_api: GitHub API client ready for the communication
    :type gh_api: ``repocribro.github.GitHubAPI``
    :return: User account and flag if it's new one
    :rtype: tuple of ``repocribro.models.UserAccount``, bool
    """
    user_data = gh_api.get('/user').data
    gh_user = db.session.query(User).filter(
        User.github_id == user_data['id']
    ).first()
    is_new = False
    if gh_user is None:
        user_account = UserAccount()
        db.session.add(user_account)
        gh_user = User.create_from_dict(user_data, user_account)
        db.session.add(gh_user)
        db.session.commit()
        is_new = True
    return gh_user.user_account, is_new
项目:cartographer    作者:Patreon    | 项目源码 | 文件源码
def connect(app):
    from flask_sqlalchemy import SQLAlchemy
    from sqlalchemy.ext.declarative import declarative_base
    from generic_social_network.app.models.tables.my_model import MyModel

    app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///' + _db_file_location()

    class MySQLAlchemy(SQLAlchemy):
        def make_declarative_base(self):
            from flask.ext.sqlalchemy import _BoundDeclarativeMeta, _QueryProperty
            base = declarative_base(cls=MyModel, name='MyModel', metaclass=_BoundDeclarativeMeta)
            base.query = _QueryProperty(self)
            return base

    db = MySQLAlchemy(app)
    # db.engine.echo = True
    return db
项目:pwnedhub    作者:lanmaster53    | 项目源码 | 文件源码
def __init__(self, app, db, table, key_prefix):
        if db is None:
            from flask_sqlalchemy import SQLAlchemy
            db = SQLAlchemy(app)
        self.db = db
        self.key_prefix = key_prefix

        class Session(self.db.Model):
            __tablename__ = table

            id = self.db.Column(self.db.Integer, primary_key=True)
            session_id = self.db.Column(self.db.String(256), unique=True)
            data = self.db.Column(self.db.LargeBinary)
            expiry = self.db.Column(self.db.DateTime)

            def __init__(self, session_id, data, expiry):
                self.session_id = session_id
                self.data = data
                self.expiry = expiry

            def __repr__(self):
                return '<Session data %s>' % self.data

        self.db.create_all()
        self.sql_session_model = Session
项目:fleaker    作者:croscon    | 项目源码 | 文件源码
def _find_db_ext():
    """Find the current ORM Extension this App is using.

    After we discover and initialize the proper ORM Extension we need the
    ``db`` proxy to resolve to it, which is exactly what this method does here.

    Does this by simply inspecting a global, but that could be done better.

    Returns:
        flask_utils.FlaskDB|flask_sqlalchemy.SQLAlchemy: The correct and
            current ORM Extension in use for this App.
    """
    if _SELECTED_BACKEND is MISSING:
        # @TODO: Do like Flask does with the context error and have a single
        # leading line that sort of explains the issue and is easily searched?
        raise RuntimeError("You have attempted to use the Fleaker DB proxy "
                           "before it was initialized! Please ensure you are "
                           "in the right context or push it yourself! Please "
                           "see the documentation for more information!")
    return _SELECTED_BACKEND
项目:flask-image-alchemy    作者:rstit    | 项目源码 | 文件源码
def create_app():
    app = Flask(__name__)

    # SQLAlchemy config
    app.config['SECRET_KEY'] = '123456790'
    app.config['SQLALCHEMY_DATABASE_URI'] = \
        'postgresql+psycopg2://localhost:5432/test'

    # Flask-ImageAlchemy config
    app.config['AWS_ACCESS_KEY_ID'] = os.environ.get('AWS_ACCESS_KEY_ID')
    app.config['AWS_SECRET_ACCESS_KEY'] = os.environ.get('AWS_SECRET_ACCESS_KEY')
    app.config['AWS_REGION_NAME'] = os.environ.get('AWS_REGION_NAME', 'eu-central-1')
    app.config['S3_BUCKET_NAME'] = os.environ.get('AWS_REGION_NAME', 'haraka-local')

    # init extensions
    db.init_app(app)
    s3_storage.init_app(app)

    return app

# create app
项目:Prism    作者:Stumblinbear    | 项目源码 | 文件源码
def __init__(self, flask_app, config):
        self._flask_app = flask_app
        self.config = config

        self._plugin_manager = None

        self._flask_app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///%s' % os.path.join(prism.settings.CONFIG_FOLDER, 'prism.db')
        self._flask_app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
        self._database = SQLAlchemy(self._flask_app)
项目:chrononaut    作者:onecodex    | 项目源码 | 文件源码
def test_unversioned_db_fixture(unversioned_db):
    """Test unversioned SQLAlchemy object.
    """
    assert unversioned_db.__class__ == flask_sqlalchemy.SQLAlchemy
    assert unversioned_db.session.__class__ == sqlalchemy.orm.scoping.scoped_session
    assert (unversioned_db.session.session_factory().__class__.__name__ ==
            flask_sqlalchemy.SignallingSession.__name__)
项目:chrononaut    作者:onecodex    | 项目源码 | 文件源码
def unversioned_db(app, request):
    """An unversioned db fixture.
    """
    db = flask_sqlalchemy.SQLAlchemy(app)
    yield db
项目:chrononaut    作者:onecodex    | 项目源码 | 文件源码
def versions(self, before=None, after=None, return_query=False):
        """Fetch the history of the given object from its history table.

        :param before: Return changes only _before_ the provided ``DateTime``.
        :param before: Return changes only _after_ the provided ``DateTime``.
        :param return_query: Return a SQLAlchemy query instead of a list of models.
        :return: List of history models for the given object (or a query object).
        """
        # get the primary keys for this table
        prim_keys = [k.key for k in self.__history_mapper__.primary_key if k.key != 'version']

        # Find all previous versions that have the same primary keys as myself
        query = self.__history_mapper__.class_.query.filter_by(
            **{k: getattr(self, k) for k in prim_keys}
        )

        # Filter additionally by date as needed
        if before is not None:
            query = query.filter(self.__history_mapper__.class_.changed <= before)
        if after is not None:
            query = query.filter(self.__history_mapper__.class_.changed >= after)

        # Order by the version
        query = query.order_by(self.__history_mapper__.class_.version)

        if return_query:
            return query
        else:
            return query.all()
项目:game_recommendations    作者:ceorourke    | 项目源码 | 文件源码
def init_app():
    # So that we can use Flask-SQLAlchemy, we'll make a Flask app.
    from flask import Flask
    app = Flask(__name__)

    connect_to_db(app)
    print "Connected to DB."
项目:flask-bitmapist    作者:cuttlesoft    | 项目源码 | 文件源码
def sqlalchemy_db(app):
    from flask_sqlalchemy import SQLAlchemy

    db = SQLAlchemy(app)
    return db
项目:CodeGra.de    作者:CodeGra-de    | 项目源码 | 文件源码
def get_all_latest_submissions(self) -> '_MyQuery[Work]':
        """Get a list of all the latest submissions (:class:`Work`) by each
        :class:`User` who has submitted at least one work for this assignment.

        :returns: The latest submissions.
        """
        # get_from_latest_submissions uses SQLAlchemy magic that MyPy cannot
        # encode.
        return self.get_from_latest_submissions(t.cast(Work, Work))
项目:telemetrics-backend    作者:clearlinux    | 项目源码 | 文件源码
def configure_app(config_object, app):
    app.config.from_object(config_object)
    db = SQLAlchemy(app)
项目:commento    作者:rahulrrixe    | 项目源码 | 文件源码
def new():

    if request.method == 'POST':
        # The request is POST with some data, get POST data and validate it.
        # The form data is available in request.form dictionary.
        # Check if all the fields are entered. If not, raise an error
        if not request.form['name'] or not request.form['email'] or not request.form['comment']:
            flash('Please enter all the fields', 'error')
        else:
            # The data is valid. So create a new 'Comments' object
            # to save to the database
            comment = Comments(request.form['name'],
                               request.form['email'],
                               request.form['comment'])

            # Add it to the SQLAlchemy session and commit it to
            # save it to the database
            db.session.add(comment)
            db.session.commit()

            # Flash a success message
            flash('Comment was successfully submitted')

            # Redirect to the view showing all the comments
            return redirect(url_for('show_all'))

    # Render the form template if the request is a GET request or
    # the form validation failed
    return render_template('new.html')


# This is the code that gets executed when the current python file is
# executed.
项目:librarian    作者:HERA-Team    | 项目源码 | 文件源码
def _initialize():
    import json
    import os.path
    from flask import Flask
    from flask_sqlalchemy import SQLAlchemy

    config_path = os.environ.get('LIBRARIAN_CONFIG_PATH', 'server-config.json')
    with open(config_path) as f:
        config = json.load(f)

    if 'SECRET_KEY' not in config:
        print('cannot start server: must define the Flask "secret key" as the item '
              '"SECRET_KEY" in "server-config.json"', file=sys.stderr)
        sys.exit(1)

    # TODO: configurable logging parameters will likely be helpful. We use UTC
    # for timestamps using standard ISO-8601 formatting. The Python docs claim
    # that 8601 is the default format but this does not appear to be true.
    loglevel_cfg = config.get('log_level', 'info')
    loglevel = _log_level_names.get(loglevel_cfg)
    warn_loglevel = (loglevel is None)
    if warn_loglevel:
        loglevel = logging.INFO

    logging.basicConfig(
        level=loglevel,
        format='%(asctime)s %(levelname)s: %(message)s',
        datefmt='%Y-%m-%dT%H:%M:%SZ'
    )
    import time
    logging.getLogger('').handlers[0].formatter.converter = time.gmtime
    logger = logging.getLogger('librarian')

    if warn_loglevel:
        logger.warn('unrecognized value %r for "log_level" config item', loglevel_cfg)

    tf = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'templates')
    app = Flask('librarian', template_folder=tf)
    app.config.update(config)
    db = SQLAlchemy(app)
    return logger, app, db
项目:flask-sessionstore    作者:mcrowson    | 项目源码 | 文件源码
def __init__(self, app, db, table, key_prefix, use_signer=False,
                 permanent=True):
        if db is None:
            from flask_sqlalchemy import SQLAlchemy
            db = SQLAlchemy(app)
        self.db = db
        self.key_prefix = key_prefix
        self.use_signer = use_signer
        self.permanent = permanent

        class Session(self.db.Model):
            __tablename__ = table

            id = self.db.Column(self.db.Integer, primary_key=True)
            session_id = self.db.Column(self.db.String(255), unique=True)
            data = self.db.Column(self.db.Text)
            expiry = self.db.Column(self.db.DateTime)

            def __init__(self, session_id, data, expiry):
                self.session_id = session_id
                self.data = data
                self.expiry = expiry

            def __repr__(self):
                return '<Session data {0!s}>'.format(self.data)

        self.sql_session_model = Session
项目:zoom-autocomplete-demo    作者:kenju254    | 项目源码 | 文件源码
def from_sql(row):
    """
    Translates a SQLAlchemy model instance into a dictionary
    """
    data = row.__dict__.copy()
    data['id'] = row.id
    data.pop('_sa_instance_state')
    return data
项目:Browserat    作者:Dor-Tumarkin    | 项目源码 | 文件源码
def run_server():
    global app
    app.run(host='0.0.0.0', port=80, threaded=True,debug=False, use_reloader=False)

# SQLAlchemy Models
项目:demo.slackbot    作者:SynapseFI    | 项目源码 | 文件源码
def connect_db(**kwargs):
    """Link the app database to a postgres db with the supplied info."""
    db_uri = 'postgresql://{0}:{1}@{2}:{3}/{4}'.format(kwargs.get('username'),
                                                       kwargs.get('password'),
                                                       kwargs.get('host'),
                                                       kwargs.get('port'),
                                                       kwargs.get('database'))
    app = kwargs.get('app')
    db = SQLAlchemy(app)
    app.config['SQLALCHEMY_DATABASE_URI'] = db_uri
    return db
项目:edmunds    作者:LowieHuyghe    | 项目源码 | 文件源码
def test_model(self):
        """
        Test model
        :return:    void
        """

        test_db = DatabaseManager.get_sql_alchemy_instance()

        self.assert_is_instance(db, LocalProxy)
        self.assert_is_instance(db._get_current_object(), SQLAlchemy)
        self.assert_equal_deep(test_db, db._get_current_object())

        self.assert_equal_deep(sqlalchemy_mapper, mapper)
        self.assert_equal_deep(sqlalchemy_relationship, relationship)
        self.assert_equal_deep(sqlalchemy_backref, backref)
项目:edmunds    作者:LowieHuyghe    | 项目源码 | 文件源码
def get_sql_alchemy_instance():
        """
        Get sql alchemy instance
        :return:    SQLAlchemy
        :rtype:     flask_sqlalchemy.SQLAlchemy
        """

        if DatabaseManager._sql_alchemy_instance is None:
            DatabaseManager._sql_alchemy_instance = SQLAlchemy()
        return DatabaseManager._sql_alchemy_instance
项目:edmunds    作者:LowieHuyghe    | 项目源码 | 文件源码
def _create_my_sql(self, config):
        """
        Create my sql
        :param config:  The config
        :return:        SQLAlchemy Engine
        :rtype:         sqlalchemy.engine.base.Engine
        """

        db = DatabaseManager.get_sql_alchemy_instance()

        bind = config['name']
        if self._instances_config and self._instances_config[0]['name'] == config['name']:
            bind = None
        return db.get_engine(bind=bind)
项目:edmunds    作者:LowieHuyghe    | 项目源码 | 文件源码
def _create_postgre_sql(self, config):
        """
        Create PostgreSQL
        :param config:  The config
        :return:        SQLAlchemy Engine
        :rtype:         sqlalchemy.engine.base.Engine
        """

        db = DatabaseManager.get_sql_alchemy_instance()

        bind = config['name']
        if self._instances_config and self._instances_config[0]['name'] == config['name']:
            bind = None
        return db.get_engine(bind=bind)
项目:edmunds    作者:LowieHuyghe    | 项目源码 | 文件源码
def _create_sqlite(self, config):
        """
        Create SQLite
        :param config:  The config
        :return:        SQLAlchemy Engine
        :rtype:         sqlalchemy.engine.base.Engine
        """

        db = DatabaseManager.get_sql_alchemy_instance()

        bind = config['name']
        if self._instances_config and self._instances_config[0]['name'] == config['name']:
            bind = None
        return db.get_engine(bind=bind)
项目:ezdmb    作者:angryrancor    | 项目源码 | 文件源码
def setupRestApi():
    app = Flask(__name__)
    app.config.update(
        DEBUG=True,
        SQLALCHEMY_DATABASE_URI='sqlite:///dmb.db',
        SQLALCHEMY_TRACK_MODIFICATIONS=False
    )
    db = SQLAlchemy(app)

    class MenuItem(db.Model):
        id = db.Column(db.Integer, primary_key=True)
        menuItemName = db.Column(db.Text)
        price = db.Column(db.Float)

    class ItemAddOn(db.Model):
        id = db.Column(db.Integer, primary_key=True)
        menuItemId = db.Column(db.Integer, db.ForeignKey('menu_item.id'))
        addOnName = db.Column(db.Text)
        price = db.Column(db.Integer)

    db.create_all()
    manager = APIManager(app, flask_sqlalchemy_db=db)
    methods = ['GET', 'POST', 'DELETE', 'PATCH']
    url_prefix = '/dmb'
    manager.create_api(MenuItem, methods=methods, url_prefix=url_prefix)
    manager.create_api(ItemAddOn, methods=methods, url_prefix=url_prefix)
    app.run()
项目:Stroll-Safely    作者:Munnu    | 项目源码 | 文件源码
def init_app():
    # So that we can use Flask-SQLAlchemy, we'll make a Flask app
    from flask import Flask
    app = Flask(__name__)

    connect_to_db(app)
    print "Connected to DB."
项目:fleaker    作者:croscon    | 项目源码 | 文件源码
def _discover_ideal_backend(orm_backend):
    """Auto-discover the ideal backend based on what is installed.

    Right now, handles discovery of:
      * PeeWee
      * SQLAlchemy

    Args:
        orm_backend (str): The ``orm_backend`` value that was passed to the
            ``create_app`` function. That is, the ORM Backend the User
            indicated they wanted to use.

    Returns:
        str|fleaker.missing.MissingSentinel: Returns a string for the ideal
            backend if it found one, or :obj:`fleaker.MISSING` if we couldn't
            find one.

    Raises:
        RuntimeError: Raised if no user provided ORM Backend is given and BOTH
            PeeWee and SQLAlchemy are installed.
    """
    if orm_backend:
        return orm_backend

    if peewee is not MISSING and sqlalchemy is not MISSING:
        raise RuntimeError('Both PeeWee and SQLAlchemy detected as installed, '
                           'but no explicit backend provided! Please specify '
                           'one!')
    if peewee is not MISSING:
        return _PEEWEE_BACKEND
    elif sqlalchemy is not MISSING:
        return _SQLALCHEMY_BACKEND
    else:
        return MISSING
项目:GridLight-Server    作者:Lunabit    | 项目源码 | 文件源码
def _regenerate_database(db: SQLAlchemy):
    """ For testing purposes only. """
    db.drop_all()
    db.create_all()
    db.session.commit()
项目:taskflow    作者:CityOfPhiladelphia    | 项目源码 | 文件源码
def create_app(taskflow_instance, connection_string=None, secret_key=None):
    app = flask.Flask(__name__)
    app.config['DEBUG'] = os.getenv('DEBUG', False)
    app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
    app.config['SQLALCHEMY_DATABASE_URI'] = connection_string or os.getenv('SQL_ALCHEMY_CONNECTION')
    app.config['SESSION_COOKIE_NAME'] = 'taskflowsession'
    app.config['SESSION_COOKIE_HTTPONLY'] = True
    app.config['PERMANENT_SESSION_LIFETIME'] = 43200
    app.config['SECRET_KEY'] = secret_key or os.getenv('FLASK_SESSION_SECRET_KEY')

    db = SQLAlchemy(metadata=metadata, model_class=BaseModel)

    db.init_app(app)
    api = Api(app)
    CORS(app, supports_credentials=True)

    login_manager = LoginManager()
    login_manager.init_app(app)

    @login_manager.user_loader
    def load_user(user_id):
        return db.session.query(User).filter(User.id == user_id).first()

    def apply_attrs(class_def, attrs):
        for key, value in attrs.items():
            setattr(class_def, key, value)
        return class_def

    attrs = {
        'session': db.session,
        'taskflow': taskflow_instance
    }

    with app.app_context():
        api.add_resource(apply_attrs(resources.LocalSessionResource, attrs), '/v1/session')

        api.add_resource(apply_attrs(resources.WorkflowListResource, attrs), '/v1/workflows')
        api.add_resource(apply_attrs(resources.WorkflowResource, attrs), '/v1/workflows/<workflow_name>')

        api.add_resource(apply_attrs(resources.TaskListResource, attrs), '/v1/tasks')
        api.add_resource(apply_attrs(resources.TaskResource, attrs), '/v1/tasks/<task_name>')

        api.add_resource(apply_attrs(resources.WorkflowInstanceListResource, attrs), '/v1/workflow-instances')
        api.add_resource(apply_attrs(resources.WorkflowInstanceResource, attrs), '/v1/workflow-instances/<int:instance_id>')

        api.add_resource(apply_attrs(resources.RecurringWorkflowLastestResource, attrs), '/v1/workflow-instances/recurring-latest')

        api.add_resource(apply_attrs(resources.TaskInstanceListResource, attrs), '/v1/task-instances')
        api.add_resource(apply_attrs(resources.TaskInstanceResource, attrs), '/v1/task-instances/<int:instance_id>')

        api.add_resource(apply_attrs(resources.RecurringTaskLastestResource, attrs), '/v1/task-instances/recurring-latest')

    return app
项目:opentaxforms    作者:jsaponara    | 项目源码 | 文件源码
def createApi(app,**kw):
    db = SQLAlchemy(app)
    conn, engine, metadata, md = connect(appname, **kw)
    Base = declarative_base()
    Session = sessionmaker(autocommit=False, autoflush=False, bind=engine)
    mysession = scoped_session(Session)
    apimanager = flask_restless.APIManager(app, session=mysession)
    counts = {}
    for tabl in md:
        tablobj = md[tabl]
        counts[tabl] = tablobj.count().execute().fetchone()[0]
        attrs = dict(
            __table__=tablobj,
            # todo should flask_restless need __tablename__?
            __tablename__=str(tabl),
            )
        attrs.update(dict(
            orgn=dict(
                form=db.relationship('Form'),
                ),
            form=dict(
                orgn=db.relationship('Orgn', back_populates='form'),
                slot=db.relationship('Slot', back_populates='form'),
                ),
            slot=dict(
                form=db.relationship('Form'),
                ),
            )[tabl])
        tablcls = type(str(tabl).capitalize(), (Base, ), attrs)
        colsToAdd = dict(
            orgn=(),
            form=(
                'orgn', 'orgn.code',
                ),
            slot=(
                'form', 'form.code',
                ),
            )[tabl]
        colsToShow = [c.name for c in tablobj.columns]
        colsToShow.extend(colsToAdd)
        # print tabl,colsToShow
        apimanager.create_api(
            tablcls,
            url_prefix='/api/v%s' % (apiVersion, ),
            include_columns=colsToShow,
            )
    return counts