我们从Python开源项目中,提取了以下33个代码示例,用于说明如何使用gevent.spawn_later()。
def callAsync(event, allowed_again=10, func=None, *args, **kwargs): if isAllowed(event, allowed_again): # Not called recently, call it now called(event) # print "Calling now" return gevent.spawn(func, *args, **kwargs) else: # Called recently, schedule it for later time_left = allowed_again - max(0, time.time() - called_db[event]) log.debug("Added to queue (%.2fs left): %s " % (time_left, event)) if not queue_db.get(event): # Function call not queued yet thread = gevent.spawn_later(time_left, lambda: callQueue(event)) # Call this function later queue_db[event] = (func, args, kwargs, thread) return thread else: # Function call already queued, just update the parameters thread = queue_db[event][3] queue_db[event] = (func, args, kwargs, thread) return thread # Rate limit and delay function call if needed # Return: Wait for execution/delay then return value
def updateEndpoints( endpointActors, nextUpdate ): global currentEndpoints endpointActors.forceRefresh() responses = endpointActors.requestFromAll( 'report' ) newEndpoints = Set() while responses.waitForResults( timeout = 10 ): for response in responses.getNewResults(): if response.isSuccess and 'address' in response.data and 'port' in response.data: newEndpoints.add( ( response.data[ 'address' ], response.data[ 'port' ] ) ) if responses.isFinished(): break currentEndpoints = newEndpoints tmpUpdate = nextUpdate if 0 == len( currentEndpoints ): tmpUpdate = 5 print( "Updated list of endpoints, found %s" % len( currentEndpoints ) ) gevent.spawn_later( tmpUpdate, updateEndpoints, endpointActors, nextUpdate )
def pollBackendAvailability( isOneOff = True ): global IS_BACKEND_AVAILABLE aid = AgentId( '0.0.0.0.0' ) aid.org_id = ADMIN_OID res = model.request( 'list_sensors', { 'aid' : aid }, timeout = 2 ) res2 = identmanager.request( 'get_org_info', { 'include_all' : True } ) if res.isSuccess and res2.isSuccess: IS_BACKEND_AVAILABLE = True print( 'Backend available' ) if not isOneOff: gevent.spawn_later( 10, pollBackendAvailability, isOneOff = False ) else: IS_BACKEND_AVAILABLE = False print( 'Backend unavailable' ) if not isOneOff: gevent.spawn_later( 2, pollBackendAvailability, isOneOff = False )
def restartTimers(self): now = time.time() for key, alert in self.rediscl.hgetall('alerts').items(): alert = self.getAlert(key) if alert['state'] in ('RESOLVED', 'UNRESOLVED'): self.rediscl.hdel('alerts', key) else: alerttime = self.getStateTime(alert) if not alerttime: self.rediscl.hdel('alerts', key) continue epoch = alert['epoch'] or alert['lasttime'] remainingtime = (epoch + alerttime) - now if remainingtime > 0: self.logger.info("Schedule escalation in %ss for state %s" % (remainingtime, alert['state'])) self.timers[alert['guid']] = gevent.spawn_later(remainingtime, self.escalateHigher, alert) else: self.escalateHigher(alert)
def _quiesce(self, environ, bypass_auth=False): """Set service state to quiesced and shed existing connections.""" if not bypass_auth and not self._authorized_to_quiesce(environ): raise UnauthorizedError # Delay shedding to allow service deregistration after quiescing shed_delay_secs = 30 if not self.quiesced: self.quiesced = True total_conns = len(self.connections) # Note: There's still a small chance that we miss connections # that came in before we set to quiesced but are # still being established. conns = self.connections.copy() # Shed shed_rate_per_sec connections every second # after service deregistration delay. cur_iter_sec = 0 for remaining in xrange(total_conns, 0, -self.shed_rate_per_sec): cur_iter_sec += 1 # Check if fewer than shed_rate_per_sec conns left # in set so there's no over-popping. if remaining >= self.shed_rate_per_sec: num_conns = self.shed_rate_per_sec else: num_conns = remaining gevent.spawn_later(cur_iter_sec + shed_delay_secs, self._shed_connections, [conns.pop() for j in xrange(num_conns)]) # Terminate the service after shedding termination_delay_secs = 10 gevent.spawn_later(shed_delay_secs + cur_iter_sec + termination_delay_secs, self._shutdown)
def schedule(delay, func, *args, **kw_args): """ Spawns a greenlet with args periodically """ gevent.spawn_later(0, func, *args, **kw_args) gevent.spawn_later(delay, schedule, delay, func, *args, **kw_args)
def test_timeout(self): a,b = self.create_bound_pair() g = gevent.spawn_later(0.5, lambda: a.send(b'hi')) timeout = gevent.Timeout(0.1) timeout.start() self.assertRaises(gevent.Timeout, b.recv) g.kill()
def send(self, sender, host_port, bytes_): self.track_send(sender, host_port, bytes_) receive_end = self.transports[host_port].receive gevent.spawn_later(0.00000000001, receive_end, bytes_)
def test_spawn_later_greenlet(self): # a greenlet will have a context if the tracer is used even # if it's spawned later def greenlet(): self.tracer.get_call_context() g = gevent.spawn_later(0.01, greenlet) g.join() ctx = getattr(g, '__datadog_context', None) ok_(ctx is not None) eq_(0, len(ctx._trace))
def test_trace_later_greenlet(self): # a greenlet can be traced using the trace API def greenlet(): with self.tracer.trace('greenlet') as span: span.resource = 'base' gevent.spawn_later(0.01, greenlet).join() traces = self.tracer.writer.pop_traces() eq_(1, len(traces)) eq_(1, len(traces[0])) eq_('greenlet', traces[0][0].name) eq_('base', traces[0][0].resource)
def test_trace_spawn_later_multiple_greenlets_multiple_traces(self): # multiple greenlets must be part of the same trace def entrypoint(): with self.tracer.trace('greenlet.main') as span: span.resource = 'base' jobs = [gevent.spawn_later(0.01, green_1), gevent.spawn_later(0.01, green_2)] gevent.joinall(jobs) def green_1(): with self.tracer.trace('greenlet.worker') as span: span.set_tag('worker_id', '1') gevent.sleep(0.01) def green_2(): with self.tracer.trace('greenlet.worker') as span: span.set_tag('worker_id', '2') gevent.sleep(0.01) gevent.spawn(entrypoint).join() traces = self.tracer.writer.pop_traces() eq_(3, len(traces)) eq_(1, len(traces[0])) parent_span = traces[2][0] worker_1 = traces[0][0] worker_2 = traces[1][0] # check spans data and hierarchy eq_(parent_span.name, 'greenlet.main') eq_(parent_span.resource, 'base') eq_(worker_1.get_tag('worker_id'), '1') eq_(worker_1.name, 'greenlet.worker') eq_(worker_1.resource, 'greenlet.worker') eq_(worker_1.parent_id, parent_span.span_id) eq_(worker_2.get_tag('worker_id'), '2') eq_(worker_2.name, 'greenlet.worker') eq_(worker_2.resource, 'greenlet.worker') eq_(worker_2.parent_id, parent_span.span_id)
def test_trace_concurrent_spawn_later_calls(self): # create multiple futures so that we expect multiple # traces instead of a single one, even if greenlets # are delayed def greenlet(): with self.tracer.trace('greenlet'): gevent.sleep(0.01) jobs = [gevent.spawn_later(0.01, greenlet) for x in range(100)] gevent.joinall(jobs) traces = self.tracer.writer.pop_traces() eq_(100, len(traces)) eq_(1, len(traces[0])) eq_('greenlet', traces[0][0].name)
def _replace(g_class): """ Utility function that replace the gevent Greenlet class with the given one. """ # replace the original Greenlet class with the new one gevent.greenlet.Greenlet = g_class # replace gevent shortcuts gevent.Greenlet = gevent.greenlet.Greenlet gevent.spawn = gevent.greenlet.Greenlet.spawn gevent.spawn_later = gevent.greenlet.Greenlet.spawn_later
def spawn_later(self, delay, *args, **kwargs): return self.spawn_wrap(functools.partial(gevent.spawn_later, delay), *args, **kwargs)
def _connect( self ): try: self._socket = gevent.ssl.wrap_socket( gevent.socket.socket( gevent.socket.AF_INET, gevent.socket.SOCK_STREAM ), cert_reqs = gevent.ssl.CERT_NONE ) self._socket.connect( ( self._destServer, self._destPort ) ) self._log( "Connected" ) headers = rSequence() headers.addSequence( Symbols.base.HCP_IDENT, AgentId( ( self._oid, self._iid, self._sid, self._plat, self._arch ) ).toJson() ) headers.addStringA( Symbols.base.HOST_NAME, hashlib.md5( str( self._sid ) ).hexdigest() ) headers.addIpv4( Symbols.base.IP_ADDRESS, "%d.%d.%d.%d" % ( random.randint( 0, 254 ), random.randint( 0, 254 ), random.randint( 0, 254 ), random.randint( 0, 254 ) ) ) if self._enrollmentToken is not None: headers.addBuffer( Symbols.hcp.ENROLLMENT_TOKEN, self._enrollmentToken ) self._sendFrame( HcpModuleId.HCP, [ headers ], timeout = 30, isNotHbs = True ) self._log( "Handshake sent" ) self._threads.add( gevent.spawn( self._recvThread ) ) self._threads.add( gevent.spawn_later( 1, self._syncHcpThread ) ) self._threads.add( gevent.spawn_later( 10, self._syncHbsThread ) ) self._threads.add( gevent.spawn_later( 2, lambda: self._connectedEvent.set() ) ) return True except: self._log( "Failed to connect over TLS: %s" % traceback.format_exc() ) return False
def _syncHbsThread( self ): self._doHbsSync() self._threads.add( gevent.spawn_later( 60 * 5, self._syncHbsThread ) )
def _syncHcpThread( self ): self._doHcpSync() self._threads.add( gevent.spawn_later( 60 * 10, self._syncHcpThread ) ) ########################################################################### # SEND AND RECEIVE DATA STUFF ###########################################################################
def _generateEvent( self, everyNSeconds, eventGenerator, plusOrMinusNSeconds, upToNEvents ): if self._connectedEvent.wait( 0 ): if upToNEvents is None or 0 != upToNEvents: if upToNEvents is not None: upToNEvents -= 1 try: messages = next( eventGenerator ) except StopIteration: self._log( "Scheduled event generator failed to generate, ignoring it in the future." ) return else: return if type( messages ) not in ( tuple, list ): messages = ( messages, ) self._sendFrame( HcpModuleId.HBS, messages, timeout = 30 ) if not self._stopEvent.wait( 0 ): nextEvent = everyNSeconds if 0 != plusOrMinusNSeconds: nextEvent += random.randint( -plusOrMinusNSeconds, plusOrMinusNSeconds ) if 0 > nextEvent: nextEvent = 0 self._threads.add( gevent.spawn_later( nextEvent, self._generateEvent, everyNSeconds, eventGenerator, plusOrMinusNSeconds, upToNEvents ) ) ########################################################################### # PUBLIC FUNCTIONALITY ###########################################################################
def scheduleEvent( self, everyNSeconds, eventGenerator, plusOrMinusNSeconds = 0, upToNEvents = None ): self._threads.add( gevent.spawn_later( 0, self._generateEvent, everyNSeconds, eventGenerator, plusOrMinusNSeconds, upToNEvents ) ) ########################################################################### # MAIN ###########################################################################
def __init__( self, maxQps, cbLog = None ): self._maxQps = maxQps self._q = gevent.queue.Queue() self._log = cbLog self._transmitted = 0 self._lastWait = time.time() self._isRunning = True self._threads = gevent.pool.Group() self._threads.add( gevent.spawn_later( 0, self._sendLoop ) ) self._threads.add( gevent.spawn_later( 1, self._resetStats ) )
def _resetStats( self ): if self._isRunning: self._transmitted = 0 self._threads.add( gevent.spawn_later( 1, self._resetStats ) )
def pollOutageState(): global IS_OUTAGE_ON info = deployment.request( 'get_global_config', {} ) if info.isSuccess: IS_OUTAGE_ON = False if info.data[ 'global/outagestate' ] == '0' else info.data[ 'global/outagetext' ] gevent.spawn_later( 30, pollOutageState )
def spawn_later(self, seconds, function, *args, **kwargs): """Spawn a new gevent greenlet later.""" return gevent.spawn_later(seconds, function, *args, **kwargs)
def on_datamodel_in_sync(self): if not self._cleanup_done: # Datamodel in sync for the first time. Give the managers some # time to finish processing, then trigger cleanup. self._cleanup_done = True _log.info("No cleanup scheduled, scheduling one.") gevent.spawn_later(self.config.STARTUP_CLEANUP_DELAY, functools.partial(self._do_cleanup, async=True)) self._cleanup_done = True
def makeTimer(self, alert): greenlet = self.timers.get(alert['guid']) if greenlet is not None: scheduledalert = greenlet.args[0] if scheduledalert['state'] != alert['state']: self.logger.info("Removing schedule for alert %s" % scheduledalert['state']) greenlet.kill() else: return delay = self.getStateTime(alert) if delay: self.logger.info("Schedule escalation in %ss for state %s" % (delay, alert['state'])) self.timers[alert['guid']] = gevent.spawn_later(delay, self.escalateHigher, alert)
def do_sed(message): if message.channel not in channels: return try: regex, replacement, flags, target = parse_sed(message.content[1:]) except ValueError: return try: c = re.compile(regex, flags & 127) except re.error as e: return g = gevent.getcurrent() def raiseKeyboardInterrupt(s, i): print("timing out!", g) gevent.spawn(message.reply, 'fk off with ur evil regex bro') g.throw(gevent.GreenletExit) # ## We install a signal handler, to timeout the regular expression match if it's taking too long, i.e. evil regexp # ## s/^(a+)+$/rip/ old_sighandler = signal.signal(signal.SIGALRM, raiseKeyboardInterrupt) signal.setitimer(signal.ITIMER_REAL, 0.05) try: m = c.search q = channels[message.channel] for i in range(-1, -len(q) - 1, -1): nick, line = q[i] if m(line) and (not target or nick.lower() == target): q[i] = nick, doTranslation(c.sub(replacement, line, 0 if flags & 0x100 else 1)[:400], flags) gevent.spawn_later(0, message.reply, '*%s*: %s' % (nick, q[i][1])) break except re.error as e: return finally: ### Restore original handlers. signal.setitimer(signal.ITIMER_REAL, 0) signal.signal(signal.SIGALRM, old_sighandler)
def _finish_msg_batch(self, batch, results): if not self._config.REPORT_ENDPOINT_STATUS: _log.warning("StatusReporter called even though status reporting " "disabled. Ignoring.") self._endpoint_status[IPV4].clear() self._endpoint_status[IPV6].clear() self._newer_dirty_endpoints.clear() self._older_dirty_endpoints.clear() return if self._cleanup_pending: try: self._attempt_cleanup() except EtcdException as e: _log.error("Cleanup failed: %r", e) _stats.increment("Status report cleanup failed") else: _stats.increment("Status report cleanup done") self._cleanup_pending = False if self._reporting_allowed: # We're not rate limited, go ahead and do a write to etcd. _log.debug("Status reporting is allowed by rate limit.") if not self._older_dirty_endpoints and self._newer_dirty_endpoints: _log.debug("_older_dirty_endpoints empty, promoting" "_newer_dirty_endpoints") self._older_dirty_endpoints = self._newer_dirty_endpoints self._newer_dirty_endpoints = set() if self._older_dirty_endpoints: ep_id = self._older_dirty_endpoints.pop() status_v4 = self._endpoint_status[IPV4].get(ep_id) status_v6 = self._endpoint_status[IPV6].get(ep_id) status = combine_statuses(status_v4, status_v6) try: self._write_endpoint_status_to_etcd(ep_id, status) except EtcdException: _log.exception("Failed to report status for %s, will " "retry", ep_id) # Add it into the next dirty set. Retrying in the next # batch ensures that we try to update all of the dirty # endpoints before we do any retries, ensuring fairness. self._newer_dirty_endpoints.add(ep_id) # Reset the rate limit flag. self._reporting_allowed = False if not self._timer_scheduled and ((not self._reporting_allowed) or self._cleanup_pending): # Schedule a timer to stop our rate limiting or retry cleanup. timeout = self._config.ENDPOINT_REPORT_DELAY timeout *= (0.9 + (random.random() * 0.2)) # Jitter by +/- 10%. gevent.spawn_later(timeout, self._on_timer_pop, async=True) self._timer_scheduled = True