我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用sanic.response.html()。
def index(request):
    """Render the CSRF-protected feedback form and record submitted notes.

    On a valid POST the note is timestamped, appended to the ``fb`` list in
    the session and the client is redirected to ``/``. Otherwise the form is
    rendered together with every note stored so far.

    :param request: incoming request; ``request.method`` selects the branch.
    :return: a redirect response after a valid POST, else an HTML response.
    """
    # Local import keeps the block self-contained without touching the
    # module-level imports.
    from html import escape

    form = FeedbackForm(request)
    if request.method == 'POST' and form.validate():
        note = form.note.data
        msg = '{} - {}'.format(datetime.now(), note)
        session.setdefault('fb', []).append(msg)
        return response.redirect('/')

    # Stored notes are user input: escape them before embedding in markup so
    # a submitted note cannot inject HTML or script into the page.
    feedback = ''.join(
        '<p>{}</p>'.format(escape(str(m))) for m in session.get('fb', []))

    # The f-string below requires Python 3.6+.
    content = f"""
    <h1>Form with CSRF Validation</h1>
    <p>Try <a href="/fail">form</a> that fails CSRF validation</p>
    {feedback}
    <form action="" method="POST">
      {'<br>'.join(form.csrf_token.errors)}
      {form.csrf_token}
      {'<br>'.join(form.note.errors)}
      <br>
      {form.note(size=40, placeholder="say something..")}
      {form.submit}
    </form>
    """
    return response.html(content)
def fail(request):
    """Render a feedback form that never passes CSRF validation.

    The form is the same as the CSRF-protected one except that the hidden
    CSRF token field is not rendered, so ``form.validate()`` is expected to
    fail on submit. Should validation pass, the note is stored in the session
    ``fb`` list and the client is redirected to ``/fail``.

    :param request: incoming request; ``request.method`` selects the branch.
    :return: a redirect response after a valid POST, else an HTML response.
    """
    # Local import keeps the block self-contained without touching the
    # module-level imports.
    from html import escape

    form = FeedbackForm(request)
    if request.method == 'POST' and form.validate():
        note = form.note.data
        msg = '{} - {}'.format(datetime.now(), note)
        session.setdefault('fb', []).append(msg)
        return response.redirect('/fail')

    # Stored notes are user input: escape them before embedding in markup.
    feedback = ''.join(
        '<p>{}</p>'.format(escape(str(m))) for m in session.get('fb', []))

    content = f"""
    <h1>Form which fails CSRF Validation</h1>
    <p>This is the same as this <a href="/">form</a> except that CSRF
    validation always fail because we did not render the hidden csrf token</p>
    {feedback}
    <form action="" method="POST">
      {'<br>'.join(form.csrf_token.errors)}
      {'<br>'.join(form.note.errors)}
      <br>
      {form.note(size=40, placeholder="say something..")}
      {form.submit}
    </form>
    """
    return response.html(content)
def test_file_upload(app):
    """Uploading a file through a ``FileField`` exposes its base name.

    CSRF is disabled so the POST can be sent without a token. The handler
    answers with the uploaded file's name, which must equal the base name of
    this test module.
    """
    app.config['WTF_CSRF_ENABLED'] = False

    class TestForm(SanicForm):
        upload = FileField('upload file')
        submit = SubmitField('Upload')

    @app.route('/upload', methods=['GET', 'POST'])
    async def upload(request):
        form = TestForm(request)
        if form.validate_on_submit():
            return response.text(form.upload.data.name)
        content = render_form(form)
        return response.html(content)

    # Close the file handle deterministically instead of leaking it.
    with open(__file__, 'rb') as source_file:
        req, resp = app.test_client.post(
            '/upload', data={'upload': source_file})
    assert resp.status == 200
    assert resp.text == os.path.basename(__file__)
def login(request):
    """Show the login form and authenticate the hard-coded demo account.

    A GET (or any non-POST) renders the form with an empty message. A POST
    with username ``demo`` and password ``1234`` logs the user in and
    redirects to ``/``; any other POST re-renders the form with an error.
    """
    if request.method != 'POST':
        return response.html(LOGIN_FORM.format(''))

    username = request.form.get('username')
    password = request.form.get('password')
    # Demonstration only: real code should verify credentials robustly.
    credentials_ok = username == 'demo' and password == '1234'
    if not credentials_ok:
        return response.html(
            LOGIN_FORM.format('invalid username or password'))

    # auth.login_user is given a User carrying id and name; in production
    # this would be an ORM model object instead of the sanic_auth proxy.
    auth.login_user(request, User(id=1, name=username))
    return response.redirect('/')
def render_map():
    """Render the main map page as an HTML response.

    Optional custom CSS/JS tags and a small block of JavaScript default
    settings are built from ``conf`` and passed to the template, which is
    ``custom.html`` when ``conf.LOAD_CUSTOM_HTML_FILE`` is set and
    ``newmap.html`` otherwise.
    """
    extra_tags = []
    if conf.LOAD_CUSTOM_CSS_FILE:
        extra_tags.append(
            '<link rel="stylesheet" href="static/css/custom.css">')
    if conf.LOAD_CUSTOM_JS_FILE:
        extra_tags.append(
            '<script type="text/javascript" src="static/js/custom.js"></script>')
    css_js = ''.join(extra_tags)

    settings_js = (
        "_defaultSettings['FIXED_OPACITY'] = '{:d}'; "
        "_defaultSettings['SHOW_TIMER'] = '{:d}'; "
        "_defaultSettings['TRASH_IDS'] = [{}]; "
    )
    trash_ids = ', '.join(map(str, conf.TRASH_IDS))
    js_vars = Markup(settings_js.format(
        conf.FIXED_OPACITY, conf.SHOW_TIMER, trash_ids))

    if conf.LOAD_CUSTOM_HTML_FILE:
        template_name = 'custom.html'
    else:
        template_name = 'newmap.html'
    template = env.get_template(template_name)

    page = template.render(
        area_name=conf.AREA_NAME,
        map_center=center,
        map_provider_url=conf.MAP_PROVIDER_URL,
        map_provider_attribution=conf.MAP_PROVIDER_ATTRIBUTION,
        social_links=social_links(),
        init_js_vars=js_vars,
        extra_css_js=Markup(css_js),
    )
    return html(page)
def index(request):
    """Handle image uploads and render the gallery with the upload form.

    On a valid submission the image is written into
    ``request.app.config.UPLOAD_DIR``, its name and description are appended
    to the session ``files`` list and the client is redirected to ``/``.
    Otherwise the stored images and the form (with errors) are rendered.

    :param request: incoming request carrying the form data.
    :return: a redirect response after a stored upload, else an HTML response.
    """
    # Local import keeps the block self-contained without touching the
    # module-level imports.
    from html import escape

    form = UploadForm(request)
    if form.validate_on_submit():
        image = form.image.data
        # The file name is supplied by the client: keep only its final path
        # component so it cannot escape UPLOAD_DIR ("../../etc/passwd").
        safe_name = Path(image.name.replace('\\', '/')).name
        if safe_name and safe_name != '..':
            uploaded_file = Path(request.app.config.UPLOAD_DIR) / safe_name
            uploaded_file.write_bytes(image.body)
            description = form.description.data or 'no description'
            session.setdefault('files', []).append((safe_name, description))
            return response.redirect('/')
        # No usable file name: report it and fall through to re-render.
        form.image.errors.append('invalid file name')

    img = '<section><img src="/img/{}"><p>{}</p><hr></section>'
    # Names and descriptions are user input; escape them for HTML output
    # (quotes included, because the name lands inside an attribute).
    images = ''.join(
        img.format(escape(str(i)), escape(str(d)))
        for i, d in session.get('files', []))
    content = f"""
    <h1>Sanic-WTF file field validators example</h1>
    {images}
    <form action="" method="POST" enctype="multipart/form-data">
      {'<br>'.join(form.csrf_token.errors)}
      {form.csrf_token}
      {'<br>'.join(form.image.errors)}
      {'<br>'.join(form.description.errors)}
      <br> {form.image.label}
      <br> {form.image}
      <br> {form.description.label}
      <br> {form.description(size=20, placeholder="description")}
      <br> {form.submit}
    </form>
    """
    return response.html(content)
def test_form_validation(app):
    """Field validators run on POST when CSRF protection is switched off."""
    app.config['WTF_CSRF_ENABLED'] = False

    class TestForm(SanicForm):
        msg = StringField(
            'Note', validators=[DataRequired(), Length(max=10)])
        submit = SubmitField('Submit')

    @app.route('/', methods=['GET', 'POST'])
    async def index(request):
        form = TestForm(request)
        if request.method == 'POST' and form.validate():
            return response.text('validated')
        return response.html(render_form(form))

    _, page = app.test_client.get('/')
    assert page.status == 200
    # CSRF is disabled, so no token field may be rendered.
    assert 'csrf_token' not in page.text

    # Message longer than the 10-character limit must be rejected.
    too_long = {'msg': 'love is beautiful'}
    _, rejected = app.test_client.post('/', data=too_long)
    assert rejected.status == 200
    assert 'validated' not in rejected.text

    # Message within the limit must be accepted.
    short_enough = {'msg': 'happy'}
    _, accepted = app.test_client.post('/', data=short_enough)
    assert accepted.status == 200
    assert 'validated' in accepted.text
def test_form_csrf_validation(app):
    """A POST validates only when it carries the rendered CSRF token."""
    app.config['WTF_CSRF_SECRET_KEY'] = 'top secret !!!'

    class TestForm(SanicForm):
        msg = StringField(
            'Note', validators=[DataRequired(), Length(max=10)])
        submit = SubmitField('Submit')

    @app.route('/', methods=['GET', 'POST'])
    async def index(request):
        form = TestForm(request)
        if request.method == 'POST' and form.validate():
            return response.text('validated')
        return response.html(render_form(form))

    _, page = app.test_client.get('/')
    assert page.status == 200
    assert 'csrf_token' in page.text
    found_tokens = re.findall(csrf_token_pattern, page.text)
    token = found_tokens[0]
    assert token

    # With the token the submission is accepted.
    with_token = {'msg': 'happy', 'csrf_token': token}
    _, accepted = app.test_client.post('/', data=with_token)
    assert accepted.status == 200
    assert 'validated' in accepted.text

    # Without the token the same message must fail validation.
    without_token = {'msg': 'happy'}
    _, rejected = app.test_client.post('/', data=without_token)
    assert rejected.status == 200
    assert 'validated' not in rejected.text
def test_validate_on_submit(app):
    """``validate_on_submit`` accepts a POST that carries the CSRF token."""
    app.config['WTF_CSRF_SECRET_KEY'] = 'top secret !!!'

    class TestForm(SanicForm):
        msg = StringField(
            'Note', validators=[DataRequired(), Length(max=10)])
        submit = SubmitField('Submit')

    @app.route('/', methods=['GET', 'POST'])
    async def index(request):
        form = TestForm(request)
        if form.validate_on_submit():
            return response.text('validated')
        return response.html(render_form(form))

    _, page = app.test_client.get('/')
    assert page.status == 200
    assert 'csrf_token' in page.text
    found_tokens = re.findall(csrf_token_pattern, page.text)
    token = found_tokens[0]
    assert token

    submission = {'msg': 'happy', 'csrf_token': token}
    _, accepted = app.test_client.post('/', data=submission)
    assert accepted.status == 200
    assert 'validated' in accepted.text
def profile(request, user):
    """Return a small HTML page with a logout link and a welcome line.

    :param request: incoming request (unused here).
    :param user: object whose ``name`` attribute is shown in the greeting.
    """
    logout_link = '<a href="/logout">Logout</a>'
    greeting = '<p>Welcome, %s</p>' % user.name
    return response.html(logout_link + greeting)
async def start(self):
    """Start sanic web server.

    Creates the Sanic app, installs the in-memory session middleware,
    registers the routes and awaits ``create_server`` bound to all
    interfaces on ``self.port``.

    Declared ``async`` because the body awaits ``create_server``; ``await``
    inside a plain ``def`` is a syntax error.

    :return: whatever ``self.app.create_server(...)`` resolves to.
    """
    self.app = Sanic('sanic_server')
    # GZip support
    # Compress(self.app)
    # self.app.config['COMPRESS_MIMETYPES'] = {'text/html',
    #                                          'application/json'}
    # self.app.config['COMPRESS_LEVEL'] = 4
    # self.app.config['COMPRESS_MIN_SIZE'] = 300

    # Session support: save_session is pushed to the front of the response
    # middleware, add_session_to_request to the end of the request middleware.
    self.session_interface = InMemorySessionInterface()
    self.app.response_middleware.appendleft(self.save_session)
    self.app.request_middleware.append(self.add_session_to_request)

    self.add_routes()
    return await self.app.create_server(loop=self.loop,
                                        host='0.0.0.0',
                                        port=self.port,
                                        debug=False)
async def async_good(self, request):
    """Render the "good" top list page.

    Runs the session handler for the request, fetches the top 9 entries with
    the ``'ITERATOR_LE'`` ordering from ``self.db`` and renders
    ``good.html`` with the "good" tab marked active.

    Declared ``async`` because the body awaits the session handler and the
    database call; ``await`` inside a plain ``def`` is a syntax error.

    :param request: incoming request, forwarded to the session handler and
        the template renderer.
    :return: the value returned by ``self.jinja.render``.
    """
    await self.session_handler(request=request)
    top = await self.db.get_top(9, 'ITERATOR_LE')
    return self.jinja.render('good.html', request,
                             title="Top of the best stikers for Telegram",
                             active_good="class=\"active\"",
                             active_bad="",
                             top=top,
                             )
async def action_bad(self, request):
    """Render the "bad" top list page.

    Runs the session handler for the request, fetches the top 9 entries with
    the ``'ITERATOR_GE'`` ordering from ``self.db`` and renders ``bad.html``
    with the "bad" tab marked active.

    Declared ``async`` because the body awaits the session handler and the
    database call; ``await`` inside a plain ``def`` is a syntax error.

    :param request: incoming request, forwarded to the session handler and
        the template renderer.
    :return: the value returned by ``self.jinja.render``.
    """
    await self.session_handler(request=request)
    top = await self.db.get_top(9, 'ITERATOR_GE')
    return self.jinja.render('bad.html', request,
                             title="Top of bad stikers for Telegram",
                             active_good="",
                             active_bad="class=\"active\"",
                             top=top,
                             )
async def render_graphiql(jinja_env=None, graphiql_version=None,
                          graphiql_template=None, params=None, result=None):
    """Render the GraphiQL page as an HTML response.

    Declared ``async`` because the async-Jinja branch awaits
    ``render_async``; ``await`` inside a plain ``def`` is a syntax error.

    :param jinja_env: optional Jinja environment; when given the template is
        compiled with ``from_string`` and rendered asynchronously if
        ``jinja_env.is_async`` is true. Without it ``simple_renderer`` is used.
    :param graphiql_version: overrides ``GRAPHIQL_VERSION`` when truthy.
    :param graphiql_template: overrides ``TEMPLATE`` when truthy.
    :param params: optional object providing ``query``, ``variables`` and
        ``operation_name``; a falsy value is passed through as-is.
    :param result: value exposed to the template as ``result``.
    :return: the response built by ``html``.
    """
    graphiql_version = graphiql_version or GRAPHIQL_VERSION
    template = graphiql_template or TEMPLATE
    template_vars = {
        'graphiql_version': graphiql_version,
        # `params and params.x` keeps a falsy params value instead of raising.
        'query': params and params.query,
        'variables': params and params.variables,
        'operation_name': params and params.operation_name,
        'result': result,
    }

    if jinja_env:
        template = jinja_env.from_string(template)
        if jinja_env.is_async:
            source = await template.render_async(**template_vars)
        else:
            source = template.render(**template_vars)
    else:
        source = simple_renderer(template, **template_vars)

    return html(source)
async def chapter(request):
    """Render the chapter list page for a novel.

    Reads ``url`` and ``novels_name`` from the query string. URLs whose
    netloc has no entry in ``RULES`` are redirected to as-is. Otherwise the
    URL is optionally rewritten through ``REPLACE_RULES``, the cached chapter
    content is fetched and cleaned, and ``chapter.html`` is rendered; when no
    content is available a plain-text message containing the URL is returned.

    Declared ``async`` because the body awaits the cache lookup; ``await``
    inside a plain ``def`` is a syntax error.

    :param request: incoming request providing ``args``.
    :return: a redirect, a rendered template or a text response.
    """
    url = request.args.get('url', None)
    novels_name = request.args.get('novels_name', None)
    netloc = get_netloc(url)
    if netloc not in RULES:
        return redirect(url)
    if netloc in REPLACE_RULES:
        url = url.replace(REPLACE_RULES[netloc]['old'],
                          REPLACE_RULES[netloc]['new'])
    content_url = RULES[netloc].content_url
    content = await cache_owllook_novels_chapter(url=url, netloc=netloc)
    if not content:
        return text('?????????????????????????????????{url}'.format(url=url))

    # Turn the cached value into a string and strip list punctuation and
    # script-related fragments before handing it to the template.
    content = (
        str(content)
        .strip('[],, Jjs')
        .replace(', ', '')
        .replace('onerror', '')
        .replace('js', '')
        .replace('????', '')
    )
    return template(
        'chapter.html',
        novels_name=novels_name,
        url=url,
        content_url=content_url,
        soup=content)
def owllook_register(request):
    """Show the registration page with a verification question.

    A user who is already in the session, or a failure to obtain a
    verification question, results in a redirect to ``/``. Otherwise the
    question/answer value is stored in the session under ``index`` and
    ``register.html`` is rendered with its second element as the question.

    :param request: incoming request carrying the ``session`` mapping.
    :return: a redirect or the rendered registration page.
    """
    if request['session'].get('user', None):
        return redirect('/')

    ver_que_ans = ver_question()
    if not ver_que_ans:
        return redirect('/')

    request['session']['index'] = ver_que_ans
    return template(
        'register.html',
        title='owllook - ?? - ????????',
        question=ver_que_ans[1]
    )
async def admin_setting(request):
    """Render the settings page for the logged-in user.

    Anonymous visitors are redirected to ``/``. For a logged-in user the
    matching document is loaded from the ``user`` collection; if found the
    settings template is rendered, otherwise a short text response is
    returned. Any exception is logged and answered with a redirect to ``/``.

    Declared ``async`` because the body awaits the database lookup; ``await``
    inside a plain ``def`` is a syntax error.

    :param request: incoming request carrying the ``session`` mapping.
    :return: a rendered template, a text response or a redirect.
    """
    user = request['session'].get('user', None)
    if not user:
        return redirect('/')
    try:
        motor_db = motor_base.get_db()
        data = await motor_db.user.find_one({'user': user})
        if data:
            return template('admin_setting.html',
                            title='{user}??? - owllook'.format(user=user),
                            is_login=1,
                            user=user,
                            register_time=data['register_time'],
                            email=data.get('email', '???????'))
        return text('????')
    except Exception as e:
        # Top-level handler boundary: log and fall back to the home page.
        LOGGER.error(e)
        return redirect('/')
def default(self, request, exception):
    """Build the fallback response for an unhandled exception.

    The current traceback is always passed to ``self.log``. A
    ``SanicException`` becomes a text response using the exception's
    ``status_code`` (default 500) and ``headers`` (default empty dict). Any
    other exception yields a 500 HTML response: the rendered traceback in
    debug mode (also logged with the request URL), otherwise the generic
    ``INTERNAL_SERVER_ERROR_HTML`` page.
    """
    self.log(format_exc())

    if isinstance(exception, SanicException):
        status = getattr(exception, 'status_code', 500)
        headers = getattr(exception, 'headers', {})
        return text('Error: {}'.format(exception),
                    status=status,
                    headers=headers)

    if not self.debug:
        return html(INTERNAL_SERVER_ERROR_HTML, status=500)

    html_output = self._render_traceback_html(exception, request)
    logger.error('Exception occurred while handling uri: "%s"\n%s',
                 request.url, format_exc())
    return html(html_output, status=500)
def default(self, request, exception):
    """Build the fallback response for an unhandled exception.

    The current traceback is always passed to ``self.log``. A
    ``SanicException`` becomes a text response using the exception's
    ``status_code`` (default 500) and ``headers`` (default empty dict). Any
    other exception yields a 500 HTML response: the rendered traceback in
    debug mode (also logged with the request URL), otherwise the generic
    ``INTERNAL_SERVER_ERROR_HTML`` page.
    """
    self.log(format_exc())

    if isinstance(exception, SanicException):
        status = getattr(exception, 'status_code', 500)
        headers = getattr(exception, 'headers', {})
        return text('Error: {}'.format(exception),
                    status=status,
                    headers=headers)

    if not self.debug:
        return html(INTERNAL_SERVER_ERROR_HTML, status=500)

    html_output = self._render_traceback_html(exception, request)
    message_template = 'Exception occurred while handling uri: "{}"\n{}'
    log.error(message_template.format(request.url, format_exc()))
    return html(html_output, status=500)
def render(session):
    """Return the counter page: increment link, current value, decrement link.

    :param session: mapping that holds the current value under ``counter``.
    """
    page = """<a href="/inc">+</a> {} <a href="/dec">-</a>""".format(
        session['counter'])
    return response.html(page)
def _save_to_cache(key: str, data: bytes, format: str = 'html') -> None:
    """Store *data* in the cache under *key*, never raising on failure.

    The entry is written with ``CACHE_LIVE_TIME`` and *format*. Any exception
    from the cache is logged with its traceback and, when ``sentry`` is
    configured, also reported there.
    """
    try:
        cache.set(key, data, CACHE_LIVE_TIME, format)
    except Exception:
        # Best effort: a failed cache write must not break the caller.
        logger.exception('Error writing cache')
        if not sentry:
            return
        sentry.captureException()
async def _render(prerender: Prerender, url: str, format: str = 'html') -> str:
    '''Render *url*, retrying once after a temporary failure.

    A ``TemporaryBrowserFailure`` or ``asyncio.TimeoutError`` on the first
    attempt is logged and retried after one second; on the second attempt it
    is re-raised to the caller.

    Declared ``async`` because the body awaits the renderer and the sleep;
    ``await`` inside a plain ``def`` is a syntax error.
    '''
    for attempt in range(2):
        try:
            return await prerender.render(url, format)
        except (TemporaryBrowserFailure, asyncio.TimeoutError) as e:
            if attempt < 1:
                logger.warning(
                    'Temporary browser failure: %s, retry rendering %s in 1s',
                    str(e), url)
                await asyncio.sleep(1)
                continue
            # Second failure: give up and propagate.
            raise
async def get_login(request):
    """Render ``login.html`` asynchronously and return it as HTML.

    Declared ``async`` because the body awaits ``render_async``; ``await``
    inside a plain ``def`` is a syntax error.
    """
    login_template = env.get_template("login.html")
    return html(await login_template.render_async())
async def get_config_home(request, current_user):
    """Render the configuration home page listing all admins.

    Every row of the ``admins`` table is resolved to a row in the mainnet
    ``users`` table; admins without a user row are represented by a dict
    holding only their ``toshi_id``. Each entry is passed through
    ``fix_avatar_for_user`` before rendering ``config.html``.

    Declared ``async`` because the body uses ``async with`` and ``await``,
    which are syntax errors inside a plain ``def``.

    :param request: incoming request (unused here).
    :param current_user: forwarded to the template.
    :return: the response built by ``html``.
    """
    # get list of admins
    async with app.pool.acquire() as con:
        admins = await con.fetch("SELECT * FROM admins")

    mainnet = app.configs['mainnet']
    users = []
    # Acquire the id-database connection once for the whole loop instead of
    # once per admin.
    async with mainnet.db.id.acquire() as id_con:
        for admin in admins:
            user = await id_con.fetchrow(
                "SELECT * FROM users WHERE toshi_id = $1", admin['toshi_id'])
            if user is None:
                user = {'toshi_id': admin['toshi_id']}
            users.append(fix_avatar_for_user(mainnet.urls.id, dict(user)))

    return html(await env.get_template("config.html").render_async(
        admins=users, current_user=current_user, environment='config',
        page="home"))
async def get_categories(request, conf, current_user):
    """Render the categories page with English category names.

    Declared ``async`` because the body uses ``async with`` and ``await``,
    which are syntax errors inside a plain ``def``.

    :param request: incoming request (unused here).
    :param conf: environment configuration providing ``db.id`` and ``name``.
    :param current_user: forwarded to the template.
    :return: the response built by ``html``.
    """
    language = 'en'
    # Every fragment ends with a space: the adjacent literals are joined
    # verbatim, and without it the query would read "... = $1ORDER BY ...".
    sql = ("SELECT * FROM categories "
           "JOIN category_names ON categories.category_id = category_names.category_id AND category_names.language = $1 "
           "ORDER BY categories.category_id DESC ")
    async with conf.db.id.acquire() as con:
        rows = await con.fetch(sql, language)

    return html(await env.get_template("categories.html").render_async(
        categories=rows, current_user=current_user, environment=conf.name,
        page="categories"))
def test(request):
    """Render the module-level template with the request's JSON body.

    The decoded JSON is expanded into keyword arguments for
    ``template.render`` and the result is returned as an HTML response.
    """
    payload = request.json
    rendered = template.render(**payload)
    return html(rendered)
def render_worker_map():
    """Render ``workersmap.html`` with the map settings from ``conf``."""
    context = {
        'area_name': conf.AREA_NAME,
        'map_center': center,
        'map_provider_url': conf.MAP_PROVIDER_URL,
        'map_provider_attribution': conf.MAP_PROVIDER_ATTRIBUTION,
        'social_links': social_links(),
    }
    page = env.get_template('workersmap.html').render(**context)
    return html(page)
def handle(self, request, name):
    """Answer a GET request with the server port and the given name.

    :param request: incoming request (unused here).
    :param name: value shown in the greeting.
    """
    greeting = 'Sanic server running on {0} port. Hello, {1}'
    body = greeting.format(str(self.port), str(name))
    return html(body)
def handle_index(self, request):
    """Answer a GET request with the port the server is running on.

    :param request: incoming request (unused here).
    """
    banner = 'Sanic server running on {0} port.'
    body = banner.format(str(self.port))
    return html(body)
def action_about(self, request):
    """Render the "about" page with the current statistics.

    The page title is combined with ``self.statistics`` (which may override
    it) and the fields the template needs are passed as keyword arguments.

    :param request: incoming request, forwarded to the renderer.
    :return: the value returned by ``self.jinja.render``.
    """
    values = {"title": "About and statistics"}
    values.update(self.statistics)
    wanted = ('title', 'packs_count', 'stickers_count',
              'clicks', 'votes', 'users')
    template_args = {field: values[field] for field in wanted}
    return self.jinja.render('about.html', request, **template_args)
def template(tpl, **kwargs):
    """Render the named template with *kwargs* and wrap it as HTML.

    :param tpl: template name looked up in ``env``.
    :param kwargs: context passed to the template as one mapping.
    """
    loaded = env.get_template(tpl)
    rendered = loaded.render(kwargs)
    return html(rendered)
async def index(request):
    """Render the landing page with the top 25 search-ranking entries.

    ``is_login`` is 1 and ``user`` is passed to the template only when a
    user is present in the session; otherwise ``is_login`` is 0.

    Declared ``async`` because the body awaits the ranking cache; ``await``
    inside a plain ``def`` is a syntax error.

    :param request: incoming request carrying the ``session`` mapping.
    :return: the rendered ``index.html`` response.
    """
    user = request['session'].get('user', None)
    search_ranking = await cache_owllook_search_ranking()
    context = {
        'title': 'owllook - ????????',
        'is_login': 1 if user else 0,
        'search_ranking': search_ranking[:25],
    }
    if user:
        # Anonymous visitors get no `user` variable at all.
        context['user'] = user
    return template('index.html', **context)
def donate(request):
    """Render the static donation page.

    :param request: incoming request (unused here).
    """
    page = template('donate.html')
    return page
def ignore_404(request, exception):
    """Pick the page to serve for a not-found request.

    URLs containing the ``google3eabdadc11faf3b3`` marker get the matching
    HTML file; everything else gets the regular 404 page.

    :param request: incoming request; only ``request.url`` is inspected.
    :param exception: the raised exception (unused here).
    """
    marker = "google3eabdadc11faf3b3"
    page = 'google3eabdadc11faf3b3.html' if marker in request.url else '404.html'
    return template(page)
async def index(request):
    """Render the landing page with the search ranking table.

    ``is_login`` is 1 and ``user`` is passed to the template only when a
    user is present in the session; otherwise ``is_login`` is 0.

    Declared ``async`` because the body awaits the ranking cache; ``await``
    inside a plain ``def`` is a syntax error.

    :param request: incoming request carrying the ``session`` mapping.
    :return: the rendered ``index.html`` response.
    """
    user = request['session'].get('user', None)
    novels_head = ['#', '???', '????']
    first_type_title = "????"
    first_type = []
    search_ranking = await cache_owllook_search_ranking()
    context = {
        'title': 'owllook',
        'is_login': 1 if user else 0,
        'search_ranking': search_ranking,
        'first_type': first_type,
        'first_type_title': first_type_title,
        'novels_head': novels_head,
        'is_owl': 1,
    }
    if user:
        # Anonymous visitors get no `user` variable at all.
        context['user'] = user
    return template('index.html', **context)
async def similar_user(request):
    """Render the "similar users" page for the logged-in user.

    Anonymous visitors are redirected to ``/``. The recommendation document
    for the user is loaded from ``user_recommend``; when present its first 20
    similar users, tags and update time are rendered, otherwise the page is
    rendered with ``is_similar=0``. Any exception is logged and answered
    with a redirect to ``/``.

    Declared ``async`` because the body awaits the database lookup; ``await``
    inside a plain ``def`` is a syntax error.

    :param request: incoming request carrying the ``session`` mapping.
    :return: a rendered template or a redirect.
    """
    user = request['session'].get('user', None)
    if not user:
        return redirect('/')
    try:
        motor_db = motor_base.get_db()
        similar_info = await motor_db.user_recommend.find_one({'user': user})
        if similar_info:
            return template('similar_user.html',
                            title='?' + user + '?????',
                            is_login=1,
                            is_similar=1,
                            user=user,
                            similar_user=similar_info['similar_user'][:20],
                            user_tag=similar_info['user_tag'],
                            updated_at=similar_info['updated_at'])
        return template('similar_user.html',
                        title='?' + user + '?????',
                        is_login=1,
                        is_similar=0,
                        user=user)
    except Exception as e:
        # Top-level handler boundary: log and fall back to the home page.
        LOGGER.error(e)
        return redirect('/')
async def bookmarks(request):
    """Render the bookmark list of the logged-in user.

    Anonymous visitors are redirected to ``/``. Each stored bookmark URL is
    parsed and its ``novels_name``, ``name`` and ``chapter_url`` query
    parameters are extracted (empty string when missing). Bookmarks are shown
    in reverse stored order; without bookmarks the page is rendered with
    ``is_bookmark=0``. Any exception is logged and answered with a redirect
    to ``/``.

    Declared ``async`` because the body awaits the database lookup; ``await``
    inside a plain ``def`` is a syntax error.

    :param request: incoming request carrying the ``session`` mapping.
    :return: a rendered template or a redirect.
    """
    user = request['session'].get('user', None)
    if not user:
        return redirect('/')
    try:
        motor_db = motor_base.get_db()
        data = await motor_db.user_message.find_one({'user': user})
        stored = data.get('bookmarks', None) if data else None
        if stored:
            result = []
            for entry in stored:
                bookmark = entry.get('bookmark', None)
                query = parse_qs(urlparse(bookmark).query)
                # parse_qs maps each key to a non-empty list of values, so a
                # one-element default list yields '' for a missing key.
                result.append({
                    'novels_name': query.get('novels_name', [''])[0],
                    'chapter_name': query.get('name', [''])[0],
                    'chapter_url': query.get('chapter_url', [''])[0],
                    'bookmark': bookmark,
                    'add_time': entry.get('add_time', ''),
                })
            return template('admin_bookmarks.html',
                            title='{user}??? - owllook'.format(user=user),
                            is_login=1,
                            user=user,
                            is_bookmark=1,
                            result=result[::-1])
        return template('admin_bookmarks.html',
                        title='{user}??? - owllook'.format(user=user),
                        is_login=1,
                        user=user,
                        is_bookmark=0)
    except Exception as e:
        # Top-level handler boundary: log and fall back to the home page.
        LOGGER.error(e)
        return redirect('/')
def book_list(request):
    """Render the book list page for the logged-in user.

    Anonymous visitors, and any failure while rendering, result in a
    redirect to ``/`` (failures are logged first).

    :param request: incoming request carrying the ``session`` mapping.
    :return: a rendered template or a redirect.
    """
    user = request['session'].get('user', None)
    if not user:
        return redirect('/')
    try:
        page_title = '{user}??? - owllook'.format(user=user)
        return template('admin_book_list.html',
                        title=page_title,
                        is_login=1,
                        user=user)
    except Exception as e:
        LOGGER.error(e)
        return redirect('/')
async def bookmarks(request):
    """Render the bookmark list of the logged-in user.

    Anonymous visitors are redirected to ``/``. Each stored bookmark URL is
    parsed and its ``novels_name``, ``name`` and ``chapter_url`` query
    parameters are extracted (empty string when missing). Bookmarks are shown
    in reverse stored order; without bookmarks the page is rendered with
    ``is_bookmark=0``. Any exception is logged and answered with a redirect
    to ``/``.

    Declared ``async`` because the body awaits the database lookup; ``await``
    inside a plain ``def`` is a syntax error.

    :param request: incoming request carrying the ``session`` mapping.
    :return: a rendered template or a redirect.
    """
    user = request['session'].get('user', None)
    if not user:
        return redirect('/')
    try:
        motor_db = motor_base.get_db()
        data = await motor_db.user_message.find_one({'user': user})
        stored = data.get('bookmarks', None) if data else None
        if stored:
            result = []
            for entry in stored:
                bookmark = entry.get('bookmark', None)
                query = parse_qs(urlparse(bookmark).query)
                # parse_qs maps each key to a non-empty list of values, so a
                # one-element default list yields '' for a missing key.
                result.append({
                    'novels_name': query.get('novels_name', [''])[0],
                    'chapter_name': query.get('name', [''])[0],
                    'chapter_url': query.get('chapter_url', [''])[0],
                    'bookmark': bookmark,
                    'add_time': entry.get('add_time', ''),
                })
            return template('admin_bookmarks.html',
                            title='{user}??? - owllook'.format(user=user),
                            is_login=1,
                            user=user,
                            is_bookmark=1,
                            result=result[::-1])
        return template('admin_bookmarks.html',
                        title='{user}??? - owllook'.format(user=user),
                        is_login=1,
                        user=user,
                        is_bookmark=0)
    except Exception as e:
        # Top-level handler boundary: log and fall back to the home page.
        LOGGER.error(e)
        return redirect('/')
async def add_session_to_request(request):
    """Open a session for the request, or answer early.

    Requests without a ``user-agent`` header get a short HTML notice. When
    host validation is enabled (``CONFIG.VAL_HOST == 'true'``) a missing or
    unlisted ``host`` header is redirected to the public site. If the website
    is flagged as not running an HTML notice is returned; otherwise the
    session is opened and ``None`` is returned.

    Declared ``async`` because the body awaits the session interface;
    ``await`` inside a plain ``def`` is a syntax error.

    :param request: incoming request.
    :return: a response when answering early, otherwise ``None``.
    """
    host = request.headers.get('host', None)
    user_agent = request.headers.get('user-agent', None)

    if not user_agent:
        return html("<h3>??????...</h3>")

    host_rejected = not host or host not in CONFIG.HOST
    if CONFIG.VAL_HOST == 'true' and host_rejected:
        return redirect('http://www.owllook.net')

    if not CONFIG.WEBSITE['IS_RUNNING']:
        return html("<h3>??????...</h3>")

    await app.session_interface.open(request)
async def homepage(req):
    """Render the front page with per-process and shared request counters.

    The module-level ``request_count`` is incremented on every call. When
    ``redis`` is configured the shared counter is incremented and read back,
    and the heading switches to the shared-state variant.

    Declared ``async`` because the body awaits the redis calls; ``await``
    inside a plain ``def`` is a syntax error.

    :param req: incoming request, forwarded to the layout template.
    :return: the response built by ``html``.
    """
    global request_count
    request_count = request_count + 1

    shared_count = 0
    heading = "Stateless Front-End"
    if redis:
        await redis.incr(shared_counter)
        shared_count = int(await redis.get(shared_counter))
        heading = "Shared State Front-End"

    return html(layout.render(request=req,
                              heading=heading,
                              hostname=gethostname(),
                              requests=request_count,
                              shared_count=shared_count,
                              timestr=datetime.now().strftime("%H:%M:%S.%f")))
def index(req):
    """Render the module-level template without context and return HTML.

    :param req: incoming request (unused here).
    """
    rendered = template.render()
    return html(rendered)
def render_jinja2(tpl_path, context):
    """Render the Jinja2 template file at *tpl_path* with *context*.

    The directory part of the path (``./`` when empty) becomes the loader's
    search path and the file name selects the template.

    :param tpl_path: path to the template file.
    :param context: mapping passed to the template.
    :return: the rendered text.
    """
    directory, template_name = os.path.split(tpl_path)
    loader = jinja2.FileSystemLoader(directory or './')
    environment = jinja2.Environment(loader=loader)
    return environment.get_template(template_name).render(context)


# Application logic
def iffse_index(request):
    """Render the index page, passing the ``p`` query argument through.

    The value of ``p`` (empty string when absent) is exposed to the template
    as ``INSTA_POS_ID``.
    """
    context = {'INSTA_POS_ID': request.args.get('p', '')}
    page = render_jinja2('./templates/index.html', context)
    return response.html(page)
def short_chain(request: Request, short_chain: str):
    """Resolve a short link and return a page that forwards to the app.

    :param request: incoming request (unused here).
    :param short_chain: short link identifier matched against
        ``AppModel.short_chain_uri_``.
    :return: an HTML page that links to the app URL and navigates to it
        after two seconds.
    :raises BadRequest: when no model matches the short link.
    """
    session = Session()
    matches = DB.model_exists(session, AppModel, short_chain_uri_=short_chain)
    if not matches:
        raise BadRequest('not find short chain: {}'.format(short_chain))

    app_model = matches.one()
    target = '{}/#/app/{}'.format(Config.url, app_model.id)
    page = '''
    <html>
    <header>
    </header>
    <body style="padding: 30px">
    <div>???...</div>
    <a href='{0}'>???????????</a>
    </body>
    <script>
    setTimeout("location.href='{0}'", 2000);
    </script>
    </html>
    '''
    return html(page.format(target))