我们从Python开源项目中,提取了以下46个代码示例,用于说明如何使用flask_login.current_user.can()。
def index():
    """Render the home page and accept a new post from permitted users.

    A submitted form is only turned into a ``Post`` when the current user
    holds ``Permission.WRITE_ARTICLES``; everyone else just gets the listing.
    """
    form = PostForm()
    # Authorization is evaluated before the form is validated, so a user
    # without the write permission can never create a Post through this view.
    # NOTE(review): assumes the anonymous-user class also implements can().
    can_write = current_user.can(Permission.WRITE_ARTICLES)
    if can_write and form.validate_on_submit():
        # current_user is a proxy; store the underlying user object instead.
        new_post = Post(body=form.body.data,
                        author=current_user._get_current_object())
        # NOTE(review): no commit here; presumably committed elsewhere - confirm.
        db.session.add(new_post)
        # Redirect after a successful submit so a refresh does not resubmit.
        return redirect(url_for('.index'))

    page = request.args.get('page', 1, type=int)
    # The 'show_followed' cookie is client-controlled; it is honoured only for
    # authenticated users and only selects between two listings.
    show_followed = (bool(request.cookies.get('show_followed', ''))
                     if current_user.is_authenticated else False)
    query = current_user.followed_posts if show_followed else Post.query
    pagination = query.order_by(Post.timestamp.desc()).paginate(
        page,
        per_page=current_app.config['POSTS_PER_PAGE'],
        error_out=False)
    return render_template('index.html', form=form, posts=pagination.items,
                           show_followed=show_followed, pagination=pagination)
def index():
    """Show the paginated post listing and handle the new-post form.

    Posting requires ``Permission.WRITE_ARTICLES``; the listing itself is
    rendered for every visitor.
    """
    form = PostForm()
    # Permission check comes first, so the form is only validated (and a Post
    # only created) for users who are allowed to write.
    allowed_to_post = current_user.can(Permission.WRITE_ARTICLES)
    if allowed_to_post and form.validate_on_submit():
        author = current_user._get_current_object()  # unwrap the proxy
        db.session.add(Post(body=form.body.data, author=author))
        return redirect(url_for('.index'))

    page = request.args.get('page', 1, type=int)

    # Only authenticated users can switch to the "followed" listing; the
    # cookie value comes from the client and is reduced to a plain bool.
    if current_user.is_authenticated:
        show_followed = bool(request.cookies.get('show_followed', ''))
    else:
        show_followed = False

    base_query = current_user.followed_posts if show_followed else Post.query
    newest_first = base_query.order_by(Post.timestamp.desc())
    pagination = newest_first.paginate(
        page,
        per_page=current_app.config['CIRCULATE_POSTS_PER_PAGE'],
        error_out=False)
    posts = pagination.items
    return render_template('index.html', form=form, posts=posts,
                           show_followed=show_followed, pagination=pagination)
def index():
    """Home page: list posts and let users with WRITE permission add one."""
    form = PostForm()
    # The permission test short-circuits the form validation: users without
    # Permission.WRITE never reach the code that creates a Post.
    has_write = current_user.can(Permission.WRITE)
    if has_write and form.validate_on_submit():
        author = current_user._get_current_object()  # real user, not the proxy
        db.session.add(Post(body=form.body.data, author=author))
        db.session.commit()
        return redirect(url_for('.index'))

    page = request.args.get('page', 1, type=int)
    # 'show_followed' is a client-supplied cookie; it is ignored for anonymous
    # visitors and otherwise only chooses which listing is shown.
    show_followed = (bool(request.cookies.get('show_followed', ''))
                     if current_user.is_authenticated else False)
    if show_followed:
        source = current_user.followed_posts
    else:
        source = Post.query
    pagination = source.order_by(Post.timestamp.desc()).paginate(
        page,
        per_page=current_app.config['FLASKY_POSTS_PER_PAGE'],
        error_out=False)
    return render_template('index.html', form=form, posts=pagination.items,
                           show_followed=show_followed, pagination=pagination)
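def index():
    """Render the home page and accept a new post from permitted users.

    Only users holding ``Permission.WRITE_ARTICLES`` can create a post; the
    paginated listing is rendered for everyone.
    """
    form = PostForm()
    # Authorization is checked before the form is validated.
    if current_user.can(Permission.WRITE_ARTICLES) and \
            form.validate_on_submit():
        # current_user is a proxy; store the underlying user object.
        post = Post(body=form.body.data,
                    author=current_user._get_current_object())
        db.session.add(post)
        return redirect(url_for('.index'))
    page = request.args.get('page', 1, type=int)
    # The cookie is client-controlled, so it is honoured only for
    # authenticated users and only selects between two listings.
    show_followed = False
    if current_user.is_authenticated:
        show_followed = bool(request.cookies.get('show_followed', ''))
    query = current_user.followed_posts if show_followed else Post.query
    pagination = query.order_by(Post.timestamp.desc()).paginate(
        page, per_page=current_app.config['FLASKY_POSTS_PER_PAGE'],
        error_out=False)
    posts = pagination.items
    # Pass the local ``show_followed``; the misspelt ``showfollowed`` is not
    # defined anywhere in this function and raised NameError on every render.
    return render_template('index.html', form=form, posts=posts,
                           show_followed=show_followed, pagination=pagination)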
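def index():
    """Render the home page and accept a new post from permitted users.

    Only users holding ``Permission.WRITE_ARTICLES`` can create a post; the
    paginated listing is rendered for everyone.
    """
    form = PostForm()
    # Authorization is checked before the form is validated.
    if current_user.can(Permission.WRITE_ARTICLES) and form.validate_on_submit():
        # current_user is a proxy; ``_get_current_object()`` returns the
        # underlying user.  The original call, ``get_get_current_object()``,
        # is a misspelling of that method and would fail on every submission.
        post = Post(body=form.body.data,
                    author=current_user._get_current_object())
        db.session.add(post)
        return redirect(url_for('.index'))
    page = request.args.get('page', 1, type=int)
    # The cookie is client-controlled, so it is honoured only for
    # authenticated users and only selects between two listings.
    show_followed = False
    if current_user.is_authenticated:
        show_followed = bool(request.cookies.get('show_followed', ''))
    query = current_user.followed_posts if show_followed else Post.query
    pagination = query.order_by(Post.timestamp.desc()).paginate(
        page, per_page=current_app.config['FLASKY_POSTS_PER_PAGE'],
        error_out=False)
    posts = pagination.items
    return render_template('index.html', form=form, posts=posts,
                           show_followed=show_followed, pagination=pagination)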
def index():
    """List posts (all or followed) and create one for permitted users."""
    form = PostForm()
    # The write permission is tested before the form is validated, so the
    # Post is never built for a user lacking Permission.WRITE_ARTICLES.
    writer = current_user.can(Permission.WRITE_ARTICLES)
    if writer and form.validate_on_submit():
        author = current_user._get_current_object()  # unwrap the proxy
        db.session.add(Post(body=form.body.data, author=author))
        return redirect(url_for('.index'))

    # The listing preference is read from a client-controlled cookie, and only
    # for authenticated users.
    show_followed = (bool(request.cookies.get('show_followed', ''))
                     if current_user.is_authenticated else False)
    query = current_user.followed_posts if show_followed else Post.query

    page = request.args.get('page', 1, type=int)
    ordered = query.order_by(Post.timestamp.desc())
    pagination = ordered.paginate(
        page,
        per_page=current_app.config['FLASKY2_POSTS_PER_PAGE'],
        error_out=False)
    return render_template('index.html', form=form, posts=pagination.items,
                           pagination=pagination, show_followed=show_followed)
def index():
    """Show the newest posts and let permitted users publish a new one."""
    form = PostForm()
    # Users without Permission.WRITE_ARTICLES never reach form validation,
    # so they cannot create a Post through this view.
    may_publish = current_user.can(Permission.WRITE_ARTICLES)
    if may_publish and form.validate_on_submit():
        # NOTE(review): the category id comes from the submitted form; confirm
        # the form restricts it to existing categories, since get() may
        # otherwise yield no category.
        category = Category.query.get(form.category.data)
        new_post = Post(title=form.title.data,
                        category=category,
                        body=form.body.data,
                        summury=form.summury.data,
                        author=current_user._get_current_object())
        db.session.add(new_post)
        return redirect(url_for('.index'))

    page = request.args.get('page', 1, type=int)
    newest_first = Post.query.order_by(Post.timestamp.desc())
    pagination = newest_first.paginate(
        page,
        per_page=current_app.config['FLASKY_POSTS_PER_PAGE'],
        error_out=False)
    return render_template('index.html', form=form, posts=pagination.items,
                           pagination=pagination)
def edit(id):
    """Edit a post; only its author or an administrator may do so.

    Responds with 404 for an unknown ``id`` and 403 for any other user.
    """
    post = Post.query.get_or_404(id)
    # Object-level authorization runs before any form data is touched.
    not_author = current_user != post.author
    if not_author and not current_user.can(Permission.ADMINISTER):
        abort(403)

    form = PostForm()
    if not form.validate_on_submit():
        # Nothing valid was submitted: pre-fill the form with stored values.
        form.title.data = post.title
        form.body.data = post.body
        form.summury.data = post.summury
        form.category.data = post.category_id
        return render_template('edit_post.html', form=form)

    post.title = form.title.data
    post.body = form.body.data
    post.summury = form.summury.data
    post.category = Category.query.get(form.category.data)
    db.session.add(post)
    # NOTE(review): the flash text looks like it was lost to an encoding
    # problem; it is kept exactly as found.
    flash(u'?????')
    return redirect(url_for('.post', id=post.id))
def edit(id):
    """Let the author of a post, or an administrator, change its body."""
    post = Post.query.get_or_404(id)
    # Ownership/admin check happens before the form is even constructed.
    # NOTE(review): assumes the anonymous-user class implements can() and
    # that it returns a falsy value - confirm.
    is_other_user = current_user != post.author
    if is_other_user and not current_user.can(Permission.ADMINISTER):
        abort(403)

    form = PostForm()
    if not form.validate_on_submit():
        # Show the current body for editing.
        form.body.data = post.body
        return render_template('edit_post.html', form=form)

    post.body = form.body.data
    db.session.add(post)
    flash('The post has been updated')
    return redirect(url_for('.post', id=post.id))
def edit(id):
    """Update the body of post ``id`` on behalf of its author or an admin.

    Unknown ids give 404; users who are neither the author nor an
    administrator get 403.
    """
    post = Post.query.get_or_404(id)
    # Authorization is per object: being logged in is not enough, the caller
    # must own the post or hold Permission.ADMINISTER.
    foreign_post = current_user != post.author
    if foreign_post and not current_user.can(Permission.ADMINISTER):
        abort(403)

    form = PostForm()
    submitted = form.validate_on_submit()
    if submitted:
        post.body = form.body.data
        db.session.add(post)
        flash('The post has been updated.')
        target = url_for('.post', id=post.id)
        return redirect(target)

    form.body.data = post.body
    return render_template('edit_post.html', form=form)
def permission_required(permission):
    """Build a decorator that rejects callers lacking ``permission``.

    The wrapped view runs only when ``current_user.can(permission)`` is
    truthy; otherwise the request is aborted with 403.

    Apply it *below* the route decorator: Flask registers whatever function
    the route decorator receives, so a check placed above it would leave the
    registered view unguarded.
    """
    def decorator(view):
        @wraps(view)
        def guarded(*args, **kwargs):
            # NOTE(review): assumes the anonymous-user class implements can().
            allowed = current_user.can(permission)
            if not allowed:
                abort(403)
            return view(*args, **kwargs)
        return guarded
    return decorator
def permission_required(permission):
    """Return a view decorator enforcing ``permission`` on the current user.

    Callers for whom ``current_user.can(permission)`` is falsy receive a 403
    and the view body is never executed.
    """
    def decorator(view_func):
        @functools.wraps(view_func)
        def checked(*args, **kwargs):
            # Fail closed: the permission test precedes the view call.
            has_permission = current_user.can(permission)
            if not has_permission:
                abort(403)
            return view_func(*args, **kwargs)
        return checked
    return decorator
def permission_required(permission):
    """Decorator factory for permission verification.

    The decorated view is called only if ``current_user.can(permission)``
    is truthy; any other caller gets a 403 response.
    """
    def decorator(target):
        @wraps(target)
        def with_permission_check(*args, **kwargs):
            # The check runs on every call, before the view does any work.
            permitted = current_user.can(permission)
            if not permitted:
                abort(403)
            return target(*args, **kwargs)
        return with_permission_check
    return decorator
def edit_sentiment(id):
    """Edit a sentiment; restricted to its author or a site administrator.

    Unauthorized callers are not given a 403: they are redirected to the
    sentiment list with a warning message.
    """
    sentiment = Sentiment.query.get_or_404(id)
    # Object-level authorization precedes any handling of submitted data.
    denied = (not current_user == sentiment.author
              and not current_user.can(Permission.ADMINISTER))
    if denied:
        flash('The sentiment can be edited by either its author or a site administrator.', 'warning')
        return redirect(url_for('.sentiments'))

    form = SentimentForm()
    if not form.validate_on_submit():
        # Pre-fill the form; the timestamp field starts at the current UTC time
        # rather than the stored value.
        form.body.data = sentiment.body
        form.score.data = sentiment.score
        form.language.data = sentiment.language_id
        form.timestamp.data = datetime.utcnow()
        return render_template('admin/edit_sentiment.html',
                               form=form,
                               datetimepicker=datetime.utcnow())

    sentiment.body = form.body.data
    sentiment.score = form.score.data
    sentiment.language_id = form.language.data
    sentiment.timestamp = form.timestamp.data
    db.session.add(sentiment)
    flash('The sentiment has been updated.', 'success')
    return redirect(url_for('.sentiments'))
def remove_sentiment(id):
    """Delete a sentiment if the caller is its author or an administrator.

    Either way the caller is redirected to the sentiment list; the flashed
    message says whether the deletion took place.

    NOTE(review): this view changes state without consulting a form. Confirm
    that its route (not shown) only accepts POST with CSRF protection.
    """
    sentiment = Sentiment.query.get_or_404(id)
    may_remove = (current_user == sentiment.author
                  or current_user.can(Permission.ADMINISTER))
    if not may_remove:
        flash('The sentiment can be removed by either its author or a site administrator.', 'warning')
    else:
        db.session.delete(sentiment)
        flash('The sentiment has been removed.', 'success')
    return redirect(url_for('.sentiments'))
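def can(self, permissions):
    """Return True if the user's roles together grant all of ``permissions``.

    The permission values of every role are OR-ed together and the result
    must contain every bit requested in ``permissions``.

    A user whose ``roles`` is None *or empty* holds no permissions. Folding
    an empty sequence without a start value raises TypeError, which would
    turn an authorization check into a server error; starting from 0 makes
    the check fail closed instead.

    NOTE(review): assumes ``role.permissions`` and ``permissions`` are
    integer bit masks - confirm against the Role model.
    """
    if self.roles is None:
        return False
    all_perms = 0
    for role in self.roles:
        all_perms |= role.permissions
    # ``&`` binds tighter than ``==``: every requested bit must be present.
    return all_perms & permissions == permissions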
def can_admin(self):
    """Return whether this user holds ``Permission.ADMINISTER``.

    Convenience wrapper around :meth:`can`, so callers do not have to spell
    out the permission constant.
    """
    is_admin = self.can(Permission.ADMINISTER)
    return is_admin
def permission_required(permission):
    """Create a decorator that guards a view with ``permission``.

    If ``current_user.can(permission)`` is falsy the request is aborted with
    403 and the view is not called.
    """
    def decorator(view):
        @wraps(view)
        def _checked(*args, **kwargs):
            # NOTE(review): assumes the anonymous-user class implements can().
            granted = current_user.can(permission)
            if not granted:
                abort(403)
            return view(*args, **kwargs)
        return _checked
    return decorator
def admin():
    """Return the placeholder body of the administrators-only page.

    NOTE(review): the function itself performs no access check; the
    restriction presumably comes from decorators that are not shown here.
    """
    message = 'Only administrators can see this!'
    return message
def edit(id):
    """Edit the body of a post as its author or as an administrator.

    404 for an unknown ``id``; 403 when the caller is neither the author nor
    a holder of ``Permission.ADMIN``.
    """
    post = Post.query.get_or_404(id)
    # The ownership/admin test gates everything below, including form handling.
    someone_else = current_user != post.author
    if someone_else and not current_user.can(Permission.ADMIN):
        abort(403)

    form = PostForm()
    if not form.validate_on_submit():
        form.body.data = post.body
        return render_template('edit_post.html', form=form)

    post.body = form.body.data
    db.session.add(post)
    db.session.commit()
    flash('The post has been updated.')
    return redirect(url_for('.post', id=post.id))
def permission_required(permission):
    """Restrict a view to users with the given permission.

    Users for whom ``current_user.can(permission)`` is falsy get a 403.
    Place this decorator below the route decorator so that the guarded
    function, not the bare view, is the one Flask registers.
    """
    def decorator(handler):
        @wraps(handler)
        def protected(*args, **kwargs):
            ok = current_user.can(permission)
            if not ok:
                abort(403)
            return handler(*args, **kwargs)
        return protected
    return decorator
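def permission_required(permission):
    """Build a decorator that rejects callers lacking ``permission``.

    The wrapped view runs only when ``current_user.can(permission)`` is
    truthy; otherwise the request is aborted with 403.
    """
    def decorator(f):
        # The inner function must carry the same name that is returned below.
        # It was defined as ``decorate_function`` while ``decorated_function``
        # was returned, so decorating any view raised NameError.
        @wraps(f)
        def decorated_function(*args, **kwargs):
            if not current_user.can(permission):
                abort(403)
            return f(*args, **kwargs)
        return decorated_function
    return decorator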
def edit(id):
    """Change a post's body; allowed for its author and for administrators."""
    post = Post.query.get_or_404(id)
    # Per-object check: a logged-in user who does not own the post and is not
    # an administrator is refused before any form data is read.
    owned_by_other = current_user != post.author
    if owned_by_other and not current_user.can(Permission.ADMINISTER):
        abort(403)

    form = PostForm()
    if not form.validate_on_submit():
        # Initial display: show the stored body.
        form.body.data = post.body
        return render_template('edit_post.html', form=form)

    post.body = form.body.data
    db.session.add(post)
    flash('The post has been updated.')
    return redirect(url_for('.post', id=post.id))
def permission_required(permissions):
    """Return a decorator that requires ``permissions`` of the current user.

    The view is invoked only when ``current_user.can(permissions)`` is
    truthy; otherwise the request ends with a 403.
    """
    def decorator(view):
        @wraps(view)
        def enforce(*args, **kwargs):
            # Evaluated per request, ahead of the view body.
            authorized = current_user.can(permissions)
            if not authorized:
                abort(403)
            return view(*args, **kwargs)
        return enforce
    return decorator
def edit(id):
    """Edit post ``id``; only the author or an administrator is admitted."""
    post = Post.query.get_or_404(id)
    # Authorization first: everything after this point may modify the post.
    is_stranger = current_user != post.author
    if is_stranger and not current_user.can(Permission.ADMINISTER):
        abort(403)

    form = PostForm()
    valid_submission = form.validate_on_submit()
    if valid_submission:
        post.body = form.body.data
        db.session.add(post)
        flash('The post has been updated.')
        destination = url_for('.post', id=post.id)
        return redirect(destination)

    # No valid submission: offer the stored body for editing.
    form.body.data = post.body
    return render_template('edit_post.html', form=form)
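def delete(id):
    """Delete a post after confirmation, as its author or an administrator.

    404 for an unknown ``id``; 403 when the caller is neither the author nor
    an administrator. Without a valid submission the confirmation page is
    rendered and nothing is deleted.
    """
    post = Post.query.get_or_404(id)
    if current_user != post.author and \
            not current_user.can(Permission.ADMINISTER):
        abort(403)
    form = DeleteForm()
    # ``form.delete`` is a field object and therefore always truthy, so the
    # original test deleted the post on any request, including a plain GET,
    # and never reached the confirmation page. Require a validated submission
    # (which also applies the form's CSRF check) with the field actually set.
    # NOTE(review): assumes DeleteForm is a Flask-WTF form - confirm.
    if form.validate_on_submit() and form.delete.data:
        db.session.delete(post)
        flash('You have delete the post.')
        return redirect(url_for('.user', username=current_user.username))
    return render_template('delete.html', form=form)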
def edit(id):
    """Update a post's body for its author or for an administrator."""
    post = Post.query.get_or_404(id)
    # Callers who neither own the post nor hold Permission.ADMINISTER are
    # stopped here with a 403.
    different_user = current_user != post.author
    if different_user and not current_user.can(Permission.ADMINISTER):
        abort(403)

    form = PostForm()
    if not form.validate_on_submit():
        form.body.data = post.body
        return render_template('edit_post.html', form=form)

    post.body = form.body.data
    db.session.add(post)
    flash('The post has been updated.')
    return redirect(url_for('post', id=post.id))
def permission_required(permission):
    """Return a decorator that checks ``permission`` before calling a view.

    When ``current_user.can(permission)`` is falsy the request is aborted
    with 403 and the view is never executed.
    """
    def decorator(view):
        @wraps(view)
        def permission_checked(*args, **kwargs):
            # NOTE(review): assumes the anonymous-user class implements can().
            passes = current_user.can(permission)
            if not passes:
                abort(403)
            return view(*args, **kwargs)
        return permission_checked
    return decorator
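def index():
    """Render the home page and accept a new post from permitted users.

    Only users holding ``Permission.WRITE_ARTICLES`` can create a post; the
    paginated listing is rendered for everyone.

    The commented-out ``NameForm`` implementation and the leftover debugging
    ``print posts`` statement have been removed: the print wrote every
    listing to stdout and is a syntax error under Python 3.
    """
    form = PostForm()
    # Authorization is checked before the form is validated.
    if current_user.can(Permission.WRITE_ARTICLES) and form.validate_on_submit():
        # current_user is a proxy; store the underlying user object.
        post = Post(body=form.body.data,
                    author=current_user._get_current_object())
        db.session.add(post)
        return redirect(url_for('.index'))
    page = request.args.get('page', 1, type=int)
    # The cookie is client-controlled, so it is honoured only for
    # authenticated users and only selects between two listings.
    show_followed = False
    if current_user.is_authenticated:
        show_followed = bool(request.cookies.get('show_followed', ''))
    query = current_user.followed_posts if show_followed else Post.query
    pagination = query.order_by(Post.timestamp.desc()).paginate(
        page, per_page=current_app.config['FLASKY_POSTS_PER_PAGE'],
        error_out=False)
    posts = pagination.items
    return render_template('index.html', form=form, posts=posts,
                           show_followed=show_followed, pagination=pagination)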
def permission_required(permission):
    """Make a decorator that admits only users holding ``permission``.

    Every call first evaluates ``current_user.can(permission)``; a falsy
    result aborts the request with 403.
    """
    def decorator(inner):
        @wraps(inner)
        def gatekeeper(*args, **kwargs):
            may_proceed = current_user.can(permission)
            if not may_proceed:
                abort(403)
            return inner(*args, **kwargs)
        return gatekeeper
    return decorator
def json_response(f):
    """Deprecated decorator that converts a view's return value to JSON.

    The view may return a ``Response`` (passed through untouched), a dict, an
    object exposing ``serialize()``, or a tuple of up to three items:
    ``(body, status_or_headers, headers)``.

    :param func f: view function to wrap
    :return: the wrapping function
    :rtype: func
    """
    @functools.wraps(f)
    def wrapped(*args, **kwargs):
        current_app.log.warn(
            'Using the json_response decorator is deprecated.Please use app.core.ApiResponse.')

        result = f(*args, **kwargs)

        # A ready-made response (e.g. a redirect) is returned as is.
        if isinstance(result, Response):
            return result

        # Split an optional (body, status_or_headers, headers) tuple, padding
        # the missing trailing items with None.
        status = None
        extra_headers = None
        if isinstance(result, tuple):
            padding = (None, ) * (3 - len(result))
            result, status, extra_headers = result + padding
        # A dict or list in second position is headers, not a status code.
        if isinstance(status, (dict, list)):
            extra_headers, status = status, None

        # Anything that is not already a dict is asked to serialize itself.
        # NOTE(review): serialize() decides which model fields reach the
        # client; confirm it omits sensitive attributes.
        payload = result if isinstance(result, dict) else result.serialize()

        response = jsonify(payload)
        if status is not None:
            response.status_code = status
        if extra_headers is not None:
            response.headers.extend(extra_headers)
        return response
    return wrapped
def rate_limit(limit, period):
    """Allow each client at most ``limit`` requests per ``period`` seconds.

    Once a client exceeds the limit, every further request in that period is
    answered with status 429 (Too Many Requests).

    :param limit: maximum number of requests allowed within one period
    :param period: length of the period, in seconds
    """
    def decorator(f):
        @functools.wraps(f)
        def wrapped(*args, **kwargs):
            global _limiter
            # The module-wide limiter is created lazily on the first request.
            # NOTE(review): MemRateLimit presumably keeps counters in process
            # memory, so limits would not be shared between workers - confirm.
            if _limiter is None:
                _limiter = MemRateLimit()

            # One counter per (view function name, client address).
            # NOTE(review): behind a reverse proxy remote_addr may be the
            # proxy's address, putting all clients in one bucket; confirm how
            # the deployment derives the client address.
            bucket = '{0}/{1}'.format(f.__name__, request.remote_addr)
            allowed, remaining, reset = _limiter.is_allowed(bucket, limit,
                                                            period)

            # Stored on g for the after_request handler to copy onto the
            # response.
            g.headers = {
                'DO-RateLimit-Remaining': str(remaining),
                'DO-RateLimit-Limit': str(limit),
                'DO-RateLimit-Reset': str(reset)
            }

            if allowed:
                return f(*args, **kwargs)

            # Over the limit: the view is not invoked at all.
            response = jsonify(
                {'status': 429, 'error': 'too many requests',
                 'message': 'You have exceeded your request rate'})
            response.status_code = 429
            return response
        return wrapped
    return decorator
def get_file(file_id):
    """Download file

    **Example request**:

    .. sourcecode:: http

        GET /api/1.0/files/67 HTTP/1.1
        Host: cp.cert.europa.eu
        Accept: application/json

    **Example response**:

    .. sourcecode:: http

        HTTP/1.0 200 OK
        Content-Disposition: attachment; filename=CIMBL-244-EU.zip
        Content-Length: 55277
        Content-Type: application/zip

    :param file_id: filename or unique ID
    :reqheader Accept: Content type(s) accepted by the client
    :resheader Content-Type: this depends on `Accept` header or request
    :status 200: File found
    :status 404: Resource not found
    """
    # A string is matched against the stored file name, anything else
    # against the primary key.
    by_name = isinstance(file_id, str)
    cond = (DeliverableFile.name == file_id) if by_name \
        else (DeliverableFile.id == file_id)

    deliverable_query = DeliverableFile.query.filter(cond)
    # Users without the SLAACTIONS permission are limited to non-SLA files.
    # A file they may not see yields the same 404 as a missing one, so the
    # response does not reveal that it exists.
    if not current_user.can(Permission.SLAACTIONS):
        deliverable_query = deliverable_query.filter_by(is_sla=0)
    dfile = deliverable_query.first_or_404()

    # The path is built from the name stored in the database, not from the
    # request value directly.
    # NOTE(review): confirm stored names cannot contain path separators or
    # '..' components; otherwise the joined path could leave APP_UPLOADS.
    upload_dir = current_app.config['APP_UPLOADS']
    file_path = os.path.join(upload_dir, dfile.name)
    return send_file(file_path,
                     attachment_filename=dfile.name,
                     as_attachment=True)