Python asyncio 模块,get_event_loop_policy() 实例源码

我们从Python开源项目中,提取了以下30个代码示例,用于说明如何使用asyncio.get_event_loop_policy()

项目:chunnel    作者:obmarg    | 项目源码 | 文件源码
def event_loop():
    """
    Create an instance of the default event loop for each test case.

    This implementation is mostly a workaround for pytest-asyncio issues #29 &
    #30
    """
    policy = asyncio.get_event_loop_policy()
    res = policy.new_event_loop()
    asyncio.set_event_loop(res)
    res._close = res.close
    res.close = lambda: None

    yield res

    res._close()
项目:buildhub    作者:mozilla-services    | 项目源码 | 文件源码
def lambda_handler(event, context):
    # Log everything to stderr.
    logger.addHandler(logging.StreamHandler(stream=sys.stdout))
    logger.setLevel(logging.DEBUG)

    # Add Sentry (no-op if no configured).
    handler = SentryHandler(sentry)
    handler.setLevel(logging.ERROR)
    logger.addHandler(handler)

    loop = asyncio.get_event_loop_policy().new_event_loop()

    try:
        loop.run_until_complete(main(loop, event))
    except Exception:
        logger.exception('Aborted.')
        raise
    finally:
        loop.close()
项目:tartiflette    作者:4a616d6573205265696c6c79    | 项目源码 | 文件源码
def __init__(self, work_queue, result_queue, v4_nets, v6_nets):
        self.WORK_QUEUE = work_queue
        self.RESULT_QUEUE = result_queue
        policy = asyncio.get_event_loop_policy()
        policy.set_event_loop(policy.new_event_loop())
        self.LOOP = asyncio.get_event_loop()
        self.NETWORKS = {
            4: [
                ipaddress.ip_network(u'{}'.format(net.strip()), strict=False) for
                net in open(v4_nets).readlines()
            ],
            6: [
                ipaddress.ip_network(u'{}'.format(net.strip()), strict=False) for
                net in open(v6_nets).readlines()
            ],
        }
        super().__init__()
项目:saltyrtc-server-python    作者:saltyrtc    | 项目源码 | 文件源码
def event_loop(request):
    """
    Create an instance of the requested event loop.
    """
    default_event_loop(request=request)

    # Close previous event loop
    policy = asyncio.get_event_loop_policy()
    policy.get_event_loop().close()

    # Create new event loop
    _event_loop = policy.new_event_loop()
    policy.set_event_loop(_event_loop)

    def fin():
        _event_loop.close()

    # Add finaliser and return new event loop
    request.addfinalizer(fin)
    return _event_loop
项目:python-libjuju    作者:juju    | 项目源码 | 文件源码
def setUp(self):
        # new event loop for each test
        policy = asyncio.get_event_loop_policy()
        self.loop = policy.new_event_loop()
        policy.set_event_loop(self.loop)
项目:jenkins-epo    作者:peopledoc    | 项目源码 | 文件源码
def __init__(self, *a, **kw):
        super(AsyncLogRecord, self).__init__(*a, **kw)
        self.task = 'main'
        if asyncio.get_event_loop_policy()._local._loop:
            task = asyncio.Task.current_task()
            self.task = getattr(task, 'logging_id', 'othr')
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def test_handle_source_traceback(self):
        loop = asyncio.get_event_loop_policy().new_event_loop()
        loop.set_debug(True)
        self.set_event_loop(loop)

        def check_source_traceback(h):
            lineno = sys._getframe(1).f_lineno - 1
            self.assertIsInstance(h._source_traceback, list)
            self.assertEqual(h._source_traceback[-1][:3],
                             (__file__,
                              lineno,
                              'test_handle_source_traceback'))

        # call_soon
        h = loop.call_soon(noop)
        check_source_traceback(h)

        # call_soon_threadsafe
        h = loop.call_soon_threadsafe(noop)
        check_source_traceback(h)

        # call_later
        h = loop.call_later(0, noop)
        check_source_traceback(h)

        # call_at
        h = loop.call_later(0, noop)
        check_source_traceback(h)
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def test_get_event_loop_policy(self):
        policy = asyncio.get_event_loop_policy()
        self.assertIsInstance(policy, asyncio.AbstractEventLoopPolicy)
        self.assertIs(policy, asyncio.get_event_loop_policy())
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def test_set_event_loop_policy(self):
        self.assertRaises(
            AssertionError, asyncio.set_event_loop_policy, object())

        old_policy = asyncio.get_event_loop_policy()

        policy = asyncio.DefaultEventLoopPolicy()
        asyncio.set_event_loop_policy(policy)
        self.assertIs(policy, asyncio.get_event_loop_policy())
        self.assertIsNot(policy, old_policy)
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def setUp(self):
            policy = asyncio.get_event_loop_policy()
            self.loop = policy.new_event_loop()
            self.set_event_loop(self.loop)

            watcher = self.Watcher()
            watcher.attach_loop(self.loop)
            policy.set_child_watcher(watcher)
            self.addCleanup(policy.set_child_watcher, None)
项目:aiotask-context    作者:Skyscanner    | 项目源码 | 文件源码
def asyncio_loop():
    loop = asyncio.get_event_loop_policy().new_event_loop()
    yield loop
    loop.close()
项目:scriptworker    作者:mozilla-releng    | 项目源码 | 文件源码
def event_loop():
    """Create an instance of the default event loop for each test case.
    From https://github.com/pytest-dev/pytest-asyncio/issues/29#issuecomment-226947296
    """
    policy = asyncio.get_event_loop_policy()
    res = policy.new_event_loop()
    asyncio.set_event_loop(res)
    res._close = res.close
    res.close = lambda: None

    yield res

    res._close()
项目:factoriommo-agent    作者:factoriommo    | 项目源码 | 文件源码
def run(self):
        logger.debug("Booted rcon sender thread")
        policy = asyncio.get_event_loop_policy()
        policy.set_event_loop(policy.new_event_loop())
        loop = asyncio.get_event_loop()
        last_data = time()

        try:
            self.conn = RconConnection(self.options.rcon_host, int(self.options.rcon_port), self.options.rcon_password)
            resp = loop.run_until_complete(self.conn.exec_command("/silent-command print('FactorioMCd connected.')"))
            resp = loop.run_until_complete(self.conn.exec_command("/silent-command print('FactorioMCd connected.')"))
            self.connected = True
        except:
            logger.exception("Could not connect to rcon, retry delayed.")

        while self.running.value:
            try:
                data = self.q.get(timeout=3)
                last_data = time()
            except Empty:
                data = None

            if not data:
                if (time() - last_data) > 30 and self.connected:
                    try:
                        self.conn.close()
                        self.connected = False
                        logger.info("RCON connection closed (volentarily) due to timeout")
                    except:
                        logger.exception("Error closing connection.")
                sleep(0.1)
            else:
                resp = loop.run_until_complete(self.exec_command(data))
                logger.debug(resp)
项目:gain    作者:gaojiuli    | 项目源码 | 文件源码
def test_file_result():
    test_file = "spider.data"
    url = "file:///{}".format(test_file)
    f = FileResult(url)
    test_str = "test_str"
    policy = asyncio.get_event_loop_policy()
    loop = policy.new_event_loop()
    # loop = asyncio.get_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(f.save(test_str))
    with open(test_file, 'r+') as test_file:
        assert test_file.read() == test_str + "\n"
    if os.path.exists(url):
        os.remove(url)
项目:amqproto    作者:malinoff    | 项目源码 | 文件源码
def event_loop():
    loop = asyncio.get_event_loop_policy().new_event_loop()
    yield loop
    loop.close()
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_handle_source_traceback(self):
        loop = asyncio.get_event_loop_policy().new_event_loop()
        loop.set_debug(True)
        self.set_event_loop(loop)

        def check_source_traceback(h):
            lineno = sys._getframe(1).f_lineno - 1
            self.assertIsInstance(h._source_traceback, list)
            self.assertEqual(h._source_traceback[-1][:3],
                             (__file__,
                              lineno,
                              'test_handle_source_traceback'))

        # call_soon
        h = loop.call_soon(noop)
        check_source_traceback(h)

        # call_soon_threadsafe
        h = loop.call_soon_threadsafe(noop)
        check_source_traceback(h)

        # call_later
        h = loop.call_later(0, noop)
        check_source_traceback(h)

        # call_at
        h = loop.call_later(0, noop)
        check_source_traceback(h)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_get_event_loop_policy(self):
        policy = asyncio.get_event_loop_policy()
        self.assertIsInstance(policy, asyncio.AbstractEventLoopPolicy)
        self.assertIs(policy, asyncio.get_event_loop_policy())
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_set_event_loop_policy(self):
        self.assertRaises(
            AssertionError, asyncio.set_event_loop_policy, object())

        old_policy = asyncio.get_event_loop_policy()

        policy = asyncio.DefaultEventLoopPolicy()
        asyncio.set_event_loop_policy(policy)
        self.assertIs(policy, asyncio.get_event_loop_policy())
        self.assertIsNot(policy, old_policy)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def setUp(self):
            policy = asyncio.get_event_loop_policy()
            self.loop = policy.new_event_loop()
            self.set_event_loop(self.loop)

            watcher = self.Watcher()
            watcher.attach_loop(self.loop)
            policy.set_child_watcher(watcher)
            self.addCleanup(policy.set_child_watcher, None)
项目:aiorethink    作者:lars-tiede    | 项目源码 | 文件源码
def event_loop():
    """Create an instance of the default event loop for each test case."""
    policy = asyncio.get_event_loop_policy()
    res = policy.new_event_loop()
    asyncio.set_event_loop(res)
    res._close = res.close
    res.close = lambda: None

    yield res

    res._close()
项目:tailsocket    作者:yeraydiazdiaz    | 项目源码 | 文件源码
def safe_event_loop():
    """Fixture to fallback to `select` in Linux.

    """
    if sys.platform == 'linux':
        selector = selectors.SelectSelector()
        loop = asyncio.SelectorEventLoop(selector)
        asyncio.set_event_loop(loop)
        return loop

    return asyncio.get_event_loop_policy().new_event_loop()
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_handle_source_traceback(self):
        loop = asyncio.get_event_loop_policy().new_event_loop()
        loop.set_debug(True)
        self.set_event_loop(loop)

        def check_source_traceback(h):
            lineno = sys._getframe(1).f_lineno - 1
            self.assertIsInstance(h._source_traceback, list)
            self.assertEqual(h._source_traceback[-1][:3],
                             (__file__,
                              lineno,
                              'test_handle_source_traceback'))

        # call_soon
        h = loop.call_soon(noop)
        check_source_traceback(h)

        # call_soon_threadsafe
        h = loop.call_soon_threadsafe(noop)
        check_source_traceback(h)

        # call_later
        h = loop.call_later(0, noop)
        check_source_traceback(h)

        # call_at
        h = loop.call_later(0, noop)
        check_source_traceback(h)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_get_event_loop_policy(self):
        policy = asyncio.get_event_loop_policy()
        self.assertIsInstance(policy, asyncio.AbstractEventLoopPolicy)
        self.assertIs(policy, asyncio.get_event_loop_policy())
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_set_event_loop_policy(self):
        self.assertRaises(
            AssertionError, asyncio.set_event_loop_policy, object())

        old_policy = asyncio.get_event_loop_policy()

        policy = asyncio.DefaultEventLoopPolicy()
        asyncio.set_event_loop_policy(policy)
        self.assertIs(policy, asyncio.get_event_loop_policy())
        self.assertIsNot(policy, old_policy)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def setUp(self):
            policy = asyncio.get_event_loop_policy()
            self.loop = policy.new_event_loop()

            # ensure that the event loop is passed explicitly in asyncio
            policy.set_event_loop(None)

            watcher = self.Watcher()
            watcher.attach_loop(self.loop)
            policy.set_child_watcher(watcher)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def tearDown(self):
            policy = asyncio.get_event_loop_policy()
            policy.set_child_watcher(None)
            self.loop.close()
            super().tearDown()
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def tearDown(self):
            policy = asyncio.get_event_loop_policy()
            self.loop.close()
            policy.set_event_loop(None)
            super().tearDown()
项目:tartiflette    作者:4a616d6573205265696c6c79    | 项目源码 | 文件源码
def __init__(self, work_queue, result_queue):
        self.WORK_QUEUE = work_queue
        self.RESULT_QUEUE = result_queue
        policy = asyncio.get_event_loop_policy()
        policy.set_event_loop(policy.new_event_loop())
        self.LOOP = asyncio.get_event_loop()
        super().__init__()
项目:gdax-python-api    作者:csko    | 项目源码 | 文件源码
def event_loop():
    """Create an instance of the default event loop for each test case."""
    policy = asyncio.get_event_loop_policy()
    res = policy.new_event_loop()
    asyncio.set_event_loop(res)
    res._close = res.close
    res.close = lambda: None

    yield res

    res._close()
项目:UnicORM    作者:LordFeratum    | 项目源码 | 文件源码
def event_loop():
    policy = asyncio.get_event_loop_policy()
    res = policy.new_event_loop()
    asyncio.set_event_loop(res)
    res._close = res.close
    res.close = lambda: None

    yield res

    res._close()