我们从Python开源项目中,提取了以下14个代码示例,用于说明如何使用struct.html()。
def write_value(fd, fmt, *value, **kwargs):
    """
    Pack one or more values with `struct` and write the bytes to a file.

    Parameters
    ----------
    fd : file-like object
        Destination; must be writable and opened in binary mode.

    fmt : str
        A `struct` module `format character
        <https://docs.python.org/2/library/struct.html#format-characters>`__
        string *without* a byte-order prefix; the prefix comes from
        ``endian``.

    value : any
        The value(s) to encode, one positional argument per item in
        ``fmt``.

    endian : str
        Keyword argument holding the byte-order prefix that is put in
        front of ``fmt``. Must be ``>`` or ``<``. Default: ``>``.
    """
    byte_order = kwargs.get('endian', '>')
    payload = struct.pack(byte_order + fmt, *value)
    fd.write(payload)
def __init__(self, file_path, mode, data_fmt, convert_type=None):
    """
    Set up a file whose records are described by a struct format string.

    Args:
        file_path: forwarded unchanged to AbstractFile.__init__.
        mode: 'w', 'r', or 'a'. The value is run through
            self._get_mode() before being handed to AbstractFile.__init__
            (per the original notes, 'b' is appended automatically).
        data_fmt: struct format of one record, see
            https://docs.python.org/2/library/struct.html
                i - int; I - unsigned int; q - long long;
                Q - unsigned long long; f - float; d - double;
                s - string; c - char; ? - bool; b - signed char;
                B - unsigned char; h - short; H - unsigned short.
                3i - tuple of 3 ints; 'ifb' - tuple of int, float, bool
            Its packed size (struct.calcsize) is cached in self._size.
        convert_type: stored as self.convert_type. Per the original
            notes, a read of a single int comes back as a tuple such as
            (3,), and convert_type(*read) is returned instead.
            NOTE(review): that conversion happens outside this method;
            confirm against the read path.
    """
    self.data_fmt = data_fmt
    self._size = struct.calcsize(data_fmt)
    self.convert_type = convert_type
    AbstractFile.__init__(self, file_path, self._get_mode(mode))
def split_field(byte_len, data_bytes):
    '''
    Cut data_bytes in two at position byte_len.

    Returns a (head, tail) tuple of fresh bytearrays: head holds the first
    byte_len bytes, tail holds everything after them.

    Asserts if data_bytes is shorter than byte_len.
    '''
    assert len(data_bytes) >= byte_len
    head = bytearray(data_bytes[:byte_len])
    tail = bytearray(data_bytes[byte_len:])
    return (head, tail)


# struct formats. See
# https://docs.python.org/2/library/struct.html#byte-order-size-and-alignment
def endianness(self):
    """
    Return the struct byte-order prefix matching the NETWORK_BYTE_ORDER flag.

    '!'  network byte order (big-endian), when
         self.flag__network_byte_order is truthy
    '<'  little-endian, otherwise

    https://docs.python.org/3.5/library/struct.html#format-strings

    From the RFC:
        The NETWORK_BYTE_ORDER bit applies to all multi-byte integer
        values in the entire AgentX packet, including the remaining
        header fields. If set, then network byte order (most
        significant byte first; "big endian") is used. If not set,
        then least significant byte first ("little endian") is used.
    """
    if self.flag__network_byte_order:
        return '!'
    return '<'
def read_value(fd, fmt, endian='>'):  # type: (BinaryIO, unicode, unicode) -> Any
    """
    Read and decode one `struct`-formatted record from a file-like object.

    Parameters
    ----------
    fd : file-like object
        Source; must be readable and opened in binary mode.

    fmt : str
        A `struct` module `format character
        <https://docs.python.org/2/library/struct.html#format-characters>`__
        string *without* a byte-order prefix.

    endian : str
        Byte-order prefix put in front of ``fmt``. Must be ``>`` or
        ``<``. Default: ``>``.

    Returns
    -------
    value : any
        The decoded value(s). When ``fmt`` yields exactly one item that
        item is returned on its own; otherwise the tuple produced by
        ``struct.unpack`` is returned.
    """
    full_fmt = endian + fmt
    raw = fd.read(struct.calcsize(full_fmt))  # type: ignore
    values = struct.unpack(full_fmt, raw)  # type: ignore
    return values[0] if len(values) == 1 else values
def unpack(self, value):
    """Decode ``value`` with struct.unpack() using ``self.pack_format``.

    Always returns a tuple; asserts that ``self.pack_format`` is set.
    """
    fmt = self.pack_format
    assert fmt is not None

    # Note: "The result is a tuple even if it contains exactly one item."
    # (https://docs.python.org/2/library/struct.html#struct.unpack)
    # For single-valued data types, use AdsSingleValuedDatatype to get the
    # first (and only) entry of the tuple after unpacking.
    return struct.unpack(fmt, value)
def _start(self):
    """
    Run the receive/forward loop; never returns normally.

    On every pass the sockets in ``self.conn_rd`` are handed to
    ``select.select`` as the read set. For each socket reported ready,
    data is received into one shared pre-allocated buffer and passed to
    ``self.map[s].send``. A failed receive, a zero-length receive or a
    failed send results in ``self._rd_shutdown(s)``.

    Only ``Exception`` subclasses are treated as socket failures, so
    ``KeyboardInterrupt`` and ``SystemExit`` propagate out of the loop
    instead of being swallowed.
    """
    # memoryview acts as a recv buffer
    # refer https://docs.python.org/3/library/stdtypes.html#memoryview
    buff = memoryview(bytearray(RECV_BUFFER_SIZE))
    while True:
        if not self.conn_rd:
            # sleep if there are no connections
            time.sleep(0.06)
            continue

        # blocks (for at most 0.5 s) until there is socket(s) ready for .recv
        # notice: sockets which were closed by remote,
        #   are also regarded as read-ready by select()
        readable, _, _ = select.select(self.conn_rd, [], [], 0.5)

        for s in readable:  # iterate over every read-ready or closed socket
            try:
                # here, we use .recv_into() instead of .recv()
                #   recv data directly into the pre-allocated buffer
                #   to avoid many unnecessary malloc()
                # see https://docs.python.org/3/library/socket.html#socket.socket.recv_into
                rec_len = s.recv_into(buff, RECV_BUFFER_SIZE)
            except Exception:
                # unable to read, in most cases, it's due to socket close
                self._rd_shutdown(s)
                continue

            if not rec_len:
                # read zero size, closed or shut-down socket
                self._rd_shutdown(s)
                continue

            try:
                # send data, we use `buff[:rec_len]` slice because
                #   only the front of buff is filled
                self.map[s].send(buff[:rec_len])
            except Exception:
                # unable to send (or no entry in self.map), close connection
                self._rd_shutdown(s)
def decode_string(data):
    """Decode ``data`` as UTF-8 and drop any trailing NULL characters."""
    text = data.decode('utf-8')
    return text.rstrip('\0')


# IRIS Data Types and corresponding python struct format characters
# 4.2 Scalar Definitions, Page 23
# https://docs.python.org/3/library/struct.html#format-characters
def read_uleb128(data):
    """
    Decode an unsigned LEB128 value from the start of ``data``.

    At most five bytes are examined. Every byte contributes its low seven
    bits, least-significant group first; the high bit of each byte is 1
    unless that byte is the last one of the value.

    ``data`` may be indexed into 1-character strings (Python 2 ``str``) or
    into ints (``bytes`` on Python 3, ``bytearray``).

    Returns ``(value, num_of_bytes)`` where ``num_of_bytes`` indicates how
    much space this LEB128 took up.

    Raises AssertionError when none of the five bytes ends the value, and
    IndexError when ``data`` runs out before a terminating byte is seen.
    """
    # so technically it doesn't have to be 5...
    if len(data) != 5:
        log(3, "read_uleb128, where len(data) == %i" % len(data))

    total = 0
    for i in range(5):
        byte = data[i]
        value = byte if isinstance(byte, int) else ord(byte)
        # clear the high bit and OR the 7-bit group into place; the groups
        # never overlap, so nothing must be added on top of the running total
        total |= (value & 0x7f) << (i * 7)
        # a clear high bit marks the last byte
        if value >> 7 == 0:
            return total, i + 1

    log(3, "invalid ULEB128")
    # raised explicitly so the check is not stripped under `python -O`
    raise AssertionError("invalid ULEB128")


# http://llvm.org/docs/doxygen/html/LEB128_8h_source.html
# hex => decimal
###############3
# 00 => 0
# 01 => 1
# 7f => -1
# 80 7f => -128
def class_defs(self): # seems ok class_defs_size_offset = 96 # VERIFIED class_defs_off_offset = 100 # VERIFIED self.class_defs_size = self.read_uint(class_defs_size_offset) # ok self.class_defs_off = self.read_uint(class_defs_off_offset) # ok print "\n===============================\n" print "class_defs_size: ", self.class_defs_size, "\n" print "class_defs_off: ", hex(self.class_defs_off), "\n" # class_def_items will store the class_def_items, see "class_def_item" @ https://source.android.com/devices/tech/dalvik/dex-format.html # Name | Format # ======================================== # class_idx uint | uint # access_flags | uint # superclass_idx | uint # interfaces_off | uint # source_file_idx | uint # annotations_off | uint # class_data_off | uint # static_values_off | uint class_def_item_size = 0x20 # 0x20 is 32 decimal, the class_def_item size in bytes class_def_items = [] offset = 0 for i in range(self.class_defs_size): offset = four_byte_align(offset) item = self.read_class_def_item(offset) offset += class_def_item_size class_def_items.append(item) # list of class_def_item objects return class_def_items # collision? # handles data_size, data_off
def _create(self, message_h, **fields):
    '''
    Render the supplied message values to the bytearray.

    message_h selects a message stencil from self.gruel_protocol, and the
    keyword arguments must name exactly the fields of that stencil. Fields
    are written into self.arr back to back, starting at offset 0, in the
    order given by the stencil's items():

        u1 - one unsigned byte
        u2 - unsigned 16-bit integer, network byte order
        vs - variable string: a two-byte network-order length followed by
             the UTF-8 encoding of the value (not zero-terminated). The
             length counts encoded bytes, so non-ASCII text is written
             in full.

    Returns self.arr.

    Raises Exception for an unknown message_h, for a field set that does
    not match the stencil, and for an unhandled datatype name.
    '''
    #
    # ensure that the fields match the schema
    if message_h not in self.gruel_protocol:
        raise Exception("No message exists matching [%s]"%message_h)
    message_stencil = self.gruel_protocol.get_message_stencil(
        message_h=message_h)
    set_sch = set(message_stencil.field_names())
    set_got = set(fields.keys())
    if set_sch != set_got:
        raise Exception("Inconsistent fields/want:%s/got:%s"%(
            str(set_sch), str(set_got)))
    #
    # render
    offset = 0
    for (field_h, field_dt) in message_stencil.items():
        field_value = fields[field_h]
        # struct docs: https://docs.python.org/3.1/library/struct.html
        if field_dt.name == 'u1':
            # B is unsigned char
            struct.pack_into('!B', self.arr, offset, field_value)
            offset += 1
        elif field_dt.name == 'u2':
            # H is unsigned short
            struct.pack_into('!H', self.arr, offset, field_value)
            offset += 2
        elif field_dt.name == 'vs':
            # Encode first: the wire length is the number of encoded
            # bytes, which exceeds len(field_value) for non-ASCII text.
            encoded = bytes(field_value, 'utf8')
            s_len = len(encoded)
            # first we do a two-byte length, network-endian
            struct.pack_into('!H', self.arr, offset, s_len)
            offset += 2
            # Now we put that string into the array. Emphasis: these
            # strings are not zero-terminated.
            struct.pack_into('%ss'%s_len, self.arr, offset, encoded)
            offset += s_len
        else:
            raise Exception("Datatype not recognised/handled: %s"%(
                field_dt.name))
    return self.arr
def parse_idx(fd):
    """Parse an IDX file, and return it as a numpy array.

    The 4-byte header, the dimension sizes and the data items are all read
    as big-endian values (see [1] for the struct byte-order prefixes).

    Parameters
    ----------
    fd : file
        File object of the IDX file to parse, opened in binary mode and
        positioned at the start of the header. It is read to the end.

    Returns
    -------
    data : numpy.ndarray
        Numpy array with the dimensions and the data in the IDX file

    Raises
    ------
    IdxDecodeError
        If the header is shorter than 4 bytes, does not start with two
        zero bytes, names an unknown data type, or if the number of data
        items does not match the product of the dimension sizes.

    1. https://docs.python.org/3/library/struct.html#byte-order-size-and-alignment
    """
    import sys  # local import: only needed for the byte-order check below

    DATA_TYPES = {0x08: 'B',  # unsigned byte
                  0x09: 'b',  # signed byte
                  0x0b: 'h',  # short (2 bytes)
                  0x0c: 'i',  # int (4 bytes)
                  0x0d: 'f',  # float (4 bytes)
                  0x0e: 'd'}  # double (8 bytes)

    header = fd.read(4)
    if len(header) != 4:
        raise IdxDecodeError('Invalid IDX file, file empty or does not contain a full header.')

    zeros, data_type, num_dimensions = struct.unpack('>HBB', header)

    if zeros != 0:
        raise IdxDecodeError('Invalid IDX file, file must start with two zero bytes. '
                             'Found 0x%02x' % zeros)

    try:
        data_type = DATA_TYPES[data_type]
    except KeyError:
        raise IdxDecodeError('Unknown data type 0x%02x in IDX file' % data_type)

    dimension_sizes = struct.unpack('>' + 'I' * num_dimensions,
                                    fd.read(4 * num_dimensions))

    data = array.array(data_type, fd.read())
    # array.array interprets the bytes in the machine's native order, while
    # the file is big-endian: swap only on little-endian hosts.
    if sys.byteorder == 'little':
        data.byteswap()

    # The initial value 1 makes a zero-dimensional file describe one item
    # instead of making reduce() fail on an empty sequence.
    expected_items = functools.reduce(operator.mul, dimension_sizes, 1)
    if len(data) != expected_items:
        raise IdxDecodeError('IDX file has wrong number of items. '
                             'Expected: %d. Found: %d' % (expected_items, len(data)))

    return np.array(data).reshape(dimension_sizes)
def _start(self):
    """
    Run the receive/forward loop; never returns normally.

    On every pass the sockets in ``self.conn_rd`` are handed to
    ``select.select`` as the read set. For each socket reported ready,
    data is received into one shared pre-allocated buffer and passed to
    ``self.map[s].send``. A failed receive, a zero-length receive or a
    failed send results in ``self._rd_shutdown(s)``.
    """
    # memoryview acts as a recv buffer
    # refer https://docs.python.org/3/library/stdtypes.html#memoryview
    buff = memoryview(bytearray(RECV_BUFFER_SIZE))
    while True:
        if not self.conn_rd:
            # sleep if there are no connections
            time.sleep(0.06)
            continue

        # blocks (for at most 0.5 s) until there is socket(s) ready for .recv
        # notice: sockets which were closed by remote,
        #   are also regarded as read-ready by select()
        readable, _, _ = select.select(self.conn_rd, [], [], 0.5)

        for s in readable:  # iterate over every read-ready or closed socket
            try:
                # here, we use .recv_into() instead of .recv()
                #   recv data directly into the pre-allocated buffer
                #   to avoid many unnecessary malloc()
                # see https://docs.python.org/3/library/socket.html#socket.socket.recv_into
                rec_len = s.recv_into(buff, RECV_BUFFER_SIZE)
            except Exception:
                # unable to read, in most cases, it's due to socket close
                self._rd_shutdown(s)
                continue

            if not rec_len:
                # read zero size, closed or shut-down socket
                self._rd_shutdown(s)
                continue

            try:
                # send data, we use `buff[:rec_len]` slice because
                #   only the front of buff is filled
                self.map[s].send(buff[:rec_len])
            except Exception:
                # unable to send (or no entry in self.map), close connection
                self._rd_shutdown(s)