Python flask.session 模块,permanent 属性实例源码

我们从Python开源项目中,提取了以下23个代码示例,用于说明如何使用 flask.session.permanent(注意:permanent 是一个可赋值的布尔属性,而不是方法)。

项目:zual    作者:ninadmhatre    | 项目源码 | 文件源码
def login():
    """Render the login page and process submitted credentials.

    The session is marked permanent on every call.  While DISABLE_LOGIN is
    set, only the disabled login page is rendered.  A POST with credentials
    accepted by authenticate() logs the user in and redirects to /blog; any
    other request is recorded through guard() and gets the login form.
    """
    session.permanent = True
    #pdb.set_trace()
    if DISABLE_LOGIN:
        flash('error:Login is disable because of many failed login attempts!')
        return render_template('login/login.html', disable=True)

    if request.method == 'POST':
        username = request.form['user']
        secret = request.form['chaabi']

        if authenticate(username, secret):
            flash("info:Login Successful!")
            login_user(User("test_user"))
            return redirect("/blog")

        # Failed attempt: record it, then fall through to the form below.
        guard('POST')
        flash("error:Invalid Username or Password!")

    guard('GET')
    return render_template('login/login.html')
项目:apiTest    作者:wuranxu    | 项目源码 | 文件源码
def login():
    """Check the submitted user/password pair and open a 30-minute session.

    Reads ``user`` and ``pwd`` from the submitted form and compares ``pwd``
    with the value returned by ``UserDb.login(user)``.

    Returns:
        The rendered ``index.html`` on success, otherwise ``login.html``
        with an ``info`` message.
    """
    db = UserDb(app.config['LOCAL_DB'])
    form = request.form
    user = form.get('user')
    pwd = form.get('pwd')
    password = db.login(user)
    del db

    # The "no stored password" case must be rejected BEFORE comparing:
    # otherwise a request without a ``pwd`` field (None) would compare equal
    # to the missing stored password (None) and be logged in.
    if password is None:
        return render_template('login.html', info="??????!")

    if pwd == password:
        # Make the session cookie persistent ...
        session.permanent = True
        # ... with a 30-minute lifetime (set on the app object).
        app.permanent_session_lifetime = timedelta(minutes=30)
        session.update(dict(user=user))
        return render_template('index.html')

    return render_template('login.html', info="?????!")
项目:flask-base    作者:mcescalante    | 项目源码 | 文件源码
def login():
  """Show the login form and authenticate the submitted credentials.

  Already-authenticated users are sent to the index page.  On a valid form
  submission the user is logged in, the last-login time is stored and the
  browser is redirected to the ``next`` query parameter when that parameter
  is a same-site path, otherwise to the index page.
  """
  # Already logged in; return to index
  if current_user.is_authenticated:
    return redirect(url_for('index'))

  # Not logged in; show the login form or errors
  form = LoginForm()
  if form.validate_on_submit():
    user = User.query.filter_by(email = form.email.data).first()
    if user is not None and user.valid_password(form.password.data):
      if login_user(user, remember = form.remember.data):
        # NOTE(review): permanent is the inverse of "remember" here; left
        # unchanged, confirm this is intentional.
        session.permanent = not form.remember.data
        #Need to add proper message flashing code to base.html
        user.lastLoggedIn = datetime.datetime.now()
        db.session.commit()
        flash('Logged in successfully!', category = 'success')
        # ``next`` comes from the query string and is attacker-controlled:
        # only follow it when it cannot leave this site (open redirect).
        next_url = request.args.get('next')
        if not _is_safe_redirect_target(next_url):
          next_url = url_for('index')
        return redirect(next_url)
      else:
        flash('This username is disabled', 'danger')
    else:
      flash('Wrong username or password', 'danger')

  return render_template('account/login.html', title = 'Login', form = form)


def _is_safe_redirect_target(target):
  """Return True if *target* is a non-empty, same-site absolute path."""
  if not target or not target.startswith('/'):
    return False
  # '//host' and '/\host' are treated by browsers as scheme-relative URLs.
  if target.startswith('//') or '\\' in target:
    return False
  # Browsers strip tabs/newlines, so '/\t/host' would become '//host'.
  return not any(ord(ch) < 32 for ch in target)
项目:SDV-Summary    作者:Sketchy502    | 项目源码 | 文件源码
def login():
    """Handle the login page.

    Logged-in visitors are redirected home.  A POST is checked with
    check_user_pw(); depending on its 'result' the page is re-rendered with
    g.error set, the visitor is sent to the password-reset page, or the
    visitor is redirected to the stored 'login_redir' URL (or home).
    """
    page_init()
    session.permanent = True
    if logged_in():
        return redirect(url_for('home'))

    if request.method != 'POST':
        return render_template("login.html",**page_args())

    form = request.form
    if 'email' not in form or 'password' not in form or form['email']=='':
        g.error = _('Missing email or password for login!')
        return render_template("login.html",**page_args())

    pw = check_user_pw(form['email'],form['password'])
    if pw['result'] == False:
        # Rejected credentials: show the reason on the login page.
        g.error = pw['error']
        return render_template("login.html",**page_args())
    if pw['result'] == None:
        flash({'message':'<p>'+_('Please reset your password to log in!')+'</p>'})
        return redirect(url_for('reset_password'))

    flash({'message':'<p>'+_('Logged in successfully!')+'</p>'})
    target = session.get('login_redir')
    if not target:
        return redirect(url_for('home'))
    session.pop('login_redir')
    return redirect(target)
项目:flask_skeleton    作者:Bleezworld    | 项目源码 | 文件源码
def login():
    """Log in as the requested user.

    Input: a LoginRequest built from the JSON body (POST) or from the query
    string (any other method).
    Output: the literal string 'ok'; aborts with HTTP 400 when the user is
    unknown or the password does not match.

    Request (non-POST):
        ?email=xx&password=xx[&remember=1]
    """
    if request.method == "POST":
        input_pb = protobuf_json.json2pb(all_pbs.LoginRequest(), request.get_json())
    else:
        query = request.args
        input_pb = all_pbs.LoginRequest()
        input_pb.email = query.get("email", "")
        input_pb.password = query.get("password", "")
        input_pb.remember = query.get("remember", "") == "1"

    users = data_models.GetTable(data_models.RW_USERS)
    record = users.find_one({"info.email": input_pb.email})
    user_pb = data_models.ToProto(record, data_models.RW_USERS)

    # NOTE(review): the stored and submitted passwords are compared directly,
    # which suggests they are stored unhashed - confirm.
    if not (user_pb and user_pb.info.password == input_pb.password):
        abort(400)

    flogin.login_user(user_util.FLUser(user_pb), remember=input_pb.remember)
    session.permanent = True
    return 'ok'
项目:picoCTF    作者:picoCTF    | 项目源码 | 文件源码
def login(username, password):
    """
    Authenticates a user.

    Args:
        username: the submitted user name
        password: the submitted password

    Raises:
        WebException: if the user is unknown, disabled or unverified, the
            password is wrong, general login is disabled for this debug
            session, or the user record has no uid.

    On success the user's uid is stored in the session and the session is
    made permanent.
    """

    # Read in submitted username and password
    validate(user_login_schema, {
        "username": username,
        "password": password
    })

    user = safe_fail(api.user.get_user, name=username)
    if user is None:
        raise WebException("Incorrect username.")

    if user.get("disabled", False):
        raise WebException("This account has been disabled.")

    # Unverified accounts are rejected here, before the password is checked.
    # The former "resend verification email" branch that followed the
    # password check repeated this same test and could never run, so it has
    # been removed.
    if not user["verified"]:
        raise WebException("This account has not been verified yet.")

    if not confirm_password(password, user['password_hash']):
        raise WebException("Incorrect password")

    # NOTE(review): this blocks sessions that HAVE 'debugaccount' set while
    # general login is disabled - confirm the condition is not inverted.
    if debug_disable_general_login and session.get('debugaccount', False):
        raise WebException("Correct credentials! But the game has not started yet...")

    if user['uid'] is None:
        raise WebException("Login Error")

    session['uid'] = user['uid']
    session.permanent = True
项目:picoCTF    作者:royragsdale    | 项目源码 | 文件源码
def login(username, password):
    """
    Authenticates a user.

    Args:
        username: the submitted user name
        password: the submitted password

    Raises:
        WebException: if the user is unknown, disabled or unverified, the
            password is wrong, general login is disabled for this debug
            session, or the user record has no uid.

    On success the user's uid is stored in the session and the session is
    made permanent.
    """

    # Read in submitted username and password
    validate(user_login_schema, {
        "username": username,
        "password": password
    })

    user = safe_fail(api.user.get_user, name=username)
    if user is None:
        raise WebException("Incorrect username.")

    if user.get("disabled", False):
        raise WebException("This account has been disabled.")

    # Unverified accounts are rejected here, before the password is checked.
    # The former "resend verification email" branch that followed the
    # password check repeated this same test and could never run, so it has
    # been removed.
    if not user["verified"]:
        raise WebException("This account has not been verified yet.")

    if not confirm_password(password, user['password_hash']):
        raise WebException("Incorrect password")

    # NOTE(review): this blocks sessions that HAVE 'debugaccount' set while
    # general login is disabled - confirm the condition is not inverted.
    if debug_disable_general_login and session.get('debugaccount', False):
        raise WebException("Correct credentials! But the game has not started yet...")

    if user['uid'] is None:
        raise WebException("Login Error")

    session['uid'] = user['uid']
    session.permanent = True
项目:xgovctf    作者:alphagov    | 项目源码 | 文件源码
def login(username, password):
    """
    Authenticates a user.

    Validates the submitted pair against user_login_schema, looks the user
    up by name and checks the password hash.  On success the user's uid is
    written to the session, which is made permanent; every failure is
    reported by raising WebException.
    """

    # Schema-check the submitted credentials first
    credentials = {
        "username": username,
        "password": password
    }
    validate(user_login_schema, credentials)

    user = safe_fail(api.user.get_user, name=username)
    if user is None:
        raise WebException("Incorrect username.")

    if user.get("disabled", False):
        raise WebException("This account has been disabled.")

    if not confirm_password(password, user['password_hash']):
        raise WebException("Incorrect Password")

    if debug_disable_general_login and session.get('debugaccount', False):
        raise WebException("Correct credentials! But the game has not started yet...")

    uid = user['uid']
    if uid is None:
        raise WebException("Login Error")

    session['uid'] = uid
    session.permanent = True
项目:hashtagtodo-open    作者:slackpad    | 项目源码 | 文件源码
def login(provider):
    """Run the Authomatic login flow for *provider*.

    On POST the session is made permanent only when the form contains
    'remember'.  While the flow is still in progress the Authomatic-managed
    response is returned; once a user is available the local User is
    created or updated, stored in the session and sent to /todos.
    """
    if request.method == 'POST':
        session.permanent = 'remember' in request.form
        session.modified = True

    response = make_response()
    result = AUTHOMATIC.login(WerkzeugAdapter(request, response), provider)

    if not result:
        # Flow not finished yet: hand back the response Authomatic filled in.
        return response

    if not result.user:
        return render_template('login.html', result=result)

    result.user.update()
    credentials = result.user.credentials.serialize()
    user = User.create_or_update(
        provider,
        result.user.id,
        result.user.email,
        result.user.first_name,
        result.user.last_name,
        credentials,
    )
    session['user'] = user.key.urlsafe()

    # If they are on the freemium list hook them up.
    if (not user.is_premium) and (Freemium.get_by_email(result.user.email) is not None):
        user.is_premium = True
        user.put()
        flash('You\'ve been upgraded to a free premium account for one year!')

    return redirect('/todos')
项目:freshonions-torscraper    作者:dirtyfilthy    | 项目源码 | 文件源码
def setup_session():
    """Prepare the visitor session and log the current request.

    Makes the session permanent with a 30-year lifetime, assigns a uuid to
    sessions that lack one, and stores a RequestLog row whose id is kept in
    g.request_log_id.  Requests whose User-Agent is blacklisted or shorter
    than 15 characters get an error page instead and are not logged.
    """
    session.permanent = True
    app.permanent_session_lifetime = timedelta(days=365*30)

    if 'uuid' in session:
        g.uuid_is_fresh = False
    else:
        session['uuid'] = str(uuid.uuid4())
        g.uuid_is_fresh = True
    now = datetime.now()

    headers = request.headers
    referrer = headers.get('Referer', '')
    path = request.path
    full_path = request.full_path
    agent = headers.get('User-Agent', '')

    if agent in BLACKLIST_AGENT or len(agent) < 15:
        g.request_log_id = 0
        return render_template('error.html',code=200,message="Layer 8 error. If you want my data, DON'T SCRAPE (too much cpu load), contact me and I will give it to you"), 200

    log_fields = dict(
        uuid=session['uuid'],
        uuid_is_fresh=g.uuid_is_fresh,
        created_at=now,
        agent=agent,
        referrer=referrer,
        path=path,
        full_path=full_path,
    )
    with db_session:
        req_log = RequestLog(**log_fields)
        # Flush so the row id is available before it is stored on g.
        flush()
        g.request_log_id = req_log.id
项目:crontab.py    作者:dengmin    | 项目源码 | 文件源码
def login_user(self, user):
    """Record *user* as logged in for this session and flash a confirmation.

    Stores the login flag and the user's id in the (permanent) session and
    exposes the user on flask.g.
    """
    session['logged_in'] = True
    session['user_pk'] = user.get_id()
    session.permanent = True
    g.user = user
    message = 'You are logged in as %s' % user
    flash(message, 'success')
项目:neogoso    作者:neogoso    | 项目源码 | 文件源码
def session_reset():
    """Mark the session modified and permanent with a 30-minute lifetime."""
    lifetime = timedelta(minutes=30)
    session.modified = True
    session.permanent = True
    app.permanent_session_lifetime = lifetime
项目:mmwatch    作者:Zverik    | 项目源码 | 文件源码
def oauth():
    """Finish the OpenStreetMap OAuth handshake and return to the revert page.

    Stores the token pair in the (permanent) session.  If the session holds
    an 'objects' value it is removed and passed on to the revert page.

    Returns:
        A "Denied" HTML snippet when authorization was refused, otherwise a
        redirect to the 'revert' endpoint.
    """
    resp = openstreetmap.authorized_response()
    if resp is None:
        return 'Denied. <a href="' + url_for('revert') + '">Try again</a>.'
    session['osm_token'] = (
            resp['oauth_token'],
            resp['oauth_token_secret']
    )
    session.permanent = True

    # 'objects' may be absent (expired session, callback opened directly);
    # popping without a default would raise KeyError and yield HTTP 500.
    objects = session.pop('objects', None)
    if objects is None:
        return redirect(url_for('revert'))
    return redirect(url_for('revert', objects=objects))
项目:zual    作者:ninadmhatre    | 项目源码 | 文件源码
def make_session_permanent():
    """Make the session permanent, using the configured SESSION_TIMEOUT."""
    session.permanent = True
    timeout = app.config['SESSION_TIMEOUT']
    app.permanent_session_lifetime = timeout
项目:zeus    作者:getsentry    | 项目源码 | 文件源码
def login_user(user_id: str, session=session, current_datetime=None):
    """Store *user_id* and an expiry timestamp in *session*.

    Args:
        user_id: identifier of the user being logged in; stored as a string.
        session: mapping to write to (defaults to the module-level session).
        current_datetime: reference time; ``timezone.now()`` when omitted.

    The expiry is ``current_datetime + PERMANENT_SESSION_LIFETIME`` stored
    as whole seconds since the Unix epoch.
    """
    session['uid'] = str(user_id)
    expires_at = (
        (current_datetime or timezone.now())
        + current_app.config['PERMANENT_SESSION_LIFETIME'])
    # datetime.timestamp() is portable and honours tzinfo; strftime('%s') is
    # a platform extension that ignores tzinfo (wrong epoch for aware
    # datetimes when the server's local zone is not UTC).
    session['expire'] = int(expires_at.timestamp())
    session.permanent = True
项目:synergy_website    作者:alfredojf    | 项目源码 | 文件源码
def make_session_permanent():
    """Make the session permanent with a 45-minute lifetime and mark it modified."""
    lifetime = timedelta(minutes=45)
    session.permanent = True
    app.permanent_session_lifetime = lifetime
    session.modified = True
项目:auxilia    作者:GHP2017    | 项目源码 | 文件源码
def play_page():
    """Serve the static play page, initialising the visitor's session first.

    Sessions lacking 'tracks' get an empty dict and sessions lacking 'id'
    get a fresh uuid4 integer.
    """
    session.permanent = True
    defaults = (
        ('tracks', dict),
        ('id', lambda: uuid4().int),
    )
    for key, make_value in defaults:
        # Only build a value when the key is really missing.
        if key not in session:
            session[key] = make_value()
    return app.send_static_file('play.html')

## Admin
项目:website    作者:DiscordEmotes    | 项目源码 | 文件源码
def callback():
    """Handle the OAuth2 redirect from Discord and store the access token.

    Returns:
        A redirect to '.index' when the saved state is missing or the
        provider reported an error, otherwise a redirect to '.guilds' after
        the token has been stored in the (permanent) session.
    """
    state = session.get('oauth2_state')
    # Bail out if EITHER the saved state is missing OR the provider sent an
    # error.  With "and", a request lacking a state (no CSRF binding) or
    # carrying an error would still reach fetch_token().
    if not state or request.values.get('error'):
        return redirect(url_for('.index'))

    with make_session(state=state) as discord:
        token = discord.fetch_token(DISCORD_TOKEN_URL,
                                    client_secret=current_app.config['OAUTH2_SECRET_KEY'],
                                    authorization_response=request.url)

        session['oauth2_token'] = token
        session.permanent = True
        return redirect(url_for('.guilds'))
项目:Nurevam    作者:Maverun    | 项目源码 | 文件源码
def confirm_login():
    """Complete the Discord OAuth2 login and store an API token in the session.

    Redirects to 'index' when the saved state is missing, the provider
    reported an error, or no token could be fetched.  Otherwise the user's
    signed api_key and Discord token are saved in the db, the api token is
    put in the (permanent) session and the user is sent to 'after_login'.
    """
    log.info("Checking login....")

    # A saved state and an error-free callback are both required
    state = session.get('oauth2_state')
    if not state or request.values.get('error'):
        return redirect(url_for('index'))

    # Exchange the authorization response for a token
    oauth_session = utils.make_session(state=state)
    discord_token = oauth_session.fetch_token(
        data_info.TOKEN_URL,
        client_secret=data_info.OAUTH2_CLIENT_SECRET,
        authorization_response=request.url)
    if not discord_token:
        log.info("Not clear, returning")
        return redirect(url_for('index'))

    # Look up the user the token belongs to
    user = utils.get_user(discord_token)

    # Derive the api_key by signing the user id with the app secret
    signer = JSONWebSignatureSerializer(app.config['SECRET_KEY'])
    # NOTE(review): if dumps() returns bytes, str() yields "b'...'" on
    # Python 3 - confirm that readers of this key expect that form.
    api_key = str(signer.dumps({'user_id': user['id']}))

    # Persist the api_key and the Discord token under per-user keys
    db.set('user:{}:api_key'.format(user['id']), api_key)
    db.set('user:{}:discord_token'.format(user['id']), json.dumps(discord_token))

    # Keep the api token in the client session
    session.permanent = True
    session['api_token'] = {
        'api_key': api_key,
        'user_id': user['id']
    }
    log.info("Clear, redirect...")
    return redirect(url_for('after_login'))
项目:paste    作者:NextFloor    | 项目源码 | 文件源码
def view(slug):
    """Render the paste identified by *slug*, asking for its password if set.

    The first view of a paste within a session increments its view counter
    and records the slug in the session's 'viewed' list.
    """
    paste = Paste.get_or_404(slug)

    if paste.password:
        form = PasswordForm()
        if not form.validate_on_submit():
            form.flash_errors()
            return render_template('password.html', form=form)
        if not paste.verify_password(form.password.data):
            flash('????? ???? ????.', 'error')
            return render_template('password.html', form=form)

    seen = session.setdefault('viewed', [])
    if paste.slug not in seen:
        seen.append(paste.slug)
        session.permanent = True
        # The list was mutated in place, so flag the session explicitly.
        session.modified = True
        paste.view_count += 1
        db.session.add(paste)
        db.session.commit()

    lexer = get_lexer_by_name(paste.lexer)
    formatter_options = dict(
        linenos=True,
        linespans='line',
        lineanchors='line',
        anchorlinenos=True,
    )
    formatter = HtmlFormatter(**formatter_options)

    styles = formatter.get_style_defs()
    highlighted = highlight(paste.source, lexer, formatter)
    return render_template(
        'view.html',
        styles=styles,
        highlighted_source=highlighted,
        lexer=lexer,
        paste=paste,
    )
项目:lti-template-flask-oauth-tokens    作者:ucfopen    | 项目源码 | 文件源码
def check_valid_user(f):
    """
    Decorator to check if the user is allowed access to the app.

    If user is allowed, return the decorated function.
    Otherwise, return an error page with corresponding message.

    When the request carries form data, the session is refreshed from it
    (course id, canvas user id and the admin/instructor flags derived from
    the ``roles`` field) before the checks run.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if request.form:
            session.permanent = True
            # 1 hour long session
            app.permanent_session_lifetime = timedelta(minutes=60)
            session['course_id'] = request.form.get('custom_canvas_course_id')
            session['canvas_user_id'] = request.form.get('custom_canvas_user_id')
            # A missing roles field means "no roles"; the role check below
            # then answers with the regular error page.
            roles = request.form.get('roles', '')

            is_admin = "Administrator" in roles
            # Administrators are treated as instructors too.  Previously the
            # instructor flag set for an admin was popped again right away
            # whenever "Instructor" was not also among the roles.
            is_instructor = is_admin or "Instructor" in roles

            if is_admin:
                session['admin'] = True
            else:
                # remove old admin key in the session
                session.pop('admin', None)

            if is_instructor:
                session['instructor'] = True
            else:
                # remove old instructor key from the session
                session.pop('instructor', None)

        # no session and no request
        if not session and not request.form:
            app.logger.warning("No session and no request. Not allowed.")
            return return_error('No session or request provided.')

        # no canvas_user_id
        if not request.form.get('custom_canvas_user_id') and 'canvas_user_id' not in session:
            app.logger.warning("No canvas user ID. Not allowed.")
            return return_error('No canvas user ID provided.')

        # no course_id
        if not request.form.get('custom_canvas_course_id') and 'course_id' not in session:
            app.logger.warning("No course ID. Not allowed.")
            return return_error('No course_id provided.')

        # If they are neither instructor or admin, they're not in the right place

        if 'instructor' not in session and 'admin' not in session:
            app.logger.warning("Not enrolled as Teacher or an Admin. Not allowed.")
            return return_error('''You are not enrolled in this course as a Teacher or Designer.
            Please refresh and try again. If this error persists, please contact support.''')

        return f(*args, **kwargs)
    return decorated_function


# ============================================
# Web Views / Routes
# ============================================
项目:SDV-Summary    作者:Sketchy502    | 项目源码 | 文件源码
def file_uploaded(inputfile):
    """Process an uploaded save file and describe what the caller should do next.

    The upload is copied into memory, hashed with md5() and parsed with
    savefile()/playerInfo().  A duplicate (per is_duplicate()) is not stored
    again; a new save is inserted via insert_info(), written out with
    zwrite() and linked to its series and owner in the playerinfo table.

    Args:
        inputfile: the uploaded file object; it must provide ``save(stream)``
            and a ``filename`` attribute.

    Returns:
        A dict with the keys 'type' ('render' or 'redirect'), 'target' and
        'parameters'.  Error cases render index.html with the message that
        was also stored in g.error; success and duplicate cases redirect to
        display_data with the save's url.
    """
    # Copy the upload into memory so it can be hashed, parsed and archived.
    memfile = io.BytesIO()
    inputfile.save(memfile)
    md5_info = md5(memfile)
    try:
        save = savefile(memfile.getvalue(), True)
        player_info = playerInfo(save)
    except defusedxml.common.EntitiesForbidden:
        # The XML contained entity declarations, which defusedxml refuses.
        g.error = _("I don't think that's very funny")
        return {'type':'render','target':'index.html','parameters':{"error":g.error}}
    except IOError:
        g.error = _("Savegame failed sanity check (if you think this is in error please let us know)")
        # Keep a record of the failed upload (client address, time, file name).
        db = get_db()
        cur = db.cursor()
        cur.execute('INSERT INTO errors (ip, time, notes) VALUES ('+app.sqlesc+','+app.sqlesc+','+app.sqlesc+')',(request.environ['REMOTE_ADDR'],time.time(),'failed sanity check '+str(secure_filename(inputfile.filename))))
        db.commit()
        return {'type': 'render', 'target': 'index.html', 'parameters': {"error": g.error}}
    except AttributeError as e:
        g.error = _("Not valid save file - did you select file 'SaveGameInfo' instead of 'playername_number'?")
        # print(e)
        return {'type': 'render', 'target': 'index.html', 'parameters': {"error": g.error}}
    except ParseError as e:
        g.error = _("Not well-formed xml")
        return {'type':'render','target':'index.html','parameters':{"error":g.error}}
    except AssertionError as e:
        g.error = _("Savegame failed an internal check (often caused by mods) sorry :(")
        return {'type':'render','target':'index.html','parameters':{"error":g.error}}
    # is_duplicate() yields False for a new save; otherwise dupe[0] is used as
    # the existing save's url and dupe[1] as its delete token.
    dupe = is_duplicate(md5_info,player_info)
    if dupe != False:
        session[dupe[0]] = md5_info
        session[dupe[0]+'del_token'] = dupe[1]
        return {'type':'redirect','target':'display_data','parameters':{"url":dupe[0]}}
    else:
        farm_info = getFarmInfo(save)
        # outcome is False on failure; otherwise it serves as the new save's
        # url and as its file name inside UPLOAD_FOLDER.
        outcome, del_token, rowid, g.error = insert_info(player_info,farm_info,md5_info)
        if outcome != False:
            filename = os.path.join(app.config['UPLOAD_FOLDER'], outcome)
            # with open(filename,'wb') as f:
            #   f.write(memfile.getvalue())
            # REPLACED WITH ZIPUPLOADS
            zwrite(memfile.getvalue(),legacy_location(filename))
            series_id = add_to_series(rowid,player_info['uniqueIDForThisGame'],player_info['name'],player_info['farmName'])
            owner_id = get_logged_in_user()
            db = get_db()
            cur = db.cursor()
            # app.sqlesc is presumably the DB driver's parameter placeholder;
            # the values themselves are passed separately - TODO confirm.
            cur.execute('UPDATE playerinfo SET savefileLocation='+app.sqlesc+', series_id='+app.sqlesc+', owner_id='+app.sqlesc+' WHERE url='+app.sqlesc+';',(filename,series_id,owner_id,outcome))
            db.commit()
        else:
            if g.error == None:
                g.error = _("Error occurred inserting information into the database!")
            return {'type':'render','target':'index.html','parameters':{"error":g.error}}
        imageDrone.process_queue()
        memfile.close()
    # Only reached after a successful insert (every other path has returned).
    if outcome != False:
        session.permanent = True
        session[outcome] = md5_info
        session[outcome+'del_token'] = del_token
        return {'type':'redirect','target':'display_data','parameters':{"url":outcome}}
项目:flask-esipy-example    作者:Kyria    | 项目源码 | 文件源码
def callback():
    """ This is where the user comes after he logged in SSO

    Checks the returned ``state`` against the token saved in the session,
    exchanges the ``code`` for tokens, creates or updates the matching User
    and logs that user in with a permanent session.

    Returns:
        A ``(message, 403)`` tuple when the state check or the token
        exchange fails, otherwise a redirect to "index" (also when saving
        the user failed; that failure is logged and the user logged out).
    """
    # get the code from the login process
    code = request.args.get('code')
    token = request.args.get('state')

    # compare the state with the saved token for CSRF check
    sess_token = session.pop('token', None)
    if sess_token is None or token is None or token != sess_token:
        return 'Login EVE Online SSO failed: Session Token Mismatch', 403

    # now we try to get tokens
    try:
        auth_response = esisecurity.auth(code)
    except APIException as e:
        return 'Login EVE Online SSO failed: %s' % e, 403

    # we get the character informations
    cdata = esisecurity.verify()

    # if the user is already authed, we log him out
    if current_user.is_authenticated:
        logout_user()

    # now we check in database, if the user exists
    # actually we'd have to also check with character_owner_hash, to be
    # sure the owner is still the same, but that's an example only...
    try:
        user = User.query.filter(
            User.character_id == cdata['CharacterID'],
        ).one()

    except NoResultFound:
        user = User()
        user.character_id = cdata['CharacterID']

    user.character_owner_hash = cdata['CharacterOwnerHash']
    user.character_name = cdata['CharacterName']
    user.update_token(auth_response)

    # now the user is ready, so update/create it and log the user
    try:
        db.session.merge(user)
        db.session.commit()

        login_user(user)
        session.permanent = True

    except Exception:
        # Deliberately broad (best-effort login), but no longer a bare
        # "except:", which would also swallow KeyboardInterrupt/SystemExit.
        # The id is passed as a logging argument so that a formatting
        # problem cannot raise here and skip the rollback below.
        logger.exception("Cannot login the user - uid: %d", user.character_id)
        db.session.rollback()
        logout_user()

    return redirect(url_for("index"))


# -----------------------------------------------------------------------
# Index Routes
# -----------------------------------------------------------------------