我们从 Python 开源项目中，提取了以下 50 个代码示例，用于说明如何使用 twisted.internet.defer.inlineCallbacks()。
def run():
    """Run the IP and user-agent tester spiders one after the other.

    Logging is configured, a ``CrawlerRunner`` is built from the project
    settings, and the reactor is started. The call blocks until the reactor
    has been stopped.
    """
    configure_logging()
    # importing project settings for further usage
    # mainly because of the middlewares
    settings = get_project_settings()
    runner = CrawlerRunner(settings)

    # running spiders sequentially (non-distributed)
    @defer.inlineCallbacks
    def crawl():
        try:
            yield runner.crawl(IPTesterSpider)
            yield runner.crawl(UATesterSpider)
        finally:
            # Stop the reactor even when a crawl fails; otherwise
            # reactor.run() below would never return.
            reactor.stop()

    # Start crawling only once the reactor is running, so that the
    # reactor.stop() in the finally clause is always legal.
    reactor.callWhenRunning(crawl)
    reactor.run()  # block until the last call
def moduleWampRegister(session, meth_list):
    """Register one RPC on the session for each method of a module.

    Every entry of ``meth_list`` is indexed as ``(name, callable)``. The
    ``__init__`` and ``finalize`` entries are skipped; each remaining callable
    is wrapped with ``inlineCallbacks`` and registered under
    ``iotronic.<board uuid>.<name>``.

    :param session: object whose ``register(callable, uri)`` is called.
    :param meth_list: sequence of ``(name, callable)`` pairs.

    """
    if len(meth_list) == 2:
        LOG.info(" - No procedures to register!")
    else:
        for meth in meth_list:
            name = meth[0]
            # We don't consider the __init__ and finalize methods
            if name not in ("__init__", "finalize"):
                rpc_addr = u'iotronic.' + board.uuid + '.' + name
                session.register(inlineCallbacks(meth[1]), rpc_addr)
                LOG.info(" --> " + str(name))
                # LOG.info(" --> " + str(rpc_addr))
def test_returnWithValue(self):
    """
    A C{return} carrying a value inside an C{inlineCallbacks} generator
    becomes the result of the L{Deferred} handed back to the caller.
    """
    namespace = {"inlineCallbacks": inlineCallbacks}
    # The generator is compiled at run time from this source text.
    source = """
@inlineCallbacks
def f(d):
    yield d
    return 14
"""
    exec(source, namespace)
    trigger = Deferred()
    outcome = namespace["f"](trigger)
    trigger.callback(None)
    self.assertEqual(self.successResultOf(outcome), 14)
def assertMistakenMethodWarning(self, resultList):
    """
    Check that exactly one L{DeprecationWarning} was emitted for
    C{mistakenMethod}, saying that its C{returnValue} call made the
    inlineCallbacks function named 'inline' exit, and that C{resultList}
    (filled by the fired Deferred) holds C{mistakenMethod}'s result.
    """
    expectedMessage = (
        "returnValue() in 'mistakenMethod' causing 'inline' to exit: "
        "returnValue should only be invoked by functions decorated with "
        "inlineCallbacks")
    self.assertEqual(resultList, [1])
    flushed = self.flushWarnings(offendingFunctions=[self.mistakenMethod])
    self.assertEqual(len(flushed), 1)
    onlyWarning = flushed[0]
    self.assertEqual(onlyWarning['category'], DeprecationWarning)
    self.assertEqual(onlyWarning['message'], expectedMessage)
def test_returnValueNonLocalDeferred(self):
    """
    The non-local exit warning from L{returnValue} is still emitted when the
    L{inlineCallbacks} generator has already yielded a Deferred and been
    resumed before the mistaken call happens.
    """
    cause = Deferred()

    # The nested function must stay named 'inline': the expected warning
    # message refers to it by name.
    @inlineCallbacks
    def inline():
        yield cause
        self.mistakenMethod()
        returnValue(2)

    collected = []
    inline().addCallback(collected.append)
    # Nothing fires until the yielded Deferred does.
    self.assertEqual(collected, [])
    cause.callback(1)
    self.assertMistakenMethodWarning(collected)
def test_deferredGeneratorDeprecated(self):
    """
    Applying L{deferredGenerator} emits a single deprecation warning.
    """
    expectedMessage = (
        "twisted.internet.defer.deferredGenerator was deprecated in "
        "Twisted 15.0.0; please use "
        "twisted.internet.defer.inlineCallbacks instead")

    @deferredGenerator
    def decoratedFunction():
        yield None

    flushed = self.flushWarnings([self.test_deferredGeneratorDeprecated])
    self.assertEqual(len(flushed), 1)
    onlyWarning = flushed[0]
    self.assertEqual(onlyWarning['category'], DeprecationWarning)
    self.assertEqual(onlyWarning['message'], expectedMessage)
def test_inlineCallbacksTracebacks(self):
    """
    L{defer.inlineCallbacks} that re-raise tracebacks into their deferred
    should not lose their tracebacks.
    """
    f = getDivisionFailure()
    d = defer.Deferred()
    # Re-raise the captured failure and errback the Deferred from inside the
    # except block, so the errback picks up the exception being handled.
    try:
        f.raiseException()
    except:
        d.errback()

    def ic(d):
        yield d
    ic = defer.inlineCallbacks(ic)
    # NOTE(review): ``ic`` is wrapped but never called in this block; the
    # failure inspected below comes straight from ``d`` -- confirm intended.
    newFailure = self.failureResultOf(d)
    tb = traceback.extract_tb(newFailure.getTracebackObject())

    # Two frames are expected. The assertions compare file name, function
    # name and source text of each frame, so the statements above must keep
    # their exact spelling.
    self.assertEqual(len(tb), 2)
    self.assertIn('test_defer', tb[0][0])
    self.assertEqual('test_inlineCallbacksTracebacks', tb[0][2])
    self.assertEqual('f.raiseException()', tb[0][3])
    self.assertIn('test_defer', tb[1][0])
    self.assertEqual('getDivisionFailure', tb[1][2])
    self.assertEqual('1/0', tb[1][3])
def runTestWithDirector(director, global_options, url=None, start_tor=True, create_input_store=True):
    """Start the director, then set up and run a deck built from the options.

    Returns the Deferred from ``director.start`` with the deck callback and
    the two director start-up errbacks attached.
    """
    deck = createDeck(global_options, url=url)
    start_d = director.start(create_input_store=create_input_store,
                             start_tor=start_tor)

    # These deck errors are wrapped in a Failure before being re-raised.
    wrapped_errors = (
        errors.UnableToLoadDeckInput,
        errors.NoReachableTestHelpers,
        errors.NoReachableCollectors,
    )

    @defer.inlineCallbacks
    def post_director_start(_):
        try:
            yield deck.setup()
            yield deck.run(director, from_schedule=False)
        except wrapped_errors as error:
            raise defer.failure.Failure(error)
        except SystemExit as error:
            raise error

    start_d.addCallback(post_director_start)
    start_d.addErrback(director_startup_handled_failures)
    start_d.addErrback(director_startup_other_failures)
    return start_d
def tx_main(self, args, config):
    """Run the configured strategy against the usable connections.

    This is a generator (it yields a Deferred); presumably it is driven by
    ``inlineCallbacks`` applied outside this block -- confirm at the
    definition site.

    With more than one entry in ``config.conn_infos``, initial wmiprvse
    statistics are gathered first and a summary is printed after the
    strategy finishes. When no usable connection info remains, the module
    level ``_exit_status`` is set to 1 and the reactor is stopped.

    :param args: passed through to ``self._strategy.act``.
    :param config: object exposing ``conn_infos``.
    """
    global _exit_status
    do_summary = len(config.conn_infos) > 1
    if do_summary:
        initial_wmiprvse_stats, good_conn_infos = \
            yield get_initial_wmiprvse_stats(config)
    else:
        initial_wmiprvse_stats = None
        # Slice instead of indexing so that an empty conn_infos reaches the
        # guard below rather than raising IndexError.
        good_conn_infos = list(config.conn_infos[:1])
    if not good_conn_infos:
        _exit_status = 1
        stop_reactor()
        return

    @defer.inlineCallbacks
    def callback(results):
        if do_summary:
            yield self._print_summary(
                results, config, initial_wmiprvse_stats, good_conn_infos)

    d = self._strategy.act(good_conn_infos, args, config)
    d.addCallback(callback)
    # Stop the reactor whether the strategy succeeded or failed.
    d.addBoth(stop_reactor)
def test_success(self):
    """A value passed to returnValue becomes the converted future's result."""
    def fn():
        if False:
            # inlineCallbacks doesn't work with regular functions;
            # must have a yield even if it's unreachable.
            yield
        returnValue(42)

    future = gen.convert_yielded(inlineCallbacks(fn)())
    self.assertEqual(future.result(), 42)
def test_failure(self):
    """An exception raised in the generator surfaces from the future."""
    def fn():
        if False:
            # Unreachable yield: it only makes ``fn`` a generator function.
            yield
        1 / 0

    future = gen.convert_yielded(inlineCallbacks(fn)())
    with self.assertRaises(ZeroDivisionError):
        future.result()
def test_get_block(self):
    """Fetch a fixed block over a connection to 127.0.0.1:8333 and check it.

    The merkle root is recomputed from the packed transactions and the header
    hash is compared against the requested block hash. The factory and the
    connection are always shut down afterwards.
    """
    factory = p2p.ClientFactory(networks.nets['bitcoin'])
    c = reactor.connectTCP('127.0.0.1', 8333, factory)
    try:
        h = 0x000000000000046acff93b0e76cd10490551bf871ce9ac9fad62e67a07ff1d1e

        @deferral.retry()
        @defer.inlineCallbacks
        def fetch_block():
            protocol = yield factory.getProtocol()
            fetched = yield protocol.get_block(h)
            defer.returnValue(fetched)

        block = yield fetch_block()
        packed_txs = map(data.tx_type.pack, block['txs'])
        tx_hashes = map(data.hash256, packed_txs)
        assert data.merkle_hash(tx_hashes) == block['header']['merkle_root']
        header_hash = data.hash256(data.block_header_type.pack(block['header']))
        assert header_hash == h
    finally:
        factory.stopTrying()
        c.disconnect()
def retry(message='Error:', delay=3, max_retries=None, traceback=True):
    '''
    Build a decorator that re-invokes a Deferred-returning function until it
    succeeds, sleeping ``delay`` between attempts.

    After ``max_retries`` failed retries the last exception is re-raised;
    with the default of None the comparison never matches and retrying goes
    on indefinitely. Failures are reported via ``log.err`` when ``traceback``
    is true, otherwise printed to stderr; ``RetrySilentlyException`` is not
    reported at all.

    Usage:

        @retry('Error getting block:', 1)
        @defer.inlineCallbacks
        def get_block(hash):
            ...
    '''
    def retry2(func):
        @defer.inlineCallbacks
        def f(*args, **kwargs):
            attempt = 0
            while True:
                try:
                    result = yield func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_retries:
                        raise
                    if not isinstance(e, RetrySilentlyException):
                        if traceback:
                            log.err(None, message)
                        else:
                            print >>sys.stderr, message, e
                    yield sleep(delay)
                else:
                    defer.returnValue(result)
                # Only reached after a failed attempt.
                attempt += 1
        return f
    return retry2
def deviceWampRegister(dev_meth_list, board):
    """Register one RPC on the global session for each device method.

    Every entry of ``dev_meth_list`` is indexed as ``(name, callable)``. The
    ``__init__`` and ``finalize`` entries are skipped; each remaining callable
    is wrapped with ``inlineCallbacks`` and registered under
    ``iotronic.<board uuid>.<name>``.

    :param dev_meth_list: sequence of ``(name, callable)`` pairs.
    :param board: object exposing ``type`` and ``uuid``.
    """
    LOG.info(" - " + str(board.type).capitalize() + " device registering RPCs:")
    for meth in dev_meth_list:
        name = meth[0]
        if name not in ("__init__", "finalize"):
            # LOG.info(" - " + str(meth[0]))
            rpc_addr = u'iotronic.' + board.uuid + '.' + name
            # LOG.debug(" --> " + str(rpc_addr))
            SESSION.register(inlineCallbacks(meth[1]), rpc_addr)
            LOG.info(" --> " + str(name) + " registered!")
def inject_service_account_server(self, *responses):
    """Replace the factory's service-account connector with a mock client.

    A ``MockLDAPClient`` is built from ``responses``. The factory's
    ``connect_service_account`` is swapped for a function that binds that
    client with the factory's service-account credentials and returns it.

    :return: the mock client.
    """
    mock_client = MockLDAPClient(*responses)

    @defer.inlineCallbacks
    def _factory_connect_service_account():
        mock_client.connectionMade()
        # TODO: Necessary here?
        yield mock_client.bind(
            self.factory.service_account_dn,
            self.factory.service_account_password,
        )
        defer.returnValue(mock_client)

    self.factory.connect_service_account = _factory_connect_service_account
    return mock_client
def run(cls):
    """Crawl every ``(spider, args, kwargs)`` entry of ``cls.queue`` in order.

    Blocks in ``reactor.run()`` until all queued crawls have finished. The
    reactor is stopped afterwards, including when the queue is empty, when
    every scraper name is invalid, or when a crawl raises.
    """
    runner = CrawlerRunner(get_project_settings())

    @defer.inlineCallbacks
    def deferred_crawl():
        crawled = 0
        try:
            for spider, args, kwargs in cls.queue:
                try:
                    yield runner.crawl(spider, *args, **kwargs)
                except KeyError as err:
                    # Log a warning if the scraper name is invalid instead
                    # of causing the job to fail.
                    # NOTE: If there is any other type of error, the job
                    # will fail, and all the jobs that depend on it will
                    # fail as well.
                    logger.warning(err.args[0])
                else:
                    crawled += 1
            if not crawled:
                logger.critical("LocalQueue: No valid scraper names found.")
        finally:
            # Always stop, so reactor.run() below cannot block forever.
            reactor.stop()

    # Start the crawl only once the reactor is running, so that
    # reactor.stop() is legal even if nothing was crawled asynchronously.
    reactor.callWhenRunning(deferred_crawl)
    reactor.run()
def decorator(fn):
    """Wrap ``fn`` as a parlay command and attach command metadata to it.

    ``async`` and ``auto_type_cast`` come from the enclosing scope, which is
    not visible in this block.
    """
    if async:
        # Generator functions are first turned into Deferred-returning
        # functions with inlineCallbacks.
        if inspect.isgeneratorfunction(fn):
            wrapper = run_in_broker(defer.inlineCallbacks(fn))
        else:
            wrapper = run_in_broker(fn)
    else:
        # A generator function is rejected when async is false.
        if inspect.isgeneratorfunction(fn):
            raise StandardError("Do not use the 'yield' keyword in a parlay command without 'parlay_command(async=True)' ")
        wrapper = run_in_thread(fn)

    wrapper._parlay_command = True
    wrapper._parlay_fn = fn  # in case it gets wrapped again, this is the actual function so we can pull kwarg names
    wrapper._parlay_arg_conversions = {}  # if type casting desired, this dict from param_types to converting funcs
    wrapper._parlay_arg_discovery = {}

    if auto_type_cast and fn.__doc__ is not None:
        # Scan the docstring for "@type name: T" / ":type name: T" lines.
        for line in fn.__doc__.split("\n"):
            m = re.search(r"[@:]type\s+(\w+)\s*[ :]\s*(\w+\[?\w*\]?)", line)
            if m is not None:
                arg_name, arg_type = m.groups()
                # NOTE(review): this block was reconstructed from a
                # whitespace-collapsed source; confirm that the discovery
                # assignment belongs inside this ``if``.
                if arg_type in INPUT_TYPE_CONVERTER_LOOKUP:  # if we know how to convert it
                    wrapper._parlay_arg_conversions[arg_name] = INPUT_TYPE_CONVERTER_LOOKUP[arg_type]  # add to convert list
                    wrapper._parlay_arg_discovery[arg_name] = INPUT_TYPE_DISCOVERY_LOOKUP.get(arg_type, INPUT_TYPES.STRING)

    return wrapper
def test_get_block(self):
    """Fetch a fixed block over a connection to 127.0.0.1:8333 and check it.

    The merkle root is recomputed from the transaction ids and the header
    hash is compared against the requested block hash. The factory and the
    connection are always shut down afterwards.
    """
    factory = p2p.ClientFactory(networks.nets['bitcoin'])
    c = reactor.connectTCP('127.0.0.1', 8333, factory)
    try:
        h = 0x000000000000046acff93b0e76cd10490551bf871ce9ac9fad62e67a07ff1d1e

        @deferral.retry()
        @defer.inlineCallbacks
        def fetch_block():
            protocol = yield factory.getProtocol()
            fetched = yield protocol.get_block(h)
            defer.returnValue(fetched)

        block = yield fetch_block()
        txids = map(data.get_txid, block['txs'])
        assert data.merkle_hash(txids) == block['header']['merkle_root']
        header_hash = data.hash256(data.block_header_type.pack(block['header']))
        assert header_hash == h
    finally:
        factory.stopTrying()
        c.disconnect()
def connect_factory(host, port, factory, blob_storage, hash_to_process):
    """Connect ``factory`` to ``host:port`` and wire up its completion hooks.

    When the factory's ``on_connection_lost_d`` fires (success or error), the
    sent blob hashes are recorded via ``update_sent_blobs``, the connection
    is closed and a reactor "shutdown" system event is fired. A failed
    connection attempt or an error inside those handlers also fires
    "shutdown".

    The process exits with status 0 on ``JobTimeoutException`` and with
    status 1 on any other exception raised while connecting.
    """
    import os
    from twisted.internet import reactor

    @defer.inlineCallbacks
    def on_finish(result):
        log.info("Finished sending %s", hash_to_process)
        yield update_sent_blobs(factory.p.blob_hashes_sent, host, blob_storage)
        connection.disconnect()
        reactor.fireSystemEvent("shutdown")

    @defer.inlineCallbacks
    def on_error(error):
        log.error("Error when sending %s: %s. Hashes sent %s", hash_to_process, error,
                  factory.p.blob_hashes_sent)
        yield update_sent_blobs(factory.p.blob_hashes_sent, host, blob_storage)
        connection.disconnect()
        reactor.fireSystemEvent("shutdown")

    def on_connection_fail(result):
        log.error("Failed to connect to %s:%s", host, port)
        reactor.fireSystemEvent("shutdown")

    def _error(failure):
        log.error("Failed on_connection_lost_d callback: %s", failure)
        reactor.fireSystemEvent("shutdown")

    factory.on_connection_lost_d.addCallbacks(on_finish, on_error)
    factory.on_connection_lost_d.addErrback(_error)
    factory.on_connection_fail_d.addCallback(on_connection_fail)

    try:
        log.debug("Connecting factory to %s:%s", host, port)
        # ``connection`` is read by on_finish/on_error through the closure.
        connection = reactor.connectTCP(host, port, factory, timeout=TCP_CONNECT_TIMEOUT)
    except JobTimeoutException:
        log.error("Failed to forward %s --> %s", hash_to_process[:8], host)
        return sys.exit(0)
    except Exception:
        # Supply the pid the format string asks for; without an argument
        # the literal "%s" was logged.
        log.exception("Job (pid %s) encountered unexpected error", os.getpid())
        return sys.exit(1)
def twisted_wrapper(arg):
    """
    Turn a generator-style test into a twisted test with a timeout.

    ``arg`` is either the test function itself (a timeout of 1 is used) or
    an integer timeout, in which case a decorator is returned.
    """
    if not isinstance(arg, (int, long)):
        return deferred(timeout=1)(inlineCallbacks(arg))

    def decorate(func):
        return deferred(arg)(inlineCallbacks(func))

    return decorate
def subscribe(self, uri=None):
    """
    Decorator that records a function as an event handler for a topic.

    The decorator's first argument is the URI of the topic to subscribe to.
    When no URI is given, it is built from the application URI prefix and
    the Python function name.

    A generator function is assumed to be an asynchronous process and is
    wrapped with inlineCallbacks.

    :Example:

    .. code-block:: python

       @app.subscribe('com.myapp.topic1')
       def onevent1(x, y):
          print("got event on topic1", x, y)

    :param uri: The URI of the topic to subscribe to.
    :type uri: unicode
    """
    def decorator(func):
        if not uri:
            assert(self._prefix is not None)
            topic = "{0}.{1}".format(self._prefix, func.__name__)
        else:
            topic = uri

        handler = func
        if inspect.isgeneratorfunction(handler):
            handler = inlineCallbacks(handler)

        self._handlers.append((topic, handler))
        return handler
    return decorator
def signal(self, name):
    """
    Decorator that records a function as a handler for an application signal.

    Signals are local events triggered internally and exposed to the
    developer so that code can react to the application lifecycle.

    A generator function is assumed to be an asynchronous coroutine and is
    wrapped with inlineCallbacks.

    Current signals:

       - `onjoined`: Triggered after the application session has joined the
          realm on the router and registered/subscribed all procedures
          and event handlers that were set up via decorators.
       - `onleave`: Triggered when the application session leaves the realm.

    .. code-block:: python

       @app.signal('onjoined')
       def _():
          # do after the app has joined a realm

    :param name: The name of the signal to watch.
    :type name: unicode
    """
    def decorator(func):
        handler = func
        if inspect.isgeneratorfunction(handler):
            handler = inlineCallbacks(handler)
        listeners = self._signals.setdefault(name, [])
        listeners.append(handler)
        return handler
    return decorator
def __init__(self):
    """Create the QRT client for 127.0.0.1:22223 and start connecting.

    The instance's ``on_connect``, ``on_disconnect`` and ``on_event``
    methods are passed as callbacks; ``init`` and ``connection`` start out
    as False and None.
    """
    self.qrt = qtm.QRT("127.0.0.1", 22223)
    callbacks = {
        "on_connect": self.on_connect,
        "on_disconnect": self.on_disconnect,
        "on_event": self.on_event,
    }
    self.qrt.connect(**callbacks)
    self.init = False
    self.connection = None

# Inline callbacks is a feature of the twisted framework that makes it possible to write
# asynchronous code that looks synchronous
# http://twistedmatrix.com/documents/current/api/twisted.internet.defer.inlineCallbacks.html
def mistakenMethod(self):
    """
    Call L{returnValue} from a method that is not decorated with
    L{inlineCallbacks}; this is the mistake the surrounding tests exercise.
    """
    returnValue(1)
def test_returnValueNonLocalWarning(self):
    """
    In the simplest case, where the offending function is invoked right
    away, L{returnValue} emits the non-local exit warning.
    """
    # The nested function must stay named 'inline': the expected warning
    # message refers to it by name.
    @inlineCallbacks
    def inline():
        self.mistakenMethod()
        returnValue(2)
        yield 0

    collected = []
    inline().addCallback(collected.append)
    self.assertMistakenMethodWarning(collected)
def testReturnNoValue(self):
    """A plain python return yields a None result."""
    @inlineCallbacks
    def _noReturn():
        yield 5
        return

    d = _noReturn()
    d.addCallback(self.assertEqual, None)
    return d
def testReturnValue(self):
    """The value given to returnValue becomes the Deferred's result."""
    @inlineCallbacks
    def _return():
        yield 5
        returnValue(6)

    d = _return()
    d.addCallback(self.assertEqual, 6)
    return d
def test_nonGeneratorReturn(self):
    """
    A non-generator that returns something other than a generator raises
    C{TypeError} whose message mentions L{inlineCallbacks}.
    """
    @inlineCallbacks
    def _noYield():
        return 5

    error = self.assertRaises(TypeError, _noYield)
    self.assertIn("inlineCallbacks", str(error))
def test_nonGeneratorReturnValue(self):
    """
    A non-generator that calls L{returnValue} raises C{TypeError} whose
    message mentions L{inlineCallbacks}.
    """
    @inlineCallbacks
    def _noYield():
        returnValue(5)

    error = self.assertRaises(TypeError, _noYield)
    self.assertIn("inlineCallbacks", str(error))
def test_passInlineCallbacks(self):
    """
    Running a L{defer.inlineCallbacks} decorated test executes its body.
    """
    outcome = self.runTest('test_passInlineCallbacks')
    self.assertTrue(outcome.wasSuccessful())
    self.assertEqual(outcome.testsRun, 1)
    # ``touched`` is expected to be truthy after the test body has run.
    self.assertTrue(detests.DeferredTests.touched)