我有以下情况:
aiohttp
motor
该代码使用async/编写await,并且对于手动执行的单个调用也可以正常工作。
async
await
我不知道该怎么做,就是要大量使用输入数据。
asyncio我看到的所有示例都asyncio.wait通过发送有限列表作为参数进行了演示。但是我不能简单地向它发送任务列表,因为输入文件可能有数百万行。
asyncio
asyncio.wait
我的情况是关于通过传送带将数据流传输到消费者。
我还可以做些什么?我希望程序使用它可以聚集的所有资源来处理文件中的数据,而又不会感到不知所措。
我的情况是关于通过传送带将数据流传输到消费者。我还可以做些什么?
您可以创建固定数量的任务,这些任务大致与传送带的容量相对应,然后将它们弹出队列。例如:
async def consumer(queue): while True: line = await queue.get() # connect to API, Mongo, etc. ... queue.task_done() async def producer(): N_TASKS = 10 loop = asyncio.get_event_loop() queue = asyncio.Queue(N_TASKS) tasks = [loop.create_task(consume(queue)) for _ in range(N_TASKS)] try: with open('input') as f: for line in f: await queue.put(line) await queue.join() finally: for t in tasks: t.cancel()
由于与线程不同,任务是轻量级的,并且不占用操作系统资源,因此最好在创建“太多”任务时犯错。asyncio可以毫不费力地处理成千上万的任务,尽管这对于这些任务来说可能是过大的事- 几十就足够了。