我们从Python开源项目中，提取了以下49个代码示例，用于说明如何使用flask_login.current_user.email。
def getAuthorizedFiles(fileConfigs, reportObj, report_name, userDirectory, fnct=None, ajax=False):
    """Attach the files returned by the authorisation query to ``reportObj``.

    For every entry of ``fileConfigs`` the SQL file matching its ``folder`` is
    executed against the report's ``admin.db`` with the session team, the file
    code and the current user's email as parameters. Each returned row is
    stored in ``reportObj.files`` keyed by its disk name: parsed when the
    config has a ``parser``, as a plain path when ``type`` is ``'pandas'``,
    otherwise as an open file object.

    :param fileConfigs: iterable of dicts with ``filename`` and ``folder`` and
        optionally ``parser`` / ``type``
    :param reportObj: object exposing ``files`` and ``fileMap`` mappings
    :param report_name: name of the report whose ``admin.db`` is queried
    :param userDirectory: directory containing the per-folder files
    :param fnct: when ``'params'`` (and not ``ajax``) ``reportObj.fileMap`` is filled
    :param ajax: when true, no parser names / file map are recorded
    :return: dict mapping disk name to ``"<module>.<parser name>"``
    """
    ALIAS, DISK_NAME = 0, 1
    rootPath = current_app.config['ROOT_PATH']
    SQL_CONFIG = os.path.join(rootPath, config.ARES_SQLITE_FILES_LOCATION)
    sqlFileDict = {'data': 'get_file_auth.sql', 'static': 'static_file_map.sql'}
    fileNameToParser = {}
    for fileConfig in fileConfigs:
        parser = fileConfig.get('parser', None)
        queryFileAuthPrm = {'team': session['TEAM'], 'file_cod': fileConfig['filename'],
                            'username': current_user.email, 'type': fileConfig.get('folder')}
        # Read the query through a context manager so the SQL file handle is closed.
        with open(os.path.join(SQL_CONFIG, sqlFileDict.get(fileConfig.get('folder')))) as sqlFile:
            sqlQuery = sqlFile.read()
        adminDb = os.path.join(rootPath, config.ARES_USERS_LOCATION, report_name, 'db', 'admin.db')
        files = executeSelectQuery(adminDb, sqlQuery, params=queryFileAuthPrm)
        for file in files:
            diskName = file[DISK_NAME]
            filePath = os.path.join(userDirectory, fileConfig['folder'], diskName)
            # The data file handles are handed over to reportObj and intentionally left open.
            if parser:
                reportObj.files[diskName] = parser(open(filePath))
            elif fileConfig.get('type') == 'pandas':
                reportObj.files[diskName] = filePath
            else:
                reportObj.files[diskName] = open(filePath)
            # Only record a parser name when a parser is configured (avoids a KeyError).
            if not ajax and parser:
                fileNameToParser[diskName] = "%s.%s" % (parser.__module__.split(".")[-1], parser.__name__)
            if fnct == 'params' and not ajax:
                reportObj.fileMap.setdefault(file[ALIAS], []).append(diskName)
    return fileNameToParser
def test_create_user(db, client):
    """Create a user, log out, log in with the new credentials and check the profile."""
    expected_role = UserRole.security_team
    created = client.post(
        url_for('create_user'),
        follow_redirects=True,
        data=dict(username=USERNAME, password=PASSWORD, email=EMAIL, active=True, role=expected_role.name),
    )
    assert created.status_code == 200

    logged_out = client.post(url_for('logout'), follow_redirects=True)
    assert_not_logged_in(logged_out)

    logged_in = client.post(
        url_for('login'),
        follow_redirects=True,
        data=dict(username=USERNAME, password=PASSWORD),
    )
    assert_logged_in(logged_in)
    assert USERNAME == current_user.name
    assert EMAIL == current_user.email
    assert expected_role == current_user.role
def test_edit_user(db, client):
    """Edit a user's email, password and role, then log in with the new password."""
    changed_password = random_string()
    changed_email = '{}foo'.format(EMAIL)
    changed_role = UserRole.security_team

    edited = client.post(
        url_for('edit_user', username=USERNAME),
        follow_redirects=True,
        data=dict(username=USERNAME, email=changed_email, password=changed_password,
                  role=changed_role.name, active=True),
    )
    assert edited.status_code == 200

    logged_out = client.post(url_for('logout'), follow_redirects=True)
    assert_not_logged_in(logged_out)

    logged_in = client.post(
        url_for('login'),
        follow_redirects=True,
        data={'username': USERNAME, 'password': changed_password},
    )
    assert_logged_in(logged_in)
    assert USERNAME == current_user.name
    assert changed_email == current_user.email
    assert changed_role == current_user.role
def check_auth(func):
    """
    This decorator for routes checks that the user is authorized (or that no login is required).
    If they haven't, their intended destination is stored and they're sent to get authorized.
    It has to be placed AFTER @app.route() so that it can capture `request.path`.

    A logged-in user whose lower-cased email is not in ``conf.login['whitelist']``
    is rejected with HTTP 403.
    """
    if 'login' not in conf:
        return func

    # inspired by <https://flask-login.readthedocs.org/en/latest/_modules/flask_login.html#login_required>
    @functools.wraps(func)
    def decorated_view(*args, **kwargs):
        if current_user.is_anonymous:
            print('unauthorized user visited {!r}'.format(request.path))
            session['original_destination'] = request.path
            return redirect(url_for('get_authorized'))
        print('{} visited {!r}'.format(current_user.email, request.path))
        # An explicit check instead of `assert`: assertions are removed when
        # Python runs with -O, which would silently disable the whitelist.
        if current_user.email.lower() not in conf.login['whitelist']:
            from flask import abort
            abort(403)
        return func(*args, **kwargs)
    return decorated_view
def admin_server():
    """Render the server admin page (GET) or process its submitted forms (POST).

    On GET the page is rendered with the MQTT entries listed for the current
    user. On POST a valid MQTT form creates the MQTT record and connection; a
    ``ValueError`` from either call is flashed and the page is reloaded.
    Every POST path ends in a redirect back to this page.
    """
    MQTTList = NodeDefender.db.mqtt.list(user = current_user.email)
    MQTT = CreateMQTTForm()
    if request.method == 'GET':
        return render_template('frontend/admin/server.html',
                               MQTTList = MQTTList, MQTTForm = MQTT)

    if MQTT.Submit.data and MQTT.validate_on_submit():
        try:
            NodeDefender.db.mqtt.create(MQTT.IPAddr.data, MQTT.Port.data)
            NodeDefender.mqtt.connection.add(MQTT.IPAddr.data, MQTT.Port.data)
        except ValueError as e:
            flash('Error: {}'.format(e), 'danger')
            return redirect(url_for('admin_view.admin_server'))

    # NOTE(review): `General` is not defined inside this function - confirm it
    # exists at module level, otherwise every POST reaching here raises NameError.
    if General.Submit.data and General.validate_on_submit():
        flash('Successfully updated General Settings', 'success')
    else:
        flash('Error when trying to update General Settings', 'danger')
    # Both outcomes reload this page; the endpoint is blueprint-qualified like
    # the other redirects in this view.
    return redirect(url_for('admin_view.admin_server'))
def admin_groups():
    """List the current user's groups (GET) or create a new group from the form (POST)."""
    GroupForm = CreateGroupForm()
    groups = NodeDefender.db.group.list(user_mail = current_user.email)
    if request.method == 'GET':
        return render_template('frontend/admin/groups.html', groups = groups,
                               CreateGroupForm = GroupForm)

    if not GroupForm.validate_on_submit():
        flash('Form not valid', 'danger')
        return redirect(url_for('admin_view.admin_groups'))

    try:
        group = NodeDefender.db.group.create(GroupForm.Name.data)
        NodeDefender.db.group.update(group.name,
                                     email = GroupForm.Email.data,
                                     description = GroupForm.description.data)
    except ValueError as e:
        flash('Error: {}'.format(e), 'danger')
        return redirect(url_for('admin_view.admin_groups'))

    flash('Successfully Created Group: {}'.format(group.name), 'success')
    group_token = serializer.dumps(group.name)
    return redirect(url_for('admin_view.admin_group', name = group_token))
def admin_users():
    """List users (GET) or create a user from the submitted form (POST).

    Superusers see every user; other users see the users of the groups listed
    for their own email. On POST an invalid form or a ``ValueError`` from user
    creation flashes an error and reloads this page; success redirects to the
    new user's admin page.
    """
    UserForm = CreateUserForm()
    if request.method == 'GET':
        if current_user.superuser:
            users = NodeDefender.db.user.list()
        else:
            groups = NodeDefender.db.group.list(current_user.email)
            groups = [group.name for group in groups]
            users = NodeDefender.db.user.list(*groups)
        return render_template('frontend/admin/users.html', Users = users,
                               CreateUserForm = UserForm)

    if not UserForm.validate():
        flash('Error adding user', 'danger')
        return redirect(url_for('admin_view.admin_users'))

    try:
        user = NodeDefender.db.user.create(UserForm.Email.data,
                                           UserForm.Firstname.data,
                                           UserForm.Lastname.data)
    except ValueError as e:
        flash('Error: {}'.format(e), 'danger')
        # The redirect must be returned; falling through would use the unbound `user`.
        return redirect(url_for('admin_view.admin_users'))

    flash('Successfully added user {}'.format(user.firstname), 'success')
    # admin_user() decodes its argument with serializer.loads(), so encode it here.
    return redirect(url_for('admin_view.admin_user',
                            email = serializer.dumps(user.email)))
def admin_user(email):
    """Show (GET) or update (POST) the admin page of a single user.

    :param email: serialized email; decoded with ``serializer.loads``
    :return: the rendered page on GET, otherwise a redirect
    """
    email = serializer.loads(email)
    usersettings = UserSettings()
    userpassword = UserPassword()
    usergroupadd = UserGroupAdd()
    user = NodeDefender.db.user.get(email)
    if request.method == 'GET':
        if user is None:
            # Report the email that was looked up (previously the builtin `id` was formatted).
            flash('User {} not found'.format(email), 'danger')
            return redirect(url_for('admin_view.admin_groups'))
        return render_template('frontend/admin/user.html', User = user,
                               UserSettings = usersettings,
                               UserPassword = userpassword,
                               UserGroupAdd = usergroupadd)

    if usersettings.Email.data and usersettings.validate():
        NodeDefender.db.user.update(usersettings.Email.data, **\
                                    {'firstname' : usersettings.Firstname.data,
                                     'lastname' : usersettings.Lastname.data})
        return redirect(url_for('admin_view.admin_user',
                                email = serializer.dumps(usersettings.Email.data)))

    # Nothing was processed: reload the page instead of returning None,
    # which Flask rejects as an invalid response.
    return redirect(url_for('admin_view.admin_user', email = serializer.dumps(email)))
def personalsubmitinfo():
    '''Apply the submitted profile form to the current user and return to the profile page'''
    form = request.form
    if form['name']:
        current_user.nickname = form['name']
    current_user.email = form['email']

    # A phone value that is not an integer is stored as None
    try:
        phone = int(form['phone'])
    except ValueError:
        phone = None
    current_user.phone = phone

    if 'picture' in form:
        pic = int(form['picture'])
        if -20 <= pic <= -1:
            current_user.picture = Upload(pic)

    current_user.set_preference('receive_email', 'receive_email' in form)
    flash('Your information has been successfully changed.', 'status_info')
    return redirect(url_for('.personal'))
def checkAuth(report_name):
    """ Check whether user has authorization to change data within the environment

    Returns True when the session team has the 'admin' role for the
    environment, or when the current user's email is recorded as a
    ``temp_owner`` in ``env_auth``; otherwise False.
    """
    def _quote(value):
        # Double embedded single quotes so a value cannot terminate the SQL string literal
        return str(value).replace("'", "''")

    db = AresSql.SqliteDB(report_name)
    # NOTE(review): prefer bound parameters if AresSql.SqliteDB.select supports them
    query = """SELECT 1 FROM env_auth
               INNER JOIN env_def ON env_auth.env_id = env_def.env_id and env_def.env_name = '%s'
               INNER JOIN team_def ON env_auth.team_id = team_def.team_id and team_def.team_name = '%s'
               WHERE team_def.role = 'admin'""" % (_quote(report_name), _quote(session['TEAM']))
    subQuery = """SELECT 1 FROM env_auth WHERE temp_owner = '%s'""" % _quote(current_user.email)
    if list(db.select(query)):
        return True
    # The ownership query only runs when the team is not an admin of the environment
    return bool(list(db.select(subQuery)))
def createDataSource():
    """ Create or replace the credentials stored for a data source of a user

    Reads ``app_id``, ``source``, ``username`` and ``password`` from the query
    string. Any existing DataSource row for that user and source is deleted
    before the new one is added. The plain credentials are also kept in the
    session under the source name.

    Returns a JSON body with status 200 on success, or 404 when no user
    matches ``app_id``.
    """
    app_id = request.args['app_id']
    source = request.args['source']
    username = request.args['username']
    password = request.args['password']
    user = User.query.filter_by(email=app_id).first()
    if user is None:
        # Without this guard an unknown app_id fails with AttributeError on user.uid
        return json.dumps('Unknown user'), 404

    encryptPwd, salt = AresUserAuthorization.encrypt(password, session['PWD'])
    dataSource = DataSource.query.filter_by(uid=user.uid, source_name=source).first()
    if dataSource:
        db.session.delete(dataSource)
        db.session.commit()
    session[source] = (username, password)
    dataSource = DataSource(source, user.uid, username, encryptPwd, salt)
    db.session.add(dataSource)
    db.session.commit()
    return json.dumps('Success'), 200
def aresRegistration():
    """ Serve the registration page on GET; on POST create the user (and the team if missing) """
    if request.method == 'GET':
        special_css_tpl = '<link rel="stylesheet" href="{{ url_for(\'static\', filename=\'css/aresLogin.css\') }}" >'
        js_tpl = '<script language="javascript" type="text/javascript" src="{{ url_for(\'static\', filename=\'js/jquery-3.2.1.min.js\') }}"></script>'
        jsImport = render_template_string(js_tpl)
        special_css = render_template_string(special_css_tpl)
        return render_template('ares_login_page.html', cssImport=special_css, jsImport=jsImport)

    if request.method == 'POST':
        data = request.form
        already_registered = User.query.filter_by(email=data['email_addr']).first()
        if not already_registered:
            if not Team.query.filter_by(team_name=data['team']).first():
                db.session.add(Team(data['team'], data['team_email']))
            team_def = Team.query.filter_by(team_name=data['team']).first()
            db.session.add(User(data['email_addr'], team_def.team_id, data['password']))
            db.session.commit()
        # Existing and freshly created accounts are both sent to the login page
        return redirect(url_for('ares.aresLogin', next=url_for('ares.run_report')))
def post(self):
    """Change the current user's password.

    Expects a JSON body with ``password`` (the old password), ``newpassword``
    and ``time``. Returns an ``ERROR`` dict when ``User.validate`` rejects the
    old password, otherwise logs the user out, changes the password and
    returns a ``SUCCESS`` dict.

    Raises KeyError when a required field is missing from the body.
    """
    # get_json() already returns decoded JSON; no dumps/loads round-trip needed
    content = request.get_json(force = True)
    email = current_user.email
    password = content['password']
    validateResult = User.validate(user_id = email, password = password)
    if validateResult is None or validateResult == False:
        return {'ERROR': 'Old password is not correct'}

    # Read the remaining fields before logging out, so a malformed request
    # cannot end the session without changing the password.
    newPassword = content['newpassword']
    time = content['time']
    logout_user()
    user = User.get(email)
    user.changePassword(newPassword = newPassword, time = time)
    return {'SUCCESS': 'Password changed'}
def request_orcid_credentials():
    """Redirect to the ORCID for the technical contact of the organisation.

    The member API form URL is built from the current user's name and email
    and from their organisation's name and confirmation state. Additionally
    the time stamp gets saved on the organisation when the handler gets
    invoked.
    """
    organisation = current_user.organisation
    client_secret_url = append_qs(
        iri_to_uri(MEMBER_API_FORM_BASE_URL),
        new_existing=('Existing_Update' if organisation.confirmed else 'New_Credentials'),
        note=NOTE_ORCID + " " + organisation.name,
        contact_email=current_user.email,
        contact_name=current_user.name,
        org_name=organisation.name,
        cred_type=CRED_TYPE_PREMIUM,
        app_name=APP_NAME + " for " + organisation.name,
        # A space is needed before "and", otherwise it is glued to the organisation name
        app_description=APP_DESCRIPTION + organisation.name + " and its researchers",
        app_url=APP_URL,
        redirect_uri_1=url_for("orcid_callback", _external=True))

    organisation.api_credentials_requested_at = datetime.now()
    organisation.save()

    return redirect(client_secret_url)
def action_invite(self, ids):
    """Batch registration of organisations."""
    sent = 0
    selected = OrgInfo.select().where(OrgInfo.id.in_(ids))
    for oi in selected:
        try:
            register_org(
                org_name=oi.name,
                email=oi.email,
                tech_contact=True,
                via_orcid=(not oi.tuakiri_name),
                first_name=oi.first_name,
                last_name=oi.last_name,
                city=oi.city,
                country=oi.country,
                course_or_role=oi.role,
                disambiguated_id=oi.disambiguated_id,
                disambiguation_source=oi.disambiguation_source)
            sent += 1
        except Exception as ex:
            # One failed invitation is reported and logged; the batch continues
            flash(f"Failed to send an invitation to {oi.email}: {ex}")
            app.logger.exception(f"Failed to send registration invitation to {oi.email}.")

    flash("%d invitations were sent successfully." % sent)
def user_info(userid):
    """Return a status dict describing the user with the given ``userID``.

    ``status`` is 1 with the user's fields in ``data`` when the user exists,
    otherwise 0 with an empty ``data``.
    """
    record = User.query.filter_by(userID=userid).first()
    if not record:
        logging.info("Get User Information ({}) Fail: No such User".format(userid))
        return {
            "status": 0,
            "message": "Get User info Fail!, Not such User",
            "data": {}
        }

    logging.info("Get User Information ({}): Success".format(userid))
    profile = {
        "username": record.userName,
        "id": record.userID,
        "nickname": record.nickName,
        "email": record.email,
        "isAuthenticated": record.isAuthenticated,
        "qq": record.qq,
        "picture": record.picture,
        "compressPicture": record.compressPicture
    }
    return {"status": 1, "message": "Get user info Success", "data": profile}
def _emails_view(emails: Iterable[dict], page: int, template: str='email.html') -> Response:
    """Render one page of emails with ``sent_at`` shifted by the user's timezone offset.

    Pages below 1 abort with 404. The paginated emails are also stored in the
    attachments session before rendering.
    """
    timestamp_format = '%Y-%m-%d %H:%M'
    attachments_session = app.ioc.attachments_session
    timezone_offset = timedelta(minutes=current_user.timezone_offset_minutes)

    if page < 1:
        return abort(404)

    emails = Pagination(emails, page, AppConfig.EMAILS_PER_PAGE)
    for email in emails:
        raw_sent_at = email.get('sent_at')
        if not raw_sent_at:
            continue
        shifted = datetime.strptime(raw_sent_at, timestamp_format) - timezone_offset
        email['sent_at'] = shifted.strftime(timestamp_format)

    attachments_session.store(emails)
    return _view(template, emails=emails, page=page)
def index():
    """Render the registration form; on a valid submit create the user and queue a confirmation email."""
    form = RegistrationForm()
    if not form.validate_on_submit():
        return render_template('main/index.html', form=form)

    user = User(
        first_name=form.first_name.data,
        last_name=form.last_name.data,
        email=form.email.data,
        password=form.password.data)
    db.session.add(user)
    db.session.commit()

    token = user.generate_confirmation_token()
    confirm_link = url_for('account.confirm', token=token, _external=True)
    get_queue().enqueue(
        send_email,
        recipient=user.email,
        subject='Confirm Your Account',
        template='account/email/confirm',
        user=user,
        confirm_link=confirm_link)
    flash('A confirmation link has been sent to {}.'.format(user.email), 'warning')
    return redirect(url_for('main.index'))
def register():
    """Register a new account and send its confirmation email.

    On a valid form submission the user is created and committed, a
    confirmation token is generated and mailed, and the browser is redirected
    to the main index. Otherwise the registration form is rendered.
    """
    form = RegistrationForm()
    if form.validate_on_submit():
        # Use the submitted value (.data), not the form field object itself
        user = User(email=form.email.data,
                    username=form.username.data,
                    password=form.password.data)
        # Add the instance (not the User class) and commit before generating the token
        db.session.add(user)
        db.session.commit()
        token = user.generate_confirmation_token()
        send_email(user.email, 'Confirm Your Account',
                   'auth/email/confirm', user=user, token=token)
        flash('A confirmation email has been sent to you by email')
        return redirect(url_for('main.index'))
    return render_template('auth/register.html', form=form)
def unconfirmed():
    """Show the "unconfirmed account" page to logged-in users who are not yet confirmed."""
    # NOTE(review): is_anonymous is called as a method here; newer Flask-Login
    # versions expose it as a property - confirm against the installed version.
    if current_user.is_anonymous() or current_user.confirmed:
        # Redirecting to this same view would loop forever; send the user home instead
        return redirect(url_for('main.index'))
    return render_template('auth/unconfirmed.html')


# Re-send the account confirmation email
@auth.route('/confirm')
@login_required
def resend_confirmation():
    """Generate a fresh confirmation token and mail it to the current user."""
    token = current_user.generate_confirmation_token()
    send_email(current_user.email, 'Confirm Your Account',
               'auth/email/confirm', user=current_user, token=token)
    flash('A new confirmation email has been sent to you by email.')
    return redirect(url_for('main.index'))


# User profile page
@main.route('/user/<username>')
def user(username):
    """Render the profile page of ``username``; abort with 404 when no such user exists."""
    user = User.query.filter_by(username=username).first()
    if user is None:
        abort(404)
    return render_template('user.html', user=user)
def share(id):
    """Email a note to a recipient and record the share; only the note's author may do so."""
    note = Note.query.get_or_404(id)
    if current_user != note.author:
        abort(403)

    form = ShareForm()
    if not form.validate_on_submit():
        return render_template('app/share_note.html', form=form, notes=[note])

    subject = '{0} has shared a braindump with you!'.format(current_user.email)
    send_email(
        form.recipient_email.data,
        subject,
        'app_email/share_note',
        user=current_user,
        note=note,
        html=markdown(note.body))

    db.session.add(SharedNote(
        author_id=current_user.id,
        note_id=note.id,
        recipient_email=form.recipient_email.data))
    db.session.commit()
    flash('The note has been shared!')
    return redirect(url_for('.index'))
def confirm_email(token):
    """Confirm the account identified by ``token`` and log the user in.

    An already confirmed account only gets a notice. Any failure while
    confirming is reported as an invalid or expired link. The user is always
    redirected to the discover-users page.
    """
    try:
        # try confirm_email
        email = confirm_token(token)
        user = User.query.filter_by(email=email).first_or_404()
        if user.confirmed:
            flash('Account already confirmed Please login.', 'success')
        else:
            user.confirmed = True
            # user follow himself to show his posts in the main page
            user.follow(user)
            db.session.add(user)
            db.session.commit()
            login_user(user)
            flash('You have confirmed your account Thanks', 'success')
    except Exception:
        # `except Exception` rather than a bare except, so SystemExit and
        # KeyboardInterrupt are not swallowed.
        flash('The confirmation link is invalid or has expired.', 'danger')
    return redirect(url_for('users.discover_users'))
def register():
    """Register a user; the account is only stored once the confirmation email was sent."""
    form = RegistrationForm()
    if form.validate_on_submit():
        role = 2 if form.user_type.data == '0' else 3
        user = orm.User(email=form.email.data,
                        username=form.username.data,
                        password=form.password.data,
                        role_id = role)
        token = user.generate_confirmation_token()
        try:
            send_email(user.email, '????', 'auth/email/confirm', user=user, token=token)
        except Exception:
            flash(u'??????.')
            return redirect(url_for('register'))
        db.session.add(user)
        db.session.commit()
        flash(u'????????????????.')
        return redirect(url_for('login'))

    if request.method == 'GET':
        logic.LoadBasePageInfo('??', form)
    return render_template('auth/register.html', form=form)
def get_current_locations():
    """Return, per location value, the records of the devices predicted to be there.

    Detections of type ``'detection'`` for all known device MACs are loaded
    into a DataFrame, annotated with the owning user's display name and
    avatar, passed to ``predict_location`` and grouped by
    ``predicted_location``.

    :return: dict mapping every location value to a list of record dicts
        (empty list when nothing was detected there)
    """
    locations = [l.value for l in Location.query.all()]
    # Index the devices by MAC once instead of issuing one query per detection row.
    device_by_mac = {}
    for known_device in Device.query.all():
        device_by_mac.setdefault(known_device.mac, known_device)
    device_macs = list(device_by_mac)

    detections = Detection.query.filter_by(type='detection').filter(Detection.mac.in_(device_macs)).all()
    df = get_df_from_detection(detections)
    json_data = {l: [] for l in locations}
    if len(df) == 0:
        return json_data

    df['user'] = '?'
    df['avatar'] = ''
    for index, row in df.iterrows():
        device = device_by_mac.get(row['mac'])
        if device:
            owner = device.user
            # Fall back to a name derived from the email's local part
            df.loc[index, 'user'] = owner.name if owner.name else owner.email.split("@")[0].replace('.', ' ').title()
            df.loc[index, 'avatar'] = owner.avatar
    # Copy so the column assignments below do not write into a view of the original frame
    df = df[df["user"] != '?'].copy()
    df["most_recent_seen"] = pd.to_datetime(df["most_recent_seen"])
    now = datetime.datetime.now()
    df["most_recent_seen"] = df["most_recent_seen"].apply(
        lambda timestamp: str(int(math.ceil((now - timestamp).total_seconds() / 60))) + " min")
    if len(df) > 0:
        df = predict_location(df)
    if len(df) > 0:
        df.drop(u"mac", inplace=True, axis=1)
        for l in locations:
            locations_df = df[df["predicted_location"] == l]
            json_data[l] = locations_df.to_dict(orient='records')
    return json_data
def get_training_table(training_macs, locations):
    """Summarise which devices have training detections for which locations.

    :param training_macs: iterable of device MAC addresses
    :param locations: iterable of location values
    :return: ``(champions, training_json)`` where ``champions`` lists the
        avatars of users whose device has a training detection for every
        location, and ``training_json`` maps each remaining MAC to a dict
        with ``avatar_url``, ``name`` and one boolean per location.
        MACs without a matching device are skipped.
    """
    # Resolve every location row once; the lookup does not depend on the device.
    location_rows = [(location, Location.query.filter_by(value=location).first())
                     for location in locations]
    training_json = dict()
    champions = []
    for mac in training_macs:
        device = Device.query.filter_by(mac=mac).first()
        if not device:
            continue
        user = device.user
        # Fall back to a name derived from the email's local part
        display_name = user.name if user.name else user.email.split("@")[0].replace('.', ' ').title()
        row = {'avatar_url': user.avatar, 'name': display_name}
        is_champion = True
        for location, location_row in location_rows:
            # A single query per (mac, location) pair
            trained = TrainingDetection.query.filter_by(mac=mac, location=location_row).first() is not None
            row[location] = trained
            if not trained:
                is_champion = False
        if is_champion:
            champions.append(user.avatar)
        else:
            training_json[mac] = row
    return champions, training_json
def setting_users(id):
    """Admin form for editing a user's profile; 404 when the user does not exist."""
    user = User.query.get_or_404(id)
    form = EditProfileAdminForm(user=user)

    if not form.validate_on_submit():
        # Pre-fill the form with the stored values before rendering it
        form.email.data = user.email
        form.username.data = user.username
        form.confirmed.data = user.confirmed
        form.role.data = user.role_id
        form.name.data = user.name
        return render_template('auth/admin_edit_profile.html', form=form)

    user.email = form.email.data
    user.username = form.username.data
    user.confirmed = form.confirmed.data
    user.role = Role.query.get(form.role.data)
    user.name = form.name.data
    db.session.add(user)
    db.session.commit()
    flash(constant.PROFILE_UPDATE)
    return redirect(url_for('auth.setting_users', id=id))
def password_reset_request():
    """Ask for an email address and mail a password-reset token to its user.

    Logged-in users are sent to the main index. After a valid submission the
    same notice is flashed whether or not the address matched a user, and the
    browser is redirected to the login page.
    """
    if not current_user.is_anonymous:
        return redirect(url_for('main.index'))
    form = PasswordResetRequestForm()
    if form.validate_on_submit():
        user = User.query.filter_by(email=form.email.data).first()
        if user:
            token = user.generate_reset_token()
            send_email(user.email, 'Reset Your Password',
                       'auth/email/reset_password',
                       user=user, token=token,
                       next=request.args.get('next'))
        # Adjacent literals are concatenated as-is, so the first part needs a trailing space
        flash('An email with instructions to reset your password has been '
              'sent to you.')
        return redirect(url_for('auth.login'))
    return render_template('auth/reset_password.html', form=form)
def register():
    """Create an unconfirmed account, mail the confirmation link and log the new user in."""
    form = RegisterForm(request.form)
    if not form.validate_on_submit():
        return render_template('user/register.html', form=form)

    user = User(
        first_name=form.first_name.data,
        last_name=form.last_name.data,
        email=form.email.data,
        password=form.password.data,
        confirmed=False,
    )
    db.session.add(user)
    db.session.commit()

    token = generate_confirmation_token(user.email)
    confirm_url = url_for('user.confirm_email', token=token, _external=True)
    html = render_template('user/activate.html', confirm_url=confirm_url)
    send_email(user.email, "Please confirm your email", html)

    login_user(user)
    flash('You registered and are now logged in. Welcome!', 'success')
    return redirect(url_for('user.unconfirmed'))
def _is_safe_redirect_target(target):
    """Return True when ``target`` is a path on this site (no scheme, no host)."""
    from urllib.parse import urlparse
    if not target or not target.startswith('/'):
        return False
    # '//host' and '/\host' are treated by browsers as absolute URLs
    if target.startswith('//') or target.startswith('/\\'):
        return False
    parts = urlparse(target)
    return not parts.scheme and not parts.netloc


def login():
    """Authenticate by email and password.

    On success the user is redirected to the ``next`` query argument when it
    is a path on this site, otherwise to the home page. On failure an error
    is flashed and the login form is rendered again.
    """
    form = LoginForm(request.form)
    if form.validate_on_submit():
        next_url = request.args.get('next')
        user = User.query.filter_by(email=form.email.data).first()
        if user and bcrypt.check_password_hash(
                user.password, request.form['password']):
            login_user(user)
            flash('Welcome.', 'success')
            # `next` comes from the query string: never redirect to another site
            if _is_safe_redirect_target(next_url):
                return redirect(next_url)
            return redirect(url_for('main.home'))
        flash('Invalid email and/or password.', 'danger')
    return render_template('user/login.html', form=form)
def recover_password(token):
    """Let the holder of a valid reset token choose a new password.

    The token is decoded with ``ts.loads`` (salt ``"recover-key"``, maximum
    age 86400 seconds). An undecodable token, or one whose email matches no
    user, flashes an error and redirects to the landing page.
    """
    try:
        email = ts.loads(token, salt="recover-key", max_age=86400)
    except Exception:
        # `except Exception` rather than a bare except, so SystemExit and
        # KeyboardInterrupt are not swallowed.
        flash('Invalid or expired password reset link', 'danger')
        return redirect(url_for('main.landing'))

    form = PasswordResetEnterPasswordForm(request.form)
    if form.validate_on_submit():
        user = User.query.filter_by(email=email).first()
        if user is None:
            # The account no longer exists; avoid AttributeError on None
            flash('Invalid or expired password reset link', 'danger')
            return redirect(url_for('main.landing'))
        user.password = bcrypt.generate_password_hash(form.password.data)
        db.session.add(user)
        db.session.commit()
        flash('Password successfully reset!', 'success')
        return redirect(url_for('main.landing'))
    return render_template('user/reset_with_token.html', form=form, token=token)
def register():
    """Register an account for an allowed email domain and mail the confirmation link."""
    allowed_domains = ('ustc.edu.cn', 'mail.ustc.edu.cn', 'ustclug.org')
    form = RegisterForm()
    if request.method == 'POST' and form.validate_on_submit():
        email = form['email'].data
        password = form['password'].data
        domain = email.rsplit('@', 1)[-1]
        if domain not in allowed_domains:
            flash('Email must end with @[mail.]ustc.edu.cn', 'error')
        elif User.get_user_by_email(email):
            flash('Email already exists', 'error')
        else:
            token = ts.dumps(email, salt=app.config['SECRET_KEY'] + 'email-confirm-key')
            url = url_for('confirm', token=token, _external=True)
            User(email, password).save()
            body = 'Follow this link to confirm your email:<br><a href="{0}">{0}</a>'.format(url)
            send_mail('Confirm your email', body, email)
            return redirect(url_for('register_ok'))
    return render_template('register.html', form=form)
def confirm():
    """Activate the account whose email is encoded in the ``token`` query argument.

    A missing token, a token that cannot be decoded (maximum age 86400
    seconds) or an unknown email renders the error page; otherwise the user
    is activated and redirected to the login page.
    """
    token = request.args.get('token')
    if not token:
        flash('No token provided', 'error')
        return render_template('confirm_error.html')
    try:
        email = ts.loads(token, salt=app.config['SECRET_KEY'] + "email-confirm-key", max_age=86400)
    except Exception:
        # `except Exception` rather than a bare except, so SystemExit and
        # KeyboardInterrupt are not swallowed.
        flash('Invalid token or token out of date', 'error')
        return render_template('confirm_error.html')
    user = User.get_user_by_email(email)
    if not user:
        flash('Invalid user', 'error')
        return render_template('confirm_error.html')
    user.set_active()
    flash('User actived')
    return redirect(url_for('login'))
def login():
    """Log a user in by email and password; authenticated users go straight to the index."""
    if current_user.is_authenticated:
        return redirect(url_for('index'))

    form = LoginForm()
    if request.method == 'POST' and form.validate_on_submit():
        email = form['email'].data
        password = form['password'].data
        user = User.get_user_by_email(email)
        if not user:
            flash('Email not found', 'error')
        elif not user.check_password(password):
            flash('Email or password incorrect', 'error')
        elif not user.active:
            flash('Email not confirmed. Please recover your account at the bottom of this page.', 'error')
        else:
            login_user(user)
            return redirect(url_for('index'))
    return render_template('login.html', form=form)
def register():
    """
    register a new user using invite code, note that a newly registered user is not administrator, you need to use an admin user to promote it
    :return: response
    """
    required_fields = ('name', 'password', 'password_repeat', 'invite_code', 'email')
    register_data = json.loads(request.get_data(True, as_text=True))
    if not all(field in register_data for field in required_fields):
        raise ClientError(ClientError.INVALID_REQUEST)

    name = register_data['name']
    password = register_data['password']
    password_repeat = register_data['password_repeat']
    email = register_data['email']
    invite_code = register_data['invite_code']
    if password != password_repeat:
        raise ClientError(ClientError.PASSWORD_MISMATCH)

    registered = UserCredential.register_user(name=name, password=password, email=email, invite_code=invite_code)
    if registered:
        # login automatically
        credential = UserCredential.login_user(name, password)
        login_user(credential, remember=False)
        # send email
        credential.send_confirm_email()
        return json_resp({'message': 'ok'}, 201)
def test_create_duplicate_user_fails(db, client):
    """Creating a user with the default username must report the username-exists error."""
    form_data = dict(username=DEFAULT_USERNAME, password=PASSWORD, email=EMAIL, active=True)
    resp = client.post(url_for('create_user'), follow_redirects=True, data=form_data)
    assert resp.status_code == 200
    page = resp.data.decode()
    assert ERROR_USERNAME_EXISTS in page
def test_create_duplicate_email_fails(db, client):
    """Creating a user with the current user's email must report the email-exists error."""
    form_data = dict(username=USERNAME, password=PASSWORD, email=current_user.email, active=True)
    resp = client.post(url_for('create_user'), follow_redirects=True, data=form_data)
    assert resp.status_code == 200
    page = resp.data.decode()
    assert ERROR_EMAIL_EXISTS in page
def test_edit_preserves_password(db, client):
    """Editing a user without sending a password must keep the old password usable."""
    changed_email = '{}foo'.format(EMAIL)
    edited = client.post(
        url_for('edit_user', username=USERNAME),
        follow_redirects=True,
        data=dict(username=USERNAME, email=changed_email, active=True),
    )
    assert edited.status_code == 200

    logged_out = client.post(url_for('logout'), follow_redirects=True)
    assert_not_logged_in(logged_out)

    logged_in = client.post(
        url_for('login'),
        follow_redirects=True,
        data={'username': USERNAME, 'password': PASSWORD},
    )
    assert_logged_in(logged_in)
    assert USERNAME == current_user.name
    assert changed_email == current_user.email
def test_edit_requires_admin(db, client):
    """Posting to the edit-user endpoint is expected to answer with the Forbidden status code."""
    form_data = dict(username=USERNAME, email=EMAIL, password=PASSWORD)
    resp = client.post(url_for('edit_user', username=USERNAME), follow_redirects=True, data=form_data)
    assert resp.status_code == Forbidden.code
def __init__(self, username=None, email=None):
    """Store the optional username and email on the instance."""
    self.username, self.email = username, email
def __repr__(self):
    """Return a debug representation showing the repr of the email."""
    template = "<User email={!r}>"
    return template.format(self.email)
def load_user(id):
    """Return a User for ``id`` when it is in the configured whitelist, else None."""
    return User(email=id) if id in conf.login['whitelist'] else None
def logout():
    """Print who is logging out, end the session and redirect to the homepage."""
    leaving_email = current_user.email
    print(leaving_email, 'logged out')
    logout_user()
    return redirect(url_for('homepage'))
def create(name, email, location):
    """Create a group, set its location, send the new-group mail and emit a redirect to its admin page.

    Emits an error and returns False when a group with this name already exists.
    """
    already_there = NodeDefender.db.group.get(name)
    if already_there:
        emit('error', ('Group exsists'), namespace='/general')
        return False

    group = NodeDefender.db.group.create(name, email)
    NodeDefender.db.group.location(name, **location)
    NodeDefender.mail.group.new_group(name)
    group_token = serializer.dumps(name)
    url = url_for('admin_view.admin_group', name = group_token)
    return emit('redirect', (url), namespace='/general')
def list(user = None):
    """Emit the names of the groups listed for ``user`` (default: the current user's email)."""
    mail = current_user.email if user is None else user
    group_names = [group.name for group in NodeDefender.db.group.list(user_mail = mail)]
    return emit('list', group_names)
def create(email, firstname, lastname, group, role):
    """Create a user, add it to ``group``, set its role, send the new-user mail and emit a reload.

    Emits an error and returns False when the group is missing or the user
    already exists; returns True otherwise.
    """
    group_found = NodeDefender.db.group.get(group)
    if not group_found:
        emit('error', ('Group does not exist'), namespace='/general')
        return False

    existing_user = NodeDefender.db.user.get(email)
    if existing_user:
        emit('error', ('User Exists'), namespace='/general')
        return False

    user = NodeDefender.db.user.create(email, firstname, lastname)
    NodeDefender.db.group.add_user(group, email)
    NodeDefender.db.user.set_role(email, role)
    NodeDefender.mail.user.new_user(user)
    emit('reload', namespace='/general')
    return True
def info(email):
    """Emit the JSON form of the user with this email, or an error when there is none."""
    user = NodeDefender.db.user.get(email)
    if not user:
        message = "User {} not found".format(email)
        return emit('error', message, namespace='/general')
    return emit('info', user.to_json())
def groups(email):
    """Emit the result of the database group lookup for the user with this email."""
    user_groups = NodeDefender.db.user.groups(email)
    return emit('groups', user_groups)
def name(email, firstname, lastname):
    """Update the first and last name of the user with this email and emit a reload."""
    NodeDefender.db.user.update(email, firstname = firstname, lastname = lastname)
    emit('reload', namespace='/general')
    return True
def role(email, role):
    """Set the role of the user with this email and emit a reload."""
    user_db = NodeDefender.db.user
    user_db.set_role(email, role)
    emit('reload', namespace='/general')
    return True