我们从Python开源项目中，提取了以下48个代码示例，用于说明如何使用tornado.gen.multi_future()。
def test_multi_future_exceptions(self):
    """Check how failures surface when several futures are yielded at once.

    Three scenarios are covered:

    * two failing futures in a yielded list: the first error ("error 1")
      is the one raised, and a "Multiple exceptions in yield list" log
      entry is expected;
    * one failing and one succeeding future: the error is raised;
    * two failing futures passed to ``gen.multi_future`` with
      ``quiet_exceptions=RuntimeError``: the error is still raised.
    """
    # NOTE(review): this body yields futures, so it presumably runs under a
    # coroutine-style test decorator that is not visible in this excerpt --
    # confirm against the full file.

    # Both futures fail: expect the log message and the first error.
    with ExpectLog(app_log, "Multiple exceptions in yield list"), \
            self.assertRaises(RuntimeError) as raised:
        yield [
            self.async_exception(RuntimeError("error 1")),
            self.async_exception(RuntimeError("error 2")),
        ]
    self.assertEqual(str(raised.exception), "error 1")

    # With only one exception, no error is logged.
    with self.assertRaises(RuntimeError):
        one_failure = [
            self.async_exception(RuntimeError("error 1")),
            self.async_future(2),
        ]
        yield one_failure

    # Exception logging may be explicitly quieted.
    with self.assertRaises(RuntimeError):
        two_failures = [
            self.async_exception(RuntimeError("error 1")),
            self.async_exception(RuntimeError("error 2")),
        ]
        yield gen.multi_future(two_failures, quiet_exceptions=RuntimeError)
def test_multi_future_dict_delayed(self):
    """Check that ``gen.multi_future`` on a dict returns results by key.

    The two tasks are given different delay arguments (3 and 1), so their
    callbacks run at different times; each result must still come back
    under the key its task was registered with.
    """
    @gen.engine
    def delayed_dict():
        # Callbacks run at different times.
        pending = {
            "foo": gen.Task(self.delay_callback, 3, arg="v1"),
            "bar": gen.Task(self.delay_callback, 1, arg="v2"),
        }
        results = yield gen.multi_future(pending)
        self.assertEqual(results, {"foo": "v1", "bar": "v2"})
        self.stop()

    self.run_gen(delayed_dict)
def test_multi_dict(self):
    """Check that yielding a dict of ``gen.Wait`` objects returns a dict.

    Two callbacks are registered under the keys "k1" and "k2" and invoked
    right away with "v1" and "v2"; waiting on both through a dict must
    produce those values under the dict's own keys.
    """
    @gen.engine
    def wait_on_dict():
        # Register and fire each callback in turn, "k1" before "k2".
        first_callback = yield gen.Callback("k1")
        first_callback("v1")
        second_callback = yield gen.Callback("k2")
        second_callback("v2")

        outcome = yield {"foo": gen.Wait("k1"), "bar": gen.Wait("k2")}
        self.assertEqual(outcome, {"foo": "v1", "bar": "v2"})
        self.stop()

    self.run_gen(wait_on_dict)

# The following tests explicitly run with both gen.Multi
# and gen.multi_future (Task returns a Future, so it can be used
# with either).
def test_multi_future_delayed(self):
    """Check that ``gen.multi_future`` on a list returns results in list order.

    The two tasks are given different delay arguments (3 and 1), so their
    callbacks run at different times; the results must still be returned
    in the order the tasks were listed.
    """
    @gen.engine
    def delayed_list():
        # Callbacks run at different times.
        pending = [
            gen.Task(self.delay_callback, 3, arg="v1"),
            gen.Task(self.delay_callback, 1, arg="v2"),
        ]
        results = yield gen.multi_future(pending)
        self.assertEqual(results, ["v1", "v2"])
        self.stop()

    self.run_gen(delayed_list)