Python gevent module: spawn_later() code examples

The following code examples, extracted from open-source Python projects, illustrate how to use gevent.spawn_later().
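
Before the project snippets, here is a minimal, self-contained sketch (not taken from any of the projects below) of the basic call pattern: gevent.spawn_later(seconds, function, *args, **kwargs) schedules function to run in a new greenlet after roughly seconds seconds and returns the Greenlet object immediately.

import gevent

def greet(name):
    # Runs inside the delayed greenlet once the timer fires.
    print("hello, %s" % name)

g = gevent.spawn_later(1.0, greet, "world")  # returns immediately; greet runs about 1s later
g.join()  # block the current greenlet until the delayed greenlet has finished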

Project: zeronet-debian    Author: bashrc
def callAsync(event, allowed_again=10, func=None, *args, **kwargs):
    if isAllowed(event, allowed_again):  # Not called recently, call it now
        called(event)
        # print "Calling now"
        return gevent.spawn(func, *args, **kwargs)
    else:  # Called recently, schedule it for later
        time_left = allowed_again - max(0, time.time() - called_db[event])
        log.debug("Added to queue (%.2fs left): %s " % (time_left, event))
        if not queue_db.get(event):  # Function call not queued yet
            thread = gevent.spawn_later(time_left, lambda: callQueue(event))  # Call this function later
            queue_db[event] = (func, args, kwargs, thread)
            return thread
        else:  # Function call already queued, just update the parameters
            thread = queue_db[event][3]
            queue_db[event] = (func, args, kwargs, thread)
            return thread


# Rate limit and delay function call if needed
# Return: Wait for execution/delay then return value
Project: lc_cloud    Author: refractionPOINT
def updateEndpoints( endpointActors, nextUpdate ):
    global currentEndpoints
    endpointActors.forceRefresh()
    responses = endpointActors.requestFromAll( 'report' )

    newEndpoints = Set()
    while responses.waitForResults( timeout = 10 ):
        for response in responses.getNewResults():
            if response.isSuccess and 'address' in response.data and 'port' in response.data:
                newEndpoints.add( ( response.data[ 'address' ], response.data[ 'port' ] ) )
        if responses.isFinished(): break

    currentEndpoints = newEndpoints

    tmpUpdate = nextUpdate
    if 0 == len( currentEndpoints ):
        tmpUpdate = 5

    print( "Updated list of endpoints, found %s" % len( currentEndpoints ) )
    gevent.spawn_later( tmpUpdate, updateEndpoints, endpointActors, nextUpdate )
Project: lc_cloud    Author: refractionPOINT
def pollBackendAvailability( isOneOff = True ):
    global IS_BACKEND_AVAILABLE
    aid = AgentId( '0.0.0.0.0' )
    aid.org_id = ADMIN_OID
    res = model.request( 'list_sensors', { 'aid' : aid }, timeout = 2 )
    res2 = identmanager.request( 'get_org_info', { 'include_all' : True } )
    if res.isSuccess and res2.isSuccess:
        IS_BACKEND_AVAILABLE = True
        print( 'Backend available' )
        if not isOneOff:
            gevent.spawn_later( 10, pollBackendAvailability, isOneOff = False )
    else:
        IS_BACKEND_AVAILABLE = False
        print( 'Backend unavailable' )
        if not isOneOff:
            gevent.spawn_later( 2, pollBackendAvailability, isOneOff = False )
Project: lib9    Author: Jumpscale
def restartTimers(self):
        now = time.time()
        for key, alert in self.rediscl.hgetall('alerts').items():
            alert = self.getAlert(key)
            if alert['state'] in ('RESOLVED', 'UNRESOLVED'):
                self.rediscl.hdel('alerts', key)
            else:
                alerttime = self.getStateTime(alert)
                if not alerttime:
                    self.rediscl.hdel('alerts', key)
                    continue
                epoch = alert['epoch'] or alert['lasttime']
                remainingtime = (epoch + alerttime) - now
                if remainingtime > 0:
                    self.logger.info("Schedule escalation in %ss for state %s" % (remainingtime, alert['state']))
                    self.timers[alert['guid']] = gevent.spawn_later(remainingtime, self.escalateHigher, alert)
                else:
                    self.escalateHigher(alert)
Project: reddit-service-websockets    Author: reddit
def _quiesce(self, environ, bypass_auth=False):
        """Set service state to quiesced and shed existing connections."""
        if not bypass_auth and not self._authorized_to_quiesce(environ):
            raise UnauthorizedError

        # Delay shedding to allow service deregistration after quiescing
        shed_delay_secs = 30

        if not self.quiesced:
            self.quiesced = True
            total_conns = len(self.connections)
            # Note: There's still a small chance that we miss connections
            #   that came in before we set to quiesced but are
            #   still being established.
            conns = self.connections.copy()

            # Shed shed_rate_per_sec connections every second
            #   after service deregistration delay.
            cur_iter_sec = 0
            for remaining in xrange(total_conns, 0, -self.shed_rate_per_sec):
                cur_iter_sec += 1
                # Check if fewer than shed_rate_per_sec conns left
                #   in set so there's no over-popping.
                if remaining >= self.shed_rate_per_sec:
                    num_conns = self.shed_rate_per_sec
                else:
                    num_conns = remaining
                gevent.spawn_later(cur_iter_sec + shed_delay_secs,
                                   self._shed_connections,
                                   [conns.pop() for j in xrange(num_conns)])

            # Terminate the service after shedding
            termination_delay_secs = 10
            gevent.spawn_later(shed_delay_secs + cur_iter_sec +
                               termination_delay_secs,
                               self._shutdown)
Project: Pyrlang    Author: esl
def schedule(delay, func, *args, **kw_args):
    """ Spawns a greenlet with args periodically """
    gevent.spawn_later(0, func, *args, **kw_args)
    gevent.spawn_later(delay, schedule, delay, func, *args, **kw_args)
Project: zanph    Author: zanph
def test_timeout(self):
            a,b = self.create_bound_pair()
            g = gevent.spawn_later(0.5, lambda: a.send(b'hi'))
            timeout = gevent.Timeout(0.1)
            timeout.start()
            self.assertRaises(gevent.Timeout, b.recv)
            g.kill()
Project: raiden    Author: raiden-network
def send(self, sender, host_port, bytes_):
        self.track_send(sender, host_port, bytes_)
        receive_end = self.transports[host_port].receive
        gevent.spawn_later(0.00000000001, receive_end, bytes_)
Project: dd-trace-py    Author: DataDog
def test_spawn_later_greenlet(self):
        # a greenlet will have a context if the tracer is used even
        # if it's spawned later
        def greenlet():
            self.tracer.get_call_context()

        g = gevent.spawn_later(0.01, greenlet)
        g.join()
        ctx = getattr(g, '__datadog_context', None)
        ok_(ctx is not None)
        eq_(0, len(ctx._trace))
Project: dd-trace-py    Author: DataDog
def test_trace_later_greenlet(self):
        # a greenlet can be traced using the trace API
        def greenlet():
            with self.tracer.trace('greenlet') as span:
                span.resource = 'base'

        gevent.spawn_later(0.01, greenlet).join()
        traces = self.tracer.writer.pop_traces()
        eq_(1, len(traces))
        eq_(1, len(traces[0]))
        eq_('greenlet', traces[0][0].name)
        eq_('base', traces[0][0].resource)
Project: dd-trace-py    Author: DataDog
def test_trace_spawn_later_multiple_greenlets_multiple_traces(self):
        # multiple greenlets must be part of the same trace
        def entrypoint():
            with self.tracer.trace('greenlet.main') as span:
                span.resource = 'base'
                jobs = [gevent.spawn_later(0.01, green_1), gevent.spawn_later(0.01, green_2)]
                gevent.joinall(jobs)

        def green_1():
            with self.tracer.trace('greenlet.worker') as span:
                span.set_tag('worker_id', '1')
                gevent.sleep(0.01)

        def green_2():
            with self.tracer.trace('greenlet.worker') as span:
                span.set_tag('worker_id', '2')
                gevent.sleep(0.01)

        gevent.spawn(entrypoint).join()
        traces = self.tracer.writer.pop_traces()
        eq_(3, len(traces))
        eq_(1, len(traces[0]))
        parent_span = traces[2][0]
        worker_1 = traces[0][0]
        worker_2 = traces[1][0]
        # check spans data and hierarchy
        eq_(parent_span.name, 'greenlet.main')
        eq_(parent_span.resource, 'base')
        eq_(worker_1.get_tag('worker_id'), '1')
        eq_(worker_1.name, 'greenlet.worker')
        eq_(worker_1.resource, 'greenlet.worker')
        eq_(worker_1.parent_id, parent_span.span_id)
        eq_(worker_2.get_tag('worker_id'), '2')
        eq_(worker_2.name, 'greenlet.worker')
        eq_(worker_2.resource, 'greenlet.worker')
        eq_(worker_2.parent_id, parent_span.span_id)
Project: dd-trace-py    Author: DataDog
def test_trace_concurrent_spawn_later_calls(self):
        # create multiple futures so that we expect multiple
        # traces instead of a single one, even if greenlets
        # are delayed
        def greenlet():
            with self.tracer.trace('greenlet'):
                gevent.sleep(0.01)

        jobs = [gevent.spawn_later(0.01, greenlet) for x in range(100)]
        gevent.joinall(jobs)

        traces = self.tracer.writer.pop_traces()
        eq_(100, len(traces))
        eq_(1, len(traces[0]))
        eq_('greenlet', traces[0][0].name)
Project: dd-trace-py    Author: DataDog
def _replace(g_class):
    """
    Utility function that replace the gevent Greenlet class with the given one.
    """
    # replace the original Greenlet class with the new one
    gevent.greenlet.Greenlet = g_class

    # replace gevent shortcuts
    gevent.Greenlet = gevent.greenlet.Greenlet
    gevent.spawn = gevent.greenlet.Greenlet.spawn
    gevent.spawn_later = gevent.greenlet.Greenlet.spawn_later
Project: trex-http-proxy    Author: alwye
def test_timeout(self):
            a,b = self.create_bound_pair()
            g = gevent.spawn_later(0.5, lambda: a.send(b'hi'))
            timeout = gevent.Timeout(0.1)
            timeout.start()
            self.assertRaises(gevent.Timeout, b.recv)
            g.kill()
Project: disco    Author: b1naryth1ef
def spawn_later(self, delay, *args, **kwargs):
        return self.spawn_wrap(functools.partial(gevent.spawn_later, delay), *args, **kwargs)
Project: lc_cloud    Author: refractionPOINT
def _connect( self ):
        try:
            self._socket = gevent.ssl.wrap_socket( gevent.socket.socket( gevent.socket.AF_INET, 
                                                                 gevent.socket.SOCK_STREAM ), 
                                           cert_reqs = gevent.ssl.CERT_NONE )
            self._socket.connect( ( self._destServer, self._destPort ) )
            self._log( "Connected" )
            headers = rSequence()
            headers.addSequence( Symbols.base.HCP_IDENT, AgentId( ( self._oid, self._iid, self._sid, self._plat, self._arch ) ).toJson() )
            headers.addStringA( Symbols.base.HOST_NAME, hashlib.md5( str( self._sid ) ).hexdigest() )
            headers.addIpv4( Symbols.base.IP_ADDRESS, "%d.%d.%d.%d" % ( random.randint( 0, 254 ), 
                                                                        random.randint( 0, 254 ), 
                                                                        random.randint( 0, 254 ), 
                                                                        random.randint( 0, 254 ) ) )
            if self._enrollmentToken is not None:
                headers.addBuffer( Symbols.hcp.ENROLLMENT_TOKEN, self._enrollmentToken )
            self._sendFrame( HcpModuleId.HCP, [ headers ], timeout = 30, isNotHbs = True )
            self._log( "Handshake sent" )
            self._threads.add( gevent.spawn( self._recvThread ) )
            self._threads.add( gevent.spawn_later( 1, self._syncHcpThread ) )
            self._threads.add( gevent.spawn_later( 10, self._syncHbsThread ) )
            self._threads.add( gevent.spawn_later( 2, lambda: self._connectedEvent.set() ) )
            return True
        except:
            self._log( "Failed to connect over TLS: %s" % traceback.format_exc() )
            return False
Project: lc_cloud    Author: refractionPOINT
def _syncHbsThread( self ):
        self._doHbsSync()
        self._threads.add( gevent.spawn_later( 60 * 5, self._syncHbsThread ) )
Project: lc_cloud    Author: refractionPOINT
def _syncHcpThread( self ):
        self._doHcpSync()
        self._threads.add( gevent.spawn_later( 60 * 10, self._syncHcpThread ) )

    ###########################################################################
    #   SEND AND RECEIVE DATA STUFF
    ###########################################################################
Project: lc_cloud    Author: refractionPOINT
def _generateEvent( self, everyNSeconds, eventGenerator, plusOrMinusNSeconds, upToNEvents ):
        if self._connectedEvent.wait( 0 ):
            if upToNEvents is None or 0 != upToNEvents:
                if upToNEvents is not None:
                    upToNEvents -= 1
                try:
                    messages = next( eventGenerator )
                except StopIteration:
                    self._log( "Scheduled event generator failed to generate, ignoring it in the future." )
                    return
            else:
                return

            if type( messages ) not in ( tuple, list ):
                messages = ( messages, )

            self._sendFrame( HcpModuleId.HBS, messages, timeout = 30 )

        if not self._stopEvent.wait( 0 ):
            nextEvent = everyNSeconds
            if 0 != plusOrMinusNSeconds:
                nextEvent += random.randint( -plusOrMinusNSeconds, plusOrMinusNSeconds )
            if 0 > nextEvent:
                nextEvent = 0
            self._threads.add( gevent.spawn_later( nextEvent, self._generateEvent, everyNSeconds, eventGenerator, plusOrMinusNSeconds, upToNEvents ) )

    ###########################################################################
    #   PUBLIC FUNCTIONALITY
    ###########################################################################
Project: lc_cloud    Author: refractionPOINT
def scheduleEvent( self, everyNSeconds, eventGenerator, plusOrMinusNSeconds = 0, upToNEvents = None ):
        self._threads.add( gevent.spawn_later( 0, self._generateEvent, everyNSeconds, eventGenerator, plusOrMinusNSeconds, upToNEvents ) )

    ###########################################################################
    #   MAIN
    ###########################################################################
Project: lc_cloud    Author: refractionPOINT
def __init__( self, maxQps, cbLog = None ):
        self._maxQps = maxQps
        self._q = gevent.queue.Queue()
        self._log = cbLog
        self._transmitted = 0
        self._lastWait = time.time()
        self._isRunning = True
        self._threads = gevent.pool.Group()
        self._threads.add( gevent.spawn_later( 0, self._sendLoop ) )
        self._threads.add( gevent.spawn_later( 1, self._resetStats ) )
Project: lc_cloud    Author: refractionPOINT
def _resetStats( self ):
        if self._isRunning:
            self._transmitted = 0
            self._threads.add( gevent.spawn_later( 1, self._resetStats ) )
Project: lc_cloud    Author: refractionPOINT
def pollOutageState():
    global IS_OUTAGE_ON
    info = deployment.request( 'get_global_config', {} )
    if info.isSuccess:
        IS_OUTAGE_ON = False if info.data[ 'global/outagestate' ] == '0' else info.data[ 'global/outagetext' ]
    gevent.spawn_later( 30, pollOutageState )
Project: django-rest-framework-reactive    Author: genialis
def spawn_later(self, seconds, function, *args, **kwargs):
        """Spawn a new gevent greenlet later."""
        return gevent.spawn_later(seconds, function, *args, **kwargs)
Project: felix    Author: axbaretto
def on_datamodel_in_sync(self):
        if not self._cleanup_done:
            # Datamodel in sync for the first time.  Give the managers some
            # time to finish processing, then trigger cleanup.
            self._cleanup_done = True
            _log.info("No cleanup scheduled, scheduling one.")
            gevent.spawn_later(self.config.STARTUP_CLEANUP_DELAY,
                               functools.partial(self._do_cleanup,
                                                 async=True))
        self._cleanup_done = True
Project: lib9    Author: Jumpscale
def makeTimer(self, alert):
        greenlet = self.timers.get(alert['guid'])
        if greenlet is not None:
            scheduledalert = greenlet.args[0]
            if scheduledalert['state'] != alert['state']:
                self.logger.info("Removing schedule for alert %s" % scheduledalert['state'])
                greenlet.kill()
            else:
                return

        delay = self.getStateTime(alert)
        if delay:
            self.logger.info("Schedule escalation in %ss for state %s" % (delay, alert['state']))
            self.timers[alert['guid']] = gevent.spawn_later(delay, self.escalateHigher, alert)
Project: dissonance    Author: jhgg
def do_sed(message):
    if message.channel not in channels:
        return

    try:
        regex, replacement, flags, target = parse_sed(message.content[1:])
    except ValueError:
        return

    try:
        c = re.compile(regex, flags & 127)
    except re.error as e:
        return

    g = gevent.getcurrent()

    def raiseKeyboardInterrupt(s, i):
        print("timing out!", g)
        gevent.spawn(message.reply, 'fk off with ur evil regex bro')
        g.throw(gevent.GreenletExit)

    # ## We install a signal handler, to timeout the regular expression match if it's taking too long, i.e. evil regexp
    # ##  s/^(a+)+$/rip/
    old_sighandler = signal.signal(signal.SIGALRM, raiseKeyboardInterrupt)
    signal.setitimer(signal.ITIMER_REAL, 0.05)
    try:
        m = c.search
        q = channels[message.channel]
        for i in range(-1, -len(q) - 1, -1):
            nick, line = q[i]
            if m(line) and (not target or nick.lower() == target):
                q[i] = nick, doTranslation(c.sub(replacement, line, 0 if flags & 0x100 else 1)[:400], flags)
                gevent.spawn_later(0, message.reply, '*%s*: %s' % (nick, q[i][1]))
                break

    except re.error as e:
        return

    finally:
        ### Restore original handlers.
        signal.setitimer(signal.ITIMER_REAL, 0)
        signal.signal(signal.SIGALRM, old_sighandler)
Project: felix    Author: axbaretto
def _finish_msg_batch(self, batch, results):
        if not self._config.REPORT_ENDPOINT_STATUS:
            _log.warning("StatusReporter called even though status reporting "
                         "disabled.  Ignoring.")
            self._endpoint_status[IPV4].clear()
            self._endpoint_status[IPV6].clear()
            self._newer_dirty_endpoints.clear()
            self._older_dirty_endpoints.clear()
            return

        if self._cleanup_pending:
            try:
                self._attempt_cleanup()
            except EtcdException as e:
                _log.error("Cleanup failed: %r", e)
                _stats.increment("Status report cleanup failed")
            else:
                _stats.increment("Status report cleanup done")
                self._cleanup_pending = False

        if self._reporting_allowed:
            # We're not rate limited, go ahead and do a write to etcd.
            _log.debug("Status reporting is allowed by rate limit.")
            if not self._older_dirty_endpoints and self._newer_dirty_endpoints:
                _log.debug("_older_dirty_endpoints empty, promoting"
                           "_newer_dirty_endpoints")
                self._older_dirty_endpoints = self._newer_dirty_endpoints
                self._newer_dirty_endpoints = set()
            if self._older_dirty_endpoints:
                ep_id = self._older_dirty_endpoints.pop()
                status_v4 = self._endpoint_status[IPV4].get(ep_id)
                status_v6 = self._endpoint_status[IPV6].get(ep_id)
                status = combine_statuses(status_v4, status_v6)
                try:
                    self._write_endpoint_status_to_etcd(ep_id, status)
                except EtcdException:
                    _log.exception("Failed to report status for %s, will "
                                   "retry", ep_id)
                    # Add it into the next dirty set.  Retrying in the next
                    # batch ensures that we try to update all of the dirty
                    # endpoints before we do any retries, ensuring fairness.
                    self._newer_dirty_endpoints.add(ep_id)
                # Reset the rate limit flag.
                self._reporting_allowed = False

        if not self._timer_scheduled and ((not self._reporting_allowed) or
                                          self._cleanup_pending):
            # Schedule a timer to stop our rate limiting or retry cleanup.
            timeout = self._config.ENDPOINT_REPORT_DELAY
            timeout *= (0.9 + (random.random() * 0.2))  # Jitter by +/- 10%.
            gevent.spawn_later(timeout,
                               self._on_timer_pop,
                               async=True)
            self._timer_scheduled = True