The following 50 code examples, extracted from open-source Python projects, illustrate how to use multiprocessing.pool.ThreadPool().
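Before the project-specific examples, here is a minimal sketch of the pattern most of them share: create a pool of worker threads, hand work to it with map() or apply_async(), then close() and join() it. The worker function and input list below are hypothetical placeholders, not taken from any of the projects.

# Minimal sketch of the common ThreadPool pattern (hypothetical worker and data).
from multiprocessing.pool import ThreadPool
import urllib.request

def fetch_status(url):
    # I/O-bound work such as HTTP requests is where a thread pool pays off,
    # since threads release the GIL while waiting on the network.
    with urllib.request.urlopen(url, timeout=10) as resp:
        return url, resp.status

urls = ['https://example.com', 'https://example.org']  # placeholder inputs
pool = ThreadPool(processes=4)               # 4 worker threads
try:
    results = pool.map(fetch_status, urls)   # blocks until every task returns
finally:
    pool.close()                             # no more tasks will be submitted
    pool.join()                              # wait for the workers to finish

for url, status in results:
    print(url, status)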
def getSourceFileObjects(kwargs_list, workers=None):
    """
    Gets source file objects by applying each item on kwargs_list as
    kwargs on the source parser class. Uses kwargs['filename'] to
    determine if the source is VHDL or Verilog/SystemVerilog
    """
    pool = Pool(workers)
    async_results = []

    for kwargs in kwargs_list:
        if _isVhdl(kwargs['filename']):
            cls = VhdlParser
        else:
            cls = VerilogParser
        async_results += [pool.apply_async(cls, kwds=kwargs)]

    pool.close()
    pool.join()

    results = [x.get() for x in async_results]

    return results
def batch_geoparse(self, text_list):
    """
    Batch geoparsing function. Take in a list of text documents and return a
    list of lists of the geoparsed documents. The speed improvements come from
    using spaCy's `nlp.pipe` and by multithreading calls to `geoparse`.

    Parameters
    ----------
    text_list : list of strs
        List of documents. The documents should not have been pre-processed by spaCy.

    Returns
    -------
    proced : list of list of dicts
        The list is the same length as the input list of documents. Each element
        is a list of geolocated entities.
    """
    nlped_docs = nlp.pipe(text_list, n_threads=self.n_threads)
    pool = ThreadPool(self.n_threads)
    processed = pool.map(self.geoparse, nlped_docs)
    pool.close()
    pool.join()
    return processed
def process_method_on_list(method_to_run, items):
    '''helper method that processes a method on each listitem with pooling if the system supports it'''
    all_items = []
    if SUPPORTS_POOL:
        pool = ThreadPool()
        try:
            all_items = pool.map(method_to_run, items)
        except Exception:
            # catch exception to prevent threadpool running forever
            log_msg(format_exc(sys.exc_info()))
            log_msg("Error in %s" % method_to_run)
        pool.close()
        pool.join()
    else:
        all_items = [method_to_run(item) for item in items]
    all_items = filter(None, all_items)
    return all_items
def parse(d):
    """Check a dict keyed by the related calls against their expected values

    Dict format:
        Key: tuple:
            [0] - module from which the command is called
            [1] - command which you are calling
            [*] - index=x, where x is the index you wish
            [*] - end=x, where x is the end of the range to return
            [*] - all other args in the order the command is supposed to
                  receive it; keyed arguments are not supported
        Value: The expected return value
    """
    if d == {} or d is None:
        return True
    if len(d) == 1:
        return process(list(d.items())[0])
    from multiprocessing.pool import ThreadPool
    p = list(d.items())
    r = ThreadPool().map(process, p)
    return not (False in r)
def _ResolveTombstones(jobs, tombstones, tombstone_symbolizer):
    """Resolve a list of tombstones.

    Args:
        jobs: the number of jobs to use with multithread.
        tombstones: a list of tombstones.
    """
    if not tombstones:
        logging.warning('No tombstones to resolve.')
        return []
    tombstone_symbolizer.UnzipAPKIfNecessary()
    if len(tombstones) == 1:
        data = [_ResolveTombstone([tombstones[0], tombstone_symbolizer])]
    else:
        pool = ThreadPool(jobs)
        data = pool.map(
            _ResolveTombstone,
            [[tombstone, tombstone_symbolizer] for tombstone in tombstones])
    resolved_tombstones = []
    for tombstone in data:
        resolved_tombstones.extend(tombstone)
    return resolved_tombstones
def multiDownload(self):
    """Download data for every equity symbol concurrently using a thread pool."""
    starttime = datetime.datetime.now()
    self.get_symbol()
    cx = self.Symbol_Db['equity'].find()
    symbolSet = set([d['code'] for d in cx])
    p = ThreadPool(100)
    p.map(self.downloadEquityAllData, symbolSet)
    p.close()
    p.join()
    endtime = datetime.datetime.now()
    print("Elapsed: " + str(endtime - starttime))
def run_per_file(config, ignore_paths=None, path=None, config_dir=None):
    ignore_paths = ignore_paths or []
    path = path or os.getcwd()
    cmd = run_config(config, config_dir)
    print(cmd)
    run_cmds = []
    patterns = PATTERNS.get(config.get('language'))
    paths = all_filenames_in_dir(path=path, ignore_paths=ignore_paths)
    for pattern in patterns:
        for filepath in fnmatch.filter(paths, pattern):
            run_cmds.append(cmd + [filepath])
    pool = Pool()

    def result(run_cmd):
        _, out = run_command(run_cmd)
        return run_cmd[-1], out

    output = pool.map(result, run_cmds)
    return output
def perform_normalisation(self, ori_file_list, output_file_list, label_type="state_align", dur_file_list=None):
    logger = logging.getLogger("perform_normalisation")
    logger.info('perform linguistic feature extraction')
    self.utterance_num = len(ori_file_list)
    if self.utterance_num != len(output_file_list):
        logger.error('the number of input and output linguistic files should be the same!\n')
        sys.exit(1)

    def _perform_normalisation(i):
        if not dur_file_list:
            self.extract_linguistic_features(ori_file_list[i], output_file_list[i], label_type)
        else:
            self.extract_linguistic_features(ori_file_list[i], output_file_list[i], label_type, dur_file_list[i])

    pool = Pool()
    pool.map(_perform_normalisation, range(self.utterance_num))
    pool.close()
    pool.join()

## the exact function to do the work
## need to be implemented in the specific class
## the function will write the linguistic features directly to the output file
def __init__(self, n, probe_key, ignore_clock_skew=False, metadata_encoding=None,
             disable_action_probes=False):
    # Each QR code takes about 1ms (and updates at 5fps). We do
    # our best to ensure the QR is processed in time for the next
    # step call (n/16 would put us right at the threshold).
    self.pool = pool.ThreadPool(max(int(n/4), 1))
    self.qr_pool = pool.ThreadPool(max(int(n/8), 1))
    self.lock = threading.RLock()

    self.instance_n = [None] * n
    self.ignore_clock_skew = ignore_clock_skew
    self.disable_action_probes = disable_action_probes
    self.metadata_encoding = metadata_encoding

    self.update(probe_key=probe_key, metadata_encoding=metadata_encoding)

    # only used in flashgames right now
def lambda_handler(event, context):
    LOGGER.debug("Received event: " + json.dumps(event, indent=2))
    n_fail = 0
    n_succ = 0
    recs = event['Records']
    tp = ThreadPool(min(len(recs), 20))
    try:
        recs = [event_to_dynamo_images(x) for x in recs]
        rc = tp.map_async(check_remove_queue, recs)
        # identities can be bulk-deleted in groups of 60 via AWS API, so handle in this thread
        check_remove_identities(recs)
        rc.wait(max(context.get_remaining_time_in_millis()/1000.0 - 2.0, 0))
        if not rc.ready():
            LOGGER.error("Timeout waiting on processors")
            tp.terminate()
        else:
            n_del = len([x for x in rc.get() if x])
            LOGGER.info("Processed {0} records, {1} queues deleted".format(len(recs), n_del))
    finally:
        tp.close()
def test_members_get(self):
    with self.app.app_context():
        # create collection, members
        c_obj = self.mock.collection(description={'something': 'abcdefghi123ö'})
        m_objs = [self.mock.member() for i in range(5)]
        # add collection, members
        self.app.db.set_collection(c_obj)
        # for m_obj in m_objs:
        self.app.db.set_member(c_obj.id, m_objs)
        # pool = ThreadPool(50)
        # pool.map(lambda m_obj: self.app.db.set_member(c_obj.id, m_obj), m_objs)
        # GET members
        response = self.get("collections/" + urllib.parse.quote_plus(c_obj.id) + "/members")
        # assert 200 OK
        self.assertEqual(response.status_code, 200)
        sortedResponse = [r.dict() for r in sorted(json.loads(response.data)['contents'], key=lambda x: x.id)]
        sortedMocks = [m.dict() for m in sorted(m_objs, key=lambda x: x.id)]
        for i in range(len(sortedMocks)):
            self.assertDictEqual(sortedResponse[i], sortedMocks[i])
def _find_get_func_for_client(client):
    '''Return the "get" function corresponding to client'''
    if client is None:
        return get_sync
    elif Executor and isinstance(client, Executor):
        def get(*args, **kwargs):
            pbar = ProgressBar()
            pbar.register()
            out = client.get(*args, **kwargs)
            pbar.unregister()
            return out
        return get
    elif isinstance(client, ThreadPool):
        return dask_threaded_get
    else:
        raise ValueError('client argument not a thread pool dask scheduler or None')
def client_context(dask_client=None, dask_scheduler=None):
    '''client_context creates a dask distributed or threadpool client or None

    Parameters:
        dask_client: str from choices ("DISTRIBUTED", 'THREAD_POOL', 'SERIAL')
                     or None to take DASK_CLIENT from environment
        dask_scheduler: Distributed scheduler url or None to take
                        DASK_SCHEDULER from environment
    '''
    env = parse_env_vars()
    dask_client = dask_client or env.get('DASK_CLIENT', 'DISTRIBUTED')
    dask_scheduler = dask_scheduler or env.get('DASK_SCHEDULER')
    if dask_client == 'DISTRIBUTED':
        client = Executor(dask_scheduler) if dask_scheduler else Executor()
    elif dask_client == 'THREAD_POOL':
        client = ThreadPool(env.get('DASK_THREADS'))
    elif dask_client == 'SERIAL':
        client = None
    else:
        raise ValueError('Did not expect DASK_CLIENT to be {}'.format(dask_client))
    get_func = _find_get_func_for_client(client)
    with da.set_options(pool=dask_client):
        yield client
def parallel_apply_method(method, nodes, sample_rate=1, duration=1, leaves_only=False):
    """
    Apply wrapped-method "method" to every node in "nodes", "sample_rate" times
    per second, for "duration" seconds.

    Returns a list of results for each time slice. Each time slice result is a
    wrapped-method result tuple (node, return value, exception)
    """
    if leaves_only:
        nodes = [x for x in nodes if x.get_property('#units') != "PathNode"]
    if not nodes:
        return {}
    with ThreadPool(len(nodes)) as pool:
        time_slice_results = queue.Queue()

        def apply_time_slice():
            time_slice_results.put(pool.map_async(method, nodes))

        num_slices = int(duration * sample_rate)
        slice_times = [slice_number / sample_rate for slice_number in range(num_slices)]
        time_slice_threads = [threading.Timer(time, apply_time_slice) for time in slice_times]
        complete_all_threads(time_slice_threads)
        return consume_queue(time_slice_results)
def make_concurrent_calls(*calls):
    """
    If you need to make multiple concurrent calls, potentially to
    different functions, or with different kwargs each time.

    Args:
        *calls (Iterable[Union[function, str], dict]) - list of
            (func or func path, kwargs) tuples to call concurrently

    Returns:
        List[Any] - return values from each call in `calls`
        (results are returned in same order as supplied)
    """
    pool = Pool(len(calls))
    results = []
    for func, kwargs in calls:
        results.append(
            pool.apply_async(test_call, args=(func,), kwds=kwargs)
        )
    pool.close()
    pool.join()
    # add a bit of extra timeout to allow process terminate cleanup to run
    # (because we also have an inner timeout on our ProcessManager thread join)
    return [result.get(timeout=SUBPROCESS_TIMEOUT + 2) for result in results]
def map_progress(func, targets, n_threads):
    """
    Process targets in multi-threaded mode with progress bar
    """
    progress.set_n_total(len(targets))
    pool = ThreadPool(processes=n_threads)
    ret = []

    try:
        ret = pool.map(func, targets)
    except Exception as exc:
        Logger.error('Unexpected exception while processing targets: {}'.format(exc), exc_info=True)
    finally:
        progress.finish()

    return list(zip(targets, ret))
def test_multithread_stringio_read_csv(self):
    # GH 11786
    max_row_range = 10000
    num_files = 100

    bytes_to_df = [
        '\n'.join(
            ['%d,%d,%d' % (i, i, i) for i in range(max_row_range)]
        ).encode() for j in range(num_files)]
    files = [BytesIO(b) for b in bytes_to_df]

    # Read all files in many threads
    pool = ThreadPool(8)
    results = pool.map(pd.read_csv, files)
    first_result = results[0]

    for result in results:
        tm.assert_frame_equal(first_result, result)
def lanczosSubPixShiftStack(imageStack, translations, n_threads=16):
    """
    Does subpixel translation shifts for a stack of images using a ThreadPool
    to distribute the load.

    I could make this a general function utility by passing in the function handle.
    """
    tPool = ThreadPool(n_threads)
    if imageStack.ndim != 3:
        raise ValueError("lanczosSubPixShiftStack() only works on image stacks with Z-axis as the zero dimension")

    slices = imageStack.shape[0]
    # Build parameters list for the threaded processes, consisting of index
    tArgs = [None] * slices
    for J in np.arange(slices):
        tArgs[J] = (J, imageStack, translations)

    # All operations are done 'in-place'
    tPool.map(lanczosIndexedShift, tArgs)
    tPool.close()
    tPool.join()
def get_page_torrents(page_links, workers, numbers):
    """
    given a list of links containing individual torrent info pages,
    return a list containing Torrent objects
    """
    pool = Pool(processes=workers)
    while len(page_links) > numbers:
        page_links.pop()
    assert (len(page_links) != 0), 'Number of torrent pages equals to 0!'
    torrents = pool.map(get_torrent_info, page_links)
    #torrents = map(get_torrent_info, page_links)
    #torrents = [pool.apply(get_torrent_info, args=(x,)) for x in page_links]
    pool.close()
    pool.join()
    return torrents
def re_run_everything(self):
    if self.rerunning:
        print 'already running...'
        return
    self.rerunning = True
    utts = []
    for sess in self.get_all_sessions():
        utts.extend(self.get_session_utterances(sess['_id']))
    # unleash the threads...
    p = Pool(multiprocessing.cpu_count())
    # TODO: would be good to have some sort of identifier so that
    # these jobs can be cancelled if new commands are added.
    print 'starting re_run_everything'
    p.map(self.re_run, utts)
    p.close()
    self.rerunning = False
    print 'finished'
def handle(self, *args, **options):
    # Setup logger with levels and path
    log_path = os.path.join(options['log'], 'riverscope', __name__ + '_log.txt')
    if options['debug']:
        LOG.set_print_handler_level(logging.DEBUG)
        LOG.set_file_handler(log_path, logging.DEBUG)
    else:
        LOG.set_print_handler_level(logging.INFO)
        LOG.set_file_handler(log_path, logging.DEBUG)

    time_start = utils.start_timer()
    pool = ThreadPool(100)
    # TODO http://stackoverflow.com/questions/2632520/what-is-the-fastest-way-to-send-100-000-http-requests-in-python
    results = pool.map(get_readings, zip(Stations.objects.all(), repeat(options['lastn'])))
    clean_results = list(filter(None, results))
    station_readings = [s for sl in clean_results for s in sl]
    with transaction.atomic():
        StationReadings.objects.all().delete()
        StationReadings.objects.bulk_create(station_readings)
    time_diff = utils.end_timer(time_start)
    LOG.info('Added {} readings in {}'.format(len(station_readings), time_diff))
def setup(self, bottom, top):
    #self.top_names = ['data_a', 'data_p', 'data_n', 'data_l']
    self.top_names = ['data_a', 'data_p', 'data_n']
    params = eval(self.param_str)
    # Check the parameters for validity.
    check_params(params)
    # store input as class variables
    self.batch_loader = BatchLoader(params)
    self.batch_size = params['batch_size']
    self.pool = ThreadPool(processes=1)
    self.thread_results = self.pool.apply_async(\
        self.batch_loader.load_next_batch, ())
    # reshape
    top[0].reshape(params['batch_size'], 1, params['shape'][0], params['shape'][1])
    top[1].reshape(params['batch_size'], 1, params['shape'][0], params['shape'][1])
    top[2].reshape(params['batch_size'], 1, params['shape'][0], params['shape'][1])
    #top[3].reshape(params['batch_size'], 3)  #label of anchor, pos & neg example
    print_info('Triplet data layer', params)
def setup(self, bottom, top):
    #self.top_names = ['data_a', 'data_p', 'data_n', 'data_l']
    self.top_names = ['data_s', 'data_i', 'label_s', 'label_i']
    params = eval(self.param_str)
    # Check the parameters for validity.
    check_params(params)
    # store input as class variables
    self.batch_loader = BatchLoader(params)
    self.batch_size = params['batch_size']  #1
    self.pool = ThreadPool(processes=1)
    self.thread_results = self.pool.apply_async(\
        self.batch_loader.load_next_batch, ())
    # reshape
    top[0].reshape(params['batch_size'], 1, params['shape'][0], params['shape'][1])
    top[1].reshape(params['batch_size'], 1, params['shape'][0], params['shape'][1])
    top[2].reshape(params['batch_size'], 1)
    top[3].reshape(params['batch_size'], 1)
    if 'verbose' not in params:
        print_info('2-branch data layer', params)
def setup(self, bottom, top):
    self.top_names = ['data', 'label']
    params = eval(self.param_str)
    # Check the parameters for validity.
    check_params(params)
    # store input as class variables
    self.batch_loader = BatchLoader(params)
    self.batch_size = params['batch_size']  #1
    self.pool = ThreadPool(processes=1)
    self.thread_results = self.pool.apply_async(self.batch_loader.load_next_batch, ())
    # reshape
    top[0].reshape(params['batch_size'], 1, params['shape'][0], params['shape'][1])
    top[1].reshape(params['batch_size'])
    print_info('Data layer', params)
def setup(self, bottom, top):
    #self.top_names = ['data_a', 'data_p', 'data_n', 'data_l']
    self.top_names = ['data_a', 'data_p', 'data_n']
    params = eval(self.param_str)
    # Check the parameters for validity.
    check_params(params)
    # store input as class variables
    self.batch_loader = BatchLoader(params)
    self.batch_size = params['batch_size']  #1
    self.pool = ThreadPool(processes=1)
    self.thread_results = self.pool.apply_async(\
        self.batch_loader.load_next_batch, ())
    # reshape
    top[0].reshape(params['batch_size'], 1, params['shape'][0], params['shape'][1])
    top[1].reshape(params['batch_size'], 1, params['shape'][0], params['shape'][1])
    top[2].reshape(params['batch_size'], 1, params['shape'][0], params['shape'][1])
    #top[3].reshape(params['batch_size'], 3)  #label of anchor, pos & neg example
    if 'verbose' not in params:
        print_info('Triplet data layer', params)
def __init__(self, params):
    self.batch_size = params['batch_size']
    self.outshape = params['shape']
    self.lmdb = lmdbs(params['source'])
    self.labels = self.lmdb.get_label_list()
    self.img_mean = biproto2py(params['mean_file']).squeeze()
    self.NIMGS = len(self.labels)
    assert self.NIMGS % self.batch_size == 0, 'NIMGS {} not divisible by batchsize {}'.format(
        self.NIMGS, self.batch_size)
    self.num_batches = self.NIMGS / self.batch_size
    self._cur = 0  # current batch
    self.labels_tab = self.labels.reshape((self.num_batches, self.batch_size))

    # this class does some simple data-manipulations
    self.img_augment = SimpleAugment(mean=self.img_mean, shape=params['shape'],
                                     scale=params['scale'])

    #create threadpools for parallel augmentation
    #self.pool = ThreadPool() #4
def __init__(self, params):
    self.batch_size = params['batch_size']
    self.outshape = params['shape']
    self.lmdb = lmdbs(params['source'])
    self.labels = self.lmdb.get_label_list()
    self.img_mean = biproto2py(params['mean_file']).squeeze()
    self.NIMGS = len(self.labels)
    self.num_batches = int(np.ceil(self.NIMGS / float(self.batch_size)))
    self._cur = 0  # current batch

    # this class does some simple data-manipulations
    self.img_augment = SimpleAugment(mean=self.img_mean, shape=params['shape'],
                                     scale=params['scale'])

    #create threadpools for parallel augmentation
    #self.pool = ThreadPool() #4
def setup(self, bottom, top):
    #self.top_names = ['data_a', 'data_p', 'data_n', 'data_l']
    self.top_names = ['data_a', 'data_p', 'label_a', 'label_p']
    params = eval(self.param_str)
    # Check the parameters for validity.
    check_params(params)
    # store input as class variables
    self.batch_loader = BatchLoader(params)
    self.batch_size = params['batch_size']  #1
    self.pool = ThreadPool(processes=1)
    self.thread_results = self.pool.apply_async(\
        self.batch_loader.load_next_batch, ())
    # reshape
    top[0].reshape(params['batch_size'], 1, params['shape'][0], params['shape'][1])
    top[1].reshape(2*params['batch_size'], 1, params['shape'][0], params['shape'][1])
    top[2].reshape(params['batch_size'], 1)  #label of anchor
    top[3].reshape(2*params['batch_size'], 1)  #label of pos and neg
    if 'verbose' not in params:
        print_info('Triplet data layer', params)
def setup(self, bottom, top):
    #self.top_names = ['data_a', 'data_p', 'data_n', 'data_l']
    self.top_names = ['data_a', 'data_p', 'data_n']
    params = eval(self.param_str)
    # Check the parameters for validity.
    check_params(params)
    # store input as class variables
    self.batch_loader = BatchLoader(params)
    self.batch_size = params['batch_size']  #1
    self.pool = ThreadPool(processes=1)
    self.thread_results = self.pool.apply_async(\
        self.batch_loader.load_next_batch, ())
    self.batch_loader_refresh = False
    # reshape
    top[0].reshape(params['batch_size'], 1, params['shape'][0], params['shape'][1])
    top[1].reshape(params['batch_size'], 1, params['shape'][0], params['shape'][1])
    top[2].reshape(params['batch_size'], 1, params['shape'][0], params['shape'][1])
    #top[3].reshape(params['batch_size'], 3)  #label of anchor, pos & neg example
    if 'verbose' not in params:
        print_info('Triplet data layer', params)
def test_concurrent_ocsp_requests(tmpdir):
    from multiprocessing.pool import ThreadPool

    cache_file_name = path.join(str(tmpdir), 'cache_file.txt')
    urls = [
        'sfc-dev1-regression.s3.amazonaws.com',
        'sfctest0.snowflakecomputing.com',
        'sfc-ds2-customer-stage.s3.amazonaws.com',
        'snowflake.okta.com',
        'sfcdev1.blob.core.windows.net',
    ]
    ocsp_pyopenssl.OCSP_VALIDATION_CACHE = {}  # reset the memory cache

    urls = urls + urls + urls + urls + urls + urls
    pool = ThreadPool(len(urls))
    for url in urls:
        pool.apply_async(_validate_certs_using_ocsp, [url, cache_file_name])
    pool.close()
    pool.join()
def download(self, batch):
    if self.driver_pool_size:
        pool = Pool(processes=self.driver_pool_size)
    else:
        pool = Pool(processes=default_settings.DRIVER_POOL_SIZE)
    results = []
    for request in batch:
        results.append(pool.apply_async(self.download_one, (request,)))
    pool.close()
    pool.join()
    true_responses = []
    for result in results:
        true_response = result.get()
        true_responses.append(true_response)
        logger.info(true_response)
    return true_responses
def spawn_docker_instances(self):
    # Must create shared secret beforehand otherwise the
    # testcase does not know which instances are relevant
    ensure_shared_secret('cjdns')

    spawn_command = "spawn --no-assimilate " \
                    "--server-type headless " \
                    "--compute-type docker"
    pool = ThreadPool(self.workers)
    for _ in range(self.amount_of_instances):
        pool.apply_async(
            run_raptiformica_command,
            args=(self.temp_cache_dir, spawn_command)
        )
        sleep(20)
    pool.close()
    pool.join()
def join_consul_neighbours(mapping):
    """
    Consul join all known neighbours. Will join as many instances
    at the same time as threads in the threadpool.

    :param dict mapping: Key value mapping with the config data
    :return None:
    """
    ipv6_addresses = get_neighbour_hosts(mapping)
    shuffle(ipv6_addresses)
    new_ipv6_addresses = list(
        filter(not_already_known_consul_neighbour, ipv6_addresses)
    )
    pool = ThreadPool()
    groups = group_n_elements(
        new_ipv6_addresses, CONSUL_JOIN_BATCH_SIZE
    )
    for ipv6_addresses in groups:
        pool.apply_async(try_run_consul_join, args=(ipv6_addresses,))
    pool.close()
    pool.join()
def nmap_scan(to_scan, slow=False, threads=None, threads_per_cpu=1):
    """Scans the specified networks using `nmap`.

    The `to_scan` dictionary must be in the format:

        {<interface-name>: <iterable-of-cidr-strings>, ...}

    If the `slow` option is specified, will limit the maximum rate nmap
    uses to send out packets.
    """
    jobs = yield_nmap_parameters(to_scan, slow)
    if threads is None:
        threads = cpu_count() * threads_per_cpu
    if threads == 1:
        yield from (run_nmap(job) for job in jobs)
    with ThreadPool(processes=threads) as pool:
        yield from pool.imap_unordered(run_nmap, jobs)
def ping_scan(to_scan: dict, threads=None, threads_per_cpu=4):
    """Scans the specified networks using `ping`.

    The `to_scan` dictionary must be in the format:

        {<interface_name>: <iterable-of-cidr-strings>, ...}

    If the `threads` argument is supplied, the specified number of threads
    will be used for concurrent scanning. If threads=1 is specified, scanning
    will use a single process (and be very slow).
    """
    jobs = yield_ping_parameters(to_scan)
    if threads is None:
        threads = cpu_count() * threads_per_cpu
    if threads == 1:
        yield from (run_ping(job) for job in jobs)
    else:
        with ThreadPool(processes=threads) as pool:
            yield from pool.imap(run_ping, jobs)
def __init__(self, img_dir, img_names, pre_process_img_func, extract_feat_func,
             batch_size, num_threads, multi_thread_stacking=False):
    """
    Args:
        extract_feat_func: External model for extracting features. It takes a
            batch of images and returns a batch of features.
        multi_thread_stacking: bool, whether to use multi threads to speed up
            `np.stack()` or not. When the system is memory overburdened, using
            `np.stack()` to stack a batch of images takes ridiculously long time.
            E.g. it may take several seconds to stack a batch of 64 images.
    """
    self.img_dir = img_dir
    self.img_names = img_names
    self.pre_process_img_func = pre_process_img_func
    self.extract_feat_func = extract_feat_func
    self.prefetcher = utils.Prefetcher(
        self.get_sample, len(img_names), batch_size, num_threads=num_threads)
    self.epoch_done = True
    self.multi_thread_stacking = multi_thread_stacking
    if multi_thread_stacking:
        self.pool = Pool(processes=8)
def run_tp(n, body):
    """ThreadPool.map"""
    from multiprocessing.pool import ThreadPool
    global reused_pool, numthreads
    if 'reused_pool' not in globals():
        log.debug("Creating ThreadPool(%s)" % numthreads)
        reused_pool = ThreadPool(int(numthreads))
    reused_pool.map(body, n)
def run_tpaa(n, body):
    """ThreadPool.apply_async"""
    from multiprocessing.pool import ThreadPool
    global reused_pool, numthreads
    if 'reused_pool' not in globals():
        log.debug("Creating ThreadPool(%s) for apply_async()" % numthreads)
        reused_pool = ThreadPool(int(numthreads))
        reused_pool.map(body, range(n))
    wait_list = []
    for i in n:
        b = tbb_job(i, body)
        a = reused_pool.apply_async(b)
        wait_list.append(a)
    for a in wait_list:
        a.wait()
def Pool(processes=None, initializer=None, initargs=()):
    from multiprocessing.pool import ThreadPool
    return ThreadPool(processes, initializer, initargs)
def __init__(self, config):
    super(ThreadPoolClient, self).__init__(config)
    self.pool = ThreadPool(processes=config['kinesis_concurrency'])
def run():
    from multiprocessing.pool import ThreadPool
    session = SubprocessSession('/bin/cat', EchoWriter, EchoReader)
    pool = ThreadPool(50)
    requests = pool.map(lambda j: session.put('message %d' % j), xrange(2000))
    results = pool.map(lambda r: r.get(), requests)
    print results == ['message %d' % j for j in xrange(2000)]
def _get_thread_pool(self):
    # lazily initialized
    if not self._thread_pool:
        self._thread_pool = ThreadPool(os.cpu_count())
    return self._thread_pool
def iter_packages(repo_path):
    pkgbuild_paths = []
    if os.path.isfile(repo_path) and os.path.basename(repo_path) == "PKGBUILD":
        pkgbuild_paths.append(repo_path)
    else:
        print("Searching for PKGBUILD files in %s" % repo_path)
        for base, dirs, files in os.walk(repo_path):
            for f in files:
                if f == "PKGBUILD":
                    # in case we find a PKGBUILD, don't go deeper
                    del dirs[:]
                    path = os.path.join(base, f)
                    pkgbuild_paths.append(path)
        pkgbuild_paths.sort()

    if not pkgbuild_paths:
        print("No PKGBUILD files found here")
        return
    else:
        print("Found %d PKGBUILD files" % len(pkgbuild_paths))

    pool = ThreadPool(cpu_count() * 2)
    pool_iter = pool.imap_unordered(SrcInfoPackage.for_pkgbuild, pkgbuild_paths)
    print("Parsing PKGBUILD files...")
    with progress(len(pkgbuild_paths)) as update:
        for i, packages in enumerate(pool_iter):
            update(i + 1)
            for package in packages:
                yield package
    pool.close()
def main(args):
    sources = {}
    repo_path = os.path.abspath(args.path)
    repo_packages = PacmanPackage.get_all_packages()
    repo_package_names = set(p.pkgname for p in repo_packages)
    for package in iter_packages(repo_path):
        # only check packages which are in the repo, all others are many
        # times broken in other ways.
        if not args.all and package.pkgname not in repo_package_names:
            continue
        for source in package.sources:
            url = source_get_url(source)
            if url:
                sources.setdefault(url, set()).add(package.pkgbuild_path)

    print("Checking URLs...")
    work_items = sources.items()
    pool = ThreadPool(50)
    pool_iter = pool.imap_unordered(_check_url, work_items)
    broken = []
    with progress(len(work_items)) as update:
        for i, (url, pkgbuilds, error) in enumerate(pool_iter):
            update(i + 1)
            if error:
                broken.append((url, pkgbuilds, error))
    pool.close()
    pool.join()

    for url, pkgbuilds, error in broken:
        print("\n%s\n  %s\n  %s" % (
            url, " ".join(error.splitlines()), ", ".join(pkgbuilds)))