我们从Python开源项目中,提取了以下28个代码示例,用于说明如何使用app.db.session()。
def create_user(user, email, password=None, role='user'): if not password: password = read_pwd() data = dict( user_name=user, email=email, password=hash_pwd(password), active=True) errors = schema.UserSchema( exclude=('version_id',)).validate(data, db.session) if errors: raise InvalidCommand('Validation Error: %s' % errors) role = model.Role.query.filter_by(name=role).one() u = model.User(**data) u.roles.append(role) db.session.add(u) # @UndefinedVariable db.session.commit() # @UndefinedVariable
def cleanup(uploads_older_then=24, conversions_older_then=24 * 7, purge_empty_ebooks_dirs=False): since = datetime.now() - timedelta(hours=float(uploads_older_then)) for upload in model.Upload.query.filter(model.Upload.created < since): logic.delete_upload(upload) db.session.commit() purge_empty_dirs(settings.UPLOAD_DIR, delete_root=False) since = datetime.now() - timedelta(hours=float(conversions_older_then)) for batch in model.ConversionBatch.query.filter(model.ConversionBatch.created < since): logic.delete_conversion_batch(batch); for conversion in model.Conversion.query.filter(model.Conversion.created < since): logic.delete_conversion(conversion) db.session.commit() purge_empty_dirs(settings.BOOKS_CONVERTED_DIR, delete_root=False) if purge_empty_ebooks_dirs: purge_empty_dirs(settings.BOOKS_BASE_DIR, delete_root=False)
def api_delete(): uniqueid = request.args.get("uniqueid") file = File.query.filter_by(unique_id=uniqueid).first() if g.user.admin or (g.user.id == file.uploader_id): upload_folder = myapp.config['UPLOAD_FOLDER'] try: folder = os.path.join(upload_folder, file.unique_id) cmpl_path = os.path.join(folder, file.name) pubkey = file.publickey db.session.delete(pubkey) db.session.delete(file) db.session.commit() os.remove(cmpl_path) os.rmdir(folder) return jsonify(response=responds['FILE_DELETED']) except Exception as e: print(e) return jsonify(response=responds['SOME_ERROR']) else: return jsonify(response=responds['FAILED_AUTHORIZATION'])
def _adduser(): if request.method == 'POST': if g.user.admin: form = AddUser() if form.validate(): newuser = User(form.username.data, form.password.data, form.email.data) if form.adminuser.data is None: newuser.admin = False else: newuser.admin = form.adminuser.data session = db.session() try: session.add(newuser) session.commit() except Exception as e: session.rollback() return ret_dbfail_response(e) return jsonify(response=('New User ' + newuser.username + ' added.')) else: return jsonify(response=responds["FAILED_VALIDATION"])
def add_slack_game_post(): session = db.session data = flask.request.form token = data['token'] if token != slack_token: return "You're not who you say you are. Wrong token {}".format(token) winner_email, loser_email = data['text'].split(' beat ') winner = session.query(User).filter( User.email == winner_email, ).first() loser = session.query(User).filter( User.email == loser_email ).first() if not winner: return 'no player with email {}'.format(winner_email) if not loser: return 'no player with email {}'.format(loser_email) add_game(session, winner, loser, slack_user_submitted_by=data['user_name']) session.commit() return "{} beat {}! {}'s score is now {} and {}'s score is now {}".format(winner.name, loser.name, winner.name, round(winner.elo, 3), loser.name, round(loser.elo, 3))
def start_game_post(): session = db.session # todo make this read from json in_progress_player_1_id = 1 in_progress_player_2_id = 2 game = Game( in_progress_player_1_id=in_progress_player_1_id, in_progress_player_2_id=in_progress_player_2_id ) session.add(game) session.commit() return 'game id and maybe redirect to play game'
def load_user(user_id): session = db.session return session.query(User).get(user_id)
def dev_login(user_id): if ENVIRONMENT == 'dev': login_user(db.session.query(User).get(user_id)) return redirect(url_for('index'))
def index(): session = db.session current_user = flask.g.user active_users = get_active_users(session) return flask.render_template('index.html', title='Cratejoy Darts', current_user=current_user, active_users=active_users, auth_url=get_google_authorization_url())
def db(request, app): """Create test database tables""" _db.drop_all() # Create the tables based on the current model _db.create_all() user = User.create_test_user() TestClient.test_user = user app.test_client_class = TestClient app.response_class = TestResponse _db.session.commit()
def session(request, monkeypatch): """Prevent the session from closing""" # Roll back at the end of every test request.addfinalizer(_db.session.remove) # Prevent the session from closing (make it a no-op) and # committing (redirect to flush() instead) # https://alextechrants.blogspot.com/2014/01/unit-testing-sqlalchemy-apps-part-2.html # monkeypatch.setattr(_db.session, 'commit', _db.session.flush) monkeypatch.setattr(_db.session, 'remove', lambda: None)
def change_password(user, password=None): try: u = model.User.query.filter( or_(model.User.user_name == user, model.User.email == user)).one() # @UndefinedVariable except NoResultFound: raise InvalidCommand('No such User') if not password: password = read_pwd() u.password = hash_pwd(password) db.session.commit() # @UndefinedVariable
def migrate_tables(): import psycopg2 print('This will migrate database to latest schema, you are advised to backup database before running this command') if prompt_bool('Do you want to continue?'): mdir = os.path.join(SQL_DIR, 'migration') version_obj=model.Version.query.one_or_none() if not version_obj: version_obj=model.Version(version=0, version_id=1) db.session.add(version_obj) old_version = version_obj.version if old_version == db_version: print('DB is at correct version %d'% old_version) scripts = [] for script in os.listdir(mdir): m=re.match(r'v(\d+)\.sql', script) if m: version = int(m.group(1)) if version <= db_version and version > old_version: scripts.append((version, os.path.join(mdir,script))) scripts.sort() connection = psycopg2.connect(database=settings.DB_NAME, user = settings.DB_USER, password = settings.DB_PASSWORD, host = settings.DB_HOST, port = settings.DB_PORT) connection.autocommit = True #connection = db.engine.raw_connection() # @UndefinedVariable try: c = connection.cursor() for v,fname in scripts: script = open(fname, 'rt', encoding='utf-8-sig').read() print('Upgrading database to version %d'% v) res = c.execute(script) version_obj.version = v db.session.commit() #connection.commit() finally: connection.close()
def change_account(cuser, form): if form.validate() and cuser.check_password(form.oldpassword.data): session = db.session() try: if cuser.email is not form.email.data: cuser.email = form.email.data if cuser.username is not form.username.data: cuser.username = form.username.data cuser.password = form.password.data session.commit() return jsonify(response=responds['INFO_CHANGED']) except IntegrityError as e: session.rollback() return ret_dbfail_response(e)
def deliver_file(file, request): upload_folder = myapp.config['UPLOAD_FOLDER'] path = os.path.join(upload_folder, file.unique_id) file.dl_count += 1 db.session.commit() filename = file.name return send_from_directory(path, filename, as_attachment=(request.referrer is not None))
def api_unpublish(): if request.method == 'GET': uniqueid = request.args['uniqueid'] file = File.query.filter_by(unique_id=uniqueid).first() if file is not None: if g.user.admin or (g.user.id == file.uploader_id): key = file.publickey if key is not None: file.publickey.public = False db.session.commit() url = request.host_url + "pub/dl/" + key.hash return jsonify(response=responds['PUBLIC_KEY_UNPUBLISH'], url=url) return jsonify(response=responds['SOME_ERROR'])
def api_publish(): if request.method == 'GET': uniqueid = request.args['uniqueid'] file = File.query.filter_by(unique_id=uniqueid).first() if file is not None: if g.user.admin or (g.user.id == file.uploader_id): key = file.publickey if (key is not None) and (key.public is False): file.publickey.public = True db.session.commit() url = request.host_url + "pub/dl/" + key.hash return jsonify(response=responds['PUBLIC_KEY_PUBLISH'], url=url) return jsonify(response=responds['SOME_ERROR'])
def createadminuser(): admin = User(email='{{ADMIN_MAIL}}', password='{{ADMIN_PASSWORD}}', username='{{ADMIN_USER}}') admin.admin = True db.session().add(admin) db.session().commit() print("Admin User created.")
def test_file_publichash(self): file = File('testfile.txt', helpers.unique_id(), 1337, 'text/html') key = PublicKey() key.public = True file.publickey = key db.session().add(file) db.session().commit() self.assertEqual(key, file.publickey)
def export_users(): session = db.session return flask.Response(json.dumps([user.dict for user in session.query(User).all()]), mimetype=u'application/json')
def export_games(): session = db.session return flask.Response( json.dumps([game_dict(session, game) for game in session.query(Game).all()]), mimetype=u'application/json')
def add_game_post(): session = db.session data = flask.request.json winner_id = data['winner_id'] winner = session.query(User).get(winner_id) if not winner: raise Exception('no player with id {}'.format(winner_id)) loser_id = data['loser_id'] loser = session.query(User).get(loser_id) if not loser: raise Exception('no player with id {}'.format(loser_id)) game = add_game(session, winner, loser, submitted_by_id=flask.g.user.id) session.commit() return flask.Response(json.dumps({ 'id': game.id, 'success': True }), mimetype=u'application/json')
def remove_game_get(game_id): session = db.session current_user = flask.g.user if not current_user.admin: return flask.Response(json.dumps({ 'success': False, 'message': 'Access Denied', 'affected_player_ids': [], 'updated_game_ids': [] }), mimetype=u'application/json') game = session.query(Game).get(game_id) if not game: return flask.Response(json.dumps({ 'success': False, 'message': 'No Such Game', 'affected_player_ids': [], 'updated_game_ids': [] }), mimetype=u'application/json') affected_player_ids, updated_game_ids = remove_game(session, game) return flask.Response(json.dumps({ 'affected_player_ids': list(affected_player_ids), 'updated_game_ids': list(updated_game_ids) }), mimetype=u'application/json')
def callback(): current_user = flask.g.user session = db.session # Redirect user to home page if already logged in. if current_user is not None and current_user.is_authenticated: return redirect(url_for('index')) if 'error' in request.args: if request.args.get('error') == 'access_denied': return 'You denied access.' return 'Error encountered.' if 'code' not in request.args and 'state' not in request.args: return redirect(url_for('login')) else: # Execution reaches here when user has # successfully authenticated our app. google = get_google_auth(state=flask_session['oauth_state']) try: token = google.fetch_token( Auth.TOKEN_URI, client_secret=Auth.CLIENT_SECRET, authorization_response=request.url.replace('http://', 'https://'), ) except HTTPError: return 'HTTPError occurred.' google = get_google_auth(token=token) # todo: cool stuff in here resp = google.get(Auth.USER_INFO) if resp.status_code == 200: user_data = resp.json() if user_data.get('hd') != 'cratejoy.com': return 'Only cratejoy.com emails allowed' email = user_data['email'] user = User.query.filter_by(email=email).first() if user is None: user = User( name=user_data['name'] or user_data['email'].split('@')[0].capitalize(), email=email, avatar=user_data['picture'] ) user.tokens = json.dumps(token) session.add(user) session.commit() login_user(user) return redirect(url_for('index')) return 'Could not fetch your information.'
def _upload(): if request.method == 'POST': file = request.files['files[]'] # get filename and folders file_name = secure_filename(file.filename) directory = str(unique_id()) upload_folder = myapp.config['UPLOAD_FOLDER'] if file.filename == '': return redirect(request.url) if file: #and allowed_file(file.filename) save_dir = os.path.join(upload_folder, directory) if not os.path.exists(save_dir): os.makedirs(save_dir) cmpl_path = os.path.join(save_dir, file_name) file.save(cmpl_path) size = os.stat(cmpl_path).st_size # create our file from the model and add it to the database dbfile = File(file_name, directory, size, file.mimetype) g.user.uploads.append(dbfile) db.session().add(dbfile) db.session().commit() if "image" in dbfile.mimetype: get_thumbnail(cmpl_path, "100") thumbnail_url = request.host_url + 'thumbs/' + directory else: thumbnail_url = "" url = request.host_url + 'uploads/' + directory delete_url = url delete_type = "DELETE" file = {"name": file_name, "url": url, "thumbnailUrl": thumbnail_url, "deleteUrl": delete_url, "deleteType": delete_type, "uid": directory} return jsonify(files=[file]) else: return jsonify(files=[{"name": file_name, "error": responds['FILETYPE_NOT_ALLOWED']}])
def callback(): current_user = flask.g.user session = db.session # Redirect user to home page if already logged in. if current_user is not None and current_user.is_authenticated: return redirect(url_for('index')) if 'error' in request.args: if request.args.get('error') == 'access_denied': return 'You are denied access.' return 'Error encountered.' if 'code' not in request.args and 'state' not in request.args: return redirect(url_for('login')) else: # Execution reaches here when user has # successfully authenticated our app. google = get_google_auth(state=flask_session['oauth_state']) try: token = google.fetch_token( Auth.TOKEN_URI, client_secret=Auth.CLIENT_SECRET, authorization_response=request.url.replace('http://', 'https://'), ) except HTTPError: return 'HTTPError occurred.' google = get_google_auth(token=token) # todo: cool stuff in here resp = google.get(Auth.USER_INFO) if resp.status_code == 200: user_data = resp.json() email = user_data['email'] user = User.query.filter_by(email=email).first() if user is None: user = User( name=user_data['name'] or user_data['email'].split('@')[0].capitalize(), email=email, avatar=user_data['picture'] ) user.tokens = json.dumps(token) session.add(user) session.commit() login_user(user) return redirect(url_for('index')) return 'Could not fetch your information.'