我试图弄清楚如何移植要使用的线程程序asyncio。我有很多代码可以围绕几个标准库进行同步Queues,基本上是这样的:
asyncio
Queues
import queue, random, threading, time q = queue.Queue() def produce(): while True: time.sleep(0.5 + random.random()) # sleep for .5 - 1.5 seconds q.put(random.random()) def consume(): while True: value = q.get(block=True) print("Consumed", value) threading.Thread(target=produce).start() threading.Thread(target=consume).start()
一个线程创建值(可能是用户输入),而另一个线程对它们执行某些操作。关键是这些线程在出现新数据之前一直处于空闲状态,此时它们将唤醒并对其进行处理。
我正在尝试使用asyncio实现此模式,但是我似乎无法弄清楚如何使其“运行”。
我的尝试或多或少看起来像这样(根本不做任何事情)。
import asyncio, random q = asyncio.Queue() @asyncio.coroutine def produce(): while True: q.put(random.random()) yield from asyncio.sleep(0.5 + random.random()) @asyncio.coroutine def consume(): while True: value = yield from q.get() print("Consumed", value) # do something here to start the coroutines. asyncio.Task()? loop = asyncio.get_event_loop() loop.run_forever()
我尝试过使用协程,不使用协程,在Tasks中包装内容,试图使它们创建或返回期货等的变体。
我开始认为我对应该如何使用asyncio有错误的想法(也许应该以我不知道的其他方式来实现此模式)。任何指针将不胜感激。
对,就是这样。任务是您的朋友:
import asyncio, random q = asyncio.Queue() @asyncio.coroutine def produce(): while True: yield from q.put(random.random()) yield from asyncio.sleep(0.5 + random.random()) @asyncio.coroutine def consume(): while True: value = yield from q.get() print("Consumed", value) loop = asyncio.get_event_loop() loop.create_task(produce()) loop.create_task(consume()) loop.run_forever()
asyncio.ensure_future 也可以用于任务创建。
asyncio.ensure_future
并且请记住:q.put()是 协程 ,因此您应该使用yield from q.put(value)。
q.put()
yield from q.put(value)
UPD
转自asyncio.Task()/asyncio.async()新品牌的APIloop.create_task()和asyncio.ensure_future()示例所示。
asyncio.Task()
asyncio.async()
loop.create_task()
asyncio.ensure_future()