我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用flask.g.db()。
def check_login(login, password): if login == '' or password == '': return False else: g.db = connect_db() cur = g.db.execute('SELECT salt FROM users WHERE login = "' + login + '"') salt = cur.fetchone() if salt: salted = password + salt[0] else: #unsalted password or invalid login g.db.close() return False hashed = sha256(salted.encode()).hexdigest() cur = g.db.execute('SELECT id FROM users WHERE login = "' + login + '" AND password = "' + hashed + '"') uid = cur.fetchone() g.db.close() if uid: return uid[0] else: return False ## # Change password #
def start_platform(db, repository, delay=None): """ Function to check whether there is already running test for which it check the kvm progress and if no running test then it start a new test. """ from run import log, config linux_kvm_name = config.get('KVM_LINUX_NAME', '') win_kvm_name = config.get('KVM_WINDOWS_NAME', '') kvm_test = Kvm.query.first() if kvm_test is None: start_new_test(db, repository, delay) elif kvm_test.name is linux_kvm_name: kvm_processor_linux(db, repository, delay) elif kvm_test.name is win_kvm_name: kvm_processor_windows(db, repository, delay) return
def start_new_test(db, repository, delay): """ Function to start a new test based on kvm table. """ from run import log finished_tests = db.query(TestProgress.test_id).filter( TestProgress.status.in_([TestStatus.canceled, TestStatus.completed]) ).subquery() test = Test.query.filter( and_(Test.id.notin_(finished_tests)) ).order_by(Test.id.asc()).first() if test is None: return elif test.platform is TestPlatform.windows: kvm_processor_windows(db, repository, delay) elif test.platform is TestPlatform.linux: kvm_processor_linux(db, repository, delay) else: log.error("Unsupported CI platform: {platform}".format( platform=test.platform)) return
def toggle_maintenance(platform, status): result = 'failed' message = 'Platform Not found' try: platform = TestPlatform.from_string(platform) db_mode = MaintenanceMode.query.filter(MaintenanceMode.platform == platform).first() if db_mode is not None: db_mode.disabled = status == 'True' g.db.commit() result = 'success' message = '{platform} in maintenance? {status}'.format( platform=platform.description, status=("Yes" if db_mode.disabled else 'No') ) except ValueError: pass return jsonify({ 'status': result, 'message': message })
def job(): job_id = request.form.get('job_id', None) date_path = request.form.get('date_path', None) status = request.form.get('status', None) # A job is starting, we want the date_path if job_id and date_path: query('UPDATE searches SET date_path = ? WHERE id = ?', [date_path, job_id]) logging.debug('update date_path=%s where id=%s' % (date_path, job_id)) g.db.commit() # A job is in progress, we want the status if date_path and status: query('UPDATE searches SET status = ? WHERE date_path = ?', [status, date_path]) logging.debug('update status=%s where date_path=%s' % (status, date_path)) g.db.commit() return redirect(url_for('index'))
def init_db(): rd = connect_redis() rd.flushall() db = connect_db() with db: with app.open_resource('schema.sql') as f: cursor = db.cursor() query = '' while(True): query_line = f.readline() if not query_line: break query += query_line if ';' in query: print query cursor.execute(query) print 'Success' query = '' db.close() # not used
def posts_by_page(): if request.method == 'GET': topic = ast.literal_eval(request.args.get('topic')) sources_ids = ast.literal_eval(request.args.get('sources')) format_string = formatstring(sources_ids) page = ast.literal_eval(request.args.get('page')) fromDate = format_fromDate(ast.literal_eval(request.args.get('fromDate'))) toDate = format_toDate(ast.literal_eval(request.args.get('toDate'))) app.logger.info('GET posts_by_page: topic(%s), sources_ids(%s), page(%s), from(%s), to(%s)' % ('%s', format_string, '%s', '%s', '%s') % tuple([topic] + sources_ids + [page, fromDate, toDate])) db = g.db cur = db.cursor() from_post_rnum = (page-1)*config['perpage'] get_post_between(cur, topic, sources_ids, from_post_rnum, config['perpage'], fromDate, toDate) posts = [dict(id=int(row[0]), title=row[1], url=row[2], timestamp=row[3]) for row in cur.fetchall()] post_ruleset_count_dic = get_post_ruleset_count_dic(cur, topic, sources_ids, from_post_rnum, config['perpage'], fromDate, toDate) return jsonify(posts = posts, post_ruleset_count_dic = post_ruleset_count_dic)
def sentences(): if request.method == 'GET': topic = ast.literal_eval(request.args.get('topic')) sources_ids = ast.literal_eval(request.args.get('sources')) format_string = formatstring(sources_ids) post_id = ast.literal_eval(request.args.get('post_id')) app.logger.info('GET sentneces: topic(%s), sources_ids(%s), post_id(%s)' % ('%s', format_string, '%s') % tuple([topic]+sources_ids+[post_id])) cur = g.db.cursor() cur.execute(queries['get_sentences'], (post_id, )) sentences = [] for row in cur.fetchall(): sentence_id = int(row[0]) full_text = row[1] cur.execute(queries['get_result_by_sentence'], (post_id, sentence_id)) rules = [row[0] for row in cur.fetchall()] sentences.append(dict(sentence_id=sentence_id, full_text=full_text, rules=rules)) return jsonify(sentences=sentences)
def analysis(): if request.method == 'POST': topic = ast.literal_eval(request.form['topic']) sources_ids = ast.literal_eval(request.form['sources']) source_format_string = formatstring(sources_ids) page = ast.literal_eval(request.form['page']) fromDate = format_fromDate(ast.literal_eval(request.form['fromDate'])) toDate = format_toDate(ast.literal_eval(request.form['toDate'])) app.logger.info('POST anaysis: topic(%s), sources_ids(%s)' % ('%s', source_format_string) % tuple([topic] + sources_ids)) analyze(topic, sources_ids) db = g.db cur = db.cursor() rule_count_dic = get_rule_count_dic(cur, topic, sources_ids, fromDate, toDate) from_post_rnum = (page-1)*config['perpage'] post_ruleset_count_dic = get_post_ruleset_count_dic(cur, topic, sources_ids, from_post_rnum, config['perpage'], fromDate, toDate) return jsonify(rule_count_dic = rule_count_dic, post_ruleset_count_dic = post_ruleset_count_dic)
def delete_records(keep=20): """Clean up files on server and mark the record as deleted""" sql = "SELECT * from records where is_deleted<>1 ORDER BY id desc LIMIT -1 offset {}".format(keep) assert isinstance(g.db, sqlite3.Connection) c = g.db.cursor() c.execute(sql) rows = c.fetchall() for row in rows: name = row[1] xmind = join(app.config['UPLOAD_FOLDER'], name) xml = join(app.config['UPLOAD_FOLDER'], name[:-5] + 'xml') for f in [xmind, xml]: if exists(f): os.remove(f) sql = 'UPDATE records SET is_deleted=1 WHERE id = ?' c.execute(sql, (row[0],)) g.db.commit()
def logged_in(): # designed to prevent repeated db requests if not hasattr(g,'logged_in_user'): if 'logged_in_user' in session: db = get_db() cur = db.cursor() cur.execute('SELECT auth_key FROM users WHERE id='+app.sqlesc,(session['logged_in_user'][0],)) result = cur.fetchall() if len(result) == 0: session.pop('logged_in_user',None) g.logged_in_user = False elif result[0][0] == session['logged_in_user'][1]: g.logged_in_user = True else: session.pop('logged_in_user',None) g.logged_in_user = False else: g.logged_in_user = False return g.logged_in_user
def add_to_series(rowid,uniqueIDForThisGame,name,farmName): current_auto_key = json.dumps([uniqueIDForThisGame,name,farmName]) db = get_db() cur = db.cursor() if logged_in() or api_user(): logged_in_userid = get_logged_in_user() cur.execute('SELECT id, owner, members_json FROM series WHERE auto_key_json='+app.sqlesc+' AND owner='+app.sqlesc,(current_auto_key,logged_in_userid)) result = cur.fetchall() db.commit() assert len(result)<= 1 if len(result)==0: cur.execute('INSERT INTO series (owner, members_json, auto_key_json) VALUES ('+app.sqlesc+','+app.sqlesc+','+app.sqlesc+') RETURNING id',(logged_in_userid,json.dumps([rowid]),current_auto_key)) series_id = cur.fetchall()[0][0] elif len(result)==1: series_id = result[0][0] new_members_json = json.dumps(json.loads(result[0][2])+[rowid]) cur.execute('UPDATE series SET members_json='+app.sqlesc+' WHERE id='+app.sqlesc,(new_members_json,result[0][0])) else: cur.execute('INSERT INTO series (members_json, auto_key_json) VALUES ('+app.sqlesc+','+app.sqlesc+') RETURNING id',(json.dumps([rowid]),current_auto_key)) series_id = cur.fetchall()[0][0] db.commit() return series_id
def remove_render_over_limit(url): db = get_db() cur = db.cursor() cur.execute('UPDATE plans SET render_deleted=TRUE WHERE url='+app.sqlesc+' RETURNING image_url, base_path',(url,)) image_url, base_path = cur.fetchone() if image_url != None and os.path.split(os.path.split(image_url)[0])[1] == url: # second condition ensures you're in a folder named after the URL which prevents accidentally deleting placeholders try: os.remove(legacy_location(image_url)) except: pass try: os.rmdir(legacy_location(base_path)) except: pass db.commit()
def find_claimables(): if not hasattr(g,'claimables'): sessionids = list(session.keys()) removals = ['admin','logged_in_user'] for key in removals: try: sessionids.remove(key) except ValueError: pass urls = tuple([key for key in sessionids if not key.endswith('del_token')]) if len(urls) > 0: db = get_db() cur = db.cursor() cur.execute('SELECT id, md5, del_token, url FROM playerinfo WHERE owner_id IS NULL AND url IN '+app.sqlesc,(urls,)) result = cur.fetchall() checked_results = [] for row in result: if row[1] == session[row[3]] and row[2] == session[row[3]+'del_token']: checked_results.append((row[0],row[3])) g.claimables = checked_results else: g.claimables = [] return g.claimables
def _op_imgur_post(url): db = get_db() cur = db.cursor() if logged_in(): check_access = imgur.checkApiAccess(get_logged_in_user()) if check_access == True: result = imgur.uploadToImgur(get_logged_in_user(),url) if 'success' in result: return redirect(result['link']) elif 'error' in result: if result['error'] == 'too_soon': g.error = _('You have uploaded this page to imgur in the last 2 hours: please wait to upload again') elif result['error'] == 'upload_issue': g.error = _('There was an issue with uploading the file to imgur. Please try again later!') else: g.error = 'There was an unknown error!' return render_template("error.html", **page_args()) elif check_access == False: return redirect(imgur.getAuthUrl(get_logged_in_user(),target=request.path)) elif check_access == None: g.error = _('Either you or upload.farm are out of imgur credits for the day! Sorry :( Try again tomorrow') return render_template("error.html", **page_args()) else: g.error = _("You must be logged in to post your farm to imgur!") return render_template("signup.html", **page_args())
def claim_playerinfo_entry(url,md5,del_token): # verify ability to be owner, then remove_series_link (checking ownership!), then add_to_series if logged_in(): db = get_db() cur = db.cursor() cur.execute('SELECT id,series_id,md5,del_token,owner_id,uniqueIDForThisGame,name,farmName FROM playerinfo WHERE url='+app.sqlesc,(url,)) result = cur.fetchone() if result[2] == md5 and result[3] == del_token and result[4] == None: remove_series_link(result[0], result[1]) series_id = add_to_series(result[0],result[5],result[6],result[7]) cur.execute('UPDATE playerinfo SET series_id='+app.sqlesc+', owner_id='+app.sqlesc+' WHERE id='+app.sqlesc,(series_id,get_logged_in_user(),result[0])) db.commit() return True else: return _('Problem authenticating!') else: return _('You are not logged in!')
def blogindividual(id): page_init() try: blogid = int(id) db = get_db() cur = db.cursor() cur.execute("SELECT id,time,author,title,post,live FROM blog WHERE id="+app.sqlesc+" AND live='1'",(blogid,)) blogdata = cur.fetchone() if blogdata != None: blogdata = list(blogdata) blogdata[1] = datetime.datetime.fromtimestamp(blogdata[1]) blogposts = {'posts':(blogdata,),'total':1} return render_template('blog.html',full=True,offset=0,recents=get_recents(),blogposts=blogposts,**page_args()) else: g.error = _("No blog with that ID!") except: g.error = _("No blog with that ID!") return render_template('error.html',**page_args())
def retrieve_file(url): page_init() db = get_db() cur = db.cursor() cur.execute("SELECT savefileLocation,name,uniqueIDForThisGame,download_enabled,download_url,id FROM playerinfo WHERE url="+app.sqlesc,(url,)) result = cur.fetchone() if result[3] == True: if result[4] == None: filename = generateSavegame.createZip(url,result[1],result[2],'static/saves',result[0]) cur.execute('UPDATE playerinfo SET download_url='+app.sqlesc+' WHERE id='+app.sqlesc,(filename,result[5])) db.commit() return redirect(filename) else: return redirect(result[4]) elif 'admin' in session: if result != None: with open(legacy_location(result[0]),'rb') as f: response = make_response(f.read()) response.headers["Content-Disposition"] = "attachment; filename="+str(result[1])+'_'+str(result[2]) return response else: g.error = _("URL does not exist") else: g.error = _("You are unable to download this farm data at this time.") return render_template('error.html',**page_args())
def handle_patreon_error(response): try: if response.get('errors'): if response['errors'][0]['status'] == '401': db = get_db() cur = db.cursor() cur.execute('UPDATE users SET patreon_info='+app.sqlesc+', patreon_token='+app.sqlesc+ ', patreon_refresh_token='+app.sqlesc+', patreon_expiry='+app.sqlesc+' patreon WHERE id='+app.sqlesc, (json.dumps({'error':'unauthorized'}),None,None,None,get_logged_in_user())) db.commit() if response.get('error') and response['error'] == 'invalid_grant': db = get_db() cur = db.cursor() cur.execute('UPDATE users SET patreon_info='+app.sqlesc+', patreon_token='+app.sqlesc+ ', patreon_refresh_token='+app.sqlesc+', patreon_expiry='+app.sqlesc+' WHERE id='+app.sqlesc, (json.dumps({'error':'unauthorized'}),None,None,None,get_logged_in_user())) db.commit() except: pass
def refresh_patreon_token(refresh_token,expiry): oauth_client = patreon.OAuth(app.config['PATREON_CLIENT_ID'],app.config['PATREON_CLIENT_SECRET']) tokens = oauth_client.refresh_token(refresh_token,app.config['PATREON_REDIRECT_URI']) if 'token_type' not in tokens: handle_patreon_error(tokens) return tokens else: db = get_db() cur = db.cursor() access_token = tokens['access_token'] refresh_token = tokens['refresh_token'] expiry = tokens['expires_in']+time.time() cur.execute('UPDATE users SET patreon_token='+app.sqlesc+', patreon_refresh_token='+app.sqlesc+ ', patreon_expiry='+app.sqlesc+' WHERE id='+app.sqlesc,(access_token,refresh_token,expiry,get_logged_in_user())) db.commit() return {'new_access_token':access_token}
def verify_email(): page_init() if 'i' in request.args and 't' in request.args: db = get_db() cur = db.cursor() cur.execute('SELECT email_conf_token, email_confirmed FROM users WHERE id='+app.sqlesc,(request.args.get('i'),)) t = cur.fetchall() if len(t) == 0: g.error = _('Account does not exist!') return render_template('error.html',**page_args()) elif t[0][1] == True: flash({'message':'<p>'+_('Already confirmed email address!')+'</p>'}) return redirect(url_for('home')) else: if t[0][0] == request.args.get('t'): cur.execute('UPDATE users SET email_confirmed='+app.sqlesc+' WHERE id='+app.sqlesc,(True,request.args.get('i'))) db.commit() flash({'message':'<p>'+_("Account email address confirmed!")+'</p>'}) return redirect(url_for('home')) g.error = _('Malformed verification string!') return render_template('error.html',**page_args())
def get_checkboxes(): food = request.form['foodName'] #group = request.form['group'] g.db = connect_db() g.db.execute("INSERT INTO users(favorites) VALUES(?)",[food]) # g.db.execute("INSERT INTO users(favorites,groups) VALUES(?,?)",[food,group]) g.db.close() #Now you have to run with this command: gunicorn flaskMain:app #Set the host to be 0.0.0.0
def get_results_from_user(user_id, contest_id): query = """ select value, higher_ranked_play_count from descriptors left outer join ( select count(answers.id) as higher_ranked_play_count, descriptors.id as inner_descriptor_id from descriptors left outer join answers on descriptors.id=answers.higher_ranked_descriptor_id where contest_id=? and answers.user_id=? group by descriptors.id ) on descriptors.id=inner_descriptor_id where descriptors.contest_id=? order by higher_ranked_play_count desc """ # REFACT get sql to get the zeros right cursor = g.db.execute(query, [contest_id, user_id, contest_id]) return [(row[0], row[1] or 0) for row in cursor.fetchall()]
def create_app(): app = Flask(__name__) app.config.from_object(flask_config) app.secret_key = os.urandom(24) app.register_blueprint(users_app, url_prefix='/users') app.register_blueprint(projects_app, url_prefix='/projects') login_manager = LoginManager() login_manager.init_app(app) def flask_login_user_loader(user_id): return g.db.query(User).filter(User.id == user_id).first() login_manager.user_loader(flask_login_user_loader) return app
def admin_required(f): @wraps(f) def wrap(*args, **kwargs): if is_admin(session['uid']): return f(*args, **kwargs) else: flash('You must be an administrator to view that page.') return redirect(url_for('home')) return wrap ## # SQLite Database Connector # Returns the db connection.
def change_password(login, password): salt = uuid4().hex hashed = sha256(password.encode() + salt.encode()).hexdigest() g.db = connect_db() cur = g.db.execute('UPDATE users SET password=?, salt=? WHERE login=?', (hashed, salt, login)) g.db.commit() g.db.close() ## # Change admin status # status: 1 = admin, 0 = user
def change_admin(login, status=0): g.db = connect_db() cur = g.db.execute('UPDATE users SET isAdmin=? WHERE login=?', (status, login)) g.db.commit() g.db.close() ## # Create user
def create_user(login, password, isAdmin=0): salt = uuid4().hex hashed = sha256(password.encode() + salt.encode()).hexdigest() g.db = connect_db() cur = g.db.execute('INSERT INTO users(login, password, salt, isAdmin) VALUES (?,?,?,?)', (login, hashed, salt, isAdmin)) g.db.commit() g.db.close() ## # Is the user an admin?
def get_login(uid): g.db = connect_db() cur = g.db.execute('SELECT login FROM users WHERE id=' + str(uid)) result = cur.fetchone() return result[0] ## # Get UID from login
def get_uid(login): g.db = connect_db() cur = g.db.execute('SELECT id FROM users WHERE login= "' + login + '"') result = cur.fetchone() return result[0] ## # Does this user already exist
def user_exists(login): g.db = connect_db() cur = g.db.execute('SELECT id FROM users WHERE login= "' + login + '"') if len(cur.fetchall()) > 0: return True return False ## # Logs an action
def log_action(uid, action): # ACTIONS #1|Successful login #2|Failed login #3|Logout #4|Server started #5|Server restarted #6|Server stopped #7|Server updated #8|Reset Own Password #9|Reset Anothers Password #10|Created User #11|Deleted User #12|Modified Admin Status g.db = connect_db() cur = g.db.execute('INSERT INTO loggedactions(user, action, time) VALUES (?,?,datetime("now"))', (uid, action)) g.db.commit() g.db.close() ####################################### PAGE FUNCTIONS ###################################### # # INDEX PAGE #
def home(): results = ''; if request.method == 'POST': if request.form['action'] == 'start': # run the start command results = "Starting the server...<br /><br />" results += check_output([app.script, "start"]) log_action(session['uid'], 4) results = Markup(results.replace('\n', '<br />')) elif request.form['action'] == 'stop': # run the stop action results = "Stoping the server...<br /><br />" results += check_output([app.script, "stop"]) log_action(session['uid'], 6) results = Markup(results.replace('\n', '<br />')) elif request.form['action'] == 'restart': # run the restart action results = "Restarting the server...<br /><br />" results += check_output([app.script, "restart"]) log_action(session['uid'], 5) results = Markup(results.replace('\n', '<br />')) elif request.form['action'] == 'update': # run the update action results = "Updating the server...<br /><br />" results += check_output([app.script, "update"]) log_action(session['uid'], 7) results = Markup(results.replace('\n', '<br />')) else: # invalid action! results = "INVALID ACTION!" g.db = connect_db() cur = g.db.execute('SELECT time, (SELECT users.login FROM users WHERE users.id = loggedactions.user), actions.action FROM loggedactions LEFT JOIN actions ON loggedactions.action = actions.id ORDER BY time DESC LIMIT 10;') actions = [dict(time=row[0], user=row[1], action=row[2]) for row in cur.fetchall()] g.db.close() return render_template('index.html', actions=actions, results=results, acp=session['priv'], username=session['username']) # # WELCOME PAGE #
def logs(): g.db = connect_db() cur = g.db.execute('SELECT time, (SELECT users.login FROM users WHERE users.id = loggedactions.user), actions.action FROM loggedactions LEFT JOIN actions ON loggedactions.action = actions.id ORDER BY time DESC LIMIT 50;') actions = [dict(time=row[0], user=row[1], action=row[2]) for row in cur.fetchall()] g.db.close() return render_template('logs.html', actions=actions, acp=session['priv'], username=session['username']) # # ADD USER PAGE #
def before_request(): g.db = db g.cache = redis_cache
def before_request(): g.menu_entries = {} g.db = create_session(app.config['DATABASE_URI']) g.mailer = Mailer(app.config.get('EMAIL_DOMAIN', ''), app.config.get('EMAIL_API_KEY', ''), 'CCExtractor.org CI Platform') g.version = "0.1" g.log = log g.github = { 'deploy_key': app.config.get('GITHUB_DEPLOY_KEY', ''), 'ci_key': app.config.get('GITHUB_CI_KEY', ''), 'bot_token': app.config.get('GITHUB_TOKEN', ''), 'repository_owner': app.config.get('GITHUB_OWNER', ''), 'repository': app.config.get('GITHUB_REPOSITORY', '') }
def teardown(exception): db = g.get('db', None) if db is not None: db.remove() # Register blueprints
def kvm_processor_linux(db, repository, delay): from run import config kvm_name = config.get('KVM_LINUX_NAME', '') return kvm_processor( db, kvm_name, TestPlatform.linux, repository, delay)
def queue_test(db, gh_commit, commit, test_type, branch="master", pr_nr=0): """ Function to store test details into Test model separately for various vm. Post status to GitHub """ from run import log fork = Fork.query.filter(Fork.github.like( "%/CCExtractor/ccextractor.git")).first() if test_type == TestType.pull_request: branch = "pull_request" # Create Linux test entry linux = Test(TestPlatform.linux, test_type, fork.id, branch, commit, pr_nr) db.add(linux) # Create Windows test entry windows = Test(TestPlatform.windows, test_type, fork.id, branch, commit, pr_nr) db.add(windows) db.commit() # Update statuses on GitHub if gh_commit is not None: test_ids = [linux.id, windows.id] ci_names = [linux.platform.value, windows.platform.value] for idx in range(len(ci_names)): try: gh_commit.post( state=Status.PENDING, description="Tests queued", context="CI - {name}".format(name=ci_names[idx]), target_url=url_for( 'test.by_id', test_id=test_ids[idx], _external=True)) except ApiError as a: log.critical( 'Could not post to GitHub! Response: {res}'.format( res=a.response) ) # We wait for the cron to kick off the CI VM's log.debug("Created tests, waiting for cron...")
def in_maintenance_mode(platform): try: platform = TestPlatform.from_string(platform) except ValueError: return 'ERROR' status = MaintenanceMode.query.filter( MaintenanceMode.platform == platform).first() if status is None: status = MaintenanceMode(platform, False) g.db.add(status) g.db.commit() return "True" if status.disabled else "False"
def connect_db(): return sqlite3.connect('db.db')
def before_request(): g.db = connect_db()
def recentlyUploaded(): #SELECT imgpath FROM pics ORDER BY id DESC LIMIT 8 recently = [] n = g.db.execute('SELECT imgpath,url FROM pics ORDER BY id DESC LIMIT 12') for i in n: recently.append(i) return recently
def get_last_id(): #SELECT id FROM pics ORDER BY id DESC LIMIT 1; n = g.db.execute('SELECT id FROM pics ORDER BY id DESC LIMIT 1;').fetchone()[0] return n
def upload_img(): #http://flask.pocoo.org/docs/0.10/patterns/fileuploads/ if request.method == 'POST': file = request.files['file'] if file and allowed_file(file.filename): try: filename = secure_filename(file.filename) if len(filename) > 30: filename = filename[0:30]+'~.'+filename.split('.')[-1] new_id = get_last_id() + 1 new_filename = filename new_url = short_url.encode_url(new_id) img_path = new_url+'.'+filename.split('.')[-1] file.save(os.path.join(UPLOAD_FOLDER, img_path)) g.db.execute("INSERT INTO pics (id, filename, url, imgpath) VALUES (?, ?, ?, ?)", (new_id, new_filename, new_url, img_path)) g.db.commit() save_thumbnail(img_path) #return redirect(url_for('upload_img', filename=filename)) return redirect('/show/'+new_url) except Exception as e: return str(e) else: return "Wrong file!" else: recent = recentlyUploaded() return render_template('index.html', STATIC_DIR = STATIC_DIR, TEMPLATES_DIR=TEMPLATES_DIR, recent = recent,)
def before_request(): if not hasattr(g, 'db'): user = GaodeConfig.USER password = GaodeConfig.PASSWORD databse = GaodeConfig.DATABASE g.db = Models(databse, user, password)
def teardown_request(exception): if hasattr(g, 'db'): g.db.close_db()
def create_table(): table_name = request.args.get('table_name') data = g.db.create_table(table_name) return json.dumps(data)
def get_table(): conn = g.db.conn cur = conn.cursor() sql = "select table_name from information_schema.tables" cur.execute(sql) table_names = cur.fetchall() table = [] for each in table_names: if "web_" in each[0]: table.append(each[0]) return json.dumps(table)