我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用flask.request.args()。
def authorize_view(self): """Flask view that starts the authorization flow. Starts flow by redirecting the user to the OAuth2 provider. """ args = request.args.to_dict() # Scopes will be passed as mutliple args, and to_dict() will only # return one. So, we use getlist() to get all of the scopes. args['scopes'] = request.args.getlist('scopes') return_url = args.pop('return_url', None) if return_url is None: return_url = request.referrer or '/' flow = self._make_flow(return_url=return_url, **args) auth_url = flow.step1_get_authorize_url() return redirect(auth_url)
def index(): code = request.args.get("code", "") app.logger.debug("code: %s" %code) #app.logger.debug(request.args) if g.signin: return "logged_in" elif code: _data = Get_Access_Token(code) access_token = _data.get('access_token') userData = Get_User_Info(access_token) app.logger.debug(userData) #resp = render_template('info.html', userData=userData) #resp.set_cookie(key="logged_in", value='true', expires=None) resp = jsonify(userData) resp.set_cookie(key="logged_in", value='true', expires=None) return resp else: return redirect(url_for("login"))
def twittercallback(): verification = request.args["oauth_verifier"] auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET) try: auth.request_token = session["request_token"] except KeyError: flash("Please login again", "danger") return redirect(url_for("bp.home")) try: auth.get_access_token(verification) except tweepy.TweepError: flash("Failed to get access token", "danger") return redirect(url_for("bp.home")) session["access_token"] = auth.access_token session["access_token_secret"] = auth.access_token_secret return render_template("twittercallback.html", form=HashtagForm())
def index(): """Primary index function. This function handles searching and the main page. If ``q`` is passed in a query string, e.g. ``http://localhost?q=gabriel+dropout``, then a search will be performed. If request path is ``search``, e.g. ``http://localhost/search``, then the navigation menu will not be rendered. Should there be no shows returned from the backend, ``front_end.do_first_time_setup`` will be called to scrape shows from the source. Returns: A rendered template, either ``first_time.html`` for the first run or ``default.html`` otherwise. """ log.debug("Entering index, attempting to get shows.") watching, airing, specials, movies = fe.get_shows_for_display(request.args.get('q',None)) standalone = True if request.path.strip('/') == 'search' else False logged_in = fe.check_login_id(escape(session['logged_in'])) if 'logged_in' in session else False if not watching and not airing and not specials and not movies: log.debug("No shows found in any category. Starting first time startup.") fe.do_first_time_setup() return render_template('first_time.html', logged_in=logged_in) return render_template('default.html', watching=watching, airing=airing, specials=specials, movies=movies, standalone=standalone, logged_in=logged_in)
def star(): """Starring/Highlighting handler. Attempts to toggle a star/highlight on a particular show. The show ID must be passed in the ``id`` query string. If the user is unauthenticated, the function is aborted with a ``404`` message to hide the page. Returns: JSON formatted output describing success and the ID of the show starred. """ log.debug("Entering star, trying to toggle star.") if fe.check_login_id(escape(session['logged_in'])): log.debug("Sending show ID {0} to function".format(request.args['id'])) fe.star_show(request.args['id']) log.debug("Returning to user.") return jsonify({ "star": "success", "id": request.args['id'] }) log.debug("User cannot be authenticated, send 404 to hide page.") abort(404)
def drop_show(): """Show removal handler. Attempts to remove a show from the backend system. The show ID must be passed in the ``id`` query string. If the user if unauthenticated, the function is aborted with a ``404`` message to hide the page. Returns: An HTTP redirect to the home page, to refresh. """ log.debug("Entering drop_show, trying to remove show from list.") if fe.check_login_id(escape(session['logged_in'])): log.debug("Sending show ID {0} to function".format(request.args['id'])) fe.remove_show(request.args['id']) log.debug("Refreshing user's page.") return redirect('/') log.debug("User cannot be authenticated, send 404 to hide page.") abort(404)
def setup(args, pipeline, runmod, injector): """Load configuration""" logging.basicConfig( format='[%(asctime)s] [%(levelname)s] %(name)s: %(message)s', level=logging.INFO, datefmt='%Y-%m-%d %H:%M:%S') _globals['gransk'] = gransk.api.API(injector) _globals['config'] = _globals['gransk'].config if pipeline: _globals['gransk'].pipeline = pipeline if _globals['gransk'].pipeline.get_service('related_entities'): _globals['gransk'].pipeline.get_service('related_entities').load_all(_globals['config']) if _globals['gransk'].pipeline.get_service('related_documents'): _globals['gransk'].pipeline.get_service('related_documents').load_all(_globals['config'])
def customEvent(): event_name = request.args['eventName'] location = request.args['location'] start_dt = request.args['startDt'] end_dt = request.args['endDt'] event_hashkey = utils.make_hashkey("salttt"); if len(event_name) == 0: event_name = None if len(location) == 0: location = None db_manager.query( """ INSERT INTO EVENT (event_hashkey,calendar_hashkey,event_id,summary,start_dt,end_dt,location) VALUES (%s, 'admin_calendar_hashkey','admin_event_id', %s, %s, %s, %s) """, (event_hashkey,event_name,start_dt,end_dt,location) ) reco_maestro = recoMaestro.RecoMaestro( account_hashkey = 'admin_account_hashkey', switchExtractor = True) return json.dumps(reco_maestro.result_final)
def password_required(short_name): (project, owner, n_tasks, n_task_runs, overall_progress, last_activity, n_results) = project_by_shortname(short_name) form = PasswordForm(request.form) if request.method == 'POST' and form.validate(): password = request.form.get('password') cookie_exp = current_app.config.get('PASSWD_COOKIE_TIMEOUT') passwd_mngr = ProjectPasswdManager(CookieHandler(request, signer, cookie_exp)) if passwd_mngr.validates(password, project): response = make_response(redirect(request.args.get('next'))) return passwd_mngr.update_response(response, project, get_user_id_or_ip()) flash(gettext('Sorry, incorrect password')) return render_template('projects/password.html', project=project, form=form, short_name=short_name, next=request.args.get('next'))
def index(): code = request.args.get("code", "") #app.logger.debug("code:%s" %code) #app.logger.debug(request.args) if g.signin: return "logged_in" elif code: _data = Get_Access_Token(code) access_token = _data['access_token'] userData = Get_User_Info(access_token) app.logger.debug(userData) #resp = render_template('info.html', userData=userData) #resp.set_cookie(key="logged_in", value='true', expires=None) resp = jsonify(userData) resp.set_cookie(key="logged_in", value='true', expires=None) return resp else: return redirect(url_for("login"))
def index(): code = request.args.get("code", "") #app.logger.debug("code:%s" %code) #app.logger.debug(request.args) if g.signin: return "logged_in" elif code: _data = Get_Access_Token(code) app.logger.debug(_data) access_token = _data['access_token'] uid = _data['uid'] userData = Get_User_Info(access_token, uid) app.logger.debug(userData) #resp = render_template('info.html', userData=userData) #resp.set_cookie(key="logged_in", value='true', expires=None) resp = jsonify(userData) resp.set_cookie(key="logged_in", value='true', expires=None) return resp else: return redirect(url_for("login"))
def index(): code = request.args.get("code", "") #app.logger.debug("code:%s" %code) #app.logger.debug(request.args) if g.signin: return "logged_in" elif code: _data = Get_Access_Token(code) access_token = _data['access_token'] openid = Get_OpenID(access_token)['openid'] userData = Get_User_Info(access_token, openid) app.logger.debug(userData) #resp = render_template('info.html', userData=userData) #resp.set_cookie(key="logged_in", value='true', expires=None) resp = jsonify(userData) resp.set_cookie(key="logged_in", value='true', expires=None) return resp else: return redirect(url_for("login"))
def get_filter_arguments(mrequest): """ Get timestamp and address from request """ data = mrequest.args cur_timestamp, addr = None, None if data is not None: if 'timestamp' in data.keys(): cur_timestamp = data['timestamp'] form = "%Y-%m-%dT%H:%M:%S.%f" try: cur_timestamp = datetime.datetime.strptime(cur_timestamp, form) except ValueError: abort(500, "Wrong timestamp format") if 'addr' in data.keys(): addr = int(data['addr'], 16) return cur_timestamp, addr
def _session_history(): try: data = request.args sid = data.get('session') sess = session_manager.get_session(sid) fname = sess.dump_file if fname is not None and os.path.isfile(fname): return send_from_directory( os.path.dirname(fname), os.path.basename(fname), mimetype='text/plain') else: return '', 404 except Exception as ex: logger.error("Internal error {}".format(ex)) return '', 500
def jsonhistory(): history = db.session.query(models.Processed) table = DataTable(request.args, models.Processed, history, [ ("date", "time", lambda i: "{}".format(i.time.strftime('%Y/%m/%d')) if i.stopped else '<span class="orange">{}</span>'.format(_("Currently watching..."))), ("ipaddress", lambda i: "{}".format(i.get_xml().find('Player').get("address"))), ("user", lambda i: '<a href="{0}" class="invert-link">{1}</a>'.format(url_for('user', name=i.user), i.user)), ("platform"), ("title", lambda i: u'<a class="invert-link" href="{0}">{1}</a>'.format(url_for('info', id=i.get_xml_value('ratingKey')), i.title)), ("type", lambda i: "{}".format(i.get_xml_value("type"))), ("streaminfo", lambda i: '<a href="#" data-link="{0}" class="orange" data-target="#streamModal" data-toggle="modal"><i class="glyphicon glyphicon glyphicon-info-sign"></i></a>'.format(url_for('streaminfo',id=i.id)) if i.platform != "Imported" else ''), ("time",lambda i: "{}".format(i.time.strftime('%H:%M'))), ("paused_counter", lambda i: "{} min".format(int(i.paused_counter)/60) if i.paused_counter else "0 min" ), ("stopped", lambda i: "{}".format(i.stopped.strftime('%H:%M')) if i.stopped else "n/a"), ("duration", lambda i: "{} min".format(int((((i.stopped - i.time).total_seconds() - (int(i.paused_counter or 0))) /60))) if i.stopped else "n/a"), ("completed", lambda i: '<span class="badge badge-warning">{}%</span>'.format(helper.getPercentage(i.get_xml_value("duration") if i.platform == "Imported" else i.get_xml_value("viewOffset"), i.get_xml_value("duration")))), ]) table.searchable(lambda queryset, user_input: perform_some_search(queryset, user_input)) return json.dumps(table.json())
def audit(func): """ Record a Flask route function in the audit log. Generates a JSON record in the Flask log for every request. """ @wraps(func) def wrapper(*args, **kwargs): options = AuditOptions( include_request_body=DEFAULT_INCLUDE_REQUEST_BODY, include_response_body=DEFAULT_INCLUDE_RESPONSE_BODY, include_path=True, include_query_string=True, ) with logging_levels(): return _audit_request(options, func, None, *args, **kwargs) return wrapper
def __init__(self, options, func, request_context): self.options = options self.operation = request.endpoint self.func = func.__name__ self.method = request.method self.args = request.args self.view_args = request.view_args self.request_context = request_context self.timing = dict() self.error = None self.stack_trace = None self.request_body = None self.response_body = None self.response_headers = None self.status_code = None self.success = None
def file_index(page=1): """Show available actions regarding files. Args: page (int): Listing page number to show. """ order_key, order_dir, ordering = _sort_uploads(request.args) files = ( FileUpload.query .order_by(ordering) .paginate(page, 20, False) ) try: return render_template( 'akamatsu/dashboard/file/index.html', files=files, order_key=order_key, order_dir=order_dir ) except NotFound: # Show a 'no files found' notice instead of a 404 error return render_template('akamatsu/dashboard/file/index.html')
def user_index(page=1): """Show list of users registered in the application. Args: page (int): Listing page number to show. """ order_key, order_dir, ordering = _sort_users(request.args) users = ( User.query .order_by(ordering) .paginate(page, 20, False) ) try: return render_template( 'akamatsu/dashboard/user/index.html', users=users, order_key=order_key, order_dir=order_dir ) except NotFound: # Show a 'no posts found' notice instead of a 404 error return render_template('akamatsu/dashboard/user/index.html')
def config_sensors_change(self, mode, id): if mode == 'add' and 'module' not in request.args: if request.method == 'POST': filename = OS().upload_file('sensors/', 'file'); return self._get_module_chooser("Add Sensor", "/config/sensors/add", "sensors", "Sensor") data = { "edit": (mode == 'edit'), "mode": mode, "sensor": None, "sensor_impl": None, "sensor_module": None, "modules": OS().get_classes("sensors", "Sensor") } if mode == 'edit' and id is not None: sensor = Sensors().get(id) data['sensor'] = sensor data['sensor_module'] = sensor.get_classpath() data['sensor_impl'] = sensor.get_sensor_impl() elif mode == 'add': data['sensor_module'] = request.args.get('module') data['sensor_impl'] = OS().create_object(data['sensor_module']) return self.get_view('config_sensor_change.html').data(data)
def config_notifications_change(self, mode, nid): if mode == 'add' and 'module' not in request.args: if request.method == 'POST': filename = OS().upload_file('notifiers/', 'file') return self._get_module_chooser("Add Notification Service", "/config/notifications/add", "notifiers", "Notifier") data = { "edit": (mode == 'edit'), "mode": mode, "notifier": None, "notifier_impl": None, "notifier_module": None, "modules": OS().get_classes("notifiers", "Notifier") } if mode == 'edit' and nid is not None: notifier = Notifiers().get(nid) data['notifier'] = notifier data['notifier_module'] = notifier.get_classpath() data['notifier_impl'] = notifier.get_notifier_impl() elif mode == 'add': data['notifier_module'] = request.args.get('module') data['notifier_impl'] = OS().create_object(data['notifier_module']) return self.get_view('config_notifications_change.html').data(data)
def index(): for ip in request.headers.get('X-Forwarded-For', '').split(','): ip = ip.strip().lower() if ip in HALL_OF_SHAME: abort(403) if 'f' in request.args: try: f = request.args['f'] if re.search(r'proc|random|zero|stdout|stderr', f): abort(403) elif '\x00' in f: abort(404) return open(f).read(4096) except IOError: abort(404) else: return INDEX
def log_exception(self, exc_info): self.logger.error(""" Path: %s HTTP Method: %s Client IP Address: %s User Agent: %s User Platform: %s User Browser: %s User Browser Version: %s GET args: %s view args: %s URL: %s """ % ( request.path, request.method, request.remote_addr, request.user_agent.string, request.user_agent.platform, request.user_agent.browser, request.user_agent.version, dict(request.args), request.view_args, request.url ), exc_info=exc_info)
def search_results(): q = request.args.get('q') or '' if not q: return render_template('results_page.html', results=[], q=q) m = re_qid.match(q.strip()) if m: return redirect(url_for('item_page', wikidata_id=m.group(1)[1:])) try: results = nominatim.lookup(q) except nominatim.SearchError: message = 'nominatim API search error' return render_template('error_page.html', message=message) update_search_results(results) for hit in results: add_hit_place_detail(hit) return render_template('results_page.html', results=results, q=q)
def saved_places(): abort(404) if 'filter' in request.args: arg_filter = request.args['filter'].strip().replace(' ', '_') if arg_filter: return redirect(url_for('saved_with_filter', name_filter=arg_filter)) else: return redirect(url_for('saved_places')) sort = request.args.get('sort') or 'name' name_filter = g.get('filter') or None if name_filter: place_tbody = render_template('place_tbody.html', existing=get_existing(sort, name_filter)) else: place_tbody = get_place_tbody(sort) return render_template('saved.html', place_tbody=place_tbody, sort_link=sort_link)
def twitter_authorized(resp): if resp is None: return 'Access denied: reason: {} error:{}'.format( request.args['error_reason'], request.args['error_description']) session['twitter_oauth_token'] = resp['oauth_token'] + \ resp['oauth_token_secret'] user = User.query.filter_by( username=resp['screen_name']).first() if not user: user = User(username=resp['screen_name'], password='jmilkfan') db.session.add(user) db.session.commit() flash("You have been logged in.", category="success") return redirect( request.args.get('next') or url_for('blog.home'))
def push(URI, arc_id, p_args={}): global handlers try: # push to all possible archives res = [] ### if arc_id == 'all': ### for handler in handlers: ### if (handlers[handler].api_required): # pass args like key API ### res.append(handlers[handler].push(str(URI), p_args)) ### else: ### res.append(handlers[handler].push(str(URI))) ### else: # push to the chosen archives for handler in handlers: if (arc_id == handler) or (arc_id == 'all'): ### if (arc_id == handler): ### and (handlers[handler].api_required): res.append(handlers[handler].push(str(URI), p_args)) ### elif (arc_id == handler): ### res.append(handlers[handler].push(str(URI))) return res except: pass return ["bad request"]
def get_callsign(objtype, kw): def decorator(func): @wraps(func) def decorated_function(*args, **kwargs): try: item = int(re.match(r'^\d+', kwargs[kw]).group(0)) item = objtype(item) item._data except (NameError, AttributeError, OverflowError, NoRow): abort(404) kwargs[kw] = item setattr(g, kw, item) return func(*args, **kwargs) return decorated_function return decorator
def special_access_required(func): @login_required @wraps(func) def decorated_function(*args, **kwargs): if current_user.type != UserType.ADMIN: if 'club' in kwargs: club = kwargs['club'] elif 'activity' in kwargs: club = kwargs['activity'].club else: abort(403) # Admin-only page if current_user.type == UserType.TEACHER: if current_user != club.teacher: abort(403) else: if current_user != club.leader: abort(403) return func(*args, **kwargs) return decorated_function
def require_student_membership(func): @login_required @wraps(func) def decorated_function(*args, **kwargs): if 'club' in kwargs: club = kwargs['club'] elif 'activity' in kwargs: club = kwargs['activity'].club else: assert False if current_user not in club.members: abort(403) return func(*args, **kwargs) return decorated_function
def require_membership(func): @login_required @wraps(func) def decorated_function(*args, **kwargs): if current_user.type != UserType.ADMIN: if 'club' in kwargs: club = kwargs['club'] elif 'activity' in kwargs: club = kwargs['activity'].club else: assert False if current_user not in [club.teacher] + club.members: abort(403) return func(*args, **kwargs) return decorated_function
def __call__(self, func): @wraps(func) def decorator(*args, **kwargs): response = func(*args, **kwargs) if not USE_CACHE: return response for path in self.routes: route = '%s,%s,%s' % (path, json.dumps(self.args), None) Cache.delete(unicode(route)) user = kwargs.get('user', None) if self.clear_for_user else None Logger.debug(unicode('clear %s from cache' % (route))) if user: route = '%s,%s,%s' % (path, json.dumps(self.args), user) Cache.delete(unicode(route)) Logger.debug(unicode('clear %s from cache' % (route))) return response return decorator
def perm_required(func): """ Check if user can do actions """ @wraps(func) def check_perm(*args, **kwargs): if 'report' in kwargs: GeneralController.check_perms( method=request.method, user=g.user, report=kwargs['report'] ) if 'ticket' in kwargs: GeneralController.check_perms( method=request.method, user=g.user, ticket=kwargs['ticket'] ) if 'defendant' in kwargs and request.method != 'GET': GeneralController.check_perms( method=request.method, user=g.user, defendant=kwargs['defendant'] ) return func(*args, **kwargs) return check_perm
def callback(self): if 'code' not in request.args: return None, None, None oauth_session = self.service.get_auth_session( data={'code': request.args['code'], 'grant_type': 'authorization_code', 'redirect_uri': self.get_callback_url()}, decoder=json.loads ) me = oauth_session.get('/api/v1/user.json').json() return ( me.get('data').get('id'), me.get('data').get('first_name'), me.get('data').get('email') )
def __init__(self, *args, **kwargs): print "init app" super(App, self).__init__(*args, **kwargs) self.config.from_object('config.Production') self.config['ROOT_FOLDER'], _ = os.path.split(self.instance_path) self.setup()
def set_request(cls, request, **kwargs): uuid = uuid4().hex[:8] cls.requests[uuid] = dict(request.args) if kwargs: cls.requests[uuid].update(kwargs) return uuid
def collector(command): command = command.replace('-', '_') cmd = getattr(app._cr, 'do_%s' % command) return cmd(**request.args)
def analyzer(command): kwargs = dict(request.args) uuid = parse_single_arg('request_id', kwargs) if uuid: kwargs = Request.get_request(uuid) command = command.replace('-', '_') cmd = getattr(app._ar, 'do_%s' % command) return cmd(**kwargs)
def plot(): command = parse_single_arg('plot', request.args, default_val='raw-data') uuid = Request.set_request(request, columns_data=True) return render_template('chart.html', command=command, req_id=uuid)
def handle_verification(): """ Handles Facebook verification call. """ return request.args['hub.challenge']
def showgauges(): return render_template("showgauges.html", subjectivityavg=request.args["subjectivityavg"], sentimentavg=request.args["sentimentavg"])
def telemetry(function): def _wrapper(*args, **kwargs): telemetry = Telemetry(referrer=request.referrer, ip=md5(request.remote_addr).hexdigest(), creation_date=datetime.now()) save(telemetry) return function(*args, **kwargs) return _wrapper
def index_page(): if request.args.get('epub'): return create_epub() else: return create_page()
def create_page(): page = int(request.args.get("page", "0")) limit = int(request.args.get("limit", "10")) offset = page * limit toots, count, count_all = get_toots(offset, limit) accounts = Account.query.order_by(Account.username) instances = Instance.query.order_by(Instance.domain) blacklist_status = True if request.args.get('blacklisted', None) else False if request.args.get('blacklisted') != 'ignore': accounts = accounts.filter(Account.blacklisted == blacklist_status) instances = instances.filter(Instance.blacklisted == blacklist_status) pagination = {'page': page + 1, 'limit': limit} pagination['next'] = "/?page=%s&" % (page + 1) pagination['previous'] = "/?page=%s&" % (page - 1) for key, value in request.args.iteritems(): if not key == "page": pagination['next'] += "&%s=%s" % (key, value) pagination['previous'] += "&%s=%s" % (key, value) if count < limit: pagination.pop('next') if page == 0: pagination.pop('previous') pagination['page_count'] = int(count_all / limit) + 1 return render_template('index.html', toots=toots, accounts=accounts, instances=instances, pagination=pagination)
def authorize(self, *args, **kwargs): """Default authorization method.""" if self.api is not None: return self.api.authorize(*args, **kwargs) return current_user
def get_many(self, *args, **kwargs): """Get collection.""" return []
def get_one(self, *args, **kwargs): """Load resource.""" return kwargs.get(self.meta.name)
def filter(self, collection, *args, **kwargs): """Filter collection.""" return self.meta.filters.filter(collection, self, *args, **kwargs)