我目前在关闭应用程序的CTRL-C期间关闭异步协程时遇到问题。下面的代码是我现在所拥有的简化版本:
#!/usr/bin/env python # -*- coding: UTF-8 -*- import asyncio import time import functools import signal class DummyProtocol(asyncio.Protocol): def __init__(self, *args, **kwargs): self._shutdown = asyncio.Event() self._response = asyncio.Queue(maxsize=1) super().__init__(*args, **kwargs) def connection_made(self, transport): self.transport = transport def close(self): print("Closing protocol") self._shutdown.set() def data_received(self, data): #data = b'OK MPD ' # Start listening for commands after a successful handshake if data.startswith(b'OK MPD '): print("Ready for sending commands") self._proxy_task = asyncio.ensure_future(self._send_commands()) return # saving response for later consumption in self._send_commands self._response.put_nowait(data) async def _send_commands(self): while not self._shutdown.is_set(): print("Waiting for commands coming in ...") command = None # listen for commands coming in from the global command queue. Only blocking 1sec. try: command = await asyncio.wait_for(cmd_queue.get(), timeout=1) except asyncio.TimeoutError: continue # sending the command over the pipe self.transport.write(command) # waiting for the response. Blocking until response is complete. res = await self._response.get() # put it into the global response queue res_queue.put_nowait(res) async def connect(loop): c = lambda: DummyProtocol() t = asyncio.Task(loop.create_connection(c, '192.168.1.143', '6600')) try: # Wait for 3 seconds, then raise TimeoutError trans, proto = await asyncio.wait_for(t, timeout=3) print("Connected to <192.168.1.143:6600>.") return proto except (asyncio.TimeoutError, OSError) as e: print("Could not connect to <192.168.1.143:6600>. Trying again ...") if isinstance(e, OSError): log.exception(e) def shutdown(proto, loop): # http://stackoverflow.com/a/30766124/1230358 print("Shutdown of DummyProtocol initialized ...") proto.close() # give the coros time to finish time.sleep(2) # cancel all other tasks # for task in asyncio.Task.all_tasks(): # task.cancel() # stopping the event loop if loop: print("Stopping event loop ...") loop.stop() print("Shutdown complete ...") if __name__ == "__main__": loop = asyncio.get_event_loop() cmd_queue = asyncio.Queue() res_queue = asyncio.Queue() dummy_proto = loop.run_until_complete(connect(loop)) for signame in ('SIGINT','SIGTERM'): loop.add_signal_handler(getattr(signal, signame), functools.partial(shutdown, dummy_proto, loop)) try: loop.run_forever() except KeyboardInterrupt: pass finally: loop.close()
如果按CTRL-C,会给我以下输出:
Connected to <192.168.1.143:6600>. Ready for sending commands Waiting for commands coming in ... Waiting for commands coming in ... Waiting for commands coming in ... Waiting for commands coming in ... ^CShutdown of DummyProtocol initialized ... Closing protocol Stopping event loop ... Shutdown complete ... Task was destroyed but it is pending! task: <Task pending coro=<DummyProtocol._send_commands() running at ./dummy.py:45> wait_for=<Future pending cb=[Task._wakeup()]>> Task was destroyed but it is pending! task: <Task pending coro=<Queue.get() running at /usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/queues.py:168> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_release_waiter(<Future pendi...sk._wakeup()]>)() at /usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/tasks.py:344]> Exception ignored in: <generator object Queue.get at 0x10594b468> Traceback (most recent call last): File "/usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/queues.py", line 170, in get File "/usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/futures.py", line 227, in cancel File "/usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/futures.py", line 242, in _schedule_callbacks File "/usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/base_events.py", line 447, in call_soon File "/usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/base_events.py", line 456, in _call_soon File "/usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/base_events.py", line 284, in _check_closed RuntimeError: Event loop is closed
我对asyncio的经验不是很丰富,所以我很确定我在这里缺少重要的东西。真正让我头疼的是输出之后的部分Shutdown complete ...。从开始Task was destroyed but it is pending!,我必须承认我不知道发生了什么。我查看了其他问题,但无法正常工作。那么,为什么这段代码输出类似的东西Task was destroyed but it is pending! aso.?如何干净地关闭协程呢?
Shutdown complete ...
Task was destroyed but it is pending!
Task was destroyed but it is pending! aso.
谢谢你的帮助!
如果目前您的程序已完成一些异步任务,但仍未完成,则会收到此警告。之所以需要此警告,是因为某些正在运行的任务可能无法正确释放某些资源。
有两种常见的解决方法:
让我们来看一下代码:
def shutdown(proto, loop): print("Shutdown of DummyProtocol initialized ...") proto.close() time.sleep(2) # ...
time.sleep(2)-这行不会给协程足够的时间。它将冻结所有程序两秒钟。在这段时间内什么都不会发生。
time.sleep(2)
发生这种情况是因为事件循环在您调用的同一进程中运行time.sleep(2)。您永远不要在异步程序中以这种方式调用长时间运行的同步操作。请阅读此答案以查看异步代码如何工作。
让我们尝试修改shutdown功能。这不是异步函数,您不能await在其中添加任何内容。要执行一些异步代码,我们需要手动执行:停止当前正在运行的循环(因为它已经在运行),创建一些异步函数来等待任务完成,并传递此函数以在事件循环中执行。
shutdown
await
def shutdown(proto, loop): print("Shutdown of DummyProtocol initialized ...") # Set shutdown event: proto.close() # Stop loop: loop.stop() # Find all running tasks: pending = asyncio.Task.all_tasks() # Run loop until tasks done: loop.run_until_complete(asyncio.gather(*pending)) print("Shutdown complete ...")
您也可以取消任务并等待它们完成。有关详细信息,请参见此答案。
我对信号并不陌生,但是您真的需要它来捕获CTRL- C吗?无论何时KeyboardInterrupt发生的事件,都会在运行事件循环的地方逐行抛出(在代码中为loop.run_forever())。我在这里可能是错的,但是处理这种情况的常用方法是将所有清理操作都置于finally阻塞状态。
KeyboardInterrupt
loop.run_forever()
finally
例如,您可以看到aiohttp它是如何进行的:
aiohttp
try: loop.run_forever() except KeyboardInterrupt: # pragma: no branch pass finally: srv.close() loop.run_until_complete(srv.wait_closed()) loop.run_until_complete(app.shutdown()) loop.run_until_complete(handler.finish_connections(shutdown_timeout)) loop.run_until_complete(app.cleanup()) loop.close()