我们从Python开源项目中，提取了以下30个代码示例，用于说明如何使用asyncio.get_event_loop_policy()。
def event_loop():
    """
    Create an instance of the default event loop for each test case.

    This implementation is mostly a workaround for pytest-asyncio
    issues #29 & #30: the loop's public ``close()`` is replaced by a no-op
    while the loop is handed out, and the real ``close()`` (kept on the
    loop as ``_close``) is only invoked after the ``yield``.
    """
    loop = asyncio.get_event_loop_policy().new_event_loop()
    asyncio.set_event_loop(loop)

    # Stash the genuine close() before shadowing it on the instance.
    real_close = loop.close
    loop._close = real_close
    loop.close = lambda: None

    yield loop

    # Teardown: now the loop is really closed.
    loop._close()
def lambda_handler(event, context):
    """Configure logging, then run ``main(loop, event)`` on a new event loop.

    ``context`` is accepted but not used here. Any exception raised while
    running ``main`` is logged with its traceback and re-raised; the loop is
    closed in every case. Nothing is returned.
    """
    # Log everything (DEBUG and above) to stdout.
    # NOTE(review): handlers are attached on every call, so repeated
    # invocations in one process accumulate handlers -- confirm intended.
    stdout_handler = logging.StreamHandler(stream=sys.stdout)
    logger.addHandler(stdout_handler)
    logger.setLevel(logging.DEBUG)

    # Add Sentry for ERROR and above (presumably a no-op if Sentry is not
    # configured -- verify against the SentryHandler in use).
    sentry_handler = SentryHandler(sentry)
    sentry_handler.setLevel(logging.ERROR)
    logger.addHandler(sentry_handler)

    policy = asyncio.get_event_loop_policy()
    loop = policy.new_event_loop()
    try:
        loop.run_until_complete(main(loop, event))
    except Exception:
        logger.exception('Aborted.')
        raise
    finally:
        loop.close()
def __init__(self, work_queue, result_queue, v4_nets, v6_nets):
    """Store the queues, install a new event loop and load the network lists.

    Args:
        work_queue: Stored as ``self.WORK_QUEUE``.
        result_queue: Stored as ``self.RESULT_QUEUE``.
        v4_nets: Path to a text file with one network per line; parsed
            into ``self.NETWORKS[4]``.
        v6_nets: Path to a text file with one network per line; parsed
            into ``self.NETWORKS[6]``.

    Raises:
        OSError: If either file cannot be opened.
        ValueError: If a line is not a valid network for
            ``ipaddress.ip_network`` (blank lines included).
    """

    def _load_networks(path):
        # Read one network per line; the ``with`` block guarantees the
        # file handle is closed (the previous code leaked it).
        with open(path) as handle:
            return [
                ipaddress.ip_network(line.strip(), strict=False)
                for line in handle
            ]

    self.WORK_QUEUE = work_queue
    self.RESULT_QUEUE = result_queue

    # Install a brand-new loop as the current one and remember it.
    policy = asyncio.get_event_loop_policy()
    policy.set_event_loop(policy.new_event_loop())
    self.LOOP = asyncio.get_event_loop()

    # Keyed by IP version.
    self.NETWORKS = {
        4: _load_networks(v4_nets),
        6: _load_networks(v6_nets),
    }
    super().__init__()
def event_loop(request):
    """
    Create an instance of the requested event loop.

    Calls ``default_event_loop(request=request)`` first, closes the loop
    the policy currently returns, installs a new loop in its place and
    registers a finaliser on ``request`` that closes the new loop.
    Returns the new loop.
    """
    default_event_loop(request=request)

    loop_policy = asyncio.get_event_loop_policy()

    # Close previous event loop
    previous_loop = loop_policy.get_event_loop()
    previous_loop.close()

    # Create new event loop
    new_loop = loop_policy.new_event_loop()
    loop_policy.set_event_loop(new_loop)

    def _close_new_loop():
        new_loop.close()

    # Add finaliser and return new event loop
    request.addfinalizer(_close_new_loop)
    return new_loop
def setUp(self):
    """Give the test its own event loop and make it the current one."""
    loop_policy = asyncio.get_event_loop_policy()

    # new event loop for each test
    fresh_loop = loop_policy.new_event_loop()
    self.loop = fresh_loop
    loop_policy.set_event_loop(fresh_loop)
def __init__(self, *a, **kw):
    """Build the log record and tag it with the current asyncio task.

    ``self.task`` is:
      * ``'main'`` when no event loop is set for the current thread,
      * the current task's ``logging_id`` attribute when it has one,
      * ``'othr'`` otherwise (loop set, but no task or no ``logging_id``).

    All positional and keyword arguments are forwarded to the parent
    constructor unchanged.
    """
    super(AsyncLogRecord, self).__init__(*a, **kw)
    self.task = 'main'
    # NOTE(review): this reads a private attribute of the event loop policy
    # to detect a loop without creating one; kept as in the original.
    if asyncio.get_event_loop_policy()._local._loop:
        # asyncio.Task.current_task() was removed in Python 3.9; use the
        # module-level asyncio.current_task() when it is available and fall
        # back to the old classmethod on interpreters that predate it.
        current_task = getattr(asyncio, 'current_task', None)
        if current_task is None:
            current_task = asyncio.Task.current_task
        try:
            task = current_task()
        except RuntimeError:
            # asyncio.current_task() raises when no loop is running in this
            # thread; the old API returned None in that situation.
            task = None
        self.task = getattr(task, 'logging_id', 'othr')
def test_handle_source_traceback(self):
    """Handles created in debug mode record where they were scheduled.

    For each scheduling API (``call_soon``, ``call_soon_threadsafe``,
    ``call_later``, ``call_at``) the last entry of the handle's
    ``_source_traceback`` must point at the line in this test that created
    the handle.
    """
    loop = asyncio.get_event_loop_policy().new_event_loop()
    loop.set_debug(True)
    self.set_event_loop(loop)

    def check_source_traceback(h):
        # The handle must have been created on the line directly above the
        # call to this helper: the expected line number is derived from the
        # caller's current line minus one. Keep each pair below adjacent.
        lineno = sys._getframe(1).f_lineno - 1
        self.assertIsInstance(h._source_traceback, list)
        self.assertEqual(h._source_traceback[-1][:3],
                         (__file__,
                          lineno,
                          'test_handle_source_traceback'))

    # call_soon
    h = loop.call_soon(noop)
    check_source_traceback(h)

    # call_soon_threadsafe
    h = loop.call_soon_threadsafe(noop)
    check_source_traceback(h)

    # call_later
    h = loop.call_later(0, noop)
    check_source_traceback(h)

    # call_at (previously this section re-ran call_later by mistake, so
    # call_at itself was never exercised)
    h = loop.call_at(loop.time(), noop)
    check_source_traceback(h)
def test_get_event_loop_policy(self):
    """The policy accessor returns the same policy object on every call."""
    first = asyncio.get_event_loop_policy()
    self.assertIsInstance(first, asyncio.AbstractEventLoopPolicy)

    # A second lookup must hand back the very same object.
    second = asyncio.get_event_loop_policy()
    self.assertIs(first, second)
def test_set_event_loop_policy(self):
    """Setting a policy validates its type and replaces the current one.

    The policy that was active before the test is restored on cleanup so
    the replacement does not leak into other tests.
    """
    # Older asyncio versions reject a bad policy with an ``assert``
    # (AssertionError); newer ones raise TypeError. Accept either.
    self.assertRaises(
        (AssertionError, TypeError),
        asyncio.set_event_loop_policy, object())

    old_policy = asyncio.get_event_loop_policy()
    # Put the original policy back once the test is over.
    self.addCleanup(asyncio.set_event_loop_policy, old_policy)

    policy = asyncio.DefaultEventLoopPolicy()
    asyncio.set_event_loop_policy(policy)
    self.assertIs(policy, asyncio.get_event_loop_policy())
    self.assertIsNot(policy, old_policy)
def setUp(self):
    """Create a test loop and register a child watcher attached to it.

    The watcher is an instance of ``self.Watcher``; a cleanup is scheduled
    that resets the policy's child watcher to ``None``.
    """
    loop_policy = asyncio.get_event_loop_policy()
    self.loop = loop_policy.new_event_loop()
    self.set_event_loop(self.loop)

    child_watcher = self.Watcher()
    child_watcher.attach_loop(self.loop)
    loop_policy.set_child_watcher(child_watcher)

    # Undo the watcher registration when the test finishes.
    self.addCleanup(loop_policy.set_child_watcher, None)
def asyncio_loop():
    """Yield a newly created event loop and close it after the yield.

    The loop is obtained from the current event loop policy; it is not
    installed as the current loop.
    """
    loop_policy = asyncio.get_event_loop_policy()
    fresh_loop = loop_policy.new_event_loop()
    yield fresh_loop
    fresh_loop.close()
def event_loop():
    """Create an instance of the default event loop for each test case.

    While the loop is handed out its ``close()`` does nothing; the real
    ``close()`` is kept on the loop as ``_close`` and runs after the yield.

    From https://github.com/pytest-dev/pytest-asyncio/issues/29#issuecomment-226947296
    """
    new_loop = asyncio.get_event_loop_policy().new_event_loop()
    asyncio.set_event_loop(new_loop)

    # Keep the real close() under ``_close`` and shadow the public one.
    new_loop._close, new_loop.close = new_loop.close, (lambda: None)

    yield new_loop
    new_loop._close()
def run(self):
    """Forward queued items to the RCON server until ``self.running`` clears.

    A new event loop is installed for this thread of execution and used to
    drive the coroutine calls. On start-up a connection is attempted and an
    announcement command is sent; a failure there is logged and the main
    loop still starts. Each iteration waits up to 3 seconds for an item on
    ``self.q``: an item is passed to ``self.exec_command`` and the response
    logged at DEBUG level; with no item, a connection idle for more than
    30 seconds is closed. The event loop is closed when the method exits.
    """
    logger.debug("Booted rcon sender thread")
    policy = asyncio.get_event_loop_policy()
    policy.set_event_loop(policy.new_event_loop())
    loop = asyncio.get_event_loop()
    last_data = time()
    try:
        try:
            self.conn = RconConnection(self.options.rcon_host,
                                       int(self.options.rcon_port),
                                       self.options.rcon_password)
            # NOTE(review): the announcement is sent twice in the original
            # code; kept as-is -- confirm whether the repeat is deliberate.
            loop.run_until_complete(self.conn.exec_command("/silent-command print('FactorioMCd connected.')"))
            loop.run_until_complete(self.conn.exec_command("/silent-command print('FactorioMCd connected.')"))
            self.connected = True
        except Exception:
            # Catch Exception rather than everything so KeyboardInterrupt
            # and SystemExit are not swallowed.
            logger.exception("Could not connect to rcon, retry delayed.")

        while self.running.value:
            try:
                data = self.q.get(timeout=3)
                last_data = time()
            except Empty:
                data = None

            if not data:
                # Nothing to send: drop a connection idle for over 30 s.
                if (time() - last_data) > 30 and self.connected:
                    try:
                        self.conn.close()
                        self.connected = False
                        logger.info("RCON connection closed (volentarily) due to timeout")
                    except Exception:
                        logger.exception("Error closing connection.")
                sleep(0.1)
            else:
                resp = loop.run_until_complete(self.exec_command(data))
                logger.debug(resp)
    finally:
        # Release the loop created above, whatever ended the run.
        loop.close()
def test_file_result():
    """``FileResult.save`` writes the string plus a newline to the file.

    The result is built from a ``file:///`` URL naming ``spider.data``;
    after saving, that file must contain the saved string followed by a
    newline. The file is removed afterwards, even if the check fails.
    """
    test_file = "spider.data"
    url = "file:///{}".format(test_file)
    f = FileResult(url)
    test_str = "test_str"

    policy = asyncio.get_event_loop_policy()
    loop = policy.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(f.save(test_str))
        # Use a distinct name for the handle so ``test_file`` keeps
        # referring to the path.
        with open(test_file, 'r+') as saved:
            assert saved.read() == test_str + "\n"
    finally:
        # Remove the artifact by its filesystem path; the previous check
        # used the ``file:///`` URL, which never exists as a path, so the
        # file was left behind.
        if os.path.exists(test_file):
            os.remove(test_file)
def event_loop():
    """Yield a new event loop from the current policy; close it afterwards.

    The loop is not installed as the current event loop.
    """
    make_loop = asyncio.get_event_loop_policy().new_event_loop
    created = make_loop()
    yield created
    created.close()
def event_loop():
    """Create an instance of the default event loop for each test case.

    The loop becomes the current event loop. Its ``close()`` is a no-op
    until the code after the ``yield`` runs, which performs the real close
    through the saved ``_close`` attribute.
    """
    loop_policy = asyncio.get_event_loop_policy()
    test_loop = loop_policy.new_event_loop()
    asyncio.set_event_loop(test_loop)

    # Save the real close() first, then shadow the public method.
    original_close = test_loop.close
    test_loop._close = original_close
    test_loop.close = lambda: None

    yield test_loop

    test_loop._close()
def safe_event_loop():
    """Fixture to fallback to `select` in Linux.

    On Linux a ``SelectorEventLoop`` built on ``selectors.SelectSelector``
    is created, installed as the current loop and returned. On every other
    platform a new loop from the current policy is returned without being
    installed.
    """
    if sys.platform != 'linux':
        return asyncio.get_event_loop_policy().new_event_loop()

    select_loop = asyncio.SelectorEventLoop(selectors.SelectSelector())
    asyncio.set_event_loop(select_loop)
    return select_loop
def setUp(self):
    """Create a private loop, unset the current loop, register a watcher.

    The watcher is an instance of ``self.Watcher`` attached to
    ``self.loop`` and installed as the policy's child watcher.
    """
    loop_policy = asyncio.get_event_loop_policy()
    self.loop = loop_policy.new_event_loop()

    # ensure that the event loop is passed explicitly in asyncio
    loop_policy.set_event_loop(None)

    child_watcher = self.Watcher()
    child_watcher.attach_loop(self.loop)
    loop_policy.set_child_watcher(child_watcher)
def tearDown(self):
    """Reset the policy's child watcher, close the loop, run parent teardown."""
    asyncio.get_event_loop_policy().set_child_watcher(None)
    self.loop.close()
    super().tearDown()
def tearDown(self):
    """Close the test's loop, unset the current loop, run parent teardown."""
    loop_policy = asyncio.get_event_loop_policy()

    self.loop.close()
    # Leave no current event loop behind.
    loop_policy.set_event_loop(None)

    super().tearDown()
def __init__(self, work_queue, result_queue):
    """Store both queues and install a new event loop as the current one.

    Args:
        work_queue: Stored as ``self.WORK_QUEUE``.
        result_queue: Stored as ``self.RESULT_QUEUE``.

    The loop returned by ``asyncio.get_event_loop()`` after installing the
    new loop is stored as ``self.LOOP``; the parent constructor runs last.
    """
    self.WORK_QUEUE, self.RESULT_QUEUE = work_queue, result_queue

    loop_policy = asyncio.get_event_loop_policy()
    fresh_loop = loop_policy.new_event_loop()
    loop_policy.set_event_loop(fresh_loop)
    self.LOOP = asyncio.get_event_loop()

    super().__init__()
def event_loop():
    """Yield a new current event loop whose ``close()`` is deferred.

    The loop's public ``close()`` is swapped for a no-op and the real one
    is kept on the loop as ``_close``; the real close runs after the yield.
    """
    loop = asyncio.get_event_loop_policy().new_event_loop()
    asyncio.set_event_loop(loop)

    # Order matters: capture the real close() before overriding it.
    setattr(loop, '_close', loop.close)
    setattr(loop, 'close', lambda: None)

    yield loop
    loop._close()