我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用aiohttp.web.Response()。
def create_handler(transmute_func, context): @wraps(transmute_func.raw_func) async def handler(request): exc, result = None, None try: args, kwargs = await extract_params(request, context, transmute_func) result = await transmute_func.raw_func(*args, **kwargs) except HTTPException as hpe: code = hpe.status_code or 400 exc = APIException(code=code, message=str(hpe)) except Exception as e: exc = e response = transmute_func.process_result( context, result, exc, request.content_type ) return web.Response( body=response["body"], status=response["code"], content_type=response["content-type"], headers=response["headers"] ) handler.transmute_func = transmute_func return handler
def add_swagger_api_route(app, target_route, swagger_json_route): """ mount a swagger statics page. app: the aiohttp app object target_route: the path to mount the statics page. swagger_json_route: the path where the swagger json definitions is expected to be. """ static_root = get_swagger_static_root() swagger_body = generate_swagger_html( STATIC_ROOT, swagger_json_route ).encode("utf-8") async def swagger_ui(request): return web.Response(body=swagger_body, content_type="text/html") app.router.add_route("GET", target_route, swagger_ui) app.router.add_static(STATIC_ROOT, static_root)
def create_swagger_json_handler(app, **kwargs): """ Create a handler that returns the swagger definition for an application. This method assumes the application is using the TransmuteUrlDispatcher as the router. """ spec = get_swagger_spec(app).swagger_definition(**kwargs) encoded_spec = json.dumps(spec).encode("UTF-8") async def swagger(request): return web.Response( # we allow CORS, so this can be requested at swagger.io headers={ "Access-Control-Allow-Origin": "*" }, body=encoded_spec, content_type="application/json", ) return swagger
def _http_add_org_webhook(self, request): await request.post() data = { "name": "web", "active": True, "events": ["*"], "config": { "url": self.root.config.core["url"] + self.url + "webhook", "content_type": "json" }, } client = self._get_client(request) resp = await client.post("/orgs/:org/hooks", request.POST["org"], **data) org_data = await client.get("/orgs/:org", request.POST["org"]) self.orgs[str(org_data["id"]).encode("ascii")] = client.token return web.Response(text=str(resp))
def http_handler(self, request): if request.path.endswith("api.sock"): return await self.ws_handler(request) if request.path.endswith("/monitor/"): data = pkgutil.get_data("rci.services.monitor", "monitor.html").decode("utf8") return web.Response(text=data, content_type="text/html") if request.path.endswith("/login/github"): if request.method == "POST": url = self.oauth.generate_request_url(("read:org", )) return web.HTTPFound(url) if request.path.endswith("/oauth2/github"): return (await self._oauth2_handler(request)) if request.path.endswith("logout"): if request.method == "POST": sid = request.cookies.get(self.config["cookie_name"]) del(self.sessions[sid]) return web.HTTPFound("/monitor/") return web.HTTPNotFound()
def get(self) -> web.Response: dbcon = self.request.app['dbcon'] if 'id' in self.request.rel_url.query: contact_id = require_int(get_request_param(self.request, 'id')) c = await contact.get_contact(dbcon, contact_id) contact_list = [] # type: Iterable[object_models.Contact] if c: contact_list = [c] metadata_list = await metadata.get_metadata_for_object(dbcon, 'contact', contact_id) elif 'meta_key' in self.request.rel_url.query: meta_key = require_str(get_request_param(self.request, 'meta_key')) meta_value = require_str(get_request_param(self.request, 'meta_value')) contact_list = await contact.get_contacts_for_metadata(dbcon, meta_key, meta_value) metadata_list = await metadata.get_metadata_for_object_metadata( dbcon, meta_key, meta_value, 'contact', 'contacts') else: contact_list = await contact.get_all_contacts(dbcon) metadata_list = await metadata.get_metadata_for_object_type(dbcon, 'contact') return web.json_response(apply_metadata_to_model_list(contact_list, metadata_list))
def get(self) -> web.Response: dbcon = self.request.app['dbcon'] if 'id' in self.request.rel_url.query: contact_group_id = require_int(get_request_param(self.request, 'id')) contact_group_item = await contact.get_contact_group(dbcon, contact_group_id) contact_group_list = [] # type: Iterable[object_models.ContactGroup] if contact_group_item: contact_group_list = [contact_group_item] metadata_list = await metadata.get_metadata_for_object(dbcon, 'contact_group', contact_group_id) elif 'meta_key' in self.request.rel_url.query: meta_key = require_str(get_request_param(self.request, 'meta_key')) meta_value = require_str(get_request_param(self.request, 'meta_value')) contact_group_list = await contact.get_contact_groups_for_metadata(dbcon, meta_key, meta_value) metadata_list = await metadata.get_metadata_for_object_metadata( dbcon, meta_key, meta_value, 'contact_group', 'contact_groups') else: contact_group_list = await contact.get_all_contact_groups(dbcon) metadata_list = await metadata.get_metadata_for_object_type(dbcon, 'monitor_group') return web.json_response(apply_metadata_to_model_list(contact_group_list, metadata_list))
def get(self) -> web.Response: dbcon = self.request.app['dbcon'] if 'id' in self.request.rel_url.query: monitor_group_id = require_int(get_request_param(self.request, 'id')) monitor_group_item = await monitor_group.get_monitor_group(dbcon, monitor_group_id) monitor_group_list = [] # type: Iterable[object_models.MonitorGroup] if monitor_group_item: monitor_group_list = [monitor_group_item] metadata_list = await metadata.get_metadata_for_object(dbcon, 'monitor_group', monitor_group_id) elif 'meta_key' in self.request.rel_url.query: meta_key = require_str(get_request_param(self.request, 'meta_key')) meta_value = require_str(get_request_param(self.request, 'meta_value')) monitor_group_list = await monitor_group.get_monitor_groups_for_metadata(dbcon, meta_key, meta_value) metadata_list = await metadata.get_metadata_for_object_metadata( dbcon, meta_key, meta_value, 'monitor_group', 'monitor_groups') else: monitor_group_list = await monitor_group.get_all_monitor_groups(dbcon) metadata_list = await metadata.get_metadata_for_object_type(dbcon, 'monitor_group') return web.json_response(apply_metadata_to_model_list(monitor_group_list, metadata_list))
def authenticate(*,email,passwd): if not email: raise APIValueError('email','Invalid email') if not passwd: raise APIValueError('passwd','Invalid passwd') users = await User.findAll('email=?',[email]) if len(users) == 0: raise APIValueError('email','Email not exist.') user = users[0] # check passwd sha1 = hashlib.sha1() sha1.update(user.id.encode('utf-8')) sha1.update(b':') sha1.update(passwd.encode('utf-8')) if user.passwd != sha1.hexdigest(): raise APIValueError('passwd','passwd error') # authenticate ok set cookie r = web.Response() r.set_cookie(COOKIE_NAME,user2cookie(user,86400),max_age=86400,httponly=True) user.passwd = '******' r.content_type = 'application/json' r.body = json.dumps(user,ensure_ascii=False).encode('utf-8') return r # ??
def api_register_user(*,email,name,passwd): if not name or not name.strip(): raise APIValueError('name') if not email or not _RE_EMAIL.match(email): raise APIValueError('email') if not passwd or not _RE_SHA1.match(passwd): raise APIValueError('passwd') users = await User.findAll('email=?',[email]) if len(users) > 0: raise APIError('register:failed','email','Email is already in use') uid = next_id() sha1_passwd = '%s:%s' % (uid,passwd) encrypt_passwd = hashlib.sha1(sha1_passwd.encode('utf-8')).hexdigest() image = 'https://1.gravatar.com/avatar/%s?s=200&r=pg&d=mm' user = User(id=uid,name=name.strip(),email=email,passwd=encrypt_passwd,image=image % hashlib.md5(email.encode('utf-8')).hexdigest()) await user.save() # make session cookie r = web.Response() r.set_cookie(COOKIE_NAME,user2cookie(user,86400),max_age=86400,httponly=True) user.passwd = '******' r.content_type = 'application/json' r.body = json.dumps(user,ensure_ascii=False).encode('utf-8') return r
def handle(self, request): filename = request.match_info['filename'] try: filepath = self._directory.joinpath(filename).resolve() except (ValueError, FileNotFoundError, OSError): pass else: if filepath.is_dir(): request.match_info['filename'] = str(filepath.joinpath('index.html').relative_to(self._directory)) status, length = 'unknown', '' try: response = await super().handle(request) except HTTPNotModified: status, length = 304, 0 raise except HTTPNotFound: _404_msg = '404: Not Found\n\n' + _get_asset_content(self._asset_path) response = web.Response(body=_404_msg.encode('utf8'), status=404) status, length = response.status, response.content_length else: status, length = response.status, response.content_length finally: l = logger.info if status in {200, 304} else logger.warning l(' > %s %s %s %s', request.method, request.path, status, _fmt_size(length)) return response
def async_handle_subscribe(request, subscribed_clients, state_variables): callback_url = request.headers.get('CALLBACK')[1:-1] sid = 'uuid:' + str(uuid.uuid4()) subscribed_clients[sid] = callback_url headers = { 'SID': sid } @asyncio.coroutine def async_push_later(state_variable): yield from asyncio.sleep(0.5) yield from state_variable.async_notify_listeners() for state_variable in state_variables.values(): LOGGER.debug('Pushing state_variable on SUBSCRIBE: %s', state_variable.name) asyncio.get_event_loop().create_task(async_push_later(state_variable)) return web.Response(status=200, headers=headers) #endregion #region Unsubscribe
def api_register_user(*, email, name, password): if not name or not name.strip(): raise APIValueError('name') if not email or not _RE_EMAIL.match(email): raise APIValueError('email') if not password or not _RE_SHA1.match(password): raise APIValueError('password') users = yield from User.find_all('email=?', [email]) if len(users) > 0: raise APIError('Register failed', 'email', 'Email is already in use.') uid = next_id() sha1_password = '{}:{}'.format(uid, password) logging.info('register password:{}, sha1_password:{}'.format(password, sha1_password)) user = User(id=uid, name= name.strip(), email= email, password = hashlib.sha1(sha1_password.encode('utf-8')).hexdigest(), image='http://www.gravatar.com/avatar/{}?d=mm&s=120'.format(hashlib.md5(email.encode('utf-8')).hexdigest())) yield from user.save() r = web.Response() r.set_cookie(COOKIE_NAME, user2cookie(user, 86400), max_age=86400, httponly=True) user.password = '*' * 8 r.content_type = 'application/json' r.body = json.dumps(user, ensure_ascii=False).encode('utf-8') return r
def authenticate(*, email, password): if not email: raise APIValueError('email', 'Invalid Email') if not password: raise APIValueError('password', 'Invalid Password') users = yield from User.find_all('email=?', [email]) if len(users) == 0: raise APIValueError('email', 'Email not exist') user = users[0] #check password sha1_password = '{}:{}'.format(user.id, password) logging.info('login password:{}, sha1_password:{}'.format(password, sha1_password)) if user.password != hashlib.sha1(sha1_password.encode('utf-8')).hexdigest(): raise APIValueError('password', 'Invalid Password.') # authenticate ok, set cookie r = web.Response() r.set_cookie(COOKIE_NAME, user2cookie(user, 86400), max_age=86400, httponly=True) user.password = '*' * 8 r.content_type = 'application/json' r.body = json.dumps(user, ensure_ascii=False).encode('utf-8') return r
def datetime_filter(t): date_time = datetime.fromtimestamp(t) str_date = date_time.strftime("%Y-%m-%d %X") delta = int(time.time() - t) if delta < 60: return u'<span title="{}">1???</span>'.format(str_date) if delta < 3600: return u'<span title="{}">{}???</span>'.format(str_date, delta // 60) if delta < 86400: return u'<span title="{}">{}???</span>'.format(str_date, delta // 3600) if delta < 604800: return u'<span title="{}">{}??</span>'.format(str_date, delta // 86400) #dt = datetime.fromtimestamp(t) return u'<span title="{}">{}</span>'.format(str_date, date_time.strftime("%Y?%m?%d?")) #def index(request): #return web.Response(body=b'<h1>Awesome Python3 Web</h1>', content_type='text/html')
def aiohttp_server(loop, addr): async def handle(request): payload_size = int(request.match_info.get('size', 1024)) resp = _RESP_CACHE.get(payload_size) if resp is None: resp = b'X' * payload_size _RESP_CACHE[payload_size] = resp return web.Response(body=resp) app = web.Application(loop=loop) app.router.add_route('GET', '/{size}', handle) app.router.add_route('GET', '/', handle) handler = app.make_handler() server = loop.create_server(handler, *addr) return server
def __call__(self, value): # Safe html transformation if _is_pserver_response(value): body = value.response if not isinstance(body, bytes): if not isinstance(body, str): body = json.dumps(value.response) body = body.encode('utf8') value = aioResponse( body=body, status=value.status, headers=value.headers) if 'content-type' not in value.headers: value.headers.update({ 'content-type': 'text/html' }) return value
def guess_response(self, value): resp = value.response if isinstance(resp, dict): resp = aioResponse(body=bytes(json.dumps(resp, cls=PServerJSONEncoder), 'utf-8')) resp.headers['Content-Type'] = 'application/json' elif isinstance(resp, list): resp = aioResponse(body=bytes(json.dumps(resp, cls=PServerJSONEncoder), 'utf-8')) resp.headers['Content-Type'] = 'application/json' elif isinstance(resp, str): resp = aioResponse(body=bytes(resp, 'utf-8')) resp.headers['Content-Type'] = 'text/html' elif resp is None: # missing result... resp = aioResponse(body=b'{}') resp.headers['Content-Type'] = 'application/json' resp.headers.update(value.headers) if not resp.prepared: resp.set_status(value.status) return resp
def home(request): # <1> query = request.GET.get('query', '').strip() # <2> print('Query: {!r}'.format(query)) # <3> if query: # <4> descriptions = list(index.find_descriptions(query)) res = '\n'.join(ROW_TPL.format(**vars(descr)) for descr in descriptions) msg = index.status(query, len(descriptions)) else: descriptions = [] res = '' msg = 'Enter words describing characters.' html = template.format(query=query, result=res, # <5> message=msg) print('Sending {} results'.format(len(descriptions))) # <6> return web.Response(content_type=CONTENT_TYPE, text=html) # <7> # END HTTP_CHARFINDER_HOME # BEGIN HTTP_CHARFINDER_SETUP
def get_chars(request): peername = request.transport.get_extra_info('peername') print('Request from: {}, GET data: {!r}'.format(peername, dict(request.GET))) query = request.GET.get('query', '') if query: try: start = int(request.GET.get('start', 0)) stop = int(request.GET.get('stop', sys.maxsize)) except ValueError: raise web.HTTPBadRequest() stop = min(stop, start+RESULTS_PER_REQUEST) num_results, chars = index.find_chars(query, start, stop) else: raise web.HTTPBadRequest() text = ''.join(char if n % 64 else char+'\n' for n, char in enumerate(chars, 1)) response_data = {'total': num_results, 'start': start, 'stop': stop} print('Response to query: {query!r}, start: {start}, stop: {stop}'.format( query=query, **response_data)) response_data['chars'] = text json_obj = json.dumps(response_data) print('Sending {} characters'.format(len(text))) headers = {'Access-Control-Allow-Origin': '*'} return web.Response(content_type=TEXT_TYPE, headers=headers, text=json_obj)
def handle(request): query = request.GET.get('query', '') print('Query: {!r}'.format(query)) if query: descriptions = list(index.find_descriptions(query)) res = '\n'.join(ROW_TPL.format(**vars(descr)) for descr in descriptions) msg = index.status(query, len(descriptions)) else: descriptions = [] res = '' msg = 'Type words describing characters.' text = PAGE_TPL.format(query=query, result=res, message=msg, links=LINKS_HTML) print('Sending {} results'.format(len(descriptions))) return web.Response(content_type=CONTENT_TYPE, text=text)
def homepage(request): conn = request.app['conn'] t = HTML_HDR t += "<h2>%s</h2>" % conn.server_info # NOTE: only a demo program would do all these remote server # queries just to display the hompage... donate = await conn.RPC('server.donation_address') motd = await conn.RPC('server.banner') t += '<pre style="font-size: 50%%">\n%s\n</pre><hr/>' % motd t += '<p>Donations: %s</p>' % linkage(donate) t += '</p><p>Top block: %s</p>' % linkage(top_blk) t += ''' <form method=POST action="/"> <input name='q' style="width: 50%" placeholder="Txn hash / address / etc"></input> ''' return Response(content_type='text/html', text=t)
def search(request): query = (await request.post())['q'].strip() if not (1 <= len(query) <= 200): raise HTTPFound('/') if len(query) <= 7: raise HTTPFound('/blk/'+query.lower()) elif len(query) == 64: # assume it's a hash of block or txn raise HTTPFound('/txn/'+query.lower()) elif query[0] in '13mn': # assume it'a payment address raise HTTPFound('/addr/'+query) else: return Response(text="Can't search for that")
def address_page(request): # address summary by bitcoin payment addr addr = request.match_info['addr'] conn = request.app['conn'] t = HTML_HDR t += '<h1><code>%s</code></h1>' % addr for method in ['blockchain.address.get_balance', 'blockchain.address.get_status', 'blockchain.address.get_mempool', 'blockchain.address.get_proof', 'blockchain.address.listunspent']: # get a balance, etc. t += await call_and_format(conn, method, addr) return Response(content_type='text/html', text=t)
def register_in_memory_block_store_api(app, prefix='/blockstore'): # Really, really simple stuff ;-) blocks = {} async def api_block_get(request): id = request.match_info['id'] try: return web.Response(body=blocks[id], content_type='application/octet-stream') except KeyError: raise web.HTTPNotFound() async def api_block_post(request): id = request.match_info['id'] if id in blocks: raise web.HTTPConflict() blocks[id] = await request.read() return web.Response() app.router.add_get(prefix + '/{id}', api_block_get) app.router.add_post(prefix + '/{id}', api_block_post)
def async_good(self, request): """TBD.""" data = { "title": "Top of the best stikers for Telegram", "active_good": "class=\"active\"", "active_bad": "", "top": {} } await self.session_handler(request=request) data["top"] = await self.db.get_top(9, 'ITERATOR_LE') return web.Response( text=self.jinja.get_template('good.html').render( title=data["title"], active_good=data["active_good"], active_bad=data["active_bad"], top=data["top"] ), content_type='text/html') # @aiohttp_jinja2.template('bad.html')
def action_bad(self, request): """TBD.""" data = { "title": "Top of bad stikers for Telegram", "active_good": "", "active_bad": "class=\"active\"", "top": {} } await self.session_handler(request=request) data['top'] = await self.db.get_top(9, 'ITERATOR_GE') return web.Response( text=self.jinja.get_template('good.html').render( title=data["title"], active_good=data["active_good"], active_bad=data["active_bad"], top=data["top"] ), content_type='text/html')
def __call__(self, value): if _is_guillotina_response(value): body = value.response if not isinstance(body, bytes): if not isinstance(body, str): body = ujson.dumps(value.response) body = body.encode('utf8') value = aioResponse( body=body, status=value.status, headers=value.headers) if 'content-type' not in value.headers: value.headers.update({ 'content-type': self.content_type }) return value
def guess_response(self, value): resp = value.response if type(resp) in (dict, list, int, float, bool): resp = aioResponse(body=bytes(json.dumps(resp, cls=GuillotinaJSONEncoder), 'utf-8')) resp.headers['Content-Type'] = 'application/json' elif isinstance(resp, str): original_resp = resp resp = aioResponse(body=bytes(resp, 'utf-8')) if '<html' in original_resp: resp.headers['Content-Type'] = 'text/html' else: resp.headers['Content-Type'] = 'text/plain' elif resp is None: # missing result... resp = aioResponse(body=b'{}') resp.headers['Content-Type'] = 'application/json' resp.headers.update(value.headers) if not resp.prepared: resp.set_status(value.status) return resp
def adapter_do_OPTIONS(self, request): origin = request.headers["Origin"] allowed_origins = self._bot.config.get_option("api_origins") if allowed_origins is None: raise web.HTTPForbidden() if "*" == allowed_origins or "*" in allowed_origins: return web.Response(headers={ "Access-Control-Allow-Origin": origin, "Access-Control-Allow-Headers": "content-type", }) if not origin in allowed_origins: raise web.HTTPForbidden() return web.Response(headers={ "Access-Control-Allow-Origin": origin, "Access-Control-Allow-Headers": "content-type", "Vary": "Origin", })
def adapter_do_GET(self, request): payload = { "sendto": request.match_info["id"], "key": request.match_info["api_key"], "content": unquote(request.match_info["message"]) } results = await self.process_request('', # IGNORED '', # IGNORED payload) if results: content_type="text/html" results = results.encode("ascii", "xmlcharrefreplace") else: content_type="text/plain" results = "OK".encode('utf-8') return web.Response(body=results, content_type=content_type)
def test_middleware_doesnt_reissue_on_bad_response(self): secret = b'01234567890abcdef' auth_ = auth.CookieTktAuthentication(secret, 15, 0, cookie_name='auth') middlewares = [ auth_middleware(auth_)] valid_until = time.time() + 15 session_data = TicketFactory(secret).new('some_user', valid_until=valid_until) request = await make_request('GET', '/', middlewares, \ [(auth_.cookie_name, session_data)]) user_id = await auth.get_auth(request) self.assertEqual(user_id, 'some_user') response = await make_response(request, middlewares, web.Response(status=400)) self.assertFalse(auth_.cookie_name in response.cookies)
def app_hello(self, request): return web.Response(text='Djaio!')
def get(self): result = web.Response( body=str.encode('You shall not pass!'), content_type='text/plain' ) return result
def response_ok(): return web.Response(**{'status': 200})
def response_not_found(): return web.Response(**{'status': 404})
def logger_factory(app, handler): async def logger(request): logging.info('Response: %s %s' % (request.method, request.path)) return await handler(request) return logger # ??cookie???????????????request.__user__
def data_factory(app, handler): async def parse_data(request): logging.info('data_factory...') if request.method in ('POST', 'PUT'): if not request.content_type: return web.HTTPBadRequest(text='Missing Content-Type.') content_type = request.content_type.lower() if content_type.startswith('application/json'): request.__data__ = await request.json() if not isinstance(request.__data__, dict): return web.HTTPBadRequest(text='JSON body must be object.') logging.info('request json: %s' % request.__data__) elif content_type.startswith(('application/x-www-form-urlencoded', 'multipart/form-data')): params = await request.post() request.__data__ = dict(**params) logging.info('request form: %s' % request.__data__) else: return web.HTTPBadRequest(text='Unsupported Content-Type: %s' % content_type) elif request.method == 'GET': qs = request.query_string request.__data__ = {k: v[0] for k, v in parse.parse_qs(qs, True).items()} logging.info('request query: %s' % request.__data__) else: request.__data__ = dict() return await handler(request) return parse_data # ??????????????????Response??
def response_factory(app, handler): async def response(request): logging.info('Response handler...') r = await handler(request) if isinstance(r, web.StreamResponse): return r if isinstance(r, bytes): resp = web.Response(body=r) resp.content_type = 'application/octet-stream' return resp if isinstance(r, str): if r.startswith('redirect:'): return web.HTTPFound(r[9:]) resp = web.Response(body=r.encode('utf-8')) resp.content_type = 'text/html;charset=utf-8' return resp if isinstance(r, dict): template = r.get('__template__') if template is None: resp = web.Response(body=json.dumps(r, ensure_ascii=False, default=lambda o: o.__dict__).encode('utf-8')) resp.content_type = 'application/json;charset=utf-8' return resp else: # ???jinja2???????????? r['__user__'] = request.__user__ resp = web.Response(body=app['__templating__'].get_template(template).render(**r).encode('utf-8')) resp.content_type = 'text/html;charset=utf-8' return resp if isinstance(r, int) and 100 <= r < 600: return web.Response(status=r) if isinstance(r, tuple) and len(r) == 2: status, message = r if isinstance(status, int) and 100 <= status < 600: return web.Response(status=status, text=str(message)) # default resp = web.Response(body=str(r).encode('utf-8')) resp.content_type = 'text/plain;charset=utf-8' return resp return response
def make_webserver(self): async def page(request): body = self.settings['content'] return web.Response(text=body, content_type='text/html') self.app.router.add_get('/', page) self.handler = self.app.make_handler() port = self.settings['server_port'] self.server = await self.bot.loop.create_server(self.handler, '0.0.0.0', port) print('Serving webserver on {}:{}'.format(self.ip, port))
def webhook_handle(self, request): """ aiohttp.web handle for processing web hooks :Example: >>> from aiohttp import web >>> app = web.Application() >>> app.router.add_route('/webhook') """ update = await request.json() self._process_update(update) return web.Response()
def sparql_endpoint(request): result = { "post": dict((await request.post()).items()), "path": request.path, } if "failure" in result['post'].get('query', ""): raise web.HTTPBadRequest() if "failure" in result['post'].get('update', ""): raise web.HTTPBadRequest() return web.Response(text=json.dumps(result), content_type="application/json")
def crud_endpoint(request): request.app['last_request'] = request if request.method == "PATCH": return web.Response(text="{}", content_type="application/json") else: raise web.HTTPNoContent()
def handle(request): text = "Hello, can you hear me?" return web.Response(body=text.encode('utf-8'))
def header_response(request): return Response("foo", headers={ "location": "boo" })
def registry_dump_handle(request): ''' only read :param request: :return: ''' registry = registry_dump_handle.registry response_dict = {} repo = registry._repository response_dict['registered_services'] = repo._registered_services response_dict['uptimes'] = repo._uptimes response_dict['service_dependencies'] = repo._service_dependencies return web.Response(status=400, content_type='application/json', body=json.dumps(response_dict).encode())
def _enable_http_middleware(func): # pre and post http, processing @wraps(func) async def f(self, *args, **kwargs): if hasattr(self, 'middlewares'): for i in self.middlewares: if hasattr(i, 'pre_request'): pre_request = getattr(i, 'pre_request') if callable(pre_request): try: res = await pre_request(self, *args, **kwargs) # passing service as first argument if res: return res except Exception as e: return Response(status=400, content_type='application/json', body=json.dumps( {'error': str(e), 'sector': getattr(i, 'middleware_info')}).encode()) _func = coroutine(func) # func is a generator object result = await _func(self, *args, **kwargs) if hasattr(self, 'middlewares'): for i in self.middlewares: if hasattr(i, 'post_request'): post_request = getattr(i, 'post_request') if callable(post_request): try: res = await post_request(self, result, *args, **kwargs) if res: return res except Exception as e: return Response(status=400, content_type='application/json', body=json.dumps( {'error': str(e), 'sector': getattr(i, 'middleware_info')}).encode()) return result return f
def pong(self, _): return Response()
def stats(self, _): res_d = Aggregator.dump_stats() return Response(status=200, content_type='application/json', body=json.dumps(res_d).encode())