The following 50 code examples, extracted from open-source Python projects, illustrate how to use typing.Generator().
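Before the project samples, here is a minimal sketch (every name in it is made up for illustration) of what the three type parameters of typing.Generator[YieldType, SendType, ReturnType] mean:

import typing

def countdown(start: int) -> typing.Generator[int, None, str]:
    # Yields ints; accepts nothing via send() (hence the middle None);
    # returns a str when exhausted.
    while start > 0:
        yield start
        start -= 1
    return 'lift off'

print(list(countdown(3)))  # [3, 2, 1]

Most generators neither receive sent values nor return one, so the form seen throughout the examples below is Generator[YieldType, None, None].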
def __init__(self, indent_step=4, indent_char=' ', repr_strings=False,
             simple_cutoff=10, width=120, yield_from_generators=True):
    self._indent_step = indent_step
    self._c = indent_char
    self._repr_strings = repr_strings
    self._repr_generators = not yield_from_generators
    self._simple_cutoff = simple_cutoff
    self._width = width
    self._type_lookup = [
        (dict, self._format_dict),
        (str, self._format_str),
        (bytes, self._format_bytes),
        (tuple, self._format_tuples),
        ((list, set, frozenset), self._format_list_like),
        # Generator lives in collections.abc; the bare collections alias was removed in Python 3.10
        (collections.abc.Generator, self._format_generators),
    ]
def get_meta(self) -> t.Generator[t.Dict, None, None]:
    all_fields = self.model._meta.get_fields(
        include_parents=self.include_parents,
        include_hidden=self.include_hidden
    )
    for f in all_fields:
        if f.name in self.exclude:
            continue
        if self.fields and f.name not in self.fields:
            continue
        if f.name not in self.fields:
            if f.concrete not in self.concrete_in:
                continue
            if f.auto_created not in self.auto_created_in:
                continue
            if f.editable not in self.editable_in:
                continue
        yield self.get_field_meta(f)
def enumerate(self, item: Any, reverse: bool = False) -> Generator[Tuple[int, Any], None, None]:
    items = self
    if reverse:
        max = len(items) - 1
        items = reversed(items)
    for index, x in enumerate(items):
        if x == item:
            yield max - index if reverse else index, x
            continue
        try:
            if item in x:
                yield max - index if reverse else index, x
        except TypeError:
            # x doesn't define __contains__
            pass
def enumerate(self, item: Any) -> Generator[Tuple[Any, Any], None, None]:
    for key, value in self.items():
        if key == item:
            yield key, value
            continue
        try:
            if item in key:
                yield key, value
                continue
        except TypeError:
            # key doesn't define __contains__
            pass
        if value == item:
            yield key, value
            continue
        try:
            if item in value:
                yield key, value
                continue
        except TypeError:
            # value doesn't define __contains__
            pass
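The two enumerate() methods above share one pattern: yield on equality, fall back to a membership test, and swallow the TypeError raised by values that don't define __contains__. A self-contained sketch of that pattern (the data and the find helper are made up for the demo):

def find(item, pairs):
    # Same shape as the enumerate() methods above: yield on equality,
    # fall back to containment, skip values without __contains__.
    for index, x in pairs:
        if x == item:
            yield index, x
            continue
        try:
            if item in x:
                yield index, x
        except TypeError:
            pass

print(list(find("ab", [(1, "ab"), (2, 7), (3, "abc")])))
# [(1, 'ab'), (3, 'abc')]  -- the int 7 is skipped silently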
async def get_columns(self, table_name: str = None) \
        -> 'typing.Generator[md_column.Column, None, None]':
    """
    Yields a :class:`.md_column.Column` for each column in the specified table,
    or for each column in the schema if no table is specified.

    These columns don't point to a :class:`.md_table.Table` since there might
    not be one, but accessing __name__ and __tablename__ of the column's table
    will still work as expected.

    :param table_name: The table to get columns from, or all tables if omitted.
    """
    params = {"table_name": table_name}
    emitter = self.bind.emit_param
    sql = self.bind.dialect.get_column_sql(table_name, emitter=emitter)
    cur = await self.transaction.cursor(sql, params)
    records = await cur.flatten()
    await cur.close()
    return self.bind.dialect.transform_rows_to_columns(*records, table_name=table_name)
async def get_indexes(self, table_name: str = None) \
        -> 'typing.Generator[md_index.Index, None, None]':
    """
    Yields a :class:`.md_index.Index` for each index in the specified table,
    or for each index in the schema if no table is specified.

    These indexes don't point to a :class:`.md_table.Table` since there might
    not be one, but they have a table_name attribute.

    :param table_name: The table to get indexes from, or all tables if omitted.
    """
    params = {"table_name": table_name}
    emitter = self.bind.emit_param
    sql = self.bind.dialect.get_index_sql(table_name, emitter=emitter)
    cur = await self.transaction.cursor(sql, params)
    records = await cur.flatten()
    await cur.close()
    return self.bind.dialect.transform_rows_to_indexes(*records, table_name=table_name)
def _get_module_commands(module):
    # type: (ast.Module) -> typing.Generator[_EntryPoint, None, None]
    """Yield all Command objects represented by the python module.

    Module commands consist of a docopt-style module docstring and a callable
    Command class.

    Args:
        module: An ast.Module object used to retrieve docopt-style commands.

    Yields:
        Command objects that represent entry points to append to setup.py.
    """
    cls = next((n for n in module.body
                if isinstance(n, ast.ClassDef) and n.name == 'Command'), None)
    if not cls:
        return
    methods = (n.name for n in cls.body if isinstance(n, ast.FunctionDef))
    if '__call__' not in methods:
        return
    docstring = ast.get_docstring(module)
    for commands, _ in usage.parse_commands(docstring):
        yield _EntryPoint(commands[0], next(iter(commands[1:]), None), None)
def _get_class_commands(module):
    # type: (ast.Module) -> typing.Generator[_EntryPoint, None, None]
    """Yield all Command objects represented by python classes in the module.

    Class commands are detected by inspecting all callable classes in the
    module for docopt-style docstrings.

    Args:
        module: An ast.Module object used to retrieve docopt-style commands.

    Yields:
        Command objects that represent entry points to append to setup.py.
    """
    nodes = (n for n in module.body if isinstance(n, ast.ClassDef))
    for cls in nodes:
        methods = (n.name for n in cls.body if isinstance(n, ast.FunctionDef))
        if '__call__' in methods:
            docstring = ast.get_docstring(cls)
            for commands, _ in usage.parse_commands(docstring):
                yield _EntryPoint(commands[0], next(iter(commands[1:]), None), cls.name)
def _get_function_commands(module):
    # type: (ast.Module) -> typing.Generator[_EntryPoint, None, None]
    """Yield all Command objects represented by python functions in the module.

    Function commands consist of all top-level functions that contain
    docopt-style docstrings.

    Args:
        module: An ast.Module object used to retrieve docopt-style commands.

    Yields:
        Command objects that represent entry points to append to setup.py.
    """
    nodes = (n for n in module.body if isinstance(n, ast.FunctionDef))
    for func in nodes:
        docstring = ast.get_docstring(func)
        for commands, _ in usage.parse_commands(docstring):
            yield _EntryPoint(commands[0], next(iter(commands[1:]), None), func.name)
def _check_query_words(
        self,
        query: ast.Str,
        parser: Parser,
) -> Generator[Tuple[int, int, str, type], Any, None]:
    for token in parser:
        word = token.value
        if token.is_keyword or token.is_function_name:
            if not word.isupper() and word.upper() not in self.excepted_names:
                yield (
                    query.lineno,
                    query.col_offset,
                    "Q440 keyword {} is not uppercase".format(word),
                    type(self),
                )
            if word.upper() in ABBREVIATED_KEYWORDS:
                yield (
                    query.lineno,
                    query.col_offset,
                    "Q442 avoid abbreviated keywords, {}".format(word),
                    type(self),
                )
        elif token.is_name and (not word.islower() or word.endswith('_')):
            yield (
                query.lineno,
                query.col_offset,
                "Q441 name {} is not valid, must be snake_case, and cannot "
                "end with `_`".format(word),
                type(self),
            )
def lexicon_iterator(path: str,
                     vocab_source: Dict[str, int],
                     vocab_target: Dict[str, int]) -> Generator[Tuple[int, int, float], None, None]:
    """
    Yields lines from a translation table of format: src, trg, logprob.

    :param path: Path to lexicon file.
    :param vocab_source: Source vocabulary.
    :param vocab_target: Target vocabulary.
    :return: Generator returning tuples (src_id, trg_id, prob).
    """
    assert C.UNK_SYMBOL in vocab_source
    assert C.UNK_SYMBOL in vocab_target
    src_unk_id = vocab_source[C.UNK_SYMBOL]
    trg_unk_id = vocab_target[C.UNK_SYMBOL]
    with smart_open(path) as fin:
        for line in fin:
            src, trg, logprob = line.rstrip("\n").split("\t")
            prob = np.exp(float(logprob))
            src_id = vocab_source.get(src, src_unk_id)
            trg_id = vocab_target.get(trg, trg_unk_id)
            yield src_id, trg_id, prob
def _uid_str(uid_list: str or [str] or Generator) -> str:
    """
    Prepare list of uid for use in commands: delete/copy/move/seen
    uid_list can be: str, list, tuple, set, fetch generator
    """
    if not uid_list:
        raise MailBox.MailBoxUidParamError('uid_list should not be empty')
    if type(uid_list) is str:
        uid_list = uid_list.split(',')
    if inspect.isgenerator(uid_list):
        uid_list = [msg.uid for msg in uid_list if msg.uid]
    if type(uid_list) not in (list, tuple, set):
        raise MailBox.MailBoxUidParamError('Wrong uid_list type: {}'.format(type(uid_list)))
    for uid in uid_list:
        if type(uid) is not str:
            raise MailBox.MailBoxUidParamError('uid {} is not string'.format(str(uid)))
        if not uid.strip().isdigit():
            raise MailBox.MailBoxUidParamError('Wrong uid: {}'.format(uid))
    return ','.join(i.strip() for i in uid_list)
def get_attachments(self) -> Generator:
    """
    Attachments of the mail message (generator)
    :return: generator of tuple(filename: str, payload: bytes)
    """
    for part in self.obj.walk():
        # multipart/* are just containers
        if part.get_content_maintype() == 'multipart':
            continue
        if part.get('Content-Disposition') is None:
            continue
        filename = part.get_filename()
        if not filename:
            continue  # this is what happens when Content-Disposition = inline
        filename = self._decode_value(*decode_header(filename)[0])
        payload = part.get_payload(decode=True)
        if not payload:
            continue
        yield filename, payload
def create_iterator(self, start=0, step=1):
    # type: (int, int) -> Generator[Address, None, None]
    """
    Creates an iterator that can be used to progressively generate new
    addresses.

    :param start:
        Starting index.
        Warning: This method may take awhile to reset if ``start`` is a
        large number!

    :param step:
        Number of indexes to advance after each address.
        Warning: The generator may take awhile to advance between
        iterations if ``step`` is a large number!
    """
    key_iterator = (
        KeyGenerator(self.seed)
            .create_iterator(start, step, self.security_level)
    )
    while True:
        yield self._generate_address(key_iterator)
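create_iterator above never terminates, so callers must bound it themselves. A hedged, self-contained sketch of the standard way to consume such an infinite generator with itertools.islice (the addresses generator below is a stand-in, not the real address generator):

import itertools

def addresses():
    # Stand-in for create_iterator(): an endless stream of values.
    index = 0
    while True:
        yield 'address-{}'.format(index)
        index += 1

# islice caps the draw without ever materializing the infinite stream.
print(list(itertools.islice(addresses(), 3)))
# ['address-0', 'address-1', 'address-2']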
def iter_used_addresses(adapter, seed, start):
    # type: (BaseAdapter, Seed, int) -> Generator[Tuple[Address, List[TransactionHash]], None, None]
    """
    Scans the Tangle for used addresses.

    This is basically the opposite of invoking ``getNewAddresses`` with
    ``stop=None``.
    """
    ft_command = FindTransactionsCommand(adapter)
    for addy in AddressGenerator(seed).create_iterator(start):
        ft_response = ft_command(addresses=[addy])
        if ft_response['hashes']:
            yield addy, ft_response['hashes']
        else:
            break
        # Reset the command so that we can call it again.
        ft_command.reset()
def test_find_deck_spawns(prov):
    if prov == "holy":
        provider = Holy(network="peercoin-testnet")
    if prov == "mintr":
        provider = Mintr(network="peercoin")
    if prov == "cryptoid":
        provider = Cryptoid(network="peercoin")
    try:
        if prov == "rpc":
            provider = RpcNode(testnet=True)
    except Exception:
        print("No RpcNode available.")

    assert isinstance(find_deck_spawns(provider), Generator)
def _generate_conditions(self, filters: Generator[Tuple[str, RawSQLFilter], None, None]) -> Generator[str, None, None]:
    """
    Returns a generator that yields raw SQL condition strings,
    e.g. 'field_name >= %s'.
    :param filters: Generator with filter's name and `RawSQLFilter` instance
    """
    for name, filter_ in filters:
        conds_and_values = self._request_filters.get(name)
        if conds_and_values:
            for condition, value in conds_and_values:
                try:
                    sql = filter_.filter(name, condition, value)
                except ValidationError as e:
                    raise ValidationError('Exception raised for {}: {}'.format(name, e))
                yield sql
        elif filter_.default is not None:
            self.params = filter_.default
            yield "{} = %s".format(name)
def modpack_file(path: Path) -> Generator[ModPack, None, None]:
    """Context manager for manipulation of an existing mod-pack.

    Keyword arguments:
        path: Path to the existing ModPack file, which should be provided.

    Yields:
        ModPack loaded from path. If no exception occurs, the provided
        modpack is written (with changes) back to the file on context exit.
    """
    with path.open(encoding='utf-8', mode='r') as istream:
        mp = ModPack.load(istream)

    yield mp

    with path.open(encoding='utf-8', mode='w') as ostream:
        mp.dump(ostream)
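A generator annotated Generator[ModPack, None, None] with a single yield, like modpack_file above, is exactly the shape that contextlib.contextmanager expects. A minimal runnable sketch of the same load-edit-save pattern, with a plain list standing in for the ModPack object (all names here are hypothetical):

import contextlib
import typing

@contextlib.contextmanager
def scratch_list() -> typing.Generator[list, None, None]:
    data = []              # "load" step
    yield data             # the with-block mutates the object here
    print('saved:', data)  # "save" step, runs only on clean exit

with scratch_list() as items:
    items.append('mod-a')
# prints: saved: ['mod-a']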
def filter_obsoletes(
        self: 'ModPack',
        files: Iterable[File]
) -> Generator[File, None, None]:
    """Filter obsolete files.

    Obsolete files are defined as being already installed, or being an older
    version of already installed files.

    Keyword arguments:
        files: Iterable of mod :class:`File`s to filter.

    Yields:
        Original files without the obsoletes.
    """
    for file in files:
        current = self.installed.get(file.mod.id, None)
        if current is None or current.date < file.date:
            yield file
def orphans(self: 'ModPack', mods: Mapping[int, Mod] = None) -> Generator[File, None, None]:
    """Finds all no longer needed dependencies.

    Keyword arguments:
        mods: Optional mapping of installed mods [default: self.mods].
            The purpose of this parameter is to be able to override really
            installed mods without changing the property directly.

    Yields:
        Orphaned files.
    """
    if mods is None:
        mods = self.mods

    needed = {}
    for file in mods.values():
        needed.update(resolve(file, pool=self.installed))

    # Filter unneeded dependencies
    yield from (
        file for m_id, file in self.dependencies.items()
        if m_id not in needed
    )
def populate(comments: Sequence['Comment'],
             authors: Sequence['People'],
             count=100) -> Generator['Article', None, None]:
    import mimesis
    aid = mimesis.Numbers()
    article = mimesis.Text()
    answers = list(comments)

    def get_random_answers(max):
        counter = 0
        while answers and counter < max:
            yield answers.pop(random.randint(0, len(answers) - 1))
            counter += 1

    return (
        Article(
            id=aid.between(1, count),
            title=article.title(),
            author=random.choice(authors),
            comments=[c for c in get_random_answers(random.randint(1, 10))]
        )
        for _ in range(count)
    )
def peek(nbytes=0) -> typing.Generator[_Action, Buffer, bytes]:
    """Read output without consuming it.

    Read but **does not** consume data from the protocol input.

    This is a *non-blocking* primitive, if less data than requested is
    available, less data is returned. It is meant to be used in combination
    with :func:`~ohneio.wait`, but edge cases and smart people can (and most
    likely will) prove me wrong.

    Args:
        nbytes (:obj:`int`, optional): amount of bytes to read *at most*.
            ``0`` meaning all bytes.

    Returns:
        bytes: data read from the buffer
    """
    input_ = yield _get_input
    return input_.peek(nbytes)
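peek above is the rare example that uses all three type parameters: it yields an _Action, is resumed via send() with a Buffer, and returns bytes to the driving loop. A self-contained sketch of that send/return protocol (the names below are invented for the demo):

import typing

def ask_double() -> typing.Generator[str, int, int]:
    # Yields a str prompt, receives an int via send(), returns an int.
    value = yield 'need a number'
    return value * 2

gen = ask_double()
print(next(gen))           # 'need a number'
try:
    gen.send(21)
except StopIteration as stop:
    print(stop.value)      # 42 -- the return value travels in StopIteration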
def exec_iter(
    command: typing.List[str],
    logger: typing.Optional[iocage.lib.Logger.Logger]=None
) -> typing.Generator[str, None, None]:

    process = exec_raw(
        command,
        logger=logger,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        universal_newlines=True
    )

    for stdout_line in iter(process.stdout.readline, ""):
        yield stdout_line

    process.stdout.close()
    return_code = process.wait()
    if return_code:
        raise subprocess.CalledProcessError(return_code, command)
def __iter__(
    self
) -> typing.Generator[Resource, None, None]:

    for child_dataset in self.dataset.children:

        name = self._get_asset_name_from_dataset(child_dataset)
        if self._filters is not None and \
                self._filters.match_key("name", name) is not True:
            # Skip all jails that do not even match the name
            continue

        # ToDo: Do not load jail if filters do not require to
        resource = self._get_resource_from_dataset(child_dataset)
        if self._filters is not None:
            if self._filters.match_resource(resource):
                yield resource
def _print_list(
    resources: typing.Generator[
        iocage.lib.Jails.JailsGenerator,
        None,
        None
    ],
    columns: list,
    show_header: bool,
    separator: str=";"
) -> None:

    if show_header is True:
        print(separator.join(columns).upper())

    for resource in resources:
        print(separator.join(_lookup_resource_values(resource, columns)))
def _print_json(
    resources: typing.Generator[
        iocage.lib.Jails.JailsGenerator,
        None,
        None
    ],
    columns: list,
    **json_dumps_args
):

    if "indent" not in json_dumps_args.keys():
        json_dumps_args["indent"] = 2

    if "sort_keys" not in json_dumps_args.keys():
        json_dumps_args["sort_keys"] = True

    output = []
    for resource in resources:
        output.append(dict(zip(
            columns,
            _lookup_resource_values(resource, columns)
        )))

    print(json.dumps(output, **json_dumps_args))
def _read_in_chunks(file_object: IO[bytes], chunk_size: int = 2*MB) -> Generator[bytes, None, None]:
    """Read a file in fixed-size chunks (to minimize memory usage for large files).

    Args:
        file_object: An opened file-like object supporting read().
        chunk_size: Max size (in bytes) of each file chunk.

    Yields:
        File chunks, each of size at most chunk_size.
    """
    while True:
        chunk = file_object.read(chunk_size)
        if chunk:
            yield chunk
        else:
            return  # End of file.
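A quick usage sketch for a chunked reader of the shape above, fed from an in-memory stream. The helper is re-declared here (with the chunk size shrunk for the demo) so the snippet runs standalone:

import io
from typing import IO, Generator

def read_in_chunks(file_object: IO[bytes], chunk_size: int = 4) -> Generator[bytes, None, None]:
    # Same loop as _read_in_chunks above, with a tiny chunk size.
    while True:
        chunk = file_object.read(chunk_size)
        if not chunk:
            return
        yield chunk

print(list(read_in_chunks(io.BytesIO(b'abcdefghij'))))
# [b'abcd', b'efgh', b'ij']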
def get_output(self, limit: int = None) -> Generator:
    if limit is None:
        per_replica = None
    else:
        per_replica = round(limit / self.num_replicas)
        if per_replica == 0:
            logger.debug("{} forcibly setting replica "
                         "message limit to {}"
                         .format(self._node.name, per_replica))
            per_replica = 1
    for replica in self._replicas:
        num = 0
        while replica.outBox:
            yield replica.outBox.popleft()
            num += 1
            if per_replica and num >= per_replica:
                break
def untyped_do(f: Callable[..., Generator[G, B, None]]) -> Callable[..., G]:
    @functools.wraps(f)
    def do_loop(*a: Any, **kw: Any) -> F[B]:
        itr = f(*a, **kw)
        if not isinstance(itr, GeneratorType):
            raise Exception(f'function `{f.__qualname__}` decorated with `do` does not produce a generator')
        init = itr.send(None)
        m = Monad.fatal_for(init)

        @functools.wraps(f)
        def loop(val: B) -> F[B]:
            try:
                return m.flat_map(itr.send(val), loop)
            except StopIteration:
                return m.pure(val)

        return m.flat_map(init, loop)
    return do_loop
def _drain_find(self, abort: Callable[[A], bool]) -> Maybe[A]:
    culprit = Empty()

    def gen() -> Generator:
        nonlocal culprit
        while True:
            try:
                el = next(self.source)
                yield el
                if abort(el):
                    culprit = Just(el)
                    break
            except StopIteration:
                break

    drained = List.wrap(list(gen()))
    self.strict = self.strict + drained
    return culprit
def make_find_check_resolve_submit(finder: Finder,
                                   notSubmittedCheck: Checker,
                                   resolver: Resolver,
                                   submitter: Submitter) -> Processor:
    def inner(text: str) -> Generator:
        for found in finder(text):
            print(found)
            if notSubmittedCheck(found):
                resolved = resolver(found)
                yield submitter(found, resolved)
    return inner
def _process_args(self, func_ast, code_lines, args, kwargs) -> Generator[DebugArgument, None, None]:  # noqa: C901
    arg_offsets = list(self._get_offsets(func_ast))
    for arg, ast_node, i in zip(args, func_ast.args, range(1000)):
        if isinstance(ast_node, ast.Name):
            yield self.output_class.arg_class(arg, name=ast_node.id)
        elif isinstance(ast_node, self.complex_nodes):
            # TODO replace this hack with astor when it gets round to a new release
            start_line, start_col = arg_offsets[i]

            if i + 1 < len(arg_offsets):
                end_line, end_col = arg_offsets[i + 1]
            else:
                end_line, end_col = len(code_lines) - 1, None

            name_lines = []
            for l_ in range(start_line, end_line + 1):
                start_ = start_col if l_ == start_line else 0
                end_ = end_col if l_ == end_line else None
                name_lines.append(
                    code_lines[l_][start_:end_].strip(' ')
                )
            yield self.output_class.arg_class(arg, name=' '.join(name_lines).strip(' ,'))
        else:
            yield self.output_class.arg_class(arg)

    kw_arg_names = {}
    for kw in func_ast.keywords:
        if isinstance(kw.value, ast.Name):
            kw_arg_names[kw.arg] = kw.value.id

    for name, value in kwargs.items():
        yield self.output_class.arg_class(value, name=name, variable=kw_arg_names.get(name))
def _format(self, value: Any, indent_current: int, indent_first: bool):
    if indent_first:
        self._stream.write(indent_current * self._c)

    value_repr = repr(value)
    # Generator lives in collections.abc; the bare collections alias was removed in Python 3.10
    if len(value_repr) <= self._simple_cutoff and not isinstance(value, collections.abc.Generator):
        self._stream.write(value_repr)
    else:
        indent_new = indent_current + self._indent_step
        for t, func in self._type_lookup:
            if isinstance(value, t):
                func(value, value_repr, indent_current, indent_new)
                return
        self._format_raw(value, value_repr, indent_current, indent_new)
def _format_generators(self, value: Generator, value_repr: str, indent_current: int, indent_new: int):
    if self._repr_generators:
        self._stream.write(value_repr)
    else:
        self._stream.write('(\n')
        for v in value:
            self._format(v, indent_new, True)
            self._stream.write(',\n')
        self._stream.write(indent_current * self._c + ')')
def get_meta(self) -> t.Generator[t.Dict, None, None]:
    """
    :return: generator
        list(metadata_obj.get_meta()) -> [abstract_field_obj1, abstract_field_obj2, ...]
    """
    fields_by_name = OrderedDict()
    for field_data in self.fields:
        fields_by_name[field_data['name']] = field_data

    for k, v_callable in self.__class__.__dict__.items():
        if k.startswith('get_field_'):
            # `get_field_<NAME>` methods are used for updates later
            continue
        if not k.startswith('get_'):
            continue
        # check dynamic get_%s fields
        # method get_%s must return {'name': '<NAME>'}, where <name> is a real field name
        res = v_callable(self, self.request)
        fields_by_name[res['name']] = res

    fields_order = self.order or fields_by_name.keys()
    for field_name in fields_order:
        field_value = fields_by_name[field_name]
        # method should update field with returned dict
        method = getattr(self, 'get_field_%s' % field_name, None)
        if callable(method):
            field_value.update(method(field_name, self.request))
        yield field_value

# noinspection PyMethodMayBeStatic,PyUnusedLocal
def restapiplugin_target() -> Generator:
    """Simulate the endpoints triggered by RESTAPIPlugin."""
    fauxmo_device = Process(target=httpbin.core.app.run,
                            kwargs={"host": "127.0.0.1", "port": 8000},
                            daemon=True)
    fauxmo_device.start()
    time.sleep(1)

    yield

    fauxmo_device.terminate()
    fauxmo_device.join()
def srand(seed=0):
    # type: (KeyType) -> typing.Generator[int, None, None]
    if isinstance(seed, six.string_types) or isinstance(seed, bytes):
        if isinstance(seed, six.text_type):
            seed = seed.encode('utf-8')
        seed_int = int(hashlib.sha512(seed).hexdigest(), 16)
        seed = typing.cast(int, seed_int)
    rng = random.Random(seed)
    while True:
        yield rng.randint(0, sys.maxsize)
def _get_many_generator(self, result: Iterable[S], context: PipelineContext = None) -> Generator[T, None, None]:
    for item in result:
        LOGGER.info("Sending item \"{item}\" to sinks before converting".format(item=item))
        for sink in self._before_transform:
            sink.put(item, context)

        LOGGER.info("Converting item \"{item}\" to request type".format(item=item))
        item = self._transform(data=item, context=context)

        LOGGER.info("Sending item \"{item}\" to sinks after converting".format(item=item))
        for sink in self._after_transform:
            sink.put(item, context)

        yield item
def get_many_int(self, query: Mapping[str, Any], context: PipelineContext = None) -> Generator[int, None, None]:
    value = query.get(VALUE_KEY)
    count = query.get(COUNT_KEY)
    try:
        value = int(value)
    except ValueError:
        raise NotFoundError("Couldn't cast the query value to \"int\"")
    return (value for _ in range(count))
def get_many_float(self, query: Mapping[str, Any], context: PipelineContext = None) -> Generator[float, None, None]:
    value = query.get(VALUE_KEY)
    count = query.get(COUNT_KEY)
    try:
        value = float(value)
    except ValueError:
        raise NotFoundError("Couldn't cast the query value to \"float\"")
    if value not in self.items:
        raise NotFoundError("Query value wasn't in store!")
    return (value for _ in range(count))
def get_many(self, type: Type[T], query: Mapping[str, Any], context: PipelineContext = None) -> Generator[T, None, None]:
    value = query.get(VALUE_KEY)
    count = query.get(COUNT_KEY)
    try:
        # noinspection PyCallingNonCallable
        value = type(value)
    except ValueError:
        raise NotFoundError("Couldn't cast the query value to \"{type}\"".format(type=type))
    return (value for _ in range(count))
def get_many_str(self, query: Mapping[str, Any], context: PipelineContext = None) -> Generator[str, None, None]:
    value = query.get(VALUE_KEY)
    count = query.get(COUNT_KEY)
    try:
        value = str(value)
    except ValueError:
        raise NotFoundError("Couldn't cast the query value to \"str\"")
    return (value for _ in range(count))

########################
# Unsupported Function #
########################
def __call__(self,
             dataset: Dataset,
             num_epochs: int = None,
             shuffle: bool = True,
             cuda_device: int = -1,
             for_training: bool = True) -> Generator[Dict[str, Union[numpy.ndarray, Dict[str, numpy.ndarray]]], None, None]:
    """
    Returns a generator that yields batches over the given dataset, forever.

    Parameters
    ----------
    dataset : ``Dataset``
    num_epochs : ``int``, optional (default=``None``)
        How many times should we iterate over this dataset? If ``None``, we
        will iterate over it forever.
    shuffle : ``bool``, optional (default=``True``)
        If ``True``, we will shuffle the instances in ``dataset`` before
        constructing batches and iterating over the data.
    cuda_device : ``int``
        If cuda_device >= 0, GPUs are available and Pytorch was compiled with
        CUDA support, the tensor will be copied to the cuda_device specified.
    for_training : ``bool``, optional (default=``True``)
        If ``False``, we will pass the ``volatile=True`` flag when
        constructing variables, which disables gradient computations in the
        graph. This makes inference more efficient (particularly in memory
        usage), but is incompatible with training models.
    """
    if num_epochs is None:
        while True:
            yield from self._yield_one_epoch(dataset, shuffle, cuda_device, for_training)
    else:
        for _ in range(num_epochs):
            yield from self._yield_one_epoch(dataset, shuffle, cuda_device, for_training)
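The epoch loop above delegates to an inner generator with yield from; a minimal sketch of that delegation pattern (the data and names below are fabricated for the demo):

import typing

def one_epoch() -> typing.Generator[int, None, None]:
    yield from (1, 2, 3)  # stand-in for _yield_one_epoch

def batches(num_epochs: int = None) -> typing.Generator[int, None, None]:
    # Same control flow as __call__ above: iterate forever when
    # num_epochs is None, otherwise a fixed number of times.
    if num_epochs is None:
        while True:
            yield from one_epoch()
    else:
        for _ in range(num_epochs):
            yield from one_epoch()

print(list(batches(2)))  # [1, 2, 3, 1, 2, 3]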
def get_clients_groups(self) -> typing.Generator['FlyingUnit', None, None]:
    for group in self.groups:
        assert isinstance(group, Group)
        if group.group_is_client_group:
            yield group
def farps(self) -> typing.Generator['Static', None, None]:
    for coa in [self.blue_coa, self.red_coa]:
        for farp in coa.farps:
            yield farp

# noinspection PyProtectedMember
def countries(self) -> typing.Generator['Country', None, None]:
    for k in self._section_country:
        if k not in self._countries.keys():
            country = Country(self.d, self.l10n, self.coa_color, k)
            self._countries[k] = country
            self._countries_by_id[country.country_id] = country
            self._countries_by_name[country.country_name] = country
        yield self._countries[k]
def groups(self) -> typing.Generator['Group', None, None]:
    for country in self.countries:
        assert isinstance(country, Country)
        for group in country.groups:
            assert isinstance(group, Group)
            yield group
def statics(self) -> typing.Generator['Static', None, None]:
    for country in self.countries:
        assert isinstance(country, Country)
        for static in country.statics:
            assert isinstance(static, Static)
            yield static
def get_groups_from_category(self, category) -> typing.Generator['Group', None, None]:
    Mission.validator_group_category.validate(category, 'get_groups_from_category')
    for group in self.groups:
        assert isinstance(group, Group)
        if group.group_category == category:
            yield group