Python asyncio 模块,test_utils() 实例源码

我们从 Python 开源项目中提取了以下 4 个代码示例,用于说明如何使用 asyncio.test_utils 模块。

项目:deb-python-txaio    作者:openstack    | 项目源码 | 文件源码
def test_call_later_aio(framework_aio):
    '''
    Schedule a single callback through txaio.call_later() on an asyncio
    TestLoop whose clock is advanced by hand, and check that it runs
    exactly once with the positional and keyword arguments supplied.
    '''

    # asyncio.test_utils is not shipped with Trollius, so skip on py2
    pytest.importorskip('asyncio.test_utils')
    from asyncio.test_utils import TestLoop

    def fake_clock():
        # TestLoop sends in the deadline of each scheduled timer
        requested = yield
        assert requested == 1
        # only one call is scheduled, yet TestLoop apparently wants a
        # "trailing" yield as well
        yield 0

    recorded = []

    def foo(*args, **kw):
        recorded.append((args, kw))

    with replace_loop(TestLoop(fake_clock)) as fake_loop:
        handle = txaio.call_later(1, foo, 5, 6, 7, foo="bar")

        # nothing may fire before the clock moves, and the returned
        # handle has to be cancellable
        assert len(recorded) == 0
        assert hasattr(handle, 'cancel')

        # jump past the one-second deadline and take one loop iteration
        fake_loop.advance_time(2)
        fake_loop._run_once()

        assert len(recorded) == 1
        positional, keywords = recorded[0][0], recorded[0][1]
        assert positional == (5, 6, 7)
        assert keywords == dict(foo="bar")
项目:deb-python-txaio    作者:openstack    | 项目源码 | 文件源码
def run_once():
    '''
    A helper that takes one trip through the event-loop to process any
    pending Futures. This is a no-op for Twisted, because you don't
    need to use the event-loop to get callbacks to happen in Twisted.

    The loop used is ``txaio.config.loop`` when it is set, otherwise the
    current event loop of whichever of asyncio / trollius is importable.

    :returns: None
    :raises ImportError: if txaio is not using Twisted and neither
        asyncio nor trollius can be imported.
    '''

    import txaio
    if txaio.using_twisted:
        return

    try:
        import asyncio
    except ImportError:
        # let any trollius import error out; if we're not using
        # twisted, and have no asyncio *and* no trollius, that's a
        # problem.
        import trollius as asyncio

    loop = txaio.config.loop or asyncio.get_event_loop()

    # asyncio.test_utils is absent from trollius and from newer asyncio
    # releases, so a failed import here must not be mistaken for
    # "asyncio itself is missing".
    try:
        from asyncio.test_utils import run_once as _run_once
    except ImportError:
        _run_once = None

    if _run_once is not None:
        return _run_once(loop)

    # Hand-rolled equivalent of asyncio.test_utils.run_once: request a
    # stop up front so that run_forever() returns after a single pass.
    loop.stop()
    loop.run_forever()
项目:deb-python-autobahn    作者:openstack    | 项目源码 | 文件源码
def test_async_on_connect_server(self):
        """
        Drive a server protocol whose onConnect handler returns a txaio
        future, feed it a canned WebSocket handshake, run the asyncio
        loop once and check that the chained callback recorded 42 * 42.
        """
        # see also issue 757

        # for python 3.5, this can be "async def foo"
        def foo(x):
            # already-resolved future carrying x squared
            f = txaio.create_future()
            txaio.resolve(f, x * x)
            return f

        values = []

        def on_connect(req):
            f = txaio.create_future()

            def cb(x):
                f = foo(42)
                # futures have no add_callbacks() method of their own;
                # register through txaio, as is done for the outer
                # future below
                txaio.add_callbacks(f, lambda v: values.append(v), None)
                return f
            # NOTE(review): nothing visible in this test resolves the
            # outer future, so cb presumably relies on the protocol to
            # do so -- confirm
            txaio.add_callbacks(f, cb, None)
            return f

        factory = WebSocketServerFactory()
        server = factory()
        server.onConnect = on_connect
        transport = Mock()

        server.connection_made(transport)
        # need/want to insert real-fake handshake data?
        server.data = b"\r\n".join([
            b'GET /ws HTTP/1.1',
            b'Host: www.example.com',
            b'Sec-WebSocket-Version: 13',
            b'Origin: http://www.example.com.malicious.com',
            b'Sec-WebSocket-Extensions: permessage-deflate',
            b'Sec-WebSocket-Key: tXAxWFUqnhi86Ajj7dRY5g==',
            b'Connection: keep-alive, Upgrade',
            b'Upgrade: websocket',
            b'\r\n',  # last string doesn't get a \r\n from join()
        ])
        server.processHandshake()

        # one pass through the loop so that pending callbacks get to run
        import asyncio
        from asyncio.test_utils import run_once
        run_once(asyncio.get_event_loop())

        self.assertEqual(1, len(values))
        self.assertEqual(42 * 42, values[0])
项目:aiostream    作者:vxgmichel    | 项目源码 | 文件源码
def event_loop():
    """Fixture providing a test event loop.

    The event loop simulates and records the sleep operations; the
    recorded durations are available as ``event_loop.steps``.

    It also carries counters for simulated resources and asserts that
    ``open_resources`` is back to zero before the loop is closed. The
    loop is closed even when that assertion fails.
    """

    class TimeTrackingTestLoop(asyncio.test_utils.TestLoop):

        # Number of consecutive iterations without a time jump above
        # which the loop is reported as stuck.
        stuck_threshold = 100

        def __init__(self):
            super().__init__()
            self.clear()

        def _run_once(self):
            # Start the lookup *after* TestLoop in the MRO, so TestLoop's
            # own _run_once override is bypassed and the timer handling
            # below is used in its place.
            super(asyncio.test_utils.TestLoop, self)._run_once()
            # Update internals: keep only the deadlines still ahead of
            # the simulated clock, earliest first.
            self.busy_count += 1
            now = self.time()
            self._timers = sorted(
                when for when in self._timers if when > now)
            # Time advance: jump straight to the next deadline and record
            # how far the clock moved.
            if self.time_to_go:
                when = self._timers.pop(0)
                step = when - now
                self.steps.append(step)
                self.advance_time(step)
                self.busy_count = 0

        @property
        def stuck(self):
            return self.busy_count > self.stuck_threshold

        @property
        def time_to_go(self):
            # Truthy when a deadline is pending and either nothing is
            # ready to run or the loop has been busy for too long.
            return self._timers and (self.stuck or not self._ready)

        def clear(self):
            self.steps = []
            self.open_resources = 0
            self.resources = 0
            self.busy_count = 0

        @contextmanager
        def assert_cleanup(self):
            self.clear()
            yield self
            assert self.open_resources == 0
            self.clear()

    loop = TimeTrackingTestLoop()
    asyncio.set_event_loop(loop)
    try:
        with loop.assert_cleanup():
            yield loop
    finally:
        # Close the loop even if the cleanup assertion above fails.
        loop.close()