The following 50 code examples, extracted from open source Python projects, demonstrate how to use threading.active_count().
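threading.active_count() returns the number of Thread objects currently alive, including the main thread. Most of the examples below poll it for one of two purposes: waiting for worker threads to finish, or capping how many run at once. Here is a minimal sketch of the waiting pattern (the worker function is illustrative, not taken from any of the projects below):

import threading
import time

def worker(delay):
    """Illustrative worker: just sleeps for a while."""
    time.sleep(delay)

print(threading.active_count())  # 1: only the main thread is alive

for delay in (0.2, 0.4, 0.6):
    threading.Thread(target=worker, args=(delay,)).start()

# Poll until only the main thread remains, as many examples below do.
while threading.active_count() > 1:
    time.sleep(0.05)
print("all workers finished")
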
def zmirror_status():
    """Return some status information of the server (only accessible from 127.0.0.1)"""
    if request.remote_addr and request.remote_addr != '127.0.0.1':
        return generate_simple_resp_page(b'Only 127.0.0.1 are allowed', 403)
    output = ""
    output += strx('extract_real_url_from_embedded_url', extract_real_url_from_embedded_url.cache_info())
    output += strx('\nis_content_type_streamed', is_mime_streamed.cache_info())
    output += strx('\nembed_real_url_to_embedded_url', embed_real_url_to_embedded_url.cache_info())
    output += strx('\ncheck_global_ua_pass', check_global_ua_pass.cache_info())
    output += strx('\nextract_mime_from_content_type', extract_mime_from_content_type.cache_info())
    output += strx('\nis_content_type_using_cdn', is_content_type_using_cdn.cache_info())
    output += strx('\nis_ua_in_whitelist', is_ua_in_whitelist.cache_info())
    output += strx('\nis_mime_represents_text', is_mime_represents_text.cache_info())
    output += strx('\nis_domain_match_glob_whitelist', is_domain_match_glob_whitelist.cache_info())
    output += strx('\nverify_ip_hash_cookie', verify_ip_hash_cookie.cache_info())
    output += strx('\nis_denied_because_of_spider', is_denied_because_of_spider.cache_info())
    output += strx('\nis_ip_not_in_allow_range', is_ip_not_in_allow_range.cache_info())
    output += strx('\n\ncurrent_threads_number', threading.active_count())
    # output += strx('\nclient_requests_text_rewrite', client_requests_text_rewrite.cache_info())
    # output += strx('\nextract_url_path_and_query', extract_url_path_and_query.cache_info())
    output += strx('\n----------------\n')
    output += strx('\ndomain_alias_to_target_set', domain_alias_to_target_set)
    return "<pre>" + output + "</pre>\n"

def run(self):
    log.debug('[ Start TweetThread ]')
    i = 1
    a = float(1.5)
    # Wait until GetInfoThread, GetCancelThread and GetNewsThread have finished
    while active_count() >= 3:
        time.sleep(1)
    else:
        while True:
            try:
                t = self.queue.get(block=False, timeout=None)
            except Exception:
                # The queue is empty, so finish
                log.debug('[ End TweetThread ]\n')
                break
            if i < 12:
                i += 1
            # Wait 1.5 ** i seconds (the delay grows with each tweet)
            w = pow(a, i)
            time.sleep(w)
            lib.tweeter.tweet(t)

def run_experiment(agents_def):
    assert len(agents_def) == 2, 'Not enough agents (required: 2, got: %d)' \
        % len(agents_def)
    processes = []
    for agent in agents_def:
        p = Thread(target=agent_factory, kwargs=agent)
        p.daemon = True
        p.start()
        # Give the server time to start
        if agent['role'] == 0:
            sleep(1)
        processes.append(p)
    try:
        # wait until only the challenge agent is left
        while active_count() > 2:
            sleep(0.1)
    except KeyboardInterrupt:
        print('Caught control-c - shutting down.')

def test_executes_events(self):
    queue = Queue()
    first_queue_mock = Mock()
    queue_name = "queue"
    multi_thread_handler = MultiThreadHandler(queue)
    multi_thread_handler.add_function(queue_name, first_queue_mock)
    event_count = 10
    for i in range(event_count):
        queue.put_nowait(Event(queue_name, "some data"))
    try:
        multi_thread_handler.start()
        wait_until_success(lambda: self.assertEqual(first_queue_mock.call_count, event_count))
        thread_count = get_config()["handlers"]["multi_thread"]["thread_count"]
        self.assertTrue(threading.active_count() >= thread_count + 1)
    finally:
        multi_thread_handler.stop()

def test_safe_terminate(execmodel):
    if execmodel.backend != "threading":
        pytest.xfail("execution model %r does not support task count" % execmodel.backend)
    import threading
    active = threading.active_count()
    l = []

    def term():
        py.std.time.sleep(3)

    def kill():
        l.append(1)

    safe_terminate(execmodel, 1, [(term, kill)] * 10)
    assert len(l) == 10
    sleep(0.1)
    py.std.gc.collect()
    assert execmodel.active_count() == active

def test_safe_terminate2(execmodel):
    if execmodel.backend != "threading":
        pytest.xfail("execution model %r does not support task count" % execmodel.backend)
    import threading
    active = threading.active_count()
    l = []

    def term():
        return

    def kill():
        l.append(1)

    safe_terminate(execmodel, 3, [(term, kill)] * 10)
    assert len(l) == 0
    sleep(0.1)
    py.std.gc.collect()
    assert threading.active_count() == active

def main():
    basic = BasicClass()
    print('starting')
    basic.start()
    try:
        while True:
            time.sleep(0.5)
    except KeyboardInterrupt:
        pass
    print('stopping threads')
    basic.stop()
    # wait for all threads to terminate
    while threading.active_count() > 1:
        time.sleep(0.1)
    print('threads stopped')
    sys.exit(0)

def test_dummy_thread_after_fork(self):
    # Issue #14308: a dummy thread in the active list doesn't mess up
    # the after-fork mechanism.
    code = """if 1:
        import thread, threading, os, time

        def background_thread(evt):
            # Creates and registers the _DummyThread instance
            threading.current_thread()
            evt.set()
            time.sleep(10)

        evt = threading.Event()
        thread.start_new_thread(background_thread, (evt,))
        evt.wait()
        assert threading.active_count() == 2, threading.active_count()
        if os.fork() == 0:
            assert threading.active_count() == 1, threading.active_count()
            os._exit(0)
        else:
            os.wait()
        """
    _, out, err = assert_python_ok("-c", code)
    self.assertEqual(out, '')
    self.assertEqual(err, '')

def test_BaseStream_connect():
    event = threading.Event()

    def dummy_func():
        while not event.is_set():
            time.sleep(1.)

    base = BaseStream()
    n_threads_0 = threading.active_count()
    base.connect(dummy_func, "TEST")
    n_threads_1 = threading.active_count()

    # Check that a thread was started.
    assert n_threads_1 - n_threads_0 == 1, "Thread not started."

    # Check that the thread was created and named properly.
    name = [t.getName() for t in threading.enumerate() if t.getName() == "TEST"]
    assert name[0] == "TEST", "Thread not named properly."

    # Check that connect method only allows one connection.
    with pytest.raises(RuntimeError):
        base.connect(dummy_func, "SECOND_TEST")

    # Clean up.
    event.set()

def natAndSend(self, packet, port):
    # NAT for first packet - This removes the need to dynamically add the 3rd rule (Controller Rule)
    # A static rule in P4 now sends packets received back from the controller out the external port.
    packet[IP].src = self.router_ext_ip
    packet[TCP].sport = port
    # Recalculate checksums the Scapy way: delete them and rebuild the packet
    del packet[IP].chksum
    del packet[TCP].chksum
    packet = packet.__class__(str(packet))
    # packet already processed so don't process again - resubmit so reason won't be valid anymore
    new_p_str = '\x00' * 6 + str(packet)
    # print "threading.active_count " + str(threading.active_count())
    s = conf.L2socket(iface=self.controller_port)
    # sendp(new_p_str, iface=self.controller_port, verbose=0)
    s.send(new_p_str)
    print port

def test_dummy_thread_after_fork(self):
    # Issue #14308: a dummy thread in the active list doesn't mess up
    # the after-fork mechanism.
    code = """if 1:
        import _thread, threading, os, time

        def background_thread(evt):
            # Creates and registers the _DummyThread instance
            threading.current_thread()
            evt.set()
            time.sleep(10)

        evt = threading.Event()
        _thread.start_new_thread(background_thread, (evt,))
        evt.wait()
        assert threading.active_count() == 2, threading.active_count()
        if os.fork() == 0:
            assert threading.active_count() == 1, threading.active_count()
            os._exit(0)
        else:
            os.wait()
        """
    _, out, err = assert_python_ok("-c", code)
    self.assertEqual(out, b'')
    self.assertEqual(err, b'')

def imperial_pushed(self):
    """Activates imperial units and changes the look of the units buttons.

    Returns:
        None
    """
    if self.v_link["var_units"].get() == "metric":
        self.v_link["var_units"].set("imperial")
        self.controller.update_buttons()
        if all([self.controller.data_present,
                self.v_link["error_status"] == 0,
                threading.active_count() < 2]):
            self.v_link["scrollbar_offset"] = self.yscrollbar.get()
            self.controller.show_display("imperial")

def begin_get_report(self):
    """Begin getting data for the weather report to display it on the
    main_canvas. The call goes to the Controller first. Then to the Model.

    Returns:
        None
    """
    # Do nothing if no location is entered or an active sub thread
    # is running.
    if (self.v_link["var_loc"].get() == "") \
            or (threading.active_count() > 1):
        return
    # Clear any error status message.
    self.v_link["error_message"] = ""
    self.v_link["var_status"].set("Gathering data, please wait...")
    # Request a report using a Mediating Controller in a new thread.
    report_thread = threading.Thread(target=self.controller.get_report)
    report_thread.start()

def check(i, total):
    global eu
    # os.system("title Spider,Current threads: %d,URLs left: %d,URLs exists:%d" % (threading.active_count(), total, eu))
    try:
        # payload = {'username': 'admin', 'passwd': '123456'}
        r = requests.get(i + '/invoker/JMXInvokerServlet', timeout=5)
        status = r.status_code
        c = r.content.count('jboss')
        r_l = len(r.text)
    except:
        print i, 'Timeout'
        status = 0
    if status == 200 and c != 0:
        r = 0
        print i, 'Exists!!!!!'
        eu += 1
        f = open("good_jboss.txt", 'a')
        f.write(i + '\n')
        f.close()

def main():
    global eu
    eu = 0
    total = len(open('8080.txt', 'rU').readlines())
    print 'Total URLs:%d' % total
    for i in open("8080.txt").readlines():
        i = i.strip('\n')
        t = threading.Thread(target=check, args=(i, total))
        t.setDaemon(True)
        total -= 1
        while True:
            if threading.active_count() == 1 and total == 0:
                print 'All Done at %s' % time.strftime("%Y-%m-%d[%H.%M.%S]")
                break
            elif threading.active_count() < 200:
                if total == 0:
                    time.sleep(10)
                else:
                    os.system("title Spider,Current threads: %d,URLs left: %d,URLs exists:%d" % (threading.active_count(), total, eu))
                t.start()
                break

def main():
    global eu
    eu = 0
    total = len(open('10000.txt', 'rU').readlines())
    print 'Total URLs:%d' % total
    for i in open("10000.txt").readlines():
        i = i.strip('\n')
        t = threading.Thread(target=check, args=(i, total))
        t.setDaemon(True)
        total -= 1
        while True:
            if threading.active_count() == 1 and total == 0:
                print 'All Done at %s' % time.strftime("%Y-%m-%d[%H.%M.%S]")
                break
            elif threading.active_count() < 200:
                if total == 0:
                    time.sleep(10)
                else:
                    os.system("title Spider,Current threads: %d,URLs left: %d,URLs exists:%d" % (threading.active_count(), total, eu))
                t.start()
                break

def scrapeCitiesFromList(cityList, dirPrefix, threadCount, scrapers):
    """scrapes locations in cities in cityList to file in dirPrefix, spawns threadCount threads"""
    threads = []
    i = 0
    while True:
        if threading.active_count() <= threadCount:
            if i == len(cityList):
                break
            for s in scrapers:
                if not s.inUse:
                    threads.append(ScrapeThread(target=scrapeCityToFile, args=(dirPrefix, cityList[i], s)))
                    threads[-1].start()
                    i += 1
                    break
        time.sleep(1)
    for t in threads:
        t.join()

def scrapeLocationsFromList(locList, dirPrefixes, date, timeWindow, threadCount, maxPosts, scrapers):
    """scrapes postscounts at locations in locList in timeWindow before date,
    spawning threadCount threads, scrolling to maximum of maxPosts"""
    threads = []
    i = 0
    while True:
        # start a new thread if fewer than threadCount active
        if threading.active_count() <= threadCount:
            if i == len(locList):
                # don't start more threads than locations
                break
            for s in scrapers:
                # find a free scraper
                if not s.inUse:
                    threads.append(ScrapeThread(target=scrapeLocationToFile,
                                                args=(dirPrefixes[i], locList[i], date, timeWindow, maxPosts, s)))
                    threads[-1].start()
                    i += 1
                    break
        time.sleep(1)
    # once all threads started, wait for them to finish
    for t in threads:
        t.join()

def run_thread(req_list, name=None, is_lock=True, limit_num=8):
    '''
    Run tasks in threads.
    - req_list: task list; each element is a (function, args) tuple, e.g.
      [
          (func_0, (para_0_1, para_0_2, ...)),
          (func_1, (para_1_1, para_1_2, ...)),
          ...
      ]
    - name: thread name, str, default None
    - is_lock: whether to block until all threads finish, bool, default True
      (blocking); False runs asynchronously
    - limit_num: maximum number of concurrent threads, int, default 8
    '''
    queue = deque(req_list)
    while len(queue):
        if threading.active_count() <= limit_num:
            para = queue.popleft()
            now_thread = threading.Thread(
                target=para[0], args=para[1], name=name, daemon=True)
            now_thread.start()
    if is_lock:
        for now_thread in threading.enumerate():
            if now_thread is not threading.currentThread():
                now_thread.join()

def _threaded_perform_search(self):
    self._perform_search_complete = False
    # generate a name and ensure we never have two threads
    # with the same name
    names = [thread.name for thread in threading.enumerate()]
    for i in range(threading.active_count() + 1, 0, -1):
        thread_name = 'ThreadedQuery-%s' % i
        if not thread_name in names:
            break
    # create and start it
    t = threading.Thread(
        target=self._blocking_perform_search, name=thread_name)
    t.start()
    # don't block the UI while the thread is running
    context = GObject.main_context_default()
    while not self._perform_search_complete:
        time.sleep(0.02)  # 50 fps
        while context.pending():
            context.iteration()
    t.join()
    # call the query-complete callback
    self.emit("query-complete")

def _buildJobs(self):
    """
    Build the jobs.
    """
    self.props['channel'] = Channel.create([None] * self.size)
    rptjob = 0 if self.size == 1 else randint(0, self.size - 1)

    def bjSingle(i):
        job = Job(i, self)
        job.init()
        self.jobs[i] = job
        row = tuple(job.data['out'].values())
        self.props['channel'][i] = row

    utils.parallel(bjSingle, [(i, ) for i in range(self.size)], self.nthread)
    self.log('After job building, active threads: %s' % threading.active_count(), 'debug')
    if self.jobs[0].data['out']:
        self.channel.attach(*self.jobs[0].data['out'].keys())
    self.jobs[rptjob].report()

def _get_lrc(self):
    while True:
        print(self.q.qsize())
        if self.q.empty():
            break
        if threading.active_count() > 5:
            time.sleep(3)
            continue
        m_info = self.q.get()
        try:
            t = threading.Thread(target=self._save_lrc, args=(m_info,))
            t.start()
            t.join()
        except:
            self.q.put(m_info)  # on failure, put the task back and retry

def onquit(self, ev):
    "Check if program is busy before quitting"
    if active_count() > 1:
        TkDialog(None, "Please wait until conversions are complete!", "Info").run()
    elif self.rec is None:
        self.quit = True

def run(self):
    log.debug('[ Start PrintThread ]')
    # Wait until GetInfoThread, GetCancelThread and GetNewsThread have finished
    while active_count() >= 3:
        time.sleep(1)
    else:
        while True:
            try:
                t = self.queue.get(block=False, timeout=None)
            except Exception:
                # The queue is empty, so finish
                log.debug('[ End PrintThread ]\n')
                break
            print t

def do_media_info(self):
    """ Shows basic media info. """
    if self.is_client_mod:
        self.send_private_msg('*Playlist Length:* ' + str(len(self.media.track_list)), self.active_user.nick)
        self.send_private_msg('*Track List Index:* ' + str(self.media.track_list_index), self.active_user.nick)
        self.send_private_msg('*Elapsed Track Time:* ' +
                              self.format_time(self.media.elapsed_track_time()), self.active_user.nick)
        self.send_private_msg('*Active Track:* ' + str(self.media.has_active_track()), self.active_user.nick)
        self.send_private_msg('*Active Threads:* ' + str(threading.active_count()), self.active_user.nick)

def update(haproxy_config):
    logger.debug("Started job...")
    try:
        start_time = time.time()
        resolvers = Service.query_resolvers()
        services = Service.query_services()
        Haproxy().update(resolvers=resolvers, services=services, **haproxy_config)
        metrics.info('background-refresh.duration {}'.format(time.time() - start_time))
        metrics.info('active-thread-count {}'.format(threading.active_count()))
    finally:
        logger.debug("Finished job.")

def main():
    for i in range(random.randint(2, 50)):
        thread = threading.Thread(target=myThread, args=(i,))
        thread.start()
    time.sleep(4)
    print("Total Number of Active Threads: {}".format(threading.active_count()))

def _teardown(self):
    """ restore radio and wait on tuning thread """
    clean = True
    self._err = ""
    # restore the radio - this will have the side effect of
    # causing the threads to error out and quit
    try:
        if self._card:
            phy = self._card.phy
            pyw.devdel(self._card)
            card = pyw.phyadd(phy, self._dev, self._dinfo['mode'])
            pyw.up(card)
    except pyric.error as e:
        clean = False
        self._err = "ERRNO {0} {1}".format(e.errno, e.strerror)

    # join threads, waiting a short time before continuing
    try:
        self._tuner.join(5.0)
    except (AttributeError, RuntimeError):
        # either tuner is None, or it never started
        pass
    try:
        self._sniffer.join(5.0)
    except (AttributeError, RuntimeError):
        # either sniffer is None, or it never started
        pass
    if threading.active_count() > 0:
        clean = False
        self._err += "One or more workers failed to stop"
    return clean

def handle(self):
    while True:
        self.logger.info("Handler thread name = {}/active count = {}"
                         .format(threading.current_thread().name,
                                 threading.active_count()))
        header = MessageHeader(
            self.rfile.read(MessageHeader.HEADER_LENGTH))
        self.logger.info("Message length = {}".format(header.length))
        self.data = ""
        while len(self.data) < header.length:
            buf = self.rfile.read(
                min(self.MAX_BUF_LENGTH, header.length - len(self.data)))
            self.data += str(buf, "UTF-8")
            self.logger.info("current length = {}, total length = {}"
                             .format(len(self.data), header.length))
        self.logger.info("{0} request = {1}"
                         .format(self.client_address[0], self.data))
        response = JSONRPCResponseManager.handle(self.data, dispatcher)
        self.logger.info("response for {0} = {1}"
                         .format(self.client_address[0], response.json))
        self.wfile.write(MessageHeader.create(
            len(response.json)) + bytes(response.json, "UTF-8"))

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("config")
    args = parser.parse_args()
    config = parse_config(args.config)
    for name, repo in config.items():
        github_auth = (repo["github_username"], repo["github_password"])
        snooze_label = repo["snooze_label"]
        ignore_members_of = repo["ignore_members_of"]
        # bind the per-repo values as default arguments so each callback
        # keeps its own settings instead of closing over the loop variables
        callback = lambda event, message, auth=github_auth, label=snooze_label, ignore=ignore_members_of: \
            github_callback(event, message, auth, label, ignore)
        listener = RepositoryListener(
            callbacks=[callback], events=LISTEN_EVENTS, **repo)
        t = threading.Thread(target=poll_forever, args=(listener, repo["poll_interval"]))
        t.daemon = True
        t.start()
    while True:
        # wait forever for a signal or an unusual termination
        if threading.active_count() < len(config) + 1:
            logging.error("Child polling thread quit!")
            return False
        time.sleep(1)
    return True

def do_media_info(self):
    """ Shows basic media info. """
    if self.is_client_mod:
        self.send_owner_run_msg('*Playlist Length:* ' + str(len(self.media.track_list)))
        self.send_owner_run_msg('*Track List Index:* ' + str(self.media.track_list_index))
        self.send_owner_run_msg('*Elapsed Track Time:* ' + self.format_time(self.media.elapsed_track_time()))
        self.send_owner_run_msg('*Active Track:* ' + str(self.media.has_active_track()))
        self.send_owner_run_msg('*Active Threads:* ' + str(threading.active_count()))

def test_starts_multiple_threads(self):
    multi_thread_handler = MultiThreadHandler(Queue())
    multi_thread_handler.start()
    thread_count = get_config()["handlers"]["multi_thread"]["thread_count"]
    try:
        wait_until_success(lambda: self.assertTrue(threading.active_count() >= thread_count + 1))
    finally:
        multi_thread_handler.stop()

def test_paralleldiagnostics_start_workers_doesnt_start_above_concurrency_limit(self):
    begin_threads = threading.active_count()
    ec2rlcore.paralleldiagnostics._start_workers(self.workers, 2, self.options, self.work_queue, self.logdir)
    self.assertEqual(len(self.workers), 2)
    ec2rlcore.paralleldiagnostics._start_workers(self.workers, 2, self.options, self.work_queue, self.logdir)
    self.assertEqual(len(self.workers), 2)
    ec2rlcore.paralleldiagnostics._start_workers(self.workers, 1, self.options, self.work_queue, self.logdir)
    self.assertEqual(len(self.workers), 2)

    # Clean up workers
    # This is tested in test_workers_lifecycle_sentinels()
    for _ in self.workers:
        self.work_queue.put(None)

def test_paralleldiagnostics_start_workers_does_start_additional_workers(self):
    begin_threads = threading.active_count()
    ec2rlcore.paralleldiagnostics._start_workers(self.workers, 2, self.options, self.work_queue, self.logdir)
    # Must have more threads than we started with, and 2 workers
    self.assertGreater(threading.active_count(), begin_threads)
    self.assertEqual(len(self.workers), 2)
    ec2rlcore.paralleldiagnostics._start_workers(self.workers, 4, self.options, self.work_queue, self.logdir)
    # Must have 4 workers
    self.assertEqual(len(self.workers), 4)

    # Clean up workers
    # This is tested in test_workers_lifecycle_sentinels()
    for _ in self.workers:
        self.work_queue.put(None)

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("-c", "--config", type=str, help="Configuration file")
    args = parser.parse_args()

    starttime = time.time()

    # Store opened shell sessions
    shells = {}

    # FIXME redis connection settings should be in config
    redis_conn = redis.StrictRedis()
    p = redis_conn.connection_pool
    publish = gevent.spawn(publisher, redis_conn)

    # FIXME: use config
    workers = 2
    log.info(_("Spawning %s greenlets connecting to Redis..."), workers)
    redis_greenlets = [gevent.spawn(execute_workflow, redis_conn, _id, shells)
                       for _id in xrange(workers)]

    # Wait until all greenlets have started and connected.
    gevent.sleep(1)

    log.info(_("# active `threading` threads: %s") % threading.active_count())
    log.info(_("# Redis connections created: %s") % p._created_connections)
    log.info(_("# Redis connections in use: %s") % len(p._in_use_connections))
    log.info(_("# Redis connections available: %s") % len(p._available_connections))

    log.info(_("Waiting for Redis connection greenlets to terminate..."))
    gevent.joinall(redis_greenlets)

    d = time.time() - starttime
    log.info(_("All Redis connection greenlets terminated. Duration: %.2f s.") % d)

    publish.kill()