我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用http.server.HTTPServer.__init__()。
def __init__(self, host, port, app, handler=None,
             passthrough_errors=False, ssl_context=None):
    """Bind the HTTP server and optionally wrap its socket via pyOpenSSL.

    ``handler`` falls back to ``WSGIRequestHandler`` when it is ``None``.

    ``ssl_context`` may be ``None`` (the socket is left untouched), a
    tuple (unpacked into ``load_ssl_context``), the string ``'adhoc'``
    (replaced by ``generate_adhoc_ssl_context()``), or any other object,
    which is passed to ``tsafe.Connection`` unchanged.

    Raises ``TypeError`` when an SSL context is requested but
    ``OpenSSL.tsafe`` cannot be imported.
    """
    request_handler = WSGIRequestHandler if handler is None else handler
    self.address_family = select_ip_version(host, port)
    HTTPServer.__init__(self, (host, int(port)), request_handler)
    self.app = app
    self.passthrough_errors = passthrough_errors
    self.shutdown_signal = False

    # Without an SSL context there is nothing more to set up.
    if ssl_context is None:
        self.ssl_context = None
        return

    try:
        from OpenSSL import tsafe
    except ImportError:
        raise TypeError('SSL is not available if the OpenSSL '
                        'library is not installed.')

    context = ssl_context
    if isinstance(context, tuple):
        context = load_ssl_context(*context)
    # Checked after the tuple case, exactly as two independent tests.
    if context == 'adhoc':
        context = generate_adhoc_ssl_context()
    self.socket = tsafe.Connection(context, self.socket)
    self.ssl_context = context
def __init__(self, host, port, app, handler=None,
             passthrough_errors=False, ssl_context=None):
    """Bind the HTTP server and optionally wrap its listening socket in SSL.

    ``handler`` falls back to ``WSGIRequestHandler`` when it is ``None``.

    ``ssl_context`` may be ``None`` (the socket is left untouched), a
    tuple (unpacked into ``load_ssl_context``), the string ``'adhoc'``
    (replaced by ``generate_adhoc_ssl_context()``), or an object exposing
    ``wrap_socket(sock, server_side=True)``.
    """
    request_handler = WSGIRequestHandler if handler is None else handler
    self.address_family = select_ip_version(host, port)
    HTTPServer.__init__(self, (host, int(port)), request_handler)
    self.app = app
    self.passthrough_errors = passthrough_errors
    self.shutdown_signal = False

    # Without an SSL context there is nothing more to set up.
    if ssl_context is None:
        self.ssl_context = None
        return

    context = ssl_context
    if isinstance(context, tuple):
        context = load_ssl_context(*context)
    if context == 'adhoc':
        context = generate_adhoc_ssl_context()
    self.socket = context.wrap_socket(self.socket, server_side=True)
    self.ssl_context = context
def __init__(self, addr, handler, poll_interval, sockmap):
    """Create a non-blocking TCP listener on ``addr`` registered in ``sockmap``.

    The port reported by the bound socket is stored in ``self.port``.
    If any step of the socket setup raises, the dispatcher is closed and
    the exception is re-raised; in that case ``_handler``, ``_thread``
    and ``poll_interval`` are not set.
    """
    self._localaddr = addr
    self._remoteaddr = None
    self.data_size_limit = None
    self.sockmap = sockmap
    asyncore.dispatcher.__init__(self, map=sockmap)

    try:
        listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        listener.setblocking(False)
        self.set_socket(listener, map=sockmap)
        # Try to re-use a server port if possible.
        self.set_reuse_addr()
        self.bind(addr)
        self.port = listener.getsockname()[1]
        self.listen(5)
    except BaseException:
        # Clean up on any failure, then let the error propagate unchanged.
        self.close()
        raise

    self._handler = handler
    self._thread = None
    self.poll_interval = poll_interval
def __init__(self, addr, handler, poll_interval=0.5, log=False, sslctx=None):
    """Start an HTTP server whose requests are all routed to ``handler``.

    A request-handler class is defined locally so that any ``do_*``
    attribute lookup resolves to a method that calls
    ``self.server._handler`` with the request handler instance.
    Request logging is emitted only when ``log`` is true.  ``sslctx`` is
    stored on the instance as ``self.sslctx``.
    """

    class DelegatingHTTPRequestHandler(BaseHTTPRequestHandler):
        def __getattr__(self, name, default=None):
            # Only invoked for attributes not found normally; every
            # do_<VERB> lookup is answered with process_request.
            if not name.startswith('do_'):
                raise AttributeError(name)
            return self.process_request

        def process_request(self):
            self.server._handler(self)

        def log_message(self, format, *args):
            # ``log`` is captured from the enclosing __init__ call.
            if not log:
                return
            super().log_message(format, *args)

    HTTPServer.__init__(self, addr, DelegatingHTTPRequestHandler)
    ControlMixin.__init__(self, handler, poll_interval)
    self.sslctx = sslctx
def setUp(self):
    """Install a root-logger filter that rejects unexpected record types.

    After the base fixture is set up, a filter expecting
    ``DerivedLogRecord`` is attached to ``self.root_logger`` and the
    current log-record factory is saved in ``self.orig_factory``.
    """

    class CheckingFilter(logging.Filter):
        def __init__(self, cls):
            self.cls = cls

        def filter(self, record):
            # Exact type identity is required; subclasses do not pass.
            record_type = type(record)
            if record_type is self.cls:
                return True
            raise TypeError('Unexpected LogRecord type %s, expected %s'
                            % (record_type, self.cls))

    BaseTest.setUp(self)
    self.filter = CheckingFilter(DerivedLogRecord)
    self.root_logger.addFilter(self.filter)
    self.orig_factory = logging.getLogRecordFactory()
def __init__(self, addr, handler, poll_interval=0.5, bind_and_activate=True):
    """Start a UDP server whose datagrams are all routed to ``handler``.

    A request-handler class is defined locally: ``handle`` calls
    ``self.server._handler`` with the request handler instance, and
    ``finish`` only runs the base ``finish`` when something was written
    to ``wfile``.  An ``OSError`` raised there is suppressed when the
    server's ``_closed`` flag is set and re-raised otherwise.
    """

    class DelegatingUDPRequestHandler(DatagramRequestHandler):
        def handle(self):
            self.server._handler(self)

        def finish(self):
            # Nothing buffered for output: skip the base finish entirely.
            if not self.wfile.getvalue():
                return
            try:
                super().finish()
            except OSError:
                if self.server._closed:
                    return
                raise

    ThreadingUDPServer.__init__(self, addr, DelegatingUDPRequestHandler,
                                bind_and_activate)
    ControlMixin.__init__(self, handler, poll_interval)
    self._closed = False
def __init__(self, protocol):
    """Store ``protocol`` and reset the remaining private fields.

    ``_certfile``, ``_keyfile`` and ``_password`` all start as ``None``.
    """
    self._protocol = protocol
    self._certfile = self._keyfile = self._password = None
def __init__(self, host, port, app, handler=None,
             passthrough_errors=False, ssl_context=None, fd=None):
    """Bind the HTTP server, optionally adopting an existing socket via ``fd``.

    ``handler`` falls back to ``WSGIRequestHandler`` when it is ``None``.

    When ``fd`` is given, a socket is built from that descriptor with
    ``socket.fromfd``.  The base class is still initialised, bound to
    port 0, and its socket is then closed and replaced by the fd-based
    one; ``server_address`` is refreshed from the adopted socket.

    ``self.port`` is the ``port`` argument when no ``fd`` is given.  With
    ``fd`` it is taken from the adopted socket's address instead of the
    placeholder 0 used for the throwaway bind.

    ``ssl_context`` may be ``None``, a tuple (unpacked into
    ``load_ssl_context``), the string ``'adhoc'`` (replaced by
    ``generate_adhoc_ssl_context()``), or an object exposing
    ``wrap_socket(sock, server_side=True)``.
    """
    if handler is None:
        handler = WSGIRequestHandler

    self.address_family = select_ip_version(host, port)

    # Keep the caller's ``port`` intact; only the throwaway bind uses 0.
    real_sock = None
    bind_port = port
    if fd is not None:
        real_sock = socket.fromfd(fd, self.address_family,
                                  socket.SOCK_STREAM)
        bind_port = 0
    HTTPServer.__init__(self, (host, int(bind_port)), handler)

    self.app = app
    self.passthrough_errors = passthrough_errors
    self.shutdown_signal = False
    self.host = host
    self.port = port

    # Patch in the original socket.
    if real_sock is not None:
        self.socket.close()
        self.socket = real_sock
        self.server_address = self.socket.getsockname()
        # Report the port the adopted socket is actually bound to.
        self.port = self.server_address[1]

    if ssl_context is None:
        self.ssl_context = None
        return

    if isinstance(ssl_context, tuple):
        ssl_context = load_ssl_context(*ssl_context)
    if ssl_context == 'adhoc':
        ssl_context = generate_adhoc_ssl_context()

    # On Python 2 the value returned by socket.fromfd is an internal
    # socket object, but ssl wrapping needs the wrapper around it.
    sock = self.socket
    if PY2 and not isinstance(sock, socket.socket):
        sock = socket.socket(sock.family, sock.type, sock.proto, sock)
    self.socket = ssl_context.wrap_socket(sock, server_side=True)
    self.ssl_context = ssl_context
def __init__(self, host, port, app, processes=40, handler=None,
             passthrough_errors=False, ssl_context=None, fd=None):
    """Initialise the base WSGI server and record the child-process limit.

    All arguments except ``processes`` are forwarded positionally to
    ``BaseWSGIServer.__init__``; ``processes`` is stored as
    ``self.max_children``.
    """
    BaseWSGIServer.__init__(
        self, host, port, app, handler, passthrough_errors, ssl_context, fd
    )
    self.max_children = processes
def __init__(self, host, port, app, handler=None,
             passthrough_errors=False, ssl_context=None, fd=None):
    """Bind the HTTP server, optionally adopting an existing socket via ``fd``.

    ``handler`` falls back to ``WSGIRequestHandler`` when it is ``None``.

    When ``fd`` is given, a socket is built from that descriptor with
    ``socket.fromfd``.  The base class is still initialised, bound to
    port 0, and its socket is then closed and replaced by the fd-based
    one; ``server_address`` is refreshed from the adopted socket.

    ``self.port`` is read from the socket the server ends up using, so
    it reflects the adopted socket when ``fd`` is given rather than the
    throwaway socket that was bound to port 0.

    ``ssl_context`` may be ``None``, a tuple (unpacked into
    ``load_ssl_context``), the string ``'adhoc'`` (replaced by
    ``generate_adhoc_ssl_context()``), or an object exposing
    ``wrap_socket(sock, server_side=True)``.
    """
    if handler is None:
        handler = WSGIRequestHandler

    self.address_family = select_ip_version(host, port)

    real_sock = None
    if fd is not None:
        real_sock = socket.fromfd(fd, self.address_family,
                                  socket.SOCK_STREAM)
        port = 0
    HTTPServer.__init__(self, (host, int(port)), handler)

    self.app = app
    self.passthrough_errors = passthrough_errors
    self.shutdown_signal = False
    self.host = host

    # Patch in the original socket.
    if real_sock is not None:
        self.socket.close()
        self.socket = real_sock
        self.server_address = self.socket.getsockname()

    # Read the port only after the swap above; doing it earlier would
    # record the ephemeral port of the socket that was just discarded.
    self.port = self.socket.getsockname()[1]

    if ssl_context is None:
        self.ssl_context = None
        return

    if isinstance(ssl_context, tuple):
        ssl_context = load_ssl_context(*ssl_context)
    if ssl_context == 'adhoc':
        ssl_context = generate_adhoc_ssl_context()

    # On Python 2 the value returned by socket.fromfd is an internal
    # socket object, but ssl wrapping needs the wrapper around it.
    sock = self.socket
    if PY2 and not isinstance(sock, socket.socket):
        sock = socket.socket(sock.family, sock.type, sock.proto, sock)
    self.socket = ssl_context.wrap_socket(sock, server_side=True)
    self.ssl_context = ssl_context
def __init__(self, host, port, app, processes=40, handler=None,
             passthrough_errors=False, ssl_context=None, fd=None):
    """Initialise the base WSGI server and record the child-process limit.

    Raises ``ValueError`` before any other work when ``can_fork`` is
    false.  All arguments except ``processes`` are forwarded positionally
    to ``BaseWSGIServer.__init__``; ``processes`` is stored as
    ``self.max_children``.
    """
    if not can_fork:
        raise ValueError('Your platform does not support forking.')

    BaseWSGIServer.__init__(
        self, host, port, app, handler, passthrough_errors, ssl_context, fd
    )
    self.max_children = processes
def __init__(self, host, port, app, handler=None,
             passthrough_errors=False, ssl_context=None, fd=None):
    """Bind the HTTP server, optionally adopting an existing socket via ``fd``.

    ``handler`` falls back to ``WSGIRequestHandler`` when it is ``None``.

    When ``fd`` is given, a socket is built from that descriptor with
    ``socket.fromfd``.  The base class is still initialised, bound to
    port 0, and its socket is then closed and replaced by the fd-based
    one; ``server_address`` is refreshed from the adopted socket.

    ``self.port`` is the ``port`` argument when no ``fd`` is given.  With
    ``fd`` it is taken from the adopted socket's address instead of the
    placeholder 0 used for the throwaway bind.

    ``ssl_context`` may be ``None``, a tuple (unpacked into
    ``load_ssl_context``), the string ``'adhoc'`` (replaced by
    ``generate_adhoc_ssl_context()``), or an object exposing
    ``wrap_socket(sock, server_side=True)``.
    """
    if handler is None:
        handler = WSGIRequestHandler

    self.address_family = select_ip_version(host, port)

    # Keep the caller's ``port`` intact; only the throwaway bind uses 0.
    real_sock = None
    bind_port = port
    if fd is not None:
        real_sock = socket.fromfd(fd, self.address_family,
                                  socket.SOCK_STREAM)
        bind_port = 0
    HTTPServer.__init__(self, (host, int(bind_port)), handler)

    self.app = app
    self.passthrough_errors = passthrough_errors
    self.shutdown_signal = False
    self.host = host
    self.port = port

    # Patch in the original socket.
    if real_sock is not None:
        self.socket.close()
        self.socket = real_sock
        self.server_address = self.socket.getsockname()
        # Report the port the adopted socket is actually bound to.
        self.port = self.server_address[1]

    if ssl_context is None:
        self.ssl_context = None
        return

    if isinstance(ssl_context, tuple):
        ssl_context = load_ssl_context(*ssl_context)
    if ssl_context == 'adhoc':
        ssl_context = generate_adhoc_ssl_context()
    self.socket = ssl_context.wrap_socket(self.socket, server_side=True)
    self.ssl_context = ssl_context
def __init__(self, host, port, app, processes=40, handler=None,
             passthrough_errors=False, ssl_context=None):
    """Initialise the base WSGI server and record the child-process limit.

    All arguments except ``processes`` are forwarded positionally to
    ``BaseWSGIServer.__init__``; ``processes`` is stored as
    ``self.max_children``.
    """
    BaseWSGIServer.__init__(
        self, host, port, app, handler, passthrough_errors, ssl_context
    )
    self.max_children = processes
def __init__(self, con):
    """Keep a reference to ``con`` on the instance as ``_con``."""
    self._con = con
def __init__(self, host, port):
    """Initialise the HTTP server on ``(host, port)`` with ``RequestHandler``."""
    HTTPServer.__init__(self, (host, port), RequestHandler)