我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用gevent.sleep()。
def pump_messages(self):
    """Keep a broker connection open and process incoming frames forever.

    This method never returns, so it should be run from a separate
    greenlet. When a ``socket.error`` occurs the failure is logged and a
    new connection attempt is made after a one second pause.
    """
    while True:
        try:
            self._connect()
            LOG.info("connected")
            while self.connected:
                LOG.debug("pumping")
                self.connection.read_frames()
                # Let other greenlets run between frame reads.
                gevent.sleep()
        except socket.error as err:
            LOG.warning("connection failed: %s", err)
            # Short pause before the next connection attempt.
            gevent.sleep(1)
def connect(self) -> bool:
    """Open a long-running connection to EPMD.

    Does not return until the connection has been established; after a
    ``socket.error`` the attempt is repeated following a 5 second sleep.

    :return: True
    """
    while True:
        try:
            print("EPMD: Connecting %s:%d" % (self.host_, self.port_))
            self.sock_ = socket.create_connection(
                address=(self.host_, self.port_), timeout=5.0)
        except socket.error as exc:
            print("EPMD: connection error:", exc)
            gevent.sleep(5)
        else:
            # Connected: leave the retry loop.
            break

    print("EPMD: Socket connected")
    return True
def worker(pattern, q):
    """Fetch and check every URL described by ``pattern``.

    ``pattern["url"]`` is a sequence of URL templates. A template without
    a ``%d`` placeholder is passed to ``get_and_check`` as is; otherwise it
    is expanded for every page number from 1 to ``pattern["page_range"]``.
    Each request is followed by a 10 second sleep. Any exception is logged
    and ends the worker.
    """
    try:
        page_count = pattern["page_range"]
        for template in pattern["url"]:
            if template.find("%d") == -1:
                # Plain URL: a single request, no page expansion.
                get_and_check(template, pattern, q)
                gevent.sleep(10)
                continue
            for page in range(1, page_count + 1):
                get_and_check(template % page, pattern, q)
                gevent.sleep(10)
    except Exception as e:
        log.error("PID:%d proxy error:%s " % (os.getpid(), e))
def heartbeat(self):
    """Run the heartbeat loop until shutdown has completed.

    While ``self.is_alive()`` is true, one beat is performed every
    ``self.heartbeat_interval`` seconds. Afterwards beating continues once
    per second for as long as any function in ``_registered_lifers``
    returns a truthy value.

    NOTE(review): the original docstring states that unhandled exceptions
    are captured and reported; presumably that happens inside
    ``_heartbeat_beat_once`` - confirm.
    """
    # Keep beating unless the WSGI worker is shutting down.
    while self.is_alive():
        logger.debug('thump')
        self._heartbeat_beat_once()
        gevent.sleep(self.heartbeat_interval)
    logger.info('App stopped, so stopping heartbeat.')

    # We're at worker shutdown, so beat until all registered lifers are ok
    # with us shutting down.
    while True:
        # A list (not a generator) so that every lifer is polled each pass.
        lifer_states = [lifer() for lifer in _registered_lifers]
        if not any(lifer_states):
            break
        logger.debug('thump (finishing up)')
        self._heartbeat_beat_once()
        # Faster beat so we can shut down sooner.
        gevent.sleep(1)
    logger.info('Everything completed.')
def test_exitcode_previous_to_join(self):
    """The child's exit code must become visible before ``join()`` is called."""
    p = start_process(lambda: gevent.sleep(SHORTTIME))
    # Assume that the child process is still alive when the next line is
    # executed by the interpreter (there is no guarantee for that, but it
    # is rather likely).
    assert p.exitcode is None
    # Expect the child watcher mechanism to pick up and process the child
    # process termination event (within at most two seconds). The
    # `gevent.sleep()` invocations allow for libev event loop iterations,
    # two of which are required after the OS delivers the SIGCHLD signal
    # to the parent process: one iteration invokes the child reap loop,
    # and the next invokes the libev callback associated with the
    # termination event.
    deadline = time.time() + 2
    while time.time() < deadline:
        if p.exitcode is None:
            gevent.sleep(ALMOSTZERO)
            continue
        assert p.exitcode == 0
        p.join()
        return
    raise Exception('Child termination not detected')
def test_lock_out_of_context_single(self):
    """Leaving the context of a reader locked by a greenlet raises GIPCLocked."""
    reader, writer = pipe()
    getter = gevent.spawn(lambda handle: handle.get(), reader)
    gevent.sleep(SHORTTIME)
    with raises(GIPCLocked):
        with reader:
            pass
            # The context manager can't close the reader, as it is locked
            # in the spawned greenlet.
    getter.kill(block=False)
    # Ensure killing via 'context switch', i.e. yield control to other
    # coroutines (otherwise the subsequent close attempt will fail with
    # `GIPCLocked` error).
    gevent.sleep(-1)
    # Close writer first. Otherwise, `os.close(r._fd)` would block on Win.
    writer.close()
    reader.close()
def test_lock_out_of_context_single(self):
    """Duplex variant: a handle whose reader is locked cannot be closed on exit."""
    h1, h2 = pipe(True)
    getter = gevent.spawn(lambda handle: handle.get(), h1)
    gevent.sleep(SHORTTIME)
    with raises(GIPCLocked):
        with h1:
            pass
            # Can't close h1 reader on exit, as it is locked in the
            # spawned greenlet.
    getter.kill(block=False)
    # Ensure killing via 'context switch', i.e. yield control to other
    # coroutines (otherwise the subsequent close attempt may fail with
    # `GIPCLocked` error).
    gevent.sleep(-1)
    # Closes read and write handles of h2.
    h2.close()
    assert h1._writer._closed
    assert not h1._reader._closed
    # Closes read handle, ignore that writer is already closed.
    h1.close()
    assert h1._reader._closed
def test_lock_out_of_context_pair(self):
    """A blocked writer on h1 makes the duplex pipe context exit raise GIPCLocked."""
    with raises(GIPCLocked):
        with pipe(True) as (h1, h2):
            # Write more to pipe than pipe buffer can hold
            # (makes `put` block when there is no reader).
            # Buffer is quite large on Windows.
            putter = gevent.spawn(
                lambda handle: handle.put(LONGERTHANBUFFER), h1)
            gevent.sleep(SHORTTIME)
            # Context manager tries to close h2 reader, h2 writer, and
            # h1 writer first. Fails upon latter, must still close
            # h1 reader after that.
    assert not h1._writer._closed
    assert h1._reader._closed
    assert h2._writer._closed
    assert h2._reader._closed
    # Kill greenlet (free lock on h1 writer), close h1 writer.
    putter.kill(block=False)
    gevent.sleep(-1)
    h1.close()
    assert h1._writer._closed
def test_lock_out_of_context_pair_3(self):
    """Both readers locked: the writers get closed, the readers stay open."""
    with raises(GIPCLocked):
        with pipe(True) as (h1, h2):
            getter1 = gevent.spawn(lambda handle: handle.get(), h1)
            getter2 = gevent.spawn(lambda handle: handle.get(), h2)
            gevent.sleep(SHORTTIME)
            # Context succeeds closing h2 writer, fails upon closing h2
            # reader. Proceeds closing h1 writer, succeeds, closes h1
            # reader and fails.
    assert not h2._reader._closed
    assert not h1._reader._closed
    assert h2._writer._closed
    assert h1._writer._closed
    getter1.kill(block=False)
    getter2.kill(block=False)
    gevent.sleep(-1)
    h2.close()
    h1.close()
def test_lock_out_of_context_pair_4(self):
    """Both writers locked: the readers get closed, the writers stay open."""
    with raises(GIPCLocked):
        with pipe(True) as (h1, h2):
            # Write more to pipe than pipe buffer can hold
            # (makes `put` block when there is no reader).
            # Buffer is quite large on Windows.
            putter1 = gevent.spawn(
                lambda handle: handle.put(LONGERTHANBUFFER), h1)
            putter2 = gevent.spawn(
                lambda handle: handle.put(LONGERTHANBUFFER), h2)
            gevent.sleep(SHORTTIME)
            # Context fails closing h2 writer, succeeds upon closing h2
            # reader. Proceeds closing h1 writer, fails, closes h1
            # reader and succeeds.
    assert h2._reader._closed
    assert h1._reader._closed
    assert not h2._writer._closed
    assert not h1._writer._closed
    putter1.kill(block=False)
    putter2.kill(block=False)
    gevent.sleep(-1)
    h2.close()
    h1.close()
def privateInterpreter(self):
    """Trivial interpreter implementation, sends command to plac interpreter.

    Reads characters via ``getChar()`` until the escape character is seen.
    Every completed line is passed to ``self.interpreter.execute`` and a
    new ``>`` prompt is written. ``KeyboardInterrupt`` ends the loop
    silently.
    """
    logging.info("Starting plain interpreter")
    char = ''
    line = ''
    try:
        while char != '\x1b':  # \x1b = escape character
            char = getChar()
            if char:
                # First character plus the remainder of the typed line.
                line += char + sys.stdin.readline()
            if '\n' in line:
                # '[:-1]' to omit the trailing '\n' character.
                self.interpreter.execute([line[:-1]], verbose=True)
                line = ''
                sys.stdout.write(">")
                sys.stdout.flush()
            gevent.sleep(0.1)
    except KeyboardInterrupt:
        pass
def _stop_client(self):
    """Best effort to stop the client.

    ``self._client.stop`` is invoked through ``_retry`` (3 tries, 1 second
    initial delay, backoff factor 2). Failures are counted on ``self._sc``
    and logged instead of being propagated.
    """
    try:
        # Make sure not to mistake this scenario with failing to stop
        # client.
        if self._client is None:
            log.info("Kazoo client is None.")
            return
        retrying = _retry((Exception,), tries=3, delay=1, backoff=2,
                          sleep_func=gevent.sleep)
        stop_with_retry = retrying(self._client.stop)
        stop_with_retry()
        log.info("Successfully stopped kazoo client.")
    except (Exception, gevent.Timeout):
        self._sc.increment("errors.zk.client.stop.failure",
                           tags={'host': hostname},
                           sample_rate=1)
        log.exception("Failed to stop kazoo client.")
def test_serverset_destroy(self):
    """After ``_destroy`` no child of the server set path ends with END_POINT_1."""
    testutil.initialize_kazoo_client_manager(ZK_HOSTS)
    client = KazooClientManager().get_client()
    destroy_path = ServerSetTestCase.SERVER_SET_DESTROY_PATH
    server_set = ServerSet(destroy_path, ZK_HOSTS, waiting_in_secs=0.01)
    for port in (ServerSetTestCase.PORT_1, ServerSetTestCase.PORT_2):
        server_set.join(port, use_ip=False)
    # Give time to let server set join to do its magic.
    gevent.sleep(1)
    server_set._destroy(ServerSetTestCase.END_POINT_1)
    gevent.sleep(1)
    children = client.get_children(
        ServerSetTestCase.SERVER_SET_DESTROY_PATH)
    for child in children:
        self.assertFalse(child.endswith(ServerSetTestCase.END_POINT_1))
def __init__(self, handler=None, hosts=None):
    """Set up the client state, retry policy and recipe factories.

    :param handler: handler object stored on ``self.handler``; when
        omitted (``None``) a new ``SequentialGeventHandler`` is created
        for this instance.
    :param hosts: value stored unchanged on ``self.hosts``.
    """
    # A ``SequentialGeventHandler()`` default in the signature would be
    # built once at definition time and shared by every instance, so the
    # default is created per instance instead.
    self.handler = SequentialGeventHandler() if handler is None else handler
    self.hosts = hosts
    self._state = KazooState.LOST
    self._listeners = []
    self.Party = partial(Party, self)
    self.ShallowParty = partial(ShallowParty, self)
    self.retry = KazooRetry(
        max_tries=3,
        delay=0.0,
        backoff=1,
        max_jitter=0.0,
        sleep_func=gevent.sleep
    )
    self.ChildrenWatch = partial(ChildrenWatch, self)
    self.DataWatch = partial(DataWatch, self)
    self._children_watches = {}
    self._data_watches = {}
def _holddown_queue_wiper():
    """Periodically drain the holddown queue.

    A greenlet body which wakes up every
    ``_NOTIFICATION_HOLDDOWN_WIPER_SLEEP_INTERVAL_IN_SECONDS`` seconds and
    empties ``_NOTIFICATION_HOLDDOWN_QUEUE``. An event is put back on
    ``_NOTIFICATION_EVENT_QUEUE`` when its path is unknown to
    ``_PATH_TO_DATA`` or when its timestamp is not older than the one
    recorded for that path; otherwise it is dropped.
    """
    while True:
        gevent.sleep(_NOTIFICATION_HOLDDOWN_WIPER_SLEEP_INTERVAL_IN_SECONDS)
        while not _NOTIFICATION_HOLDDOWN_QUEUE.empty():
            event = _NOTIFICATION_HOLDDOWN_QUEUE.get()
            (zk_path, command, value, version,
             max_wait_in_secs, watch_type, notification_timestamp) = event
            path_unknown = zk_path not in _PATH_TO_DATA
            if path_unknown or notification_timestamp \
                    >= _PATH_TO_DATA[zk_path]['notification_timestamp']:
                _NOTIFICATION_EVENT_QUEUE.put(
                    (zk_path, command, value, version,
                     max_wait_in_secs, watch_type, notification_timestamp)
                )
def _check_local_session_state():
    """Watch the ZooKeeper session id once a minute.

    The first id seen is remembered in ``_ZK_SESSION_ID``. When a later
    check sees a different id, a warning is logged and, if more than 180
    seconds have passed since ``_START_TIME``, ``_kill`` is called.
    """
    global _ZK_SESSION_ID
    while True:
        session_id = _kazoo_client()._session_id
        log.info("Current zk session id %s", session_id)
        if _ZK_SESSION_ID is None:
            _ZK_SESSION_ID = session_id
        elif _ZK_SESSION_ID != session_id:
            log.warning("Zookeeper session changes from %s to %s",
                        _ZK_SESSION_ID, session_id)
            uptime = datetime.datetime.utcnow() - _START_TIME
            if uptime.total_seconds() > 180:
                _kill("Restart since ZK session changes")
        gevent.sleep(60)


#########################################################
####### Funcs dealing with MetaConfig/Dependencies ######
#########################################################
def test_gevent1(self):
    """Two greenlets hand control to each other with ``gevent.sleep(0)``."""

    def foo():
        _log.info('Running in foo')
        gevent.sleep(0)
        _log.info('Explicit context switch to foo again')

    def bar():
        _log.info('Explicit context to bar')
        gevent.sleep(0)
        _log.info('Implicit context switch back to bar')

    greenlets = [gevent.spawn(task) for task in (foo, bar)]
    gevent.joinall(greenlets)
def test_greenlet(self):
    """Run two instances of a ``gevent.Greenlet`` subclass to completion."""

    class MyGreenlet(gevent.Greenlet):
        """Greenlet that prints a message and then sleeps for ``n`` seconds."""

        def __init__(self, message, n):
            super(MyGreenlet, self).__init__()
            self.message = message
            self.n = n

        def _run(self):
            print(self.message)
            gevent.sleep(self.n)

    g1 = MyGreenlet("Hi there111!", 1)
    g1.start()
    g2 = MyGreenlet("Hi there222!", 2)
    g2.start()
    gevent.joinall([g1, g2])

# def test_shutdown(self):
#     def run_forever():
#         _log.info('run_forever start..')
#         gevent.sleep(1000)
#     gevent.signal(signal.SIGQUIT, gevent.kill)
#     thread = gevent.spawn(run_forever)
#     thread.join()
def test_event(self):
    """One setter greenlet wakes five waiters through a shared ``Event``."""
    evt = Event()

    def setter():
        '''After 3 seconds, wake all greenlets waiting on evt.'''
        _log.info('A: Hey wait for me, I have to do something')
        gevent.sleep(3)
        _log.info("Ok, I'm done")
        evt.set()

    def waiter():
        '''Block on evt.wait() until the setter sets the event.'''
        _log.info("I'll wait for you")
        evt.wait()  # blocking
        _log.info("It's about time")

    greenlets = [gevent.spawn(setter)]
    greenlets.extend(gevent.spawn(waiter) for _ in range(5))
    gevent.joinall(greenlets)
def chat(ws):
    """Relay chat messages to and from clients.

    The socket is subscribed to the channel named by the ``channel``
    request argument. Every message received from the client is expected
    to look like ``<channel>:<data>`` and is published to that channel.
    """
    # Subscribe to messages on the specified channel.
    channel = request.args.get('channel')
    lag_tolerance_secs = float(request.args.get('tolerance', 0.1))
    chat_backend.subscribe(ws, channel)

    # Send heartbeat ping every 30s so Heroku won't close the connection.
    gevent.spawn(chat_backend.heartbeat, ws)

    while not ws.closed:
        # Sleep to prevent *constant* context-switches.
        gevent.sleep(lag_tolerance_secs)

        # Publish messages from client.
        message = ws.receive()
        if message is None:
            continue
        target_channel, data = message.split(':', 1)
        conn.publish(target_channel, data)
def wait_for_sync_etherscan(blockchain_service, url, tolerance, sleep):
    """Block until the local node is within ``tolerance`` blocks of etherscan.

    :param blockchain_service: object whose ``client.block_number()``
        returns the local block number.
    :param url: passed to ``etherscan_query_with_retries``.
    :param tolerance: accepted distance, in blocks, to the etherscan block.
    :param sleep: seconds to sleep between polls (also passed to the
        etherscan query helper).
    """
    syncing_str = 'Syncing ... Current: {} / Target: ~{}'
    local_block = blockchain_service.client.block_number()
    etherscan_block = etherscan_query_with_retries(url, sleep)

    if local_block >= etherscan_block - tolerance:
        return

    print('Waiting for the ethereum node to synchronize. [Use ^C to exit]')
    print(syncing_str.format(local_block, etherscan_block), end='')

    for iteration in count():
        sys.stdout.flush()
        gevent.sleep(sleep)
        local_block = blockchain_service.client.block_number()

        # update the oracle block number sparsely to not spam the server
        refresh_target = local_block >= etherscan_block or iteration % 50 == 0
        if refresh_target:
            etherscan_block = etherscan_query_with_retries(url, sleep)

        if local_block >= etherscan_block - tolerance:
            return

        print(CLEARLINE + CURSOR_STARTLINE, end='')
        print(syncing_str.format(local_block, etherscan_block), end='')
def wait_for_sync_rpc_api(blockchain_service, sleep):
    """Block until ``blockchain_service.is_synced()`` reports true.

    Prints a dot per poll (the line is cleared on every third poll) and
    sleeps ``sleep`` seconds between polls.
    """
    if blockchain_service.is_synced():
        return

    print('Waiting for the ethereum node to synchronize [Use ^C to exit].')

    for attempt in count():
        start_new_line = attempt % 3 == 0
        if start_new_line:
            print(CLEARLINE + CURSOR_STARTLINE, end='')

        print('.', end='')
        sys.stdout.flush()

        gevent.sleep(sleep)

        if blockchain_service.is_synced():
            return
def check_node_connection(func):
    """ A decorator to reconnect if the connection to the node is lost.

    The wrapped method is called again, after sleeping for the timeouts
    produced by ``timeout_two_stage(10, 3, 10)``, whenever it raises
    ``requests.exceptions.ConnectionError`` or ``InvalidReplyError``.

    Args:
        func: the method to wrap; it is called as ``func(self, *args, **kwargs)``.

    Returns:
        The retrying wrapper, which returns whatever ``func`` returns.
    """
    # Imported locally so the block does not depend on a module-level import.
    import functools

    # functools.wraps keeps the wrapped method's __name__ and __doc__
    # instead of reporting every decorated method as "retry_on_disconnect".
    @functools.wraps(func)
    def retry_on_disconnect(self, *args, **kwargs):
        for i, timeout in enumerate(timeout_two_stage(10, 3, 10)):
            try:
                result = func(self, *args, **kwargs)
                if i > 0:
                    log.info('Client reconnected')
                return result

            except (requests.exceptions.ConnectionError, InvalidReplyError):
                log.info(
                    'Timeout in eth client connection to {}. Is the client offline? Trying '
                    'again in {}s.'.format(self.transport.endpoint, timeout)
                )
            gevent.sleep(timeout)

    return retry_on_disconnect
def test_healthcheck_with_unconnected_node(raiden_network, nat_keepalive_timeout):
    """ Nodes start at the unknown state. """

    def status_seen_by(app, peer_address):
        # Network status that `app` currently records for `peer_address`.
        return app.raiden.protocol.nodeaddresses_networkstatuses[peer_address]

    app0, app1 = raiden_network  # pylint: disable=unbalanced-tuple-unpacking
    address0 = app0.raiden.address
    address1 = app1.raiden.address

    assert status_seen_by(app0, address1) == NODE_NETWORK_UNKNOWN
    assert status_seen_by(app1, address0) == NODE_NETWORK_UNKNOWN

    app0.raiden.start_health_check_for(address1)
    gevent.sleep(nat_keepalive_timeout)

    # Only the side that started the health check has an updated status.
    assert status_seen_by(app0, address1) == NODE_NETWORK_REACHABLE
    assert status_seen_by(app1, address0) == NODE_NETWORK_UNKNOWN
def test_healthcheck_with_bad_peer(raiden_network, nat_keepalive_retries, nat_keepalive_timeout):
    """ If the Ping messages are not answered, the node must be set to
    unreachable. """

    def status_seen_by(app, peer_address):
        # Network status that `app` currently records for `peer_address`.
        return app.raiden.protocol.nodeaddresses_networkstatuses[peer_address]

    app0, app1 = raiden_network  # pylint: disable=unbalanced-tuple-unpacking
    address0 = app0.raiden.address
    address1 = app1.raiden.address

    assert status_seen_by(app0, address1) == NODE_NETWORK_REACHABLE
    assert status_seen_by(app1, address0) == NODE_NETWORK_REACHABLE

    # Drop all Ping and Ack messages
    app0.raiden.protocol.transport.droprate = 1
    app1.raiden.protocol.transport.droprate = 1

    wait_time = (nat_keepalive_retries + 2) * nat_keepalive_timeout
    gevent.sleep(wait_time)

    assert status_seen_by(app0, address1) == NODE_NETWORK_UNREACHABLE
    assert status_seen_by(app1, address0) == NODE_NETWORK_UNREACHABLE
def geth_create_account(datadir, privkey):
    """ Create an account in `datadir` -- since we're not interested
    in the rewards, we don't care about the created address.

    The hex-encoded key is written to ``<datadir>/keyfile`` and imported
    with ``geth account import``; the passphrase is written twice to the
    process' stdin.

    Args:
        datadir (str): the datadir in which the account is created
        privkey: the private key; it is passed through ``hexlify`` before
            being written to the key file
    """
    keyfile_path = os.path.join(datadir, 'keyfile')
    with open(keyfile_path, 'w') as handler:
        handler.write(hexlify(privkey))

    command = ['geth', '--datadir', datadir, 'account', 'import', keyfile_path]
    create = subprocess.Popen(
        command,
        stdin=subprocess.PIPE,
        universal_newlines=True
    )

    passphrase_line = DEFAULT_PASSPHRASE + os.linesep
    create.stdin.write(passphrase_line)
    # Short pause between the two passphrase writes.
    time.sleep(.1)
    create.stdin.write(passphrase_line)
    create.communicate()
    assert create.returncode == 0
def send(self, sender, host_port, bytes_):
    """Send ``bytes_`` through the network, dropping a share of the packets.

    With a truthy ``self.droprate`` a packet is dropped whenever
    ``self.network.counter`` is a multiple of it. Dropped packets are
    still tracked via ``self.network.track_send`` and logged.
    """
    # even dropped packages have to go through throttle_policy
    gevent.sleep(self.throttle_policy.consume(1))

    drop = (self.network.counter % self.droprate == 0) if self.droprate else False

    if drop:
        # since this path wont go to super.send we need to call track
        # ourselves
        self.network.track_send(sender, host_port, bytes_)

        log.debug(
            'dropped packet',
            sender=pex(sender),
            counter=self.network.counter,
            msghash=pex(sha3(bytes_))
        )
    else:
        self.network.send(sender, host_port, bytes_)
def wait_for_listening_port(port_number, tries=10, sleep=0.1, pid=None):
    """Wait until process ``pid`` has a socket listening on ``port_number``.

    :param port_number: local port that must be in LISTEN state.
    :param tries: number of polls before giving up.
    :param sleep: seconds to sleep before every poll.
    :param pid: process to inspect; defaults to the current process.
    :raises RuntimeError: if the port is not bound after ``tries`` polls.
    """
    if pid is None:
        pid = os.getpid()

    for _ in range(tries):
        gevent.sleep(sleep)
        # macOS requires root access for the connections api to work
        # so get connections of the current process only
        connections = psutil.Process(pid).connections()
        is_listening = any(
            conn.status == 'LISTEN' and conn.laddr[1] == port_number
            for conn in connections
        )
        if is_listening:
            return

    raise RuntimeError('{port} is not bound'.format(port=port_number))


# TODO: Figure out why this fixture can't work as session scoped
# What happens is that after one test is done, in the next one
# the server is no longer running even though the teardown has not
# been invoked.
def echo_worker(self):
    """ The `echo_worker` works through the `self.received_transfers` queue and spawns
    `self.on_transfer` greenlets for all not-yet-seen transfers.

    The loop runs for as long as `self.stop_signal` is None and sleeps half
    a second whenever the queue is empty. """
    log.debug('echo worker', qsize=self.received_transfers.qsize())
    while self.stop_signal is None:
        if not (self.received_transfers.qsize() > 0):
            # Nothing queued: wait a little before polling again.
            gevent.sleep(.5)
            continue

        transfer = self.received_transfers.get()
        if transfer in self.seen_transfers:
            log.debug(
                'duplicate transfer ignored',
                initiator=pex(transfer['initiator']),
                amount=transfer['amount'],
                identifier=transfer['identifier']
            )
            continue

        self.seen_transfers.append(transfer)
        self.greenlets.append(gevent.spawn(self.on_transfer, transfer))
def synchronous():
    """Demonstrate an ``Event``: one setter greenlet releases four waiters."""
    from gevent.event import Event

    evt = Event()

    def setter():
        print('A: Hey wait for me, I have to do something')
        gevent.sleep(3)
        print("Ok, I'm done")
        evt.set()

    def waiter():
        print("I'll wait for you")
        evt.wait()
        print("It's about time")

    greenlets = [gevent.spawn(setter)]
    greenlets.extend(gevent.spawn(waiter) for _ in range(4))
    gevent.joinall(greenlets)
def _election(self):
    """Try to acquire the election lock, at most ``MAX_RETRY`` times.

    Each attempt is non-blocking. A failed attempt is logged and followed
    by a sleep of ``constant.Election.LOCK_INTERVAL`` seconds; the loop
    stops as soon as ``acquire`` returns without raising.
    """
    for _ in range(constant.Election.MAX_RETRY):
        try:
            self._locker.acquire(
                blocking=False,
                lock_ttl=constant.Election.LOCKER_TTL,
                timeout=constant.Election.TIMEOUT
            )
        except (etcd.EtcdLockExpired, Exception) as exc:
            log.warn(exc)
        else:
            # acquire() did not raise, so the lock may have been obtained.
            break
        gevent.sleep(constant.Election.LOCK_INTERVAL)
def main():
    """Ping a service obtained from ``sel`` once per second, forever.

    Every iteration asks ``sel.get_service(5)`` for a service, prints it
    and, if one was returned, connects to it, sends ``'ping'`` and prints
    the reply. Errors are printed and the loop carries on.
    """
    while True:
        service = sel.get_service(5)
        print(service)
        if service:
            sock = socket.socket()
            try:
                sock.connect(tuple(service.addr.values()))
                sock.send('ping')
                print(sock.recv(1024))
            except Exception as e:
                # Best effort: report the failure and try again next round.
                print(e)
            finally:
                # A new socket is created on every iteration; without this
                # close each round would leak a file descriptor.
                sock.close()
        gevent.sleep(1)
def _publish(self):
    """
    Start coroutine for publish.

    ``self._publish_handler`` is spawned and joined for at most
    ``constant.ETCD_CONNECT_TIMEOUT`` seconds. On a connection failure or
    timeout the attempt is repeated after ``ETCD_RECONNECT_INTERVAL``
    seconds, up to ``ETCD_RECONNECT_MAX_RETRY_INIT`` times.

    :return: None
    :raises err.OctpEtcdConnectError: when every attempt failed.
    """
    for attempt in range(constant.ETCD_RECONNECT_MAX_RETRY_INIT):
        try:
            co = gevent.spawn(self._publish_handler)
            co.join(constant.ETCD_CONNECT_TIMEOUT)
            failure = co.exception
            if failure:
                # if _publish_handler raise some exception, reraise it.
                raise failure
            co.kill()
        except (etcd.EtcdConnectionFailed, gevent.Timeout):
            log.info('Connect to etcd failed, Retry(%d)...', attempt)
            gevent.sleep(constant.ETCD_RECONNECT_INTERVAL)
        else:
            log.info('Publish OK.')
            break
    else:
        # publish failed
        raise err.OctpEtcdConnectError('Max attempts exceeded.')
def gevent_run(app, monkey_patch=True, start=True, debug=False,
               **kwargs):  # pragma: no cover
    """Run your app in gevent.spawn, run simple loop if start == True

    :param app: queues.Microservice instance
    :param monkey_patch: boolean, use gevent.monkey.patch_all() for
        patching standard modules, default: True
    :param start: boolean, if True, block in a simple loop until
        ``app.stopped`` becomes true
    :param debug: forwarded to ``app.run`` as ``debug``
    :param kwargs: other params, forwarded to ``app.run(**kwargs)``
    :return: None
    """
    if monkey_patch:
        from gevent import monkey
        monkey.patch_all()

    import gevent
    gevent.spawn(app.run, debug=debug, **kwargs)

    if not start:
        return
    while not app.stopped:
        gevent.sleep(0.1)
def _timer(self):
    """
    Store the current time in ``self.epoch`` every 1/10 sec; never returns.
    """
    while True:
        # self.epochbin=struct.pack("I",time.time())
        now = time.time()
        self.epoch = now
        gevent.sleep(0.1)

# def _taskSchedulerTimer(self):
#     """
#     every 4 seconds check maintenance queue
#     """
#     while True:
#         gevent.sleep(5)
#         self.scheduler.check(self.epoch)
def test_propagation_with_new_context(self):
    """A span traced in a spawned greenlet inherits the activated context."""
    # Activate a context with known ids before spawning; a single greenlet
    # is started, so exactly one trace with one span is expected.
    ctx = Context(trace_id=100, span_id=101)
    self.tracer.context_provider.activate(ctx)

    def greenlet():
        with self.tracer.trace('greenlet'):
            gevent.sleep(0.01)

    jobs = [gevent.spawn(greenlet)]
    gevent.joinall(jobs)

    traces = self.tracer.writer.pop_traces()
    eq_(1, len(traces))
    eq_(1, len(traces[0]))
    eq_(traces[0][0].trace_id, 100)
    eq_(traces[0][0].parent_id, 101)
def sleep(cls, seconds=0):
    '''
    Put the calling CursedWindow greenlet to sleep for ``seconds``.

    Use this to let the greenlets of other CursedWindows execute,
    especially when your ``update`` classmethod contains long running
    code. gevent, the concurrency library used for cursed, is not truly
    parallel, so one long running greenlet can lock up execution of other
    windows.

    Calling cls.sleep() with the default of zero seconds already allows
    other greenlets to take over; a non-zero value brings no extra
    benefit for that purpose.

    :param seconds: seconds to sleep. default zero is fine.
    :return: whatever ``gevent.sleep`` returns.
    '''
    outcome = gevent.sleep(seconds)
    return outcome
def _input_loop(self):
    """Read key presses and hand them to every window's KEY_EVENTS queue.

    The loop stops when a window's greenlet holds an exception (all
    windows are then flagged as not running) or when no window is both
    RUNNING and WAIT.
    """
    while self.running:
        for cw in self.windows:
            if cw.THREAD.exception is not None:
                # One window failed: flag every window as stopped.
                for window in self.windows:
                    window.RUNNING = False
                self.running = False
                break
            if cw.RUNNING and cw.WAIT:
                break
        else:
            # No window is waiting for input any more.
            self.running = False
            break

        gevent.sleep(0)
        key = self.window.getch()
        if key == -1:
            continue
        for window in self.windows:
            window.KEY_EVENTS.put(key)
def _watch_filter(self, eventname, function, params=None):
    """Create a filter for ``eventname`` and attach ``function`` to it.

    Retries forever: after a socket timeout, another socket error or a
    ``ValueError`` a warning is logged and a new attempt is made after
    ``reconnect_interval`` seconds.

    :return: the filter object returned by ``self._proxy.on``.
    """
    while True:
        try:
            event_filter = self._proxy.on(eventname, params)
            event_filter.watch(function)
            logger.info('Connected to filter for {}'.format(eventname))
            return event_filter
        except (socket.error, ValueError) as err:
            # Most specific case first: socket.timeout is itself a socket.error.
            if isinstance(err, socket.timeout):
                prefix = 'Timeout in filter creation, try to reconnect: '
            elif isinstance(err, socket.error):
                prefix = 'Socketerror in filter creation, try to reconnect:'
            else:
                prefix = 'ValueError in filter creation, try to reconnect:'
            logger.warning(prefix + str(err))
            gevent.sleep(reconnect_interval)
def ban_command(bot, command):
    """Announce a fake ban in the lobby, count down from 5, then reveal the prank.

    The target name is the second space-separated word of ``command``;
    nothing is sent when it is missing.
    """
    # Single-argument, parenthesised prints give the same output on
    # Python 2 and 3.
    print('')
    print('Ban command:')
    print(command)

    try:
        name = command.split(' ')[1]
    except (IndexError, ValueError):
        return

    bot.send_lobby_message('Banning %s in...' % name)
    for remaining in range(5, 0, -1):
        gevent.sleep(1)
        bot.send_lobby_message('%d' % remaining)
    gevent.sleep(1)
    bot.send_lobby_message('JUST A PRANK!')
def checkAlive(self, ip, port, protocol):
    """Return True when the test URL can be fetched through the given proxy.

    :param ip: proxy host.
    :param port: proxy port.
    :param protocol: ``'HTTP'`` or ``'HTTPS'``; any other value yields False.
    :return: True if the request succeeded, False otherwise.
    """
    testUrl = "https://www.baidu.com/"
    req_timeout = 3

    if protocol not in ('HTTP', 'HTTPS'):
        # Only HTTP(S) proxies can be configured here. Previously an empty
        # string reached ProxyHandler, which expects a mapping.
        return False

    proxy_url = r'http://%s:%s' % (ip, port)
    # Register the proxy for both schemes: testUrl is https, so a mapping
    # with only an "http" entry would leave the request un-proxied.
    proxyHost = {"http": proxy_url, "https": proxy_url}

    cookies = urllib2.HTTPCookieProcessor()
    proxyHandler = urllib2.ProxyHandler(proxyHost)
    opener = urllib2.build_opener(cookies, proxyHandler)
    opener.addheaders = [('User-Agent', 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/31.0.1650.63 Safari/537.36')]

    try:
        req = opener.open(testUrl, timeout=req_timeout)
        try:
            req.read()
        finally:
            # Release the connection even when reading fails.
            req.close()
        gevent.sleep(2)
        return True
    except IOError as e:
        # IOError is the common base of urllib2.HTTPError/URLError and of
        # socket errors/timeouts, so a dead proxy yields False instead of
        # an unhandled exception.
        print(e)
        return False
def test_timeout():
    """An API call sleeping past the hard timeout surfaces as a system exception."""
    from takumi.service import ServiceHandler, ApiMap, Context
    import gevent

    class UnknownException(Exception):
        """Wrapper that keeps the original exception value in ``exc``."""

        def __init__(self, exc):
            self.exc = exc

    app = ServiceHandler('TestService', soft_timeout=0, hard_timeout=1)

    @app.handle_system_exception
    def system_exception(tp, value, tb):
        wrapped = UnknownException(value)
        return UnknownException, wrapped, tb

    @app.api
    def timeout():
        # Sleeps longer than the 1 second hard timeout.
        gevent.sleep(2)

    context = Context({'client_addr': 'localhost', 'meta': {}})
    api_map = ApiMap(app, context)
    with pytest.raises(UnknownException) as exc:
        api_map.timeout()
    assert str(exc.value.exc) == 'Timeout after 1 seconds'
def listen(self, namespace, max_timeout):
    """Register to listen to a namespace and yield messages as they arrive.

    If no message arrives within the (jittered) timeout, which is at most
    `max_timeout` seconds, a `None` is yielded to allow clients to do
    periodic actions like sending PINGs.

    This runs forever and yields items as an iterable. Use it in a loop
    and break out of it when you want to deregister; the queue is removed
    from `self.consumers` when the generator is closed.

    """
    inbox = gevent.queue.Queue()

    namespace = namespace.rstrip("/")
    for level in _walk_namespace_hierarchy(namespace):
        self.consumers.setdefault(level, []).append(inbox)

    try:
        while True:
            # jitter the timeout a bit to ensure we don't herd
            jitter = random.uniform(0, max_timeout / 2)
            timeout = max_timeout - jitter

            try:
                yield inbox.get(block=True, timeout=timeout)
            except gevent.queue.Empty:
                yield None

            # ensure we're not starving others by spinning
            gevent.sleep()
    finally:
        for level in _walk_namespace_hierarchy(namespace):
            self.consumers[level].remove(inbox)
            if not self.consumers[level]:
                del self.consumers[level]
def test_subscribe():
    """Subscribe the current greenlet to 'kill' for five seconds, then unsubscribe."""
    e = Observer()
    current = getcurrent()
    # Same text as the two-item print statement: both items separated by a space.
    print('%s %s' % ('000', current))
    current.in_another_greenlet = in_another_greenlet
    subscription = e.subscribe('kill', current.in_another_greenlet)
    gevent.sleep(5)
    print('END')
    subscription.unsubscribe()
def test_wait():
    """Wait up to three seconds for 'kill' and report whether it fired."""
    e = Observer()
    pending = e.wait('kill')
    try:
        gevent.sleep(3)
    except FiredEvent:
        print('Fired!')
    else:
        print('Not Fired!')
    finally:
        # Always cancel the wait, whether or not the event fired.
        pending.cancel()
def fire_event():
    """Fire the 'kill' event on a new ``Observer`` after a two second delay."""
    observer = Observer()
    gevent.sleep(2)
    observer.fire('kill')
def connect(self, node) -> bool:
    """ Looks up EPMD daemon and connects to it trying to discover other
        Erlang nodes.

        Retries every 5 seconds until ``self.epmd_.connect()`` succeeds.

        :param node: not used by this method.
        :return: the result of ``self.epmd_.alive2(self)``.
    """
    while not self.epmd_.connect():
        gevent.sleep(5)
    return self.epmd_.alive2(self)
def _run(self):
    """Process the inbox in a loop until ``self.is_exiting_`` becomes true."""
    while True:
        if self.is_exiting_:
            return
        self.handle_inbox()
        # Zero-length sleep: hand control to other greenlets.
        gevent.sleep(0.0)
def dist_command(self, receiver_node: str, message: tuple) -> None:
    """ Locate the connection to the given node (a string).
        Place a tuple crafted by the caller into message box for Erlang
        distribution socket. It will pick up and handle the message whenever
        possible.

        If the node is not known yet, a connection is started first and the
        call waits (polling every 0.1 s) until the node shows up in
        ``self.dist_nodes_``.

        :param receiver_node: Name of a remote node
        :param message: A crafted tuple with command name and some more values
        :raises NodeException: if ``connect_to_node`` returns None
    """
    already_connected = receiver_node in self.dist_nodes_
    if not already_connected:
        LOG("Node: connect to node", receiver_node)
        handler = self.dist_.connect_to_node(
            this_node=self,
            remote_node=receiver_node)

        if handler is None:
            raise NodeException("Node not connected %s" % receiver_node)

        # block until connected, and get the connected message
        LOG("Node: wait for 'node_connected'")
        # msg = self.inbox_.receive_wait(
        #     filter_fn=lambda m: m[0] == 'node_connected'
        # )
        while receiver_node not in self.dist_nodes_:
            gevent.sleep(0.1)

        LOG("Node: connected")

    connection = self.dist_nodes_[receiver_node]
    connection.inbox_.put(message)