Python flask 模块,session() 实例源码

我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 flask.session。

项目:tweet-analysis    作者:D4D3VD4V3    | 项目源码 | 文件源码
def twittercallback():
    """Finish the Twitter OAuth handshake and render the hashtag form.

    Reads the ``oauth_verifier`` query argument, restores the request token
    saved in the session and exchanges it for an access token. On success the
    access token and its secret are stored in the session. If the request
    token is missing from the session, or the exchange raises
    ``tweepy.TweepError``, a "danger" flash message is queued and the client
    is redirected to ``bp.home``.
    """
    verifier = request.args["oauth_verifier"]
    handler = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)

    # Step 1: recover the request token stored earlier in the session.
    try:
        handler.request_token = session["request_token"]
    except KeyError:
        flash("Please login again", "danger")
        return redirect(url_for("bp.home"))

    # Step 2: trade the verifier for an access token.
    try:
        handler.get_access_token(verifier)
    except tweepy.TweepError:
        flash("Failed to get access token", "danger")
        return redirect(url_for("bp.home"))

    session["access_token"] = handler.access_token
    session["access_token_secret"] = handler.access_token_secret

    hashtag_form = HashtagForm()
    return render_template("twittercallback.html", form=hashtag_form)
项目:docklet    作者:unias    | 项目源码 | 文件源码
def get(self):
    """Render the container list of the host ``self.com_ip``.

    Asks the master for the host's container names, then fetches each
    container's basic info and owner and passes the combined records to the
    template.
    """
    masterip = self.masterip
    payload = {"user": session['username']}

    listing = dockletRequest.post('/monitor/hosts/%s/containerslist/' % (self.com_ip), payload, masterip)
    names = listing.get('monitor').get('containerslist')

    containerslist = []
    for name in names:
        info_reply = dockletRequest.post('/monitor/vnodes/%s/basic_info/' % (name), payload, masterip)
        basic_info = info_reply.get('monitor').get('basic_info')
        owner_reply = dockletRequest.post('/monitor/vnodes/%s/owner/' % (name), payload, masterip)
        # The owner reply's whole 'monitor' section is attached to the record.
        basic_info['owner'] = owner_reply.get('monitor')
        containerslist.append(basic_info)

    return self.render(
        self.template_path,
        containerslist=containerslist,
        com_ip=self.com_ip,
        user=session['username'],
        masterip=masterip,
    )
项目:docklet    作者:unias    | 项目源码 | 文件源码
def get(self):
    """Render an overview of every physical node known to every master.

    For each master the list of nodes is requested, and for each node its
    containers and status are fetched from that master. The template receives
    a dict mapping master -> list of ``{'ip', 'containers', 'status'}``.
    """
    payload = {"user": session['username']}
    replies = dockletRequest.post_to_all('/monitor/listphynodes/', payload)

    allmachines = {}
    for master in replies:
        nodes = []
        allmachines[master] = nodes
        for ip in replies[master].get('monitor').get('allnodes'):
            # Only the part of the master key before '@' is passed on as the request target.
            containers_reply = dockletRequest.post('/monitor/hosts/%s/containers/' % (ip), payload, master.split("@")[0])
            containers = containers_reply.get('monitor').get('containers')
            status_reply = dockletRequest.post('/monitor/hosts/%s/status/' % (ip), payload, master.split("@")[0])
            status = status_reply.get('monitor').get('status')
            nodes.append({'ip': ip, 'containers': containers, 'status': status})

    return self.render(self.template_path, allmachines=allmachines, user=session['username'])
项目:docklet    作者:unias    | 项目源码 | 文件源码
def post_to_all(self, url='/', data=None):
    """Send a POST request to every configured master.

    With the default ``url`` of ``'/'`` this acts as a liveness probe: each
    master's ``/isalive/`` endpoint is posted to, and the list of masters
    whose request did not raise is returned.

    For any other ``url``, a copy of ``data`` is extended with the session
    token and posted to ``url`` on every master. The result is a dict mapping
    each master to its decoded JSON reply. Masters whose request or JSON
    decoding raises are logged at debug level and left out of the result.

    :param url: path to post to on each master
    :param data: form fields to send; treated as empty when omitted
    :return: list of reachable masters for ``'/'``, otherwise a dict of replies
    """
    # A None default avoids sharing one mutable dict object between calls.
    if data is None:
        data = {}

    if url == '/':
        alive = []
        for masterip in masterips:
            try:
                requests.post("http://"+getip(masterip)+":"+master_port+"/isalive/", data=data)
            except Exception as e:
                # Best effort: an unreachable master is simply not reported.
                logger.debug(e)
                continue
            alive.append(masterip)
        return alive

    # Work on a copy so the caller's dict never receives the token.
    data = dict(data)
    data['token'] = session['token']
    logger.info("Docklet Request: user = %s data = %s, url = %s"%(session['username'], data, url))

    result = {}
    for masterip in masterips:
        try:
            res = requests.post("http://"+getip(masterip)+":"+master_port+url, data=data).json()
        except Exception as e:
            logger.debug(e)
            continue
        result[masterip] = res
        logger.debug("get result from " + getip(masterip))

    return result
项目:docklet    作者:unias    | 项目源码 | 文件源码
def post(self):
    """Ask the master to create a cluster from the selected image.

    ``self.image`` is split on its last two underscores into image name,
    owner and type (``str.rindex`` raises ``ValueError`` if an underscore is
    missing). These fields are merged with the submitted form fields and
    posted to ``/cluster/create/``. Redirects to the dashboard when the reply
    reports success, otherwise renders the error page with the reply message.
    """
    masterip = self.masterip
    type_sep = self.image.rindex("_")
    owner_sep = self.image[:type_sep].rindex("_")
    checkname(self.clustername)

    image_fields = {
        "clustername": self.clustername,
        'imagename': self.image[:owner_sep],
        'imageowner': self.image[owner_sep+1:type_sep],
        'imagetype': self.image[type_sep+1:],
    }
    payload = dict(image_fields, **(request.form))
    result = dockletRequest.post("/cluster/create/", payload, masterip)

    if result.get('success', None) != "true":
        return self.render(self.error_path, message=result.get('message'))
    return redirect("/dashboard/")
项目:docklet    作者:unias    | 项目源码 | 文件源码
def post(self):
    """Ask the master to save a container as an image.

    Posts the cluster, image, container, description and force flag to
    ``/cluster/save/``. On success the client is redirected to ``/config/``.
    When the reply's reason is ``"exists"`` the form template is rendered
    again with the submitted values; any other failure renders the error page
    with the reply message. A falsy reply is delegated to ``self.error()``.
    """
    masterip = self.masterip
    data = {
        "clustername": self.clustername,
        "image": self.imagename,
        "containername": self.containername,
        "description": self.description,
        "isforce": self.isforce,
    }
    result = dockletRequest.post("/cluster/save/", data, masterip)

    if not result:
        # Return the handler's result; previously it was discarded and the
        # view fell through to an implicit ``None``.
        return self.error()

    if result.get('success') == 'true':
        return redirect("/config/")

    if result.get('reason') == "exists":
        return self.render(
            self.template_path,
            containername=self.containername,
            clustername=self.clustername,
            image=self.imagename,
            user=session['username'],
            description=self.description,
            masterip=masterip,
        )

    return self.render(self.error_path, message=result.get('message'))
项目:picoCTF    作者:picoCTF    | 项目源码 | 文件源码
def after_request(response):
    """Decorate every outgoing response with CORS, cache and token headers.

    Adds the Access-Control-* and Cache-Control headers. For logged-in users
    it mirrors the session's ``token`` into a ``token`` cookie scoped to
    ``SESSION_COOKIE_DOMAIN``, creating the token first if the session has
    none. Responses for paths outside ``/api/autogen/serve/`` get their
    mimetype set to JSON.
    """
    extra_headers = (
        ('Access-Control-Allow-Methods', 'GET, POST'),
        ('Access-Control-Allow-Credentials', 'true'),
        ('Access-Control-Allow-Headers', 'Content-Type, *'),
        ('Cache-Control', 'no-cache'),
        ('Cache-Control', 'no-store'),
    )
    for header_name, header_value in extra_headers:
        response.headers.add(header_name, header_value)

    if api.auth.is_logged_in():
        if 'token' not in session:
            session['token'] = api.common.token()
        response.set_cookie('token', session['token'], domain=app.config['SESSION_COOKIE_DOMAIN'])

    # JB: This is a hack. We need a better solution
    if not request.path.startswith("/api/autogen/serve/"):
        response.mimetype = 'application/json'
    return response
项目:coms4156_jumpstart    作者:keirl    | 项目源码 | 文件源码
def main_teacher():
    """Teacher landing page: optionally open/close a course session, then list courses.

    On POST, a ``close`` form field closes the active session of that course
    and an ``open`` field opens a new one (``close`` wins if both are sent).
    The page is rendered with the teacher's courses and an ``empty`` flag.
    """
    teacher = teachers_model.Teachers(flask.session['id'])

    if request.method == 'POST':
        course = courses_model.Courses()
        if "close" in request.form:
            course.cid = request.form["close"]
            course.close_session(course.get_active_session())
        elif "open" in request.form:
            course.cid = request.form["open"]
            course.open_session()

    courses = teacher.get_courses_with_session()
    empty = len(courses) == 0
    return render_template('main_teacher.html', empty=empty, data=courses)
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def test_session_transactions(self):
    """Values written inside ``session_transaction`` are visible to later requests."""
    app = flask.Flask(__name__)
    app.testing = True
    app.secret_key = 'testing'

    @app.route('/')
    def index():
        return text_type(flask.session['foo'])

    with app.test_client() as client:
        # Seed the session before any request has been made.
        with client.session_transaction() as before:
            self.assert_equal(len(before), 0)
            before['foo'] = [42]
            self.assert_equal(len(before), 1)

        response = client.get('/')
        self.assert_equal(response.data, b'[42]')

        # The seeded value must still be there after the request.
        with client.session_transaction() as after:
            self.assert_equal(len(after), 1)
            self.assert_equal(after['foo'], [42])
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def test_session_using_application_root(self):
    """The session cookie path is ``/bar`` when APPLICATION_ROOT is ``/bar``."""

    class PrefixPathMiddleware(object):
        """WSGI wrapper that forces ``SCRIPT_NAME`` to a fixed prefix."""

        def __init__(self, app, prefix):
            self.app, self.prefix = app, prefix

        def __call__(self, environ, start_response):
            environ['SCRIPT_NAME'] = self.prefix
            return self.app(environ, start_response)

    app = flask.Flask(__name__)
    app.wsgi_app = PrefixPathMiddleware(app.wsgi_app, '/bar')
    app.config.update(SECRET_KEY='foo', APPLICATION_ROOT='/bar')

    @app.route('/')
    def index():
        flask.session['testing'] = 42
        return 'Hello World'

    response = app.test_client().get('/', 'http://example.com:8080/')
    set_cookie = response.headers['set-cookie'].lower()
    self.assert_in('path=/bar', set_cookie)
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def test_session_using_session_settings(self):
    """The SESSION_COOKIE_* settings are reflected in the Set-Cookie header."""
    settings = {
        'SECRET_KEY': 'foo',
        'SERVER_NAME': 'www.example.com:8080',
        'APPLICATION_ROOT': '/test',
        'SESSION_COOKIE_DOMAIN': '.example.com',
        'SESSION_COOKIE_HTTPONLY': False,
        'SESSION_COOKIE_SECURE': True,
        'SESSION_COOKIE_PATH': '/',
    }
    app = flask.Flask(__name__)
    app.config.update(settings)

    @app.route('/')
    def index():
        flask.session['testing'] = 42
        return 'Hello World'

    response = app.test_client().get('/', 'http://www.example.com:8080/test/')
    set_cookie = response.headers['set-cookie'].lower()
    for expected in ('domain=.example.com', 'path=/', 'secure'):
        self.assert_in(expected, set_cookie)
    self.assert_not_in('httponly', set_cookie)
项目:Cloudroid    作者:cyberdb    | 项目源码 | 文件源码
def signup():
    """Register a new user from the signup form.

    On a valid submission the lower-cased email is checked against existing
    users; a duplicate adds a form error and re-renders the page. Otherwise
    the user is created and committed, the email is stored in the session and
    the client is redirected to ``login``.
    """
    from forms import SignupForm

    form = SignupForm()
    if not form.validate_on_submit():
        return render_template('signup.html', form=form)

    existing = User.query.filter_by(email=form.email.data.lower()).first()
    if existing is not None:
        form.email.errors.append("The Email address is already taken.")
        return render_template('signup.html', form=form)

    newuser = User(
        form.firstname.data,
        form.lastname.data,
        form.email.data,
        form.password.data,
    )
    db.session.add(newuser)
    db.session.commit()

    session['email'] = newuser.email
    return redirect(url_for('login'))
项目:Cloudroid    作者:cyberdb    | 项目源码 | 文件源码
def login():
    """Authenticate a user from the login form.

    Already-authenticated users are redirected to ``index``. On a valid
    submission the "remember me" choice is stored in the session, the user is
    looked up by lower-cased email and the password is checked. Success
    stores the submitted email in the session, logs the user in and redirects
    to ``index``; failure re-renders the form with ``failed_auth=True``.
    """
    if g.user is not None and g.user.is_authenticated:
        return redirect(url_for('index'))

    from app.forms import LoginForm

    form = LoginForm()
    if not form.validate_on_submit():
        return render_template('login.html', form=form)

    session['remember_me'] = form.remember_me.data
    account = User.query.filter_by(email=form.email.data.lower()).first()
    if not (account and account.check_password(form.password.data)):
        return render_template('login.html', form=form, failed_auth=True)

    session['email'] = form.email.data
    login_user(account, remember=session['remember_me'])
    return redirect(url_for('index'))
项目:Cloudroid    作者:cyberdb    | 项目源码 | 文件源码
def ping(service_id):
    """Refresh the record of a service, if it still exists.

    Looks up the service by id. When found, a copy carrying the current time
    as ``createdtime`` is added and committed, then the old row is deleted
    and committed. Returns a status string either way.
    """
    from app import db, models
    from models import Service

    finding = Service.query.filter_by(serviceid=service_id).first()
    if finding is None:
        return "The service "+service_id+" has been removed!"

    image_name = finding.imagename
    uploadn = finding.uploadname
    usern = finding.username
    firstcreatetime = finding.firstcreatetime
    refreshed = Service(
        serviceid=service_id,
        createdtime=str(time.time()),
        imagename=image_name,
        uploadname=uploadn,
        username=usern,
        firstcreatetime=firstcreatetime,
    )

    # Insert the refreshed row first, then drop the stale one.
    db.session.add(refreshed)
    db.session.commit()
    db.session.delete(finding)
    db.session.commit()

    return "There are existing service:"+service_id
项目:FRG-Crowdsourcing    作者:97amarnathk    | 项目源码 | 文件源码
def test_oauth_authorized_saves_token_and_user_to_session(self, oauth):
    """The Flickr OAuth callback stores the token pair and user info in the session."""
    expected_token = {
        'oauth_token_secret': u'secret',
        'oauth_token': u'token',
    }
    expected_user = {'username': u'palotespaco', 'user_nsid': u'user'}

    # The fake provider response is the union of both expected parts plus a
    # full name, which is not part of either expected session value.
    fake_resp = {'fullname': u'paco palotes'}
    fake_resp.update(expected_token)
    fake_resp.update(expected_user)
    oauth.authorized_response.return_value = fake_resp

    with flask_app.test_client() as client:
        client.get('/flickr/oauth-authorized')

        assert session['flickr_token'] == expected_token, session['flickr_token']
        assert session['flickr_user'] == expected_user, session['flickr_user']
项目:github-catalog    作者:yogykwan    | 项目源码 | 文件源码
def new_item(category):
    """Show the new-item form (GET) or create an item in ``category`` (POST).

    A valid POST stores the item for the logged-in user, flashes a message
    and redirects to the item page; an invalid one re-renders the form with
    the submitted values and an error. Other methods return ``None``.
    """
    if request.method == 'GET':
        return render('newitem.html', category=category)
    if request.method != 'POST':
        return None

    name = request.form['name']
    highlight = request.form['highlight']
    url = request.form['url']

    if not valid_item(name, url, highlight):
        error = "Complete info please!"
        return render('newitem.html', category=category, name=name,
                      highlight=highlight, url=url, error=error)

    owner_id = login_session['user_id']
    item = Item(name=name, highlight=highlight, url=url,
                user_id=owner_id, category_id=category.id)
    session.add(item)
    session.commit()
    flash("Newed item %s!" % item.name)
    return redirect(url_for('show_item', category_id=category.id, item_id=item.id))
项目:pheweb    作者:statgen    | 项目源码 | 文件源码
def check_auth(func):
    """
    This decorator for routes checks that the user is authorized (or that no login is required).
    If they haven't, their intended destination is stored and they're sent to get authorized.
    It has to be placed AFTER @app.route() so that it can capture `request.path`.

    When ``conf`` has no ``login`` entry the function is returned undecorated.
    Otherwise anonymous users are redirected to ``get_authorized`` with their
    requested path saved in the session, and logged-in users whose
    lower-cased email is not in the configured whitelist trigger an
    ``AssertionError``.
    """
    if 'login' not in conf:
        return func

    # inspired by <https://flask-login.readthedocs.org/en/latest/_modules/flask_login.html#login_required>
    @functools.wraps(func)
    def decorated_view(*args, **kwargs):
        if current_user.is_anonymous:
            print('unauthorized user visited {!r}'.format(request.path))
            session['original_destination'] = request.path
            return redirect(url_for('get_authorized'))
        print('{} visited {!r}'.format(current_user.email, request.path))
        # Raise explicitly instead of using `assert`: assert statements are
        # removed under `python -O`, which would silently disable this
        # authorization check. The exception type is unchanged.
        if current_user.email.lower() not in conf.login['whitelist']:
            raise AssertionError(current_user)
        return func(*args, **kwargs)
    return decorated_view
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def test_session_transactions(self):
    """Values written inside ``session_transaction`` are visible to later requests."""
    app = flask.Flask(__name__)
    app.testing = True
    app.secret_key = 'testing'

    @app.route('/')
    def index():
        return text_type(flask.session['foo'])

    with app.test_client() as client:
        # Seed the session before any request has been made.
        with client.session_transaction() as before:
            self.assert_equal(len(before), 0)
            before['foo'] = [42]
            self.assert_equal(len(before), 1)

        response = client.get('/')
        self.assert_equal(response.data, b'[42]')

        # The seeded value must still be there after the request.
        with client.session_transaction() as after:
            self.assert_equal(len(after), 1)
            self.assert_equal(after['foo'], [42])
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def test_session_using_application_root(self):
    """The session cookie path is ``/bar`` when APPLICATION_ROOT is ``/bar``."""

    class PrefixPathMiddleware(object):
        """WSGI wrapper that forces ``SCRIPT_NAME`` to a fixed prefix."""

        def __init__(self, app, prefix):
            self.app, self.prefix = app, prefix

        def __call__(self, environ, start_response):
            environ['SCRIPT_NAME'] = self.prefix
            return self.app(environ, start_response)

    app = flask.Flask(__name__)
    app.wsgi_app = PrefixPathMiddleware(app.wsgi_app, '/bar')
    app.config.update(SECRET_KEY='foo', APPLICATION_ROOT='/bar')

    @app.route('/')
    def index():
        flask.session['testing'] = 42
        return 'Hello World'

    response = app.test_client().get('/', 'http://example.com:8080/')
    set_cookie = response.headers['set-cookie'].lower()
    self.assert_in('path=/bar', set_cookie)
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def test_session_using_session_settings(self):
    """The SESSION_COOKIE_* settings are reflected in the Set-Cookie header."""
    settings = {
        'SECRET_KEY': 'foo',
        'SERVER_NAME': 'www.example.com:8080',
        'APPLICATION_ROOT': '/test',
        'SESSION_COOKIE_DOMAIN': '.example.com',
        'SESSION_COOKIE_HTTPONLY': False,
        'SESSION_COOKIE_SECURE': True,
        'SESSION_COOKIE_PATH': '/',
    }
    app = flask.Flask(__name__)
    app.config.update(settings)

    @app.route('/')
    def index():
        flask.session['testing'] = 42
        return 'Hello World'

    response = app.test_client().get('/', 'http://www.example.com:8080/test/')
    set_cookie = response.headers['set-cookie'].lower()
    for expected in ('domain=.example.com', 'path=/', 'secure'):
        self.assert_in(expected, set_cookie)
    self.assert_not_in('httponly', set_cookie)
项目:picoCTF    作者:royragsdale    | 项目源码 | 文件源码
def after_request(response):
    """Decorate every outgoing response with CORS, cache and token headers.

    Adds the Access-Control-* and Cache-Control headers. For logged-in users
    the session's ``token`` is mirrored into a ``token`` cookie, creating the
    token first if the session has none. Responses for paths outside
    ``/api/autogen/serve/`` get their mimetype set to ``application/json``.

    :param response: the response about to be sent
    :return: the same response object, modified in place
    """
    response.headers.add('Access-Control-Allow-Methods', 'GET, POST')
    response.headers.add('Access-Control-Allow-Credentials', 'true')
    response.headers.add('Access-Control-Allow-Headers', 'Content-Type, *')
    response.headers.add('Cache-Control', 'no-cache')
    response.headers.add('Cache-Control', 'no-store')
    if api.auth.is_logged_in():
        if 'token' in session:
            response.set_cookie('token', session['token'])
        else:
            csrf_token = api.common.token()
            session['token'] = csrf_token
            response.set_cookie('token', csrf_token)

    # JB: This is a hack. We need a better solution
    if request.path[0:19] != "/api/autogen/serve/":
        # Fixed typo: this was 'appication/json', which is not a valid JSON media type.
        response.mimetype = 'application/json'
    return response
项目:plexivity    作者:mutschler    | 项目源码 | 文件源码
def info(id):
    """Render the info page for the Plex item ``id``.

    The type of the item's first child element decides what extra data is
    loaded: play history for movies and episodes, parent plus episode list
    for seasons, and per-title play counts for shows.
    """
    metadata = g.plex.getInfo(id)
    views = parent = episodes = None

    cur_el = metadata.getchildren()[0]
    cur_type = cur_el.get("type")
    processed = models.Processed

    if cur_type == "movie":
        movie_title = metadata.find("Video").get("title")
        views = db.session.query(processed).filter(processed.title == movie_title).all()
    elif cur_type == "season":
        parent_key = metadata.getchildren()[0].get("parentRatingKey")
        parent = g.plex.getInfo(parent_key).getchildren()[0]
        episodes = g.plex.episodes(id)
    elif cur_type == "episode":
        session_pattern = "%/metadata/" + cur_el.get("ratingKey") + "_%"
        views = db.session.query(processed).filter(processed.session_id.like(session_pattern)).all()
    elif cur_type == "show":
        query = db.session.query(db.func.count(processed.title), processed)
        query = query.filter(processed.orig_title.like(cur_el.get("title")))
        query = query.group_by(processed.title)
        query = query.having(db.func.count(processed.orig_title) > 0)
        query = query.order_by(db.func.count(processed.orig_title).desc(), processed.time.desc())
        episodes = query.all()

    return render_template('info.html', info=metadata, history=views, parent=parent, episodes=episodes, title=_('Info'))
项目:sysu-ctf    作者:ssst0n3    | 项目源码 | 文件源码
def chals():
    """Return the challenge list as JSON, or redirect when it may not be viewed.

    Non-admins outside CTF time are sent to ``/`` unless viewing after the
    CTF is allowed. Users who cannot view challenges are sent to ``/login``.
    Otherwise every challenge, ordered by value, is returned under the
    ``game`` key together with its file locations.
    """
    if not is_admin() and not ctftime() and not view_after_ctf():
        return redirect('/')

    if not can_view_challenges():
        db.session.close()
        return redirect('/login')

    rows = Challenges.query.add_columns('id', 'name', 'value', 'description', 'category').order_by(Challenges.value).all()

    payload = {'game': []}
    for row in rows:
        locations = [str(f.location) for f in Files.query.filter_by(chal=row.id).all()]
        # Index 0 is not used here; the added columns are read from positions 1-5.
        payload['game'].append({
            'id': row[1],
            'name': row[2],
            'value': row[3],
            'description': row[4],
            'category': row[5],
            'files': locations,
        })

    db.session.close()
    return jsonify(payload)
项目:xgovctf    作者:alphagov    | 项目源码 | 文件源码
def after_request(response):
    """Decorate every outgoing response with CORS, cache and token headers.

    Adds the Access-Control-* and Cache-Control headers. For logged-in users
    the session's ``token`` is mirrored into a ``token`` cookie, creating the
    token first if the session has none. Responses for paths outside
    ``/api/autogen/serve/`` get their mimetype set to ``application/json``.

    :param response: the response about to be sent
    :return: the same response object, modified in place
    """
    response.headers.add('Access-Control-Allow-Methods', 'GET, POST')
    response.headers.add('Access-Control-Allow-Credentials', 'true')
    response.headers.add('Access-Control-Allow-Headers', 'Content-Type, *')
    response.headers.add('Cache-Control', 'no-cache')
    response.headers.add('Cache-Control', 'no-store')
    if api.auth.is_logged_in():
        if 'token' in session:
            response.set_cookie('token', session['token'])
        else:
            csrf_token = api.common.token()
            session['token'] = csrf_token
            response.set_cookie('token', csrf_token)

    # JB: This is a hack. We need a better solution
    if request.path[0:19] != "/api/autogen/serve/":
        # Fixed typo: this was 'appication/json', which is not a valid JSON media type.
        response.mimetype = 'application/json'
    return response
项目:Texty    作者:sarthfrey    | 项目源码 | 文件源码
def test_session_transactions(self):
    """Values written inside ``session_transaction`` are visible to later requests."""
    app = flask.Flask(__name__)
    app.testing = True
    app.secret_key = 'testing'

    @app.route('/')
    def index():
        return text_type(flask.session['foo'])

    with app.test_client() as client:
        # Seed the session before any request has been made.
        with client.session_transaction() as before:
            self.assert_equal(len(before), 0)
            before['foo'] = [42]
            self.assert_equal(len(before), 1)

        response = client.get('/')
        self.assert_equal(response.data, b'[42]')

        # The seeded value must still be there after the request.
        with client.session_transaction() as after:
            self.assert_equal(len(after), 1)
            self.assert_equal(after['foo'], [42])
项目:Texty    作者:sarthfrey    | 项目源码 | 文件源码
def test_session_using_application_root(self):
    """The session cookie path is ``/bar`` when APPLICATION_ROOT is ``/bar``."""

    class PrefixPathMiddleware(object):
        """WSGI wrapper that forces ``SCRIPT_NAME`` to a fixed prefix."""

        def __init__(self, app, prefix):
            self.app, self.prefix = app, prefix

        def __call__(self, environ, start_response):
            environ['SCRIPT_NAME'] = self.prefix
            return self.app(environ, start_response)

    app = flask.Flask(__name__)
    app.wsgi_app = PrefixPathMiddleware(app.wsgi_app, '/bar')
    app.config.update(SECRET_KEY='foo', APPLICATION_ROOT='/bar')

    @app.route('/')
    def index():
        flask.session['testing'] = 42
        return 'Hello World'

    response = app.test_client().get('/', 'http://example.com:8080/')
    set_cookie = response.headers['set-cookie'].lower()
    self.assert_in('path=/bar', set_cookie)
项目:Texty    作者:sarthfrey    | 项目源码 | 文件源码
def test_session_using_session_settings(self):
    """The SESSION_COOKIE_* settings are reflected in the Set-Cookie header."""
    settings = {
        'SECRET_KEY': 'foo',
        'SERVER_NAME': 'www.example.com:8080',
        'APPLICATION_ROOT': '/test',
        'SESSION_COOKIE_DOMAIN': '.example.com',
        'SESSION_COOKIE_HTTPONLY': False,
        'SESSION_COOKIE_SECURE': True,
        'SESSION_COOKIE_PATH': '/',
    }
    app = flask.Flask(__name__)
    app.config.update(settings)

    @app.route('/')
    def index():
        flask.session['testing'] = 42
        return 'Hello World'

    response = app.test_client().get('/', 'http://www.example.com:8080/test/')
    set_cookie = response.headers['set-cookie'].lower()
    for expected in ('domain=.example.com', 'path=/', 'secure'):
        self.assert_in(expected, set_cookie)
    self.assert_not_in('httponly', set_cookie)
项目:pycommunicate    作者:mincrmatt12    | 项目源码 | 文件源码
def _dispatch(self, route, **kwargs):
    """Resolve the session's user, open ``route`` for them and return the page.

    The user is looked up via the session's ``shared_id``; if there is none,
    or it is not tracked, a new user is connected and its id stored in the
    session. After ``before_connect`` runs on the active controller the page
    is rendered; a controller-supplied ``special_return_handler`` takes
    precedence over the rendered page.
    """
    tracker = self.user_tracker
    user = None
    if 'shared_id' in session and session['shared_id'] in tracker.user_refs:
        user = tracker.users[session['shared_id']]

    if user is None:
        user = tracker.connect_user()
        session['shared_id'] = user.id_

    user.open_page(route, kwargs)

    ctx = CallCTX(abort=abort)
    user.active_controller.before_connect(ctx)
    ctx.deactivate()

    # The page is always rendered, even if a special handler ends up being used.
    rendered = user.active_controller.render_page()

    handler = user.active_controller.special_return_handler
    if handler is None:
        return rendered
    return user.active_controller.special_return_handler()
项目:JmilkFan-s-Blog    作者:JmilkFan    | 项目源码 | 文件源码
def sidebar_data():
    """Collect the data shown in the sidebar.

    :return: tuple of (the five most recently published posts,
        up to five ``(Tag, total)`` rows ordered by post count descending)
    """
    # Most recent posts first.
    recent_query = db.session.query(Post).order_by(Post.publish_date.desc())
    recent = recent_query.limit(5).all()

    # Tags ranked by how many posts reference them.
    post_count = func.count(posts_tags.c.post_id).label('total')
    tag_query = db.session.query(Tag, post_count).join(posts_tags).group_by(Tag)
    top_tags = tag_query.order_by('total DESC').limit(5).all()

    return recent, top_tags


# Use the Blueprint object to set the Route URL
# Register the view function into blueprint
项目:JmilkFan-s-Blog    作者:JmilkFan    | 项目源码 | 文件源码
def new_post():
    """View function for new_post.

    Redirects users who are not logged in to ``main.login``. On a valid form
    submission a ``Post`` owned by the current user is created with the
    submitted title and text and the current time, committed, and the client
    is redirected to ``blog.home``. Otherwise the form page is rendered.
    """
    form = PostForm()

    # Ensure the user is logged in.
    # Flask-Login's ``current_user`` is a proxy that stands for an anonymous
    # user object when nobody is logged in, so a bare truthiness test does not
    # detect that case; ``is_authenticated`` is checked as well.
    if not current_user or not current_user.is_authenticated:
        return redirect(url_for('main.login'))

    # Executed when the submit button on the new-post page is clicked.
    if form.validate_on_submit():
        post = Post()
        post.title = form.title.data
        post.text = form.text.data
        post.publish_date = datetime.now()
        post.user = current_user

        db.session.add(post)
        db.session.commit()
        return redirect(url_for('blog.home'))

    return render_template('new_post.html',
                           form=form)
项目:repocribro    作者:MarekSuchanek    | 项目源码 | 文件源码
def get_repo_if_admin(db, full_name):
    """Fetch a repository by full name, but only for its owner or members.

    :param db: database connection where are repos stored
    :type db: ``flask_sqlalchemy.SQLAlchemy``
    :param full_name: full name of desired repository
    :type full_name: str
    :return: the repository when it exists and the current user is its owner
        or one of its members, None otherwise
    :rtype: ``repocribro.models.Repository`` or None
    """
    user = flask_login.current_user.github_user
    query = db.session.query(Repository).filter_by(full_name=full_name)
    repo = query.first()

    if repo is not None and (repo.owner == user or user in repo.members):
        return repo
    return None
项目:repocribro    作者:MarekSuchanek    | 项目源码 | 文件源码
def organizations():
    """List user organizations from GitHub (GET handler)

    The ``page`` query argument selects the result page. It falls back to 0
    when it is missing or not an integer, so a malformed value no longer
    raises ``ValueError``.
    """
    # type=int makes a non-numeric ?page= value fall back to the default.
    page = flask.request.args.get('page', 0, type=int)
    gh_api = flask.current_app.container.get(
        'gh_api', token=flask.session['github_token']
    )

    gh_orgs = gh_api.get('/user/orgs', page=page)
    orgs_link = gh_api.app_connections_link

    return flask.render_template(
        'manage/orgs.html', orgs=gh_orgs.data,
        actual_page=gh_orgs.actual_page, total_pages=gh_orgs.total_pages,
        orgs_link=orgs_link,
    )
项目:repocribro    作者:MarekSuchanek    | 项目源码 | 文件源码
def organization(login):
    """List organization repositories for activation

    The ``page`` query argument selects the result page. It falls back to 0
    when it is missing or not an integer, so a malformed value no longer
    raises ``ValueError``.

    :param login: organization login used to build the GitHub API URL

    .. :todo: register organization in repocribro
    .. :todo: own profile page of organization
    """
    ORG_REPOS_URL = '/orgs/{}/repos?type=member'
    # type=int makes a non-numeric ?page= value fall back to the default.
    page = flask.request.args.get('page', 0, type=int)
    gh_api = flask.current_app.container.get(
        'gh_api', token=flask.session['github_token']
    )

    gh_repos = gh_api.get(ORG_REPOS_URL.format(login), page=page)
    user = flask_login.current_user.github_user
    # GitHub ids of the current user's repositories, passed to the template.
    active_ids = [repo.github_id for repo in user.repositories]
    return flask.render_template(
        'manage/repos.html', repos=gh_repos.data,
        actual_page=gh_repos.actual_page, total_pages=gh_repos.total_pages,
        Repository=Repository, active_ids=active_ids,
        repos_type=login+' (organization)'
    )
项目:repocribro    作者:MarekSuchanek    | 项目源码 | 文件源码
def github_callback_get_account(db, gh_api):
    """Find or create the local account for the authenticated GitHub user.

    Fetches ``/user`` from the GitHub API and looks the user up by GitHub id.
    If there is no match, a new ``UserAccount`` and ``User`` are created and
    committed.

    :param db: Database for storing GitHub user info
    :type db: ``flask_sqlalchemy.SQLAlchemy``
    :param gh_api: GitHub API client ready for the communication
    :type gh_api: ``repocribro.github.GitHubAPI``
    :return: User account and flag if it's new one
    :rtype: tuple of ``repocribro.models.UserAccount``, bool
    """
    profile = gh_api.get('/user').data
    lookup = db.session.query(User).filter(User.github_id == profile['id'])
    gh_user = lookup.first()

    if gh_user is not None:
        return gh_user.user_account, False

    account = UserAccount()
    db.session.add(account)
    gh_user = User.create_from_dict(profile, account)
    db.session.add(gh_user)
    db.session.commit()
    return gh_user.user_account, True
项目:repocribro    作者:MarekSuchanek    | 项目源码 | 文件源码
def gh_event_push(db, repo, payload, actor):
    """Store a GitHub PushEvent together with its commits.

    https://developer.github.com/v3/activity/events/types/#pushevent

    The push and each of its commits are added to the session; nothing is
    committed here.

    :param db: Database to store push data
    :type db: ``flask_sqlalchemy.SQLAlchemy``
    :param repo: Repository where push belongs to
    :type repo: ``repocribro.models.Repository``
    :param payload: Data about push and commits
    :type payload: dict
    :param actor: Actor doing the event
    :type actor: dict
    """
    push_event = Push.create_from_dict(payload, actor, repo)
    add = db.session.add
    add(push_event)
    for pushed_commit in push_event.commits:
        add(pushed_commit)
项目:repocribro    作者:MarekSuchanek    | 项目源码 | 文件源码
def gh_event_release(db, repo, payload, actor):
    """Store a GitHub ReleaseEvent.

    https://developer.github.com/v3/activity/events/types/#releaseevent

    The release is added to the session; nothing is committed here.

    :param db: Database to store release data
    :type db: ``flask_sqlalchemy.SQLAlchemy``
    :param repo: Repository where release belongs to
    :type repo: ``repocribro.models.Repository``
    :param payload: Data about release and action
    :type payload: dict
    :param actor: Actor doing the event
    :type actor: dict
    """
    # The action is read but not used further; the lookup still requires the
    # key to be present in the payload.
    _action = payload['action']
    new_release = Release.create_from_dict(payload['release'], actor, repo)
    db.session.add(new_release)
项目:repocribro    作者:MarekSuchanek    | 项目源码 | 文件源码
def make_githup_api_factory(cfg):
    """Build a factory that creates GitHub API clients from ``cfg``.

    The ``github`` section's ``client_id``, ``client_secret`` and
    ``webhooks_secret`` are read each time the factory is called, not when it
    is created.

    :param cfg: Configuration of the application
    :type cfg: ``configparser.ConfigParser``
    :return: GitHub API client factory
    :rtype: ``function``
    """

    def github_api_factory(token=None, session=None):
        client_id = cfg.get('github', 'client_id')
        client_secret = cfg.get('github', 'client_secret')
        webhooks_secret = cfg.get('github', 'webhooks_secret')
        return GitHubAPI(
            client_id, client_secret, webhooks_secret,
            session=session, token=token,
        )

    return github_api_factory
项目:repocribro    作者:MarekSuchanek    | 项目源码 | 文件源码
def session(db, request):
    """Provide a database session bound to a transaction that is rolled back afterwards.

    Opens a connection, begins a transaction on it and installs a scoped
    session bound to that connection as ``db.session``. A finalizer
    registered on ``request`` rolls the transaction back, closes the
    connection and removes the session.
    """
    conn = db.engine.connect()
    txn = conn.begin()

    scoped = db.create_scoped_session(options={'bind': conn, 'binds': {}})
    db.session = scoped

    def teardown():
        # Undo everything the test wrote, then release resources.
        txn.rollback()
        conn.close()
        scoped.remove()

    request.addfinalizer(teardown)
    return scoped
项目:tesismometro    作者:joapaspe    | 项目源码 | 文件源码
def test_session_transactions(self):
    """Values written inside ``session_transaction`` are visible to later requests."""
    app = flask.Flask(__name__)
    app.testing = True
    app.secret_key = 'testing'

    @app.route('/')
    def index():
        return text_type(flask.session['foo'])

    with app.test_client() as client:
        # Seed the session before any request has been made.
        with client.session_transaction() as before:
            self.assert_equal(len(before), 0)
            before['foo'] = [42]
            self.assert_equal(len(before), 1)

        response = client.get('/')
        self.assert_equal(response.data, b'[42]')

        # The seeded value must still be there after the request.
        with client.session_transaction() as after:
            self.assert_equal(len(after), 1)
            self.assert_equal(after['foo'], [42])
项目:tesismometro    作者:joapaspe    | 项目源码 | 文件源码
def test_session_using_application_root(self):
    """Check that the session cookie path is ``/bar`` for this setup.

    The app is wrapped in a WSGI middleware that sets ``SCRIPT_NAME`` to
    ``/bar`` and is configured with ``APPLICATION_ROOT='/bar'``; the
    ``Set-Cookie`` header of the response must contain ``path=/bar``.
    """
    class PrefixPathMiddleware(object):
        """WSGI wrapper that overwrites ``SCRIPT_NAME`` with a fixed prefix."""

        def __init__(self, app, prefix):
            self.app, self.prefix = app, prefix

        def __call__(self, environ, start_response):
            environ['SCRIPT_NAME'] = self.prefix
            return self.app(environ, start_response)

    app = flask.Flask(__name__)
    app.wsgi_app = PrefixPathMiddleware(app.wsgi_app, '/bar')
    app.config.update({'SECRET_KEY': 'foo', 'APPLICATION_ROOT': '/bar'})

    @app.route('/')
    def index():
        # Writing to the session makes the response carry a session cookie.
        flask.session['testing'] = 42
        return 'Hello World'

    client = app.test_client()
    response = client.get('/', 'http://example.com:8080/')
    set_cookie = response.headers['set-cookie'].lower()
    self.assert_in('path=/bar', set_cookie)
项目:tesismometro    作者:joapaspe    | 项目源码 | 文件源码
def test_session_using_session_settings(self):
    """Check that the ``SESSION_COOKIE_*`` settings shape the cookie.

    With the configuration below, the lower-cased ``Set-Cookie`` header must
    contain ``domain=.example.com``, ``path=/`` and ``secure``, and must not
    contain ``httponly``.
    """
    settings = {
        'SECRET_KEY': 'foo',
        'SERVER_NAME': 'www.example.com:8080',
        'APPLICATION_ROOT': '/test',
        'SESSION_COOKIE_DOMAIN': '.example.com',
        'SESSION_COOKIE_HTTPONLY': False,
        'SESSION_COOKIE_SECURE': True,
        'SESSION_COOKIE_PATH': '/',
    }
    app = flask.Flask(__name__)
    app.config.update(settings)

    @app.route('/')
    def index():
        # Writing to the session makes the response carry a session cookie.
        flask.session['testing'] = 42
        return 'Hello World'

    client = app.test_client()
    response = client.get('/', 'http://www.example.com:8080/test/')
    set_cookie = response.headers['set-cookie'].lower()

    for expected in ('domain=.example.com', 'path=/', 'secure'):
        self.assert_in(expected, set_cookie)
    self.assert_not_in('httponly', set_cookie)
项目:guides-cms    作者:pluralsight    | 项目源码 | 文件源码
def login_required(func):
    """
    Decorator that sends users who are not logged in to the login page.

    When ``is_logged_in()`` is false, the requested URL is stored in the
    session under ``previously_requested_page`` and a redirect to the
    ``login`` endpoint is returned. Otherwise ``func`` is called with the
    original arguments and its result is returned.
    """

    @wraps(func)
    def _guarded(*args, **kwargs):
        if is_logged_in():
            return func(*args, **kwargs)

        # Remember the page being requested so the user can be sent back to
        # it after logging in.
        session['previously_requested_page'] = request.url
        return redirect(url_for('login'))

    return _guarded
项目:CodingForLawyers    作者:KCLegalHackers    | 项目源码 | 文件源码
def results():
    """Render ``results.html`` with the party names stored in the session.

    Reads ``respondent`` and ``petitioner`` from the session, prints the
    petitioner to stdout, and passes both names plus a fixed title to the
    template.
    """
    respondent = session['respondent']
    petitioner = session['petitioner']
    print("Petitioner = %s" % petitioner)

    context = {
        'title': 'Court Form Sample',
        'petitioner': petitioner,
        'respondent': respondent,
    }
    return render_template('results.html', **context)



#Error Handling:

# http://blog.miguelgrinberg.com/post/the-flask-mega-tutorial-part-vii-unit-testing
项目:isni-reconcile    作者:cmh2166    | 项目源码 | 文件源码
def test_session_transactions(self):
    """Verify that session edits made via ``session_transaction`` persist.

    A value written inside a ``session_transaction`` block must be readable
    by a subsequent request and still be present when a second transaction
    is opened on the same test client.
    """
    app = flask.Flask(__name__)
    app.secret_key = 'testing'
    app.testing = True

    @app.route('/')
    def index():
        return text_type(flask.session['foo'])

    with app.test_client() as client:
        # First transaction: the session starts empty and gains one key.
        with client.session_transaction() as first:
            self.assert_equal(len(first), 0)
            first['foo'] = [42]
            self.assert_equal(len(first), 1)

        # The view renders the value stored above.
        response = client.get('/')
        self.assert_equal(response.data, b'[42]')

        # Second transaction: the stored value is still there.
        with client.session_transaction() as second:
            self.assert_equal(len(second), 1)
            self.assert_equal(second['foo'], [42])
项目:isni-reconcile    作者:cmh2166    | 项目源码 | 文件源码
def test_session_using_application_root(self):
    """Check that the session cookie path is ``/bar`` for this setup.

    The app is wrapped in a WSGI middleware that sets ``SCRIPT_NAME`` to
    ``/bar`` and is configured with ``APPLICATION_ROOT='/bar'``; the
    ``Set-Cookie`` header of the response must contain ``path=/bar``.
    """
    class PrefixPathMiddleware(object):
        """WSGI wrapper that overwrites ``SCRIPT_NAME`` with a fixed prefix."""

        def __init__(self, app, prefix):
            self.app, self.prefix = app, prefix

        def __call__(self, environ, start_response):
            environ['SCRIPT_NAME'] = self.prefix
            return self.app(environ, start_response)

    app = flask.Flask(__name__)
    app.wsgi_app = PrefixPathMiddleware(app.wsgi_app, '/bar')
    app.config.update({'SECRET_KEY': 'foo', 'APPLICATION_ROOT': '/bar'})

    @app.route('/')
    def index():
        # Writing to the session makes the response carry a session cookie.
        flask.session['testing'] = 42
        return 'Hello World'

    client = app.test_client()
    response = client.get('/', 'http://example.com:8080/')
    set_cookie = response.headers['set-cookie'].lower()
    self.assert_in('path=/bar', set_cookie)
项目:isni-reconcile    作者:cmh2166    | 项目源码 | 文件源码
def test_session_using_session_settings(self):
    """Check that the ``SESSION_COOKIE_*`` settings shape the cookie.

    With the configuration below, the lower-cased ``Set-Cookie`` header must
    contain ``domain=.example.com``, ``path=/`` and ``secure``, and must not
    contain ``httponly``.
    """
    settings = {
        'SECRET_KEY': 'foo',
        'SERVER_NAME': 'www.example.com:8080',
        'APPLICATION_ROOT': '/test',
        'SESSION_COOKIE_DOMAIN': '.example.com',
        'SESSION_COOKIE_HTTPONLY': False,
        'SESSION_COOKIE_SECURE': True,
        'SESSION_COOKIE_PATH': '/',
    }
    app = flask.Flask(__name__)
    app.config.update(settings)

    @app.route('/')
    def index():
        # Writing to the session makes the response carry a session cookie.
        flask.session['testing'] = 42
        return 'Hello World'

    client = app.test_client()
    response = client.get('/', 'http://www.example.com:8080/test/')
    set_cookie = response.headers['set-cookie'].lower()

    for expected in ('domain=.example.com', 'path=/', 'secure'):
        self.assert_in(expected, set_cookie)
    self.assert_not_in('httponly', set_cookie)
项目:ELO-Darts    作者:pwgraham91    | 项目源码 | 文件源码
def get_google_authorization_url():
    """Return the Google authorization URL for a user who is not signed in.

    Returns ``None`` when ``flask.g.user`` is already authenticated.
    Otherwise requests an authorization URL for ``Auth.AUTH_URI``, stores
    the accompanying state in the session under ``oauth_state`` and returns
    the URL.
    """
    if flask.g.user.is_authenticated:
        return None

    url, oauth_state = get_google_auth().authorization_url(Auth.AUTH_URI)

    # Kept in the session under this key alongside the returned URL.
    flask.session['oauth_state'] = oauth_state
    return url
项目:docklet    作者:unias    | 项目源码 | 文件源码
def is_authenticated():
    """Return ``True`` when the session contains a ``username`` key."""
    return "username" in session
项目:docklet    作者:unias    | 项目源码 | 文件源码
def is_admin():
    """Return ``True`` when the session's user group is ``root`` or ``admin``.

    A session without ``username`` is never admin. A session that has
    ``username`` but no ``usergroup`` key is treated as non-admin (fail
    closed) instead of raising ``KeyError``.
    """
    if "username" not in session:
        return False
    # .get() yields None for a missing key, which matches neither group.
    return session.get('usergroup') in ('root', 'admin')
项目:docklet    作者:unias    | 项目源码 | 文件源码
def is_activated():
    """Return ``True`` when the session's ``status`` is ``normal``.

    A session without ``username`` is never activated. A session that has
    ``username`` but no ``status`` key is treated as not activated (fail
    closed) instead of raising ``KeyError``.
    """
    if "username" not in session:
        return False
    # .get() yields None for a missing key, which is not 'normal'.
    return session.get('status') == 'normal'