我们从 Python 开源项目中提取了以下 28 个代码示例,用于说明如何使用 collections.abc.Generator。
def patch(patch_inspect=True):
    """
    Install the backported ABCs into ``collections.abc`` and, optionally,
    the ``isawaitable`` helper into ``inspect``.

    Every replacement is also recorded in ``PATCHED`` under its dotted
    name.  When *patch_inspect* is false the ``inspect`` module is left
    untouched (and is not even imported here).
    """
    # Record first, then patch -- the same order as a chained assignment.
    PATCHED['collections.abc.Generator'] = Generator
    _collections_abc.Generator = Generator

    PATCHED['collections.abc.Coroutine'] = Coroutine
    _collections_abc.Coroutine = Coroutine

    PATCHED['collections.abc.Awaitable'] = Awaitable
    _collections_abc.Awaitable = Awaitable

    if not patch_inspect:
        return

    import inspect
    PATCHED['inspect.isawaitable'] = isawaitable
    inspect.isawaitable = isawaitable
def __new__(cls, *args, **kwds):
    """Create the instance via ``dict.__new__``, refusing the bare ``Dict`` type.

    Raises TypeError when ``_geqv(cls, Dict)`` is true.
    """
    if not _geqv(cls, Dict):
        return dict.__new__(cls, *args, **kwds)
    raise TypeError("Type Dict cannot be instantiated; "
                    "use dict() instead")
# Determine what base class to use for Generator.
def __new__(cls, *args, **kwds):
    """Defer to the parent ``__new__``, refusing the bare ``Generator`` type.

    Raises TypeError when ``_geqv(cls, Generator)`` is true.
    """
    if not _geqv(cls, Generator):
        return super().__new__(cls, *args, **kwds)
    raise TypeError("Type Generator cannot be instantiated; "
                    "create a subclass instead")
def coroutine(func):
    """Convert regular generator function to a coroutine.

    Behaviour by kind of *func*:

    * not callable -> ``TypeError``;
    * a pure-Python function already flagged as a coroutine or iterable
      coroutine -> returned unchanged;
    * a pure-Python generator function -> its code object is replaced in
      place by a copy with ``CO_ITERABLE_COROUTINE`` set, and the same
      function object is returned;
    * anything else -> a wrapper is returned that inspects each call's
      result and wraps plain generator-like objects in
      ``_GeneratorWrapper``.
    """
    if not callable(func):
        raise TypeError('types.coroutine() expects a callable')

    if (func.__class__ is FunctionType and
            getattr(func, '__code__', None).__class__ is CodeType):

        co_flags = func.__code__.co_flags

        # Check if 'func' is a coroutine function.
        # (0x180 == CO_COROUTINE | CO_ITERABLE_COROUTINE)
        if co_flags & 0x180:
            return func

        # Check if 'func' is a generator function.
        # (0x20 == CO_GENERATOR)
        if co_flags & 0x20:
            # TODO: Implement this in C.
            co = func.__code__
            new_flags = co.co_flags | 0x100  # 0x100 == CO_ITERABLE_COROUTINE
            if hasattr(co, 'replace'):
                # The positional CodeType signature changed in Python 3.8
                # (co_posonlyargcount was inserted), so building the code
                # object by hand fails there.  replace() copies every
                # field and overrides only the flags.
                func.__code__ = co.replace(co_flags=new_flags)
            else:
                # Interpreters without code.replace(): use the legacy
                # positional constructor.
                func.__code__ = CodeType(
                    co.co_argcount, co.co_kwonlyargcount, co.co_nlocals,
                    co.co_stacksize,
                    new_flags,
                    co.co_code,
                    co.co_consts, co.co_names, co.co_varnames,
                    co.co_filename, co.co_name, co.co_firstlineno,
                    co.co_lnotab, co.co_freevars, co.co_cellvars)
            return func

    # The following code is primarily to support functions that
    # return generator-like objects (for instance generators
    # compiled with Cython).

    @_functools.wraps(func)
    def wrapped(*args, **kwargs):
        coro = func(*args, **kwargs)
        if (coro.__class__ is CoroutineType or
                coro.__class__ is GeneratorType
                and coro.gi_code.co_flags & 0x100):
            # 'coro' is a native coroutine object or an iterable coroutine
            return coro
        if (isinstance(coro, _collections_abc.Generator) and
                not isinstance(coro, _collections_abc.Coroutine)):
            # 'coro' is either a pure Python generator iterator, or it
            # implements collections.abc.Generator (and does not implement
            # collections.abc.Coroutine).
            return _GeneratorWrapper(coro)
        # 'coro' is either an instance of collections.abc.Coroutine or
        # some other object -- pass it through.
        return coro

    return wrapped
def run_in_tx(self, batch, chunk_count=None, dry_run=False):
    """Run the queries produced by ``batch`` inside neo4j transactions.

    ``batch`` is consumed as a generator of ``(query, params)`` pairs.
    After each query runs, its result (materialised as a list) is sent
    back into the generator with ``send()`` before the next pair is
    requested.

    :param batch: a generator, or any other iterable (which is wrapped in
        a generator so that ``send()`` is available).
    :param chunk_count: maximum number of queries per transaction; a
        falsy value means no limit (``sys.maxsize``).
    :param dry_run: when true, no session is opened; the items of
        ``batch`` are simply collected and returned as a list.
    :return: with ``dry_run``, the list of items yielded by ``batch``;
        otherwise a list holding one result list per executed query.
    :raises ValueError: if ``batch`` is neither a generator nor iterable.
    """
    if not chunk_count:
        chunk_count = sys.maxsize

    if isinstance(batch, Generator):
        it = batch
    elif isinstance(batch, Iterable):
        # Plain iterables have no send(); wrap them in a generator.
        def gen():
            for j in batch:
                yield j
        it = gen()
    else:
        err = "batch_job must be iterable or callable but {0} passed"
        err = err.format(type(batch))
        logger.error(err)
        raise ValueError(err)

    if dry_run:
        return list(it)

    session = self.neo4j_driver.session()
    try:
        result_set = []
        consumed_result = None
        more_chunks = True
        while more_chunks:
            logger.debug('neo4j transaction beginning')
            with session.begin_transaction() as tx:
                chunk_i = 0
                try:
                    while chunk_i < chunk_count:
                        # noinspection PyNoneFunctionAssignment
                        query, params = it.send(consumed_result)
                        # The two literals used to be joined without a
                        # space, giving "...query %sin transaction".
                        logger.debug('chunk %s will run query %s '
                                     'in transaction', chunk_i, query)
                        result = tx.run(query, params)
                        consumed_result = list(result)
                        result_set.append(consumed_result)
                        chunk_i += 1
                except StopIteration:
                    # Generator exhausted: finish this transaction and
                    # stop opening new ones.
                    more_chunks = False
                tx.success = True
            logger.debug('neo4j transaction committed')
        return result_set
    finally:
        session.close()
def mk_gen():
    """Build and return a ``Generator`` abstract base class.

    The class derives from ``_collections_abc.Iterator``, declares
    ``send`` and ``throw`` abstract, supplies ``__next__`` (or ``next``,
    depending on what the running interpreter's iterators expose) and
    ``close`` as mixin methods, and has the built-in generator type
    registered as a virtual subclass.
    """
    from abc import abstractmethod

    # Name of the iterator-advance method on this interpreter.
    advance_name = '__next__' if hasattr(iter(()), '__next__') else 'next'
    required_methods = ('__iter__', advance_name, 'send', 'throw', 'close')

    class Generator(_collections_abc.Iterator):
        __slots__ = ()

        if '__next__' in required_methods:
            def __next__(self):
                return self.send(None)
        else:
            def next(self):
                return self.send(None)

        @abstractmethod
        def send(self, value):
            raise StopIteration

        @abstractmethod
        def throw(self, typ, val=None, tb=None):
            if val is None:
                if tb is None:
                    raise typ
                val = typ()
            if tb is not None:
                val = val.with_traceback(tb)
            raise val

        def close(self):
            try:
                self.throw(GeneratorExit)
            except (GeneratorExit, StopIteration):
                # The generator finished or honoured the exit request.
                return
            raise RuntimeError('generator ignored GeneratorExit')

        @classmethod
        def __subclasshook__(cls, C):
            # Only the ABC itself answers structurally; subclasses defer.
            if cls is not Generator:
                return NotImplemented
            bases = C.__mro__
            for name in required_methods:
                if not any(name in klass.__dict__ for klass in bases):
                    return NotImplemented
            return True

    # Register the interpreter's native generator type.
    native_generator = type((lambda: (yield))())
    Generator.register(native_generator)
    return Generator