我们从Python开源项目中,提取了以下28个代码示例,用于说明如何使用tornado.options()。
def _get_certivox_server_secret_share(self, expires): method = options.certivoxServerSecret methods = { 'dta': self._get_certivox_server_secret_share_dta, 'credentials.json': self._get_certivox_server_secret_share_credentials, 'manual': lambda x: raw_input('MIRACL server secret share:'), 'config': lambda x: options.certivoxServerSecret } func = methods[method if method in methods else 'config'] certivox_server_secret_hex = func(expires) try: return certivox_server_secret_hex.decode("hex") except TypeError as e: log.error(e) raise SecretsError('Invalid CertiVox server secret share')
def setup(): """ setup common commandline options and parse them all you must add any of your own options before this """ tornado.options.define("host", default=None, help="run on the given address", type=str) tornado.options.define("port", default=None, help="run on the given port", type=int) tornado.options.define("base", default=None, type=str) tornado.options.define("debug", default=None, type=int) tornado.options.define("fcgi", default=None, type=str) tornado.options.define("db_path", default=None, type=str) tornado.options.define("config", default=None, help='Config file', type=str) not_parsed = tornado.options.parse_command_line() opts = tornado.options.options setup_global_config(host=opts.host, port=opts.port, base=opts.base, debug=opts.debug, fcgi=opts.fcgi, db_path=opts.db_path, config=opts.config) return not_parsed
def test_voice_base64_data(self): if not options.cache_dir: return response = self.fetch("/tts?text=hello") status = escape.json_decode(response.body) self.assertEqual(response.code, 200) self.assertTrue(status["success"]) self.assertGreater(len(status["data"]["file"]), 0) # ?????????????????? self.assertEqual(status["data"]["voice"], options.voice) voicedata = base64.b64decode(status["data"]["sound_base64"]) with open(os.path.join(options.cache_dir, status["data"]["file"]), "rb") as f: filedata = f.read() self.assertEqual(filedata, voicedata) # ????cache??
def main(): ''' main ?? ''' # ?? search_engin_server ioloop = tornado.ioloop.IOLoop.instance() server = tornado.httpserver.HTTPServer(Application(), xheaders=True) server.listen(options.port) def sig_handler(sig, _): ''' ?????? ''' logging.warn("Caught signal: %s", sig) shutdown(ioloop, server) signal.signal(signal.SIGTERM, sig_handler) signal.signal(signal.SIGINT, sig_handler) ioloop.start()
def web(port = 23456, via_cli = False, ): """ Bind Tornado server to specified port. """ print ('BINDING',port) try: tornado.options.parse_command_line() http_server = HTTPServer(Application(), xheaders=True, ) http_server.bind(port) http_server.start(16) # Forks multiple sub-processes tornado.ioloop.IOLoop.instance().set_blocking_log_threshold(0.5) IOLoop.instance().start() except KeyboardInterrupt: print 'Exit' print ('WEB_STARTED')
def parse_command_line() -> Namespace: """Parse command line options and set them to ``config``. This function skips unknown command line options. After parsing options, set log level and set options in ``tornado.options``. """ import tornado.options parser.parse_known_args(namespace=config) set_loglevel() # set new log level based on commanline option for k, v in vars(config).items(): if k.startswith('log'): tornado.options.options.__setattr__(k, v) return config
def main(): define(name='port', default=8000, type=int, help='run on the given port') tornado.options.parse_command_line() logger.info('================ spider web server has started ================ ') logger.info(' server start at port -> {}, debug mode = {}'.format(options.port, constants.DEBUG)) app = make_web_app() http_server = tornado.httpserver.HTTPServer(app) http_server.listen(options.port) tornado.ioloop.IOLoop.instance().start()
def override_config(config, override): '''Overrides the given config by tornado.options''' # Init config object if 'tornado' not in config: config['tornado'] = {} if config['tornado'].get('server') is None: config['tornado']['server'] = {} if config['tornado'].get('app_settings') is None: config['tornado']['app_settings'] = {} if 'db' not in config: config['db'] = {} # Handle host, port, base, with the following priorities # 1. command line arg (by tornado.options) # 2. config file # 3. hardcoded default server_cfg = config['tornado']['server'] host = find_first([override.get('host'), server_cfg.get('host'), DEFAULT_HOST]) port = find_first([override.get('port'), server_cfg.get('port'), DEFAULT_PORT]) base = find_first([override.get('base'), server_cfg.get('base'), DEFAULT_BASE]) config['tornado']['server'].update(dict(host=host, port=port, base=base)) # If the debug flag is set, save it in app_settings and activate db echo if override.get('debug') is not None: config['tornado']['app_settings']['debug'] = override.get('debug') config['db']['echo'] = override.get('debug') == 2 # Set up default database uri if it is not given db_uri = find_first([override.get('db_path'), config['db'].get('uri'), DEFAULT_DEV_DB_URI]) config['db']['uri'] = db_uri # Set up default cookie secret if it is not given cookie_secret = config['tornado']['app_settings'].get('cookie_secret') cookie_secret = find_first([cookie_secret, DEFAULT_COOKIE_SECRET]) config['tornado']['app_settings']['cookie_secret'] = cookie_secret
def main(app): global http_server if not tornado.options.options.fcgi: http_server = tornado.httpserver.HTTPServer(app) server_opts = le_config.tornado.server host = server_opts.host port = server_opts.port base = server_opts.base http_server.listen(port, address=host) tornado.log.gen_log.info('HTTP Server started on http://%s:%s/%s', host, port, base) signal.signal(signal.SIGTERM, sig_handler) signal.signal(signal.SIGINT, sig_handler) try: tornado.ioloop.IOLoop.instance().start() except KeyboardInterrupt: if hasattr(app, 'shutdown_hook'): app.shutdown_hook() raise else: from tornado.wsgi import WSGIAdapter wsgi_app = WSGIAdapter(app) def fcgiapp(env, start): # set the script name to "" so it does not appear in the tornado path match pattern env['SCRIPT_NAME'] = '' return wsgi_app(env, start) from flup.server.fcgi import WSGIServer WSGIServer(fcgiapp, bindAddress=tornado.options.options.fcgi).run()
def get_app(self): return Application([ url("/tts", server.TTSHandler), url("/tts/voicelist", server.TTSVoiceHandler), url("/tts/voice/([^/]+)", server.VoiceStaticHandler, {"path":options.cache_dir}), ],) # log_function=lambda h: h)
def test_default_voice(self): response = self.fetch("/tts?text=hello") status = escape.json_decode(response.body) self.assertEqual(response.code, 200) self.assertEqual(status["data"]["voice"], options.voice)
def test_hello_all_voice(self): text = "hello world" if not options.test_all_voice: return for voice in server.ALL_VOICE: self._test_voice(text, voice) # ?????base64?????????????????
def test_voice_filename(self): response = self.fetch("/tts?text=hello&type=filename") status = escape.json_decode(response.body) if response.code == 400: self.assertEqual(status["error"]["name"], "tts-server close cache options") elif response.code == 200: self.assertTrue(status["success"]) self.assertGreater(len(status["data"]["file"]), 0) # ?????????????????? self.assertEqual(status["data"]["voice"], options.voice) else: self.assertTrue(0) # ????web??????????????????
def test_all_voice_fmt(self): for fmt in options.fmtlist: if len(fmt): continue response = self.fetch("/tts?text=hello&type=filename&fmt=%s"%fmt) status = escape.json_decode(response.body) self.assertEqual(response.code, 200) self.assertTrue(status["success"]) # ?????????????? self.assertEqual(os.path.splitext(status['data']['name'][1]), fmt)
def options(self): # no body self.set_status(204) self.finish()
def __init__(self): ''' ????? ''' settings = { 'xsrf_cookies': False, 'site_title': 'demo', 'debug': options.debug, 'static_path': os.path.join(options.project_path, 'static'), 'template_path': os.path.join(options.project_path, 'tpl'), } handlers = auto_route_handlers logging.info("----> %s", handlers) tornado.web.Application.__init__(self, handlers, **settings)
def __init__(self): tornado.web.Application.__init__(self, handlers, **settings) # self.session_manager = session.TornadoSessionManager(settings["session_secret"], settings["session_dir"]) # self.db = dbutils.Connection( # host=options.DATABASE_HOST, database=options.DATABASE_NAME, # user=options.DATABASE_USER, password=options.DATABASE_PASSWORD)
def main(port): #train() load_grocery() tornado.options.parse_command_line() print "start on port %s..." % port http_server = tornado.httpserver.HTTPServer(Application(), xheaders=True) http_server.listen(port) #tornado.autoreload.start() tornado.ioloop.IOLoop.instance().start()
def options(self): # add CORS headers if TabPy has a cors_origin specified self._add_CORS_header() self.write({})
def options(self, pred_name): # add CORS headers if TabPy has a cors_origin specified self._add_CORS_header() self.write({})
def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # Sets the default of the docker configuration options from the # current environment. These will possibly be overridded by # the appropriate entries in the configuration file when parse_file # is invoked env = os.environ self.docker_host = env.get("DOCKER_HOST", "") if self.docker_host == "": self.docker_host = "unix://var/run/docker.sock" # Note that definedness, not value, is considered, meaning # that defining DOCKER_TLS_VERIFY=0 will still evaluate to True. # This is consistent with both docker behavior and general shell # practice. self.tls_verify = (env.get("DOCKER_TLS_VERIFY", "") != "") # We don't have an envvar way of saying TLS = True, so we rely on # TLS_VERIFY set status self.tls = self.tls_verify cert_path = env.get("DOCKER_CERT_PATH", os.path.join(os.path.expanduser("~"), ".docker")) self.tls_cert = os.path.join(cert_path, 'cert.pem') self.tls_key = os.path.join(cert_path, 'key.pem') self.tls_ca = os.path.join(cert_path, 'ca.pem') if self.tls: self.docker_host = self.docker_host.replace('tcp://', 'https://') # ------------------------------------------------------------------------- # Public
def parse_config(self, config_file): """Parses the config file, and assign their values to our local traits. """ # Keep the file line parser isolated, but use the global one # so that we can get the help of the command line options. file_line_parser = tornado.options.OptionParser() for traitlet_name, traitlet in self.traits().items(): # tornado.OptionParser defines an option with a Python type # and performs type validation. default_value = getattr(self, traitlet_name) file_line_parser.define( traitlet_name, default=default_value, type=type(default_value), help=traitlet.help) # Let it raise the exception if the file is not there. # We always want the config file to be present, even if empty try: file_line_parser.parse_config_file(config_file) except FileNotFoundError: raise tornado.options.Error( 'Could not find specified configuration' ' file "{}"'.format(config_file)) set_traits_from_dict(self, file_line_parser.as_dict()) if self.tls or self.tls_verify: self.docker_host = self.docker_host.replace('tcp://', 'https://')
def get_app(self): options.access_log_file_path = 'test.access.log' options.application_log_file_path = 'test.application.log' options.logging = 'debug' log.setup_logging() return application.TailSocketApplication()
def tearDown(self): if os.path.exists(tornado.options.options.access_log_file_path): os.remove(tornado.options.options.access_log_file_path) if os.path.exists(tornado.options.options.application_log_file_path): os.remove(tornado.options.options.application_log_file_path)
def test_home_page_returns_200_and_content_and_logs(self): response = self.fetch('/') assert response.code == 200 assert response.body is not None assert os.stat( tornado.options.options.access_log_file_path).st_size > 0
def _get_customer_server_secret_share(self, expires): path = 'serverSecret' url_params = url_concat( '{0}/{1}'.format(options.DTALocalURL, path), { 'app_id': self.app_id, 'expires': expires, 'signature': signMessage('{0}{1}{2}'.format(path, self.app_id, expires), self.app_key) }) log.debug('customer server secret request: {0}'.format(url_params)) httpclient = tornado.httpclient.HTTPClient() import socket # Make at most 30 attempts to get server secret from local TA for attempt in range(30): try: response = httpclient.fetch(url_params) except (tornado.httpclient.HTTPError, socket.error) as e: log.error(e) log.error( 'Unable to get Server Secret from the customer TA server. ' 'Retying...') time.sleep(2) continue httpclient.close() break else: # Max attempts reached raise SecretsError( 'Unable to get Server Secret from the customer TA server.') try: data = json.loads(response.body) except ValueError: raise SecretsError('TA server response contains invalid JSON') if 'serverSecret' not in data: raise SecretsError('serverSecret not in response from TA server') return data["serverSecret"].decode("hex")
def main(): global DRIVER, CONTROLLER, NODESET, DB # note: this makes sure we have at least one handler # logging.basicConfig(level=logging.WARNING) #logging.basicConfig(level=logging.ERROR) tornado.options.parse_command_line() logger = logging.getLogger() logger.setLevel(logging.INFO) logger.setLevel(logging.WARNING) logger.setLevel(logging.ERROR) for h in logger.handlers: h.setFormatter(MyFormatter()) logging.info("opening shelf: %s", OPTIONS.db) DB = Db(OPTIONS.db) application = tornado.web.Application( HANDLERS, debug=True, task_pool=multiprocessing.Pool(OPTIONS.tasks), # map static/xxx to Static/xxx static_path="Static/", ) logging.info("opening serial") MQ = zmessage.MessageQueue() device = zdriver.MakeSerialDevice(OPTIONS.serial_port) DRIVER = zdriver.Driver(device, MQ) NODESET = znode.NodeSet(MQ, NodeEventCallback, OPTIONS.node_auto_refresh_secs) CONTROLLER = zcontroller.Controller(MQ, ControllerEventCallback, pairing_timeout_secs=OPTIONS.pairing_timeout_secs) CONTROLLER.Initialize() CONTROLLER.WaitUntilInitialized() CONTROLLER.UpdateRoutingInfo() time.sleep(2) print(CONTROLLER) n = NODESET.GetNode(CONTROLLER.GetNodeId()) n.InitializeExternally(CONTROLLER.props.product, CONTROLLER.props.library_type, True) for num in CONTROLLER.nodes: n = NODESET.GetNode(num) n.Ping(3, False) logging.warning("listening on port %d", OPTIONS.port) application.listen(OPTIONS.port) tornado.ioloop.IOLoop.instance().start() return 0
def late_init(self, db: Gino, *, loop=None, options=_options): """ Initialize this application with a database object. This method does a few things to setup application for working with the database: - it enables task local storage; - creates a connection pool and binds it to the passed database object; - populates :py:attr:`~.db`. :param db: the :py:class:`gino.ext.tornado.Gino()` class instance that will be used in this application. :param loop: io loop that will be used to run heep server, either tornado's or asyncio's. :param options: a tornado's ``OptionParser()`` instance or any dictionary-like object with the database settings. Default is to use ``tornado.options.options`` global. """ if loop is None: loop = tornado.ioloop.IOLoop.current() if isinstance(loop, tornado.platform.asyncio.BaseAsyncIOLoop): asyncio_loop = loop.asyncio_loop elif isinstance(loop, asyncio.BaseEventLoop): asyncio_loop = loop else: raise RuntimeError('AsyncIOLoop is required to run GINO') _enable_task_local(asyncio_loop) self.db: Gino = db await db.create_pool( host=options['db_host'], port=options['db_port'], user=options['db_user'], password=options['db_password'], database=options['db_database'], min_size=options['db_pool_min_size'], max_size=options['db_pool_max_size'], max_inactive_connection_lifetime=( options['db_pool_max_inactive_conn_lifetime'] ), max_queries=options['db_pool_max_queries'], loop=asyncio_loop ) # noinspection PyAbstractClass