我们从Python开源项目中,提取了以下22个代码示例,用于说明如何使用flask.ext.login.current_user.is_admin()。
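def changePasswordForUser(username):
    """Set a new password for ``username`` from a JSON request body.

    The caller must be the named user or an administrator; anyone else
    gets 403. When ``userManager.enabled`` is false the handler reports
    success without changing anything.

    Only the ``password`` field is read from the body: the current password
    is never requested, so the authenticated identity is the sole proof that
    the caller may change it.

    ``is_anonymous()`` is called as a method, which matches Flask-Login
    releases before 0.3; later releases expose it as a property.
    """
    if not userManager.enabled:
        return jsonify(SUCCESS)

    # Self-or-admin authorization: identity is checked before any part of
    # the request body is parsed.
    if current_user is not None and not current_user.is_anonymous() and (current_user.get_name() == username or current_user.is_admin()):
        if not "application/json" in request.headers["Content-Type"]:
            return make_response("Expected content-type JSON", 400)

        try:
            data = request.json
        except BadRequest:
            return make_response("Malformed JSON body in request", 400)

        # A syntactically valid body can still be null, a list, a string or a
        # number. Reject anything that is not an object before indexing into
        # it, so such input yields a 400 instead of an unhandled TypeError.
        if not isinstance(data, dict):
            return make_response("Malformed JSON body in request", 400)

        if not "password" in data or not data["password"]:
            return make_response("password is missing from request", 400)

        # NOTE(review): the type and length of data["password"] are not
        # checked here - confirm that changeUserPassword validates them.
        try:
            userManager.changeUserPassword(username, data["password"])
        except users.UnknownUser:
            return make_response(("Unknown user: %s" % username, 404, []))

        return jsonify(SUCCESS)
    else:
        return make_response(("Forbidden", 403, []))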
def changeSettingsForUser(username):
    """Replace the stored settings of ``username`` with the request's JSON body.

    Allowed for the named user and for administrators; every other caller
    receives 403. With access control disabled (``userManager.enabled`` is
    false) the call reports success and stores nothing.

    The parsed body is handed to ``userManager.changeUserSettings`` as is.
    NOTE(review): no shape or key validation happens in this handler -
    confirm the user manager restricts what may be written.
    """
    if not userManager.enabled:
        return jsonify(SUCCESS)

    # Same evaluation order as a chained check: None, anonymous, then
    # "someone else's account and not an admin".
    denied = (
        current_user is None
        or current_user.is_anonymous()
        or (current_user.get_name() != username and not current_user.is_admin())
    )
    if denied:
        return make_response("Forbidden", 403)

    try:
        payload = request.json
    except BadRequest:
        return make_response("Malformed JSON body in request", 400)

    try:
        userManager.changeUserSettings(username, payload)
    except users.UnknownUser:
        return make_response("Unknown user: %s" % username, 404)
    return jsonify(SUCCESS)
def pluginData(name):
    """Dispatch a GET request to the SimpleApiPlugin registered as ``name``.

    Returns 404 when no plugin matches and 500 when several do. The admin
    check runs only if the plugin itself reports ``is_api_adminonly()``;
    plugins that do not are reachable by any caller that gets this far.
    The 404/500 answers are produced before the admin check.

    NOTE(review): ``current_user.is_admin()`` is called without an
    anonymous-user test - this presumably relies on the application's
    anonymous user object providing ``is_admin()``; confirm.
    """
    matches = octoprint.plugin.plugin_manager().get_filtered_implementations(
        lambda impl: impl._identifier == name,
        octoprint.plugin.SimpleApiPlugin,
    )
    if not matches:
        return make_response("Not found", 404)

    if len(matches) > 1:
        return make_response("More than one api provider registered for {name}, can't proceed".format(name=name), 500)

    plugin = matches[0]
    if plugin.is_api_adminonly() and not current_user.is_admin():
        return make_response("Forbidden", 403)

    # A plugin may answer with its own response object; otherwise reply 204.
    plugin_response = plugin.on_api_get(request)
    return plugin_response if plugin_response is not None else NO_CONTENT

#~~ commands for plugins
def admin():
    """Render the admin page for administrators; send everyone else to 'index'.

    ``is_admin`` is read as an attribute, not called. NOTE(review): if the
    user model defines ``is_admin`` as a method, the bound method is always
    truthy and this check would let every user through - confirm it is a
    column or property. Flask-Login's default anonymous user has no
    ``is_admin`` attribute, so an unauthenticated request would raise
    AttributeError here unless a login check wraps this view.
    """
    if current_user.is_admin:
        return render_template('admin.html')
    return redirect(url_for('index'))
def role_required(role):
    """Build a view decorator that aborts with 403 unless the user holds ``role``.

    Only two role names are recognised: 'ADMIN' (checks
    ``current_user.is_admin``) and 'BLOGER' (checks
    ``current_user.is_bloger``). Any other value, including a misspelt role
    name, performs no check at all and the view runs for every caller, so
    role strings at call sites deserve review.

    The flags are read as attributes; ``current_user`` is not tested for
    being authenticated before they are read.
    """
    def decorator(view):
        @wraps(view)
        def checked_view(*args, **kwargs):
            # Each recognised role maps to one flag on the user object.
            if role == 'ADMIN':
                if not current_user.is_admin:
                    abort(403)
            if role == 'BLOGER':
                if not current_user.is_bloger:
                    abort(403)
            return view(*args, **kwargs)
        return checked_view
    return decorator
def getUser(username):
    """Return the record of ``username`` as JSON.

    Only the named user or an administrator may read it (403 otherwise);
    an unknown name yields 404. With access control disabled the handler
    returns the generic success payload instead of user data.

    NOTE(review): what ``user.asDict()`` exposes is not visible here -
    confirm it omits password hashes and API keys.
    """
    if not userManager.enabled:
        return jsonify(SUCCESS)

    denied = (
        current_user is None
        or current_user.is_anonymous()
        or (current_user.get_name() != username and not current_user.is_admin())
    )
    if denied:
        abort(403)
    else:
        record = userManager.findUser(username)
        if record is None:
            abort(404)
        else:
            return jsonify(record.asDict())
def getSettingsForUser(username):
    """Return all stored settings of ``username`` as JSON.

    Access is limited to the named user and administrators; other callers
    receive 403, and an unknown user name produces 404. When
    ``userManager.enabled`` is false the generic success payload is
    returned instead of any settings.
    """
    if not userManager.enabled:
        return jsonify(SUCCESS)

    if current_user is None or current_user.is_anonymous():
        return make_response("Forbidden", 403)
    # Authenticated, but neither the account owner nor an administrator.
    if current_user.get_name() != username and not current_user.is_admin():
        return make_response("Forbidden", 403)

    try:
        settings_for_user = userManager.getAllUserSettings(username)
    except users.UnknownUser:
        return make_response("Unknown user: %s" % username, 404)
    else:
        return jsonify(settings_for_user)
def deleteApikeyForUser(username):
    """Delete the API key of ``username``.

    Permitted for the key's owner and for administrators; anyone else gets
    403. An unknown user name yields 404. With access control disabled the
    handler reports success without deleting anything.
    """
    if not userManager.enabled:
        return jsonify(SUCCESS)

    denied = (
        current_user is None
        or current_user.is_anonymous()
        or (current_user.get_name() != username and not current_user.is_admin())
    )
    if denied:
        return make_response(("Forbidden", 403, []))

    try:
        userManager.deleteApikey(username)
    except users.UnknownUser:
        return make_response(("Unknown user: %s" % username, 404, []))
    return jsonify(SUCCESS)
def pluginCommand(name):
    """Dispatch a JSON command to the SimpleApiPlugin registered as ``name``.

    Response order: 404 (no such plugin), 500 (several plugins share the
    name), 405 (plugin defines no commands), 403 (plugin is admin-only and
    the caller is not an administrator). The first three are answered
    before the admin check. The command is parsed and validated against
    the plugin's own command list only after authorization has passed.

    NOTE(review): ``current_user.is_admin()`` is called without an
    anonymous-user test - presumably the application's anonymous user
    object provides ``is_admin()``; confirm.
    """
    providers = octoprint.plugin.plugin_manager().get_filtered_implementations(
        lambda impl: impl._identifier == name,
        octoprint.plugin.SimpleApiPlugin,
    )
    if not providers:
        return make_response("Not found", 404)

    if len(providers) > 1:
        return make_response("More than one api provider registered for {name}, can't proceed".format(name=name), 500)

    plugin = providers[0]
    allowed_commands = plugin.get_api_commands()
    if allowed_commands is None:
        return make_response("Method not allowed", 405)

    if plugin.is_api_adminonly() and not current_user.is_admin():
        return make_response("Forbidden", 403)

    # The helper returns a ready-made error response when the body is not a
    # valid command for this plugin.
    command, data, parse_error = get_json_command_from_request(request, allowed_commands)
    if parse_error is not None:
        return parse_error

    plugin_response = plugin.on_api_command(command, data)
    return NO_CONTENT if plugin_response is None else plugin_response

#~~ first run setup
def is_accessible(self):
    """Report whether the current user may open this view.

    Returns ``current_user.is_admin`` when the attribute exists. For a user
    object without it (the anonymous user), ``inaccessible_callback()`` is
    invoked, its result is discarded and the method returns None, which is
    falsy, so access is still refused.

    NOTE(review): ``is_admin`` is read as an attribute; if the model
    defines it as a method the returned bound method is always truthy -
    confirm it is a column or property.
    """
    try:
        admin_flag = current_user.is_admin
    except AttributeError:
        # anonymous user object doesn't have is_admin attribute
        self.inaccessible_callback()
    else:
        return admin_flag
def on_settings_load(self):
    """Load plugin settings, blanking secrets for non-administrators.

    "token" and "tracking_token" are replaced by None and "chats" by an
    empty dict whenever the caller is missing, anonymous or not an admin,
    so these values never reach such callers through this method.
    Administrators receive the data unchanged.
    """
    data = octoprint.plugin.SettingsPlugin.on_settings_load(self)

    # only return our restricted settings to admin users - this is only needed for OctoPrint <= 1.2.16
    restricted = (("token", None), ("tracking_token", None), ("chats", dict()))
    for key, placeholder in restricted:
        if key not in data:
            continue
        if current_user is None or current_user.is_anonymous() or not current_user.is_admin():
            data[key] = placeholder

    return data
def is_admin(self):
    """Always report a non-administrator.

    Defining ``is_admin()`` on a user class that never holds admin rights
    lets callers invoke ``current_user.is_admin()`` without special-casing
    that class, and the answer fails closed.
    """
    admin = False
    return admin
def is_admin(self):
    """Return True when 'admin' is among this user's roles.

    The test is a plain membership check on ``self.roles``.
    NOTE(review): if ``roles`` were ever a single string rather than a
    collection, ``in`` would do substring matching - confirm the type.
    """
    has_admin_role = 'admin' in self.roles
    return has_admin_role
def more_json(self):
    """Return extra fields to include in this user's JSON representation.

    Keys: 'profile_picture_url' (None when no picture is set), 'is_admin',
    'full_name' and 'abbr_name'. The admin flag is part of the output, so
    every consumer of this JSON can see which accounts are administrators.
    """
    picture = self.profile_picture
    extra = {}
    extra['profile_picture_url'] = media_url(picture) if picture else None
    extra['is_admin'] = self.is_admin()
    extra['full_name'] = self.full_name()
    extra['abbr_name'] = self.abbr_name()
    return extra
def authorize_changes(resource):
    """Allow changes only when the current user is an administrator.

    ``resource`` is accepted but not inspected: the decision is the same
    for every resource and depends solely on ``current_user.is_admin()``.
    """
    caller_is_admin = current_user.is_admin()
    return caller_is_admin

# using SQLAlchemy's hybrid_property to provide a setter and validation step,
# below I use Flask-Alcohol's setter decorator to do the same thing
def authorize_changes(resource):
    """Allow changes only when the current user is an administrator.

    ``resource`` is accepted but not inspected: the decision is the same
    for every resource and depends solely on ``current_user.is_admin()``.
    """
    caller_is_admin = current_user.is_admin()
    return caller_is_admin
def admin_required(method):
    """Decorate a Flask view so that only administrators with a fresh login reach it.

    Checks run in this order:

    1. Not authenticated: return ``login_manager.unauthorized()``
       (redirect to :func:`~railgun.website.views.signin`).
    2. Not an administrator: flash an error and redirect to ``index``.
    3. Session not fresh: return ``login_manager.needs_refresh()``
       (redirect to :func:`~railgun.website.views.reauthenticate`).

    ``is_authenticated()`` is called as a method (Flask-Login before 0.3),
    while ``is_admin`` is read as an attribute of the user object.

    Usage::

        @bp.route('/')
        @admin_required
        def admin_index():
            return 'This page can only be accessed by admins.'
    """
    @wraps(method)
    def guarded_view(*args, **kwargs):
        if current_user.is_authenticated():
            if current_user.is_admin:
                # Admin pages additionally require a non-stale session.
                if login_fresh():
                    return method(*args, **kwargs)
                return login_manager.needs_refresh()
            flash(_("Only admin can view this page!"), 'danger')
            return redirect(url_for('index'))
        return login_manager.unauthorized()
    return guarded_view
def login_required(method):
    """Decorate a Flask view so that only authenticated users reach it.

    Checks run in this order:

    1. Not authenticated: return ``login_manager.unauthorized()``
       (redirect to :func:`~railgun.website.views.signin`).
    2. :func:`should_update_email` is true: redirect to the e-mail update
       page (:func:`~railgun.website.views.profile_edit`).
    3. :func:`should_choose_course` is true and the user is not an
       administrator: redirect to the course selection page.
       Administrators skip this step.

    Usage::

        @bp.route('/')
        @login_required
        def foo():
            return 'This page can only be accessed by authenticated users.'
    """
    @wraps(method)
    def guarded_view(*args, **kwargs):
        if not current_user.is_authenticated():
            outcome = login_manager.unauthorized()
        elif should_update_email():
            outcome = redirect_update_email()
        elif should_choose_course() and (not current_user.is_admin):
            outcome = redirect_choose_course()
        else:
            outcome = method(*args, **kwargs)
        return outcome
    return guarded_view
def __init__(self, name, path):
    """Create a CSV-backed auth provider called ``name`` reading from ``path``.

    The provider tracks three fields per user: name, email and is_admin.
    ``pull()`` in this listing copies those fields onto the database user,
    so the CSV file decides who is an administrator.
    NOTE(review): write access to that file presumably needs to be as
    restricted as admin rights themselves - confirm deployment permissions.
    """
    super(CsvFileAuthProvider, self).__init__(name)
    # Fields compared and synchronised between CSV users and database users.
    self.__interested_fields = ('name', 'email', 'is_admin')
    self.users = []
    self.csvpath = path
    # Load the user list once the attributes above exist.
    self.reload()
def pull(self, name=None, email=None, dbuser=None):
    """Look up a provider user and create or refresh the matching database user.

    The user is found by ``email`` when given, otherwise by ``name``.
    Returns None when the provider does not know the user, else a tuple
    ``(user, dbuser)``; ``dbuser`` is None in that tuple when the database
    write failed.

    No password is compared anywhere in this method: it only synchronises
    records, and the database user is created with ``password=None``.
    ``is_admin`` is among the synchronised fields, so admin status follows
    the provider's data on every pull.
    """
    # Get the interested user by `auth_request`
    if email:
        user = self.__email_to_user.get(email, None)
    else:
        user = self.__name_to_user.get(name, None)
    # Return None if the provider has no such user
    if not user:
        return None
    # Create the mongodb object if not exist
    if app.config['USERS_COLLECTION'].count({"_id":user.name}) == 0:
        # insert the user into mongo db
        dictionary = {}
        course = user_class_data.user_dic.get(user.name,'')
        app.config['USERS_COLLECTION'].insert({"_id":user.name,"password":None,"problem_list":dictionary,"course":course})
    # dbuser is None, create new one
    if dbuser is None:
        try:
            dbuser = User(name=user.name, email=user.email, password=None, is_admin=user.is_admin, provider=self.name)
            # Special hack: get locale & timezone from request
            dbuser.fill_i18n_from_request()
            # save to database
            db.session.add(dbuser)
            db.session.commit()
            self._log_pull(user, create=True)
        except Exception:
            # Any failure is reported to the caller only as dbuser=None.
            # NOTE(review): the session is not rolled back in this method -
            # confirm that happens elsewhere before the session is reused.
            dbuser = None
            self._log_pull(user, create=True, exception=True)
        return (user, dbuser)
    # dbuser is not None, update existing one
    updated = False
    for k in self.__interested_fields:
        # Copy every tracked field that differs (name, email, is_admin per
        # the constructor in this listing) from the provider user.
        if getattr(dbuser, k) != getattr(user, k):
            updated = True
            setattr(dbuser, k, getattr(user, k))
    if updated:
        try:
            db.session.commit()
            self._log_pull(user, create=False)
        except Exception:
            # Same handling as on creation: failure surfaces as dbuser=None.
            dbuser = None
            self._log_pull(user, create=False, exception=True)
    return (user, dbuser)
def __inject_flask_g(*args, **kwargs):
    """Populate ``g.homeworks`` and ``g.utcnow`` for the current request.

    Requests for static files are skipped. Authenticated users get the
    homework set of their selected course; administrators see the course's
    full problem list, other users a per-user list that is generated and
    stored when missing or inconsistent with the course.

    NOTE(review): the nesting below was reconstructed from a flattened
    listing - confirm block boundaries against the upstream file.
    """
    if str(request.url_rule) == '/static/<path:filename>':
        return
    # Default homework set, used when no course applies.
    homeworks = HwSet(app.config['HOMEWORK_DIR'],[''])
    if current_user.is_authenticated():
        mongouser = app.config['USERS_COLLECTION'].find_one({"_id": current_user.name})
        if mongouser is None:
            session['course'] = None
            return
        if len(mongouser['course']) != 0:
            session['course'] = mongouser['course']
        if session.get('course') is not None:
            problem_dict = mongouser['problem_list']
            course_name = session['course']
            course = app.config['COURSE_COLLECTION'].find_one({"name": course_name})
            # course_name comes from the session / user record and is joined
            # into a filesystem path here and further below.
            # NOTE(review): confirm course names are constrained so they
            # cannot contain path separators.
            if course == None or not(os.path.isdir(os.path.join(app.config['HOMEWORK_DIR_FOR_CLASS'],course_name))):
                session['course'] = None
                return
            if not os.path.isdir(course["path"]):
                session['course'] = None
                # NOTE(review): `course` is the document returned by
                # find_one, not the name string; this filter probably meant
                # course_name - confirm.
                if app.config['COURSE_COLLECTION'].count({"name":course}) > 0:
                    app.config['COURSE_COLLECTION'].remove({"name":course})
                return
            problem_list = problem_dict.get(course_name,'key_error')
            # is_admin is read as an attribute: administrators always see
            # the full problem list of the course.
            if current_user.is_admin:
                problem_list = course['problem_list']
            if (not current_user.is_admin) and (problem_list == 'key_error' or (len(problem_list) == 0) or (not_int_list(problem_list,course['problem_list'])) or (not_cover_list(problem_list,course['problem_list']))) and (len(course['problem_list']) != 0):
                problem_list = getproblemlist(course['problem_list'],app.config['HOMEWORK_NUM'])
                problem_dict.update({course_name:problem_list})
                # The user document is replaced by remove followed by insert;
                # a failure between the two calls would leave no document.
                app.config['USERS_COLLECTION'].remove({"_id":mongouser['_id']})
                app.config['USERS_COLLECTION'].insert({"_id":mongouser['_id'],"password":mongouser['password'],"problem_list":problem_dict,"course":mongouser['course']})
            string = str(problem_list)
            course_path = os.path.join(app.config['COURSE_HOMEWORK_DIR'],course_name)
            if string == "key_error":
                homeworks = HwSet(course_path,[''])
            else:
                tmplist = string.split('@')
                # `list` shadows the builtin for the rest of this function.
                list = [item for item in tmplist]
                homeworks = HwSet(course_path,list)
    g.homeworks = HwSetProxy(homeworks)
    # g.utcnow will be used in templates/homework.html to determine some
    # visual styles
    g.utcnow = utc_now()