我们从Python开源项目中,提取了以下16个代码示例,用于说明如何使用twisted.internet.reactor.getDelayedCalls()。
def test_delayedCallCleanup(self):
    """
    Check that a session does not leave extra DelayedCalls behind.

    The repr of the reactor's delayed calls is captured before a session
    is made, touched and expired, and must be unchanged afterwards.
    """
    from twisted.internet import reactor

    # Snapshot as a repr; the same representation is compared at the end.
    delayedCallsBeforeSession = repr(reactor.getDelayedCalls())

    session = self.site.makeSession()
    session.touch()
    session.expire()

    # assertEqual is used instead of the deprecated failUnlessEqual alias.
    self.assertEqual(delayedCallsBeforeSession,
                     repr(reactor.getDelayedCalls()))


# Conditional requests:
# If-None-Match, If-Modified-Since

# make conditional request:
#   normal response if condition succeeds
#   if condition fails:
#     response code
#     no body
def testGetDelayedCalls(self):
    """
    Add several timers, cancel one of them, and check the timers both
    immediately and again when C{self.deferred} fires.

    Does nothing if the reactor has no C{getDelayedCalls} attribute.
    """
    if not hasattr(reactor, "getDelayedCalls"):
        return

    # This is not a race because we don't do anything which might call
    # the reactor until we have all the timers set up. If we did, this
    # test might fail on slow systems.
    self.checkTimers()
    for delay, func in ((35, self.done),
                        (20, self.callback),
                        (30, self.callback)):
        self.addTimer(delay, func)

    # Remember the counter value at this point; it is used below as the
    # key of the timer to cancel (presumably the one added next -- confirm
    # against addTimer).
    cancelledKey = self.counter
    for delay, func in ((29, self.callback),
                        (25, self.addCallback),
                        (26, self.callback)):
        self.addTimer(delay, func)

    # Cancel first, then forget the timer, in that order.
    doomedTimer = self.timers[cancelledKey]
    doomedTimer.cancel()
    del self.timers[cancelledKey]
    self.checkTimers()

    def recheck(_result):
        # Ignore the deferred's result; just re-run the timer check.
        return self.checkTimers()

    self.deferred.addCallback(recheck)
    return self.deferred
def do_cleanPending(cls):
    """
    Cancel pending delayed calls and report them as an error.

    The reactor is iterated twice with a zero delay first. Every call
    that C{reactor.getDelayedCalls()} still returns afterwards is listed
    in the error message; those that are active are cancelled.

    @raise PendingTimedCallsError: if the reactor returned any delayed
        calls after the two iterations.
    """
    # don't import reactor when module is loaded
    from twisted.internet import reactor

    # flush short-range timers
    reactor.iterate(0)
    reactor.iterate(0)

    pending = reactor.getDelayedCalls()
    if not pending:
        return

    # Collect the message pieces and join once instead of repeated +=.
    parts = [PENDING_TIMED_CALLS_MSG]
    for p in pending:
        # Format the call before cancelling it, as the message is meant
        # to describe the call in its pending state.
        parts.append(" %s\n" % (p,))
        if p.active():
            p.cancel()  # delete the rest
        else:
            # Call form with a single string argument: prints the same
            # text under Python 2 and is valid syntax under Python 3.
            print("WEIRNESS! pending timed call not active+!")
    raise PendingTimedCallsError("".join(parts))
def test_initiallySchedulesOneDataCall(self):
    """
    Establishing a H2Connection schedules exactly one call, to be run as
    soon as the reactor has time.
    """
    clock = task.Clock()
    connection = H2Connection(clock)

    scheduled = clock.getDelayedCalls()
    self.assertEqual(len(scheduled), 1)
    pendingCall = scheduled[0]

    # Validate that the call is scheduled for right now, but hasn't run,
    # and that it's correct.
    self.assertTrue(pendingCall.active())
    expectedAttributes = (
        ("time", 0),
        ("func", connection._sendPrioritisedData),
        ("args", ()),
        ("kw", {}),
    )
    for attribute, expected in expectedAttributes:
        self.assertEqual(getattr(pendingCall, attribute), expected)
def checkTimers(self):
    """
    Assert that every timer in C{self.timers} is among the reactor's
    delayed calls.

    Sets C{self.finished} to 1 before failing if any timer is missing.
    """
    expected = self.timers.values()
    scheduled = list(reactor.getDelayedCalls())

    # There should be at least the calls we put in. There may be other
    # calls that are none of our business and that we should ignore,
    # though.
    missing = [timer for timer in expected if timer not in scheduled]
    if missing:
        self.finished = 1
    self.assertFalse(
        missing,
        "Should have been missing no calls, instead " + "was missing "
        + repr(missing))
def _cleanPending(self):
    """
    Cancel all pending calls and return their string representations.

    The reactor obtained from C{self._getReactor()} is iterated twice
    with a zero delay, then every call it still reports is recorded.
    Active calls are cancelled; inactive ones are only reported.

    @return: a list with one string per delayed call, in the order the
        reactor returned them.
    """
    reactor = self._getReactor()

    # flush short-range timers
    reactor.iterate(0)
    reactor.iterate(0)

    delayedCallStrings = []
    for p in reactor.getDelayedCalls():
        # Take the string before branching (and before cancelling), so an
        # inactive call records its own description rather than hitting
        # an unbound name or repeating the previous call's string.
        delayedCallStrings.append(str(p))
        if p.active():
            p.cancel()
        else:
            print("WEIRDNESS! pending timed call not active!")
    return delayedCallStrings
def setUp(self):
    """
    Reset the per-test state and cancel any delayed calls already
    scheduled on the reactor.
    """
    self.finished = 0
    self.counter = 0
    self.timers = {}
    self.deferred = defer.Deferred()

    # ick. Sometimes there are magic timers already running:
    # popsicle.Freezer.tick . Kill off all such timers now so they won't
    # interfere with the test. Of course, this kind of requires that
    # getDelayedCalls already works, so certain failure modes won't be
    # noticed.
    if hasattr(reactor, "getDelayedCalls"):
        for staleCall in reactor.getDelayedCalls():
            staleCall.cancel()
        # flush timers
        reactor.iterate()
def checkTimers(self):
    """
    Assert that every timer in C{self.timers} is among the reactor's
    delayed calls.

    Sets C{self.finished} to 1 before failing if any timer is missing.
    """
    l1 = self.timers.values()
    l2 = list(reactor.getDelayedCalls())

    # There should be at least the calls we put in. There may be other
    # calls that are none of our business and that we should ignore,
    # though.
    missing = [dc for dc in l1 if dc not in l2]
    if missing:
        self.finished = 1
    # assertFalse is used instead of the deprecated failIf alias.
    self.assertFalse(
        missing,
        "Should have been missing no calls, instead was missing "
        + repr(missing))
def test_noTimeoutIfConnectionLost(self):
    """
    When a L{H2Connection} loses its connection it cancels its timeout.
    """
    factory = FrameFactory()
    requestFrames = buildRequestFrames(self.getRequestHeaders, [], factory)
    preface = factory.clientConnectionPreface()
    serializedFrames = b''.join(
        frame.serialize() for frame in requestFrames)

    clock, conn, transport = self.initiateH2Connection(
        preface + serializedFrames,
        requestFactory=DummyProducerHandler,
    )

    dataBeforeLoss = transport.value()
    callsBeforeLoss = len(clock.getDelayedCalls())

    # Now lose the connection.
    conn.connectionLost("reason")

    # There should be one fewer call than there was.
    callsAfterLoss = len(clock.getDelayedCalls())
    self.assertEqual(callsBeforeLoss - 1, callsAfterLoss)

    # Advancing the clock should do nothing.
    clock.advance(101)
    self.assertEqual(transport.value(), dataBeforeLoss)
def process_metrics(self):
    """
    Log one line summarising the measurement and transaction rates.

    The measurement rate is reported only when ``self.metrics`` holds a
    non-None ``packet_time``. The transaction rate is always reported.
    ``tx_count`` and ``starttime`` are reset afterwards for the next round.
    """
    stats = self.metrics
    report = []

    # Compute frequency of measurements
    if 'packet_time' in stats and stats['packet_time'] is not None:

        stats.setdefault('packet_starttime', stats.packet_time)

        # Convert nanos to seconds
        elapsed_packets = (stats.packet_time - stats.packet_starttime) / 1000.0 / 1000.0 / 1000.0
        # NOTE(review): a zero duration falls back to ``starttime`` here;
        # confirm that this is the intended substitute.
        elapsed_packets = elapsed_packets or stats.starttime

        packet_rate = stats.tx_count / float(elapsed_packets) if elapsed_packets != 0 else 0.0
        report.append('measurements: %.02f Hz' % packet_rate)

        # Reset for next round
        stats.packet_starttime = stats.packet_time

    # Compute frequency of transactions
    now = time.time()
    elapsed_transactions = now - stats.starttime
    transaction_rate = stats.tx_count / float(elapsed_transactions) if elapsed_transactions != 0 else 0.0
    report.append('transactions: %.02f tps' % transaction_rate)

    # Reset for next round
    stats.tx_count = 0
    stats.starttime = now

    # Add information from the Twisted reactor
    # (the count is computed but its report entry is currently disabled)
    pending_calls = reactor.getDelayedCalls()
    pending_count = len(pending_calls)
    #metrics.append('pending: %d' % pending_count)

    metrics_info = ', '.join(report)

    log.info('[{realm:12s}] {metrics_info}', realm=self.channel.realm, metrics_info=metrics_info)