Python eventlet 模块,wsgi() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用eventlet.wsgi()

项目:bilean    作者:openstack    | 项目源码 | 文件源码
def run_server(self):
        """Run a WSGI server."""

        eventlet.wsgi.HttpProtocol.default_request_version = "HTTP/1.0"
        eventlet.hubs.use_hub('poll')
        eventlet.patcher.monkey_patch(all=False, socket=True)
        self.pool = eventlet.GreenPool(size=self.threads)
        socket_timeout = cfg.CONF.eventlet_opts.client_socket_timeout or None

        try:
            eventlet.wsgi.server(
                self.sock, self.application,
                custom_pool=self.pool,
                url_length_limit=URL_LENGTH_LIMIT,
                log=self._wsgi_logger,
                debug=cfg.CONF.debug,
                keepalive=cfg.CONF.eventlet_opts.wsgi_keep_alive,
                socket_timeout=socket_timeout)
        except socket.error as err:
            if err[0] != errno.EINVAL:
                raise

        self.pool.waitall()
项目:CENO-BMSignaling    作者:nametoolong    | 项目源码 | 文件源码
def main():
    mode = config.get("CENO", "agent")

    if mode == "client":
        import eventlet
        import eventlet.wsgi
        eventlet.monkey_patch()

    bm_api.setup()

    api_ok = testapi()
    if not api_ok:
        logger.critical("Failed to test Bitmessage API, is it properly configured and running?")
        return

    if mode == "server":
        launch_server()
    elif mode == "client":
        launch_client(eventlet, eventlet.wsgi)
    else:
        logger.critical("Unknown agent mode")
        return
项目:masakari    作者:openstack    | 项目源码 | 文件源码
def test_socket_options_for_simple_server(self):
        # test normal socket options has set properly
        self.flags(tcp_keepidle=500, group='wsgi')
        server = masakari.wsgi.Server("test_socket_options", None,
                                  host="127.0.0.1", port=0)
        server.start()
        sock = server._socket
        self.assertEqual(1, sock.getsockopt(socket.SOL_SOCKET,
                                            socket.SO_REUSEADDR))
        self.assertEqual(1, sock.getsockopt(socket.SOL_SOCKET,
                                            socket.SO_KEEPALIVE))
        if hasattr(socket, 'TCP_KEEPIDLE'):
            self.assertEqual(CONF.wsgi.tcp_keepidle,
                             sock.getsockopt(socket.IPPROTO_TCP,
                                             socket.TCP_KEEPIDLE))
        server.stop()
        server.wait()
项目:masakari    作者:openstack    | 项目源码 | 文件源码
def test_uri_length_limit(self):
        server = masakari.wsgi.Server("test_uri_length_limit", None,
            host="127.0.0.1", max_url_len=16384)
        server.start()

        uri = "http://127.0.0.1:%d/%s" % (server.port, 10000 * 'x')
        resp = requests.get(uri, proxies={"http": ""})
        eventlet.sleep(0)
        self.assertNotEqual(resp.status_code,
                            requests.codes.REQUEST_URI_TOO_LARGE)

        uri = "http://127.0.0.1:%d/%s" % (server.port, 20000 * 'x')
        resp = requests.get(uri, proxies={"http": ""})
        eventlet.sleep(0)
        self.assertEqual(resp.status_code,
                         requests.codes.REQUEST_URI_TOO_LARGE)
        server.stop()
        server.wait()
项目:masakari    作者:openstack    | 项目源码 | 文件源码
def __init__(self, config_path=None):
        """Initialize the loader, and attempt to find the config.

        :param config_path: Full or relative path to the paste config.
        :returns: None

        """
        self.config_path = None

        config_path = config_path or CONF.wsgi.api_paste_config
        if not os.path.isabs(config_path):
            self.config_path = CONF.find_file(config_path)
        elif os.path.exists(config_path):
            self.config_path = config_path

        if not self.config_path:
            raise exception.ConfigNotFound(path=config_path)
项目:deb-python-eventlet    作者:openstack    | 项目源码 | 文件源码
def __init__(self,
                 rfile,
                 content_length,
                 sock,
                 wfile=None,
                 wfile_line=None,
                 chunked_input=False):

        self.rfile = rfile
        self._sock = sock
        if content_length is not None:
            content_length = int(content_length)
        self.content_length = content_length

        self.wfile = wfile
        self.wfile_line = wfile_line

        self.position = 0
        self.chunked_input = chunked_input
        self.chunk_length = -1

        # (optional) headers to send with a "100 Continue" response. Set by
        # calling set_hundred_continue_respose_headers() on env['wsgi.input']
        self.hundred_continue_headers = None
        self.is_hundred_continue_response_sent = False
项目:deb-python-eventlet    作者:openstack    | 项目源码 | 文件源码
def setup(self):
        # overriding SocketServer.setup to correctly handle SSL.Connection objects
        conn = self.connection = self.request

        # TCP_QUICKACK is a better alternative to disabling Nagle's algorithm
        # https://news.ycombinator.com/item?id=10607422
        if getattr(socket, 'TCP_QUICKACK', None):
            try:
                conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_QUICKACK, True)
            except socket.error:
                pass

        try:
            self.rfile = conn.makefile('rb', self.rbufsize)
            self.wfile = conn.makefile('wb', self.wbufsize)
        except (AttributeError, NotImplementedError):
            if hasattr(conn, 'send') and hasattr(conn, 'recv'):
                # it's an SSL.Connection
                self.rfile = socket._fileobject(conn, "rb", self.rbufsize)
                self.wfile = socket._fileobject(conn, "wb", self.wbufsize)
            else:
                # it's a SSLObject, or a martian
                raise NotImplementedError(
                    '''eventlet.wsgi doesn't support sockets of type {0}'''.format(type(conn)))
项目:deb-python-eventlet    作者:openstack    | 项目源码 | 文件源码
def get_environ(self):
        d = {
            'wsgi.errors': sys.stderr,
            'wsgi.version': (1, 0),
            'wsgi.multithread': True,
            'wsgi.multiprocess': False,
            'wsgi.run_once': False,
            'wsgi.url_scheme': 'http',
        }
        # detect secure socket
        if hasattr(self.socket, 'do_handshake'):
            d['wsgi.url_scheme'] = 'https'
            d['HTTPS'] = 'on'
        if self.environ is not None:
            d.update(self.environ)
        return d
项目:glare    作者:openstack    | 项目源码 | 文件源码
def configure(self, old_conf=None, has_changed=None):
        """Apply configuration settings

        :param old_conf: Cached old configuration settings (if any)
        :param has_changed: callable to determine if a parameter has changed
        """
        eventlet.wsgi.MAX_HEADER_LINE = CONF.max_header_line
        self.client_socket_timeout = CONF.client_socket_timeout or None

        # determine if we need to reload artifact type definitions
        if old_conf is not None and (
                has_changed('enabled_artifact_types') or
                has_changed('custom_artifact_types_modules')):
            from glare import engine
            engine.Engine.registry.reset_registry()
            engine.Engine.registry.register_all_artifacts()

        self.configure_socket(old_conf, has_changed)
        if self.initialize_glance_store:
            utils.initialize_glance_store()
项目:glare    作者:openstack    | 项目源码 | 文件源码
def run_server(self):
        """Run a WSGI server."""
        eventlet.wsgi.HttpProtocol.default_request_version = "HTTP/1.0"
        self.pool = self.create_pool()
        try:
            eventlet.wsgi.server(self.sock,
                                 self.application,
                                 log=self._logger,
                                 custom_pool=self.pool,
                                 debug=False,
                                 keepalive=CONF.http_keepalive,
                                 socket_timeout=self.client_socket_timeout)
        except socket.error as err:
            if err.args[0] != errno.EINVAL:
                raise

        # waiting on async pools
        if ASYNC_EVENTLET_THREAD_POOL_LIST:
            for pool in ASYNC_EVENTLET_THREAD_POOL_LIST:
                pool.waitall()
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def test_socket_options_for_simple_server(self):
        # test normal socket options has set properly
        self.flags(tcp_keepidle=500, group='wsgi')
        server = nova.wsgi.Server("test_socket_options", None,
                                  host="127.0.0.1", port=0)
        server.start()
        sock = server._socket
        self.assertEqual(1, sock.getsockopt(socket.SOL_SOCKET,
                                            socket.SO_REUSEADDR))
        self.assertEqual(1, sock.getsockopt(socket.SOL_SOCKET,
                                            socket.SO_KEEPALIVE))
        if hasattr(socket, 'TCP_KEEPIDLE'):
            self.assertEqual(CONF.wsgi.tcp_keepidle,
                             sock.getsockopt(socket.IPPROTO_TCP,
                                             socket.TCP_KEEPIDLE))
        server.stop()
        server.wait()
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def test_uri_length_limit(self):
        server = nova.wsgi.Server("test_uri_length_limit", None,
            host="127.0.0.1", max_url_len=16384)
        server.start()

        uri = "http://127.0.0.1:%d/%s" % (server.port, 10000 * 'x')
        resp = requests.get(uri, proxies={"http": ""})
        eventlet.sleep(0)
        self.assertNotEqual(resp.status_code,
                            requests.codes.REQUEST_URI_TOO_LARGE)

        uri = "http://127.0.0.1:%d/%s" % (server.port, 20000 * 'x')
        resp = requests.get(uri, proxies={"http": ""})
        eventlet.sleep(0)
        self.assertEqual(resp.status_code,
                         requests.codes.REQUEST_URI_TOO_LARGE)
        server.stop()
        server.wait()
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def test_ssl_server(self):

        def test_app(env, start_response):
            start_response('200 OK', {})
            return ['PONG']

        fake_ssl_server = nova.wsgi.Server("fake_ssl", test_app,
                                           host="127.0.0.1", port=0,
                                           use_ssl=True)
        fake_ssl_server.start()
        self.assertNotEqual(0, fake_ssl_server.port)

        response = requests.post(
            'https://127.0.0.1:%s/' % fake_ssl_server.port,
            verify=os.path.join(SSL_CERT_DIR, 'ca.crt'), data='PING')
        self.assertEqual(response.text, 'PONG')

        fake_ssl_server.stop()
        fake_ssl_server.wait()
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def test_socket_options_for_ssl_server(self):
        # test normal socket options has set properly
        self.flags(tcp_keepidle=500, group='wsgi')
        server = nova.wsgi.Server("test_socket_options", None,
                                  host="127.0.0.1", port=0,
                                  use_ssl=True)
        server.start()
        sock = server._socket
        self.assertEqual(1, sock.getsockopt(socket.SOL_SOCKET,
                                            socket.SO_REUSEADDR))
        self.assertEqual(1, sock.getsockopt(socket.SOL_SOCKET,
                                            socket.SO_KEEPALIVE))
        if hasattr(socket, 'TCP_KEEPIDLE'):
            self.assertEqual(CONF.wsgi.tcp_keepidle,
                             sock.getsockopt(socket.IPPROTO_TCP,
                                             socket.TCP_KEEPIDLE))
        server.stop()
        server.wait()
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def __init__(self, config_path=None):
        """Initialize the loader, and attempt to find the config.

        :param config_path: Full or relative path to the paste config.
        :returns: None

        """
        self.config_path = None

        config_path = config_path or CONF.wsgi.api_paste_config
        if not os.path.isabs(config_path):
            self.config_path = CONF.find_file(config_path)
        elif os.path.exists(config_path):
            self.config_path = config_path

        if not self.config_path:
            raise exception.ConfigNotFound(path=config_path)
项目:meteos    作者:openstack    | 项目源码 | 文件源码
def __call__(self, environ, start_response):
        r"""Subclasses will probably want to implement __call__ like this:

        @webob.dec.wsgify(RequestClass=Request)
        def __call__(self, req):
          # Any of the following objects work as responses:

          # Option 1: simple string
          res = 'message\n'

          # Option 2: a nicely formatted HTTP exception page
          res = exc.HTTPForbidden(detail='Nice try')

          # Option 3: a webob Response object (in case you need to play with
          # headers, or you want to be treated like an iterable, or or or)
          res = Response();
          res.app_iter = open('somefile')

          # Option 4: any wsgi app to be run next
          res = self.application

          # Option 5: you can get a Response object for a wsgi app, too, to
          # play with headers etc
          res = req.get_response(self.application)

          # You can then just return your response...
          return res
          # ... or set req.response and return None.
          req.response = res

        See the end of http://pythonpaste.org/webob/modules/dec.html
        for more info.

        """
        raise NotImplementedError(_('You must implement __call__'))
项目:ransomcare    作者:Happyholic1203    | 项目源码 | 文件源码
def start_app(self):
        try:
            self.server_thread = eventlet.listen((self.host, self.port))
            eventlet.wsgi.server(self.server_thread, self.web.app)
        except Exception as e:
            logger.exception(e)
项目:bilean    作者:openstack    | 项目源码 | 文件源码
def __init__(self, name, conf, threads=1000):
        os.umask(0o27)  # ensure files are created with the correct privileges
        self._logger = logging.getLogger("eventlet.wsgi.server")
        self._wsgi_logger = WritableLogger(self._logger)
        self.name = name
        self.threads = threads
        self.children = set()
        self.stale_children = set()
        self.running = True
        self.pgid = os.getpid()
        self.conf = conf
        try:
            os.setpgid(self.pgid, self.pgid)
        except OSError:
            self.pgid = 0
项目:bilean    作者:openstack    | 项目源码 | 文件源码
def start(self, application, default_port):
        """Run a WSGI server with the given application.

        :param application: The application to run in the WSGI server
        :param conf: a cfg.ConfigOpts object
        :param default_port: Port to bind to if none is specified in conf
        """

        eventlet.wsgi.MAX_HEADER_LINE = self.conf.max_header_line
        self.application = application
        self.default_port = default_port
        self.configure_socket()
        self.start_wsgi()
项目:bilean    作者:openstack    | 项目源码 | 文件源码
def run_child(self):
        def child_hup(*args):
            """Shuts down child processes, existing requests are handled."""
            signal.signal(signal.SIGHUP, signal.SIG_IGN)
            eventlet.wsgi.is_accepting = False
            self.sock.close()

        pid = os.fork()
        if pid == 0:
            signal.signal(signal.SIGHUP, child_hup)
            signal.signal(signal.SIGTERM, signal.SIG_DFL)
            # ignore the interrupt signal to avoid a race whereby
            # a child worker receives the signal before the parent
            # and is respawned unnecessarily as a result
            signal.signal(signal.SIGINT, signal.SIG_IGN)
            # The child has no need to stash the unwrapped
            # socket, and the reference prevents a clean
            # exit on sighup
            self._sock = None
            self.run_server()
            LOG.info(_LI('Child %d exiting normally'), os.getpid())
            # self.pool.waitall() is now called in wsgi's server so
            # it's safe to exit here
            sys.exit(0)
        else:
            LOG.info(_LI('Started child %s'), pid)
            self.children.add(pid)
项目:bilean    作者:openstack    | 项目源码 | 文件源码
def _single_run(self, application, sock):
        """Start a WSGI server in a new green thread."""

        LOG.info(_LI("Starting single process server"))
        eventlet.wsgi.server(sock, application, custom_pool=self.pool,
                             url_length_limit=URL_LENGTH_LIMIT,
                             log=self._wsgi_logger, debug=cfg.CONF.debug)
项目:ShelbySearch    作者:Agentscreech    | 项目源码 | 文件源码
def patch_sendfile():
    from gunicorn.http import wsgi

    if o_sendfile is not None:
        setattr(wsgi, "sendfile", _eventlet_sendfile)
项目:donkey    作者:wroscoe    | 项目源码 | 文件源码
def go(self, address):

        # wrap Flask application with engineio's middleware
        self.app = socketio.Middleware(self.sio, self.app)

        # deploy as an eventlet WSGI server
        try:
            eventlet.wsgi.server(eventlet.listen(address), self.app)

        except KeyboardInterrupt:
            # unless some hits Ctrl+C and then we get this interrupt
            print('stopping')
项目:CENO-BMSignaling    作者:nametoolong    | 项目源码 | 文件源码
def launch_client(eventlet, wsgi):
    from cenobm.client.createhandler import handle_create
    from cenobm.client.requestsender import start_request_sender

    start_request_sender(eventlet.greenpool.GreenPool)

    host = config.get("RequestSender", "host")
    port = config.getint("RequestSender", "port")

    wsgi.server(eventlet.listen((host, port)), handle_create)
项目:ryu-lagopus-ext    作者:lagopus    | 项目源码 | 文件源码
def serve_forever(self):
            self.logger = LoggingWrapper()
            eventlet.wsgi.server(self.server, self.handle, self.logger)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def patch_sendfile():
    from gunicorn.http import wsgi

    if o_sendfile is not None:
        setattr(wsgi, "sendfile", _eventlet_sendfile)
项目:masakari    作者:openstack    | 项目源码 | 文件源码
def test_relpath_config_not_found(self):
        self.flags(api_paste_config='api-paste.ini', group='wsgi')
        self.assertRaises(
            masakari.exception.ConfigNotFound,
            masakari.wsgi.Loader,
        )
项目:masakari    作者:openstack    | 项目源码 | 文件源码
def test_asbpath_config_not_found(self):
        self.flags(api_paste_config='/etc/masakari/api-paste.ini',
                   group='wsgi')
        self.assertRaises(
            masakari.exception.ConfigNotFound,
            masakari.wsgi.Loader,
        )
项目:masakari    作者:openstack    | 项目源码 | 文件源码
def setUp(self):
        super(TestLoaderNormalFilesystem, self).setUp()
        self.config = tempfile.NamedTemporaryFile(mode="w+t")
        self.config.write(self._paste_config.lstrip())
        self.config.seek(0)
        self.config.flush()
        self.loader = masakari.wsgi.Loader(self.config.name)
项目:masakari    作者:openstack    | 项目源码 | 文件源码
def test_no_app(self):
        server = masakari.wsgi.Server("test_app", None)
        self.assertEqual("test_app", server.name)
项目:masakari    作者:openstack    | 项目源码 | 文件源码
def test_custom_max_header_line(self):
        self.flags(max_header_line=4096, group='wsgi')  # Default is 16384
        masakari.wsgi.Server("test_custom_max_header_line", None)
        self.assertEqual(CONF.wsgi.max_header_line,
                         eventlet.wsgi.MAX_HEADER_LINE)
项目:masakari    作者:openstack    | 项目源码 | 文件源码
def test_start_random_port_with_ipv6(self):
        server = masakari.wsgi.Server("test_random_port", None,
            host="::1", port=0)
        server.start()
        self.assertEqual("::1", server.host)
        self.assertNotEqual(0, server.port)
        server.stop()
        server.wait()
项目:masakari    作者:openstack    | 项目源码 | 文件源码
def test_server_pool_waitall(self):
        # test pools waitall method gets called while stopping server
        server = masakari.wsgi.Server("test_server", None,
            host="127.0.0.1")
        server.start()
        with mock.patch.object(server._pool,
                              'waitall') as mock_waitall:
            server.stop()
            server.wait()
            mock_waitall.assert_called_once_with()
项目:masakari    作者:openstack    | 项目源码 | 文件源码
def test_reset_pool_size_to_default(self):
        server = masakari.wsgi.Server("test_resize", None,
            host="127.0.0.1", max_url_len=16384)
        server.start()

        # Stopping the server, which in turn sets pool size to 0
        server.stop()
        self.assertEqual(server._pool.size, 0)

        # Resetting pool size to default
        server.reset()
        server.start()
        self.assertEqual(server._pool.size, CONF.wsgi.default_pool_size)
项目:masakari    作者:openstack    | 项目源码 | 文件源码
def test_keep_alive(self):
        self.flags(keep_alive=False, group='wsgi')

        # mocking eventlet spawn method to check it is called with
        # configured 'keep_alive' value.
        with mock.patch.object(eventlet,
                               'spawn') as mock_spawn:
            server = masakari.wsgi.Server("test_app", None,
                                      host="127.0.0.1", port=0)
            server.start()
            _, kwargs = mock_spawn.call_args
            self.assertEqual(CONF.wsgi.keep_alive,
                             kwargs['keepalive'])
            server.stop()
项目:masakari    作者:openstack    | 项目源码 | 文件源码
def __init__(self, environ, *args, **kwargs):
        if CONF.wsgi.secure_proxy_ssl_header:
            scheme = environ.get(CONF.wsgi.secure_proxy_ssl_header)
            if scheme:
                environ['wsgi.url_scheme'] = scheme
        super(Request, self).__init__(environ, *args, **kwargs)
项目:masakari    作者:openstack    | 项目源码 | 文件源码
def __call__(self, environ, start_response):
        r"""Subclasses will probably want to implement __call__ like this:

        @webob.dec.wsgify(RequestClass=Request)
        def __call__(self, req):
          # Any of the following objects work as responses:

          # Option 1: simple string
          res = 'message\n'

          # Option 2: a nicely formatted HTTP exception page
          res = exc.HTTPForbidden(explanation='Nice try')

          # Option 3: a webob Response object (in case you need to play with
          # headers, or you want to be treated like an iterable, or ...)
          res = Response()
          res.app_iter = open('somefile')

          # Option 4: any wsgi app to be run next
          res = self.application

          # Option 5: you can get a Response object for a wsgi app, too, to
          # play with headers etc
          res = req.get_response(self.application)

          # You can then just return your response...
          return res
          # ... or set req.response and return None.
          req.response = res

        See the end of http://pythonpaste.org/webob/modules/dec.html
        for more info.

        """
        raise NotImplementedError(_('You must implement __call__'))
项目:deb-python-eventlet    作者:openstack    | 项目源码 | 文件源码
def log_message(self, message):
        raise AttributeError('''\
eventlet.wsgi.server.log_message was deprecated and deleted.
Please use server.log.info instead.''')
项目:glare    作者:openstack    | 项目源码 | 文件源码
def __init__(self, threads=1000, initialize_glance_store=False):
        os.umask(0o27)  # ensure files are created with the correct privileges
        self._logger = logging.getLogger("eventlet.wsgi.server")
        self.threads = threads
        self.children = set()
        self.stale_children = set()
        self.running = True
        self.initialize_glance_store = initialize_glance_store
        self.pgid = os.getpid()
        try:
            os.setpgid(self.pgid, self.pgid)
        except OSError:
            self.pgid = 0
项目:glare    作者:openstack    | 项目源码 | 文件源码
def run_child(self):
        def child_hup(*args):
            """Shuts down child processes, existing requests are handled."""
            signal.signal(signal.SIGHUP, signal.SIG_IGN)
            eventlet.wsgi.is_accepting = False
            self.sock.close()

        pid = os.fork()
        if pid == 0:
            signal.signal(signal.SIGHUP, child_hup)
            signal.signal(signal.SIGTERM, signal.SIG_DFL)
            # ignore the interrupt signal to avoid a race whereby
            # a child worker receives the signal before the parent
            # and is respawned unnecessarily as a result
            signal.signal(signal.SIGINT, signal.SIG_IGN)
            # The child has no need to stash the unwrapped
            # socket, and the reference prevents a clean
            # exit on sighup
            self._sock = None
            self.run_server()
            LOG.info('Child %d exiting normally', os.getpid())
            # self.pool.waitall() is now called in wsgi's server so
            # it's safe to exit here
            sys.exit(0)
        else:
            LOG.info('Started child %s', pid)
            self.children.add(pid)
项目:glare    作者:openstack    | 项目源码 | 文件源码
def _single_run(self, application, sock):
        """Start a WSGI server in a new green thread."""
        LOG.info("Starting single process server")
        eventlet.wsgi.server(sock, application, custom_pool=self.pool,
                             log=self._logger,
                             debug=False,
                             keepalive=CONF.http_keepalive,
                             socket_timeout=self.client_socket_timeout)
项目:deb-ryu    作者:openstack    | 项目源码 | 文件源码
def serve_forever(self):
            self.logger = LoggingWrapper()
            eventlet.wsgi.server(self.server, self.handle, self.logger)
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def test_relpath_config_not_found(self):
        self.flags(api_paste_config='api-paste.ini', group='wsgi')
        self.assertRaises(
            nova.exception.ConfigNotFound,
            nova.wsgi.Loader,
        )
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def test_asbpath_config_not_found(self):
        self.flags(api_paste_config='/etc/nova/api-paste.ini', group='wsgi')
        self.assertRaises(
            nova.exception.ConfigNotFound,
            nova.wsgi.Loader,
        )
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def setUp(self):
        super(TestLoaderNormalFilesystem, self).setUp()
        self.config = tempfile.NamedTemporaryFile(mode="w+t")
        self.config.write(self._paste_config.lstrip())
        self.config.seek(0)
        self.config.flush()
        self.loader = nova.wsgi.Loader(self.config.name)
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def test_no_app(self):
        server = nova.wsgi.Server("test_app", None)
        self.assertEqual("test_app", server.name)
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def test_custom_max_header_line(self):
        self.flags(max_header_line=4096, group='wsgi')  # Default is 16384
        nova.wsgi.Server("test_custom_max_header_line", None)
        self.assertEqual(CONF.wsgi.max_header_line,
                         eventlet.wsgi.MAX_HEADER_LINE)
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def test_start_random_port_with_ipv6(self):
        server = nova.wsgi.Server("test_random_port", None,
            host="::1", port=0)
        server.start()
        self.assertEqual("::1", server.host)
        self.assertNotEqual(0, server.port)
        server.stop()
        server.wait()
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def test_server_pool_waitall(self):
        # test pools waitall method gets called while stopping server
        server = nova.wsgi.Server("test_server", None,
            host="127.0.0.1")
        server.start()
        with mock.patch.object(server._pool,
                              'waitall') as mock_waitall:
            server.stop()
            server.wait()
            mock_waitall.assert_called_once_with()
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def test_reset_pool_size_to_default(self):
        server = nova.wsgi.Server("test_resize", None,
            host="127.0.0.1", max_url_len=16384)
        server.start()

        # Stopping the server, which in turn sets pool size to 0
        server.stop()
        self.assertEqual(server._pool.size, 0)

        # Resetting pool size to default
        server.reset()
        server.start()
        self.assertEqual(server._pool.size, CONF.wsgi.default_pool_size)