我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用tornado.ioloop.IOLoop.configured_class()。
def setUp(self): if IOLoop.configured_class().__name__ in ('TwistedIOLoop', 'AsyncIOMainLoop'): # TwistedIOLoop only supports the global reactor, so we can't have # separate IOLoops for client and server threads. # AsyncIOMainLoop doesn't work with the default policy # (although it could with some tweaks to this test and a # policy that created loops for non-main threads). raise unittest.SkipTest( 'Sync HTTPClient not compatible with TwistedIOLoop or ' 'AsyncIOMainLoop') self.server_ioloop = IOLoop() sock, self.port = bind_unused_port() app = Application([('/', HelloWorldHandler)]) self.server = HTTPServer(app, io_loop=self.server_ioloop) self.server.add_socket(sock) self.server_thread = threading.Thread(target=self.server_ioloop.start) self.server_thread.start() self.http_client = HTTPClient()
def setUp(self): if IOLoop.configured_class().__name__ == 'TwistedIOLoop': # TwistedIOLoop only supports the global reactor, so we can't have # separate IOLoops for client and server threads. raise unittest.SkipTest( 'Sync HTTPClient not compatible with TwistedIOLoop') self.server_ioloop = IOLoop() sock, self.port = bind_unused_port() app = Application([('/', HelloWorldHandler)]) server = HTTPServer(app, io_loop=self.server_ioloop) server.add_socket(sock) self.server_thread = threading.Thread(target=self.server_ioloop.start) self.server_thread.start() self.http_client = HTTPClient()
def test_add_callback_while_closing(self): # Issue #635: add_callback() should raise a clean exception # if called while another thread is closing the IOLoop. if IOLoop.configured_class().__name__.endswith('AsyncIOLoop'): raise unittest.SkipTest("AsyncIOMainLoop shutdown not thread safe") closing = threading.Event() def target(): other_ioloop.add_callback(other_ioloop.stop) other_ioloop.start() closing.set() other_ioloop.close(all_fds=True) other_ioloop = IOLoop() thread = threading.Thread(target=target) thread.start() closing.wait() for i in range(1000): try: other_ioloop.add_callback(lambda: None) except RuntimeError as e: self.assertEqual("IOLoop is closing", str(e)) break
def skip_if_twisted(): if IOLoop.configured_class().__name__.endswith(('TwistedIOLoop', 'AsyncIOMainLoop')): raise unittest.SkipTest("Process tests not compatible with " "TwistedIOLoop or AsyncIOMainLoop") # Not using AsyncHTTPTestCase because we need control over the IOLoop.
def test_subprocess(self): if IOLoop.configured_class().__name__.endswith('LayeredTwistedIOLoop'): # This test fails non-deterministically with LayeredTwistedIOLoop. # (the read_until('\n') returns '\n' instead of 'hello\n') # This probably indicates a problem with either TornadoReactor # or TwistedIOLoop, but I haven't been able to track it down # and for now this is just causing spurious travis-ci failures. raise unittest.SkipTest("Subprocess tests not compatible with " "LayeredTwistedIOLoop") subproc = Subprocess([sys.executable, '-u', '-i'], stdin=Subprocess.STREAM, stdout=Subprocess.STREAM, stderr=subprocess.STDOUT, io_loop=self.io_loop) self.addCleanup(lambda: os.kill(subproc.pid, signal.SIGTERM)) subproc.stdout.read_until(b'>>> ', self.stop) self.wait() subproc.stdin.write(b"print('hello')\n") subproc.stdout.read_until(b'\n', self.stop) data = self.wait() self.assertEqual(data, b"hello\n") subproc.stdout.read_until(b">>> ", self.stop) self.wait() subproc.stdin.write(b"raise SystemExit\n") subproc.stdout.read_until_close(self.stop) data = self.wait() self.assertEqual(data, b"")
def save_signal_handlers(): saved = {} for sig in [signal.SIGINT, signal.SIGTERM, signal.SIGCHLD]: saved[sig] = signal.getsignal(sig) if "twisted" in repr(saved): if not issubclass(IOLoop.configured_class(), TwistedIOLoop): # when the global ioloop is twisted, we expect the signal # handlers to be installed. Otherwise, it means we're not # cleaning up after twisted properly. raise Exception("twisted signal handlers already installed") return saved
def skip_if_twisted(): if IOLoop.configured_class().__name__.endswith('TwistedIOLoop'): raise unittest.SkipTest("Process tests not compatible with TwistedIOLoop") # Not using AsyncHTTPTestCase because we need control over the IOLoop.