Python struct 模块,html() 实例源码

我们从Python开源项目中,提取了以下14个代码示例,用于说明如何使用struct.html()

项目:pytoshop    作者:mdboom    | 项目源码 | 文件源码
def write_value(fd, fmt, *value, **kwargs):
    """
    Pack one or more values with `struct` and write them to a stream.

    Parameters
    ----------
    fd : file-like object
        Must be opened for writing, in binary mode.

    fmt : str
        A `struct` module `format character
        <https://docs.python.org/2/library/struct.html#format-characters>`__
        string.  The byte-order prefix is prepended by this function.

    value : any
        The value(s) to encode; forwarded positionally to
        ``struct.pack``.

    endian : str
        Keyword argument giving the byte-order prefix.  Must be ``>``
        or ``<``.  Default: ``>``.
    """
    byte_order = kwargs.get('endian', '>')
    packed = struct.pack(byte_order + fmt, *value)
    fd.write(packed)
项目:ml-utils    作者:LinxiFan    | 项目源码 | 文件源码
def __init__(self, file_path, mode, data_fmt, convert_type=None):
        """
        Remember the record format and delegate opening to AbstractFile.

        Args:
          file_path: forwarded unchanged to ``AbstractFile.__init__``.
          mode: 'w', 'r', or 'a'. It is first passed through
            ``self._get_mode`` (per the original notes this appends 'b'
            to the mode -- confirm against ``_get_mode``).
          data_fmt: a struct format string, see
            https://docs.python.org/2/library/struct.html
            i - int; I - unsigned int; q - long long; Q - unsigned long long
            f - float; d - double; s - string; c - char; ? - bool
            b - signed char; B - unsigned char; h - short; H - unsigned short.
            3i - tuple of 3 ints; 'ifb' - tuple of int, float, bool
          convert_type: stored as ``self.convert_type``. Presumably a
            callable applied to the unpacked tuple on read, e.g. to turn
            ``(3,)`` back into ``3`` -- verify against the read method.
        """
        self.data_fmt = data_fmt
        # Size in bytes of one packed record of this format.
        self._size = struct.calcsize(data_fmt)
        self.convert_type = convert_type

        AbstractFile.__init__(self, file_path, self._get_mode(mode))
项目:endosome    作者:teor2345    | 项目源码 | 文件源码
def split_field(byte_len, data_bytes):
    '''
    Cut data_bytes into two pieces at index byte_len.

    Returns a tuple (head, tail) of bytearrays: head holds the first
    byte_len bytes of data_bytes, tail holds everything after them.
    Asserts if data_bytes is shorter than byte_len.
    '''
    assert len(data_bytes) >= byte_len
    head = bytearray(data_bytes[:byte_len])
    tail = bytearray(data_bytes[byte_len:])
    return (head, tail)

# struct formats. See
# https://docs.python.org/2/library/struct.html#byte-order-size-and-alignment
项目:sonic-snmpagent    作者:Azure    | 项目源码 | 文件源码
def endianness(self):
        """
        Return the struct byte-order prefix selected by the header flag.

        '!' Network-byte-order (big-endian) when
            ``self.flag__network_byte_order`` is truthy
        '<' little-endian otherwise
        https://docs.python.org/3.5/library/struct.html#format-strings

         From the RFC:
         The NETWORK_BYTE_ORDER bit applies to all multi-byte integer
         values in the entire AgentX packet, including the remaining
         header fields.  If set, then network byte order (most
         significant byte first; "big endian") is used.  If not set,
         then least significant byte first ("little endian") is used.
        """
        if self.flag__network_byte_order:
            return '!'
        return '<'
项目:pytoshop    作者:mdboom    | 项目源码 | 文件源码
def read_value(fd, fmt, endian='>'):
    # type: (BinaryIO, unicode, unicode) -> Any
    """
    Read and decode one `struct`-formatted record from a stream.

    Parameters
    ----------
    fd : file-like object
        Must be opened for reading, in binary mode.

    fmt : str
        A `struct` module `format character
        <https://docs.python.org/2/library/struct.html#format-characters>`__
        string.  The byte-order prefix is prepended by this function.

    endian : str
        The endianness. Must be ``>`` or ``<``.  Default: ``>``.

    Returns
    -------
    value : any
        The decoded value(s).

        When the format yields exactly one value it is returned on its
        own; otherwise the tuple from ``struct.unpack`` is returned.
    """
    full_fmt = endian + fmt
    # Read exactly as many bytes as the format describes.
    raw = fd.read(struct.calcsize(full_fmt))  # type: ignore
    values = struct.unpack(full_fmt, raw)  # type: ignore
    return values[0] if len(values) == 1 else values
项目:counsyl-pyads    作者:counsyl    | 项目源码 | 文件源码
def unpack(self, value):
        """Decode ``value`` with ``struct.unpack`` using ``self.pack_format``.

        Always returns a tuple: "The result is a tuple even if it contains
        exactly one item."
        (https://docs.python.org/2/library/struct.html#struct.unpack)
        For single-valued data types, use AdsSingleValuedDatatype to get
        the first (and only) entry of the tuple after unpacking.
        """
        fmt = self.pack_format
        assert fmt is not None
        return struct.unpack(fmt, value)
项目:passbytcp    作者:mxdg    | 项目源码 | 文件源码
def _start(self):
        """
        Run the forwarding loop; this method never returns.

        Each pass waits (up to 0.5s) for sockets in ``self.conn_rd`` to
        become readable, reads from each into one shared pre-allocated
        buffer, and sends what was read to ``self.map[s]``. A socket is
        handed to ``self._rd_shutdown`` when reading fails, when it
        reads zero bytes, or when the send fails.
        """
        # memoryview acts as a reusable recv buffer
        # refer https://docs.python.org/3/library/stdtypes.html#memoryview
        buff = memoryview(bytearray(RECV_BUFFER_SIZE))
        while True:
            if not self.conn_rd:
                # sleep if there are no connections
                time.sleep(0.06)
                continue

            # blocks until there is socket(s) ready for .recv
            # notice: sockets which were closed by remote,
            #   are also regarded as read-ready by select()
            readable, _, _ = select.select(self.conn_rd, [], [], 0.5)

            for s in readable:  # every read-ready or closed socket
                try:
                    # here, we use .recv_into() instead of .recv()
                    #   recv data directly into the pre-allocated buffer
                    #   to avoid many unnecessary malloc()
                    # see https://docs.python.org/3/library/socket.html#socket.socket.recv_into
                    rec_len = s.recv_into(buff, RECV_BUFFER_SIZE)
                except Exception:
                    # `except Exception` rather than a bare `except:` so
                    # KeyboardInterrupt / SystemExit are not swallowed.
                    # unable to read, in most cases it's due to socket close
                    self._rd_shutdown(s)
                    continue

                if not rec_len:
                    # read zero size, closed or shutdowned socket
                    self._rd_shutdown(s)
                    continue

                try:
                    # send data, we use `buff[:rec_len]` slice because
                    #   only the front of buff is filled
                    # NOTE(review): presumably self.map[s] is a socket;
                    #   socket.send() may transmit only part of the data
                    #   -- confirm whether sendall() is wanted here.
                    self.map[s].send(buff[:rec_len])
                except Exception:
                    # unable to send, close connection
                    self._rd_shutdown(s)
                    continue
项目:wradlib    作者:wradlib    | 项目源码 | 文件源码
def decode_string(data):
    """Decode UTF-8 bytes to text and drop any trailing NULL characters."""
    text = data.decode('utf-8')
    return text.rstrip('\0')


# IRIS Data Types and corresponding python struct format characters
# 4.2 Scalar Definitions, Page 23
# https://docs.python.org/3/library/struct.html#format-characters
项目:binja_smali    作者:lucasduffey    | 项目源码 | 文件源码
def read_uleb128(data):
    """
    Decode an unsigned LEB128 integer from the start of ``data``.

    Every encoded byte carries 7 payload bits, least-significant group
    first. The high bit of each byte is 1, unless that's the last byte.
    At most the first 5 bytes of ``data`` are examined.

    Parameters
    ----------
    data : str or bytes
        Buffer beginning with the encoded value. Anything that
        ``bytearray()`` accepts after slicing works on both Python 2 and
        Python 3.

    Returns
    -------
    (value, num_of_bytes) : tuple of int
        The decoded value and how many bytes this LEB128 took up.

    Raises
    ------
    AssertionError
        If none of the examined bytes has a clear high bit (including
        when ``data`` is empty or ends before the terminating byte).
    """
    # so technically it doesn't have to be 5...
    if len(data) != 5:
        log(3, "read_uleb128, where len(data) == %i" % len(data))

    total = 0
    # bytearray yields integers for both Python 2 str and Python 3 bytes.
    for i, value in enumerate(bytearray(data[:5])):
        # Clear the high bit and place the 7 payload bits. This must be a
        # plain OR into the accumulator: adding `shifted | total` to
        # `total` counted the earlier bits twice.
        total |= (value & 0x7f) << (i * 7)

        # this is the last byte, so stop
        if not value & 0x80:
            return total, i + 1

    log(3, "invalid ULEB128")
    # Raised explicitly so the failure is not stripped under `python -O`.
    raise AssertionError("invalid ULEB128")

# http://llvm.org/docs/doxygen/html/LEB128_8h_source.html
# signed LEB128 (SLEB128) examples: hex => decimal
################
# 00 => 0
# 01 => 1
# 7f => -1
# 80 7f => -128
项目:binja_smali    作者:lucasduffey    | 项目源码 | 文件源码
def class_defs(self): # seems ok
        class_defs_size_offset = 96 # VERIFIED
        class_defs_off_offset = 100 # VERIFIED

        self.class_defs_size = self.read_uint(class_defs_size_offset) # ok
        self.class_defs_off = self.read_uint(class_defs_off_offset) # ok

        print "\n===============================\n"
        print "class_defs_size: ", self.class_defs_size, "\n"
        print "class_defs_off: ", hex(self.class_defs_off), "\n"

        # class_def_items will store the class_def_items, see "class_def_item" @ https://source.android.com/devices/tech/dalvik/dex-format.html

        # Name              |   Format
        # ========================================
        # class_idx uint    |   uint
        # access_flags      |   uint
        # superclass_idx    |   uint
        # interfaces_off    |   uint
        # source_file_idx   |   uint
        # annotations_off   |   uint
        # class_data_off    |   uint
        # static_values_off |   uint

        class_def_item_size = 0x20 # 0x20 is 32 decimal, the class_def_item size in bytes

        class_def_items = []
        offset = 0
        for i in range(self.class_defs_size):
            offset = four_byte_align(offset)
            item = self.read_class_def_item(offset)
            offset += class_def_item_size

            class_def_items.append(item)

        # list of class_def_item objects
        return class_def_items

    # collision?
    # handles data_size, data_off
项目:shootback    作者:aploium    | 项目源码 | 文件源码
def _start(self):
        """
        Run the forwarding loop; this method never returns.

        Each pass waits (up to 0.5s) for sockets in ``self.conn_rd`` to
        become readable, reads from each into one shared pre-allocated
        buffer, and sends what was read to ``self.map[s]``. A socket is
        handed to ``self._rd_shutdown`` when reading fails, when it
        reads zero bytes, or when the send fails.
        """
        # memoryview acts as a reusable recv buffer
        # refer https://docs.python.org/3/library/stdtypes.html#memoryview
        buff = memoryview(bytearray(RECV_BUFFER_SIZE))
        while True:
            if not self.conn_rd:
                # sleep if there are no connections
                time.sleep(0.06)
                continue

            # blocks until there is socket(s) ready for .recv
            # notice: sockets which were closed by remote,
            #   are also regarded as read-ready by select()
            readable, _, _ = select.select(self.conn_rd, [], [], 0.5)

            for s in readable:  # every read-ready or closed socket
                try:
                    # here, we use .recv_into() instead of .recv()
                    #   recv data directly into the pre-allocated buffer
                    #   to avoid many unnecessary malloc()
                    # see https://docs.python.org/3/library/socket.html#socket.socket.recv_into
                    rec_len = s.recv_into(buff, RECV_BUFFER_SIZE)
                except Exception:
                    # `except Exception` rather than a bare `except:` so
                    # KeyboardInterrupt / SystemExit are not swallowed.
                    # unable to read, in most cases it's due to socket close
                    self._rd_shutdown(s)
                    continue

                if not rec_len:
                    # read zero size, closed or shutdowned socket
                    self._rd_shutdown(s)
                    continue

                try:
                    # send data, we use `buff[:rec_len]` slice because
                    #   only the front of buff is filled
                    # NOTE(review): presumably self.map[s] is a socket;
                    #   socket.send() may transmit only part of the data
                    #   -- confirm whether sendall() is wanted here.
                    self.map[s].send(buff[:rec_len])
                except Exception:
                    # unable to send, close connection
                    self._rd_shutdown(s)
                    continue
项目:solent    作者:solent-eng    | 项目源码 | 文件源码
def _create(self, message_h, **fields):
        '''
        Render the supplied message values to the bytearray.

        message_h names a message in self.gruel_protocol; fields must
        supply exactly the field names of that message's stencil. Values
        are written into self.arr from offset 0, in stencil order:

          u1 - one unsigned byte
          u2 - unsigned 16-bit, network byte order
          vs - unsigned 16-bit length (network byte order) followed by
               the UTF-8 bytes of the string, not zero-terminated. The
               length is the encoded byte count, so non-ASCII text is
               written in full.

        Returns self.arr.

        Raises Exception for an unknown message, a field-name mismatch
        or an unrecognised datatype. struct.error from pack_into
        propagates (e.g. a value that does not fit).
        '''
        #
        # ensure that the fields match the schema
        if message_h not in self.gruel_protocol:
            raise Exception("No message exists matching [%s]"%message_h)
        message_stencil = self.gruel_protocol.get_message_stencil(
            message_h=message_h)
        set_sch = set(message_stencil.field_names())
        set_got = set(fields.keys())
        if set_sch != set_got:
            raise Exception("Inconsistent fields/want:%s/got:%s"%(
                str(set_sch), str(set_got)))
        #
        # render
        offset = 0
        for (field_h, field_dt) in message_stencil.items():
            field_value = fields[field_h]
            # struct docs: https://docs.python.org/3.1/library/struct.html
            if field_dt.name == 'u1':
                # B is unsigned char
                struct.pack_into('!B', self.arr, offset, field_value)
                offset += 1
            elif field_dt.name == 'u2':
                # H is unsigned short
                struct.pack_into('!H', self.arr, offset, field_value)
                offset += 2
            elif field_dt.name == 'vs':
                # Encode first: the prefix and the pack width must be
                # the number of encoded bytes, not the number of
                # characters, or multi-byte text would be truncated.
                encoded = bytes(field_value, 'utf8')
                b_len = len(encoded)
                # first we do a two-byte length, network-endian
                struct.pack_into('!H', self.arr, offset, b_len)
                offset += 2
                # Now we put that string into the array. Emphasis: these
                # strings are not zero-terminated.
                struct.pack_into('%ss'%b_len, self.arr, offset, encoded)
                offset += b_len
            else:
                raise Exception("Datatype not recognised/handled: %s"%(
                    field_dt.name))
        return self.arr
项目:mnist    作者:datapythonista    | 项目源码 | 文件源码
def parse_idx(fd):
    """Parse an IDX file, and return it as a numpy array.

    The file starts with a 4-byte header (two zero bytes, a data type
    code and the number of dimensions), followed by one big-endian
    unsigned 32-bit size per dimension, followed by the items, which
    are decoded as big-endian values.

    Parameters
    ----------
    fd : file
        File object of the IDX file to parse, opened in binary mode and
        positioned at the start of the header.

    Returns
    -------
    data : numpy.ndarray
        Numpy array with the dimensions and the data in the IDX file

    Raises
    ------
    IdxDecodeError
        If the header is incomplete, does not start with two zero
        bytes, names an unknown data type, or the number of items does
        not match the declared dimensions.
    """
    import sys

    DATA_TYPES = {0x08: 'B',  # unsigned byte
                  0x09: 'b',  # signed byte
                  0x0b: 'h',  # short (2 bytes)
                  0x0c: 'i',  # int (4 bytes)
                  0x0d: 'f',  # float (4 bytes)
                  0x0e: 'd'}  # double (8 bytes)

    header = fd.read(4)
    if len(header) != 4:
        raise IdxDecodeError('Invalid IDX file, file empty or does not contain a full header.')

    zeros, data_type, num_dimensions = struct.unpack('>HBB', header)

    if zeros != 0:
        raise IdxDecodeError('Invalid IDX file, file must start with two zero bytes. '
                             'Found 0x%02x' % zeros)

    try:
        data_type = DATA_TYPES[data_type]
    except KeyError:
        raise IdxDecodeError('Unknown data type 0x%02x in IDX file' % data_type)

    dimension_sizes = struct.unpack('>' + 'I' * num_dimensions,
                                    fd.read(4 * num_dimensions))

    # array.array interprets the bytes in the machine's native byte
    # order. The items are big-endian, so swap only on little-endian
    # hosts; an unconditional swap would corrupt data on big-endian ones.
    # NOTE(review): the byte widths noted in DATA_TYPES assume the usual
    # C type sizes; array item sizes are platform-defined.
    data = array.array(data_type, fd.read())
    if sys.byteorder == 'little':
        data.byteswap()

    # Initial value 1 so a file declaring zero dimensions (a scalar)
    # does not make reduce() fail on an empty sequence.
    expected_items = functools.reduce(operator.mul, dimension_sizes, 1)
    if len(data) != expected_items:
        raise IdxDecodeError('IDX file has wrong number of items. '
                             'Expected: %d. Found: %d' % (expected_items, len(data)))

    return np.array(data).reshape(dimension_sizes)
项目:passbytcp    作者:mxdg    | 项目源码 | 文件源码
def _start(self):
        """
        Run the forwarding loop; this method never returns.

        Sockets in ``self.conn_rd`` are polled with select() (0.5s
        timeout). Data read from a ready socket is sent to
        ``self.map[sock]``. A socket is passed to ``self._rd_shutdown``
        when the read raises, when it reads zero bytes, or when the
        send raises.
        """
        # One pre-allocated buffer, exposed as a memoryview, is reused
        # for every read.
        # refer https://docs.python.org/3/library/stdtypes.html#memoryview
        buff = memoryview(bytearray(RECV_BUFFER_SIZE))
        while True:
            if not self.conn_rd:
                # nothing to watch yet; sleep briefly and look again
                time.sleep(0.06)
                continue

            # Wait until some socket is ready for .recv. Sockets closed
            # by the remote side are reported as read-ready as well.
            ready, _, _ = select.select(self.conn_rd, [], [], 0.5)

            for sock in ready:
                try:
                    # .recv_into() fills the pre-allocated buffer
                    # directly, avoiding an allocation per read.
                    # see https://docs.python.org/3/library/socket.html#socket.socket.recv_into
                    received = sock.recv_into(buff, RECV_BUFFER_SIZE)
                except Exception:
                    # A failed read (mostly a closed socket) is handled
                    # exactly like a zero-length read below.
                    received = 0

                if not received:
                    # unreadable, closed or shut-down socket
                    self._rd_shutdown(sock)
                    continue

                try:
                    # only the front `received` bytes of buff are valid
                    self.map[sock].send(buff[:received])
                except Exception:
                    # unable to send, close connection
                    self._rd_shutdown(sock)
                    continue