我们从 Python 开源项目中提取了以下 21 个代码示例，用于说明如何使用 asyncio.DefaultEventLoopPolicy()。
def test_get_event_loop(self): policy = asyncio.DefaultEventLoopPolicy() self.assertIsNone(policy._local._loop) loop = policy.get_event_loop() self.assertIsInstance(loop, asyncio.AbstractEventLoop) self.assertIs(policy._local._loop, loop) self.assertIs(loop, policy.get_event_loop()) loop.close()
def test_get_event_loop_calls_set_event_loop(self): policy = asyncio.DefaultEventLoopPolicy() with mock.patch.object( policy, "set_event_loop", wraps=policy.set_event_loop) as m_set_event_loop: loop = policy.get_event_loop() # policy._local._loop must be set through .set_event_loop() # (the unix DefaultEventLoopPolicy needs this call to attach # the child watcher correctly) m_set_event_loop.assert_called_with(loop) loop.close()
def test_get_event_loop_after_set_none(self): policy = asyncio.DefaultEventLoopPolicy() policy.set_event_loop(None) self.assertRaises(RuntimeError, policy.get_event_loop)
def test_get_event_loop_thread(self, m_current_thread): def f(): policy = asyncio.DefaultEventLoopPolicy() self.assertRaises(RuntimeError, policy.get_event_loop) th = threading.Thread(target=f) th.start() th.join()
def test_new_event_loop(self): policy = asyncio.DefaultEventLoopPolicy() loop = policy.new_event_loop() self.assertIsInstance(loop, asyncio.AbstractEventLoop) loop.close()
def test_set_event_loop_policy(self): self.assertRaises( AssertionError, asyncio.set_event_loop_policy, object()) old_policy = asyncio.get_event_loop_policy() policy = asyncio.DefaultEventLoopPolicy() asyncio.set_event_loop_policy(policy) self.assertIs(policy, asyncio.get_event_loop_policy()) self.assertIsNot(policy, old_policy)
def create_policy(self):
    """Build and return a new ``asyncio.DefaultEventLoopPolicy`` instance."""
    fresh_policy = asyncio.DefaultEventLoopPolicy()
    return fresh_policy
def test_get_event_loop_after_set_none(self): policy = asyncio.DefaultEventLoopPolicy() policy.set_event_loop(None) self.assertRaises(AssertionError, policy.get_event_loop)
def test_get_event_loop_thread(self, m_current_thread): def f(): policy = asyncio.DefaultEventLoopPolicy() self.assertRaises(AssertionError, policy.get_event_loop) th = threading.Thread(target=f) th.start() th.join()