The following 50 code examples, extracted from open-source Python projects, illustrate how to use concurrent.futures.ThreadPoolExecutor().
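For orientation before the project excerpts, here is a minimal, self-contained sketch of the basic submit/as_completed workflow; the square task is purely illustrative:

from concurrent.futures import ThreadPoolExecutor, as_completed

def square(n):
    # stand-in for any small task
    return n * n

with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(square, n) for n in range(8)]
    for future in as_completed(futures):  # yields futures as they finish
        print(future.result())

The executor's context manager calls shutdown(wait=True) on exit, so all submitted work completes before the block ends.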
def __init__(self, key, secret, options={}):
    self.options = options
    self.auth = (key, secret)
    if 'hostname' in options:
        self.hostname = options['hostname']
    else:
        self.hostname = 'api.mybitx.com'
    self.port = options['port'] if 'port' in options else 443
    self.pair = options['pair'] if 'pair' in options else 'XBTZAR'
    self.ca = options['ca'] if 'ca' in options else None
    self.timeout = options['timeout'] if 'timeout' in options else 30
    self.headers = {
        'Accept': 'application/json',
        'Accept-Charset': 'utf-8',
        'User-Agent': 'py-bitx v' + __version__
    }
    self._executor = ThreadPoolExecutor(max_workers=5)
def run_in_executor(f):
    """
    A decorator to run the given method in the ThreadPoolExecutor.
    """
    @wraps(f)
    def new_f(self, *args, **kwargs):
        if self.is_shutdown:
            return
        try:
            future = self.executor.submit(f, self, *args, **kwargs)
            future.add_done_callback(_future_completed)
        except Exception:
            log.exception("Failed to submit task to executor")
    return new_f
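The decorator above assumes its host object exposes executor and is_shutdown attributes. A simplified, self-contained sketch of how such a class might be wired up (names are illustrative, not from the source project, and the done-callback and logging plumbing are omitted):

from concurrent.futures import ThreadPoolExecutor
from functools import wraps

def run_in_executor(f):
    # simplified variant of the decorator above
    @wraps(f)
    def new_f(self, *args, **kwargs):
        if not self.is_shutdown:
            return self.executor.submit(f, self, *args, **kwargs)
    return new_f

class Worker:
    def __init__(self):
        self.executor = ThreadPoolExecutor(max_workers=2)
        self.is_shutdown = False

    @run_in_executor
    def send(self, payload):
        return 'sent ' + payload

w = Worker()
print(w.send('ping').result())  # send() returns a Future, not the result
w.executor.shutdown()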
def __init__(self, config=None):
    if config is None:
        config = {}
    self.config = config
    self.must_stop = threading.Event()
    self._consumers_queues = []
    if self.config.get("concurrency", 1) > 1:
        self._thread_pool = ThreadPoolExecutor(
            max_workers=self.config.get("concurrency")
        )
    else:
        self._thread_pool = None
    self.import_submodules(__name__ + '.plugins.ext')
    self.import_submodules(__name__ + '.consumers.ext')
    for extra_plugin_path in self.config.get('extra_plugins', []):
        self.import_directory_modules(extra_plugin_path)
    self._current_checks = []
    self._current_checks_lock = threading.Lock()
def __init__(
        self, name, sub_jobs, pool=None, pool_type=ThreadPoolExecutor,
        join=None, error_action="stop", error_handler=None,
        error_default_value=None, goto=None):
    """
    :param name job name
    :param sub_jobs the sub-jobs to run
    :param pool the pool to run on; if None, one is built from pool_type
    :param pool_type pool class used to build a pool when pool is None
    :param join callable used to merge the sub-jobs' results into the context
    :param error_action what to do when a sub-job fails; "stop" aborts the
        remaining sub-jobs, "continue" keeps running the rest
    :param error_handler handler invoked (in addition to the error listener)
        when error_action is "continue"
    :param error_default_value default result used for a failed sub-job
        when error_action is "continue"
    :param goto name of the step to jump to next
    """
    self._error_action = error_action
    if self._error_action != "stop" and self._error_action != "continue":
        raise InvalidArgumentException(
            u"error_action must be either 'stop' or 'continue'")
def __init__(self, name, thread_num, pool=None, pool_type=ThreadPoolExecutor,
             start_point=None, end_point=None, context_factory=Context,
             extends_listeners=False, listeners=None, goto=None):
    """
    :param name name of this fork
    :param thread_num number of threads to fork
    :param pool the pool to run on
    :param pool_type pool class used to build a pool when none is given
    :param start_point step at which the forked branches start
    :param end_point step at which the forked branches end
    :param context_factory factory used to build each branch's context
    :param extends_listeners whether the branches inherit this job's listeners
    :param listeners listeners attached to the forked branches
    :param goto name of the join step the branches converge to
    """
    self._listeners = listeners
    if self._listeners is None:
        self._listeners = []
def __init__(self, app_id, **kwargs):
    """
    Initialize broker connection.

    :param app_id: string that identifies application
    """
    self.app_id = app_id
    # fetch configuration
    if "url" in kwargs:
        self.rabbitmq_url = kwargs['url']
    else:
        self.rabbitmq_url = os.environ.get("broker_host", RABBITMQ_URL_FALLBACK)
    self.rabbitmq_exchange = os.environ.get("broker_exchange", RABBITMQ_EXCHANGE_FALLBACK)
    self.rabbitmq_exchange_type = "topic"
    # create additional members
    self._connection = None
    # trigger connection setup (without blocking)
    self.setup_connection()
    # Threading workers
    self.thrd_pool = pool.ThreadPoolExecutor(max_workers=100)
    # Track the workers
    self.tasks = []
def main():
    t1 = timeit.default_timer()
    with ProcessPoolExecutor(max_workers=4) as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))
    print("{} Seconds Needed for ProcessPoolExecutor".format(timeit.default_timer() - t1))

    t2 = timeit.default_timer()
    with ThreadPoolExecutor(max_workers=4) as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))
    print("{} Seconds Needed for ThreadPoolExecutor".format(timeit.default_timer() - t2))

    t3 = timeit.default_timer()
    for number in PRIMES:
        isPrime = is_prime(number)
        print("{} is prime: {}".format(number, isPrime))
    print("{} Seconds needed for single threaded execution".format(timeit.default_timer() - t3))
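The benchmark above relies on is_prime and PRIMES from its surrounding module. A self-contained variant, with definitions adapted from the standard concurrent.futures documentation example, might be:

import math
from concurrent.futures import ProcessPoolExecutor

PRIMES = [112272535095293, 112582705942171, 115280095190773,
          115797848077099, 1099726899285419]

def is_prime(n):
    if n < 2:
        return False
    if n % 2 == 0:
        return n == 2
    for i in range(3, int(math.sqrt(n)) + 1, 2):
        if n % i == 0:
            return False
    return True

if __name__ == '__main__':  # required for ProcessPoolExecutor to spawn workers
    with ProcessPoolExecutor(max_workers=4) as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

For CPU-bound work like this, ProcessPoolExecutor sidesteps the GIL, which is why the threaded run in the benchmark is usually no faster than the serial one.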
def test_double_reader_abort(self):
    lock = fasteners.ReaderWriterLock()
    activated = collections.deque()

    def double_bad_reader():
        with lock.read_lock():
            with lock.read_lock():
                raise RuntimeError("Broken")

    def happy_writer():
        with lock.write_lock():
            activated.append(lock.owner)

    with futures.ThreadPoolExecutor(max_workers=20) as e:
        for i in range(0, 20):
            if i % 2 == 0:
                e.submit(double_bad_reader)
            else:
                e.submit(happy_writer)

    self.assertEqual(10, len([a for a in activated if a == 'w']))
def download_many(cc_list):
    cc_list = cc_list[:5]  # <1>
    with futures.ThreadPoolExecutor(max_workers=3) as executor:  # <2>
        to_do = []
        for cc in sorted(cc_list):  # <3>
            future = executor.submit(download_one, cc)  # <4>
            to_do.append(future)  # <5>
            msg = 'Scheduled for {}: {}'
            print(msg.format(cc, future))  # <6>
        results = []
        for future in futures.as_completed(to_do):  # <7>
            res = future.result()  # <8>
            msg = '{} result: {!r}'
            print(msg.format(future, res))  # <9>
            results.append(res)
    return len(results)
# END FLAGS_THREADPOOL_AS_COMPLETED
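When per-future progress output is not needed, executor.map is a more compact alternative to the submit/as_completed pattern above; it yields results in submission order and re-raises a worker's exception when its result is consumed. A sketch reusing the same download_one (defined elsewhere in that project):

from concurrent import futures

def download_many_map(cc_list):
    with futures.ThreadPoolExecutor(max_workers=3) as executor:
        # results come back in the order of sorted(cc_list),
        # regardless of which download finishes first
        results = list(executor.map(download_one, sorted(cc_list)))
    return len(results)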
def save_month(year_month, verbose):
    year, month = [int(s) for s in year_month.split('-')]
    total_size = 0
    img_count = 0
    dates = potd.list_days_of_month(year, month)
    with futures.ThreadPoolExecutor(max_workers=100) as executor:
        downloads = dict((executor.submit(potd.save_one, date, verbose), date)
                         for date in dates)
        for future in futures.as_completed(downloads):
            date = downloads[future]
            if future.exception() is not None:
                print('%r generated an exception: %s' % (date, future.exception()))
            else:
                img_size = future.result()
                total_size += img_size
                img_count += 1
                print('%r OK: %r' % (date, img_size))
    return img_count, total_size
def upload_blocks(bucket, chunk_size, max_threads, lines):
    session = botocore.session.get_session()
    client = session.create_client('s3')
    start = time.perf_counter()
    futures = []
    with ThreadPoolExecutor(max_workers=max_threads) as executor:
        # Start the load operations and mark each future with its URL
        for line in lines:
            raw_block, key = load_json_block(line)
            futures.append(executor.submit(
                client.put_object, Bucket=bucket, Key=key, Body=raw_block,
                ContentEncoding='UTF-8', ContentType='application/json'))
    end = time.perf_counter()
    done, pending = concurrent.futures.wait(futures)
    complete = time.perf_counter()
    rate = 1 / ((complete - start) / len(done))
    return len(done), int(rate)
def __init__(self, log_client, shard_id, consumer_name, processor,
             cursor_position, cursor_start_time, max_workers=2):
    self.log_client = log_client
    self.shard_id = shard_id
    self.consumer_name = consumer_name
    self.cursor_position = cursor_position
    self.cursor_start_time = cursor_start_time
    self.processor = processor
    self.checkpoint_tracker = ConsumerCheckpointTracker(
        self.log_client, self.consumer_name, self.shard_id)
    self.executor = ThreadPoolExecutor(max_workers=max_workers)
    self.consumer_status = ConsumerStatus.INITIALIZING
    self.current_task_exist = False
    self.task_future = None
    self.fetch_data_future = None
    self.next_fetch_cursor = ''
    self.shutdown = False
    self.last_fetch_log_group = None
    self.last_log_error_time = 0
    self.last_fetch_time = 0
    self.last_fetch_count = 0
    self.logger = logging.getLogger(__name__)
def __init__(self, bot_type, credentials, sheet_credentials, wit_tokens,
             db_url='redis://localhost:6379', num_thread=4):
    self.bot_api = api.LineApi(bot_type, credentials)
    self.logger = logging.getLogger('CoscupBot')
    self.task_pool = ThreadPoolExecutor(num_thread)
    self.db_url = db_url
    self.dao = db.Dao(db_url)
    self.dao.del_all_next_command()
    self.dao.del_all_context()
    self.dao.del_all_session()
    self.nlp_message_controllers = self.gen_nlp_message_controllers(wit_tokens)
    self.command_message_controllers = self.gen_command_message_controllers(
        [LanguageCode.zh_tw, LanguageCode.en_us])
    self.sheet_message_controller = modules.SheetMessageController(
        db_url, sheet_credentials['credential_path'],
        sheet_credentials['name'], self)
    self.__mq_conn_pool = redis.ConnectionPool.from_url(url=db_url)
    self.edison_queue = utils.RedisQueue('edison', 'queue',
                                         connection_pool=self.__mq_conn_pool)
    self.realtime_msg_queue = utils.RedisQueue('realmessage', 'queue',
                                               connection_pool=self.__mq_conn_pool)
    self.job_scheduler = BackgroundScheduler()
    self.coscup_api_helper = modules.CoscupInfoHelper(db_url)
    self.start_scheduler()
    self.next_step_dic = {}
    self.take_photo_sec = 6
def optimiz(currencies, debug):
    currencies = sorted(currencies)
    if len(currencies) < 2 or len(currencies) > 10:
        return {"error": "2 to 10 currencies"}
    max_workers = 4 if sys.version_info[1] < 5 else None
    executor = ThreadPoolExecutor(max_workers)
    data = dict(future.result() for future in
                wait([executor.submit(get_ochl, cur) for cur in currencies]).done)
    data = [data[cur] for cur in currencies]
    errors = [x['error'] for x in data if 'error' in x]
    if errors:
        return {"error": "Currencies not found : " + str(errors)}
    weights, m, s, a, b = markowitz_optimization(data, debug)
    if debug:
        import matplotlib as mpl
        mpl.use('Agg')
        import matplotlib.pyplot as plt
        fig, ax = plt.subplots()
        plt.plot(s, m, 'o', markersize=1)
        plt.plot(b, a, 'or')
        fig.savefig("chalu.png")
    result = dict()
    for i, cur in enumerate(currencies):
        result[cur] = weights[i]
    return {"result": result}
def main(): """Generate the jumpmap.json with ESI.""" try: all_systems = retry_get(ESI.Universe.get_universe_systems) except NameError: raise SystemExit(1) num_systems = len(all_systems) complete = 0 systems = {} with ThreadPoolExecutor(max_workers=100) as executor: for future in executor.map(system_get, all_systems): complete += 1 system, result = future systems[system] = result print("{}/{} systems complete".format(complete, num_systems)) with open("jumpmap.json", "w") as openjumpmap: openjumpmap.write(json.dumps(systems))
def serve():
    protoConfig = ProtoConfig.getConfig()
    arduino = protoConfig.arduinos[0]
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    sensors_pb2.add_ArduinoServicer_to_server(Arduino(arduino), server)
    port = protoConfig.ports.arduinoPort
    server.add_insecure_port('[::]:%s' % port)
    server.start()
    print('Started Arduino Server on Port %s ' % port)
    try:
        while True:
            time.sleep(_ONE_DAY_IN_SECONDS)
    except KeyboardInterrupt:
        server.stop(0)
def serve():
    protoConfig = ProtoConfig.getConfig()
    sensor_db = Mongo()
    sensor_db.GetClient()  # initialize the Db
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    dao_pb2.add_DaoServicer_to_server(Dao(sensor_db), server)
    port = protoConfig.ports.daoPort
    server.add_insecure_port('[::]:%s' % port)
    server.start()
    print('Started Dao Server on Port %s ' % port)
    try:
        while True:
            time.sleep(_ONE_DAY_IN_SECONDS)
    except KeyboardInterrupt:
        server.stop(0)
def serve():
    protoConfig = ProtoConfig.getConfig()
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    pushServer = Push(accessToken=protoConfig.wioLinks['havok'].accessToken)
    sensors_pb2.add_PushServicer_to_server(pushServer, server)
    port = protoConfig.ports.pushPort
    server.add_insecure_port('[::]:%s' % port)
    server.start()
    print('Started Push Server on Port %s ' % port)
    websocket.enableTrace(True)
    ws = websocket.WebSocketApp(
        "wss://us.wio.seeed.io/v1/node/event",
        on_message=pushServer.on_message,
        on_error=pushServer.on_error,
        on_close=pushServer.on_close)
    ws.on_open = pushServer.on_open
    ws.run_forever()
    try:
        while True:
            time.sleep(_ONE_DAY_IN_SECONDS)
    except KeyboardInterrupt:
        server.stop(0)
def runner(self):
    thread_pool = ThreadPoolExecutor(max_workers=2, thread_name_prefix='DEMO')
    futures = dict()
    for url in self.urls:
        future = thread_pool.submit(self.get_web_content, url)
        futures[future] = url
    for future in concurrent.futures.as_completed(futures):
        url = futures[future]
        try:
            data = future.result()
        except Exception as e:
            print('Run thread url (' + url + ') error. ' + str(e))
        else:
            print(url + ' request data ok. size=' + str(len(data)))
    print('Finished!')
def getloc():
    u"""Fetch POI locations via the AMap place-search web-service API:
    http://lbs.amap.com/api/webservice/guide/api/search/#text
    """
    allloc = []
    with ThreadPoolExecutor(max_workers=5) as executor:
        url = 'http://lbs.amap.com/api/webservice/guide/api/search/#text'
        param = {
            'key': '22d6f93f929728c10ed86258653ae14a',
            'keywords': u'??',
            'city': '027',
            'citylimit': 'true',
            'output': 'json',
            'page': '',
        }
        future_to_url = {
            executor.submit(load_url, url, merge_dicts(param, {'page': i}), 60): url
            for i in range(1, 46)}
        for future in futures.as_completed(future_to_url):
            if future.exception() is not None:
                print(future.exception())
            elif future.done():
                data = future.result()['pois']
                allloc.extend([x['location'] for x in data])
    with open('allloc1.pk', 'wb') as f:
        pickle.dump(allloc, f, True)
def mobai(loc):
    allmobai = []
    with ThreadPoolExecutor(max_workers=5) as executor:
        url = 'https://mwx.mobike.com/mobike-api/rent/nearbyBikesInfo.do'
        headers = {
            'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 10_3_1 like Mac OS X) '
                          'AppleWebKit/603.1.30 (KHTML, like Gecko) Mobile/14E304 '
                          'MicroMessenger/6.5.7 NetType/WIFI Language/zh_CN',
            'Content-Type': 'application/x-www-form-urlencoded',
            'Referer': 'https://servicewechat.com/wx80f809371ae33eda/23/page-frame.html',
        }
        data = {
            'longitude': '',
            'latitude': '',
            'citycode': '027',
        }
        future_to_url = {
            executor.submit(
                load_url, url,
                merge_dicts(data, {'longitude': i.split(',')[0]},
                            {'latitude': i.split(',')[1]}),
                60, headers): url
            for i in loc}
        for future in futures.as_completed(future_to_url):
            if future.exception() is not None:
                print(future.exception())
            elif future.done():
                data = future.result()['object']
                allmobai.extend(data)
                # write the batch into MongoDB
                result = collection.insert_many(data)
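Both crawlers above use the same idiom: a dict mapping each Future back to its input so successes and failures can be attributed. A minimal generic version of the pattern (fetch is passed in, so nothing project-specific is assumed):

from concurrent.futures import ThreadPoolExecutor, as_completed

def fetch_all(fetch, items, max_workers=5):
    """Run fetch(item) concurrently; return ({item: result}, {item: error})."""
    results, errors = {}, {}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_item = {executor.submit(fetch, item): item for item in items}
        for future in as_completed(future_to_item):
            item = future_to_item[future]
            if future.exception() is not None:
                errors[item] = future.exception()
            else:
                results[item] = future.result()
    return results, errors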
def test_node_specific_thread_pool_executor():
    sleep_time = 0.2
    n = 10

    def wait(c):
        sleep(sleep_time)
        return c

    executor_map = {'foo': ThreadPoolExecutor(n)}
    comp = Computation(executor_map=executor_map)
    start_dt = datetime.utcnow()
    for c in range(n):
        comp.add_node(c, wait, kwds={'c': C(c)}, executor='foo')
    comp.compute_all()
    end_dt = datetime.utcnow()
    delta = (end_dt - start_dt).total_seconds()
    assert delta < (n - 1) * sleep_time
def run(self): """Concurrently invoke `get_response` for all of instance's `requests`. """ with futures.ThreadPoolExecutor( max_workers=min(self.max_workers, len(self.requests)) ) as executor: to_do = [] for i, request in enumerate(self.requests): future = executor.submit(self.get_response, request, i) to_do.append(future) for future in futures.as_completed(to_do): result = future.result() # `responses` and `pending_requests` are instance properties, which means # client code can inspect instance to read responses as they are completed if result.req.error is not None or result.err == 'skwarg': continue try: self.pending_requests.remove(result.req) except KeyError: print('{} was not in pending requests, this is weird...'.format(result.req)) self.responses.append(result) self.is_done = True
def main():
    config = core.get_config()
    core.init_logging("pynetsim", log_level=getattr(
        logging, config.get("main").get("log_level", "debug").upper()))
    log.debug("Starting socket listeners")
    listener_pool = ThreadPoolExecutor(max_workers=2)
    futures = []
    tcp_listener = TCPSocketListener(config)
    udp_listener = UDPSocketListener(config)
    futures.append(listener_pool.submit(tcp_listener.start))
    futures.append(listener_pool.submit(udp_listener.start))
    core.wait()
    log.debug("Stopping socket listeners")
    tcp_listener.shutdown()
    udp_listener.shutdown()
    for future in futures:
        if future.running():
            future.cancel()
    tcp_listener.shutdown()
    udp_listener.shutdown()
    log.debug("Exiting...")
def main():
    os.makedirs(localstor, exist_ok=True)
    with open('tsd_dlink_filelist.csv', 'w') as fout:
        cw = csv.writer(fout)
        cw.writerow(['model', 'rev', 'fw_ver', 'fw_url', 'date', 'fsize', 'sha1', 'md5'])
        global executor
        executor = futures.ThreadPoolExecutor()
        models = parse_models()
        startI = next(i for i, sp in enumerate(models) if sp[0] == 'DBT' and sp[1] == '120')
        for model in models[startI:]:
            pfx, sfx = model[0], model[1]
            selectModel(pfx, sfx)
        print('wait for Executor shutdown')
        executor.shutdown(True)
def main():
    try:
        executor = ThreadPoolExecutor()
        os.makedirs(dlDir, exist_ok=True)
        with open('netgear_filelist.csv', 'w') as fout:
            cw = csv.writer(fout)
            cw.writerow(['model', 'fw_ver', 'fileName', 'fw_url', 'fw_date',
                         'fileSize', 'sha1', 'md5'])
            catIdx = int(sys.argv[1]) if len(sys.argv) > 1 else 0
            famIdx = int(sys.argv[2]) if len(sys.argv) > 2 else 0
            prdIdx = int(sys.argv[3]) if len(sys.argv) > 3 else 0
            while True:
                catIdx, famIdx, prdIdx = main1(catIdx, famIdx, prdIdx, executor)
                if catIdx is None:
                    return
                assert famIdx is not None
                assert prdIdx is not None
                print("\n[main] Continue from cat,fam,prd=(%d,%d,%d)\n" % (catIdx, famIdx, prdIdx))
    except BaseException as ex:
        traceback.print_exc()
    finally:
        executor.shutdown(True)
def main():
    global executor
    try:
        session = requests.Session()
        executor = ThreadPoolExecutor()
        os.makedirs(dlDir, exist_ok=True)
        url = 'http://www.zyxel.com/us/en/support/download_landing.shtml'
        with open('zyxel_us_filelist.csv', 'w') as fout:
            cw = csv.writer(fout)
            cw.writerow(['model', 'fver', 'fname', 'furl', 'fdate', 'fsize', 'sha1', 'md5'])
            resp = session.get(url=url)
            root = html.fromstring(resp.text)
            models = get_all_models(root)
            for modelName in sorted(models.keys()):
                kbid = models[modelName]
                resp2 = session.get(
                    url='http://www.zyxel.com/us/en/support/DownloadLandingSR.shtml',
                    params=dict(c="us", l="en", kbid=kbid, md=modelName))
                walkFiles(modelName, session, resp2)
    except BaseException as ex:
        traceback.print_exc()
    finally:
        print('Wait for executor shutdown')
        executor.shutdown(True)
def main(): global executor try: session = requests.Session() executor = ThreadPoolExecutor() os.makedirs(dlDir, exist_ok=True) url='http://downloadcenter.netgear.com' with open('netgear_filelist.csv', 'w') as fout: cw = csv.writer(fout) cw.writerow(['model', 'fw_ver', 'fileName', 'fw_url', 'fw_date', 'fileSize', 'sha1', 'md5']) response = session.get(url=url) root = html.fromstring(response.text) href = root.xpath(".//a[@id='ctl00_ctl00_ctl00_mainContent_localizedContent_bodyCenter_BasicSearchPanel_btnAdvancedSearch']/@href") href = strip_js(href[0]) formdata = {"__EVENTTARGET": href} resp2 = form_submit(session, root, url, "aspnetForm", formdata, {"Referer": url}) walkCategories(session, resp2) except BaseException as ex: traceback.print_exc() finally: executor.shutdown(True)
def main():
    global executor
    try:
        session = requests.Session()
        executor = ThreadPoolExecutor()
        os.makedirs(dlDir, exist_ok=True)
        with open('tenda_filelist.csv', 'w') as fout:
            cw = csv.writer(fout)
            cw.writerow(['model', 'fver', 'fname', 'furl', 'fdate', 'fsize', 'sha1', 'md5'])
            walkFiles('http://www.tendacn.com/en/service/download-cata-11.html')
            walkFiles('http://tendacn.com/en/service/download-cata-11-2.html')
            walkFiles('http://www.tendacn.com/en/service/download-cata-11-3.html')
    except BaseException as ex:
        traceback.print_exc()
    finally:
        print('Wait for executor shutdown')
        executor.shutdown(True)
def main():
    global executor
    executor = ThreadPoolExecutor()
    os.makedirs(localstor, exist_ok=True)
    with open('us_dlink_filelist.csv', 'w') as fout:
        cw = csv.writer(fout)
        cw.writerow(['model', 'rev', 'fw_ver', 'fw_url', 'fsize', 'fdate', 'sha1', 'md5'])
        start_url = "http://support.dlink.com/AllPro.aspx?type=all"
        d = pq(url=start_url)  # all 442 models
        models = [_.text_content().strip()
                  for _ in d('tr > td:nth-child(1) > .aRedirect')]
        for model in models:
            prod_url = "http://support.dlink.com/ProductInfo.aspx?m=%s" % parse.quote(model)
            crawl_prod(prod_url, model)
    executor.shutdown(True)
def main():
    global executor
    try:
        session = requests.Session()
        executor = ThreadPoolExecutor()
        os.makedirs(dlDir, exist_ok=True)
        url = 'http://support.netgear.cn/'
        with open('netgear_cn_filelist.csv', 'w') as fout:
            cw = csv.writer(fout)
            cw.writerow(['model', 'fver', 'fname', 'furl', 'fdate', 'fsize', 'sha1', 'md5'])
            resp = session.get(url=url)
            root = html.fromstring(resp.text)
            startProd = 1
            prods = root.xpath(".//select[@name='select']/option")
            for iProd, prod in enumerate(prods[startProd:], startProd):
                # prodText = prod.xpath("./text()")[0].strip()
                prodUrl = prod.xpath("./@value")[0].strip()
                walkProd(session, urljoin(resp.url, prodUrl))
    except BaseException as ex:
        traceback.print_exc()
    finally:
        print('Wait for executor shutdown')
        executor.shutdown(True)
def main():
    os.makedirs(localstor, exist_ok=True)
    with open('tsd_dlink_filelist.csv', 'w') as fout:
        cw = csv.writer(fout)
        cw.writerow(['model', 'rev', 'fw_ver', 'fw_url', 'date', 'fsize', 'sha1', 'md5'])
        global executor
        executor = futures.ThreadPoolExecutor(None)
        models = parse_models()
        startI = 0  # next(i for i, sp in enumerate(models) if sp[0] == 'DIR' and sp[1] == '845L')
        for model in models[startI:]:
            pfx, sfx = model[0], model[1]
            selectModel(pfx, sfx)
        print('wait for Executor shutdown')
        executor.shutdown(True)
def main():
    global executor
    try:
        sess = requests.Session()
        executor = ThreadPoolExecutor()
        os.makedirs(dlDir, exist_ok=True)
        with open('tenda_us_filelist.csv', 'w') as fout:
            cw = csv.writer(fout)
            cw.writerow(['model', 'fver', 'fname', 'furl', 'fdate', 'fsize', 'sha1', 'md5'])
            walkSelects()
            # walkModels(sess, 'http://www.tendaus.com/Default.aspx?Module=WebsiteEN&Action=DownloadCenter')
            # for Id in range(1, 200):
            #     walkTables(sess, "http://www.tendaus.com/Default.aspx?Module=WebsiteEN&Action=DownloadCenter&Id=%(Id)s" % locals())
    except BaseException as ex:
        traceback.print_exc()
    finally:
        print('Wait for executor shutdown')
        executor.shutdown(True)
def main():
    with open('ca_dlink_filelist.csv', 'w') as fout:
        cw = csv.writer(fout)
        cw.writerow(['model', 'rev', 'fw_ver', 'fw_url', 'date', 'fsize', 'sha1', 'md5'])
        global executor
        executor = futures.ThreadPoolExecutor()
        d = pq(url='http://support.dlink.ca/AllPro.aspx?type=all')  # all 442 models
        models = [_.text_content().strip()
                  for _ in d('tr > td:nth-child(1) > .aRedirect')]
        for model in models:
            prod_url = 'http://support.dlink.ca/ProductInfo.aspx?m=%s' % parse.quote(model)
            crawl_prod(prod_url, model)
        print('wait for Executor shutdown')
        executor.shutdown(True)
def __init__(self, app_name, args=None, loop=None):
    self._app_name = app_name
    self._shutting_down = False
    self._stats_mgr = None
    Option.parse_args(args)
    self._loop = loop or asyncio.get_event_loop()
    self._loop.set_debug(self.ASYNCIO_DEBUG)
    executor = ThreadPoolExecutor(max_workers=self.MAX_DEFAULT_EXECUTOR_THREADS)
    self._loop.set_default_executor(executor)
    self._init_logging()
    self._loop.add_signal_handler(signal.SIGINT, self.shutdown)
    self._loop.add_signal_handler(signal.SIGTERM, self.shutdown)
    self.logger = logging.getLogger(self._app_name)
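set_default_executor, used above, routes every loop.run_in_executor(None, ...) call through the installed pool. A minimal standalone sketch of the mechanism (blocking_io is illustrative):

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def blocking_io():
    time.sleep(0.1)  # stand-in for any blocking call
    return 'done'

async def main():
    loop = asyncio.get_running_loop()
    # passing None selects the default executor installed below
    return await loop.run_in_executor(None, blocking_io)

loop = asyncio.new_event_loop()
loop.set_default_executor(ThreadPoolExecutor(max_workers=4))
print(loop.run_until_complete(main()))
loop.close()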
def __init__(self, pool, dialect, url, logging_name=None, echo=None,
             execution_options=None, loop=None, **kwargs):
    self._engine = Engine(
        pool, dialect, url, logging_name=logging_name, echo=echo,
        execution_options=execution_options, **kwargs)
    self._loop = loop
    max_workers = None
    # https://www.python.org/dev/peps/pep-0249/#threadsafety
    if dialect.dbapi.threadsafety < 2:
        # This might seem overly-restrictive, but when we instantiate an
        # AsyncioResultProxy from AsyncioEngine.execute, subsequent
        # fetchone calls could be in different threads. Let's limit to one.
        max_workers = 1
    self._engine_executor = ThreadPoolExecutor(max_workers=max_workers)
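The threadsafety check above serializes all database calls through a single thread whenever the DBAPI driver only guarantees module-level safety (threadsafety < 2 under PEP 249). The serializing effect of a one-worker pool is easy to observe directly:

import threading
from concurrent.futures import ThreadPoolExecutor

def whoami(tag):
    return tag, threading.current_thread().name

with ThreadPoolExecutor(max_workers=1) as pool:
    # every submission runs on the same worker thread, in FIFO order
    for future in [pool.submit(whoami, i) for i in range(3)]:
        print(future.result())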
def get_node_health_mt(nodes_dict, check_type="normal", n_threads=8, print_out=False):
    """use multithreading to check each node health

    Arguments:
        nodes_dict {dict} -- [nodesIP(domainName)->(username, mem, CPU)]

    Keyword Arguments:
        check_type {str} -- [description] (default: {"normal"})
        n_threads {number} -- [description] (default: {8})
    """
    with ThreadPoolExecutor(max_workers=n_threads) as executor:
        futures = {executor.submit(check_node_health, nodeinfo[0], node,
                                   check_type, print_out): node
                   for node, nodeinfo in nodes_dict.items()}
        for future in as_completed(futures):
            node = futures[future]
            nodeinfo = nodes_dict[node]
            result = future.result()
            nodes_dict[node] = (nodeinfo[0], result)
            # print("{} {}".format(node, nodes_dict[node]))
def __init__(self, gRPC_module, inner_service_port=None):
    self.__peer_id = None if ObjectManager().peer_service is None \
        else ObjectManager().peer_service.peer_id
    # for peer_service, it refers to peer_inner_service;
    # for rs_service, it refers to rs_admin_service
    self.inner_server = grpc.server(futures.ThreadPoolExecutor(max_workers=conf.MAX_WORKERS))
    self.outer_server = grpc.server(futures.ThreadPoolExecutor(max_workers=conf.MAX_WORKERS))
    # members for private, It helps simplicity of code intelligence
    self.__gRPC_module = gRPC_module
    self.__port = 0
    self.__inner_service_port = inner_service_port
    self.__peer_target = None
    if inner_service_port is not None:
        # It means this is Peer's CommonService not RS.
        peer_port = inner_service_port - conf.PORT_DIFF_INNER_SERVICE
        self.__peer_target = util.get_private_ip() + ":" + str(peer_port)
    self.__subscriptions = queue.Queue()  # tuple with (channel, stub)
    self.__group_id = ""
    # broadcast process
    self.__broadcast_process = self.__run_broadcast_process()
    self.__loop_functions = []
def describe_threaded_runner():
    @pytest.fixture(scope='module')
    def runner_fixture():
        return ThreadedRunner(thread_pool_executor=ThreadPoolExecutor(max_workers=2))

    def describe_run_all_threaded_behavior():
        def given_many_tasks():
            def when_first_task_fails():
                def expect_later_tasks_still_run(runner_fixture):
                    task1_mock, task2_mock = create_task_mock(count=2)
                    task1_mock.successful = False
                    with pytest.raises(TaskFailedError):
                        runner_fixture.run_all([task1_mock, task2_mock])
                    task1_mock.execute.assert_called_once()
                    task2_mock.execute.assert_called_once()
def build_tasks_hierarchy(swarmci_config, task_factory):
    stages_from_yaml = swarmci_config.pop('stages', None)
    if stages_from_yaml is None:
        raise SwarmCIError('Did not find "stages" key in the .swarmci file.')
    elif type(stages_from_yaml) is not list:
        raise SwarmCIError('The value of the "stages" key should be a list in the .swarmci file.')

    thread_pool_executor = ThreadPoolExecutor(max_workers=25)

    stage_tasks = []
    for stage in stages_from_yaml:
        job_tasks = []
        for job in stage['jobs']:
            commands = []
            for cmd in job['commands']:
                commands.append(task_factory.create(TaskType.COMMAND, cmd=cmd))
            job_tasks.append(task_factory.create(TaskType.JOB, job=job, commands=commands))
        stage_tasks.append(task_factory.create(
            TaskType.STAGE, stage=stage, jobs=job_tasks,
            thread_pool_executor=thread_pool_executor))
    return task_factory.create(TaskType.BUILD, stages=stage_tasks)
def run():
    np.random.seed(42)
    config = ServerConfig.load(('./server.conf',))
    # thread_name_prefix requires Python 3.6+
    if sys.version_info >= (3, 6):
        thread_pool = ThreadPoolExecutor(thread_name_prefix='query-thread-')
    else:
        thread_pool = ThreadPoolExecutor(max_workers=32)
    app = Application(config, thread_pool)
    if config.ssl_key:
        ssl_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
        ssl_ctx.load_cert_chain(config.ssl_chain, config.ssl_key)
        app.listen(config.port, ssl_options=ssl_ctx)
    else:
        app.listen(config.port)
    if config.user:
        os.setgid(grp.getgrnam(config.user)[2])
        os.setuid(pwd.getpwnam(config.user)[2])
    if sd:
        sd.notify('READY=1')
    tokenizer_service = TokenizerService()
    tokenizer_service.run()
    for language in config.languages:
        load_language(app, tokenizer_service, language,
                      config.get_model_directory(language))
    sys.stdout.flush()
    tornado.ioloop.IOLoop.current().start()
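thread_name_prefix was added to ThreadPoolExecutor in Python 3.6, which is what the version check above guards. A reusable compatibility helper might look like this sketch (make_named_pool is an illustrative name):

import sys
from concurrent.futures import ThreadPoolExecutor

def make_named_pool(prefix, max_workers=None):
    # thread_name_prefix first appeared in Python 3.6
    if sys.version_info >= (3, 6):
        return ThreadPoolExecutor(max_workers=max_workers,
                                  thread_name_prefix=prefix)
    return ThreadPoolExecutor(max_workers=max_workers)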
def start_server(riot_api_token, listening_port, max_workers):
    """Starts a server."""
    service = MatchFetcher(riot_api_token)
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=max_workers))
    service_pb2.add_MatchFetcherServicer_to_server(service, server)
    server.add_insecure_port('[::]:%s' % listening_port)
    server.start()
    return server, service
async def test_explicit_loop_threaded(event_loop):
    async with base.CleanModel() as model:
        model_name = model.info.name
        new_loop = asyncio.new_event_loop()
        with ThreadPoolExecutor(1) as executor:
            f = executor.submit(
                new_loop.run_until_complete,
                _deploy_in_loop(new_loop, model_name, model._connector.jujudata))
            f.result()
        await model._wait_for_new('application', 'ubuntu')
        assert 'ubuntu' in model.applications
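The test above drives a second event loop to completion from a pool thread. On Python 3.7+, the same shape can be written more compactly with asyncio.run; a simplified sketch that does not use the test's helpers:

import asyncio
from concurrent.futures import ThreadPoolExecutor

async def work():
    await asyncio.sleep(0.1)
    return "ran in a worker thread's own event loop"

with ThreadPoolExecutor(max_workers=1) as executor:
    # asyncio.run creates and tears down a fresh loop on the pool thread
    future = executor.submit(asyncio.run, work())
    print(future.result())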
def multi_thread(self, begin_id):
    self.make_id_set(begin_id)
    coll = MONGO_CLIENT['kr2']['kr_flashes_multi']
    for i in range(20):
        t = threading.Thread(target=self.loop_parse_news_flashes,
                             name='thread%s' % i, args=[coll])
        t.start()
    # pool = ThreadPoolExecutor(64)
    # for i in range(16):
    #     pool.submit(parse_news_flashes)
def main():
    sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
    from publicdns.client import PublicDNS
    domains = []
    filename = os.path.join(os.path.dirname(__file__), 'google_domains.txt')
    with open(filename, 'r') as f:
        domains = f.read().split('\n')
    size = len(domains)
    tqdmargs = {
        'total': 100,
        'unit': 'it',
        'unit_scale': True,
        'leave': True,
    }
    with ThreadPoolExecutor(max_workers=4) as pool:
        print('- dns.resolver')
        started = timeit.default_timer()
        resolver = dns_resolver.Resolver()
        resolver.nameservers = ['8.8.8.8', '8.8.4.4']
        futures = [pool.submit(resolver.query, domains[i % size], 'A')
                   for i in range(100)]
        for _ in tqdm(as_completed(futures), **tqdmargs):
            pass
        elapsed = timeit.default_timer() - started
        print('dns.resolver * 100 - took {}s'.format(elapsed))
    with ThreadPoolExecutor(max_workers=4) as pool:
        print('- PublicDNS')
        started = timeit.default_timer()
        client = PublicDNS()
        futures = [pool.submit(client.query, domains[i % size], 'A')
                   for i in range(100)]
        for _ in tqdm(as_completed(futures), **tqdmargs):
            pass
        elapsed = timeit.default_timer() - started
        print('\nPublicDNS * 100 - took {}s'.format(elapsed))
def __init__(self):
    self.markets = []
    self.observers = []
    self.depths = {}
    self.init_markets(config.markets)
    self.init_observers(config.observers)
    self.threadpool = ThreadPoolExecutor(max_workers=10)
def __init__(self, neo4j_client: Neo4jClient, max_workers: int = None):
    self.executor = ThreadPoolExecutor(max_workers=max_workers)
    super().__init__(neo4j_client)
def test_wait_for_all():
    def f(sleep_time: int):
        sleep(sleep_time)
        return sleep_time

    def calc(fs):
        fs_done = wait(fs).done
        r = sum(r.result() for r in fs_done)
        return r

    pool = ThreadPoolExecutor()
    fs = [pool.submit(f, arg) for arg in (3, 2, 5)]
    result = pool.submit(calc, fs).result()
    assert result == 10
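wait() also takes a return_when argument; FIRST_COMPLETED lets the caller act on whichever future finishes first rather than blocking for all of them. A sketch with illustrative sleep times:

import time
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED

def napper(t):
    time.sleep(t)
    return t

with ThreadPoolExecutor() as pool:
    fs = [pool.submit(napper, t) for t in (0.3, 0.1, 0.2)]
    done, pending = wait(fs, return_when=FIRST_COMPLETED)
    print([f.result() for f in done])  # typically just the 0.1s task
# exiting the with-block still waits for the pending futures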
def __init__(self, thread_num=2, *args, **kwargs):
    self.thread_num = thread_num
    self._queue = queue.Queue(maxsize=200)
    self.api_no_connection = TdxHq_API()
    self._api_worker = Thread(
        target=self.api_worker, args=(), name='API Worker')
    self._api_worker.start()
    self.executor = ThreadPoolExecutor(self.thread_num)
def __init__(self):
    self.metadata = MockMetadata()
    self.added_hosts = []
    self.removed_hosts = []
    self.scheduler = Mock(spec=_Scheduler)
    self.executor = Mock(spec=ThreadPoolExecutor)
    self.profile_manager.profiles[EXEC_PROFILE_DEFAULT] = ExecutionProfile(RoundRobinPolicy())
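Mock(spec=ThreadPoolExecutor), as in the fixture above, produces a stand-in that records calls but rejects attributes the real executor does not have, which catches typos in tests. A self-contained sketch:

from concurrent.futures import ThreadPoolExecutor
from unittest.mock import Mock

executor = Mock(spec=ThreadPoolExecutor)
executor.submit(print, 'hi')          # recorded; nothing actually runs
executor.submit.assert_called_once()
try:
    executor.no_such_method           # not part of the spec
except AttributeError as exc:
    print('rejected:', exc)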