假设我们有很多链接可供下载,并且每个链接可能花费不同的时间来下载。而且我只能使用最多3个连接进行下载。现在,我想确保使用asyncio有效地做到这一点。
这是我要实现的目标:在任何时间点,请尝试确保至少运行3个下载。
Connection 1: 1---------7---9--- Connection 2: 2---4----6----- Connection 3: 3-----5---8-----
数字代表下载链接,连字符代表等待下载。
这是我现在正在使用的代码
from random import randint import asyncio count = 0 async def download(code, permit_download, no_concurrent, downloading_event): global count downloading_event.set() wait_time = randint(1, 3) print('downloading {} will take {} second(s)'.format(code, wait_time)) await asyncio.sleep(wait_time) # I/O, context will switch to main function print('downloaded {}'.format(code)) count -= 1 if count < no_concurrent and not permit_download.is_set(): permit_download.set() async def main(loop): global count permit_download = asyncio.Event() permit_download.set() downloading_event = asyncio.Event() no_concurrent = 3 i = 0 while i < 9: if permit_download.is_set(): count += 1 if count >= no_concurrent: permit_download.clear() loop.create_task(download(i, permit_download, no_concurrent, downloading_event)) await downloading_event.wait() # To force context to switch to download function downloading_event.clear() i += 1 else: await permit_download.wait() await asyncio.sleep(9) if __name__ == '__main__': loop = asyncio.get_event_loop() try: loop.run_until_complete(main(loop)) finally: loop.close()
输出是预期的:
downloading 0 will take 2 second(s) downloading 1 will take 3 second(s) downloading 2 will take 1 second(s) downloaded 2 downloading 3 will take 2 second(s) downloaded 0 downloading 4 will take 3 second(s) downloaded 1 downloaded 3 downloading 5 will take 2 second(s) downloading 6 will take 2 second(s) downloaded 5 downloaded 6 downloaded 4 downloading 7 will take 1 second(s) downloading 8 will take 1 second(s) downloaded 7 downloaded 8
但是这是我的问题:
目前,我只是在等待9秒钟以使主要功能保持运行状态,直到下载完成。在退出主要功能之前,是否有一种有效的方式等待上一次下载完成?(我知道有一个asyncio.wait,但是我需要存储所有任务引用才能使其正常工作)
做这种任务的好图书馆是什么?我知道javascript有很多异步库,但是Python呢?
编辑:2.有什么好的库可以处理常见的异步模式?(类似https://www.npmjs.com/package/async)
在阅读本答案的其余部分之前,请注意,惯用的方法是使用asyncio来限制并行任务的数量asyncio.Semaphore,优雅地抽象了该方法。这个答案包含有效的方法,但要实现这一点则要复杂得多。我留下答案的原因是,在某些情况下,这种方法比信号量具有优势,特别是当要完成的工作量很大或不受限制时,并且您无法提前创建所有协程。在这种情况下,第二个(基于队列的)解决方案就是您想要的答案。但是在大多数常规情况下,例如通过aiohttp并行下载,您应该使用信号量。
asyncio.Semaphore
基本上,您需要一个固定大小的下载任务 池 。asyncio虽然没有预先创建的任务池,但是创建一个任务池很容易:只需保留一组任务,不要让它超出限制。尽管这个问题表明您不愿意这样做,但是代码的结尾却更加优雅:
asyncio
async def download(code): wait_time = randint(1, 3) print('downloading {} will take {} second(s)'.format(code, wait_time)) await asyncio.sleep(wait_time) # I/O, context will switch to main function print('downloaded {}'.format(code)) async def main(loop): no_concurrent = 3 dltasks = set() i = 0 while i < 9: if len(dltasks) >= no_concurrent: # Wait for some download to finish before adding a new one _done, dltasks = await asyncio.wait( dltasks, return_when=asyncio.FIRST_COMPLETED) dltasks.add(loop.create_task(download(i))) i += 1 # Wait for the remaining downloads to finish await asyncio.wait(dltasks)
一种替代方法是创建一定数量的协程进行下载,就像固定大小的线程池一样,并使用来喂它们工作asyncio.Queue。这消除了手动限制下载数量的需要,下载数量将自动受到协程调用数量的限制download():
asyncio.Queue
download()
# download() defined as above async def download_worker(q): while True: code = await q.get() await download(code) q.task_done() async def main(loop): q = asyncio.Queue() workers = [loop.create_task(download_worker(q)) for _ in range(3)] i = 0 while i < 9: await q.put(i) i += 1 await q.join() # wait for all tasks to be processed for worker in workers: worker.cancel() await asyncio.gather(*workers, return_exceptions=True)
至于您的其他问题,显而易见的选择是aiohttp。
aiohttp