我们从 Python 开源项目中，提取了以下 50 个代码示例，用于说明如何使用 six.indexbytes()。
def _load_ssh_ecdsa_public_key(expected_key_type, decoded_data, backend):
    """Build an ECDSA public key from the decoded body of an SSH key.

    Two strings are read from ``decoded_data`` with ``_read_next_string``:
    the curve name first, then the encoded point.

    Raises:
        ValueError: If ``b"ecdsa-sha2-"`` plus the curve name differs from
            ``expected_key_type``, or if bytes remain after the point.
        KeyError: If the curve name is not one of the three curves below.
        NotImplementedError: If the first byte of the point is not 4.
    """
    curve_id, remainder = _read_next_string(decoded_data)
    point, remainder = _read_next_string(remainder)

    body_key_type = b"ecdsa-sha2-" + curve_id
    if expected_key_type != body_key_type:
        raise ValueError(
            'Key header and key body contain different key type values.'
        )
    if remainder:
        raise ValueError('Key body contains extra bytes.')

    curve_classes = {
        b"nistp256": ec.SECP256R1,
        b"nistp384": ec.SECP384R1,
        b"nistp521": ec.SECP521R1,
    }
    curve = curve_classes[curve_id]()

    # Only points whose leading byte is 4 are accepted.
    leading_byte = six.indexbytes(point, 0)
    if leading_byte != 4:
        raise NotImplementedError(
            "Compressed elliptic curve points are not supported"
        )

    public_numbers = ec.EllipticCurvePublicNumbers.from_encoded_point(
        curve, point
    )
    return public_numbers.public_key(backend)
def _VarintDecoder(mask, result_type):
    """Return a decoder for a basic varint value (does not include tag).

    Decoded values are bitwise-anded with ``mask`` before being converted
    with ``result_type`` and returned, e.g. to limit them to 32 bits. The
    returned decoder takes no "end" parameter, so the caller is expected to
    do its own bounds checking afterwards. The decoder returns a
    ``(value, new_pos)`` pair.
    """

    def DecodeVarint(buffer, pos):
        value = 0
        # Each byte contributes 7 payload bits; shifts 0, 7, ..., 63 allow
        # at most ten bytes before the varint is rejected.
        for shift in range(0, 64, 7):
            byte = six.indexbytes(buffer, pos)
            pos += 1
            value |= (byte & 0x7f) << shift
            if not (byte & 0x80):
                # A clear high bit marks the final byte of the varint.
                return (result_type(value & mask), pos)
        raise _DecodeError('Too many bytes when decoding varint.')

    return DecodeVarint
def ReadTag(buffer, pos):
    """Read a tag from ``buffer`` and return ``(tag_bytes, new_pos)``.

    The raw bytes of the tag are returned rather than a decoded value, so
    they can be used directly to look up the matching decoder. This trades
    pure-Python varint decoding for a byte-string hash lookup done in C.
    """
    tag_start = pos
    # Bytes with the high bit set are continuation bytes of the tag.
    while six.indexbytes(buffer, pos) & 0x80:
        pos += 1
    tag_end = pos + 1  # include the terminating byte
    return (buffer[tag_start:tag_end], tag_end)


# --------------------------------------------------------------------
def _load_ssh_ecdsa_public_key(expected_key_type, decoded_data, backend):
    """Build an ECDSA public key from the decoded body of an SSH key.

    Two strings are read from ``decoded_data`` with
    ``_ssh_read_next_string``: the curve name first, then the encoded point.

    Raises:
        ValueError: If ``b"ecdsa-sha2-"`` plus the curve name differs from
            ``expected_key_type``, or if bytes remain after the point.
        KeyError: If the curve name is not one of the three curves below.
        NotImplementedError: If the first byte of the point is not 4.
    """
    curve_id, remainder = _ssh_read_next_string(decoded_data)
    point, remainder = _ssh_read_next_string(remainder)

    body_key_type = b"ecdsa-sha2-" + curve_id
    if expected_key_type != body_key_type:
        raise ValueError(
            'Key header and key body contain different key type values.'
        )
    if remainder:
        raise ValueError('Key body contains extra bytes.')

    curve_classes = {
        b"nistp256": ec.SECP256R1,
        b"nistp384": ec.SECP384R1,
        b"nistp521": ec.SECP521R1,
    }
    curve = curve_classes[curve_id]()

    # Only points whose leading byte is 4 are accepted.
    leading_byte = six.indexbytes(point, 0)
    if leading_byte != 4:
        raise NotImplementedError(
            "Compressed elliptic curve points are not supported"
        )

    public_numbers = ec.EllipticCurvePublicNumbers.from_encoded_point(
        curve, point
    )
    return public_numbers.public_key(backend)
def test_binary_output_length_larger_than_127(frame_factory, monkeypatch):
    """Check the serialized layout of a binary frame with a 65536-byte payload."""
    # Pin the masking key so the serialized bytes are predictable.
    monkeypatch.setattr(
        'lomond.frame.make_masking_key', lambda: b'\x00\x00\x00\x00')

    large_length = 1 << 16
    frame = frame_factory(Opcode.BINARY, payload=b'\x01' * large_length)
    frame_binary = frame.to_bytes()

    # Expected layout, in order:
    #   1 byte  - first header byte (opcode byte)
    #   1 byte  - second header byte (low 7 bits carry the length indicator)
    #   8 bytes - payload length as uint64_t
    #   4 bytes - masking key
    #   large_length bytes - payload (65536 bytes)
    expected_length = 1 + 1 + 8 + 4 + large_length
    assert len(frame_binary) == expected_length

    first_byte = six.indexbytes(frame_binary, 0)
    assert first_byte == 0b10000010

    # The 7-bit length field holds 127, so the real length follows as a
    # 64-bit integer.
    length_indicator = six.indexbytes(frame_binary, 1) & 0b01111111
    assert length_indicator == 127

    # The real length can be decoded from the next eight bytes.
    decoded_length = unpack('!Q', frame_binary[2:10])
    assert len(decoded_length) == 1
    assert decoded_length[0] == large_length

    assert frame_binary[10:14] == b'\x00\x00\x00\x00'
    assert frame_binary[14:] == frame.payload
def _int_factory(sign, data):
    """Return a zero-argument callable that decodes ``data`` into an integer.

    The callable accumulates an unsigned magnitude from ``data`` in chunks
    of 8, 4, 2 and finally 1 byte, then multiplies it by ``sign``.
    """
    # (chunk width in bytes, struct format for one big-endian chunk)
    chunk_formats = ((8, '>Q'), (4, '>I'), (2, '>H'))

    def parse_int():
        magnitude = 0
        remaining = len(data)
        for width, fmt in chunk_formats:
            # Only the 8-byte width can repeat; once fewer than 8 bytes
            # remain, each narrower width matches at most once.
            while remaining >= width:
                # NOTE(review): presumably _rslice(data, remaining, width)
                # yields `width` bytes starting `remaining` bytes from the
                # end of data - confirm against its definition.
                chunk = _rslice(data, remaining, width)
                magnitude <<= 8 * width
                magnitude |= unpack(fmt, chunk)[0]
                remaining -= width
        if remaining == 1:
            # A single trailing byte is the last byte of data.
            magnitude <<= 8
            magnitude |= six.indexbytes(data, -1)
        return sign * magnitude

    return parse_int
def read_byte(self):
    """Consume one position from the front of the queue and return its value.

    Returns:
        ``_EOF`` when ``BufferQueue.is_eof`` reports true for the head
        segment; otherwise the byte at the current offset of the head
        segment, passed through ``self.__ord``.

    Raises:
        IndexError: If the recorded size is less than one.
    """
    if self.__size < 1:
        raise IndexError('Buffer queue is empty')
    segments = self.__segments
    segment = segments[0]
    segment_len = len(segment)
    offset = self.__offset
    if BufferQueue.is_eof(segment):
        octet = _EOF
    else:
        octet = self.__ord(six.indexbytes(segment, offset))
    # NOTE(review): the advance below is assumed to apply to both branches
    # above (the source layout was flattened) - confirm against upstream.
    offset += 1
    if offset == segment_len:
        # The head segment is exhausted: drop it and restart at offset 0
        # of the next segment.
        offset = 0
        segments.popleft()
    self.__offset = offset
    self.__size -= 1
    self.position += 1
    return octet
def finalize(self):
    """Strip the padding from the buffered final block and return the rest.

    The pad length is taken from the last byte of the buffer. After a
    successful call the internal buffer is set to ``None``.

    Raises:
        AlreadyFinalized: If the buffer is already ``None``.
        ValueError: If the buffer is not exactly ``block_size // 8`` bytes
            long, or if ``Cryptography_check_pkcs7_padding`` rejects it.
    """
    buffered = self._buffer
    if buffered is None:
        raise AlreadyFinalized("Context was already finalized.")

    block_bytes = self.block_size // 8
    if len(buffered) != block_bytes:
        raise ValueError("Invalid padding bytes.")

    padding_ok = _lib.Cryptography_check_pkcs7_padding(buffered, block_bytes)
    if not padding_ok:
        raise ValueError("Invalid padding bytes.")

    pad_size = six.indexbytes(buffered, -1)
    unpadded = buffered[:-pad_size]
    self._buffer = None
    return unpadded
def extract_message(cls, raw_bytes):
    """Split one ``<length>:<payload>,`` message off the front of ``raw_bytes``.

    Returns:
        ``(message, remaining_bytes)`` when a complete message is present,
        or ``(None, raw_bytes)`` when more data is needed.

    Raises:
        FramingError: If no ``b':'`` is found within the first bytes (input
            longer than 10 bytes), the length is not an integer or is
            negative, or the byte after the payload is not 44.
    """
    length_field, colon, rest = raw_bytes.partition(b':')
    if not colon:
        # No separator yet: tolerate short input, reject anything longer.
        if len(raw_bytes) > 10:
            raise FramingError(
                'Length information missing: %s' % raw_bytes)
        return None, raw_bytes

    try:
        msg_len = int(length_field)
    except ValueError:
        raise FramingError('Invalid length: %s' % raw_bytes)
    if msg_len < 0:
        raise FramingError('Negative length: %s' % raw_bytes)

    # The payload plus its one-byte end marker must be fully present.
    if len(rest) <= msg_len:
        return None, raw_bytes

    # 44 is the byte value of b','.
    if six.indexbytes(rest, msg_len) != 44:
        raise FramingError(
            'Missing correct end marker: %s' % raw_bytes)
    return rest[:msg_len], rest[msg_len + 1:]
def finalize(self):
    """Strip the padding from the buffered final block and return the rest.

    The pad length is taken from the last byte of the buffer. After a
    successful call the internal buffer is set to ``None``.

    Raises:
        AlreadyFinalized: If the buffer is already ``None``.
        ValueError: If the buffer is not exactly ``block_size // 8`` bytes
            long, or if ``Cryptography_check_pkcs7_padding`` rejects it.
    """
    buffered = self._buffer
    if buffered is None:
        raise AlreadyFinalized("Context was already finalized.")

    block_bytes = self.block_size // 8
    if len(buffered) != block_bytes:
        raise ValueError("Invalid padding bytes.")

    padding_ok = lib.Cryptography_check_pkcs7_padding(buffered, block_bytes)
    if not padding_ok:
        raise ValueError("Invalid padding bytes.")

    pad_size = six.indexbytes(buffered, -1)
    unpadded = buffered[:-pad_size]
    self._buffer = None
    return unpadded
def serialize(self):
    """Return the packed prefix length followed by the truncated address.

    The binary address is cut to ``(length + 7) // 8`` bytes. When the
    length is not a multiple of 8, the bits after the prefix in the last
    byte are cleared. ``self.addr`` is rewritten from the normalized bytes
    as a side effect.
    """
    # fixup
    prefix_bytes = (self.length + 7) // 8
    packed_addr = self._to_bin(self.addr)
    spare_bits = self.length % 8
    if spare_bits:
        # Clear trailing bits in the last octet.
        # The RFC doesn't require this.
        keep_mask = 0xff00 >> spare_bits
        tail_index = prefix_bytes - 1
        tail = six.int2byte(
            six.indexbytes(packed_addr, tail_index) & keep_mask)
        packed_addr = packed_addr[:tail_index] + tail
    else:
        packed_addr = packed_addr[:prefix_bytes]
    self.addr = self._from_bin(packed_addr)

    header = bytearray()
    msg_pack_into(self._PACK_STR, header, 0, self.length)
    return header + bytes(packed_addr)
def parser(cls, buf):
    """Decode ``buf`` into an instance of ``cls``.

    The fixed header is unpacked with ``cls._PACK_STR``. The first byte is
    split into version (top 3 bits) and diagnostic (low 5 bits); the second
    into state (top 2 bits) and flags (low 6 bits). When the
    ``BFD_FLAG_AUTH_PRESENT`` flag is set, the byte after the header selects
    an entry of ``cls._auth_parsers`` that parses the rest of the buffer.

    Returns:
        A ``(message, None, None)`` tuple.
    """
    header = buf[:cls._PACK_STR_LEN]
    (vers_diag, state_flags, detect_mult, _length, my_discr, your_discr,
     desired_min_tx_interval, required_min_rx_interval,
     required_min_echo_rx_interval) = struct.unpack_from(
        cls._PACK_STR, header)

    ver = vers_diag >> 5
    diag = vers_diag & 0x1f
    state = state_flags >> 6
    flags = state_flags & 0x3f

    auth_cls = None
    if flags & BFD_FLAG_AUTH_PRESENT:
        auth_type = six.indexbytes(buf, cls._PACK_STR_LEN)
        auth_parser = cls._auth_parsers[auth_type]
        auth_cls = auth_parser.parser(buf[cls._PACK_STR_LEN:])[0]

    msg = cls(ver, diag, state, flags, detect_mult,
              my_discr, your_discr, desired_min_tx_interval,
              required_min_rx_interval, required_min_echo_rx_interval,
              auth_cls)
    return msg, None, None
def _byte_unpadding_check(buffer_, block_size, checkfn):
    """Validate the final padded block and return it without its padding.

    The pad length is read from the last byte of ``buffer_``.

    Raises:
        AlreadyFinalized: If ``buffer_`` is ``None``.
        ValueError: If ``buffer_`` is not exactly ``block_size // 8`` bytes
            long, or if ``checkfn(buffer_, block_size // 8)`` is falsy.
    """
    if buffer_ is None:
        raise AlreadyFinalized("Context was already finalized.")

    block_bytes = block_size // 8
    # checkfn is only consulted once the length is known to be right.
    if len(buffer_) != block_bytes or not checkfn(buffer_, block_bytes):
        raise ValueError("Invalid padding bytes.")

    pad_length = six.indexbytes(buffer_, -1)
    return buffer_[:-pad_length]
def _SignedVarintDecoder(bits, result_type):
    """Like _VarintDecoder() but decodes signed values.

    The decoded value is masked to ``bits`` bits and then sign-extended
    from bit ``bits - 1`` before being converted with ``result_type``.
    """
    sign_bit = 1 << (bits - 1)
    value_mask = (1 << bits) - 1

    def DecodeVarint(buffer, pos):
        value = 0
        # Each byte contributes 7 payload bits; shifts 0, 7, ..., 63 allow
        # at most ten bytes before the varint is rejected.
        for shift in range(0, 64, 7):
            byte = six.indexbytes(buffer, pos)
            pos += 1
            value |= (byte & 0x7f) << shift
            if not (byte & 0x80):
                value &= value_mask
                # Sign extension: flip the sign bit, then subtract it.
                value = (value ^ sign_bit) - sign_bit
                return (result_type(value), pos)
        raise _DecodeError('Too many bytes when decoding varint.')

    return DecodeVarint


# We force 32-bit values to int and 64-bit values to long to make
# alternate implementations where the distinction is more significant
# (e.g. the C++ implementation) simpler.
def _ssh_write_mpint(value):
    """Encode ``value`` with ``_ssh_write_string``, zero-padded if needed.

    The integer is converted with ``utils.int_to_bytes``. If the top bit of
    the first byte is set, a zero byte is prepended before writing.
    """
    encoded = utils.int_to_bytes(value)
    top_bit_set = six.indexbytes(encoded, 0) & 0x80
    payload = b"\x00" + encoded if top_bit_set else encoded
    return _ssh_write_string(payload)
def _dynamic_truncate(self, counter):
    """Return a 31-bit integer derived from the HMAC of ``counter``.

    ``counter`` is packed as a big-endian unsigned 64-bit value and
    authenticated with the instance's key, algorithm and backend. The low
    four bits of the last digest byte select a 4-byte window, which is read
    as a big-endian unsigned integer with its top bit cleared.
    """
    mac = hmac.HMAC(self._key, self._algorithm, self._backend)
    mac.update(struct.pack(">Q", counter))
    digest = mac.finalize()

    window_start = six.indexbytes(digest, -1) & 0b1111
    window = digest[window_start:window_start + 4]
    (word,) = struct.unpack(">I", window)
    return word & 0x7fffffff
def test_binary_output_length_smaller_or_equal_127(frame_factory, monkeypatch):
    """Check the serialized layout of a binary frame with a 127-byte payload."""
    # Pin the masking key so the serialized bytes are predictable.
    monkeypatch.setattr(
        'lomond.frame.make_masking_key', lambda: b'\x00\x00\x00\x00')

    frame = frame_factory(Opcode.BINARY, payload=b'\x01' * 127)
    frame_binary = frame.to_bytes()

    # Expected layout, in order:
    #   1 byte    - first header byte (opcode byte)
    #   1 byte    - second header byte (low 7 bits carry the length indicator)
    #   2 bytes   - payload length as uint16_t
    #   4 bytes   - masking key
    #   127 bytes - payload
    expected_length = 1 + 1 + 2 + 4 + 127
    assert len(frame_binary) == expected_length

    # The first header byte is the same as for any other binary frame.
    first_byte = six.indexbytes(frame_binary, 0)
    assert first_byte == 0b10000010

    # The 7-bit length field holds 126, meaning the real length follows as
    # a 16-bit integer.
    length_indicator = six.indexbytes(frame_binary, 1) & 0b01111111
    assert length_indicator == 126

    assert frame_binary[4:8] == b'\x00\x00\x00\x00'
    assert frame_binary[8:] == frame.payload