我们从Python开源项目中提取了以下20个代码示例，用于说明如何使用flask_login.current_user的role属性。
def login_required(role=0, group='open'):
    """Build a login-checking decorator that also enforces a role and a group.

    Variant of ``login_required`` that accepts a ``role`` so users with
    different roles can reach different views, and a ``group`` so some
    views can be closed off per group.

    Example::

        @login_required(role=0, group='ntuwn')

    Args:
        role: Threshold compared against ``current_user.role``; a user whose
            role is lower is rejected.  ``0`` is meant to admit everyone.
        group: Group the user must belong to.  ``'open'`` disables the
            group check.

    Returns:
        A decorator.  The decorated view returns
        ``login_manager.unauthorized()`` when any check fails, otherwise
        the wrapped view's own result.
    """
    def wrapper(fn):
        @wraps(fn)
        def decorated_view(*args, **kwargs):
            # Checks short-circuit in this order: authentication, role, group.
            denied = (
                not current_user.is_authenticated
                or current_user.role < role
                or (group != 'open' and current_user.group != group)
            )
            if denied:
                return login_manager.unauthorized()
            return fn(*args, **kwargs)

        return decorated_view

    return wrapper
def test_create_user(db, client):
    """Create a user, log out, log in as that user and check its profile."""
    role = UserRole.security_team
    new_user_form = {
        'username': USERNAME,
        'password': PASSWORD,
        'email': EMAIL,
        'active': True,
        'role': role.name,
    }
    created = client.post(url_for('create_user'), follow_redirects=True,
                          data=new_user_form)
    assert created.status_code == 200

    logged_out = client.post(url_for('logout'), follow_redirects=True)
    assert_not_logged_in(logged_out)

    credentials = {'username': USERNAME, 'password': PASSWORD}
    logged_in = client.post(url_for('login'), follow_redirects=True,
                            data=credentials)
    assert_logged_in(logged_in)

    # The freshly created account must now be the active session user.
    assert USERNAME == current_user.name
    assert EMAIL == current_user.email
    assert role == current_user.role
def test_edit_user(db, client):
    """Edit a user's email, password and role, then log in with the new password."""
    new_password = random_string()
    new_email = '{}foo'.format(EMAIL)
    new_role = UserRole.security_team

    edit_form = {
        'username': USERNAME,
        'email': new_email,
        'password': new_password,
        'role': new_role.name,
        'active': True,
    }
    edited = client.post(url_for('edit_user', username=USERNAME),
                         follow_redirects=True, data=edit_form)
    assert edited.status_code == 200

    logged_out = client.post(url_for('logout'), follow_redirects=True)
    assert_not_logged_in(logged_out)

    credentials = {'username': USERNAME, 'password': new_password}
    logged_in = client.post(url_for('login'), follow_redirects=True,
                            data=credentials)
    assert_logged_in(logged_in)

    # The session user must reflect every edited field.
    assert USERNAME == current_user.name
    assert new_email == current_user.email
    assert new_role == current_user.role
def socketio_domains_add(form_data):
    """Create a desktop from a template and notify the requesting user.

    ``form_data`` is unflattened and run through ``parseHardware`` before
    being passed, together with the current user's username, to
    ``app.isardapi.new_domain_from_tmpl``.  A JSON notification is then
    emitted as ``add_form_result`` on namespace ``/sio_users`` to the room
    ``user_<username>``.
    """
    # TODO: check that the user has quota and rights to do this; the
    # role-based check is currently disabled.
    nested = app.isardapi.f.unflatten_dict(form_data)
    create_dict = parseHardware(nested)
    created = app.isardapi.new_domain_from_tmpl(current_user.username, create_dict)

    if created is True:
        text = 'Desktop ' + create_dict['name'] + ' is being created...'
        icon, kind = 'success', 'success'
    else:
        text = 'Desktop ' + create_dict['name'] + " can't be created."
        icon, kind = 'warning', 'error'

    # NOTE(review): 'result' is True on the failure path as well — confirm
    # whether the client expects False there.
    data = json.dumps({
        'result': True,
        'title': 'New desktop',
        'text': text,
        'icon': icon,
        'type': kind,
    })
    socketio.emit('add_form_result', data, namespace='/sio_users',
                  room='user_' + current_user.username)
def socketio_domain_edit(form_data):
    """Update a desktop from submitted form data and notify the requesting user.

    ``form_data`` is unflattened and run through ``parseHardware``; its
    ``'hardware'`` entry is moved under a nested ``'create_dict'`` key and a
    copy of the result is passed to ``app.isardapi.update_domain``.  A JSON
    notification is then emitted as ``edit_form_result`` on namespace
    ``/sio_users`` to the room ``user_<username>``.
    """
    # TODO: check that the user has quota and rights to do this; the
    # role-based check is currently disabled.
    print('in domain edit')
    nested = app.isardapi.f.unflatten_dict(form_data)
    create_dict = parseHardware(nested)

    # Relocate the hardware settings under 'create_dict' -> 'hardware'.
    hardware = create_dict.pop('hardware')
    create_dict['create_dict'] = {'hardware': hardware.copy()}

    updated = app.isardapi.update_domain(create_dict.copy())

    if updated is True:
        text = 'Desktop ' + create_dict['name'] + ' has been updated...'
        icon, kind = 'success', 'success'
    else:
        text = 'Desktop ' + create_dict['name'] + " can't be updated."
        icon, kind = 'warning', 'error'

    # NOTE(review): 'result' is True on the failure path as well — confirm
    # whether the client expects False there.
    data = json.dumps({
        'id': create_dict['id'],
        'result': True,
        'title': 'Updated desktop',
        'text': text,
        'icon': icon,
        'type': kind,
    })
    socketio.emit('edit_form_result', data, namespace='/sio_users',
                  room='user_' + current_user.username)
def socketio_admins_domain_edit(form_data):
    """Update a desktop from submitted form data and notify the admins' room.

    ``form_data`` is unflattened and run through ``parseHardware``; its
    ``'hardware'`` entry is moved under a nested ``'create_dict'`` key and a
    copy of the result is passed to ``app.isardapi.update_domain``.  A JSON
    notification is then emitted as ``edit_form_result`` on namespace
    ``/sio_admins`` to the room ``domains``.
    """
    # TODO: check that the user has quota and rights to do this; the
    # role-based check is currently disabled.
    print('in domain edit')
    nested = app.isardapi.f.unflatten_dict(form_data)
    create_dict = parseHardware(nested)

    # Relocate the hardware settings under 'create_dict' -> 'hardware'.
    hardware = create_dict.pop('hardware')
    create_dict['create_dict'] = {'hardware': hardware.copy()}

    updated = app.isardapi.update_domain(create_dict.copy())

    if updated is True:
        text = 'Desktop ' + create_dict['name'] + ' has been updated...'
        icon, kind = 'success', 'success'
    else:
        text = 'Desktop ' + create_dict['name'] + " can't be updated."
        icon, kind = 'warning', 'error'

    # NOTE(review): 'result' is True on the failure path as well — confirm
    # whether the client expects False there.
    data = json.dumps({
        'id': create_dict['id'],
        'result': True,
        'title': 'Updated desktop',
        'text': text,
        'icon': icon,
        'type': kind,
    })
    socketio.emit('edit_form_result', data, namespace='/sio_admins',
                  room='domains')
def __init__(self, userID, password, role, group):
    """Store the user's identifier, password, role and group on the instance.

    Args:
        userID: Value stored as ``self.id``.
        password: Value stored as ``self.password``.
        role: Value stored as ``self.role``.
        group: Value stored as ``self.group``.
    """
    self.id, self.password = userID, password
    self.role, self.group = role, group
def get_role(self):
    """Return the role (access level) stored on this user."""
    role = self.role
    return role
def reporter_required(func):
    """Restrict a view to logged-in users whose role is a reporter.

    The role check is itself wrapped with ``login_required``.  When
    ``current_user.role.is_reporter`` is falsy the view returns
    ``forbidden()`` instead of running ``func``.
    """
    @wraps(func)
    def decorated_view(*args, **kwargs):
        if current_user.role.is_reporter:
            return func(*args, **kwargs)
        return forbidden()

    return login_required(decorated_view)
def security_team_required(func):
    """Restrict a view to logged-in users whose role is in the security team.

    The role check is itself wrapped with ``login_required``.  When
    ``current_user.role.is_security_team`` is falsy the view returns
    ``forbidden()`` instead of running ``func``.
    """
    @wraps(func)
    def decorated_view(*args, **kwargs):
        if current_user.role.is_security_team:
            return func(*args, **kwargs)
        return forbidden()

    return login_required(decorated_view)
def administrator_required(func):
    """Restrict a view to logged-in users whose role is an administrator.

    The role check is itself wrapped with ``login_required``.  When
    ``current_user.role.is_administrator`` is falsy the view returns
    ``forbidden()`` instead of running ``func``.
    """
    @wraps(func)
    def decorated_view(*args, **kwargs):
        if current_user.role.is_administrator:
            return func(*args, **kwargs)
        return forbidden()

    return login_required(decorated_view)
def user_can_edit_issue(advisories=None):
    """Tell whether the current user may edit an issue.

    Args:
        advisories: Sized collection of advisories attached to the issue.
            ``None`` (the default) is treated as "no advisories"; this
            replaces a shared mutable ``[]`` default.

    Returns:
        ``False`` when the user's role is not a reporter, ``True`` when it
        is a security-team role, and otherwise ``True`` only if there are
        no advisories.
    """
    role = current_user.role
    if not role.is_reporter:
        return False
    if role.is_security_team:
        return True
    # Plain reporters may edit only while no advisory is attached.
    return advisories is None or len(advisories) == 0
def user_can_delete_issue(advisories=None):
    """Tell whether the current user may delete an issue.

    Args:
        advisories: Sized collection of advisories attached to the issue.
            ``None`` (the default) is treated as "no advisories"; this
            replaces a shared mutable ``[]`` default.

    Returns:
        ``False`` when the user's role is not a reporter; otherwise ``True``
        only if there are no advisories.
    """
    role = current_user.role
    if not role.is_reporter:
        return False
    # Deletion is allowed only while no advisory is attached.
    return advisories is None or len(advisories) == 0
def validate_check_password(self, field):
    """Validate the submitted password against the current user's password.

    Args:
        field: Form field whose ``data`` holds the submitted password.

    Raises:
        ValidationError: When the password does not match, or when it
            matches but the current user's role is ``"admin"``.
    """
    # NOTE(review): both messages look mis-encoded ('?' placeholders) —
    # restore the intended text from the upstream source if available.
    if not current_user.check_password(field.data):
        raise ValidationError(u"??????")
    if current_user.role == "admin":
        raise ValidationError(u"????????_(:???)_")
def socketio_admins_connect():
    """Join an admin to the admin rooms and push their quota.

    Does nothing unless ``current_user.role`` is ``'admin'``.  For an admin
    it joins the rooms ``admins`` and ``user_<username>``, then emits
    ``user_quota`` on namespace ``/sio_admins`` to the user's own room with
    the JSON-encoded result of ``app.isardapi.get_user_quotas``.
    """
    if current_user.role != 'admin':
        return

    join_room('admins')
    user_room = 'user_' + current_user.username
    join_room(user_room)

    quotas = app.isardapi.get_user_quotas(current_user.username,
                                          current_user.quota)
    socketio.emit('user_quota', json.dumps(quotas),
                  namespace='/sio_admins', room=user_room)
def socketio_admins_connect(join_rooms):
    """Join every room in ``join_rooms`` when the current user is an admin.

    Args:
        join_rooms: Iterable of room names; each one is joined and then
            printed with a ``JOINED:`` prefix.
    """
    if current_user.role != 'admin':
        return
    for room_name in join_rooms:
        join_room(room_name)
        print('JOINED:' + room_name)
def checkRole(fn):
    """Decorate a view so users whose role is ``'user'`` are redirected.

    When ``current_user.role`` equals ``'user'`` the decorated view returns
    a redirect to the ``desktops`` endpoint; any other role runs ``fn``.
    """
    @wraps(fn)
    def decorated_view(*args, **kwargs):
        if current_user.role != 'user':
            return fn(*args, **kwargs)
        return redirect(url_for('desktops'))

    return decorated_view
def hardware():
    """Return, as a JSON string, the hardware options for the current user.

    The result holds the entries ``app.isardapi.get_alloweds`` returns for
    interfaces (under ``nets``), graphics, videos, boots and
    hypervisors_pools, a ``forced_hyps`` list, and the current user's record
    under ``user``.  ``forced_hyps`` stays empty unless the user's role is
    ``'admin'``, in which case it is the hypervisors admin table with an
    ``Auto`` default entry placed first.
    """
    username = current_user.username

    # (result key, table name) in the order the keys appear in the output.
    # 'disks' is intentionally not included here.
    allowed_tables = (
        ('nets', 'interfaces'),
        ('graphics', 'graphics'),
        ('videos', 'videos'),
        ('boots', 'boots'),
        ('hypervisors_pools', 'hypervisors_pools'),
    )
    options = {
        key: app.isardapi.get_alloweds(
            username, table, pluck=['id', 'name', 'description'], order='name')
        for key, table in allowed_tables
    }

    options['forced_hyps'] = []
    if current_user.role == 'admin':
        hypervisors = app.adminapi.get_admin_table(
            'hypervisors', ['id', 'hostname', 'description', 'status'])
        hypervisors.insert(0, {'id': 'default', 'hostname': 'Auto',
                               'description': 'Hypervisor pool default'})
        options['forced_hyps'] = hypervisors

    options['user'] = app.isardapi.get_user(username)
    return json.dumps(options)
def is_accessible(self):
    """Report whether the current user may access this view.

    Returns:
        ``True`` only for an authenticated user that has a role whose
        ``name`` is ``'admin'``; ``False`` otherwise.
    """
    if current_user.is_authenticated:
        role = current_user.role
        if role and role.name == 'admin':
            return True
    return False
def is_accessible(self):
    """Report whether the current user may access this view.

    Returns:
        The falsy ``current_user.is_authenticated`` value when the user is
        not authenticated; otherwise the result of ``current_user.role == 1``.
    """
    authenticated = current_user.is_authenticated
    if not authenticated:
        # Hand back the falsy value itself, exactly as ``and`` would.
        return authenticated
    return current_user.role == 1