Python typing module: Iterator() example source code

We have extracted the following 50 code examples from open-source Python projects to illustrate how to use typing.Iterator().
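As a quick orientation before the project samples, here is a minimal, self-contained sketch of the common pattern: a generator function whose return type is annotated with typing.Iterator. The names in this sketch are illustrative only and do not come from any of the projects below.

from typing import Iterator

def count_up_to(limit: int) -> Iterator[int]:
    """Yield the integers 0, 1, ..., limit - 1."""
    n = 0
    while n < limit:
        yield n
        n += 1

# The annotation tells a type checker what the caller receives: an iterator of ints.
for value in count_up_to(3):
    print(value)  # prints 0, then 1, then 2

Annotating a generator as Iterator[X] (rather than the more detailed Generator[X, None, None]) is the usual choice when the caller only iterates over the yielded values, which is exactly what the examples below do.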

Project: whatstyle    Author: mikr
def table_iter(pairs,            # type: List[BytesPair]
               uniqueidx,        # type: int
               enc='utf-8',      # type: str
               fromdesc='',      # type: str
               todesc='',        # type: str
               numlines=2,       # type: int
               wrapcolumn=0      # type: int
               ):
    # type: (...) -> Iterator[Tuple[str, str, str]]
    htmldiffer = HtmlMultiDiff(tabsize=8, wrapcolumn=wrapcolumn)
    htmldiffer.uniqueidx = uniqueidx
    table = htmldiffer.table_from_pairs(pairs,
                                        enc,
                                        fromdesc=fromdesc,
                                        todesc=todesc,
                                        context=True,
                                        numlines=numlines)
    for tablestart, tbody, tableend in iter_tbodies(table):
        yield tablestart, tbody, tableend
Project: allennlp    Author: allenai
def sentence_iterator(self, file_path: str) -> Iterator[OntonotesSentence]:
        """
        An iterator over the sentences in an individual CONLL formatted file.
        """
        with codecs.open(file_path, 'r', encoding='utf8') as open_file:
            conll_rows = []
            for line in open_file:
                line = line.strip()
                if line != '' and not line.startswith('#'):
                    conll_rows.append(line)
                else:
                    if not conll_rows:
                        continue
                    else:
                        yield self._conll_rows_to_sentence(conll_rows)
                        conll_rows = []
Project: python-miio    Author: rytilahti
def read_android(self) -> Iterator[DeviceConfig]:
        """Read Android-specific database file."""
        _LOGGER.info("Reading tokens from Android DB")
        c = self.conn.execute("SELECT * FROM devicerecord WHERE token IS NOT '';")
        for dev in c.fetchall():
            if self.dump_raw:
                BackupDatabaseReader.dump_raw(dev)
            ip = dev['localIP']
            mac = dev['mac']
            model = dev['model']
            name = dev['name']
            token = dev['token']

            config = DeviceConfig(name=name, ip=ip, mac=mac,
                                  model=model, token=token)
            yield config
Project: python-miio    Author: rytilahti
def read_tokens(self, db) -> Iterator[DeviceConfig]:
        """Read device information out from a given database file.

        :param str db: Database file"""
        self.db = db
        _LOGGER.info("Reading database from %s" % db)
        self.conn = sqlite3.connect(db)

        self.conn.row_factory = sqlite3.Row
        with self.conn:
            is_android = self.conn.execute(
                "SELECT name FROM sqlite_master WHERE type='table' AND name='devicerecord';").fetchone() is not None
            is_apple = self.conn.execute(
                "SELECT name FROM sqlite_master WHERE type='table' AND name='ZDEVICE'").fetchone() is not None
            if is_android:
                yield from self.read_android()
            elif is_apple:
                yield from self.read_apple()
            else:
                _LOGGER.error("Error, unknown database type!")
Project: jack    Author: uclmr
def shuffle_and_batch(items: List[T], batch_size: int,
                      rng: Optional[random.Random] = None) \
        -> Iterator[List[T]]:
    """Optionally shuffles and batches items in a list.

    Args:
        - items: List of items to shuffle & batch.
        - batch_size: size of batches.
        - rng: random number generator if items should be shuffled, else None.

    Returns: Batch iterator
    """

    todo = list(range(len(items)))
    if rng is not None:
        rng.shuffle(todo)
    while todo:
        indices = todo[:batch_size]
        todo = todo[batch_size:]
        items_batch = [items[i] for i in indices]
        yield items_batch
Project: srctools    Author: TeamSpen210
def iter_grid(
        cls,
        min_pos: 'Vec',
        max_pos: 'Vec',
        stride: int=1,
    ) -> Iterator['Vec']:
        """Loop over points in a bounding box. All coordinates should be integers.

        Both borders will be included.
        """
        min_x, min_y, min_z = map(int, min_pos)
        max_x, max_y, max_z = map(int, max_pos)
        for x in range(min_x, max_x + 1, stride):
            for y in range(min_y, max_y + 1, stride):
                for z in range(min_z, max_z + 1, stride):
                    yield cls(x, y, z)
Project: auditwheel    Author: pypa
def elf_file_filter(paths: Iterator[str]) -> Iterator[Tuple[str, ELFFile]]:
    """Filter through an iterator of filenames and load up only ELF
    files
    """

    for path in paths:
        if path.endswith('.py'):
            continue
        else:
            try:
                with open(path, 'rb') as f:
                    candidate = ELFFile(f)
                    yield path, candidate
            except ELFError:
                # not an elf file
                continue
Project: dirtools3    Author: kirpit
def items(self, humanise: bool=True, precision: int=2) -> Iterator[dict]:
        """Returns an iterator for scanned items list. It doesn't return the
        internal _items list because we don't want it to be modified outside.

        Blocks until the scanning operation has been completed on first access.

        :param humanise: Humanise flag to format results (defaults to True)
        :type humanise: bool
        :param precision: The floating precision of the human-readable size format (defaults to 2).
        :type precision: int
        :return: Iterator for the internal _items list.
        :rtype: iterator
        """
        self._await()

        # Don't humanise
        if humanise is False:
            return iter(self._items)

        # Humanise
        humanise_item = partial(self._humanise_item, precision=precision)
        return map(humanise_item, self._items)
Project: rupo    Author: IlyaGusev
def read_texts(path: str, source_type: FileType) -> Iterator[str]:
        """
        ?????????? ???????.

        :param path: ???? ? ?????/?????.
        :param source_type: ??? ??????.
        """
        paths = Reader.get_paths(path, source_type.value)
        for filename in paths:
            with open(filename, "r", encoding="utf-8") as file:
                if source_type == FileType.XML:
                    for elem in Reader.__xml_iter(file, 'item'):
                        yield elem.find(".//text").text
                elif source_type == FileType.JSON:
                    # TODO
                    j = json.load(file)
                    for item in j['items']:
                        yield item['text']
                elif source_type == FileType.RAW:
                    text = file.read()
                    for t in text.split(RAW_SEPARATOR):
                        yield t
Project: rupo    Author: IlyaGusev
def get_paths(path: str, ext: str) -> Iterator[str]:
        """
        ????????? ???? ?????? ????????? ???? ?? ????????? ????.

        :param path: ???? ? ?????/?????.
        :param ext: ????????? ??????????.
        """
        if os.path.isfile(path):
            if ext == os.path.splitext(path)[1]:
                yield path
        else:
            for root, folders, files in os.walk(path):
                for file in files:
                    if ext == os.path.splitext(file)[1]:
                        yield os.path.join(root, file)
                # os.walk already descends into subdirectories, so no explicit recursion is needed.
Project: sockeye    Author: awslabs
def __init__(self,
                 path: str,
                 vocab: Optional[Dict[str, int]],
                 add_bos: bool = False,
                 limit: Optional[int] = None) -> None:
        self.path = path
        self.vocab = vocab
        self.bos_id = None
        if vocab is not None:
            assert C.UNK_SYMBOL in vocab
            assert vocab[C.PAD_SYMBOL] == C.PAD_ID
            assert C.BOS_SYMBOL in vocab
            assert C.EOS_SYMBOL in vocab
            self.bos_id = vocab[C.BOS_SYMBOL]
        else:
            check_condition(not add_bos, "Adding a BOS symbol requires a vocabulary")
        self.add_bos = add_bos
        self.limit = limit

        self._iter = None  # type: Optional[Iterator]
        self._iterated_once = False
        self.count = 0
        self._next = None
Project: sk-torch    Author: mattHawthorn
def get_torch_num_workers(num_workers: int):
    """turn an int into a useful number of workers for a pytorch DataLoader.
    -1 means "use all CPU's", -2, means "use all but 1 CPU", etc.
    Note: 0 is interpreted by pytorch as doing data loading in the main process, while any positive number spawns a
    new process. We do not allow more processes to spawn than there are CPU's."""
    num_cpu = cpu_count()
    if num_workers < 0:
        n_workers = num_cpu + 1 + num_workers
        if n_workers < 0:
            print("Warning: {} fewer workers than the number of CPU's were specified, but there are only {} CPU's; "
                  "running data loading in the main process (num_workers = 0).".format(num_workers + 1, num_cpu))
        num_workers = max(0, n_workers)
    if num_workers > num_cpu:
        print("Warning, `num_workers` is {} but only {} CPU's are available; "
              "using this number instead".format(num_workers, num_cpu))
    return min(num_workers, num_cpu)


#####################################################################
# Iterator utils                                                    #
#####################################################################
Project: py-aspio    Author: hexhex
def __iter__(self) -> Iterator[str]:
        '''Return an iterator over the lines written to stdout. May only be called once! Might raise a SolverSubprocessError.'''
        assert not self.iterating, 'You may only iterate once over a single DlvhexLineReader instance.'
        self.iterating = True
        # Requirement: dlvhex2 needs to flush stdout after every line
        with io.TextIOWrapper(self.process.stdout, encoding=self.stdout_encoding) as stdout_lines:
            for line in stdout_lines:
                yield line
                # Tell dlvhex2 to prepare the next answer set
                if not self.process.stdin.closed:
                    self.process.stdin.write(b'\n')
                    self.process.stdin.flush()
                else:
                    break
        # We've exhausted stdout, so either:
        #   1. we got all answer sets, or
        #   2. an error occurred,
        # and dlvhex closed stdout (and probably terminated).
        # Give it a chance to terminate gracefully.
        try:
            self.process.wait(timeout=0.005)  # type: ignore (mypy does not know about `timeout`)
        except subprocess.TimeoutExpired:  # type: ignore (mypy does not know about `TimeoutExpired`)
            pass
        self.close()
Project: instaloader    Author: Thammus
def get_profile_posts(self, profile_metadata: Dict[str, Any]) -> Iterator[Post]:
        """Retrieve all posts from a profile."""
        profile_name = profile_metadata['user']['username']
        profile_id = int(profile_metadata['user']['id'])
        yield from (Post(self, node, profile=profile_name, profile_id=profile_id)
                    for node in profile_metadata['user']['media']['nodes'])
        has_next_page = profile_metadata['user']['media']['page_info']['has_next_page']
        end_cursor = profile_metadata['user']['media']['page_info']['end_cursor']
        while has_next_page:
            # We do not use self.graphql_node_list() here, because profile_metadata
            # lets us obtain the first 12 nodes 'for free'
            data = self.graphql_query(17888483320059182, {'id': profile_metadata['user']['id'],
                                                          'first': 200,
                                                          'after': end_cursor},
                                      'https://www.instagram.com/{0}/'.format(profile_name))
            media = data['data']['user']['edge_owner_to_timeline_media']
            yield from (Post(self, edge['node'], profile=profile_name, profile_id=profile_id)
                        for edge in media['edges'])
            has_next_page = media['page_info']['has_next_page']
            end_cursor = media['page_info']['end_cursor']
Project: dcos-e2e    Author: mesosphere
def dcos_cluster(
    oss_artifact: Path,
    cluster_backend: ClusterBackend,
) -> Iterator[Cluster]:
    """
    Return a `Cluster`.

    This is module scoped as we do not intend to modify the cluster in ways
    that make tests interfere with one another.
    """
    with Cluster(
        cluster_backend=cluster_backend,
        masters=1,
        agents=0,
        public_agents=0,
    ) as cluster:
        cluster.install_dcos_from_path(oss_artifact, log_output_live=True)
        yield cluster
Project: data-store    Author: HumanCellAtlas
def _latest_version_from_object_names(object_names: typing.Iterator[str]) -> str:
    dead_versions = set()  # type: typing.Set[str]
    all_versions = set()  # type: typing.Set[str]
    set_checks = [
        (DSS_BUNDLE_TOMBSTONE_REGEX, dead_versions),
        (DSS_BUNDLE_KEY_REGEX, all_versions),
    ]

    for object_name in object_names:
        for regex, version_set in set_checks:
            match = regex.match(object_name)
            if match:
                _, version = match.groups()
                version_set.add(version)
                break

    version = None

    for current_version in (all_versions - dead_versions):
        if version is None or current_version > version:
            version = current_version

    return version
Project: workspace    Author: nojima
def train(dataset: DataSet, n_iter: int = 3000, batch_size: int = 25) -> Iterator[AutoEncoder]:
    n = dataset.size

    input_dimension = dataset.input.shape[1]
    hidden_dimension = 2
    model = AutoEncoder(input_dimension, hidden_dimension)

    optimizer = optimizers.Adam()
    optimizer.setup(model)

    for j in range(n_iter):
        shuffled = np.random.permutation(n)

        for i in range(0, n, batch_size):
            indices = shuffled[i:i+batch_size]
            x = Variable(dataset.input[indices])
            model.cleargrads()
            loss = model(x)
            loss.backward()
            optimizer.update()

        yield model
Project: omb-eregs    Author: 18F
def nest_annotations(annotations: Iterator[Annotation],
                     text_length: int) -> List[NestableAnnotation]:
    """Converts overlapping annotations into a nested version."""
    in_order = sorted(annotations, key=lambda a: (a.start, -a.end))
    # Easier to operate on a single root, even if we'll remove it later.
    root = last = NestableAnnotation(PlainText(start=0, end=text_length), None)
    for anote in in_order:
        # We're not allowing non-nested overlapping annotations, so we won't
        # compare ends when determining nesting
        while anote not in last:
            last = last.parent
        # Enforce all annotations to be nested rather than overlapping
        anote.end = min(anote.end, last.end)
        last = NestableAnnotation(anote, last)
    root.wrap_unwrapped()
    return root.children
Project: SHOGUN    Author: knights-lab
def build_lca_map(gen: typing.Iterator, tree: Taxonomy) -> dict:
    """
    Build a lowest common ancestor dictionary
    :param gen: iterator of (query name, reference name) pairs
    :param tree: NCBITree
    :return: dict key (query name: string) value (ncbi_tid: int)
    """
    lca_map = {}
    for qname, rname in gen:
        tax = tree(rname)
        if qname in lca_map:
            current_tax = lca_map[qname]
            if current_tax:
                if current_tax != tax:
                    lca_map[qname] = least_common_ancestor((tax, current_tax))
        else:
            lca_map[qname] = tax
    return lca_map
Project: master-thesis    Author: AndreasMadsen
def predict_from_dataset(self, dataset: Dataset,
                             show_eos: bool=True,
                             use_queue: bool=True,
                             **kwargs) -> Iterator[Tuple[str, str, str]]:
        if use_queue:
            evaluator = self._predict_from_dataset_queue(dataset, **kwargs)
        else:
            evaluator = self._predict_from_dataset_feed(dataset, **kwargs)

        for source, target, translation in evaluator:

            # unpack and decode result
            yield from zip(
                self.dataset.decode_as_batch(source,
                                             show_eos=show_eos),
                self.dataset.decode_as_batch(target,
                                             show_eos=show_eos),
                self.dataset.decode_as_batch(translation,
                                             show_eos=show_eos)
            )
Project: master-thesis    Author: AndreasMadsen
def __iter__(self) -> Iterator[Tuple[str, str]]:

        with NLTKEnv() as nltk_env:
            nltk_env.download('perluniprops')
            nltk_env.download('comtrans')

            from nltk.corpus import comtrans
            from nltk.tokenize.moses import MosesDetokenizer

            als = comtrans.aligned_sents(self._comtrans_string())

            source_detokenizer = MosesDetokenizer(lang=self._source_lang)
            target_detokenizer = MosesDetokenizer(lang=self._target_lang)

        for source, target in self._comtrans_maybe_swap(als):
            source = source_detokenizer.detokenize(source, return_str=True)
            target = target_detokenizer.detokenize(target, return_str=True)

            if self._length_checker(source, target):
                yield (source, target)
Project: master-thesis    Author: AndreasMadsen
def __iter__(self) -> Iterator[Tuple[str, str]]:
        with EuroparlCache() as europarl_cache:
            europarl_cache.download(name='europarl-v7.tgz', url=_v7_url)

            # peak in tarball
            filepath = europarl_cache.filepath('europarl-v7.tgz')
            source_filepath, target_filepath = self._files()
            with tar_extract_file(filepath, source_filepath) as source_file, \
                    tar_extract_file(filepath, target_filepath) as target_file:

                observations = 0
                for source, target in zip(source_file, target_file):
                    source, target = (source.rstrip(), target.rstrip())
                    if self._length_checker(source, target):
                        yield (source, target)
                        observations += 1

                        if self._max_observations is not None and \
                           observations >= self._max_observations:
                            break
Project: master-thesis    Author: AndreasMadsen
def _build_dataset(self) -> Iterator[Tuple[str, str]]:
        length_type = size_to_unsigned_type(self._max_length)

        for _ in range(self._examples):
            length = self._random.randint(
                self._min_length, self._max_length + 1,
                dtype=length_type
            )

            target = self._random.randint(
                0, self._digits,
                size=length, dtype=np.int8
            )
            source = text_map[target]

            target_str = ''.join(target.astype(np.str))
            source_str = ' '.join(source)

            yield (source_str, target_str)
Project: CA-NEAT    Author: mathiasose
def ca_develop(network: FeedForwardNetwork) -> Iterator[ToroidalCellGrid2D]:
        def transition_f(inputs_discrete_values: Sequence[CELL_STATE_T]) -> CELL_STATE_T:
            neighbour_values, xy_values = inputs_discrete_values[:-2], inputs_discrete_values[-2:]

            if all((x == initial_grid.dead_cell) for x in neighbour_values):
                return initial_grid.dead_cell

            inputs_float_values = tuple(state_normalization_rules[n] for n in neighbour_values) + \
                                  tuple(coord_normalization_rules[n] for n in xy_values)

            outputs = network.serial_activate(inputs_float_values)

            return max(zip(alphabet, outputs), key=itemgetter(1))[0]

        yield initial_grid

        for grid in iterate_ca_n_times_or_until_cycle_found(
                initial_grid=initial_grid,
                transition_f=transition_f,
                n=iterations,
                iterate_f=iterate_ca_once_with_coord_inputs
        ):
            yield grid
Project: CA-NEAT    Author: mathiasose
def create_initial_population(neat_config: CPPNNEATConfig) -> Iterator[Genome]:
    for _ in range(neat_config.pop_size):
        g_id = uuid4().int
        g = neat_config.genotype.create_unconnected(g_id, neat_config)

        hidden_nodes = neat_config.initial_hidden_nodes
        if hidden_nodes:
            g.add_hidden_nodes(hidden_nodes)

        if neat_config.initial_connection == 'fs_neat':
            g.connect_fs_neat()
        elif neat_config.initial_connection == 'fully_connected':
            g.connect_full()
        elif neat_config.initial_connection == 'partial':
            if callable(neat_config.connection_fraction):
                fraction = neat_config.connection_fraction()
            else:
                fraction = neat_config.connection_fraction

            g.connect_partial(fraction)

        yield g
Project: CA-NEAT    Author: mathiasose
def sigma_scaled(population: List[Genome], **kwargs) -> Iterator[PAIR_T]:
    try:
        assert len(population) > 1
    except AssertionError:
        raise TooFewIndividuals

    fitnesses = tuple(x.fitness for x in population)

    try:
        assert any(f > 0.0 for f in fitnesses)
    except AssertionError:
        return random_choice(population)

    sigma = stdev(fitnesses)

    average_fitness = mean(fitnesses)
    expected_value_func = lambda x: 1 if sigma == 0 else 1 + ((x - average_fitness) / (2 * sigma))
    sigma_sum = sum(expected_value_func(x) for x in fitnesses)
    scaling_func = lambda x: expected_value_func(x) / sigma_sum

    return roulette(population=population, scaling_func=scaling_func, **kwargs)
Project: CA-NEAT    Author: mathiasose
def tournament(population: List[Genome], group_size: int, epsilon: float, **kwargs) -> Iterator[PAIR_T]:
    def get_one(group: Sequence[Genome]) -> Genome:
        r = random()

        if r < epsilon:
            return choice(group)

        return max(group, key=attrgetter('fitness'))

    try:
        assert len(population) > 1
    except AssertionError:
        raise TooFewIndividuals

    while True:
        pool = list(population)  # make a shallow copy

        group_a = sample(pool, group_size)
        a = get_one(group_a)
        pool.remove(a)

        group_b = sample(pool, group_size)
        b = get_one(group_b)

        yield (a, b)
Project: CA-NEAT    Author: mathiasose
def find_pattern_partial_matches(grid: CellGrid2D, pattern: PATTERN_T) -> Iterator[float]:
    (x_min, y_min), (x_max, y_max) = grid.get_extreme_coords(pad=1)

    pattern_h, pattern_w = len(pattern), len(pattern[0])
    pattern_area = pattern_h * pattern_w

    for y in range(y_min, y_max):
        for x in range(x_min, x_max):
            rectangle = grid.get_rectangle(
                x_range=(x, x + pattern_w),
                y_range=(y, y + pattern_h),
            )

            if all(all(x == grid.dead_cell for x in row) for row in rectangle):
                yield 0.0

            correct_count = count_correct_cells(test_pattern=rectangle, target_pattern=pattern)

            yield (correct_count / pattern_area)
Project: matchpy    Author: HPAC
def _match_sequence_variable(self, wildcard: Wildcard, transition: _Transition) -> Iterator[_State]:
        min_count = wildcard.min_count
        if len(self.subjects) < min_count:
            return
        matched_subject = []
        for _ in range(min_count):
            matched_subject.append(self.subjects.popleft())
        while True:
            if self.associative[-1] and wildcard.fixed_size:
                assert min_count == 1, "Fixed wildcards with length != 1 are not supported."
                if len(matched_subject) > 1:
                    wrapped = self.associative[-1](*matched_subject)
                else:
                    wrapped = matched_subject[0]
            else:
                if len(matched_subject) == 0 and wildcard.optional is not None:
                    wrapped = wildcard.optional
                else:
                    wrapped = tuple(matched_subject)
            yield from self._check_transition(transition, wrapped, False)
            if not self.subjects:
                break
            matched_subject.append(self.subjects.popleft())
        self.subjects.extendleft(reversed(matched_subject))
Project: matchpy    Author: HPAC
def _match_with_bipartite(
            self,
            subject_ids: MultisetOfInt,
            pattern_set: MultisetOfInt,
            substitution: Substitution,
    ) -> Iterator[Tuple[Substitution, MultisetOfInt]]:
        bipartite = self._build_bipartite(subject_ids, pattern_set)
        for matching in enum_maximum_matchings_iter(bipartite):
            if len(matching) < len(pattern_set):
                break
            if not self._is_canonical_matching(matching):
                continue
            for substs in itertools.product(*(bipartite[edge] for edge in matching.items())):
                try:
                    bipartite_substitution = substitution.union(*substs)
                except ValueError:
                    continue
                matched_subjects = Multiset(subexpression for subexpression, _ in matching)
                yield bipartite_substitution, matched_subjects
Project: matchpy    Author: HPAC
def _match_sequence_variables(
            self,
            subjects: MultisetOfExpression,
            pattern_vars: Sequence[VariableWithCount],
            substitution: Substitution,
    ) -> Iterator[Substitution]:
        only_counts = [info for info, _ in pattern_vars]
        wrapped_vars = [name for (name, _, _, _), wrap in pattern_vars if wrap and name]
        for variable_substitution in commutative_sequence_variable_partition_iter(subjects, only_counts):
            for var in wrapped_vars:
                operands = variable_substitution[var]
                if isinstance(operands, (tuple, list, Multiset)):
                    if len(operands) > 1:
                        variable_substitution[var] = self.associative(*operands)
                    else:
                        variable_substitution[var] = next(iter(operands))
            try:
                result_substitution = substitution.union(variable_substitution)
            except ValueError:
                continue
            yield result_substitution
Project: python-driver    Author: bblfsh
def _extract_docs(inbuffer: InBuffer) -> Iterator[Response]:
        """
        This generator will read the inbuffer yielding the JSON
        docs when it finds the ending mark
        """
        line: str
        for line in inbuffer.readlines():
            yield json.loads(line)
Project: whatstyle    Author: mikr
def iter_parallel_report(func,  # type: Callable[..., Any]
                         args_lists,  # type: Sequence[CallArgs]
                         ccmode=CC_PROCESSES):
    # type: (...) -> Iterator[Union[ExeResult, ExcInfo]]
    if ccmode == CC_OFF or len(args_lists) <= 1 or not multiprocessing:
        for args, kwargs in args_lists:
            yield func(*args, **kwargs)
        return

    processes = min(len(args_lists), multiprocessing.cpu_count())
    if ccmode == CC_THREADS:
        pool = multiprocessing.pool.ThreadPool(processes=processes)
    else:
        pool = multiprocessing.Pool(processes=processes, initializer=per_process_init)
    try:
        async_results = [pool.apply_async(func, args=args, kwds=kwargs)
                         for args, kwargs in args_lists]
        pool.close()
        while async_results:
            try:
                asyncres = async_results.pop(0)
                yield asyncres.get()
            except (KeyboardInterrupt, GeneratorExit):
                raise
            except Exception as e:
                t, v, tb = sys.exc_info()
                try:
                    # Report the textual traceback of the subprocess rather
                    # than this local exception which was triggered
                    # by the other side.
                    tb = e.traceback  # type: ignore
                except AttributeError:
                    pass
                yield ExcInfo((t, v, tb))
    except GeneratorExit:
        pool.terminate()
    except KeyboardInterrupt:
        pool.terminate()
        raise
    finally:
        pool.join()
Project: whatstyle    Author: mikr
def iter_parallel(func,        # type: Callable
                  args_lists,  # type: Sequence[CallArgs]
                  ccmode=CC_PROCESSES):
    # type: (...) -> Iterator[Any]
    if not args_lists:
        return
    if ccmode != CC_OFF:
        args_lists = [((func, args, kwargs), {}) for args, kwargs in args_lists]
        wrappedfunc = tracebackwrapper
    else:
        wrappedfunc = func

    for result in iter_parallel_report(wrappedfunc, args_lists, ccmode=ccmode):
        if ccmode == CC_OFF:
            yield result
        else:
            tbtext = None
            try:
                if isinstance(result, ExcInfo):
                    t, v, tb = result.exc_info
                    if not isinstance(tb, types.TracebackType):
                        tbtext = tb
                        tb = None
                    reraise(t, v, tb)
                else:
                    yield result
            except Exception:
                if tbtext is not None:
                    raise Exception(tbtext)
                else:
                    traceback.print_exc()
                    raise

# ----------------------------------------------------------------------
# The data types option and style.
Project: whatstyle    Author: mikr
def iter_options(self, style):
        # type: (Style) -> Iterator[TextPair]
        dump = self.style_dump(style)
        for optname, optvalue in parse_keyvalue_pairs(dump):
            optname = optname.lower()
            optvalue = optvalue.lower()
            yield optname, optvalue
Project: whatstyle    Author: mikr
def iter_stylecombos(formatter, ignoreopts=()):
    # type: (CodeFormatter, Sequence[str]) -> Iterator[FormatOption]
    for option in styledef_options(formatter.styledefinition):
        if option_name(option) in ignoreopts:
            continue
        stylecombo = formatter.variants_for(option)
        if stylecombo:
            yield FormatOption(stylecombo)
Project: whatstyle    Author: mikr
def iter_tbodies(table):
    # type: (str) -> Iterator[Tuple[str, str, str]]
    fragments = re.split(r'</?tbody>', table, flags=re.MULTILINE)
    if len(fragments) <= 1:
        return
    tbodies = fragments[1:-1:2]
    tablestart, tableend = fragments[0], fragments[-1]
    for tbody in tbodies:
        yield tablestart, '<tbody>%s</tbody>\n        ' % tbody, tableend
Project: whatstyle    Author: mikr
def format_with_styles(formatter,           # type: CodeFormatter
                       styles,              # type: List[Style]
                       filenames,           # type: List[str]
                       reporterrors=True,   # type: bool
                       cache=None,          # type: Optional[Cache]
                       ccmode=CC_PROCESSES  # type: str
                       ):
    # type: (...) -> Iterator[Tuple[ExeCall, ExeResult]]
    """Reformat all files with all styles and yield pairs
    (job, jobresult) of all reformat operations.
    """
    jobs = []
    sourcecodes = []
    for style, filename in itertools.product(styles, filenames):
        cmdargs = formatter.cmdargs_for_style(style, filename)
        sourcedata = get_cached_file(filename)
        jobs.append(make_execall(formatter.exe, cmdargs, sourcedata, depfiles=[filename]))
        sourcecodes.append(sourcedata)
    jobresults = run_executables(jobs, cache, ccmode=ccmode)
    for srcdata, job, jobres in izip(sourcecodes, jobs, jobresults):
        if reporterrors:
            formatter.reporterrors(job, jobres)
        # A formatter reporting a valid result for non-empty input while returning empty
        # output indicates that the effective result is the unchanged input.
        if not jobres.stdout and srcdata and formatter.valid_job_result(job, jobres):
            jobres = jobres._replace(stdout=srcdata)
        yield job, jobres
Project: whatstyle    Author: mikr
def distances_from_diffs_avglen(difftool,            # type: Tuple[str, str, List[str]]
                                diffargs,            # type: List[Tuple[str, bytes]]
                                cache=None,          # type: Optional[Cache]
                                ccmode=CC_PROCESSES  # type: str
                                ):
    # type: (...) -> Iterator[Tuple[Sequence[int], Iterable[int]]]
    """Returns pairs of (m, l) where m is the diff metric and l is the average line length
    difference. This improves the metric just a tiny bit.
    """
    metrics = distances_from_diffs(difftool, diffargs, cache=cache, ccmode=ccmode)
    lldiffs = avg_linelength_diffs(diffargs)
    return izip(metrics, lldiffs)
Project: Lebanese-Channels    Author: ChadiEM
def __filter_locations(channel_list: List[Channel], location: str) -> Iterator[Channel]:
    return filter(lambda current_channel: current_channel.available_in(location), channel_list)
Project: irisett    Author: beebyte
def iter_monitors(self) -> Iterator['ActiveMonitor']:
        """List all monitors that use this monitor def."""
        for monitor in self.manager.monitors.values():
            if monitor.monitor_def.id == self.id:
                yield monitor
Project: Main    Author: N-BodyPhysicsSimulator
def get_generator(self) -> Iterator[BodyState]:
        """ Method to receive bodies. Returns a generator. """
        pass # codecov ignore
Project: Main    Author: N-BodyPhysicsSimulator
def get_generator(self) -> Iterator[BodyState]:
        path = self.args.get('csv_input_path')
        separator = self.args.get('separator')

        bodies = self.__get_bodies_from_path(path, separator)

        yield BodyState.from_dict({
            'bodies': bodies,
            'ticks': 0,
            'time': 0,
            'delta_time': self.args.get('delta_time')
        })
Project: Main    Author: N-BodyPhysicsSimulator
def get_generator(self) -> Iterator[BodyState]:
        path = self.args.get('json_input_path')

        with open(path) as f:
            for line in f.readlines():
                yield BodyState.from_dict(
                    json.loads(line)
                )
Project: allennlp    Author: allenai
def dataset_iterator(self, file_path) -> Iterator[OntonotesSentence]:
        """
        An iterator over the entire dataset, yielding all sentences processed.
        """
        for conll_file in self.dataset_path_iterator(file_path):
            yield from self.sentence_iterator(conll_file)
Project: allennlp    Author: allenai
def dataset_path_iterator(file_path: str) -> Iterator[str]:
        """
        An iterator returning file_paths in a directory
        containing CONLL-formatted files.
        """
        logger.info("Reading CONLL sentences from dataset files at: %s", file_path)
        for root, _, files in tqdm.tqdm(list(os.walk(file_path))):
            for data_file in files:
                # These are a relic of the dataset pre-processing. Every
                # file will be duplicated - one file called filename.gold_skel
                # and one generated from the preprocessing called filename.gold_conll.
                if not data_file.endswith("gold_conll"):
                    continue

                yield os.path.join(root, data_file)
Project: greenstalk    Author: mayhewj
def assert_seconds(n: int) -> Iterator[None]:
    start = datetime.now()
    yield
    duration = datetime.now() - start
    assert duration >= timedelta(seconds=n)
    assert duration <= timedelta(seconds=n, milliseconds=50)
Project: python-miio    Author: rytilahti
def read_apple(self) -> Iterator[DeviceConfig]:
        """Read Apple-specific database file."""
        _LOGGER.info("Reading tokens from Apple DB")
        c = self.conn.execute("SELECT * FROM ZDEVICE WHERE ZTOKEN IS NOT '';")
        for dev in c.fetchall():
            if self.dump_raw:
                BackupDatabaseReader.dump_raw(dev)
            ip = dev['ZLOCALIP']
            mac = dev['ZMAC']
            model = dev['ZMODEL']
            name = dev['ZNAME']
            token = BackupDatabaseReader.decrypt_ztoken(dev['ZTOKEN'])

            config = DeviceConfig(name=name, mac=mac, ip=ip, model=model, token=token)
            yield config
Project: EMFT    Author: 132nd-etcher
def filter_records(
        minimum_level=logging.NOTSET,
        msg_filter: str or None = None,
        module_filter: str or None = None,
        thread_filter: str or None = None,
    ) -> typing.Iterator[logging.LogRecord]:
        records = Records(persistent_logging_handler.records)
        records \
            .filter_by_level(minimum_level) \
            .filter_by_message(msg_filter) \
            .filter_by_module(module_filter) \
            .filter_by_thread(thread_filter)
        for rec in records:
            yield rec
Project: EMFT    Author: 132nd-etcher
def successful_only(self) -> typing.Iterator[AVBuild]:
        for x in self:
            if x.status == 'success':
                yield x