我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用tornado.web.Application()。
def set_secure_cookie(self, name, value, expires_days=30, version=None, **kwargs):
    """Signs and timestamps a cookie so it cannot be forged.

    Requires the ``cookie_secret`` setting in your Application: a long,
    random sequence of bytes used as the HMAC secret for the signature.
    Read such cookies back with `get_secure_cookie()`.

    Note that ``expires_days`` controls the cookie's lifetime in the
    browser and is independent of ``max_age_days`` in `get_secure_cookie`.

    Secure cookies may contain arbitrary byte values, not just unicode
    strings (unlike regular cookies).

    .. versionchanged:: 3.2.1
       Added the ``version`` argument.  Introduced cookie version 2
       and made it the default.
    """
    # Sign first, then delegate the actual Set-Cookie header to set_cookie.
    signed = self.create_signed_value(name, value, version=version)
    self.set_cookie(name, signed, expires_days=expires_days, **kwargs)
def test_missing_key(self):
    """A missing SSL key should cause an immediate exception."""
    application = Application()
    module_dir = os.path.dirname(__file__)
    existing_certificate = os.path.join(module_dir, 'test.crt')
    existing_key = os.path.join(module_dir, 'test.key')
    expected = (ValueError, IOError)
    # Nonexistent certfile fails at construction time.
    self.assertRaises(expected, HTTPServer, application,
                      ssl_options={"certfile": "/__mising__.crt"})
    # Certfile present but keyfile missing also fails.
    self.assertRaises(expected, HTTPServer, application,
                      ssl_options={"certfile": existing_certificate,
                                   "keyfile": "/__missing__.key"})
    # Both files exist, so this construction succeeds.
    HTTPServer(application, ssl_options={"certfile": existing_certificate,
                                         "keyfile": existing_key})
def setUp(self):
    """Run a HelloWorld HTTP server on a dedicated IOLoop thread."""
    incompatible = ('TwistedIOLoop', 'AsyncIOMainLoop')
    if IOLoop.configured_class().__name__ in incompatible:
        # TwistedIOLoop only supports the global reactor, so we can't have
        # separate IOLoops for client and server threads.
        # AsyncIOMainLoop doesn't work with the default policy
        # (although it could with some tweaks to this test and a
        # policy that created loops for non-main threads).
        raise unittest.SkipTest(
            'Sync HTTPClient not compatible with TwistedIOLoop or '
            'AsyncIOMainLoop')
    self.server_ioloop = IOLoop()
    sock, self.port = bind_unused_port()
    app = Application([('/', HelloWorldHandler)])
    self.server = HTTPServer(app, io_loop=self.server_ioloop)
    self.server.add_socket(sock)
    # The server's loop runs on its own thread; the (synchronous)
    # client runs on the test thread.
    self.server_thread = threading.Thread(target=self.server_ioloop.start)
    self.server_thread.start()
    self.http_client = HTTPClient()
def get_app(self):
    """Application covering the httpclient test endpoints (gzip enabled)."""
    # Callable objects to finish pending /trigger requests.
    self.triggers = collections.deque()
    routes = [
        url("/trigger", TriggerHandler,
            dict(queue=self.triggers, wake_callback=self.stop)),
        url("/chunk", ChunkHandler),
        url("/countdown/([0-9]+)", CountdownHandler, name="countdown"),
        url("/hang", HangHandler),
        url("/hello", HelloWorldHandler),
        url("/content_length", ContentLengthHandler),
        url("/head", HeadHandler),
        url("/options", OptionsHandler),
        url("/no_content", NoContentHandler),
        url("/see_other_post", SeeOtherPostHandler),
        url("/see_other_get", SeeOtherGetHandler),
        url("/host_echo", HostEchoHandler),
        url("/no_content_length", NoContentLengthHandler),
        url("/echo_post", EchoPostHandler),
        url("/respond_in_prepare", RespondInPrepareHandler),
        url("/redirect", RedirectHandler),
    ]
    return Application(routes, gzip=True)
def initialize(self):
    """Hook for subclass initialization.

    A dictionary passed as the third argument of a url spec will be
    supplied as keyword arguments to initialize().

    Example::

        class ProfileHandler(RequestHandler):
            def initialize(self, database):
                self.database = database

            def get(self, username):
                ...

        app = Application([
            (r'/user/(.*)', ProfileHandler, dict(database=database)),
        ])
    """
    # Intentionally a no-op; subclasses override as needed.
    pass
def log_request(self, handler):
    """Writes a completed HTTP request to the logs.

    By default writes to the python root logger.  To change
    this behavior either subclass Application and override this method,
    or pass a function in the application settings dictionary as
    ``log_function``.
    """
    # A user-supplied log_function takes over entirely.
    custom = self.settings.get("log_function")
    if custom is not None:
        custom(handler)
        return
    # Severity scales with the response status class.
    status = handler.get_status()
    if status < 400:
        log_method = access_log.info
    elif status < 500:
        log_method = access_log.warning
    else:
        log_method = access_log.error
    request_time = 1000.0 * handler.request.request_time()
    log_method("%d %s %.2fms", status,
               handler._request_summary(), request_time)
def make_tornado_app(self):
    """Creates a :py:class`tornado.web.Application` instance that respect the
    JSON RPC 2.0 specs and exposes the designated methods. Can be used
    in tests to obtain the Tornado application.

    :return: a :py:class:`tornado.web.Application` instance
    """
    # The RPC endpoint comes first; extra and static handlers are
    # appended in place by the helpers below.
    handlers = [(self.endpoint, TornadoJsonRpcHandler, {"microservice": self})]
    self._add_extra_handlers(handlers)
    self._add_static_handlers(handlers)
    return Application(handlers, template_path=self.template_dir)
def start(config):
    """Fork worker processes and serve the index frontend over HTTP.

    Each of the 4 forked workers listens on base_port + its task id.
    """
    base_port = config["base_port"]
    bind_address = config["bind_address"]
    task_id = process.fork_processes(4)
    # Handlers receive the original config wrapped one level deep.
    wrapped = {"config": config}
    application = web.Application([
        (r"/", InfoHandler, dict(config=wrapped)),
        (r"/([^/]+)", IndexHandler, dict(config=wrapped)),
        (r"/([^/]+)/([^/]+)", IndexQueryHandler, dict(config=wrapped)),
        (r"/([^/]+)/([^/]+)/([^/]+)", IndexQueryHandler, dict(config=wrapped)),
    ])
    http_server = httpserver.HTTPServer(application)
    http_server.add_sockets(netutil.bind_sockets(base_port + task_id,
                                                 address=bind_address))
    log.info("Frontend listening on %d", base_port + task_id)
    IOLoop.current().start()
def start_server(app: web.Application = None, port: int = None, address: str = None, **kwargs: Any) -> HTTPServer:
    """Start server with ``app`` on ``localhost:port``.

    If port is not specified, use command line option of ``--port``.
    """
    app = app or get_app()
    if port is None:
        port = config.port
    if address is None:
        address = config.address
    server = app.listen(port, address=address)
    app.server = server
    server_config['address'] = address
    # Record the actual bound port (port=0 means "pick a free one").
    for sock in server._sockets.values():
        if sock.family == socket.AF_INET:
            server_config['port'] = sock.getsockname()[1]
            break
    return server
def __init__(self, sess, param_dict, num_worker, weight_combiner=None, port=10080, reusable=False):
    """Parameter server state: session, parameters, and worker bookkeeping.

    :param sess: TensorFlow-style session used to evaluate parameters.
    :param param_dict: mapping of parameter names to variables.
    :param num_worker: number of workers expected to connect.
    :param weight_combiner: strategy for merging worker weights;
        defaults to MeanWeightCombiner(num_worker).
    :param port: TCP port the HTTP server will bind (when started).
    :param reusable: whether the server can be restarted after ending.
    """
    # threading.Thread.__init__(self)
    self._session = sess
    self._port = port
    self._param_dict = param_dict
    self._application = web.Application([(r"/", ParameterServerHandler, {'server': self})])
    self._version = 0
    self._sync_lock = threading.Lock()
    self._num_worker = num_worker
    # BUG FIX: the original used sets.Set(); the `sets` module was
    # removed in Python 3 — the built-in set() is the drop-in equivalent.
    self._ended_worker = set()
    self._http_server = None
    self._reusable = reusable
    if weight_combiner is None:
        self._weight_combiner = MeanWeightCombiner(num_worker)
    else:
        self._weight_combiner = weight_combiner
def generate_swagger_json(self):
    """Generates the swagger.json contents for the Calm Application."""
    swagger_json = {
        'swagger': '2.0',
        'info': self._generate_swagger_info(),
        'consumes': ['application/json'],
        'produces': ['application/json'],
        'definitions': self._generate_swagger_definitions(),
        'responses': self._generate_swagger_responses(),
        'paths': self._generate_swagger_paths(),
    }
    # host/basePath are optional in the swagger spec; only emit when set.
    if self.host:
        swagger_json['host'] = self.host
    if self.base_path:
        swagger_json['basePath'] = self.base_path
    # TODO: add schemes
    return swagger_json
def get_app(self):
    """Proxy component app rewriting /upstream/* to the upstream root."""
    component = Component()
    component.configurations = {
        'proxies': [
            {
                'name': 'upstream',
                'upstream_url': 'http://xxxxx.upstream.test/',
                # BUG FIX: these must be raw strings, as in the sibling
                # tests.  In a non-raw string '\1' is the control
                # character '\x01', so the backreference in the
                # substitution was silently broken.
                'request_path_regex': r'/upstream/(.*)',
                'request_path_sub': r'/\1',
                'pool_max_workers': 1,
                'pool_auto_spawn': False,
            }
        ]
    }
    component.install()
    return Application(component.urls)
def get_app(self):
    """Proxy component app mapping /upstream/* onto the upstream root."""
    proxy_config = {
        'name': 'upstream',
        'upstream_url': 'http://upstream.test',
        'request_path_regex': r'/upstream/(.*)',
        'request_path_sub': r'/\1',
        'pool_max_workers': 1,
        'pool_auto_spawn': False,
    }
    component = Component()
    component.configurations = {'proxies': [proxy_config]}
    component.install()
    return Application(component.urls)
def get_app(self):
    """Proxy component app mapping /upstream/* onto /upstream2/* upstream."""
    proxy_config = {
        'name': 'upstream',
        'upstream_url': 'http://upstream.test',
        'request_path_regex': r'/upstream/(.*)',
        'request_path_sub': r'/upstream2/\1',
        'pool_max_workers': 1,
        'pool_auto_spawn': False,
    }
    component = Component()
    component.configurations = {'proxies': [proxy_config]}
    component.install()
    return Application(component.urls)
def __init__(self):
    """Wire up logging, client/provider registries, plugins and routes."""
    # Dedicated, non-propagating console logger for the chat server.
    logger = logging.getLogger('WebChatSrv')
    logger.propagate = False
    logger.setLevel(logging.DEBUG)
    console = logging.StreamHandler()
    console.setLevel(logging.DEBUG)
    console.setFormatter(logging.Formatter(
        '[%(asctime)s][%(levelname)s]: %(name)s | %(message)s', '%H:%M:%S'))
    logger.addHandler(console)
    self.logger = logger
    self.clients = Client.Clients(self)
    self.providers = ProvidersContainer()
    self.plugin_loader = PluginLoader(server=self)
    register_handlers(self)
    # Websocket endpoint plus a static-file catch-all for the web UI.
    self.app = web.Application([
        (r'/ws', WebSocketHandler.WebSocketHandler, {'clients': self.clients}),
        (r'/(.*)', web.StaticFileHandler,
         {'path': 'www-root/', 'default_filename': 'index.html'}),
    ])
def get_application() -> web.Application:
    """Build the test application with main, link and redirect routes."""
    routes = [
        ('/', MainHandler),
        ('/1', LinkHandler1),
        ('/redirect1', RedirectHandler1),
        ('/redirect2', RedirectHandler2),
    ]
    return web.Application(routes, logging='error')
def run():
    """Parse options, connect MongoDB and redis, then start the HTTP server.

    Exits the process with a nonzero status when either backend is
    unreachable.
    """
    define('port', default=8090, type=int, help='')
    define('debug', default=False, type=bool, help='')
    parse_command_line()
    settings['debug'] = options.debug
    if settings['debug']:
        # FIX: print statement (Python 2 only) -> print() function.
        print('debug mode')
    # connect mongodb
    try:
        client = MotorClient(settings['database']['address'])
        settings['connection'] = client[settings['database']['db']]
    # FIX: bare `except:` also swallowed SystemExit/KeyboardInterrupt.
    except Exception:
        print('can not connect MongoDB')
        # FIX: was sys.exit(0) — a failure must not report success.
        sys.exit(1)
    # connect redis
    try:
        client = redis.Redis(host=settings['redis']['host'],
                             port=settings['redis']['port'],
                             db=settings['redis']['db'])
        settings['redis_conn'] = client
    except Exception:
        print('can not connect redis')
        sys.exit(1)
    application = Application(handlers=urlpattern, **settings)
    http_server = HTTPServer(application, xheaders=True)
    http_server.listen(options.port)
    IOLoop.instance().start()
def get_app(self):
    """App whose handler can exit or signal its own process on demand."""
    class ProcessHandler(RequestHandler):
        def get(self):
            exit_arg = self.get_argument("exit", None)
            if exit_arg:
                # must use os._exit instead of sys.exit so unittest's
                # exception handler doesn't catch it
                os._exit(int(exit_arg))
            signal_arg = self.get_argument("signal", None)
            if signal_arg:
                os.kill(os.getpid(), int(signal_arg))
            self.write(str(os.getpid()))
    return Application([("/", ProcessHandler)])
def get_app(self):
    """Websocket test app; every handler shares one close_future."""
    self.close_future = Future()
    routes = []
    # Each spec gets its own kwargs dict, all pointing at the same future.
    for path, handler in [
        ('/echo', EchoHandler),
        ('/header', HeaderHandler),
        ('/close_reason', CloseReasonHandler),
        ('/error_in_on_message', ErrorInOnMessageHandler),
        ('/async_prepare', AsyncPrepareHandler),
    ]:
        routes.append((path, handler, dict(close_future=self.close_future)))
    routes.insert(1, ('/non_ws', NonWebSocketHandler))
    return Application(routes)
def get_app(self):
    """Single-endpoint echo app with per-test compression options."""
    self.close_future = Future()
    handler_kwargs = dict(
        close_future=self.close_future,
        compression_options=self.get_server_compression_options(),
    )
    return Application([('/echo', EchoHandler, handler_kwargs)])
def wrap_web_tests_adapter():
    """Re-run every wsgi-safe web test through a validated WSGIAdapter."""
    wrapped = {}
    for base in web_test.wsgi_safe_tests:
        class WSGIAdapterWrappedTest(base):
            def get_app(self):
                self.app = Application(self.get_handlers(),
                                       **self.get_app_kwargs())
                return WSGIContainer(validator(WSGIAdapter(self.app)))
        # Prefix the name so both variants can coexist in one suite.
        wrapped["WSGIAdapter_" + base.__name__] = WSGIAdapterWrappedTest
    return wrapped
def get_app(self):
    """Single-route app; the handler receives this test's io_loop."""
    handler_kwargs = dict(io_loop=self.io_loop)
    return Application([('/', TestRequestHandler, handler_kwargs)])
def get_app(self):
    """Hello-world app claiming the https protocol."""
    handler_kwargs = dict(protocol="https")
    return Application([('/', HelloWorldRequestHandler, handler_kwargs)])
def test_missing_arguments(self):
    """ssl_options with a keyfile but no certfile raises KeyError."""
    application = Application()
    bad_options = {"keyfile": "/__missing__.crt"}
    self.assertRaises(KeyError, HTTPServer, application,
                      ssl_options=bad_options)
def get_app(self):
    """Build the Application from this test's handler list."""
    handlers = self.get_handlers()
    return Application(handlers)
def get_app(self):
    """Echo/typecheck app, including a double-slash path edge case."""
    routes = [
        ("/echo", EchoHandler),
        ("/typecheck", TypeCheckHandler),
        # Deliberately starts with two slashes to exercise path matching.
        ("//doubleslash", EchoHandler),
    ]
    return Application(routes)
def get_app(self):
    """Single-route app using the X-header test's nested handler."""
    routes = [('/', XHeaderTest.Handler)]
    return Application(routes)
def setUp(self):
    """Serve a hello-world app over a unix domain socket in a temp dir."""
    super(UnixSocketTest, self).setUp()
    self.tmpdir = tempfile.mkdtemp()
    self.sockfile = os.path.join(self.tmpdir, "test.sock")
    sock = netutil.bind_unix_socket(self.sockfile)
    app = Application([("/hello", HelloWorldRequestHandler)])
    self.server = HTTPServer(app, io_loop=self.io_loop)
    self.server.add_socket(sock)
    # Client side: connect an IOStream to the same socket file.
    self.stream = IOStream(socket.socket(socket.AF_UNIX),
                           io_loop=self.io_loop)
    self.stream.connect(self.sockfile, self.stop)
    self.wait()
def get_app(self):
    """App with hello, large-response and finish-on-close endpoints."""
    class HelloHandler(RequestHandler):
        def get(self):
            self.finish('Hello world')

        def post(self):
            self.finish('Hello world')

    class LargeHandler(RequestHandler):
        def get(self):
            # 512KB should be bigger than the socket buffers so it will
            # be written out in chunks.
            self.write(''.join(chr(i % 256) * 1024 for i in range(512)))

    class FinishOnCloseHandler(RequestHandler):
        @asynchronous
        def get(self):
            self.flush()

        def on_connection_close(self):
            # This is not very realistic, but finishing the request
            # from the close callback has the right timing to mimic
            # some errors seen in the wild.
            self.finish('closed')

    routes = [
        ('/', HelloHandler),
        ('/large', LargeHandler),
        ('/finish_on_close', FinishOnCloseHandler),
    ]
    return Application(routes)
def get_app(self):
    """Minimal app: a single echo endpoint at the root."""
    routes = [('/', EchoHandler)]
    return Application(routes)
def get_app(self):
    """Minimal app: a single hello-world endpoint at the root."""
    routes = [('/', HelloWorldRequestHandler)]
    return Application(routes)
def get_app(self):
    """App comparing buffered vs. streaming request-body handling."""
    class BufferedHandler(RequestHandler):
        def put(self):
            # Whole body is buffered; report its total size.
            self.write(str(len(self.request.body)))

    @stream_request_body
    class StreamingHandler(RequestHandler):
        def initialize(self):
            self.bytes_read = 0

        def prepare(self):
            # Optional per-request overrides supplied via query arguments.
            if 'expected_size' in self.request.arguments:
                self.request.connection.set_max_body_size(
                    int(self.get_argument('expected_size')))
            if 'body_timeout' in self.request.arguments:
                self.request.connection.set_body_timeout(
                    float(self.get_argument('body_timeout')))

        def data_received(self, data):
            self.bytes_read += len(data)

        def put(self):
            self.write(str(self.bytes_read))

    routes = [
        ('/buffered', BufferedHandler),
        ('/streaming', StreamingHandler),
    ]
    return Application(routes)
def get_app(self):
    """App exposing digest-auth and custom-reason test endpoints."""
    routes = [
        ('/digest', DigestAuthHandler),
        ('/custom_reason', CustomReasonHandler),
        ('/custom_fail_reason', CustomFailReasonHandler),
    ]
    return Application(routes)