Python tornado 模块,options() 实例源码

我们从Python开源项目中,提取了以下28个代码示例,用于说明如何使用tornado.options()

项目:incubator-milagro-mfa-server    作者:apache    | 项目源码 | 文件源码
def _get_certivox_server_secret_share(self, expires):
        """Return the CertiVox server secret share decoded from hex.

        ``options.certivoxServerSecret`` either names a retrieval strategy
        ('dta', 'credentials.json' or 'manual') or, for any other value, is
        itself used as the hex-encoded secret.

        :param expires: passed unchanged to the selected retrieval callable.
        :raises SecretsError: if hex-decoding the value raises ``TypeError``.
        """
        source = options.certivoxServerSecret
        fetchers = {
            'dta': self._get_certivox_server_secret_share_dta,
            'credentials.json': self._get_certivox_server_secret_share_credentials,
            'manual': lambda x: raw_input('MIRACL server secret share:'),
            'config': lambda x: options.certivoxServerSecret
        }
        # Any value that is not a known strategy name falls back to 'config',
        # which returns the option value itself.
        fetch = fetchers.get(source, fetchers['config'])
        secret_hex = fetch(expires)

        try:
            return secret_hex.decode("hex")
        except TypeError as err:
            log.error(err)
            raise SecretsError('Invalid CertiVox server secret share')
项目:bbtornado    作者:bakkenbaeck    | 项目源码 | 文件源码
def setup():
    """
    Define the common command-line options, parse the command line and
    forward the parsed values to ``setup_global_config``.

    Any options of your own must be defined before calling this.

    Returns the value of ``tornado.options.parse_command_line()``.
    """
    define = tornado.options.define
    define("host", default=None, help="run on the given address", type=str)
    define("port", default=None, help="run on the given port", type=int)
    define("base", default=None, type=str)
    define("debug", default=None, type=int)
    define("fcgi", default=None, type=str)
    define("db_path", default=None, type=str)
    define("config", default=None, help='Config file', type=str)
    remaining = tornado.options.parse_command_line()

    parsed = tornado.options.options
    option_names = ("host", "port", "base", "debug", "fcgi", "db_path", "config")
    # Each parsed option is handed over under the keyword of the same name.
    setup_global_config(**{name: getattr(parsed, name) for name in option_names})

    return remaining
项目:tts-server    作者:langsiji    | 项目源码 | 文件源码
def test_voice_base64_data(self):
        """The base64 sound in the response must equal the cached file's bytes."""
        if not options.cache_dir:
            # No cache directory configured: there is no file to compare with.
            return
        resp = self.fetch("/tts?text=hello")
        payload = escape.json_decode(resp.body)
        self.assertEqual(resp.code, 200)
        self.assertTrue(payload["success"])
        self.assertGreater(len(payload["data"]["file"]), 0)
        # The reported voice must be the one set in options.voice.
        self.assertEqual(payload["data"]["voice"], options.voice)
        decoded_sound = base64.b64decode(payload["data"]["sound_base64"])
        cached_path = os.path.join(options.cache_dir, payload["data"]["file"])
        with open(cached_path, "rb") as cached:
            cached_bytes = cached.read()
        self.assertEqual(cached_bytes, decoded_sound)

    # ????cache??
项目:RobotAIEngine    作者:JK-River    | 项目源码 | 文件源码
def main():
    """Start the HTTP server on ``options.port`` and run the IOLoop.

    SIGTERM and SIGINT are routed to ``shutdown`` together with the loop
    and server created here.
    """
    ioloop = tornado.ioloop.IOLoop.instance()
    server = tornado.httpserver.HTTPServer(Application(), xheaders=True)
    server.listen(options.port)

    def sig_handler(sig, _):
        """Log the received signal and hand over to ``shutdown``."""
        # logging.warn is a deprecated alias of logging.warning.
        logging.warning("Caught signal: %s", sig)
        shutdown(ioloop, server)

    signal.signal(signal.SIGTERM, sig_handler)
    signal.signal(signal.SIGINT, sig_handler)
    ioloop.start()
项目:mediachain-indexer    作者:mediachain    | 项目源码 | 文件源码
def web(port = 23456,
        via_cli = False,
        ):
    """
    Bind Tornado server to specified port.

    :param port: port passed to ``http_server.bind``.
    :param via_cli: not used inside this function.

    A KeyboardInterrupt raised while setting up or running the server is
    caught and reported by printing 'Exit'.
    """

    print ('BINDING',port)

    try:
        tornado.options.parse_command_line()
        http_server = HTTPServer(Application(),
                                 xheaders=True,
                                 )
        http_server.bind(port)
        http_server.start(16) # Forks multiple sub-processes
        tornado.ioloop.IOLoop.instance().set_blocking_log_threshold(0.5)
        IOLoop.instance().start()

    except KeyboardInterrupt:
        # A parenthesised single argument prints identically as a Python 2
        # statement and as a Python 3 function call; the bare statement form
        # does not compile on Python 3.
        print('Exit')

    print ('WEB_STARTED')
项目:wdom    作者:miyakogi    | 项目源码 | 文件源码
def parse_command_line() -> Namespace:
    """Parse known command line options into ``config`` and return it.

    Unknown command line options are skipped. After parsing, the log level
    is refreshed and every ``config`` entry whose name starts with ``log``
    is copied onto ``tornado.options.options``.
    """
    import tornado.options
    parser.parse_known_args(namespace=config)
    set_loglevel()  # apply the log level given on the command line
    tornado_opts = tornado.options.options
    for name, value in vars(config).items():
        if not name.startswith('log'):
            continue
        setattr(tornado_opts, name, value)
    return config
项目:webspider    作者:GuozhuHe    | 项目源码 | 文件源码
def main():
    """Define the ``port`` option, parse the command line and serve the web app."""
    define(name='port', default=8000, type=int, help='run on the given port')
    tornado.options.parse_command_line()

    banner = '================ spider web server has started ================ '
    logger.info(banner)
    status_line = '       server start at port -> {}, debug mode = {}'.format(options.port, constants.DEBUG)
    logger.info(status_line)

    server = tornado.httpserver.HTTPServer(make_web_app())
    server.listen(options.port)
    tornado.ioloop.IOLoop.instance().start()
项目:bbtornado    作者:bakkenbaeck    | 项目源码 | 文件源码
def override_config(config, override):
    '''Complete ``config`` in place, giving ``override`` values priority.

    ``override`` carries the values coming from tornado.options. Missing
    sections of ``config`` are created, then host/port/base, the debug
    flag, the database uri and the cookie secret are filled in.
    '''

    # Make sure every section written to below exists.
    if 'tornado' not in config:
        config['tornado'] = {}
    tornado_cfg = config['tornado']
    for section in ('server', 'app_settings'):
        if tornado_cfg.get(section) is None:
            tornado_cfg[section] = {}
    if 'db' not in config:
        config['db'] = {}

    server_cfg = tornado_cfg['server']
    app_settings = tornado_cfg['app_settings']
    db_cfg = config['db']

    # host, port and base are resolved with these priorities:
    # 1. command line arg (by tornado.options)
    # 2. config file
    # 3. hardcoded default
    fallbacks = (('host', DEFAULT_HOST), ('port', DEFAULT_PORT), ('base', DEFAULT_BASE))
    resolved = {}
    for key, fallback in fallbacks:
        resolved[key] = find_first([override.get(key), server_cfg.get(key), fallback])
    server_cfg.update(resolved)

    # An explicit debug flag is stored in app_settings; db echo is switched
    # on only for the value 2.
    debug = override.get('debug')
    if debug is not None:
        app_settings['debug'] = debug
        db_cfg['echo'] = debug == 2

    # Database uri: override, then config, then the development default.
    db_cfg['uri'] = find_first([override.get('db_path'), db_cfg.get('uri'), DEFAULT_DEV_DB_URI])

    # Cookie secret: keep the configured one, otherwise use the default.
    app_settings['cookie_secret'] = find_first(
        [app_settings.get('cookie_secret'), DEFAULT_COOKIE_SECRET])
项目:bbtornado    作者:bakkenbaeck    | 项目源码 | 文件源码
def main(app):
    """Serve ``app`` over FastCGI when the ``fcgi`` option is set, else over HTTP.

    In HTTP mode the server object is stored in the module-level
    ``http_server`` and SIGTERM/SIGINT are bound to ``sig_handler``.
    """

    global http_server

    fcgi_address = tornado.options.options.fcgi
    if fcgi_address:
        from tornado.wsgi import WSGIAdapter

        adapted = WSGIAdapter(app)

        def fcgiapp(env, start):
            # An empty script name keeps it out of the tornado path match pattern
            env['SCRIPT_NAME'] = ''
            return adapted(env, start)

        from flup.server.fcgi import WSGIServer
        WSGIServer(fcgiapp, bindAddress=fcgi_address).run()
        return

    http_server = tornado.httpserver.HTTPServer(app)

    server_cfg = le_config.tornado.server
    host, port, base = server_cfg.host, server_cfg.port, server_cfg.base
    http_server.listen(port, address=host)
    tornado.log.gen_log.info('HTTP Server started on http://%s:%s/%s',
                             host, port, base)

    for signum in (signal.SIGTERM, signal.SIGINT):
        signal.signal(signum, sig_handler)

    try:
        tornado.ioloop.IOLoop.instance().start()
    except KeyboardInterrupt:
        # Let the app clean up, if it offers a hook, then propagate.
        if hasattr(app, 'shutdown_hook'):
            app.shutdown_hook()
        raise
项目:tts-server    作者:langsiji    | 项目源码 | 文件源码
def get_app(self):
        """Build the Application with the three TTS routes used by the tests."""
        routes = [
            url("/tts", server.TTSHandler),
            url("/tts/voicelist", server.TTSVoiceHandler),
            url("/tts/voice/([^/]+)", server.VoiceStaticHandler,
                dict(path=options.cache_dir)),
        ]
        # A custom log_function argument was left disabled here:
        # log_function=lambda h: h
        return Application(routes)
项目:tts-server    作者:langsiji    | 项目源码 | 文件源码
def test_default_voice(self):
        """A request without a voice must answer 200 and report ``options.voice``."""
        resp = self.fetch("/tts?text=hello")
        payload = escape.json_decode(resp.body)
        self.assertEqual(resp.code, 200)
        reported_voice = payload["data"]["voice"]
        self.assertEqual(reported_voice, options.voice)
项目:tts-server    作者:langsiji    | 项目源码 | 文件源码
def test_hello_all_voice(self):
        """Run ``_test_voice`` for every voice when ``options.test_all_voice`` is set."""
        phrase = "hello world"
        if options.test_all_voice:
            for voice_name in server.ALL_VOICE:
                self._test_voice(phrase, voice_name)

    # ?????base64?????????????????
项目:tts-server    作者:langsiji    | 项目源码 | 文件源码
def test_voice_filename(self):
        """Check the ``type=filename`` response for both accepted outcomes.

        A 400 must carry the "cache closed" error name; a 200 must report
        success, a non-empty file name and the voice from ``options.voice``.
        Any other status code fails the test.
        """
        response = self.fetch("/tts?text=hello&type=filename")
        status = escape.json_decode(response.body)
        if response.code == 400:
            self.assertEqual(status["error"]["name"],
                            "tts-server close cache options")
        elif response.code == 200:
            self.assertTrue(status["success"])
            self.assertGreater(len(status["data"]["file"]), 0)
            # The reported voice must be the one set in options.voice.
            self.assertEqual(status["data"]["voice"], options.voice)
        else:
            # fail() states the intent and names the offending status code,
            # unlike assertTrue(0) which only reports "0 is not true".
            self.fail("unexpected response code: %s" % response.code)

    # ????web??????????????????
项目:tts-server    作者:langsiji    | 项目源码 | 文件源码
def test_all_voice_fmt(self):
        """Request every format in ``options.fmtlist`` and check the file extension.

        Empty entries in the list are skipped; every other entry is requested
        with ``type=filename`` and must yield a 200, a successful status and a
        returned name whose extension matches the requested format.
        """
        for fmt in options.fmtlist:
            # Only empty entries are skipped, so each real format is exercised.
            if not fmt:
                continue
            response = self.fetch("/tts?text=hello&type=filename&fmt=%s" % fmt)
            status = escape.json_decode(response.body)
            self.assertEqual(response.code, 200)
            self.assertTrue(status["success"])
            # splitext() takes the whole name; element 1 of its result is the
            # extension. Leading dots are stripped on both sides so the check
            # holds whether or not fmtlist entries are written with a dot.
            # NOTE(review): assumes the response exposes status['data']['name'],
            # as the original assertion did - confirm against the server.
            extension = os.path.splitext(status['data']['name'])[1]
            self.assertEqual(extension.lstrip('.'), fmt.lstrip('.'))
项目:PyZwaver    作者:robertmuth    | 项目源码 | 文件源码
def options(self):
        """Handle the ``options`` method: set status 204 and finish without a body."""
        no_content = 204
        self.set_status(no_content)
        self.finish()
项目:RobotAIEngine    作者:JK-River    | 项目源码 | 文件源码
def __init__(self):
        '''
        Assemble the application settings from ``options`` and initialise
        the tornado Application with ``auto_route_handlers``.
        '''
        settings = dict(
            xsrf_cookies=False,
            site_title='demo',
            debug=options.debug,
            static_path=os.path.join(options.project_path, 'static'),
            template_path=os.path.join(options.project_path, 'tpl'),
        )
        handlers = auto_route_handlers
        # Log the route table before handing it to tornado.
        logging.info("----> %s", handlers)
        tornado.web.Application.__init__(self, handlers, **settings)
项目:imClassifier    作者:liutaihua    | 项目源码 | 文件源码
def __init__(self):
        """Initialise the tornado Application with the module-level ``handlers`` and ``settings``."""
        base_init = tornado.web.Application.__init__
        base_init(self, handlers, **settings)
        # Session manager and database connection are currently disabled:
        # self.session_manager = session.TornadoSessionManager(settings["session_secret"], settings["session_dir"])
        # self.db = dbutils.Connection(
        #    host=options.DATABASE_HOST, database=options.DATABASE_NAME,
        #    user=options.DATABASE_USER, password=options.DATABASE_PASSWORD)
项目:imClassifier    作者:liutaihua    | 项目源码 | 文件源码
def main(port):
    """Call ``load_grocery()``, parse the command line and serve on ``port``.

    :param port: port the HTTP server listens on.
    """
    #train()
    load_grocery()
    tornado.options.parse_command_line()
    # A parenthesised single argument prints identically as a Python 2
    # statement and as a Python 3 function call; the bare statement form
    # does not compile on Python 3.
    print("start on port %s..." % port)
    http_server = tornado.httpserver.HTTPServer(Application(), xheaders=True)
    http_server.listen(port)
    #tornado.autoreload.start()
    tornado.ioloop.IOLoop.instance().start()
项目:TabPy    作者:tableau    | 项目源码 | 文件源码
def options(self):
        """Handle the ``options`` method: add the CORS header, then write an empty object."""
        # The CORS header is added when TabPy has a cors_origin specified
        self._add_CORS_header()
        empty_body = {}
        self.write(empty_body)
项目:TabPy    作者:tableau    | 项目源码 | 文件源码
def options(self, pred_name):
        """Handle the ``options`` method; ``pred_name`` is accepted but not used here."""
        # The CORS header is added when TabPy has a cors_origin specified
        self._add_CORS_header()
        empty_body = {}
        self.write(empty_body)
项目:simphony-remote    作者:simphony    | 项目源码 | 文件源码
def __init__(self, *args, **kwargs):
        """Seed the docker connection settings from the process environment.

        These values serve as defaults; they may later be overridden by the
        entries of the configuration file when it is parsed.
        """
        super().__init__(*args, **kwargs)

        environ = os.environ

        self.docker_host = environ.get("DOCKER_HOST", "")
        if not self.docker_host:
            # Unset or empty DOCKER_HOST: use the local unix socket.
            self.docker_host = "unix://var/run/docker.sock"

        # Only whether DOCKER_TLS_VERIFY is non-empty matters, not its value:
        # DOCKER_TLS_VERIFY=0 still counts as enabled. This is consistent
        # with both docker behavior and general shell practice.
        self.tls_verify = bool(environ.get("DOCKER_TLS_VERIFY", ""))
        # No environment variable says "TLS = True" on its own, so TLS
        # follows the TLS_VERIFY state.
        self.tls = self.tls_verify

        default_cert_dir = os.path.join(os.path.expanduser("~"), ".docker")
        cert_dir = environ.get("DOCKER_CERT_PATH", default_cert_dir)

        cert_files = (
            ('tls_cert', 'cert.pem'),
            ('tls_key', 'key.pem'),
            ('tls_ca', 'ca.pem'),
        )
        for attribute, filename in cert_files:
            setattr(self, attribute, os.path.join(cert_dir, filename))

        if self.tls:
            self.docker_host = self.docker_host.replace('tcp://', 'https://')

    # -------------------------------------------------------------------------
    # Public
项目:simphony-remote    作者:simphony    | 项目源码 | 文件源码
def parse_config(self, config_file):
        """Read ``config_file`` and copy the parsed values onto our traits.

        :param config_file: path of the configuration file to parse.
        :raises tornado.options.Error: when ``config_file`` does not exist.
        """
        # A separate parser instance is used for the file, apart from
        # tornado's global options object.
        local_parser = tornado.options.OptionParser()

        for name, trait in self.traits().items():
            # Each trait becomes an option whose default is the trait's
            # current value and whose Python type is the type of that value;
            # tornado's OptionParser performs type validation on it.
            current = getattr(self, name)
            local_parser.define(name,
                                default=current,
                                type=type(current),
                                help=trait.help)

        # The config file must always be present, even if empty: a missing
        # file is reported as a tornado options error.
        try:
            local_parser.parse_config_file(config_file)
        except FileNotFoundError:
            message = ('Could not find specified configuration'
                       ' file "{}"'.format(config_file))
            raise tornado.options.Error(message)

        set_traits_from_dict(self, local_parser.as_dict())

        if self.tls or self.tls_verify:
            self.docker_host = self.docker_host.replace('tcp://', 'https://')
项目:tailsocket    作者:yeraydiazdiaz    | 项目源码 | 文件源码
def get_app(self):
        """Point the log options at test files, set up logging and build the app."""
        test_settings = (
            ('access_log_file_path', 'test.access.log'),
            ('application_log_file_path', 'test.application.log'),
            ('logging', 'debug'),
        )
        for option_name, option_value in test_settings:
            setattr(options, option_name, option_value)
        log.setup_logging()
        return application.TailSocketApplication()
项目:tailsocket    作者:yeraydiazdiaz    | 项目源码 | 文件源码
def tearDown(self):
        """Delete the access and application log files if they exist."""
        opts = tornado.options.options
        for option_name in ('access_log_file_path', 'application_log_file_path'):
            log_path = getattr(opts, option_name)
            if os.path.exists(log_path):
                os.remove(log_path)
项目:tailsocket    作者:yeraydiazdiaz    | 项目源码 | 文件源码
def test_home_page_returns_200_and_content_and_logs(self):
        """Fetching '/' must return 200 with a body and leave a non-empty access log."""
        resp = self.fetch('/')
        assert resp.code == 200
        assert resp.body is not None
        access_log = tornado.options.options.access_log_file_path
        assert os.stat(access_log).st_size > 0
项目:incubator-milagro-mfa-server    作者:apache    | 项目源码 | 文件源码
def _get_customer_server_secret_share(self, expires):
        """Fetch the customer server secret share from the local TA server.

        A signed ``serverSecret`` request is built from ``self.app_id``,
        ``expires`` and ``self.app_key`` and sent to ``options.DTALocalURL``.
        The request is attempted up to 30 times, sleeping 2 seconds after
        each failed attempt.

        :param expires: expiry value included in the request and its signature.
        :returns: the ``serverSecret`` field of the response, decoded from hex.
        :raises SecretsError: if every attempt fails, the response is not
            valid JSON, or it has no ``serverSecret`` field.
        """
        import socket

        path = 'serverSecret'
        url_params = url_concat(
            '{0}/{1}'.format(options.DTALocalURL, path),
            {
                'app_id': self.app_id,
                'expires': expires,
                'signature': signMessage('{0}{1}{2}'.format(path, self.app_id, expires), self.app_key)
            })
        log.debug('customer server secret request: {0}'.format(url_params))

        httpclient = tornado.httpclient.HTTPClient()
        try:
            # Make at most 30 attempts to get server secret from local TA
            for _ in range(30):
                try:
                    response = httpclient.fetch(url_params)
                except (tornado.httpclient.HTTPError, socket.error) as e:
                    log.error(e)
                    log.error(
                        'Unable to get Server Secret from the customer TA server. '
                        'Retying...')
                    time.sleep(2)
                    continue
                break
            else:
                # Max attempts reached
                raise SecretsError(
                    'Unable to get Server Secret from the customer TA server.')
        finally:
            # Close the client on every exit path, including the case where
            # all attempts failed, so it is never left open.
            httpclient.close()

        try:
            data = json.loads(response.body)
        except ValueError:
            raise SecretsError('TA server response contains invalid JSON')

        if 'serverSecret' not in data:
            raise SecretsError('serverSecret not in response from TA server')

        return data["serverSecret"].decode("hex")
项目:PyZwaver    作者:robertmuth    | 项目源码 | 文件源码
def main():
    """Parse options, bring up the controller stack and run the web server.

    Assigns the module-level DRIVER, CONTROLLER, NODESET and DB, creates the
    tornado application, initializes the controller, pings every node the
    controller lists, then listens on ``OPTIONS.port`` and starts the IOLoop.

    Returns 0 once the IOLoop has stopped.
    """
    global DRIVER, CONTROLLER, NODESET, DB
    # note: this makes sure we have at least one handler
    # logging.basicConfig(level=logging.WARNING)
    #logging.basicConfig(level=logging.ERROR)

    tornado.options.parse_command_line()
    logger = logging.getLogger()
    # NOTE(review): three consecutive setLevel calls - only the last one
    # (ERROR) is in effect afterwards, so the logging.info/logging.warning
    # calls below are filtered out by the root logger level.
    logger.setLevel(logging.INFO)
    logger.setLevel(logging.WARNING)
    logger.setLevel(logging.ERROR)
    # Install the custom formatter on every handler of the root logger.
    for h in logger.handlers:
        h.setFormatter(MyFormatter())

    logging.info("opening shelf: %s", OPTIONS.db)
    DB = Db(OPTIONS.db)

    application = tornado.web.Application(
        HANDLERS,
        debug=True,
        # process pool sized by OPTIONS.tasks, passed as an application setting
        task_pool=multiprocessing.Pool(OPTIONS.tasks),
        # map static/xxx to Static/xxx
        static_path="Static/",
    )

    logging.info("opening serial")
    # MQ is a local: the message queue shared by driver, node set and controller.
    MQ =  zmessage.MessageQueue()
    device = zdriver.MakeSerialDevice(OPTIONS.serial_port)

    DRIVER = zdriver.Driver(device, MQ)
    NODESET = znode.NodeSet(MQ, NodeEventCallback, OPTIONS.node_auto_refresh_secs)
    CONTROLLER = zcontroller.Controller(MQ,
                                        ControllerEventCallback,
                                        pairing_timeout_secs=OPTIONS.pairing_timeout_secs)
    # Wait for initialization to complete before requesting routing info.
    CONTROLLER.Initialize()
    CONTROLLER.WaitUntilInitialized()
    CONTROLLER.UpdateRoutingInfo()
    # NOTE(review): fixed 2 second pause before printing the controller -
    # presumably to let the routing update arrive; confirm.
    time.sleep(2)
    print(CONTROLLER)

    # Initialize the controller's own node from the controller properties.
    n = NODESET.GetNode(CONTROLLER.GetNodeId())
    n.InitializeExternally(CONTROLLER.props.product, CONTROLLER.props.library_type, True)

    # Ping every node known to the controller.
    for num in CONTROLLER.nodes:
        n = NODESET.GetNode(num)
        n.Ping(3, False)

    logging.warning("listening on port %d", OPTIONS.port)
    application.listen(OPTIONS.port)
    tornado.ioloop.IOLoop.instance().start()
    return 0
项目:gino    作者:fantix    | 项目源码 | 文件源码
async def late_init(self, db: Gino, *, loop=None, options=_options):
        """
        Initialize this application with a database object.

        This method does a few things to setup application for working with
        the database:

        - it enables task local storage;
        - creates a connection pool and binds it to the passed database object;
        - populates :py:attr:`~.db`.

        This is a coroutine (it awaits ``db.create_pool``) and must itself be
        awaited by the caller.

        :param db: the :py:class:`gino.ext.tornado.Gino()` class instance that
            will be used in this application.
        :param loop: io loop that will be used to run http server, either
            tornado's or asyncio's.
        :param options: a tornado's ``OptionParser()`` instance or any
            dictionary-like object with the database settings. Default is to
            use ``tornado.options.options`` global.
        :raises RuntimeError: if ``loop`` is neither a tornado asyncio-based
            loop nor an asyncio event loop.

        """

        if loop is None:
            loop = tornado.ioloop.IOLoop.current()
        # Unwrap tornado's loop to the underlying asyncio loop when needed.
        if isinstance(loop, tornado.platform.asyncio.BaseAsyncIOLoop):
            asyncio_loop = loop.asyncio_loop
        elif isinstance(loop, asyncio.BaseEventLoop):
            asyncio_loop = loop
        else:
            raise RuntimeError('AsyncIOLoop is required to run GINO')

        _enable_task_local(asyncio_loop)

        self.db: Gino = db
        await db.create_pool(
            host=options['db_host'],
            port=options['db_port'],
            user=options['db_user'],
            password=options['db_password'],
            database=options['db_database'],
            min_size=options['db_pool_min_size'],
            max_size=options['db_pool_max_size'],
            max_inactive_connection_lifetime=(
                options['db_pool_max_inactive_conn_lifetime']
            ),
            max_queries=options['db_pool_max_queries'],
            loop=asyncio_loop
        )


# noinspection PyAbstractClass