我们从Python开源项目中,提取了以下16个代码示例,用于说明如何使用twisted.internet.defer.DeferredSemaphore()。
def fetch_all(feeds): BATCH_SIZE=5 batches = [] for feeds_batch in batch_gen(feeds, BATCH_SIZE): sem = DeferredSemaphore(len(feeds_batch)) batch = [] for feed_ in feeds_batch: batch.append(sem.run(fetch_single, feed_meta=feed_)) batchDef = gatherResults(batch, consumeErrors=False) batchDef.addCallback(store_fetched_data) batches.append(batchDef) # rendez-vous for all feeds that were fetched batchesDef = gatherResults(batches, consumeErrors=False) batchesDef.addCallbacks( clean_up_and_exit, errback=lambda x: None, ) return batchesDef
def __init__(self, settings): self.options = settings.get('PHANTOMJS_OPTIONS', {}) # ??? max_run = settings.get('PHANTOMJS_MAXRUN', 10) # PhantomJS ???????????, ??10 self.sem = defer.DeferredSemaphore(max_run) self.queue = Queue.LifoQueue(maxsize=max_run) # LifoQueue ?????? SignalManager(dispatcher.Any).connect(receiver=self._close, signal=signals.spider_closed)
def testSemaphore(self): N = 13 sem = defer.DeferredSemaphore(N) controlDeferred = defer.Deferred() def helper(self, arg): self.arg = arg return controlDeferred results = [] uniqueObject = object() resultDeferred = sem.run(helper, self=self, arg=uniqueObject) resultDeferred.addCallback(results.append) resultDeferred.addCallback(self._incr) self.assertEquals(results, []) self.assertEquals(self.arg, uniqueObject) controlDeferred.callback(None) self.assertEquals(results.pop(), None) self.assertEquals(self.counter, 1) self.counter = 0 for i in range(1, 1 + N): sem.acquire().addCallback(self._incr) self.assertEquals(self.counter, i) sem.acquire().addCallback(self._incr) self.assertEquals(self.counter, N) sem.release() self.assertEquals(self.counter, N + 1) for i in range(1, 1 + N): sem.release() self.assertEquals(self.counter, N + 1)
def scanForCovers(self, data): self.start_time = time.clock() self.guilist = [] self.counting = 0 self.found = 0 self.notfound = 0 self.error = 0 ds = defer.DeferredSemaphore(tokens=2) downloads = [ds.run(self.download, url).addCallback(self.parseWebpage, which, type, id, filename, title, url, season, episode).addErrback(self.dataErrorInfo) for which, type, id, filename, title, url, season, episode in data] finished = defer.DeferredList(downloads).addErrback(self.dataErrorInfo)
def do_migration(sd_hashes): def print_final_result(result): num_successes = 0 num_fails = 0 num_blobs = 0 for (success,value) in result: if success: num_successes+=1 num_blobs += value['blobs'] print("Success:{}".format(value)) print('num success:{}, num fail:{}, total:{}'.format(num_successes,num_fails,len(sd_hashes))) else: num_fails+=1 print("Fail:{}".format(value)) time_taken = time.time() - start_time sec_per_blob = num_blobs / time_taken print("All Finished! Streams: {} Successes:{}, Fails:{}, Blobs moved:{}, Min to finish:{}, Sec per blob:{}".format( len(sd_hashes), num_successes, num_fails, num_blobs, time_taken/60, sec_per_blob)) reactor.stop() ds = [] sem = defer.DeferredSemaphore(4) for host, sd_hash in sd_hashes: d = sem.run(migrate_sd_hash, sd_hash, host) ds.append(d) d = defer.DeferredList(ds,consumeErrors=True) d.addCallback(print_final_result) reactor.run()
def runScript(self, commands): """ Run each command in sequence and return a Deferred that fires when all commands are completed. @param commands: A list of strings containing sftp commands. @return: A C{Deferred} that fires when all commands are completed. The payload is a list of response strings from the server, in the same order as the commands. """ sem = defer.DeferredSemaphore(1) dl = [sem.run(self.runCommand, command) for command in commands] return defer.gatherResults(dl)
def test_semaphoreInvalidTokens(self): """ If the token count passed to L{DeferredSemaphore} is less than one then L{ValueError} is raised. """ self.assertRaises(ValueError, defer.DeferredSemaphore, 0) self.assertRaises(ValueError, defer.DeferredSemaphore, -1)
def test_cancelSemaphoreAfterAcquired(self): """ When canceling a L{Deferred} from a L{DeferredSemaphore} that already has the semaphore, the cancel should have no effect. """ def _failOnErrback(_): self.fail("Unexpected errback call!") sem = defer.DeferredSemaphore(1) d = sem.acquire() d.addErrback(_failOnErrback) d.cancel()
def test_cancelSemaphoreBeforeAcquired(self): """ When canceling a L{Deferred} from a L{DeferredSemaphore} that does not yet have the semaphore (i.e., the L{Deferred} has not fired), the cancel should cause a L{defer.CancelledError} failure. """ sem = defer.DeferredSemaphore(1) sem.acquire() d = sem.acquire() d.cancel() self.assertImmediateFailure(d, defer.CancelledError)
def getOverallDeferredSemaphore(): global OVERALL_SEMAPHORE if OVERALL_SEMAPHORE is None: preferences = zope.component.queryUtility( ICollectorPreferences, 'zenpython') if preferences: OVERALL_SEMAPHORE = defer.DeferredSemaphore(preferences.options.twistedconcurrenthttp) else: # When we are running in a daemon other than zenpython, the preferences # value will not be available OVERALL_SEMAPHORE = defer.DeferredSemaphore(DEFAULT_TWISTEDCONCURRENTHTTP) return OVERALL_SEMAPHORE
def getKeyedDeferredSemaphore(key, limit): global KEYED_SEMAPHORES if key not in KEYED_SEMAPHORES: KEYED_SEMAPHORES[key] = defer.DeferredSemaphore(limit) semaphore = KEYED_SEMAPHORES[key] if semaphore.limit != limit: if limit >= semaphore.tokens: semaphore.limit = limit log.info("Unable to lower maximum parallel query limit for %s to %d ", key, limit) else: log.warning("Unable to lower maximum parallel query limit for %s to %d at this time (%d connections currently active)", key, limit, semaphore.tokens) return semaphore
def make_database_unpool(maxthreads=max_threads_for_database_pool): """Create a general non-thread-pool for database activity. Its consumer are the old-school web application, i.e. the plain HTTP and HTTP API services, and the WebSocket service, for the responsive web UI. Each thread is fully connected to the database. However, this is a :class:`ThreadUnpool`, which means that threads are not actually pooled: a new thread is created for each task. This is ideal for testing, to improve isolation between tests. """ return ThreadUnpool(DeferredSemaphore(maxthreads), ExclusivelyConnected)
def __init__(self, settings): self.settings = settings self.options = settings.get('PHANTOMJS_OPTIONS', {})\ max_run = settings.get('PHANTOMJS_MAXRUN', 5) self.sem = defer.DeferredSemaphore(max_run) # as a means of limiting parallelism self.queue = queue.LifoQueue(max_run) # last in first out, the content is driver not request SignalManager(dispatcher.Any).connect(self._close, signal=signals.spider_closed)
def testSemaphore(self): N = 13 sem = defer.DeferredSemaphore(N) controlDeferred = defer.Deferred() def helper(self, arg): self.arg = arg return controlDeferred results = [] uniqueObject = object() resultDeferred = sem.run(helper, self=self, arg=uniqueObject) resultDeferred.addCallback(results.append) resultDeferred.addCallback(self._incr) self.assertEqual(results, []) self.assertEqual(self.arg, uniqueObject) controlDeferred.callback(None) self.assertIsNone(results.pop()) self.assertEqual(self.counter, 1) self.counter = 0 for i in range(1, 1 + N): sem.acquire().addCallback(self._incr) self.assertEqual(self.counter, i) success = [] def fail(r): success.append(False) def succeed(r): success.append(True) d = sem.acquire().addCallbacks(fail, succeed) d.cancel() self.assertEqual(success, [True]) sem.acquire().addCallback(self._incr) self.assertEqual(self.counter, N) sem.release() self.assertEqual(self.counter, N + 1) for i in range(1, 1 + N): sem.release() self.assertEqual(self.counter, N + 1)
def request(self, method, uri, headers=None, bodyProducer=None): """ Issue a new request. @param method: The request method to send. @type method: C{str} @param uri: The request URI send. @type uri: C{str} @param scheme: A string like C{'http'} or C{'https'} (the only two supported values) to use to determine how to establish the connection. @param host: A C{str} giving the hostname which will be connected to in order to issue a request. @param port: An C{int} giving the port number the connection will be on. @param path: A C{str} giving the path portion of the request URL. @param headers: The request headers to send. If no I{Host} header is included, one will be added based on the request URI. @type headers: L{Headers} @param bodyProducer: An object which will produce the request body or, if the request body is to be empty, L{None}. @type bodyProducer: L{IBodyProducer} provider @return: A L{Deferred} which fires with the result of the request (a L{Response} instance), or fails if there is a problem setting up a connection over which to issue the request. It may also fail with L{SchemeNotSupported} if the scheme of the given URI is not supported. @rtype: L{Deferred} """ scheme, host, port, path = _parse(uri) if headers is None: headers = Headers() if not headers.hasHeader('host'): # This is a lot of copying. It might be nice if there were a bit # less. headers = Headers(dict(headers.getAllRawHeaders())) headers.addRawHeader( 'host', self._computeHostValue(scheme, host, port)) if self.persistent: sem = self._semaphores.get((scheme, host, port)) if sem is None: sem = DeferredSemaphore(self.maxConnectionsPerHostName) self._semaphores[scheme, host, port] = sem return sem.run(self._request, method, scheme, host, port, path, headers, bodyProducer) else: return self._request( method, scheme, host, port, path, headers, bodyProducer)