The following 38 code examples, extracted from open-source Python projects, illustrate how to use gevent.wait().
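Before the project examples, here is a minimal sketch of the two calling conventions that recur throughout this page: gevent.wait(objects, timeout=...) blocks until the given greenlets/events are ready (or the timeout expires) and returns the list of objects that became ready, while a bare gevent.wait() blocks until the event loop has nothing left to run. The worker function is a hypothetical placeholder, not taken from any of the projects below.

import gevent

def worker(seconds):
    # cooperatively sleep, then return a result
    gevent.sleep(seconds)
    return seconds

greenlets = [gevent.spawn(worker, s) for s in (0.1, 0.2, 0.3)]

# Wait for specific objects: blocks until they are ready or the timeout
# expires, returning the list of objects that became ready in time.
ready = gevent.wait(greenlets, timeout=1.0)
print('%d greenlets finished' % len(ready))

# Wait for everything: with no arguments, blocks until the hub has no
# more work, i.e. all outstanding greenlets and watchers are done.
gevent.wait()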
def test_run(self, chat):
    client = Mock()
    chat.pubsub = Mock()
    chat.pubsub.channels = ['quorum']
    chat.pubsub.listen.return_value = [{
        'type': 'message',
        'channel': 'quorum',
        'data': 'Calloo! Callay!',
    }]
    chat.greenlet = Mock()
    chat.subscribe(client, 'quorum')
    chat.run()
    gevent.wait()  # wait for event loop
    client.send.assert_called_once_with('quorum:Calloo! Callay!')
def stop_and_wait(self):
    # Stop handling incoming packets, but don't close the socket. The
    # socket can only be safely closed after all outgoing tasks are stopped
    self.transport.stop_accepting()

    # Stop processing the outgoing queues
    self.event_stop.set()
    gevent.wait(self.greenlets)

    # All outgoing tasks are stopped. Now it's safe to close the socket. At
    # this point there might be some incoming message being processed,
    # keeping the socket open is not useful for these.
    self.transport.stop()

    # Set all the pending results to False
    for waitack in self.senthashes_to_states.itervalues():
        waitack.async_result.set(False)
def send_and_wait(self, recipient, message, timeout):
    """ Send `message` to `recipient` and wait for the response or `timeout`.

    Args:
        recipient (address): The address of the node that will receive the
            message.
        message: The transfer message.
        timeout (float): How long should we wait for a response from
            `recipient`.

    Returns:
        None: If the wait timed out
        object: The result from the event
    """
    if not isaddress(recipient):
        raise ValueError('recipient is not a valid address.')

    self.protocol.send_and_wait(recipient, message, timeout)
def test_leaving(raiden_network, token_addresses):
    token_address = token_addresses[0]
    connection_managers = [
        app.raiden.connection_manager_for_token(token_address)
        for app in raiden_network
    ]

    all_channels = list(
        itertools.chain.from_iterable(
            connection_manager.receiving_channels
            for connection_manager in connection_managers
        )
    )

    leaving_async = [
        app.raiden.leave_all_token_networks_async()
        for app in raiden_network[1:]
    ]

    gevent.wait(leaving_async, timeout=50)

    assert not connection_managers[0].receiving_channels
    assert all(
        channel.state == CHANNEL_STATE_SETTLED
        for channel in all_channels
    )
def runner(args):
    if args.command == "file":
        jobs = []
        files = frozenset()
        for fn in args.file:
            files = files.union(glob.iglob(fn))
        logger.info("Processing files: %s", files)
        for f in files:
            argsCopy = copy.deepcopy(args)
            with open(f, "r", newline="\n") as fh:
                argsCopy.file = fh
                contentArgs = ContentArgParser(fh)
                contentArgs.updateArgs(argsCopy)
                logger.debug("Updated args: %s", argsCopy)
                jobs.append(
                    gevent.spawn(processItem, argsCopy, contentArgs))
        gevent.wait(jobs)
        return 0
    else:
        return processItem(args)
def main():
    parser = argparse.ArgumentParser(description="Haproxy agent check service")
    parser.add_argument("-c", "--config", default="/etc/herald/config.yml",
                        type=str, help="path to yaml configuration file")
    parser.add_argument("-b", "--bind", default='0.0.0.0', type=str,
                        help="listen address")
    parser.add_argument("-p", "--port", default=5555, type=int,
                        help="listen port")
    parser.add_argument("-l", "--loglevel", default='info',
                        choices=['info', 'warn', 'debug', 'critical'],
                        type=str, help="set logging level")
    args = parser.parse_args()

    setup_logging(args)
    config = load_configuration(args.config)
    all_plugins = load_all_plugins(config['plugins_dir'])
    plugin = load_plugin(all_plugins, config['plugins'])
    start_plugin(plugin)
    server = start_server(args, config, plugin)
    setup_handlers(server, plugin)
    gevent.wait()
def amqp_consume(self): """Block on AMQP channel messages until an exception raises""" log.info("%s: Starting AMQP consumer.", self.lbl) try: while True: self.chan.wait() except BaseException as exc: log.error("%s: AMQP consumer exception %r, stopping.", self.lbl, exc) self.close_chan() self.close_conn()
def main(args=None, workers=None, client=EchoHubClient, worker_kwargs=None):
    gevent.monkey.patch_all()
    args = args if args else prepare_argparse().parse_args()
    prepare_logging(args.verbose or 1)
    if args.mode == 'server':
        hub = HubServer(workers=workers)
    elif args.mode == 'client':
        hub = client(worker_kwargs=worker_kwargs)
    else:
        raise Exception("Unknown mode '%s'." % args.mode)

    def sig_handler(sig=None, frame=None):
        log.warning("Hub process received SIGTERM/SIGINT")
        hub.stop()
        log.info("Sig handler completed.")

    gevent.signal(signal.SIGTERM, sig_handler)
    gevent.signal(signal.SIGINT, sig_handler)  # KeyboardInterrupt also

    hub.start()
    gevent.wait()
def chat(sockets, pubsub):
    chat = sockets.ChatBackend()
    chat.pubsub = pubsub
    yield chat
    gevent.wait()
def test_heartbeat(self, chat, sockets):
    client = Mock()
    client.closed = False
    chat.send = Mock()
    sockets.HEARTBEAT_DELAY = 1
    gevent.spawn(chat.heartbeat, client)
    gevent.sleep(2)
    client.closed = True
    gevent.wait()
    chat.send.assert_called_with(client, 'ping')
def retry(protocol, data, receiver_address, event_stop, timeout_backoff):
    """ Send data until it's acknowledged.

    Exits when the first of the following happens:

    - The packet is acknowledged.
    - Event_stop is set.
    - The iterator timeout_backoff runs out of values.

    Returns:
        bool: True if the message was acknowledged, False otherwise.
    """
    async_result = protocol.send_raw_with_result(
        data,
        receiver_address,
    )

    event_quit = event_first_of(
        async_result,
        event_stop,
    )

    for timeout in timeout_backoff:

        if event_quit.wait(timeout=timeout) is True:
            break

        protocol.send_raw_with_result(
            data,
            receiver_address,
        )

    return async_result.ready()
def wait_recovery(event_stop, event_healthy):
    event_first_of(
        event_stop,
        event_healthy,
    ).wait()

    if event_stop.is_set():
        return

    # There may be multiple threads waiting, do not restart them all at
    # once to avoid message flood.
    gevent.sleep(random.random())
def send_and_wait(self, receiver_address, message, timeout=None):
    """Sends a message and waits for the response ack."""
    async_result = self.send_async(receiver_address, message)
    return async_result.wait(timeout=timeout)
def leave_all_token_networks_async(self):
    token_addresses = self.token_to_channelgraph.keys()
    leave_results = []
    for token_address in token_addresses:
        try:
            connection_manager = self.connection_manager_for_token(token_address)
        except InvalidAddress:
            # no connection manager for this token; skip it rather than
            # reusing a stale `connection_manager` from a prior iteration
            continue
        leave_results.append(connection_manager.leave_async())
    combined_result = AsyncResult()
    gevent.spawn(gevent.wait, leave_results).link(combined_result)
    return combined_result
def test_throughput(apps, tokens, num_transfers, amount):

    def start_transfers(curr_app, curr_token, num_transfers):
        graph = curr_app.raiden.token_to_channelgraph[curr_token]

        all_paths = graph.get_paths_of_length(
            source=curr_app.raiden.address,
            num_hops=2,
        )
        path = all_paths[0]
        target = path[-1]

        api = curr_app.raiden.api
        events = list()
        for i in range(num_transfers):
            async_result = api.transfer_async(
                curr_token,
                amount,
                target,
                1)  # TODO: fill in identifier
            events.append(async_result)

        return events

    finished_events = []

    # Start all transfers
    start_time = time.time()
    for idx, curr_token in enumerate(tokens):
        curr_app = apps[idx]
        finished = start_transfers(curr_app, curr_token, num_transfers)
        finished_events.extend(finished)

    # Wait until the transfers for all tokens are done
    gevent.wait(finished_events)
    elapsed = time.time() - start_time

    completed_transfers = num_transfers * len(tokens)
    tps = completed_transfers / elapsed
    print('Completed {} transfers {:.5} tps / {:.5}s'.format(
        completed_transfers,
        tps,
        elapsed,
    ))
def stop_and_wait(self):
    self.stop_event.set(True)
    # gevent.wait expects an iterable of waitable objects
    gevent.wait([self])
def stop(self):
    self.stop_signal = True
    self.greenlets.append(self.echo_worker_greenlet)
    gevent.wait(self.greenlets)
def flood_to(self, subscriptions):
    jobs = [
        gevent.spawn(self._pull_from, subscription)
        for subscription in subscriptions
    ]
    gevent.wait(jobs)
def main():
    server = MixedTCPServer(LISTEN_PORT, SS_PORT)
    gevent.signal(signal.SIGTERM, server.close)
    gevent.signal(signal.SIGINT, server.close)
    server.start()
    gevent.wait()
def main():
    args = sys.argv[1:]
    if len(args) != 2:
        sys.exit('Usage: %s source-address destination-address' % __file__)
    source = args[0]
    dest = parse_address(args[1])
    server = PortForwarder(source, dest)
    log('Starting port forwarder %s:%s -> %s:%s',
        *(server.address[:2] + dest))
    gevent.signal(signal.SIGTERM, server.close)
    gevent.signal(signal.SIGINT, server.close)
    server.start()
    gevent.wait()
def get(self, timeout=None):
    return gevent.wait(self._parts, timeout=timeout)
def wait(self, timeout=None):
    gevent.joinall(self._parts, timeout=timeout)
def main(sysargv=sys.argv):
    args = parse_args(sysargv[1:])
    print(args)
    jobs = [
        gevent.spawn(convert, args.file, args.stdin, i)
        for i in range(args.count)
    ]
    gevent.wait(jobs)
def amqp_consume(self): """Connect to Hub Server and set up and start AMQP consumer""" # define callback queue self.queue = self.chan.queue_declare(exclusive=True).queue self.chan.queue_bind(self.queue, self.exchange, self.queue) self.chan.basic_consume(self.queue, callback=self.amqp_handle_msg, no_ack=True) log.debug("%s: Initialized amqp connection, channel, queue.", self.lbl) # send rpc request self.worker_id = None self.correlation_id = uuid.uuid4().hex reply_to = self.queue routing_key = '%s.worker.%s' % (self.key, self.worker_type) msg = amqp.Message(json.dumps(self.worker_kwargs), correlation_id=self.correlation_id, reply_to=reply_to, content_type='application/json') self.amqp_send_msg(msg, routing_key) log.info("%s: sent RPC request, will wait for response.", self.lbl) # wait for rpc response try: while not self.worker_id: log.debug("%s: Waiting for RPC response.", self.lbl) self.chan.wait() except BaseException as exc: log.error("%s: Amqp consumer received %r while waiting for RPC " "response. Stopping.", self.lbl, exc) log.info("%s: Finished waiting for RPC response.", self.lbl) super(HubClient, self).amqp_consume()
def _send(self, command, payload=None):
    # send rpc request
    if self.correlation_id:
        raise Exception("Can't send second request while already waiting.")
    self.response = None
    self.correlation_id = uuid.uuid4().hex
    routing_key = '%s.%s' % (self.key, command)
    msg = amqp.Message(json.dumps(payload),
                       correlation_id=self.correlation_id,
                       reply_to=self.queue,
                       content_type='application/json')
    log.debug("Sending AMQP msg with routing key '%s' and body %r.",
              routing_key, msg.body)
    self.chan.basic_publish(msg, self.exchange, routing_key)
    log.info("Sent RPC request, will wait for response.")

    # wait for rpc response
    try:
        while self.correlation_id:
            log.debug("Waiting for RPC response.")
            self.chan.wait()
    except BaseException as exc:
        log.error("Amqp consumer received %r while waiting for RPC "
                  "response. Stopping.", exc)
    log.info("Finished waiting for RPC response.")
    response = self.response
    self.response = None
    return response
def test_component_spec():
    assert GenerateTestData.get_spec() == {
        'name': 'tests.components/GenerateTestData',
        'description': '"Generates stream of packets under control of a counter',
        'inPorts': [
            {
                'addressable': False,
                'description': '',
                'id': 'wait',
                'required': False,
                'type': 'bang'
            },
            {
                'addressable': False,
                'default': 1,
                'description': 'Count of packets to be generated',
                'id': 'COUNT',
                'required': False,
                'schema': {'type': 'int'}
            }
        ],
        'outPorts': [
            {
                'addressable': False,
                'description': '',
                'id': 'done',
                'required': False,
                'type': 'bang'
            },
            {
                'addressable': False,
                'description': 'Generated stream',
                'id': 'OUT',
                'required': False,
                'schema': {'type': 'string'}
            }
        ],
        'subgraph': False
    }
def serve_runtime(runtime=None, host=DEFAULTS['host'], port=DEFAULTS['port'],
                  registry_host=DEFAULTS['registry_host'],
                  registry_port=DEFAULTS['registry_port']):

    runtime = runtime if runtime is not None else Runtime()
    address = 'ws://{}:{:d}'.format(host, port)

    def runtime_application_task():
        """
        This greenlet runs the websocket server that responds to remote
        commands that inspect/manipulate the Runtime.
        """
        print('Runtime listening at {}'.format(address))
        WebSocketRuntimeApplication.runtimes[port] = runtime
        try:
            r = geventwebsocket.Resource(
                OrderedDict([('/', WebSocketRuntimeApplication)]))
            s = geventwebsocket.WebSocketServer(('', port), r)
            s.serve_forever()
        finally:
            WebSocketRuntimeApplication.runtimes.pop(port)

    def local_registration_task():
        """
        This greenlet will run the rill registry to register the runtime
        with the ui.
        """
        from rill.registry import serve_registry
        serve_registry(registry_host, registry_port, host, port)

    tasks = [runtime_application_task, local_registration_task]

    # Start!
    gevent.wait([gevent.spawn(t) for t in tasks])
def _yield_to_others(sleep):
    if any(gevent.monkey.is_module_patched(mod)
           for mod in ["socket", "subprocess"]):
        gevent.wait(timeout=sleep)
    else:
        time.sleep(sleep)
def _run(self):
    # in_cpubound_thread is sentinel to prevent double thread dispatch
    thread_ctx = threading.local()
    thread_ctx.in_cpubound_thread = True
    try:
        self.in_async = gevent.get_hub().loop.async()
        self.in_q_has_data = gevent.event.Event()
        self.in_async.start(self.in_q_has_data.set)

        while not self.stopping:
            if not self.in_q:
                # wait for more work
                self.in_q_has_data.clear()
                self.in_q_has_data.wait()
                continue
            # arbitrary non-preemptive service discipline can go here
            # FIFO for now, but we should experiment with others
            jobid, func, args, kwargs = self.in_q.popleft()
            start_time = arrow.now()
            try:
                with db.cleanup_session():
                    self.results[jobid] = func(*args, **kwargs)
            except Exception as e:
                log.exception("Exception raised in cpubound_thread:")
                self.results[jobid] = self._Caught(e)
            finished_time = arrow.now()
            run_delta = finished_time - start_time
            log.d("Function - '{}'\n".format(func.__name__),
                  "\tRunning time: {}\n".format(run_delta),
                  "\tJobs left:", len(self.in_q),
                  )
            self.out_q.append(jobid)
            self.out_async.send()
    except BaseException:
        self._error()  # this may always halt the server process
def apply(self, func, args, kwargs):
    done = gevent.event.Event()
    self.in_q.append((done, func, args, kwargs))
    while not self.in_async:
        gevent.sleep(0.01)  # poll until worker thread has initialized
    self.in_async.send()
    done.wait()
    res = self.results[done]
    del self.results[done]
    if isinstance(res, self._Caught):
        raise res.err
    return res
def _notify(self):
    try:
        while not self.stopping:
            if not self.out_q:
                # wait for jobs to complete
                self.out_q_has_data.clear()
                self.out_q_has_data.wait()
                continue
            self.out_q.popleft().set()
    except BaseException:
        self._error()
def get(self, block=True, timeout=None):
    if self._value != self.NoValue:
        return self._value
    if block:
        gevent.wait([self._future], timeout)
        self._value = self._future.get()
    else:
        self._value = self._future.get(block, timeout)
    return self._value
def retry_with_recovery(
        protocol,
        data,
        receiver_address,
        event_stop,
        event_healthy,
        event_unhealthy,
        backoff):
    """ Send data while the node is healthy until it's acknowledged.

    Note:
        backoff must be an infinite iterator, otherwise this task will
        become a hot loop.
    """

    # The underlying unhealthy will be cleared, care must be taken to properly
    # clear stop_or_unhealthy too.
    stop_or_unhealthy = event_first_of(
        event_stop,
        event_unhealthy,
    )

    acknowledged = False
    while not event_stop.is_set() and not acknowledged:

        # Packets must not be sent to an unhealthy node, nor should the task
        # wait for it to become available if the message has been acknowledged.
        if event_unhealthy.is_set():
            wait_recovery(
                event_stop,
                event_healthy,
            )

            # Assume wait_recovery returned because unhealthy was cleared and
            # continue execution, this is safe to do because event_stop is
            # checked below.
            stop_or_unhealthy.clear()

            if event_stop.is_set():
                return

        acknowledged = retry(
            protocol,
            data,
            receiver_address,

            # retry will stop when this event is set, allowing this task to
            # wait for recovery when the node becomes unhealthy or to quit if
            # the stop event is set.
            stop_or_unhealthy,

            # Intentionally reusing backoff to restart from the last
            # timeout/number of iterations.
            backoff,
        )

    return acknowledged
def test_latency(apps, tokens, num_transfers, amount):

    def start_transfers(idx, curr_token, num_transfers):
        curr_app = apps[idx]
        graph = curr_app.raiden.token_to_channelgraph[curr_token]

        all_paths = graph.get_paths_of_length(
            source=curr_app.raiden.address,
            num_hops=2,
        )
        path = all_paths[0]
        target = path[-1]

        finished = gevent.event.Event()

        def _transfer():
            api = curr_app.raiden.api
            for i in range(num_transfers):
                async_result = api.transfer_async(
                    curr_token,
                    amount,
                    target,
                    1,  # TODO: fill in identifier
                )
                async_result.wait()

            finished.set()

        gevent.spawn(_transfer)
        return finished

    finished_events = []

    # Start all transfers
    start_time = time.time()
    for idx, curr_token in enumerate(tokens):
        finished = start_transfers(idx, curr_token, num_transfers)
        finished_events.append(finished)

    # Wait until the transfers for all tokens are done
    gevent.wait(finished_events)
    elapsed = time.time() - start_time

    completed_transfers = num_transfers * len(tokens)
    tps = completed_transfers / elapsed
    print('Completed {} transfers. tps:{:.5} latency:{:.5} time:{:.5}s'.format(
        completed_transfers,
        tps,
        elapsed / completed_transfers,
        elapsed,
    ))
def processItem(args, contentArgs=None):
    blogger = EasyBlogger(args.clientid, args.secret, args.blogid, args.url)
    try:
        if args.command == "post":
            newPost = blogger.post(args.title,
                                   args.content or args.file,
                                   args.labels,
                                   args.filters,
                                   isDraft=not args.publish,
                                   fmt=args.format)
            postId = newPost['id']
            logger.debug("Created post: %s", postId)
            if contentArgs:
                contentArgs.updateFileWithPostId(postId)
            print(newPost['url'])

        if args.command == 'delete':
            logger.debug("Deleting post: %s", args.postIds)
            for postId in args.postIds:
                blogger.deletePost(postId)

        if args.command == 'update':
            logger.debug("Updating post: %s", args.postId)
            updated = blogger.updatePost(
                args.postId,
                args.title,
                args.content or args.file,
                args.labels,
                args.filters,
                isDraft=not args.publish,
                fmt=args.format)
            print(updated['url'])

        if args.command == "get":
            if args.postId:
                posts = blogger.getPosts(postId=args.postId)
            elif args.query:
                posts = blogger.getPosts(query=args.query,
                                         maxResults=args.count)
            elif args.u:
                posts = blogger.getPosts(url=args.u)
            else:
                posts = blogger.getPosts(labels=args.labels,
                                         maxResults=args.count)
            jobs = [gevent.spawn(printPosts, item, args.fields, args.doc,
                                 args.tofiles)
                    for item in posts]
            gevent.wait(jobs)
    except AccessTokenRefreshError:
        # The AccessTokenRefreshError exception is raised if the credentials
        # have been revoked by the user or they have expired.
        print('The credentials have been revoked or expired, please re-run'
              ' the application to re-authorize')
        return -1
    return 0
def _wait(self):
    gevent.wait()
def main(filename):
    print('Importing {}'.format(filename))

    group_singular = {
        'conifers': [
            'conifer', 'plant', 'land plant', 'botany'],
        'reptiles': [
            'reptile', 'animal', 'cold blood', 'cold bloded', 'vertebrate',
            'fauna'],
        'turtles (non-marine)': [
            'turtle', 'animal', 'non-marine', 'cold blood', 'cold bloded',
            'vertebrate', 'fauna'],
        'butterflies': [
            'butterfly', 'animal', 'insect', 'moths and butterflies',
            'fauna', 'invertebrate'],
        'dragonflies': [
            'dragonfly', 'animal', 'insect', 'dragonflies and damseflies',
            'invertebrate', 'fauna'],
        'mammals': [
            'mammal', 'animal', 'warm blood', 'warm blooded', 'vertebrate',
            'fauna'],
        'birds': [
            'bird', 'animal', 'warm blood', 'warm blooded', 'vertebrate',
            'fauna'],
        'amphibians': [
            'amfibian', 'animal', 'vertebrate', 'fauna'],
        'sphingid moths': [
            'sphingid moth', 'moth', 'animal', 'insect', 'invertebrate',
            'fauna', 'moths and butterflies'],
        'bumblebees': [
            'bumblebee', 'bee', 'bees', 'animal', 'insect', 'invertebrate'],
    }

    with open(filename, newline='') as f:
        count = 0
        # "Scientific Name","Common Name","Family","Taxonomic Group"
        for row in csv.reader(f, delimiter=',', quotechar='"'):
            count += 1
            common = row[1]
            if common == 'null':
                common = row[0]
            gevent.spawn(
                post_species,
                row[0],
                common,
                [row[2], row[3]] + group_singular[row[3].lower()])
            if count >= 100:
                gevent.wait()
                count = 0
        gevent.wait()
    return 0
def test_pickle(graph):
    graph.add_component("Generate", GenerateTestData, COUNT=5)
    passthru = graph.add_component("Pass", SlowPass, DELAY=0.1)
    count = graph.add_component("Counter", Counter)
    dis1 = graph.add_component("Discard1", Discard)
    dis2 = graph.add_component("Discard2", Discard)
    graph.connect("Generate.OUT", "Pass.IN")
    graph.connect("Pass.OUT", "Counter.IN")
    graph.connect("Counter.COUNT", "Discard1.IN")
    graph.connect("Counter.OUT", "Discard2.IN")

    net = Network(graph)
    netrunner = gevent.spawn(net.go)
    try:
        with gevent.Timeout(.35) as timeout:
            gevent.wait([netrunner])
    except gevent.Timeout:
        print(count.execute)

    assert count.count == 4
    assert dis2.values == ['000005', '000004', '000003', '000002']

    import pickle
    # dump before terminating to get the runner statuses
    data = pickle.dumps(net)
    # FIXME: do we need to auto-terminate inside wait_for_all if there is
    # an error?
    net.terminate()
    net.wait_for_all()
    # gevent.wait([netrunner])  # this causes more packets to be sent. no good.

    net2 = pickle.loads(data)
    assert net2.graph.component('Counter').count == 4
    assert net2.graph.component('Discard2').values == [
        '000005', '000004', '000003', '000002']

    net2.go(resume=True)
    assert net2.graph.component('Counter').count == 5
    assert net2.graph.component('Discard2').values == [
        '000005', '000004', '000003', '000002', '000001']

    # FIXME: test the case where a packet is lost due to being shut-down
    # packet counting should catch it. to test, use this component, which
    # can be killed while it sleeps holding a packet:
    # @component
    # @outport("OUT")
    # @inport("IN")
    # @inport("DELAY", type=float, required=True)
    # def SlowPass(IN, DELAY, OUT):
    #     """
    #     Pass a stream of packets to an output stream with a delay
    #     between packets
    #     """
    #     delay = DELAY.receive_once()
    #     for p in IN:
    #         time.sleep(delay)
    #         OUT.send(p)