我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用uuid.UUID。
def print_header(o):
    """Print a formatted summary of a backup object's metadata."""
    template = """Version:\t%(version)s Creation Date:\t%(timestamp)s Incremental:\t%(incremental)s Volume Size:\t%(volume_size)s Segment Size:\t%(segment_size)s Backup Set:\t%(backup_set)s"""
    # Assemble the interpolation values in one literal instead of
    # key-by-key assignment.
    fields = {
        'backup_set': UUID(bytes=o.metadata['bases'][-1]),
        'version': o.metadata['version'],
        'incremental': o.metadata['incremental'],
        'timestamp': get_timestamp(o),
        # sectors are 512 bytes each
        'volume_size': convert_size(o.metadata['sectors'] * 512),
        'segment_size': convert_size(o.metadata['segment_size']),
    }
    print(template % fields)
def put_data(self, data_segment):
    """Wrap a data chunk and persist it under
    <storage_location>/<backupset uuid>/<incremental>/<segment>.

    ``data_segment`` is a (payload, segment) pair; returns the segment.
    """
    payload, segment = data_segment[0], data_segment[1]
    wrapped = self.wrap_data(payload, segment)
    target_dir = os.path.join(
        self.storage_location,
        str(uuid.UUID(bytes=segment.backupset_id)),
        str(segment.incremental),
    )
    mkpath(target_dir)
    target_file = os.path.join(target_dir, str(segment.segment))
    with open(target_file, 'wb') as out:
        out.write(wrapped)
    return segment
def api_is_task(url):
    """Determine if a URL looks like a valid task URL.

    A valid task URL has a path of the form ``/pscheduler/tasks/<uuid>``.

    :param url: URL string to check
    :returns: True when the path matches, False otherwise
    """
    # The module-level `import urlparse` is Python 2 only; resolve the
    # function in a py2/py3-compatible way without changing the interface.
    try:
        from urllib.parse import urlparse  # Python 3
    except ImportError:
        from urlparse import urlparse  # Python 2 fallback
    # Note that this generates an extra array element because of the
    # leading slash.
    url_parts = urlparse(url).path.split('/')
    if len(url_parts) != 4 or url_parts[:3] != ['', 'pscheduler', 'tasks']:
        return False
    try:
        uuid.UUID(url_parts[3])
    except ValueError:
        return False
    return True
def api_is_run(url):
    """Determine if a URL looks like a valid run URL.

    A valid run URL has a path of the form
    ``/pscheduler/tasks/<uuid>/runs/<uuid>``.

    :param url: URL string to check
    :returns: True when the path matches, False otherwise
    """
    # The module-level `import urlparse` is Python 2 only; resolve the
    # function in a py2/py3-compatible way without changing the interface.
    try:
        from urllib.parse import urlparse  # Python 3
    except ImportError:
        from urlparse import urlparse  # Python 2 fallback
    # Note that this generates an extra array element because of the
    # leading slash.
    url_parts = urlparse(url).path.split('/')
    if (len(url_parts) != 6
            or url_parts[:3] != ['', 'pscheduler', 'tasks']
            or url_parts[4] != 'runs'):
        return False
    try:
        uuid.UUID(url_parts[3])
        uuid.UUID(url_parts[5])
    except ValueError:
        return False
    return True
def _encode_uuid(name, value, dummy, opts):
    """Encode uuid.UUID into BSON binary element bytes.

    Layout is ``\\x05`` + name + 4-byte length (16) + subtype + payload;
    the subtype and byte order depend on ``opts.uuid_representation``.
    """
    representation = opts.uuid_representation
    if representation == OLD_UUID_SUBTYPE:
        # Python legacy: raw big-endian bytes, legacy subtype 3.
        subtype, payload = b'\x03', value.bytes
    elif representation == JAVA_LEGACY:
        # Java legacy: each 8-byte half is byte-reversed, subtype 3.
        raw = value.bytes
        subtype, payload = b'\x03', raw[0:8][::-1] + raw[8:16][::-1]
    elif representation == CSHARP_LEGACY:
        # C# legacy: Microsoft GUID (little-endian) layout, subtype 3.
        subtype, payload = b'\x03', value.bytes_le
    else:
        # Standard representation: raw big-endian bytes, subtype 4.
        subtype, payload = b'\x04', value.bytes
    return b"\x05" + name + b'\x10\x00\x00\x00' + subtype + payload
def default(self, o):
    """Return a JSON-serializable representation of ``o`` or defer to the
    base encoder (which raises a ``TypeError``).

    ``datetime`` objects become HTTP date strings, ``uuid.UUID`` their
    canonical string form, and objects exposing ``__html__`` their
    rendered HTML text.
    """
    if isinstance(o, datetime):
        return http_date(o)
    elif isinstance(o, uuid.UUID):
        return str(o)
    elif hasattr(o, '__html__'):
        return text_type(o.__html__())
    else:
        return _json.JSONEncoder.default(self, o)
def loads(self, value):
    """Deserialize a JSON string, reviving tagged single-key objects:
    ``' t'`` -> tuple, ``' u'`` -> UUID, ``' b'`` -> base64 bytes,
    ``' m'`` -> Markup, ``' d'`` -> parsed date.
    """
    def object_hook(obj):
        # Only single-key dicts can be tag markers.
        if len(obj) != 1:
            return obj
        tag, payload = next(iteritems(obj))
        if tag == ' t':
            return tuple(payload)
        if tag == ' u':
            return uuid.UUID(payload)
        if tag == ' b':
            return b64decode(payload)
        if tag == ' m':
            return Markup(payload)
        if tag == ' d':
            return parse_date(payload)
        return obj

    return json.loads(value, object_hook=object_hook)
def detach_volume(self, blockdevice_id):
    """Detach (unexport) the CoprHD volume for ``blockdevice_id``.

    :param blockdevice_id: volume id of the form ``<6-char prefix><uuid>``
    :raises UnknownVolume: if the id is malformed or no volume exists
    :raises UnattachedVolume: if the volume is not attached anywhere
    """
    try:
        # Strip the 6-character prefix and validate the rest as a UUID.
        dataset_id = UUID(blockdevice_id[6:])
    except ValueError:
        raise UnknownVolume(blockdevice_id)
    volume_name = "flocker-{}".format(dataset_id)
    volumesdetails = self.coprhdcli.get_volume_details(volume_name)
    if not volumesdetails:
        raise UnknownVolume(blockdevice_id)
    # BUGFIX: dict.keys()[0] fails on Python 3 (keys() is a view);
    # next(iter(...)) works on both Python 2 and 3.
    first_key = next(iter(volumesdetails))
    if volumesdetails[first_key]['attached_to'] is not None:
        Message.new(Info="coprhd detach_volume" + str(blockdevice_id)).write(_logger)
        # Reuse the already-parsed dataset_id instead of re-parsing it.
        self.coprhdcli.unexport_volume(volume_name)
    else:
        Message.new(Info="Volume" + blockdevice_id + "not attached").write(_logger)
        raise UnattachedVolume(blockdevice_id)
def default(self, o):
    """Return a JSON-serializable form of ``o`` or fall back to the base
    encoder (raising :exc:`TypeError`).

    ``date`` objects become HTTP date strings (via their timetuple),
    ``uuid.UUID`` their canonical string form, and ``__html__``-aware
    objects their rendered HTML text.
    """
    if isinstance(o, date):
        return http_date(o.timetuple())
    elif isinstance(o, uuid.UUID):
        return str(o)
    elif hasattr(o, '__html__'):
        return text_type(o.__html__())
    else:
        return _json.JSONEncoder.default(self, o)
def _tag(value): if isinstance(value, tuple): return {' t': [_tag(x) for x in value]} elif isinstance(value, uuid.UUID): return {' u': value.hex} elif isinstance(value, bytes): return {' b': b64encode(value).decode('ascii')} elif callable(getattr(value, '__html__', None)): return {' m': text_type(value.__html__())} elif isinstance(value, list): return [_tag(x) for x in value] elif isinstance(value, datetime): return {' d': http_date(value)} elif isinstance(value, dict): return dict((k, _tag(v)) for k, v in iteritems(value)) elif isinstance(value, str): try: return text_type(value) except UnicodeError: from flask.debughelpers import UnexpectedUnicodeError raise UnexpectedUnicodeError(u'A byte string with ' u'non-ASCII data was passed to the session system ' u'which can only store unicode strings. Consider ' u'base64 encoding your string (String was %r)' % value) return value
def validate(self, value):
    """Return True when ``value`` is None, a UUID instance, or a string
    that parses as a UUID; False otherwise."""
    if value is None or isinstance(value, UUID):
        return True
    if isinstance(value, six.string_types):
        try:
            UUID(value)
        except (TypeError, ValueError):
            # Malformed string: fall through to False.
            pass
        else:
            return True
    return False
def default(self, o,
            dates=(datetime.datetime, datetime.date),
            times=(datetime.time,),
            textual=(decimal.Decimal, uuid.UUID, DjangoPromise),
            isinstance=isinstance, datetime=datetime.datetime,
            text_type=text_type):
    """Serialize dates/times to ISO-8601 strings and textual types to
    plain strings; defer everything else to the superclass.

    The default arguments pre-bind type tuples and lookups for speed;
    note ``datetime`` is rebound to the ``datetime.datetime`` class
    inside this function's body.
    """
    if isinstance(o, dates):
        # Promote a plain date to a midnight datetime for ISO formatting.
        if not isinstance(o, datetime):
            o = datetime(o.year, o.month, o.day, 0, 0, 0, 0)
        iso = o.isoformat()
        # Normalize an explicit UTC offset to the compact "Z" suffix.
        return iso[:-6] + "Z" if iso.endswith("+00:00") else iso
    if isinstance(o, times):
        return o.isoformat()
    if isinstance(o, textual):
        return text_type(o)
    return super(JsonEncoder, self).default(o)
def __new__(cls, uuid, aik_path, vtpm, added=False):
    """Create (or return the cached) VTPM group for ``uuid``.

    Args
    ----
    uuid: str
        The UUID for this VTPM Group
    aik_path: str
        The path to the .pem for this group's AIK
    vtpm:
        The vTPM handle associated with the group.
    added: bool
        When True, register the new instance in the class-level cache.
    """
    # Reuse the cached instance when this group is already known.
    if uuid in VTPMGroup._groups:
        return VTPMGroup._groups[uuid]
    instance = object.__new__(cls)
    instance.uuid = uuid
    instance.aik_path = aik_path
    instance.vtpm = vtpm
    if added:
        VTPMGroup._groups[uuid] = instance
    return instance
def show_group(group_num):
    """Returns info about group `group_num` using VTPM_ORD_GROUP_SHOW.

    :param group_num: integer index of the vTPM group to query
    :returns: dict with keys 'num', 'uuid' and 'vtpms' (list of UUID strings)
    """
    import binascii

    out = {'num': group_num, 'vtpms': []}
    body = vtpm_raw(0x1C2, struct.pack('>II', 0x02000107, group_num))
    (uuid, pk, cfg) = struct.unpack('16s 256s 16s', body)
    uuid = stringify_uuid(uuid)
    logger.info('Group [%d] UUID: %s', group_num, uuid)
    pk_hash = hashlib.sha1(pk).hexdigest()
    logger.info(' PK Hash: %s', pk_hash)
    # BUGFIX: str.encode('hex') is Python 2 only; binascii.hexlify works
    # on both Python 2 str and Python 3 bytes.
    logger.info(' Cfg list: %s', binascii.hexlify(cfg).decode('ascii'))
    body = vtpm_cmd(VTPM_ORD_VTPM_LIST, struct.pack('>II', group_num, 0))
    ((num_vtpms,), body) = unpack('>I', body)
    if num_vtpms > 0:
        logger.info(' vTPMs: ')
        # The remaining body is num_vtpms 16-byte UUIDs back to back.
        vtpms = struct.unpack('16s' * num_vtpms, body)
        vtpms = [stringify_uuid(vtpm) for vtpm in vtpms]
        for i, vtpm in enumerate(vtpms):
            logger.info(' [%d]: %s', i, vtpm)
            out['vtpms'].append(vtpm)
    out['uuid'] = uuid
    return out
def get_group_info(num):
    """Returns UUID and path to the group AIK file for vtpm group `num`.

    :param num: integer index of the vTPM group
    :returns: dict with keys 'aikpem' (PEM file path) and 'uuid'
    :raises OSError: when the group's public AIK file is missing
    """
    # Get info for group `num`
    ginfo = show_group(num)
    uuid = ginfo['uuid']
    aikname = '{0}_aik'.format(uuid)
    pubaik_path = '{0}.pub'.format(aikname)
    # Check that we have the group's AIK
    if not os.path.exists(pubaik_path):
        logger.error('Group %d AIK Path %r doesn\'t exist', num, pubaik_path)
        # Carry a message instead of the original bare OSError().
        raise OSError('Group {0} AIK {1!r} not found'.format(num, pubaik_path))
    aikpem = aikname + '.pem'
    # Convert group AIK to PEM.
    # NOTE(review): shell=True with interpolated paths is shell-injection
    # prone; here uuid comes from the vTPM manager, but a list argv with
    # shell=False would be safer — confirm tpmconv invocation constraints.
    check_call('tpmconv -ik {0} -ok {1}'.format(pubaik_path, aikname),
               shell=True)
    return {'aikpem': aikpem, 'uuid': uuid}
def add_vtpm_group(rsa_mod=None):
    """ Add new vtpm group"""
    # Short-circuit with canned test data when the TPM is stubbed out.
    if common.STUB_TPM:
        return (common.TEST_GROUP_UUID,common.TEST_HAIK,1,None)
    logger.debug('Adding group')
    # Default to an all-zero 256-byte RSA modulus placeholder.
    # NOTE(review): '\x00' string literals are Python 2 byte semantics;
    # under Python 3 these would need to be b'\x00' — confirm target version.
    if rsa_mod is None:
        rsa_mod = '\x00' * 256
    assert len(rsa_mod) == 256
    ca_digest = '\x00' * 20
    # Ask the vTPM manager to create the group (digest + modulus payload).
    rsp = vtpm_cmd(VTPM_ORD_GROUP_NEW, ca_digest + rsa_mod)
    (uuid, aik_pub, aik_priv_ca) = struct.unpack('16s256s256s', rsp)
    # Re-split the 16 raw UUID bytes per `uuid_fmt` and render as a dashed
    # hex string (str.encode('hex') is Python 2 only).
    uuid = struct.unpack(uuid_fmt, uuid)
    uuid = '-'.join([part.encode('hex') for part in uuid])
    logger.info('Created group with UUID: %s', uuid)
    # Convert the public AIK blob to PEM form.
    aikpem = tpmconv(aik_pub)
    # return the group
    group_num = get_group_num(uuid)
    return (uuid,aikpem,group_num,aik_priv_ca)
def encodeSmallAttribute(self, attr):
    """Return the wire value for the small message attribute ``attr``.

    Falsy values pass through unchanged; timestamps are converted to
    milliseconds; UUID-typed client/message ids are elided (``None``).

    @since: 0.5
    """
    obj = getattr(self, attr)
    if not obj:
        return obj
    if attr in ('timestamp', 'timeToLive'):
        # AMF timestamps are expressed in milliseconds.
        return pyamf.util.get_timestamp(obj) * 1000.0
    if attr in ('clientId', 'messageId') and isinstance(obj, uuid.UUID):
        return None
    return obj
def testUUIDARRAY(self):
    # Exercises psycopg2's uuid[] adapter: round-trips a list of UUIDs,
    # a list containing NULL, a NULL uuid[] cast, and an empty array.
    import uuid
    psycopg2.extras.register_uuid()
    u = [uuid.UUID('9c6d5a77-7256-457e-9461-347b4358e350'), uuid.UUID('9c6d5a77-7256-457e-9461-347b4358e352')]
    s = self.execute("SELECT %s AS foo", (u,))
    self.assertTrue(u == s)
    # array with a NULL element
    u = [uuid.UUID('9c6d5a77-7256-457e-9461-347b4358e350'), None]
    s = self.execute("SELECT %s AS foo", (u,))
    self.assertTrue(u == s)
    # must survive NULL cast to a uuid[]
    s = self.execute("SELECT NULL::uuid[] AS foo")
    self.assertTrue(s is None)
    # what about empty arrays?
    s = self.execute("SELECT '{}'::uuid[] AS foo")
    self.assertTrue(type(s) == list and len(s) == 0)
def test_send(header_timestamp_mock, pubsub_client_mock):
    # Verify Messaging.send publishes the JSON-encoded model to the
    # configured topic with the frozen timestamp and the model type name.
    messaging = queue_messaging.Messaging.create_from_dict({
        'TOPIC': 'test-topic',
    })
    model = FancyEvent(
        uuid_field=uuid.UUID('cd1d3a03-7b04-4a35-97f8-ee5f3eb04c8e'),
        string_field='Just testing!'
    )
    # Freeze the header timestamp so the publish call is deterministic.
    header_timestamp_mock.return_value = datetime.datetime(
        2016, 12, 10, 11, 15, 45, 123456, tzinfo=datetime.timezone.utc)
    messaging.send(model)
    topic_mock = pubsub_client_mock.return_value.topic
    publish_mock = topic_mock.return_value.publish
    topic_mock.assert_called_with('test-topic')
    # The UUID must be serialized to its canonical string form.
    publish_mock.assert_called_with(
        test_utils.EncodedJson({
            "uuid_field": "cd1d3a03-7b04-4a35-97f8-ee5f3eb04c8e",
            "string_field": "Just testing!"
        }),
        timestamp='2016-12-10T11:15:45.123456Z',
        type='FancyEvent'
    )
def test_if_encode_raises_exception_with_invalid_data_and_strict_schema():
    # A strict marshmallow schema should surface field validation errors
    # as an EncodingError when the model is encoded.
    class StrictSchema(marshmallow.Schema):
        uuid_field = fields.UUID(required=True)

        class Meta:
            strict = True

    class Event(structures.Model):
        class Meta:
            schema = StrictSchema
            type_name = 'Event'

    data = Event(uuid_field='not an uuid')
    with pytest.raises(exceptions.EncodingError) as excinfo:
        encoding.encode(data)
    # The exception message carries the per-field marshmallow errors.
    assert str(excinfo.value) == (
        "({'uuid_field': ['Not a valid UUID.']}, '')")
def generate_uuid(cls, return_hex=False, seed=None):
    """
    Generate uuid

    :param return_hex: Return in hex format
    :param seed: Seed value to generate a consistent uuid
    :return: hex string or canonical string form of the UUID
    """
    if seed:
        # Seeded requests hash the seed so the result is reproducible.
        digest = hashlib.md5(seed.encode('utf-8')).hexdigest()
        new_uuid = uuid.UUID(digest)
    else:
        new_uuid = uuid.uuid1()
    return new_uuid.hex if return_hex else str(new_uuid)
def json_serial(obj):
    """
    JSON serializer for objects not serializable by default json code.

    :param obj: object to serialize
    :type obj: date, datetime or UUID
    :return: formatted and serialized object
    :rtype: str
    :raises TypeError: for any other type
    """
    if isinstance(obj, (datetime.datetime, datetime.date)):
        # ISO-8601 text for both dates and datetimes.
        return obj.isoformat()
    if isinstance(obj, uuid.UUID):
        return str(obj)
    raise TypeError("Type %s not serializable" % type(obj))
def test_uid(self):
    # properties.Uuid should auto-populate, reject reassignment, validate
    # its backend value, and round-trip through JSON serialization.
    class UidModel(properties.HasProperties):
        uid = properties.Uuid('my uuid')

    model = UidModel()
    assert isinstance(model.uid, uuid.UUID)
    # The uid is read-only once generated.
    with self.assertRaises(AttributeError):
        model.uid = uuid.uuid4()
    assert model.validate()
    # A non-UUID backend value must fail validation.
    model._backend['uid'] = 'hi'
    with self.assertRaises(ValueError):
        model.validate()
    # JSON round-trip preserves the string form.
    json_uuid = uuid.uuid4()
    json_uuid_str = str(json_uuid)
    assert properties.Uuid.to_json(json_uuid) == json_uuid_str
    assert str(properties.Uuid.from_json(json_uuid_str)) == json_uuid_str
    # Equality check on two equal (all-zero) UUIDs.
    assert properties.Uuid('').equal(uuid.UUID(int=0), uuid.UUID(int=0))
def bind_port(self, uuid, model, changes):
    """Called to bind port to VM.

    :param uuid: UUID of Port
    :param model: Model object
    :param changes: dictionary of changed attributes
    :returns: dict of vif parameters (vif_type, vif_details);
        empty dict when the port is unknown
    """
    LOG.info("bind_port: %s" % uuid)
    LOG.info(changes)
    if model.ports.get(uuid, None) is None:
        LOG.error("Cannot find port")
        return dict()
    # Static OVS binding on the integration bridge.
    return {
        'vif_type': 'ovs',
        'vif_details': {'port_filter': False,
                        'bridge_name': 'br-int'},
    }
def modify_service(self, uuid, model, changes):
    """Called when attributes change on a bound port's service

    :param uuid: UUID of Service
    :param model: Model Object
    :param changes: dictionary of changed attributes
    :returns: None
    """
    LOG.info("modify_service: %s" % uuid)
    LOG.info(changes)
    LOG.info("Creating or updating VPN instance")
    vpn_instance = model.vpn_instances.get(uuid)
    # Guard clause: nothing to update when the instance is unknown.
    if not vpn_instance:
        LOG.error("VPN instance %s not found" % uuid)
        return
    self._create_or_update_service(vpn_instance)
def create_message_header(self) -> Dict:
    """Build the wire header dict for the wrapped message.

    Both the message id and type are required; the correlation id is
    added only when present.

    :raises MessagingException: if the id or type is missing
    """
    identity = self._message.header.id
    if identity is None:
        raise MessagingException("Missing id on message, this is a required field")
    message_type = self._message.header.message_type.name
    if message_type is None:
        raise MessagingException("Missing type on message, this is a required field")
    header = {
        message_id_header: str(identity),
        message_type_header: message_type,
    }
    correlation_id = self._message.header.correlation_id
    if correlation_id is not None:
        header[message_correlation_id_header] = str(correlation_id)
    return header
def test_remove_vms(self):
    """ """
    # Create a tenant with two VMs, then verify remove_vms empties the
    # tenant's VM association table.
    privileges = self.get_privileges()
    # NOTE(review): default_datastore/_url are unused here — likely
    # shared-fixture boilerplate; confirm before removing.
    default_datastore = self.get_default_datastore()
    default_datastore_url = self.get_datastore_url(default_datastore)
    vms = [(self.vm1_uuid, self.vm1_name), (self.vm2_uuid, self.vm2_name)]
    error_info, tenant1 = self.auth_mgr.create_tenant(name=self.tenant_name,
                                                      description='Some tenant',
                                                      vms=vms,
                                                      privileges=privileges)
    self.assertEqual(error_info, None)
    # The generated tenant id must parse as a well-formed UUID.
    self.assertTrue(uuid.UUID(tenant1.id))
    error_info = tenant1.remove_vms(self.auth_mgr.conn, vms)
    self.assertEqual(error_info, None)
    error_info, vms_row = auth.get_row_from_vms_table(self.auth_mgr.conn, tenant1.id)
    self.assertEqual(error_info, None)
    # All VM rows for the tenant should now be gone.
    self.assertEqual(vms_row, [])
def test_set_name(self):
    # Create a tenant, rename it, and verify the new name is persisted
    # in the tenants table.
    vms = [(self.vm1_uuid, self.vm1_name)]
    privileges = self.get_privileges()
    # NOTE(review): default_datastore/_url are unused here — likely
    # shared-fixture boilerplate; confirm before removing.
    default_datastore = self.get_default_datastore()
    default_datastore_url = self.get_datastore_url(default_datastore)
    error_info, tenant1 = self.auth_mgr.create_tenant(name=self.tenant_name,
                                                      description='Some tenant',
                                                      vms=vms,
                                                      privileges=privileges)
    self.assertEqual(error_info, None)
    # The generated tenant id must parse as a well-formed UUID.
    self.assertTrue(uuid.UUID(tenant1.id))
    error_info = tenant1.set_name(self.auth_mgr.conn, self.tenant_name, self.tenant_2_name)
    self.assertEqual(error_info, None)
    error_info, tenants_row = auth.get_row_from_tenants_table(self.auth_mgr.conn, tenant1.id)
    self.assertEqual(error_info, None)
    expected_output = self.tenant_2_name
    actual_output = tenants_row[auth_data_const.COL_NAME]
    self.assertEqual(actual_output, expected_output)
def test_set_description(self):
    # Create a tenant, change its description, and verify the new value
    # is persisted in the tenants table.
    vms = [(self.vm1_uuid, self.vm1_name)]
    privileges = self.get_privileges()
    error_info, tenant1 = self.auth_mgr.create_tenant(name=self.tenant_name,
                                                      description='Some tenant',
                                                      vms=vms,
                                                      privileges=privileges)
    self.assertEqual(error_info, None)
    # The generated tenant id must parse as a well-formed UUID.
    self.assertTrue(uuid.UUID(tenant1.id))
    error_info = tenant1.set_description(self.auth_mgr.conn, 'new description')
    self.assertEqual(error_info, None)
    error_info, tenants_row = auth.get_row_from_tenants_table(self.auth_mgr.conn, tenant1.id)
    self.assertEqual(error_info, None)
    expected_output = 'new description'
    actual_output = tenants_row[auth_data_const.COL_DESCRIPTION]
    self.assertEqual(actual_output, expected_output)
def test_set_default_datastore(self):
    # Create a tenant, change its default datastore, and verify the new
    # datastore URL is persisted in the tenants table.
    vms = [(self.vm1_uuid, self.vm1_name)]
    privileges = self.get_privileges()
    default_datastore = self.get_default_datastore()
    default_datastore_url = self.get_datastore_url(default_datastore)
    error_info, tenant1 = self.auth_mgr.create_tenant(name=self.tenant_name,
                                                      description='Some tenant',
                                                      vms=vms,
                                                      privileges=privileges)
    self.assertEqual(error_info, None)
    # The generated tenant id must parse as a well-formed UUID.
    self.assertTrue(uuid.UUID(tenant1.id))
    default_datastore = 'new_default_ds'
    default_datastore_url = self.get_datastore_url(default_datastore)
    error_info = tenant1.set_default_datastore(self.auth_mgr.conn, default_datastore_url)
    self.assertEqual(error_info, None)
    # Check tenants table
    error_info, tenants_row = auth.get_row_from_tenants_table(self.auth_mgr.conn, tenant1.id)
    self.assertEqual(error_info, None)
    # NOTE(review): expected value is hard-coded rather than derived from
    # get_datastore_url — presumably the test fixture maps
    # 'new_default_ds' to 'new_default_ds_url'; confirm.
    expected_output = 'new_default_ds_url'
    actual_output = tenants_row[auth_data_const.COL_DEFAULT_DATASTORE_URL]
    self.assertEqual(actual_output, expected_output)
def get_mac_address():
    """Return this host's MAC address as 12 lowercase hex characters."""
    import uuid
    # uuid.getnode() yields the 48-bit hardware address; the last 12 hex
    # digits of a UUID built from that integer are exactly those 6 bytes.
    return uuid.UUID(int=uuid.getnode()).hex[-12:]
async def handle(self, player, action, values, **kwargs):
    """Dispatch a ManiaLink action callback to the registered receivers.

    Ignores actions that do not belong to this view; raises when an old,
    non-uuid view instance is still receiving callbacks (memory leak).

    BUGFIX: the body uses ``await``, so this must be ``async def`` —
    a plain ``def`` with ``await`` is a SyntaxError on Python 3.
    """
    if not action.startswith(self.id):
        return
    if not self._is_global_shown and player.login not in self._is_player_shown.keys():
        # Ignore if id is unique (uuid4)
        try:
            uuid.UUID(self.id, version=4)
        except (TypeError, ValueError, AttributeError):
            # Narrowed from a bare `except:`; these are the errors
            # uuid.UUID raises for malformed / non-string ids.
            raise ManialinkMemoryLeakException(
                'Old view instance (ml-id: {}) is not yet destroyed, but is receiving player callbacks!, '
                'Make sure you are not removing old view instances with .destroy() and del variable! '
                'Potential Memory Leak!! Should be fixed asap!'.format(self.id)
            )
    # Action names are suffixed to the view id plus a 2-char separator.
    action_name = action[len(self.id)+2:]
    if action_name not in self.receivers:
        return await self.handle_catch_all(player, action_name, values)
    # Call receivers.
    for rec in self.receivers[action_name]:
        try:
            if iscoroutinefunction(rec):
                await rec(player, action, values)
            else:
                rec(player, action, values)
        except Exception as e:
            if self.throw_exceptions:
                raise
            else:
                logging.exception('Exception has been silenced in ManiaLink Action receiver:', exc_info=e)
def _set_uuid(self, uuid): if uuid is None: # UUID generation is expensive. Using FastUUID instead of the built # in UUID methods increases Messages that can be instantiated per # second from ~25,000 to ~185,000. Not generating UUIDs at all # increases the throughput further still to about 730,000 per # second. uuid = self._fast_uuid.uuid4() elif len(uuid) != 16: raise TypeError( "UUIDs should be exactly 16 bytes. Conforming UUID's can be " "generated with `import uuid; uuid.uuid4().bytes`." ) self._uuid = uuid
def uuid_hex(self):
    """Hex string form of this message's 16 raw UUID bytes."""
    # TODO: DATAPIPE-848
    raw_bytes = self.uuid
    return UUID(bytes=raw_bytes).hex
def run(self):
    """Consume messages from the configured topics and print them until
    a stop condition (message limit or --end-timestamp) is reached."""
    logger.info(
        "Starting to consume from {}".format(self.topic_to_offsets_map)
    )
    with Consumer(
        # The tailer name should be unique - if it's not, partitions will
        # be split between multiple tailer instances
        'data_pipeline_tailer-{}'.format(
            str(UUID(bytes=FastUUID().uuid4()).hex)
        ),
        'bam',
        ExpectedFrequency.constantly,
        self.topic_to_offsets_map,
        auto_offset_reset=self.options.offset_reset_location,
        cluster_name=self.options.cluster_name
    ) as consumer:
        message_count = 0
        while self.keep_running(message_count):
            message = consumer.get_message(blocking=True, timeout=0.1)
            if message is None:
                continue
            if (self.options.end_timestamp is None
                    or message.timestamp < self.options.end_timestamp):
                # BUGFIX: `print x` is a Python 3 SyntaxError; the
                # function form works on both Python 2 and 3.
                print(self._format_message(message))
                message_count += 1
            else:
                self._running = False
                logger.info(
                    "Latest message surpasses --end-timestamp. Stopping tailer..."
                )
def convert_uuidfield_value(self, value, expression, connection, context):
    """Coerce a database value into a uuid.UUID; None passes through."""
    # New in Django 1.8
    if value is None:
        return value
    return uuid.UUID(value)
def generateGuid():
    """Return a random GUID as a string.

    Python's UUID library is used; a UUID's canonical string form is
    the same as a GUID's.

    :Returns: str, the generated random GUID.

    Example::

        a = generateGuid()
        import uuid
        print(a)
        print(uuid.UUID(a).hex)
    """
    guid = uuid4()
    return str(guid)
def arg_uuid(name):
    """Fetch request argument ``name`` as a canonical UUID string.

    Returns None when the argument is absent; raises ValueError when
    it is present but malformed.
    """
    raw = request.args.get(name)
    if raw is None:
        return None
    # uuid.UUID both validates and normalizes the value.
    return str(uuid.UUID(raw))
def uuid_is_valid(test_uuid):
    """ Determine if a UUID is valid

    Returns False — instead of raising — for non-string input (e.g.
    None) as well as for malformed UUID strings.
    """
    try:
        # The parsed UUID object was previously bound to an unused local;
        # only the success/failure of parsing matters here.
        uuid.UUID(test_uuid)
    except (AttributeError, TypeError, ValueError):
        # AttributeError/TypeError cover None and other non-string input.
        return False
    return True
def __new__(cls, obj):
    # Wrap a uuid.UUID in a Binary carrying the legacy UUID subtype,
    # keeping a reference to the original UUID instance.
    if not isinstance(obj, UUID):
        raise TypeError("obj must be an instance of uuid.UUID")
    # Binary is a bytes subclass; build it from the UUID's raw bytes.
    self = Binary.__new__(cls, obj.bytes, OLD_UUID_SUBTYPE)
    self.__uuid = obj
    return self
def uuid(self):
    """UUID instance wrapped by this UUIDLegacy instance.
    """
    # Name-mangled attribute assigned in __new__; read-only accessor.
    return self.__uuid
def _get_binary(data, position, dummy0, opts, dummy1):
    """Decode a BSON binary to bson.binary.Binary or python UUID."""
    # Header: 4-byte little-endian length followed by a 1-byte subtype.
    length, subtype = _UNPACK_LENGTH_SUBTYPE(data[position:position + 5])
    position += 5
    if subtype == 2:
        # Subtype 2 ("old binary") nests a second length that must equal
        # the outer length minus its own 4 bytes.
        length2 = _UNPACK_INT(data[position:position + 4])[0]
        position += 4
        if length2 != length - 4:
            raise InvalidBSON("invalid binary (st 2) - lengths don't match!")
        length = length2
    end = position + length
    if subtype in (3, 4):
        # Subtypes 3 (legacy UUID) and 4 (standard UUID) hold 16 bytes;
        # the byte order of the legacy form depends on which driver
        # wrote it, selected via opts.uuid_representation.
        uuid_representation = opts.uuid_representation
        # Java Legacy: each 8-byte half is byte-reversed.
        if uuid_representation == JAVA_LEGACY:
            java = data[position:end]
            value = uuid.UUID(bytes=java[0:8][::-1] + java[8:16][::-1])
        # C# legacy: Microsoft GUID (little-endian) layout.
        elif uuid_representation == CSHARP_LEGACY:
            value = uuid.UUID(bytes_le=data[position:end])
        # Python: raw big-endian bytes.
        else:
            value = uuid.UUID(bytes=data[position:end])
        return value, end
    # Python3 special case. Decode subtype 0 to 'bytes'.
    if PY3 and subtype == 0:
        value = data[position:end]
    else:
        value = Binary(data[position:end], subtype)
    return value, end
def test_can_insert_udts_with_all_datatypes(self):
    """ Test for inserting all column types into a UserType

    test_can_insert_udts_with_all_datatypes tests that each cqlengine
    column type can be inserted into a UserType. It first creates a
    UserType that has each cqlengine column type, and a corresponding
    table/Model. It then creates a UserType instance where all the fields
    have corresponding data, and inserts the UserType as an instance of
    the Model. Finally, it verifies that each column read from the
    UserType from Cassandra is the same as the input parameters.

    @since 2.5.0
    @jira_ticket PYTHON-251
    @expected_result The UserType is inserted with each column type, and
    the resulting read yields proper data for each column.

    @test_category data_types:udt
    """
    sync_table(AllDatatypesModel)
    self.addCleanup(drop_table, AllDatatypesModel)

    # One representative value per cqlengine column type, keyed a..n.
    input = AllDatatypes(a='ascii', b=2 ** 63 - 1, c=bytearray(b'hello world'), d=True,
                         e=datetime.utcfromtimestamp(872835240), f=Decimal('12.3E+7'), g=2.39,
                         h=3.4028234663852886e+38, i='123.123.123.123', j=2147483647, k='text',
                         l=UUID('FE2B4360-28C6-11E2-81C1-0800200C9A66'),
                         m=UUID('067e6162-3b6f-4ae2-a171-2470b63dff00'),
                         n=int(str(2147483647) + '000'))
    AllDatatypesModel.create(id=0, data=input)
    self.assertEqual(1, AllDatatypesModel.objects.count())
    output = AllDatatypesModel.objects.first().data
    # Compare fields 'a' through 'n' between what was written and read.
    for i in range(ord('a'), ord('a') + 14):
        self.assertEqual(input[chr(i)], output[chr(i)])
def test_nested_udts_inserts(self):
    """ Test for inserting collections of user types using cql engine.

    test_nested_udts_inserts Constructs a model that contains a list of
    usertypes. It will then attempt to insert them. The expectation is
    that no exception is thrown during insert. For sanity sake we also
    validate that our input and output values match. This combination of
    model, and UT produces a syntax error in 2.5.1 due to improper
    quoting around the names collection.

    @since 2.6.0
    @jira_ticket PYTHON-311
    @expected_result No syntax exception thrown

    @test_category data_types:udt
    """
    class Name(UserType):
        type_name__ = "header"
        name = columns.Text()
        value = columns.Text()

    class Container(Model):
        id = columns.UUID(primary_key=True, default=uuid4)
        names = columns.List(columns.UserDefinedType(Name))

    # Construct the objects and insert them
    names = []
    for i in range(0, 10):
        names.append(Name(name="name{0}".format(i), value="value{0}".format(i)))
    # Create table, insert data
    sync_table(Container)
    self.addCleanup(drop_table, Container)
    Container.create(id=UUID('FE2B4360-28C6-11E2-81C1-0800200C9A66'), names=names)
    # Validate input and output matches
    self.assertEqual(1, Container.objects.count())
    names_output = Container.objects.first().names
    self.assertEqual(names_output, names)