Python flask.ext.login.current_user 模块,get_id() 实例源码

我们从 Python 开源项目中提取了以下 32 个代码示例，用于说明如何使用 flask.ext.login.current_user.get_id()。

项目:zual    作者:ninadmhatre    | 项目源码 | 文件源码
def test_delete(self):
    """Check who may delete a blog post through the ``/blog/delete/`` route.

    Three cases are exercised in order: an anonymous client is rejected with
    401, a logged-in user is refused when targeting post 11, and the same
    user succeeds when targeting post 1.
    """
    user_id = "testuser"
    with self.client:

        # Anonymous user cannot delete
        response = self.client.post("/blog/delete/1/")
        self.assertEqual(response.status_code, 401)

        # a user cannot delete another person's post
        self.login(user_id)
        # assertEqual instead of the deprecated assertEquals alias, which
        # was removed in Python 3.12.
        self.assertEqual(current_user.get_id(), user_id)
        response = self.client.post("/blog/delete/11/",
                                    follow_redirects=True)
        # assertIn keeps checking under ``python -O`` (bare asserts do not)
        # and reports the actual page content on failure.
        self.assertIn("You do not have the rights to delete this post",
                      str(response.data))

        # a user can delete his posts
        response = self.client.post("/blog/delete/1/",
                                    follow_redirects=True)
        self.assertIn("Your post was successfully deleted",
                      str(response.data))
项目:zual    作者:ninadmhatre    | 项目源码 | 文件源码
def test_delete(self):
    """Check who may delete a blog post through the ``/blog/delete/`` route.

    Three cases are exercised in order: an anonymous client is rejected with
    401, a logged-in user is refused when targeting post 11, and the same
    user succeeds when targeting post 1.
    """
    user_id = "testuser"
    with self.client:

        # Anonymous user cannot delete
        response = self.client.post("/blog/delete/1/")
        self.assertEqual(response.status_code, 401)

        # a user cannot delete another person's post
        self.login(user_id)
        # assertEqual instead of the deprecated assertEquals alias, which
        # was removed in Python 3.12.
        self.assertEqual(current_user.get_id(), user_id)
        response = self.client.post("/blog/delete/11/",
                                    follow_redirects=True)
        # assertIn keeps checking under ``python -O`` (bare asserts do not)
        # and reports the actual page content on failure.
        self.assertIn("You do not have the rights to delete this post",
                      str(response.data))

        # a user can delete his posts
        response = self.client.post("/blog/delete/1/",
                                    follow_redirects=True)
        self.assertIn("Your post was successfully deleted",
                      str(response.data))
项目:zual    作者:ninadmhatre    | 项目源码 | 文件源码
def test_delete(self):
    """Check who may delete a blog post through the ``/blog/delete/`` route.

    Three cases are exercised in order: an anonymous client is rejected with
    401, a logged-in user is refused when targeting post 11, and the same
    user succeeds when targeting post 1.
    """
    user_id = "testuser"
    with self.client:

        # Anonymous user cannot delete
        response = self.client.post("/blog/delete/1/")
        self.assertEqual(response.status_code, 401)

        # a user cannot delete another person's post
        self.login(user_id)
        # assertEqual instead of the deprecated assertEquals alias, which
        # was removed in Python 3.12.
        self.assertEqual(current_user.get_id(), user_id)
        response = self.client.post("/blog/delete/11/",
                                    follow_redirects=True)
        # assertIn keeps checking under ``python -O`` (bare asserts do not)
        # and reports the actual page content on failure.
        self.assertIn("You do not have the rights to delete this post",
                      str(response.data))

        # a user can delete his posts
        response = self.client.post("/blog/delete/1/",
                                    follow_redirects=True)
        self.assertIn("Your post was successfully deleted",
                      str(response.data))
项目:pizza-auth    作者:xxpizzaxx    | 项目源码 | 文件源码
def group_apply(group):
    """Add the current user to ``group``, or file an application for it.

    Open groups are joined directly. For a closed group the user is added to
    the group name with ``-pending`` appended instead. A user whose first
    ``accountStatus`` entry is ``"Ineligible"`` is refused unless the target
    name (after the ``-pending`` suffix has been applied) is listed in the
    ``publicgroups`` configuration.

    :param group: name of the requested group; coerced with ``str``.
    :return: a redirect to ``/groups``.
    :raises AssertionError: if ``group`` is neither a closed nor an open
        group.
    """
    originalgroup = group
    group = str(group)
    group_config = app.config["groups"]

    # Raise explicitly rather than using an ``assert`` statement: asserts are
    # stripped under ``python -O``, which would let any group name through.
    # AssertionError is kept so existing error handling sees the same type.
    known_groups = group_config["closedgroups"] + group_config["opengroups"]
    if group not in known_groups:
        raise AssertionError("unknown group: %s" % group)

    join = True
    if group in group_config["closedgroups"]:
        group = group + "-pending"
        join = False

    if current_user.accountStatus[0] == "Ineligible":
        if group not in group_config["publicgroups"]:
            flash("You cannot join that group.", "danger")
            return redirect("/groups")

    ldaptools.modgroup(current_user.get_id(), MOD_ADD, group)
    if join:
        flash("Joined %s group" % group, "success")
    else:
        flash("Applied for %s group" % originalgroup, "success")
    return redirect("/groups")
项目:pizza-auth    作者:xxpizzaxx    | 项目源码 | 文件源码
def add_tss3id():
    """Attach a TeamSpeak 3 identity to the current user.

    The identity is read from the ``ts3id`` form field. It must be non-empty,
    exactly 28 characters long and end with ``=``; otherwise a message is
    flashed and nothing is stored. A valid identity is written through
    ``ldaptools``, given the server group that matches the user's first
    ``accountStatus`` entry, and the user's TS3 groups are refreshed.

    :return: a redirect to ``/services`` in every case.
    """
    identity = str(request.form["ts3id"])

    # Validation rules are checked in order; the first failing rule wins.
    problem = None
    if not identity:
        problem = "You must enter a non-null TeamSpeak 3 Identity."
    elif len(identity) != 28:
        problem = "The TeamSpeak 3 Identity you entered is of an invalid length."
    elif not identity.endswith("="):
        problem = "The TeamSpeak 3 Identity you entered is in an invalid format."
    if problem is not None:
        flash(problem, "danger")
        return redirect("/services")

    # Map the account status to the configured TS3 server group id.
    servergroups = app.config["ts3"]["servergroups"]
    group_by_status = {
        "Internal": servergroups["full"],
        "Ally": servergroups["ally"],
        "Ineligible": servergroups["none"],
    }

    ldaptools.modts3id(current_user.get_id(), MOD_ADD, identity)
    granted = ts3manager.modpermissions(
        identity, groupid=group_by_status[current_user.accountStatus[0]])
    ts3manager.update_groups([ldaptools.getuser(current_user.get_id())])

    if granted:
        flash("TS3 ID added and auth requested.", "success")
    else:
        flash("Something blew up.", "error")
    return redirect("/services")
项目:minitweet    作者:alifaki077    | 项目源码 | 文件源码
def test_correct_login(self):
    """Log in with the admin credentials and verify the resulting session."""
    with self.client:
        login_response = self.login("admin", "adminpassword")

        # The login request itself must answer HTTP 200 (OK).
        self.assert200(login_response)

        # The session now belongs to an authenticated, non-anonymous user.
        # NOTE(review): these are called as methods; newer Flask-Login
        # releases expose them as properties -- confirm the pinned version.
        self.assertTrue(current_user.is_authenticated())
        self.assertFalse(current_user.is_anonymous())
        # The logged-in user's id is the string "1".
        self.assertEqual(current_user.get_id(), "1")
        # The request ended up on the posts page.
        self.assertIn('/posts', request.url)

        # A confirmation alert is part of the page. response.data is bytes,
        # so the needle has to be a bytes literal as well.
        self.assertIn(b'you were just logged in', login_response.data)
项目:gig    作者:shaiwilson    | 项目源码 | 文件源码
def new_reservation(property_id):
    """Show the reservation form for a job task, or store a submitted one.

    On a valid POST a ``Reservation`` is created for the current user,
    committed, the host is notified by SMS, and the client is redirected to
    the listings. Otherwise the reservation page is rendered for the task
    identified by ``property_id`` (if any).

    :param property_id: id of the job task being reserved; may be ``None``.
    :return: a redirect after a successful submission, else the rendered
        reservation view.
    """
    form = ApplicationForm()
    form.property_id.data = property_id

    if request.method == 'POST' and form.validate_on_submit():
        guest = User.query.get(current_user.get_id())
        booked_task = JobTask.query.get(form.property_id.data)

        reservation = Reservation(form.message.data, booked_task, guest)
        db.session.add(reservation)
        db.session.commit()

        gigaware.sms.sms_client.notify_host(reservation)

        return redirect_to('listings')

    # Not a (valid) submission: just display the form for the given task.
    if property_id is None:
        job_task = None
    else:
        job_task = JobTask.query.get(property_id)

    return view_with_params('reservation', job_task=job_task, form=form)
项目:gig    作者:shaiwilson    | 项目源码 | 文件源码
def new_property():
    """Show the new-listing form, or create a job task from a valid POST.

    :return: a redirect to the listings after a task has been stored,
        otherwise the rendered ``listings_new`` view with the form.
    """
    form = JobListingForm()

    # Anything other than a valid POST just (re)displays the form.
    if not (request.method == 'POST' and form.validate_on_submit()):
        return view('listings_new', form)

    # The logged-in user becomes the host of the new task.
    host = User.query.get(current_user.get_id())
    task = JobTask(
        form.description.data,
        form.image_url.data,
        form.city.data,
        form.country.data,
        form.zip_code.data,
        form.details.data,
        form.price.data,
        form.currency.data,
        host,
    )
    db.session.add(task)
    db.session.commit()
    return redirect_to('listings')
项目:WinTheMini    作者:j10sanders    | 项目源码 | 文件源码
def follow_post(id):
    """Follow or unfollow another user on behalf of the current user.

    The action is chosen by which key is present in the submitted form:
    ``Unfollow`` (checked first) or ``Follow``.

    :param id: primary key of the user to follow or unfollow.
    :return: a redirect to ``entries`` after unfollowing, after a database
        error, or when no action was submitted; a redirect to ``stats_get``
        after following.
    """
    user = session.query(User).filter_by(id=id).one()
    cuid = current_user.get_id()
    cuser = session.query(User).filter_by(id=cuid).one()
    try:
        if 'Unfollow' in request.form:
            session.add(cuser.unfollow(user))
            session.commit()
            flash("Good call unfollowing " + user.name +
                  ".  Nobody needs that.", "success")
            return redirect(url_for("entries"))
        elif 'Follow' in request.form:
            session.add(cuser.follow(user))
            session.commit()
            flash("You are now following " + user.name +
                  ".", "success")
            return redirect(url_for("stats_get"))
    except (IntegrityError, StaleDataError, UnmappedInstanceError):
        flash("That didn't work... mind letting Jon know?", "danger")
        # Discard the failed transaction so the session stays usable.
        session.rollback()
        return redirect(url_for("entries"))

    # Neither button was submitted. Falling off the end would return None,
    # which Flask rejects as an invalid view response, so send the client
    # back to the entries page instead.
    return redirect(url_for("entries"))
项目:Plog    作者:thundernet8    | 项目源码 | 文件源码
def update_post(post_id, **kwargs):
    """
    Update the stored document of an existing post.

    The update always refreshes ``update_at``, derives a slug from the title
    when none is given, makes the slug unique among other posts by appending
    a numeric suffix, never overwrites ``create_at``, and regenerates
    ``html`` / ``excerpt`` from ``markdown`` when applicable.

    :param post_id: id of the post to update
    :param kwargs: fields to store on the post
    :return: ``post_id`` when a document was modified, otherwise ``False``
    """
    sets = dict(kwargs)
    sets['update_at'] = int(time.time())

    if not sets.get('slug'):
        sets['slug'] = get_slug(sets.get('title'))

    # Make the slug unique: if other posts already use it, append a counter
    # and keep incrementing until no post has the candidate slug.
    base_slug = sets.get('slug')
    slug_count = mongo.db.posts.count({'slug': base_slug, 'post_id': {'$ne': post_id}})
    if slug_count > 0:
        new_slug = '-'.join([base_slug, str(slug_count + 1)])
        while mongo.db.posts.count({'slug': new_slug}):
            slug_count += 1
            new_slug = '-'.join([base_slug, str(slug_count + 1)])
        sets['slug'] = new_slug

    if not sets.get('update_by'):
        sets['update_by'] = str(current_user.get_id())
    # The creation timestamp of an existing post must not be overwritten.
    if sets.get('create_at'):
        del sets['create_at']
    if sets.get('markdown'):
        sets['html'] = parse_markdown(sets['markdown'])
    if not sets.get('excerpt') and sets.get('html'):
        sets['excerpt'] = get_excerpt(sets['html'], count=Setting.get_setting('excerpt_length', 120), wrapper=u'')

    try:
        result = mongo.db.posts.update_one({
            'post_id': post_id
        }, {
            '$set': sets
        })
        if result and result.modified_count > 0:
            Post.update_post_tags(post_id, sets.get('tag_ids', ''))
            return post_id
        return False
    except Exception:
        # Best effort: report failure to the caller. ``Exception`` (not a
        # bare except) so SystemExit and KeyboardInterrupt still propagate.
        return False
项目:zual    作者:ninadmhatre    | 项目源码 | 文件源码
def test_editor_get(self):
    """Check access control on GET requests to the blog editor.

    The editor pages answer 401 before login and after logout. While logged
    in as ``testuser`` the blank editor answers 200, posts 1-10 answer 200
    and posts 11-20 answer 302.
    """
    user_id = "testuser"
    with self.client:
        # access to editor should be forbidden before login
        response = self.client.get("/blog/editor/")
        self.assertEqual(response.status_code, 401)

        response = self.client.get("/blog/editor/1/")
        self.assertEqual(response.status_code, 401)

        self.login(user_id)
        # assertEqual instead of the deprecated assertEquals alias, which
        # was removed in Python 3.12.
        self.assertEqual(current_user.get_id(), user_id)
        response = self.client.get("/blog/editor/")
        # unittest assertion rather than a bare assert, so the check also
        # runs under ``python -O``.
        self.assertEqual(response.status_code, 200)

        for i in range(1, 21):
            # logged in user can edit their post, and will be redirected
            # if they try to edit other's post
            response = self.client.get("/blog/editor/%d/" % i)
            expected_status_code = 200 if i <= 10 else 302
            self.assertEqual(response.status_code, expected_status_code,
                             "Error for item %d %d" %
                             (i, response.status_code))

        # logout and the access should be gone again
        self.logout()
        response = self.client.get("/blog/editor/")
        self.assertEqual(response.status_code, 401)

        response = self.client.get("/blog/editor/1/")
        self.assertEqual(response.status_code, 401)
项目:zual    作者:ninadmhatre    | 项目源码 | 文件源码
def test_editor_post(self):
    """Check access control and validation on POST requests to the editor.

    Page 21 is reported as invalid at the start, anonymous POSTs answer 401,
    a POST without a title answers 200, and a complete POST answers 302,
    after which page 21 answers 200.
    """
    user_id = "testuser"
    with self.client:
        # Page 21 is not valid at this point.
        response = self.client.get("/blog/page/21/",
                                   follow_redirects=True)
        # assertIn rather than a bare assert, so the check also runs under
        # ``python -O`` and shows the page content on failure.
        self.assertIn("The page you are trying to access is not valid!",
                      str(response.data))

        # access to editor should be forbidden before login
        response = self.client.post("/blog/editor/")
        self.assertEqual(response.status_code, 401)

        response = self.client.post("/blog/editor/1/")
        self.assertEqual(response.status_code, 401)

        self.login(user_id)
        # assertEqual instead of the deprecated assertEquals alias, which
        # was removed in Python 3.12.
        self.assertEqual(current_user.get_id(), user_id)

        # No title submitted.
        response = self.client.post("/blog/editor/",
                                    data=dict(text="Test Text",
                                              tags="tag1, tag2"))
        # should give back the editor page
        self.assertEqual(response.status_code, 200)

        # Complete submission: the response is a redirect.
        response = self.client.post("/blog/editor/",
                                    data=dict(title="Test Title",
                                              text="Test Text",
                                              tags="tag1, tag2"))
        self.assertEqual(response.status_code, 302)

        response = self.client.get("/blog/page/21/")
        self.assertEqual(response.status_code, 200)
项目:zual    作者:ninadmhatre    | 项目源码 | 文件源码
def test_permissions_delete(self):
    """Check post deletion when ``BLOGGING_PERMISSIONS`` is enabled.

    Anonymous requests answer 401, a logged-in user without the blogger flag
    is redirected (302), and a user logged in with ``blogger=True`` can
    delete post 1 but is refused for post 11.
    """
    self.app.config["BLOGGING_PERMISSIONS"] = True
    user_id = "testuser"
    self._set_identity_loader()

    with self.client:
        # Anonymous user cannot delete
        response = self.client.post("/blog/delete/1/")
        self.assertEqual(response.status_code, 401)

        self.login(user_id)
        # non blogger cannot delete posts
        response = self.client.post("/blog/delete/1/")
        self.assertEqual(response.status_code, 302)  # will be redirected
        self.logout()

        self.login(user_id, blogger=True)
        response = self.client.post("/blog/delete/1/",
                                    follow_redirects=True)
        # assertIn rather than a bare assert, so the check also runs under
        # ``python -O`` and shows the page content on failure.
        self.assertIn("Your post was successfully deleted",
                      str(response.data))

        # a user cannot delete another person's post
        # assertEqual instead of the deprecated assertEquals alias, which
        # was removed in Python 3.12.
        self.assertEqual(current_user.get_id(), user_id)
        response = self.client.post("/blog/delete/11/",
                                    follow_redirects=True)
        self.assertIn("You do not have the rights to delete this post",
                      str(response.data))
项目:zual    作者:ninadmhatre    | 项目源码 | 文件源码
def test_editor_get(self):
    """Check access control on GET requests to the blog editor.

    The editor pages answer 401 before login and after logout. While logged
    in as ``testuser`` the blank editor answers 200, posts 1-10 answer 200
    and posts 11-20 answer 302.
    """
    user_id = "testuser"
    with self.client:
        # access to editor should be forbidden before login
        response = self.client.get("/blog/editor/")
        self.assertEqual(response.status_code, 401)

        response = self.client.get("/blog/editor/1/")
        self.assertEqual(response.status_code, 401)

        self.login(user_id)
        # assertEqual instead of the deprecated assertEquals alias, which
        # was removed in Python 3.12.
        self.assertEqual(current_user.get_id(), user_id)
        response = self.client.get("/blog/editor/")
        # unittest assertion rather than a bare assert, so the check also
        # runs under ``python -O``.
        self.assertEqual(response.status_code, 200)

        for i in range(1, 21):
            # logged in user can edit their post, and will be redirected
            # if they try to edit other's post
            response = self.client.get("/blog/editor/%d/" % i)
            expected_status_code = 200 if i <= 10 else 302
            self.assertEqual(response.status_code, expected_status_code,
                             "Error for item %d %d" %
                             (i, response.status_code))

        # logout and the access should be gone again
        self.logout()
        response = self.client.get("/blog/editor/")
        self.assertEqual(response.status_code, 401)

        response = self.client.get("/blog/editor/1/")
        self.assertEqual(response.status_code, 401)
项目:zual    作者:ninadmhatre    | 项目源码 | 文件源码
def test_permissions_delete(self):
    """Check post deletion when ``BLOGGING_PERMISSIONS`` is enabled.

    Anonymous requests answer 401, a logged-in user without the blogger flag
    is redirected (302), and a user logged in with ``blogger=True`` can
    delete post 1 but is refused for post 11.
    """
    self.app.config["BLOGGING_PERMISSIONS"] = True
    user_id = "testuser"
    self._set_identity_loader()

    with self.client:
        # Anonymous user cannot delete
        response = self.client.post("/blog/delete/1/")
        self.assertEqual(response.status_code, 401)

        self.login(user_id)
        # non blogger cannot delete posts
        response = self.client.post("/blog/delete/1/")
        self.assertEqual(response.status_code, 302)  # will be redirected
        self.logout()

        self.login(user_id, blogger=True)
        response = self.client.post("/blog/delete/1/",
                                    follow_redirects=True)
        # assertIn rather than a bare assert, so the check also runs under
        # ``python -O`` and shows the page content on failure.
        self.assertIn("Your post was successfully deleted",
                      str(response.data))

        # a user cannot delete another person's post
        # assertEqual instead of the deprecated assertEquals alias, which
        # was removed in Python 3.12.
        self.assertEqual(current_user.get_id(), user_id)
        response = self.client.post("/blog/delete/11/",
                                    follow_redirects=True)
        self.assertIn("You do not have the rights to delete this post",
                      str(response.data))
项目:zual    作者:ninadmhatre    | 项目源码 | 文件源码
def test_editor_get(self):
    """Check access control on GET requests to the blog editor.

    The editor pages answer 401 before login and after logout. While logged
    in as ``testuser`` the blank editor answers 200, posts 1-10 answer 200
    and posts 11-20 answer 302.
    """
    user_id = "testuser"
    with self.client:
        # access to editor should be forbidden before login
        response = self.client.get("/blog/editor/")
        self.assertEqual(response.status_code, 401)

        response = self.client.get("/blog/editor/1/")
        self.assertEqual(response.status_code, 401)

        self.login(user_id)
        # assertEqual instead of the deprecated assertEquals alias, which
        # was removed in Python 3.12.
        self.assertEqual(current_user.get_id(), user_id)
        response = self.client.get("/blog/editor/")
        # unittest assertion rather than a bare assert, so the check also
        # runs under ``python -O``.
        self.assertEqual(response.status_code, 200)

        for i in range(1, 21):
            # logged in user can edit their post, and will be redirected
            # if they try to edit other's post
            response = self.client.get("/blog/editor/%d/" % i)
            expected_status_code = 200 if i <= 10 else 302
            self.assertEqual(response.status_code, expected_status_code,
                             "Error for item %d %d" %
                             (i, response.status_code))

        # logout and the access should be gone again
        self.logout()
        response = self.client.get("/blog/editor/")
        self.assertEqual(response.status_code, 401)

        response = self.client.get("/blog/editor/1/")
        self.assertEqual(response.status_code, 401)
项目:zual    作者:ninadmhatre    | 项目源码 | 文件源码
def test_editor_post(self):
    """Check access control and validation on POST requests to the editor.

    Page 21 is reported as invalid at the start, anonymous POSTs answer 401,
    a POST without a title answers 200, and a complete POST answers 302,
    after which page 21 answers 200.
    """
    user_id = "testuser"
    with self.client:
        # Page 21 is not valid at this point.
        response = self.client.get("/blog/page/21/",
                                   follow_redirects=True)
        # assertIn rather than a bare assert, so the check also runs under
        # ``python -O`` and shows the page content on failure.
        self.assertIn("The page you are trying to access is not valid!",
                      str(response.data))

        # access to editor should be forbidden before login
        response = self.client.post("/blog/editor/")
        self.assertEqual(response.status_code, 401)

        response = self.client.post("/blog/editor/1/")
        self.assertEqual(response.status_code, 401)

        self.login(user_id)
        # assertEqual instead of the deprecated assertEquals alias, which
        # was removed in Python 3.12.
        self.assertEqual(current_user.get_id(), user_id)

        # No title submitted.
        response = self.client.post("/blog/editor/",
                                    data=dict(text="Test Text",
                                              tags="tag1, tag2"))
        # should give back the editor page
        self.assertEqual(response.status_code, 200)

        # Complete submission: the response is a redirect.
        response = self.client.post("/blog/editor/",
                                    data=dict(title="Test Title",
                                              text="Test Text",
                                              tags="tag1, tag2"))
        self.assertEqual(response.status_code, 302)

        response = self.client.get("/blog/page/21/")
        self.assertEqual(response.status_code, 200)
项目:pizza-auth    作者:xxpizzaxx    | 项目源码 | 文件源码
def recovery(token):
    """Log a user in through a one-time account recovery token.

    :param token: recovery token; must be a key of ``recoverymap``.
    :return: the account reset page when the token is known, otherwise a
        redirect to ``/login``.
    """
    if token in recoverymap:
        user = ldaptools.getuser(recoverymap[token])
        login_user(user)
        # The token is single-use: drop it once the login has happened.
        del recoverymap[token]
        flash("Logged in as %s using recovery token." % user.get_id(), "success")
        return render_template("account_reset.html")

    flash("Recovery Token Expired", "danger")
    return redirect("/login")
项目:pizza-auth    作者:xxpizzaxx    | 项目源码 | 文件源码
def recovery_update_account():
    """Replace the current user's password and email from the submitted form.

    Any failure (a missing form field, an LDAP call raising, or an LDAP call
    returning a falsy result) is reported to the user as "Update failed".

    :return: a redirect to ``/account``.
    """
    try:
        email = request.form["email"]
        secret = ldaptools.makeSecret(request.form["password"])
        # Explicit checks instead of ``assert``: asserts are stripped under
        # ``python -O``, which would report a failed update as a success.
        if not ldaptools.modattr(current_user.get_id(), MOD_REPLACE, "userPassword", secret):
            raise RuntimeError("LDAP update of userPassword failed")
        if not ldaptools.modattr(current_user.get_id(), MOD_REPLACE, "email", email):
            raise RuntimeError("LDAP update of email failed")
        flash("Account updated", "success")
    except Exception:
        flash("Update failed", "danger")
    # Logged whether or not the update succeeded.
    app.logger.info('User account {0} infos changed'.format(current_user.get_id()))
    return redirect("/account")
项目:pizza-auth    作者:xxpizzaxx    | 项目源码 | 文件源码
def update_account():
    """Update the current user's password, email and API credentials.

    The old password must be confirmed first. The password is only replaced
    when ``password``, ``password_confirm`` and ``oldpassword`` are all
    present and the two new values match. Any failure inside the update
    section is reported to the user as "Update failed". If the submitted API
    id or key differs from the stored one, the user's characters are
    refreshed afterwards.

    :return: a redirect to ``/account``.
    """
    email = request.form["email"]
    oldpassword = request.form["oldpassword"]
    api_id = request.form["api_id"]
    api_key = request.form["api_key"]

    def replace_attr(name, value):
        # Raise instead of ``assert``: asserts are stripped under
        # ``python -O``, which would report a failed update as a success.
        if not ldaptools.modattr(current_user.get_id(), MOD_REPLACE, name, value):
            raise RuntimeError("LDAP update of %s failed" % name)

    update_needed = (api_id != current_user.keyID[0]
                     or api_key != current_user.vCode[0])

    if not ldaptools.check_credentials(current_user.get_id(), oldpassword):
        flash("You must confirm your old password to update your account.", "danger")
        return redirect("/account")

    try:
        if all(x in request.form for x in ["password", "password_confirm", "oldpassword"]):
            if request.form["password"] != request.form["password_confirm"]:
                flash("Password confirmation mismatch.", "danger")
                return redirect("/account")
            replace_attr("userPassword", ldaptools.makeSecret(request.form["password"]))
        replace_attr("email", email)
        if "api_id" in request.form:
            replace_attr("keyID", api_id)
            replace_attr("vCode", api_key)
        flash("Account updated", "success")
    except Exception:
        flash("Update failed", "danger")

    # Runs even when the update above failed, as long as the submitted API
    # credentials differ from the stored ones.
    if update_needed:
        update_characters([current_user.get_id()])
    app.logger.info('User account {0} infos changed'.format(current_user.get_id()))
    return redirect("/account")
项目:pizza-auth    作者:xxpizzaxx    | 项目源码 | 文件源码
def groupadmin():
    """Render the pending group applications the current user may review.

    Members of ``admin`` see applications for every closed and open group.
    Other users see only the groups for which they hold an ``admin-``
    prefixed auth group (the prefix is stripped to get the group name).

    :return: the rendered ``groupsadmin.html`` template.
    """
    if "admin" in current_user.authGroup:
        groups = app.config["groups"]["closedgroups"] + app.config["groups"]["opengroups"]
    else:
        prefix = "admin-"
        # A real list, not map()/filter(): on Python 3 those are one-shot
        # iterators, which the repeated membership tests below (and the
        # template) would exhaust.
        groups = [name[len(prefix):] for name in current_user.authGroup
                  if name.startswith(prefix)]

    pendingusers = ldaptools.getusers("authGroup=*-pending")
    applications = [
        (user.get_id(), group)
        for user in pendingusers
        for group in user.get_pending_authgroups()
        if group in groups
    ]
    return render_template("groupsadmin.html", applications=applications, groups=groups)
项目:pizza-auth    作者:xxpizzaxx    | 项目源码 | 文件源码
def ping_send():
    """Broadcast the submitted message to every server ticked in the form.

    Candidate servers are the auth domain itself and its ``allies.`` and
    ``public.`` subdomains; a server is selected when its name is a key of
    the submitted form.

    :return: a redirect to ``/ping``.
    """
    domain = config["auth"]["domain"]
    candidates = [prefix + domain for prefix in ("allies.", "", "public.")]
    # Build a list rather than a filter() iterator: the selection is used
    # three times below, and on Python 3 an iterator would be empty after
    # the first use.
    servers = [server for server in candidates if server in request.form]
    pingbot.broadcast(current_user.get_id(), ",".join(servers), request.form["message"], servers)
    flash("Broadcasts sent to: " + ", ".join(servers), "success")
    return redirect("/ping")
项目:pizza-auth    作者:xxpizzaxx    | 项目源码 | 文件源码
def ping_send_group():
    """Broadcast the submitted message to one group, corporation or alliance.

    The current user must hold either the ``ping`` auth group or the
    ``ping-`` auth group for the requested name; otherwise the request is
    refused.

    :return: a redirect to ``/ping``.
    """
    group = request.form["group"]
    authgroups = current_user.get_authgroups()
    if "ping" not in authgroups and "ping-%s" % group not in authgroups:
        flash("You do not have the right to do that.", "danger")
        return redirect("/ping")

    # The group name comes straight from the form, so escape it before it is
    # embedded in the LDAP filter. Otherwise a name containing parentheses or
    # ``*`` would change the meaning of the filter.
    ldap_filter = "(|(authGroup={0})(corporation={0})(alliance={0}))".format(
        _escape_ldap_filter_value(group))
    count = pingbot.groupbroadcast(current_user.get_id(), ldap_filter, request.form["message"], group)
    flash("Broadcast sent to %d members in %s" % (count, group), "success")
    return redirect("/ping")


def _escape_ldap_filter_value(value):
    """Escape the characters RFC 4515 reserves inside a filter value."""
    # The backslash must be replaced first so that the escape sequences
    # introduced for the other characters are not escaped again.
    replacements = (
        ("\\", "\\5c"),
        ("*", "\\2a"),
        ("(", "\\28"),
        (")", "\\29"),
        ("\x00", "\\00"),
    )
    for char, escaped in replacements:
        value = value.replace(char, escaped)
    return value
项目:pizza-auth    作者:xxpizzaxx    | 项目源码 | 文件源码
def ping_send_advgroup():
    """Broadcast the submitted message to users matching a raw LDAP filter.

    The ``filter`` form field is wrapped in one pair of parentheses and used
    both as the search filter and as the label passed to the ping bot.

    :return: a redirect to ``/ping``.
    """
    # NOTE(review): the filter text is taken verbatim from the form; any
    # access restriction must come from outside this function -- confirm.
    wrapped_filter = "(%s)" % (request.form["filter"],)
    text = request.form["message"]
    recipients = pingbot.groupbroadcast(
        current_user.get_id(), wrapped_filter, text, wrapped_filter)
    flash("Broadcast sent to %d members in %s" % (recipients, wrapped_filter), "success")
    return redirect("/ping")
项目:pizza-auth    作者:xxpizzaxx    | 项目源码 | 文件源码
def refresh_ts3id():
    """Re-apply TS3 permissions for every identity of the current user.

    Each identity in ``current_user.ts3uid`` is given the server group that
    matches the user's first ``accountStatus`` entry; the individual results
    are flashed and the user's TS3 groups are refreshed.

    :return: a redirect to ``/services``.
    """
    identities = current_user.ts3uid

    # Map the account status to the configured TS3 server group id.
    servergroups = app.config["ts3"]["servergroups"]
    group_by_status = {
        "Internal": servergroups["full"],
        "Ally": servergroups["ally"],
        "Ineligible": servergroups["none"],
    }

    outcomes = [
        ts3manager.modpermissions(
            identity, groupid=group_by_status[current_user.accountStatus[0]])
        for identity in identities
    ]
    flash("Results:" + str(outcomes), "info")
    ts3manager.update_groups([ldaptools.getuser(current_user.get_id())])
    return redirect("/services")
项目:pizza-auth    作者:xxpizzaxx    | 项目源码 | 文件源码
def reddit_loop():
    """Verify the token passed in the query string and report the outcome.

    The query arguments are handed to ``reddittools.verify_token`` together
    with the current user's id. On success the user is reloaded and the
    first ``redditName`` entry is shown in the flash message.

    :return: a redirect to the ``index`` endpoint.
    """
    verified = reddittools.verify_token(current_user.get_id(), request.args)

    if not verified:
        flash("Failed to update reddit account.", "danger")
    else:
        # Reload the user so the message shows the freshly stored name.
        refreshed = load_user(current_user.get_id())
        flash("Successfully updated or added reddit account: %s." % (refreshed.redditName[0],), "success")

    return redirect(url_for("index"))
项目:gig    作者:shaiwilson    | 项目源码 | 文件源码
def reservations():
    """List the current user's reservations, both as host and as guest.

    :return: the rendered ``reservations`` view with
        ``reservations_as_guest`` and ``reservations_as_host``.
    """
    user = User.query.get(current_user.get_id())

    # Reservations on job tasks hosted by the current user. Only the host
    # condition is passed to filter(): Python's ``and`` does not build a SQL
    # AND, and calling len() on a mapped relationship attribute is not
    # valid. The inner join with JobTask already limits the result to tasks
    # that have at least one reservation.
    reservations_as_host = (
        Reservation.query
        .filter(JobTask.host_id == current_user.get_id())
        .join(JobTask)
        .filter(Reservation.job_task_id == JobTask.id)
        .all()
    )

    reservations_as_guest = user.reservations

    return view_with_params('reservations',
                            reservations_as_guest=reservations_as_guest,
                            reservations_as_host=reservations_as_host)
项目:WinTheMini    作者:j10sanders    | 项目源码 | 文件源码
def login_get():
    """Render the login page, with a notice if a user is already logged in.

    :return: the rendered ``login.html`` template.
    """
    current_user_id = current_user.get_id()
    # A None id cannot be converted to int or looked up, so the notice is
    # only produced when an id is present.
    if current_user_id is not None:
        user_object = session.query(User).filter_by(id=int(current_user_id)).one()
        flash(user_object.name + ", you are logged in.  Feel free to login if" +
              " you are someone else though.", "warning")

    return render_template("login.html")
项目:WinTheMini    作者:j10sanders    | 项目源码 | 文件源码
def edit_entry_get(id):
    """Render the edit form for an entry, restricted to the entry's author.

    :param id: primary key of the entry to edit.
    :return: the rendered ``edit_entry.html`` template.
    :raises Forbidden: if no user id is available or the current user is not
        the author of the entry.
    """
    entry = session.query(Entry).get(id)
    current_user_id = current_user.get_id()
    # The messages say "edit": this view previously reused the wording of
    # the delete view.
    if current_user_id is None:
        raise Forbidden('Only the entry author can edit it.')
    # Compare as ints so the check does not depend on how each id is stored.
    if int(entry.author_id) != int(current_user_id):
        raise Forbidden('Only the entry author can edit it.')
    return render_template("edit_entry.html", entry=entry)
项目:WinTheMini    作者:j10sanders    | 项目源码 | 文件源码
def delete_entry_get(id):
    """Render the delete confirmation page for an entry.

    Access is granted to the entry's author and to the user whose id is 10.

    :param id: primary key of the entry to delete.
    :return: the rendered ``delete_entry.html`` template.
    :raises Forbidden: if no user id is available, or the current user is
        neither the author nor user 10.
    """
    entry = session.query(Entry).get(id)
    if current_user.get_id() is None:
        raise Forbidden('Only entry author can delete it.')

    # NOTE(review): user id 10 is hard-coded as an exception to the author
    # rule -- presumably an administrator account; confirm.
    may_delete = (int(entry.author_id) == int(current_user.get_id())
                  or int(current_user.get_id()) == 10)
    if not may_delete:
        raise Forbidden('Only the entry author can delete it.')
    return render_template("delete_entry.html", entry=entry)
项目:Plog    作者:thundernet8    | 项目源码 | 文件源码
def draft_post(post_id=None, **kwargs):
    """
    Save a post as a draft, inserting it when no matching post exists.

    The stored fields get ``type`` defaulted to ``'post'``, a slug derived
    from the title when none is given and made unique among other posts,
    ``status`` forced to ``'draft'`` and a fresh ``update_at``. ``html`` is
    regenerated from ``markdown`` and an excerpt is derived from ``html``
    when none is supplied.

    :param post_id: id of the post to update; ``None`` by default
    :param kwargs: fields to store on the post
    :return: the post id when a document was modified or inserted,
        otherwise ``False``
    """
    sets = dict(kwargs)
    if not sets.get('type'):
        sets['type'] = 'post'
    if not sets.get('slug'):
        sets['slug'] = get_slug(sets.get('title'))

    # Make the slug unique: if other posts already use it, append a counter
    # and keep incrementing until no post has the candidate slug.
    base_slug = sets.get('slug')
    slug_count = mongo.db.posts.count({'slug': base_slug, 'post_id': {'$ne': post_id}})
    if slug_count > 0:
        new_slug = '-'.join([base_slug, str(slug_count + 1)])
        while mongo.db.posts.count({'slug': new_slug}):
            slug_count += 1
            new_slug = '-'.join([base_slug, str(slug_count + 1)])
        sets['slug'] = new_slug

    sets.update({
        'status': 'draft',
        'update_at': int(time.time())
    })
    if not sets.get('update_by'):
        sets['update_by'] = str(current_user.get_id())
    # create_at is only ever written on insert (see $setOnInsert below).
    if sets.get('create_at'):
        del sets['create_at']
    if sets.get('markdown'):
        sets['html'] = parse_markdown(sets['markdown'])
    # Only derive an excerpt when there is html to derive it from; a draft
    # saved without markdown/html would otherwise fail with a KeyError.
    if not sets.get('excerpt') and sets.get('html'):
        sets['excerpt'] = get_excerpt(sets['html'], count=Setting.get_setting('excerpt_length', 120), wrapper=u'')

    try:
        # NOTE(review): get_next_sequence() is evaluated on every call, even
        # when the upsert only updates an existing document -- confirm that
        # is acceptable for the sequence.
        result = mongo.db.posts.update_one({
            'post_id': post_id
        }, {
            '$set': sets,
            '$setOnInsert': {
                'post_id': get_next_sequence('post_id'),
                'create_at': int(time.time())
            }
        }, upsert=True)
        if result and result.modified_count > 0:
            pid = post_id
            Post.update_post_tags(pid, sets.get('tag_ids', ''))
            return pid
        if result and result.upserted_id:
            # A new document was inserted: read back the post_id it got.
            pid = mongo.db.posts.find_one({'_id': result.upserted_id}).get('post_id')
            Post.update_post_tags(pid, sets.get('tag_ids', ''))
            return pid

        return False
    except Exception:
        # Best effort: report failure to the caller. ``Exception`` (not a
        # bare except) so SystemExit and KeyboardInterrupt still propagate.
        return False
项目:Plog    作者:thundernet8    | 项目源码 | 文件源码
def redis_cached(timeout=None, key_prefix='view/%s', unless=None):
    """
    Decorator factory that caches a function's return value in Redis.

    The wrapped function is called directly (no cache access) when
    ``unless()`` returns ``True`` or when it is called with a truthy
    ``nocache`` keyword argument. Cache errors are re-raised only when the
    application runs in debug mode; otherwise the function result is
    returned uncached.

    :param timeout: value passed to ``redis_set`` as ``timeout`` and exposed
        on the wrapper as ``cache_timeout``. NOTE(review): the original note
        mentions 3600 as the default applied when unset -- confirm in
        ``redis_set``.
    :param key_prefix: a callable returning the key, a template containing
        ``%s`` (filled with the request URL and current user id), or a
        fixed string
    :param unless: optional callable; the cache is bypassed when it returns
        ``True``
    :return: a decorator; the decorated function also exposes ``uncached``,
        ``cache_timeout`` and ``make_cache_key``
    """
    def decorator(f):
        @functools.wraps(f)  # keep the wrapped function's name and docstring
        def decorated_function(*args, **kwargs):
            if callable(unless) and unless() is True:
                return f(*args, **kwargs)

            if kwargs.get('nocache'):
                return f(*args, **kwargs)

            try:
                cache_key = decorated_function.make_cache_key(*args, **kwargs)
                # NOTE(review): urllib.quote exists only on Python 2; on
                # Python 3 this raises and, outside debug mode, the cache is
                # silently skipped.
                cache_key = urllib.quote(cache_key, safe='')
                rv = redis_get(cache_key)
            except Exception:
                if current_app.debug:
                    raise
                return f(*args, **kwargs)

            if rv is None:
                rv = f(*args, **kwargs)
                try:
                    redis_set(cache_key, rv, timeout=decorated_function.cache_timeout)
                except Exception:
                    if current_app.debug:
                        raise
                    # The value is already computed: fall through and return
                    # it rather than running the function a second time.
            return rv

        def make_cache_key(*args, **kwargs):
            if callable(key_prefix):
                cache_key = key_prefix()
            elif '%s' in key_prefix:
                # One cache entry per URL and per user.
                cache_key = key_prefix % (request.url + '_uid_' + str(current_user.get_id()))
            else:
                cache_key = key_prefix
            # MD5 only shortens the key to a fixed length; it is not used
            # for security here.
            cache_key = hashlib.md5(cache_key.encode('utf-8')).hexdigest()
            cache_key = '_'.join((get_version(level='day'), cache_key))
            return cache_key

        decorated_function.uncached = f
        decorated_function.cache_timeout = timeout
        decorated_function.make_cache_key = make_cache_key

        return decorated_function
    return decorator