我已经使用Python asyncio和aiohttp成功构建了一个RESTful微服务,该服务可侦听POST事件以收集来自各种供料器的实时事件。
然后,它构建一个内存结构,以将事件的最后24小时缓存在嵌套的defaultdict / deque结构中。
现在,我想定期检查该结构到磁盘的位置,最好使用pickle。
由于内存结构可以大于100MB,因此我希望避免在检查点结构所需的时间上占用传入事件处理时间。
我宁愿为该结构创建快照副本(例如,Deepcopy),然后花点时间将其写入磁盘并按预设的时间间隔重复执行。
我一直在寻找有关如何组合线程的示例(并且线程是否是为此的最佳解决方案?)和异步用于此目的,但是找不到对我有用的东西。
非常感谢任何入门的指点!
使用以下方法将方法委派给线程或子流程非常简单BaseEventLoop.run_in_executor:
BaseEventLoop.run_in_executor
import asyncio import time from concurrent.futures import ProcessPoolExecutor def cpu_bound_operation(x): time.sleep(x) # This is some operation that is CPU-bound @asyncio.coroutine def main(): # Run cpu_bound_operation in the ProcessPoolExecutor # This will make your coroutine block, but won't block # the event loop; other coroutines can run in meantime. yield from loop.run_in_executor(p, cpu_bound_operation, 5) loop = asyncio.get_event_loop() p = ProcessPoolExecutor(2) # Create a ProcessPool with 2 processes loop.run_until_complete(main())
至于使用aProcessPoolExecutor还是ThreadPoolExecutor,这很难说。腌制一个大物体肯定会消耗一些CPU周期,最初您会认为这ProcessPoolExecutor是可行的方法。但是,将100MB对象传递到Process池中的a将需要在主进程中腌制该实例,通过IPC将字节发送到子进程,在子进程中将其取消腌制,然后 再次 进行腌制,以便可以将其写入磁盘。鉴于此,我的猜测是,酸洗/去酸洗的开销将足够大ThreadPoolExecutor,即使使用GIL会对性能造成负面影响,您也最好使用。
ProcessPoolExecutor
ThreadPoolExecutor
Process
也就是说,两种方法的测试都非常简单,并且可以确定找出来,所以您也可以这样做。