我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用typing.Sequence()。
def iterchars(text): # type: (str) -> Sequence[str] idx = 0 chars = [] while idx < len(text): c = text[idx] if ord(c) >= 0x100: highchar = True if ((0xD800 <= ord(c) <= 0xDBFF) and (idx < len(text) - 1) and (0xDC00 <= ord(text[idx + 1]) <= 0xDFFF)): c = text[idx:idx + 2] # Skip the other half of the lead and trail surrogate idx += 1 else: highchar = False idx += 1 # Add every character except only one half of a surrogate pair. if not (highchar and len(c) == 1 and 0xD800 <= ord(c) <= 0xDFFF): chars.append(c) return chars
def mixtohash(self, args=(), # type: Sequence[AnyStr] exe=None, # type: Optional[str] depfiles=(), # type: Sequence[str] hashobj=None # type: Optional[Any] ): # type: (...) -> Any if hashobj is None: hashobj = HASHFUNC() for filename in depfiles: hashobj.update(sysfilename(filename)) hashobj.update(filesha(filename)) hashobj.update(b'\x00') for arg in args: hashobj.update(sysfilename(arg)) hashobj.update(b'\x00') if exe is not None: hashobj.update(self.digest_for_exe(exe)) return hashobj
def apply(func, # type: Callable[..., bytes] args=(), # type: Sequence[AnyStr] exe=None, # type: Optional[str] depfiles=(), # type: Sequence[str] cache=None # type: Optional[Cache] ): """Applies func(*args) when the result is not present in the cache. The result of func(*args) must be bytes and must not be None which is used as cache-miss indicator. After evaluation of func the result is stored in the cache. """ key, value = None, None if cache is not None: hashobj = cache.mixtohash(args, exe=exe, depfiles=depfiles) key = hashobj.hexdigest() value = cache.get(key) if value is None: value = func(*args) if key is not None: cache.set(key, value) return value
def update_evaluations(formatter, # type: CodeFormatter evaluations, # type: List[AttemptResult] finished_styles, # type: List[AttemptResult] bestdist # type: Sequence[int] ): # type: (...) -> Tuple[bool, bool, Sequence[int]] attemptresult = heapq.heappop(evaluations) nested_round = False if bestdist is None or (distquality(attemptresult.distance) < distquality(bestdist)): bestdist = attemptresult.distance heapq.heappush(evaluations, attemptresult) else: # We found a style that could no longer be improved by adding a single option value. heapq.heappush(finished_styles, attemptresult) nested_styles = formatter.nested_derivations(attemptresult.formatstyle) if not nested_styles: # This formatstyle does not unlock more options. return True, nested_round, bestdist # Restart the optimization from scratch with the attemptresult augmented with # every nested option as seed styles. bestdist = None ndist = (HUGE_DISTANCE, HUGE_DISTANCE, HUGE_DISTANCE, HUGE_DISTANCE) evaluations[:] = [AttemptResult(ndist, s) for s in nested_styles] nested_round = True return False, nested_round, bestdist
def to_text(cls, records: Sequence['TransactionSpecification']) -> str: """Get a text string from a sequence of specification records.""" if len(records) > cls._MAX_RECORDS: raise ValueError( 'Max {} specification records allowed, got {}' .format(cls._MAX_RECORDS, len(records))) tuples = sorted([ (r.line_number, r.column_number, r) for r in records ]) text = '' for _, column, specification in tuples: text += specification.text if column == cls._MAX_COLUMNS: text += '\n' return text
def from_dataset(cls, dataset, min_count: int = 1, max_vocab_size: Union[int, Dict[str, int]] = None, non_padded_namespaces: Sequence[str] = DEFAULT_NON_PADDED_NAMESPACES, pretrained_files: Optional[Dict[str, str]] = None, only_include_pretrained_words: bool = False) -> 'Vocabulary': """ Constructs a vocabulary given a :class:`.Dataset` and some parameters. We count all of the vocabulary items in the dataset, then pass those counts, and the other parameters, to :func:`__init__`. See that method for a description of what the other parameters do. """ logger.info("Fitting token dictionary from dataset.") namespace_token_counts: Dict[str, Dict[str, int]] = defaultdict(lambda: defaultdict(int)) for instance in tqdm.tqdm(dataset.instances): instance.count_vocab_items(namespace_token_counts) return Vocabulary(counter=namespace_token_counts, min_count=min_count, max_vocab_size=max_vocab_size, non_padded_namespaces=non_padded_namespaces, pretrained_files=pretrained_files, only_include_pretrained_words=only_include_pretrained_words)
def parse(self, source: Sequence[Input]) -> Result[Output]: """Abstract method for completely parsing a source. While ``parse`` is a method on every parser for convenience, it is really a function of the context. It is the duty of the context to set the correct ``Reader`` to use and to handle whitespace not handled by the parsers themselves. This method is pulled from the context when the parser is initialized. Args: source: What will be parsed. Returns: If the parser succeeded in matching and consumed the entire output, the value from ``Continue`` is copied to make a ``Success``. If the parser failed in matching, the error message is copied to a ``Failure``. If the parser succeeded but the source was not completelt consumed, a ``Failure`` with a message indicating this is returned. """ raise NotImplementedError()
def lit(literal: Sequence[Input], *literals: Sequence[Sequence[Input]]) -> Parser: """Match a literal sequence. In the `TextParsers`` context, this matches the literal string provided. In the ``GeneralParsers`` context, this matches a sequence of input. If multiple literals are provided, they are treated as alternatives. e.g. ``lit('+', '-')`` is the same as ``lit('+') | lit('-')``. Args: literal: A literal to match *literals: Alternative literals to match Returns: A ``LiteralParser`` in the ``GeneralContext``, a ``LiteralStringParser`` in the ``TextParsers`` context, and an ``AlternativeParser`` if multiple arguments are provided. """ if len(literals) > 0: return AlternativeParser(options.handle_literal(literal), *map(options.handle_literal, literals)) else: return options.handle_literal(literal)
def rep1sep(parser: Union[Parser, Sequence[Input]], separator: Union[Parser, Sequence[Input]]) \ -> RepeatedOnceSeparatedParser: """Match a parser one or more times separated by another parser. This matches repeated sequences of ``parser`` separated by ``separator``. If there is at least one match, a list containing the values of the ``parser`` matches is returned. The values from ``separator`` are discarded. If it does not match ``parser`` at all, it fails. Args: parser: Parser or literal separator: Parser or literal """ if isinstance(parser, str): parser = lit(parser) if isinstance(separator, str): separator = lit(separator) return RepeatedOnceSeparatedParser(parser, separator)
def repsep(parser: Union[Parser, Sequence[Input]], separator: Union[Parser, Sequence[Input]]) \ -> RepeatedSeparatedParser: """Match a parser zero or more times separated by another parser. This matches repeated sequences of ``parser`` separated by ``separator``. A list is returned containing the value from each match of ``parser``. The values from ``separator`` are discarded. If there are no matches, an empty list is returned. Args: parser: Parser or literal separator: Parser or literal """ if isinstance(parser, str): parser = lit(parser) if isinstance(separator, str): separator = lit(separator) return RepeatedSeparatedParser(parser, separator)
def ensure_unique_string(preferred_string: str, current_strings: Union[Sequence[str], KeysView[str]]) -> str: """Return a string that is not present in current_strings. If preferred string exists will append _2, _3, .. """ test_string = preferred_string current_strings_set = set(current_strings) tries = 1 while test_string in current_strings_set: tries += 1 test_string = "{}_{}".format(preferred_string, tries) return test_string # Taken from: http://stackoverflow.com/a/11735897
def printCommand(arg1: "typing.Union[str, typing.Sequence[typing.Any]]", *remainingArgs, outputFile=None, colour=AnsiColour.yellow, cwd=None, env=None, sep=" ", printVerboseOnly=False, **kwargs): if not _cheriConfig or (_cheriConfig.quiet or (printVerboseOnly and not _cheriConfig.verbose)): return # also allow passing a single string if not type(arg1) is str: allArgs = arg1 arg1 = allArgs[0] remainingArgs = allArgs[1:] newArgs = ("cd", shlex.quote(str(cwd)), "&&") if cwd else tuple() if env: # only print the changed environment entries filteredEnv = __filterEnv(env) if filteredEnv: newArgs += ("env",) + tuple(map(shlex.quote, (k + "=" + str(v) for k, v in filteredEnv.items()))) # comma in tuple is required otherwise it creates a tuple of string chars newArgs += (shlex.quote(str(arg1)),) + tuple(map(shlex.quote, map(str, remainingArgs))) if outputFile: newArgs += (">", str(outputFile)) print(coloured(colour, newArgs, sep=sep), flush=True, **kwargs)
def getInterpreter(cmdline: "typing.Sequence[str]") -> "typing.Optional[typing.List[str]]": """ :param cmdline: The command to check :return: The interpreter command if the executable does not have execute permissions """ executable = Path(cmdline[0]) print(executable, os.access(str(executable), os.X_OK), cmdline) if not executable.exists(): executable = Path(shutil.which(str(executable))) statusUpdate(executable, "is not executable, looking for shebang:", end=" ") with executable.open("r", encoding="utf-8") as f: firstLine = f.readline() if firstLine.startswith("#!"): interpreter = shlex.split(firstLine[2:]) statusUpdate("Will run", executable, "using", interpreter) return interpreter else: statusUpdate("No shebang found.") return None
def multimask_images(images: Iterable[SpatialImage], masks: Sequence[np.ndarray], image_type: type = None ) -> Iterable[Sequence[np.ndarray]]: """Mask images with multiple masks. Parameters ---------- images: Images to mask. masks: Masks to apply. image_type: Type to cast images to. Yields ------ Sequence[np.ndarray] For each mask, a masked image. """ for image in images: yield [mask_image(image, mask, image_type) for mask in masks]
def __init__(self, pattern, keys=None): # type: (Union[Text, regex._pattern_type, re._pattern_type], Optional[Sequence[Text]]) -> None """ :param pattern: Regex used to split incoming string values. IMPORTANT: If you specify your own compiled regex, be sure to add the ``UNICODE`` flag for Unicode support! :param keys: If set, the resulting list will be converted into an OrderedDict, using the specified keys. IMPORTANT: If ``keys`` is set, the split value's length must be less than or equal to ``len(keys)``. """ super(Split, self).__init__() self.regex = ( pattern if isinstance(pattern, (regex._pattern_type, re._pattern_type)) else regex.compile(pattern, regex.UNICODE) ) self.keys = keys
def sorted_dict(value): # type: (Mapping) -> Any """ Sorts a dict's keys to avoid leaking information about the backend's handling of unordered dicts. """ if isinstance(value, Mapping): return OrderedDict( (key, sorted_dict(value[key])) for key in sorted(iterkeys(value)) ) elif isinstance(value, Sequence) and not isinstance(value, string_types): return list(map(sorted_dict, value)) else: return value
def __call__(self, parser: configargparse.ArgumentParser, namespace: configargparse.Namespace, values: Sequence[str], option_string: str=None) -> None: if len(values) > 2: raise ErrorMessage("%s takes 1 or 2 arguments, not more." % highlight("--init")) if values[0] not in _initializers: raise ErrorMessage("Unknown initializer \"%s\". Acceptable values are: %s" % (highlight(values[0]), highlight(", ".join(_initializers.keys())))) initializer = _initializers[values[0]] if len(values) > 1: initializer.configfolder = values[1] # override config folder if it's not the default if os.path.exists(initializer.configfolder): raise ErrorMessage("%s already exists. If you want to overwrite it, remove it first." % initializer.configfolder) initializer.build_config() parser.exit(0)
def generate_future_versions(self, artifact_names: Sequence[str], base_version: VersionContainer, action: str, args: configargparse.Namespace) -> Union[Dict[str, VersionContainer], None]: """ Takes a list of unique artifact identifiers (e.g. package names) which *will be created by a Packer during the build later* and returns a dict mapping of identifier to version for the Packer to be used during the build or ``None`` if the store can't generate future versions. The store should use ``action`` to generate the version strings for all artifacts. :param artifact_names: a list of artifact identifiers :param base_version: the base version from which to generate future versions :param action: the version action selected by the user to generate future versions :param args: command-line parameters :return: a mapping of artifact identifiers to version information """ raise NotImplementedError("Each subclass of BaseStore MUST implement generate_future_versions")
def __missing__(self, key: t.Union[t.Tuple[int, type], t.Tuple[int, type, t.Sequence[int]]]): if not isinstance(key, tuple): raise TypeError('key={} of bad type {} was given'.format(repr(key), type(key))) if len(key) < 2 or len(key) > 3: raise ValueError('{}'.format(key)) if not isinstance(key[0], int): raise TypeError() if key[0] <= 0: raise ValueError() if not isinstance(key[1], type): raise TypeError() if len(key) == 3: if not isinstance(key[2], tuple): raise TypeError() if len(key[2]) != key[0]: raise ValueError() if any(k is not Ellipsis and not isinstance(k, int) for k in key[2]): raise TypeError() if any(k is not Ellipsis and k <= 0 for k in key[2]): raise ValueError() value = create_typed_numpy_ndarray(*key) self[key] = value return value
def partition_version_classifiers( classifiers: t.Sequence[str], version_prefix: str = 'Programming Language :: Python :: ', only_suffix: str = ' :: Only') -> t.Tuple[t.List[str], t.List[str]]: """Find version number classifiers in given list and partition them into 2 groups.""" versions_min, versions_only = [], [] for classifier in classifiers: version = classifier.replace(version_prefix, '') versions = versions_min if version.endswith(only_suffix): version = version.replace(only_suffix, '') versions = versions_only try: versions.append(tuple([int(_) for _ in version.split('.')])) except ValueError: pass return versions_min, versions_only
def split_path(path: str) -> t.Tuple[t.Sequence[str], bool]: """Split a path into an array of parts of a path. This functions splits a forward slash separated path into an sequence of the directories of this path. If the given path ends with a '/' it returns that the given path ends with an directory, otherwise the last part is a file, this information is returned as the last part of the returned tuple. The given path may contain multiple consecutive forward slashes, these are interpreted as a single slash. A leading forward slash is also optional. :param path: The forward slash separated path to split. :returns: A tuple where the first item is the splitted path and the second item is a boolean indicating if the last item of the given path was a directory. """ is_dir = path[-1] == '/' patharr = [item for item in path.split('/') if item] return patharr, is_dir
def _filter_or_404(model: t.Type[Y], get_all: bool, criteria: t.Tuple) -> t.Union[Y, t.Sequence[Y]]: """Get the specified object by filtering or raise an exception. :param get_all: Get all objects if ``True`` else get a single one. :param model: The object to get. :param criteria: The criteria to filter with. :returns: The requested object. :raises APIException: If no object with the given id could be found. (OBJECT_ID_NOT_FOUND) """ crit_str = ' AND '.join(str(crit) for crit in criteria) query = model.query.filter(*criteria) # type: ignore obj = query.all() if get_all else query.one_or_none() if not obj: raise psef.errors.APIException( f'The requested {model.__name__.lower()} was not found', f'There is no "{model.__name__}" when filtering with {crit_str}', psef.errors.APICodes.OBJECT_ID_NOT_FOUND, 404 ) return obj
def filter_all_or_404(model: t.Type[Y], *criteria: t.Any) -> t.Sequence[Y]: """Get all objects of the specified model filtered by the specified criteria. .. note:: ``Y`` is bound to :py:class:`psef.models.Base`, so it should be a SQLAlchemy model. :param model: The object to get. :param criteria: The criteria to filter with. :returns: The requested objects. :raises APIException: If no object with the given id could be found. (OBJECT_ID_NOT_FOUND) """ return t.cast(t.Sequence[Y], _filter_or_404(model, True, criteria))
def get_all_permissions(self) -> t.Mapping[str, bool]: """Get all course :class:`permissions` for this course role. :returns: A name boolean mapping where the name is the name of the permission and the value indicates if this user has this permission. """ perms: t.Sequence[Permission] = ( Permission.query. filter_by( # type: ignore course_permission=True ).all() ) result: t.MutableMapping[str, bool] = {} for perm in perms: if perm.name in self._permissions: result[perm.name] = not perm.default_value else: result[perm.name] = perm.default_value return result
def get_all_permissions(self) -> t.Mapping[str, bool]: """Get all course permissions (:class:`Permission`) for this role. :returns: A name boolean mapping where the name is the name of the permission and the value indicates if this user has this permission. """ perms: t.Sequence[Permission] = ( Permission.query. filter_by( # type: ignore course_permission=False ).all() ) result: t.MutableMapping[str, bool] = {} for perm in perms: if perm.name in self._permissions: result[perm.name] = not perm.default_value else: result[perm.name] = perm.default_value return result
def get_all_permissions(self, course_id: t.Union['Course', int] = None ) -> t.Mapping[str, bool]: """Get all global permissions (:class:`Permission`) of this user or all course permissions of the user in a specific :class:`Course`. :param course_id: The course or course id :returns: A name boolean mapping where the name is the name of the permission and the value indicates if this user has this permission. """ if isinstance(course_id, Course): course_id = course_id.id if course_id is None: return self.role.get_all_permissions() elif course_id in self.courses: return self.courses[course_id].get_all_permissions() else: perms: t.Sequence[Permission] perms = Permission.query.filter_by( # type: ignore course_permission=True).all() return {perm.name: False for perm in perms}
def get_grade_history(submission_id: int ) -> JSONResponse[t.Sequence[models.GradeHistory]]: """Get the grade history for the given submission. .. :quickref: Submission; Get the grade history for the given submission. :returns: A list of :class:`.models.GradeHistory` object serialized to json for the given assignment. :raises PermissionException: If the current user has no permission to see the grade history. (INCORRECT_PERMISSION) """ work = helpers.get_or_404(models.Work, submission_id) auth.ensure_permission('can_see_grade_history', work.assignment.course_id) hist: t.MutableSequence[models.GradeHistory] hist = db.session.query( models.GradeHistory ).filter_by(work_id=work.id).order_by( models.GradeHistory.changed_at.desc(), # type: ignore ).all() return jsonify(hist)
def combine_notebooks(notebook_files: Sequence[Path]) -> NotebookNode: combined_nb = new_notebook() count = 0 for filename in notebook_files: count += 1 log.debug('Adding notebook: %s', filename) nbname = filename.stem nb = nbformat.read(str(filename), as_version=4) try: combined_nb.cells.extend(add_sec_label(nb.cells[0], nbname)) except NoHeader: raise NoHeader("Failed to find header in " + filename) combined_nb.cells.extend(nb.cells[1:]) if not combined_nb.metadata: combined_nb.metadata = nb.metadata.copy() log.info('Combined %d files' % count) return combined_nb
def main(args: Sequence[str] = None) -> None: nfo('Starting') try: if args is None: args = sys.argv[1:] args = parse_args(args) setup_logging(args.verbose) dbg(f'Invoked with args: {args}') transformation = get_transformation(args.transformation) document = parse_file(args) copy_file(args.target, args.target + '.orig') dbg("Saved document backup with suffix '.orig'") dbg('Applying transformation.') document._setroot(transformation(document.getroot())) write_file(document, args) except Exception: print_exc() raise SystemExit(2)
def create_masters_in_db(self, lib_name, content_list, debug=False): # type: (str, Sequence[Any], bool) -> None """Create the masters in the design database. Parameters ---------- lib_name : str library to create the designs in. content_list : Sequence[Any] a list of the master contents. Must be created in this order. debug : bool True to print debug messages """ if self._prj is None: raise ValueError('BagProject is not defined.') self._prj.instantiate_schematic(lib_name, content_list, lib_path=self.lib_path)
def update_testbench(self, lib, cell, parameters, sim_envs, config_rules, env_parameters): # type: (str, str, Dict[str, str], Sequence[str], List[List[str]], List[List[Tuple[str, str]]]) -> None """Update the given testbench configuration. Parameters ---------- lib : str testbench library. cell : str testbench cell. parameters : Dict[str, str] testbench parameters. sim_envs : Sequence[str] list of enabled simulation environments. config_rules : List[List[str]] config view mapping rules, list of (lib, cell, view) rules. env_parameters : List[List[Tuple[str, str]]] list of param/value list for each simulation environment. """ pass
def instantiate_layout(self, lib_name, view_name, via_tech, layout_list): # type: (str, str, str, Sequence[Any]) -> None """Create a batch of layouts. Parameters ---------- lib_name : str layout library name. view_name : str layout view name. via_tech : str via technology library name. layout_list : Sequence[Any] a list of layouts to create """ pass
def create_dut_layouts(self, lay_params_list, cell_name_list, temp_db): # type: (Sequence[Dict[str, Any]], Sequence[str], TemplateDB) -> Sequence[Dict[str, Any]] """Create multiple layouts""" if self.prj is None: raise ValueError('BagProject instance is not given.') cls_package = self.specs['layout_package'] cls_name = self.specs['layout_class'] lay_module = importlib.import_module(cls_package) temp_cls = getattr(lay_module, cls_name) temp_list, sch_params_list = [], [] for lay_params in lay_params_list: template = temp_db.new_template(params=lay_params, temp_cls=temp_cls, debug=False) temp_list.append(template) sch_params_list.append(template.sch_params) temp_db.batch_layout(self.prj, temp_list, cell_name_list) return sch_params_list
def get_cells_in_library(self, lib_name): # type: (str) -> Sequence[str] """Get a list of cells in the given library. Returns an empty list if the given library does not exist. Parameters ---------- lib_name : str the library name. Returns ------- cell_list : Sequence[str] a list of cells in the library """ if self.impl_db is None: raise Exception('BAG Server is not set up.') return self.impl_db.get_cells_in_library(lib_name)
def instantiate_schematic(self, lib_name, content_list, lib_path=''): # type: (str, Sequence[Any], str) -> None """Create the given schematic contents in CAD database. NOTE: this is BAG's internal method. TO create schematics, call batch_schematic() instead. Parameters ---------- lib_name : str name of the new library to put the schematic instances. content_list : Sequence[Any] list of schematics to create. lib_path : str the path to create the library in. If empty, use default location. """ if self.impl_db is None: raise Exception('BAG Server is not set up.') self.impl_db.instantiate_schematic(lib_name, content_list, lib_path=lib_path)
def instantiate_layout(self, lib_name, view_name, via_tech, layout_list): # type: (str, str, str, Sequence[Any]) -> None """Create a batch of layouts. Parameters ---------- lib_name : str layout library name. view_name : str layout view name. via_tech : str via technology name. layout_list : Sequence[Any] a list of layouts to create """ if self.impl_db is None: raise Exception('BAG Server is not set up.') self.impl_db.instantiate_layout(lib_name, view_name, via_tech, layout_list)
def from_vocab(cls, sequences: Map[int, Seq[H]], vocab: Vocabulary, max_len: int, pack_sequences: bool=False, append_eos: bool=True, eos_token: Opt[H]=DEFAULT_EOS, null_token: H=DEFAULT_NULL, int_id_type: str='long', shuffle: bool=True): """ :param vocab: instance of Vocabulary to use for encoding/decoding tokens :param max_len: maximum length of sequences to sample :param pack_sequences: bool indicating whether to return regular Tensors or PackedSequence instances. :param int_id_type: string indicating the type of int ids to use. Must be a key of data.str_to_int_tensor_type. :param eos_token: string or hashable to append to mark end-of-sequence in encoding :param null_token: Optional hashable to use for padding sequences. Added to the vocab, unless none is passed and none is built, in which case this is considered to be an int id. Numpy aliases for integer types are valid, as well as 'long', 'short', 'byte', 'char'. The default 'long' is recommended, as only LongTensors can be used to index Embeddings in pytorch. """ encoder = SequenceTensorEncoder(vocab, append_eos=append_eos, eos_token=eos_token, null_token=null_token, int_id_type=int_id_type) return cls(sequences=sequences, encoder=encoder, max_len=max_len, pack_sequences=pack_sequences, null_token=null_token, shuffle=shuffle)
def from_id2token(cls, sequences: Map[int, Seq[H]], id2token: Dict[H, int], max_len: int, pack_sequences: bool=False, append_eos: bool=True, eos_token: Opt[H]=DEFAULT_EOS, null_token: H=DEFAULT_NULL, oov_token: H=DEFAULT_OOV, int_id_type: str='long', shuffle: bool=True): """ :param id2token: mapping of int ids to tokens :param max_len: maximum length of sequences to sample :param pack_sequences: bool indicating whether to return regular Tensors or PackedSequence instances. :param int_id_type: string indicating the type of int ids to use. Must be a key of data.str_to_int_tensor_type. :param oov_token: hashable to insert for out-of-vocab tokens when encoding :param eos_token: hashable to append to mark end-of-sequence in encoding :param null_token: hashable to use for padding sequences. Added to the vocab, unless none is passed and none is built, in which case this is considered to be an int id. Numpy aliases for integer types are valid, as well as 'long', 'short', 'byte', 'char'. The default 'long' is recommended, as only LongTensors can be used to index Embeddings in pytorch. """ vocab = Vocabulary.from_id2token(id2token, oov_token=oov_token) encoder = SequenceTensorEncoder(vocab, append_eos=append_eos, eos_token=eos_token, null_token=null_token, int_id_type=int_id_type) return cls(sequences=sequences, encoder=encoder, max_len=max_len, pack_sequences=pack_sequences, null_token=null_token, shuffle=shuffle)
def assign_data(centroids: Sequence[Centroid], data: Iterable[Point]) -> Dict[Centroid, Sequence[Point]]: 'Assign data the closest centroid' d : DefaultDict[Point, List[Point]] = defaultdict(list) for point in data: centroid: Point = min(centroids, key=partial(dist, point)) d[centroid].append(point) return dict(d)
def compute_centroids(groups: Iterable[Sequence[Point]]) -> List[Centroid]: 'Compute the centroid of each group' return [tuple(map(mean, transpose(group))) for group in groups]
def quality(labeled: Dict[Centroid, Sequence[Point]]) -> float: 'Mean value of squared distances from data to its assigned centroid' return mean(dist(c, p) ** 2 for c, pts in labeled.items() for p in pts)
def do_explode(self, kind): if kind in basic_types or type(kind) is typing.TypeVar: return False if not issubclass(kind, (typing.Sequence, typing.Mapping)): self.clear() self.extend(Args(kind)) return True return False
def ReturnMapping(cls): # Annotate the method with a return Type # so the value can be cast def decorator(f): @functools.wraps(f) async def wrapper(*args, **kwargs): nonlocal cls reply = await f(*args, **kwargs) if cls is None: return reply if 'error' in reply: cls = CLASSES['Error'] if issubclass(cls, typing.Sequence): result = [] item_cls = cls.__parameters__[0] for item in reply: result.append(item_cls.from_json(item)) """ if 'error' in item: cls = CLASSES['Error'] else: cls = item_cls result.append(cls.from_json(item)) """ else: result = cls.from_json(reply['response']) return result return wrapper return decorator
def buildArray(self, obj, d=0): # return a sequence from an array in the schema if "$ref" in obj: return Sequence[refType(obj)] else: kind = obj.get("type") if kind and kind == "array": items = obj['items'] return self.buildArray(items, d + 1) else: return Sequence[objType(obj)]
def iterchars(text): # type: (str) -> Sequence[str] return text
def make_execall(exe, cmdargs, stdindata=None, depfiles=()): # type: (str, Sequence[str], Optional[bytes], Sequence[str]) -> ExeCall """If the result of a subprocess call depends on the content of files, their filenames must be specified as depfiles to prevent stale cache results. """ return ExeCall(exe, cmdargs, stdindata, depfiles)
def iter_parallel_report(func, # type: Callable[..., Any] args_lists, # type: Sequence[CallArgs] ccmode=CC_PROCESSES): # type: (...) -> Iterator[Union[ExeResult, ExcInfo]] if ccmode == CC_OFF or len(args_lists) <= 1 or not multiprocessing: for args, kwargs in args_lists: yield func(*args, **kwargs) return processes = min(len(args_lists), multiprocessing.cpu_count()) if ccmode == CC_THREADS: pool = multiprocessing.pool.ThreadPool(processes=processes) else: pool = multiprocessing.Pool(processes=processes, initializer=per_process_init) try: async_results = [pool.apply_async(func, args=args, kwds=kwargs) for args, kwargs in args_lists] pool.close() while async_results: try: asyncres = async_results.pop(0) yield asyncres.get() except (KeyboardInterrupt, GeneratorExit): raise except Exception as e: t, v, tb = sys.exc_info() try: # Report the textual traceback of the subprocess rather # than this local exception which was triggered # by the other side. tb = e.traceback # type: ignore except AttributeError: pass yield ExcInfo((t, v, tb)) except GeneratorExit: pool.terminate() except KeyboardInterrupt: pool.terminate() raise finally: pool.join()
def iter_parallel(func, # type: Callable args_lists, # type: Sequence[CallArgs] ccmode=CC_PROCESSES): # type: (...) -> Iterator[Any] if not args_lists: return if ccmode != CC_OFF: args_lists = [((func, args, kwargs), {}) for args, kwargs in args_lists] wrappedfunc = tracebackwrapper else: wrappedfunc = func for result in iter_parallel_report(wrappedfunc, args_lists, ccmode=ccmode): if ccmode == CC_OFF: yield result else: tbtext = None try: if isinstance(result, ExcInfo): t, v, tb = result.exc_info if not isinstance(tb, types.TracebackType): tbtext = tb tb = None reraise(t, v, tb) else: yield result except Exception: if tbtext is not None: raise Exception(tbtext) else: traceback.print_exc() raise # ---------------------------------------------------------------------- # The data types option and style.
def identify_language(self, filenames=(), language=None): # type: (Sequence[str], Optional[str]) -> None """Identify the languages from the filenames extensions. """ if language is None: exts = set([os.path.splitext(f)[1] for f in filenames]) for lang, extsdescription in UncrustifyFormatter.language_exts: langexts = set(extsdescription.split()) if exts.issubset(langexts): self.languages.append(lang) else: self.languages.append(language)
def attempt_acceptible(self, roundnr, prevdist, newdist): # type: (int, Sequence[int], Sequence[int]) -> bool if roundnr >= 3 and tuple(newdist) > tuple(prevdist): # Makes things worse return False if roundnr >= 3 and tuple(newdist) >= tuple(prevdist): # Does not improve things return False return True