Python twisted.internet.task 模块,react() 实例源码

我们从Python开源项目中,提取了以下17个代码示例,用于说明如何使用twisted.internet.task.react()

项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_runsUntilAsyncCallback(self):
    """
    L{task.react} keeps the reactor running until the L{Deferred} returned
    by the supplied function is called back, and then stops the reactor.
    """
    ticks = []

    def main(reactor):
        # Record an intermediate tick at t=1 and fire the result at t=2, so
        # the test can tell the reactor ran for the full two seconds.
        done = defer.Deferred()
        reactor.callLater(1, ticks.append, True)
        reactor.callLater(2, done.callback, None)
        return done

    fakeReactor = _FakeReactor()
    exitError = self.assertRaises(
        SystemExit, task.react, main, _reactor=fakeReactor)

    # A successful Deferred maps to exit code 0.
    self.assertEqual(0, exitError.code)
    self.assertEqual(ticks, [True])
    self.assertEqual(fakeReactor.seconds(), 2)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_runsUntilAsyncErrback(self):
    """
    L{task.react} keeps the reactor running until the L{defer.Deferred}
    returned by the supplied function is errbacked; it then stops the
    reactor and reports the error.
    """
    class ExpectedException(Exception):
        pass

    def main(reactor):
        # Fail asynchronously, one (fake) second after startup.
        failing = defer.Deferred()
        reactor.callLater(1, failing.errback, ExpectedException())
        return failing

    fakeReactor = _FakeReactor()
    exitError = self.assertRaises(
        SystemExit, task.react, main, _reactor=fakeReactor)

    # A failed Deferred maps to exit code 1.
    self.assertEqual(1, exitError.code)

    # The failure must have been logged exactly once.
    logged = self.flushLoggedErrors(ExpectedException)
    self.assertEqual(len(logged), 1)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_runsUntilSyncErrback(self):
    """
    L{task.react} returns quickly when the L{defer.Deferred} returned by
    the supplied function has already been errbacked by the time it is
    returned.
    """
    class ExpectedException(Exception):
        pass

    def main(reactor):
        # Already-failed Deferred: nothing for the reactor to wait on.
        return defer.fail(ExpectedException())

    fakeReactor = _FakeReactor()
    exitError = self.assertRaises(
        SystemExit, task.react, main, _reactor=fakeReactor)

    self.assertEqual(1, exitError.code)
    # No fake time may have elapsed.
    self.assertEqual(fakeReactor.seconds(), 0)

    logged = self.flushLoggedErrors(ExpectedException)
    self.assertEqual(len(logged), 1)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_singleStopCallback(self):
    """
    L{task.react} doesn't try to stop the reactor again if the
    L{defer.Deferred} returned by the supplied function is called back
    after the reactor has already been stopped.
    """
    def main(reactor):
        # Stop the reactor ourselves at t=1 ...
        reactor.callLater(1, reactor.stop)
        # ... and only fire the result while shutdown is in progress.
        done = defer.Deferred()
        reactor.addSystemEventTrigger(
            'during', 'shutdown', done.callback, None)
        return done

    fakeReactor = _FakeReactor()
    exitError = self.assertRaises(
        SystemExit, task.react, main, _reactor=fakeReactor)

    self.assertEqual(fakeReactor.seconds(), 1)
    self.assertEqual(0, exitError.code)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_synchronousStop(self):
    """
    L{task.react} copes with the reactor being stopped immediately before
    the returned L{Deferred} fires.
    """
    def main(reactor):
        result = defer.Deferred()

        def stopThenFire():
            # Order matters: the reactor is stopped first, then the
            # Deferred is called back within the same call.
            reactor.stop()
            result.callback(None)

        reactor.callWhenRunning(stopThenFire)
        return result

    fakeReactor = _FakeReactor()
    exitError = self.assertRaises(
        SystemExit, task.react, main, [], _reactor=fakeReactor)
    self.assertEqual(0, exitError.code)
项目:stethoscope    作者:Netflix    | 项目源码 | 文件源码
def main():
  """Parse command-line options, set up logging, and run `_main` via `task.react`.

  The parsed options and the configuration mapping are handed to `_main` as
  extra positional arguments after the reactor.
  """
  arg_parser = argparse.ArgumentParser(
    description="""Test connectivity for the plugins which support connectivity tests.""",
  )

  arg_parser.add_argument('--log-file', dest='logfile', default='connectivity.log')
  arg_parser.add_argument('--debug', dest="debug", action="store_true", default=False)

  arg_parser.add_argument(
    '--namespaces',
    dest='namespaces',
    type=str,
    nargs='+',
    default=DEFAULT_NAMESPACES,
    help='Namespaces for plugins to test.',
  )
  arg_parser.add_argument(
    '--plugins',
    dest='plugin_names',
    type=str,
    nargs='+',
    default=None,
    help='Names of plugins to test.',
  )

  # The configuration is loaded before the command line is parsed.
  config = stethoscope.api.factory.get_config()
  options = arg_parser.parse_args()

  config['LOGBOOK'] = stethoscope.utils.setup_logbook(options.logfile)
  config['LOGBOOK'].push_application()

  # --debug switches on both the DEBUG and TESTING configuration flags.
  config['DEBUG'] = config['TESTING'] = options.debug

  task.react(_main, (options, config))
项目:relaax    作者:deeplearninc    | 项目源码 | 文件源码
def adoptConnection(socket, address):
    """
    Run C{RLXProtocolFactory.buildConnection} under L{task.react}.

    L{task.react} calls the given function with the reactor followed by the
    supplied extra arguments, here C{socket} and C{address}.

    @param socket: forwarded unchanged to C{buildConnection}; presumably an
        already-connected socket object -- TODO confirm against the caller.
    @param address: forwarded unchanged to C{buildConnection}; presumably
        the peer address of C{socket} -- TODO confirm against the caller.
    """
    # Imported locally rather than at module level.
    # NOTE(review): possibly to delay reactor installation -- confirm.
    from twisted.internet import task
    task.react(RLXProtocolFactory.buildConnection, (socket, address))
项目:deb-python-autobahn    作者:openstack    | 项目源码 | 文件源码
def test():
    """
    Start two connections in turn, each over a different transport: first a
    msgpack rawsocket over a Unix domain socket, then a WebSocket over TCP.

    This is a generator function; each C{react(...)} result is yielded to
    whatever drives the generator.
    """
    # NOTE(review): if `react` is twisted.internet.task.react, it exits the
    # process once its Deferred fires, so the second connection may never be
    # started -- confirm the intended behavior.
    unix_rawsocket = {
        'type': 'rawsocket',
        'serializer': 'msgpack',
        'endpoint': {
            'type': 'unix',
            'path': '/tmp/cb1.sock'
        }
    }
    tcp_websocket = {
        'type': 'websocket',
        'url': 'ws://127.0.0.1:8080/ws',
        'endpoint': {
            'type': 'tcp',
            'host': '127.0.0.1',
            'port': 8080
        }
    }
    transports = [unix_rawsocket, tcp_websocket]

    connection1 = Connection(main1, transports=transports[0])
    yield react(connection1.start)

    connection2 = Connection(main2, transports=transports[1])
    yield react(connection2.start)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_runsUntilSyncCallback(self):
    """
    L{task.react} returns quickly when the L{Deferred} returned by the
    supplied function has already been called back by the time it is
    returned.
    """
    def main(reactor):
        # Already-fired Deferred: nothing for the reactor to wait on.
        return defer.succeed(None)

    fakeReactor = _FakeReactor()
    exitError = self.assertRaises(
        SystemExit, task.react, main, _reactor=fakeReactor)

    self.assertEqual(0, exitError.code)
    # No fake time may have elapsed.
    self.assertEqual(fakeReactor.seconds(), 0)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_arguments(self):
    """
    L{task.react} passes the elements of the list it is given as positional
    arguments to the supplied function, after the reactor.
    """
    received = []

    def main(reactor, x, y, z):
        # Capture what react() forwarded so it can be checked below.
        received.extend((x, y, z))
        return defer.succeed(None)

    fakeReactor = _FakeReactor()
    exitError = self.assertRaises(
        SystemExit, task.react, main, [1, 2, 3], _reactor=fakeReactor)

    self.assertEqual(0, exitError.code)
    self.assertEqual(received, [1, 2, 3])
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_defaultReactor(self):
    """
    When no reactor argument is passed to L{task.react}, it uses
    L{twisted.internet.reactor}.
    """
    def main(reactor):
        # Remember which reactor react() handed over.
        self.passedReactor = reactor
        return defer.succeed(None)

    fakeReactor = _FakeReactor()
    with NoReactor():
        # Install the fake as the global reactor for the duration of the
        # block, then call react() without an explicit _reactor.
        installReactor(fakeReactor)
        exitError = self.assertRaises(SystemExit, task.react, main, [])
        self.assertEqual(0, exitError.code)
    self.assertIs(fakeReactor, self.passedReactor)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_exitWithDefinedCode(self):
    """
    If the supplied function fails with a C{SystemExit}, L{task.react}
    forwards the exit code carried by that error.
    """
    def main(reactor):
        # Fail synchronously with an explicit exit status.
        return defer.fail(SystemExit(23))

    fakeReactor = _FakeReactor()
    exitError = self.assertRaises(
        SystemExit, task.react, main, [], _reactor=fakeReactor)
    self.assertEqual(23, exitError.code)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_asynchronousStop(self):
    """
    L{task.react} copes with the reactor being stopped while the returned
    L{Deferred} never fires.
    """
    def main(reactor):
        # Stop the reactor at t=1; the returned Deferred is never fired.
        reactor.callLater(1, reactor.stop)
        neverFires = defer.Deferred()
        return neverFires

    fakeReactor = _FakeReactor()
    exitError = self.assertRaises(
        SystemExit, task.react, main, [], _reactor=fakeReactor)
    self.assertEqual(0, exitError.code)
项目:stethoscope    作者:Netflix    | 项目源码 | 文件源码
def main():
  """Parse command-line options, prepare configuration, and run `_main` via `task.react`.

  Besides logging and debug flags, this applies the --timeout value to the
  BITFIT and JAMF plugin timeouts and registers a YAML representer for
  `arrow.arrow.Arrow` objects before handing control to `_main`.
  """
  arg_parser = argparse.ArgumentParser(
    description="""Pull records for a batch of users and submit to external services.""",
  )
  arg_parser.add_argument('--timeout', dest="timeout", type=int, default=10)
  arg_parser.add_argument(
    '--limit',
    dest="limit",
    type=int,
    default=10,
    help="""Retrieve data for at most this many users concurrently.""",
  )

  arg_parser.add_argument('--log-file', dest='logfile', default='batch.log')

  # Optional positional input file, opened for reading by argparse.
  arg_parser.add_argument('input', nargs='?', type=argparse.FileType('r'), default=None)

  arg_parser.add_argument('--collect-only', dest="collect_only", action="store_true")
  arg_parser.add_argument('--debug', dest="debug", action="store_true", default=False)

  # The configuration is loaded before the command line is parsed.
  config = stethoscope.api.factory.get_config()
  options = arg_parser.parse_args()

  # One shared timeout for every plugin listed here.
  for plugin in ('BITFIT', 'JAMF'):
    timeout_key = plugin + '_TIMEOUT'
    config[timeout_key] = options.timeout

  config['LOGBOOK'] = stethoscope.utils.setup_logbook(options.logfile)
  config['LOGBOOK'].push_application()

  # --debug switches on both the DEBUG and TESTING configuration flags.
  config['DEBUG'] = config['TESTING'] = options.debug

  # Register the Arrow representer with both the default and the safe dumper.
  yaml.add_representer(arrow.arrow.Arrow, arrow_representer)
  yaml.SafeDumper.add_representer(arrow.arrow.Arrow, arrow_representer)

  task.react(_main, (options, config))
项目:ooniprobe-debian    作者:TheTorProject    | 项目源码 | 文件源码
def run():
    """
    Entry point: run C{oonideckgen} under L{task.react}.

    L{task.react} calls the function with the reactor, runs the reactor
    until the returned Deferred fires, and then exits the process.
    """
    task.react(oonideckgen)
项目:ooniprobe-debian    作者:TheTorProject    | 项目源码 | 文件源码
def run():
    """
    Entry point: run C{oonireport} under L{task.react}.

    L{task.react} calls the function with the reactor, runs the reactor
    until the returned Deferred fires, and then exits the process.
    """
    task.react(oonireport)
项目:ooniprobe-debian    作者:TheTorProject    | 项目源码 | 文件源码
def run():
    """
    Entry point: run C{ooniprobe} under L{task.react}.

    L{task.react} calls the function with the reactor, runs the reactor
    until the returned Deferred fires, and then exits the process.
    """
    task.react(ooniprobe)