我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 tornado.gen.coroutine()。
def motor_coroutine(f):
    """Wrap ``f`` as a Tornado coroutine that accepts an optional callback.

    The returned wrapper pops a ``callback`` keyword argument before
    invoking the coroutine:

    - Given a callback, the wrapper returns None and the callback is run
      with ``(result, None)`` on success or ``(None, error)`` on failure.
    - Without a callback, the wrapper returns the coroutine's Future.

    A truthy ``callback`` that is not callable makes the wrapper raise
    ``callback_type_error`` (a name defined outside this function).
    """
    coro = gen.coroutine(f)

    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        callback = kwargs.pop('callback', None)
        if callback and not callable(callback):
            raise callback_type_error

        future = coro(*args, **kwargs)
        if not callback:
            return future

        def _callback(future):
            # The callback call sits inside the try block, so an exception
            # raised by the callback itself on the success path is also
            # reported back to it as (None, error).
            try:
                result = future.result()
                callback(result, None)
            except Exception as e:  # "as" form works on Python 2.6+ and 3
                callback(None, e)

        future.add_done_callback(_callback)

    return wrapper
def release(self):
    """Bump the counter, then hand the slot to the first waiter not yet done."""
    self._value += 1
    while self._waiters:
        pending = self._waiters.popleft()
        if pending.done():
            # This waiter is already resolved; discard it and keep looking.
            continue
        self._value -= 1
        # A coroutine paused at
        #
        #     with (yield semaphore.acquire()):
        #
        # receives a context manager whose __exit__ calls release() at
        # the end of the "with" block.
        pending.set_result(_ReleasingContextManager(self))
        break
def test_acquire_fifo(self):
    # Coroutines queued behind a held Lock must obtain it in the order in
    # which they asked for it.
    lock = locks.Lock()
    # Take the lock up front so that none of the coroutines below can
    # finish before release() is called.
    self.assertTrue(lock.acquire().done())
    count = 5
    order = []

    @gen.coroutine
    def record(idx):
        with (yield lock.acquire()):
            order.append(idx)

    pending = [record(n) for n in range(count)]
    self.assertFalse(any(fut.done() for fut in pending))
    lock.release()
    yield pending
    self.assertEqual(list(range(count)), order)
def test_task_done(self):
    # join() must resolve once the workers have called task_done() for
    # every queued item; the accumulated sum proves each item was seen once.
    q = self.queue_class()
    for n in range(100):
        q.put_nowait(n)

    self.accumulator = 0

    @gen.coroutine
    def worker():
        while True:
            value = yield q.get()
            self.accumulator += value
            q.task_done()
            yield gen.sleep(random() * 0.01)

    # Two coroutines share work.
    for _ in range(2):
        worker()

    yield q.join()
    self.assertEqual(sum(range(100)), self.accumulator)
def test_producer_consumer(self):
    q = queues.Queue(maxsize=3)
    seen = []

    # The consumer does not yield between get() and task_done(), so get()
    # must wait for the next tick. Otherwise task_done would run
    # immediately and unblock join() before q.put() resumes, and only the
    # first four items would be processed.
    @gen.coroutine
    def consumer():
        while True:
            item = yield q.get()
            seen.append(item)
            q.task_done()

    @gen.coroutine
    def producer():
        for item in range(10):
            yield q.put(item)

    consumer()
    yield producer()
    yield q.join()
    self.assertEqual(list(range(10)), seen)
def test_run_with_stack_context(self):
    # Nested run_with_stack_context calls: 'c2' is active only while the
    # inner coroutine runs, 'c1' only while the outer one runs, and
    # nothing is active before or after.
    @gen.coroutine
    def inner():
        self.assertEqual(self.active_contexts, ['c1', 'c2'])
        yield gen.Task(self.io_loop.add_callback)
        self.assertEqual(self.active_contexts, ['c1', 'c2'])

    @gen.coroutine
    def outer():
        self.assertEqual(self.active_contexts, ['c1'])
        c2_context = StackContext(functools.partial(self.context, 'c2'))
        yield run_with_stack_context(c2_context, inner)
        self.assertEqual(self.active_contexts, ['c1'])

    self.assertEqual(self.active_contexts, [])
    c1_context = StackContext(functools.partial(self.context, 'c1'))
    yield run_with_stack_context(c1_context, outer)
    self.assertEqual(self.active_contexts, [])
def test_async_await_mixed_multi_native_yieldpoint(self):
    # Yield a list mixing a native ``async def`` coroutine with an
    # old-style YieldPoint (gen.Wait) and check both results come back
    # in order.
    #
    # The native coroutine is compiled from a string via exec_test. The
    # snippet needs one statement per line to be valid Python.
    # NOTE(review): the snippet is indented to match this function;
    # this assumes exec_test dedents its source -- confirm.
    namespace = exec_test(globals(), locals(), """
    async def f1():
        await gen.Task(self.io_loop.add_callback)
        return 42
    """)

    @gen.coroutine
    def f2():
        yield gen.Task(self.io_loop.add_callback)
        raise gen.Return(43)

    # f2 reports its result through the 'cb' callback, awaited below.
    f2(callback=(yield gen.Callback('cb')))
    results = yield [namespace['f1'](), gen.Wait('cb')]
    self.assertEqual(results, [42, 43])
    self.finished = True
def test_replace_yieldpoint_exception(self):
    # Exception handling: a coroutine may catch the exception raised by
    # something it yielded and raise a different one in its place.
    @gen.coroutine
    def divide_by_zero():
        1 / 0

    @gen.coroutine
    def translate_error():
        try:
            yield divide_by_zero()
        except ZeroDivisionError:
            raise KeyError()

    # Start the coroutine before entering the assertRaises block.
    outcome = translate_error()
    with self.assertRaises(KeyError):
        yield outcome
    self.finished = True
def test_swallow_yieldpoint_exception(self):
    # Exception handling: a coroutine may catch the exception raised by
    # something it yielded and return a value instead of raising.
    @gen.coroutine
    def divide_by_zero():
        1 / 0

    @gen.coroutine
    def recover():
        try:
            yield divide_by_zero()
        except ZeroDivisionError:
            raise gen.Return(42)

    value = yield recover()
    self.assertEqual(value, 42)
    self.finished = True
def test_swallow_context_exception(self):
    # Exception handling: an exception thrown into the stack context
    # can be caught by the coroutine and ignored.
    @gen.coroutine
    def recover():
        # Obtain the callback for key 1, fire it at once, then wait on it.
        fire = yield gen.Callback(1)
        fire()
        yield gen.Wait(1)
        # Schedule a callback that raises ZeroDivisionError.
        self.io_loop.add_callback(lambda: 1 / 0)
        try:
            yield gen.Task(self.io_loop.add_timeout,
                           self.io_loop.time() + 10)
        except ZeroDivisionError:
            raise gen.Return(42)

    value = yield recover()
    self.assertEqual(value, 42)
    self.finished = True
def test_moment(self):
    # ``trace`` is rebound (not cleared) between scenarios; the coroutine
    # below closes over the variable, so it always appends to the
    # current list.
    trace = []

    @gen.coroutine
    def emit(name, yieldable):
        for _ in range(5):
            trace.append(name)
            yield yieldable

    # First, confirm the behavior without moment: each coroutine
    # monopolizes the event loop until it finishes.
    immediate = Future()
    immediate.set_result(None)
    yield [emit('a', immediate), emit('b', immediate)]
    self.assertEqual(''.join(trace), 'aaaaabbbbb')

    # With moment, they take turns.
    trace = []
    yield [emit('a', gen.moment), emit('b', gen.moment)]
    self.assertEqual(''.join(trace), 'ababababab')
    self.finished = True

    # Mixed: one coroutine yields moment, the other an already-resolved
    # Future.
    trace = []
    yield [emit('a', gen.moment), emit('b', immediate)]
    self.assertEqual(''.join(trace), 'abbbbbaaaa')
def test_py3_leak_exception_context(self):
    # Raise from a coroutine twice in a row; neither caught exception may
    # carry a __context__.
    class LeakedException(Exception):
        pass

    @gen.coroutine
    def inner(iteration):
        raise LeakedException(iteration)

    for arg, expected_text in ((1, "1"), (2, "2")):
        try:
            yield inner(arg)
        except LeakedException as exc:
            self.assertEqual(str(exc), expected_text)
            self.assertIsNone(exc.__context__)

    self.finished = True
def twisted_coroutine_fetch(self, url, runner):
    """Fetch ``url`` with a Twisted Agent from a coroutine; return the body.

    The coroutine is scheduled on ``self.io_loop``, ``runner()`` is then
    called, and whatever the coroutine stored is returned afterwards.
    """
    # One-element list so the nested coroutine can assign the result.
    result_box = [None]

    @gen.coroutine
    def fetch():
        # This is simpler than the non-coroutine version, but it cheats
        # by reading the body in one blob instead of streaming it with
        # a Protocol.
        agent = Agent(self.reactor)
        response = yield agent.request(b'GET', utf8(url))
        with warnings.catch_warnings():
            # readBody has a buggy DeprecationWarning in Twisted 15.0:
            # https://twistedmatrix.com/trac/changeset/43379
            warnings.simplefilter('ignore', category=DeprecationWarning)
            result_box[0] = yield readBody(response)
        self.stop_loop()

    self.io_loop.add_callback(fetch)
    runner()
    return result_box[0]
def test_streaming_until_close_future(self):
    # read_until_close with a streaming_callback must deliver each write
    # as its own chunk when the writes are separated by a short sleep.
    server, client = self.make_iostream_pair()
    try:
        received = []

        @gen.coroutine
        def server_task():
            yield server.write(b"1234")
            yield gen.sleep(0.01)
            yield server.write(b"5678")
            server.close()

        @gen.coroutine
        def client_task():
            yield client.read_until_close(streaming_callback=received.append)

        @gen.coroutine
        def run_both():
            yield [client_task(), server_task()]

        self.io_loop.run_sync(run_both)
        self.assertEqual(received, [b"1234", b"5678"])
    finally:
        server.close()
        client.close()
def authenticate_redirect(self, callback_uri=None, callback=None):
    """Like `~OAuthMixin.authorize_redirect`, but auto-redirects if authorized.

    This is generally the right interface to use if you are using
    Twitter for single-sign on.

    The request-token URL is fetched with the auth HTTP client, and the
    response is handed to ``_on_request_token`` together with
    ``_OAUTH_AUTHENTICATE_URL``, ``None`` and ``callback``.

    .. versionchanged:: 3.1
       Now returns a `.Future` and takes an optional callback, for
       compatibility with `.gen.coroutine`.
    """
    # NOTE(review): no return statement is visible here; the Future
    # mentioned above presumably comes from a decorator not shown -- confirm.
    client = self.get_auth_http_client()
    token_url = self._oauth_request_token_url(callback_uri=callback_uri)
    on_token = functools.partial(
        self._on_request_token,
        self._OAUTH_AUTHENTICATE_URL,
        None,
        callback)
    client.fetch(token_url, on_token)
def prepare(self):
    """Hook invoked at the start of a request, before `get`/`post`/etc.

    The default implementation does nothing. Override it to perform
    initialization common to every request method.

    Asynchronous support: decorate the override with `.gen.coroutine`
    or `.return_future` to make it asynchronous (the `asynchronous`
    decorator cannot be used on `prepare`). If this method returns a
    `.Future`, execution will not proceed until the `.Future` is done.

    .. versionadded:: 3.1
       Asynchronous support.
    """
def test_context_manager_contended(self):
    # Two coroutines contend for one Semaphore via the context-manager
    # form; each must acquire and release before the next one acquires.
    sem = locks.Semaphore()
    events = []

    @gen.coroutine
    def hold(index):
        with (yield sem.acquire()):
            events.append('acquired %d' % index)
            yield gen.sleep(0.01)
            events.append('release %d' % index)

    yield [hold(n) for n in range(2)]

    expected_events = [
        template % n
        for n in range(2)
        for template in ('acquired %d', 'release %d')
    ]
    self.assertEqual(expected_events, events)
def test_replace_context_exception(self):
    # Exception handling: an exception thrown into the stack context
    # can be caught by the coroutine and replaced with another.
    # Note that this test and the following are for behavior that is
    # not really supported any more: coroutines no longer create a
    # stack context automatically; but one is created after the first
    # YieldPoint (i.e. not a Future).
    @gen.coroutine
    def translate_error():
        # Obtain the callback for key 1, fire it at once, then wait on it.
        fire = yield gen.Callback(1)
        fire()
        yield gen.Wait(1)
        # Schedule a callback that raises ZeroDivisionError.
        self.io_loop.add_callback(lambda: 1 / 0)
        try:
            yield gen.Task(self.io_loop.add_timeout,
                           self.io_loop.time() + 10)
        except ZeroDivisionError:
            raise KeyError()

    # Start the coroutine before entering the assertRaises block.
    outcome = translate_error()
    with self.assertRaises(KeyError):
        yield outcome
    self.finished = True
def test_wait_for_handshake_future(self):
    # Inside the server class ``self`` is the server, so keep a separate
    # handle on the test case for assertions and io_loop access.
    case = self
    handshake_done = Future()

    class HandshakeServer(TCPServer):
        @gen.coroutine
        def handle_connection(self, stream):
            yield stream.wait_for_handshake()
            handshake_done.set_result(None)

        def handle_stream(self, stream, address):
            # cipher() is expected to be None when the stream is handed over.
            case.assertIsNone(stream.socket.cipher())
            case.io_loop.spawn_callback(self.handle_connection, stream)

    yield self.connect_to_server(HandshakeServer)
    yield handshake_done
def test_poll(self):
    # Exercise socket.poll() futures: zero timeout, a short timeout with
    # nothing to read, and a long timeout satisfied by an incoming message.
    @gen.coroutine
    def scenario():
        push, pull = self.create_bound_pair(zmq.PUSH, zmq.PULL)

        # timeout=0: the result is read straight away and must be 0.
        instant = pull.poll(timeout=0)
        self.assertEqual(instant.result(), 0)

        # timeout=1 with nothing sent: pending at first, then yields 0.
        brief = pull.poll(timeout=1)
        assert not brief.done()
        event = yield brief
        self.assertEqual(event, 0)

        # timeout=1000: pending until a message is sent, then POLLIN.
        lengthy = pull.poll(timeout=1000)
        assert not lengthy.done()
        yield push.send_multipart([b'hi', b'there'])
        event = yield lengthy
        self.assertEqual(event, zmq.POLLIN)

        message = yield pull.recv_multipart()
        self.assertEqual(message, [b'hi', b'there'])

    self.loop.run_sync(scenario)