我们从Python开源项目中，提取了以下50个代码示例，用于说明如何使用multiprocessing.pool.join()。
def test_pool_worker_lifetime_early_close(self):
    """Check that close() followed by join() returns with short-lived workers.

    Regression test for issue #10332: closing a pool whose workers have
    limited lifetimes before all the tasks completed would make join() hang.
    """
    # Three workers, each limited to a single task (maxtasksperchild=1).
    pool = multiprocessing.Pool(3, maxtasksperchild=1)
    # Submit six tasks and close the pool immediately afterwards.
    pending = [pool.apply_async(sqr, (n, 0.3)) for n in range(6)]
    pool.close()
    pool.join()
    # Every submitted task must still have produced the expected result.
    for index, async_result in enumerate(pending):
        self.assertEqual(async_result.get(), sqr(index))

#
# Test of creating a customized manager class
#
def MoveImagesToNonMdpiFolders(res_root):
    """Relocate PNGs from drawable-*-mdpi-* folders into drawable-* folders.

    Each direct child of res_root whose dash-separated name starts with
    'drawable' and contains an 'mdpi' component has its '.png' files moved
    to the sibling directory named without the 'mdpi' component.

    Why? http://crbug.com/289843

    Args:
        res_root: Path of the resource directory whose children are scanned.
    """
    for folder_name in os.listdir(res_root):
        parts = folder_name.split('-')
        is_mdpi_drawable = parts[0] == 'drawable' and 'mdpi' in parts
        if not is_mdpi_drawable:
            continue
        source_folder = os.path.join(res_root, folder_name)
        if not os.path.isdir(source_folder):
            continue

        parts_without_mdpi = [part for part in parts if part != 'mdpi']
        # Guaranteed by the 'mdpi' membership check above.
        assert parts_without_mdpi != parts
        target_folder = os.path.join(res_root, '-'.join(parts_without_mdpi))
        build_utils.MakeDirectory(target_folder)

        png_names = (name for name in os.listdir(source_folder)
                     if name.endswith('.png'))
        for png_name in png_names:
            source_file = os.path.join(source_folder, png_name)
            target_file = os.path.join(target_folder, png_name)
            # Never overwrite an image that already exists at the target.
            assert not os.path.lexists(target_file)
            shutil.move(source_file, target_file)
def PackageArgsForExtractedZip(d):
    """Build the aapt '-S' arguments for an extracted resources zip.

    A resources zip either contains the resources for a single target or
    for multiple targets. If it is multiple targets merged into one, the
    actual resource directories will be contained in the subdirectories
    0, 1, 2, ...

    Args:
        d: Directory into which the resources zip was extracted.

    Returns:
        A flat list of the form ['-S', dir, '-S', dir, ...], with numbered
        subdirectories ordered by their integer value.
    """
    children = (os.path.join(d, name) for name in os.listdir(d))
    child_dirs = [path for path in children if os.path.isdir(path)]

    # A single purely numeric subdirectory marks the merged layout.
    if any(os.path.basename(path).isdigit() for path in child_dirs):
        res_dirs = sorted(child_dirs,
                          key=lambda path: int(os.path.basename(path)))
    else:
        res_dirs = [d]

    package_command = []
    for res_dir in res_dirs:
        MoveImagesToNonMdpiFolders(res_dir)
        package_command.extend(['-S', res_dir])
    return package_command
def test_sentinel(self):
    """Check Process.sentinel before start, after start and after exit.

    Reading the attribute before start() must raise ValueError. After
    start() it must be an int that wait_for_handle() does not report as
    ready, and once the child has been joined it must be reported as ready.
    """
    if self.TYPE == "threads":
        # Report a skip rather than returning, which would count as a pass
        # even though nothing was tested.
        self.skipTest('test not appropriate for {}'.format(self.TYPE))
    event = self.Event()
    p = self.Process(target=self._test_sentinel, args=(event,))
    with self.assertRaises(ValueError):
        p.sentinel
    p.start()
    # Make sure the child is reaped even if an assertion below fails.
    self.addCleanup(p.join)
    sentinel = p.sentinel
    self.assertIsInstance(sentinel, int)
    self.assertFalse(wait_for_handle(sentinel, timeout=0.0))
    # Presumably the child blocks on this event until it is set -- see
    # _test_sentinel to confirm.
    event.set()
    p.join()
    self.assertTrue(wait_for_handle(sentinel, timeout=DELTA))

#
#
#
def test_stderr_flush(self):
    """Check that a child's traceback reaches the file given to it.

    sys.stderr is flushed at process shutdown (issue #13812). The child
    target receives the path of a scratch file; after the child exits the
    file must contain the complete traceback.
    """
    if self.TYPE == "threads":
        # Report a skip rather than returning, which would count as a pass
        # even though nothing was tested.
        self.skipTest('test not appropriate for {}'.format(self.TYPE))

    testfn = test.support.TESTFN
    self.addCleanup(test.support.unlink, testfn)
    proc = self.Process(target=self._test_stderr_flush, args=(testfn,))
    proc.start()
    proc.join()
    with open(testfn, 'r') as f:
        err = f.read()
        # The whole traceback was printed
        self.assertIn("ZeroDivisionError", err)
        self.assertIn("test_multiprocessing.py", err)
        self.assertIn("1/0 # MARKER", err)
def test_waitfor(self):
    """Exercise Condition.wait_for() together with a child process.

    Based on the test in test/lock_tests.py. The parent waits for the
    shared value to become 0, then increments it four times (notifying
    after each step) and expects the child to exit with code 0 within the
    join timeout.
    """
    cond = self.Condition()
    state = self.Value('i', -1)

    child = self.Process(target=self._test_waitfor_f, args=(cond, state))
    child.daemon = True
    child.start()

    with cond:
        reached_zero = cond.wait_for(lambda: state.value == 0)
        self.assertTrue(reached_zero)
        self.assertEqual(state.value, 0)

    for _ in range(4):
        time.sleep(0.01)
        with cond:
            state.value += 1
            cond.notify()

    child.join(5)
    self.assertFalse(child.is_alive())
    self.assertEqual(child.exitcode, 0)
def test_waitfor_timeout(self):
    """Exercise the timeout path of Condition.wait_for() in a child process.

    Based on the test in test/lock_tests.py. The child reports its verdict
    through the shared `success` value, which must be true once the child
    has been joined.
    """
    cond = self.Condition()
    state = self.Value('i', 0)
    success = self.Value('i', False)
    sem = self.Semaphore(0)

    child = self.Process(target=self._test_waitfor_timeout_f,
                         args=(cond, state, success, sem))
    child.daemon = True
    child.start()
    # Presumably the child releases the semaphore once it is ready --
    # see _test_waitfor_timeout_f to confirm.
    self.assertTrue(sem.acquire(timeout=10))

    # Only increment 3 times, so state == 4 is never reached.
    for _ in range(3):
        time.sleep(0.01)
        with cond:
            state.value += 1
            cond.notify()

    child.join(5)
    self.assertTrue(success.value)
def test_wait_result(self):
    """Check the boolean result of Condition.wait().

    wait() must return false when it times out and true when the wait
    ends before the timeout. For the process-based variant on non-Windows
    platforms, a further wait() is expected to raise KeyboardInterrupt.
    """
    # Only the process-based, non-Windows variant hands its pid to the
    # child; presumably the child uses it to interrupt this process --
    # see _test_wait_result to confirm.
    uses_pid = isinstance(self, ProcessesMixin) and sys.platform != 'win32'
    pid = os.getpid() if uses_pid else None

    c = self.Condition()
    with c:
        # Nothing notifies yet, so both waits must time out.
        for timeout in (0, 0.1):
            self.assertFalse(c.wait(timeout))

        child = self.Process(target=self._test_wait_result, args=(c, pid))
        child.start()

        self.assertTrue(c.wait(10))
        if pid is not None:
            self.assertRaises(KeyboardInterrupt, c.wait, 10)

        child.join()
def test_value(self, raw=False):
    """Check shared values before and after a child process touches them.

    Each entry of self.codes_values is read as (typecode, initial value,
    expected value after the child has run).

    Args:
        raw: When true, build the values with self.RawValue instead of
            self.Value.
    """
    make_value = self.RawValue if raw else self.Value
    values = [make_value(code, initial)
              for code, initial, _ in self.codes_values]

    # Before the child runs, every value holds its initial setting.
    for shared, spec in zip(values, self.codes_values):
        self.assertEqual(shared.value, spec[1])

    child = self.Process(target=self._test, args=(values,))
    child.daemon = True
    child.start()
    child.join()

    # After the child has exited, every value holds the expected result.
    for shared, spec in zip(values, self.codes_values):
        self.assertEqual(shared.value, spec[2])
def test_fd_transfer(self):
    """Send an open file handle to a child and check what ends up in the file.

    The child target is given the payload b"foo"; once it has exited, the
    scratch file must contain exactly those bytes.
    """
    if self.TYPE != 'processes':
        self.skipTest("only makes sense with processes")
    conn, child_conn = self.Pipe(duplex=True)

    child = self.Process(target=self._writefd, args=(child_conn, b"foo"))
    child.daemon = True
    child.start()
    scratch_path = test.support.TESTFN
    self.addCleanup(test.support.unlink, scratch_path)
    with open(scratch_path, "wb") as out:
        handle = out.fileno()
        if msvcrt:
            # When msvcrt is available, send the OS handle rather than
            # the C runtime descriptor.
            handle = msvcrt.get_osfhandle(handle)
        reduction.send_handle(conn, handle, child.pid)
    child.join()
    with open(scratch_path, "rb") as written:
        self.assertEqual(written.read(), b"foo")
def test_dont_merge(self):
    """Check that separately sent messages arrive as separate messages.

    The parent expects b'a', b'b' and b'cd' as three distinct byte
    messages, and expects poll() to keep returning True while a message
    is waiting to be received.
    """
    a, b = self.Pipe()
    # Nothing has been sent yet, so polling must report no data.
    for timeout in (0.0, 0.1):
        self.assertEqual(a.poll(timeout), False)

    child = self.Process(target=self._child_dont_merge, args=(b,))
    child.start()

    self.assertEqual(a.recv_bytes(), b'a')
    # Repeated polls must not consume the pending message.
    for timeout in (1.0, 1.0):
        self.assertEqual(a.poll(timeout), True)
    self.assertEqual(a.recv_bytes(), b'b')
    for timeout in (1.0, 1.0, 0.0):
        self.assertEqual(a.poll(timeout), True)
    self.assertEqual(a.recv_bytes(), b'cd')

    child.join()

#
# Test of sending connection and socket objects between processes
#
def test_sharedctypes(self, lock=False):
    """Check that a child process can modify shared ctypes objects in place.

    After the child has run, the int, double, struct fields and every
    array element must be twice their initial value, and the character
    array must contain its initial text twice.

    Args:
        lock: Passed through as the `lock` argument of every shared object.
    """
    x = Value('i', 7, lock=lock)
    y = Value(c_double, 1.0/3.0, lock=lock)
    foo = Value(_Foo, 3, 2, lock=lock)
    arr = self.Array('d', list(range(10)), lock=lock)
    string = self.Array('c', 20, lock=lock)
    string.value = latin('hello')

    child = self.Process(target=self._double, args=(x, y, foo, arr, string))
    child.daemon = True
    child.start()
    child.join()

    self.assertEqual(x.value, 14)
    self.assertAlmostEqual(y.value, 2.0/3.0)
    self.assertEqual(foo.x, 6)
    self.assertAlmostEqual(foo.y, 4.0)
    for index in range(10):
        self.assertAlmostEqual(arr[index], index*2)
    self.assertEqual(string.value, latin('hellohello'))
def test_poll_eintr(self):
    """Check that join() completes even if a signal arrives while waiting.

    A SIGUSR1 handler records delivery in this process. A helper process
    (presumably sending SIGUSR1 to the given pid -- see _killer to confirm)
    runs while the parent joins a child that sleeps for one second. The
    handler must have run and the sleeping child must exit with code 0.
    """
    got_signal = [False]
    def record(*args):
        got_signal[0] = True
    pid = os.getpid()
    oldhandler = signal.signal(signal.SIGUSR1, record)
    try:
        killer = self.Process(target=self._killer, args=(pid,))
        killer.start()
        try:
            p = self.Process(target=time.sleep, args=(1,))
            p.start()
            p.join()
        finally:
            # Always reap the helper, even when starting or joining the
            # sleeper fails, so no child process is left behind.
            killer.join()
        self.assertTrue(got_signal[0])
        self.assertEqual(p.exitcode, 0)
    finally:
        # Restore the previous handler whatever the outcome.
        signal.signal(signal.SIGUSR1, oldhandler)

#
# Test to verify handle verification, see issue 3321
#
def tearDownClass(cls):
    """Shut down the class-level manager after waiting for stray children.

    Sleeps with exponentially growing delays while more than one active
    child remains, prints debug information if the manager still tracks
    shared objects, then shuts the manager down and clears the reference.
    """
    # only the manager process should be returned by active_children()
    # but this can take a bit on slow machines, so wait a few seconds
    # if there are other children too (see #17395)
    delay = 0.01
    while len(multiprocessing.active_children()) > 1 and delay < 5:
        time.sleep(delay)
        delay *= 2

    gc.collect()                       # do garbage collection

    manager = cls.manager
    if manager._number_of_objects() != 0:
        # This is not really an error since some tests do not
        # ensure that all processes which hold a reference to a
        # managed object have been joined.
        print('Shared objects which still exist at manager shutdown:')
        print(manager._debug_info())

    manager.shutdown()
    manager.join()
    cls.manager = None
def test_ignore(self):
    """Check pipe traffic with a child while SIGUSR1 is sent to that child.

    The parent waits for 'ready', signals the child, exchanges the value
    1234, signals again and finally expects a one-megabyte byte message.
    Presumably the child's handling of SIGUSR1 is what keeps the transfers
    intact -- see _test_ignore to confirm.
    """
    parent_conn, child_conn = multiprocessing.Pipe()
    try:
        child = multiprocessing.Process(target=self._test_ignore,
                                        args=(child_conn,))
        child.daemon = True
        child.start()
        # The parent no longer needs its copy of the child's end.
        child_conn.close()

        self.assertEqual(parent_conn.recv(), 'ready')
        time.sleep(0.1)
        os.kill(child.pid, signal.SIGUSR1)
        time.sleep(0.1)

        parent_conn.send(1234)
        self.assertEqual(parent_conn.recv(), 1234)
        time.sleep(0.1)
        os.kill(child.pid, signal.SIGUSR1)

        self.assertEqual(parent_conn.recv_bytes(), b'x'*(1024*1024))
        time.sleep(0.1)
        child.join()
    finally:
        parent_conn.close()
def test_sentinel(self):
    """Check Process.sentinel before start, after start and after exit.

    Reading the attribute before start() must raise ValueError. After
    start() it must be an int that wait_for_handle() does not report as
    ready, and once the child has been joined it must be reported as ready.
    """
    if self.TYPE == "threads":
        self.skipTest('test not appropriate for {}'.format(self.TYPE))

    event = self.Event()
    child = self.Process(target=self._test_sentinel, args=(event,))
    with self.assertRaises(ValueError):
        child.sentinel

    child.start()
    # Make sure the child is reaped even if an assertion below fails.
    self.addCleanup(child.join)

    handle = child.sentinel
    self.assertIsInstance(handle, int)
    self.assertFalse(wait_for_handle(handle, timeout=0.0))

    event.set()
    child.join()
    self.assertTrue(wait_for_handle(handle, timeout=1))

#
#
#
def test_stderr_flush(self):
    """Check that a child's traceback reaches the file given to it.

    sys.stderr is flushed at process shutdown (issue #13812). The child
    target receives the path of a scratch file; after the child exits the
    file must contain the complete traceback.
    """
    if self.TYPE == "threads":
        self.skipTest('test not appropriate for {}'.format(self.TYPE))

    scratch_path = test.support.TESTFN
    self.addCleanup(test.support.unlink, scratch_path)

    child = self.Process(target=self._test_stderr_flush,
                         args=(scratch_path,))
    child.start()
    child.join()

    with open(scratch_path, 'r') as captured:
        output = captured.read()
        # The whole traceback was printed
        for fragment in ("ZeroDivisionError",
                         "test_multiprocessing.py",
                         "1/0 # MARKER"):
            self.assertIn(fragment, output)
def test_spawn_close(self):
    """Close the child's pipe end in the parent right after spawning.

    We test that a pipe connection can be closed by parent process
    immediately after child is spawned. On Windows this would have
    sometimes failed on old versions because child_conn would be closed
    before the child got a chance to duplicate it.
    """
    conn, child_conn = self.Pipe()

    child = self.Process(target=self._echo, args=(child_conn,))
    child.daemon = True
    child.start()
    child_conn.close()    # this might complete before child initializes

    payload = latin('hello')
    conn.send_bytes(payload)
    self.assertEqual(conn.recv_bytes(), payload)

    # Presumably SENTINEL tells the echo loop to stop -- see _echo.
    conn.send_bytes(SENTINEL)
    conn.close()
    child.join()
def test_poll_eintr(self):
    """Check that join() completes even if a signal arrives while waiting.

    A SIGUSR1 handler records delivery in this process. A helper process
    (presumably sending SIGUSR1 to the given pid -- see _killer to confirm)
    runs while the parent joins a child that sleeps for two seconds. The
    handler must have run and the sleeping child must exit with code 0.
    """
    signalled = [False]

    def on_sigusr1(*args):
        signalled[0] = True

    parent_pid = os.getpid()
    previous_handler = signal.signal(signal.SIGUSR1, on_sigusr1)
    try:
        killer = self.Process(target=self._killer, args=(parent_pid,))
        killer.start()
        try:
            sleeper = self.Process(target=time.sleep, args=(2,))
            sleeper.start()
            sleeper.join()
        finally:
            # Always reap the helper, whatever happened to the sleeper.
            killer.join()
        self.assertTrue(signalled[0])
        self.assertEqual(sleeper.exitcode, 0)
    finally:
        signal.signal(signal.SIGUSR1, previous_handler)

#
# Test to verify handle verification, see issue 3321
#
def test_large_fd_transfer(self):
    """Send a file descriptor numbered 256 or higher to a child process.

    With fd > 256 (issue #11657). The scratch file's descriptor is
    duplicated onto the first unassigned descriptor in range(256, MAXFD)
    before being sent; afterwards the file must contain b"bar".
    """
    if self.TYPE != 'processes':
        self.skipTest("only makes sense with processes")
    conn, child_conn = self.Pipe(duplex=True)

    child = self.Process(target=self._writefd,
                         args=(child_conn, b"bar", True))
    child.daemon = True
    child.start()
    scratch_path = test.support.TESTFN
    self.addCleanup(test.support.unlink, scratch_path)
    with open(scratch_path, "wb") as out:
        small_fd = out.fileno()
        free_fds = (candidate for candidate in range(256, MAXFD)
                    if not self._is_fd_assigned(candidate))
        large_fd = next(free_fds, None)
        if large_fd is None:
            self.fail("could not find an unassigned large file descriptor")
        os.dup2(small_fd, large_fd)
        try:
            reduction.send_handle(conn, large_fd, child.pid)
        finally:
            # The duplicate is only needed for the transfer itself.
            os.close(large_fd)
    child.join()
    with open(scratch_path, "rb") as written:
        self.assertEqual(written.read(), b"bar")
def _GenerateDensitySplitPaths(apk_path):
    """Yield (src_path, dst_path) pairs for every entry of DENSITY_SPLITS.

    For each (density, config) item, the source path is apk_path followed
    by '_' and the config components joined with '_'. The destination path
    is apk_path followed by '_' and the density name.

    Args:
        apk_path: Base path that both generated paths are derived from.

    Yields:
        Tuples of (src_path, dst_path) strings.
    """
    # dict.items() works on both Python 2 and Python 3, whereas
    # iteritems() exists only on Python 2.
    for density, config in DENSITY_SPLITS.items():
        src_path = '%s_%s' % (apk_path, '_'.join(config))
        dst_path = '%s_%s' % (apk_path, density)
        yield src_path, dst_path
def _ConvertToWebP(webp_binary, png_files):
    """Convert PNG files to WebP in parallel and delete the originals.

    Each converted file is written next to its source with the extension
    replaced by '.webp'. Files ending in '.9.png' are left untouched.

    Args:
        webp_binary: Path of the converter executable to invoke.
        png_files: Iterable of PNG file paths.

    Raises:
        subprocess.CalledProcessError: If a conversion command exits with
            a non-zero status (re-raised in the caller by pool.map).
    """
    pool = multiprocessing.pool.ThreadPool(10)

    def convert_image(png_path):
        root = os.path.splitext(png_path)[0]
        webp_path = root + '.webp'
        args = [webp_binary, png_path] + _PNG_TO_WEBP_ARGS + [webp_path]
        subprocess.check_call(args)
        # Only reached when the conversion succeeded.
        os.remove(png_path)

    try:
        # Android requires pngs for 9-patch images.
        pool.map(convert_image,
                 [f for f in png_files if not f.endswith('.9.png')])
    finally:
        # Release the worker threads even when a conversion failed.
        pool.close()
        pool.join()
def main(args):
    """Package resources, redoing the work only when the inputs changed.

    Expands and parses the arguments, collects the output paths, input
    paths and input strings that describe this invocation, then hands them
    to build_utils.CallAndWriteDepfileIfStale together with the packaging
    callback (presumably invoked only when something is stale -- see
    build_utils to confirm).

    Args:
        args: Command-line arguments, not including the program name
            (assumed; confirm against the caller).
    """
    args = build_utils.ExpandFileArgs(args)
    options = _ParseArgs(args)

    package_command = _ConstructMostAaptArgs(options)

    output_paths = [options.apk_path]

    if options.create_density_splits:
        output_paths.extend(
            dst_path
            for _, dst_path in _GenerateDensitySplitPaths(options.apk_path))
    output_paths.extend(
        _GenerateLanguageSplitOutputPaths(options.apk_path,
                                          options.language_splits))

    input_paths = [options.android_manifest] + options.resource_zips

    input_strings = [options.exclude_xxxhdpi] + options.xxxhdpi_whitelist
    input_strings.extend(package_command)
    if options.png_to_webp:
        # This is necessary to ensure conversion if the option is toggled.
        # append() adds the marker as one string; extend() would have
        # added each of its characters as a separate entry.
        input_strings.append("png_to_webp")

    # The md5_check.py doesn't count file path in md5 intentionally,
    # in order to repackage resources when assets' name changed, we need
    # to put assets into input_strings, as we know the assets path isn't
    # changed among each build if there is no asset change.
    if options.asset_dir and os.path.exists(options.asset_dir):
        asset_paths = []
        for root, _, filenames in os.walk(options.asset_dir):
            asset_paths.extend(os.path.join(root, f) for f in filenames)
        input_paths.extend(asset_paths)
        # Sorted so the recorded strings do not depend on walk order.
        input_strings.extend(sorted(asset_paths))

    build_utils.CallAndWriteDepfileIfStale(
        lambda: _OnStaleMd5(package_command, options),
        options,
        input_paths=input_paths,
        input_strings=input_strings,
        output_paths=output_paths)