We extracted the following 50 code examples from open-source Python projects to illustrate how to use multiprocessing.pool.map().
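Before the extracted examples, here is a minimal, self-contained sketch of the basic Pool.map() pattern they all build on; the sqr() helper and the pool size are illustrative and not taken from any of the projects below.

import multiprocessing


def sqr(x):
    return x * x


if __name__ == '__main__':
    # Pool supports the context-manager protocol (Python 3.3+); map() blocks
    # until every item has been processed and returns the results in order.
    with multiprocessing.Pool(processes=4) as pool:
        results = pool.map(sqr, range(10))
    print(results)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]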
def map(self, func, iterable, chunksize=None):
    """
    Equivalent of `map()` built-in, without swallowing `KeyboardInterrupt`.

    :param func: The function to apply to the items.
    :param iterable: An iterable of items that will have `func` applied to them.
    """
    # The key magic is that we must call r.get() with a timeout, because
    # a Condition.wait() without a timeout swallows KeyboardInterrupts.
    r = self.map_async(func, iterable, chunksize)

    while True:
        try:
            return r.get(self.wait_timeout)
        except multiprocessing.TimeoutError:
            pass
        except KeyboardInterrupt:
            self.terminate()
            self.join()
            raise
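The same timeout-polling idea can be used with a plain multiprocessing.Pool instead of a subclass. The sketch below is illustrative: the interruptible_map() helper, the sqr() worker, and the 1-second timeout are assumptions, not taken from the source project.

import multiprocessing


def sqr(x):
    return x * x


def interruptible_map(pool, func, iterable, wait_timeout=1.0):
    # map_async() + get(timeout) in a loop, so Ctrl-C is raised promptly
    # instead of being swallowed by an untimed Condition.wait().
    result = pool.map_async(func, iterable)
    while True:
        try:
            return result.get(wait_timeout)
        except multiprocessing.TimeoutError:
            continue
        except KeyboardInterrupt:
            pool.terminate()
            pool.join()
            raise


if __name__ == '__main__':
    pool = multiprocessing.Pool(4)
    try:
        print(interruptible_map(pool, sqr, range(10)))
    finally:
        pool.close()
        pool.join()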
def test_imap_unordered_handle_iterable_exception(self):
    if self.TYPE == 'manager':
        self.skipTest('test not appropriate for {}'.format(self.TYPE))

    it = self.pool.imap_unordered(sqr,
                                  exception_throwing_generator(10, 3),
                                  1)
    expected_values = list(map(sqr, range(10)))
    with self.assertRaises(SayWhenError):
        # imap_unordered makes it difficult to anticipate the SayWhenError
        for i in range(10):
            value = next(it)
            self.assertIn(value, expected_values)
            expected_values.remove(value)

    it = self.pool.imap_unordered(sqr,
                                  exception_throwing_generator(20, 7),
                                  2)
    expected_values = list(map(sqr, range(20)))
    with self.assertRaises(SayWhenError):
        for i in range(20):
            value = next(it)
            self.assertIn(value, expected_values)
            expected_values.remove(value)
def list_all_machines(cloud_ids, headers):
    "Given the cloud ids, runs in parallel queries to get all machines"
    def list_one_cloud(cloud_id):
        cloud_machines = requests.get(
            'https://mist.io/clouds/%s/machines' % cloud_id,
            headers=headers)
        if cloud_machines.status_code == 200:
            machines = cloud_machines.json()
            for machine in machines:
                machine['cloud'] = cloud_id
            return machines
        return []

    pool = multiprocessing.pool.ThreadPool(8)
    results = pool.map(list_one_cloud, cloud_ids)
    pool.terminate()

    machines = []
    for result in results:
        machines.extend(result)
    return machines
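The example above relies on multiprocessing.pool.ThreadPool, whose map() has the same signature as Pool.map() but runs the function in threads, which suits I/O-bound work such as HTTP requests. A minimal, self-contained sketch under that assumption; the fetch_length() helper and the URLs are illustrative:

import multiprocessing.pool
import urllib.request


def fetch_length(url):
    # Illustrative I/O-bound task: download a page and return its size.
    with urllib.request.urlopen(url) as response:
        return url, len(response.read())


if __name__ == '__main__':
    urls = ['https://www.python.org', 'https://pypi.org']  # illustrative
    pool = multiprocessing.pool.ThreadPool(8)
    try:
        for url, size in pool.map(fetch_length, urls):
            print(url, size)
    finally:
        pool.close()
        pool.join()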
def map(self, *args, **kwargs):
    '''
    Must be implemented by subclasses.
    '''
    raise NotImplementedError('Method ``map`` must be called from subclasses.')
def map(self, function, iterable):
    '''
    Apply `function` to every item of `iterable` serially and return a list.
    '''
    return list(map(function, iterable))
def _ConvertToWebP(webp_binary, png_files):
    pool = multiprocessing.pool.ThreadPool(10)
    def convert_image(png_path):
        root = os.path.splitext(png_path)[0]
        webp_path = root + '.webp'
        args = [webp_binary, png_path] + _PNG_TO_WEBP_ARGS + [webp_path]
        subprocess.check_call(args)
        os.remove(png_path)

    # Android requires pngs for 9-patch images.
    pool.map(convert_image,
             [f for f in png_files if not f.endswith('.9.png')])
    pool.close()
    pool.join()
def test_map(self):
    pmap = self.pool.map
    self.assertEqual(pmap(sqr, list(range(10))),
                     list(map(sqr, list(range(10)))))
    self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
                     list(map(sqr, list(range(100)))))
def test_imap(self):
    it = self.pool.imap(sqr, list(range(10)))
    self.assertEqual(list(it), list(map(sqr, list(range(10)))))

    it = self.pool.imap(sqr, list(range(10)))
    for i in range(10):
        self.assertEqual(next(it), i*i)
    self.assertRaises(StopIteration, it.__next__)

    it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
    for i in range(1000):
        self.assertEqual(next(it), i*i)
    self.assertRaises(StopIteration, it.__next__)
def test_imap_unordered(self):
    it = self.pool.imap_unordered(sqr, list(range(1000)))
    self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))

    it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
    self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
def pool_in_process():
    pool = multiprocessing.Pool(processes=4)
    x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
def test_map(self):
    # Python 2 variant: the built-in map() returns a list here.
    pmap = self.pool.map
    self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10)))
    self.assertEqual(pmap(sqr, range(100), chunksize=20),
                     map(sqr, range(100)))
def test_map_unplicklable(self):
    # Issue #19425 -- failure to pickle should not cause a hang
    if self.TYPE == 'threads':
        self.skipTest('test not appropriate for {}'.format(self.TYPE))
    class A(object):
        def __reduce__(self):
            raise RuntimeError('cannot pickle')
    with self.assertRaises(RuntimeError):
        self.pool.map(sqr, [A()]*10)
def test_imap(self):
    # Python 2 variant: iterators expose .next() and map() returns a list.
    it = self.pool.imap(sqr, range(10))
    self.assertEqual(list(it), map(sqr, range(10)))

    it = self.pool.imap(sqr, range(10))
    for i in range(10):
        self.assertEqual(it.next(), i*i)
    self.assertRaises(StopIteration, it.next)

    it = self.pool.imap(sqr, range(1000), chunksize=100)
    for i in range(1000):
        self.assertEqual(it.next(), i*i)
    self.assertRaises(StopIteration, it.next)
def test_imap_unordered(self):
    # Python 2 variant of the test above.
    it = self.pool.imap_unordered(sqr, range(1000))
    self.assertEqual(sorted(it), map(sqr, range(1000)))

    it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
    self.assertEqual(sorted(it), map(sqr, range(1000)))
def test_map_async(self):
    self.assertEqual(self.pool.map_async(sqr, list(range(10))).get(),
                     list(map(sqr, list(range(10)))))
def test_empty_iterable(self):
    # See Issue 12157
    p = self.Pool(1)

    self.assertEqual(p.map(sqr, []), [])
    self.assertEqual(list(p.imap(sqr, [])), [])
    self.assertEqual(list(p.imap_unordered(sqr, [])), [])
    self.assertEqual(p.map_async(sqr, []).get(), [])

    p.close()
    p.join()
def benchmark(path, threads=1, thread_library='threading'):
    print('%20s: %s' % ('File', path))
    print('%20s: %d' % ('Threads', threads))
    if threads > 1:
        print('%20s: %s' % ('Library', thread_library))

    if threads > 1:
        if thread_library == 'threading':
            pool = multiprocessing.pool.ThreadPool(threads)
        elif thread_library == 'multiprocessing':
            pool = multiprocessing.pool.Pool(threads)

    with _Timer('Total'):
        with _Timer('Initialization'):
            loader = pencroft.open(path)
            if threads > 1:
                if thread_library == 'threading':
                    loader.make_thread_safe()
                elif thread_library == 'multiprocessing':
                    loader.make_mp_safe()
            keys = loader.keys()
            random.shuffle(keys)
        with _Timer('Read data'):
            if threads > 1:
                pool.map(loader.get, keys)
            else:
                for key in keys:
                    loader.get(key)
    print('')
def pool_in_process():
    pool = multiprocessing.Pool(processes=4)
    x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    pool.close()
    pool.join()