我们从Python开源项目中,提取了以下17个代码示例,用于说明如何使用twisted.internet.task.react()。
def test_runsUntilAsyncCallback(self): """ L{task.react} runs the reactor until the L{Deferred} returned by the function it is passed is called back, then stops it. """ timePassed = [] def main(reactor): finished = defer.Deferred() reactor.callLater(1, timePassed.append, True) reactor.callLater(2, finished.callback, None) return finished r = _FakeReactor() exitError = self.assertRaises( SystemExit, task.react, main, _reactor=r) self.assertEqual(0, exitError.code) self.assertEqual(timePassed, [True]) self.assertEqual(r.seconds(), 2)
def test_runsUntilAsyncErrback(self): """ L{task.react} runs the reactor until the L{defer.Deferred} returned by the function it is passed is errbacked, then it stops the reactor and reports the error. """ class ExpectedException(Exception): pass def main(reactor): finished = defer.Deferred() reactor.callLater(1, finished.errback, ExpectedException()) return finished r = _FakeReactor() exitError = self.assertRaises( SystemExit, task.react, main, _reactor=r) self.assertEqual(1, exitError.code) errors = self.flushLoggedErrors(ExpectedException) self.assertEqual(len(errors), 1)
def test_runsUntilSyncErrback(self): """ L{task.react} returns quickly if the L{defer.Deferred} returned by the function it is passed has already been errbacked at the time it is returned. """ class ExpectedException(Exception): pass def main(reactor): return defer.fail(ExpectedException()) r = _FakeReactor() exitError = self.assertRaises( SystemExit, task.react, main, _reactor=r) self.assertEqual(1, exitError.code) self.assertEqual(r.seconds(), 0) errors = self.flushLoggedErrors(ExpectedException) self.assertEqual(len(errors), 1)
def test_singleStopCallback(self): """ L{task.react} doesn't try to stop the reactor if the L{defer.Deferred} the function it is passed is called back after the reactor has already been stopped. """ def main(reactor): reactor.callLater(1, reactor.stop) finished = defer.Deferred() reactor.addSystemEventTrigger( 'during', 'shutdown', finished.callback, None) return finished r = _FakeReactor() exitError = self.assertRaises( SystemExit, task.react, main, _reactor=r) self.assertEqual(r.seconds(), 1) self.assertEqual(0, exitError.code)
def test_synchronousStop(self): """ L{task.react} handles when the reactor is stopped just before the returned L{Deferred} fires. """ def main(reactor): d = defer.Deferred() def stop(): reactor.stop() d.callback(None) reactor.callWhenRunning(stop) return d r = _FakeReactor() exitError = self.assertRaises( SystemExit, task.react, main, [], _reactor=r) self.assertEqual(0, exitError.code)
def main(): parser = argparse.ArgumentParser( description="""Test connectivity for the plugins which support connectivity tests.""" ) parser.add_argument('--log-file', dest='logfile', default='connectivity.log') parser.add_argument('--debug', dest="debug", action="store_true", default=False) parser.add_argument('--namespaces', dest='namespaces', type=str, nargs='+', default=DEFAULT_NAMESPACES, help='Namespaces for plugins to test.') parser.add_argument('--plugins', dest='plugin_names', type=str, nargs='+', default=None, help='Names of plugins to test.') config = stethoscope.api.factory.get_config() args = parser.parse_args() config['LOGBOOK'] = stethoscope.utils.setup_logbook(args.logfile) config['LOGBOOK'].push_application() config['DEBUG'] = args.debug config['TESTING'] = args.debug task.react(_main, (args, config))
def adoptConnection(socket, address): from twisted.internet import task task.react(RLXProtocolFactory.buildConnection, (socket, address))
def test(): transports = [ { 'type': 'rawsocket', 'serializer': 'msgpack', 'endpoint': { 'type': 'unix', 'path': '/tmp/cb1.sock' } }, { 'type': 'websocket', 'url': 'ws://127.0.0.1:8080/ws', 'endpoint': { 'type': 'tcp', 'host': '127.0.0.1', 'port': 8080 } } ] connection1 = Connection(main1, transports=transports[0]) yield react(connection1.start) connection2 = Connection(main2, transports=transports[1]) yield react(connection2.start)
def test_runsUntilSyncCallback(self): """ L{task.react} returns quickly if the L{Deferred} returned by the function it is passed has already been called back at the time it is returned. """ def main(reactor): return defer.succeed(None) r = _FakeReactor() exitError = self.assertRaises( SystemExit, task.react, main, _reactor=r) self.assertEqual(0, exitError.code) self.assertEqual(r.seconds(), 0)
def test_arguments(self): """ L{task.react} passes the elements of the list it is passed as positional arguments to the function it is passed. """ args = [] def main(reactor, x, y, z): args.extend((x, y, z)) return defer.succeed(None) r = _FakeReactor() exitError = self.assertRaises( SystemExit, task.react, main, [1, 2, 3], _reactor=r) self.assertEqual(0, exitError.code) self.assertEqual(args, [1, 2, 3])
def test_defaultReactor(self): """ L{twisted.internet.reactor} is used if no reactor argument is passed to L{task.react}. """ def main(reactor): self.passedReactor = reactor return defer.succeed(None) reactor = _FakeReactor() with NoReactor(): installReactor(reactor) exitError = self.assertRaises(SystemExit, task.react, main, []) self.assertEqual(0, exitError.code) self.assertIs(reactor, self.passedReactor)
def test_exitWithDefinedCode(self): """ L{task.react} forwards the exit code specified by the C{SystemExit} error returned by the passed function, if any. """ def main(reactor): return defer.fail(SystemExit(23)) r = _FakeReactor() exitError = self.assertRaises( SystemExit, task.react, main, [], _reactor=r) self.assertEqual(23, exitError.code)
def test_asynchronousStop(self): """ L{task.react} handles when the reactor is stopped and the returned L{Deferred} doesn't fire. """ def main(reactor): reactor.callLater(1, reactor.stop) return defer.Deferred() r = _FakeReactor() exitError = self.assertRaises( SystemExit, task.react, main, [], _reactor=r) self.assertEqual(0, exitError.code)
def main(): parser = argparse.ArgumentParser( description="""Pull records for a batch of users and submit to external services.""" ) parser.add_argument('--timeout', dest="timeout", type=int, default=10) parser.add_argument('--limit', dest="limit", type=int, default=10, help="""Retrieve data for at most this many users concurrently.""") parser.add_argument('--log-file', dest='logfile', default='batch.log') parser.add_argument('input', nargs='?', type=argparse.FileType('r'), default=None) parser.add_argument('--collect-only', dest="collect_only", action="store_true") parser.add_argument('--debug', dest="debug", action="store_true", default=False) config = stethoscope.api.factory.get_config() args = parser.parse_args() for plugin in ['BITFIT', 'JAMF']: config[plugin + '_TIMEOUT'] = args.timeout config['LOGBOOK'] = stethoscope.utils.setup_logbook(args.logfile) config['LOGBOOK'].push_application() config['DEBUG'] = args.debug config['TESTING'] = args.debug yaml.add_representer(arrow.arrow.Arrow, arrow_representer) yaml.SafeDumper.add_representer(arrow.arrow.Arrow, arrow_representer) task.react(_main, (args, config))
def run(): task.react(oonideckgen)
def run(): task.react(oonireport)
def run(): task.react(ooniprobe)