我们从Python开源项目中,提取了以下38个代码示例,用于说明如何使用twisted.internet.reactor.iterate()。
def testCallInNextIteration(self): calls = [] def f1(): calls.append('f1') reactor.callLater(0.0, f2) def f2(): calls.append('f2') reactor.callLater(0.0, f3) def f3(): calls.append('f3') reactor.callLater(0, f1) self.assertEquals(calls, []) reactor.iterate() self.assertEquals(calls, ['f1']) reactor.iterate() self.assertEquals(calls, ['f1', 'f2']) reactor.iterate() self.assertEquals(calls, ['f1', 'f2', 'f3'])
def testOpenSSLBuffering(self): serverProto = self.serverProto = SingleLineServerProtocol() clientProto = self.clientProto = RecordingClientProtocol() server = protocol.ServerFactory() client = self.client = protocol.ClientFactory() server.protocol = lambda: serverProto client.protocol = lambda: clientProto client.buffer = [] sCTX = ssl.DefaultOpenSSLContextFactory(certPath, certPath) cCTX = ssl.ClientContextFactory() port = self.port = reactor.listenSSL(0, server, sCTX, interface='127.0.0.1') reactor.connectSSL('127.0.0.1', port.getHost().port, client, cCTX) i = 0 while i < 5000 and not client.buffer: i += 1 reactor.iterate() self.assertEquals(client.buffer, ["+OK <some crap>\r\n"])
def do_cleanPending(cls): # don't import reactor when module is loaded from twisted.internet import reactor # flush short-range timers reactor.iterate(0) reactor.iterate(0) pending = reactor.getDelayedCalls() if pending: s = PENDING_TIMED_CALLS_MSG for p in pending: s += " %s\n" % (p,) if p.active(): p.cancel() # delete the rest else: print "WEIRNESS! pending timed call not active+!" raise PendingTimedCallsError(s)
def resolve(self, names): """Resolves DNS names in parallel""" self._finished = False self.results = defaultdict(list) deferred_list = [] for name in names: for deferred in self.lookup(name): deferred.addCallback(self._extract_records, name) deferred.addErrback(self._errback, name) deferred_list.append(deferred) deferred_list = defer.DeferredList(deferred_list) deferred_list.addCallback(self._parse_result) deferred_list.addCallback(self._finish) while not self._finished: reactor.iterate() # Although the results are in at this point, we may need an extra # iteration to ensure the resolver library closes its UDP sockets reactor.iterate() return dict(self.results)
def _cleanPending(self): """ Cancel all pending calls and return their string representations. """ reactor = self._getReactor() # flush short-range timers reactor.iterate(0) reactor.iterate(0) delayedCallStrings = [] for p in reactor.getDelayedCalls(): if p.active(): delayedString = str(p) p.cancel() else: print("WEIRDNESS! pending timed call not active!") delayedCallStrings.append(delayedString) return delayedCallStrings
def setUp(self): CFTPClientTestBase.setUp(self) self.startServer() cmds = ('-p %i -l testuser ' '--known-hosts kh_test ' '--user-authentications publickey ' '--host-key-algorithms ssh-rsa ' '-K direct ' '-i dsa_test ' '-a --nocache ' '-v ' '127.0.0.1') port = self.server.getHost().port cmds = test_conch._makeArgs((cmds % port).split(), mod='cftp') log.msg('running %s %s' % (sys.executable, cmds)) self.processProtocol = SFTPTestProcess() env = os.environ.copy() env['PYTHONPATH'] = os.pathsep.join(sys.path) reactor.spawnProcess(self.processProtocol, sys.executable, cmds, env=env) timeout = time.time() + 10 while (not self.processProtocol.buffer) and (time.time() < timeout): reactor.iterate(0.1) if time.time() > timeout: self.skip = "couldn't start process" else: self.processProtocol.clearBuffer()
def _getCmdResult(self, cmd): self.processProtocol.clearBuffer() self.processProtocol.transport.write(cmd+'\n') timeout = time.time() + 10 while (self.processProtocol.buffer.find('cftp> ') == -1) and (time.time() < timeout): reactor.iterate(0.1) self.failIf(time.time() > timeout, "timeout") if self.processProtocol.buffer.startswith('cftp> '): self.processProtocol.buffer = self.processProtocol.buffer[6:] return self.processProtocol.buffer[:-6].strip()
def wait(self): start = time.time() while time.time() - start < 1: reactor.iterate(1.0)
def setUp(self): self.finished = 0 self.counter = 0 self.timers = {} self.deferred = defer.Deferred() # ick. Sometimes there are magic timers already running: # popsicle.Freezer.tick . Kill off all such timers now so they won't # interfere with the test. Of course, this kind of requires that # getDelayedCalls already works, so certain failure modes won't be # noticed. if not hasattr(reactor, "getDelayedCalls"): return for t in reactor.getDelayedCalls(): t.cancel() reactor.iterate() # flush timers
def testActive(self): dcall = reactor.callLater(0, lambda: None) self.assertEquals(dcall.active(), 1) reactor.iterate() self.assertEquals(dcall.active(), 0)
def testScheduling(self): c = Counter() for i in range(100): self.schedule(c.add) for i in range(100): reactor.iterate() self.assertEquals(c.index, 100)
def testCorrectOrder(self): o = Order() self.schedule(o.a) self.schedule(o.b) self.schedule(o.c) reactor.iterate() reactor.iterate() reactor.iterate() self.assertEquals(o.stage, 3)
def testNotRunAtOnce(self): c = Counter() self.schedule(c.add) # scheduled tasks should not be run at once: self.assertEquals(c.index, 0) reactor.iterate() self.assertEquals(c.index, 1)
def doIterations(self, count=5): for i in range(count): reactor.iterate()
def ____wait(d): "." from twisted.internet import reactor from twisted.python import failure l = [] d.addBoth(l.append) while not l: reactor.iterate() if isinstance(l[0], failure.Failure): l[0].raiseException() return l[0]
def testHiddenException(self): """ What happens if an error is raised in a DelayedCall and an error is also raised in the test? L{test_reporter.TestErrorReporting.testHiddenException} checks that both errors get reported. Note that this behaviour is deprecated. A B{real} test would return a Deferred that got triggered by the callLater. This would guarantee the delayed call error gets reported. """ reactor.callLater(0, self.go) reactor.iterate(0.01) self.fail("Deliberate failure to mask the hidden exception")
def _send(self, data, rpcID, address): """ Transmit the specified data over UDP, breaking it up into several packets if necessary If the data is spread over multiple UDP datagrams, the packets have the following structure:: | | | | | |||||||||||| 0x00 | |Transmision|Total number|Sequence number| RPC ID |Header end| | type ID | of packets |of this packet | | indicator| | (1 byte) | (2 bytes) | (2 bytes) |(20 bytes)| (1 byte) | | | | | | |||||||||||| | @note: The header used for breaking up large data segments will possibly be moved out of the KademliaProtocol class in the future, into something similar to a message translator/encoder class (see C{kademlia.msgformat} and C{kademlia.encoding}). """ if len(data) > self.msgSizeLimit: # We have to spread the data over multiple UDP datagrams, and provide sequencing information # 1st byte is transmission type id, bytes 2 & 3 are the total number of packets in this transmission, bytes 4 & 5 are the sequence number for this specific packet totalPackets = len(data) / self.msgSizeLimit if len(data) % self.msgSizeLimit > 0: totalPackets += 1 encTotalPackets = chr(totalPackets >> 8) + chr(totalPackets & 0xff) seqNumber = 0 startPos = 0 while seqNumber < totalPackets: #reactor.iterate() #IGNORE:E1101 packetData = data[startPos:startPos+self.msgSizeLimit] encSeqNumber = chr(seqNumber >> 8) + chr(seqNumber & 0xff) txData = '\x00%s%s%s\x00%s' % (encTotalPackets, encSeqNumber, rpcID, packetData) reactor.callLater(self.maxToSendDelay*seqNumber+self.minToSendDelay, self.transport.write, txData, address) #IGNORE:E1101 startPos += self.msgSizeLimit seqNumber += 1 else: self.transport.write(data, address)
def _send(self, data, rpcID, address): """ Transmit the specified data over UDP, breaking it up into several packets if necessary If the data is spread over multiple UDP datagrams, the packets have the following structure:: | | | | | |||||||||||| 0x00 | |Transmision|Total number|Sequence number| RPC ID |Header end| | type ID | of packets |of this packet | | indicator| | (1 byte) | (2 bytes) | (2 bytes) |(20 bytes)| (1 byte) | | | | | | |||||||||||| | @note: The header used for breaking up large data segments will possibly be moved out of the KademliaProtocol class in the future, into something similar to a message translator/encoder class (see C{kademlia.msgformat} and C{kademlia.encoding}). """ if len(data) > self.msgSizeLimit: # We have to spread the data over multiple UDP datagrams, and provide sequencing information # 1st byte is transmission type id, bytes 2 & 3 are the total number of packets in this transmission, bytes 4 & 5 are the sequence number for this specific packet totalPackets = len(data) / self.msgSizeLimit if len(data) % self.msgSizeLimit > 0: totalPackets += 1 encTotalPackets = chr(totalPackets >> 8) + chr(totalPackets & 0xff) seqNumber = 0 startPos = 0 while seqNumber < totalPackets: #reactor.iterate() #IGNORE:E1101 packetData = data[startPos:startPos+self.msgSizeLimit] encSeqNumber = chr(seqNumber >> 8) + chr(seqNumber & 0xff) txData = '\x00%s%s%s\x00%s' % (encTotalPackets, encSeqNumber, rpcID, packetData) self._sendNext(txData, address) startPos += self.msgSizeLimit seqNumber += 1 else: self._sendNext(data, address)
def stopProtocol(self): """ Called when the transport is disconnected. Will only be called once, after all ports are disconnected. """ for key in self._callLaterList.keys(): try: if key > time.time(): self._callLaterList[key].cancel() except Exception, e: print e del self._callLaterList[key] #TODO: test: do we really need the reactor.iterate() call? reactor.iterate()
def testHiddenException(self): """ What happens if an error is raised in a DelayedCall and an error is also raised in the test? L{test_reporter.ErrorReportingTests.testHiddenException} checks that both errors get reported. Note that this behaviour is deprecated. A B{real} test would return a Deferred that got triggered by the callLater. This would guarantee the delayed call error gets reported. """ reactor.callLater(0, self.go) reactor.iterate(0.01) self.fail("Deliberate failure to mask the hidden exception")
def testTriggerSystemEvent1(self): l = [] l2 = [] d = Deferred() d2 = Deferred() def _returnDeferred(d=d): return d def _returnDeferred2(d2=d2): return d2 def _appendToList(l=l): l.append(1) def _appendToList2(l2=l2): l2.append(1) ## d.addCallback(lambda x: sys.stdout.write("firing d\n")) ## d2.addCallback(lambda x: sys.stdout.write("firing d2\n")) r = reactor self.addTrigger("before", "test", _appendToList) self.addTrigger("during", "test", _appendToList) self.addTrigger("after", "test", _appendToList) self.assertEquals(len(l), 0, "Nothing happened yet.") r.fireSystemEvent("test") r.iterate() self.assertEquals(len(l), 3, "Should have filled the list.") l[:]=[] self.addTrigger("before", "defer", _returnDeferred) self.addTrigger("before", "defer", _returnDeferred2) self.addTrigger("during", "defer", _appendToList) self.addTrigger("after", "defer", _appendToList) r.fireSystemEvent("defer") self.assertEquals(len(l), 0, "Event should not have fired yet.") d.callback(None) self.assertEquals(len(l), 0, "Event still should not have fired yet.") d2.callback(None) self.assertEquals(len(l), 2) l[:]=[] a = self.addTrigger("before", "remove", _appendToList) b = self.addTrigger("before", "remove", _appendToList2) self.removeTrigger(b) r.fireSystemEvent("remove") self.assertEquals(len(l), 1) self.assertEquals(len(l2), 0)