我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 flask.ext.login.current_user.id。
def auth_jwt_project(short_name):
    """Issue a JWT for the project identified by ``short_name``.

    The caller authenticates by sending the project's secret key in the
    ``Authorization`` request header.  The token payload carries the
    project's short name and id and is signed (HS256) with that same key.

    Aborts with 403 when the header is missing or empty, and with 404 when
    no project is found or the supplied key does not match.
    """
    supplied_key = request.headers.get('Authorization')
    if not supplied_key:
        return abort(403)

    project = project_repo.get_by_shortname(short_name)
    if not project or project.secret_key != supplied_key:
        return abort(404)

    payload = {'short_name': short_name, 'project_id': project.id}
    return jwt.encode(payload, project.secret_key, algorithm='HS256')
def avatar(user_id):
    """Show and process the two avatar forms for user ``user_id``.

    Access is limited to the user themselves or to a user holding
    ``Permission.UPDATE_OTHERS_INFORMATION``; anyone else gets a 403.
    An uploaded file is stored through ``avatars.save`` in a folder named
    after the user id; alternatively an external URL can be submitted.
    In both cases the avatar is persisted as a JSON string on the user.
    """
    may_edit = (current_user.id == user_id or
                current_user.can(Permission.UPDATE_OTHERS_INFORMATION))
    if not may_edit:
        abort(403)

    the_user = User.query.get_or_404(user_id)
    url_form = AvatarEditForm()
    upload_form = AvatarUploadForm()

    # Case 1: a file was uploaded.
    if upload_form.validate_on_submit() and 'avatar' in request.files:
        stored_name = avatars.save(upload_form.avatar.data,
                                   folder=str(user_id))
        the_user.avatar = json.dumps({"use_out_url": False,
                                      "url": stored_name})
        db.session.add(the_user)
        db.session.commit()
        flash(u'??????!', 'success')
        return redirect(url_for('user.detail', user_id=user_id))

    # Case 2: an external avatar URL was submitted.
    if url_form.validate_on_submit():
        the_user.avatar = json.dumps({"use_out_url": True,
                                      "url": url_form.avatar_url.data})
        db.session.add(the_user)
        db.session.commit()
        return redirect(url_for('user.detail', user_id=user_id))

    return render_template('avatar_edit.html',
                           user=the_user,
                           avatar_edit_form=url_form,
                           avatar_upload_form=upload_form,
                           title=u"????")
def upuser(userid):
    """Render and process the password-change form for user ``userid``.

    Only the user themselves or a user whose ``role`` equals ``'0'`` may
    use this view; everyone else gets a 403.  When no user with that id
    exists the view aborts with 404 (previously pre-filling the form
    raised ``AttributeError`` on ``None``).

    On a valid submission whose old password verifies, the new password
    is stored and the form is rendered again; otherwise the form is
    rendered pre-filled with the user's username and name.
    """
    if not (current_user.id == userid or current_user.role == '0'):
        abort(403)

    # One lookup serves both the update path and the pre-fill path.
    user = users.query.filter_by(id=userid).first()
    if user is None:
        abort(404)

    form = UserEditForm()
    if (form.validate_on_submit() and
            user.verify_password(form.oldpassword.data)):
        user.password = form.password.data
        db.session.add(user)
        db.session.commit()
        flash(u'??????!')
        return render_template('useredit.html', form=form)

    form.username.data = user.username
    form.name.data = user.name
    return render_template('useredit.html', form=form)
def get_incremental_task(project_id, user_id=None, user_ip=None,
                         external_uid=None, offset=0, limit=1, orderby='id',
                         desc=False):
    """Pick a random candidate task and attach the latest answer given to it.

    Useful for large, incremental tasks such as transcriptions.

    Candidates come from ``get_candidate_task_ids`` and are always requested
    ordered by ``priority_0`` descending; the ``orderby`` and ``desc``
    arguments of this function are accepted but not forwarded.

    Returns ``None`` when there is no candidate, otherwise a one-element
    list holding the chosen task.  If a task run exists for the task, the
    ``info`` of the most recently finished one is stored under
    ``task.info['last_answer']``.
    """
    candidates = get_candidate_task_ids(project_id, user_id, user_ip,
                                        external_uid, limit, offset,
                                        orderby='priority_0', desc=True)
    n_candidates = len(candidates)
    if n_candidates == 0:
        return None

    task = candidates[random.randrange(0, n_candidates)]

    # Most recently finished run for the chosen task, if any.
    latest_run = (session.query(TaskRun)
                  .filter(TaskRun.task_id == task.id)
                  .order_by(TaskRun.finish_time.desc())
                  .first())
    if latest_run:
        task.info['last_answer'] = latest_run.info

    # TODO: As discussed in GitHub #53
    # it is necessary to create a lock in the task!
    return [task]
def get_candidate_task_ids(project_id, user_id=None, user_ip=None,
                           external_uid=None, limit=1, offset=0,
                           orderby='priority_0', desc=True):
    """Return the tasks of a project still available to a contributor.

    A task is available when it belongs to ``project_id``, its state is not
    ``'completed'`` and the contributor has no task run for it.  The
    contributor is identified by ``user_id`` when neither ``user_ip`` nor
    ``external_uid`` is given; otherwise by ``user_ip`` (defaulting to
    ``'127.0.0.1'``) unless an ``external_uid`` is present, in which case
    the external uid is used.

    The result is ordered through ``_set_orderby_desc``, paginated with
    ``limit``/``offset`` and passed through ``_handle_tuples``.
    """
    if user_id and not user_ip and not external_uid:
        contributor = dict(user_id=user_id)
    else:
        if not user_ip:
            user_ip = '127.0.0.1'
        # user_ip is always set at this point, so only external_uid decides.
        if not external_uid:
            contributor = dict(user_ip=user_ip)
        else:
            contributor = dict(external_uid=external_uid)

    # Ids of the tasks this contributor already answered in the project.
    done = session.query(TaskRun.task_id).filter_by(project_id=project_id,
                                                    **contributor)

    query = session.query(Task).filter(and_(~Task.id.in_(done.subquery()),
                                            Task.project_id == project_id,
                                            Task.state != 'completed'))
    query = _set_orderby_desc(query, orderby, desc)
    # The former debug loop that iterated and printed the whole,
    # unpaginated query has been removed.
    data = query.limit(limit).offset(offset).all()
    return _handle_tuples(data)
def _set_orderby_desc(query, orderby, descending):
    """Return ``query`` with an ORDER BY clause applied.

    For ``orderby == 'fav_user_ids'`` the query gets an extra ``n_favs``
    column (length of ``Task.fav_user_ids``, 0 when NULL) and is ordered
    by it.  Any other value is taken as the name of a ``Task`` attribute.
    ``descending`` selects the sort direction.
    """
    if orderby != 'fav_user_ids':
        column = getattr(Task, orderby)
        return query.order_by(column.desc() if descending else column.asc())

    n_favs = func.coalesce(func.array_length(Task.fav_user_ids, 1),
                           0).label('n_favs')
    query = query.add_column(n_favs)
    ordering = desc("n_favs") if descending else "n_favs"
    return query.order_by(ordering)
def _update_instance(self, oid):
    """Apply a (possibly partial) JSON update to the stored object ``oid``.

    The repository and the names of its getter/updater are looked up in
    ``repos`` under this class' name.  Raises ``NotFound`` when the object
    does not exist.  The request body is parsed as JSON, checked with
    ``_forbidden_attributes`` and stripped of HATEOAS links before its keys
    are copied onto the stored object.

    Returns the updated object.
    """
    repo_entry = repos[self.__class__.__name__]
    repo = repo_entry['repo']
    existing = getattr(repo, repo_entry['get'])(oid)
    if existing is None:
        raise NotFound
    ensure_authorized_to('update', existing)

    payload = json.loads(request.data)
    self._forbidden_attributes(payload)
    # Remove hateoas links
    payload = self.hateoas.remove_links(payload)
    # Partial updates may omit the id, so set it explicitly.
    payload['id'] = oid
    # The instance is built and discarded -- presumably so that unknown
    # attributes in the payload fail here; TODO confirm.
    self.__class__(**payload)

    # Snapshot of the previous state, used for change tracking below.
    previous = self.__class__(**existing.dictize())
    for attr in payload:
        setattr(existing, attr, payload[attr])
    self._update_attribute(existing, previous)

    update_name = repo_entry['update']
    self._validate_instance(existing)
    getattr(repo, update_name)(existing)
    self._log_changes(previous, existing)
    return existing
def register_api(view, endpoint, url, pk='id', pk_type='int'):
    """Register a class-based view as an API resource on ``blueprint``.

    Three URL rules are added for the CSRF-exempt view function:

    * ``url`` with ``pk`` defaulting to ``None`` for GET/OPTIONS,
    * ``url`` for POST/OPTIONS,
    * ``url/<pk_type:pk>`` for GET/PUT/DELETE/OPTIONS.
    """
    view_func = view.as_view(endpoint)
    csrf.exempt(view_func)

    item_url = '%s/<%s:%s>' % (url, pk_type, pk)
    rules = (
        (url, dict(defaults={pk: None}, methods=['GET', 'OPTIONS'])),
        (url, dict(methods=['POST', 'OPTIONS'])),
        (item_url, dict(methods=['GET', 'PUT', 'DELETE', 'OPTIONS'])),
    )
    for rule, options in rules:
        blueprint.add_url_rule(rule, view_func=view_func, **options)
def index(window=0):
    """Render the community leaderboard.

    ``window`` is capped at 10 before being handed to
    ``cached_users.get_leaderboard`` together with the id of the current
    user (``None`` for unauthenticated visitors).
    """
    user_id = current_user.id if current_user.is_authenticated() else None
    # Cap the window at 10; smaller values pass through unchanged.
    window = min(10, window)

    leaderboard = cached_users.get_leaderboard(
        current_app.config['LEADERBOARD'],
        user_id=user_id,
        window=window)

    return handle_content_type(dict(template='/stats/index.html',
                                    title="Community Leaderboard",
                                    top_users=leaderboard))
def sanitize_project_owner(project, owner, current_user):
    """Return ``(project, owner)`` data suitable for the requesting user.

    ``project`` may be a ``Project`` instance or a dict.

    * The authenticated owner gets the dictized project (a dict is passed
      through unchanged) and the full user summary.
    * Everybody else gets the public user summary; the project is reduced
      with ``to_public_json`` when the request's Content-Type is
      ``application/json``, and dictized like for the owner otherwise (HTML).
    """
    is_model = isinstance(project, Project)
    is_owner = (current_user.is_authenticated() and
                owner.id == current_user.id)
    wants_public_json = (
        not is_owner and
        request.headers.get('Content-Type') == 'application/json')

    if wants_public_json:
        if is_model:
            project_sanitized = project.to_public_json()
        else:
            project_sanitized = Project().to_public_json(project)
    elif is_model:
        # Owner, or HTML for anyone else: same dictized output.
        project_sanitized = project.dictize()
    else:
        project_sanitized = project

    if is_owner:
        owner_sanitized = cached_users.get_user_summary(owner.name)
    else:
        owner_sanitized = cached_users.public_get_user_summary(owner.name)
    return project_sanitized, owner_sanitized
def settings(short_name):
    """Render the settings page of the project ``short_name``.

    The current user must be authorized to both read and update the
    project.  The response carries the project, its owner summary and
    the project's task/progress counters.
    """
    (project, owner, n_tasks, n_task_runs,
     overall_progress, last_activity,
     n_results) = project_by_shortname(short_name)

    page_title = project_title(project, "Settings")
    for action in ('read', 'update'):
        ensure_authorized_to(action, project)
    pro = pro_features()
    project = add_custom_contrib_button_to(project, get_user_id_or_ip())
    owner_summary = cached_users.get_user_summary(owner.name)

    project_id = project.get('id')
    completed = cached_projects.n_completed_tasks(project_id)
    volunteers = cached_projects.n_volunteers(project_id)

    response = dict(template='/projects/settings.html',
                    project=project,
                    owner=owner_summary,
                    n_tasks=n_tasks,
                    overall_progress=overall_progress,
                    n_task_runs=n_task_runs,
                    last_activity=last_activity,
                    n_completed_tasks=completed,
                    n_volunteers=volunteers,
                    title=page_title,
                    pro_features=pro)
    return handle_content_type(response)
def show_blogpost(short_name, id):
    """Render a single blog post of the project ``short_name``.

    Aborts with 404 when the project has no blog post with this ``id``.
    For password-protected projects the visitor may be redirected to the
    password page; for other projects read authorization on the blog
    post is enforced.
    """
    (project, owner, n_tasks, n_task_runs,
     overall_progress, last_activity,
     n_results) = project_by_shortname(short_name)

    blogpost = blog_repo.get_by(id=id, project_id=project.id)
    if blogpost is None:
        raise abort(404)

    if not project.needs_password():
        ensure_authorized_to('read', blogpost)
    else:
        password_redirect = _check_if_redirect_to_password(project)
        if password_redirect:
            return password_redirect

    pro = pro_features()
    project = add_custom_contrib_button_to(project, get_user_id_or_ip())
    project_id = project.get('id')
    context = dict(project=project,
                   owner=owner,
                   blogpost=blogpost,
                   overall_progress=overall_progress,
                   n_tasks=n_tasks,
                   n_task_runs=n_task_runs,
                   n_completed_tasks=cached_projects.n_completed_tasks(project_id),
                   n_volunteers=cached_projects.n_volunteers(project_id),
                   pro_features=pro)
    return render_template('projects/blog_post.html', **context)
def auditlog(short_name):
    """Render the audit log of the project ``short_name``.

    Aborts with 403 when the ``auditlog_enabled`` pro feature is off.
    Read authorization on ``Auditlog`` for the project is required.
    """
    pro = pro_features()
    if not pro['auditlog_enabled']:
        raise abort(403)

    (project, owner, n_tasks, n_task_runs,
     overall_progress, last_activity,
     n_results) = project_by_shortname(short_name)

    ensure_authorized_to('read', Auditlog, project_id=project.id)
    log_entries = auditlogger.get_project_logs(project.id)
    project = add_custom_contrib_button_to(project, get_user_id_or_ip())

    project_id = project.get('id')
    context = dict(project=project,
                   owner=owner,
                   logs=log_entries,
                   overall_progress=overall_progress,
                   n_tasks=n_tasks,
                   n_task_runs=n_task_runs,
                   n_completed_tasks=cached_projects.n_completed_tasks(project_id),
                   n_volunteers=cached_projects.n_volunteers(project_id),
                   pro_features=pro)
    return render_template('projects/auditlog.html', **context)
def publish(short_name):
    """Publish the project ``short_name``.

    When the project's ``info['sched']`` is ``"FRG"`` and the current user
    owns the project but has not created its quiz yet, the user is
    redirected to the quiz creation page instead.

    A GET request renders the confirmation page.  Any other method marks
    the project as published, deletes its task runs, results and webhook
    entries, records the change in the audit log and redirects to the
    project details page.
    """
    (project, owner, n_tasks, n_task_runs,
     overall_progress, last_activity,
     n_results) = project_by_shortname(short_name)

    needs_quiz = (
        project.info.get("sched") == "FRG" and
        project.owner_id == current_user.id and
        not cached_users.is_quiz_created(current_user.id, project))
    if needs_quiz:
        flash("You did not created quiz.Please create the quiz", "danger")
        return redirect(url_for('quiz.create_quiz',
                                short_name=project.short_name))

    pro = pro_features()
    ensure_authorized_to('publish', project)
    if request.method == 'GET':
        return render_template('projects/publish.html',
                               project=project,
                               pro_features=pro)

    project.published = True
    project_repo.save(project)
    # Publishing wipes the data gathered so far for the project.
    task_repo.delete_taskruns_from_project(project)
    result_repo.delete_results_from_project(project)
    webhook_repo.delete_entries_from_project(project)
    auditlogger.log_event(project, current_user, 'update', 'published',
                          False, True)
    flash(gettext('Project published! Volunteers will now be able to help you!'))
    return redirect(url_for('.details', short_name=project.short_name))
def users(user_id=None):
    """Manage users of PYBOSSA.

    Lists all admin users except the current one.  On a POST with a
    search term, users matching the name (again excluding the current
    user) are looked up; update authorization is enforced on each match
    and a message is flashed when nothing matched.
    """
    form = SearchForm(request.body)
    admin_users = [user for user in user_repo.filter_by(admin=True)
                   if user.id != current_user.id]

    found = []
    if request.method == 'POST' and form.user.data:
        term = form.user.data
        found = [user for user in user_repo.search_by_name(term)
                 if user.id != current_user.id]
        for found_user in found:
            ensure_authorized_to('update', found_user)
        if not found:
            flash("<strong>Ooops!</strong> We didn't find a user "
                  "matching your query: <strong>%s</strong>" % form.user.data)

    response = dict(template='/admin/users.html', found=found,
                    users=admin_users,
                    title=gettext("Manage Admin Users"),
                    form=form)
    return handle_content_type(response)
def del_admin(user_id=None):
    """Remove the admin flag from the user ``user_id``.

    Requires update authorization on the user.  Redirects to the admin
    users page on success.  Returns a formatted 404 error when the user
    does not exist and a formatted 415 error when no id was given.

    HTTP errors raised inside the block are re-raised unchanged, in line
    with ``update_category``; any other exception is logged and answered
    with a 500.
    """
    try:
        if not user_id:  # pragma: no cover
            msg = "User.id is missing for method del_admin"
            return format_error(msg, 415)

        user = user_repo.get(user_id)
        if not user:
            msg = "User.id not found"
            return format_error(msg, 404)

        ensure_authorized_to('update', user)
        user.admin = False
        user_repo.update(user)
        return redirect_content_type(url_for('.users'))
    except HTTPException:
        # Keep the original status code instead of masking it as a 500.
        raise
    except Exception as e:  # pragma: no cover
        current_app.logger.error(e)
        return abort(500)
def edit(user_id):
    """Show and process the profile form for user ``user_id``.

    Access is limited to the user themselves or to a user holding
    ``Permission.UPDATE_OTHERS_INFORMATION``; anyone else gets a 403.
    A valid submission stores name, major, headline and about-me text and
    redirects to the user's detail page; otherwise the form is rendered
    pre-filled with the stored values.
    """
    may_edit = (current_user.id == user_id or
                current_user.can(Permission.UPDATE_OTHERS_INFORMATION))
    if not may_edit:
        abort(403)

    the_user = User.query.get_or_404(user_id)
    form = EditProfileForm()

    if not form.validate_on_submit():
        # Nothing (valid) submitted: show the current profile values.
        form.name.data = the_user.name
        form.major.data = the_user.major
        form.headline.data = the_user.headline
        form.about_me.data = the_user.about_me
        return render_template('user_edit.html', form=form, user=the_user,
                               title=u"????")

    the_user.name = form.name.data
    the_user.major = form.major.data
    the_user.headline = form.headline.data
    the_user.about_me = form.about_me.data
    db.session.add(the_user)
    db.session.commit()
    flash(u'??????!', "info")
    return redirect(url_for('user.detail', user_id=user_id))
def people():
    """List all people and handle the "add person" form.

    A valid POST creates a ``Person`` attributed to the current user and
    redirects back to this view.  An invalid POST re-renders the page
    with a placeholder error message.
    """
    form = PersonForm(request.form)
    error = None

    if request.method == 'POST':
        if not form.validate_on_submit():
            error = 'FIXME ERROR'
        else:
            person = Person(
                name=form.name.data,
                url_handle=form.url_handle.data,
                biography=form.biography.data,
                added_by_user_id=current_user.id
            )
            db.session.add(person)
            db.session.commit()
            return redirect(url_for('people.people'))

    everyone = db.session.query(Person).all()
    return render_template('people.html', form=form, people=everyone,
                           error=error)
def blog():
    """List all blog posts and handle the "new post" form.

    A valid POST creates a ``BlogPost`` authored by the current user and
    redirects back to this view.  An invalid POST re-renders the page
    with a placeholder error message.
    """
    form = BlogPostForm(request.form)
    error = None

    if request.method == 'POST':
        if not form.validate_on_submit():
            error = 'FIXME ERROR'
        else:
            post = BlogPost(
                form.title.data,
                form.body.data,
                current_user.id,
                'a_url_handle_for_this_blog_post'  # FIXME
            )
            db.session.add(post)
            db.session.commit()
            return redirect(url_for('blog.blog'))

    all_posts = db.session.query(BlogPost).all()
    return render_template('blog.html', form=form, posts=all_posts,
                           error=error)
def test_max_grid_size_home_people(self):
    """The home page shows one "View details" button per created person.

    Logs in as admin, creates six people through the ``/people`` form and
    counts the occurrences of the button text on the home page.
    """
    expected_people = 6
    with self.client:
        credentials = dict(username="admin", password="admin")
        response = self.client.post('/login', data=credentials,
                                    follow_redirects=True)

        for person in range(expected_people):
            form_data = dict(
                name=person,
                biography='The bio',
                added_by_user_id=current_user.id,
                url_handle=person
            )
            response = self.client.post('/people', data=form_data,
                                        follow_redirects=True)

        response = self.client.get('/', content_type='html/text')
        # This depends on the text value of the home grid buttons
        button_count = response.data.count(b"View details")
        self.assertEqual(button_count, expected_people)
def add_vote():
    """Record the current user's vote on a question and return the score.

    Reads ``result``, ``plus_minus`` and ``id`` (all ints, default 0) from
    the query string.  If the user already voted on the question only a
    message is returned.  Otherwise a ``Vote`` is added to the session
    (flagged ``disabled`` when ``plus_minus`` is 1) and the JSON response
    carries ``result + plus_minus``.  This function does not commit the
    session itself.
    """
    result = request.args.get('result', 0, type=int)
    plus_minus = request.args.get('plus_minus', 0, type=int)
    question_id = request.args.get('id', 0, type=int)

    previous_vote = Vote.query.filter(
        Vote.author_id == current_user.id,
        Vote.question_id == question_id).first()
    if previous_vote:
        return jsonify(message='Thanks for the feedback! Votes cast by those with less than 125 reputation are recorded, but do not change the publicly displayed post score')

    vote = Vote(author_id=current_user.id,
                question_id=question_id,
                disabled=(plus_minus == 1))
    db.session.add(vote)
    return jsonify(result=result + plus_minus)
def edit_profile_admin(id):
    """Show and process the admin profile form for the user ``id``.

    Responds with 404 when the user does not exist.  A valid submission
    copies the form values (including the role looked up by id) onto the
    user, adds it to the session and redirects to the user's page;
    otherwise the form is rendered pre-filled from the user.  This
    function does not commit the session itself.
    """
    user = User.query.get_or_404(id)
    form = EditProfileAdminForm(user=user)

    if not form.validate_on_submit():
        form.email.data = user.email
        form.username.data = user.username
        form.confirmed.data = user.confirmed
        form.role.data = user.role_id
        form.name.data = user.name
        form.location.data = user.location
        form.about_me.data = user.about_me
        return render_template('edit_profile.html', form=form, user=user)

    user.email = form.email.data
    user.username = form.username.data
    user.confirmed = form.confirmed.data
    user.role = Role.query.get(form.role.data)
    user.name = form.name.data
    user.location = form.location.data
    user.about_me = form.about_me.data
    db.session.add(user)
    flash('The profile has been updated.')
    return redirect(url_for('.user', username=user.username))
def topic_page(tpage=1, uid=None,source=None):
    """Return one page of a user's article topics as a ``message`` payload.

    Only ``source == "article"`` is supported; anything else, or an
    unknown ``uid``, yields an error message.  On success the payload
    lists each topic's name, link and count, plus ``prev``/``next`` page
    numbers when the pagination object reports them.
    """
    if source != "article":
        return message("error", "", "????")

    tpage = int(tpage)
    user = app.home.models.User.query.filter_by(id=uid).first()
    if not user:
        return message("error", "", "?????")

    topics = app.home.models.Topic.get_user_article_topics(user.id, tpage)
    value = {
        "items": [{"name": topic.name,
                   "href": user_topic_url(source, topic, user, tpage),
                   "count": count}
                  for topic, count in topics.items],
    }
    if topics.has_prev:
        value["prev"] = topics.prev_num
    if topics.has_next:
        value["next"] = topics.next_num
    return message("success", value)
def calendar(id):
    """Delete or update a calendar event from a POSTed form.

    The module-level ``events`` object is populated with the current
    user's id and the form's event id.  When the form contains a
    ``delete`` field the event is deleted; otherwise title, notes, place
    and color are copied from the form and the event is updated.  Either
    way the user is redirected to the calendars page; a
    ``CalendarException`` additionally flashes an error.

    Nothing is returned for methods other than POST.
    """
    if request.method != 'POST':
        return None

    try:
        events.user = current_user.id
        events.id = request.form['id']
        if 'delete' in request.form:
            events.delete()
        else:
            events.title = request.form['title']
            events.notes = request.form['notes']
            events.lieu = request.form['lieu']
            events.color = request.form['color']
            events.update_event()
        return redirect(url_for('core.calendars'))
    except CalendarException:
        flash(gettext('An error has occured'),'error')
        return redirect(url_for('core.calendars'))
def options():
    """Show the options page (GET) or change the user's color (POST).

    On POST the module-level ``option`` object receives the current
    user's id and the submitted ``color`` (falling back to the user's
    current color when the field is absent) before ``change_color`` is
    called.  The user is then redirected back to the options page with a
    success or, on ``OptionsException``, an error message.

    Nothing is returned for other methods.
    """
    if request.method == 'GET':
        return render_template('options/index.html')
    if request.method != 'POST':
        return None

    try:
        option.user = current_user.id
        chosen_color = request.form.get('color')
        if chosen_color is None:
            option.color = current_user.color
        else:
            option.color = chosen_color
        option.change_color()
        flash(gettext('Account changed successfully' ), 'success')
        return redirect(url_for('core.options'))
    except OptionsException:
        flash(gettext("An error has occured"), 'error')
        return redirect(url_for('core.options'))
def add_scenario():
    """Create a scenario from the submitted form and redirect to the list.

    The module-level ``launcher`` object is filled from the form: the
    event's ``<event_code>_param`` values are joined with ``" && "`` into
    the template, and the action's ``<action>_param`` values are encoded
    with ``json.encode``.  Success and failure both redirect to the
    scenario page, with a matching flash message.

    Any ``Exception`` is reported to the user as a generic error.  The
    handler used to be a bare ``except:``, which also swallowed
    ``SystemExit`` and ``KeyboardInterrupt``.
    """
    try:
        event_code = request.form.get('event_code')
        params = request.form.getlist(event_code + '_param')
        launcher.template = " && ".join(params)
        launcher.name = request.form.get('scenario')
        launcher.user = current_user.id
        launcher.event = event_code

        action_key = request.form.get('action')
        launcher.action = request.form.get(action_key)
        # NOTE(review): the stdlib ``json`` module has no ``encode``;
        # presumably ``json`` is a project-specific helper here -- confirm,
        # otherwise this line always fails into the error branch below.
        launcher.action_param = json.encode(
            request.form.getlist(action_key + '_param'))
        launcher.add()

        flash(gettext('Scenario added successfully !'), 'success')
        return redirect(url_for('core.scenario'))
    except Exception:
        flash(gettext('An error has occured !'), 'error')
        return redirect(url_for('core.scenario'))
def password_needed(self, project, user_id_or_ip):
    """Tell whether the visitor must still enter the project password.

    Returns ``False`` when the project needs no password, or when the
    current user is a logged-in admin or the project owner.  Otherwise
    the answer is whether ``user_id_or_ip`` is missing from the project's
    password cookie.
    """
    if not project.needs_password():
        return False

    privileged = (not current_user.is_anonymous() and
                  (current_user.admin or
                   current_user.id == project.owner_id))
    if privileged:
        return False

    cookie = self.cookie_handler.get_cookie_from(project)
    return user_id_or_ip not in cookie
def get_breadth_first_task(project_id, user_id=None, user_ip=None,
                           external_uid=None, offset=0, limit=1, orderby='id',
                           desc=False):
    """Get a new task which have the least number of task runs.

    Considers the non-completed tasks of ``project_id`` minus those the
    contributor already has a task run for.  The contributor is
    identified by ``user_id`` when neither ``user_ip`` nor
    ``external_uid`` is given; otherwise by ``user_ip`` (defaulting to
    ``'127.0.0.1'``) unless an ``external_uid`` is present.

    The query selects each ``Task`` together with the summed
    ``Counter.n_task_runs``, orders by that sum ascending, then applies
    ``_set_orderby_desc`` and ``limit``/``offset``.  The rows are returned
    as delivered by ``Query.all()``.
    """
    project_query = session.query(Task.id).filter(
        Task.project_id == project_id,
        Task.state != 'completed')

    if user_id and not user_ip and not external_uid:
        contributor = dict(user_id=user_id)
    else:
        if not user_ip:  # pragma: no cover
            user_ip = '127.0.0.1'
        # user_ip is always set at this point, so only external_uid decides.
        if not external_uid:
            contributor = dict(user_ip=user_ip)
        else:
            contributor = dict(external_uid=external_uid)
    done = session.query(TaskRun.task_id).filter_by(project_id=project_id,
                                                    **contributor)

    pending_ids = project_query.except_(done)
    # Parenthesised chain replaces the backslash continuations, the last
    # of which dangled after ``order_by``.
    query = (session.query(Task,
                           func.sum(Counter.n_task_runs).label('n_task_runs'))
             .filter(Task.id == Counter.task_id)
             .filter(Counter.task_id.in_(pending_ids))
             .group_by(Task.id)
             .order_by('n_task_runs ASC'))
    query = _set_orderby_desc(query, orderby, desc)
    # Leftover debug ``print`` statements were removed here.
    data = query.limit(limit).offset(offset).all()
    return data
def _add_hateoas_links(self, item):
    """Dictize ``item`` and decorate it with related objects and links.

    When the request carries a truthy ``related`` argument:

    * a ``Task`` gets its ``task_runs`` and its last-version ``result``,
    * a ``TaskRun`` gets its ``task`` and last-version ``result``,
    * a ``Result`` gets its ``task`` and ``task_runs``.

    HATEOAS ``links``/``link`` entries are added when
    ``self.hateoas.create_links`` returns them.
    """
    obj = item.dictize()

    if request.args.get('related'):
        kind = item.__class__.__name__
        if kind == 'Task':
            obj['task_runs'] = []
            obj['result'] = None
            runs = task_repo.filter_task_runs_by(task_id=item.id)
            results = result_repo.filter_by(task_id=item.id,
                                            last_version=True)
            obj['task_runs'].extend(run.dictize() for run in runs)
            for res in results:
                obj['result'] = res.dictize()
        elif kind == 'TaskRun':
            tasks = task_repo.filter_tasks_by(id=item.task_id)
            results = result_repo.filter_by(task_id=item.task_id,
                                            last_version=True)
            obj['task'] = None
            obj['result'] = None
            for task in tasks:
                obj['task'] = task.dictize()
            for res in results:
                obj['result'] = res.dictize()
        elif kind == 'Result':
            tasks = task_repo.filter_tasks_by(id=item.task_id)
            runs = task_repo.filter_task_runs_by(task_id=item.task_id)
            obj['task_runs'] = []
            for task in tasks:
                obj['task'] = task.dictize()
            obj['task_runs'].extend(run.dictize() for run in runs)

    links, link = self.hateoas.create_links(item)
    if links:
        obj['links'] = links
    if link:
        obj['link'] = link
    return obj
def api_context(self, all_arg, **filters):
    """Adjust the query ``filters`` to the current user's context.

    Authenticated users get ``owner_id`` set to their own id.  When
    ``all_arg`` is ``'1'`` a truthy ``owner_id`` filter is removed again.
    Returns the resulting filters dict.
    """
    if current_user.is_authenticated():
        filters['owner_id'] = current_user.id

    wants_everything = all_arg == '1'
    if wants_everything and filters.get('owner_id'):
        filters.pop('owner_id')
    return filters
def _set_limit_and_offset(self):
    """Read pagination and ordering arguments from the query string.

    Returns a ``(limit, offset, orderby)`` tuple:

    * ``limit``   -- ``int`` capped at 100; 20 when missing or not a number.
    * ``offset``  -- ``int``; 0 when missing or not a number.
    * ``orderby`` -- the ``orderby`` argument, or ``'id'`` when missing
      or empty.
    """
    try:
        limit = min(100, int(request.args.get('limit')))
    except (ValueError, TypeError):
        limit = 20

    try:
        offset = int(request.args.get('offset'))
    except (ValueError, TypeError):
        offset = 0

    # A plain ``args.get`` does no conversion, so the former try/except
    # around this lookup (falling back to 'updated') could never trigger.
    orderby = request.args.get('orderby') or 'id'
    return limit, offset, orderby
def _file_upload(self, request): content_type = 'multipart/form-data' if content_type in request.headers.get('Content-Type'): tmp = dict() for key in request.form.keys(): tmp[key] = request.form[key] ensure_authorized_to('create', HelpingMaterial, project_id=tmp['project_id']) upload_method = current_app.config.get('UPLOAD_METHOD') if request.files.get('file') is None: raise AttributeError _file = request.files['file'] container = "user_%s" % current_user.id uploader.upload_file(_file, container=container) file_url = get_avatar_url(upload_method, _file.filename, container) tmp['media_url'] = file_url if tmp.get('info') is None: tmp['info'] = dict() tmp['info']['container'] = container tmp['info']['file_name'] = _file.filename return tmp else: return None
def _select_attributes(self, data):
    """Return ``data`` as the current user is allowed to see it.

    The owner (``data['owner_id']``) and admins receive ``data``
    unchanged; anonymous and all other users receive the output of
    ``_filter_private_data``.
    """
    if not current_user.is_anonymous():
        privileged = (current_user.is_authenticated and
                      (current_user.id == data['owner_id'] or
                       current_user.admin))
        if privileged:
            return data
    return self._filter_private_data(data)
def user_progress(project_id=None, short_name=None):
    """API endpoint reporting the current user's progress in a project.

    The project is looked up by ``short_name`` when given, else by
    ``project_id``; a 404 is raised when neither is given or nothing is
    found.  The JSON response has two fields, e.g.::

        {"done": 10, "total": 100}

    where ``done`` counts the task runs of the current user (anonymous
    users are matched by remote address, ``'127.0.0.1'`` as fallback)
    and ``total`` is ``n_tasks`` for the project.
    """
    if not (project_id or short_name):  # pragma: no cover
        return abort(404)

    if short_name:
        project = project_repo.get_by_shortname(short_name)
    else:
        project = project_repo.get(project_id)
    if not project:
        return abort(404)

    # For now, keep this version, but wait until redis cache is used
    # here for task_runs too
    query_attrs = dict(project_id=project.id)
    if current_user.is_anonymous():
        query_attrs['user_ip'] = request.remote_addr or '127.0.0.1'
    else:
        query_attrs['user_id'] = current_user.id

    done = task_repo.count_task_runs_with(**query_attrs)
    progress = dict(done=done, total=n_tasks(project.id))
    return Response(json.dumps(progress), mimetype="application/json")
def manage_user_no_login(access_token, next_url):
    """Store a Twitter token for the logged-in user, then redirect.

    For an authenticated user the ``access_token`` is saved under
    ``info['twitter_token']``.  In every case the response is a redirect
    to ``next_url``.
    """
    if not current_user.is_authenticated():
        return redirect(next_url)

    account = user_repo.get(current_user.id)
    account.info['twitter_token'] = access_token
    user_repo.save(account)
    return redirect(next_url)
def details(short_name):
    """Render the main page of the project ``short_name``.

    For password-protected projects the visitor may be redirected to the
    password page; for other projects read authorization is enforced.
    Project and owner are sanitized for the requesting user, and CKAN
    details are added when ``CKAN_URL`` is configured.
    """
    (project, owner, n_tasks, n_task_runs,
     overall_progress, last_activity,
     n_results) = project_by_shortname(short_name)

    if not project.needs_password():
        ensure_authorized_to('read', project)
    else:
        password_redirect = _check_if_redirect_to_password(project)
        if password_redirect:
            return password_redirect

    template = '/projects/project.html'
    pro = pro_features()
    title = project_title(project, None)
    project = add_custom_contrib_button_to(project, get_user_id_or_ip())
    project_sanitized, owner_sanitized = sanitize_project_owner(
        project, owner, current_user)

    project_id = project.get('id')
    template_args = {
        "project": project_sanitized,
        "title": title,
        "owner": owner_sanitized,
        "n_tasks": n_tasks,
        "n_task_runs": n_task_runs,
        "overall_progress": overall_progress,
        "last_activity": last_activity,
        "n_completed_tasks": cached_projects.n_completed_tasks(project_id),
        "n_volunteers": cached_projects.n_volunteers(project_id),
        "pro_features": pro,
    }
    if current_app.config.get('CKAN_URL'):
        template_args['ckan_name'] = current_app.config.get('CKAN_NAME')
        template_args['ckan_url'] = current_app.config.get('CKAN_URL')
        template_args['ckan_pkg_name'] = short_name

    return handle_content_type(dict(template=template, **template_args))
def _import_tasks(project, **form_data):
    """Import tasks into ``project``, synchronously or through the queue.

    ``form_data`` is forwarded unchanged to the importer.  When the number
    of tasks to import does not exceed ``MAX_NUM_SYNCHRONOUS_TASKS_IMPORT``
    they are created right away and the importer's report message is
    flashed; otherwise the import is enqueued on ``importer_queue`` and the
    user is told to wait for an email.

    Returns a redirect to the project's tasks page.
    """
    number_of_tasks = importer.count_tasks_to_import(**form_data)
    if number_of_tasks <= MAX_NUM_SYNCHRONOUS_TASKS_IMPORT:
        report = importer.create_tasks(task_repo, project.id, **form_data)
        flash(report.message)
    else:
        importer_queue.enqueue(import_tasks, project.id, **form_data)
        # The message below is a gettext msgid that continues over a
        # backslash line break inside the literal; keep it as is.
        flash(gettext("You're trying to import a large amount of tasks, so please be patient.\
            You will receive an email when the tasks are ready."))
    return redirect_content_type(url_for('.tasks', short_name=project.short_name))
def tasks(short_name):
    """Render the tasks page of the project ``short_name``.

    For password-protected projects the visitor may be redirected to the
    password page; for other projects read authorization is enforced.
    The response also tells whether the autoimporter pro feature is
    enabled for the current user.
    """
    (project, owner, n_tasks, n_task_runs,
     overall_progress, last_activity,
     n_results) = project_by_shortname(short_name)
    title = project_title(project, "Tasks")

    if not project.needs_password():
        ensure_authorized_to('read', project)
    else:
        password_redirect = _check_if_redirect_to_password(project)
        if password_redirect:
            return password_redirect

    pro = pro_features()
    project = add_custom_contrib_button_to(project, get_user_id_or_ip())
    handler = ProFeatureHandler(current_app.config.get('PRO_FEATURES'))
    autoimporter_enabled = handler.autoimporter_enabled_for(current_user)
    project_sanitized, owner_sanitized = sanitize_project_owner(
        project, owner, current_user)

    project_id = project.get('id')
    completed = cached_projects.n_completed_tasks(project_id)
    volunteers = cached_projects.n_volunteers(project_id)

    response = dict(template='/projects/tasks.html',
                    title=title,
                    project=project_sanitized,
                    owner=owner_sanitized,
                    autoimporter_enabled=autoimporter_enabled,
                    n_tasks=n_tasks,
                    n_task_runs=n_task_runs,
                    overall_progress=overall_progress,
                    last_activity=last_activity,
                    n_completed_tasks=completed,
                    n_volunteers=volunteers,
                    pro_features=pro)
    return handle_content_type(response)
def delete_tasks(short_name):
    """Delete ALL the tasks for a given project.

    Requires read and update authorization on the project.  A GET request
    renders the confirmation page (including a CSRF token); any other
    method calls ``task_repo.delete_valid_from_project``, flashes a
    success message and redirects to the project's tasks page.
    """
    (project, owner, n_tasks, n_task_runs,
     overall_progress, last_activity,
     n_results) = project_by_shortname(short_name)

    for action in ('read', 'update'):
        ensure_authorized_to(action, project)
    pro = pro_features()

    if request.method != 'GET':
        task_repo.delete_valid_from_project(project)
        msg = gettext("Tasks and taskruns with no associated results have been deleted")
        flash(msg, 'success')
        return redirect_content_type(url_for('.tasks',
                                             short_name=project.short_name))

    title = project_title(project, "Delete")
    n_volunteers = cached_projects.n_volunteers(project.id)
    n_completed_tasks = cached_projects.n_completed_tasks(project.id)
    project = add_custom_contrib_button_to(project, get_user_id_or_ip())
    project_sanitized, owner_sanitized = sanitize_project_owner(
        project, owner, current_user)

    response = dict(template='projects/tasks/delete.html',
                    project=project_sanitized,
                    owner=owner_sanitized,
                    n_tasks=n_tasks,
                    n_task_runs=n_task_runs,
                    n_volunteers=n_volunteers,
                    n_completed_tasks=n_completed_tasks,
                    overall_progress=overall_progress,
                    last_activity=last_activity,
                    title=title,
                    pro_features=pro,
                    csrf=generate_csrf())
    return handle_content_type(response)
def show_blogposts(short_name):
    """Render the blog (list of posts) of the project ``short_name``.

    For password-protected projects the visitor may be redirected to the
    password page; for other projects read authorization on ``Blogpost``
    for the project is enforced.
    """
    (project, owner, n_tasks, n_task_runs,
     overall_progress, last_activity,
     n_results) = project_by_shortname(short_name)

    posts = blog_repo.filter_by(project_id=project.id)

    if not project.needs_password():
        ensure_authorized_to('read', Blogpost, project_id=project.id)
    else:
        password_redirect = _check_if_redirect_to_password(project)
        if password_redirect:
            return password_redirect

    pro = pro_features()
    project = add_custom_contrib_button_to(project, get_user_id_or_ip())
    project_sanitized, owner_sanitized = sanitize_project_owner(
        project, owner, current_user)

    project_id = project.get('id')
    completed = cached_projects.n_completed_tasks(project_id)
    volunteers = cached_projects.n_volunteers(project_id)

    response = dict(template='projects/blog.html',
                    project=project_sanitized,
                    owner=owner_sanitized,
                    blogposts=posts,
                    overall_progress=overall_progress,
                    n_tasks=n_tasks,
                    n_task_runs=n_task_runs,
                    n_completed_tasks=completed,
                    n_volunteers=volunteers,
                    pro_features=pro)
    return handle_content_type(response)
def webhook_handler(short_name, oid=None):
    """List a project's webhook responses, or re-send one of them.

    Aborts with 403 when the ``webhooks_enabled`` pro feature is off.
    Read authorization on ``Webhook`` for the project is enforced before
    anything else happens.  Previously a POST could re-enqueue a webhook
    without that check ever running.

    * ``POST`` with ``oid``: the stored webhook ``oid`` is enqueued again
      towards ``project.webhook`` and returned as JSON; 404 if unknown.
    * otherwise: the webhook page is rendered (after a possible redirect
      to the project password page).
    """
    (project, owner, n_tasks, n_task_runs,
     overall_progress, last_activity,
     n_results) = project_by_shortname(short_name)

    pro = pro_features()
    if not pro['webhooks_enabled']:
        raise abort(403)

    # Authorize before the POST branch so re-sending is covered too.
    ensure_authorized_to('read', Webhook, project_id=project.id)

    if request.method == 'POST' and oid:
        tmp = webhook_repo.get(oid)
        if not tmp:
            return abort(404)
        # NOTE(review): nothing here checks that the webhook ``oid``
        # belongs to this project -- consider comparing its project id.
        webhook_queue.enqueue(webhook, project.webhook,
                              tmp.payload, tmp.id)
        return json.dumps(tmp.dictize())

    redirect_to_password = _check_if_redirect_to_password(project)
    if redirect_to_password:
        return redirect_to_password

    # Only the rendered page needs the list of responses.
    responses = webhook_repo.filter_by(project_id=project.id)
    project = add_custom_contrib_button_to(project, get_user_id_or_ip())
    project_id = project.get('id')
    return render_template(
        'projects/webhook.html',
        project=project,
        owner=owner,
        responses=responses,
        overall_progress=overall_progress,
        n_tasks=n_tasks,
        n_task_runs=n_task_runs,
        n_completed_tasks=cached_projects.n_completed_tasks(project_id),
        n_volunteers=cached_projects.n_volunteers(project_id),
        pro_features=pro)
def results(short_name):
    """Results page for the project.

    Requires read authorization on the project.  Project and owner are
    sanitized for the requesting user and the response includes the
    project's counters plus ``n_results``.
    """
    (project, owner, n_tasks, n_task_runs,
     overall_progress, last_activity,
     n_results) = project_by_shortname(short_name)

    ensure_authorized_to('read', project)
    pro = pro_features()
    # NOTE(review): the title used to be computed with "Results" first and
    # then overwritten by this call; the effective behaviour is kept, but
    # the "Results" suffix may have been the intended one -- confirm.
    title = project_title(project, None)
    project = add_custom_contrib_button_to(project, get_user_id_or_ip())
    project_sanitized, owner_sanitized = sanitize_project_owner(
        project, owner, current_user)

    project_id = project.get('id')
    template_args = {
        "project": project_sanitized,
        "title": title,
        "owner": owner_sanitized,
        "n_tasks": n_tasks,
        "n_task_runs": n_task_runs,
        "overall_progress": overall_progress,
        "last_activity": last_activity,
        "n_completed_tasks": cached_projects.n_completed_tasks(project_id),
        "n_volunteers": cached_projects.n_volunteers(project_id),
        "pro_features": pro,
        "n_results": n_results,
    }
    response = dict(template='/projects/results.html', **template_args)
    return handle_content_type(response)
def categories():
    """List Categories.

    ``GET`` requires read authorization on ``Category`` and shows an
    empty form.  ``POST`` requires create authorization; a valid form
    creates a category whose short name is the lower-cased name with
    spaces removed, resets the category cache and flashes a success
    message, while an invalid form flashes an error.  In both cases the
    page lists all categories with their project counts.

    HTTP errors raised inside the block are re-raised unchanged, in line
    with ``update_category``; any other exception is logged and answered
    with a 500.
    """
    try:
        if request.method == 'GET':
            ensure_authorized_to('read', Category)
            form = CategoryForm()
        if request.method == 'POST':
            ensure_authorized_to('create', Category)
            form = CategoryForm(request.body)
            del form.id
            if form.validate():
                slug = form.name.data.lower().replace(" ", "")
                category = Category(name=form.name.data,
                                    short_name=slug,
                                    description=form.description.data)
                project_repo.save_category(category)
                cached_cat.reset()
                msg = gettext("Category added")
                flash(msg, 'success')
            else:
                flash(gettext('Please correct the errors'), 'error')

        all_categories = cached_cat.get_all()
        n_projects_per_category = {
            c.short_name: cached_projects.n_count(c.short_name)
            for c in all_categories
        }

        response = dict(template='admin/categories.html',
                        title=gettext('Categories'),
                        categories=all_categories,
                        n_projects_per_category=n_projects_per_category,
                        form=form)
        return handle_content_type(response)
    except HTTPException:
        # Keep the original status code instead of masking it as a 500.
        raise
    except Exception as e:  # pragma: no cover
        current_app.logger.error(e)
        return abort(500)
def update_category(id):
    """Update a category.

    Aborts with 404 when the category does not exist; update
    authorization on it is required.  ``GET`` renders the edit form.
    ``POST`` validates the submitted form: on success the category is
    updated (short name = lower-cased name without spaces), the category
    cache is reset and the user is redirected to the category list; on
    failure the form is rendered again with an error message.

    HTTP errors are re-raised unchanged; any other exception is logged
    and answered with a 500.
    """
    try:
        category = project_repo.get_category(id)
        if not category:
            abort(404)

        ensure_authorized_to('update', category)
        form = CategoryForm(obj=category)
        form.populate_obj(category)

        if request.method == 'GET':
            response = dict(template='admin/update_category.html',
                            title=gettext('Update Category'),
                            category=category,
                            form=form)
            return handle_content_type(response)

        if request.method == 'POST':
            form = CategoryForm(request.body)
            if form.validate():
                slug = form.name.data.lower().replace(" ", "")
                new_category = Category(id=form.id.data,
                                        name=form.name.data,
                                        short_name=slug)
                project_repo.update_category(new_category)
                cached_cat.reset()
                msg = gettext("Category updated")
                flash(msg, 'success')
                return redirect_content_type(url_for(".categories"))

            msg = gettext("Please correct the errors")
            # Validation failures are errors; this used to be flashed
            # with the 'success' category.
            flash(msg, 'error')
            response = dict(template='admin/update_category.html',
                            title=gettext('Update Category'),
                            category=category,
                            form=form)
            return handle_content_type(response)
    except HTTPException:
        raise
    except Exception as e:  # pragma: no cover
        current_app.logger.error(e)
        return abort(500)
def new_announcement():
    """Show the announcement form or, on a valid POST, store a new one.

    Non-POST requests are authorized against an empty ``Announcement`` and
    get the form back. An invalid POST flashes an error and re-renders the
    form. A valid POST builds the announcement for the current user,
    authorizes it, saves it and redirects to ``admin.announcement``.
    """
    form = AnnouncementForm()
    del form.id

    def render_form():
        # The template name is left blank here
        # (previously 'admin/new_announcement.html').
        payload = {
            'template': '',
            'title': gettext("Write a new post"),
            'form': form,
        }
        return handle_content_type(payload)

    if request.method != 'POST':
        ensure_authorized_to('create', Announcement())
        return render_form()

    if form.validate():
        post = Announcement(title=form.title.data,
                            body=form.body.data,
                            user_id=current_user.id)
        ensure_authorized_to('create', post)
        announcement_repo.save(post)
        flash('<i class="icon-ok"></i> ' + gettext('Annnouncement created!'),
              'success')
        return redirect_content_type(url_for('admin.announcement'))

    flash(gettext('Please correct the errors'), 'error')
    return render_form()
def update_announcement(id):
    """Edit an existing announcement.

    Non-POST requests render the form pre-filled from the stored
    announcement. A POST validates the submitted form and, when valid,
    updates the announcement and redirects to ``admin.announcement``; an
    invalid POST flashes an error and re-renders the form.

    Args:
        id: identifier used to look the announcement up in
            ``announcement_repo``.

    Raises:
        HTTPException: 404 when no announcement matches ``id``.
    """
    announcement = announcement_repo.get_by(id=id)
    if announcement is None:
        abort(404)

    # Authorize up front so validation feedback is only shown to users who
    # are allowed to update this announcement.
    ensure_authorized_to('update', announcement)

    def respond(form):
        # The template name is left blank here
        # (previously 'admin/update_announcement.html').
        response = dict(template='',
                        title=gettext("Edit a post"),
                        form=form)
        return handle_content_type(response)

    if request.method != 'POST':
        return respond(AnnouncementForm(obj=announcement))

    form = AnnouncementForm()
    if not form.validate():
        flash(gettext('Please correct the errors'), 'error')
        return respond(form)

    # Use the id of the announcement that was just authorized rather than the
    # client-supplied form id, so the update cannot target another record.
    updated = Announcement(id=announcement.id,
                           title=form.title.data,
                           body=form.body.data,
                           user_id=current_user.id)
    announcement_repo.update(updated)

    msg_1 = gettext('Announcement updated!')
    flash('<i class="icon-ok"></i> ' + msg_1, 'success')
    return redirect_content_type(url_for('admin.announcement'))
def delete_announcement(id):
    """Delete the announcement identified by ``id``.

    Aborts with 404 when it does not exist; otherwise checks the 'delete'
    authorization, removes it, flashes a confirmation and redirects to
    ``admin.announcement``.
    """
    target = announcement_repo.get_by(id=id)
    if target is None:
        raise abort(404)

    ensure_authorized_to('delete', target)
    announcement_repo.delete(target)

    confirmation = '<i class="icon-ok"></i> Announcement deleted!'
    flash(confirmation, 'success')

    listing_url = url_for('admin.announcement')
    return redirect_content_type(listing_url)
def get_user_id_or_ip():
    """Identify the requester by user id or by IP address.

    Returns a dict with two keys: ``user_id`` holds the current user's id
    when authenticated (else ``None``); ``user_ip`` holds the request's
    remote address, or "127.0.0.1" if that is empty, when the user is
    anonymous (else ``None``).
    """
    if current_user.is_authenticated():
        uid = current_user.id
    else:
        uid = None

    if current_user.is_anonymous():
        ip = request.remote_addr or "127.0.0.1"
    else:
        ip = None

    return {'user_id': uid, 'user_ip': ip}
def change_password():
    """Let the current user set a new password.

    Renders ``user_edit.html`` with the form until it is submitted and
    valid; then stores the new password on ``current_user``, commits, flashes
    a notice and redirects to the user's detail page.
    """
    form = ChangePasswordForm()

    if not form.validate_on_submit():
        return render_template('user_edit.html',
                               form=form,
                               user=current_user,
                               title=u"????")

    current_user.password = form.new_password.data
    db.session.add(current_user)
    db.session.commit()
    flash(u'??????!', 'info')

    detail_url = url_for('user.detail', user_id=current_user.id)
    return redirect(detail_url)
def book_return():
    """Return a borrowed book for the current user.

    The borrow record is looked up from the query string: ``log_id`` selects
    a ``Log`` by primary key, and ``book_id`` (which takes precedence when
    both are given) selects the current user's log for that book. If no
    record is found a warning is flashed; otherwise
    ``current_user.return_book`` is called, its message is flashed and the
    session is committed.

    Redirects to the ``next`` query argument when present, else to the
    detail page of the book.
    """
    log_id = request.args.get('log_id')
    book_id = request.args.get('book_id')

    the_log = None
    if log_id:
        the_log = Log.query.get(log_id)
    if book_id:
        the_log = Log.query.filter_by(user_id=current_user.id,
                                      book_id=book_id).first()

    # Check the record that was looked up (the original tested an unrelated
    # name, so a missing record was never reported).
    if the_log is None:
        flash(u'????????', 'warning')
    else:
        result, message = current_user.return_book(the_log)
        flash(message, 'success' if result else 'danger')
        db.session.commit()

    # Prefer the book id of the record found; a log id is not a book id.
    # When no record exists, fall back to whatever the request supplied.
    if the_log is not None:
        detail_book_id = the_log.book_id
    else:
        detail_book_id = book_id or log_id

    # NOTE(review): 'next' comes straight from the query string; confirm it
    # is validated elsewhere before being used as a redirect target.
    return redirect(request.args.get('next') or
                    url_for('book.detail', book_id=detail_book_id))