我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 wsgiref.simple_server.WSGIRequestHandler()。
def get_environ(self):
    """Return the WSGI environ with spoofable headers removed and PATH_INFO reset.

    Every header whose name contains an underscore is deleted from
    ``self.headers`` before the parent class builds the environ.
    ``PATH_INFO`` is then recomputed from ``self.path``: the query string is
    dropped, the remainder is passed through ``uri_to_iri`` and encoded as
    UTF-8.
    """
    # Underscores and dashes are both normalized to underscores in WSGI env
    # vars, so a header such as ``X_Foo`` could masquerade as ``X-Foo``.
    # Dropping underscore headers up front prevents that kind of spoofing;
    # Nginx and Apache 2.4+ do the same.
    ambiguous_names = [name for name in self.headers.keys() if '_' in name]
    for name in ambiguous_names:
        del self.headers[name]

    env = super(WSGIRequestHandler, self).get_environ()

    # ``partition`` yields the whole string when there is no '?', so no
    # separate membership test is needed to strip the query string.
    path_only = self.path.partition('?')[0]
    path_bytes = uri_to_iri(path_only).encode(UTF_8)

    # Under Python 3, non-ASCII values in the WSGI environ are arbitrarily
    # decoded with ISO-8859-1, so the same decoding is replicated here.
    # Refs comment in `get_bytes_from_wsgi()`.
    if six.PY3:
        env['PATH_INFO'] = path_bytes.decode(ISO_8859_1)
    else:
        env['PATH_INFO'] = path_bytes
    return env
def handle(self):
    """Process one HTTP request, dispatching it through ``ServerHandler``.

    This is a copy of ``WSGIRequestHandler.handle`` that differs only in the
    handler class it instantiates. A request line longer than 65536 bytes is
    answered with a 414 error, and a request that ``parse_request`` rejects
    is dropped (the error response has already been sent at that point).
    """
    # Read one byte more than the limit so an over-long line is detectable.
    request_line = self.rfile.readline(65537)
    self.raw_requestline = request_line

    if len(request_line) > 65536:
        self.requestline = self.request_version = self.command = ''
        self.send_error(414)
        return

    if not self.parse_request():
        # An error code has already been sent; nothing more to do.
        return

    wsgi_handler = ServerHandler(
        self.rfile,
        self.wfile,
        self.get_stderr(),
        self.get_environ(),
    )
    # Back-pointer so the handler can log through this request handler.
    wsgi_handler.request_handler = self
    wsgi_handler.run(self.server.get_app())
def run(addr, port, wsgi_handler, ipv6=False, threading=False):
    """Serve ``wsgi_handler`` on ``(addr, port)`` via ``serve_forever()``.

    Args:
        addr: Address to bind to.
        port: Port to bind to.
        wsgi_handler: WSGI application installed with ``set_app``.
        ipv6: Forwarded to the server constructor as ``ipv6``.
        threading: When true, the server class is combined with
            ``socketserver.ThreadingMixIn`` and ``daemon_threads`` is set.
    """
    server_class = WSGIServer
    if threading:
        # Build a threaded variant of the server class on the fly.
        server_class = type(
            str('WSGIServer'),
            (socketserver.ThreadingMixIn, WSGIServer),
            {},
        )

    httpd = server_class((addr, port), WSGIRequestHandler, ipv6=ipv6)

    if threading:
        # daemon_threads controls how worker threads behave on an abrupt
        # shutdown (the user quitting, or a restart by the auto-reloader).
        # True means the server does not wait for threads to terminate,
        # which makes the auto-reloader faster and avoids having to kill the
        # server manually when a thread does not terminate correctly.
        httpd.daemon_threads = True

    httpd.set_app(wsgi_handler)
    httpd.serve_forever()
def run(addr, port, wsgi_handler, ipv6=False, threading=False, server_cls=WSGIServer):
    """Serve ``wsgi_handler`` on ``(addr, port)`` using ``server_cls``.

    Args:
        addr: Address to bind to.
        port: Port to bind to.
        wsgi_handler: WSGI application installed with ``set_app``.
        ipv6: Forwarded to the server constructor as ``ipv6``.
        threading: When true, ``server_cls`` is combined with
            ``socketserver.ThreadingMixIn`` and ``daemon_threads`` is set.
        server_cls: Server class to instantiate (or to derive the threaded
            class from).
    """
    if threading:
        # Derive a threaded server class from the requested base.
        bases = (socketserver.ThreadingMixIn, server_cls)
        httpd_cls = type(str('WSGIServer'), bases, {})
    else:
        httpd_cls = server_cls

    listen_on = (addr, port)
    httpd = httpd_cls(listen_on, WSGIRequestHandler, ipv6=ipv6)

    if threading:
        # daemon_threads controls how worker threads behave on an abrupt
        # shutdown (the user quitting, or a restart by the auto-reloader).
        # True means the server does not wait for threads to terminate,
        # which makes the auto-reloader faster and avoids having to kill the
        # server manually when a thread does not terminate correctly.
        httpd.daemon_threads = True

    httpd.set_app(wsgi_handler)
    httpd.serve_forever()
def run(self, handler):
    """Serve ``handler`` over TLS with wsgiref, using ``keys/server.pem``.

    The certificate path is stored on ``self.certfile``. When ``self.quiet``
    is true, a request handler whose ``log_request`` does nothing is
    installed through ``self.options['handler_class']``. The server is then
    created with ``make_server``, its listening socket is wrapped for
    server-side TLS, and ``serve_forever()`` is called.

    Args:
        handler: WSGI application passed to ``make_server``.

    Raises:
        IOError: If the certificate file cannot be opened or loaded. The
            failure is logged before being re-raised.
    """
    from wsgiref.simple_server import make_server, WSGIRequestHandler
    import ssl

    self.certfile = 'keys/server.pem'

    # Load the certificate before binding the listening socket so that a
    # bad or missing certificate fails fast and does not leave a bound
    # socket behind. Only certificate handling sits inside the ``try`` so
    # that unrelated I/O errors raised while serving are not reported as
    # certificate problems.
    try:
        with open(self.certfile):
            pass
        # ``ssl.wrap_socket`` was removed in Python 3.12; an SSLContext is
        # the supported way to wrap a server socket.
        context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
        context.load_cert_chain(certfile=self.certfile)
    except IOError as e:
        logging.error("Unable to open Certificate file at location {} {}".
                      format(self.certfile, e))
        raise

    if self.quiet:
        class QuietHandler(WSGIRequestHandler):
            def log_request(*args, **kw):
                pass
        self.options['handler_class'] = QuietHandler

    srv = make_server(self.host, self.port, handler, **self.options)
    srv.socket = context.wrap_socket(srv.socket, server_side=True)
    srv.serve_forever()