Python asyncio 模块,DefaultEventLoopPolicy() 实例源码

我们从Python开源项目中,提取了以下21个代码示例,用于说明如何使用asyncio.DefaultEventLoopPolicy()

项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def test_get_event_loop(self):
        policy = asyncio.DefaultEventLoopPolicy()
        self.assertIsNone(policy._local._loop)

        loop = policy.get_event_loop()
        self.assertIsInstance(loop, asyncio.AbstractEventLoop)

        self.assertIs(policy._local._loop, loop)
        self.assertIs(loop, policy.get_event_loop())
        loop.close()
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def test_get_event_loop_calls_set_event_loop(self):
        policy = asyncio.DefaultEventLoopPolicy()

        with mock.patch.object(
                policy, "set_event_loop",
                wraps=policy.set_event_loop) as m_set_event_loop:

            loop = policy.get_event_loop()

            # policy._local._loop must be set through .set_event_loop()
            # (the unix DefaultEventLoopPolicy needs this call to attach
            # the child watcher correctly)
            m_set_event_loop.assert_called_with(loop)

        loop.close()
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def test_get_event_loop_after_set_none(self):
        policy = asyncio.DefaultEventLoopPolicy()
        policy.set_event_loop(None)
        self.assertRaises(RuntimeError, policy.get_event_loop)
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def test_get_event_loop_thread(self, m_current_thread):

        def f():
            policy = asyncio.DefaultEventLoopPolicy()
            self.assertRaises(RuntimeError, policy.get_event_loop)

        th = threading.Thread(target=f)
        th.start()
        th.join()
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def test_new_event_loop(self):
        policy = asyncio.DefaultEventLoopPolicy()

        loop = policy.new_event_loop()
        self.assertIsInstance(loop, asyncio.AbstractEventLoop)
        loop.close()
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def test_set_event_loop_policy(self):
        self.assertRaises(
            AssertionError, asyncio.set_event_loop_policy, object())

        old_policy = asyncio.get_event_loop_policy()

        policy = asyncio.DefaultEventLoopPolicy()
        asyncio.set_event_loop_policy(policy)
        self.assertIs(policy, asyncio.get_event_loop_policy())
        self.assertIsNot(policy, old_policy)
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def create_policy(self):
        """Return a newly constructed ``asyncio.DefaultEventLoopPolicy``."""
        policy_factory = asyncio.DefaultEventLoopPolicy
        return policy_factory()
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_get_event_loop(self):
        policy = asyncio.DefaultEventLoopPolicy()
        self.assertIsNone(policy._local._loop)

        loop = policy.get_event_loop()
        self.assertIsInstance(loop, asyncio.AbstractEventLoop)

        self.assertIs(policy._local._loop, loop)
        self.assertIs(loop, policy.get_event_loop())
        loop.close()
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_get_event_loop_calls_set_event_loop(self):
        policy = asyncio.DefaultEventLoopPolicy()

        with mock.patch.object(
                policy, "set_event_loop",
                wraps=policy.set_event_loop) as m_set_event_loop:

            loop = policy.get_event_loop()

            # policy._local._loop must be set through .set_event_loop()
            # (the unix DefaultEventLoopPolicy needs this call to attach
            # the child watcher correctly)
            m_set_event_loop.assert_called_with(loop)

        loop.close()
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_get_event_loop_after_set_none(self):
        policy = asyncio.DefaultEventLoopPolicy()
        policy.set_event_loop(None)
        self.assertRaises(RuntimeError, policy.get_event_loop)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_get_event_loop_thread(self, m_current_thread):

        def f():
            policy = asyncio.DefaultEventLoopPolicy()
            self.assertRaises(RuntimeError, policy.get_event_loop)

        th = threading.Thread(target=f)
        th.start()
        th.join()
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_new_event_loop(self):
        policy = asyncio.DefaultEventLoopPolicy()

        loop = policy.new_event_loop()
        self.assertIsInstance(loop, asyncio.AbstractEventLoop)
        loop.close()
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_set_event_loop_policy(self):
        self.assertRaises(
            AssertionError, asyncio.set_event_loop_policy, object())

        old_policy = asyncio.get_event_loop_policy()

        policy = asyncio.DefaultEventLoopPolicy()
        asyncio.set_event_loop_policy(policy)
        self.assertIs(policy, asyncio.get_event_loop_policy())
        self.assertIsNot(policy, old_policy)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def create_policy(self):
        """Return a newly constructed ``asyncio.DefaultEventLoopPolicy``."""
        policy_factory = asyncio.DefaultEventLoopPolicy
        return policy_factory()
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_get_event_loop(self):
        policy = asyncio.DefaultEventLoopPolicy()
        self.assertIsNone(policy._local._loop)

        loop = policy.get_event_loop()
        self.assertIsInstance(loop, asyncio.AbstractEventLoop)

        self.assertIs(policy._local._loop, loop)
        self.assertIs(loop, policy.get_event_loop())
        loop.close()
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_get_event_loop_calls_set_event_loop(self):
        policy = asyncio.DefaultEventLoopPolicy()

        with mock.patch.object(
                policy, "set_event_loop",
                wraps=policy.set_event_loop) as m_set_event_loop:

            loop = policy.get_event_loop()

            # policy._local._loop must be set through .set_event_loop()
            # (the unix DefaultEventLoopPolicy needs this call to attach
            # the child watcher correctly)
            m_set_event_loop.assert_called_with(loop)

        loop.close()
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_get_event_loop_after_set_none(self):
        policy = asyncio.DefaultEventLoopPolicy()
        policy.set_event_loop(None)
        self.assertRaises(AssertionError, policy.get_event_loop)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_get_event_loop_thread(self, m_current_thread):

        def f():
            policy = asyncio.DefaultEventLoopPolicy()
            self.assertRaises(AssertionError, policy.get_event_loop)

        th = threading.Thread(target=f)
        th.start()
        th.join()
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_new_event_loop(self):
        policy = asyncio.DefaultEventLoopPolicy()

        loop = policy.new_event_loop()
        self.assertIsInstance(loop, asyncio.AbstractEventLoop)
        loop.close()
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_set_event_loop_policy(self):
        self.assertRaises(
            AssertionError, asyncio.set_event_loop_policy, object())

        old_policy = asyncio.get_event_loop_policy()

        policy = asyncio.DefaultEventLoopPolicy()
        asyncio.set_event_loop_policy(policy)
        self.assertIs(policy, asyncio.get_event_loop_policy())
        self.assertIsNot(policy, old_policy)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def create_policy(self):
        """Return a newly constructed ``asyncio.DefaultEventLoopPolicy``."""
        policy_factory = asyncio.DefaultEventLoopPolicy
        return policy_factory()