我有一个大文本文件,我想在其中处理每一行(执行一些操作)并将它们存储在数据库中。由于单个简单程序花费的时间太长,我希望它可以通过多个进程或线程来完成。每个线程/进程应从单个文件中读取不同的数据(不同的行),并对它们的数据(行)进行一些操作,然后将它们放入数据库中,以便最后我可以处理所有数据,数据库随我需要的数据一起转储了。
但是我无法弄清楚如何解决这个问题。
您正在寻找的是生产者/消费者模式
基本线程示例
这是使用线程模块的基本示例(而不是多处理)
import threading import Queue import sys def do_work(in_queue, out_queue): while True: item = in_queue.get() # process result = item out_queue.put(result) in_queue.task_done() if __name__ == "__main__": work = Queue.Queue() results = Queue.Queue() total = 20 # start for workers for i in xrange(4): t = threading.Thread(target=do_work, args=(work, results)) t.daemon = True t.start() # produce data for i in xrange(total): work.put(i) work.join() # get the results for i in xrange(total): print results.get() sys.exit()
您不会与线程共享文件对象。您可以通过为队列提供数据行来为他们工作。然后,每个线程将接起一行,对其进行处理,然后将其返回到队列中。
在多处理模块中内置了一些更高级的功能来共享数据,例如列表和特殊的Queue。在使用多处理与线程时需要权衡取舍,这取决于您的工作是CPU约束还是IO约束。
基本的多处理池示例
这是一个多处理池的真正基本示例
from multiprocessing import Pool def process_line(line): return "FOO: %s" % line if __name__ == "__main__": pool = Pool(4) with open('file.txt') as source_file: # chunk the work into batches of 4 lines at a time results = pool.map(process_line, source_file, 4) print results
池是管理其自身进程的便捷对象。由于打开的文件可以遍历其行,因此可以将其传递到pool.map(),该文件将循环遍历并将行传递给worker函数。映射将阻止并在完成后返回整个结果。请注意,这是一个过于简化的示例,pool.map()在进行工作之前,它将立即将整个文件读入内存。如果您希望有大文件,请记住这一点。有更多高级方法可以设计生产者/消费者设置。
pool.map()
手动“池”,具有限制和行重新排序
这是Pool.map的手动示例,但是您可以设置队列大小,以使只按其可以处理的最快速度逐个喂入,而不是一次性消耗整个可迭代对象。我还添加了行号,以便以后可以跟踪它们并引用它们。
from multiprocessing import Process, Manager import time import itertools def do_work(in_queue, out_list): while True: item = in_queue.get() line_no, line = item # exit signal if line == None: return # fake work time.sleep(.5) result = (line_no, line) out_list.append(result) if __name__ == "__main__": num_workers = 4 manager = Manager() results = manager.list() work = manager.Queue(num_workers) # start for workers pool = [] for i in xrange(num_workers): p = Process(target=do_work, args=(work, results)) p.start() pool.append(p) # produce data with open("source.txt") as f: iters = itertools.chain(f, (None,)*num_workers) for num_and_line in enumerate(iters): work.put(num_and_line) for p in pool: p.join() # get the results # example: [(1, "foo"), (10, "bar"), (0, "start")] print sorted(results)