我们从Python开源项目中,提取了以下13个代码示例,用于说明如何使用gevent.pool.Group()。
def test_group(self): def talk(msg): for i in xrange(3): print(msg) g1 = gevent.spawn(talk, 'bar') g2 = gevent.spawn(talk, 'foo') g3 = gevent.spawn(talk, 'fizz') group = Group() group.add(g1) group.add(g2) group.join() group.add(g3) group.join()
def get_conns(cred, providers): """Collect node data asynchronously using gevent lib.""" cld_svc_map = {"aws": conn_aws, "azure": conn_az, "gcp": conn_gcp} sys.stdout.write("\rEstablishing Connections: ") sys.stdout.flush() busy_obj = busy_disp_on() conn_fn = [[cld_svc_map[x.rstrip('1234567890')], cred[x], x] for x in providers] cgroup = Group() conn_res = [] conn_res = cgroup.map(get_conn, conn_fn) cgroup.join() conn_objs = {} for item in conn_res: conn_objs.update(item) busy_disp_off(dobj=busy_obj) sys.stdout.write("\r \r") sys.stdout.write("\033[?25h") # cursor back on sys.stdout.flush() return conn_objs
def get_data(conn_objs, providers): """Refresh node data using previous connection-objects.""" cld_svc_map = {"aws": nodes_aws, "azure": nodes_az, "gcp": nodes_gcp} sys.stdout.write("\rCollecting Info: ") sys.stdout.flush() busy_obj = busy_disp_on() collec_fn = [[cld_svc_map[x.rstrip('1234567890')], conn_objs[x]] for x in providers] ngroup = Group() node_list = [] node_list = ngroup.map(get_nodes, collec_fn) ngroup.join() busy_disp_off(dobj=busy_obj) sys.stdout.write("\r \r") sys.stdout.write("\033[?25h") # cursor back on sys.stdout.flush() return node_list
def __init__(self, locust_classes, options): self.locust_classes = locust_classes self.hatch_rate = options.hatch_rate self.num_clients = options.num_clients self.num_requests = options.num_requests self.host = options.host self.locusts = Group() self.state = STATE_INIT self.hatching_greenlet = None self.exceptions = {} self.stats = global_stats # register listener that resets stats when hatching is complete def on_hatch_complete(user_count): self.state = STATE_RUNNING logger.info("Resetting stats\n") self.stats.reset_all() events.hatch_complete += on_hatch_complete
def kill_locusts(self, kill_count): """ Kill a kill_count of weighted locusts from the Group() object in self.locusts """ bucket = self.weight_locusts(kill_count) kill_count = len(bucket) self.num_clients -= kill_count logger.info("Killing %i locusts" % kill_count) dying = [] for g in self.locusts: for l in bucket: if l == g.args[0]: dying.append(g) bucket.remove(l) break for g in dying: self.locusts.killone(g) events.hatch_complete.fire(user_count=self.num_clients)
def pipeline(stages, initial_data): monitors = Group() # Make sure items in initial_data are iterable. if not isinstance(initial_data, types.GeneratorType): try: iter(initial_data) except: raise TypeError('initial_data must be iterable') # The StopIteration will bubble through the queues as it is reached. # Once a stage monitor sees it, it indicates that the stage will read # no more data and the monitor can wait for the current work to complete # and clean up. if hasattr(initial_data, 'append'): initial_data.append(StopIteration) if not stages: return PipelineResult(monitors, []) # chain stage queue io # Each stage shares an output queue with the next stage's input. qs = [initial_data] + [Queue() for _ in range(len(stages))] for stage, in_q, out_q in zip(stages, qs[:-1], qs[1:]): stage.in_q = in_q stage.out_q = out_q monitors.spawn(stage_monitor, stage) gevent.sleep(0) return PipelineResult(monitors, stages[-1].out_q)
def gevent_click_page(): global TRY_COUNT TRY_COUNT = int(sys.argv[1]) _log.info('????????...') # ???????? driver = webdriver.PhantomJS() driver.get('https://www.xncoding.com/archives/') # driver.maximize_window() posts_count = len(driver.find_elements_by_xpath( '//article/header/h1[@class="post-title"]/a[@class="post-title-link"]')) driver.close() # gevent?pool?? psize = posts_count / THREAD_COUNT _log.info('???????:{}, ??????????:{}'.format(posts_count, psize)) group = Group() for i in range(0, THREAD_COUNT + 1): group.add(gevent.spawn(_click_page, posts_count, psize, i)) group.join() _log.info('????...')
def __init__(self, callback=None, ttype=None, source=None): """ ??????? """ self.group = Group() self.task_queue = Queue() self.task_type = ttype self.cb = callback self.source = source self.task_name = "%s-%s-%s" % (socket.gethostname(), self.source, self.task_type)
def __init__(self, *args, **kwargs): super(MasterLocustRunner, self).__init__(*args, **kwargs) class SlaveNodesDict(dict): def get_by_state(self, state): return [c for c in self.values() if c.state == state] @property def ready(self): return self.get_by_state(STATE_INIT) @property def hatching(self): return self.get_by_state(STATE_HATCHING) @property def running(self): return self.get_by_state(STATE_RUNNING) self.clients = SlaveNodesDict() self.server = rpc.Server(self.master_bind_host, self.master_bind_port) self.greenlet = Group() self.greenlet.spawn(self.client_listener).link_exception(callback=self.noop) # listener that gathers info on how many locust users the slaves has spawned def on_slave_report(client_id, data): if client_id not in self.clients: logger.info("Discarded report from unrecognized slave %s", client_id) return self.clients[client_id].user_count = data["user_count"] events.slave_report += on_slave_report # register listener that sends quit message to slave nodes def on_quitting(): self.quit() events.quitting += on_quitting
def __init__(self, *args, **kwargs): super(SlaveLocustRunner, self).__init__(*args, **kwargs) self.client_id = socket.gethostname() + "_" + md5(str(time() + random.randint(0, 10000)).encode('utf-8')).hexdigest() self.client = rpc.Client(self.master_host, self.master_port) self.greenlet = Group() self.greenlet.spawn(self.worker).link_exception(callback=self.noop) self.client.send(Message("client_ready", None, self.client_id)) self.greenlet.spawn(self.stats_reporter).link_exception(callback=self.noop) # register listener for when all locust users have hatched, and report it to the master node def on_hatch_complete(user_count): self.client.send(Message("hatch_complete", {"count":user_count}, self.client_id)) events.hatch_complete += on_hatch_complete # register listener that adds the current number of spawned locusts to the report that is sent to the master node def on_report_to_master(client_id, data): data["user_count"] = self.user_count events.report_to_master += on_report_to_master # register listener that sends quit message to master def on_quitting(): self.client.send(Message("quit", None, self.client_id)) events.quitting += on_quitting # register listener thats sends locust exceptions to master def on_locust_error(locust_instance, exception, tb): formatted_tb = "".join(traceback.format_tb(tb)) self.client.send(Message("exception", {"msg" : str(exception), "traceback" : formatted_tb}, self.client_id)) events.locust_error += on_locust_error
def __init__(self): self.mods=[] self.runMods=[] self.group = Group() self.app=None self.ProcessID=None
def __init__(self, name, group=pool.Group()): self.name = name self._commands = {} # cmd_id : command self._greenlets = {} # cmd_id : greenlet self._decorators = {} # cmd_id : callable self._group = group self._queue = queue.Queue() self.services.append(weakref.ref(self))
def stage_monitor(stage): """ Stage monitor is a worker that monitors a stage while it is being executed. The stage monitor coordinates running stage workers, saving results, and determining the end of any particular stage. """ # Pool of stage function worker greenlets. work_pool = Pool(size=stage.n_workers) # Group of greenlets which save results from workers via callbacks. save_group = Group() def save_result(x): """ Save results onto the output queue as a tuple or if there is only a single returned value, save that instead as that singular item. """ if type(stage) == Reduce: # XXX: This would not work for stream inputs afaict # But, reduction should not work anyway if len(work_pool) + len(save_group) + len(stage.in_q) == 1: stage.out_q.put(x) else: if not stage.returns_many: stage.out_q.put(x) else: try: for i in x: stage.out_q.put(i) except: stage.out_q.put([x]) for x in stage.in_q: """ Iterate the input queue until StopIteration is received. Spawn new workers for work items on the input queue. Keep track of storing results via a group of result saving greenlets. Ignore all DROP items in the input queue. Once we receive a StopIteration, wait for all open workers to finish and once they are finished, bubble the StopIteration to the next stage """ gevent.sleep(0) if x is DROP: continue if x is StopIteration: break func_args = [x] cb_worker = work_pool.apply_async(stage.func, func_args, callback=save_result) save_group.add(cb_worker) logger.debug('Worker Pool: << {} >>'.format(work_pool)) work_pool.join() save_group.join() stage.out_q.put(StopIteration) return stage