我们从Python开源项目中,提取了以下32个代码示例,用于说明如何使用flask.ext.login.current_user.get_id()。
def test_delete(self): user_id = "testuser" with self.client: # Anonymous user cannot delete response = self.client.post("/blog/delete/1/") self.assertEqual(response.status_code, 401) # a user cannot delete another person's post self.login(user_id) self.assertEquals(current_user.get_id(), user_id) response = self.client.post("/blog/delete/11/", follow_redirects=True) assert "You do not have the rights to delete this post" in \ str(response.data) # a user can delete his posts response = self.client.post("/blog/delete/1/", follow_redirects=True) assert "Your post was successfully deleted" in str(response.data)
def group_apply(group): originalgroup = group group = str(group) assert(group in app.config["groups"]["closedgroups"]+app.config["groups"]["opengroups"]) join = True if group in app.config["groups"]["closedgroups"]: group = group+"-pending" join = False if current_user.accountStatus[0]=="Ineligible": if group not in app.config["groups"]["publicgroups"]: flash("You cannot join that group.", "danger") return redirect("/groups") ldaptools.modgroup(current_user.get_id() , MOD_ADD, group) if join: flash("Joined %s group" % group, "success") else: flash("Applied for %s group" % originalgroup, "success") return redirect("/groups")
def add_tss3id(): ts3id = str(request.form["ts3id"]) if len(ts3id)==0: flash("You must enter a non-null TeamSpeak 3 Identity.", "danger") return redirect("/services") elif len(ts3id)!=28: flash("The TeamSpeak 3 Identity you entered is of an invalid length.", "danger") return redirect("/services") elif ts3id[len(ts3id)-1]!="=": flash("The TeamSpeak 3 Identity you entered is in an invalid format.", "danger") return redirect("/services") ts3group = { "Internal": app.config["ts3"]["servergroups"]["full"], "Ally": app.config["ts3"]["servergroups"]["ally"], "Ineligible": app.config["ts3"]["servergroups"]["none"] } ldaptools.modts3id(current_user.get_id() , MOD_ADD, ts3id) result = ts3manager.modpermissions(ts3id, groupid=ts3group[current_user.accountStatus[0]]) ts3manager.update_groups([ldaptools.getuser(current_user.get_id())]) if result: flash("TS3 ID added and auth requested.", "success") else: flash("Something blew up.", "error") return redirect("/services")
def test_correct_login(self): with self.client: response = self.login("admin", "adminpassword") # 200 (OK) HTTP status code self.assert200(response) # check if user is authenticated self.assertTrue(current_user.is_authenticated()) # check if user is not anonymous self.assertFalse(current_user.is_anonymous()) # get user id self.assertEqual(current_user.get_id(), "1") # test user redirects to the main page self.assertIn('/posts', request.url) # Ensure alert is shown after logging in # Binary format because str() object doesn't support Buffer api self.assertIn(b'you were just logged in', response.data)
def new_reservation(property_id): job_task = None form = ApplicationForm() form.property_id.data = property_id if request.method == 'POST': if form.validate_on_submit(): guest = User.query.get(current_user.get_id()) job_task = JobTask.query.get(form.property_id.data) reservation = Reservation(form.message.data, job_task, guest) db.session.add(reservation) db.session.commit() gigaware.sms.sms_client.notify_host(reservation) return redirect_to('listings') if property_id is not None: job_task = JobTask.query.get(property_id) return view_with_params('reservation', job_task=job_task, form=form)
def new_property(): form = JobListingForm() if request.method == 'POST': if form.validate_on_submit(): host = User.query.get(current_user.get_id()) new_task = JobTask( form.description.data, form.image_url.data, form.city.data, form.country.data, form.zip_code.data, form.details.data, form.price.data, form.currency.data, host) db.session.add(new_task) db.session.commit() return redirect_to('listings') return view('listings_new', form)
def follow_post(id): user = session.query(User).filter_by(id=id).one() cuid = current_user.get_id() cuser = session.query(User).filter_by(id=cuid).one() try: if 'Unfollow' in request.form: session.add(cuser.unfollow(user)) session.commit() flash("Good call unfollowing " + user.name + ". Nobody needs that.", "success") return redirect(url_for("entries")) elif 'Follow' in request.form: session.add(cuser.follow(user)) session.commit() flash("You are now following " + user.name + ".", "success") return redirect(url_for("stats_get")) except (IntegrityError, StaleDataError, UnmappedInstanceError): flash("That didn't work... mind letting Jon know?", "danger") session.rollback() return redirect(url_for("entries"))
def update_post(post_id, **kwargs): """ ???? :param post_id: ?? id :param kwargs: ??????? :return: ?????? id,???? False """ sets = dict(kwargs) sets.update({ 'update_at': int(time.time()) }) if not sets.get('slug'): sets['slug'] = get_slug(sets.get('title')) slug_count = mongo.db.posts.count({'slug': sets.get('slug'), 'post_id': {'$ne': post_id}}) if slug_count > 0: new_slug = '-'.join([sets.get('slug'), str(slug_count+1)]) while mongo.db.posts.count({'slug': new_slug}): slug_count += 1 new_slug = '-'.join([sets.get('slug'), str(slug_count+1)]) sets['slug'] = new_slug if not sets.get('update_by'): sets['update_by'] = str(current_user.get_id()) if sets.get('create_at'): del sets['create_at'] if sets.get('markdown'): sets['html'] = parse_markdown(sets['markdown']) if not sets.get('excerpt') and sets.get('html'): sets['excerpt'] = get_excerpt(sets['html'], count=Setting.get_setting('excerpt_length', 120), wrapper=u'') try: result = mongo.db.posts.update_one({ 'post_id': post_id }, { '$set': sets }) if result and result.modified_count > 0: Post.update_post_tags(post_id, sets.get('tag_ids', '')) return post_id return False except: return False
def test_editor_get(self): user_id = "testuser" with self.client: # access to editor should be forbidden before login response = self.client.get("/blog/editor/") self.assertEqual(response.status_code, 401) response = self.client.get("/blog/editor/1/") self.assertEqual(response.status_code, 401) self.login(user_id) self.assertEquals(current_user.get_id(), user_id) response = self.client.get("/blog/editor/") assert response.status_code == 200 for i in range(1, 21): # logged in user can edit their post, and will be redirected # if they try to edit other's post response = self.client.get("/blog/editor/%d/" % i) expected_status_code = 200 if i <= 10 else 302 self.assertEqual(response.status_code, expected_status_code, "Error for item %d %d" % (i, response.status_code)) # logout and the access should be gone again self.logout() response = self.client.get("/blog/editor/") self.assertEqual(response.status_code, 401) response = self.client.get("/blog/editor/1/") self.assertEqual(response.status_code, 401)
def test_editor_post(self): user_id = "testuser" with self.client: # access to editor should be forbidden before login response = self.client.get("/blog/page/21/", follow_redirects=True) assert "The page you are trying to access is not valid!" in \ str(response.data) response = self.client.post("/blog/editor/") self.assertEqual(response.status_code, 401) response = self.client.post("/blog/editor/1/") self.assertEqual(response.status_code, 401) self.login(user_id) self.assertEquals(current_user.get_id(), user_id) response = self.client.post("/blog/editor/", data=dict(text="Test Text", tags="tag1, tag2")) # should give back the editor page self.assertEqual(response.status_code, 200) response = self.client.post("/blog/editor/", data=dict(title="Test Title", text="Test Text", tags="tag1, tag2")) self.assertEqual(response.status_code, 302) response = self.client.get("/blog/page/21/") self.assertEqual(response.status_code, 200)
def test_permissions_delete(self): self.app.config["BLOGGING_PERMISSIONS"] = True user_id = "testuser" self._set_identity_loader() with self.client: # Anonymous user cannot delete response = self.client.post("/blog/delete/1/") self.assertEqual(response.status_code, 401) self.login(user_id) # non blogger cannot delete posts response = self.client.post("/blog/delete/1/") self.assertEqual(response.status_code, 302) # will be redirected self.logout() self.login(user_id, blogger=True) response = self.client.post("/blog/delete/1/", follow_redirects=True) assert "Your post was successfully deleted" in str(response.data) # a user cannot delete another person's post self.assertEquals(current_user.get_id(), user_id) response = self.client.post("/blog/delete/11/", follow_redirects=True) assert "You do not have the rights to delete this post" in \ str(response.data)
def recovery(token): if token not in recoverymap: flash("Recovery Token Expired", "danger") return redirect("/login") else: user = ldaptools.getuser(recoverymap[token]) login_user(user) del recoverymap[token] flash("Logged in as %s using recovery token." % user.get_id(), "success") return render_template("account_reset.html")
def recovery_update_account(): try: email = request.form["email"] result = ldaptools.modattr(current_user.get_id(), MOD_REPLACE, "userPassword", ldaptools.makeSecret(request.form["password"])) assert(result) result = ldaptools.modattr(current_user.get_id(), MOD_REPLACE, "email", email) assert(result) flash("Account updated", "success") except Exception: flash("Update failed", "danger") app.logger.info('User account {0} infos changed'.format(current_user.get_id())) return redirect("/account")
def update_account(): email = request.form["email"] oldpassword = request.form["oldpassword"] api_id = request.form["api_id"] api_key = request.form["api_key"] update_needed = False if api_id != current_user.keyID[0] or api_key != current_user.vCode[0]: update_needed = True if not ldaptools.check_credentials(current_user.get_id(), oldpassword): flash("You must confirm your old password to update your account.", "danger") return redirect("/account") try: if all(x in request.form for x in ["password", "password_confirm", "oldpassword"]): if request.form["password"] != request.form["password_confirm"]: flash("Password confirmation mismatch.", "danger") return redirect("/account") result = ldaptools.modattr(current_user.get_id(), MOD_REPLACE, "userPassword", ldaptools.makeSecret(request.form["password"])) assert(result) result = ldaptools.modattr(current_user.get_id(), MOD_REPLACE, "email", email) assert(result) if "api_id" in request.form: result = ldaptools.modattr(current_user.get_id(), MOD_REPLACE, "keyID", api_id) assert(result) result = ldaptools.modattr(current_user.get_id(), MOD_REPLACE, "vCode", api_key) assert(result) flash("Account updated", "success") except Exception: flash("Update failed", "danger") if update_needed is True: update_characters([current_user.get_id()]) app.logger.info('User account {0} infos changed'.format(current_user.get_id())) return redirect("/account")
def groupadmin(): if "admin" in current_user.authGroup: groups = groups=app.config["groups"]["closedgroups"]+app.config["groups"]["opengroups"] else: groups = map(lambda x:x[6:], filter(lambda x:x.startswith("admin-"), current_user.authGroup)) pendingusers = ldaptools.getusers("authGroup=*-pending") applications = [] for user in pendingusers: for group in user.get_pending_authgroups(): if group in groups: applications.append((user.get_id(), group)) return render_template("groupsadmin.html", applications=applications, groups=groups)
def ping_send(): servers = map(lambda x:x + config["auth"]["domain"], ["allies.", "", "public."]) servers = filter(lambda x:x in request.form, servers) pingbot.broadcast(current_user.get_id(),",".join(servers), request.form["message"], servers) flash("Broadcasts sent to: "+", ".join(servers), "success") return redirect("/ping")
def ping_send_group(): if ("ping" not in current_user.get_authgroups()) and ("ping-%s" % request.form["group"] not in current_user.get_authgroups()): flash("You do not have the right to do that.", "danger") return redirect("/ping") count = pingbot.groupbroadcast(current_user.get_id(), "(|(authGroup={0})(corporation={0})(alliance={0}))".format(request.form["group"]), request.form["message"], request.form["group"]) flash("Broadcast sent to %d members in %s" % (count, request.form["group"]), "success") return redirect("/ping")
def ping_send_advgroup(): ldap_filter = "("+request.form["filter"]+")" message = request.form["message"] count = pingbot.groupbroadcast(current_user.get_id(), ldap_filter, message, ldap_filter) flash("Broadcast sent to %d members in %s" % (count, ldap_filter), "success") return redirect("/ping")
def refresh_ts3id(): ts3ids = current_user.ts3uid ts3group = { "Internal": app.config["ts3"]["servergroups"]["full"], "Ally": app.config["ts3"]["servergroups"]["ally"], "Ineligible": app.config["ts3"]["servergroups"]["none"] } results = [] for ts3id in ts3ids: results.append(ts3manager.modpermissions(ts3id, groupid=ts3group[current_user.accountStatus[0]])) flash("Results:"+str(results), "info") ts3manager.update_groups([ldaptools.getuser(current_user.get_id())]) return redirect("/services")
def reddit_loop(): query = request.args result = reddittools.verify_token( current_user.get_id(), query) if result: user = load_user(current_user.get_id()) flash("Successfully updated or added reddit account: %s." % (user.redditName[0],), "success") else: flash("Failed to update reddit account.", "danger") return redirect(url_for("index"))
def reservations(): user = User.query.get(current_user.get_id()) reservations_as_host = Reservation.query.filter( JobTask.host_id == current_user.get_id() and len(JobTask.reservations) > 0 ).join(JobTask).filter(Reservation.job_task_id == JobTask.id).all() reservations_as_guest = user.reservations return view_with_params('reservations', reservations_as_guest=reservations_as_guest, reservations_as_host=reservations_as_host)
def login_get(): current_user_id = current_user.get_id() # Don't want to compare Nonetype to ints later if current_user_id is not None: current_user_id = int(current_user_id) user_object = session.query(User).filter_by(id=current_user_id).one() flash(user_object.name + ", you are logged in. Feel free to login if" + " you are someone else though.", "warning") else: current_user is None return render_template("login.html")
def edit_entry_get(id): entry = session.query(Entry).get(id) current_user_id = current_user.get_id() if current_user_id is None: raise Forbidden('Only entry author can delete it.') else: if int(entry.author_id) != int(current_user.get_id()): raise Forbidden('Only the entry author can delete it.') return render_template("edit_entry.html", entry=entry)
def delete_entry_get(id): entry = session.query(Entry).get(id) current_user_id = current_user.get_id() if current_user_id is None: raise Forbidden('Only entry author can delete it.') else: #print(current_user.get_id()) if (int(entry.author_id) != int(current_user.get_id()) and int(current_user.get_id()) != 10): raise Forbidden('Only the entry author can delete it.') return render_template("delete_entry.html", entry=entry)
def draft_post(post_id=None, **kwargs): """ ?????????????? :param post_id: ?? id,????None :param kwargs: ??????? :return: ?????? id,???? False """ sets = dict(kwargs) if not sets.get('type'): sets['type'] = 'post' if not sets.get('slug'): sets['slug'] = get_slug(sets.get('title')) slug_count = mongo.db.posts.count({'slug': sets.get('slug'), 'post_id': {'$ne': post_id}}) if slug_count > 0: new_slug = '-'.join([sets.get('slug'), str(slug_count+1)]) while mongo.db.posts.count({'slug': new_slug}): slug_count += 1 new_slug = '-'.join([sets.get('slug'), str(slug_count+1)]) sets['slug'] = new_slug sets.update({ 'status': 'draft', 'update_at': int(time.time()) }) if not sets.get('update_by'): sets['update_by'] = str(current_user.get_id()) if sets.get('create_at'): del sets['create_at'] if sets.get('markdown'): sets['html'] = parse_markdown(sets['markdown']) if not sets.get('excerpt'): sets['excerpt'] = get_excerpt(sets['html'], count=Setting.get_setting('excerpt_length', 120), wrapper=u'') try: result = mongo.db.posts.update_one({ 'post_id': post_id }, { '$set': sets, '$setOnInsert': { 'post_id': get_next_sequence('post_id'), 'create_at': int(time.time()) } }, upsert=True) if result and result.modified_count > 0: pid = post_id Post.update_post_tags(pid, sets.get('tag_ids', '')) return pid if result and result.upserted_id: pid = mongo.db.posts.find_one({'_id': result.upserted_id}).get('post_id') Post.update_post_tags(pid, sets.get('tag_ids', '')) return pid return False except: return False
def redis_cached(timeout=None, key_prefix='view/%s', unless=None): """ ????????? :param timeout: ??????, ????????3600? :param key_prefix: ??? :param unless: ?????????? :return: ??????????? """ def decorator(f): @functools.wraps(f) # ?????? def decorated_function(*args, **kwargs): if callable(unless) and unless() is True: return f(*args, **kwargs) if kwargs.get('nocache'): return f(*args, **kwargs) # ????????? nocache ??????? try: cache_key = decorated_function.make_cache_key(*args, **kwargs) cache_key = urllib.quote(cache_key, safe='') rv = redis_get(cache_key) except Exception: if current_app.debug: raise return f(*args, **kwargs) if rv is None: rv = f(*args, **kwargs) try: redis_set(cache_key, rv, timeout=decorated_function.cache_timeout) except Exception: if current_app.debug: raise return f(*args, **kwargs) return rv def make_cache_key(*args, **kwargs): if callable(key_prefix): cache_key = key_prefix() elif '%s' in key_prefix: cache_key = key_prefix % (request.url+'_uid_'+str(current_user.get_id())) else: cache_key = key_prefix cache_key = hashlib.md5(cache_key.encode('utf-8')).hexdigest() cache_key = '_'.join((get_version(level='day'), cache_key)) return cache_key decorated_function.uncached = f decorated_function.cache_timeout = timeout decorated_function.make_cache_key = make_cache_key return decorated_function return decorator