我们从Python开源项目中,提取了以下37个代码示例,用于说明如何使用tornado.auth()。
def oauth_request_parameters(consumer_token, url, access_token, parameters={}, method="GET", oauth_version="1.0a", override_version=""): base_args = dict( oauth_consumer_key=consumer_token["key"], oauth_token=access_token["key"], oauth_signature_method="HMAC-SHA1", oauth_timestamp=str(int(time.time())), oauth_nonce=binascii.b2a_hex(uuid.uuid4().bytes), oauth_version=override_version or oauth_version, ) args = base_args.copy() args.update(parameters) if oauth_version == "1.0a": signature = tornado.auth._oauth10a_signature(consumer_token, method, url, args, access_token) else: signature = tornado.auth._oauth_signature(consumer_token, method, url, args, access_token) base_args["oauth_signature"] = signature return base_args
def user_login(self, user): """ This is an override of BaseAuthHandler since anonymous auth is special. Generates a unique session ID for this user and saves it in a browser cookie. This is to ensure that anonymous users can't access each other's sessions. """ logging.debug("NullAuthHandler.user_login(%s)" % user['upn']) # Make a directory to store this user's settings/files/logs/etc user_dir = os.path.join(self.settings['user_dir'], user['upn']) if not os.path.exists(user_dir): logging.info(_("Creating user directory: %s" % user_dir)) mkdir_p(user_dir) os.chmod(user_dir, 0o700) session_info = { 'session': generate_session_id() } session_info.update(user) #print session_info self.set_secure_cookie( "gateone_user", tornado.escape.json_encode(session_info))
def get(self): """ Deletes the 'gateone_user' cookie and handles some other situations for backwards compatibility. """ # Get rid of the cookie no matter what (API auth doesn't use cookies) user = self.current_user self.clear_cookie('gateone_user') check = self.get_argument("check", None) if check: # This lets any origin check if the user has been authenticated # (necessary to prevent "not allowed ..." XHR errors) self.set_header('Access-Control-Allow-Origin', '*') logout = self.get_argument("logout", None) if logout: self.user_logout(user['upn']) return logging.debug('APIAuthHandler: user is NOT authenticated') self.write('unauthenticated') self.finish()
def __init__(self): handlers = [ (r"/", MainHandler), (r"/auth/login", AuthLoginHandler), (r"/auth/logout", AuthLogoutHandler), ] settings = dict( cookie_secret="__TODO:_GENERATE_YOUR_OWN_RANDOM_VALUE_HERE__", login_url="/auth/login", template_path=os.path.join(os.path.dirname(__file__), "templates"), static_path=os.path.join(os.path.dirname(__file__), "static"), xsrf_cookies=True, facebook_api_key=options.facebook_api_key, facebook_secret=options.facebook_secret, ui_modules={"Post": PostModule}, debug=True, autoescape=None, ) tornado.web.Application.__init__(self, handlers, **settings)
def main(): parse_command_line() app = tornado.web.Application( [ (r"/", MainHandler), (r"/auth/login", AuthLoginHandler), (r"/auth/logout", AuthLogoutHandler), (r"/a/message/new", MessageNewHandler), (r"/a/message/updates", MessageUpdatesHandler), ], cookie_secret="__TODO:_GENERATE_YOUR_OWN_RANDOM_VALUE_HERE__", login_url="/auth/login", template_path=os.path.join(os.path.dirname(__file__), "templates"), static_path=os.path.join(os.path.dirname(__file__), "static"), xsrf_cookies=True, ) app.listen(options.port) tornado.ioloop.IOLoop.instance().start()
def _on_auth(self, user): if not user: raise tornado.web.HTTPError(500, "Google auth failed") author = self.db.get("SELECT * FROM authors WHERE email = %s", user["email"]) if not author: # Auto-create first author any_author = self.db.get("SELECT * FROM authors LIMIT 1") if not any_author: author_id = self.db.execute( "INSERT INTO authors (email,name) VALUES (%s,%s)", user["email"], user["name"]) else: self.redirect("/") return else: author_id = author["id"] self.set_secure_cookie("blogdemo_user", str(author_id)) self.redirect(self.get_argument("next", "/"))
def get(self): if self.get_argument("oauth_token", None): user = yield self.get_authenticated_user() if not user: return tornado.web.HTTPError(500, "Twitter auth failed") reversi_user = User.get_or_create(twitter_id=user['id'])[0] reversi_user.name = user['name'] reversi_user.save() self.save_current_user(reversi_user) self.redirect(self.get_argument("next", "/")) else: yield self.authorize_redirect(callback_uri=self.request.full_url()) self.authenticate_redirect()
def get(self): if self.get_argument('code', False): user = yield self.get_authenticated_user( redirect_uri='http://your.site.com/auth/google', code=self.get_argument('code')) # Save the user with e.g. set_secure_cookie else: yield self.authorize_redirect( redirect_uri='http://your.site.com/auth/google', client_id=self.settings['google_oauth']['key'], scope=['profile', 'email'], response_type='code', extra_params={'approval_prompt': 'auto'})
def _on_auth(self, user): if not user: raise tornado.web.HTTPError(500, _("Kerberos auth failed")) logging.debug(_("KerberosAuthHandler user: %s" % user)) user = {'upn': user} # This takes care of the user's settings dir and their session info self.user_login(user) # TODO: Add some LDAP or local DB lookups here to add more detail to user objects next_url = self.get_argument("next", None) if next_url: self.redirect(next_url) else: self.redirect(self.settings['url_prefix'])
def __init__(self): handlers = [ (r"/", MainHandler), (r"/auth/login", AuthHandler), (r"/auth/logout", LogoutHandler), ] settings = dict( cookie_secret="__TODO:_GENERATE_YOUR_OWN_RANDOM_VALUE_HERE__", login_url="/auth/login", ) tornado.web.Application.__init__(self, handlers, **settings)
def get(self): name = tornado.escape.xhtml_escape(self.current_user["name"]) self.write("Hello, " + name) self.write("<br><br><a href=\"/auth/logout\">Log out</a>")
def _on_stream(self, stream): if stream is None: # Session may have expired self.redirect("/auth/login") return self.render("stream.html", stream=stream)
def get(self): my_url = (self.request.protocol + "://" + self.request.host + "/auth/login?next=" + tornado.escape.url_escape(self.get_argument("next", "/"))) if self.get_argument("code", False): self.get_authenticated_user( redirect_uri=my_url, client_id=self.settings["facebook_api_key"], client_secret=self.settings["facebook_secret"], code=self.get_argument("code"), callback=self._on_auth) return self.authorize_redirect(redirect_uri=my_url, client_id=self.settings["facebook_api_key"], extra_params={"scope": "read_stream"})
def __init__(self): handlers = [ (r"/", HomeHandler), (r"/archive", ArchiveHandler), (r"/feed", FeedHandler), (r"/entry/([^/]+)", EntryHandler), (r"/compose", ComposeHandler), (r"/auth/login", AuthLoginHandler), (r"/auth/logout", AuthLogoutHandler), ] settings = dict( blog_title=u"Tornado Blog", template_path=os.path.join(os.path.dirname(__file__), "templates"), static_path=os.path.join(os.path.dirname(__file__), "static"), ui_modules={"Entry": EntryModule}, xsrf_cookies=True, cookie_secret="__TODO:_GENERATE_YOUR_OWN_RANDOM_VALUE_HERE__", login_url="/auth/login", debug=True, ) tornado.web.Application.__init__(self, handlers, **settings) # Have one global connection to the blog DB across all handlers self.db = torndb.Connection( host=options.mysql_host, database=options.mysql_database, user=options.mysql_user, password=options.mysql_password)
def _on_auth(self, user): if not user: raise tornado.web.HTTPError(500, "Facebook auth failed") self.set_secure_cookie("fbdemo_user", tornado.escape.json_encode(user)) self.redirect(self.get_argument("next", "/"))
def _on_auth(self, user): if not user: raise tornado.web.HTTPError(500, "Twitter auth failed") #is there an existing external account? current_user = self.get_current_user() authenticated_user = User.get("id=%s", current_user['id']) existing = Externalservice.by_user(authenticated_user, Externalservice.TWITTER) if existing: existing.service_id = user['access_token']['user_id'] existing.service_secret = user['access_token']['secret'] existing.service_key = user['access_token']['key'] existing.screen_name = user['access_token']['screen_name'] existing.save() else: external_service = Externalservice( user_id=authenticated_user.id, service_id=user['access_token']['user_id'], screen_name=user['access_token']['screen_name'], type=Externalservice.TWITTER, service_key=user['access_token']['key'], service_secret=user['access_token']['secret']) external_service.save() # if not, insert credentials for this user # if there is, update that account return self.render("tools/twitter-connected.html")
def get(self): my_url = (self.request.protocol + "://" + self.request.host + "/auth/login?next=" + tornado.escape.url_escape(self.get_argument("next", "/"))) if self.get_argument("code", False): self.get_authenticated_user( redirect_uri=my_url, client_id=self.settings["facebook_api_key"], client_secret=self.settings["facebook_secret"], code=self.get_argument("code"), callback=self._on_auth) return self.authorize_redirect(redirect_uri=my_url, client_id=self.settings["facebook_api_key"], extra_params={"scope": "user_posts"})
def get(self): if self.get_argument("code", False): user = yield self.get_authenticated_user( redirect_uri=settings.get('redirect_uri'), client_id=settings.get('fb_app_id'), client_secret=settings.get('fb_app_secret'), code=self.get_argument("code")) self.set_secure_cookie('user', json.dumps(user)) self.redirect('/auth/admin/event/') else: yield self.authorize_redirect( redirect_uri=settings.get('redirect_uri'), client_id=settings.get('fb_app_id'), extra_params={"scope": settings.get('FB_GRANTS')})
def login(self, username, token, next="/"): generateToken = False if token == Options['auth_key']: # Auth_key token option for testing local-only proxy generateToken = True elif Options['no_auth'] and Options['debug'] and not Options['gsheet_url']: # No authentication option for testing local-only proxy generateToken = True role = '' if username == sdproxy.ADMIN_ROLE: role = sdproxy.ADMIN_ROLE elif username == sdproxy.GRADER_ROLE: role = sdproxy.GRADER_ROLE if generateToken: token = gen_proxy_auth_token(username, role=role) data = {} comps = token.split(':') if not role and (self.is_web_view() or len(comps) > 1): if len(comps) != 3: self.redirect('/_auth/login/' + '?error=' + tornado.escape.url_escape('Invalid locked access token. Expecting site:session:code')) return siteName, sessionName, _ = comps if not sessionName: # Locked site access next = '/' + siteName else: # Locked session access next = getSessionPath(sessionName) if siteName: # Add site prefix separately because this is root site next = '/' + siteName + next data['locked_access'] = next auth = self.check_locked(username, token, siteName, sessionName) else: auth = self.check_access(username, token, role=role) if auth: if Global.twitter_params: data['site_twitter'] = Global.twitter_params['site_twitter'] self.set_id(username, data=data, role=role) self.redirect(next) else: error_msg = "?error=" + tornado.escape.url_escape("Incorrect username or token") self.redirect("/_auth/login/" + error_msg)
def get(self): """ Sets the 'user' cookie with an appropriate *upn* and *session* and any other values that might be attached to the user object given to us by Google. """ self.base_url = "{protocol}://{host}:{port}{url_prefix}".format( protocol=self.request.protocol, host=self.request.host, port=self.settings['port'], url_prefix=self.settings['url_prefix']) uri_port = ':{0}/'.format(self.settings['port']) if uri_port in self.base_url: # Get rid of the port (will be added automatically) self.base_url = self.base_url.replace(uri_port, '/', 1) redirect_uri = "{base_url}auth".format(base_url=self.base_url) check = self.get_argument("check", None) if check: self.set_header('Access-Control-Allow-Origin', '*') user = self.get_current_user() if user: logging.debug('GoogleAuthHandler: user is authenticated') self.write('authenticated') else: logging.debug('GoogleAuthHandler: user is NOT authenticated') self.write('unauthenticated') self.finish() return logout_url = "https://accounts.google.com/Logout" logout = self.get_argument("logout", None) if logout: user = self.get_current_user()['upn'] self.clear_cookie('gateone_user') self.user_logout(user, logout_url) return if self.get_argument('code', False): user = yield self.get_authenticated_user( redirect_uri=redirect_uri, code=self.get_argument('code')) if not user: self.clear_all_cookies() raise tornado.web.HTTPError(500, 'Google auth failed') access_token = str(user['access_token']) http_client = self.get_auth_http_client() response = yield http_client.fetch( 'https://www.googleapis.com/oauth2/v1/userinfo?access_token=' +access_token) if not response: self.clear_all_cookies() raise tornado.web.HTTPError(500, 'Google auth failed') user = json.loads(response.body.decode('utf-8')) self._on_auth(user) else: yield self.authorize_redirect( redirect_uri=redirect_uri, client_id=self.settings['google_oauth']['key'], scope=['email'], response_type='code', extra_params={'approval_prompt': 'auto'})