我们从Python开源项目中,提取了以下4个代码示例,用于说明如何使用asyncio.test_utils()。
def test_call_later_aio(framework_aio): ''' Wait for two Futures. ''' # Trollius doesn't come with this, so won't work on py2 pytest.importorskip('asyncio.test_utils') def time_gen(): when = yield assert when == 1 # even though we only do one call, I guess TestLoop needs # a "trailing" yield? "or something" when = yield 0 from asyncio.test_utils import TestLoop new_loop = TestLoop(time_gen) calls = [] with replace_loop(new_loop) as fake_loop: def foo(*args, **kw): calls.append((args, kw)) delay = txaio.call_later(1, foo, 5, 6, 7, foo="bar") assert len(calls) == 0 assert hasattr(delay, 'cancel') fake_loop.advance_time(2) fake_loop._run_once() assert len(calls) == 1 assert calls[0][0] == (5, 6, 7) assert calls[0][1] == dict(foo="bar")
def run_once(): ''' A helper that takes one trip through the event-loop to process any pending Futures. This is a no-op for Twisted, because you don't need to use the event-loop to get callbacks to happen in Twisted. ''' import txaio if txaio.using_twisted: return try: import asyncio from asyncio.test_utils import run_once as _run_once return _run_once(txaio.config.loop or asyncio.get_event_loop()) except ImportError: import trollius as asyncio # let any trollius import error out; if we're not using # twisted, and have no asyncio *and* no trollius, that's a # problem. # copied from asyncio.testutils because trollius has no # testutils" # just like modern asyncio.testutils.run_once does it... loop = asyncio.get_event_loop() loop.stop() loop.run_forever() asyncio.gather(*asyncio.Task.all_tasks())
def test_async_on_connect_server(self): # see also issue 757 # for python 3.5, this can be "async def foo" def foo(x): f = txaio.create_future() txaio.resolve(f, x * x) return f values = [] def on_connect(req): f = txaio.create_future() def cb(x): f = foo(42) f.add_callbacks(f, lambda v: values.append(v), None) return f txaio.add_callbacks(f, cb, None) return f factory = WebSocketServerFactory() server = factory() server.onConnect = on_connect transport = Mock() server.connection_made(transport) # need/want to insert real-fake handshake data? server.data = b"\r\n".join([ b'GET /ws HTTP/1.1', b'Host: www.example.com', b'Sec-WebSocket-Version: 13', b'Origin: http://www.example.com.malicious.com', b'Sec-WebSocket-Extensions: permessage-deflate', b'Sec-WebSocket-Key: tXAxWFUqnhi86Ajj7dRY5g==', b'Connection: keep-alive, Upgrade', b'Upgrade: websocket', b'\r\n', # last string doesn't get a \r\n from join() ]) server.processHandshake() import asyncio from asyncio.test_utils import run_once run_once(asyncio.get_event_loop()) self.assertEqual(1, len(values)) self.assertEqual(42 * 42, values[0])
def event_loop(): """Fixture providing a test event loop. The event loop simulate and records the sleep operation, available as event_loop.steps It also tracks simulated resources and make sure they are all released before the loop is closed. """ class TimeTrackingTestLoop(asyncio.test_utils.TestLoop): stuck_threshold = 100 def __init__(self): super().__init__() self.clear() def _run_once(self): super(asyncio.test_utils.TestLoop, self)._run_once() # Update internals self.busy_count += 1 self._timers = sorted( when for when in self._timers if when > loop.time()) # Time advance if self.time_to_go: when = self._timers.pop(0) step = when - loop.time() self.steps.append(step) self.advance_time(step) self.busy_count = 0 @property def stuck(self): return self.busy_count > self.stuck_threshold @property def time_to_go(self): return self._timers and (self.stuck or not self._ready) def clear(self): self.steps = [] self.open_resources = 0 self.resources = 0 self.busy_count = 0 @contextmanager def assert_cleanup(self): self.clear() yield self assert self.open_resources == 0 self.clear() loop = TimeTrackingTestLoop() asyncio.set_event_loop(loop) with loop.assert_cleanup(): yield loop loop.close()