我正在尝试解决此错误:RuntimeError: Cannot close a running event loop在我的异步过程中。我相信这是因为在任务仍未完成时发生故障,然后尝试关闭事件循环。我以为我需要在关闭事件循环之前等待其余的响应,但是我不确定如何在我的特定情况下正确完成该操作。
RuntimeError: Cannot close a running event loop
def start_job(self): if self.auth_expire_timestamp < get_timestamp(): api_obj = api_handler.Api('Api Name', self.dbObj) self.api_auth_resp = api_obj.get_auth_response() self.api_attr = api_obj.get_attributes() try: self.queue_manager(self.do_stuff(json_data)) except aiohttp.ServerDisconnectedError as e: logging.info("Reconnecting...") api_obj = api_handler.Api('API Name', self.dbObj) self.api_auth_resp = api_obj.get_auth_response() self.api_attr = api_obj.get_attributes() self.run_eligibility() async def do_stuff(self, data): tasks = [] async with aiohttp.ClientSession() as session: for row in data: task = asyncio.ensure_future(self.async_post('url', session, row)) tasks.append(task) result = await asyncio.gather(*tasks) self.load_results(result) def queue_manager(self, method): self.loop = asyncio.get_event_loop() future = asyncio.ensure_future(method) self.loop.run_until_complete(future) async def async_post(self, resource, session, data): async with session.post(self.api_attr.api_endpoint + resource, headers=self.headers, data=data) as response: resp = [] try: headers = response.headers['foo'] content = await response.read() resp.append(headers) resp.append(content) except KeyError as e: logging.error('KeyError at async_post response') logging.error(e) return resp def shutdown(self): //need to do something here to await the remaining tasks and then I need to re-start a new event loop, which i think i can do, just don't know how to appropriately stop the current one. self.loop.close() return True
我该如何处理错误并正确关闭事件循环,以便可以启动一个新程序并从本质上重新启动整个程序并继续。
编辑:
根据这个答案,这就是我现在正在尝试的方法。不幸的是,这种错误很少发生,因此,除非我可以强制执行,否则我将不得不等待并观察其是否有效。在我的queue_manager方法中,我将其更改为:
queue_manager
try: self.loop.run_until_complete(future) except Exception as e: future.cancel() self.loop.run_until_complete(future) future.exception()
更新:
我摆脱了该shutdown()方法,queue_manager()而是将此方法添加到了我的方法中,它似乎可以正常工作:
shutdown()
queue_manager()
try: self.loop.run_until_complete(future) except Exception as e: future.cancel() self.check_in_records() self.reconnect() self.start_job() future.exception()
为了回答最初提出的问题,不需要close()运行循环,您可以对整个程序重用同一循环。
close()
给定更新中的代码,您queue_manager可能会像这样:
try: self.loop.run_until_complete(future) except Exception as e: self.check_in_records() self.reconnect() self.start_job()
取消future是没有必要的,据我所知没有任何作用。这不同于专门对做出反应的参考答案KeyboardInterrupt,特别是因为它是由asyncio本身引发的。KeyboardInterrupt可以run_until_complete在将来尚未真正完成的情况下进行传播。Ctrl-C在asyncio中正确处理是非常困难的,甚至是不可能的(有关详细信息,请参见此处),但是幸运的是,问题根本不Ctrl-C存在,而是协程引发的异常。(请注意,KeyboardInterrupt它不会继承自Exception,因此,Ctrl-C除非主体不执行。)
future
KeyboardInterrupt
run_until_complete
Ctrl-C
Exception
我取消了未来,因为在这种情况下,还有其他待处理的任务,并且我想从本质上删除那些任务并开始一个新的事件循环。
这是一件正确的事情,但是(已更新)问题中的代码仅取消了一个已传递给的未来run_until_complete。回想一下,future是将在以后提供的结果值的占位符。提供该值后,可以通过调用进行检索future.result()。如果将来的“价值”是一个例外,future.result()将引发该例外。run_until_complete具有约定,它将在给定的未来产生值之前一直运行事件循环,然后返回该值。如果“值”实际上是要提高的例外,run_until_complete则将重新提高它。例如:
future.result()
loop = asyncio.get_event_loop() fut = loop.create_future() loop.call_soon(fut.set_exception, ZeroDivisionError) # raises ZeroDivisionError, as that is the future's result, # manually set loop.run_until_complete(fut)
当所讨论的future实际上是a时Task,一个特定于异步的对象将协程包装为a Future,这种future的结果就是协程返回的对象。如果协程引发异常,则检索结果将重新引发它,因此run_until_complete:
Task
Future
async def fail(): 1/0 loop = asyncio.get_event_loop() fut = loop.create_task(fail()) # raises ZeroDivisionError, as that is the future's result, # because the coroutine raises it loop.run_until_complete(fut)
处理任务时,run_until_complete完成意味着协程也已经完成,返回值或引发异常(由run_until_complete返回或引发确定)。
另一方面,取消任务的工作方式是安排要恢复的任务以及将await其挂起的表达式提高CancelledError。除非任务专门捕获并抑制了此异常(行为良好的异步代码不应该这样做),否则该任务将停止执行,并且CancelledError将成为其结果。但是,如果协程在cancel()被调用时已经完成,那么cancel()将无能为力,因为没有待处理await的注入CancelledError。
await
CancelledError
cancel()