我们从Python开源项目中,提取了以下6个代码示例,用于说明如何使用twisted.internet.reactor.threadpool()。
def do_cleanThreads(cls): from twisted.internet import reactor if interfaces.IReactorThreads.providedBy(reactor): reactor.suggestThreadPoolSize(0) if hasattr(reactor, 'threadpool') and reactor.threadpool: reactor.threadpool.stop() reactor.threadpool = None # *Put it back* and *start it up again*. The # reactor's threadpool is *private*: we cannot just # rape it and walk away. reactor.threadpool = threadpool.ThreadPool(0, 10) reactor.threadpool.start()
def postClassCleanup(self): """ Called by L{unittest.TestCase} after the last test in a C{TestCase} subclass. Ensures the reactor is clean by murdering the threadpool, catching any pending L{DelayedCall<twisted.internet.base.DelayedCall>}s, open sockets etc. """ selectables = self._cleanReactor() calls = self._cleanPending() if selectables or calls: aggregate = DirtyReactorAggregateError(calls, selectables) self.result.addError(self.test, Failure(aggregate)) self._cleanThreads()
def _cleanThreads(self): reactor = self._getReactor() if interfaces.IReactorThreads.providedBy(reactor): if reactor.threadpool is not None: # Stop the threadpool now so that a new one is created. # This improves test isolation somewhat (although this is a # post class cleanup hook, so it's only isolating classes # from each other, not methods from each other). reactor._stopThreadPool()
def test__default_pool_is_disconnected_pool(self): pool = reactor.threadpool self.assertThat(pool, IsInstance(ThreadPool)) self.assertThat( pool.context.contextFactory, Is(orm.TotallyDisconnected)) self.assertThat(pool.min, Equals(0))
def __install(): log = logging.getLogger('tpython') log.info('setting up twisted reactor in ipython loop') from twisted.internet import _threadedselect _threadedselect.install() from twisted.internet import reactor from collections import deque from IPython.lib import inputhook from IPython import InteractiveShell q = deque() def reactor_wake(twisted_loop_next, q=q): q.append(twisted_loop_next) def reactor_work(*_args): if q: while len(q): q.popleft()() return 0 def reactor_start(*_args): log.info('starting twisted reactor in ipython') reactor.interleave(reactor_wake) # @UndefinedVariable inputhook.set_inputhook(reactor_work) def reactor_stop(): if reactor.threadpool: # @UndefinedVariable log.info('stopping twisted threads') reactor.threadpool.stop() # @UndefinedVariable log.info('shutting down twisted reactor') reactor._mainLoopShutdown() # @UndefinedVariable ip = InteractiveShell.instance() ask_exit = ip.ask_exit def ipython_exit(): reactor_stop() return ask_exit() ip.ask_exit = ipython_exit reactor_start() return reactor