我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用tornado.concurrent.run_on_executor()。
def test_no_calling(self):
    # Exercises the bare ``@run_on_executor`` form, i.e. the decorator is
    # applied directly to the method without being called first.  The value
    # obtained by yielding the decorated method's result must equal 42.
    #
    # NOTE(review): this function is a generator (it contains ``yield``), so
    # it presumably relies on a coroutine-aware test wrapper such as
    # ``tornado.testing.gen_test`` that is not visible here -- confirm.
    class Object(object):
        def __init__(self, io_loop):
            # No attribute names are handed to the decorator in this test, so
            # ``executor`` / ``io_loop`` are presumably the names it looks up
            # by default -- verify against the Tornado version in use.
            self.executor = futures.thread.ThreadPoolExecutor(max_workers=1)
            self.io_loop = io_loop

        @run_on_executor
        def f(self):
            return 42

    target = Object(io_loop=self.io_loop)
    result = yield target.f()
    self.assertEqual(result, 42)
def test_call_with_no_args(self):
    # Exercises the called-with-no-arguments form, ``@run_on_executor()``.
    # The value obtained by yielding the decorated method's result must
    # equal 42.
    #
    # NOTE(review): this function is a generator (it contains ``yield``), so
    # it presumably relies on a coroutine-aware test wrapper such as
    # ``tornado.testing.gen_test`` that is not visible here -- confirm.
    class Object(object):
        def __init__(self, io_loop):
            # The decorator receives no keyword arguments in this test, so
            # ``executor`` / ``io_loop`` are presumably the attribute names it
            # falls back to -- verify against the Tornado version in use.
            self.executor = futures.thread.ThreadPoolExecutor(max_workers=1)
            self.io_loop = io_loop

        @run_on_executor()
        def f(self):
            return 42

    target = Object(io_loop=self.io_loop)
    result = yield target.f()
    self.assertEqual(result, 42)
def test_call_with_io_loop(self):
    # Exercises ``@run_on_executor(io_loop='_io_loop')``: the loop is stored
    # on the object under the non-default name ``_io_loop`` and that name is
    # passed to the decorator.  The value obtained by yielding the decorated
    # method's result must equal 42.
    #
    # NOTE(review): this function is a generator (it contains ``yield``), so
    # it presumably relies on a coroutine-aware test wrapper such as
    # ``tornado.testing.gen_test`` that is not visible here -- confirm.
    # NOTE(review): whether the ``io_loop`` keyword is still honoured depends
    # on the Tornado release -- check the run_on_executor changelog.
    class Object(object):
        def __init__(self, io_loop):
            self.executor = futures.thread.ThreadPoolExecutor(max_workers=1)
            # Deliberately *not* named ``io_loop``; see the decorator below.
            self._io_loop = io_loop

        @run_on_executor(io_loop='_io_loop')
        def f(self):
            return 42

    target = Object(io_loop=self.io_loop)
    result = yield target.f()
    self.assertEqual(result, 42)
def test_call_with_executor(self):
    # Exercises ``@run_on_executor(executor='_Object__executor')``: the
    # executor lives in a double-underscore attribute and its mangled name is
    # passed to the decorator.  The value obtained by yielding the decorated
    # method's result must equal 42.
    #
    # NOTE(review): this function is a generator (it contains ``yield``), so
    # it presumably relies on a coroutine-aware test wrapper such as
    # ``tornado.testing.gen_test`` that is not visible here -- confirm.
    class Object(object):
        # The class name must stay ``Object``: ``self.__executor`` is
        # name-mangled by Python to ``_Object__executor``, which is exactly
        # the string given to the decorator below.
        def __init__(self, io_loop):
            self.__executor = futures.thread.ThreadPoolExecutor(max_workers=1)
            self.io_loop = io_loop

        @run_on_executor(executor='_Object__executor')
        def f(self):
            return 42

    target = Object(io_loop=self.io_loop)
    result = yield target.f()
    self.assertEqual(result, 42)
def test_call_with_both(self):
    # Exercises ``@run_on_executor`` with both keywords at once: the loop is
    # stored as ``_io_loop`` and the executor in a double-underscore
    # attribute, and both attribute names are passed to the decorator.  The
    # value obtained by yielding the decorated method's result must equal 42.
    #
    # NOTE(review): this function is a generator (it contains ``yield``), so
    # it presumably relies on a coroutine-aware test wrapper such as
    # ``tornado.testing.gen_test`` that is not visible here -- confirm.
    # NOTE(review): whether the ``io_loop`` keyword is still honoured depends
    # on the Tornado release -- check the run_on_executor changelog.
    class Object(object):
        # The class name must stay ``Object``: ``self.__executor`` is
        # name-mangled by Python to ``_Object__executor``, which is exactly
        # the string given to the decorator below.
        def __init__(self, io_loop):
            self.__executor = futures.thread.ThreadPoolExecutor(max_workers=1)
            self._io_loop = io_loop

        @run_on_executor(io_loop='_io_loop', executor='_Object__executor')
        def f(self):
            return 42

    target = Object(io_loop=self.io_loop)
    result = yield target.f()
    self.assertEqual(result, 42)