我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 aiohttp.web.HTTPFound()。
async def oauth2(code):
    """Complete the Weibo OAuth2 login flow for an authorization ``code``.

    Exchanges ``code`` for an access token, fetches the Weibo profile using
    the token response as query parameters, then looks up the local ``Oauth``
    record keyed by ``'weibo-' + idstr``.

    :param code: authorization code handed back by Weibo.
    :return: a ``'redirect:...'`` string pointing at the registration page
        when no ``Oauth`` record exists, an error string when the linked user
        is missing, otherwise whatever ``user.signin`` returns for a redirect
        to ``/``.
    """
    from urllib.parse import urlencode

    url = 'https://api.weibo.com/oauth2/access_token'
    # NOTE(review): the client secret is hard-coded in source; it should be
    # loaded from configuration and this value rotated.
    payload = {
        'client_id': '366603916',
        'client_secret': 'b418efbd77094585d0a7f9ccac98a706',
        'grant_type': 'authorization_code',
        'code': code,
        'redirect_uri': 'http://www.qiangtaoli.com'
    }
    # ClientSession is an asynchronous context manager, so it needs
    # ``async with`` rather than a plain ``with``.
    async with ClientSession() as session:
        async with session.post(url, data=payload) as resp:
            params = await resp.json()
        async with session.get('https://api.weibo.com/2/users/show.json', params=params) as resp:
            info = await resp.json()

    oid = 'weibo-' + info['idstr']
    o = await Oauth.find(oid)
    if not o:
        # Percent-encode the values: names and avatar URLs may contain
        # characters ('&', '=', spaces, non-ASCII) that would corrupt the query.
        query = urlencode([
            ('oid', oid),
            ('name', info['name']),
            ('image', info['avatar_large']),
        ])
        return 'redirect:/bootstrap/register?' + query
    user = await User.find(o.user_id)
    if not user:
        return 'oauth user was deleted.'
    return user.signin(web.HTTPFound('/'))
async def post(self):
    """Authenticate an admin user from the submitted login form.

    Reads ``email`` and ``password`` from the POST body, loads the matching
    ``User`` and checks that it is active, a superuser and that the password
    verifies. On success the user's email is stored in the session.

    :return: a redirect to the ``admin_records`` route on success, otherwise a
        redirect back to the ``admin_login`` route.
    """
    login_url = self.request.app.router['admin_login'].url()

    # Use the mapping returned by ``post()`` instead of the legacy
    # ``request.POST`` attribute, which the original relied on after
    # discarding this return value.
    form = await self.request.post()
    email = form.get('email')
    password = form.get('password')
    if not (email and password):
        return web.HTTPFound(login_url)

    try:
        user = await settings.manager.get(User, email=email)
    except User.DoesNotExist:
        return web.HTTPFound(login_url)

    # The password check is always performed, as in the original tuple form.
    password_ok = await user.check_password(password=password)
    if not (user.active and user.superuser and password_ok):
        return web.HTTPFound(login_url)

    session = await get_session(self.request)
    session['email'] = user.email
    return web.HTTPFound(self.request.app.router['admin_records'].url())
async def http_handler(self, request):
    """Dispatch a request by the suffix of its path.

    Handles the websocket endpoint, the monitor page, the GitHub login and
    OAuth2 callback, and logout. Anything else yields ``HTTPNotFound``.

    :param request: the incoming request object.
    :return: the response produced by the matching branch.
    """
    path = request.path
    if path.endswith("api.sock"):
        return await self.ws_handler(request)
    if path.endswith("/monitor/"):
        data = pkgutil.get_data("rci.services.monitor", "monitor.html").decode("utf8")
        return web.Response(text=data, content_type="text/html")
    if path.endswith("/login/github"):
        if request.method == "POST":
            url = self.oauth.generate_request_url(("read:org", ))
            return web.HTTPFound(url)
    if path.endswith("/oauth2/github"):
        return await self._oauth2_handler(request)
    if path.endswith("logout"):
        if request.method == "POST":
            sid = request.cookies.get(self.config["cookie_name"])
            # pop() instead of del: a missing or stale cookie must not turn
            # logout into a KeyError (HTTP 500).
            self.sessions.pop(sid, None)
            return web.HTTPFound("/monitor/")
    return web.HTTPNotFound()
async def search(request):
    """Redirect a search query to the page matching its apparent type.

    The query is classified by length and first character: up to 7
    characters is treated as a block reference, 64 characters as a block or
    transaction hash, and a leading ``1``, ``3``, ``m`` or ``n`` as a payment
    address.

    :param request: request whose POST body carries the ``q`` field.
    :return: a plain response when the query cannot be classified.
    :raises HTTPFound: for every recognised query, and to ``/`` when the
        query is missing, empty or longer than 200 characters.
    """
    form = await request.post()
    # A missing 'q' field is treated like an empty query rather than raising
    # KeyError and surfacing as a server error.
    query = form.get('q', '').strip()

    if not (1 <= len(query) <= 200):
        raise HTTPFound('/')

    if len(query) <= 7:
        raise HTTPFound('/blk/'+query.lower())
    elif len(query) == 64:
        # assume it's a hash of block or txn
        raise HTTPFound('/txn/'+query.lower())
    elif query[0] in '13mn':
        # assume it's a payment address
        raise HTTPFound('/addr/'+query)
    else:
        return Response(text="Can't search for that")
def __call__(self, f):
    """
    Decorates a request handler so that area setup runs before it.

    :param f: the request handler to be decorated.
    :return: a coroutine function that tags the request with this area's name,
             awaits ``before_request`` and then delegates to ``f``.
    """
    @wraps(f)
    async def area_handler(request):
        # expose the area name on the request object
        request.area = self.name

        try:
            await self.before_request(request)
        except InvalidCultureException:
            # send the client to the fallback url
            return HTTPFound(self.get_fallback_url(request))
        except InvalidAntiforgeryTokenException:
            raise HTTPForbidden()
        else:
            # the handler itself runs outside the guarded section
            return await f(request)
    return area_handler
async def follow_user(self, request):
    """Adds the current user as follower of the given user.

    :param request: request whose route provides ``username``.
    :return: the result of ``redirect`` to the ``user_timeline`` route.
    :raises web.HTTPUnauthorized: when the session holds no ``user_id``.
    :raises web.HTTPNotFound: when ``username`` does not resolve to a user id.
    """
    username = request.match_info['username']
    session = await get_session(request)
    user_id = session.get('user_id')

    if not user_id:
        # aiohttp has no HTTPNotAuthorized; the 401 class is HTTPUnauthorized.
        raise web.HTTPUnauthorized()

    whom_id = await db.get_user_id(self.mongo.user, username)

    if whom_id is None:
        # HTTPFound needs a location; an unknown user is a 404.
        raise web.HTTPNotFound()

    await self.mongo.follower.update(
        {'who_id': ObjectId(user_id)},
        {'$push': {'whom_id': whom_id}}, upsert=True)

    return redirect(request, 'user_timeline', parts={"username": username})
async def unfollow_user(self, request):
    """Removes the current user as follower of the given user.

    :param request: request whose route provides ``username``.
    :return: the result of ``redirect`` to the ``user_timeline`` route.
    :raises web.HTTPUnauthorized: when the session holds no ``user_id``.
    :raises web.HTTPNotFound: when ``username`` does not resolve to a user id.
    """
    username = request.match_info['username']
    session = await get_session(request)
    user_id = session.get('user_id')

    if not user_id:
        # aiohttp has no HTTPNotAuthorized; the 401 class is HTTPUnauthorized.
        raise web.HTTPUnauthorized()

    whom_id = await db.get_user_id(self.mongo.user, username)

    if whom_id is None:
        # HTTPFound needs a location; an unknown user is a 404.
        raise web.HTTPNotFound()

    await self.mongo.follower.update(
        {'who_id': ObjectId(user_id)},
        {'$pull': {'whom_id': whom_id}})

    return redirect(request, 'user_timeline', parts={"username": username})
def logs_handler(request):
    """Build the combined log file on disk and redirect to its download URL.

    Every ``*.json`` file in ``STATE_DIR`` is parsed and the resulting list is
    written, pretty-printed, to ``STATE_DIR/complete.log``.

    :param request: a web request object.
    :type request: request | None
    :return: a redirect to ``/download/log/complete.log``.
    """
    log.info("Request for logs endpoint made.")

    complete_log_path = STATE_DIR + '/complete.log'
    complete_log = []
    for json_path in glob.glob(STATE_DIR + '/*.json'):
        log.debug('Adding {} to complete log file.'.format(json_path))
        with open(json_path) as blob:
            complete_log.append(json.load(blob))

    # A distinct name for the output handle; the original reused the loop
    # variable ``f`` for it.
    with open(complete_log_path, 'w') as out:
        out.write(json.dumps(complete_log, indent=4, sort_keys=True))

    # The original called .format(VERSION) on this literal, which has no
    # placeholders, so the call had no effect on the URL.
    return web.HTTPFound('/download/log/complete.log')
def redirect_on_success(action_name, by_name=None, params=None):
    """Decorate a view method so non-AJAX calls end in a redirect.

    The wrapped coroutine is always awaited. If ``self.request.is_ajax()`` is
    true its result is returned unchanged; otherwise a ``web.HTTPFound`` to
    ``action_name`` is returned.

    :param action_name: target passed to ``self.router.resolve_named`` or
        ``self.path_for``.
    :param by_name: when truthy, resolve the target with
        ``self.router.resolve_named``; otherwise use ``self.path_for``.
    :param params: optional mapping ``name -> (kind, key)``; entries whose
        kind is ``'match'`` are filled from ``self.request.match_info[key]``.
    :return: a decorator for coroutine methods.
    """
    from functools import wraps

    def decorator(fn):
        # wraps() keeps the handler's name and docstring on the wrapper.
        @wraps(fn)
        async def decorated(self, *args, **kwargs):
            params_data = {}
            for key, spec in (params or {}).items():
                if spec[0] == 'match':
                    params_data[key] = self.request.match_info[spec[1]]

            res = await fn(self, *args, **kwargs)

            if self.request.is_ajax():
                return res
            if by_name:
                return web.HTTPFound(self.router.resolve_named(action_name, params_data))
            return web.HTTPFound(self.path_for(action_name, None, params_data))
        return decorated
    return decorator


##
## Content negotiation
##
def auth_factory(app, handler):
    """Create middleware that resolves the current user from a cookie.

    The middleware stores the user (or ``None``) on ``request.__user__`` and
    redirects to ``/signin`` when a ``/manage/`` path is requested without an
    admin user.

    :param app: the web application (unused here).
    :param handler: the next handler in the chain.
    :return: the middleware coroutine function.
    """
    # Native coroutine: the ``asyncio.coroutine`` decorator used originally
    # was removed in Python 3.11.
    async def auth(request):
        logging.info('check user: %s %s' % (request.method, request.path))
        # Start as anonymous; replaced below if the cookie resolves to a user.
        request.__user__ = None
        cookie_str = request.cookies.get(COOKIE_NAME)
        if cookie_str:
            user = await cookie2user(cookie_str)
            if user:
                logging.info('set current user: %s' % user.email)
                request.__user__ = user
        # Management pages are restricted to admin users.
        if request.path.startswith('/manage/') and (request.__user__ is None or not request.__user__.admin):
            return web.HTTPFound('/signin')
        return await handler(request)
    return auth
async def restart(request):
    """Schedule a CI run for a ref and redirect to its latest log.

    :param request: request whose route provides ``prefix`` and ``ref`` and
        whose query string may carry ``t`` (comma-separated targets) and
        ``all``.
    :return: ``HTTPNotFound`` when ``scopes`` has no attribute named
        ``prefix``; otherwise a redirect to the log URL for the ref.
    """
    prefix = request.match_info['prefix']
    try:
        repo = getattr(scopes, prefix).name
    except AttributeError:
        return web.HTTPNotFound()

    ref = Ref(repo, request.match_info['ref'], '<sha>')
    targets = request.GET.get('t', '').split(',')
    # Renamed from ``all`` so the builtin is not shadowed.
    run_all = request.GET.get('all')
    request.app.loop.create_task(ci(ref, targets, run_all))

    # NOTE(review): presumably gives the background task time to create the
    # log before the client follows the redirect - confirm.
    await asyncio.sleep(2)
    log_url = '%slatest/%s/' % (conf['log_url'], ref.uid)
    return web.HTTPFound(log_url)
def authorize(app, handler):
    """Create middleware that sends anonymous visitors to the login page.

    Requests with a ``user`` entry in the session, and requests for the
    public path prefixes listed below, are passed to ``handler``. Every other
    request raises a redirect to the ``login`` route.

    :param app: the web application (unused here).
    :param handler: the next handler in the chain.
    :return: the middleware coroutine function.
    """
    # Paths reachable without a logged-in user.
    public_prefixes = ('/login', '/static/', '/signin', '/signout', '/_debugtoolbar/')

    async def middleware(request):
        session = await get_session(request)
        if not session.get("user") and not request.path.startswith(public_prefixes):
            # The original had an unreachable ``return handler(request)``
            # after this raise; it has been removed.
            raise web.HTTPFound(request.app.router['login'].url())
        return await handler(request)

    return middleware
def auth_factory(app, handler):
    """
    Build middleware that attaches the cookie user to the request.

    :param app: the web application
    :param handler: the next request handler
    :return: the middleware coroutine function
    """
    async def auth(request):
        # anonymous until the cookie proves otherwise
        request.__user__ = None
        raw_cookie = request.cookies.get(configs.user_cookie.name)
        if raw_cookie:
            parsed = await user_cookie_parse(raw_cookie, configs.user_cookie.secret)
            if parsed:
                request.__user__ = parsed
        # only /manage/ paths require an admin user
        if request.path.startswith('/manage/'):
            current = request.__user__
            if current is None or not current['admin']:
                return web.HTTPFound('/')
        return await handler(request)
    return auth
def auth_factory(app, handler):
    '''
    Middleware factory: resolve the user from the cookie and guard /manage/.
    '''
    async def auth(request):
        logging.info('check user: %s %s' % (request.method, request.path))
        request.__user__ = None
        cookie_value = request.cookies.get(COOKIE_NAME)
        found = await cookie2user(cookie_value) if cookie_value else None
        if found:
            logging.info('set current user:%s' % found.email)
            request.__user__ = found
        if request.path.startswith('/manage/'):
            current = request.__user__
            if current is None or not current.admin:
                # HTTPFound answers with a 302 redirect
                return web.HTTPFound('/signin')
        return (await handler(request))
    return auth
def error_middleware(app, handler):
    """Create middleware that replaces 404 and 403 outcomes.

    A 404 becomes the result of ``http_404_response``; a 403 becomes a
    redirect to ``/auth/login``. This applies both to returned responses and
    to raised ``web.HTTPException`` instances; other exceptions of that type
    are re-raised.
    """
    not_replaced = object()

    async def middleware_handler(request):
        async def replacement_for(status):
            # Shared mapping from status code to the substitute response.
            if status == 404:
                return await http_404_response(request)
            if status == 403:
                return web.HTTPFound('/auth/login')
            return not_replaced

        try:
            response = await handler(request)
            substitute = await replacement_for(response.status)
            if substitute is not_replaced:
                return response
            return substitute
        except web.HTTPException as ex:
            substitute = await replacement_for(ex.status)
            if substitute is not_replaced:
                raise
            return substitute
    return middleware_handler
async def get(self):
    """Render the login page, preparing a U2F sign request when enabled.

    When ``config.admin['u2f']`` is set and no ``method`` value is given, a
    sign request is produced for the configured identity and the updated
    record is written back under the ``Auth.U2F`` key.

    :return: a redirect to ``/manage`` when ``auth`` accepts the request,
        otherwise the result of ``geass`` for ``public/login.html``.
    """
    u2f, req = False, []
    method = self._get('method', None)

    if config.admin['u2f'] and method is None:
        u2f = True
        identity = config.admin['identity']
        users = await self.redis.get('Auth.U2F') or {}
        try:
            user = users[identity]
            users[identity], req = await sign(user)
            await self.redis.set('Auth.U2F', users, many=False)
        except KeyError:
            # No record for this identity: ``req`` stays empty.
            pass
    # The original ``elif method == 'common': u2f = False`` branch only
    # re-assigned the initial value and has been dropped.

    if await auth(self.request):
        return web.HTTPFound('/manage')

    return geass({
        'u2f': u2f,
        'request': req
    }, self.request, 'public/login.html')
async def post(self):
    """Handle the login form via U2F verification or password credentials.

    :return: a redirect to ``/manage`` (after ``remember`` has been called)
        on success, otherwise a redirect to ``/auth/login``.
    """
    form = await self.request.post()
    identity = config.admin['identity']
    response = web.HTTPFound('/manage')
    # .get() so that a form without ``_method`` cannot raise KeyError when
    # U2F is disabled.
    method = form.get('_method')

    if config.admin['u2f'] and method is None:
        users = await self.redis.get('Auth.U2F') or {}
        # An identity with no stored record falls through to the login
        # redirect instead of raising KeyError.
        if identity in users:
            users[identity], ok = await verify(users[identity], dict(form))
            if ok:
                await self.redis.set('Auth.U2F', users, many=False)
                await remember(self.request, response, identity)
                return response
    elif method == 'common':
        # NOTE(review): the result was never used in the original; the call
        # is kept in case check_method validates or raises - confirm.
        check_method(form.get('email', '').lower())
        if await check_credentials(self.redis, identity, form.get('password')):
            await remember(self.request, response, identity)
            return response

    return web.HTTPFound('/auth/login')
async def test_redirect_url(loop, test_client):
    """A redirect whose location is a ``URL`` object is followed by the client."""
    # Native coroutines replace ``@asyncio.coroutine`` / ``yield from``,
    # which are unavailable since Python 3.11.
    async def redirector(request):
        raise web.HTTPFound(location=URL('/redirected'))

    async def redirected(request):
        return web.Response()

    app = web.Application()
    app.router.add_get('/redirector', redirector)
    app.router.add_get('/redirected', redirected)

    client = await test_client(app)
    resp = await client.get('/redirector')
    # 200 is the status of the final, redirected-to handler.
    assert resp.status == 200
def response_factory(app, handler):
    """Create middleware that converts handler results into responses.

    Strings become HTML (or a redirect when prefixed with ``redirect:``),
    dicts become a rendered template or JSON, and anything else is returned
    unchanged.
    """
    def encoded(text, content_type):
        # Wrap UTF-8 encoded text in a response with the given content type.
        out = web.Response(body=text.encode('utf-8'))
        out.content_type = content_type
        return out

    async def response(request):
        logging.info('%s response_factory response start next handler %s ' % (request.__uuid__, handler))
        result = await handler(request)
        logging.info('%s response_factory response end ' % (request.__uuid__))

        if isinstance(result, str):
            if result.startswith('redirect:'):
                return web.HTTPFound(result[9:])
            return encoded(result, 'text/html;charset=utf-8')

        if isinstance(result, dict):
            template = result.get('__template__')
            if template is None:
                return encoded(json.dumps(result, ensure_ascii=False),
                               'application/json;charset=utf-8')
            page = app['__templating__'].get_template(template).render(**result)
            return encoded(page, 'text/html;charset=utf-8')

        return result
    return response
def signout(request):
    """Sign the user out and redirect to the referring page, or ``/``."""
    logging.info('user sign out')
    target = request.headers.get('Referer') or '/'
    redirect_response = web.HTTPFound(target)
    return User.signout(redirect_response)
def response_factory(app, handler):
    """Create middleware that converts handler results into responses.

    Supported results: an existing ``web.StreamResponse``, ``bytes``, ``str``
    (with a ``redirect:`` prefix for redirects), ``dict`` (template or JSON),
    a status ``int``, and a ``(status, message)`` tuple. Anything else is
    rendered with ``str()`` as plain text.
    """
    def build(body, content_type):
        # Response with the given raw body and content type.
        out = web.Response(body=body)
        out.content_type = content_type
        return out

    async def response(request):
        logging.info('Response handler...')
        result = await handler(request)

        if isinstance(result, web.StreamResponse):
            return result
        if isinstance(result, bytes):
            return build(result, 'application/octet-stream')
        if isinstance(result, str):
            if result.startswith('redirect:'):
                return web.HTTPFound(result[9:])
            return build(result.encode('utf-8'), 'text/html;charset=utf-8')
        if isinstance(result, dict):
            template = result.get('__template__')
            if template is None:
                payload = json.dumps(result, ensure_ascii=False, default=lambda o: o.__dict__)
                return build(payload.encode('utf-8'), 'application/json;charset=utf-8')
            # make the current user available to the template
            result['__user__'] = request.__user__
            page = app['__templating__'].get_template(template).render(**result)
            return build(page.encode('utf-8'), 'text/html;charset=utf-8')
        if isinstance(result, int) and 100 <= result < 600:
            return web.Response(status=result)
        if isinstance(result, tuple) and len(result) == 2:
            status, message = result
            if isinstance(status, int) and 100 <= status < 600:
                return web.Response(status=status, text=str(message))
        # fallback: plain text
        return build(str(result).encode('utf-8'), 'text/plain;charset=utf-8')
    return response
async def get(self):
    """List one page of active records ordered by name.

    One record more than ``settings.RECORDS_PER_PAGE`` is requested so that
    the existence of a following page can be detected.

    :return: a redirect to the ``web_records`` route when a page other than
        the first is empty; otherwise a dict with ``request``, ``records``,
        ``prev_page``, ``page`` and ``next_page``.
    """
    page = int(self.request.match_info.get('page', 1))
    try:
        records = await settings.manager.execute(
            Record
            .select()
            .where(Record.active == True)  # noqa: E712 - ORM expression, not a bool test
            .order_by(Record.name.asc())
            .offset((page - 1) * settings.RECORDS_PER_PAGE)
            .limit(settings.RECORDS_PER_PAGE + 1))
    except (psycopg2.OperationalError, peewee.IntegrityError, peewee.ProgrammingError):
        # Database errors are shown as an empty listing.
        records = []

    count = len(records)
    if count == 0 and page != 1:
        return web.HTTPFound(self.request.app.router['web_records'].url())

    next_page = page + 1 if count > settings.RECORDS_PER_PAGE else None
    prev_page = page - 1 if page != 1 else None

    return dict(request=self.request,
                records=records[:settings.RECORDS_PER_PAGE],
                prev_page=prev_page,
                page=page,
                next_page=next_page)
def unauthenticated(func):
    """Decorate a view method so that it only runs for anonymous visitors.

    :param func: coroutine method to wrap; ``self`` must provide
        ``get_current_user()``.
    :return: a coroutine method that redirects to the ``admin_records`` route
        when a current user exists and otherwise awaits ``func``.
    """
    from functools import wraps

    # wraps() keeps the view's name and docstring on the wrapper.
    @wraps(func)
    async def decorated(self, *args, **kwargs):
        current_user = await self.get_current_user()
        if current_user:
            return web.HTTPFound(
                self.request.app.router['admin_records'].url())
        return await func(self, *args, **kwargs)
    return decorated
def authenticated(func):
    """Decorate a view method so that it only runs for a logged-in user.

    :param func: coroutine method to wrap; ``self`` must provide
        ``get_current_user()``.
    :return: a coroutine method that redirects to the ``admin_login`` route
        when there is no current user and otherwise awaits ``func``.
    """
    from functools import wraps

    # wraps() keeps the view's name and docstring on the wrapper.
    @wraps(func)
    async def decorated(self, *args, **kwargs):
        current_user = await self.get_current_user()
        if not current_user:
            return web.HTTPFound(
                self.request.app.router['admin_login'].url())
        return await func(self, *args, **kwargs)
    return decorated
async def get(self):
    """Log the user out and redirect to the ``admin_login`` route."""
    session = await get_session(self.request)
    # pop() instead of del: a visitor without a stored email must still be
    # redirected rather than hit a KeyError.
    session.pop('email', None)
    return web.HTTPFound(self.request.app.router['admin_login'].url())
async def _http_oauth2(self, request):
    """Finish the OAuth flow and store the token for the session and user.

    :param request: callback request carrying ``code`` and ``state`` in its
        query string.
    :return: a redirect to ``self.url + "/settings"``.
    """
    client = await self.oauth.oauth(request.GET["code"], request.GET["state"])
    user_data = await client.get("user")

    # Tokens are indexed by the user's id, as a string.
    self.users[str(user_data["id"])] = client.token

    session = self.ss.session(request)
    session.data["token"] = client.token
    session.data["user"] = user_data["login"]

    return web.HTTPFound(self.url + "/settings")
def _http_login(self, request):
    """Redirect to the OAuth request URL generated for the ``read:org`` scope."""
    requested_scopes = ("read:org", )
    target = self.oauth.generate_request_url(requested_scopes)
    return web.HTTPFound(target)
def _http_registraion(self, request):
    """Redirect to the OAuth request URL generated for the registration scopes."""
    requested_scopes = (
        "repo:status",
        "write:repo_hook",
        "admin:org_hook",
        "read:org",
    )
    target = self.oauth.generate_request_url(requested_scopes)
    return web.HTTPFound(target)
async def _oauth2_handler(self, request):
    """Finish the OAuth flow and open a cookie-backed session.

    :param request: callback request carrying ``code`` and ``state`` in its
        query string.
    :return: a redirect to ``/monitor/`` that sets the session cookie.
    """
    client = await self.oauth.oauth(request.GET["code"], request.GET["state"])
    user_data = await client.get("user")

    # The UTF-8 encoded login is the key for tokens and the session value.
    login = user_data["login"].encode("utf8")
    self.tokens[login] = client.token

    # Random session id: 15 bytes from os.urandom, base32-encoded.
    sid = base64.b32encode(os.urandom(15)).decode('ascii')
    self.sessions[sid] = login

    resp = web.HTTPFound("/monitor/")
    resp.set_cookie(self.config["cookie_name"], sid)
    return resp
def run_active_monitor_view(request: web.Request) -> web.Response:
    """GET view that schedules an active monitor to run right away."""
    monitor_id = int(request.match_info['id'])
    monitors = request.app['active_monitor_manager'].monitors
    monitors[monitor_id].schedule_immediately()
    location = '/active_monitor/%s/?notification_msg=Monitor job scheduled' % monitor_id
    return web.HTTPFound(location)
async def send_active_monitor_test_notification(request: web.Request) -> web.Response:
    """GET view to send a test notification for an active monitor.

    Looks the monitor up by the ``id`` route parameter, schedules it
    immediately and awaits ``notify_state_change`` with state ``'UNKNOWN'``.

    :return: a redirect back to the monitor page with a notification message.
    """
    monitor_id = int(request.match_info['id'])
    am_manager = request.app['active_monitor_manager']
    monitor = am_manager.monitors[monitor_id]
    # NOTE(review): scheduling a run here mirrors the "run now" view; confirm
    # it is intended for a test notification.
    monitor.schedule_immediately()
    # NOTE(review): the second argument evaluates to
    # abs(2 * state_ts - now); confirm this is the intended value.
    await monitor.notify_state_change('UNKNOWN', abs(monitor.state_ts - (time.time() - monitor.state_ts)))
    return web.HTTPFound('/active_monitor/%s/?notification_msg=Notification sent' % monitor_id)
def signout(request):
    """Expire the login cookie and redirect to the referring page, or ``/``."""
    target = request.headers.get('Referer') or '/'
    response = web.HTTPFound(target)
    # max_age=0 makes the placeholder value expire immediately
    response.set_cookie(COOKIE_NAME, '-delete-', max_age=0, httponly=True)
    logging.info('user signed out')
    return response
def auth_factory(app, handler):
    """Create middleware that loads the cookie user and protects ``/manage/``."""
    async def auth(request):
        request.__user__ = None
        token = request.cookies.get(COOKIE_NAME)
        if token:
            account = await cookie2user(token)
            if account:
                logging.info('set current user :%s' % account.email)
                request.__user__ = account
        if request.path.startswith('/manage/'):
            current = request.__user__
            if current is None or not current.admin:
                return web.HTTPFound('/signin')
        return await handler(request)
    return auth
def response_factory(app, handler):
    """Create middleware that converts handler results into responses.

    Supported results: an existing ``web.StreamResponse``, ``bytes``, ``str``
    (with a ``redirect:`` prefix for redirects), ``dict`` (template or JSON),
    a status ``int``, and a ``(status, message)`` tuple. Anything else is
    rendered with ``str()`` as plain text.

    :param app: application holding the ``__templating__`` environment.
    :param handler: the next handler in the chain.
    :return: the middleware coroutine function.
    """
    async def response(request):
        logging.info('Response handler...')
        r = await handler(request)
        if isinstance(r, web.StreamResponse):
            return r
        if isinstance(r, bytes):
            resp = web.Response(body=r)
            resp.content_type = 'application/octet-stream'
            return resp
        if isinstance(r, str):
            # The prefix must include the colon: the slice below drops
            # exactly the 9 characters of 'redirect:'.
            if r.startswith('redirect:'):
                return web.HTTPFound(r[9:])
            resp = web.Response(body=r.encode('utf-8'))
            resp.content_type = 'text/html;charset=utf-8'
            return resp
        if isinstance(r, dict):
            template = r.get('__template__')
            if template is None:
                resp = web.Response(body=json.dumps(r, ensure_ascii=False, default=lambda o: o.__dict__).encode('utf-8'))
                resp.content_type = 'application/json;charset=utf-8'
                return resp
            resp = web.Response(body=app['__templating__'].get_template(template).render(**r).encode('utf-8'))
            resp.content_type = 'text/html;charset=utf-8'
            return resp
        if isinstance(r, int) and 100 <= r < 600:
            # web.Response takes keyword arguments; the status code must be
            # passed as ``status=``.
            return web.Response(status=r)
        if isinstance(r, tuple) and len(r) == 2:
            t, m = r
            if isinstance(t, int) and 100 <= t < 600:
                return web.Response(status=t, text=str(m))
        # default
        resp = web.Response(body=str(r).encode('utf-8'))
        resp.content_type = 'text/plain;charset=utf-8'
        return resp
    return response
def signout(request):
    """Expire the login cookie and redirect to the referring page, or ``/``."""
    came_from = request.headers.get('Referer')
    destination = came_from if came_from else '/'
    redirect_response = web.HTTPFound(destination)
    # max_age=0 makes the placeholder value expire immediately
    redirect_response.set_cookie(COOKIE_NAME, '-deleted-', max_age=0, httponly=True)
    logging.info('User signed out.')
    return redirect_response
def auth_factory(app, handler):
    """Create middleware that resolves the current user from a cookie.

    The middleware stores the user (or ``None``) on ``request.__user__`` and
    redirects to ``/signin`` when a ``/manage/`` path is requested without an
    admin user.

    :param app: the web application (unused here).
    :param handler: the next handler in the chain.
    :return: the middleware coroutine function.
    """
    # Native coroutine: the ``asyncio.coroutine`` decorator used originally
    # was removed in Python 3.11.
    async def auth(request):
        logging.info('check user: {} {}'.format(request.method, request.path))
        request.__user__ = None
        cookie_str = request.cookies.get(COOKIE_NAME)
        if cookie_str:
            user = await cookie2user(cookie_str)
            if user:
                logging.info('set current user: {}'.format(user.email))
                request.__user__ = user
        # Management pages are restricted to admin users.
        if request.path.startswith('/manage/') and (request.__user__ is None or not request.__user__.admin):
            return web.HTTPFound('/signin')
        return await handler(request)
    return auth
def response_factory(app, handler):
    """Create middleware that converts handler results into responses.

    Supported results: an existing ``web.StreamResponse``, ``bytes``, ``str``
    (with a ``redirect:`` prefix for redirects), ``dict`` (template or JSON),
    a status ``int``, and a ``(status, message)`` tuple. Anything else is
    rendered with ``str()`` as plain text.

    :param app: application holding the ``__templating__`` environment.
    :param handler: the next handler in the chain.
    :return: the middleware coroutine function.
    """
    # Native coroutine: the ``asyncio.coroutine`` decorator used originally
    # was removed in Python 3.11.
    async def response(request):
        logging.info('Response handler...')
        r = await handler(request)
        if isinstance(r, web.StreamResponse):
            return r
        if isinstance(r, bytes):
            res = web.Response(body=r)
            res.content_type = 'application/octet-stream'
            return res
        if isinstance(r, str):
            if r.startswith('redirect:'):
                return web.HTTPFound(r[9:])
            res = web.Response(body=r.encode('utf-8'))
            res.content_type = 'text/html; charset=utf-8'
            return res
        if isinstance(r, dict):
            template = r.get('__template__')
            if template is None:
                res = web.Response(body=json.dumps(r, ensure_ascii=False, default=lambda o: o.__dict__).encode('utf-8'))
                res.content_type = 'application/json;charset=utf-8'
                return res
            # make the current user available to the template
            r['__user__'] = request.__user__
            res = web.Response(body=app['__templating__'].get_template(template).render(**r).encode('utf-8'))
            res.content_type = 'text/html;charset=utf-8'
            return res
        if isinstance(r, int) and 100 <= r < 600:
            # web.Response takes keyword arguments; the status code must be
            # passed as ``status=``.
            return web.Response(status=r)
        if isinstance(r, tuple) and len(r) == 2:
            t, m = r
            if isinstance(t, int) and 100 <= t < 600:
                return web.Response(status=t, text=str(m))
        # default: the original built a web.Request here, which is not a
        # response class.
        res = web.Response(body=str(r).encode('utf-8'))
        res.content_type = 'text/plain;charset=utf-8'
        return res
    return response
async def logout_handler(request):
    """Handles logging out.

    Reads the configured logout route from the application, removes the
    session entry stored under ``SESSION_KEY`` and redirects to that route.

    :param request: the incoming request.
    :return: a redirect to the ``ON_LOGOUT`` location.
    """
    session = await get_session(request)
    on_logout = request.app[APP_KEY]['ON_LOGOUT']

    # pop() instead of del: logging out without an active login must still
    # redirect rather than raise KeyError.
    session.pop(SESSION_KEY, None)
    return web.HTTPFound(on_logout)
def contribute_redirect(request):
    """Redirect to the versioned ``contribute.json`` document."""
    target = '/v1/contribute.json'
    return web.HTTPFound(target)
def redirect(request):
    """Redirect to the ``/v1/`` root."""
    target = '/v1/'
    return web.HTTPFound(target)
def handle_404(request, response):
    """Turn a non-JSON 404 into a redirect or a JSON error body.

    :param request: the request that produced the 404.
    :param response: the original 404 response.
    :return: ``response`` unchanged when its content type mentions JSON; a
        redirect to the path without trailing slashes when the path ends with
        ``/``; otherwise a JSON 404 body.
    """
    # .get(): a response without a Content-Type header is treated as
    # non-JSON instead of raising KeyError.
    content_type = response.headers.get('Content-Type', '')
    if 'json' in content_type:
        return response

    trimmed = request.path.rstrip('/')
    # Only redirect when something is left after stripping; a path made only
    # of slashes would otherwise produce an empty redirect target.
    if request.path.endswith('/') and trimmed:
        return web.HTTPFound(trimmed)

    return web.json_response({
        "status": 404,
        "message": "Page '{}' not found".format(request.path)
    }, status=404)
def redirect(urlname, *args, **kwargs):
    """Build an ``HTTPFound`` for the URL that ``url_for`` resolves."""
    location = url_for(urlname, *args, **kwargs)
    return HTTPFound(location)
def index_with_culture(request):
    """
    Sends the client to the index page under a culture prefix.

    The culture comes from the route when present; otherwise it is chosen by
    ``get_best_culture`` for the ``public`` area.
    """
    culture = request.match_info.get("culture") or get_best_culture(request, area="public")
    location = "/" + culture + "/"
    return web.HTTPFound(location)
def auth_factory(app, handler):
    """Create middleware that loads the cookie user and protects ``/manage/``."""
    async def auth(request):
        logging.info('check user: %s %s' % (request.method, request.path))
        request.__user__ = None
        raw_cookie = request.cookies.get(COOKIE_NAME)
        member = await User.find_by_cookie(raw_cookie) if raw_cookie else None
        if member:
            logging.info('set current user: %s' % member.email)
            request.__user__ = member
        if request.path.startswith('/manage/'):
            current = request.__user__
            if current is None or not current.admin:
                return web.HTTPFound('/signin')
        return (await handler(request))
    return auth
def response_factory(app, handler):
    """Create middleware that converts handler results into responses.

    Supported results: an existing ``web.StreamResponse``, ``bytes``, ``str``
    (with a ``redirect:`` prefix for redirects), ``dict`` (template or JSON),
    a status ``int``, and a ``(status, message)`` tuple. Anything else is
    rendered with ``str()`` as plain text.

    :param app: application holding the ``__templating__`` environment.
    :param handler: the next handler in the chain.
    :return: the middleware coroutine function.
    """
    async def response(request):
        logging.info('Response handler...')
        r = await handler(request)
        if isinstance(r, web.StreamResponse):
            return r
        if isinstance(r, bytes):
            resp = web.Response(body=r)
            resp.content_type = 'application/octet-stream'
            return resp
        if isinstance(r, str):
            if r.startswith('redirect:'):
                return web.HTTPFound(r[9:])
            resp = web.Response(body=r.encode('utf-8'))
            resp.content_type = 'text/html;charset=utf-8'
            return resp
        if isinstance(r, dict):
            template = r.get('__template__')
            if template is None:
                resp = web.Response(
                    body=json.dumps(r, ensure_ascii=False, default=lambda o: o.__dict__).encode('utf-8'))
                resp.content_type = 'application/json;charset=utf-8'
                return resp
            # make the current user available to the template
            r['__user__'] = request.__user__
            resp = web.Response(body=app['__templating__'].get_template(template).render(**r).encode('utf-8'))
            resp.content_type = 'text/html;charset=utf-8'
            return resp
        if isinstance(r, int) and 100 <= r < 600:
            return web.Response(status=r)
        if isinstance(r, tuple) and len(r) == 2:
            status, message = r
            if isinstance(status, int) and 100 <= status < 600:
                return web.Response(status=status, text=str(message))
        # The plain-text fallback is unconditional. In the original elif/else
        # chain at least one path fell off the end and returned None.
        resp = web.Response(body=str(r).encode('utf-8'))
        resp.content_type = 'text/plain;charset=utf-8'
        return resp
    return response
def signout(request):
    """Expire the login cookie and redirect to the referring page, or ``/``."""
    back_to = request.headers.get('Referer') or '/'
    outcome = web.HTTPFound(back_to)
    # overwrite the cookie with a placeholder that expires immediately
    outcome.set_cookie(COOKIE_NAME, '-deleted-', max_age=0, httponly=True)
    logging.info('user signed out')
    return outcome
async def vote(self, request):
    """Record a vote for a question and redirect to its results page.

    :param request: request whose route provides ``question_id`` and whose
        POST body carries ``choice``.
    :return: a redirect to the ``results`` route for the question.
    :raises web.HTTPBadRequest: when ``choice`` is missing or not an integer.
    :raises web.HTTPNotFound: when ``db.vote`` raises ``db.RecordNotFound``.
    """
    question_id = int(request.match_info['question_id'])
    data = await request.post()
    try:
        choice_id = int(data['choice'])
    except (KeyError, TypeError, ValueError) as e:
        raise web.HTTPBadRequest(
            text='You have not specified choice value') from e
    try:
        await db.vote(self.postgres, question_id, choice_id)
    except db.RecordNotFound as e:
        raise web.HTTPNotFound(text=str(e))
    router = request.app.router
    url = router['results'].url(parts={'question_id': question_id})
    return web.HTTPFound(location=url)
async def timeline(self, request):
    """Return the logged-in user's timeline data.

    The timeline holds up to 30 messages, newest first, written by the user
    or by anyone listed in the user's follower document.

    :param request: the incoming request.
    :return: a dict with ``messages``, ``user`` and ``endpoint``.
    :raises web.HTTPFound: to the ``public_timeline`` route when the session
        holds no ``user_id``.
    """
    session = await get_session(request)
    user_id = session.get('user_id')

    if user_id is None:
        router = request.app.router
        location = router['public_timeline'].url()
        raise web.HTTPFound(location=location)

    user = await self.mongo.user.find_one({'_id': ObjectId(user_id)})

    query = {'who_id': ObjectId(user_id)}
    # Renamed from ``filter`` so the builtin is not shadowed.
    projection = {'whom_id': 1}
    followed = await self.mongo.follower.find_one(query, projection)
    if followed is None:
        followed = {'whom_id': []}

    query = {'$or': [{'author_id': ObjectId(user_id)},
                     {'author_id': {'$in': followed['whom_id']}}]}
    messages = await (self.mongo.message
                      .find(query)
                      .sort('pub_date', -1)
                      .to_list(30))
    endpoint = request.match_info.route.name
    return {"messages": messages,
            "user": user,
            "endpoint": endpoint}
def redirect(request, name, **kw):
    """Build an ``HTTPFound`` for the named route, forwarding ``kw`` to ``url()``."""
    route = request.app.router[name]
    return web.HTTPFound(location=route.url(**kw))