我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用multiprocessing.pool.Pool()。
def load_embeddings_mp(path, word_dim, processes=None): if processes is None: processes = multiprocessing.cpu_count() pool = mp.Pool(processes, initializer=_mp_initialize, initargs=(word_dim,)) with open(path, "r") as f: iterator = chunks(f, n=processes, k=processes * 10000) ret = {} for batches in iterator: results = pool.map_async(_mp_process, batches) results = results.get() results = aggregate_dicts(*results) ret.update(results) return ret
def multiprocess(function, args, n_jobs, random_seed=None): """ Call function with args across n_jobs processes (n_jobs doesn't have to be the length of list_of_args). Arguments: function (callable): args (iterable): an iterable of [(1,2), (3, 4)] results in [function(1,2), function(3,4)] n_jobs (int): 0 < n_jobs random_seed (int | array): Returns: list: """ if random_seed is not None: # Each process initializes with the current jobs' randomness (random # state & random state index). Any changes to these processes' # randomnesses won't update the current process' randomness. seed(random_seed) with Pool(n_jobs) as p: return p.starmap(function, args)
def main(): # Deterministic Gevent Pool from gevent.pool import Pool p = Pool(10) run1 = [a for a in p.imap_unordered(echo, range(10))] run2 = [a for a in p.imap_unordered(echo, range(10))] run3 = [a for a in p.imap_unordered(echo, range(10))] run4 = [a for a in p.imap_unordered(echo, range(10))] print( run1 == run2 == run3 == run4 ) # Non Deterministic Process Pool from multiprocessing.pool import Pool p = Pool(10) run1 = [a for a in p.imap_unordered(echo, range(10))] run2 = [a for a in p.imap_unordered(echo, range(10))] run3 = [a for a in p.imap_unordered(echo, range(10))] run4 = [a for a in p.imap_unordered(echo, range(10))] run1[0] print( run1 == run2 == run3 == run4 )
def score_reviews(model): reviews = [] processes = [] num = 1.0 while num <= 5.0: processes.append({ 'string_score': str(num).replace('.', '_'), 'score': num, 'model': model }) num += 0.5 pool = Pool(8) for result in pool.imap(run_computation, processes): reviews.extend(result) pool.close() pool.join() # sort reviews from best to worst reviews.sort(key=get_second, reverse=True) reviews.sort(key=get_first, reverse=True) return reviews
def multithread(fn, args=[[]], pool_type=Pool, processes=_cpus, maxtasksperchild=1, chunksize=1): '''Multithread method using a Pool. Not inherently threadsafe. For threadsafe operations, use Managers or Locks. Args must be wrapped in their own iterator, as starmap is used for multiple arguments. Returns an iterator of the results''' def helper(pool): return pool.starmap(fn, args, chunksize=chunksize) # ThreadPools do not take a maxtasksperchild argument, # so we need to conditionally construct a pool if type(pool_type) is Pool: with pool_type(processes, maxtasksperchild=maxtasksperchild) as pool: results = helper(pool) else: with pool_type(processes) as pool: results = helper(pool) return results
def run(self): # Get paths to all images im_files = find_images(join(self.input_dir)) assert (len(im_files) > 0) if 'augmentation' in self.pipeline.keys(): print "Starting preprocessing ({} processes)".format(self.processes) optimization_pool = Pool(self.processes) subprocess = partial(preprocess, params=self) results = optimization_pool.map(subprocess, im_files) else: print "Using previously augmented data" # Create training and validation (imbalanced) print "Splitting into training/validation" try: train_imgs, val_imgs = self.train_val_split(listdir(self.augment_dir)) self.random_sample(train_imgs, val_imgs, classes=DEFAULT_CLASSES) except AssertionError: print "No images found in one more classes - unable to split training and validation" exit()
def generate_dataset(self, split_dir, mode='training'): if mode not in ['training', 'testing']: raise ValueError("Mode must be 'training' or 'testing'") do_augment = mode == 'training' # we only want to augment the training data split_df = pd.DataFrame.from_csv(join(split_dir, '{}.csv'.format(mode))) # load splits data_dir = make_sub_dir(split_dir, mode) # output directory for images # Make directories for each class of images in advance classes = [str(l) for l in split_df[self.label].unique()] for class_name in classes: make_sub_dir(data_dir, str(class_name)) # Pre-process, augment and randomly sample the training set print "Preprocessing {} data...".format(mode) if len(find_images(join(data_dir, '*'))) == 0: pool = Pool(self.processes) subprocess = partial(do_preprocess, args={'params': self, 'augment': do_augment, 'out_dir': data_dir}) img_list = list(split_df['full_path']) _ = pool.map(subprocess, img_list) self.generate_h5(find_images_by_class(data_dir, classes=classes), join(split_dir, '{}.h5'.format(mode)), split_df, random_sample=True, classes=classes)
def run_tbbpool(n, body): """TBB.Pool""" from TBB import Pool global reused_pool, numthreads if 'reused_pool' not in globals(): log.debug("Creating TBB.Pool(%s)" % numthreads) reused_pool = Pool(int(numthreads)) reused_pool.map(body, n)
def Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None): ''' Returns a process pool object ''' from multiprocessing.pool import Pool return Pool(processes, initializer, initargs, maxtasksperchild)
def download_chunks(self, max_workers=5): print('Will now download chunks.') original_sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN) executor = Pool(max_workers) signal.signal(signal.SIGINT, original_sigint_handler) try: r = executor.map_async(self.get, self.urls) result = list(r.get(43200)) DownloadResultProcessor.process_and_print(result) except KeyboardInterrupt: executor.terminate() else: executor.close() executor.join()
def QA_util_MP_process(num): pool = Pool(num) return pool
def fill_cache(self, repair_incorrect: bool = False) -> None: with Pool(processes=multiprocessing.cpu_count()) as pool: total = len(self.labeled_spectrograms) not_yet_cached = [s for s in self.labeled_spectrograms if not s.is_cached()] to_calculate = self.labeled_spectrograms if repair_incorrect else not_yet_cached log("Filling cache with {} spectrograms: {} already cached, {} to calculate.".format( total, total - len(not_yet_cached), len(to_calculate))) for index, labeled_spectrogram in enumerate(to_calculate): pool.apply_async(_repair_cached_spectrogram_if_incorrect if repair_incorrect else _cache_spectrogram, (labeled_spectrogram,)) pool.close() pool.join()
def loadBatch(img_paths): with Pool(processes=8) as pool: imgs = pool.map(loadImage, zip(img_paths, range(len(img_paths)))) return np.asarray(imgs) # Use this for training, instead of loading everything into memory, in only loads chunks
def __init__(self, func, pool_size=4, timeout=None): # The signal handler for the consumer exists only in the parent # process. If we don't give children their own noop signal handler, # any signal propagated to them by the parent will cause them to throw # an exception and terminate. super(ProcessPoolWorker, self).__init__(func, pool_size=pool_size, timeout=timeout) self.pool = pool.Pool(processes=pool_size, initializer=init_process_pool) self.logger = get_logger(__name__)
def main(directory, convert_directory, target_size, extension): util.check_required_program_args([directory, convert_directory]) try: os.mkdir(convert_directory) except OSError: pass supported_extensions = set(['jpg', 'png', 'tiff', 'jpeg', 'tif']) filenames = [os.path.join(dp, f) for dp, dn, fn in os.walk(directory) for f in fn if f.split('.')[-1].lower() in supported_extensions] filenames = sorted(filenames) print("Resizing images in {} to {}, this takes a while." "".format(directory, convert_directory)) n = len(filenames) # process in batches, sometimes weird things happen with Pool on my machine batchsize = 500 batches = n // batchsize + 1 pool = Pool(N_PROC) args = [] for f in filenames: args.append((convert, (directory, convert_directory, f, target_size, extension))) for i in range(batches): print("batch {:>2} / {}".format(i + 1, batches)) pool.map(process, args[i * batchsize: (i + 1) * batchsize]) pool.close() print('done')
def main(directory, convert_directory, target_size, extension): util.check_required_program_args([directory, convert_directory]) try: os.mkdir(convert_directory) except OSError: pass supported_extensions = set(['jpg', 'png', 'tiff', 'jpeg', 'tif']) filenames = [os.path.join(dp, f) for dp, dn, fn in os.walk(directory) for f in fn if f.split('.')[-1].lower() in supported_extensions] filenames = sorted(filenames) print("Resizing images in {} to {}, this takes a while." "".format(directory, convert_directory)) n = len(filenames) # process in batches, sometimes weird things happen with Pool on my machine batchsize = 500 batches = n // batchsize + 1 pool = Pool(N_PROC) args = [] for f in filenames: args.append((resize, (directory, convert_directory, f, target_size, extension))) for i in range(batches): print("batch {:>2} / {}".format(i + 1, batches)) pool.map(process, args[i * batchsize: (i + 1) * batchsize]) pool.close() print('done')
def __init__(self, multiprocess=True): #Determine the center to rotate around self.center_shift = np.array((params.PIXELS, params.PIXELS)) / 2. - 0.5 self.multiprocess = multiprocess if self.multiprocess: self.pool = Pool(4)
def main(directory, convert_directory, test, crop_height, crop_width, extension): try: os.mkdir(convert_directory) except OSError: pass supported_extensions = set(['jpg', 'png', 'tiff', 'jpeg', 'tif']) filenames = [os.path.join(dp, f) for dp, dn, fn in os.walk(directory) for f in fn if f.split('.')[-1].lower() in supported_extensions] filenames = sorted(filenames) print("Resizing images in {} to {}, this takes a while." "".format(directory, convert_directory)) n = len(filenames) # process in batches, sometimes weird things happen with Pool on my machine batchsize = 500 batches = n // batchsize + 1 pool = Pool(N_PROC) args = [] for f in filenames: args.append((convert_seg_labels, (directory, convert_directory, f, crop_height, crop_width, extension))) for i in range(batches): print("batch {:>2} / {}".format(i + 1, batches)) pool.map(process, args[i * batchsize: (i + 1) * batchsize]) pool.close() print('done')
def parallel_variability_analysis(tmodel, kind='reactions', proc_num = BEST_THREAD_RATIO): """ WIP. :param tmodel: :param kind: :param proc_num: :return: """ raise(NotImplementedError) objective = tmodel.objective if kind == Reaction or kind.lower() in ['reaction','reactions']: these_vars = tmodel.reactions else: these_vars = tmodel.get_variables_of_type(kind) func = partial(_variability_analysis_element, tmodel) pool = Pool(processes=proc_num) async_result = pool.map_async(func, these_vars) pool.close() pool.join() # aggregated_result = pd.DataFrame(async_result.get(), # columns = ['minimize','maximize']) tmodel.objective = objective return async_result
def multiprocessor_batch_calc(self, batch_queue): p = Pool(3) prediction = p.map(self.predict_batch, batch_queue) return list(itertools.chain.from_iterable(prediction))
def QA_util_MP_process(num): pool=Pool(num) return pool
def process_url(url_list): g = pool.Pool(5) #print 'fuck!!!' #for url in url_list: # g.spawn(procFunc, url) g.map(procFunc, url_list) g.join()
def test_mp(): pool = mppool.Pool(4) start_time = time.time() lengths = pool.map(worker, range(4)) finish_time = time.time() print('Multiprocessing: total_length={}, time={:.2f}s.'.format(sum(lengths), finish_time - start_time))
def store_samples(self, directory, preprocess_fnc): """ Read audio files from `directory` and store the preprocessed version in preprocessed/`directory` Args: directory: the sub-directory to read from preprocess_fnc: The preprocessing function to use """ out_directory = self._get_directory(preprocess_fnc, directory) if not os.path.exists(out_directory): os.makedirs(out_directory) audio_files = list(iglob_recursive(self._data_directory + '/' + directory, '*.flac')) with Pool(processes=multiprocessing.cpu_count()) as pool: transcript_dict = self._transcript_dict for audio_file in audio_files: audio_id = self._extract_audio_id(audio_file) transcript_entry = transcript_dict[audio_id] transform_args = (audio_file, preprocess_fnc, transcript_entry, out_directory) pool.apply_async(SpeechCorpusReader._transform_and_store_sample, transform_args, error_callback=self._preprocessing_error_callback) pool.close() pool.join()
def gen_set(): img_files = [files for _, _, files in os.walk(BASEDIR)] with Pool(processes=pool_size) as pool: for idx, image in enumerate(img_files): image_path = os.path.join(BASEDIR, image) for sample_options in ALG: pool.apply(save_training_sampl, (image_path, idx, sample_options))
def store_training(out): start = time.time() img_files = [files for _, _, files in os.walk(TRAINDIR)] train_combs = itertools.combinations(img_files, 2) with Pool(processes=pool_size) as pool: vectors = pool.map(get_images_diff_vectors, train_combs) df = pd.DataFrame(vectors) df.to_csv(out, index=False) finished = time.time() - start click.echo("Finised in %s seconds" % finished)
def process_mp(texts, args, pool=None): if pool is None: pool = mp.Pool(args.n_processes, initializer=mp_initialize, initargs=(args, )) iterator = chunks(enumerate(texts), n=args.n_processes, k=args.n_processes * 1000) if args.progress: t = tqdm.tqdm() else: t = None results = [] for batches in iterator: n_items = sum(len(x) for x in batches) result = pool.map_async(process, batches) result = result.get() result = [i for batch in result for i in batch] result.sort(key=lambda x: x[0]) idx, result = zip(*result) results.extend(result) if args.progress: t.update(n_items) return results
def pool(md5s): pool=Pool(processes=10) pool.map(downloader,md5s) pool.close() pool.join()
def kill_tasks(tasks, tags=None): # import glob from multiprocessing.pool import ThreadPool as Pool if not tasks: tasks = glob.glob(os.path.join(os.path.expanduser('~'), '.sos', 'tasks', '*.task')) all_tasks = [os.path.basename(x)[:-5] for x in tasks] else: all_tasks = [] for t in tasks: matched = glob.glob(os.path.join(os.path.expanduser('~'), '.sos', 'tasks', f'{t}*.task')) matched = [os.path.basename(x)[:-5] for x in matched] if not matched: env.logger.warning(f'{t} does not match any existing task') else: all_tasks.extend(matched) if tags: all_tasks = [x for x in all_tasks if any(x in tags for x in taskTags(x).split(' '))] if not all_tasks: env.logger.warning('No task to kill') return all_tasks = sorted(list(set(all_tasks))) p = Pool(len(all_tasks)) killed = p.map(kill_task, all_tasks) for s, t in zip(killed, all_tasks): print(f'{t}\t{s}')
def init_pool(self, worker_count): return Pool(worker_count)
def add_extracted_text(xs): with Pool() as pool: for doc, features in zip( xs, pool.imap(extract_features, xs, chunksize=10)): doc.update(features)
def build(in_dir, out_file, pool_size): with closing(AbstractDB(out_file, protocol=-1)) as db: target_files = [f for f in sorted(os.listdir(in_dir)) if f.endswith('ttl.gz')] with closing(Pool(pool_size)) as pool: f = partial(_process_file, in_dir=in_dir) for ret in pool.imap(f, target_files): for (key, obj) in ret: db[key] = obj
def pool(self): return Pool( processes=self.processes, initializer=initializer, initargs=self.initargs, maxtasksperchild=self.maxtasks, context=get_context('forkserver'), )
def run_pp(n, body): """Process Pool.map""" from multiprocessing.pool import Pool global reused_pool, numthreads global args if 'reused_pool' not in globals(): log.debug("Creating Pool(%s)" % numthreads) reused_pool = Pool(int(numthreads)) reused_pool.map(body, n)
def main(directory, convert_directory, test, crop_size, extension): try: os.mkdir(convert_directory) except OSError: pass filenames = [os.path.join(dp, f) for dp, dn, fn in os.walk(directory) for f in fn if f.endswith('jpeg') or f.endswith('tiff')] filenames = sorted(filenames) if test: names = data.get_names(filenames) y = data.get_labels(names) for f, level in zip(filenames, y): if level == 1: try: img = convert(f, crop_size) img.show() Image.open(f).show() real_raw_input = vars(__builtins__).get('raw_input',input) real_raw_input('enter for next') except KeyboardInterrupt: exit(0) print("Resizing images in {} to {}, this takes a while." "".format(directory, convert_directory)) n = len(filenames) # process in batches, sometimes weird things happen with Pool on my machine batchsize = 500 batches = n // batchsize + 1 pool = Pool(N_PROC) args = [] for f in filenames: args.append((convert, (directory, convert_directory, f, crop_size, extension))) for i in range(batches): print("batch {:>2} / {}".format(i + 1, batches)) pool.map(process, args[i * batchsize: (i + 1) * batchsize]) pool.close() print('done')
def query_all_tweets(query): """ Queries *all* tweets in the history of twitter for the given query. This will run in parallel for each ~10 days. :param query: A twitter advanced search query. :return: A list of tweets. """ year = 2006 month = 3 limits = [] while date(year=year, month=month, day=1) < date.today(): nextmonth = month + 1 if month < 12 else 1 nextyear = year + 1 if nextmonth == 1 else year limits.append( (date(year=year, month=month, day=1), date(year=year, month=month, day=10)) ) limits.append( (date(year=year, month=month, day=10), date(year=year, month=month, day=20)) ) limits.append( (date(year=year, month=month, day=20), date(year=nextyear, month=nextmonth, day=1)) ) year, month = nextyear, nextmonth queries = ['{} since:{} until:{}'.format(query, since, until) for since, until in reversed(limits)] pool = Pool(20) all_tweets = [] try: for new_tweets in pool.imap_unordered(query_tweets_once, queries): all_tweets.extend(new_tweets) logging.info("Got {} tweets ({} new).".format( len(all_tweets), len(new_tweets))) except KeyboardInterrupt: logging.info("Program interrupted by user. Returning all tweets " "gathered so far.") return sorted(all_tweets)
def main(directory, convert_directory, test, crop_size, extension): try: os.mkdir(convert_directory) except OSError: pass supported_extensions = set(['jpg', 'png', 'tiff', 'jpeg', 'tif']) filenames = [os.path.join(dp, f) for dp, dn, fn in os.walk(directory) for f in fn if f.split('.')[-1].lower() in supported_extensions] filenames = sorted(filenames) if test: names = data.get_names(filenames) y = data.get_labels(names) for f, level in zip(filenames, y): if level == 1: try: img = convert(f, crop_size) img.show() Image.open(f).show() real_raw_input = vars(__builtins__).get('raw_input', input) real_raw_input('enter for next') except KeyboardInterrupt: exit(0) print("Resizing images in {} to {}, this takes a while." "".format(directory, convert_directory)) n = len(filenames) # process in batches, sometimes weird things happen with Pool on my machine batchsize = 500 batches = n // batchsize + 1 pool = Pool(N_PROC) args = [] for f in filenames: args.append((convert, (directory, convert_directory, f, crop_size, extension))) for i in range(batches): print("batch {:>2} / {}".format(i + 1, batches)) pool.map(process, args[i * batchsize: (i + 1) * batchsize]) pool.close() print('done')
def main(directory, convert_directory, test, crop_size, extension): try: os.mkdir(convert_directory) except OSError: pass supported_extensions = set(['jpg', 'png', 'tiff', 'jpeg', 'tif']) # filenames = [os.path.join(dp, f) for dp, dn, fn in os.walk(directory) # for f in fn if f.split('.')[-1].lower() in supported_extensions] filenames = [each for each in os.listdir( directory) if each.endswith('.jpg')] filenames = [os.path.join(directory, filename.strip( '\n')) for filename in filenames] # with open('/home/artelus_server/data/segment_artelus/train.txt', 'r') as f: # filenames = f.readlines() # filenames = [os.path.join(directory, filename.strip( # '\n') + '.jpg') for filename in filenames] filenames = sorted(filenames) if test: names = data.get_names(filenames) y = data.get_labels(names) for f, level in zip(filenames, y): if level == 1: try: img = convert(f, crop_size) img.show() Image.open(f).show() real_raw_input = vars(__builtins__).get('raw_input', input) real_raw_input('enter for next') except KeyboardInterrupt: exit(0) print("Resizing images in {} to {}, this takes a while." "".format(directory, convert_directory)) n = len(filenames) # process in batches, sometimes weird things happen with Pool on my machine batchsize = 500 batches = n // batchsize + 1 pool = Pool(N_PROC) args = [] for f in filenames: label_f = f[:-4] + '_final_mask.png' args.append((convert, (directory, convert_directory, f, label_f, crop_size, extension))) for i in range(batches): print("batch {:>2} / {}".format(i + 1, batches)) pool.map(process, args[i * batchsize: (i + 1) * batchsize]) pool.close() print('done')
def fit(self, X, y): labels = list(set(y)) if len(labels) != 2: raise Exception("A binary setup is required") min_count = X.shape[0] self._min_label = None for label in labels: count = list(y).count(label) if count <= min_count: min_count = count self._min_label = label if self._reference_label is None: self._reference_label = self._min_label if not self._reference_label in labels: raise Exception("Reference label does not appear in training data") if min_count >= self._n_folds: cv = cross_validation.StratifiedKFold(y, n_folds=min(X.shape[0], self._n_folds), shuffle=True, random_state=self._seed) else: cv = cross_validation.KFold(X.shape[0], n_folds=min(X.shape[0], self._n_folds), shuffle=True, random_state=self._seed) tp = 0 fp = 0 ptp = 0 pfn = 0 pfp = 0 ptn = 0 pool = Pool(processes=10) requests = list() for train_cv, test_cv in cv: requests.append((X, y, train_cv, test_cv)) for tp, fp, ptp, pfn, pfp, ptn in pool.map(self._fit_fold, requests): tp += tp fp += fp ptp += ptp pfn += ptn pfp += pfp ptn += ptn pool.close() positives = min_count negatives = X.shape[0] - positives self._tpr = tp / positives self._fpr = fp / negatives self._ptpr = ptp / (ptp + pfn) self._pfpr = pfp / (pfp + ptn) self._clf.fit(X, y) if self._clf.classes_[0] == self._min_label: self._pos_idx = 0 self._neg_idx = 1 else: self._neg_idx = 0 self._pos_idx = 1
def __call__(self, a): m = _get_backing_memmap(a) if m is not None: # a is already backed by a memmap file, let's reuse it directly return _reduce_memmap_backed(a, m) if (not a.dtype.hasobject and self._max_nbytes is not None and a.nbytes > self._max_nbytes): # check that the folder exists (lazily create the pool temp folder # if required) try: os.makedirs(self._temp_folder) os.chmod(self._temp_folder, FOLDER_PERMISSIONS) except OSError as e: if e.errno != errno.EEXIST: raise e # Find a unique, concurrent safe filename for writing the # content of this array only once. basename = "%d-%d-%s.pkl" % ( os.getpid(), id(threading.current_thread()), hash(a)) filename = os.path.join(self._temp_folder, basename) # In case the same array with the same content is passed several # times to the pool subprocess children, serialize it only once # XXX: implement an explicit reference counting scheme to make it # possible to delete temporary files as soon as the workers are # done processing this data. if not os.path.exists(filename): if self.verbose > 0: print("Memmaping (shape=%r, dtype=%s) to new file %s" % ( a.shape, a.dtype, filename)) for dumped_filename in dump(a, filename): os.chmod(dumped_filename, FILE_PERMISSIONS) if self._prewarm: # Warm up the data to avoid concurrent disk access in # multiple children processes load(filename, mmap_mode=self._mmap_mode).max() elif self.verbose > 1: print("Memmaping (shape=%s, dtype=%s) to old file %s" % ( a.shape, a.dtype, filename)) # The worker process will use joblib.load to memmap the data return (load, (filename, self._mmap_mode)) else: # do not convert a into memmap, let pickler do its usual copy with # the default system pickler if self.verbose > 1: print("Pickling array (shape=%r, dtype=%s)." % ( a.shape, a.dtype)) return (loads, (dumps(a, protocol=HIGHEST_PROTOCOL),)) ############################################################################### # Enable custom pickling in Pool queues
def multithread_failsafe(fn, args=[[]], pool_type=Pool, processes=_cpus, maxtasksperchild=1, chunksize=1, verbose=True): '''Aynchronous multithreading that does not break on individual errors. Instead, prints error and message, and the input is disregarded Unfortunately, due to context-management restrictions, (as far as I can tell) both generators are needed even though the only difference is the maxtasksperchild arg''' '''Generators that yield next completed task. While execution of individual tasks is asynchronous, iterating through the results is not''' def process_generator(pool_type): with pool_type(processes, maxtasksperchild=maxtasksperchild) as pool: result_objs = (pool.apply_async(fn, arg) for arg in args) for r in result_objs: try: yield r.get() except GeneratorExit as g: raise g except: if verbose: print('######BEGIN TRACEBACK######') traceback.print_exc() print('######END TRACEBACK######') print() continue def thread_generator(pool_type): with pool_type(processes) as pool: result_objs = (pool.apply_async(fn, arg) for arg in args) for r in result_objs: try: yield r.get() except GeneratorExit as g: raise g except: if verbose: print('######BEGIN TRACEBACK######') traceback.print_exc() print('######END TRACEBACK######') print() continue # ThreadPools do not take a maxtasksperchild argument, # so we need to conditionally construct a generator if issubclass(pool_type, ThreadPool): return thread_generator(pool_type) else: return process_generator(pool_type)