我们从 Python 开源项目中提取了以下 31 个代码示例，用于说明如何使用 flask_sqlalchemy.SQLAlchemy()。
def setUp(self):
    """Build a fresh Flask app, SQLAlchemy handle and search index for a test.

    Stores the app on ``self.app``, the search extension on ``self.search``
    and resets ``self.Post`` to ``None``. The SQLAlchemy handle is published
    through the module-level ``db`` name.
    """

    class TestConfig(object):
        SQLALCHEMY_TRACK_MODIFICATIONS = True
        SQLALCHEMY_DATABASE_URI = 'sqlite://'
        DEBUG = True
        TESTING = True
        MSEARCH_INDEX_NAME = mkdtemp()
        # MSEARCH_BACKEND = 'whoosh'

    # The database handle has to be
    #   a) global, so every object we share sees the same one, and
    #   b) recreated for every test run.
    global db

    flask_app = Flask(__name__)
    self.app = flask_app
    flask_app.config.from_object(TestConfig())

    db = SQLAlchemy(flask_app)
    self.search = Search(flask_app, db=db)
    self.Post = None
def github_callback_get_account(db, gh_api):
    """Resolve the user account for a GitHub callback, creating it if needed.

    The GitHub profile is fetched from ``/user`` and matched against stored
    users by ``github_id``. When no match exists, a new ``UserAccount`` and
    ``User`` are added to the session and committed.

    :param db: Database for storing GitHub user info
    :type db: ``flask_sqlalchemy.SQLAlchemy``
    :param gh_api: GitHub API client ready for the communication
    :type gh_api: ``repocribro.github.GitHubAPI``
    :return: User account and flag if it's new one
    :rtype: tuple of ``repocribro.models.UserAccount``, bool
    """
    profile = gh_api.get('/user').data
    known_user = db.session.query(User).filter(
        User.github_id == profile['id']
    ).first()

    # Existing user: nothing to persist.
    if known_user is not None:
        return known_user.user_account, False

    account = UserAccount()
    db.session.add(account)
    created_user = User.create_from_dict(profile, account)
    db.session.add(created_user)
    db.session.commit()
    return created_user.user_account, True
def connect(app):
    """Configure a SQLite database on ``app`` and return the SQLAlchemy handle.

    The declarative base is customised so that every model derives from
    ``MyModel``.

    :param app: Flask application to configure and bind.
    :return: The ``SQLAlchemy`` subclass instance bound to ``app``.
    """
    from flask_sqlalchemy import SQLAlchemy
    from sqlalchemy.ext.declarative import declarative_base
    from generic_social_network.app.models.tables.my_model import MyModel

    app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///' + _db_file_location()

    class MySQLAlchemy(SQLAlchemy):
        def make_declarative_base(self):
            # Import from the real package: the ``flask.ext`` redirect
            # namespace was removed in Flask 1.0, and this function already
            # imports ``flask_sqlalchemy`` directly above.
            # NOTE(review): these are private names; confirm they exist in
            # the installed Flask-SQLAlchemy version.
            from flask_sqlalchemy import _BoundDeclarativeMeta, _QueryProperty
            base = declarative_base(cls=MyModel, name='MyModel',
                                    metaclass=_BoundDeclarativeMeta)
            base.query = _QueryProperty(self)
            return base

    db = MySQLAlchemy(app)
    # db.engine.echo = True
    return db
def __init__(self, app, db, table, key_prefix):
    """Store the database handle and define the session model.

    :param app: Flask application, used only to create a ``SQLAlchemy``
                instance when ``db`` is ``None``.
    :param db: Existing ``SQLAlchemy`` instance, or ``None``.
    :param table: Table name used for the session model.
    :param key_prefix: Stored on ``self.key_prefix``.
    """
    if db is None:
        from flask_sqlalchemy import SQLAlchemy
        db = SQLAlchemy(app)

    self.db = db
    self.key_prefix = key_prefix

    class Session(db.Model):
        __tablename__ = table

        id = db.Column(db.Integer, primary_key=True)
        session_id = db.Column(db.String(256), unique=True)
        data = db.Column(db.LargeBinary)
        expiry = db.Column(db.DateTime)

        def __init__(self, session_id, data, expiry):
            self.session_id = session_id
            self.data = data
            self.expiry = expiry

        def __repr__(self):
            return '<Session data %s>' % self.data

    # Create the table(s) right away, then expose the model class.
    db.create_all()
    self.sql_session_model = Session
def _find_db_ext():
    """Return the ORM extension currently selected for this app.

    The selection is read from the module-level ``_SELECTED_BACKEND``
    global; this is what the ``db`` proxy resolves to.

    Returns:
        flask_utils.FlaskDB|flask_sqlalchemy.SQLAlchemy: The ORM extension
            currently in use.

    Raises:
        RuntimeError: If no backend has been selected yet.
    """
    if _SELECTED_BACKEND is not MISSING:
        return _SELECTED_BACKEND

    # @TODO: Do like Flask does with the context error and have a single
    # leading line that sort of explains the issue and is easily searched?
    raise RuntimeError("You have attempted to use the Fleaker DB proxy "
                       "before it was initialized! Please ensure you are "
                       "in the right context or push it yourself! Please "
                       "see the documentation for more information!")
def create_app():
    """Create the Flask app, configure it, and initialise the extensions.

    AWS settings are read from the environment. The bucket name is read from
    ``S3_BUCKET_NAME`` (default ``'haraka-local'``); it was previously read
    from ``AWS_REGION_NAME`` by mistake, so a configured region overrode the
    bucket name.

    :return: The configured Flask application.
    """
    app = Flask(__name__)

    # SQLAlchemy config
    # NOTE(review): the secret key and database URI are hard-coded here;
    # confirm this is only used for local/demo setups.
    app.config['SECRET_KEY'] = '123456790'
    app.config['SQLALCHEMY_DATABASE_URI'] = \
        'postgresql+psycopg2://localhost:5432/test'

    # Flask-ImageAlchemy config
    app.config['AWS_ACCESS_KEY_ID'] = os.environ.get('AWS_ACCESS_KEY_ID')
    app.config['AWS_SECRET_ACCESS_KEY'] = os.environ.get('AWS_SECRET_ACCESS_KEY')
    app.config['AWS_REGION_NAME'] = os.environ.get('AWS_REGION_NAME', 'eu-central-1')
    app.config['S3_BUCKET_NAME'] = os.environ.get('S3_BUCKET_NAME', 'haraka-local')

    # init extensions
    db.init_app(app)
    s3_storage.init_app(app)
    return app


# create app
def __init__(self, flask_app, config):
    """Keep the app and config, then attach a SQLite-backed SQLAlchemy handle.

    The database file is ``prism.db`` inside ``prism.settings.CONFIG_FOLDER``.
    """
    self._flask_app = flask_app
    self.config = config
    self._plugin_manager = None

    db_file = os.path.join(prism.settings.CONFIG_FOLDER, 'prism.db')
    settings = self._flask_app.config
    settings['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///%s' % db_file
    settings['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

    self._database = SQLAlchemy(self._flask_app)
def test_unversioned_db_fixture(unversioned_db):
    """Check the types exposed by the unversioned SQLAlchemy fixture."""
    assert unversioned_db.__class__ == flask_sqlalchemy.SQLAlchemy

    scoped = unversioned_db.session
    assert scoped.__class__ == sqlalchemy.orm.scoping.scoped_session

    # Compare by class name only, as the original check does.
    produced = scoped.session_factory()
    expected_name = flask_sqlalchemy.SignallingSession.__name__
    assert (produced.__class__.__name__ == expected_name)
def unversioned_db(app, request):
    """Yield a plain ``flask_sqlalchemy.SQLAlchemy`` instance bound to ``app``."""
    yield flask_sqlalchemy.SQLAlchemy(app)
def versions(self, before=None, after=None, return_query=False):
    """Fetch the history of this object from its history table.

    :param before: Return changes only _before_ the provided ``DateTime``
                   (inclusive comparison on ``changed``).
    :param after: Return changes only _after_ the provided ``DateTime``
                  (inclusive comparison on ``changed``).
    :param return_query: Return a SQLAlchemy query instead of a list of models.
    :return: List of history models for the given object (or a query object),
             ordered by version.
    """
    # Primary-key values that identify this object; ``version`` is excluded
    # because it distinguishes the history rows from one another.
    identity = {
        column.key: getattr(self, column.key)
        for column in self.__history_mapper__.primary_key
        if column.key != 'version'
    }

    history_cls = self.__history_mapper__.class_
    query = history_cls.query.filter_by(**identity)

    # Optional date window.
    if before is not None:
        query = query.filter(history_cls.changed <= before)
    if after is not None:
        query = query.filter(history_cls.changed >= after)

    query = query.order_by(history_cls.version)
    return query if return_query else query.all()
def init_app():
    """Create a Flask app and pass it to ``connect_to_db``.

    Prints a confirmation line once ``connect_to_db`` returns.
    """
    # So that we can use Flask-SQLAlchemy, we'll make a Flask app.
    from flask import Flask

    app = Flask(__name__)
    connect_to_db(app)
    # Function-call form: the Python 2 print statement is a syntax error on
    # Python 3, while this prints the same text on both.
    print("Connected to DB.")
def sqlalchemy_db(app):
    """Return a new ``SQLAlchemy`` instance bound to ``app``."""
    from flask_sqlalchemy import SQLAlchemy

    return SQLAlchemy(app)
def get_all_latest_submissions(self) -> '_MyQuery[Work]':
    """Get the latest submission (:class:`Work`) of each :class:`User` who
    has submitted at least one work for this assignment.

    :returns: The latest submissions.
    """
    # get_from_latest_submissions uses SQLAlchemy magic that MyPy cannot
    # encode, hence the cast.
    work_entity = t.cast(Work, Work)
    return self.get_from_latest_submissions(work_entity)
def configure_app(config_object, app):
    """Load ``config_object`` into ``app`` and bind a SQLAlchemy instance.

    :param config_object: Object passed to ``app.config.from_object``.
    :param app: Flask application to configure.
    :return: The ``SQLAlchemy`` instance created for ``app``. It used to be
             assigned to an unused local and discarded; callers that ignore
             the return value are unaffected.
    """
    app.config.from_object(config_object)
    return SQLAlchemy(app)
def new():
    """Handle the comment form: render it, or store a submitted comment.

    A POST with all of ``name``, ``email`` and ``comment`` filled in saves a
    ``Comments`` row and redirects to ``show_all``. Any other request renders
    ``new.html`` (after flashing an error when a POST has an empty field).
    """
    # A plain GET just shows the form.
    if request.method != 'POST':
        return render_template('new.html')

    # The form data is available in the request.form dictionary.
    form = request.form
    if not (form['name'] and form['email'] and form['comment']):
        flash('Please enter all the fields', 'error')
        return render_template('new.html')

    # The data is valid: build the 'Comments' object, add it to the
    # SQLAlchemy session and commit to save it to the database.
    comment = Comments(form['name'], form['email'], form['comment'])
    db.session.add(comment)
    db.session.commit()

    flash('Comment was successfully submitted')
    # Redirect to the view showing all the comments
    return redirect(url_for('show_all'))


# This is the code that gets executed when the current python file is
# executed.
def _initialize():
    """Load the server config, set up logging, and create the app and DB.

    The config is read as JSON from the path in the ``LIBRARIAN_CONFIG_PATH``
    environment variable (default ``server-config.json``). If it has no
    ``SECRET_KEY`` item, a message is printed to stderr and the process exits
    with status 1.

    Returns:
        tuple: ``(logger, app, db)``.
    """
    import json
    import os.path
    import time
    from flask import Flask
    from flask_sqlalchemy import SQLAlchemy

    config_path = os.environ.get('LIBRARIAN_CONFIG_PATH', 'server-config.json')
    with open(config_path) as f:
        config = json.load(f)

    if 'SECRET_KEY' not in config:
        print('cannot start server: must define the Flask "secret key" as the item '
              '"SECRET_KEY" in "server-config.json"', file=sys.stderr)
        sys.exit(1)

    # TODO: configurable logging parameters will likely be helpful. We use UTC
    # for timestamps using standard ISO-8601 formatting. The Python docs claim
    # that 8601 is the default format but this does not appear to be true.
    loglevel_cfg = config.get('log_level', 'info')
    loglevel = _log_level_names.get(loglevel_cfg)
    warn_loglevel = (loglevel is None)
    if warn_loglevel:
        # Unknown level name: fall back to INFO and report it once the
        # logger exists.
        loglevel = logging.INFO

    logging.basicConfig(
        level=loglevel,
        format='%(asctime)s %(levelname)s: %(message)s',
        datefmt='%Y-%m-%dT%H:%M:%SZ'
    )
    # Make the root handler format timestamps in UTC, matching the "Z" suffix.
    logging.getLogger('').handlers[0].formatter.converter = time.gmtime
    logger = logging.getLogger('librarian')

    if warn_loglevel:
        # Logger.warn is a deprecated alias; warning() is the supported name.
        logger.warning('unrecognized value %r for "log_level" config item', loglevel_cfg)

    tf = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'templates')
    app = Flask('librarian', template_folder=tf)
    app.config.update(config)
    db = SQLAlchemy(app)
    return logger, app, db
def __init__(self, app, db, table, key_prefix, use_signer=False, permanent=True):
    """Store the settings and define the session model on the database.

    :param app: Flask application, used only to create a ``SQLAlchemy``
                instance when ``db`` is ``None``.
    :param db: Existing ``SQLAlchemy`` instance, or ``None``.
    :param table: Table name used for the session model.
    :param key_prefix: Stored on ``self.key_prefix``.
    :param use_signer: Stored on ``self.use_signer``.
    :param permanent: Stored on ``self.permanent``.
    """
    if db is None:
        from flask_sqlalchemy import SQLAlchemy
        db = SQLAlchemy(app)

    self.db = db
    self.key_prefix = key_prefix
    self.use_signer = use_signer
    self.permanent = permanent

    class Session(db.Model):
        __tablename__ = table

        id = db.Column(db.Integer, primary_key=True)
        session_id = db.Column(db.String(255), unique=True)
        data = db.Column(db.Text)
        expiry = db.Column(db.DateTime)

        def __init__(self, session_id, data, expiry):
            self.session_id = session_id
            self.data = data
            self.expiry = expiry

        def __repr__(self):
            return '<Session data {0!s}>'.format(self.data)

    self.sql_session_model = Session
def from_sql(row):
    """Translate a SQLAlchemy model instance into a dictionary.

    The result is a shallow copy of the instance ``__dict__`` with ``id``
    set from ``row.id`` and the ``_sa_instance_state`` entry removed.
    """
    as_dict = dict(row.__dict__)
    as_dict.update(id=row.id)
    # Drop SQLAlchemy's bookkeeping entry; raises KeyError if it is absent.
    del as_dict['_sa_instance_state']
    return as_dict
def run_server():
    """Run the module-level ``app`` on all interfaces, port 80.

    The server is threaded, with debug mode and the reloader disabled.
    """
    run_options = dict(
        host='0.0.0.0',
        port=80,
        threaded=True,
        debug=False,
        use_reloader=False,
    )
    app.run(**run_options)


# SQLAlchemy Models
def connect_db(**kwargs):
    """Link the app database to a postgres db with the supplied info.

    Expected keyword arguments: ``username``, ``password``, ``host``,
    ``port``, ``database`` and ``app`` (the Flask application).

    :return: The ``SQLAlchemy`` instance bound to ``app``.
    """
    db_uri = 'postgresql://{0}:{1}@{2}:{3}/{4}'.format(kwargs.get('username'),
                                                       kwargs.get('password'),
                                                       kwargs.get('host'),
                                                       kwargs.get('port'),
                                                       kwargs.get('database'))
    app = kwargs.get('app')
    # The URI has to be in the config before the extension is initialised:
    # SQLAlchemy(app) reads the config at init time.
    app.config['SQLALCHEMY_DATABASE_URI'] = db_uri
    return SQLAlchemy(app)
def test_model(self):
    """
    Check that ``db`` is a proxy to the manager's SQLAlchemy instance and
    that the re-exported SQLAlchemy helpers match the originals.
    :return: void
    """
    test_db = DatabaseManager.get_sql_alchemy_instance()

    self.assert_is_instance(db, LocalProxy)
    resolved = db._get_current_object()
    self.assert_is_instance(resolved, SQLAlchemy)
    self.assert_equal_deep(test_db, resolved)

    helper_pairs = (
        (sqlalchemy_mapper, mapper),
        (sqlalchemy_relationship, relationship),
        (sqlalchemy_backref, backref),
    )
    for expected, actual in helper_pairs:
        self.assert_equal_deep(expected, actual)
def get_sql_alchemy_instance():
    """
    Return the shared SQLAlchemy instance, creating it on first use
    :return: SQLAlchemy
    :rtype: flask_sqlalchemy.SQLAlchemy
    """
    instance = DatabaseManager._sql_alchemy_instance
    if instance is None:
        instance = SQLAlchemy()
        DatabaseManager._sql_alchemy_instance = instance
    return instance
def _create_my_sql(self, config):
    """
    Create the MySQL engine for the given instance config
    :param config: The config; its ``name`` entry is used as the bind key
    :return: SQLAlchemy Engine
    :rtype: sqlalchemy.engine.base.Engine
    """
    db = DatabaseManager.get_sql_alchemy_instance()
    name = config['name']
    # The first configured instance is requested with bind=None.
    is_first = bool(self._instances_config) and self._instances_config[0]['name'] == name
    return db.get_engine(bind=None if is_first else name)
def _create_postgre_sql(self, config):
    """
    Create the PostgreSQL engine for the given instance config
    :param config: The config; its ``name`` entry is used as the bind key
    :return: SQLAlchemy Engine
    :rtype: sqlalchemy.engine.base.Engine
    """
    db = DatabaseManager.get_sql_alchemy_instance()
    name = config['name']
    # The first configured instance is requested with bind=None.
    is_first = bool(self._instances_config) and self._instances_config[0]['name'] == name
    return db.get_engine(bind=None if is_first else name)
def _create_sqlite(self, config):
    """
    Create the SQLite engine for the given instance config
    :param config: The config; its ``name`` entry is used as the bind key
    :return: SQLAlchemy Engine
    :rtype: sqlalchemy.engine.base.Engine
    """
    db = DatabaseManager.get_sql_alchemy_instance()
    name = config['name']
    # The first configured instance is requested with bind=None.
    is_first = bool(self._instances_config) and self._instances_config[0]['name'] == name
    return db.get_engine(bind=None if is_first else name)
def setupRestApi():
    """Build a Flask app exposing ``MenuItem`` and ``ItemAddOn`` as REST APIs.

    Defines both models on a SQLite database (``dmb.db``), creates the
    tables, registers an API for each model under ``/dmb`` and then runs
    the app.
    """
    app = Flask(__name__)
    app.config.update({
        'DEBUG': True,
        'SQLALCHEMY_DATABASE_URI': 'sqlite:///dmb.db',
        'SQLALCHEMY_TRACK_MODIFICATIONS': False,
    })
    db = SQLAlchemy(app)

    class MenuItem(db.Model):
        id = db.Column(db.Integer, primary_key=True)
        menuItemName = db.Column(db.Text)
        price = db.Column(db.Float)

    class ItemAddOn(db.Model):
        id = db.Column(db.Integer, primary_key=True)
        menuItemId = db.Column(db.Integer, db.ForeignKey('menu_item.id'))
        addOnName = db.Column(db.Text)
        price = db.Column(db.Integer)

    db.create_all()

    manager = APIManager(app, flask_sqlalchemy_db=db)
    allowed_methods = ['GET', 'POST', 'DELETE', 'PATCH']
    prefix = '/dmb'
    for model in (MenuItem, ItemAddOn):
        manager.create_api(model, methods=allowed_methods, url_prefix=prefix)

    app.run()
def init_app():
    """Create a Flask app and pass it to ``connect_to_db``.

    Prints a confirmation line once ``connect_to_db`` returns.
    """
    # So that we can use Flask-SQLAlchemy, we'll make a Flask app
    from flask import Flask

    app = Flask(__name__)
    connect_to_db(app)
    # Function-call form: the Python 2 print statement is a syntax error on
    # Python 3, while this prints the same text on both.
    print("Connected to DB.")
def _discover_ideal_backend(orm_backend): """Auto-discover the ideal backend based on what is installed. Right now, handles discovery of: * PeeWee * SQLAlchemy Args: orm_backend (str): The ``orm_backend`` value that was passed to the ``create_app`` function. That is, the ORM Backend the User indicated they wanted to use. Returns: str|fleaker.missing.MissingSentinel: Returns a string for the ideal backend if it found one, or :obj:`fleaker.MISSING` if we couldn't find one. Raises: RuntimeError: Raised if no user provided ORM Backend is given and BOTH PeeWee and SQLAlchemy are installed. """ if orm_backend: return orm_backend if peewee is not MISSING and sqlalchemy is not MISSING: raise RuntimeError('Both PeeWee and SQLAlchemy detected as installed, ' 'but no explicit backend provided! Please specify ' 'one!') if peewee is not MISSING: return _PEEWEE_BACKEND elif sqlalchemy is not MISSING: return _SQLALCHEMY_BACKEND else: return MISSING
def _regenerate_database(db: SQLAlchemy):
    """Drop every table, recreate them, and commit the session.

    For testing purposes only.
    """
    # Wipe first, then rebuild from the current model definitions.
    db.drop_all()
    db.create_all()

    db.session.commit()
def create_app(taskflow_instance, connection_string=None, secret_key=None):
    """Create the Flask app that serves the v1 REST API.

    :param taskflow_instance: Object attached to every resource class as its
                              ``taskflow`` attribute.
    :param connection_string: Database URI; falls back to the
                              ``SQL_ALCHEMY_CONNECTION`` environment variable.
    :param secret_key: Flask secret key; falls back to the
                       ``FLASK_SESSION_SECRET_KEY`` environment variable.
    :return: The configured Flask application.
    """
    app = flask.Flask(__name__)

    # os.getenv returns a string, so any non-empty value (including "0" or
    # "false") used to switch debug on. Parse the flag explicitly instead.
    debug_flag = os.getenv('DEBUG', '')
    app.config['DEBUG'] = debug_flag.strip().lower() in ('1', 'true', 'yes', 'on')

    app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
    app.config['SQLALCHEMY_DATABASE_URI'] = connection_string or os.getenv('SQL_ALCHEMY_CONNECTION')
    app.config['SESSION_COOKIE_NAME'] = 'taskflowsession'
    app.config['SESSION_COOKIE_HTTPONLY'] = True
    app.config['PERMANENT_SESSION_LIFETIME'] = 43200
    app.config['SECRET_KEY'] = secret_key or os.getenv('FLASK_SESSION_SECRET_KEY')

    db = SQLAlchemy(metadata=metadata, model_class=BaseModel)
    db.init_app(app)

    api = Api(app)
    CORS(app, supports_credentials=True)

    login_manager = LoginManager()
    login_manager.init_app(app)

    @login_manager.user_loader
    def load_user(user_id):
        return db.session.query(User).filter(User.id == user_id).first()

    def apply_attrs(class_def, attrs):
        # Set the shared attributes directly on the resource class.
        for key, value in attrs.items():
            setattr(class_def, key, value)
        return class_def

    attrs = {
        'session': db.session,
        'taskflow': taskflow_instance
    }

    # (resource class, URL rule) pairs, registered in this order.
    routes = (
        (resources.LocalSessionResource, '/v1/session'),
        (resources.WorkflowListResource, '/v1/workflows'),
        (resources.WorkflowResource, '/v1/workflows/<workflow_name>'),
        (resources.TaskListResource, '/v1/tasks'),
        (resources.TaskResource, '/v1/tasks/<task_name>'),
        (resources.WorkflowInstanceListResource, '/v1/workflow-instances'),
        (resources.WorkflowInstanceResource, '/v1/workflow-instances/<int:instance_id>'),
        (resources.RecurringWorkflowLastestResource, '/v1/workflow-instances/recurring-latest'),
        (resources.TaskInstanceListResource, '/v1/task-instances'),
        (resources.TaskInstanceResource, '/v1/task-instances/<int:instance_id>'),
        (resources.RecurringTaskLastestResource, '/v1/task-instances/recurring-latest'),
    )

    with app.app_context():
        for resource_cls, url in routes:
            api.add_resource(apply_attrs(resource_cls, attrs), url)

    return app
def createApi(app,**kw):
    """Register a REST API for every table returned by ``connect``.

    For each table a model class is built on the fly (with the relationships
    defined for ``orgn``, ``form`` and ``slot``) and exposed under
    ``/api/v<apiVersion>``. A table name outside those three raises
    ``KeyError``.

    :param app: Flask application the API manager is attached to.
    :param kw: Passed through to ``connect``.
    :return: Dict mapping each table name to its row count.
    """
    db = SQLAlchemy(app)
    _conn, engine, _metadata, md = connect(appname, **kw)
    Base = declarative_base()
    session_factory = sessionmaker(autocommit=False, autoflush=False, bind=engine)
    mysession = scoped_session(session_factory)
    apimanager = flask_restless.APIManager(app, session=mysession)

    counts = {}
    for tabl in md:
        table = md[tabl]
        counts[tabl] = table.count().execute().fetchone()[0]

        attrs = {
            '__table__': table,
            # todo should flask_restless need __tablename__?
            '__tablename__': str(tabl),
        }
        # Relationship objects are built fresh on every iteration; only the
        # entry for the current table is used.
        relationships = {
            'orgn': {
                'form': db.relationship('Form'),
            },
            'form': {
                'orgn': db.relationship('Orgn', back_populates='form'),
                'slot': db.relationship('Slot', back_populates='form'),
            },
            'slot': {
                'form': db.relationship('Form'),
            },
        }
        attrs.update(relationships[tabl])
        model_cls = type(str(tabl).capitalize(), (Base, ), attrs)

        extra_columns = {
            'orgn': (),
            'form': ('orgn', 'orgn.code', ),
            'slot': ('form', 'form.code', ),
        }[tabl]
        visible_columns = [column.name for column in table.columns]
        visible_columns.extend(extra_columns)
        # print tabl,colsToShow

        apimanager.create_api(
            model_cls,
            url_prefix='/api/v%s' % (apiVersion, ),
            include_columns=visible_columns,
        )
    return counts