我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用flask.session()。
def twittercallback(): verification = request.args["oauth_verifier"] auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET) try: auth.request_token = session["request_token"] except KeyError: flash("Please login again", "danger") return redirect(url_for("bp.home")) try: auth.get_access_token(verification) except tweepy.TweepError: flash("Failed to get access token", "danger") return redirect(url_for("bp.home")) session["access_token"] = auth.access_token session["access_token_secret"] = auth.access_token_secret return render_template("twittercallback.html", form=HashtagForm())
def get(self): masterip = self.masterip data = { "user": session['username'], } result = dockletRequest.post('/monitor/hosts/%s/containerslist/'%(self.com_ip), data, masterip) containers = result.get('monitor').get('containerslist') containerslist = [] for container in containers: result = dockletRequest.post('/monitor/vnodes/%s/basic_info/'%(container), data, masterip) basic_info = result.get('monitor').get('basic_info') result = dockletRequest.post('/monitor/vnodes/%s/owner/'%(container), data, masterip) owner = result.get('monitor') basic_info['owner'] = owner containerslist.append(basic_info) return self.render(self.template_path, containerslist = containerslist, com_ip = self.com_ip, user = session['username'], masterip = masterip)
def get(self): data = { "user": session['username'], } allresult = dockletRequest.post_to_all('/monitor/listphynodes/', data) allmachines = {} for master in allresult: allmachines[master] = [] iplist = allresult[master].get('monitor').get('allnodes') for ip in iplist: containers = {} result = dockletRequest.post('/monitor/hosts/%s/containers/'%(ip), data, master.split("@")[0]) containers = result.get('monitor').get('containers') result = dockletRequest.post('/monitor/hosts/%s/status/'%(ip), data, master.split("@")[0]) status = result.get('monitor').get('status') allmachines[master].append({'ip':ip,'containers':containers, 'status':status}) #print(machines) return self.render(self.template_path, allmachines = allmachines, user = session['username'])
def post_to_all(self, url = '/', data={}): if (url == '/'): res = [] for masterip in masterips: try: requests.post("http://"+getip(masterip)+":"+master_port+"/isalive/",data=data) except Exception as e: logger.debug(e) continue res.append(masterip) return res data = dict(data) data['token'] = session['token'] logger.info("Docklet Request: user = %s data = %s, url = %s"%(session['username'], data, url)) result = {} for masterip in masterips: try: res = requests.post("http://"+getip(masterip)+":"+master_port+url,data=data).json() except Exception as e: logger.debug(e) continue result[masterip] = res logger.debug("get result from " + getip(masterip)) return result
def post(self): masterip = self.masterip index1 = self.image.rindex("_") index2 = self.image[:index1].rindex("_") checkname(self.clustername) data = { "clustername": self.clustername, 'imagename': self.image[:index2], 'imageowner': self.image[index2+1:index1], 'imagetype': self.image[index1+1:], } result = dockletRequest.post("/cluster/create/", dict(data, **(request.form)), masterip) if(result.get('success', None) == "true"): return redirect("/dashboard/") #return self.render(self.template_path, user = session['username']) else: return self.render(self.error_path, message = result.get('message'))
def post(self): masterip = self.masterip data = { "clustername": self.clustername, "image": self.imagename, "containername": self.containername, "description": self.description, "isforce": self.isforce } result = dockletRequest.post("/cluster/save/", data, masterip) if(result): if result.get('success') == 'true': #return self.render(self.success_path, user = session['username']) return redirect("/config/") #res = detailClusterView() #res.clustername = self.clustername #return res.as_view() else: if result.get('reason') == "exists": return self.render(self.template_path, containername = self.containername, clustername = self.clustername, image = self.imagename, user = session['username'], description = self.description, masterip=masterip) else: return self.render(self.error_path, message = result.get('message')) else: self.error()
def after_request(response): response.headers.add('Access-Control-Allow-Methods', 'GET, POST') response.headers.add('Access-Control-Allow-Credentials', 'true') response.headers.add('Access-Control-Allow-Headers', 'Content-Type, *') response.headers.add('Cache-Control', 'no-cache') response.headers.add('Cache-Control', 'no-store') if api.auth.is_logged_in(): if 'token' in session: response.set_cookie('token', session['token'], domain=app.config['SESSION_COOKIE_DOMAIN']) else: csrf_token = api.common.token() session['token'] = csrf_token response.set_cookie('token', csrf_token, domain=app.config['SESSION_COOKIE_DOMAIN']) # JB: This is a hack. We need a better solution if request.path[0:19] != "/api/autogen/serve/": response.mimetype = 'application/json' return response
def main_teacher(): tm = teachers_model.Teachers(flask.session['id']) if request.method == 'POST': cm = courses_model.Courses() if "close" in request.form.keys(): cid = request.form["close"] cm.cid = cid cm.close_session(cm.get_active_session()) elif "open" in request.form.keys(): cid = request.form["open"] cm.cid = cid cm.open_session() courses = tm.get_courses_with_session() empty = True if len(courses) == 0 else False context = dict(data=courses) return render_template('main_teacher.html', empty=empty, **context)
def test_session_transactions(self): app = flask.Flask(__name__) app.testing = True app.secret_key = 'testing' @app.route('/') def index(): return text_type(flask.session['foo']) with app.test_client() as c: with c.session_transaction() as sess: self.assert_equal(len(sess), 0) sess['foo'] = [42] self.assert_equal(len(sess), 1) rv = c.get('/') self.assert_equal(rv.data, b'[42]') with c.session_transaction() as sess: self.assert_equal(len(sess), 1) self.assert_equal(sess['foo'], [42])
def test_session_using_application_root(self): class PrefixPathMiddleware(object): def __init__(self, app, prefix): self.app = app self.prefix = prefix def __call__(self, environ, start_response): environ['SCRIPT_NAME'] = self.prefix return self.app(environ, start_response) app = flask.Flask(__name__) app.wsgi_app = PrefixPathMiddleware(app.wsgi_app, '/bar') app.config.update( SECRET_KEY='foo', APPLICATION_ROOT='/bar' ) @app.route('/') def index(): flask.session['testing'] = 42 return 'Hello World' rv = app.test_client().get('/', 'http://example.com:8080/') self.assert_in('path=/bar', rv.headers['set-cookie'].lower())
def test_session_using_session_settings(self): app = flask.Flask(__name__) app.config.update( SECRET_KEY='foo', SERVER_NAME='www.example.com:8080', APPLICATION_ROOT='/test', SESSION_COOKIE_DOMAIN='.example.com', SESSION_COOKIE_HTTPONLY=False, SESSION_COOKIE_SECURE=True, SESSION_COOKIE_PATH='/' ) @app.route('/') def index(): flask.session['testing'] = 42 return 'Hello World' rv = app.test_client().get('/', 'http://www.example.com:8080/test/') cookie = rv.headers['set-cookie'].lower() self.assert_in('domain=.example.com', cookie) self.assert_in('path=/', cookie) self.assert_in('secure', cookie) self.assert_not_in('httponly', cookie)
def signup(): from forms import SignupForm form = SignupForm() if form.validate_on_submit(): user = User.query.filter_by(email=form.email.data.lower()).first() if user is not None: form.email.errors.append("The Email address is already taken.") return render_template('signup.html', form=form) newuser = User(form.firstname.data,form.lastname.data,form.email.data,form.password.data) db.session.add(newuser) db.session.commit() session['email'] = newuser.email return redirect(url_for('login')) return render_template('signup.html', form=form)
def login(): if g.user is not None and g.user.is_authenticated: return redirect(url_for('index')) from app.forms import LoginForm form = LoginForm() if form.validate_on_submit(): session['remember_me'] = form.remember_me.data user = User.query.filter_by(email=form.email.data.lower()).first() if user and user.check_password(form.password.data): session['email'] = form.email.data login_user(user,remember=session['remember_me']) return redirect(url_for('index')) else: return render_template('login.html',form=form,failed_auth=True) return render_template('login.html',form=form)
def ping(service_id): from app import db, models from models import Service finding = Service.query.filter_by(serviceid=service_id).first() if finding is not None: image_name = finding.imagename uploadn = finding.uploadname usern = finding.username firstcreatetime = finding.firstcreatetime u = Service(serviceid = service_id, createdtime = str(time.time()), imagename = image_name, uploadname = uploadn, username = usern, firstcreatetime = firstcreatetime) db.session.add(u) db.session.commit() db.session.delete(finding) db.session.commit() else: return "The service "+service_id+" has been removed!" return "There are existing service:"+service_id
def test_oauth_authorized_saves_token_and_user_to_session(self, oauth): fake_resp = {'oauth_token_secret': u'secret', 'username': u'palotespaco', 'fullname': u'paco palotes', 'oauth_token':u'token', 'user_nsid': u'user'} oauth.authorized_response.return_value = fake_resp expected_token = { 'oauth_token_secret': u'secret', 'oauth_token': u'token' } expected_user = {'username': u'palotespaco', 'user_nsid': u'user'} with flask_app.test_client() as c: c.get('/flickr/oauth-authorized') assert session['flickr_token'] == expected_token, session['flickr_token'] assert session['flickr_user'] == expected_user, session['flickr_user']
def new_item(category): if request.method == 'GET': return render('newitem.html', category=category) elif request.method == 'POST': name = request.form['name'] highlight = request.form['highlight'] url = request.form['url'] if valid_item(name, url, highlight): user_id = login_session['user_id'] item = Item(name=name, highlight=highlight, url=url, user_id=user_id, category_id=category.id) session.add(item) session.commit() flash("Newed item %s!" % item.name) return redirect(url_for('show_item', category_id=category.id, item_id=item.id)) else: error = "Complete info please!" return render('newitem.html', category=category, name=name, highlight=highlight, url=url, error=error)
def check_auth(func): """ This decorator for routes checks that the user is authorized (or that no login is required). If they haven't, their intended destination is stored and they're sent to get authorized. It has to be placed AFTER @app.route() so that it can capture `request.path`. """ if 'login' not in conf: return func # inspired by <https://flask-login.readthedocs.org/en/latest/_modules/flask_login.html#login_required> @functools.wraps(func) def decorated_view(*args, **kwargs): if current_user.is_anonymous: print('unauthorized user visited {!r}'.format(request.path)) session['original_destination'] = request.path return redirect(url_for('get_authorized')) print('{} visited {!r}'.format(current_user.email, request.path)) assert current_user.email.lower() in conf.login['whitelist'], current_user return func(*args, **kwargs) return decorated_view
def after_request(response): response.headers.add('Access-Control-Allow-Methods', 'GET, POST') response.headers.add('Access-Control-Allow-Credentials', 'true') response.headers.add('Access-Control-Allow-Headers', 'Content-Type, *') response.headers.add('Cache-Control', 'no-cache') response.headers.add('Cache-Control', 'no-store') if api.auth.is_logged_in(): if 'token' in session: response.set_cookie('token', session['token']) else: csrf_token = api.common.token() session['token'] = csrf_token response.set_cookie('token', csrf_token) # JB: This is a hack. We need a better solution if request.path[0:19] != "/api/autogen/serve/": response.mimetype = 'appication/json' return response
def info(id): info = g.plex.getInfo(id) views = None parent = None episodes = None cur_el = info.getchildren()[0] cur_type = cur_el.get("type") if cur_type == "movie": views = db.session.query(models.Processed).filter(models.Processed.title == info.find("Video").get("title")).all() elif cur_type == "season": parent = g.plex.getInfo(info.getchildren()[0].get("parentRatingKey")).getchildren()[0] episodes = g.plex.episodes(id) elif cur_type == "episode": views = db.session.query(models.Processed).filter(models.Processed.session_id.like("%/metadata/" + cur_el.get("ratingKey") + "_%")).all() elif cur_type == "show": episodes = db.session.query(db.func.count(models.Processed.title), models.Processed).filter(models.Processed.orig_title.like(cur_el.get("title"))).group_by(models.Processed.title).having(db.func.count(models.Processed.orig_title) > 0).order_by(db.func.count(models.Processed.orig_title).desc(), models.Processed.time.desc()).all() return render_template('info.html', info=info, history=views, parent=parent, episodes=episodes, title=_('Info'))
def chals(): if not is_admin(): if not ctftime(): if view_after_ctf(): pass else: return redirect('/') if can_view_challenges(): chals = Challenges.query.add_columns('id', 'name', 'value', 'description', 'category').order_by(Challenges.value).all() json = {'game':[]} for x in chals: files = [ str(f.location) for f in Files.query.filter_by(chal=x.id).all() ] json['game'].append({'id':x[1], 'name':x[2], 'value':x[3], 'description':x[4], 'category':x[5], 'files':files}) db.session.close() return jsonify(json) else: db.session.close() return redirect('/login')
def _dispatch(self, route, **kwargs): user = None if 'shared_id' in session: if session['shared_id'] in self.user_tracker.user_refs: user = self.user_tracker.users[session['shared_id']] if user is None: user = self.user_tracker.connect_user() session['shared_id'] = user.id_ user.open_page(route, kwargs) ctx = CallCTX(abort=abort) user.active_controller.before_connect(ctx) ctx.deactivate() possible = user.active_controller.render_page() if user.active_controller.special_return_handler is not None: return user.active_controller.special_return_handler() return possible
def sidebar_data(): """Set the sidebar function.""" # Get post of recent recent = db.session.query(Post).order_by( Post.publish_date.desc() ).limit(5).all() # Get the tags and sort by count of posts. top_tags = db.session.query( Tag, func.count(posts_tags.c.post_id).label('total') ).join( posts_tags ).group_by(Tag).order_by('total DESC').limit(5).all() return recent, top_tags # Use the Blueprint object to set the Route URL # Register the view function into blueprint
def new_post(): """View function for new_port.""" form = PostForm() # Ensure the user logged in. # Flask-Login.current_user can be access current user. if not current_user: return redirect(url_for('main.login')) # Will be execute when click the submit in the create a new post page. if form.validate_on_submit(): new_post = Post() new_post.title = form.title.data new_post.text = form.text.data new_post.publish_date = datetime.now() new_post.user = current_user db.session.add(new_post) db.session.commit() return redirect(url_for('blog.home')) return render_template('new_post.html', form=form)
def get_repo_if_admin(db, full_name): """Retrieve repository from db and return if current user is admin (owner or member) :param db: database connection where are repos stored :type db: ``flask_sqlalchemy.SQLAlchemy`` :param full_name: full name of desired repository :type full_name: str :return: repository if found, None otherwise :rtype: ``repocribro.models.Repository`` or None """ user = flask_login.current_user.github_user repo = db.session.query(Repository).filter_by( full_name=full_name ).first() if repo is None: return None if repo.owner == user or user in repo.members: return repo return None
def organizations(): """List user organizations from GitHub (GET handler)""" page = int(flask.request.args.get('page', 0)) gh_api = flask.current_app.container.get( 'gh_api', token=flask.session['github_token'] ) gh_orgs = gh_api.get('/user/orgs', page=page) orgs_link = gh_api.app_connections_link return flask.render_template( 'manage/orgs.html', orgs=gh_orgs.data, actual_page=gh_orgs.actual_page, total_pages=gh_orgs.total_pages, orgs_link=orgs_link, )
def organization(login): """List organization repositories for activation .. :todo: register organization in repocribro .. :todo: own profile page of organization """ ORG_REPOS_URL = '/orgs/{}/repos?type=member' page = int(flask.request.args.get('page', 0)) gh_api = flask.current_app.container.get( 'gh_api', token=flask.session['github_token'] ) gh_repos = gh_api.get(ORG_REPOS_URL.format(login), page=page) user = flask_login.current_user.github_user active_ids = [repo.github_id for repo in user.repositories] return flask.render_template( 'manage/repos.html', repos=gh_repos.data, actual_page=gh_repos.actual_page, total_pages=gh_repos.total_pages, Repository=Repository, active_ids=active_ids, repos_type=login+' (organization)' )
def github_callback_get_account(db, gh_api): """Processing GitHub callback action :param db: Database for storing GitHub user info :type db: ``flask_sqlalchemy.SQLAlchemy`` :param gh_api: GitHub API client ready for the communication :type gh_api: ``repocribro.github.GitHubAPI`` :return: User account and flag if it's new one :rtype: tuple of ``repocribro.models.UserAccount``, bool """ user_data = gh_api.get('/user').data gh_user = db.session.query(User).filter( User.github_id == user_data['id'] ).first() is_new = False if gh_user is None: user_account = UserAccount() db.session.add(user_account) gh_user = User.create_from_dict(user_data, user_account) db.session.add(gh_user) db.session.commit() is_new = True return gh_user.user_account, is_new
def gh_event_push(db, repo, payload, actor): """Process GitHub PushEvent (with commits) https://developer.github.com/v3/activity/events/types/#pushevent :param db: Database to store push data :type db: ``flask_sqlalchemy.SQLAlchemy`` :param repo: Repository where push belongs to :type repo: ``repocribro.models.Repository`` :param payload: Data about push and commits :type payload: dict :param actor: Actor doing the event :type actor: dict """ push = Push.create_from_dict(payload, actor, repo) db.session.add(push) for commit in push.commits: db.session.add(commit)
def gh_event_release(db, repo, payload, actor): """Process GitHub ReleaseEvent (with commits) https://developer.github.com/v3/activity/events/types/#releaseevent :param db: Database to store push data :type db: ``flask_sqlalchemy.SQLAlchemy`` :param repo: Repository where release belongs to :type repo: ``repocribro.models.Repository`` :param payload: Data about release and action :type payload: dict :param actor: Actor doing the event :type actor: dict """ action = payload['action'] release = Release.create_from_dict(payload['release'], actor, repo) db.session.add(release)
def make_githup_api_factory(cfg): """Simple factory for making the GitHub API client factory :param cfg: Configuration of the application :type cfg: ``configparser.ConfigParser`` :return: GitHub API client factory :rtype: ``function`` """ def github_api_factory(token=None, session=None): return GitHubAPI( cfg.get('github', 'client_id'), cfg.get('github', 'client_secret'), cfg.get('github', 'webhooks_secret'), session=session, token=token ) return github_api_factory
def session(db, request): """Creates a new database session for a test.""" connection = db.engine.connect() transaction = connection.begin() options = dict(bind=connection, binds={}) session = db.create_scoped_session(options=options) db.session = session def teardown(): transaction.rollback() connection.close() session.remove() request.addfinalizer(teardown) return session
def login_required(func): """ Decorator to require login and save URL for redirecting user after login """ @wraps(func) def decorated_function(*args, **kwargs): """decorator args""" if not is_logged_in(): # Save off the page so we can redirect them to what they were # trying to view after logging in. session['previously_requested_page'] = request.url return redirect(url_for('login')) return func(*args, **kwargs) return decorated_function
def results(): respondent = session['respondent'] print("Petitioner = %s" % session['petitioner']) petitioner = session['petitioner'] return render_template('results.html', title='Court Form Sample', petitioner=petitioner, respondent=respondent ) #Error Handling: # http://blog.miguelgrinberg.com/post/the-flask-mega-tutorial-part-vii-unit-testing
def get_google_authorization_url(): current_user = flask.g.user if current_user.is_authenticated: return google = get_google_auth() auth_url, state = google.authorization_url(Auth.AUTH_URI) flask.session['oauth_state'] = state return auth_url
def is_authenticated(): if "username" in session: return True else: return False
def is_admin(): if not "username" in session: return False if not (session['usergroup'] == 'root' or session['usergroup'] == 'admin'): return False return True
def is_activated(): if not "username" in session: return False if not (session['status']=='normal'): return False return True