我们从 Python 开源项目中，提取了以下 50 个代码示例，用于说明如何使用 twisted.internet.task.deferLater()。
def message_received(self, unwrapped_message):
    """
    Forward an UnwrappedMessage to the protocol after a random delay.

    The delay is picked with ``self._sys_rand.randint(0, self.max_delay)``.
    The resulting Deferred is kept in ``self._pending_sends`` until it fires.
    """
    delay = self._sys_rand.randint(0, self.max_delay)
    with start_action(action_type=u"send delayed message", delay=delay).context():
        pending = deferLater(
            self.reactor, delay, self.protocol.packet_proxy, unwrapped_message)
        DeferredContext(pending).addActionFinish()
    self._pending_sends.add(pending)

    def _forget(result, pending=pending):
        # Stop tracking the send (success or failure) and pass the result on.
        self._pending_sends.remove(pending)
        return result

    pending.addBoth(_forget)
def sendEvents(self, events):
    """
    Send ``events`` to the event service, pausing periodically for the reactor.

    This is a generator that yields Deferreds; it is presumably driven by
    ``inlineCallbacks`` (the decorator is not visible here - confirm).

    :param events: sequence of event dicts; missing ``device`` and
        ``severity`` keys are filled in with defaults before sending.
    """
    if not events:
        return

    self.state = PythonCollectionTask.STATE_SEND_EVENTS

    for i, event in enumerate(events):
        # Default event fields.
        event.setdefault('device', self.configId)
        event.setdefault('severity', ZenEventClasses.Info)

        # On CTRL-C or exit the reactor might stop before we get to this
        # call and generate a traceback.
        if reactor.running:
            self._eventService.sendEvent(event)
            # Hand control back to the reactor once per chunk of 100 events.
            # The previous test (``if i % 100``) was inverted and yielded on
            # 99 out of every 100 events instead.
            if (i + 1) % 100 == 0:
                yield task.deferLater(reactor, 0, lambda: None)
def message_received(self, unwrapped_message):
    """
    Queue an UnwrappedMessage and release the batch once it is full.

    When the batch holds at least ``self.threshold_count`` messages it is
    shuffled and handed to ``self.batch_send`` after a random delay. The
    pending Deferred is tracked in ``self._pending_batch_sends``.
    """
    self._batch.append(unwrapped_message)  # [(destination, sphinx_packet)
    if len(self._batch) < self.threshold_count:
        return

    delay = self._sys_rand.randint(0, self.max_delay)
    with start_action(action_type=u"send delayed message batch", delay=delay).context():
        # Swap in a fresh batch before scheduling the release.
        released, self._batch = self._batch, []
        random.shuffle(released)
        pending = deferLater(self.reactor, delay, self.batch_send, released)
        DeferredContext(pending).addActionFinish()
        self._pending_batch_sends.add(pending)

        def _forget(result, pending=pending):
            # Stop tracking the batch and pass the result through unchanged.
            self._pending_batch_sends.remove(pending)
            return result

        pending.addBoth(_forget)
def __init__(self, host=defaultHost, port=4400, redisPort=6379, neo4jPort=7474, initialKey=None):
    """
    Store connection settings and start the periodic background work.

    Schedules ``self.ping`` once after 10 seconds and starts a LoopingCall
    of ``peerBeat`` every 15 seconds.
    """
    # self.protocol = Peer(self)
    self.host, self.port = host, port
    self.users = {}  # maps user names to Chat instances
    self.redisPort, self.neo4jPort = redisPort, neo4jPort
    if initialKey:  # need test case
        self.commandKeys.append(initialKey)
        # self.redis.addToKeys(initialKey)

    # Add loops to factory? why not add loops to main reactor??
    ping_later = task.deferLater(reactor, 10, self.ping)
    ping_later.addErrback(whoops)
    # reactor.callLater(2, redis_test)
    # task.deferLater(reactor, 60, hiya).addCallback(lambda _: reactor.stop())

    heartbeat = task.LoopingCall(peerBeat)
    heartbeat_done = heartbeat.start(15.0)
    heartbeat_done.addCallback(peerSuccess)
    heartbeat_done.addErrback(peerFailure)
# pylint: disable=no-self-use
def run_scan(state):
    """
    Fetch the check URL through every circuit and save the results.

    One fetch is scheduled every 0.2 seconds until ``ExitScan(state)`` is
    exhausted. The collected results are then passed to ``save_results``
    and the output file is closed.

    :return: a Deferred that fires after the results are saved and the
        output file has been closed.
    """
    circuits = ExitScan(state)
    url = 'https://check.torproject.org'
    outfile = open("exit-addresses.%s.json" %
                   datetime.datetime.utcnow().isoformat(), 'w+')
    all_tasks_done = defer.Deferred()
    tasks = []

    def close_outfile(_):
        # The original callback returned the bound method ``outfile.close``
        # without calling it, so the file was never closed.
        outfile.close()

    def pop(circuits):
        try:
            circuit = next(circuits)
        except StopIteration:
            # Every circuit has been scheduled: wait for all the fetches.
            results = defer.DeferredList(tasks)
            results.addCallback(save_results, outfile)
            results.addCallback(close_outfile)
            results.chainDeferred(all_tasks_done)
            return
        tasks.append(task.deferLater(reactor, 0, fetch, circuit, url, state))
        reactor.callLater(.2, pop, circuits)

    reactor.callLater(0, pop, circuits)
    return all_tasks_done
def start_initial(game):
    """
    Enter the 'initial' state of the next round, or finish the game.

    If ``get_round`` returns no round data, the game is ended and clients
    are redirected to the exit page. Otherwise the state is cached, the
    round is started and ``game_state_checker`` is scheduled in 1 second.
    """
    round_data, users_plots = get_round(game)
    state = 'initial'

    if round_data is None:
        # No rounds left: close the game and send everybody to the exit page.
        game.end_time = timezone.now()
        game.save()
        game.broadcast(action='redirect', url=reverse('interactive:exit'))
        return

    cache.set(game.id, {
        'state': state,
        'round_data': round_data,
        'users_plots': users_plots,
    })
    initial(game, round_data, users_plots)
    checker = task.deferLater(
        reactor, 1, game_state_checker, game, state, round_data, users_plots)
    checker.addErrback(twisted_error)
def throttled(func):
    """
    Decorator for AgentProxyMixIn.getTable to throttle requests.

    The wrapped callable's first positional argument must expose
    ``_last_request`` and ``throttle_delay``. If ``throttle_delay`` seconds
    have not yet passed since ``_last_request``, the call is scheduled with
    ``deferLater``; otherwise it runs immediately.
    """
    @wraps(func)
    def _throttle(*args, **kwargs):
        instance = args[0]
        wait = (instance._last_request + instance.throttle_delay) - time.time()
        instance._last_request = time.time()

        if wait > 0:
            _logger.debug("%sss delay due to throttling: %r", wait, instance)
            return deferLater(reactor, wait, func, *args, **kwargs)
        return func(*args, **kwargs)

    return _throttle
# pylint: disable=R0903
def test_callback(self):
    """
    The L{Deferred} returned by L{task.deferLater} fires with the result
    of the supplied function once the requested delay has elapsed.
    """
    calls = []
    sentinel = object()

    def record(foo, bar):
        calls.append((foo, bar))
        return sentinel

    clock = task.Clock()
    d = task.deferLater(clock, 3, record, 'foo', bar='bar')
    d.addCallback(self.assertIs, sentinel)

    # Nothing may happen before the full three seconds have passed.
    clock.advance(2)
    self.assertEqual(calls, [])
    clock.advance(1)
    self.assertEqual(calls, [('foo', 'bar')])
    return d
def test_cancel(self):
    """
    Cancelling the L{Deferred} returned by L{task.deferLater} prevents the
    scheduled call from ever being made.
    """
    invocations = []
    clock = task.Clock()
    d = task.deferLater(clock, 1, invocations.append, None)
    d.cancel()

    def verifyCancelled(ignored):
        # No delayed call may be left behind on the clock...
        self.assertEqual([], clock.getDelayedCalls())
        # ...and the function must not have run already.
        self.assertFalse(invocations)

    self.assertFailure(d, defer.CancelledError)
    d.addCallback(verifyCancelled)
    return d
def stopListening(self):
    """
    Stop accepting connections on this port.

    Reading is stopped and, if the port is connected,
    ``self.connectionLost`` is scheduled on the next reactor iteration.

    @return: A L{Deferred} that fires when this port has stopped.
    """
    self.stopReading()
    if self.disconnecting:
        # Shutdown is already in progress: hand back the same Deferred.
        return self._stoppedDeferred
    if not self.connected:
        return defer.succeed(None)

    self._stoppedDeferred = task.deferLater(
        self.reactor, 0, self.connectionLost)
    self.disconnecting = True
    return self._stoppedDeferred
def _loopbackAsyncContinue(ignored, server, serverToClient, client,
                           clientToServer, pumpPolicy):
    """
    Reset both message queues and schedule the next run of
    C{_loopbackAsyncBody}.

    @return: the L{Deferred} returned by C{deferLater} for that run.
    """
    # The notification Deferreds have already fired and cannot be reused,
    # so drop them from both queues.
    for queue in (clientToServer, serverToClient):
        queue._notificationDeferred = None

    # Push more bytes on a later reactor iteration rather than synchronously:
    # no real transport re-enters dataReceived as a result of calling write,
    # and a synchronous call here could do exactly that.
    from twisted.internet import reactor
    bodyArgs = (server, serverToClient, client, clientToServer, pumpPolicy)
    return deferLater(reactor, 0, _loopbackAsyncBody, *bodyArgs)
def parse(self, response):
    """
    Fill in the name input, let WebKit process the change event, then
    return the text of the resulting heading as an AngularJSHelloText item.
    """
    frame = response.webpage.mainFrame()

    name_input = frame.findFirstElement('#the-basics + div .well input')
    name_input.setAttribute('value', "World")
    # Trigger change event.
    name_input.evaluateJavaScript(""" var event = document.createEvent("HTMLEvents"); event.initEvent("change", false, true); this.dispatchEvent(event); """)

    # Let WebKit run.
    yield deferLater(reactor, 0, lambda: None)

    heading = frame.findFirstElement('#the-basics + div .well h1')
    returnValue([AngularJSHelloText(text=heading.toPlainText())])
def _run_command(self, command):
    """
    Send ``command`` to the remote shell and collect its output.

    Output is polled every ``self._READ_DELAY`` seconds until the last
    stdout line equals ``self._prompt``. Fires with ``(stdout, stderr)``
    via ``defer.returnValue``.

    :raises Exception: if the prompt has not been seen after
        ``_MAX_REQUESTS_PER_COMMAND`` polls.
    """
    encoded = base64.encodestring('{0}\r\n'.format(command))
    yield self._sender.send_request(
        'send',
        shell_id=self._shell_id,
        command_id=self._command_id,
        base64_encoded_command=encoded)

    stdout, stderr = [], []
    for _ in xrange(_MAX_REQUESTS_PER_COMMAND):
        out, err = yield task.deferLater(
            reactor, self._READ_DELAY, self._get_output)
        stderr.extend(err)
        if out:
            last_line = out[-1]
            stdout.extend(out[:-1])
            if last_line == self._prompt:
                # The prompt marks the end of the output and is not kept.
                break
            stdout.append(last_line)
    else:
        raise Exception("Reached max requests per command.")
    defer.returnValue((stdout, stderr))
def _iterate(reactor, intervals, f):
    """
    Run a function repeatedly.

    After each call of ``f`` the next value from ``intervals`` is taken and
    the loop waits for whatever part of it the call did not already use.

    :param reactor: See ``run_many_service``.

    :return Deferred: A deferred which fires when ``f`` fails or when
        ``intervals`` is exhausted.
    """
    while True:
        started = reactor.seconds()
        yield f()
        finished = reactor.seconds()
        try:
            interval = next(intervals)
        except StopIteration:
            return
        # Never wait a negative amount, even if f() overran the interval.
        remaining = max(0, interval - (finished - started))
        yield deferLater(reactor, remaining, lambda: None)
def loop_until_success(predicate, timeout=None, message="task"):
    """
    Call predicate every second, until it fires a non-failed Deferred, or
    hits the timeout.

    :param predicate: Callable returning termination condition.
    :type predicate: 0-argument callable returning a Deferred.

    :return: A ``Deferred`` firing with the first non-failed result from
        ``predicate``, or with the last ``Failure`` if no attempt succeeded
        within the timeout.
    """
    first_attempt = maybeDeferred(predicate)
    started = time.time()

    def retry(failure):
        if timeout and time.time() - started > timeout:
            # Out of time: propagate the failure.
            return failure
        print("Retrying %s given result %r..." % (message, failure.getErrorMessage()))
        attempt = deferLater(reactor, 1.0, predicate)
        attempt.addErrback(retry)
        return attempt

    first_attempt.addErrback(retry)
    return first_attempt
def loop_until(predicate, timeout=None, message="task"):
    """
    Call predicate every second, until it returns something ``Truthy``.

    :param predicate: Callable returning termination condition.
    :type predicate: 0-argument callable returning a Deferred.

    :return: A ``Deferred`` firing with the first ``Truthy`` response from
        ``predicate``, or failing with ``TimeoutError`` if no truthy result
        arrived within the timeout.
    """
    first_attempt = maybeDeferred(predicate)
    started = time.time()

    def check(result):
        if timeout and time.time() - started > timeout:
            raise TimeoutError()
        if result:
            return result
        print("Retrying %s given result %r..." % (message, result))
        attempt = deferLater(reactor, 1.0, predicate)
        attempt.addCallback(check)
        return attempt

    first_attempt.addCallback(check)
    return first_attempt
def test__handles_timeout(self):
    """A rack whose power query does not finish in time is reported failed."""
    node, power_info = yield deferToDatabase(
        self.make_node_with_power_info)

    def never_in_time(*args, **kwargs):
        # Fires only after 60 * 60 seconds, far longer than the 0.5 second
        # timeout passed to power_query_all below.
        return deferLater(reactor, 60 * 60, lambda: None)

    rack_id = factory.make_name("system_id")
    client = Mock()
    client.ident = rack_id
    client.side_effect = never_in_time
    self.patch(power_module, "getAllClients").return_value = [client]

    result = yield power_query_all(
        node.system_id, node.hostname, power_info, timeout=0.5)
    power_state, success_racks, failed_racks = result

    self.assertEqual(POWER_STATE.UNKNOWN, power_state)
    self.assertItemsEqual([], success_racks)
    self.assertItemsEqual([rack_id], failed_racks)
def test__handles_timeout(self):
    """A rack that does not answer in time is reported as CancelledError."""

    def never_in_time(*args, **kwargs):
        # Fires only after 60 * 60 seconds, far longer than the 0.5 second
        # timeout passed to discover_pod below.
        return deferLater(reactor, 60 * 60, lambda: None)

    rack_id = factory.make_name("system_id")
    client = Mock()
    client.ident = rack_id
    client.side_effect = never_in_time
    self.patch(pods_module, "getAllClients").return_value = [client]

    discovered = yield discover_pod(
        factory.make_name("pod"), {}, timeout=0.5)

    self.assertThat(discovered[0], Equals({}))
    expected_failures = MatchesDict({
        rack_id: IsInstance(CancelledError),
    })
    self.assertThat(discovered[1], expected_failures)
def check_aggregator(self):
    '''
    If the aggregator is live, but isn't getting events, log a diagnostic
    warning.
    This function is sometimes called using deferLater, so any exceptions
    will be handled by errorCallback.
    '''
    # Nothing to diagnose unless the aggregator exists, is no longer
    # pending, and its expected start time has already passed.
    if not (self.aggregator is not None and
            not self.is_aggregator_pending and
            self.expected_aggregator_start_time is not None and
            self.expected_aggregator_start_time < time()):
        return

    aggregator_live_time = time() - self.expected_aggregator_start_time

    flag_list = self.get_flag_list()
    if len(flag_list) > 0:
        flag_message = "Consensus flags: {}".format(" ".join(flag_list))
    else:
        flag_message = "Is your relay in the Tor consensus?"

    # Missing events only deserve a warning when events are expected.
    log_fn = logging.warning if self.are_dc_events_expected() else logging.info

    protocol = self.aggregator.protocol
    not_processing = protocol is None or protocol.state != "processing"

    if not_processing and aggregator_live_time > EXPECTED_CONTROL_ESTABLISH_MAX:
        logging.warning("Aggregator has been running {}, but is not connected to the control port. Is your control port working?"
                        .format(format_elapsed_time_since(
                            self.expected_aggregator_start_time, 'since')))
    elif (self.aggregator.last_event_time is None and
          aggregator_live_time > EXPECTED_EVENT_INTERVAL_MAX):
        log_fn("Aggregator has been running {}, but has not seen a tor event. {}"
               .format(format_elapsed_time_since(
                   self.expected_aggregator_start_time, 'since'),
                   flag_message))
    elif (self.aggregator.last_event_time is not None and
          self.aggregator.last_event_time < time() - EXPECTED_EVENT_INTERVAL_MAX):
        log_fn("Aggregator has not received any events recently, {}. {}"
               .format(format_last_event_time_since(
                   self.aggregator.last_event_time),
                   flag_message))
def _start_aggregator_deferred(self):
    '''
    Start the pending aggregator and schedule a single connection check.
    This function is called using deferLater, so any exceptions will be
    handled by errorCallback.
    '''
    if not self.is_aggregator_pending:
        return
    self.is_aggregator_pending = False
    self.aggregator.start()
    # Schedule a once-off check that the aggregator has connected.
    check_delay = EXPECTED_CONTROL_ESTABLISH_MAX + 1.0
    pending_check = task.deferLater(reactor, check_delay, self.check_aggregator)
    pending_check.addErrback(errorCallback)
def _flush_later(self, msg):
    '''
    Flush msg via _flush_now, then inject events.
    This function is called using deferLater, so any exceptions will be
    handled by errorCallback.
    '''
    self._flush_now(msg)
    self._inject_events()
def close(self):
    """
    Close all http connections.

    :return: a Deferred that fires once the cached connections are closed
        and no reactor file descriptor for this client's host and port is
        left.
    """
    # Parse once instead of once per inspected descriptor.
    parsed_url = urlparse(self._hostname)

    def validate_client(client):
        """
        Validate that the connection is for the current client.

        :param client: object whose ``addr`` is a ``(host, port)`` pair
        :return: True if it targets this client's host and port
        """
        host, port = client.addr
        return host == parsed_url.hostname and port == parsed_url.port

    # read https://github.com/twisted/treq/issues/86
    # to understand the following...
    def _check_fds(_):
        # A connection may be registered only as a writer, so both sets must
        # be inspected. The original read getReaders() twice.
        fds = set(reactor.getReaders() + reactor.getWriters())
        still_open = any(
            isinstance(fd, Client) and validate_client(fd) for fd in fds)
        if not still_open:
            return
        # Look again on the next reactor iteration.
        return deferLater(reactor, 0, _check_fds, None)

    pool = self._async_http_client_params["pool"]
    return pool.closeCachedConnections().addBoth(_check_fds)
def tick_fine(self):
    """
    Call ``moveNext()`` on the fine time wheel and schedule the next fine
    tick after ``Config.TIMER_FINE_GRANULARITY``.
    """
    self.time_wheel_fine.moveNext()
    next_tick = task.deferLater(
        self.reactor, Config.TIMER_FINE_GRANULARITY, self.tick_fine)
    self.tick_fine_d = next_tick
    next_tick.addBoth(Utils.nop)
def tick_coarse(self):
    """
    Call ``moveNext()`` on the coarse time wheel and schedule the next
    coarse tick after ``Config.TIMER_COARSE_GRANULARITY``.
    """
    self.time_wheel_coarse.moveNext()
    next_tick = task.deferLater(
        self.reactor, Config.TIMER_COARSE_GRANULARITY, self.tick_coarse)
    self.tick_coarse_d = next_tick
    next_tick.addBoth(Utils.nop)
def send_key(self):
    """
    Send user's key to provider.

    Public key bound to user's is sent to provider, which will replace any
    prior keys for the same address in its database.

    If no token is available yet, a new attempt is scheduled in 5 seconds
    and its result is returned. The original code fell through after that
    attempt and uploaded the key a second time.

    This is a generator ending in ``defer.returnValue``; it is presumably
    driven by ``inlineCallbacks`` (decorator not visible here - confirm).

    :return: A Deferred which fires when the key is sent, or which fails
             with KeyNotFound if the key was not found in local database.
    :rtype: Deferred

    :raise UnsupportedKeyTypeError: if invalid key type
    """
    if not self.token:
        self.log.debug(
            'Token not available, scheduling '
            'a new key sending attempt...')
        # The retry performs the whole upload, so hand back its result.
        key = yield task.deferLater(reactor, 5, self.send_key)
        defer.returnValue(key)

    self.log.info('Sending public key to server')
    key = yield self.get_key(self._address, fetch_remote=False)
    yield self._nicknym.put_key(self.uid, key.key_data,
                                self._api_uri, self._api_version)
    emit_async(catalog.KEYMANAGER_DONE_UPLOADING_KEYS, self._address)
    self.log.info('Key sent to server')
    defer.returnValue(key)
def retry_connect(self):
    """
    Schedule a reconnect unless one is already pending.

    Each scheduled retry waits 2 seconds longer than the previous one.
    """
    with self._lock:
        if self._in_retry.get('connection'):
            # A retry is already scheduled.
            return
        self.conn_retry_interval += 2
        log.err("Connection Closed! retry connecting in %s seconds..." % self.conn_retry_interval)
        self._in_retry['connection'] = True
        reconnect = task.deferLater(
            reactor, self.conn_retry_interval, self.do_connect)
        reconnect.addErrback(self.failed)
def startService(self):
    """
    Open the connection and start the child services once it is up.

    The connector state is polled once per second until it reads
    'connected'.
    """
    service.Service.startService(self)
    self._connector = self._connectMethod(*self._args,
                                          factory=self._factory,
                                          **self._kwargs)

    def waitForConnect():
        if self._connector.state != 'connected':
            # Not connected yet: look again in a second.
            from twisted.internet import reactor
            task.deferLater(reactor, 1, waitForConnect)
            return
        log.msg('Starting child services now.', level=logging.DEBUG)
        # noinspection PyTypeChecker
        for child in self:
            child.startService()

    waitForConnect()
def render_GET(self, request):
    """
    Handle a GET request.

    Registers the session if it is new, enforces authentication, and runs
    the handler for the request path shortly afterwards via deferLater.

    Returns an error string, or NOT_DONE_YET when a handler was scheduled.
    """
    session = request.getSession()
    session.touch()
    log.debug("HTTP Request from %s:%s (%s) to %s", request.client.host,
              request.client.port, session.uid, request.uri)

    if session.uid not in self.sessions:
        log.info("New Client Session: %s" % session.uid)
        # Replace the default expiration with our own timeout.
        session._expireCall.cancel()
        session.sessionTimeout = HTTP_SESSION_TIMEOUT
        session.startCheckingExpiration()
        session.notifyOnExpire(self._expireSession)
        session.updates = []
        session.isAuthenticated = not self.monast.authRequired
        session.username = None
        self.sessions[session.uid] = session

    if not session.isAuthenticated and request.path != "/doAuthentication":
        return "ERROR :: Authentication Required"

    handler = self.handlers.get(request.path)
    if not handler:
        return "ERROR :: Request Not Found"

    deferred_request = task.deferLater(reactor, 0.1, lambda: request)
    deferred_request.addCallback(handler)
    deferred_request.addErrback(self._onRequestFailure, request)
    return TWebServer.NOT_DONE_YET
def test_usr1_rotates_logs(self):
    """
    SIGUSR1 should cause logs to be reopened.
    """
    def file_streams():
        # Streams of every FileHandler attached to the root logger.
        return [log_handler.stream
                for log_handler in logging.getLogger().handlers
                if isinstance(log_handler, logging.FileHandler)]

    logging.getLogger().addHandler(logging.FileHandler(self.makeFile()))
    # Store the initial set of streams.
    original_streams = file_streams()

    # Instantiating LandscapeService should register the handler.
    TestService(self.config)
    # We'll call it directly.
    handler = signal.getsignal(signal.SIGUSR1)
    self.assertTrue(handler)
    handler(None, None)

    def check(ign):
        for stream in file_streams():
            self.assertTrue(stream not in original_streams)

    # We need to give some room for the callFromThread to run.
    d = deferLater(reactor, 0, lambda: None)
    return d.addCallback(check)
def test_waiter(self):
    """Return a Deferred that calls ``_called_by_deffered`` after 3 seconds."""
    print("test_main()")
    waiting = task.deferLater(reactor, 3, self._called_by_deffered)
    return waiting
def on_welcome(mc):
    """
    Simulated on-welcome handler.

    Sets the IRC client's line rate. For the "irc_publisher" nick it also
    schedules the chain of junk-message senders after 3 seconds.
    """
    print('simulated on-welcome')
    mc.tx_irc_client.lineRate = 0.2
    if mc.nick != "irc_publisher":
        return
    d = task.deferLater(reactor, 3.0, junk_pubmsgs, mc)
    for next_step in (junk_longmsgs, junk_announce, junk_fill):
        d.addCallback(next_step)
def test_waiter(self):
    """Return a Deferred that calls ``_called_by_deffered`` after 22 seconds."""
    print("test_main()")
    # reactor.callLater(1.0, junk_messages, self.mcc)
    waiting = task.deferLater(reactor, 22, self._called_by_deffered)
    return waiting
def test_waiter(self):
    """Return a Deferred that calls ``_called_by_deffered`` after 12 seconds."""
    waiting = task.deferLater(reactor, 12, self._called_by_deffered)
    return waiting
def test_waiter(self):
    """Return a Deferred that calls ``_called_by_deffered`` after 5 seconds."""
    waiting = task.deferLater(reactor, 5, self._called_by_deffered)
    return waiting
def render_GET(self, request):
    """
    Serve ``/metrics`` for the target named in the query string.

    Any other path, or a ``/metrics`` request without a target, gets a 404
    body. A valid request is processed asynchronously by
    ``self.generate_latest_target`` and NOT_DONE_YET is returned.
    """
    path = request.path.decode()
    request.setHeader("Content-Type", "text/plain; charset=UTF-8")

    if path != '/metrics':
        request.setResponseCode(404)
        return '404 Not Found'.encode()

    target = request.args.get('target', [None])[0]
    if not target:
        request.setResponseCode(404)
        return 'No target defined\r\n'.encode()

    d = deferLater(reactor, 0, lambda: request)
    d.addCallback(self.generate_latest_target)
    d.addErrback(self.errback, request)
    return NOT_DONE_YET
def test_blocking_call(self):
    """
    Check if the reactor thread is properly blocked by a function marked
    as such.
    """
    @blocking_call_on_reactor_thread
    @inlineCallbacks
    def waiter():
        # 'Release' our claim on the reactor thread.
        # blocking_call_on_reactor_thread should prevent anything else
        # being scheduled though.
        yield deferLater(reactor, 0.01, lambda: None)
        waiter.variable += 1
        returnValue(waiter.variable)

    @blocking_call_on_reactor_thread
    def quicker():
        # Immediately use the reactor thread and return.
        waiter.variable += 1
        return succeed(waiter.variable)

    waiter.variable = 1

    # waiter() releases the reactor thread and then increments the counter.
    # If nothing else got scheduled in between, the counter is now 2.
    # If quicker() came first, it is 3 (bad).
    first_value = yield waiter()

    # quicker() claims the reactor thread and increments the counter.
    # If waiter() came first, the counter is now 3.
    # If quicker() managed to sneak in earlier, it is 2 (bad).
    second_value = yield quicker()

    self.assertEqual(first_value, 2)
    self.assertEqual(second_value, 3)
def test_blocking_call_in_thread(self):
    """
    Check if the reactor thread is properly blocked by a threaded function.
    """
    @blocking_call_on_reactor_thread
    @inlineCallbacks
    def waiter():
        # 'Release' our claim on the reactor thread.
        # blocking_call_on_reactor_thread should prevent anything else
        # being scheduled though.
        yield deferLater(reactor, 0.01, lambda: None)
        waiter.variable += 1
        returnValue(waiter.variable)

    @blocking_call_on_reactor_thread
    def quicker():
        # Immediately use the reactor thread and return.
        waiter.variable += 1
        return succeed(waiter.variable)

    waiter.variable = 1

    # waiter() releases the reactor thread and then increments the counter.
    # If nothing else got scheduled in between, the counter is now 2.
    # If quicker() came first, it is 3 (bad).
    first_value = yield deferToThread(waiter)

    # quicker() claims the reactor thread and increments the counter.
    # If waiter() came first, the counter is now 3.
    # If quicker() managed to sneak in earlier, it is 2 (bad).
    second_value = yield deferToThread(quicker)

    self.assertEqual(first_value, 2)
    self.assertEqual(second_value, 3)
def test_delayed_deferred_requires_value(self):
    """Registering a Deferred together with a delay raises ValueError."""
    pending = deferLater(reactor, 0.0, lambda: None)
    self.assertRaises(
        ValueError, self.tm.register_task, "test", pending, delay=1)
def sleep(self, time=.05):
    """
    Yield a Deferred that fires after ``time`` seconds.

    This is a generator, so it must be driven by the caller.
    """
    pause = deferLater(reactor, time, lambda: None)
    yield pause
def call_later(delay, f, *args, **kw):
    """
    Call ``f(*args, **kw)`` after ``delay`` seconds, sending any failure
    to ``my_err_back``.
    """
    scheduled = task.deferLater(reactor, delay, f, *args, **kw)
    scheduled.addErrback(my_err_back)
def game_state_checker(game, state, round_data, users_plots, counter=0):
    """
    Poll once per second until the current state can be left.

    The game advances when ``counter`` reaches ``SECONDS`` or when no
    InteractiveRound row of the current round is still outstanding for
    this state. Otherwise the check is rescheduled with ``counter + 1``.
    """
    def advance():
        # Move on to whatever follows the current state.
        if state == 'initial':
            start_interactive(game, round_data, users_plots)
        elif state == 'interactive':
            start_outcome(game, round_data, users_plots)
        else:
            start_initial(game)

    if counter == SECONDS:
        # Time is up: move to the next state regardless of progress.
        advance()
        return

    # Field filter selecting the rows that are still outstanding per state.
    outstanding_filters = {
        'initial': {'guess': None},
        'interactive': {'influenced_guess': None},
        'outcome': {'outcome': False},
    }
    if state in outstanding_filters:
        outstanding = InteractiveRound.objects.filter(
            game=game,
            round_order=round_data.get('current_round'),
            **outstanding_filters[state]).count()
        if outstanding == 0:
            advance()
            return

    counter += 1
    next_check = task.deferLater(
        reactor, 1, game_state_checker, game, state, round_data,
        users_plots, counter)
    next_check.addErrback(twisted_error)
def start_interactive(game, round_data, users_plots):
    """
    Enter the 'interactive' state.

    Caches the state, calls ``interactive`` for every user with that user's
    plot, and schedules ``game_state_checker`` in 1 second.
    """
    state = 'interactive'
    cache.set(game.id, {
        'state': state,
        'round_data': round_data,
        'users_plots': users_plots,
    })
    for entry in users_plots:
        # round_data is shared; only its 'plot' changes from user to user.
        round_data['plot'] = entry['plot']
        interactive(entry['user'], game, round_data)
    checker = task.deferLater(
        reactor, 1, game_state_checker, game, state, round_data, users_plots)
    checker.addErrback(twisted_error)
    return
def start_outcome(game, round_data, users_plots):
    """
    Enter the 'outcome' state.

    Caches the state, calls ``outcome`` for every user with that user's
    plot, and schedules ``game_state_checker`` in 1 second.
    """
    cache.set(game.id, {
        'state': 'outcome',
        'round_data': round_data,
        'users_plots': users_plots,
    })
    for entry in users_plots:
        # round_data is shared; only its 'plot' changes from user to user.
        round_data['plot'] = entry['plot']
        outcome(entry['user'], game, round_data)
    checker = task.deferLater(
        reactor, 1, game_state_checker, game, 'outcome', round_data,
        users_plots)
    checker.addErrback(twisted_error)
def loseConnection(self, connDone=failure.Failure(main.CONNECTION_DONE)):
    """
    Stop accepting connections on this port.

    Marks the port as disconnecting and stops reading. If the port is
    connected, C{self.connectionLost} is scheduled on the next reactor
    iteration and the L{Deferred} for that call is returned. It fires when
    the port is actually closed, or fails if shutting down raises.
    """
    self.disconnecting = True
    self.stopReading()
    if not self.connected:
        return None
    self.deferred = deferLater(
        self.reactor, 0, self.connectionLost, connDone)
    return self.deferred
def test_errback(self):
    """
    The L{Deferred} returned by L{task.deferLater} fails when the supplied
    function raises an exception.
    """
    def explode():
        raise TestException()

    clock = task.Clock()
    d = task.deferLater(clock, 1, explode)
    clock.advance(1)
    return self.assertFailure(d, TestException)
def test_runStopAfterTests(self):
    """
    L{DistTrialRunner} calls C{reactor.stop} and unlocks the test directory
    once the tests have run.
    """
    shutdownTriggers = []

    class FakeReactorWithSuccess(FakeReactor):

        def spawnProcess(self, worker, *args, **kwargs):
            worker.makeConnection(FakeTransport())
            self.spawnCount += 1
            worker._ampProtocol.run = self.succeedingRun

        def succeedingRun(self, case, result):
            return succeed(None)

        def addSystemEventTrigger(oself, phase, event, function):
            # ``self`` is the enclosing test case; ``oself`` is the reactor.
            self.assertEqual('before', phase)
            self.assertEqual('shutdown', event)
            shutdownTriggers.append(function)

    workingDirectory = self.runner._workingDirectory

    fakeReactor = FakeReactorWithSuccess()
    self.runner.run(TestCase(), fakeReactor)

    def check():
        # The lock can only be taken if the runner released it.
        localLock = FilesystemLock(workingDirectory + ".lock")
        self.assertTrue(localLock.lock())
        self.assertEqual(1, fakeReactor.stopCount)
        # We don't wait for the process deferreds here, so nothing is
        # returned by the function before shutdown.
        self.assertIdentical(None, shutdownTriggers[0]())

    return deferLater(reactor, 0, check)
def test_inCallback(self):
    """
    Log an error in an asynchronous callback.
    """
    def logFailure():
        return log.err(makeFailure())

    return task.deferLater(reactor, 0, logFailure)
def slow_operation():
    """Return a Deferred that fires with 42 after one second."""
    def calc_value(value):
        return 42

    delayed = task.deferLater(reactor, 1, calc_value, None)
    return delayed
def deferred_error():
    """Return a Deferred that fails with JsonRpcError after 0.1 seconds."""
    def calc_result(value):
        raise JsonRpcError("You wanted an error, here you have it!")

    delayed = task.deferLater(reactor, 0.1, calc_result, None)
    return delayed
def deferred_internal_error():
    """
    Return a Deferred that fails after 0.1 seconds with the
    ZeroDivisionError raised by a deliberate division by zero.
    """
    def calc_result(value):
        # Deliberate division by zero to simulate an internal error.
        return 56 / 0

    delayed = task.deferLater(reactor, 0.1, calc_result, None)
    return delayed