我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用wsgiref.simple_server.WSGIServer()。
def run(addr, port, wsgi_handler, ipv6=False, threading=False):
    """Serve ``wsgi_handler`` on ``(addr, port)`` until the server stops.

    :param addr: host/address the server binds to.
    :param port: port the server binds to.
    :param wsgi_handler: WSGI application installed with ``set_app``.
    :param ipv6: forwarded to the server constructor as ``ipv6``.
    :param threading: when true, the server class is combined with
        ``socketserver.ThreadingMixIn`` and ``daemon_threads`` is enabled.
    """
    # Pick the plain server class, or build a threaded variant on the fly.
    server_class = (
        type(str('WSGIServer'), (socketserver.ThreadingMixIn, WSGIServer), {})
        if threading
        else WSGIServer
    )
    server = server_class((addr, port), WSGIRequestHandler, ipv6=ipv6)

    if threading:
        # daemon_threads controls what happens to worker threads on an abrupt
        # shutdown (user quitting, auto-reloader restart). True means the
        # server does not wait for them, which keeps reloads fast and avoids
        # having to kill the process when a thread fails to terminate.
        server.daemon_threads = True

    server.set_app(wsgi_handler)
    server.serve_forever()
def run(addr, port, wsgi_handler, ipv6=False, threading=False,
        server_cls=WSGIServer):
    """Serve ``wsgi_handler`` on ``(addr, port)`` until the server stops.

    :param addr: host/address the server binds to.
    :param port: port the server binds to.
    :param wsgi_handler: WSGI application installed with ``set_app``.
    :param ipv6: forwarded to the server constructor as ``ipv6``.
    :param threading: when true, ``server_cls`` is combined with
        ``socketserver.ThreadingMixIn`` and ``daemon_threads`` is enabled.
    :param server_cls: base server class to instantiate (or to derive the
        threaded variant from).
    """
    # Use the given class directly, or build a threaded variant on the fly.
    server_class = (
        type(str('WSGIServer'), (socketserver.ThreadingMixIn, server_cls), {})
        if threading
        else server_cls
    )
    server = server_class((addr, port), WSGIRequestHandler, ipv6=ipv6)

    if threading:
        # daemon_threads controls what happens to worker threads on an abrupt
        # shutdown (user quitting, auto-reloader restart). True means the
        # server does not wait for them, which keeps reloads fast and avoids
        # having to kill the process when a thread fails to terminate.
        server.daemon_threads = True

    server.set_app(wsgi_handler)
    server.serve_forever()
def run(self, app):  # pragma: no cover
    """Serve ``app`` with wsgiref's simple server until interrupted.

    The handler and server classes can be overridden through
    ``self.options['handler_class']`` and ``self.options['server_class']``.
    The listening socket is always closed when ``serve_forever()`` exits,
    including on ``KeyboardInterrupt``, so no unclosed-socket
    ``ResourceWarning`` is left behind; the exception itself still
    propagates to the caller.

    :param app: the WSGI application to serve.
    """
    from wsgiref.simple_server import WSGIRequestHandler, WSGIServer
    from wsgiref.simple_server import make_server
    import socket

    class FixedHandler(WSGIRequestHandler):
        def address_string(self):
            # Prevent reverse DNS lookups please.
            return self.client_address[0]

        # No explicit ``self`` parameter here: ``self`` below is the
        # enclosing adapter (closure), whose ``quiet`` flag silences logging.
        def log_request(*args, **kw):
            if not self.quiet:
                return WSGIRequestHandler.log_request(*args, **kw)

    handler_cls = self.options.get('handler_class', FixedHandler)
    server_cls = self.options.get('server_class', WSGIServer)

    # Fix wsgiref for IPv6 addresses: a ':' in the host switches an
    # AF_INET server class to an AF_INET6 subclass.
    if ':' in self.host and server_cls.address_family == socket.AF_INET:
        class server_cls(server_cls):
            address_family = socket.AF_INET6

    srv = make_server(self.host, self.port, app, server_cls, handler_cls)
    try:
        srv.serve_forever()
    finally:
        # ``srv`` is local to this call, so nothing else can close it.
        srv.server_close()
def test_bytes_validation(self):
    """A validated app that returns bytes yields the expected raw response."""

    def app(environ, start_response):
        response_headers = [
            ("Content-Type", "text/plain; charset=utf-8"),
            ("Date", "Wed, 24 Dec 2008 13:29:32 GMT"),
        ]
        start_response("200 OK", response_headers)
        return [b"data"]

    out, err = run_amock(validator(app))

    # The access log line must end with the request, status and body size.
    self.assertTrue(err.endswith('"GET / HTTP/1.0" 200 4\n'))

    version = sys.version.split()[0].encode('ascii')
    expected = (
        b"HTTP/1.0 200 OK\r\n"
        b"Server: WSGIServer/0.2 Python/" + version + b"\r\n"
        b"Content-Type: text/plain; charset=utf-8\r\n"
        b"Date: Wed, 24 Dec 2008 13:29:32 GMT\r\n"
        b"\r\n"
        b"data"
    )
    self.assertEqual(expected, out)
def test_bytes_validation(self):
    """A validated app that returns bytes yields the expected raw response.

    The ``Server`` header is built from the running interpreter's
    implementation name and version.
    """

    def app(environ, start_response):
        response_headers = [
            ("Content-Type", "text/plain; charset=utf-8"),
            ("Date", "Wed, 24 Dec 2008 13:29:32 GMT"),
        ]
        start_response("200 OK", response_headers)
        return [b"data"]

    out, err = run_amock(validator(app))

    # The access log line must end with the request, status and body size.
    self.assertTrue(err.endswith('"GET / HTTP/1.0" 200 4\n'))

    version = sys.version.split()[0].encode('ascii')
    implementation = python_implementation().encode('ascii')
    software = implementation + b"/" + version
    expected = (
        b"HTTP/1.0 200 OK\r\n"
        b"Server: WSGIServer/0.2 " + software + b"\r\n"
        b"Content-Type: text/plain; charset=utf-8\r\n"
        b"Date: Wed, 24 Dec 2008 13:29:32 GMT\r\n"
        b"\r\n"
        b"data"
    )
    self.assertEqual(expected, out)
def run(self, app):
    """Serve ``app`` with wsgiref's simple server, keeping it on ``self.srv``.

    The handler and server classes can be overridden through
    ``self.options['handler_class']`` and ``self.options['server_class']``.

    :param app: the WSGI application to serve.
    """
    from wsgiref.simple_server import WSGIRequestHandler, WSGIServer
    from wsgiref.simple_server import make_server
    import socket

    class FixedHandler(WSGIRequestHandler):
        def address_string(self):
            # Prevent reverse DNS lookups please.
            return self.client_address[0]

        # No explicit ``self`` parameter here: ``self`` below is the
        # enclosing adapter (closure), whose ``quiet`` flag silences logging.
        def log_request(*args, **kw):
            if self.quiet:
                return None
            return WSGIRequestHandler.log_request(*args, **kw)

    options = self.options
    handler_cls = options.get('handler_class', FixedHandler)
    server_cls = options.get('server_class', WSGIServer)

    # Fix wsgiref for IPv6 addresses.
    wants_ipv6 = ':' in self.host
    if wants_ipv6 and getattr(server_cls, 'address_family') == socket.AF_INET:
        class server_cls(server_cls):
            address_family = socket.AF_INET6

    self.srv = make_server(self.host, self.port, app, server_cls, handler_cls)
    self.srv.serve_forever()

    # NOTE(review): this polling loop only starts once serve_forever() has
    # returned -- confirm that waiting on get_port() here is intended.
    while not self.get_port():
        time.sleep(0.001)
def run(self, handler):  # pragma: no cover
    """Run ``handler`` under flup's FastCGI ``WSGIServer``.

    ``self.options`` is passed to the server as keyword arguments; a
    ``bindAddress`` of ``(self.host, self.port)`` is filled in only when the
    options do not already contain one.

    :param handler: the WSGI application to serve.
    """
    import flup.server.fcgi

    default_bind = (self.host, self.port)
    self.options.setdefault('bindAddress', default_bind)

    fcgi_server = flup.server.fcgi.WSGIServer(handler, **self.options)
    fcgi_server.run()
def run(self, app):  # pragma: no cover
    """Serve ``app`` with wsgiref's simple server, keeping it on ``self.srv``.

    The handler and server classes can be overridden through
    ``self.options['handler_class']`` and ``self.options['server_class']``.
    After the server is created, ``self.port`` is replaced by the port the
    server actually bound to. On ``KeyboardInterrupt`` the server socket is
    closed and the exception is re-raised.

    :param app: the WSGI application to serve.
    """
    from wsgiref.simple_server import make_server
    from wsgiref.simple_server import WSGIRequestHandler, WSGIServer
    import socket

    class FixedHandler(WSGIRequestHandler):
        def address_string(self):
            # Prevent reverse DNS lookups please.
            return self.client_address[0]

        # No explicit ``self`` parameter here: ``self`` below is the
        # enclosing adapter (closure), whose ``quiet`` flag silences logging.
        def log_request(*args, **kw):
            if self.quiet:
                return None
            return WSGIRequestHandler.log_request(*args, **kw)

    options = self.options
    handler_cls = options.get('handler_class', FixedHandler)
    server_cls = options.get('server_class', WSGIServer)

    # Fix wsgiref for IPv6 addresses.
    wants_ipv6 = ':' in self.host
    if wants_ipv6 and getattr(server_cls, 'address_family') == socket.AF_INET:
        class server_cls(server_cls):
            address_family = socket.AF_INET6

    self.srv = make_server(self.host, self.port, app, server_cls, handler_cls)
    # Update port to the actual port (0 means random).
    self.port = self.srv.server_port

    try:
        self.srv.serve_forever()
    except KeyboardInterrupt:
        # Prevent ResourceWarning: unclosed socket.
        self.srv.server_close()
        raise
def run(self, handler):
    """Serve ``handler`` with gevent's ``pywsgi.WSGIServer``.

    Raises ``RuntimeError`` when ``threading.local()`` is not gevent's
    ``local`` type. When ``self.quiet`` is set, the ``log`` option is forced
    to ``None``. When the ``BOTTLE_CHILD`` environment variable is present,
    SIGINT is wired to ``server.stop()``.

    :param handler: the WSGI application to serve.
    """
    from gevent import pywsgi, local

    monkey_patched = isinstance(threading.local(), local.local)
    if not monkey_patched:
        raise RuntimeError(
            "Bottle requires gevent.monkey.patch_all() (before import)"
        )

    if self.quiet:
        self.options['log'] = None

    server = pywsgi.WSGIServer((self.host, self.port), handler, **self.options)

    if 'BOTTLE_CHILD' in os.environ:
        import signal
        signal.signal(signal.SIGINT, lambda signum, frame: server.stop())

    server.serve_forever()
def __init__(self, *args, **kwargs):
    """Initialise the server, honouring an optional ``ipv6`` keyword.

    ``ipv6`` (default ``False``) is removed from ``kwargs``; when true,
    ``address_family`` is set to ``socket.AF_INET6`` on the instance. All
    remaining arguments are forwarded to the parent initialiser.
    """
    use_ipv6 = kwargs.pop('ipv6', False)
    if use_ipv6:
        # Set before delegating; presumably the parent reads address_family
        # while initialising -- confirm against the base class.
        self.address_family = socket.AF_INET6
    super(WSGIServer, self).__init__(*args, **kwargs)
def server_bind(self):
    """Bind through the parent class, then call ``setup_environ()``."""
    parent = super(WSGIServer, self)
    parent.server_bind()
    self.setup_environ()
def handle_error(self, request, client_address):
    """Report a request error, special-casing broken pipes.

    When ``is_broken_pipe_error()`` is true, a one-line notice naming the
    client address is written to ``sys.stderr``; any other error is passed
    to the parent class's ``handle_error``.
    """
    if not is_broken_pipe_error():
        super(WSGIServer, self).handle_error(request, client_address)
        return
    notice = "- Broken pipe from %s\n" % (client_address,)
    sys.stderr.write(notice)


# Inheriting from object required on Python 2.
def _get_server_cls(self, host):
    """Return an appropriate WSGI server class based on the provided host.

    :param host: The listen host for the zaqar API server.
    :returns: ``simple_server.WSGIServer``, or a subclass of it with
        ``address_family`` set to ``socket.AF_INET6`` when ``host`` is a
        valid IPv6 address and the base class uses ``socket.AF_INET``.
    """
    server_cls = simple_server.WSGIServer

    needs_ipv6_variant = (
        netutils.is_valid_ipv6(host)
        and getattr(server_cls, 'address_family') == socket.AF_INET
    )
    if needs_ipv6_variant:
        class server_cls(server_cls):
            address_family = socket.AF_INET6

    return server_cls
def __init__(self, *args, **kwargs):
    """Initialise the server with optional ``ipv6`` / ``allow_reuse_address``.

    Both keywords are removed from ``kwargs`` before the remaining
    arguments are forwarded to the parent initialiser:

    * ``ipv6`` (default ``False``): when true, ``address_family`` is set to
      ``socket.AF_INET6`` on the instance.
    * ``allow_reuse_address`` (default ``True``): stored on the instance
      under the same name.
    """
    use_ipv6 = kwargs.pop('ipv6', False)
    if use_ipv6:
        self.address_family = socket.AF_INET6

    reuse_address = kwargs.pop('allow_reuse_address', True)
    self.allow_reuse_address = reuse_address

    super(WSGIServer, self).__init__(*args, **kwargs)