Python struct 模块,iter_unpack() 实例源码

我们从Python开源项目中,提取了以下27个代码示例,用于说明如何使用struct.iter_unpack()

项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_construct(self):
        def _check_iterator(it):
            self.assertIsInstance(it, abc.Iterator)
            self.assertIsInstance(it, abc.Iterable)
        s = struct.Struct('>ibcp')
        it = s.iter_unpack(b"")
        _check_iterator(it)
        it = s.iter_unpack(b"1234567")
        _check_iterator(it)
        # Wrong bytes length
        with self.assertRaises(struct.error):
            s.iter_unpack(b"123456")
        with self.assertRaises(struct.error):
            s.iter_unpack(b"12345678")
        # Zero-length struct
        s = struct.Struct('>')
        with self.assertRaises(struct.error):
            s.iter_unpack(b"")
        with self.assertRaises(struct.error):
            s.iter_unpack(b"12")
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_construct(self):
        def _check_iterator(it):
            self.assertIsInstance(it, abc.Iterator)
            self.assertIsInstance(it, abc.Iterable)
        s = struct.Struct('>ibcp')
        it = s.iter_unpack(b"")
        _check_iterator(it)
        it = s.iter_unpack(b"1234567")
        _check_iterator(it)
        # Wrong bytes length
        with self.assertRaises(struct.error):
            s.iter_unpack(b"123456")
        with self.assertRaises(struct.error):
            s.iter_unpack(b"12345678")
        # Zero-length struct
        s = struct.Struct('>')
        with self.assertRaises(struct.error):
            s.iter_unpack(b"")
        with self.assertRaises(struct.error):
            s.iter_unpack(b"12")
项目:cps2-gfx-editor    作者:goosechooser    | 项目源码 | 文件源码
def _unpack_bitplanes(self, data):
        """The values that make up a row of pixels are organized like:
        [bp1-row1] [bp2-row1] [bp3-row1] [bp4-row1]...
        [bp1-row8] [bp2-row8] [bp3-row8] [bp4-row8]

        Returns a list of lists containing bitplane values (bytes).
        """
        planes = [[], [], [], []]
        data_iter = Struct('c').iter_unpack(data)

        for bp in data_iter:
            planes[0].extend(*bp)
            planes[1].extend(*next(data_iter))
            planes[2].extend(*next(data_iter))
            planes[3].extend(*next(data_iter))

        return planes
项目:cps2-gfx-editor    作者:goosechooser    | 项目源码 | 文件源码
def toarray(self):
        """Uses 8x8 or 16x16 Tile data to create an array of the tile's data.

        Returns an array.
        """
        interleaved_tile = self.interleave_subtiles()

        tile_fmt = interleaved_tile.dimensions * 'c'
        tile_iter = iter_unpack(tile_fmt, interleaved_tile.unpack())
        tiles = [subtile for subtile in tile_iter]

        return np.array(tiles)
项目:cps2-gfx-editor    作者:goosechooser    | 项目源码 | 文件源码
def _interleave(self, subtile1, subtile2):
        """Interleaves two 8x8 tiles like
        [subtile1-row1] [subtile2-row1] ...
        [subtile1-row16] [subtile2-row16]

        Returns bytes()
        """
        interleaved = []
        interleave_fmt = Struct(4 * 'c')

        left_iter = interleave_fmt.iter_unpack(subtile1)
        right_iter = interleave_fmt.iter_unpack(subtile2)

        for i in left_iter:
            right_next = next(right_iter)
            interleaved.extend([*i, *right_next])

        return interleaved
项目:cps2-gfx-editor    作者:goosechooser    | 项目源码 | 文件源码
def _condense(img):
    """PNG image is returned as 3 bands representing RGB on each plane.
    Flattens into 1 plane array with each (R,G,B) value represented as RGB.

    """
    im = [band.tobytes() for band in img.split()]
    band_fmt = 'c'
    band_iter = [struct.iter_unpack(band_fmt, split_im) for split_im in im]
    comb = []

    for val in band_iter[0]:
        val_2 = next(band_iter[1])
        val_3 = next(band_iter[2])
        comb.append(b''.join([*val, *val_2, *val_3]))

    return comb
项目:dane-monitoring-plugins    作者:siccegge    | 项目源码 | 文件源码
def format_address(data, datatype):
    """Given a answer packet for an A or AAAA query, return the string
       representation of the address
    """
    if datatype == RR_TYPE_A:
        return '.'.join([str(a) for a in data])
    elif datatype == RR_TYPE_AAAA:
        data = list(struct.iter_unpack("!H", data))
        return ":".join(["%x" % a for a in data])
    else:
        return None
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_arbitrary_buffer(self):
        s = struct.Struct('>IB')
        b = bytes(range(1, 11))
        it = s.iter_unpack(memoryview(b))
        self.assertEqual(next(it), (0x01020304, 5))
        self.assertEqual(next(it), (0x06070809, 10))
        self.assertRaises(StopIteration, next, it)
        self.assertRaises(StopIteration, next, it)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_length_hint(self):
        lh = operator.length_hint
        s = struct.Struct('>IB')
        b = bytes(range(1, 16))
        it = s.iter_unpack(b)
        self.assertEqual(lh(it), 3)
        next(it)
        self.assertEqual(lh(it), 2)
        next(it)
        self.assertEqual(lh(it), 1)
        next(it)
        self.assertEqual(lh(it), 0)
        self.assertRaises(StopIteration, next, it)
        self.assertEqual(lh(it), 0)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_module_func(self):
        # Sanity check for the global struct.iter_unpack()
        it = struct.iter_unpack('>IB', bytes(range(1, 11)))
        self.assertEqual(next(it), (0x01020304, 5))
        self.assertEqual(next(it), (0x06070809, 10))
        self.assertRaises(StopIteration, next, it)
        self.assertRaises(StopIteration, next, it)
项目:cavalcade    作者:worron    | 项目源码 | 文件源码
def _read_output(self):
        fifo = open(self.path, "rb")
        while True:
            data = fifo.read(2 * self.cavaconfig["general"]["bars"])
            sample = [i[0] / 65535 for i in struct.iter_unpack("H", data)]
            if sample:
                GLib.idle_add(self.data_handler, sample)
            else:
                break
        fifo.close()
        GLib.idle_add(self._on_stop)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_iterate(self):
        s = struct.Struct('>IB')
        b = bytes(range(1, 16))
        it = s.iter_unpack(b)
        self.assertEqual(next(it), (0x01020304, 5))
        self.assertEqual(next(it), (0x06070809, 10))
        self.assertEqual(next(it), (0x0b0c0d0e, 15))
        self.assertRaises(StopIteration, next, it)
        self.assertRaises(StopIteration, next, it)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_arbitrary_buffer(self):
        s = struct.Struct('>IB')
        b = bytes(range(1, 11))
        it = s.iter_unpack(memoryview(b))
        self.assertEqual(next(it), (0x01020304, 5))
        self.assertEqual(next(it), (0x06070809, 10))
        self.assertRaises(StopIteration, next, it)
        self.assertRaises(StopIteration, next, it)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_length_hint(self):
        lh = operator.length_hint
        s = struct.Struct('>IB')
        b = bytes(range(1, 16))
        it = s.iter_unpack(b)
        self.assertEqual(lh(it), 3)
        next(it)
        self.assertEqual(lh(it), 2)
        next(it)
        self.assertEqual(lh(it), 1)
        next(it)
        self.assertEqual(lh(it), 0)
        self.assertRaises(StopIteration, next, it)
        self.assertEqual(lh(it), 0)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_module_func(self):
        # Sanity check for the global struct.iter_unpack()
        it = struct.iter_unpack('>IB', bytes(range(1, 11)))
        self.assertEqual(next(it), (0x01020304, 5))
        self.assertEqual(next(it), (0x06070809, 10))
        self.assertRaises(StopIteration, next, it)
        self.assertRaises(StopIteration, next, it)
项目:inputs    作者:zeth    | 项目源码 | 文件源码
def iter_unpack(raw):
    """Yield successive 24-sized chunks from message."""
    if OLD:
        return chunks(raw)
    else:
        return struct.iter_unpack(EVENT_FORMAT, raw)

# long, long, unsigned short, unsigned short, int
项目:inputs    作者:zeth    | 项目源码 | 文件源码
def _do_iter(self):
        if self.read_size:
            read_size = EVENT_SIZE * self.read_size
        else:
            read_size = EVENT_SIZE
        data = self._get_data(read_size)
        if not data:
            return
        evdev_objects = iter_unpack(data)
        events = [self._make_event(*event) for event in evdev_objects]
        return events

    # pylint: disable=too-many-arguments
项目:inputs    作者:zeth    | 项目源码 | 文件源码
def iter_unpack(raw):
    """Yield successive 24-sized chunks from message."""
    if OLD:
        return chunks(raw)
    else:
        return struct.iter_unpack(EVENT_FORMAT, raw)

# long, long, unsigned short, unsigned short, int
项目:inputs    作者:zeth    | 项目源码 | 文件源码
def _do_iter(self):
        if self.read_size:
            read_size = EVENT_SIZE * self.read_size
        else:
            read_size = EVENT_SIZE
        data = self._get_data(read_size)
        if not data:
            return
        evdev_objects = iter_unpack(data)
        events = [self._make_event(*event) for event in evdev_objects]
        return events

    # pylint: disable=too-many-arguments
项目:cps2-gfx-editor    作者:goosechooser    | 项目源码 | 文件源码
def unpack(self):
        """Unpacks a 4bpp planar tile.

        Returns a byte() of pixel values.
        """
        tile_fmt = Struct(32 * 'c')
        tile_iter = tile_fmt.iter_unpack(self._tile_data)

        tiles = [self._bitplanes_to_tile(b''.join(tile)) for tile in tile_iter]
        return b''.join(tiles)
项目:cps2-gfx-editor    作者:goosechooser    | 项目源码 | 文件源码
def pack(self, data):
        """Converts pixel values into 4bpp and packs it for Tile use.
        Returns bytes()
        """

        tile_fmt = Struct(32 * 'c')
        tile_iter = tile_fmt.iter_unpack(data)

        bitplane_values = [self._tile_to_bitplanes(b''.join(tile)) for tile in tile_iter]
        return b''.join(bitplane_values)
项目:cps2-gfx-editor    作者:goosechooser    | 项目源码 | 文件源码
def _tile_to_bitplanes(self, data):
        bitplanes = []
        row_fmt = Struct(8 * 'c')
        row_iter = row_fmt.iter_unpack(data)

        bitplanes = [self._pixel_row_to_4bpp(row) for row in row_iter]

        return b''.join(bitplanes)
项目:cps2-gfx-editor    作者:goosechooser    | 项目源码 | 文件源码
def deinterleave_subtiles(self):
        tile_fmt = Struct(64 * 'c')
        tile_iter = tile_fmt.iter_unpack(self._tile_data)

        subtiles = []
        for subtile in tile_iter:
            subtiles.extend(self._deinterleave(b''.join(subtile)))

        deinterleaved = [subtiles[0], subtiles[2], subtiles[1], subtiles[3]]
        return Tile(self._tile_addr, b''.join(deinterleaved), self._tile_dimensions)
项目:cps2-gfx-editor    作者:goosechooser    | 项目源码 | 文件源码
def _deinterleave(self, data):
        deinterleaved = [[], []]

        deinterleave_fmt = Struct(4 * 'c')
        deinterleave_iter = deinterleave_fmt.iter_unpack(data)

        for i in deinterleave_iter:
            deinterleaved[0].extend([*i])
            deinterleaved[1].extend([*next(deinterleave_iter)])

        return [b''.join(data) for data in deinterleaved]
项目:cps2-gfx-editor    作者:goosechooser    | 项目源码 | 文件源码
def toarray(self):
        """Unpacks the tiles and fills in pixel color.

        Returns an array.
        """
        pic_array = []

        tiles = self.tiles2d()
        for tile_row in tiles:
            row = []
            for tile in tile_row:
                interleaved_tile = tile.interleave_subtiles()
                tile_fmt = interleaved_tile.dimensions * 'c'
                tile_iter = struct.iter_unpack(tile_fmt, interleaved_tile.unpack())
                subtiles = [subtile for subtile in tile_iter]

                row.append(self._colortile(subtiles))

            pic_array.append(row)

        array_rows = []
        for row in np.array(pic_array):
            array_rows.append(np.concatenate(row, axis=1))
        assembled = np.concatenate(array_rows, axis=0)

        return assembled
项目:gltf-blender-importer    作者:ksons    | 项目源码 | 文件源码
def get_accessor(self, idx):
        accessor = self.root['accessors'][idx]
        buffer_view = self.get_buffer_view(accessor['bufferView'])
        component_type = accessor["componentType"]
        type_size = accessor["type"]

        if component_type == 5126:
            fmt = "<f"
        elif component_type == 5123:
            fmt = "<H"
        elif component_type == 5125:
            fmt = "<I"
        else:
            raise ValueError("Unknown component type: %s" % component_type)

        values = [i[0] for i in struct.iter_unpack(fmt, buffer_view)]

        result = values

        if type_size == "VEC3":
            result = [Vector(values[i:i+3]) for i in range(0, len(values), 3)]
        elif type_size == "VEC2":
            result = [tuple(values[i:i+2]) for i in range(0, len(values), 2)]

        return result
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_iterate(self):
        s = struct.Struct('>IB')
        b = bytes(range(1, 16))
        it = s.iter_unpack(b)
        self.assertEqual(next(it), (0x01020304, 5))
        self.assertEqual(next(it), (0x06070809, 10))
        self.assertEqual(next(it), (0x0b0c0d0e, 15))
        self.assertRaises(StopIteration, next, it)
        self.assertRaises(StopIteration, next, it)