我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用tornado.web.RequestHandler()。
def get_app(self): class HelloHandler(RequestHandler): def get(self): self.write("Hello world!") class PathQuotingHandler(RequestHandler): def get(self, path): self.write(path) # It would be better to run the wsgiref server implementation in # another thread instead of using our own WSGIContainer, but this # fits better in our async testing framework and the wsgiref # validator should keep us honest return WSGIContainer(validator(WSGIApplication([ ("/", HelloHandler), ("/path/(.*)", PathQuotingHandler), ("/typecheck", TypeCheckHandler), ])))
def log_request(self, handler: web.RequestHandler) -> None: """Handle access log.""" if 'log_function' in self.settings: self.settings['log_function'](handler) return status = handler.get_status() if status < 400: log_method = logger.info elif status < 500: log_method = logger.warning else: log_method = logger.error request_time = 1000.0 * handler.request.request_time() if request_time > 10: logger.warning('%d %s %.2fms', status, handler._request_summary(), request_time) else: log_method('%d %s', status, handler._request_summary())
def setup_handlers(web_app): env_name = 'production' config = config_for_env(env_name, web_app.settings['base_url']) define('config', config) settings = dict( debug=True, serve_traceback=True, compiled_template_cache=False, template_path=os.path.join(os.path.dirname(__file__), 'static/'), static_path=os.path.join(os.path.dirname(__file__), 'static/'), # Ensure static urls are prefixed with the base url too static_url_prefix=config['URL'] + 'static/', ) web_app.settings.update(settings) socket_url = url_path_join(config['URL'], r'socket/(\S+)') host_pattern = '.*' route_pattern = url_path_join(config['URL'], '/interact') web_app.add_handlers(host_pattern, [ (route_pattern, LandingHandler), (route_pattern + '/', LandingHandler), (socket_url, RequestHandler) ])
def get_app(self): class HelloHandler(RequestHandler): def get(self): self.write("Hello world!") class PathQuotingHandler(RequestHandler): def get(self, path): self.write(path) # It would be better to run the wsgiref server implementation in # another thread instead of using our own WSGIContainer, but this # fits better in our async testing framework and the wsgiref # validator should keep us honest return WSGIContainer(validator(WSGIApplication([ ("/", HelloHandler), ("/path/(.*)", PathQuotingHandler), ])))
def initialize(self): """Hook for subclass initialization. A dictionary passed as the third argument of a url spec will be supplied as keyword arguments to initialize(). Example:: class ProfileHandler(RequestHandler): def initialize(self, database): self.database = database def get(self, username): ... app = Application([ (r'/user/(.*)', ProfileHandler, dict(database=database)), ]) """ pass
def __init__(self, pattern, handler_class, kwargs={}, name=None): """Creates a URLSpec. Parameters: pattern: Regular expression to be matched. Any groups in the regex will be passed in to the handler's get/post/etc methods as arguments. handler_class: RequestHandler subclass to be invoked. kwargs (optional): A dictionary of additional arguments to be passed to the handler's constructor. name (optional): A name for this handler. Used by Application.reverse_url. """ if not pattern.endswith('$'): pattern += '$' self.regex = re.compile(pattern) # ???? self.handler_class = handler_class self.kwargs = kwargs self.name = name self._path, self._group_count = self._find_groups()
def get_app(self): class HelloHandler(RequestHandler): def get(self): self.finish('Hello world') class LargeHandler(RequestHandler): def get(self): # 512KB should be bigger than the socket buffers so it will # be written out in chunks. self.write(''.join(chr(i % 256) * 1024 for i in range(512))) class FinishOnCloseHandler(RequestHandler): @asynchronous def get(self): self.flush() def on_connection_close(self): # This is not very realistic, but finishing the request # from the close callback has the right timing to mimic # some errors seen in the wild. self.finish('closed') return Application([('/', HelloHandler), ('/large', LargeHandler), ('/finish_on_close', FinishOnCloseHandler)])
def _convert_header_value(self, value): if isinstance(value, bytes_type): pass elif isinstance(value, unicode_type): value = value.encode('utf-8') elif isinstance(value, numbers.Integral): # return immediately since we know the converted value will be safe return str(value) elif isinstance(value, datetime.datetime): return httputil.format_timestamp(value) else: raise TypeError("Unsupported header value %r" % value) # If \n is allowed into the header, it is possible to inject # additional headers or split the request. Also cap length to # prevent obviously erroneous values. if (len(value) > 4000 or RequestHandler._INVALID_HEADER_CHAR_RE.search(value)): raise ValueError("Unsafe header value %r", value) return value
def render_string(self, template_name, **kwargs): """Generate the given template with the given arguments. We return the generated byte string (in utf8). To generate and write a template as a response, use render() above. """ # If no template_path is specified, use the path of the calling file template_path = self.get_template_path() if not template_path: frame = sys._getframe(0) web_file = frame.f_code.co_filename while frame.f_code.co_filename == web_file: frame = frame.f_back template_path = os.path.dirname(frame.f_code.co_filename) with RequestHandler._template_loader_lock: if template_path not in RequestHandler._template_loaders: loader = self.create_template_loader(template_path) RequestHandler._template_loaders[template_path] = loader else: loader = RequestHandler._template_loaders[template_path] t = loader.load(template_name) namespace = self.get_template_namespace() namespace.update(kwargs) return t.generate(**namespace)
def redis_timer(func): def wrap(*args, **kwargs): start_time = time() result = func(*args, **kwargs) end_time = time() duration = end_time - start_time frame = sys._getframe(1) while frame: f_locals = frame.f_locals self = f_locals.get('self') if self and isinstance(self, RequestHandler): if hasattr(self, '_db_time'): self._db_time += duration self._db_count += 1 else: self._db_time = duration self._db_count = 1 break frame = frame.f_back return result return wrap
def authenticated(method): """Decorate methods with this to require that the user be logged in. If the user is not logged in, they will be redirected to the configured `login url <RequestHandler.get_login_url>`. """ @functools.wraps(method) def wrapper(self, *args, **kwargs): if not self.current_user: if self.request.method in ("GET", "HEAD"): url = self.get_login_url() if "?" not in url: if urlparse.urlsplit(url).scheme: # if login url is absolute, make next absolute too next_url = self.request.full_url() else: next_url = self.request.uri url += "?" + urlencode(dict(next=next_url)) self.redirect(url) return raise HTTPError(403) return method(self, *args, **kwargs) return wrapper
def get_app(self): class ProcessHandler(RequestHandler): def get(self): if self.get_argument("exit", None): # must use os._exit instead of sys.exit so unittest's # exception handler doesn't catch it os._exit(int(self.get_argument("exit"))) if self.get_argument("signal", None): os.kill(os.getpid(), int(self.get_argument("signal"))) self.write(str(os.getpid())) return Application([("/", ProcessHandler)])
def open(self): try: # In a websocket context, many RequestHandler methods # raise RuntimeErrors. self.set_status(503) raise Exception("did not get expected exception") except RuntimeError: pass self.write_message(self.request.headers.get('X-Test', ''))
def get_app(self): class HelloHandler(RequestHandler): def get(self): self.finish('Hello world') def post(self): self.finish('Hello world') class LargeHandler(RequestHandler): def get(self): # 512KB should be bigger than the socket buffers so it will # be written out in chunks. self.write(''.join(chr(i % 256) * 1024 for i in range(512))) class FinishOnCloseHandler(RequestHandler): @asynchronous def get(self): self.flush() def on_connection_close(self): # This is not very realistic, but finishing the request # from the close callback has the right timing to mimic # some errors seen in the wild. self.finish('closed') return Application([('/', HelloHandler), ('/large', LargeHandler), ('/finish_on_close', FinishOnCloseHandler)])
def get_app(self): class BufferedHandler(RequestHandler): def put(self): self.write(str(len(self.request.body))) @stream_request_body class StreamingHandler(RequestHandler): def initialize(self): self.bytes_read = 0 def prepare(self): if 'expected_size' in self.request.arguments: self.request.connection.set_max_body_size( int(self.get_argument('expected_size'))) if 'body_timeout' in self.request.arguments: self.request.connection.set_body_timeout( float(self.get_argument('body_timeout'))) def data_received(self, data): self.bytes_read += len(data) def put(self): self.write(str(self.bytes_read)) return Application([('/buffered', BufferedHandler), ('/streaming', StreamingHandler)])
def get_app(self): class SmallHeaders(RequestHandler): def get(self): self.set_header("X-Filler", "a" * 100) self.write("ok") class LargeHeaders(RequestHandler): def get(self): self.set_header("X-Filler", "a" * 1000) self.write("ok") return Application([('/small', SmallHeaders), ('/large', LargeHeaders)])
def get_app(self): class SmallBody(RequestHandler): def get(self): self.write("a" * 1024 * 64) class LargeBody(RequestHandler): def get(self): self.write("a" * 1024 * 100) return Application([('/small', SmallBody), ('/large', LargeBody)])
def get_app(self): class LargeBody(RequestHandler): def get(self): self.write("a" * 1024 * 100) return Application([('/large', LargeBody)])
def get_app(self): class ChunkedWithContentLength(RequestHandler): def get(self): # Add an invalid Transfer-Encoding to the response self.set_header('Transfer-Encoding', 'chunked') self.write("Hello world") return Application([('/chunkwithcl', ChunkedWithContentLength)])
def start_tornado_server(self): class HelloHandler(RequestHandler): def get(self): self.write("Hello from tornado!") app = Application([('/', HelloHandler)], log_function=lambda x: None) server = HTTPServer(app, io_loop=self.io_loop) sock, self.tornado_port = bind_unused_port() server.add_sockets([sock])