我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用google.protobuf.message.ByteSize()。
def MessageSetItemByteSize(field_number, msg): # First compute the sizes of the tags. # There are 2 tags for the beginning and ending of the repeated group, that # is field number 1, one with field number 2 (type_id) and one with field # number 3 (message). total_size = (2 * TagByteSize(1) + TagByteSize(2) + TagByteSize(3)) # Add the number of bytes for type_id. total_size += _VarUInt64ByteSizeNoTag(field_number) message_size = msg.ByteSize() # The number of bytes for encoding the length of the message. total_size += _VarUInt64ByteSizeNoTag(message_size) # The size of the message. total_size += message_size return total_size
def testClear(self): proto = unittest_pb2.TestAllTypes() # C++ implementation does not support lazy fields right now so leave it # out for now. if api_implementation.Type() == 'python': test_util.SetAllFields(proto) else: test_util.SetAllNonLazyFields(proto) # Clear the message. proto.Clear() self.assertEqual(proto.ByteSize(), 0) empty_proto = unittest_pb2.TestAllTypes() self.assertEqual(proto, empty_proto) # Test if extensions which were set are cleared. proto = unittest_pb2.TestAllExtensions() test_util.SetAllExtensions(proto) # Clear the message. proto.Clear() self.assertEqual(proto.ByteSize(), 0) empty_proto = unittest_pb2.TestAllExtensions() self.assertEqual(proto, empty_proto)
def testCacheInvalidationForNonrepeatedScalar(self): # Test non-extension. self.proto.optional_int32 = 1 self.assertEqual(2, self.proto.ByteSize()) self.proto.optional_int32 = 128 self.assertEqual(3, self.proto.ByteSize()) self.proto.ClearField('optional_int32') self.assertEqual(0, self.proto.ByteSize()) # Test within extension. extension = more_extensions_pb2.optional_int_extension self.extended_proto.Extensions[extension] = 1 self.assertEqual(2, self.extended_proto.ByteSize()) self.extended_proto.Extensions[extension] = 128 self.assertEqual(3, self.extended_proto.ByteSize()) self.extended_proto.ClearExtension(extension) self.assertEqual(0, self.extended_proto.ByteSize())
def testCacheInvalidationForRepeatedScalar(self): # Test non-extension. self.proto.repeated_int32.append(1) self.assertEqual(3, self.proto.ByteSize()) self.proto.repeated_int32.append(1) self.assertEqual(6, self.proto.ByteSize()) self.proto.repeated_int32[1] = 128 self.assertEqual(7, self.proto.ByteSize()) self.proto.ClearField('repeated_int32') self.assertEqual(0, self.proto.ByteSize()) # Test within extension. extension = more_extensions_pb2.repeated_int_extension repeated = self.extended_proto.Extensions[extension] repeated.append(1) self.assertEqual(2, self.extended_proto.ByteSize()) repeated.append(1) self.assertEqual(4, self.extended_proto.ByteSize()) repeated[1] = 128 self.assertEqual(5, self.extended_proto.ByteSize()) self.extended_proto.ClearExtension(extension) self.assertEqual(0, self.extended_proto.ByteSize())
def testCacheInvalidationForRepeatedMessage(self): # Test non-extension. child0 = self.proto.repeated_foreign_message.add() self.assertEqual(3, self.proto.ByteSize()) self.proto.repeated_foreign_message.add() self.assertEqual(6, self.proto.ByteSize()) child0.c = 1 self.assertEqual(8, self.proto.ByteSize()) self.proto.ClearField('repeated_foreign_message') self.assertEqual(0, self.proto.ByteSize()) # Test within extension. extension = more_extensions_pb2.repeated_message_extension child_list = self.extended_proto.Extensions[extension] child0 = child_list.add() self.assertEqual(2, self.extended_proto.ByteSize()) child_list.add() self.assertEqual(4, self.extended_proto.ByteSize()) child0.foreign_message_int = 1 self.assertEqual(6, self.extended_proto.ByteSize()) child0.ClearField('foreign_message_int') self.assertEqual(4, self.extended_proto.ByteSize()) self.extended_proto.ClearExtension(extension) self.assertEqual(0, self.extended_proto.ByteSize())
def testPackedRepeatedScalars(self): self.assertEqual(0, self.packed_proto.ByteSize()) self.packed_proto.packed_int32.append(10) # 1 byte. self.packed_proto.packed_int32.append(128) # 2 bytes. # The tag is 2 bytes (the field number is 90), and the varint # storing the length is 1 byte. int_size = 1 + 2 + 3 self.assertEqual(int_size, self.packed_proto.ByteSize()) self.packed_proto.packed_double.append(4.2) # 8 bytes self.packed_proto.packed_double.append(3.25) # 8 bytes # 2 more tag bytes, 1 more length byte. double_size = 8 + 8 + 3 self.assertEqual(int_size+double_size, self.packed_proto.ByteSize()) self.packed_proto.ClearField('packed_int32') self.assertEqual(double_size, self.packed_proto.ByteSize())
def testCanonicalSerializationOrder(self): proto = more_messages_pb2.OutOfOrderFields() # These are also their tag numbers. Even though we're setting these in # reverse-tag order AND they're listed in reverse tag-order in the .proto # file, they should nonetheless be serialized in tag order. proto.optional_sint32 = 5 proto.Extensions[more_messages_pb2.optional_uint64] = 4 proto.optional_uint32 = 3 proto.Extensions[more_messages_pb2.optional_int64] = 2 proto.optional_int32 = 1 serialized = proto.SerializeToString() self.assertEqual(proto.ByteSize(), len(serialized)) d = _MiniDecoder(serialized) ReadTag = d.ReadFieldNumberAndWireType self.assertEqual((1, wire_format.WIRETYPE_VARINT), ReadTag()) self.assertEqual(1, d.ReadInt32()) self.assertEqual((2, wire_format.WIRETYPE_VARINT), ReadTag()) self.assertEqual(2, d.ReadInt64()) self.assertEqual((3, wire_format.WIRETYPE_VARINT), ReadTag()) self.assertEqual(3, d.ReadUInt32()) self.assertEqual((4, wire_format.WIRETYPE_VARINT), ReadTag()) self.assertEqual(4, d.ReadUInt64()) self.assertEqual((5, wire_format.WIRETYPE_VARINT), ReadTag()) self.assertEqual(5, d.ReadSInt32())
def testPackedFieldsWireFormat(self): proto = unittest_pb2.TestPackedTypes() proto.packed_int32.extend([1, 2, 150, 3]) # 1 + 1 + 2 + 1 bytes proto.packed_double.extend([1.0, 1000.0]) # 8 + 8 bytes proto.packed_float.append(2.0) # 4 bytes, will be before double serialized = proto.SerializeToString() self.assertEqual(proto.ByteSize(), len(serialized)) d = _MiniDecoder(serialized) ReadTag = d.ReadFieldNumberAndWireType self.assertEqual((90, wire_format.WIRETYPE_LENGTH_DELIMITED), ReadTag()) self.assertEqual(1+1+1+2, d.ReadInt32()) self.assertEqual(1, d.ReadInt32()) self.assertEqual(2, d.ReadInt32()) self.assertEqual(150, d.ReadInt32()) self.assertEqual(3, d.ReadInt32()) self.assertEqual((100, wire_format.WIRETYPE_LENGTH_DELIMITED), ReadTag()) self.assertEqual(4, d.ReadInt32()) self.assertEqual(2.0, d.ReadFloat()) self.assertEqual((101, wire_format.WIRETYPE_LENGTH_DELIMITED), ReadTag()) self.assertEqual(8+8, d.ReadInt32()) self.assertEqual(1.0, d.ReadDouble()) self.assertEqual(1000.0, d.ReadDouble()) self.assertTrue(d.EndOfStream())
def testClear(self): proto = unittest_pb2.TestAllTypes() # C++ implementation does not support lazy fields right now so leave it # out for now. if api_implementation.Type() == 'python': test_util.SetAllFields(proto) else: test_util.SetAllNonLazyFields(proto) # Clear the message. proto.Clear() self.assertEquals(proto.ByteSize(), 0) empty_proto = unittest_pb2.TestAllTypes() self.assertEquals(proto, empty_proto) # Test if extensions which were set are cleared. proto = unittest_pb2.TestAllExtensions() test_util.SetAllExtensions(proto) # Clear the message. proto.Clear() self.assertEquals(proto.ByteSize(), 0) empty_proto = unittest_pb2.TestAllExtensions() self.assertEquals(proto, empty_proto)