我们从 Python 开源项目中提取了以下 27 个代码示例，用于说明如何使用 struct.iter_unpack()。
def test_construct(self): def _check_iterator(it): self.assertIsInstance(it, abc.Iterator) self.assertIsInstance(it, abc.Iterable) s = struct.Struct('>ibcp') it = s.iter_unpack(b"") _check_iterator(it) it = s.iter_unpack(b"1234567") _check_iterator(it) # Wrong bytes length with self.assertRaises(struct.error): s.iter_unpack(b"123456") with self.assertRaises(struct.error): s.iter_unpack(b"12345678") # Zero-length struct s = struct.Struct('>') with self.assertRaises(struct.error): s.iter_unpack(b"") with self.assertRaises(struct.error): s.iter_unpack(b"12")
def _unpack_bitplanes(self, data): """The values that make up a row of pixels are organized like: [bp1-row1] [bp2-row1] [bp3-row1] [bp4-row1]... [bp1-row8] [bp2-row8] [bp3-row8] [bp4-row8] Returns a list of lists containing bitplane values (bytes). """ planes = [[], [], [], []] data_iter = Struct('c').iter_unpack(data) for bp in data_iter: planes[0].extend(*bp) planes[1].extend(*next(data_iter)) planes[2].extend(*next(data_iter)) planes[3].extend(*next(data_iter)) return planes
def toarray(self):
    """Build a NumPy array from this tile's interleaved, unpacked data.

    The tile returned by ``self.interleave_subtiles()`` is unpacked and cut
    into rows of ``dimensions`` single bytes each; the rows are stacked into
    one array.

    Returns an ``np.ndarray`` built from the list of row tuples.
    """
    merged = self.interleave_subtiles()
    row_format = merged.dimensions * 'c'
    rows = list(iter_unpack(row_format, merged.unpack()))
    return np.array(rows)
def _interleave(self, subtile1, subtile2): """Interleaves two 8x8 tiles like [subtile1-row1] [subtile2-row1] ... [subtile1-row16] [subtile2-row16] Returns bytes() """ interleaved = [] interleave_fmt = Struct(4 * 'c') left_iter = interleave_fmt.iter_unpack(subtile1) right_iter = interleave_fmt.iter_unpack(subtile2) for i in left_iter: right_next = next(right_iter) interleaved.extend([*i, *right_next]) return interleaved
def _condense(img): """PNG image is returned as 3 bands representing RGB on each plane. Flattens into 1 plane array with each (R,G,B) value represented as RGB. """ im = [band.tobytes() for band in img.split()] band_fmt = 'c' band_iter = [struct.iter_unpack(band_fmt, split_im) for split_im in im] comb = [] for val in band_iter[0]: val_2 = next(band_iter[1]) val_3 = next(band_iter[2]) comb.append(b''.join([*val, *val_2, *val_3])) return comb
def format_address(data, datatype):
    """Render the address bytes of an A or AAAA answer as text.

    For ``RR_TYPE_A`` every item of ``data`` is converted with ``str`` and
    the items are joined with dots.  For ``RR_TYPE_AAAA`` the data is read
    as big-endian 16-bit words, each printed in lowercase hex without
    padding and joined with colons (no zero compression is applied).

    Returns the formatted string, or ``None`` for any other ``datatype``.
    """
    if datatype == RR_TYPE_A:
        return '.'.join(str(octet) for octet in data)
    if datatype == RR_TYPE_AAAA:
        # Materialise first so a malformed buffer fails before any joining.
        words = list(struct.iter_unpack("!H", data))
        return ":".join("%x" % word for (word,) in words)
    return None
def test_arbitrary_buffer(self): s = struct.Struct('>IB') b = bytes(range(1, 11)) it = s.iter_unpack(memoryview(b)) self.assertEqual(next(it), (0x01020304, 5)) self.assertEqual(next(it), (0x06070809, 10)) self.assertRaises(StopIteration, next, it) self.assertRaises(StopIteration, next, it)
def test_length_hint(self): lh = operator.length_hint s = struct.Struct('>IB') b = bytes(range(1, 16)) it = s.iter_unpack(b) self.assertEqual(lh(it), 3) next(it) self.assertEqual(lh(it), 2) next(it) self.assertEqual(lh(it), 1) next(it) self.assertEqual(lh(it), 0) self.assertRaises(StopIteration, next, it) self.assertEqual(lh(it), 0)
def test_module_func(self): # Sanity check for the global struct.iter_unpack() it = struct.iter_unpack('>IB', bytes(range(1, 11))) self.assertEqual(next(it), (0x01020304, 5)) self.assertEqual(next(it), (0x06070809, 10)) self.assertRaises(StopIteration, next, it) self.assertRaises(StopIteration, next, it)
def _read_output(self):
    """Read samples from the FIFO at ``self.path`` until it yields no data.

    Each iteration reads ``2 * bars`` bytes (``bars`` comes from
    ``self.cavaconfig["general"]["bars"]``), decodes them as native
    unsigned 16-bit values and scales each by ``1 / 65535``.  Every
    non-empty sample list is handed to ``self.data_handler`` through
    ``GLib.idle_add``.  When a read produces no values the loop ends and
    ``self._on_stop`` is passed to ``GLib.idle_add``.

    Raises:
        struct.error: if a read returns an odd number of bytes.
    """
    # The context manager closes the FIFO even when reading or unpacking
    # raises, so the handle is never leaked.
    with open(self.path, "rb") as fifo:
        while True:
            data = fifo.read(2 * self.cavaconfig["general"]["bars"])
            sample = [value[0] / 65535 for value in struct.iter_unpack("H", data)]
            if not sample:
                break
            GLib.idle_add(self.data_handler, sample)
    GLib.idle_add(self._on_stop)
def test_iterate(self): s = struct.Struct('>IB') b = bytes(range(1, 16)) it = s.iter_unpack(b) self.assertEqual(next(it), (0x01020304, 5)) self.assertEqual(next(it), (0x06070809, 10)) self.assertEqual(next(it), (0x0b0c0d0e, 15)) self.assertRaises(StopIteration, next, it) self.assertRaises(StopIteration, next, it)
def iter_unpack(raw):
    """Split a raw message into successive event records.

    When the module-level ``OLD`` flag is truthy the work is delegated to
    ``chunks(raw)``; otherwise ``struct.iter_unpack`` is used with
    ``EVENT_FORMAT``.

    NOTE(review): each record is presumably 24 bytes laid out as
    (long, long, unsigned short, unsigned short, int) - confirm against
    the definition of ``EVENT_FORMAT``.
    """
    if not OLD:
        return struct.iter_unpack(EVENT_FORMAT, raw)
    return chunks(raw)
def _do_iter(self):
    """Read one batch of raw data and convert it into event objects.

    The number of bytes requested is ``EVENT_SIZE`` multiplied by
    ``self.read_size`` when that attribute is truthy, otherwise a single
    ``EVENT_SIZE``.  The data from ``self._get_data`` is split with the
    module-level ``iter_unpack`` and every record is expanded into
    ``self._make_event``.

    Returns a list of events, or ``None`` when no data was read.
    """
    read_size = EVENT_SIZE * self.read_size if self.read_size else EVENT_SIZE
    data = self._get_data(read_size)
    if not data:
        return None
    return [self._make_event(*fields) for fields in iter_unpack(data)]
# pylint: disable=too-many-arguments
def unpack(self):
    """Convert ``self._tile_data`` chunk by chunk into pixel values.

    The data is consumed in consecutive 32-byte chunks; each chunk is passed
    to ``self._bitplanes_to_tile`` and the results are concatenated.
    NOTE(review): each 32-byte chunk is presumably one 4bpp planar tile -
    confirm against the tile format.

    Returns a ``bytes`` object.
    """
    # '32s' yields each chunk as one 32-byte bytes object.
    chunk_reader = Struct('32s')
    converted = [
        self._bitplanes_to_tile(chunk)
        for (chunk,) in chunk_reader.iter_unpack(self._tile_data)
    ]
    return b''.join(converted)
def pack(self, data):
    """Convert ``data`` chunk by chunk through ``self._tile_to_bitplanes``.

    ``data`` is consumed in consecutive 32-byte chunks; each chunk is
    converted and the results are concatenated.
    NOTE(review): the input is presumably raw pixel values and the output
    4bpp tile data - confirm against the tile format.

    Returns a ``bytes`` object.
    """
    # '32s' yields each chunk as one 32-byte bytes object.
    chunk_reader = Struct('32s')
    converted = [
        self._tile_to_bitplanes(chunk)
        for (chunk,) in chunk_reader.iter_unpack(data)
    ]
    return b''.join(converted)
def _tile_to_bitplanes(self, data): bitplanes = [] row_fmt = Struct(8 * 'c') row_iter = row_fmt.iter_unpack(data) bitplanes = [self._pixel_row_to_4bpp(row) for row in row_iter] return b''.join(bitplanes)
def deinterleave_subtiles(self):
    """Build a new ``Tile`` whose subtiles are stored one after another.

    ``self._tile_data`` is consumed in 64-byte chunks; each chunk is split
    by ``self._deinterleave`` and the pieces are collected in order.  The
    first four pieces are then rearranged as 0, 2, 1, 3 and joined, so the
    data must produce at least four pieces.

    Returns a ``Tile`` with the same address and dimensions as this one.
    """
    pieces = []
    # '64s' yields each chunk as one 64-byte bytes object.
    for (chunk,) in Struct('64s').iter_unpack(self._tile_data):
        pieces.extend(self._deinterleave(chunk))
    reordered = [pieces[0], pieces[2], pieces[1], pieces[3]]
    return Tile(self._tile_addr, b''.join(reordered), self._tile_dimensions)
def _deinterleave(self, data): deinterleaved = [[], []] deinterleave_fmt = Struct(4 * 'c') deinterleave_iter = deinterleave_fmt.iter_unpack(data) for i in deinterleave_iter: deinterleaved[0].extend([*i]) deinterleaved[1].extend([*next(deinterleave_iter)]) return [b''.join(data) for data in deinterleaved]
def toarray(self):
    """Assemble every tile of ``self.tiles2d()`` into one array.

    Each tile is interleaved, unpacked and cut into rows of ``dimensions``
    single bytes; the rows are coloured by ``self._colortile``.  The
    coloured tiles of one grid row are concatenated along axis 1, and the
    resulting strips are stacked along axis 0.

    Returns the assembled ``np.ndarray``.
    """
    grid = []
    for tile_row in self.tiles2d():
        colored_row = []
        for tile in tile_row:
            merged = tile.interleave_subtiles()
            pixel_rows = list(
                struct.iter_unpack(merged.dimensions * 'c', merged.unpack())
            )
            colored_row.append(self._colortile(pixel_rows))
        grid.append(colored_row)

    strips = [np.concatenate(strip, axis=1) for strip in np.array(grid)]
    return np.concatenate(strips, axis=0)
def get_accessor(self, idx):
    """Decode the values of accessor ``idx`` from its buffer view.

    The accessor's ``componentType`` selects the little-endian element
    format: 5126 is a 32-bit float, 5123 an unsigned 16-bit integer and
    5125 an unsigned 32-bit integer.
    NOTE(review): these codes presumably follow the glTF accessor
    definition - confirm against the loader's input format.

    Returns:
        A list of ``Vector`` built from consecutive triples when ``type``
        is ``"VEC3"``, a list of 2-tuples when it is ``"VEC2"``, otherwise
        the flat list of decoded values.

    Raises:
        ValueError: for any other ``componentType``.
    """
    accessor = self.root['accessors'][idx]
    buffer_view = self.get_buffer_view(accessor['bufferView'])
    component_type = accessor["componentType"]
    type_size = accessor["type"]

    known_formats = ((5126, "<f"), (5123, "<H"), (5125, "<I"))
    for code, fmt in known_formats:
        if component_type == code:
            break
    else:
        raise ValueError("Unknown component type: %s" % component_type)

    values = [item for (item,) in struct.iter_unpack(fmt, buffer_view)]

    if type_size == "VEC3":
        return [Vector(values[start:start+3]) for start in range(0, len(values), 3)]
    if type_size == "VEC2":
        return [tuple(values[start:start+2]) for start in range(0, len(values), 2)]
    return values