我们从Python开源项目中,提取了以下18个代码示例,用于说明如何使用flask.request.url_rule()。
def verify_route(): """ Verify if the route is in a list of routes that need have the authorization header """ for route in routes: if route[0] == str(request.url_rule) and request.method in route[1] and 'Authorization' not in request.headers: abort(403, 'Authorization header missing') # TODO: Implementar a validação dos modelos de entrada
def encrypt_password(): """ Verify if the route is for password reset or user create, then encrypts the password. """ if request.json is None or not 'password' in request.json: return if str(request.url_rule) == '/auth/user/' and request.method == 'POST' \ or str(request.url_rule) == '/auth/user/resetpassword/' and request.method == 'PUT': request.json['password'] = generate_password_hash(request.json['password'])
def get_url_rule(): rule = None url_rule = request.url_rule if url_rule is not None: rule = url_rule.rule return rule
def request_finished(self, app, response): rule = request.url_rule.rule if request.url_rule is not None else "" rule = build_name_with_http_method_prefix(rule, request) request_data = get_data_from_request(request) response_data = get_data_from_response(response) elasticapm.set_transaction_data(request_data, 'request') elasticapm.set_transaction_data(response_data, 'response') if response.status_code: result = 'HTTP {}xx'.format(response.status_code // 100) else: result = response.status self.client.end_transaction(rule, result)
def build_url(self, **kwargs): arg = request.args.copy() view_args = request.view_args arg.update(view_args) for attr in kwargs.keys(): if attr in arg: arg.pop(attr) arg.update(kwargs.items()) rule = request.url_rule result = rule.build(arg) return result[1]
def inject_template_vars(): output = {} output["baseurl"] = get_baseurl() current_path = str(request.url_rule)[1:].split("/") output["current_path"] = current_path[0] return output
def requireAuthenticate(acceptGuest): def requireAuth(f): @wraps(f) def decorated_function(*args, **kwargs): auth = request.authorization if auth: if acceptGuest and request.headers['Service-Provider'] == 'Guest' and auth.username == app.config['GUEST_ID']: if auth.password == app.config['GUEST_TOKEN']: g.currentUser = None g.loginWith = 'Guest' return f(*args, **kwargs) else: g.loginWith = None if request.headers['Service-Provider'] == 'Facebook': g.facebookToken = FacebookModel.getTokenValidation(app.config['FACEBOOK_ACCESS_TOKEN'], auth.password) if g.facebookToken['is_valid'] and g.facebookToken['user_id'] == auth.username: g.currentUser = FacebookModel.getUser(auth.username) g.loginWith = 'Facebook' elif request.headers['Service-Provider'] == 'Google': g.googleToken = GoogleModel.getTokenValidation(app.config['GOOGLE_CLIENT_ID'], auth.password) if g.googleToken and g.googleToken['sub'] == auth.username: g.currentUser = GoogleModel.getUser(auth.username) g.loginWith = 'Google' if g.loginWith and (str(request.url_rule) == '/v1/login' or g.currentUser): if str(request.url_rule) == '/v1/login' or not g.currentUser['disabled']: return f(*args, **kwargs) return abort(410) return abort(401) return decorated_function return requireAuth
def _has_fr_route(self): """Encapsulating the rules for whether the request was to a Flask endpoint""" # 404's, 405's, which might not have a url_rule if self._should_use_fr_error_handler(): return True # for all other errors, just check if FR dispatched the route if not request.url_rule: return False return self.owns_endpoint(request.url_rule.endpoint)
def load_user(): # Make sure not to run function for static files if request.url_rule and '/static/' in request.url_rule.rule: return # Load user session_id = request.cookies.get('session_id') user_id = request.cookies.get('user_id') if session_id and user_id: try: user_id = int(user_id) except ValueError: return session = db.session.query(LoginSession).get((session_id, user_id)) if session: user = session.user if user and session.active: g.logged_in = True g.user = user # Update user's last_action timestamp if it's been at least # LAST_ACTION_OFFSET minutes. current = datetime.datetime.now() mins_ago = current - datetime.timedelta( minutes=app.config['LAST_ACTION_OFFSET']) if g.user and g.user.last_action < mins_ago: g.user.last_action = current db.session.commit() return g.logged_in = False g.user = None return
def submit_lifecycle_ignition(request_raw_query): """ Lifecycle Ignition --- tags: - lifecycle responses: 200: description: A JSON of the ignition status """ try: machine_ignition = json.loads(request.get_data()) except ValueError: app.logger.error("%s have incorrect content" % request.path) return jsonify({"message": "FlaskValueError"}), 406 req = requests.get("%s/ignition?%s" % (EC.matchbox_uri, request_raw_query)) try: matchbox_ignition = json.loads(req.content) req.close() except ValueError: app.logger.error("%s have incorrect matchbox return" % request.path) return jsonify({"message": "MatchboxValueError"}), 406 @smartdb.cockroach_transaction def op(caller=request.url_rule): with SMART.new_session() as session: try: inject = crud.InjectLifecycle(session, request_raw_query=request_raw_query) if json.dumps(machine_ignition, sort_keys=True) == json.dumps(matchbox_ignition, sort_keys=True): inject.refresh_lifecycle_ignition(True) return jsonify({"message": "Up-to-date"}), 200 else: inject.refresh_lifecycle_ignition(False) return jsonify({"message": "Outdated"}), 210 except AttributeError: return jsonify({"message": "Unknown"}), 406 return op(caller=request.url_rule)
def lifecycle_rolling_delete(request_raw_query): """ Lifecycle Rolling Update Disable the current policy for a given machine by UUID or MAC --- tags: - lifecycle parameters: - name: request_raw_query in: path description: Pass the mac as 'mac=<mac>' required: true type: string responses: 200: description: Rolling Update is not enable schema: type: dict """ app.logger.info("%s %s" % (request.method, request.url)) @smartdb.cockroach_transaction def op(caller=request.url_rule): with SMART.new_session() as session: life = crud.InjectLifecycle(session, request_raw_query) life.apply_lifecycle_rolling(False, None) return jsonify({"enable": False, "request_raw_query": request_raw_query}), 200 return op(caller=request.url_rule)
def report_lifecycle_coreos_install(status, request_raw_query): """ Lifecycle CoreOS Install Report the status of a CoreOS install by MAC --- tags: - lifecycle responses: 200: description: CoreOS Install report schema: type: dict """ app.logger.info("%s %s" % (request.method, request.url)) if status.lower() == "success": success = True elif status.lower() == "fail": success = False else: app.logger.error("%s %s" % (request.method, request.url)) return "success or fail != %s" % status.lower(), 403 @smartdb.cockroach_transaction def op(caller=request.url_rule): with SMART.new_session() as session: inject = crud.InjectLifecycle(session, request_raw_query=request_raw_query) inject.refresh_lifecycle_coreos_install(success) op(caller=request.url_rule) repositories.machine_state.update( mac=tools.get_mac_from_raw_query(request_raw_query), state=MachineStates.installation_succeed if success else MachineStates.installation_failed) return jsonify({"success": success, "request_raw_query": request_raw_query}), 200
def before_request(): g.user = current_user if should_redirect(current_user, request.url_rule): redirect_to('home')
def before_request(): g.res = LunabitResponse() # all routes must have a trailing slash, will append if doesn't exist if not request.path.endswith('/'): request.path = ''.join([request.path, '/']) # removing api version from beginning of string for proper routing request_path = str(request.url_rule) if request.url_rule else request.path url_rules = [r.rule for r in app.url_map.iter_rules()] if request_path in url_rules: for r in app.url_map.iter_rules(): if r.rule == request_path and request.method in r.methods: print(r.methods) try: g.req = LunabitRequest.from_data(request.method, request) g.user = g.req.user break except GridlightException as e: raise InvalidUsage(message=str(e)) else: err = ''.join(["URL '", request_path, "' does not allow the method '", request.method,"'."]) print_log(err) raise InvalidUsage(message=err, status_code=405) else: err = ''.join(["URL '", request_path, "' does not exist."]) print_log(err) raise InvalidUsage(message=err, status_code=404)
def change_lifecycle_rolling(request_raw_query): """ Lifecycle Rolling Update Change the current policy for a given machine by MAC --- tags: - lifecycle parameters: - name: request_raw_query in: path description: Pass the mac as 'mac=<mac>' required: true type: string responses: 200: description: Rolling Update is enable schema: type: dict 401: description: Mac address is not in database schema: type: dict """ app.logger.info("%s %s" % (request.method, request.url)) try: strategy = json.loads(request.get_data())["strategy"] app.logger.info("%s %s rolling strategy: setting to %s" % (request.method, request.url, strategy)) except (KeyError, ValueError): # JSONDecodeError is a subclass of ValueError # Cannot use JSONDecodeError because the import is not consistent between python3.X app.logger.info("%s %s rolling strategy: setting default to kexec" % (request.method, request.url)) strategy = "kexec" @smartdb.cockroach_transaction def op(caller=request.url_rule): with SMART.new_session() as session: try: life = crud.InjectLifecycle(session, request_raw_query) life.apply_lifecycle_rolling(True, strategy) return jsonify({"enable": True, "request_raw_query": request_raw_query, "strategy": strategy}), 200 except AttributeError: return jsonify({"enable": None, "request_raw_query": request_raw_query, "strategy": strategy}), 401 return op(caller=request.url_rule)
def __inject_flask_g(*args, **kwargs): if str(request.url_rule) == '/static/<path:filename>': return homeworks = HwSet(app.config['HOMEWORK_DIR'],['']) if current_user.is_authenticated(): mongouser = app.config['USERS_COLLECTION'].find_one({"_id": current_user.name}) if mongouser is None: session['course'] = None return if len(mongouser['course']) != 0: session['course'] = mongouser['course'] if session.get('course') is not None: problem_dict = mongouser['problem_list'] course_name = session['course'] course = app.config['COURSE_COLLECTION'].find_one({"name": course_name}) if course == None or not(os.path.isdir(os.path.join(app.config['HOMEWORK_DIR_FOR_CLASS'],course_name))): session['course'] = None return if not os.path.isdir(course["path"]): session['course'] = None if app.config['COURSE_COLLECTION'].count({"name":course}) > 0: app.config['COURSE_COLLECTION'].remove({"name":course}) return problem_list = problem_dict.get(course_name,'key_error') if current_user.is_admin: problem_list = course['problem_list'] if (not current_user.is_admin) and (problem_list == 'key_error' or (len(problem_list) == 0) or (not_int_list(problem_list,course['problem_list'])) or (not_cover_list(problem_list,course['problem_list']))) and (len(course['problem_list']) != 0): problem_list = getproblemlist(course['problem_list'],app.config['HOMEWORK_NUM']) problem_dict.update({course_name:problem_list}) app.config['USERS_COLLECTION'].remove({"_id":mongouser['_id']}) app.config['USERS_COLLECTION'].insert({"_id":mongouser['_id'],"password":mongouser['password'],"problem_list":problem_dict,"course":mongouser['course']}) string = str(problem_list) course_path = os.path.join(app.config['COURSE_HOMEWORK_DIR'],course_name) if string == "key_error": homeworks = HwSet(course_path,['']) else: tmplist = string.split('@') list = [item for item in tmplist] homeworks = HwSet(course_path,list) g.homeworks = HwSetProxy(homeworks) # g.utcnow will be used in templates/homework.html to determine some # visual styles g.utcnow = utc_now()