我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用tornado.gen.Task()。
def test_timeout(self):
    # A generator test that waits longer (1s) than its gen_test timeout
    # (0.1s) must fail with ioloop.TimeoutError.
    @gen_test(timeout=0.1)
    def slow_test(self):
        # Keep this statement on a single line: the assertion below searches
        # the formatted traceback for its exact source text.
        yield gen.Task(self.io_loop.add_timeout, self.io_loop.time() + 1)

    pending_call = "gen.Task(self.io_loop.add_timeout, self.io_loop.time() + 1)"

    # assertRaises cannot be used here because the check needs the active
    # exc_info triple (through traceback.format_exc), not just the
    # exception object.
    try:
        slow_test(self)
    except ioloop.TimeoutError:
        # The stack trace should blame the add_timeout line, not just
        # unrelated IOLoop/testing internals.
        self.assertIn(pending_call, traceback.format_exc())
    else:
        self.fail("did not get expected exception")

    self.finished = True
@gen_test
def test_run_with_stack_context(self):
    # The test body yields, which makes it a generator function.  gen_test
    # drives that generator on the IOLoop; without the decorator unittest
    # would only create the generator object and none of the assertions
    # below would ever run.
    @gen.coroutine
    def outer_coro():
        # Entered under 'c1' only.
        self.assertEqual(self.active_contexts, ['c1'])
        # Run the inner coroutine with 'c2' layered on top.
        yield run_with_stack_context(
            StackContext(functools.partial(self.context, 'c2')),
            inner_coro)
        # 'c2' must be gone again once the inner coroutine has finished.
        self.assertEqual(self.active_contexts, ['c1'])

    @gen.coroutine
    def inner_coro():
        self.assertEqual(self.active_contexts, ['c1', 'c2'])
        # Both contexts must still be active after a trip through the
        # IOLoop.
        yield gen.Task(self.io_loop.add_callback)
        self.assertEqual(self.active_contexts, ['c1', 'c2'])

    # No context is active before or after the whole call chain.
    self.assertEqual(self.active_contexts, [])
    yield run_with_stack_context(
        StackContext(functools.partial(self.context, 'c1')),
        outer_coro)
    self.assertEqual(self.active_contexts, [])
@gen_test
def test_body_size_override_reset(self):
    # The max_body_size override is reset between requests.
    #
    # The body yields, so this is a generator function; gen_test is what
    # actually runs it.  Without the decorator unittest would create the
    # generator and discard it without executing any assertion.
    stream = IOStream(socket.socket())
    try:
        yield stream.connect(('127.0.0.1', self.get_http_port()))
        # Use a raw stream so we can make sure it's all on one connection.
        stream.write(b'PUT /streaming?expected_size=10240 HTTP/1.1\r\n'
                     b'Content-Length: 10240\r\n\r\n')
        stream.write(b'a' * 10240)
        _headers, response = yield gen.Task(read_stream_body, stream)
        self.assertEqual(response, b'10240')
        # Without the ?expected_size parameter, we get the old default value
        stream.write(b'PUT /streaming HTTP/1.1\r\n'
                     b'Content-Length: 10240\r\n\r\n')
        with ExpectLog(gen_log, '.*Content-Length too long'):
            data = yield stream.read_until_close()
        # The second request is expected to produce no response bytes.
        self.assertEqual(data, b'')
    finally:
        # Always release the socket, even if an assertion above failed.
        stream.close()
def test_stack_context_leak_exception(self):
    # Repeated invocations of a gen.engine function that exits with an
    # exception must not accumulate stack contexts.
    @gen.engine
    def failing_inner(callback):
        yield gen.Task(self.io_loop.add_callback)
        1 / 0

    @gen.engine
    def driver():
        for _ in range(10):
            try:
                yield gen.Task(failing_inner)
            except ZeroDivisionError:
                # Expected on every iteration; only the depth matters.
                pass
        growth = len(stack_context._state.contexts) - baseline_depth
        self.assertTrue(growth <= 2)
        self.stop()

    # Bound before driver() executes; the closure reads it at call time.
    baseline_depth = len(stack_context._state.contexts)
    self.run_gen(driver)
def test_task_refcounting(self):
    # On CPython, tasks and their arguments should be released immediately
    # without waiting for garbage collection.
    @gen.engine
    def body():
        class Payload(object):
            pass

        payload = Payload()
        self.arg_ref = weakref.ref(payload)
        pending = gen.Task(self.io_loop.add_callback, arg=payload)
        self.task_ref = weakref.ref(pending)
        yield pending
        self.stop()

    self.run_gen(body)
    # Both weak references must already be dead at this point.
    self.assertIsNone(self.arg_ref())
    self.assertIsNone(self.task_ref())
@gen_test
def test_async_await_mixed_multi_native_yieldpoint(self):
    # Yield a list that mixes a native coroutine with a legacy YieldPoint
    # (gen.Wait) and check that both results come back in order.
    #
    # The body yields, so gen_test is required to actually run it; a bare
    # generator test method is never executed by unittest.
    #
    # The native coroutine is compiled from a string via exec_test rather
    # than written inline.
    namespace = exec_test(globals(), locals(), """
    async def f1():
        await gen.Task(self.io_loop.add_callback)
        return 42
    """)

    @gen.coroutine
    def f2():
        yield gen.Task(self.io_loop.add_callback)
        raise gen.Return(43)

    # Route f2's result through the 'cb' callback key so that it can be
    # collected with gen.Wait below.
    f2(callback=(yield gen.Callback('cb')))
    results = yield [namespace['f1'](), gen.Wait('cb')]
    self.assertEqual(results, [42, 43])
    self.finished = True
@gen_test
def test_swallow_context_exception(self):
    # Test exception handling: exceptions thrown into the stack context
    # can be caught and ignored.
    #
    # The body yields, so gen_test is required to actually run it; a bare
    # generator test method is never executed by unittest.
    @gen.coroutine
    def f2():
        (yield gen.Callback(1))()
        yield gen.Wait(1)
        # Schedule a callback that raises while the coroutine is parked on
        # the long timeout below.
        self.io_loop.add_callback(lambda: 1 / 0)
        try:
            yield gen.Task(self.io_loop.add_timeout,
                           self.io_loop.time() + 10)
        except ZeroDivisionError:
            # The error surfaces at the pending yield; swallow it and
            # report success through the return value.
            raise gen.Return(42)

    result = yield f2()
    self.assertEqual(result, 42)
    self.finished = True
@gen_test
def test_streaming_body(self):
    # Send a chunked request body in two pieces and check that each chunk
    # is delivered before the next one is written.
    #
    # The body yields, so gen_test is required to actually run it; a bare
    # generator test method is never executed by unittest.
    self.prepared = Future()
    self.data = Future()
    self.finished = Future()

    stream = self.connect(b"/stream_body", connection_close=True)
    try:
        yield self.prepared
        stream.write(b"4\r\nasdf\r\n")
        # Ensure the first chunk is received before we send the second.
        data = yield self.data
        self.assertEqual(data, b"asdf")
        # Fresh future for the second chunk.
        self.data = Future()
        stream.write(b"4\r\nqwer\r\n")
        data = yield self.data
        # assertEqual, not the deprecated assertEquals alias.
        self.assertEqual(data, b"qwer")
        stream.write(b"0\r\n")
        yield self.finished
        data = yield gen.Task(stream.read_until_close)
        # This would ideally use an HTTP1Connection to read the response.
        self.assertTrue(data.endswith(b"{}"))
    finally:
        # Close the stream even when an assertion above fails.
        stream.close()
@gen_test
def test_future_traceback(self):
    # An exception raised inside a return_future-wrapped function must
    # keep the frame where it was raised in the traceback seen by the
    # caller.
    #
    # The body yields, so gen_test is required to actually run it; a bare
    # generator test method is never executed by unittest.
    @return_future
    @gen.engine
    def f(callback):
        yield gen.Task(self.io_loop.add_callback)
        try:
            1 / 0
        except ZeroDivisionError:
            # Record the innermost frame (the division) for comparison.
            self.expected_frame = traceback.extract_tb(
                sys.exc_info()[2], limit=1)[0]
            raise

    try:
        yield f()
        self.fail("didn't get expected exception")
    except ZeroDivisionError:
        tb = traceback.extract_tb(sys.exc_info()[2])
        self.assertIn(self.expected_frame, tb)

# The following series of classes demonstrate and test various styles
# of use, with and without generators and futures.
def get(self, geneid=None):
    '''/gene/<geneid>
    geneid can be entrezgene, ensemblgene, retired entrezgene ids.
    /gene/1017
    /gene/1017?fields=symbol,name
    /gene/1017?fields=symbol,name,reporter.HG-U133_Plus_2
    '''
    # NOTE(review): this body yields, so the method is a generator; it
    # presumably relies on a coroutine decorator that is not visible in
    # this excerpt -- confirm.

    # A missing gene id gets the same 404 as an unknown one.
    if not geneid:
        raise HTTPError(404)

    params = self.get_query_params()
    # Fill in defaults only where the request did not supply a value.
    params.setdefault('scopes', 'entrezgene,ensemblgene,retired')
    params.setdefault('species', 'all')

    gene = yield Task(self.esq.get_gene2, geneid, **params)
    if not gene:
        raise HTTPError(404)

    self.return_json(gene)
    self.ga_track(event={'category': 'v2_api',
                         'action': 'gene_get'})
def post(self, geneid=None):
    '''
    This is essentially the same as post request in QueryHandler, with different defaults.

    parameters:
        ids
        fields
        species
    '''
    # NOTE(review): this body yields, so the method is a generator; it
    # presumably relies on a coroutine decorator that is not visible in
    # this excerpt -- confirm.
    kwargs = self.get_query_params()
    ids = kwargs.pop('ids', None)
    if ids:
        # Split on runs of whitespace, '+', '|' or ','.  The pattern is a
        # raw string so that "\s" reaches the regex engine as written
        # instead of being an invalid string escape.
        ids = re.split(r'[\s\r\n+|,]+', ids)
        scopes = 'entrezgene,ensemblgene,retired'
        fields = kwargs.pop('fields', None)
        res = yield Task(self.esq.mget_gene2, ids,
                         fields=fields, scopes=scopes, **kwargs)
    else:
        res = {'success': False, 'error': "Missing required parameters."}

    self.return_json(res)
    # 'value' reports how many ids were submitted (0 when none were given).
    self.ga_track(event={'category': 'v2_api',
                         'action': 'gene_post',
                         'label': 'qsize',
                         'value': len(ids) if ids else 0})
@gen_test
def test_async(self):
    # Minimal generator-style test: yield once through the IOLoop, then
    # record completion.
    #
    # gen_test is required here: the body yields, and a bare generator
    # test method is never executed by unittest, so self.finished would
    # never be set.
    yield gen.Task(self.io_loop.add_callback)
    self.finished = True
def test_no_timeout(self):
    # A test that does not exceed its timeout should succeed: the inner
    # test waits 0.1s under a 1s limit.
    @gen_test(timeout=1)
    def quick_test(self):
        deadline = self.io_loop.time() + 0.1
        yield gen.Task(self.io_loop.add_timeout, deadline)

    quick_test(self)
    self.finished = True
def test_timeout_environment_variable(self):
    # The inner test waits 0.25s with an explicit 0.5s timeout while
    # ASYNC_TEST_TIMEOUT is set to 0.1.
    @gen_test(timeout=0.5)
    def test_long_timeout(self):
        deadline = self.io_loop.time() + 0.25
        yield gen.Task(self.io_loop.add_timeout, deadline)

    # Uses provided timeout of 0.5 seconds, doesn't time out.
    with set_environ('ASYNC_TEST_TIMEOUT', '0.1'):
        test_long_timeout(self)

    self.finished = True
def test_no_timeout_environment_variable(self):
    # The inner test waits 1s with an explicit 0.01s timeout while
    # ASYNC_TEST_TIMEOUT is set to 0.1.
    @gen_test(timeout=0.01)
    def test_short_timeout(self):
        deadline = self.io_loop.time() + 1
        yield gen.Task(self.io_loop.add_timeout, deadline)

    # Uses environment-variable timeout of 0.1, times out.
    with set_environ('ASYNC_TEST_TIMEOUT', '0.1'), \
            self.assertRaises(ioloop.TimeoutError):
        test_short_timeout(self)

    self.finished = True
def test_with_method_kwargs(self):
    # Keyword arguments passed to a gen_test-wrapped function must reach
    # the wrapped generator unchanged.
    @gen_test
    def test_with_kwargs(self, **kwargs):
        self.assertDictEqual(kwargs, {'test': 'test'})
        yield gen.Task(self.io_loop.add_callback)

    test_with_kwargs(self, test='test')
    self.finished = True
def test_yield_in_with_exception_stack_context(self):
    # Yielding from inside a ``with ExceptionStackContext(...)`` block is
    # expected to surface as StackContextInconsistentError.
    @gen.engine
    def yields_inside_context():
        with ExceptionStackContext(lambda t, v, tb: False):
            yield gen.Task(self.io_loop.add_callback)

    with self.assertRaises(StackContextInconsistentError):
        yields_inside_context()
        self.wait()
def test_async_result(self):
    # run_sync must hand back the value a coroutine returns after it has
    # yielded to the IOLoop.
    @gen.coroutine
    def produce_value():
        yield gen.Task(self.io_loop.add_callback)
        raise gen.Return(42)

    outcome = self.io_loop.run_sync(produce_value)
    self.assertEqual(outcome, 42)
def test_async_exception(self):
    # An exception raised by a coroutine after it has yielded must
    # propagate out of run_sync.
    @gen.coroutine
    def blow_up():
        yield gen.Task(self.io_loop.add_callback)
        1 / 0

    with self.assertRaises(ZeroDivisionError):
        self.io_loop.run_sync(blow_up)
def test_native_coroutine(self):
    # run_sync must accept a native ``async def`` coroutine function.  The
    # coroutine is compiled from a string via exec_test rather than
    # written inline.
    source = """
    async def f():
        await gen.Task(self.io_loop.add_callback)
    """
    namespace = exec_test(globals(), locals(), source)
    native_coro = namespace['f']
    self.io_loop.run_sync(native_coro)
def test_exception_in_task_phase1(self):
    # The task function raises synchronously, before it ever invokes its
    # callback; the error must surface at the yield.
    def fail_task(callback):
        1 / 0

    @gen.engine
    def body():
        try:
            yield gen.Task(fail_task)
        except ZeroDivisionError:
            self.stop()
        else:
            raise Exception("did not get expected exception")

    self.run_gen(body)
def test_exception_in_task_phase2(self):
    # This is the case that requires the use of stack_context in
    # gen.engine: the task function returns normally and the failure
    # happens later, in a callback it scheduled on the IOLoop.
    def fail_task(callback):
        self.io_loop.add_callback(lambda: 1 / 0)

    @gen.engine
    def body():
        try:
            yield gen.Task(fail_task)
        except ZeroDivisionError:
            self.stop()
        else:
            raise Exception("did not get expected exception")

    self.run_gen(body)
def test_task(self):
    # Smallest possible gen.Task use: wait for one IOLoop callback, then
    # stop the test.
    @gen.engine
    def body():
        yield gen.Task(self.io_loop.add_callback)
        self.stop()

    self.run_gen(body)
def test_multi_yieldpoint_delayed(self):
    # gen.Multi over a list: results come back in list order even though
    # the callbacks run at different times.
    @gen.engine
    def body():
        pending = [
            gen.Task(self.delay_callback, 3, arg="v1"),
            gen.Task(self.delay_callback, 1, arg="v2"),
        ]
        responses = yield gen.Multi(pending)
        self.assertEqual(responses, ["v1", "v2"])
        self.stop()

    self.run_gen(body)
def test_multi_yieldpoint_dict_delayed(self):
    # gen.Multi over a dict: each result is stored under its task's key
    # even though the callbacks run at different times.
    @gen.engine
    def body():
        pending = {
            "foo": gen.Task(self.delay_callback, 3, arg="v1"),
            "bar": gen.Task(self.delay_callback, 1, arg="v2"),
        }
        responses = yield gen.Multi(pending)
        self.assertEqual(responses, {"foo": "v1", "bar": "v2"})
        self.stop()

    self.run_gen(body)
def test_multi_future_delayed(self):
    # gen.multi_future over a list: results come back in list order even
    # though the callbacks run at different times.
    @gen.engine
    def body():
        pending = [
            gen.Task(self.delay_callback, 3, arg="v1"),
            gen.Task(self.delay_callback, 1, arg="v2"),
        ]
        responses = yield gen.multi_future(pending)
        self.assertEqual(responses, ["v1", "v2"])
        self.stop()

    self.run_gen(body)
def test_multi_future_dict_delayed(self):
    # gen.multi_future over a dict: each result is stored under its
    # task's key even though the callbacks run at different times.
    @gen.engine
    def body():
        pending = {
            "foo": gen.Task(self.delay_callback, 3, arg="v1"),
            "bar": gen.Task(self.delay_callback, 1, arg="v2"),
        }
        responses = yield gen.multi_future(pending)
        self.assertEqual(responses, {"foo": "v1", "bar": "v2"})
        self.stop()

    self.run_gen(body)
@gen_test
def test_multi_performance(self):
    # Yielding a list used to have quadratic performance; make sure a
    # large list stays reasonable.  (Original author's note: a list of
    # 2000 used to take 1.8s on their laptop and now takes 0.12.)
    #
    # gen_test is required here: the body yields, and a bare generator
    # test method is never executed by unittest, so the timing assertion
    # would never run.
    started = time.time()
    yield [gen.Task(self.io_loop.add_callback) for _ in range(2000)]
    elapsed = time.time() - started
    self.assertLess(elapsed, 1.0)
def test_arguments(self):
    # Check what a yield resolves to for each way a callback can be
    # invoked: no arguments, one positional, keywords only, and several
    # positionals.
    @gen.engine
    def body():
        # No arguments -> None.
        (yield gen.Callback("noargs"))()
        self.assertEqual((yield gen.Wait("noargs")), None)

        # A single positional argument is passed through as-is.
        (yield gen.Callback("1arg"))(42)
        self.assertEqual((yield gen.Wait("1arg")), 42)

        # Keyword arguments -> gen.Arguments((), kwargs).
        (yield gen.Callback("kwargs"))(value=42)
        outcome = yield gen.Wait("kwargs")
        self.assertIsInstance(outcome, gen.Arguments)
        self.assertEqual(((), {"value": 42}), outcome)
        self.assertEqual({"value": 42}, outcome.kwargs)

        # Several positionals -> gen.Arguments(args, {}).
        (yield gen.Callback("2args"))(42, 43)
        outcome = yield gen.Wait("2args")
        self.assertIsInstance(outcome, gen.Arguments)
        self.assertEqual(((42, 43), {}), outcome)
        self.assertEqual((42, 43), outcome.args)

        # gen.Task wraps mixed positional/keyword results the same way.
        def task_func(callback):
            callback(None, error="foo")

        outcome = yield gen.Task(task_func)
        self.assertIsInstance(outcome, gen.Arguments)
        self.assertEqual(((None,), {"error": "foo"}), outcome)

        self.stop()

    self.run_gen(body)
def test_stack_context_leak(self):
    # Regression test: repeated invocations of a gen-based function
    # should not result in accumulated stack_contexts.
    def _stack_depth():
        # Walk the chain of contexts from the current head, following
        # each node's old_contexts[1] link, and count the nodes.
        depth = 0
        node = stack_context._state.contexts[1]
        while node is not None:
            depth += 1
            node = node.old_contexts[1]
        return depth

    @gen.engine
    def inner(callback):
        yield gen.Task(self.io_loop.add_callback)
        callback()

    @gen.engine
    def driver():
        for _ in range(10):
            yield gen.Task(inner)

        growth = _stack_depth() - baseline_depth
        self.assertTrue(growth <= 2)
        self.stop()

    # Bound before driver() executes; the closure reads it at call time.
    baseline_depth = _stack_depth()
    self.run_gen(driver)