Python struct 模块,pack_into() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用struct.pack_into()

项目:pyOSC3    作者:Qirky    | 项目源码 | 文件源码
def _transmitMsg(self, msg):
        """Send an OSC message over a streaming socket.

        The packet is framed as a 4-byte big-endian length followed by the
        binary form of the message.

        :param msg: the OSCMessage (or OSCBundle) to send
        :returns: True if both the length prefix and the payload were
            transmitted, False if the socket has been closed (including a
            broken pipe)
        :raises TypeError: if ``msg`` is not an OSCMessage instance
        :raises socket.error: for any socket failure other than EPIPE
        """
        if not isinstance(msg, OSCMessage):
            raise TypeError("'msg' argument is not an OSCMessage or OSCBundle object")

        try:
            binary = msg.getBinary()
            # Prepend the packet length (big endian).  struct.pack() yields the
            # 4 raw bytes directly; the former array.array('c', ...) scratch
            # buffer and .tostring() do not exist on Python 3.
            len_big_endian = struct.pack(">L", len(binary))
            if self._transmit(len_big_endian) and self._transmit(binary):
                return True
            return False
        except socket.error as e:
            # Exceptions are not subscriptable on Python 3; use .errno instead.
            if e.errno == errno.EPIPE:  # broken pipe
                return False
            raise
项目:pyOSC3    作者:Qirky    | 项目源码 | 文件源码
def _transmitMsg(self, msg):
        """Send an OSC message over a streaming socket.

        The packet is framed as a 4-byte big-endian length followed by the
        binary form of the message.

        :param msg: the OSCMessage (or OSCBundle) to send
        :returns: True if both the length prefix and the payload were
            transmitted, False if the socket has been closed (including a
            broken pipe)
        :raises TypeError: if ``msg`` is not an OSCMessage instance
        :raises socket.error: for any socket failure other than EPIPE
        """
        if not isinstance(msg, OSCMessage):
            raise TypeError("'msg' argument is not an OSCMessage or OSCBundle object")

        try:
            binary = msg.getBinary()
            # Prepend the packet length (big endian).  struct.pack() works on
            # both Python 2 and 3, unlike array.array('c', ...).tostring().
            len_big_endian = struct.pack(">L", len(binary))
            if self._transmit(len_big_endian) and self._transmit(binary):
                return True
            return False
        except socket.error as e:
            # "except X as e" is valid from Python 2.6 on; the old
            # "except X, e" spelling is a SyntaxError on Python 3.
            if e.errno == errno.EPIPE:  # broken pipe
                return False
            raise
项目:capnpy    作者:antocuni    | 项目源码 | 文件源码
def check(self, fmt, value):
        """Compare the project's ``pack_into`` against ``struct.pack_into``.

        Two identical 256-byte random buffers are written at offset 16, one
        with each implementation, and must end up equal.  This checks both
        that the expected bytes are written and that nothing outside the
        target range is touched.  A write that would run past the end of the
        buffer must raise IndexError.

        :param fmt: single-character struct format code
        :param value: the value to pack
        """
        from random import randrange
        # Random filler makes stray writes outside the packed field visible.
        pattern = b''.join(six.int2byte(randrange(256)) for _ in range(256))
        buf = bytearray(pattern)
        buf2 = bytearray(pattern)
        offset = 16
        pack_into(ord(fmt), buf, offset, value)
        struct.pack_into(fmt, buf2, offset, value)
        assert buf == buf2
        #
        # First offset at which the packed value no longer fits in 256 bytes.
        out_of_bound = 256-struct.calcsize(fmt)+1
        # Callable form: passing a code string to pytest.raises() was removed
        # in pytest 5.
        pytest.raises(IndexError, pack_into, ord(fmt), buf, out_of_bound, value)
项目:pyledbat    作者:justas-    | 项目源码 | 文件源码
def _build_and_send_init(self):
        """Assemble the 12-byte INIT message, send it and count it."""

        # Three big-endian unsigned 32-bit fields:
        # message type (1), remote channel (0), local channel.
        packet = bytearray(struct.pack('>III', 1, 0, self.local_channel))

        # Hand the packet to the owner for delivery to the remote endpoint
        destination = (self._remote_ip, self._remote_port)
        self._owner.send_data(packet, destination)
        self._num_init_sent += 1

        # Record how many INIT messages have gone out so far
        logging.info('%s Sent INIT message (%s)', self, self._num_init_sent)
项目:pyledbat    作者:justas-    | 项目源码 | 文件源码
def _build_and_send_init_ack(self):
        """Assemble the 12-byte INIT-ACK message, send it and count it."""

        # Three big-endian unsigned 32-bit fields:
        # message type (1), remote channel, local channel.
        packet = bytearray(struct.pack('>III', 1,
                                       self.remote_channel,
                                       self.local_channel))

        # Hand the packet to the owner for delivery to the remote endpoint
        destination = (self._remote_ip, self._remote_port)
        self._owner.send_data(packet, destination)
        self._num_init_ack_sent += 1

        # Record how many INIT-ACK messages have gone out so far
        logging.info('%s Sent INIT-ACK message (%s)', self, self._num_init_ack_sent)
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def _write_header(self, buff, api_version=0, correlation_id=0):
        """Write the request header at the start of ``buff``.

        The header consists of, in network byte order: the message length
        (``len(buff) - 4``, i.e. not counting the length field itself),
        ``API_KEY``, ``api_version``, ``correlation_id`` and the
        length-prefixed ``CLIENT_ID``.

        :param buff: The buffer into which to write the header
        :type buff: buffer
        :param api_version: The "kafka api version id", used for feature flagging
        :type api_version: int
        :param correlation_id: User-supplied integer written into the header
        :type correlation_id: int
        """
        client_id = self.CLIENT_ID
        header = struct.Struct('!ihhih%ds' % len(client_id))
        header.pack_into(
            buff, 0,
            len(buff) - 4,  # msglen excludes this int
            self.API_KEY,
            api_version,
            correlation_id,
            len(client_id),
            client_id,
        )
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def pack_into(self, buff, offset):
        """Serialize this message into ``buff`` beginning at ``offset``.

        Mirrors the calling convention of ``struct.pack_into``.  The first
        four bytes at ``offset`` receive the CRC32 of the message body, which
        is written immediately after them.

        :param buff: The buffer to write into
        :param offset: The offset to start the write at
        """
        # A length of 0 encodes an empty string, -1 encodes null.
        key_len = -1 if self.partition_key is None else len(self.partition_key)
        value_len = -1 if self.value is None else len(self.value)

        body_fmt = '!BBi%dsi%ds' % (max(key_len, 0), max(value_len, 0))
        body_start = offset + 4  # leave room for the CRC in front
        struct.pack_into(body_fmt, buff, body_start,
                         self.MAGIC,
                         self.compression_type,
                         key_len,
                         self.partition_key or b"",
                         value_len,
                         self.value or b"")

        # The checksum covers exactly the bytes just written.
        body_end = body_start + struct.calcsize(body_fmt)
        checksum = crc32(buffer(buff[body_start:body_end])) & 0xffffffff
        struct.pack_into('!I', buff, offset, checksum)
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def _get_compressed(self):
        """Build a single Message wrapping all current messages, compressed.

        The messages are serialized into a temporary MessageSet, compressed
        with the codec selected by ``self.compression_type`` and returned as
        the value of a new Message carrying that compression type.

        :raises TypeError: if the compression type is neither GZIP nor SNAPPY
        """
        assert self.compression_type != CompressionType.NONE
        inner_set = MessageSet(messages=self._messages)
        raw = bytearray(len(inner_set))
        inner_set.pack_into(raw, 0)

        # Pick the codec first, then run it once.
        if self.compression_type == CompressionType.GZIP:
            encode = compression.encode_gzip
        elif self.compression_type == CompressionType.SNAPPY:
            encode = compression.encode_snappy
        else:
            raise TypeError("Unknown compression: %s" % self.compression_type)
        return Message(encode(buffer(raw)),
                       compression_type=self.compression_type)
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def get_bytes(self):
        """Serialize the request into a freshly allocated buffer.

        Layout after the header: four 32-bit ints (-1, timeout, min_bytes,
        number of topics), then for every topic its length-prefixed name and
        partition count, followed by one (partition id, fetch offset,
        max bytes) record per partition.

        :returns: Serialized message
        :rtype: :class:`bytearray`
        """
        buf = bytearray(len(self))
        self._write_header(buf)
        cursor = self.HEADER_LEN

        preamble = struct.Struct('!iiii')
        preamble.pack_into(buf, cursor,
                           -1, self.timeout, self.min_bytes, len(self._reqs))
        cursor += preamble.size

        partition_rec = struct.Struct('!iqi')
        for topic, parts in iteritems(self._reqs):
            topic_rec = struct.Struct('!h%dsi' % len(topic))
            topic_rec.pack_into(buf, cursor, len(topic), topic, len(parts))
            cursor += topic_rec.size
            for part_id, (fetch_offset, max_bytes) in iteritems(parts):
                partition_rec.pack_into(buf, cursor,
                                        part_id, fetch_offset, max_bytes)
                cursor += partition_rec.size
        return buf
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def get_bytes(self):
        """Serialize the request into a freshly allocated buffer.

        Layout after the header: two 32-bit ints (-1, number of topics), then
        for every topic its length-prefixed name and partition count,
        followed by one (partition, offsets_before, max_offsets) record per
        partition.

        :returns: Serialized message
        :rtype: :class:`bytearray`
        """
        buf = bytearray(len(self))
        self._write_header(buf)
        cursor = self.HEADER_LEN

        preamble = struct.Struct('!ii')
        preamble.pack_into(buf, cursor, -1, len(self._reqs))
        cursor += preamble.size

        partition_rec = struct.Struct('!iqi')
        for topic, parts in iteritems(self._reqs):
            topic_rec = struct.Struct('!h%dsi' % len(topic))
            topic_rec.pack_into(buf, cursor, len(topic), topic, len(parts))
            cursor += topic_rec.size
            for part, (offsets_before, max_offsets) in iteritems(parts):
                partition_rec.pack_into(buf, cursor,
                                        part, offsets_before, max_offsets)
                cursor += partition_rec.size
        return buf
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def get_bytes(self):
        """Serialize the request into a freshly allocated buffer.

        The header is written with ``api_version=1``.  It is followed by the
        length-prefixed consumer group and the topic count, then for every
        topic its length-prefixed name, its partition count and one 32-bit
        partition number per partition.

        :returns: Serialized message
        :rtype: :class:`bytearray`
        """
        buf = bytearray(len(self))
        self._write_header(buf, api_version=1)
        cursor = self.HEADER_LEN

        group_rec = struct.Struct('!h%dsi' % len(self.consumer_group))
        group_rec.pack_into(buf, cursor,
                            len(self.consumer_group), self.consumer_group,
                            len(self._reqs))
        cursor += group_rec.size

        partition_rec = struct.Struct('!i')
        for topic, parts in iteritems(self._reqs):
            topic_rec = struct.Struct('!h%dsi' % len(topic))
            topic_rec.pack_into(buf, cursor, len(topic), topic, len(parts))
            cursor += topic_rec.size
            for part in parts:
                partition_rec.pack_into(buf, cursor, part)
                cursor += partition_rec.size
        return buf
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def get_bytes(self):
        """Serialize the request into a freshly allocated buffer.

        After the header come the length-prefixed group id, the session
        timeout, the length-prefixed member id and protocol type, and the
        number of group protocols; each protocol then contributes its
        length-prefixed name and length-prefixed metadata.

        :returns: Serialized message
        :rtype: :class:`bytearray`
        """
        buf = bytearray(len(self))
        self._write_header(buf)
        cursor = self.HEADER_LEN

        fixed = struct.Struct('!h%dsih%dsh%dsi' % (len(self.group_id),
                                                   len(self.member_id),
                                                   len(self.protocol_type)))
        fixed.pack_into(buf, cursor,
                        len(self.group_id), self.group_id,
                        self.session_timeout,
                        len(self.member_id), self.member_id,
                        len(self.protocol_type), self.protocol_type,
                        len(self.group_protocols))
        cursor += fixed.size

        for name, metadata in self.group_protocols:
            entry = struct.Struct('!h%dsi%ds' % (len(name), len(metadata)))
            entry.pack_into(buf, cursor,
                            len(name), name, len(metadata), metadata)
            cursor += entry.size
        return buf
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def get_bytes(self):
        """Serialize the assignment into a freshly allocated bytearray.

        Layout: 16-bit version and 32-bit topic count, then for every topic
        its length-prefixed name, its partition count and one 32-bit
        partition id per partition.
        """
        buf = bytearray(len(self))
        cursor = 0

        preamble = struct.Struct('!hi')
        preamble.pack_into(buf, cursor, self.version,
                           len(self.partition_assignment))
        cursor += preamble.size

        partition_rec = struct.Struct('!i')
        for topic, parts in self.partition_assignment:
            topic_rec = struct.Struct('!h%dsi' % len(topic))
            topic_rec.pack_into(buf, cursor, len(topic), topic, len(parts))
            cursor += topic_rec.size
            for part in parts:
                partition_rec.pack_into(buf, cursor, part)
                cursor += partition_rec.size
        return buf
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def get_bytes(self):
        """Serialize the request into a freshly allocated buffer.

        After the header come the length-prefixed group id, the generation
        id, the length-prefixed member id and the number of member
        assignments; each assignment then contributes its length-prefixed
        member id and its own length-prefixed serialized bytes.

        :returns: Serialized message
        :rtype: :class:`bytearray`
        """
        buf = bytearray(len(self))
        self._write_header(buf)
        cursor = self.HEADER_LEN

        fixed = struct.Struct('!h%dsih%dsi' % (len(self.group_id),
                                               len(self.member_id)))
        fixed.pack_into(buf, cursor,
                        len(self.group_id), self.group_id,
                        self.generation_id,
                        len(self.member_id), self.member_id,
                        len(self.group_assignment))
        cursor += fixed.size

        for assignment in self.group_assignment:
            # struct's "s" code needs an immutable bytes payload here
            payload = bytes(assignment.get_bytes())
            entry = struct.Struct('!h%dsi%ds' % (len(assignment.member_id),
                                                 len(payload)))
            entry.pack_into(buf, cursor,
                            len(assignment.member_id), assignment.member_id,
                            len(payload), payload)
            cursor += entry.size
        return buf
项目:CommunityCellularManager    作者:facebookincubator    | 项目源码 | 文件源码
def reset_length(self, buf, length):
        """
        Rewrite the IPA header at the start of ``buf`` for a new length.

        Intended for callers that over-provision the buffer up front and fix
        up the length once the real payload size is known.

        Args:
            buf (memoryview): the IPA message
            length (int): length of the protocol message encapsulated by IPA
        Returns:
            None
        """
        if self._osmo_extn is None:
            # No extension byte: header is just length + stream id.
            struct.pack_into('!HB', buf, 0, length, self._stream_id)
            return
        # An extension is present (note the Ctrl extension is 0x00, hence the
        # explicit None test); it adds one byte to the reported length.
        struct.pack_into('!HBB', buf, 0, length + 1,
                         self._stream_id, self._osmo_extn)
项目:CommunityCellularManager    作者:facebookincubator    | 项目源码 | 文件源码
def encode_auth_tuple(val, buf, offset, min_len, max_len):
        """
        Encode the Auth tuple IE into ``buf`` at ``offset``.

        Each of the three components is written as a (type, length, value)
        triple: RAND (16 bytes), SRES (4 bytes) and Kc (8 bytes).

        Args:
            val: (rand, sres, kc) tuple
            buf: writable buffer receiving the encoding
            offset: position in ``buf`` at which to start writing
            min_len: not used by this encoder
            max_len: not used by this encoder
        Returns:
            The size of encoded auth tuple (always 34)
        Raises:
            GSUPCodecError: if any component has the wrong length
        """
        rand, sres, key = val
        sizes_ok = len(rand) == 16 and len(sres) == 4 and len(key) == 8
        if not sizes_ok:
            raise GSUPCodecError(
                "Bad auth tuple to encode: rand: %s, sres: %s, key: %s"
                % (rand, sres, key))
        fields = (
            IEType.RAND, 16, rand,
            IEType.SRES, 4, sres,
            IEType.KC_KEY, 8, key,
        )
        struct.pack_into('2B16s2B4s2B8s', buf, offset, *fields)
        return 34
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_pack_into_fn(self):
        test_string = b'Reykjavik rocks, eow!'
        writable_buf = array.array('b', b' '*100)
        fmt = '21s'
        pack_into = lambda *args: struct.pack_into(fmt, *args)

        # Test without offset.
        pack_into(writable_buf, 0, test_string)
        from_buf = writable_buf.tobytes()[:len(test_string)]
        self.assertEqual(from_buf, test_string)

        # Test with offset.
        pack_into(writable_buf, 10, test_string)
        from_buf = writable_buf.tobytes()[:len(test_string)+10]
        self.assertEqual(from_buf, test_string[:10] + test_string)

        # Go beyond boundaries.
        small_buf = array.array('b', b' '*10)
        self.assertRaises((ValueError, struct.error), pack_into, small_buf, 0,
                          test_string)
        self.assertRaises((ValueError, struct.error), pack_into, small_buf, 2,
                          test_string)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_trailing_counter(self):
        store = array.array('b', b' '*100)

        # format lists containing only count spec should result in an error
        self.assertRaises(struct.error, struct.pack, '12345')
        self.assertRaises(struct.error, struct.unpack, '12345', b'')
        self.assertRaises(struct.error, struct.pack_into, '12345', store, 0)
        self.assertRaises(struct.error, struct.unpack_from, '12345', store, 0)

        # Format lists with trailing count spec should result in an error
        self.assertRaises(struct.error, struct.pack, 'c12345', b'x')
        self.assertRaises(struct.error, struct.unpack, 'c12345', b'x')
        self.assertRaises(struct.error, struct.pack_into, 'c12345', store, 0,
                           b'x')
        self.assertRaises(struct.error, struct.unpack_from, 'c12345', store,
                           0)

        # Mixed format tests
        self.assertRaises(struct.error, struct.pack, '14s42', b'spam and eggs')
        self.assertRaises(struct.error, struct.unpack, '14s42',
                          b'spam and eggs')
        self.assertRaises(struct.error, struct.pack_into, '14s42', store, 0,
                          b'spam and eggs')
        self.assertRaises(struct.error, struct.unpack_from, '14s42', store, 0)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_pack_into_fn(self):
        """Exercise the module-level struct.pack_into() with a fixed format."""
        payload = 'Reykjavik rocks, eow!'
        size = len(payload)
        target = array.array('c', ' '*100)

        def write(*args):
            return struct.pack_into('21s', *args)

        # Writing at the very start of the buffer.
        write(target, 0, payload)
        self.assertEqual(target.tostring()[:size], payload)

        # Writing at offset 10 leaves the first ten characters of the earlier
        # write in place.
        write(target, 10, payload)
        self.assertEqual(target.tostring()[:size + 10], payload[:10] + payload)

        # A buffer that is too small must be rejected at any offset.
        tiny = array.array('c', ' '*10)
        for start in (0, 2):
            self.assertRaises((ValueError, struct.error), write, tiny, start,
                              payload)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_pack_into_fn(self):
        """Check struct.pack_into() writes into a character array in place."""
        text = 'Reykjavik rocks, eow!'
        text_len = len(text)
        scratch = array.array('c', ' '*100)
        packer = lambda *args: struct.pack_into('21s', *args)

        # No offset: the buffer starts with the packed text.
        packer(scratch, 0, text)
        head = scratch.tostring()[:text_len]
        self.assertEqual(head, text)

        # Offset 10: the earlier write's first ten characters precede the
        # newly packed text.
        packer(scratch, 10, text)
        head = scratch.tostring()[:text_len + 10]
        self.assertEqual(head, text[:10] + text)

        # Ten characters cannot hold a 21-byte field, whatever the offset.
        undersized = array.array('c', ' '*10)
        expected_errors = (ValueError, struct.error)
        self.assertRaises(expected_errors, packer, undersized, 0, text)
        self.assertRaises(expected_errors, packer, undersized, 2, text)
项目:curious    作者:SunDwarf    | 项目源码 | 文件源码
def get_packet_header(self) -> bytes:
        """
        Builds the 12-byte voice packet header.

        Layout (big endian): two constant bytes, 16-bit sequence, 32-bit
        timestamp, 32-bit ssrc.

        :return: The header; note the object returned is a ``bytearray``.
        """
        # bytes 0-1 are constant values, provided by the docs;
        # the remaining ten bytes start out zeroed
        packet = bytearray(b"\x80\x78") + bytearray(10)

        # bytes 2-3: sequence
        struct.pack_into(">H", packet, 2, self.sequence)
        # bytes 4-7: timestamp
        struct.pack_into(">I", packet, 4, self.timestamp)
        # bytes 8-11: ssrc
        struct.pack_into(">I", packet, 8, self.vs_ws.ssrc)

        return packet
项目:disco    作者:b1naryth1ef    | 项目源码 | 文件源码
def send_frame(self, frame, sequence=None, timestamp=None):
        """Encrypt one frame and send it behind the RTP-style header.

        :param frame: the payload; converted to a bytearray first
        :param sequence: sequence number to put in the header; defaults to
            ``self.vc.sequence`` when omitted (an explicit 0 is honoured)
        :param timestamp: timestamp to put in the header; defaults to
            ``self.vc.timestamp`` when omitted (an explicit 0 is honoured)
        """
        # Convert the frame to a bytearray
        frame = bytearray(frame)

        # "x or default" would silently discard a legitimate value of 0, so
        # test for None explicitly.
        if sequence is None:
            sequence = self.vc.sequence
        if timestamp is None:
            timestamp = self.vc.timestamp

        # Pack the rtc header into our buffer.  The ssrc is packed unsigned:
        # the signed '>i' code raises struct.error for values >= 2**31.
        struct.pack_into('>H', self._buffer, 2, sequence)
        struct.pack_into('>I', self._buffer, 4, timestamp)
        struct.pack_into('>I', self._buffer, 8, self.vc.ssrc)

        # Now encrypt the payload with the nonce as a header
        raw = self.vc.secret_box.encrypt(bytes(frame), bytes(self._buffer)).ciphertext

        # Send the header (sans nonce padding) plus the payload
        self.send(self._buffer[:12] + raw)

        # Increment our sequence counter
        # NOTE(review): this wraps on reaching 65535, so that value itself is
        # never sent - confirm whether that is intended.
        self.vc.sequence += 1
        if self.vc.sequence >= 65535:
            self.vc.sequence = 0
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_pack_into_fn(self):
        test_string = b'Reykjavik rocks, eow!'
        writable_buf = array.array('b', b' '*100)
        fmt = '21s'
        pack_into = lambda *args: struct.pack_into(fmt, *args)

        # Test without offset.
        pack_into(writable_buf, 0, test_string)
        from_buf = writable_buf.tobytes()[:len(test_string)]
        self.assertEqual(from_buf, test_string)

        # Test with offset.
        pack_into(writable_buf, 10, test_string)
        from_buf = writable_buf.tobytes()[:len(test_string)+10]
        self.assertEqual(from_buf, test_string[:10] + test_string)

        # Go beyond boundaries.
        small_buf = array.array('b', b' '*10)
        self.assertRaises((ValueError, struct.error), pack_into, small_buf, 0,
                          test_string)
        self.assertRaises((ValueError, struct.error), pack_into, small_buf, 2,
                          test_string)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_trailing_counter(self):
        store = array.array('b', b' '*100)

        # format lists containing only count spec should result in an error
        self.assertRaises(struct.error, struct.pack, '12345')
        self.assertRaises(struct.error, struct.unpack, '12345', '')
        self.assertRaises(struct.error, struct.pack_into, '12345', store, 0)
        self.assertRaises(struct.error, struct.unpack_from, '12345', store, 0)

        # Format lists with trailing count spec should result in an error
        self.assertRaises(struct.error, struct.pack, 'c12345', 'x')
        self.assertRaises(struct.error, struct.unpack, 'c12345', 'x')
        self.assertRaises(struct.error, struct.pack_into, 'c12345', store, 0,
                           'x')
        self.assertRaises(struct.error, struct.unpack_from, 'c12345', store,
                           0)

        # Mixed format tests
        self.assertRaises(struct.error, struct.pack, '14s42', 'spam and eggs')
        self.assertRaises(struct.error, struct.unpack, '14s42',
                          'spam and eggs')
        self.assertRaises(struct.error, struct.pack_into, '14s42', store, 0,
                          'spam and eggs')
        self.assertRaises(struct.error, struct.unpack_from, '14s42', store, 0)
项目:ryu-lagopus-ext    作者:lagopus    | 项目源码 | 文件源码
def serialize(self, payload, prev):
        """Pack this IPv4 header into a new bytearray of ``len(self)`` bytes.

        If ``total_length`` is 0 it is first set to the header length in
        bytes plus ``len(payload)``.  The header is packed with a zero
        checksum field, any option bytes are copied in after the fixed part,
        and the checksum computed over the result is stored on
        ``self.csum`` and written at byte offset 10.
        """
        hdr_size = len(self)
        header = bytearray(hdr_size)

        # Combine the two pairs of sub-byte fields into their packed words.
        ver_ihl = (self.version << 4) | self.header_length
        flags_frag = (self.flags << 13) | self.offset

        if self.total_length == 0:
            self.total_length = self.header_length * 4 + len(payload)

        struct.pack_into(
            ipv4._PACK_STR, header, 0,
            ver_ihl, self.tos, self.total_length, self.identification,
            flags_frag, self.ttl, self.proto,
            0,  # checksum placeholder, filled in below
            addrconv.ipv4.text_to_bin(self.src),
            addrconv.ipv4.text_to_bin(self.dst))

        if self.option:
            opt_len = len(self.option)
            assert (hdr_size - ipv4._MIN_LEN) >= opt_len
            header[ipv4._MIN_LEN:ipv4._MIN_LEN + opt_len] = self.option

        self.csum = packet_utils.checksum(header)
        struct.pack_into('!H', header, 10, self.csum)
        return header
项目:ryu-lagopus-ext    作者:lagopus    | 项目源码 | 文件源码
def serialize(self):
        """Return this record serialized as a binary string.

        The fixed part is followed by one 16-byte address per entry in
        ``self.srcs`` and then, if present, the auxiliary data padded with
        zero bytes to a multiple of four.  A ``num`` or ``aux_len`` of 0 is
        replaced (on the object and in the output) by the value derived
        from the actual data.
        """
        out = bytearray(struct.pack(self._PACK_STR, self.type_,
                        self.aux_len, self.num,
                        addrconv.ipv6.text_to_bin(self.address)))
        for source in self.srcs:
            out += struct.pack('16s', addrconv.ipv6.text_to_bin(source))

        if self.num == 0:
            # Source count was left unset: derive it and patch offset 2.
            self.num = len(self.srcs)
            struct.pack_into('!H', out, 2, self.num)

        if self.aux is None:
            return six.binary_type(out)

        remainder = len(self.aux) % 4
        if remainder:
            self.aux += bytearray(4 - remainder)
            self.aux = six.binary_type(self.aux)
        out.extend(self.aux)

        if self.aux_len == 0:
            # Aux length is counted in 32-bit words; patch offset 1.
            self.aux_len = len(self.aux) // 4
            struct.pack_into('!B', out, 1, self.aux_len)
        return six.binary_type(out)
项目:ryu-lagopus-ext    作者:lagopus    | 项目源码 | 文件源码
def serialize(self, payload=None, prev=None):
        """Serialize header plus tail and fill in the checksum.

        ``self.length`` is set to the header length plus the length of the
        serialized tail.  The header is packed with a zero checksum field;
        the checksum is then computed over the first 12 bytes, bytes 14-15
        and everything after the header, stored on ``self.checksum`` and
        written at byte offset 12.

        :returns: the complete message as a bytearray
        """
        body = self.serialize_tail()
        self.length = self._HDR_LEN + len(body)

        header = bytearray(struct.pack(
            self._HDR_PACK_STR, self.version, self.type_, self.length,
            addrconv.ipv4.text_to_bin(self.router_id),
            addrconv.ipv4.text_to_bin(self.area_id),
            0,  # checksum placeholder, filled in below
            self.au_type, self.authentication))
        message = header + body

        # Bytes 12-13 and everything from 16 up to the end of the header
        # are left out of the checksum input.
        covered = message[:12] + message[14:16] + message[self._HDR_LEN:]
        self.checksum = packet_utils.checksum(covered)
        struct.pack_into("!H", message, 12, self.checksum)
        return message

# alias
项目:ryu-lagopus-ext    作者:lagopus    | 项目源码 | 文件源码
def serialize(self, payload, prev):
        """Pack the 40-byte IPv6 header followed by any extension headers.

        If ``payload_length`` is 0 it is set to ``len(payload)`` plus the
        length of every extension header, and the new value is written at
        byte offset 4 of the output.

        :returns: the header (and extension headers) as a bytearray
        """
        header = bytearray(40)
        first_word = (self.version << 28 | self.traffic_class << 20 |
                      self.flow_label)
        struct.pack_into(ipv6._PACK_STR, header, 0, first_word,
                         self.payload_length, self.nxt, self.hop_limit,
                         addrconv.ipv6.text_to_bin(self.src),
                         addrconv.ipv6.text_to_bin(self.dst))

        if self.ext_hdrs:
            for extension in self.ext_hdrs:
                header.extend(extension.serialize())

        if self.payload_length == 0:
            # Length was left unset: payload plus all extension headers.
            self.payload_length = len(payload) + sum(
                len(extension) for extension in self.ext_hdrs)
            struct.pack_into('!H', header, 4, self.payload_length)
        return header
项目:ryu-lagopus-ext    作者:lagopus    | 项目源码 | 文件源码
def serialize(self, payload, prev):
        """Pack the ICMP header and append the message data.

        When ``self.data`` is None a default ``echo()`` is created and
        stored on the object.  Data for types listed in
        ``icmp._ICMP_TYPES`` is serialized via its ``serialize()`` method;
        any other data is appended as-is.  If ``self.csum`` is 0 the
        checksum is computed over the result, stored on the object and
        written at byte offset 2.

        :returns: the serialized message as a bytearray
        """
        message = bytearray(struct.pack(icmp._PACK_STR, self.type,
                                        self.code, self.csum))

        if self.data is None:
            self.data = echo()
            message += self.data.serialize()
        elif self.type in icmp._ICMP_TYPES:
            message += self.data.serialize()
        else:
            message += self.data

        if self.csum == 0:
            self.csum = packet_utils.checksum(message)
            struct.pack_into('!H', message, 2, self.csum)

        return message
项目:ryu-lagopus-ext    作者:lagopus    | 项目源码 | 文件源码
def setUp_with_echo(self):
        """Prepare an echo-request fixture and its expected wire bytes.

        Sets ``self.ic`` to the icmp object under test and ``self.buf`` to
        the hand-built serialization, with the checksum computed over the
        buffer written at byte offset 2.
        """
        self.echo_id = 13379
        self.echo_seq = 1
        self.echo_data = (
            b'\x30\x0e\x09\x00\x00\x00\x00\x00'
            b'\x10\x11\x12\x13\x14\x15\x16\x17'
            b'\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f'
            b'\x20\x21\x22\x23\x24\x25\x26\x27'
            b'\x28\x29\x2a\x2b\x2c\x2d\x2e\x2f'
            b'\x30\x31\x32\x33\x34\x35\x36\x37'
        )
        self.data = icmp.echo(
            id_=self.echo_id, seq=self.echo_seq, data=self.echo_data)

        self.type_ = icmp.ICMP_ECHO_REQUEST
        self.code = 0
        self.ic = icmp.icmp(self.type_, self.code, self.csum, self.data)

        # Expected bytes: header with the current csum, then the echo body.
        header = struct.pack(
            icmp.icmp._PACK_STR, self.type_, self.code, self.csum)
        self.buf = bytearray(header)
        self.buf += self.data.serialize()
        self.csum_calc = packet_utils.checksum(self.buf)
        struct.pack_into('!H', self.buf, 2, self.csum_calc)
项目:ryu-lagopus-ext    作者:lagopus    | 项目源码 | 文件源码
def setUp_with_dest_unreach(self):
        """Prepare a destination-unreachable fixture and its expected bytes.

        Sets ``self.ic`` to the icmp object under test and ``self.buf`` to
        the hand-built serialization, with the checksum computed over the
        buffer written at byte offset 2.
        """
        self.unreach_mtu = 10
        self.unreach_data = b'abc'
        self.unreach_data_len = len(self.unreach_data)
        self.data = icmp.dest_unreach(
            data_len=self.unreach_data_len, mtu=self.unreach_mtu,
            data=self.unreach_data)

        self.type_ = icmp.ICMP_DEST_UNREACH
        self.code = icmp.ICMP_HOST_UNREACH_CODE
        self.ic = icmp.icmp(self.type_, self.code, self.csum, self.data)

        # Expected bytes: header with the current csum, then the body.
        header = struct.pack(
            icmp.icmp._PACK_STR, self.type_, self.code, self.csum)
        self.buf = bytearray(header)
        self.buf += self.data.serialize()
        self.csum_calc = packet_utils.checksum(self.buf)
        struct.pack_into('!H', self.buf, 2, self.csum_calc)
项目:yw_save    作者:togenyan    | 项目源码 | 文件源码
def yw_proc(data, isEncrypt, key=None, head=None, validator=None):
    """Run ``data`` through the YW cipher, checking or refreshing its CRC.

    The last eight bytes of ``data`` are a trailer: a little-endian CRC32
    followed by a little-endian cipher seed.  The body is passed through
    ``YWCipher(seed, 0x1000).encrypt`` in both directions.

    * ``isEncrypt`` false: the stored CRC must match the CRC32 of the
      incoming body, otherwise an error is logged and None is returned.
    * ``isEncrypt`` true: the CRC in the output trailer is replaced by the
      CRC32 of the processed body.

    ``key``, ``head`` and ``validator`` are accepted but not used.

    :returns: the processed body plus trailer as ``bytes``, or None on a
        checksum mismatch
    """
    body, trailer = data[:-8], data[-8:]
    stored_crc, seed = struct.unpack("<II", trailer)

    if not isEncrypt and binascii.crc32(body) != stored_crc:
        logging.error("Checksum does not match")
        return None

    cipher = YWCipher(seed, 0x1000)
    result = bytearray()
    result += cipher.encrypt(body)
    result += trailer

    if isEncrypt:
        # Overwrite the CRC slot, which sits eight bytes before the end of
        # the input-sized buffer.
        struct.pack_into("<I", result, len(data) - 8,
                         binascii.crc32(result[:-8]))
    return bytes(result)
项目:yw_save    作者:togenyan    | 项目源码 | 文件源码
def allThornyan(file, code=72463062):
    """Replace the id of every occupied Yo-kai slot in a save file.

    The save holds 240 records of 0x5C bytes each, starting at offset
    0x1D08; the 32-bit little-endian id sits four bytes into a record.
    Every record whose id is non-zero has its id overwritten with ``code``.
    The result is written to ``file + ".edited.yw"``; the input file is left
    untouched.

    :param file: path of the save file to read
    :param code: id written into every occupied slot
    :raises FileExistsError: if the output file already exists (mode "xb")
    :raises struct.error: if the file is too short to hold all 240 records
    """
    print("=== Yo-kai ===")
    with open(file, "rb") as f:
        data = bytearray(f.read())

    base = 0x1D08
    record_size = 0x5C
    for index in range(240):
        # Derive the position from the slot index: advancing a running
        # offset at the end of the loop body was skipped by "continue", so
        # the first empty slot stopped all later slots from being visited.
        pos = base + index * record_size
        params = struct.unpack_from(
            "<4x i 56x 5b 5B 5b 5x B 3x 2x H", data, pos)
        if params[0] == 0:
            continue  # empty slot
        print("{} -> {}".format(
                youkai.get(params[0], "(unknown)"),
                youkai.get(code, "(unknown)"),
            )
        )
        struct.pack_into("<i", data, pos + 4, code)

    with open(file + ".edited.yw", "xb") as f:
        f.write(data)
项目:Nier-Automata-editor    作者:CensoredUsername    | 项目源码 | 文件源码
def save(self):
        """Write the edited values into a copy of the original save data.

        The existing file at ``self.path`` is first moved to
        ``self.path + ".bak"``; the patched copy is then written to
        ``self.path``.
        """
        blob = bytearray(self.original)
        u32 = struct.Struct("<L")

        blob[0x4:0x10] = self.gamedata_header

        u32.pack_into(blob, 0x00024, self.time_played)
        u32.pack_into(blob, 0x0002C, self.last_saved_chapter)

        # The name field is 70 bytes of UTF-16-LE: at most 68 bytes of text,
        # zero-padded to the full width.
        encoded_name = self.name.encode("utf-16-le")[:68]
        blob[0x34:0x7A] = encoded_name.ljust(70, b"\0")

        u32.pack_into(blob, 0x3056C, self.money)
        u32.pack_into(blob, 0x3871C, self.experience)

        # keep the previous file as a backup
        backup_path = self.path + ".bak"
        shutil.move(self.path, backup_path)

        # then write the patched data in its place
        with open(self.path, "wb") as out:
            out.write(blob)
项目:pysteamworks    作者:thedropbears    | 项目源码 | 文件源码
def transactionI2C(self, port, device_address, data_to_send, send_size, data_received, receive_size):
        '''
            Answer a read of one of the Euler angle registers.

            The first byte of ``data_to_send`` selects the register. For the
            heading, pitch and roll registers the matching attribute of this
            object is multiplied by 900.0, truncated to an int and written
            to the start of ``data_received`` as a little-endian signed
            16-bit value. Any other register leaves ``data_received``
            untouched.

            :returns: ``receive_size``
        '''

        register = data_to_send[0]

        def reply(angle):
            struct.pack_into('<h', data_received, 0, int(angle * 900.0))

        if register == BNO055.BNO055_EULER_H_LSB_ADDR:
            reply(self.heading)
        if register == BNO055.BNO055_EULER_P_LSB_ADDR:
            reply(self.pitch)
        if register == BNO055.BNO055_EULER_R_LSB_ADDR:
            reply(self.roll)

        return receive_size
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_pack_into_fn(self):
        """Verify struct.pack_into() at offsets 0 and 10 and on overflow."""
        message = 'Reykjavik rocks, eow!'
        buf = array.array('c', ' '*100)

        def pack(*args):
            return struct.pack_into('21s', *args)

        # (offset, expected prefix of the buffer after the write); the
        # second write keeps the first ten characters of the first one.
        expectations = (
            (0, message),
            (10, message[:10] + message),
        )
        for offset, expected in expectations:
            pack(buf, offset, message)
            self.assertEqual(buf.tostring()[:len(message) + offset], expected)

        # Writes that would run past the end of a small buffer must fail.
        small = array.array('c', ' '*10)
        for offset in (0, 2):
            self.assertRaises((ValueError, struct.error), pack, small, offset,
                              message)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_pack_into_fn(self):
        test_string = b'Reykjavik rocks, eow!'
        writable_buf = array.array('b', b' '*100)
        fmt = '21s'
        pack_into = lambda *args: struct.pack_into(fmt, *args)

        # Test without offset.
        pack_into(writable_buf, 0, test_string)
        from_buf = writable_buf.tobytes()[:len(test_string)]
        self.assertEqual(from_buf, test_string)

        # Test with offset.
        pack_into(writable_buf, 10, test_string)
        from_buf = writable_buf.tobytes()[:len(test_string)+10]
        self.assertEqual(from_buf, test_string[:10] + test_string)

        # Go beyond boundaries.
        small_buf = array.array('b', b' '*10)
        self.assertRaises((ValueError, struct.error), pack_into, small_buf, 0,
                          test_string)
        self.assertRaises((ValueError, struct.error), pack_into, small_buf, 2,
                          test_string)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_trailing_counter(self):
        store = array.array('b', b' '*100)

        # format lists containing only count spec should result in an error
        self.assertRaises(struct.error, struct.pack, '12345')
        self.assertRaises(struct.error, struct.unpack, '12345', '')
        self.assertRaises(struct.error, struct.pack_into, '12345', store, 0)
        self.assertRaises(struct.error, struct.unpack_from, '12345', store, 0)

        # Format lists with trailing count spec should result in an error
        self.assertRaises(struct.error, struct.pack, 'c12345', 'x')
        self.assertRaises(struct.error, struct.unpack, 'c12345', 'x')
        self.assertRaises(struct.error, struct.pack_into, 'c12345', store, 0,
                           'x')
        self.assertRaises(struct.error, struct.unpack_from, 'c12345', store,
                           0)

        # Mixed format tests
        self.assertRaises(struct.error, struct.pack, '14s42', 'spam and eggs')
        self.assertRaises(struct.error, struct.unpack, '14s42',
                          'spam and eggs')
        self.assertRaises(struct.error, struct.pack_into, '14s42', store, 0,
                          'spam and eggs')
        self.assertRaises(struct.error, struct.unpack_from, '14s42', store, 0)
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_pack_into_fn(self):
        test_string = 'Reykjavik rocks, eow!'
        writable_buf = array.array('c', ' '*100)
        fmt = '21s'
        pack_into = lambda *args: struct.pack_into(fmt, *args)

        # Test without offset.
        pack_into(writable_buf, 0, test_string)
        from_buf = writable_buf.tostring()[:len(test_string)]
        self.assertEqual(from_buf, test_string)

        # Test with offset.
        pack_into(writable_buf, 10, test_string)
        from_buf = writable_buf.tostring()[:len(test_string)+10]
        self.assertEqual(from_buf, test_string[:10] + test_string)

        # Go beyond boundaries.
        small_buf = array.array('c', ' '*10)
        self.assertRaises((ValueError, struct.error), pack_into, small_buf, 0,
                          test_string)
        self.assertRaises((ValueError, struct.error), pack_into, small_buf, 2,
                          test_string)
项目:OverviewOne    作者:SpaceVR-O1    | 项目源码 | 文件源码
def _pack_array(values, definition, fmt_string, num_bytes):
    """
    Pack an array item defined by a dictionary entry into a bytearray

    Args:
        values (tuple): 1d or 2d tuple of values to pack
        definition (dict): definition of the data item
        fmt_string (str): struct.unpack format string for an element
        num_bytes (int): number of bytes of each element

    """
    packet_part = bytearray()
    if definition['DIM_2_SIZE']:
        for dim1 in values:
            for value in dim1:
                buff = bytearray(num_bytes)
                struct.pack_into(fmt_string, buff, 0, value)
                packet_part += buff
    else:
        for value in values:
            buff = bytearray(num_bytes)
            struct.pack_into(fmt_string, buff, 0, value)
            packet_part += buff
    return packet_part
项目:OverviewOne    作者:SpaceVR-O1    | 项目源码 | 文件源码
def mai_set_time(self, gpstime):
        """
        Sets the ADACS clock.

        Builds a 40-byte command payload (sync word, command ID, GPS time,
        zero padding, 16-bit data checksum in the last two bytes) and hands
        it to the bus as an MAI command.

        Arguments:
            gpstime - GPS time is a linear count of seconds elapsed since 0h Jan 6, 1980.
        """

        PAYLOAD_SIZE = 40       # always
        CHECKSUM_OFFSET = 38    # checksum occupies the final two bytes
        # TODO: the Pumpkin comments contain some confusing statements about endianness
        SYNC_WORD = 0xEB90
        SET_GPS_TIME = 0x44     # set GPS time

        payload = bytearray(PAYLOAD_SIZE)

        # Header and time value, little-endian without padding.
        struct.pack_into('<HBL', payload, 0, SYNC_WORD, SET_GPS_TIME, gpstime)

        # Data checksum (this is different than the packet checksum): the
        # byte sum of the payload, truncated to 16 bits. The checksum bytes
        # are still zero when the sum is taken.
        data_checksum = sum(payload) & 0xFFFF
        struct.pack_into('<H', payload, CHECKSUM_OFFSET, data_checksum)

        Send.send_bus_cmd(BusCommands.MAI_CMD, payload)
项目:LiveScript    作者:rekliner    | 项目源码 | 文件源码
def _transmitMsg(self, msg):
        """Send an OSC message over a streaming socket.

        The message is framed with a 4-byte big-endian length prefix that is
        transmitted before the binary message itself.

        Returns True if everything is transmitted properly, and False if a
        transmit step reports failure or the socket has been closed (EPIPE).
        Raises TypeError if 'msg' is not an OSCMessage; any other socket
        error is re-raised.
        """
        if not isinstance(msg, OSCMessage):
            raise TypeError("'msg' argument is not an OSCMessage or OSCBundle object")

        try:
            binary = msg.getBinary()
            # prepend length of packet before the actual message (big endian).
            # struct.pack yields the 4-byte prefix directly; the previous
            # array('c') / tostring() round trip only works on Python 2.
            len_big_endian = struct.pack(">L", len(binary))
            if self._transmit(len_big_endian) and self._transmit(binary):
                return True
            return False
        except socket.error as e:
            # 'as' syntax and the errno attribute are valid on Python 2.6+
            # and Python 3; indexing the exception is not.
            if e.errno == errno.EPIPE: # broken pipe
                return False
            # bare raise keeps the original traceback
            raise
项目:inshack-2017    作者:HugoDelval    | 项目源码 | 文件源码
def assemble_int_data(self, int_data, global_offset):
        """Build the data section for integer variables and register them.

        For each entry of ``int_data`` whose ``'val'`` is not None, every
        value is appended as a big-endian signed 32-bit word, followed by a
        0xffffffff terminator word. Entries without values contribute no
        data and are registered with offset None and length 32.

        Returns a ``(data_section, global_offset)`` tuple with the assembled
        bytes and the advanced offset.
        """
        # Offset/length step per 4-byte word.
        # NOTE(review): 32 per word looks like a bit count -- confirm.
        WORD = 32
        scratch = ctypes.create_string_buffer(4)
        blob = bytes()

        for entry in int_data:
            values = entry['val']
            if values is None:
                start, size = None, WORD
            else:
                start = global_offset
                size = 0
                for number in values:
                    struct.pack_into('>i', scratch, 0, number)
                    blob += scratch
                    size += WORD
                # terminator word closing the value list
                struct.pack_into('>I', scratch, 0, 0xffffffff)
                blob += scratch
                size += WORD
                global_offset += size
            self.add_var(SGSConfig.VT_INT, '@'+entry['name'], start, size)

        return (blob, global_offset)
项目:inshack-2017    作者:HugoDelval    | 项目源码 | 文件源码
def assemble_int_data(self, int_data, global_offset):
        """Serialize integer variables and record where each one lives.

        Variables with values are written as big-endian signed 32-bit words
        plus a trailing 0xffffffff word; the offset advances by 32 for each
        word written. Variables whose ``'val'`` is None are recorded with
        offset None and length 32 and add nothing to the data.

        Returns ``(data_section, global_offset)``.
        """
        data_section = bytes()
        word = ctypes.create_string_buffer(4)

        for var in int_data:
            numbers = var['val']

            # Value-less variable: nothing to emit, just record it.
            if numbers is None:
                self.add_var(SGSConfig.VT_INT, '@'+var['name'], None, 32)
                continue

            first = global_offset
            for n in numbers:
                struct.pack_into('>i', word, 0, n)
                data_section += word
                global_offset += 32

            # End-of-list marker.
            struct.pack_into('>I', word, 0, 0xffffffff)
            data_section += word
            global_offset += 32

            # Length is the distance the offset moved for this variable.
            self.add_var(SGSConfig.VT_INT, '@'+var['name'], first,
                         global_offset - first)

        return (data_section, global_offset)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_pack_into_fn(self):
        test_string = b'Reykjavik rocks, eow!'
        writable_buf = array.array('b', b' '*100)
        fmt = '21s'
        pack_into = lambda *args: struct.pack_into(fmt, *args)

        # Test without offset.
        pack_into(writable_buf, 0, test_string)
        from_buf = writable_buf.tobytes()[:len(test_string)]
        self.assertEqual(from_buf, test_string)

        # Test with offset.
        pack_into(writable_buf, 10, test_string)
        from_buf = writable_buf.tobytes()[:len(test_string)+10]
        self.assertEqual(from_buf, test_string[:10] + test_string)

        # Go beyond boundaries.
        small_buf = array.array('b', b' '*10)
        self.assertRaises((ValueError, struct.error), pack_into, small_buf, 0,
                          test_string)
        self.assertRaises((ValueError, struct.error), pack_into, small_buf, 2,
                          test_string)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_trailing_counter(self):
        store = array.array('b', b' '*100)

        # format lists containing only count spec should result in an error
        self.assertRaises(struct.error, struct.pack, '12345')
        self.assertRaises(struct.error, struct.unpack, '12345', '')
        self.assertRaises(struct.error, struct.pack_into, '12345', store, 0)
        self.assertRaises(struct.error, struct.unpack_from, '12345', store, 0)

        # Format lists with trailing count spec should result in an error
        self.assertRaises(struct.error, struct.pack, 'c12345', 'x')
        self.assertRaises(struct.error, struct.unpack, 'c12345', 'x')
        self.assertRaises(struct.error, struct.pack_into, 'c12345', store, 0,
                           'x')
        self.assertRaises(struct.error, struct.unpack_from, 'c12345', store,
                           0)

        # Mixed format tests
        self.assertRaises(struct.error, struct.pack, '14s42', 'spam and eggs')
        self.assertRaises(struct.error, struct.unpack, '14s42',
                          'spam and eggs')
        self.assertRaises(struct.error, struct.pack_into, '14s42', store, 0,
                          'spam and eggs')
        self.assertRaises(struct.error, struct.unpack_from, '14s42', store, 0)
项目:game_engine_evertims    作者:EVERTims    | 项目源码 | 文件源码
def _transmitMsg(self, msg):
        """Send an OSC message over a streaming socket.

        A 4-byte big-endian packet length is sent first, followed by the
        binary form of the message.

        Returns True if everything is transmitted properly. Returns False if
        either transmit call reports failure or the socket has been closed
        (broken pipe). Raises TypeError for a 'msg' that is not an
        OSCMessage, and re-raises any socket error other than EPIPE.
        """
        if not isinstance(msg, OSCMessage):
            raise TypeError("'msg' argument is not an OSCMessage or OSCBundle object")

        try:
            binary = msg.getBinary()
            length = len(binary)
            # prepend length of packet before the actual message (big endian).
            # Packing straight to bytes avoids array typecode 'c' and
            # array.tostring(), neither of which exists on current Python 3.
            header = struct.pack(">L", length)
            sent = self._transmit(header) and self._transmit(binary)
            return True if sent else False
        except socket.error as e:
            # Exceptions are not subscriptable on Python 3; the errno
            # attribute carries the error code on both Python 2 and 3.
            if e.errno == errno.EPIPE: # broken pipe
                return False
            raise
项目:deb-ryu    作者:openstack    | 项目源码 | 文件源码
def serialize(self):
        """Return this record as bytes: fixed header, sources, then aux data.

        As a side effect, ``num`` and ``aux_len`` are filled in when they
        are 0, and ``aux`` is zero-padded to a multiple of 4 bytes.
        """
        fixed_part = struct.pack(self._PACK_STR, self.type_,
                                 self.aux_len, self.num,
                                 addrconv.ipv4.text_to_bin(self.address))
        record = bytearray(fixed_part)

        # One 4-byte binary address per source.
        for source in self.srcs:
            record += struct.pack('4s', addrconv.ipv4.text_to_bin(source))

        # A zero source count means "derive it"; patch the 16-bit field at
        # byte offset 2.
        if self.num == 0:
            self.num = len(self.srcs)
            struct.pack_into('!H', record, 2, self.num)

        if self.aux is not None:
            remainder = len(self.aux) % 4
            if remainder:
                # Pad auxiliary data up to the next 4-byte boundary.
                self.aux += bytearray(4 - remainder)
                self.aux = six.binary_type(self.aux)
            record.extend(self.aux)
            # A zero aux length means "derive it", counted in 4-byte units;
            # patch the 8-bit field at byte offset 1.
            if self.aux_len == 0:
                self.aux_len = len(self.aux) // 4
                struct.pack_into('!B', record, 1, self.aux_len)

        return six.binary_type(record)
项目:deb-ryu    作者:openstack    | 项目源码 | 文件源码
def serialize(self, payload, prev):
        """Build the header as a bytearray, including options and checksum.

        ``total_length`` is derived from the header length and ``payload``
        when it is 0. ``prev`` is not used here. The computed checksum is
        stored in ``self.csum`` and written at byte offset 10.
        """
        CSUM_OFFSET = 10
        hdr_size = len(self)
        hdr = bytearray(hdr_size)

        ver_and_ihl = self.version << 4 | self.header_length
        flags_and_frag = self.flags << 13 | self.offset
        if self.total_length == 0:
            # header_length is counted in 4-byte units.
            self.total_length = self.header_length * 4 + len(payload)

        # The checksum field is packed as 0 and patched in afterwards.
        struct.pack_into(
            ipv4._PACK_STR, hdr, 0,
            ver_and_ihl,
            self.tos,
            self.total_length,
            self.identification,
            flags_and_frag,
            self.ttl,
            self.proto,
            0,
            addrconv.ipv4.text_to_bin(self.src),
            addrconv.ipv4.text_to_bin(self.dst))

        if self.option:
            # Options sit directly after the fixed-size part of the header.
            assert (hdr_size - ipv4._MIN_LEN) >= len(self.option)
            options_end = ipv4._MIN_LEN + len(self.option)
            hdr[ipv4._MIN_LEN:options_end] = self.option

        self.csum = packet_utils.checksum(hdr)
        struct.pack_into('!H', hdr, CSUM_OFFSET, self.csum)
        return hdr
项目:deb-ryu    作者:openstack    | 项目源码 | 文件源码
def serialize(self):
        """Encode the record (header, 16-byte sources, aux data) as bytes.

        Fields left at 0 are computed here and written back both to the
        instance and to the encoded header; ``aux`` is padded with zero
        bytes to a 4-byte multiple.
        """
        out = bytearray(struct.pack(
            self._PACK_STR, self.type_, self.aux_len, self.num,
            addrconv.ipv6.text_to_bin(self.address)))

        for src in self.srcs:
            packed_src = struct.pack('16s', addrconv.ipv6.text_to_bin(src))
            out.extend(packed_src)

        if not self.num:
            # Source count was left unset: fill the 16-bit field at offset 2.
            self.num = len(self.srcs)
            struct.pack_into('!H', out, 2, self.num)

        if self.aux is None:
            return six.binary_type(out)

        pad = len(self.aux) % 4
        if pad:
            # Round the auxiliary data up to a whole number of 4-byte units.
            self.aux += bytearray(4 - pad)
            self.aux = six.binary_type(self.aux)
        out.extend(self.aux)

        if not self.aux_len:
            # Aux length was left unset: fill the 8-bit field at offset 1
            # with the length in 4-byte units.
            self.aux_len = len(self.aux) // 4
            struct.pack_into('!B', out, 1, self.aux_len)

        return six.binary_type(out)