我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用flask.request.method()。
def query(): if(request.method == 'POST'): organisation = request.form['organisation'] email_address = request.form['email_address'] filename = organisation + ".html" info = db.session.query(User.github_username, User.name).filter_by(organisation = organisation).all() if(info == []): job = q.enqueue_call( func="main.save_info", args=(organisation, email_address, ), result_ttl=5000, timeout=600 ) flash("We shall notify you at " + email_address + " when the processing is complete") else: lists = [] for i in info: lists.append([str(i.github_username), str(i.name)]) get_nodes.creating_objs(lists, organisation) return render_template(filename, organisation=str(organisation)+'.json') return render_template('query.html')
def is_hot_dog(): if request.method == 'POST': if not 'file' in request.files: return jsonify({'error': 'no file'}), 400 # Image info img_file = request.files.get('file') img_name = img_file.filename mimetype = img_file.content_type # Return an error if not a valid mimetype if not mimetype in valid_mimetypes: return jsonify({'error': 'bad-type'}) # Write image to static directory and do the hot dog check img_file.save(os.path.join(app.config['UPLOAD_FOLDER'], img_name)) hot_dog_conf = rekognizer.get_confidence(img_name) # Delete image when done with analysis os.remove(os.path.join(app.config['UPLOAD_FOLDER'], img_name)) is_hot_dog = 'false' if hot_dog_conf == 0 else 'true' return_packet = { 'is_hot_dog': is_hot_dog, 'confidence': hot_dog_conf } return jsonify(return_packet)
def add(): if request.method == 'POST': if request.form['submit'] == 'Add': addr = request.form['addr'].lstrip().rstrip() f = io.open('blastlist.txt', 'a', encoding="utf-8") f.write(addr.decode('utf-8') + u'\r\n') f.close() return render_template('listadded.html',addr=addr) elif request.form['submit'] == 'Remove': addr = request.form['addr'].lstrip().rstrip() f = io.open('blastlist.txt', 'r', encoding="utf-8") lines = f.readlines() f.close() f = io.open('blastlist.txt', 'w', encoding="utf-8") for line in lines: if addr not in line: f.write(line.decode('utf-8')) f.close() return render_template('listremoved.html',addr=addr)
def login(): """ This login function checks if the username & password match the admin.db; if the authentication is successful, it passes the id of the user into login_user() """ if request.method == "POST" and \ "username" in request.form and \ "password" in request.form: username = request.form["username"] password = request.form["password"] user = User.get(username) # If we found a user based on username then compare that the submitted # password matches the password in the database. The password is stored # is a slated hash format, so you must hash the password before comparing it. if user and hash_pass(password) == user.password: login_user(user, remember=True) # FIXME! Get this to work properly... # return redirect(request.args.get("next") or url_for("index")) return redirect(url_for("index")) else: flash(u"Invalid username, please try again.") return render_template("login.html")
def main_teacher(): tm = teachers_model.Teachers(flask.session['id']) if request.method == 'POST': cm = courses_model.Courses() if "close" in request.form.keys(): cid = request.form["close"] cm.cid = cid cm.close_session(cm.get_active_session()) elif "open" in request.form.keys(): cid = request.form["open"] cm.cid = cid cm.open_session() courses = tm.get_courses_with_session() empty = True if len(courses) == 0 else False context = dict(data=courses) return render_template('main_teacher.html', empty=empty, **context)
def add_backnode(): if request.method == 'POST': node_name = request.form['node_name'] ftp_ip = request.form['ftp_ip'] ftp_port = request.form['ftp_port'] ftp_user = request.form['ftp_user'] ftp_pass = request.form['ftp_pass'] back_node = backhosts.query.filter(or_(backhosts.host_node==node_name,backhosts.ftp_ip==ftp_ip)).all() if back_node: flash(u'%s ?????????????!' %node_name) return render_template('addbacknode.html') backhost = backhosts(host_node=node_name,ftp_ip=ftp_ip,ftp_port=ftp_port,ftp_user=ftp_user,ftp_pass=ftp_pass) db.session.add(backhost) db.session.commit() flash(u'%s ??????!' %node_name) return render_template('addbacknode.html') else: return render_template('addbacknode.html')
def customer(): if request.method == 'POST': try: customer_id = request.form['customer_id'] customer_oper = request.form['customer_oper'] customer = customers.query.filter_by(id=customer_id).first() if customer_oper == 'stop_back': customer.customers_status = 1 else: customer.customers_status = 0 db.session.add(customer) db.session.commit() return u"???????" except Exception, e: print e return u"???????" else: customer_all = customers.query.all() return render_template('customers.html',customers=customer_all)
def changepass(): if request.method == 'POST': # process password change if request.form['pass1'] == request.form['pass2']: change_password(session['username'], request.form['pass1']) log_action(session['uid'], 8) session.pop('logged_in', None) session.pop('uid', None) session.pop('priv', None) session.pop('username', None) flash('Your password has been changed. Please login using your new password.') return redirect(url_for('home')) else: flash('The passwords you entered do not match. Please try again.') return render_template('changepass.html') return render_template('changepass.html') # # EDIT USER PAGE #
def adduser(): if request.method == 'POST': if request.form['pass1'] == request.form['pass2']: if user_exists(request.form['username']) == False: # create the user admin = 0 if request.form['status'] == 'admin': admin = 1 create_user(request.form['username'], request.form['pass1'], admin) log_action(session['uid'], 10) flash(request.form['username'] + ' has been created.') return render_template('adduser.html', acp=session['priv'], username=session['username']) else: flash('The username you entered is already in use.') return render_template('adduser.html', acp=session['priv'], username=session['username']) else: flash('The passwords you entered do not match. Please try again.') return render_template('adduser.html', acp=session['priv'], username=session['username']) return render_template('adduser.html', acp=session['priv'], username=session['username'])
def login(): """Login POST handler. Only runs when ``/login`` is hit with a POST method. There is no GET method equivilent, as it is handled by the navigation template. Sets the status code to ``401`` on login failure. Returns: JSON formatted output describing success or failure. """ log.debug("Entering login, attempting to authenticate user.") username = request.form['signin_username'] password = request.form['signin_password'] log.debug("Username: {0}".format(username)) if fe.check_auth(username, password): log.debug("User authenticated. Trying to set session.") session_id = fe.set_login_id() session['logged_in'] = session_id log.debug("Session ID: {0}, returning to user".format(session_id)) return jsonify({ "login": "success" }) log.debug("Username or password not recognized, sending 401.") response.status = 401 return jsonify({ "login": "failed" })
def json_query_simple(query, query_args=[], empty_ok=False): """Do a SQL query that selects one column and dump those values as a JSON array""" if request.method != 'GET': return not_allowed() try: cursor = dbcursor_query(query, query_args) except Exception as ex: return error(str(ex)) if cursor.rowcount == 0: cursor.close() if empty_ok: return json_response([]) else: return not_found() result = [] for row in cursor: result.append(row[0]) cursor.close() return json_response(result)
def json_query(query, query_args=[], name = 'name', single = False): """Do a SQL query that selects one column containing JSON and dump the results, honoring the 'expanded' and 'pretty' arguments. If the 'single' argument is True, the first-returned row will be returned as a single item instead of an array.""" if request.method != 'GET': return not_allowed() try: cursor = dbcursor_query(query, query_args) except Exception as ex: return error(str(ex)) if single and cursor.rowcount == 0: cursor.close() return not_found() result = [] for row in cursor: this = base_url(None if single else row[0][name]) row[0]['href'] = this result.append( row[0] if single or is_expanded() else this) cursor.close() return json_response(result[0] if single else result)
def new(): form = ProjectForm(request.form) if request.method == 'POST' and form.validate(): user_repo_path = join('repos', form.name.data) if os.path.isdir(user_repo_path): flash(_('This project name already exists'), 'error') else: project = Project(form.name.data, current_user) db.session.add(project) db.session.commit() #project.create_project(form.name.data, current_user) flash(_('Project created successfuly!'), 'info') return redirect(url_for('branches.view', project=form.name.data, branch='master', filename='index')) return render_template('new.html', form=form)
def html2rst(): if request.method == 'POST': if request.form.has_key('content'): input = request.form.get('content') if not input: input = 'undefined' if input != 'undefined': try: converted = html2rest(input) prefetch = None except: converted = None prefetch = input return render_template('html2rst.html', converted=converted, prefetch=prefetch) return render_template('html2rst.html')
def request_validate(view): @wraps(view) def wrapper(*args, **kwargs): endpoint = request.endpoint.partition('.')[-1] # data method = request.method if method == 'HEAD': method = 'GET' locations = validators.get((endpoint, method), {}) data_type = {"json": "request_data", "args": "request_args"} for location, schema in locations.items(): value = getattr(request, location, MultiDict()) validator = FlaskValidatorAdaptor(schema) result = validator.validate(value) LOG.info("Validated request %s: %s" % (location, result)) if schema.get("maxProperties") == 0: continue else: kwargs[data_type[location]] = result context = request.environ['context'] return view(*args, context=context, **kwargs) return wrapper
def update_content_in_local_cache(url, content, method='GET'): """?? local_cache ??????, ??content ?stream?????""" if local_cache_enable and method == 'GET' and cache.is_cached(url): info_dict = cache.get_info(url) resp = cache.get_obj(url) resp.set_data(content) # ???????????content?, without_content ????true # ?????????, ???content????, ???????? # ?stream???, ??????http?, ???????, ???????????????? # ?????????????????????, ??????????????? info_dict['without_content'] = False if verbose_level >= 4: dbgprint('LocalCache_UpdateCache', url, content[:30], len(content)) cache.put_obj( url, resp, obj_size=len(content), expires=get_expire_from_mime(parse.mime), last_modified=info_dict.get('last_modified'), info_dict=info_dict, )
def request_remote_site(): """ ???????(high-level), ????404/500??? domain_guess ?? """ # ???????? # ??: ?zmirror?????????, ?????????????? parse.remote_response = send_request( parse.remote_url, method=request.method, headers=parse.client_header, data=parse.request_data_encoded, ) if parse.remote_response.url != parse.remote_url: warnprint("requests's remote url", parse.remote_response.url, 'does no equals our rewrited url', parse.remote_url) if 400 <= parse.remote_response.status_code <= 599: # ??url???????? dbgprint("Domain guessing for", request.url) result = guess_correct_domain() if result is not None: parse.remote_response = result
def reset_api_key(name): """ Reset API-KEY for user. Returns a Jinja2 template. """ if request.method == 'POST': user = user_repo.get_by_name(name) if not user: return abort(404) ensure_authorized_to('update', user) user.api_key = model.make_uuid() user_repo.update(user) cached_users.delete_user_summary(user.name) msg = gettext('New API-KEY generated') flash(msg, 'success') return redirect_content_type(url_for('account.profile', name=name)) else: csrf = dict(form=dict(csrf=generate_csrf())) return jsonify(csrf)
def password_required(short_name): (project, owner, n_tasks, n_task_runs, overall_progress, last_activity, n_results) = project_by_shortname(short_name) form = PasswordForm(request.form) if request.method == 'POST' and form.validate(): password = request.form.get('password') cookie_exp = current_app.config.get('PASSWD_COOKIE_TIMEOUT') passwd_mngr = ProjectPasswdManager(CookieHandler(request, signer, cookie_exp)) if passwd_mngr.validates(password, project): response = make_response(redirect(request.args.get('next'))) return passwd_mngr.update_response(response, project, get_user_id_or_ip()) flash(gettext('Sorry, incorrect password')) return render_template('projects/password.html', project=project, form=form, short_name=short_name, next=request.args.get('next'))
def publish(short_name): (project, owner, n_tasks, n_task_runs, overall_progress, last_activity, n_results) = project_by_shortname(short_name) #### shruthi if("sched" in project.info.keys() and project.info["sched"]=="FRG"): if(project.owner_id==current_user.id and not cached_users.is_quiz_created(current_user.id, project)): flash("You did not created quiz.Please create the quiz","danger") return redirect(url_for('quiz.create_quiz', short_name=project.short_name)) #### end pro = pro_features() ensure_authorized_to('publish', project) if request.method == 'GET': return render_template('projects/publish.html', project=project, pro_features=pro) project.published = True project_repo.save(project) task_repo.delete_taskruns_from_project(project) result_repo.delete_results_from_project(project) webhook_repo.delete_entries_from_project(project) auditlogger.log_event(project, current_user, 'update', 'published', False, True) flash(gettext('Project published! Volunteers will now be able to help you!')) return redirect(url_for('.details', short_name=project.short_name))
def users(user_id=None): """Manage users of PYBOSSA.""" form = SearchForm(request.body) users = [user for user in user_repo.filter_by(admin=True) if user.id != current_user.id] if request.method == 'POST' and form.user.data: query = form.user.data found = [user for user in user_repo.search_by_name(query) if user.id != current_user.id] [ensure_authorized_to('update', found_user) for found_user in found] if not found: flash("<strong>Ooops!</strong> We didn't find a user " "matching your query: <strong>%s</strong>" % form.user.data) response = dict(template='/admin/users.html', found=found, users=users, title=gettext("Manage Admin Users"), form=form) return handle_content_type(response) response = dict(template='/admin/users.html', found=[], users=users, title=gettext("Manage Admin Users"), form=form) return handle_content_type(response)
def new_item(category): if request.method == 'GET': return render('newitem.html', category=category) elif request.method == 'POST': name = request.form['name'] highlight = request.form['highlight'] url = request.form['url'] if valid_item(name, url, highlight): user_id = login_session['user_id'] item = Item(name=name, highlight=highlight, url=url, user_id=user_id, category_id=category.id) session.add(item) session.commit() flash("Newed item %s!" % item.name) return redirect(url_for('show_item', category_id=category.id, item_id=item.id)) else: error = "Complete info please!" return render('newitem.html', category=category, name=name, highlight=highlight, url=url, error=error)
def edit_item(category, item): if request.method == 'GET': return render('edititem.html', category=category, item=item, name=item.name, highlight=item.highlight, url=item.url) elif request.method == 'POST': name = request.form['name'] highlight = request.form['highlight'] url = request.form['url'] if valid_item(name, url, highlight): item.name = name item.highlight = highlight item.url = url session.commit() flash("Edited item %s!" % item.name) return redirect(url_for('show_item', category_id=category.id, item_id=item.id)) else: error = "Complete info please!" return render('edititem.html', category=category, item=item, name=name, highlight=highlight, url=url, error=error)
def new_category(): if request.method == 'GET': return render('newcategory.html') elif request.method == 'POST': name = request.form['name'] description = request.form['description'] if valid_category(name, description): user_id = login_session['user_id'] category = Category(name=name, description=description, user_id=user_id) session.add(category) session.commit() flash("Newed category %s!" % category.name) return redirect(url_for("show_items", category_id=category.id)) else: error = "Complete info please!" return render('newcategory.html', name=name, description=description, error=error)
def edit_category(category): if request.method == 'GET': return render('editcategory.html', category=category, name=category.name, description=category.description) elif request.method == 'POST': name = request.form['name'] description = request.form['description'] if valid_category(name, description): category.name = name category.description = description session.commit() flash("Edited category %s!" % category.name) return redirect(url_for("show_items", category_id=category.id)) else: error = "Complete info please!" return render('editcategory.html', category=category, name=name, description=description, error=error)
def modify_blog_posts(id): """ RESTful routes for fetching, updating, or deleting a specific blog post """ post = BlogPost.query.get(id) if not post: return jsonify({"message": "No blog post found with id " + str(id)}), 404 if request.method == 'GET': return post elif request.method == 'PUT': data = request.form post.content = data["blog_content"] db.session.add(post) db.session.commit() return jsonify({"message": "Success!"}), 200 else: db.session.delete(post) db.session.commit() return jsonify({"message": "Success!"}), 200
def userlist(): u=User() form=AddUserForm() flag=current_user.is_administrator(g.user) if flag is True: userlist=u.GetUserList() jsondata=request.get_json() if request.method == 'POST' and jsondata: if jsondata['action'] == u'edit': username=jsondata['username'] location=url_for('.admin_edit_profile',username=username) return jsonify({"status":302,"location":location}) else: username=jsondata['username'] u.RemUser(username) return redirect('userlist') elif request.method == 'POST' and form.validate(): pwd=u.GetPassword(g.user) if u.verify_password(form.oripassword.data): u.AddUser(form.username.data,form.password.data,form.role.data,form.email.data) return redirect('userlist') else: return render_template('userlist.html',userlist=userlist,form=form) else: abort(403)
def edit_profile(): form=EditProfileForm() u=User() if request.method == 'POST' and form.validate(): pwd=u.GetPassword(g.user) if u.verify_password(form.oripassword.data): email=form.email.data aboutme=form.about_me.data if form.password.data is not u'': u.ChangePassword(g.user,form.password.data) u.ChangeProfile(g.user,email,aboutme) flash('??????') return redirect(url_for('.user',username=g.user)) else: flash('????????') u.GetUserInfo(g.user) form.email.data=u.email form.about_me.data=u.aboutme return render_template('edit_profile.html',form=form,u=u)
def nodes_node(name): name = serializer.loads(name) node = NodeDefender.db.node.get(name) if request.method == 'GET': return render_template('frontend/nodes/node.html', Node = node) if icpeform.Submit.data and icpeform.validate_on_submit(): icpe.alias = BasicForm.alias.data icpe.comment = BasicForm.comment.data elif locationform.Submit.data and locationform.validate_on_submit(): icpe.location.street = AddressForm.street.data icpe.location.city = AddressForm.city.data icpe.location.geolat = AddressForm.geolat.data icpe.location.geolong = AddressForm.geolong.data db.session.add(icpe) db.session.commit() return render_template('frontend/nodes/node.html', Node = node)
def login(): if request.method == 'GET': return redirect(url_for('auth_view.authenticate')) login_form = LoginForm() if login_form.validate() and login_form.email.data: user = NodeDefender.db.user.get(login_form.email.data) if user is None: flash('Email or Password Wrong', 'error') return redirect(url_for('auth_view.login')) if not user.verify_password(login_form.password.data): flash('Email or Password Wrong', 'error') return redirect(url_for('auth_view.login')) if not user.enabled: flash('Account Locked', 'error') return redirect(url_for('auth_view.login')) if login_form.remember(): login_user(user, remember = True) else: login_user(user) return redirect(url_for('index'))
def register(): if request.method == 'GET': return redirect(url_for('auth_view.authenticate')) register_form = RegisterForm() if register_form.validate() and register_form.email.data: email = register_form.email.data firstname = register_form.firstname.data lastname = register_form.lastname.data NodeDefender.db.user.create(email, firstname, lastname) NodeDefender.db.user.enable(email) NodeDefender.db.user.set_password(email, register_form.password.data) NodeDefender.mail.user.confirm_user(email) flash('Register Successful, please login', 'success') else: flash('Error doing register, please try again', 'error') return redirect(url_for('auth_view.authenticate')) flash('error', 'error') return redirect(url_for('auth_view.authenticate'))
def register_token(token): user = NodeDefender.db.user.get(serializer.loads_salted(token)) if user is None: flash('Invalid Token', 'error') return redirect(url_for('index')) register_form = RegisterTokenForm() if request.method == 'GET': return render_template('frontend/auth/register.html', RegisterForm = register_form, user = user) if register_form.validate_on_submit(): user.firstname = register_form.firstname.data user.lastname = register_form.lastname.data user.enabled = True user.confirmed_at = datetime.now() NodeDefender.db.user.save_sql(user) NodeDefender.db.user.set_password(user.email, register_form.password.data) NodeDefender.mail.user.confirm_user(user.email) flash('Register Successful, please login', 'success') else: flash('Error doing register, please try again', 'error') return redirect(url_for('auth_view.login')) return redirect(url_for('auth_view.login'))
def admin_server(): MQTTList = NodeDefender.db.mqtt.list(user = current_user.email) MQTT = CreateMQTTForm() if request.method == 'GET': return render_template('frontend/admin/server.html', MQTTList = MQTTList, MQTTForm = MQTT) if MQTT.Submit.data and MQTT.validate_on_submit(): try: NodeDefender.db.mqtt.create(MQTT.IPAddr.data, MQTT.Port.data) NodeDefender.mqtt.connection.add(MQTT.IPAddr.data, MQTT.Port.data) except ValueError as e: flash('Error: {}'.format(e), 'danger') return redirect(url_for('admin_view.admin_server')) if General.Submit.data and General.validate_on_submit(): flash('Successfully updated General Settings', 'success') return redirect(url_for('admin_server')) else: flash('Error when trying to update General Settings', 'danger') return redirect(url_for('admin_view.admin_server')) flash('{}'.format(e), 'success') return redirect(url_for('admin_view.admin_server'))
def admin_groups(): GroupForm = CreateGroupForm() groups = NodeDefender.db.group.list(user_mail = current_user.email) if request.method == 'GET': return render_template('frontend/admin/groups.html', groups = groups, CreateGroupForm = GroupForm) else: if not GroupForm.validate_on_submit(): flash('Form not valid', 'danger') return redirect(url_for('admin_view.admin_groups')) try: group = NodeDefender.db.group.create(GroupForm.Name.data) NodeDefender.db.group.update(group.name, **\ {'email' : GroupForm.Email.data, 'description' : GroupForm.description.data}) except ValueError as e: flash('Error: {}'.format(e), 'danger') return redirect(url_for('admin_view.admin_groups')) flash('Successfully Created Group: {}'.format(group.name), 'success') return redirect(url_for('admin_view.admin_group', name = serializer.dumps(group.name)))
def admin_users(): UserForm = CreateUserForm() if request.method == 'GET': if current_user.superuser: users = NodeDefender.db.user.list() else: groups = NodeDefender.db.group.list(current_user.email) groups = [group.name for group in groups] users = NodeDefender.db.user.list(*groups) return render_template('frontend/admin/users.html', Users = users,\ CreateUserForm = UserForm) if not UserForm.validate(): flash('Error adding user', 'danger') return redirect(url_for('admin_view.admin_users')) try: user = NodeDefender.db.user.create(UserForm.Email.data, UserForm.Firstname.data, UserForm.Lastname.data) except ValueError as e: flash('Error: {}'.format(e), 'danger') redirect(url_for('admin_view.admin_users')) flash('Successfully added user {}'.format(user.firstname), 'success') return redirect(url_for('admin_view.admin_user', email = user.email))
def admin_user(email): email = serializer.loads(email) usersettings = UserSettings() userpassword = UserPassword() usergroupadd = UserGroupAdd() user = NodeDefender.db.user.get(email) if request.method == 'GET': if user is None: flash('User {} not found'.format(id), 'danger') return redirect(url_for('admin_view.admin_groups')) return render_template('frontend/admin/user.html', User = user, UserSettings = usersettings, UserPassword = userpassword, UserGroupAdd = usergroupadd) if usersettings.Email.data and usersettings.validate(): NodeDefender.db.user.update(usersettings.Email.data, **\ {'firstname' : usersettings.Firstname.data, 'lastname' : usersettings.Lastname.data}) return redirect(url_for('admin_view.admin_user', email = serializer.dumps(usersettings.Email.data)))
def protect(self): if request.method not in current_app.config['WTF_CSRF_METHODS']: return try: validate_csrf(self._get_csrf_token()) except ValidationError as e: logger.info(e.args[0]) self._error_response(e.args[0]) if request.is_secure and current_app.config['WTF_CSRF_SSL_STRICT']: if not request.referrer: self._error_response('The referrer header is missing.') good_referrer = 'https://{0}/'.format(request.host) if not same_origin(request.referrer, good_referrer): self._error_response('The referrer does not match the host.') g.csrf_valid = True # mark this request as CSRF valid
def register_extension(app): @app.before_request def add_correlation_id(*args, **kw): correlation_id = request.headers.get(CORRELATION_ID) log.debug("%s %s", request.method, request.url) if not correlation_id: correlation_id = str(uuid.uuid4()) if request.method != "GET": """ TODO: remove sensitive information such as username/password """ log.debug({ "message": "Tracking request", "correlation_id": correlation_id, "method": request.method, "uri": request.url, "data": request.data, }) request.correlation_id = correlation_id @app.after_request def save_correlation_id(response): if CORRELATION_ID not in response.headers: response.headers[CORRELATION_ID] = getattr(request, "correlation_id", None) return response
def admin_view(): if request.method == 'POST': username = request.form.get('name') password = request.form.get('password') admin_user= Teams.query.filter_by(name=request.form['name'], admin=True).first() if admin_user and bcrypt_sha256.verify(request.form['password'], admin_user.password): try: session.regenerate() # NO SESSION FIXATION FOR YOU except: pass # TODO: Some session objects dont implement regenerate :( session['username'] = admin_user.name session['id'] = admin_user.id session['admin'] = True session['nonce'] = sha512(os.urandom(10)) db.session.close() return redirect('/admin/graphs') if is_admin(): return redirect('/admin/graphs') return render_template('admin/login.html')
def admin_keys(chalid): if request.method == 'GET': chal = Challenges.query.filter_by(id=chalid).first_or_404() json_data = {'keys':[]} flags = json.loads(chal.flags) for i, x in enumerate(flags): json_data['keys'].append({'id':i, 'key':x['flag'], 'type':x['type']}) return jsonify(json_data) elif request.method == 'POST': chal = Challenges.query.filter_by(id=chalid).first() newkeys = request.form.getlist('keys[]') newvals = request.form.getlist('vals[]') print(list(zip(newkeys, newvals))) flags = [] for flag, val in zip(newkeys, newvals): flag_dict = {'flag':flag, 'type':int(val)} flags.append(flag_dict) json_data = json.dumps(flags) chal.flags = json_data db.session.commit() db.session.close() return '1'
def hello(): form = ReusableForm(request.form) print(form.errors) if request.method == 'POST': name=request.form['name'] password=request.form['password'] email=request.form['email'] print(name, " ", email, " ", password) if form.validate(): # Save the comment here. flash('Thank you, ' + name) else: flash('Error: All the form fields are required. ') return render_template('hello.html', form=form)
def login(): if request.method == "POST": post_data = { "username": request.form["username"], "password": request.form["password"], } r = requests.post("http://127.0.0.1:8000/api/user/login", data=post_data) resp = r.json() if resp["status"] == 1: session["token"] = r.cookies["token"] session["api-session"] = r.cookies["flask"] return redirect("/problems") else: flash(resp["message"]) return render_template("login.html") else: return render_template("login.html")
def problem(pid): if "token" not in session: return redirect("/") if request.method == "POST": r = requests.post("http://127.0.0.1:8000/api/problems/submit", data={ "pid": pid, "key": request.form["solution"], }, cookies={ "token": session["token"], "flask": session["api-session"] }) app.logger.info(r.text) app.logger.info(r.status_code) resp = r.json() if resp.get("status", 0) == 0: flash(resp.get("message", "That's not the correct answer")) return redirect("/problem/" + pid) else: return redirect("/problems") else: problem = api_get("/api/problems/" + pid) return render_template("problem.html", problem=problem["data"])
def __init__(self, options, func, request_context): self.options = options self.operation = request.endpoint self.func = func.__name__ self.method = request.method self.args = request.args self.view_args = request.view_args self.request_context = request_context self.timing = dict() self.error = None self.stack_trace = None self.request_body = None self.response_body = None self.response_headers = None self.status_code = None self.success = None
def newMenuItem(restaurant_id): if request.method == 'POST': newItem = MenuItem(name=request.form['name'], description=request.form[ 'description'], price=request.form['price'], course=request.form['course'], restaurant_id=restaurant_id) session.add(newItem) session.commit() return redirect(url_for('showMenu', restaurant_id=restaurant_id)) else: return render_template('newMenuItem.html', restaurant_id=restaurant_id) return render_template('newMenuItem.html', restaurant=restaurant) # return 'This page is for making a new menu item for restaurant %s' # %restaurant_id # Edit a menu item
def editMenuItem(restaurant_id, menu_id): editedItem = session.query(MenuItem).filter_by(id=menu_id).one() if request.method == 'POST': if request.form['name']: editedItem.name = request.form['name'] if request.form['description']: editedItem.description = request.form['name'] if request.form['price']: editedItem.price = request.form['price'] if request.form['course']: editedItem.course = request.form['course'] session.add(editedItem) session.commit() return redirect(url_for('showMenu', restaurant_id=restaurant_id)) else: return render_template( 'editMenuItem.html', restaurant_id=restaurant_id, menu_id=menu_id, item=editedItem) # return 'This page is for editing menu item %s' % menu_id # Delete a menu item
def stop_instance(): res = {} if request.method == 'POST': req_data = json.loads(request.data) db_session = db.Session() instance_query_result = db_session.query(Instance).filter(\ Instance.container_serial == req_data['container_serial']).first() if instance_query_result is not None: policy = {} policy['operate'] = app.config['STOP'] policy['container_serial'] = req_data['container_serial'] policy['container_name'] = instance_query_result.container_name policy['user_name'] = req_data['user_name'] message = json.dumps(policy) ui_mq = UiQueue() worker_res = ui_mq.send(message) worker_res_dict = json.loads(worker_res) res['code'] = worker_res_dict['code'] res['message'] = worker_res_dict['message'] res['container_serial'] = worker_res_dict['container_serial'] eagle_logger.info(res['message']) else: res['code'] = '0x9' res['message'] = 'container not exist' return jsonify(**res)
def restart_instance(): res = {} if request.method == 'POST': req_data = json.loads(request.data) db_session = db.Session() instance_query_result = db_session.query(Instance).filter(\ Instance.container_serial == req_data['container_serial']).first() if instance_query_result is not None: policy = {} policy['operate'] = app.config['RESTART'] policy['container_serial'] = req_data['container_serial'] policy['container_name'] = instance_query_result.container_name policy['user_name'] = req_data['user_name'] message = json.dumps(policy) ui_mq = UiQueue() worker_res = ui_mq.send(message) worker_res_dict = json.loads(worker_res) res = worker_res_dict eagle_logger.info(res['message']) else: res['code'] = '0x9' res['message'] = 'container not exist' return jsonify(**res)
def remove_instance(): res = {} if request.method == 'POST': req_data = json.loads(request.data) db_session = db.Session() instance_query_result = db_session.query(Instance).filter(\ Instance.container_serial == req_data['container_serial']).first() if instance_query_result is not None: policy = {} policy['operate'] = app.config['REMOVE'] policy['container_serial'] = req_data['container_serial'] policy['user_name'] = req_data['user_name'] message = json.dumps(policy) ui_mq = UiQueue() worker_res = ui_mq.send(message) worker_res_dict = json.loads(worker_res) res['code'] = worker_res_dict['code'] res['message'] = worker_res_dict['message'] res['container_serial'] = worker_res_dict['container_serial'] eagle_logger.info(res['message']) else: res['code'] = '0x9' res['message'] = 'container not exist' return jsonify(**res)
def tail(): if request.method == 'POST': fi = request.form['file'] if os.path.isfile(fi): n = int(request.form['n']) le = io.open(fi, 'r', encoding='utf-8') taildata = le.read()[-n:] le.close() else: taildata = "No such file." return render_template('tail.html',taildata=taildata)