我们从Python开源项目中,提取了以下18个代码示例,用于说明如何使用twisted.internet.reactor.suggestThreadPoolSize()。
def run(self): """Loads plugins, and initiates polling schedules.""" reactor.callWhenRunning(self.install_sighandlers) if self.options.netbox: self.setup_single_job() elif self.options.multiprocess: self.setup_multiprocess(self.options.multiprocess, self.options.max_jobs) elif self.options.worker: self.setup_worker() else: self.setup_scheduling() reactor.suggestThreadPoolSize(self.options.threadpoolsize) reactor.addSystemEventTrigger("after", "shutdown", self.shutdown) reactor.run()
def testSuggestThreadPoolSize(self): # XXX Uh, how about some asserts? reactor.suggestThreadPoolSize(34) reactor.suggestThreadPoolSize(4)
def setUp(self): reactor.suggestThreadPoolSize(8)
def tearDown(self): reactor.suggestThreadPoolSize(0)
def do_cleanThreads(cls): from twisted.internet import reactor if interfaces.IReactorThreads.providedBy(reactor): reactor.suggestThreadPoolSize(0) if hasattr(reactor, 'threadpool') and reactor.threadpool: reactor.threadpool.stop() reactor.threadpool = None # *Put it back* and *start it up again*. The # reactor's threadpool is *private*: we cannot just # rape it and walk away. reactor.threadpool = threadpool.ThreadPool(0, 10) reactor.threadpool.start()
def test_suggestThreadPoolSize(self): """ Try to change maximum number of threads. """ reactor.suggestThreadPoolSize(34) self.assertEqual(reactor.threadpool.max, 34) reactor.suggestThreadPoolSize(4) self.assertEqual(reactor.threadpool.max, 4)
def startService(self): from channels import DEFAULT_CHANNEL_LAYER, channel_layers from channels.staticfiles import StaticFilesConsumer from channels.worker import Worker reactor.suggestThreadPoolSize(self.worker_count + 3) channel_layer = channel_layers[DEFAULT_CHANNEL_LAYER] channel_layer.router.check_default(http_consumer=StaticFilesConsumer()) for _ in range(self.worker_count): w = Worker(channel_layer, signal_handlers=False) self._workers.append((w, threads.deferToThread(w.run)))
def main(): preferences = Preferences() task_factory = SimpleTaskFactory(PythonCollectionTask) task_splitter = PerDataSourceInstanceTaskSplitter(task_factory) daemon = CollectorDaemon(preferences, task_splitter) pool_size = preferences.options.threadPoolSize # The Twisted version shipped with Zenoss 4.1 doesn't have this. if hasattr(reactor, 'suggestThreadPoolSize'): reactor.suggestThreadPoolSize(pool_size) daemon.run()
def __init__(self, config, component_builder=None, testing=False): logging.basicConfig(filename=config['log_file'], level=config['log_level']) logging.captureWarnings(True) logger.debug("Configuration: " + config.view()) logger.debug("Creating a new data router") self.config = config self.data_router = self._create_data_router(config, component_builder) self._testing = testing reactor.suggestThreadPoolSize(config['num_threads'] * 5)
def start_search_tasks(): """ Before everything, kill if there is any running search tasks. Then start the search tasks concurrently. """ global SEARCH_TASKS logging.info("(Re)populated config collections from config file. " "Cancelling previous loops and restarting them again with the new config.") for looping_task in SEARCH_TASKS: logging.info("Cancelling this loop: %r", looping_task) looping_task.stop() SEARCH_TASKS = [] searches = CONFIG['Searches'].values() search_count = len(searches) logging.info("Search count: %d", search_count) reactor.suggestThreadPoolSize(search_count) try: for search in searches: search_obj = Search(SERVICE_CLASS_MAPPER.get(search['destination']['service']), search, CONFIG) do_search_concurrently(search_obj) except Exception as exception: logging.exception("Exception occurred while processing search. %s", exception.message)
def main(): parser = argparse.ArgumentParser() parser.add_argument("-m", "--mainnet", action="store_true", default=False, help="Use MainNet instead of the default TestNet") parser.add_argument("-p", "--privnet", action="store_true", default=False, help="Use PrivNet instead of the default TestNet") parser.add_argument("-c", "--config", action="store", help="Use a specific config file") parser.add_argument("-t", "--set-default-theme", dest="theme", choices=["dark", "light"], help="Set the default theme to be loaded from the config file. Default: 'dark'") parser.add_argument('--version', action='version', version='neo-python v{version}'.format(version=__version__)) args = parser.parse_args() if args.config and (args.mainnet or args.privnet): print("Cannot use both --config and --mainnet/--privnet arguments, please use only one.") exit(1) if args.mainnet and args.privnet: print("Cannot use both --mainnet and --privnet arguments") exit(1) # Setup depending on command line arguments. By default, the testnet settings are already loaded. if args.config: settings.setup(args.config) elif args.mainnet: settings.setup_mainnet() elif args.privnet: settings.setup_privnet() if args.theme: preferences.set_theme(args.theme) # Instantiate the blockchain and subscribe to notifications blockchain = LevelDBBlockchain(settings.LEVELDB_PATH) Blockchain.RegisterBlockchain(blockchain) # Start the prompt interface cli = PromptInterface() # Run reactor.suggestThreadPoolSize(15) reactor.callInThread(cli.run) NodeLeader.Instance().Start() reactor.run()