我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用struct.Struct()。
def read(self):
    """Decode a height datagram record at self.offset into instance attributes."""
    self.fileptr.seek(self.offset, 0)
    record_format = '=LBBHLLHHlBBH'
    record_size = struct.calcsize(record_format)
    fields = struct.Struct(record_format).unpack_from(self.fileptr.read(record_size))

    # fields[0] is the datagram byte count, already held in numberOfBytes.
    self.STX = fields[1]
    self.typeOfDatagram = chr(fields[2])
    self.EMModel = fields[3]
    self.RecordDate = fields[4]
    self.Time = fields[5] / 1000.0          # stored as milliseconds
    self.Counter = fields[6]
    self.SerialNumber = fields[7]
    self.Height = fields[8] / 100.0         # stored as centimetres
    self.HeightType = fields[9]

    # now read the footer
    self.ETX, self.checksum = readFooter(self.numberOfBytes, self.fileptr)
def readFooter(numberOfBytes, fileptr):
    """Read the 3-byte datagram footer: an ETX byte followed by a 16-bit checksum.

    :param numberOfBytes: total datagram length; currently unused, kept for
        callers that pass it and for future handling of trailing spare bytes.
    :param fileptr: binary file object positioned at the start of the footer.
    :return: ``(ETX, checksum)`` tuple.
    """
    rec_fmt = '=BH'
    rec_len = struct.calcsize(rec_fmt)
    ETX, checksum = struct.Struct(rec_fmt).unpack_from(fileptr.read(rec_len))
    # NOTE(review): some .all files carry a spare byte / trailing bytes around
    # the footer; the previous implementation kept commented-out handling for
    # that case — revisit here if footers ever misalign.
    return ETX, checksum
def _85encode(b, chars, chars2, pad=False, foldnuls=False, foldspaces=False):
    """Shared worker for a85encode/b85encode: encode bytes into base85 text."""
    if not isinstance(b, bytes_types):
        b = memoryview(b).tobytes()

    # Input is consumed as big-endian 32-bit words; zero-pad to a multiple of 4.
    padding = (-len(b)) % 4
    if padding:
        b = b + b'\0' * padding
    words = struct.Struct('!%dI' % (len(b) // 4)).unpack(b)

    chunks = []
    for word in words:
        if foldnuls and not word:
            chunks.append(b'z')                     # all-zero group shorthand
        elif foldspaces and word == 0x20202020:
            chunks.append(b'y')                     # all-space group shorthand
        else:
            chunks.append(chars2[word // 614125] +
                          chars2[word // 85 % 7225] +
                          chars[word % 85])

    if padding and not pad:
        # Undo the zero padding: expand a folded 'z' first, then trim the
        # surplus encoded characters from the final group.
        if chunks[-1] == b'z':
            chunks[-1] = chars[0] * 5
        chunks[-1] = chunks[-1][:-padding]

    return b''.join(chunks)
def get_packer(self, obj):
    # Build (or fetch from cache) the struct packer for the slots that are
    # actually present on ``obj``, as described by its presence bitmap.
    # Returns a (packer, padding) tuple.
    has_bitmap, none_bitmap, present_bitmap = self._get_bitmaps(obj)
    rv = self.packer_cache.get(present_bitmap)
    if rv is None:
        # Concatenate the bitmap header format with the struct code of every
        # slot whose bit is set in the presence bitmap, in slot order.
        packer = struct.Struct("".join([
            self.bitmap_packer.format,
        ] + [
            self.slot_struct_types[slot]
            for i,slot in enumerate(self.slot_keys)
            if present_bitmap & (cython.cast(cython.ulonglong, 1) << i)
        ]))
        alignment = self.alignment
        size = packer.size
        # Round the packed size up to the next alignment boundary; padding is
        # the number of filler bytes needed.
        # NOTE(review): uses ``/`` not ``//`` — presumably relies on
        # cython-compiled C integer division; under plain Python 3 this
        # would yield a float. Confirm against the build mode.
        padding = (size + alignment - 1) / alignment * alignment - size
        self.packer_cache[present_bitmap] = rv = (packer, padding)
    return rv
def get_unpacker(self, has_bitmap, none_bitmap):
    # Return (unpacker, padding, format, proxy_factory) for the given bitmaps,
    # memoized both in a dict cache and a one-entry "last used" fast path.
    present_bitmap = has_bitmap & ~none_bitmap
    if self._last_unpacker is not None and present_bitmap == self._last_unpacker_bitmap:
        return self._last_unpacker
    rv = self.unpacker_cache.get(present_bitmap)
    if rv is None:
        # Struct codes of every slot whose presence bit is set, in slot order.
        pformat = "".join([
            self.slot_struct_types[slot]
            for i,slot in enumerate(self.slot_keys)
            if present_bitmap & (cython.cast(cython.ulonglong, 1) << i)
        ])
        unpacker = struct.Struct(pformat)
        alignment = self.alignment
        size = unpacker.size
        # Pad (bitmap header + payload) up to the next alignment boundary.
        # NOTE(review): ``/`` presumably relies on cython C integer division;
        # plain Python 3 would produce a float here — confirm.
        padding = (size + self.bitmap_size + alignment - 1) / alignment * alignment - size
        gfactory = GenericProxyClass(self.slot_keys, self.slot_types, present_bitmap, self.bitmap_size)
        rv = (unpacker, padding, pformat, gfactory)
        self.unpacker_cache[present_bitmap] = rv
        self._last_unpacker_bitmap = present_bitmap
        self._last_unpacker = rv
    return rv
def parse_station_record(self, line):
    # Parse one fixed-width station-info record line into a dict keyed by
    # the field names below. Returns None for lines that are not data
    # records (no leading blank, or shorter than 77 characters).
    fieldnames = ['StationCode', 'StationName', 'DateStart', 'DateEnd', 'AntennaHeight', 'HeightCode', 'AntennaNorth', 'AntennaEast', 'ReceiverCode', 'ReceiverVers', 'ReceiverFirmware', 'ReceiverSerial', 'AntennaCode', 'RadomeCode', 'AntennaSerial']

    fieldwidths = ( 1, 6, 18, 19, 19, 9, 7, 9, 9, 22, 22, 7, 22, 17, 7, 20)  # negative widths represent ignored padding fields

    # Build a struct format like '1s 6s 18s ...'; struct ignores whitespace
    # between format specifiers. The leading 1-byte field (the record's
    # leading blank) is discarded by the [1:] slice below.
    fmtstring = ' '.join('{}{}'.format(abs(fw), 'x' if fw < 0 else 's') for fw in fieldwidths)

    fieldstruct = struct.Struct(fmtstring)
    parse = fieldstruct.unpack_from

    if line[0] == ' ' and len(line) >= 77:
        # Pad short lines out to the full record width before unpacking.
        # NOTE(review): unpacking a str and mapping str.strip implies this
        # runs under Python 2 (Python 3 struct requires bytes) — confirm.
        record = dict(zip(fieldnames, map(str.strip, parse(line.ljust(fieldstruct.size))[1:])))
    else:
        return None

    # convert to datetime object
    DateStart, DateEnd = self.stninfodate2datetime(record['DateStart'], record['DateEnd'])
    record['DateStart'] = DateStart
    record['DateEnd'] = DateEnd
    record['StationCode'] = record['StationCode'].lower()

    return record
def __init__(self, cloexec=True, nonblock=True):
    """Create an inotify instance via the raw libc bindings.

    :param cloexec: set the inotify fd close-on-exec.
    :param nonblock: open the inotify fd in non-blocking mode.
    :raises INotifyError: if ``inotify_init1()`` fails.
    """
    self._init1, self._add_watch, self._rm_watch, self._read = load_inotify()
    flags = 0
    if cloexec:
        flags |= self.CLOEXEC
    if nonblock:
        flags |= self.NONBLOCK
    self._inotify_fd = self._init1(flags)
    if self._inotify_fd == -1:
        raise INotifyError(os.strerror(ctypes.get_errno()))

    # Reusable read buffer for incoming inotify_event records.
    self._buf = ctypes.create_string_buffer(5000)
    self.fenc = get_preferred_file_name_encoding()
    # struct inotify_event header: wd (int), mask, cookie, len (3x uint32).
    self.hdr = struct.Struct(b'iIII')
    # We keep a reference to os to prevent it from being deleted
    # during interpreter shutdown, which would lead to errors in the
    # __del__ method
    self.os = os
def _binary_write(filepath, faces):
    """Write an iterable of triangle faces to ``filepath`` as binary STL."""
    with open(filepath, 'wb') as data:
        fw = data.write

        # Reserve space for the 80-byte header + 4-byte triangle count up
        # front; the count is unknown until the (possibly lazy) ``faces``
        # iterable has been fully consumed.
        fw(struct.calcsize('<80sI') * b'\0')

        pack_vertices = struct.Struct('<9f').pack  # 3 vertices == 9 floats
        written = 0
        for face in faces:
            # Face normal, then the three vertices, then a 2-byte
            # attribute byte count (unused, zero).
            fw(struct.pack('<3f', *normal(*face)) +
               pack_vertices(*itertools.chain.from_iterable(face)))
            fw(b'\0\0')
            written += 1

        # Rewind and fill in the real header with the triangle count.
        data.seek(0)
        fw(struct.pack('<80sI', _header_version().encode('ascii'), written))
def read(self):
    """Decode a clock datagram record at self.offset into instance attributes."""
    self.fileptr.seek(self.offset, 0)
    record_format = '=LBBHLLHHLLBBH'
    record_size = struct.calcsize(record_format)
    fields = struct.Struct(record_format).unpack(self.fileptr.read(record_size))

    # fields[0] is the datagram byte count, already held in numberOfBytes.
    self.STX = fields[1]
    self.typeOfDatagram = chr(fields[2])
    self.EMModel = fields[3]
    self.RecordDate = fields[4]
    self.Time = fields[5] / 1000.0          # stored as milliseconds
    self.ClockCounter = fields[6]
    self.SerialNumber = fields[7]
    self.ExternalDate = fields[8]
    self.ExternalTime = fields[9]
    self.PPS = fields[10]
    self.ETX = fields[11]
    self.checksum = fields[12]
def test_unpack_from(self): test_string = b'abcd01234' fmt = '4s' s = struct.Struct(fmt) for cls in (bytes, bytearray): data = cls(test_string) self.assertEqual(s.unpack_from(data), (b'abcd',)) self.assertEqual(s.unpack_from(data, 2), (b'cd01',)) self.assertEqual(s.unpack_from(data, 4), (b'0123',)) for i in range(6): self.assertEqual(s.unpack_from(data, i), (data[i:i+4],)) for i in range(6, len(test_string) + 1): self.assertRaises(struct.error, s.unpack_from, data, i) for cls in (bytes, bytearray): data = cls(test_string) self.assertEqual(struct.unpack_from(fmt, data), (b'abcd',)) self.assertEqual(struct.unpack_from(fmt, data, 2), (b'cd01',)) self.assertEqual(struct.unpack_from(fmt, data, 4), (b'0123',)) for i in range(6): self.assertEqual(struct.unpack_from(fmt, data, i), (data[i:i+4],)) for i in range(6, len(test_string) + 1): self.assertRaises(struct.error, struct.unpack_from, fmt, data, i)
def parse(cls, buff, offset):
    """
    Given a buffer and offset, returns the parsed value and new offset.

    The ``size_primitive`` is parsed first to learn how many more bytes
    to consume for the value itself.
    """
    size, offset = cls.size_primitive.parse(buff, offset)
    if size == -1:
        # Sentinel length: a null value occupies no payload bytes.
        return None, offset

    payload_struct = struct.Struct("!%ds" % size)
    raw = payload_struct.unpack_from(buff, offset)[0]
    return cls.parse_value(raw), offset + payload_struct.size
def send_message(self, message):
    """
    Send the message to the client as one unfragmented WebSocket frame.

    :param message: the message payload (bytes) to send
    """
    output = self.request.wfile
    byte_packer = struct.Struct(">B")  # built once instead of per write

    # 0x81: FIN bit set + opcode 1 (single, final frame).
    output.write(byte_packer.pack(129))

    # WebSocket extended payload-length encoding: <= 125 fits in the length
    # byte itself; <= 65535 uses marker 126 plus a 16-bit length; anything
    # larger uses marker 127 plus a 64-bit length.
    length = len(message)
    if length <= 125:
        output.write(byte_packer.pack(length))
    elif length <= 65535:  # the old "length >= 126" half was always true here
        output.write(byte_packer.pack(126))
        output.write(struct.pack(">H", length))
    else:
        output.write(byte_packer.pack(127))
        output.write(struct.pack(">Q", length))

    output.write(message)
    logging.debug(message)
def test_ceph_key(self, mock_urandom):
    """ceph_key() must emit base64 of a little-endian header plus the raw key."""
    result = utils.JinjaUtils.ceph_key()

    # Undo the base64 wrapping first.
    raw = base64.b64decode(result.encode('ascii'))

    # The binary layout is a '<hiih' header followed by the key bytes.
    hdr_struct = struct.Struct('<hiih')
    header, key = raw[:hdr_struct.size], raw[hdr_struct.size:]

    # The trailing header field must equal the actual key length.
    _type, _secs, _nanosecs, key_len = hdr_struct.unpack(header)
    assert key_len == len(key)

    # With urandom mocked, the key bytes are fully deterministic.
    assert key == b'0123456789012345'
def __new__(mcs, clsname, clsbases, clsdict):
    """Metaclass hook: compile the class's ``__header__`` spec into a Struct."""
    headers = clsdict.get('__header__', [])
    if headers:
        header_attrs, header_fmt = zip(*headers)
        byte_order = clsdict.get('__byte_order__', '>')
        header_struct = struct.Struct(''.join([byte_order] + list(header_fmt)))

        # Freeze the instance layout and stash the compiled header metadata
        # on the class for the parsing machinery.
        clsdict['__slots__'] = ('_fields', '_view', '_payload')
        clsdict['_header_fields'] = tuple(header_attrs)
        clsdict['_header_bytes_order'] = byte_order
        clsdict['_header_struct'] = header_struct
        clsdict['_header_size'] = header_struct.size

    return type.__new__(mcs, clsname, clsbases, clsdict)
def b85decode(b):
    """Decode base85-encoded bytes ``b`` back to their original bytes."""
    # Reverse lookup table: byte value -> digit value (None = invalid char).
    digit_of = [None] * 256
    for value, char in enumerate(iterbytes(_b85alphabet)):
        digit_of[char] = value

    # Pad with '~' (the highest digit) so the input length is a multiple of
    # 5; the surplus decoded bytes are trimmed off at the end.
    padding = (-len(b)) % 5
    b = b + b'~' * padding

    out = []
    packI = struct.Struct('!I').pack
    for i in range(0, len(b), 5):
        chunk = b[i:i + 5]
        acc = 0
        try:
            for char in iterbytes(chunk):
                acc = acc * 85 + digit_of[char]
        except TypeError:
            # Locate the offending byte for a precise error message.
            for j, char in enumerate(iterbytes(chunk)):
                if digit_of[char] is None:
                    raise ValueError(
                        'bad base85 character at position %d' % (i + j)
                    )
            raise
        try:
            out.append(packI(acc))
        except struct.error:
            # Accumulated value no longer fits in 32 bits.
            raise ValueError('base85 overflow in hunk starting at byte %d' % i)

    result = b''.join(out)
    if padding:
        result = result[:-padding]
    return result
def __init__(self,format_string):
    # Pre-compile the struct format once for reuse by pack/unpack helpers.
    self._struct = _Struct(format_string)
def _unpack_bytes_from_pybuffer(buf, offs, idmap):
    # Extract a length-prefixed (possibly lz4-compressed) bytes object from
    # ``buf`` at ``offs``. ``idmap`` memoizes results per offset so shared
    # references decode to the same object.
    if idmap is not None and offs in idmap:
        return idmap[offs]

    if cython.compiled:
        # Fast path: take a raw C buffer view and delegate to the C helper.
        try:
            buf = _likebuffer(buf)
            PyObject_GetBuffer(buf, cython.address(pybuf), PyBUF_SIMPLE)  # lint:ok
            rv = _unpack_bytes_from_cbuffer(cython.cast(cython.p_char, pybuf.buf), offs, pybuf.len, None)  # lint:ok
        finally:
            PyBuffer_Release(cython.address(pybuf))  # lint:ok
    else:
        # Header is a 16-bit length; bit 15 flags compression, and the
        # escape value 0x7FFF means a 64-bit length follows the header.
        hpacker = struct.Struct('=H')
        objlen = hpacker.unpack_from(buf, offs)[0]
        offs = int(offs)
        dataoffs = offs + hpacker.size
        compressed = (objlen & 0x8000) != 0
        if (objlen & 0x7FFF) == 0x7FFF:
            qpacker = struct.Struct('=HQ')
            objlen = qpacker.unpack_from(buf, offs)[1]
            dataoffs = offs + qpacker.size
        else:
            objlen = objlen & 0x7FFF
        # NOTE(review): ``buffer`` is the Python 2 builtin — this branch
        # appears to target py2 / cython builds only; confirm.
        rv = buffer(buf, dataoffs, objlen)
        if compressed:
            rv = lz4_decompress(rv)
        else:
            rv = bytes(rv)

    if idmap is not None:
        idmap[offs] = rv
    return rv
def set_nomenclature(self):
    """
    As in get_nomenclature, but set the title of the file header in the
    file, encoded as a pascal string containing 15 characters and stored
    as 16 bytes of binary data.
    """
    self.file.seek(0)
    title = 'DAC2 objects'
    # '<B15sH': 1-byte length prefix, 15-byte title field, constant trailer.
    # NOTE(review): packing a str with '15s' implies Python 2; Python 3
    # struct would require bytes here — confirm the target version.
    st = struct.Struct( '<B15sH' )
    header_rec = [len(title), title, 18]  # constant header
    header_chr = st.pack( *header_rec )
    self.header_size = len( header_chr )
    self.file.write( header_chr )
def _make_packer(format_string):
    """Return (pack, unpack) callables for a single-value struct format."""
    compiled = struct.Struct(format_string)

    def unpack_single(raw):
        # struct.unpack always yields a tuple; expose just the one value.
        return compiled.unpack(raw)[0]

    return compiled.pack, unpack_single
def __init__(self, rich_format):
    # Keep both the rich (human-oriented) format and the plain struct
    # format derived from it; pre-compile for repeated pack/unpack use.
    self.rich_fmt = rich_format
    self.fmt = format_from_rich(rich_format)
    self.compiled = struct.Struct(self.fmt)
    self.size = self.compiled.size
def __call__(self, sample, as_string=False): values = sample.channel_data # save sockets that are closed to remove them later on outdated_list = [] for sock in self.CONNECTION_LIST: # If one error should happen, we remove socket from the list try: if as_string: sock.send(str(values) + "\n") else: nb_channels = len(values) # format for binary data, network endian (big) and float (float32) packer = struct.Struct('!%sf' % nb_channels) # convert values to bytes packed_data = packer.pack(*values) sock.send(packed_data) # TODO: should check if the correct number of bytes passed through except: # sometimes (always?) it's only during the second write to a close socket that an error is raised? print "Something bad happened, will close socket" outdated_list.append(sock) # now we are outside of the main list, it's time to remove outdated sockets, if any for bad_sock in outdated_list: print "Removing socket..." self.CONNECTION_LIST.remove(bad_sock) # not very costly to be polite bad_sock.close()
def blit(self, dest, fill_fore=True, fill_back=True):
    """Write this buffer to console ``dest`` using libtcod's fill functions.

    :param dest: destination console; must match this buffer's size.
    :param fill_fore: also copy foreground colors and characters.
    :param fill_back: also copy background colors.
    :raises ValueError: if ``dest`` has a different width or height.
    """
    if (console_get_width(dest) != self.width or
            console_get_height(dest) != self.height):
        raise ValueError('ConsoleBuffer.blit: Destination console has an incorrect size.')

    # Bug fix: the original built an unused ``struct.Struct('%di' ...)``
    # here; it was dead code and has been removed — all fills go through
    # c_int arrays below.
    if fill_back:
        _lib.TCOD_console_fill_background(dest,
                                          (c_int * len(self.back_r))(*self.back_r),
                                          (c_int * len(self.back_g))(*self.back_g),
                                          (c_int * len(self.back_b))(*self.back_b))
    if fill_fore:
        _lib.TCOD_console_fill_foreground(dest,
                                          (c_int * len(self.fore_r))(*self.fore_r),
                                          (c_int * len(self.fore_g))(*self.fore_g),
                                          (c_int * len(self.fore_b))(*self.fore_b))
        _lib.TCOD_console_fill_char(dest, (c_int * len(self.char))(*self.char))
def __setstate__(self, d):
    """Restore pickled state and rebuild the non-picklable compiled Struct.

    Older pickles may lack ``min_value``/``max_value``; recompute them then.
    """
    self.__dict__.update(d)
    self._struct = struct.Struct(">" + str(self.sortable_typecode))
    if "min_value" not in d:
        # Bug fix: the original assigned into the local state dict ``d``
        # AFTER it had already been copied into __dict__, so the computed
        # bounds were silently discarded. Store them on the instance.
        self.min_value, self.max_value = self._min_max()
def __init__(self, dbfile, basepos, length, doccount, fixedlen):
    """Open a fixed-length column reader over ``dbfile`` at ``basepos``."""
    self._dbfile = dbfile
    self._basepos = basepos
    self._doccount = doccount
    self._fixedlen = fixedlen

    # The typecode byte is stored at the very end of the column data.
    self._typecode = chr(dbfile.get_byte(basepos + length - 1))
    compiled = struct.Struct("!" + self._typecode)
    self._unpack = compiled.unpack
    self._itemsize = compiled.size

    # The uniques table sits immediately after the packed values.
    dbfile.seek(basepos + doccount * self._itemsize)
    self._uniques = self._read_uniques()
def __init__(self, dbfile, typecode, default):
    """Prepare a fixed-length column writer for single values of ``typecode``."""
    packer = struct.Struct("!" + typecode)
    self._dbfile = dbfile
    self._pack = packer.pack
    self._default = default
    # Pre-pack the default so gaps can be filled without re-packing.
    self._defaultbytes = packer.pack(default)
    # NOTE: sized without the '!' prefix, matching the original behavior.
    self._fixedlen = struct.calcsize(typecode)
    self._count = 0
def __init__(self, dbfile, basepos, length, doccount, typecode, default):
    """Open a fixed-length column reader storing ``typecode`` values."""
    self._dbfile = dbfile
    self._basepos = basepos
    self._doccount = doccount
    self._default = default
    self._reverse = False
    self._typecode = typecode

    netfmt = "!" + typecode
    self._unpack = struct.Struct(netfmt).unpack
    self._defaultbytes = struct.pack(netfmt, default)
    # NOTE: sized without the '!' prefix, matching the original behavior.
    self._fixedlen = struct.calcsize(typecode)
    # Derive the stored value count from the on-disk span length.
    self._count = length // self._fixedlen
def __init__(self, dbfile, spec, default):
    """Prepare a writer for fixed-length records described by ``spec``."""
    compiled = struct.Struct(spec)
    self._dbfile = dbfile
    self._struct = compiled
    self._fixedlen = compiled.size
    self._default = default
    # ``default`` is a tuple of field values; pre-pack it once.
    self._defaultbytes = compiled.pack(*default)
    self._count = 0
def __init__(self, dbfile, basepos, length, doccount, spec, default):
    """Open a reader for fixed-length records described by ``spec``."""
    compiled = struct.Struct(spec)
    self._dbfile = dbfile
    self._basepos = basepos
    self._doccount = doccount
    self._struct = compiled
    self._fixedlen = compiled.size
    self._default = default
    self._defaultbytes = compiled.pack(*default)
    # Number of whole records contained in the on-disk span.
    self._count = length // self._fixedlen
def __repr__(self):
    # Fixed debug tag; this reader type carries no useful state to display.
    return "<Struct.Reader>"
def read_fields(self, line, data, format_tuple):
    # Parse ``data`` into typed fields using a whitespace-separated struct
    # format derived from printf-style specs in ``format_tuple``
    # (e.g. '%8.3f' -> '8s'); pads or truncates the input to the exact
    # struct size first, then coerces each field per its original spec.
    # NOTE(review): unpacking str and calling str.strip implies Python 2 —
    # under Python 3 struct requires bytes here; confirm.

    # create the parser object
    formatstr = re.sub(r'\..', '',' '.join(format_tuple).replace('%', '').replace('f', 's').replace('i', 's').replace('-', ''))

    fs = struct.Struct(formatstr)
    parse = fs.unpack_from

    if len(data) < fs.size:
        # line too short, add padding zeros
        # NOTE(review): pads ``line`` rather than ``data`` — correct only
        # if the two hold the same text; confirm against callers.
        f = '%-' + str(fs.size) + 's'
        data = f % line
    elif len(data) > fs.size:
        # line too long! cut
        data = line[0:fs.size]

    fields = list(parse(data))

    # convert each element in the list to float if necessary
    for i, field in enumerate(fields):
        if 'f' in format_tuple[i]:
            try:
                fields[i] = float(fields[i])
            except ValueError:
                # invalid number in the field!, replace with something harmless
                fields[i] = float(2.11)
        elif 'i' in format_tuple[i]:
            try:
                fields[i] = int(fields[i])
            except ValueError:
                # invalid number in the field!, replace with something harmless
                fields[i] = int(1)
        elif 's' in format_tuple[i]:
            fields[i] = fields[i].strip()

    return fields
def get_firstobs(self):
    # Scan the RINEX observation file for the first epoch record after the
    # header and return it as a pyDate.Date (or None if none is found).
    # Epoch lines are fixed-width; fields[12] is the epoch flag and
    # fields[13] the count of event lines to skip.
    # NOTE(review): unpacking str lines implies Python 2 struct semantics.
    fs = struct.Struct('1s2s1s2s1s2s1s2s1s2s11s2s1s3s')
    parse = fs.unpack_from

    date = None
    with open(self.rinex_path,'r') as fileio:

        # Skip everything up to and including the END OF HEADER marker.
        found = False
        for line in fileio:
            if 'END OF HEADER' in line:
                found = True
                break

        if found:
            skip = 0
            for line in fileio:
                if skip == 0:
                    fields = list(parse(line))

                    if int(fields[12]) <= 1:  # OK FLAG
                        # read first observation
                        year = int(fields[1])
                        month = int(fields[3])
                        day = int(fields[5])
                        hour = int(fields[7])
                        minute = int(fields[9])
                        second = float(fields[10])

                        date = pyDate.Date(year=year, month=month, day=day, hour=hour, minute=minute, second=second)
                        break
                    elif int(fields[12]) > 1:
                        # event, skip lines indicated in next field
                        skip = int(fields[13])
                else:
                    skip -= 1

    return date
def __init__(self, n, r, p):
    """Pre-compute sizes, packers and helper functions for an scrypt(n, r, p) run.

    :param n: CPU/memory cost parameter.
    :param r: block size parameter.
    :param p: parallelism parameter.
    """
    # store config
    self.n = n
    self.r = r
    self.p = p
    self.smix_bytes = r << 7  # num bytes in smix input - 2*r*16*4
    self.iv_bytes = self.smix_bytes * p
    self.bmix_len = bmix_len = r << 5  # length of bmix block list - 32*r integers
    self.bmix_half_len = r << 4
    assert struct.calcsize("I") == 4
    # Little-endian view of one whole bmix block as 32*r uint32 words.
    self.bmix_struct = struct.Struct("<" + str(bmix_len) + "I")

    # use optimized bmix for certain cases
    if r == 1:
        self.bmix = self._bmix_1

    # pick best integerify function - integerify(bmix_block) should
    # take last 64 bytes of block and return a little-endian integer.
    # since it's immediately converted % n, we only have to extract
    # the first 32 bytes if n < 2**32 - which due to the current
    # internal representation, is already unpacked as a 32-bit int.
    if n <= 0xFFFFffff:
        integerify = operator.itemgetter(-16)
    else:
        assert n <= 0xFFFFffffFFFFffff
        ig1 = operator.itemgetter(-16)
        ig2 = operator.itemgetter(-17)
        def integerify(X):
            # Combine two 32-bit words into the low 64 bits.
            return ig1(X) | (ig2(X)<<32)

    self.integerify = integerify

#=================================================================
# frontend
#=================================================================
def recv_int(self):
    """Receive 4 bytes from the peer and decode them as a single integer."""
    raw = self._recv_msg(byte_length=4)
    return struct.Struct(self.BINARY_INT).unpack(raw)[0]
def recv_table(self):
    """Receive 480 bytes and decode them into an 8-row x 15-column table."""
    raw = self._recv_msg(byte_length=480)
    flat = struct.Struct(self.BINARY_TABLE).unpack(raw)
    # Re-slice the flat 120-value tuple into 8 rows of 15 values each.
    return [flat[row * 15:(row + 1) * 15][:] for row in range(8)]
def send_table(self, table):
    """Flatten the 8x15 ``table`` row by row, pack it, and send it."""
    flat = [cell for row in table for cell in row]
    packed = struct.Struct(self.BINARY_TABLE).pack(*flat)
    self._send_msg(packed)
def read_struct(self, fmt):
    """Unpack ``fmt`` at the current read position and advance past it."""
    compiled = struct.Struct(fmt)
    values = compiled.unpack_from(self._data, self._position)
    self._position += compiled.size
    return values