我们从 Python 开源项目中提取了以下 22 个代码示例，用于说明如何使用 aiohttp.web.HTTPForbidden()。
def signature_middleware(app, handler):
    """Build a middleware handler that verifies the signature of POST requests.

    Args:
        app: Application object; ``app.bot.check_signature`` and
            ``app.bot.auth_token`` are read from it on each request.
        handler: The next request handler in the chain.

    Returns:
        A coroutine function that takes the request and returns a response.
    """

    async def middleware_handler(request: Request):
        # Only POST requests are checked, and only when the bot asks for it.
        must_verify = request.method == 'POST' and app.bot.check_signature
        if not must_verify:
            return await handler(request)

        sig = request.query.get('sig')
        body = await request.read()
        if verify_signature(body, sig, app.bot.auth_token):
            return await handler(request)

        logger.warning('Post requests with bad signature {sig} {body}'.format(
            sig=sig,
            body=body
        ))
        # The 403 is returned (not raised) as the response object.
        return web.HTTPForbidden()

    return middleware_handler
def adapter_do_OPTIONS(self, request):
    """Answer a CORS preflight (OPTIONS) request.

    The allowed origins are read from the ``api_origins`` option of the bot
    configuration. A ``"*"`` value (or a collection containing ``"*"``)
    allows every origin; otherwise the request's ``Origin`` header must be
    contained in the configured value.

    Args:
        request: The incoming request; only its ``Origin`` header is read.

    Returns:
        A ``web.Response`` carrying the ``Access-Control-Allow-*`` headers.
        ``Vary: Origin`` is added when the origin was matched explicitly
        rather than through the wildcard.

    Raises:
        web.HTTPForbidden: if no origins are configured, the request has no
            ``Origin`` header, or the origin is not allowed.
    """
    # A missing Origin header used to raise KeyError (HTTP 500); it is now
    # rejected like any other disallowed origin.
    origin = request.headers.get("Origin")
    allowed_origins = self._bot.config.get_option("api_origins")
    if origin is None or allowed_origins is None:
        raise web.HTTPForbidden()

    cors_headers = {
        "Access-Control-Allow-Origin": origin,
        "Access-Control-Allow-Headers": "content-type",
    }

    if "*" == allowed_origins or "*" in allowed_origins:
        return web.Response(headers=cors_headers)

    if origin not in allowed_origins:
        raise web.HTTPForbidden()

    cors_headers["Vary"] = "Origin"
    return web.Response(headers=cors_headers)
def __call__(self, f):
    """Decorate a request handler with this area's initialization logic.

    The returned handler tags the request with the area name, runs
    ``self.before_request`` and only then delegates to ``f``.

    :param f: the request handler to be decorated.
    :return: a wrapped request handler. It answers with a redirect to
        ``self.get_fallback_url(request)`` on ``InvalidCultureException``
        and raises ``HTTPForbidden`` on
        ``InvalidAntiforgeryTokenException``.
    """
    @wraps(f)
    async def wrapped(request):
        # expose the area name on the request object
        request.area = self.name

        try:
            await self.before_request(request)
        except InvalidCultureException:
            # send the client to a proper url
            fallback_url = self.get_fallback_url(request)
            return HTTPFound(fallback_url)
        except InvalidAntiforgeryTokenException:
            raise HTTPForbidden()
        else:
            return await f(request)

    return wrapped
async def pre_dispatch(request, controller, actionName):
    """Reject unsafe requests that do not carry a valid CSRF token.

    Requests using GET, HEAD, OPTIONS or TRACE are not checked, and neither
    are actions whose ``csrf_disabled`` attribute is truthy. For everything
    else the token is taken from the ``CSRF_HEADER_NAME`` header or, when the
    header is absent or empty, from the ``CSRF_FIELD_NAME`` field of the POST
    data.

    Declared ``async`` because the body awaits ``request.post()`` and
    ``get_secret()``; ``await`` inside a plain ``def`` is a syntax error.

    Args:
        request: The incoming request.
        controller: Object on which the action is looked up.
        actionName: Attribute name of the action on ``controller``.

    Raises:
        web.HTTPForbidden: with reason ``REASON_NO_CSRF_COOKIE`` when no
            token was supplied, or ``REASON_BAD_TOKEN`` when validation
            fails.
    """
    if request.method in ('GET', 'HEAD', 'OPTIONS', 'TRACE'):
        return

    action = getattr(controller, actionName)
    if getattr(action, 'csrf_disabled', False):
        return

    token = request.headers.get(CSRF_HEADER_NAME)
    if not token:
        # Fall back to the form field when the header carries no token.
        data = await request.post()
        token = data.get(CSRF_FIELD_NAME)

    if not token:
        raise web.HTTPForbidden(reason=REASON_NO_CSRF_COOKIE)

    if not validate_token(token, await get_secret(request)):
        raise web.HTTPForbidden(reason=REASON_BAD_TOKEN)
async def call(self, request):
    """Validate and process a Mandrill webhook POST.

    The expected signature is the base64-encoded HMAC-SHA1 (keyed with
    ``self.app['webhook_auth_key']``) of the webhook URL, the literal field
    name ``mandrill_events`` and the field's value, concatenated. It is
    compared in constant time with the ``X-Mandrill-Signature`` header.

    Declared ``async`` because the body awaits ``request.post()`` and
    ``self.sender.update_mandrill_webhooks()``.

    Args:
        request: The incoming request.

    Returns:
        A plain-text ``Response`` confirming the update.

    Raises:
        HTTPBadRequest: if ``mandrill_events`` is missing from the POST data
            or is not valid JSON.
        HTTPForbidden: if the signature does not match.
    """
    try:
        event_data = (await request.post())['mandrill_events']
    except KeyError:
        raise HTTPBadRequest(text='"mandrill_events" not found in post data')

    signed_payload = self.app['mandrill_webhook_url'] + 'mandrill_events' + event_data
    sig_generated = base64.b64encode(
        hmac.new(
            self.app['webhook_auth_key'],
            msg=signed_payload.encode(),
            digestmod=hashlib.sha1
        ).digest()
    )
    # A placeholder stands in for a missing header so the comparison simply
    # fails instead of raising.
    sig_given = request.headers.get('X-Mandrill-Signature', '<missing>').encode()
    if not hmac.compare_digest(sig_generated, sig_given):
        raise HTTPForbidden(text='invalid signature')

    try:
        events = ujson.loads(event_data)
    except ValueError as e:
        raise HTTPBadRequest(text=f'invalid json data: {e}')

    await self.sender.update_mandrill_webhooks(events)
    return Response(text='message status updated\n')
def messagebird_pricing(request):
    """Return a fixed pricing list after checking fixed query credentials.

    Args:
        request: The incoming request; ``username`` and ``password`` are read
            from its query string.

    Returns:
        A JSON response listing two rate entries.

    Raises:
        HTTPForbidden: if ``username`` is not ``mb-username`` or ``password``
            is not ``mb-password``.
    """
    query = request.query
    if query.get('username') != 'mb-username':
        raise HTTPForbidden(text='bad username')
    if query.get('password') != 'mb-password':
        raise HTTPForbidden(text='bad password')

    default_rate = {
        'mcc': '0',
        'country_name': 'Default rate',
        'rate': '0.0400',
    }
    uk_rate = {
        'mcc': '234',
        'country_name': 'United Kingdom',
        'rate': '0.0200',
    }
    return json_response([default_rate, uk_rate])
def error(request):
    """Always reject the request with ``HTTPForbidden`` (reason "unauthorized")."""
    forbidden = HTTPForbidden(reason="unauthorized")
    raise forbidden
def handle_auth(token):
    """Resolve *token* to a user via ``authentication_method``.

    Args:
        token: The token taken from the request, or ``None`` if there was
            none.

    Returns:
        The user object returned by ``authentication_method``.

    Raises:
        HTTPForbidden: if *token* is ``None`` or no user matches it.
    """
    if token is None:
        raise HTTPForbidden(body=b"no token in request")

    user = authentication_method(token)
    if user is not None:
        return user

    raise HTTPForbidden(body=b"invalid token")
def cli(loop, test_client):
    """Create a test client for an app exposing three error-producing routes.

    Routes registered (in this order):
        ``/error``      raises ``ValueError``
        ``/error-403``  raises ``web.HTTPForbidden``
        ``/error-404``  returns ``web.HTTPNotFound``

    Args:
        loop: Event loop used to build the app and run the client factory.
        test_client: Factory coroutine that turns an app into a test client.

    Returns:
        The result of running ``test_client(app)`` to completion on *loop*.
    """
    async def error(request):
        raise ValueError()

    async def error403(request):
        raise web.HTTPForbidden()

    async def error404(request):
        # Returned rather than raised, unlike the 403 view above.
        return web.HTTPNotFound()

    app = get_app(loop=loop)
    routes = (
        ('/error', error),
        ('/error-403', error403),
        ('/error-404', error404),
    )
    for url, view in routes:
        app.router.add_get(url, view)

    return loop.run_until_complete(test_client(app))
def auth_required(func):
    """Decorator rejecting requests that carry no authentication.

    Usage::

        @auth_required
        def view_func(request):
            pass

    The request is taken to be the last positional argument of the decorated
    view, so the decorator works for functions and for methods alike.

    Args:
        func: The view to protect.

    Returns:
        A coroutine function that raises ``web.HTTPForbidden`` when
        ``get_auth(request)`` yields ``None`` and otherwise awaits *func*
        with the original arguments.
    """
    @wraps(func)
    async def wrapper(*args):
        request = args[-1]
        identity = await get_auth(request)
        if identity is None:
            raise web.HTTPForbidden()
        return await func(*args)

    return wrapper
def acl_required(permission, context):
    """Return a decorator enforcing *permission* against an ACL context.

    The decorator wraps an aiohttp view; on each call it resolves the ACL
    context, asks ``get_permitted()`` whether the request holds the
    permission, and either runs the view or raises ``HTTPForbidden``.

    Args:
        permission: The specific permission requested.
        context: Either a sequence of ACL tuples, or a callable that returns
            a sequence of ACL tuples. A callable is invoked on every request.

    Returns:
        A decorator. The decorated view raises ``web.HTTPForbidden`` if the
        request does not have the permission for the given context.
    """
    def decorator(func):

        @wraps(func)
        async def wrapper(*args):
            # The request is the last positional argument (works for
            # methods, where ``self`` comes first).
            request = args[-1]

            # Resolve into a separate local: rebinding ``context`` here would
            # make it local to ``wrapper`` and raise UnboundLocalError on the
            # ``callable(context)`` check.
            acl_context = context() if callable(context) else context

            if await get_permitted(request, permission, acl_context):
                return await func(*args)

            raise web.HTTPForbidden()

        return wrapper

    return decorator
def admin_required(handler):
    """Decorator restricting *handler* to users listed in ``cfg.ADMIN_EMAILS``.

    Args:
        handler: The view to protect.

    Returns:
        A coroutine function that runs the login-protected handler and then
        raises ``HTTPForbidden`` unless ``request['user']['email']`` is an
        admin address.
    """
    @wraps(handler)
    async def decorator(*args):
        request = _get_request(args)

        # NOTE(review): the handler runs before the admin check, so any side
        # effects happen even for non-admins. Presumably login_required is
        # what populates request['user'] - confirm before reordering.
        login_guarded = login_required(handler)
        response = await login_guarded(request)

        is_admin = request['user']['email'] in cfg.ADMIN_EMAILS
        if is_admin:
            return response
        raise HTTPForbidden(reason='You are not admin')

    return decorator
def shutdown(request, response):
    """Close every project, then schedule the server shutdown.

    This is a generator-based coroutine (it uses ``yield from``).

    Args:
        request: The incoming request (unused).
        response: Response object; its status is set to 201.

    Raises:
        HTTPForbidden: if the ``local`` option of the ``Server``
            configuration section is false.
    """
    config = Config.instance()
    if config.get_section_config("Server").getboolean("local", False) is False:
        raise HTTPForbidden(text="You can only stop a local server")

    log.info("Start shutting down the server")

    # close all the projects first
    controller = Controller.instance()
    # ``asyncio.async`` cannot be written since ``async`` became a reserved
    # word; ``asyncio.ensure_future`` is the equivalent call.
    tasks = [
        asyncio.ensure_future(project.close())
        for project in controller.projects.values()
    ]

    if tasks:
        done, _ = yield from asyncio.wait(tasks)
        for future in done:
            try:
                future.result()
            except Exception as e:
                # Best effort: a project that fails to close must not stop
                # the shutdown.
                log.error("Could not close project {}".format(e), exc_info=1)

    # then shutdown the server itself
    from gns3server.web.web_server import WebServer
    server = WebServer.instance()
    asyncio.ensure_future(server.shutdown_server())

    response.set_status(201)
def debug(request, response):
    """Write debug information for the controller and each compute to disk.

    Files are written below ``<config_dir>/debug``, which is recreated on
    every call. This is a generator-based coroutine (it uses ``yield from``).

    Args:
        request: The incoming request (unused).
        response: Response object; its status is set to 201.

    Raises:
        HTTPForbidden: if the ``local`` option of the ``Server``
            configuration section is false.
    """
    config = Config.instance()
    if config.get_section_config("Server").getboolean("local", False) is False:
        raise HTTPForbidden(text="You can only debug a local server")

    debug_dir = os.path.join(config.config_dir, "debug")
    try:
        if os.path.exists(debug_dir):
            shutil.rmtree(debug_dir)
        os.makedirs(debug_dir)
        controller_report = os.path.join(debug_dir, "controller.txt")
        with open(controller_report, "w+") as out:
            out.write(ServerHandler._getDebugData())
    except Exception as e:
        # On failure, log the error and carry on; the hope is that the log
        # itself ends up in the debug export.
        log.error("Could not export debug informations {}".format(e), exc_info=1)

    try:
        if Controller.instance().gns3vm.engine == "vmware":
            vmx_path = Controller.instance().gns3vm.current_engine().vmx_path
            if vmx_path:
                vmx_copy = os.path.join(debug_dir, os.path.basename(vmx_path))
                shutil.copy(vmx_path, vmx_copy)
    except OSError as e:
        # Same best-effort policy as above.
        log.error("Could not copy VMware VMX file {}".format(e), exc_info=1)

    # Iterate over a snapshot of the computes.
    for compute in list(Controller.instance().computes.values()):
        try:
            reply = yield from compute.get("/debug", raw=True)
            data = reply.body.decode("utf-8")
        except Exception as e:
            # The error text takes the place of the compute's debug data.
            data = str(e)
        compute_report = os.path.join(debug_dir, "compute_{}.txt".format(compute.name))
        with open(compute_report, "w+") as out:
            out.write("Compute ID: {}\n".format(compute.id))
            out.write(data)

    response.set_status(201)
async def static_request_handler(cls: Any, obj: Any, context: Dict, func: Any, path: str, base_url: str) -> Any:
    """Register a GET route that serves files from a directory.

    Declared ``async`` because the body awaits the result of
    ``cls.start_server``.

    Args:
        cls: Class providing ``start_server``.
        obj: Passed through to ``cls.start_server``.
        context: Service context; the route is appended to its
            ``_http_routes`` list.
        func: Unused.
        path: Directory to serve from. A path starting with ``/`` is used
            after ``os.path.dirname``; any other path is resolved relative
            to the directory of ``context['context']['_service_file_path']``.
        base_url: URL pattern. Unless it already contains a
            ``(?P<filename>...)`` group, one is appended.

    Returns:
        The awaited result of ``cls.start_server(obj, context)``, or ``None``
        when that call returns a falsy value.
    """
    # Strip a leading '^' and a trailing '$' so the pattern can be re-anchored.
    url_prefix = re.sub(r'\$$', '', re.sub(r'^\^?(.*)$', r'\1', base_url))
    if '?P<filename>' not in base_url:
        pattern = r'^{}(?P<filename>.+?)$'.format(url_prefix)
    else:
        pattern = r'^{}$'.format(url_prefix)
    compiled_pattern = re.compile(pattern)

    if path.startswith('/'):
        path = os.path.dirname(path)
    else:
        path = '{}/{}'.format(os.path.dirname(context.get('context', {}).get('_service_file_path')), path)

    if not path.endswith('/'):
        path = '{}/'.format(path)

    async def handler(request: web.Request) -> web.Response:
        result = compiled_pattern.match(request.path)
        filename = result.groupdict()['filename']
        # NOTE(review): filename comes straight from the URL and is not
        # checked for '..' segments - confirm that path traversal is
        # prevented elsewhere.
        filepath = '{}{}'.format(path, filename)

        try:
            if os.path.isdir(filepath) or not os.path.exists(filepath):
                raise web.HTTPNotFound()

            # Open the file only to surface PermissionError early; close it
            # again right away so no file handle is leaked.
            with pathlib.Path(filepath).open('r'):
                pass
            return FileResponse(filepath)
        except PermissionError:
            raise web.HTTPForbidden()

    context['_http_routes'] = context.get('_http_routes', [])
    context['_http_routes'].append(('GET', pattern, handler))

    start_func = cls.start_server(obj, context)
    return (await start_func) if start_func else None
async def get(self, **kw):
    """Log the current user out and redirect to the login page.

    Declared ``async`` because the body awaits ``get_session()``.

    Args:
        **kw: Ignored.

    Raises:
        web.HTTPForbidden: if the session holds no (truthy) ``user`` entry.
    """
    session = await get_session(self.request)
    if not session.get('user'):
        raise web.HTTPForbidden(body=b'Forbidden')

    del session['user']
    # NOTE(review): the result of redirect() is not returned; presumably it
    # raises the redirect response itself - confirm.
    redirect(self.request, 'login')
async def login(request):
    """Handle both legs of the Google OAuth login.

    Without the client's ``shared_key`` parameter in the query the user is
    redirected to the authorize URL. With it, an access token and the user's
    profile are fetched, and the session is marked as authenticated if the
    e-mail address is in ``args.admins``.

    Declared ``async`` because the body awaits the session and the OAuth
    client calls.

    Args:
        request: The incoming request.

    Returns:
        ``web.HTTPFound`` to the authorize URL or to ``/``, or
        ``web.HTTPForbidden`` when the e-mail address is not an admin.
    """
    session = await get_session(request)
    client = GoogleClient(
        client_id=os.getenv('OAUTH_CLIENT_ID'),
        client_secret=os.getenv('OAUTH_CLIENT_SECRET'),
        scope='email profile',
    )
    # FIXME not picking up https
    client.params['redirect_uri'] = '{}://{}{}'.format(request.scheme, request.host, request.path)

    if client.shared_key not in request.GET:  # 'code' not in request.GET
        return web.HTTPFound(client.get_authorize_url())

    access_token, __ = await client.get_access_token(request.GET)
    user, info = await client.user_info()

    # TODO store in session storage
    if user.email not in args.admins:
        return web.HTTPForbidden()

    session['is_authed'] = True
    session['user'] = {
        'name': info['displayName'],
        'email': user.email,
        'avatar': user.picture,
    }
    return web.HTTPFound('/')
def require(func):
    """Decorator allowing only requests that hold the 'Administrator' permission.

    Works for plain handler functions and for class-based views: when the
    last positional argument is an ``AbstractView`` the request is taken
    from its ``request`` attribute, otherwise the argument itself is the
    request.

    Args:
        func: The handler to protect.

    Returns:
        A coroutine function that raises ``web.HTTPForbidden`` when
        ``permits()`` denies the permission and otherwise awaits *func* with
        the original arguments.
    """
    @functools.wraps(func)
    async def wrapper(*args):
        target = args[-1]
        request = target.request if isinstance(target, AbstractView) else target

        has_perm = await permits(request, 'Administrator')
        if not has_perm:
            raise web.HTTPForbidden()

        # The former ``isinstance(args, AbstractView)`` branch was dead code:
        # ``args`` is always a tuple and both branches made the same call.
        return await func(*args)

    return wrapper
def test_100_continue_custom_response(loop, test_client):
    """Check that a custom ``expect_handler`` can accept or reject a request.

    The same form is posted twice with ``expect100=True``: first with
    ``auth_err`` false (expects 200), then with ``auth_err`` true (expects
    the 403 returned by the expect handler).
    """

    @asyncio.coroutine
    def handler(request):
        data = yield from request.post()
        # Was ``assert b'123', data['name']``, which treats the second
        # operand as the assertion message and therefore always passes.
        assert b'123' == data['name']
        return web.Response()

    @asyncio.coroutine
    def expect_handler(request):
        if request.version == HttpVersion11:
            # ``auth_err`` is read from the enclosing scope at call time, so
            # the assignments below take effect between the two requests.
            if auth_err:
                return web.HTTPForbidden()

            request.transport.write(b"HTTP/1.1 100 Continue\r\n\r\n")

    form = FormData()
    form.add_field('name', b'123',
                   content_transfer_encoding='base64')

    app = web.Application()
    app.router.add_post('/', handler, expect_handler=expect_handler)
    client = yield from test_client(app)

    auth_err = False
    resp = yield from client.post('/', data=form, expect100=True)
    assert 200 == resp.status

    auth_err = True
    resp = yield from client.post('/', data=form, expect100=True)
    assert 403 == resp.status
def oauth_callback(request):
    """Handle the Evernote OAuth callback for basic access.

    Looks up the start session by the ``key`` query parameter, checks that
    the user granted access and that ``session_key`` matches, then creates
    the user and schedules the access-token exchange.

    Args:
        request: The incoming request; ``key``, ``session_key`` and
            ``oauth_verifier`` are read from its query string.

    Returns:
        ``web.HTTPForbidden`` when no session matches the callback key,
        otherwise ``web.HTTPFound`` redirecting to ``bot.url``.
    """
    logger = request.app.logger
    bot = request.app.bot
    config_data = config['evernote']['basic_access']
    params = parse_qs(request.query_string)
    callback_key = params.get('key', [''])[0]
    # Default to [''] like ``key`` above: a missing parameter used to raise
    # TypeError (None[0]); it now falls into the "session expired" branch.
    session_key = params.get('session_key', [''])[0]

    try:
        session = StartSession.get({'oauth_data.callback_key': callback_key})
    except ModelNotFound as e:
        logger.error(e, exc_info=1)
        return web.HTTPForbidden()

    if not params.get('oauth_verifier'):
        logger.info('User declined access. No access token =(')
        bot.send_message(session.data['chat_id'],
                         'We are sorry, but you declined authorization ??')
        return web.HTTPFound(bot.url)

    if session.key != session_key:
        # Implicit concatenation instead of a backslash continuation inside
        # the literal, which would leak source indentation into the message.
        text = ('Session is expired. '
                'Please, send /start command to create new session')
        bot.send_message(session.data['chat_id'], text)
        return web.HTTPFound(bot.url)

    user_data = session.data['user']
    name = '{0} {1}'.format(user_data['first_name'], user_data['last_name'])
    user = User(id=session.id,
                name=name,
                username=user_data['username'],
                telegram_chat_id=session.data['chat_id'],
                mode='multiple_notes',
                places={},
                settings={'evernote_access': 'basic'})
    try:
        future = asyncio.ensure_future(
            bot.evernote.get_access_token(config_data, session.oauth_data,
                                          params['oauth_verifier'][0])
        )
        future.add_done_callback(
            functools.partial(set_access_token, bot, user)
        )
    # NOTE(review): ensure_future only schedules the coroutine; errors raised
    # inside it surface on the future, not here. Also, neither handler
    # returns, so the success message below is still sent after an error -
    # confirm whether that is intended.
    except TokenRequestDenied as e:
        logger.error(e, exc_info=1)
        text = ('We are sorry, but we have some problems with Evernote '
                'connection. Please try again later')
        bot.send_message(user.telegram_chat_id, text)
    except Exception as e:
        logger.fatal(e, exc_info=1)
        bot.send_message(user.telegram_chat_id, 'Oops. Unknown error')

    text = ('Evernote account is connected.\n'
            'From now you can just send message and note be created.')
    bot.send_message(user.telegram_chat_id, text)
    user.save()
    return web.HTTPFound(bot.url)
def oauth_callback_full_access(request):
    """Handle the Evernote OAuth callback for full (read/update) access.

    Looks up the start session by the ``key`` query parameter and the user
    by the session id, checks that the user granted access and that
    ``session_key`` matches, then schedules the access-token exchange.

    Args:
        request: The incoming request; ``key``, ``session_key`` and
            ``oauth_verifier`` are read from its query string.

    Returns:
        ``web.HTTPForbidden`` when the session or user is not found,
        otherwise ``web.HTTPFound`` redirecting to ``bot.url``.
    """
    logger = request.app.logger
    bot = request.app.bot
    params = parse_qs(request.query_string)
    callback_key = params.get('key', [''])[0]
    # Default to [''] like ``key`` above: a missing parameter used to raise
    # TypeError (None[0]); it now falls into the "session expired" branch.
    session_key = params.get('session_key', [''])[0]

    try:
        session = StartSession.get({'oauth_data.callback_key': callback_key})
        user = User.get({'id': session.id})
    except ModelNotFound as e:
        logger.error(e, exc_info=1)
        return web.HTTPForbidden()

    if not params.get('oauth_verifier'):
        logger.info('User declined full access =(')
        bot.send_message(
            user.telegram_chat_id,
            'We are sorry, but you deny read/update access??',
            {'hide_keyboard': True}
        )
        return web.HTTPFound(bot.url)

    if session.key != session_key:
        # Implicit concatenation instead of a backslash continuation inside
        # the literal, which would leak source indentation into the message.
        text = ('Session is expired. Please, send /start command to create '
                'new session')
        bot.send_message(user.telegram_chat_id, text)
        return web.HTTPFound(bot.url)

    try:
        oauth_verifier = params['oauth_verifier'][0]
        config_data = config['evernote']['full_access']
        future = asyncio.ensure_future(
            bot.evernote.get_access_token(config_data, session.oauth_data,
                                          oauth_verifier)
        )
        future.add_done_callback(
            functools.partial(switch_to_one_note_mode, bot, user.id)
        )
    # NOTE(review): ensure_future only schedules the coroutine; errors raised
    # inside it surface on the future, not here. Also, neither handler
    # returns, so the mode message below is still sent after an error -
    # confirm whether that is intended.
    except TokenRequestDenied as e:
        logger.error(e, exc_info=1)
        bot.send_message(
            user.telegram_chat_id,
            'We are sorry, but we have some problems with Evernote connection. '
            'Please try again later',
            {'hide_keyboard': True}
        )
    except Exception as e:
        logger.fatal(e, exc_info=1)
        bot.send_message(user.telegram_chat_id, 'Oops. Unknown error',
                         {'hide_keyboard': True})

    text = 'From now this bot in "One note" mode'
    bot.send_message(user.telegram_chat_id, text, {'hide_keyboard': True})
    return web.HTTPFound(bot.url)
def auth_middleware(app, handler):
    """Login via Github.

    Builds the middleware handler that routes requests as follows (all paths
    are prefixed with ``conf['url_prefix']``):

    * ``conf['github_callback']``: finish the OAuth flow (``callback``)
    * ``/hook`` and ``/login``: passed straight to *handler*
    * anything else: requires a logged-in session (``check_auth``)

    Args:
        app: The application (unused here).
        handler: The next request handler in the chain.

    Returns:
        The ``inner`` coroutine function.
    """
    def gh_client(**kw):
        return GithubClient(conf['github_id'], conf['github_secret'], **kw)

    async def callback(request):
        session = await get_session(request)
        log.debug('callback: session=%s GET=%s', session, request.GET)

        # A missing session state must not validate: comparing two absent
        # values (None != None is False) used to let the request through.
        expected_state = session.get('github_state')
        if not expected_state or expected_state != request.GET.get('state'):
            return web.HTTPBadRequest()

        code = request.GET.get('code')
        if not code:
            return web.HTTPBadRequest()

        gh = gh_client()
        token, _ = await gh.get_access_token(code)
        gh = gh_client(access_token=token)
        req = await gh.request('GET', 'user')
        user = await req.json()
        req.close()

        # Collect the logins of all members of the configured organisations.
        # NOTE(review): only one page (per_page=100) is requested per org.
        users = []
        for org in conf['github_orgs']:
            _, resp = await gh_api('orgs/%s/members?per_page=100' % org)
            users.extend(u['login'] for u in resp)
        log.debug('members %s: %s', len(users), users)

        if user.get('login') in users:
            session['login'] = user.get('login')
            session.pop('github_state', None)
            session.pop('github_url', None)
            location = session.pop('location')
            return web.HTTPFound(location)
        return web.HTTPForbidden()

    async def check_auth(request):
        session = await get_session(request)
        login = session.get('login')
        if login:
            request['login'] = login
            return await handler(request)
        elif 'github_state' not in session:
            # First visit: prepare the OAuth state and remember where the
            # user wanted to go.
            gh = gh_client()
            state = str(uuid.uuid4())
            url = gh.get_authorize_url(scope='', state=state)
            session['github_state'] = state
            session['github_url'] = url
            session['location'] = request.path
            log.debug('check_auth: %s', session)
        return web.HTTPFound(conf['url_prefix'] + '/login')

    async def inner(request):
        if request.path == (conf['url_prefix'] + conf['github_callback']):
            return await callback(request)
        elif request.path == (conf['url_prefix'] + '/hook'):
            return await handler(request)
        elif request.path == (conf['url_prefix'] + '/login'):
            return await handler(request)
        else:
            return await check_auth(request)

    return inner