我们从Python开源项目中,提取了以下30个代码示例,用于说明如何使用flask_login.current_user.get_id()。
def get_user(): """User information. .. note:: **Privacy note** A users IP address, user agent string, and user id (if logged in) is sent to a message queue, where it is stored for about 5 minutes. The information is used to: - Detect robot visits from the user agent string. - Generate an anonymized visitor id (using a random salt per day). - Detect the users host contry based on the IP address. The information is then discarded. """ return dict( ip_address=request.remote_addr, user_agent=request.user_agent.string, user_id=( current_user.get_id() if current_user.is_authenticated else None ), )
def register(): form = RegisterForm() if form.validate_on_submit(): user = User( username=form.username.data, password=form.password.data, ) db.session.add(user) db.session.commit() login_user(user) return redirect(url_for('index')) if current_user.get_id() != None: if (current_user.get_username() == 'don'): return render_template('register.html', current_user=current_user, register_form=form) else: return render_template('permission.html') else: return render_template('permission.html')
def postQuestion(): form = QuestionForm() if request.method == 'POST': try: username = current_user.get_id() question = QUESTIONS_COLLECTION.find_one( {'short_description': form.short_description.data} ) if question: flash("This question has already been asked", category='error') return render_template('post-a-question.html', title='HoverSpace | Post a Question', form=form) tags = request.form.getlist('tag') ques_obj = Question(username, form.short_description.data, form.long_description.data, tags) quesID = ques_obj.postQuestion() tag_obj = TagMethods(quesID, tags) tag_obj.addQuestion() srch.add_string(quesID, form.short_description.data) flash("Your question has been successfully posted.", category='success') return redirect(url_for('viewQuestion', quesID=quesID)) except KeyError: return redirect(url_for('login')) return render_template('post-a-question.html', title='HoverSpace | Post a Question', form=form, choices=tag_choices)
def updateQuesVotes(quesID): rd = (request.data).decode('utf-8') data = json.loads(rd) voteType = data['voteType'] print(voteType) ques_obj = QuestionMethods(quesID) postedBy = (ques_obj.getQuestion())['postedBy'] if current_user.get_id() == postedBy: return json.dumps({'type': 'notAllowed'}) usr = User(current_user.get_id()) status = usr.voteQues(quesID, voteType) print(status) votesCount = ques_obj.updateVotes(status['votesChange']) usr = User(postedBy) usr.update_karma(status['votesChange']) print(status['type'], votesCount) return json.dumps({'type': status['type'], 'count': votesCount})
def updateAnsVotes(ansID): rd = (request.data).decode('utf-8') data = json.loads(rd) voteType = data['voteType'] print(voteType) ans_obj = UpdateAnswers(ansID) postedBy = (ans_obj.getAnswer())['postedBy'] if current_user.get_id() == postedBy: return json.dumps({'type': 'notAllowed'}) usr = User(current_user.get_id()) status = usr.voteAns(ansID, voteType) print(status) votesCount = ans_obj.updateVotes(postedBy, status['votesChange']) usr = User(postedBy) usr.update_karma(status['votesChange']) print(status['type'], votesCount) return json.dumps({'type': status['type'], 'count': votesCount})
def create_audit_event(self, code='AUDIT'): """Creates a generic auditing Event logging the changes between saves and the initial data in creates. Kwargs: code (str): The code to set the new Event to. Returns: Event: A new event with relevant info inserted into it """ event = self._meta.event_model( code=code, model=self.__class__.__name__, ) # Use the logged in User, if possible if current_user: event.created_by = current_user.get_id() self.copy_foreign_keys(event) self.populate_audit_fields(event) return event
def get_user_info(self): """ Requires Flask-Login (https://pypi.python.org/pypi/Flask-Login/) to be installed and setup """ if not has_flask_login: return if not hasattr(self.app, 'login_manager'): return try: is_authenticated = current_user.is_authenticated except AttributeError: # HACK: catch the attribute error thrown by flask-login is not attached # > current_user = LocalProxy(lambda: _request_ctx_stack.top.user) # E AttributeError: 'RequestContext' object has no attribute 'user' return {} if callable(is_authenticated): is_authenticated = is_authenticated() if not is_authenticated: return {} return dict(id=current_user.get_id())
def get_call_logd_client(): client = g.get('call_logd_client') if not client: client = g.call_logd_client = CallLogdClient(**app.config['call_logd']) token = current_user.get_id() client.set_token(token) return client
def get_plugind_client(): client = g.get('plugind_client') if not client: client = g.plugind_client = PlugindClient(**app.config['plugind']) token = current_user.get_id() client.set_token(token) return client
def get_confd_client(): client = g.get('confd_client') if not client: client = g.confd_client = ConfdClient(**app.config['confd']) token = current_user.get_id() client.set_token(token) return client
def get(self): token = current_user.get_id() logout_user() try: AuthClient(host='fsa').token.revoke(token) except requests.HTTPError as e: logger.warning('Error with Wazo authentication server: %(error)s', error=e.message) except requests.ConnectionError: logger.warning('Wazo authentication server connection error: Unable to revoke token') return redirect(url_for('index.Index:get'))
def user(): user = User.query.get(current_user.get_id()) return render_template('user.html', user=user)
def edit(): user = User.query.get(current_user.get_id()) form = UserForm() if form.username.data is None: form.username.data = user.username if form.email.data is None: form.email.data = user.email if form.phone.data is None: form.phone.data = user.phone if form.validate_on_submit(): user = db.session.query(User).get(current_user.get_id()) user.username = form.username.data user.email = form.email.data if form.phone.data: user.phone = form.phone.data.replace(' ', '') else: user.phone = None db.session.add(user) db.session.commit() return redirect(request.args.get('next') or '/user') return render_template('user_edit.html', form=form, user=user)
def password(): user = User.query.get(current_user.get_id()) form = PasswordForm() if form.validate_on_submit(): user = User.query.get(current_user.get_id()) user.password = generate_password_hash(form.new_password.data) db.session.add(user) db.session.commit() flash('Your password have been updated.', 'success') return redirect(request.args.get('next') or '/user') return render_template('user_password.html', user=user, form=form)
def validate(self): check_validate = super(UserForm, self).validate() if not check_validate: return False user = User.query.filter(User.id != current_user.get_id(), User.email == self.email.data).first() if user: self.email.errors.append('Email is already taken.') return False return True
def do_custom_task(): form = CustomTaskForm() if form.validate_on_submit(): # do custom onetime task Task.do_custom_task(form.task.data, form.points.data, current_user.get_id()) return redirect('/tasks') return render_template('do_custom_task.html', form=form)
def set_task_state(): user_id = current_user.get_id() task_id = request.form.get('id', type=int) state = request.form.get('state', type=int) if None not in (task_id, state, user_id) and state in (0, 1, 2): Task.set_state(task_id, state, user_id) else: # TODO message return '', 403 return '', 204 # Create customized model view class for Admin interface login ######################### # JSON endpoints for JS # #########################
def schedule(month,year,equip): reservedAshtarut = ReservedAshtarut.query.all(); reservedAstarte = ReservedAstarte.query.all(); users = User.query.all(); renderCalendar.renderCal(int(month), int(year), equip) if current_user.get_id() != None: return render_template('renderCal.html', title = 'Schedule', users = users, reservedAstarte = reservedAstarte, reservedAshtarut = reservedAshtarut) else: return render_template('permission.html')
def profile(): reservedAshtarut = ReservedAshtarut.query.all(); reservedAstarte = ReservedAstarte.query.all(); waitlist = Waitlist.query.all(); if current_user.get_id() != None: return render_template('profile.html', title = 'Profile', waitlist = waitlist, reservedAshtarut = reservedAshtarut, reservedAstarte = reservedAstarte) else: return render_template('permission.html')
def resolve_currentUser(self, args, info): uid = current_user.get_id() return User.get_node(uid) if uid else None
def viewQuestion(quesID): form = AnswerForm() if request.method == 'POST': username = current_user.get_id() ans_obj = Answers(username, quesID, form.ans_text.data) ansID = ans_obj.post_answer() return redirect(url_for('viewQuestion', quesID=quesID)) ques_obj = QuestionMethods(quesID) ques = ques_obj.getQuestion() ansmet_obj = AnswerMethods(quesID) ans = ansmet_obj.get_answers(quesID) return render_template('question.html', question=ques, answers=ans, form=form)
def setBookmark(quesID): usr = User(current_user.get_id()) fl = usr.setBookmark(quesID) #print(fl) if fl: return json.dumps({'status': 'true', 'message': 'This question has been successfully bookmarked'}) else: return json.dumps({'status': 'false', 'message': 'Bookmark removed'})
def setAnsFlag(ansID): usr = current_user.get_id() ans_obj = UpdateAnswers(ansID) postedBy = (ans_obj.getAnswer())['postedBy'] if usr == postedBy: return json.dumps({'flag': 'notAllowed', 'message': 'You cannot flag your own answer'}) fl = ans_obj.addFlaggedBy(usr, postedBy) if fl=='flagged': return json.dumps({'flag': 'flagged', 'message': 'You have marked this answer inappropiate'}) elif fl=='alreadyFlagged': ans_obj.removeFlag(usr) return json.dumps({'flag': 'flagRemoved', 'message': 'Flag removed'}) else: return json.dumps({'flag': 'quesRemoved', 'message': 'This answer has been marked inappropiate by more than 10 users, so it is removed'})
def setAcceptedAns(quesID, ansID): usr = current_user.get_id() ques_obj = QuestionMethods(quesID) postedBy = (ques_obj.getQuestion())['postedBy'] if usr != postedBy: return json.dumps({'status': 'notAllowed', 'message': 'You are not allowed to set this answer as accepted.'}) if str(ques_obj.getAcceptedAns()) == ansID: ques_obj.removeAcceptedAns() return json.dumps({'status': 'removed', 'message': 'Accepted answer removed'}) else: ques_obj.setAcceptedAns(ansID) return json.dumps({'status': 'set', 'message': 'You have marked this answer as accepted'})
def __init__(self, text, ID, postedBy=None, typee='q', timestamp=None): if not postedBy: postedBy = current_user.get_id() self.timestamp = datetime.datetime.utcnow() self.text = text self.type = typee self.postedBy = postedBy self.parentID = ID
def playlist_create(): form = PlaylistCreateForm() if form.validate_on_submit(): # Create playlist db_utils.create_playlist(engine, current_user.get_id(), form.title.data) return redirect(url_for('profile', username=current_user.username)) return render_template('playlist_create.html', form=form)
def status(): submission_set = Submission.query.filter_by(user_id = current_user.get_id()).order_by(Submission.id.desc()).all() return render_template("status.html", slist = submission_set, active='status')
def post_video(): title = request.form['title'] file = request.files['file'] server_filename = upload_file(file, current_user.get_id()) video = mongo.db.video.insert({'title': title, 'user_id': current_user.get_id(), 'filename': server_filename}) return mongo_to_json_response(video)
def disconnect(remote): """Disconnect callback handler for GitHub.""" # User must be authenticated if not current_user.is_authenticated: return current_app.login_manager.unauthorized() external_method = 'github' external_ids = [i.id for i in current_user.external_identifiers if i.method == external_method] if external_ids: oauth_unlink_external_id(dict(id=external_ids[0], method=external_method)) user_id = int(current_user.get_id()) token = RemoteToken.get(user_id, remote.consumer_key) if token: extra_data = token.remote_account.extra_data # Delete the token that we issued for GitHub to deliver webhooks webhook_token_id = extra_data.get('tokens', {}).get('webhook') ProviderToken.query.filter_by(id=webhook_token_id).delete() # Disable GitHub webhooks from our side db_repos = Repository.query.filter_by(user_id=user_id).all() # Keep repositories with hooks to pass to the celery task later on repos_with_hooks = [(r.github_id, r.hook) for r in db_repos if r.hook] for repo in db_repos: try: Repository.disable(user_id=user_id, github_id=repo.github_id, name=repo.name) except NoResultFound: # If the repository doesn't exist, no action is necessary pass db.session.commit() # Send Celery task for webhooks removal and token revocation disconnect_github.delay(token.access_token, repos_with_hooks) # Delete the RemoteAccount (along with the associated RemoteToken) token.remote_account.delete() return redirect(url_for('invenio_oauthclient_settings.index'))
def professor(profid): prof = Professors.query.filter_by(name=unquote(profid)).first() if prof is None: flash("No professor by that name", "danger") return redirect(url_for("bp.home")) form = ReviewForm() disabled = "False" if db.session.query(Reviews).filter( Reviews.studentuid == current_user.get_id()).filter( Reviews.professoruid == prof.uid).one_or_none() is not None: disabled = "True" flash("You've already reviewed this professor. Only one review per student", "danger") if form.validate_on_submit(): if db.session.query(Reviews).filter( Reviews.studentuid == current_user.get_id()).filter( Reviews.professoruid == prof.uid).one_or_none() is None: review = Reviews( studentuid=int(current_user.get_id()), professoruid=int(prof.uid), punctual=form.Punctual.data, deathbypowerpoint=form.DeathByPPT.data, fairpaperevaluation=form.FairPaperEvaluation.data, rating=form.Rating.data) db.session.add(review) db.session.commit() disabled = "True" flash("Review submitted successfully", "success") else: disabled = "True" flash("You've already reviewed this professor. Only one review per student", "danger") if current_user.is_authenticated is False: flash("You cannot leave a review unless you're logged in", "danger") fields = ["punctual", "reliesonppt", "fairpaperevaluation", "rating"] statistics = {k: 0 for k in fields} stats = Reviews.query.filter_by(professoruid=prof.uid).all() if len(stats): for stat in stats: statistics["punctual"] += stat.punctual * 5 statistics["reliesonppt"] += stat.deathbypowerpoint * 5 statistics["fairpaperevaluation"] += stat.fairpaperevaluation * 5 statistics["rating"] += stat.rating for field in fields: statistics[field] /= len(stats) statistics[field] *= 20 return render_template("profile.html", prof=prof, form=form, statistics=statistics, disabled=disabled)