Python twisted.internet.reactor 模块,threadpool() 实例源码

我们从Python开源项目中,提取了以下6个代码示例,用于说明如何使用twisted.internet.reactor.threadpool()

项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def do_cleanThreads(cls):
        from twisted.internet import reactor
        if interfaces.IReactorThreads.providedBy(reactor):
            reactor.suggestThreadPoolSize(0)
            if hasattr(reactor, 'threadpool') and reactor.threadpool:
                reactor.threadpool.stop()
                reactor.threadpool = None
                # *Put it back* and *start it up again*.  The
                # reactor's threadpool is *private*: we cannot just
                # rape it and walk away.
                reactor.threadpool = threadpool.ThreadPool(0, 10)
                reactor.threadpool.start()
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def do_cleanThreads(cls):
        from twisted.internet import reactor
        if interfaces.IReactorThreads.providedBy(reactor):
            reactor.suggestThreadPoolSize(0)
            if hasattr(reactor, 'threadpool') and reactor.threadpool:
                reactor.threadpool.stop()
                reactor.threadpool = None
                # *Put it back* and *start it up again*.  The
                # reactor's threadpool is *private*: we cannot just
                # rape it and walk away.
                reactor.threadpool = threadpool.ThreadPool(0, 10)
                reactor.threadpool.start()
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def postClassCleanup(self):
        """
        Called by L{unittest.TestCase} after the last test in a C{TestCase}
        subclass. Ensures the reactor is clean by murdering the threadpool,
        catching any pending
        L{DelayedCall<twisted.internet.base.DelayedCall>}s, open sockets etc.
        """
        selectables = self._cleanReactor()
        calls = self._cleanPending()
        if selectables or calls:
            aggregate = DirtyReactorAggregateError(calls, selectables)
            self.result.addError(self.test, Failure(aggregate))
        self._cleanThreads()
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def _cleanThreads(self):
        reactor = self._getReactor()
        if interfaces.IReactorThreads.providedBy(reactor):
            if reactor.threadpool is not None:
                # Stop the threadpool now so that a new one is created.
                # This improves test isolation somewhat (although this is a
                # post class cleanup hook, so it's only isolating classes
                # from each other, not methods from each other).
                reactor._stopThreadPool()
项目:maas    作者:maas    | 项目源码 | 文件源码
def test__default_pool_is_disconnected_pool(self):
        pool = reactor.threadpool
        self.assertThat(pool, IsInstance(ThreadPool))
        self.assertThat(
            pool.context.contextFactory,
            Is(orm.TotallyDisconnected))
        self.assertThat(pool.min, Equals(0))
项目:bwscanner    作者:TheTorProject    | 项目源码 | 文件源码
def __install():
    log = logging.getLogger('tpython')
    log.info('setting up twisted reactor in ipython loop')

    from twisted.internet import _threadedselect
    _threadedselect.install()

    from twisted.internet import reactor
    from collections import deque
    from IPython.lib import inputhook
    from IPython import InteractiveShell

    q = deque()

    def reactor_wake(twisted_loop_next, q=q):
        q.append(twisted_loop_next)

    def reactor_work(*_args):
        if q:
            while len(q):
                q.popleft()()
        return 0

    def reactor_start(*_args):
        log.info('starting twisted reactor in ipython')
        reactor.interleave(reactor_wake)  # @UndefinedVariable
        inputhook.set_inputhook(reactor_work)

    def reactor_stop():
        if reactor.threadpool:  # @UndefinedVariable
            log.info('stopping twisted threads')
            reactor.threadpool.stop()  # @UndefinedVariable
        log.info('shutting down twisted reactor')
        reactor._mainLoopShutdown()  # @UndefinedVariable

    ip = InteractiveShell.instance()

    ask_exit = ip.ask_exit
    def ipython_exit():
        reactor_stop()
        return ask_exit()

    ip.ask_exit = ipython_exit

    reactor_start()

    return reactor