我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用struct.pack_into()。
def _transmitMsg(self, msg): """Send an OSC message over a streaming socket. Raises exception if it should fail. If everything is transmitted properly, True is returned. If socket has been closed, False. """ if not isinstance(msg, OSCMessage): raise TypeError("'msg' argument is not an OSCMessage or OSCBundle object") try: binary = msg.getBinary() length = len(binary) # prepend length of packet before the actual message (big endian) len_big_endian = array.array('c', '\0' * 4) struct.pack_into(">L", len_big_endian, 0, length) len_big_endian = len_big_endian.tostring() if self._transmit(len_big_endian) and self._transmit(binary): return True return False except socket.error as e: if e[0] == errno.EPIPE: # broken pipe return False raise e
def _transmitMsg(self, msg): """Send an OSC message over a streaming socket. Raises exception if it should fail. If everything is transmitted properly, True is returned. If socket has been closed, False. """ if not isinstance(msg, OSCMessage): raise TypeError("'msg' argument is not an OSCMessage or OSCBundle object") try: binary = msg.getBinary() length = len(binary) # prepend length of packet before the actual message (big endian) len_big_endian = array.array('c', '\0' * 4) struct.pack_into(">L", len_big_endian, 0, length) len_big_endian = len_big_endian.tostring() if self._transmit(len_big_endian) and self._transmit(binary): return True return False except socket.error, e: if e[0] == errno.EPIPE: # broken pipe return False raise e
def check(self, fmt, value): from random import randrange # build a buffer which is surely big enough to contain what we need # and check: # 1) that we correctly write the bytes we expect # 2) that we do NOT write outside the bounds # pattern = [six.int2byte(randrange(256)) for _ in range(256)] pattern = b''.join(pattern) buf = bytearray(pattern) buf2 = bytearray(pattern) offset = 16 pack_into(ord(fmt), buf, offset, value) struct.pack_into(fmt, buf2, offset, value) assert buf == buf2 # # check that it raises if it's out of bound out_of_bound = 256-struct.calcsize(fmt)+1 pytest.raises(IndexError, "pack_into(ord(fmt), buf, out_of_bound, value)")
def _build_and_send_init(self): """Build and send the INIT message""" # Build the message msg_bytes = bytearray(12) struct.pack_into('>III', msg_bytes, 0, 1, # Type - ACK 0, # Remote Channel self.local_channel # Local channel ) # Send it to the remote self._owner.send_data(msg_bytes, (self._remote_ip, self._remote_port)) self._num_init_sent += 1 # Print log logging.info('%s Sent INIT message (%s)', self, self._num_init_sent)
def _build_and_send_init_ack(self): """Build and send INI-ACK message""" # Build message bytes msg_bytes = bytearray(12) struct.pack_into('>III', msg_bytes, 0, 1, # Type self.remote_channel, # Remote channel self.local_channel # Local channel ) # Send it self._owner.send_data(msg_bytes, (self._remote_ip, self._remote_port)) self._num_init_ack_sent += 1 # Print log logging.info('%s Sent INIT-ACK message (%s)', self, self._num_init_ack_sent)
def _write_header(self, buff, api_version=0, correlation_id=0): """Write the header for an outgoing message. :param buff: The buffer into which to write the header :type buff: buffer :param api_version: The "kafka api version id", used for feature flagging :type api_version: int :param correlation_id: This is a user-supplied integer. It will be passed back in the response by the server, unmodified. It is useful for matching request and response between the client and server. :type correlation_id: int """ fmt = '!ihhih%ds' % len(self.CLIENT_ID) struct.pack_into(fmt, buff, 0, len(buff) - 4, # msglen excludes this int self.API_KEY, api_version, correlation_id, len(self.CLIENT_ID), self.CLIENT_ID)
def pack_into(self, buff, offset): """Serialize and write to ``buff`` starting at offset ``offset``. Intentionally follows the pattern of ``struct.pack_into`` :param buff: The buffer to write into :param offset: The offset to start the write at """ # NB a length of 0 means an empty string, whereas -1 means null len_key = -1 if self.partition_key is None else len(self.partition_key) len_value = -1 if self.value is None else len(self.value) fmt = '!BBi%dsi%ds' % (max(len_key, 0), max(len_value, 0)) args = (self.MAGIC, self.compression_type, len_key, self.partition_key or b"", len_value, self.value or b"") struct.pack_into(fmt, buff, offset + 4, *args) fmt_size = struct.calcsize(fmt) data = buffer(buff[(offset + 4):(offset + 4 + fmt_size)]) crc = crc32(data) & 0xffffffff struct.pack_into('!I', buff, offset, crc)
def _get_compressed(self): """Get a compressed representation of all current messages. Returns a Message object with correct headers set and compressed data in the value field. """ assert self.compression_type != CompressionType.NONE tmp_mset = MessageSet(messages=self._messages) uncompressed = bytearray(len(tmp_mset)) tmp_mset.pack_into(uncompressed, 0) if self.compression_type == CompressionType.GZIP: compressed = compression.encode_gzip(buffer(uncompressed)) elif self.compression_type == CompressionType.SNAPPY: compressed = compression.encode_snappy(buffer(uncompressed)) else: raise TypeError("Unknown compression: %s" % self.compression_type) return Message(compressed, compression_type=self.compression_type)
def get_bytes(self): """Serialize the message :returns: Serialized message :rtype: :class:`bytearray` """ output = bytearray(len(self)) self._write_header(output) offset = self.HEADER_LEN struct.pack_into('!iiii', output, offset, -1, self.timeout, self.min_bytes, len(self._reqs)) offset += 16 for topic_name, partitions in iteritems(self._reqs): fmt = '!h%dsi' % len(topic_name) struct.pack_into( fmt, output, offset, len(topic_name), topic_name, len(partitions) ) offset += struct.calcsize(fmt) for partition_id, (fetch_offset, max_bytes) in iteritems(partitions): struct.pack_into('!iqi', output, offset, partition_id, fetch_offset, max_bytes) offset += 16 return output
def get_bytes(self): """Serialize the message :returns: Serialized message :rtype: :class:`bytearray` """ output = bytearray(len(self)) self._write_header(output) offset = self.HEADER_LEN struct.pack_into('!ii', output, offset, -1, len(self._reqs)) offset += 8 for topic_name, partitions in iteritems(self._reqs): fmt = '!h%dsi' % len(topic_name) struct.pack_into(fmt, output, offset, len(topic_name), topic_name, len(partitions)) offset += struct.calcsize(fmt) for pnum, (offsets_before, max_offsets) in iteritems(partitions): struct.pack_into('!iqi', output, offset, pnum, offsets_before, max_offsets) offset += 16 return output
def get_bytes(self): """Serialize the message :returns: Serialized message :rtype: :class:`bytearray` """ output = bytearray(len(self)) self._write_header(output, api_version=1) offset = self.HEADER_LEN fmt = '!h%dsi' % len(self.consumer_group) struct.pack_into(fmt, output, offset, len(self.consumer_group), self.consumer_group, len(self._reqs)) offset += struct.calcsize(fmt) for topic_name, partitions in iteritems(self._reqs): fmt = '!h%dsi' % len(topic_name) struct.pack_into(fmt, output, offset, len(topic_name), topic_name, len(partitions)) offset += struct.calcsize(fmt) for pnum in partitions: fmt = '!i' struct.pack_into(fmt, output, offset, pnum) offset += struct.calcsize(fmt) return output
def get_bytes(self): """Serialize the message :returns: Serialized message :rtype: :class:`bytearray` """ output = bytearray(len(self)) self._write_header(output) offset = self.HEADER_LEN fmt = '!h%dsih%dsh%dsi' % (len(self.group_id), len(self.member_id), len(self.protocol_type)) struct.pack_into(fmt, output, offset, len(self.group_id), self.group_id, self.session_timeout, len(self.member_id), self.member_id, len(self.protocol_type), self.protocol_type, len(self.group_protocols)) offset += struct.calcsize(fmt) for protocol_name, protocol_metadata in self.group_protocols: fmt = '!h%dsi%ds' % (len(protocol_name), len(protocol_metadata)) struct.pack_into(fmt, output, offset, len(protocol_name), protocol_name, len(protocol_metadata), protocol_metadata) offset += struct.calcsize(fmt) return output
def get_bytes(self): output = bytearray(len(self)) offset = 0 fmt = '!hi' struct.pack_into(fmt, output, offset, self.version, len(self.partition_assignment)) offset += struct.calcsize(fmt) for topic_name, partitions in self.partition_assignment: fmt = '!h%dsi' % len(topic_name) struct.pack_into(fmt, output, offset, len(topic_name), topic_name, len(partitions)) offset += struct.calcsize(fmt) for partition_id in partitions: fmt = '!i' struct.pack_into(fmt, output, offset, partition_id) offset += struct.calcsize(fmt) return output
def get_bytes(self): """Serialize the message :returns: Serialized message :rtype: :class:`bytearray` """ output = bytearray(len(self)) self._write_header(output) offset = self.HEADER_LEN fmt = '!h%dsih%dsi' % (len(self.group_id), len(self.member_id)) struct.pack_into(fmt, output, offset, len(self.group_id), self.group_id, self.generation_id, len(self.member_id), self.member_id, len(self.group_assignment)) offset += struct.calcsize(fmt) for member_assignment in self.group_assignment: assignment_bytes = bytes(member_assignment.get_bytes()) fmt = '!h%dsi%ds' % (len(member_assignment.member_id), len(assignment_bytes)) struct.pack_into(fmt, output, offset, len(member_assignment.member_id), member_assignment.member_id, len(assignment_bytes), assignment_bytes) offset += struct.calcsize(fmt) return output
def reset_length(self, buf, length): """ Writes an IPA header with the length param. Used externally when protocols over provision memory initially and then reset the length. Args: buf (memoryview): the IPA message length (int): length of the protocl message encapsulated by IPA Returns: None """ if self._osmo_extn is not None: # Ctrl extn is 0x00 struct.pack_into('!HBB', buf, 0, length + 1, self._stream_id, self._osmo_extn) else: struct.pack_into('!HB', buf, 0, length, self._stream_id)
def encode_auth_tuple(val, buf, offset, min_len, max_len): """ Encode the Auth tuple IE. Args: val: (rand, sres, kc) tuple Returns: The size of encoded auth tuple (always 34) """ (rand, sres, key) = val if len(rand) != 16 or len(sres) != 4 or len(key) != 8: raise GSUPCodecError( "Bad auth tuple to encode: rand: %s, sres: %s, key: %s" % (rand, sres, key)) struct.pack_into( '2B16s2B4s2B8s', buf, offset, IEType.RAND, 16, rand, IEType.SRES, 4, sres, IEType.KC_KEY, 8, key) return 34
def test_pack_into_fn(self): test_string = b'Reykjavik rocks, eow!' writable_buf = array.array('b', b' '*100) fmt = '21s' pack_into = lambda *args: struct.pack_into(fmt, *args) # Test without offset. pack_into(writable_buf, 0, test_string) from_buf = writable_buf.tobytes()[:len(test_string)] self.assertEqual(from_buf, test_string) # Test with offset. pack_into(writable_buf, 10, test_string) from_buf = writable_buf.tobytes()[:len(test_string)+10] self.assertEqual(from_buf, test_string[:10] + test_string) # Go beyond boundaries. small_buf = array.array('b', b' '*10) self.assertRaises((ValueError, struct.error), pack_into, small_buf, 0, test_string) self.assertRaises((ValueError, struct.error), pack_into, small_buf, 2, test_string)
def test_trailing_counter(self): store = array.array('b', b' '*100) # format lists containing only count spec should result in an error self.assertRaises(struct.error, struct.pack, '12345') self.assertRaises(struct.error, struct.unpack, '12345', b'') self.assertRaises(struct.error, struct.pack_into, '12345', store, 0) self.assertRaises(struct.error, struct.unpack_from, '12345', store, 0) # Format lists with trailing count spec should result in an error self.assertRaises(struct.error, struct.pack, 'c12345', b'x') self.assertRaises(struct.error, struct.unpack, 'c12345', b'x') self.assertRaises(struct.error, struct.pack_into, 'c12345', store, 0, b'x') self.assertRaises(struct.error, struct.unpack_from, 'c12345', store, 0) # Mixed format tests self.assertRaises(struct.error, struct.pack, '14s42', b'spam and eggs') self.assertRaises(struct.error, struct.unpack, '14s42', b'spam and eggs') self.assertRaises(struct.error, struct.pack_into, '14s42', store, 0, b'spam and eggs') self.assertRaises(struct.error, struct.unpack_from, '14s42', store, 0)
def test_pack_into_fn(self): test_string = 'Reykjavik rocks, eow!' writable_buf = array.array('c', ' '*100) fmt = '21s' pack_into = lambda *args: struct.pack_into(fmt, *args) # Test without offset. pack_into(writable_buf, 0, test_string) from_buf = writable_buf.tostring()[:len(test_string)] self.assertEqual(from_buf, test_string) # Test with offset. pack_into(writable_buf, 10, test_string) from_buf = writable_buf.tostring()[:len(test_string)+10] self.assertEqual(from_buf, test_string[:10] + test_string) # Go beyond boundaries. small_buf = array.array('c', ' '*10) self.assertRaises((ValueError, struct.error), pack_into, small_buf, 0, test_string) self.assertRaises((ValueError, struct.error), pack_into, small_buf, 2, test_string)
def get_packet_header(self) -> bytes: """ Gets the voice packet header. :return: The bytes of the header. """ header = bytearray(12) # constant values, provided by the docs header[0:2] = b"\x80\x78" # dynamic values # offset 2 -> sequence struct.pack_into(">H", header, 2, self.sequence) # offset 4 -> timestamp struct.pack_into(">I", header, 4, self.timestamp) # offset 8 -> ssrc struct.pack_into(">I", header, 8, self.vs_ws.ssrc) return header
def send_frame(self, frame, sequence=None, timestamp=None): # Convert the frame to a bytearray frame = bytearray(frame) # Pack the rtc header into our buffer struct.pack_into('>H', self._buffer, 2, sequence or self.vc.sequence) struct.pack_into('>I', self._buffer, 4, timestamp or self.vc.timestamp) struct.pack_into('>i', self._buffer, 8, self.vc.ssrc) # Now encrypt the payload with the nonce as a header raw = self.vc.secret_box.encrypt(bytes(frame), bytes(self._buffer)).ciphertext # Send the header (sans nonce padding) plus the payload self.send(self._buffer[:12] + raw) # Increment our sequence counter self.vc.sequence += 1 if self.vc.sequence >= 65535: self.vc.sequence = 0
def test_trailing_counter(self): store = array.array('b', b' '*100) # format lists containing only count spec should result in an error self.assertRaises(struct.error, struct.pack, '12345') self.assertRaises(struct.error, struct.unpack, '12345', '') self.assertRaises(struct.error, struct.pack_into, '12345', store, 0) self.assertRaises(struct.error, struct.unpack_from, '12345', store, 0) # Format lists with trailing count spec should result in an error self.assertRaises(struct.error, struct.pack, 'c12345', 'x') self.assertRaises(struct.error, struct.unpack, 'c12345', 'x') self.assertRaises(struct.error, struct.pack_into, 'c12345', store, 0, 'x') self.assertRaises(struct.error, struct.unpack_from, 'c12345', store, 0) # Mixed format tests self.assertRaises(struct.error, struct.pack, '14s42', 'spam and eggs') self.assertRaises(struct.error, struct.unpack, '14s42', 'spam and eggs') self.assertRaises(struct.error, struct.pack_into, '14s42', store, 0, 'spam and eggs') self.assertRaises(struct.error, struct.unpack_from, '14s42', store, 0)
def serialize(self, payload, prev): length = len(self) hdr = bytearray(length) version = self.version << 4 | self.header_length flags = self.flags << 13 | self.offset if self.total_length == 0: self.total_length = self.header_length * 4 + len(payload) struct.pack_into(ipv4._PACK_STR, hdr, 0, version, self.tos, self.total_length, self.identification, flags, self.ttl, self.proto, 0, addrconv.ipv4.text_to_bin(self.src), addrconv.ipv4.text_to_bin(self.dst)) if self.option: assert (length - ipv4._MIN_LEN) >= len(self.option) hdr[ipv4._MIN_LEN:ipv4._MIN_LEN + len(self.option)] = self.option self.csum = packet_utils.checksum(hdr) struct.pack_into('!H', hdr, 10, self.csum) return hdr
def serialize(self): buf = bytearray(struct.pack(self._PACK_STR, self.type_, self.aux_len, self.num, addrconv.ipv6.text_to_bin(self.address))) for src in self.srcs: buf.extend(struct.pack('16s', addrconv.ipv6.text_to_bin(src))) if 0 == self.num: self.num = len(self.srcs) struct.pack_into('!H', buf, 2, self.num) if self.aux is not None: mod = len(self.aux) % 4 if mod: self.aux += bytearray(4 - mod) self.aux = six.binary_type(self.aux) buf.extend(self.aux) if 0 == self.aux_len: self.aux_len = len(self.aux) // 4 struct.pack_into('!B', buf, 1, self.aux_len) return six.binary_type(buf)
def serialize(self, payload=None, prev=None): tail = self.serialize_tail() self.length = self._HDR_LEN + len(tail) head = bytearray(struct.pack(self._HDR_PACK_STR, self.version, self.type_, self.length, addrconv.ipv4.text_to_bin(self.router_id), addrconv.ipv4.text_to_bin(self.area_id), 0, self.au_type, self.authentication)) buf = head + tail csum = packet_utils.checksum(buf[:12] + buf[14:16] + buf[self._HDR_LEN:]) self.checksum = csum struct.pack_into("!H", buf, 12, csum) return buf # alias
def serialize(self, payload, prev): hdr = bytearray(40) v_tc_flow = (self.version << 28 | self.traffic_class << 20 | self.flow_label) struct.pack_into(ipv6._PACK_STR, hdr, 0, v_tc_flow, self.payload_length, self.nxt, self.hop_limit, addrconv.ipv6.text_to_bin(self.src), addrconv.ipv6.text_to_bin(self.dst)) if self.ext_hdrs: for ext_hdr in self.ext_hdrs: hdr.extend(ext_hdr.serialize()) if 0 == self.payload_length: payload_length = len(payload) for ext_hdr in self.ext_hdrs: payload_length += len(ext_hdr) self.payload_length = payload_length struct.pack_into('!H', hdr, 4, self.payload_length) return hdr
def serialize(self, payload, prev): hdr = bytearray(struct.pack(icmp._PACK_STR, self.type, self.code, self.csum)) if self.data is not None: if self.type in icmp._ICMP_TYPES: hdr += self.data.serialize() else: hdr += self.data else: self.data = echo() hdr += self.data.serialize() if self.csum == 0: self.csum = packet_utils.checksum(hdr) struct.pack_into('!H', hdr, 2, self.csum) return hdr
def setUp_with_echo(self): self.echo_id = 13379 self.echo_seq = 1 self.echo_data = b'\x30\x0e\x09\x00\x00\x00\x00\x00' \ + b'\x10\x11\x12\x13\x14\x15\x16\x17' \ + b'\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f' \ + b'\x20\x21\x22\x23\x24\x25\x26\x27' \ + b'\x28\x29\x2a\x2b\x2c\x2d\x2e\x2f' \ + b'\x30\x31\x32\x33\x34\x35\x36\x37' self.data = icmp.echo( id_=self.echo_id, seq=self.echo_seq, data=self.echo_data) self.type_ = icmp.ICMP_ECHO_REQUEST self.code = 0 self.ic = icmp.icmp(self.type_, self.code, self.csum, self.data) self.buf = bytearray(struct.pack( icmp.icmp._PACK_STR, self.type_, self.code, self.csum)) self.buf += self.data.serialize() self.csum_calc = packet_utils.checksum(self.buf) struct.pack_into('!H', self.buf, 2, self.csum_calc)
def setUp_with_dest_unreach(self): self.unreach_mtu = 10 self.unreach_data = b'abc' self.unreach_data_len = len(self.unreach_data) self.data = icmp.dest_unreach( data_len=self.unreach_data_len, mtu=self.unreach_mtu, data=self.unreach_data) self.type_ = icmp.ICMP_DEST_UNREACH self.code = icmp.ICMP_HOST_UNREACH_CODE self.ic = icmp.icmp(self.type_, self.code, self.csum, self.data) self.buf = bytearray(struct.pack( icmp.icmp._PACK_STR, self.type_, self.code, self.csum)) self.buf += self.data.serialize() self.csum_calc = packet_utils.checksum(self.buf) struct.pack_into('!H', self.buf, 2, self.csum_calc)
def yw_proc(data, isEncrypt, key=None, head=None, validator=None): out = bytearray() length = len(data) new_crc32 = struct.unpack("<I", data[-8:-4])[0] seed = struct.unpack("<I", data[-4:])[0] if not isEncrypt: if binascii.crc32(data[:-8]) != new_crc32: logging.error("Checksum does not match") return None c = YWCipher(seed, 0x1000) out += c.encrypt(data[:-8]) out += data[-8:] if isEncrypt: new_crc32 = binascii.crc32(out[:-8]) struct.pack_into("<I", out, length - 8, new_crc32) return bytes(out)
def allThornyan(file, code=72463062): print("=== Yo-kai ===") data = open(file, "rb").read() data = bytearray(data) pos = 0x1D08 for i in range(240): params = struct.unpack("<4x i 56x 5b 5B 5b 5x B 3x 2x H", data[pos:pos+0x5C]) if params[0] == 0: continue print("{} -> {}".format( youkai.get(params[0], "(unknown)"), youkai.get(code, "(unknown)"), ) ) struct.pack_into("<i", data, pos + 4, code) pos += 0x5C with open(file + ".edited.yw", "xb") as f: f.write(data)
def save(self): new = bytearray(self.original) new[0x4:0x10] = self.gamedata_header struct.pack_into("<L", new, 0x00024, self.time_played) struct.pack_into("<L", new, 0x0002C, self.last_saved_chapter) namebuf = self.name.encode("utf-16-le") # limit length if len(namebuf) > 68: namebuf = namebuf[:68] # pad with zeroes namebuf = namebuf + b"\0" * (70 - len(namebuf)) new[0x34:0x7A] = namebuf struct.pack_into("<L", new, 0x3056C, self.money) struct.pack_into("<L", new, 0x3871C, self.experience) # back up the file shutil.move(self.path, self.path + ".bak") # write the new one with open(self.path, "wb") as f: f.write(new)
def transactionI2C(self, port, device_address, data_to_send, send_size, data_received, receive_size): ''' To give data back use ``data_received``:: data_received[:] = [1,2,3...] :returns: number of bytes returned ''' if data_to_send[0] == BNO055.BNO055_EULER_H_LSB_ADDR: struct.pack_into('<h', data_received, 0, int(self.heading * 900.0)) if data_to_send[0] == BNO055.BNO055_EULER_P_LSB_ADDR: struct.pack_into('<h', data_received, 0, int(self.pitch * 900.0)) if data_to_send[0] == BNO055.BNO055_EULER_R_LSB_ADDR: struct.pack_into('<h', data_received, 0, int(self.roll * 900.0)) return receive_size
def _pack_array(values, definition, fmt_string, num_bytes): """ Pack an array item defined by a dictionary entry into a bytearray Args: values (tuple): 1d or 2d tuple of values to pack definition (dict): definition of the data item fmt_string (str): struct.unpack format string for an element num_bytes (int): number of bytes of each element """ packet_part = bytearray() if definition['DIM_2_SIZE']: for dim1 in values: for value in dim1: buff = bytearray(num_bytes) struct.pack_into(fmt_string, buff, 0, value) packet_part += buff else: for value in values: buff = bytearray(num_bytes) struct.pack_into(fmt_string, buff, 0, value) packet_part += buff return packet_part
def mai_set_time(self, gpstime): """ Sets the ADACS clock. Arguments: gpstime - GPS time is a linear count of seconds elapsed since 0h Jan 6, 1980. """ DATA_LEN = 40 #always data = bytearray(DATA_LEN) syncbyte = 0xEB90 # TODO: the Pumpkin comments contain some confusing statements about endianness commandID = 0x44 # set GPS time # Create data struct.pack_into('<HBL', data, 0, syncbyte, commandID, gpstime) # Add data checksum. (This is different than the packet checksum.) checksum = 0xFFFF & sum(data[0:DATA_LEN]) struct.pack_into('<H', data, 38, checksum) Send.send_bus_cmd(BusCommands.MAI_CMD, data)
def assemble_int_data(self, int_data, global_offset): offset = None data_section = bytes() int_buffer = ctypes.create_string_buffer(4) for var in int_data: length = 0 t = var['val'] if t is not None: offset = global_offset for v in t: struct.pack_into('>i', int_buffer, 0, v) data_section += int_buffer global_offset += 32 length += 32 struct.pack_into('>I', int_buffer, 0, 0xffffffff) data_section += int_buffer global_offset += 32 length += 32 else: offset = None length = 32 self.add_var(SGSConfig.VT_INT, '@'+var['name'], offset, length) return (data_section, global_offset)
def serialize(self): buf = bytearray(struct.pack(self._PACK_STR, self.type_, self.aux_len, self.num, addrconv.ipv4.text_to_bin(self.address))) for src in self.srcs: buf.extend(struct.pack('4s', addrconv.ipv4.text_to_bin(src))) if 0 == self.num: self.num = len(self.srcs) struct.pack_into('!H', buf, 2, self.num) if self.aux is not None: mod = len(self.aux) % 4 if mod: self.aux += bytearray(4 - mod) self.aux = six.binary_type(self.aux) buf.extend(self.aux) if 0 == self.aux_len: self.aux_len = len(self.aux) // 4 struct.pack_into('!B', buf, 1, self.aux_len) return six.binary_type(buf)