我们从Python开源项目中,提取了以下14个代码示例,用于说明如何使用twisted.internet.threads.deferToThreadPool()。
def _authenticateUsernamePassword(self, dn, password):
    """
    Verify a DN/password pair by opening a secondary connection to the
    LDAP server and attempting to bind with those credentials.

    The bind itself is performed by
    C{self._authenticateUsernamePassword_inThread}, which is submitted to
    C{self.threadpool} so the reactor thread is not blocked.

    @returns: True if the password is correct, False otherwise
    @rtype: deferred C{bool}

    @raises: L{LDAPConnectionError} if unable to connect.
    """
    bindResult = deferToThreadPool(
        reactor,
        self.threadpool,
        self._authenticateUsernamePassword_inThread,
        dn,
        password,
    )

    # Report when the pool's queue is non-empty right after submitting the
    # call, and count the occurrence in the pool statistics.
    backlog = self.threadpool._queue.qsize()
    if backlog > 0:
        self.log.error("LDAP thread pool overflowing: {qsize}", qsize=backlog)
        self.poolStats["connection-thread-blocked"] += 1

    return bindResult
def _recordsFromQueryString(
    self, queryString, recordTypes=None,
    limitResults=None, timeoutSeconds=None
):
    """
    Run C{self._recordsFromQueryString_inThread} on C{self.threadpool}.

    C{queryString} and C{recordTypes} are forwarded positionally;
    C{limitResults} and C{timeoutSeconds} are forwarded as keyword
    arguments.

    @return: the L{Deferred} produced by C{deferToThreadPool} for the
        in-thread call.
    """
    searchOptions = {
        "limitResults": limitResults,
        "timeoutSeconds": timeoutSeconds,
    }
    queryResult = deferToThreadPool(
        reactor,
        self.threadpool,
        self._recordsFromQueryString_inThread,
        queryString,
        recordTypes,
        **searchOptions
    )

    # Report when the pool's queue is non-empty right after submitting the
    # call, and count the occurrence in the pool statistics.
    backlog = self.threadpool._queue.qsize()
    if backlog > 0:
        self.log.error("LDAP thread pool overflowing: {qsize}", qsize=backlog)
        self.poolStats["connection-thread-blocked"] += 1

    return queryResult
def _deferToThreadPool(self, f, *args, **kwargs):
    """Run ``f(*args, **kwargs)`` on this object's thread pool.

    The returned deferred fires with the value of that expression, or
    errbacks with a failure wrapping whatever exception it raised.

    If the pool has already been joined, a deferred that has already
    failed with ``ReactorNotRunning`` is returned instead. If the pool has
    not been started yet, it is started here and its ``stop`` method is
    registered to run during reactor shutdown.
    """
    pool = self._pool

    if pool.joined:
        stopped = ReactorNotRunning(
            "This thimble's threadpool already stopped."
        )
        return fail(stopped)

    if not pool.started:
        # Lazy start: the pool only spins up on first use, and is torn
        # down together with the reactor.
        pool.start()
        self._reactor.addSystemEventTrigger('during', 'shutdown', pool.stop)

    return deferToThreadPool(self._reactor, pool, f, *args, **kwargs)
def getHostByName(self, name, timeout=(1, 3, 11, 45)):
    """
    See L{twisted.internet.interfaces.IResolverSimple.getHostByName}.

    The elements of C{timeout} are added together and the total is used
    as a single timeout for the lookup; a falsy C{timeout} means 60
    seconds. Intermediate timeouts and retries are left to the platform
    via L{socket.gethostbyname}.

    @return: a L{Deferred} distinct from the one tracking the threaded
        lookup; it is stored in C{self._runningQueries} alongside the
        scheduled timeout call.
    """
    timeoutDelay = sum(timeout) if timeout else 60

    userDeferred = defer.Deferred()
    reactor = self.reactor
    lookupDeferred = threads.deferToThreadPool(
        reactor, reactor.getThreadPool(), socket.gethostbyname, name
    )

    # Schedule the timeout handler, then remember both the caller-facing
    # deferred and the delayed call, keyed by the lookup deferred.
    cancelCall = reactor.callLater(
        timeoutDelay, self._cleanup, name, lookupDeferred
    )
    self._runningQueries[lookupDeferred] = (userDeferred, cancelCall)

    lookupDeferred.addBoth(self._checkTimeout, name, lookupDeferred)
    return userDeferred
def render_GET(self, request):  # pylint: disable=invalid-name
    """Handle a GET by running ``self.do_get`` on ``self.thread_pool``.

    When the number of entries in ``self.thread_pool.working`` exceeds
    ``self.max_workers``, the request is answered immediately through
    ``self.error_response`` with ``http.SERVICE_UNAVAILABLE``. Otherwise
    the work is deferred to the pool, ``self.final`` and
    ``self.error_callback`` are attached, and ``server.NOT_DONE_YET`` is
    returned.
    """
    busy_count = len(self.thread_pool.working)
    if busy_count > self.max_workers:
        return self.error_response(
            request,
            http.SERVICE_UNAVAILABLE,
            'Service is unavailable at this time, Please try again later',
        )

    pending = threads.deferToThreadPool(
        reactor, self.thread_pool, self.do_get, request
    )
    pending.addCallback(self.final, request)
    pending.addErrback(self.error_callback, request)
    return server.NOT_DONE_YET
def render_POST(self, request):  # pylint: disable=invalid-name
    """Handle a POST by running ``self.do_post`` on ``self.thread_pool``.

    When the number of entries in ``self.thread_pool.working`` exceeds
    ``self.max_workers``, the request is answered immediately through
    ``self.error_response`` with ``http.SERVICE_UNAVAILABLE``. Otherwise
    the work is deferred to the pool, ``self.final`` and
    ``self.error_callback`` are attached, and ``server.NOT_DONE_YET`` is
    returned.
    """
    busy_count = len(self.thread_pool.working)
    if busy_count > self.max_workers:
        return self.error_response(
            request,
            http.SERVICE_UNAVAILABLE,
            'Service is unavailable at this time, Please try again later',
        )

    pending = threads.deferToThreadPool(
        reactor, self.thread_pool, self.do_post, request
    )
    pending.addCallback(self.final, request)
    pending.addErrback(self.error_callback, request)
    return server.NOT_DONE_YET
def testGetQueryParserInThread(self):
    """
    L{getQueryParser} is not thread-safe.  A L{FeatureError} is raised
    if it is called outside the main thread.
    """
    return self.assertFailure(
        deferToThreadPool(reactor, self.threadPool, getQueryParser),
        FeatureError,
    )
def run(self, function, *args, **kwargs):
    """Run C{function} in a thread.

    The call is routed through C{self._transact}, the transaction
    wrapper: the transaction is committed when C{function} succeeds and
    aborted when it raises an exception.

    @param function: The function to run.
    @param args: Positional arguments to pass to C{function}.
    @param kwargs: Keyword arguments to pass to C{function}.
    @return: A C{Deferred} that will fire after the function has been run.
    """
    outcome = deferToThreadPool(
        reactor,
        self._threadPool,
        self._transact,
        function,
        *args,
        **kwargs
    )
    return outcome
def _recordWithDN(self, dn):
    """
    Run C{self._recordWithDN_inThread(dn)} on C{self.threadpool}.

    @return: the L{Deferred} produced by C{deferToThreadPool} for the
        in-thread call.
    """
    recordResult = deferToThreadPool(
        reactor,
        self.threadpool,
        self._recordWithDN_inThread,
        dn,
    )

    # Report when the pool's queue is non-empty right after submitting the
    # call, and count the occurrence in the pool statistics.
    backlog = self.threadpool._queue.qsize()
    if backlog > 0:
        self.log.error("LDAP thread pool overflowing: {qsize}", qsize=backlog)
        self.poolStats["connection-thread-blocked"] += 1

    return recordResult
def resolveHostName(self, resolutionReceiver, hostName, portNumber=0,
                    addressTypes=None, transportSemantics='TCP'):
    """
    See L{IHostnameResolver.resolveHostName}

    The lookup runs C{self._getaddrinfo} on the pool returned by
    C{self._getThreadPool()}; a C{gaierror} from that call is treated as
    an empty result list. The receiver is told the resolution began
    before this method returns, and gets each address followed by
    C{resolutionComplete()} once the threaded call has finished.

    @param resolutionReceiver: see interface
    @param hostName: see interface
    @param portNumber: see interface
    @param addressTypes: see interface
    @param transportSemantics: see interface

    @return: see interface
    """
    pool = self._getThreadPool()

    if addressTypes is None:
        typesKey = _any
    else:
        typesKey = frozenset(addressTypes)
    addressFamily = _typesToAF[typesKey]
    socketType = _transportToSocket[transportSemantics]

    def get():
        # Runs in the pool thread; a failed lookup yields no addresses
        # rather than an errback.
        try:
            return self._getaddrinfo(
                hostName, portNumber, addressFamily, socketType
            )
        except gaierror:
            return []

    d = deferToThreadPool(self._reactor, pool, get)
    resolution = HostResolution(hostName)
    resolutionReceiver.resolutionBegan(resolution)

    def deliverResults(result):
        for family, socktype, _proto, _canonname, sockaddr in result:
            addrType = _afToType[family]
            transportName = _socktypeToType.get(socktype, 'TCP')
            resolutionReceiver.addressResolved(
                addrType(transportName, *sockaddr)
            )
        resolutionReceiver.resolutionComplete()

    d.addCallback(deliverResults)
    return resolution
def test_deferredResult(self):
    """
    L{threads.deferToThreadPool} runs the callable it is given and passes
    both the positional and the keyword arguments through to it.
    """
    def add(x, y=5):
        return x + y

    d = threads.deferToThreadPool(reactor, self.tp, add, 3, y=4)
    # 3 + 4: the explicit keyword must override the default of 5.
    return d.addCallback(self.assertEqual, 7)
def test_deferredFailure(self):
    """
    L{threads.deferToThreadPool} returns a failure carrying the
    appropriate exception instance when the called function raises.
    """
    class NewError(Exception):
        pass

    def alwaysRaises():
        raise NewError()

    return self.assertFailure(
        threads.deferToThreadPool(reactor, self.tp, alwaysRaises),
        NewError,
    )
def runWithConnection(self, func, *args, **kw):
    """
    Call a function with a database connection and hand back its result.

    @param func: A one-argument callable that will be executed in a
        thread with a connection taken from the pool.  Its first argument
        is a L{Connection} instance (whose interface is mostly identical
        to that of a connection object for your DB-API module of choice),
        and its results are delivered through a L{Deferred}.  If it
        raises an exception the transaction is rolled back; otherwise the
        transaction is committed.  B{Note} that this function is B{not}
        run in the main thread: it must be threadsafe.

    @param *args: positional arguments to be passed to func

    @param **kw: keyword arguments to be passed to func

    @return: a L{Deferred} which will fire the return value of
        C{func(Transaction(...), *args, **kw)}, or a
        L{twisted.python.failure.Failure}.
    """
    # Imported here, as in the original, rather than at module level.
    from twisted.internet import reactor

    connectionResult = threads.deferToThreadPool(
        reactor,
        self.threadpool,
        self._runWithConnection,
        func,
        *args,
        **kw
    )
    return connectionResult
def augment_twisted_deferToThreadPool():
    """Wrap every function deferred to a thread in `synchronous`.

    Replaces `twisted.internet.threads.deferToThreadPool` with a variant
    that applies `synchronous` to any callable that does not already
    provide `ISynchronous`. Nothing is replaced when the function
    currently installed there was defined in this module.
    """
    from twisted.internet import threads
    from twisted.internet.threads import deferToThreadPool
    from provisioningserver.utils.twisted import ISynchronous, synchronous

    # Already patched by this module: leave the installed wrapper alone.
    if threads.deferToThreadPool.__module__ == __name__:
        return

    def new_deferToThreadPool(reactor, threadpool, f, *args, **kwargs):
        """Variant of Twisted's that wraps all functions in `synchronous`."""
        if ISynchronous.providedBy(f):
            func = f
        else:
            func = synchronous(f)
        return deferToThreadPool(reactor, threadpool, func, *args, **kwargs)

    threads.deferToThreadPool = new_deferToThreadPool