我们从 Python 开源项目中提取了以下 15 个代码示例，用于说明如何使用 typing.MutableSequence。
def dehead_filetree(tree: ExtractFileTree) -> ExtractFileTree:
    """Collapse the chain of single-entry directories below the tree's root.

    The name of the root is kept, but every directly nested level that
    consists of exactly one directory (and nothing else) is skipped. So a
    tree ``{1: [{2: [{3: [{4: [f1, f2]}]}]}]}`` is converted to
    ``{1: [f1, f2]}``.

    The tree is modified in place and also returned.

    :param dict tree: The file tree as generated by :py:func:`extract`. It
        must contain exactly one top-level entry.
    :returns: The same tree object, deheaded as described.
    :rtype: dict
    """
    assert len(tree) == 1

    root_name = next(iter(tree.keys()))
    contents = tree[root_name]

    while True:
        # Stop as soon as this level is not a list holding one single entry.
        if not isinstance(contents, t.MutableSequence) or len(contents) != 1:
            break
        only_child = contents[0]
        # Stop when that entry is not a mapping with exactly one key.
        if not isinstance(only_child, t.MutableMapping) or len(only_child) != 1:
            break
        # Descend into the contents of the lone nested directory.
        contents = next(iter(only_child.values()))

    tree[root_name] = contents
    return tree
def get_grade_history(
    submission_id: int
) -> JSONResponse[t.Sequence[models.GradeHistory]]:
    """Get the grade history for the given submission.

    .. :quickref: Submission; Get the grade history for the given submission.

    :param submission_id: The id of the submission (work) whose history is
        requested. The lookup goes through ``helpers.get_or_404``.
    :returns: A list of :class:`.models.GradeHistory` object serialized to
        json for the given assignment, ordered by ``changed_at`` descending.
    :raises PermissionException: If the current user has no permission to see
        the grade history. (INCORRECT_PERMISSION)
    """
    work = helpers.get_or_404(models.Work, submission_id)
    auth.ensure_permission('can_see_grade_history', work.assignment.course_id)

    entries_for_work = db.session.query(models.GradeHistory).filter_by(
        work_id=work.id,
    )
    # Most recently changed entries come first.
    newest_first = models.GradeHistory.changed_at.desc()  # type: ignore
    history: t.MutableSequence[models.GradeHistory] = entries_for_work.order_by(
        newest_first,
    ).all()

    return jsonify(history)
def address_from_digest(digest):
    # type: (Digest) -> Address
    """
    Derives an address from a private key digest.

    The digest's trits are absorbed into a fresh ``Kerl`` sponge and
    the trits squeezed back out of it become the address.  The key
    index and security level are copied over from the digest.
    """
    trit_count = Address.LEN * TRITS_PER_TRYTE
    squeezed = [0] * trit_count  # type: MutableSequence[int]

    kerl = Kerl()
    kerl.absorb(digest.as_trits())
    kerl.squeeze(squeezed)

    return Address.from_trits(
        trits=squeezed,
        key_index=digest.key_index,
        security_level=digest.security_level,
    )
def __produce_commands(self, install):
    # type: (After4xCollectionInstaller) -> Sequence[str]
    """
    Build the script lines that configure the PuppetServer JVM arguments.

    Heap and metaspace flags are derived from the installer's JVM memory
    settings (only when those are set), any explicitly configured JVM
    args are appended, and the space-joined result is rendered through
    the ``puppetserver-jvmargs.pp`` template.

    :param install: Installer that provides the JVM memory settings and
        the extra JVM args.
    :return: The lines gathered by the collector.
    """
    jvm_flags = []  # type: MutableSequence[str]
    memory = install.puppetserver_jvm_memory()
    extra_args = install.puppetserver_jvm_args()

    if memory.is_set():
        if memory.heap_minimum() is not None:
            jvm_flags.append('-Xms{xms}'.format(xms=memory.heap_minimum()))
        if memory.heap_maximum() is not None:
            jvm_flags.append('-Xmx{xmx}'.format(xmx=memory.heap_maximum()))

        java = Facter.get(JavaVersion)
        metaspace_template = self.__get_metaspace_arg(java)
        if java.has_permgen_space() and memory.metaspace_maximum() is None:
            # No explicit limit was configured: fall back to 256m.
            jvm_flags.append(metaspace_template.format(mspace='256m'))
        elif memory.metaspace_maximum() is not None:
            jvm_flags.append(
                metaspace_template.format(mspace=memory.metaspace_maximum())
            )

    if extra_args.are_set():
        jvm_flags.extend(extra_args)

    joined_flags = ' '.join(jvm_flags)
    collector = self._collector()
    collector.collect_from_template(
        'Configuring PuppetServer JVM Args',
        'puppetserver-jvmargs.pp',
        {'jvmargs': joined_flags},
        format=ScriptFormat.PUPPET,
    )
    return collector.lines()
def segments(self):
    # type: () -> MutableSequence[int|str]
    """Return the segments as a newly built list."""
    produced = self.__get_segments()
    return list(produced)
def _to_jsonschema(type_):
    """Translate a Python type (or marshmallow schema instance) to JSON schema.

    Checks are applied in a fixed order and the first match wins:
    marshmallow schema instances are dumped via ``_jsonschema``, known
    scalar and container types map to fixed schema fragments, ``None`` maps
    to ``null``, and mutable-sequence generics map to an ``array`` whose
    ``items`` schema is computed recursively.

    :param type_: The type, ``None``, or schema instance to describe.
    :returns: A dict holding the JSON schema fragment.
    :raises ValueError: If no rule matches ``type_``.
    """
    if isinstance(type_, marshmallow.Schema):
        return _jsonschema.dump_schema(type_)
    if type_ in six.integer_types:
        return {'type': 'number', 'format': 'integer'}

    # (python type, JSON type, JSON format), compared in this order.
    formatted_scalars = (
        (float, 'number', 'float'),
        (decimal.Decimal, 'string', 'decimal'),
        (uuid.UUID, 'string', 'uuid'),
        (datetime.datetime, 'string', 'date-time'),
        (datetime.date, 'string', 'date'),
        (datetime.time, 'string', 'time'),
    )
    for python_type, json_type, json_format in formatted_scalars:
        if type_ == python_type:
            return {'type': json_type, 'format': json_format}

    if type_ == dict:
        return {'type': 'object'}
    if type_ == six.text_type or type_ == six.binary_type:
        return {'type': 'string'}
    if type_ is None:
        return {'type': 'null'}
    if type_ == list:
        return {'type': 'array'}
    if type_ == bool:
        return {'type': 'boolean'}

    # NOTE(review): newer Python versions raise TypeError when a subscripted
    # generic is passed to issubclass() -- confirm the supported versions.
    if issubclass(type_, typing.MutableSequence[typing.T]):
        items_type = type_.__parameters__[0]
        if issubclass(items_type, marshmallow.Schema):
            # The recursive call expects a schema instance, not the class.
            items_type = items_type()
        return {
            'type': 'array',
            'items': _to_jsonschema(items_type),
        }

    raise ValueError('unsupported return type: %s' % type_)
def __init__(self, random_jobs: MutableSequence[JobInterface]):
    """Index the given jobs by their ``id`` attribute.

    When several jobs share an id, the one that comes last wins.

    :param random_jobs: The jobs to store.
    """
    jobs_by_id = {}
    for job in random_jobs:
        jobs_by_id[job.id] = job
    self._jobs = jobs_by_id
def squeeze(self, trits):
    # type: (MutableSequence[int]) -> None
    """
    Squeeze trits from the sponge.

    :param trits:
        Sequence that receives the squeezed trits.

        Note: this object is modified in place!  If it is shorter
        than one hash, it is first extended with zeros.
    """
    #
    # Squeezing is roughly the reverse of absorbing: trits are copied
    # out of the internal state into ``trits``, one hash at a time,
    # with the state being transformed between hashes.
    #
    # Only the first hash of the state is "public" though, which
    # keeps the implementation simple.
    #

    # Make room for at least one hash worth of trits.
    shortfall = max(0, HASH_LENGTH - len(trits))
    trits.extend([0] * shortfall)

    # Hand out exactly one hash.
    public_hash = self._state[0:HASH_LENGTH]
    trits[0:HASH_LENGTH] = public_hash

    # The hash has been copied; transform the state for the next one.
    self._transform()
def __next__(self):
    # type: () -> TryteString
    """
    Produces the next signature fragment.

    One key chunk is consumed per call.  Every hash-sized slice of
    that chunk is run through the sponge ``13 - n`` times, where ``n``
    is the matching value in the current chunk of the normalized hash.
    """
    key_trytes = next(self._key_chunks)  # type: TryteString
    self._iteration += 1

    # Wrap around to the first normalized chunk when the key is longer
    # than the normalized hash.
    chunk_index = self._iteration % len(self._normalized_hash)
    normalized_chunk = self._normalized_hash[chunk_index]

    fragment_trits = key_trytes.as_trits()

    # Assemble the signature one hash at a time.
    for position in range(key_trytes.count_chunks(Hash.LEN)):
        window = slice(position * HASH_LENGTH, (position + 1) * HASH_LENGTH)
        hash_trits = fragment_trits[window]  # type: MutableSequence[int]

        for _ in range(13 - normalized_chunk[position]):
            self._sponge.reset()
            self._sponge.absorb(hash_trits)
            self._sponge.squeeze(hash_trits)

        fragment_trits[window] = hash_trits

    return TryteString.from_trits(fragment_trits)
def __init__(
    self,
    template_map: typing.MutableSequence[typing.Tuple[str, int]]=None,
) -> None:
    """Store the template map, starting from an empty list when none is given.

    :param template_map: Sequence of ``(str, int)`` pairs. The object is
        stored as-is (not copied); ``None`` yields a fresh empty list.
    """
    if template_map is None:
        template_map = []
    self.template_map = template_map
def we_can_has_sequence(p, q, r, s, t, u): """ :type p: typing.Sequence[int] :type q: typing.Sequence[B] :type r: typing.Sequence[int] :type s: typing.Sequence["int"] :type t: typing.MutableSequence[dict] :type u: typing.List[float] """ #? ["count"] p.c #? int() p[1] #? ["count"] q.c #? B() q[1] #? ["count"] r.c #? int() r[1] #? ["count"] s.c #? int() s[1] #? [] s.a #? ["append"] t.a #? dict() t[1] #? ["append"] u.a #? float() u[1]
def __init__(self, feedstate_dict: Mapping[str, Any]=None) -> None:
    """Set up the feed state, either blank or restored from a saved dict.

    :param feedstate_dict: Previously saved state. When ``None``, every
        field starts out empty. Otherwise each field is read from the dict
        with an empty fallback. Note that the dicts found under
        ``"summary_queue"`` are modified in place: their
        ``"is_this_session"`` key is set to ``False``.
    """
    if feedstate_dict is None:
        LOG.debug("Did not successfully load feed state dict.")
        LOG.debug("Creating blank dict.")

        self.feed = {}
        self.entries = []
        self.entries_state_dict = {}
        self.queue = collections.deque([])
        self.summary_queue = collections.deque([], SUMMARY_LIMIT)
        self.last_modified: Any = None
        self.etag: str = None
        self.latest_entry_number = None
        return

    LOG.debug("Successfully loaded feed state dict.")

    self.feed = feedstate_dict.get("feed", {})
    self.entries = feedstate_dict.get("entries", [])
    self.entries_state_dict = feedstate_dict.get("entries_state_dict", {})
    self.queue = collections.deque(feedstate_dict.get("queue", []))

    # Only the most recent SUMMARY_LIMIT downloaded items are kept.
    cached_summaries = feedstate_dict.get("summary_queue", [])
    self.summary_queue: MutableSequence[Dict[str, Any]] = collections.deque(
        [],
        SUMMARY_LIMIT,
    )

    # Everything restored from the cache file belongs to an earlier session,
    # so flag it accordingly before queueing it.
    for summary in cached_summaries:
        summary["is_this_session"] = False
        self.summary_queue.append(summary)

    self.store_last_modified(feedstate_dict.get("last_modified", None))
    self.etag = feedstate_dict.get("etag", None)
    self.latest_entry_number = feedstate_dict.get("latest_entry_number", None)
def absorb(self, trits, offset=0, length=None):
    # type: (MutableSequence[int], int, Optional[int]) -> None
    """
    Absorb trits into the sponge from a buffer.

    The incoming buffer is never modified: the padding and the zeroing
    of the last trit of every full chunk are applied to a private copy.

    :param trits:
        Buffer that contains the trits to absorb.

    :param offset:
        Starting offset in ``trits``.

    :param length:
        Number of trits to absorb.  Defaults to ``len(trits)`` (measured
        after padding to a multiple of ``TRIT_HASH_LENGTH``).

        Note: chunks are consumed while ``offset < length``, so with a
        non-zero ``offset`` this value acts as the end position.

    :raises:
        The exception produced by ``with_context`` around a
        ``ValueError`` if ``length`` is smaller than 1 (this includes
        an empty ``trits`` with no explicit ``length``).
    """
    # Work on a copy.  ``+=`` on a list extends it in place, so without
    # this the padding and the zeroed trits below would leak into the
    # caller's buffer, contradicting the documented behaviour.
    trits = list(trits)

    # Pad input if necessary, so that it can be divided evenly into
    # hashes.
    pad = ((len(trits) % TRIT_HASH_LENGTH) or TRIT_HASH_LENGTH)
    trits += [0] * (TRIT_HASH_LENGTH - pad)

    if length is None:
        length = len(trits)

    if length < 1:
        raise with_context(
            exc=ValueError('Invalid length passed to ``absorb``.'),

            context={
                'trits': trits,
                'offset': offset,
                'length': length,
            },
        )

    while offset < length:
        stop = min(offset + TRIT_HASH_LENGTH, length)

        # If we're copying over a full chunk, zero last trit.
        if stop - offset == TRIT_HASH_LENGTH:
            trits[stop - 1] = 0

        signed_nums = conv.convertToBytes(trits[offset:stop])

        # Convert signed bytes into their equivalent unsigned
        # representation, in order to use Python's built-in bytes type.
        unsigned_bytes = bytearray(
            conv.convert_sign(b) for b in signed_nums
        )

        self.k.update(unsigned_bytes)

        offset += TRIT_HASH_LENGTH
def squeeze(self, trits, offset=0, length=None):
    # type: (MutableSequence[int], int, Optional[int]) -> None
    """
    Squeeze trits from the sponge into a buffer.

    :param trits:
        Buffer that receives the squeezed trits.

        IMPORTANT:  The buffer is padded in place to a multiple of
        ``TRIT_HASH_LENGTH`` and grows further if it is too small!

    :param offset:
        Starting offset in ``trits``.

    :param length:
        Number of trits to squeeze from the sponge.

        If not specified, defaults to the padded length of ``trits``,
        or to ``TRIT_HASH_LENGTH`` when ``trits`` is empty.
    """
    # Grow the buffer so that it holds a whole number of hashes.
    leftover = len(trits) % TRIT_HASH_LENGTH
    trits += [0] * (TRIT_HASH_LENGTH - (leftover or TRIT_HASH_LENGTH))

    if length is None:
        # Unlike ``absorb``, an empty buffer still yields one hash.
        length = len(trits) or TRIT_HASH_LENGTH

    if length < 1:
        raise with_context(
            exc=ValueError('Invalid length passed to ``squeeze``.'),

            context={
                'trits': trits,
                'offset': offset,
                'length': length,
            },
        )

    while offset < length:
        digest_bytes = self.k.digest()

        if PY2:
            digest_bytes = map(ord, digest_bytes)  # type: ignore

        signed = [conv.convert_sign(b) for b in digest_bytes]

        hash_trits = conv.convertToTrits(signed)
        hash_trits[TRIT_HASH_LENGTH - 1] = 0

        # Never write more than what is still requested.
        take = min(TRIT_HASH_LENGTH, length - offset)
        trits[offset:offset + take] = hash_trits[0:take]

        flipped = bytearray(conv.convert_sign(~b) for b in digest_bytes)

        # Start from a clean state, then feed the flipped bytes back in.
        self.reset()
        self.k.update(flipped)

        offset += TRIT_HASH_LENGTH
def get_digest(self):
    # type: () -> Digest
    """
    Generates the digest used to do the actual signing.

    Signing keys can have variable length and tend to be quite long,
    which makes them not-well-suited for use in crypto algorithms.

    The digest is essentially the result of running the signing key
    through a PBKDF, yielding a constant-length hash that can be used
    for crypto.

    The key is processed in chunks of ``FRAGMENT_LENGTH``; each chunk
    contributes exactly one hash (``HASH_LENGTH`` trits) to the digest,
    stored at position ``i * HASH_LENGTH`` for the ``i``-th chunk.

    :return:
        A ``Digest`` built from the collected trits and this key's
        ``key_index``.
    """
    hashes_per_fragment = FRAGMENT_LENGTH // Hash.LEN

    key_fragments = self.iter_chunks(FRAGMENT_LENGTH)

    # The digest will contain one hash per key fragment.
    digest = [0] * HASH_LENGTH * len(key_fragments)

    # Iterate over each fragment in the key.
    for (i, fragment) in enumerate(key_fragments):  # type: Tuple[int, TryteString]
        fragment_trits = fragment.as_trits()

        key_fragment = [0] * FRAGMENT_LENGTH
        hash_trits = []

        # Within each fragment, iterate over one hash at a time.
        for j in range(hashes_per_fragment):
            hash_start = j * HASH_LENGTH
            hash_end = hash_start + HASH_LENGTH
            hash_trits = fragment_trits[hash_start:hash_end]  # type: MutableSequence[int]

            for _ in range(26):
                sponge = Kerl()
                sponge.absorb(hash_trits)
                sponge.squeeze(hash_trits)

            key_fragment[hash_start:hash_end] = hash_trits

        #
        # After processing all of the hashes in the fragment, generate a
        # final hash and store it in the digest.
        #
        # Note that we will do this once per fragment in the key, so the
        # longer the key is, the longer the digest will be.
        #
        sponge = Kerl()
        sponge.absorb(key_fragment)
        sponge.squeeze(hash_trits)

        # Each fragment owns one HASH_LENGTH-sized slot in the digest.
        # Indexing by FRAGMENT_LENGTH instead would only work through
        # out-of-range slice assignment appending to the list.
        fragment_hash_start = i * HASH_LENGTH
        fragment_hash_end = fragment_hash_start + HASH_LENGTH

        digest[fragment_hash_start:fragment_hash_end] = hash_trits

    return Digest(TryteString.from_trits(digest), self.key_index)