我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用uuid.uuid5()。
def insert(address, port, location='default', protocol='default'):
    """Insert a proxy row, or update the existing row when the insert conflicts.

    The row's UID is the version-5 UUID of ``address`` in the DNS namespace,
    so the same address always maps to the same UID. An ``IntegrityError`` on
    INSERT is treated as "row already exists" and turned into an UPDATE
    (presumably UID is a unique/primary key column -- confirm against the
    schema).

    :param address: proxy address; also the name the UID is derived from.
    :param port: proxy port.
    :param location: value stored in the LOCATION column.
    :param protocol: value stored in the PROTOCOL column.
    """
    uid = str(uuid.uuid5(uuid.NAMESPACE_DNS, address))
    conn = sqlite3.connect(db)
    try:
        try:
            conn.execute("INSERT INTO PROXY VALUES (?, ?, ?, ?, ?)",
                         [uid, address, port, location, protocol])
            print('insert: ' + str(address))
        except sqlite3.IntegrityError:
            conn.execute("UPDATE PROXY SET ADDRESS = ?, PORT = ?, LOCATION = ?, PROTOCOL = ? WHERE UID=?",
                         [address, port, location, protocol, uid])
            print('update: ' + str(address))
        # Commit only after a statement actually succeeded.
        conn.commit()
    except sqlite3.ProgrammingError as exc:
        # Still best-effort (no re-raise), but report the failure instead of
        # swallowing it silently.
        print('error: ' + str(exc))
    finally:
        conn.close()
def uniq_id(self, delay=1):
    """
    Derives a secret from the device's MAC address

    The MAC-based value is only returned when the address contains a ``:``
    and ``delay`` equals 2; in every other case the failure is logged, the
    "storing credentials failed" dialog is shown and a static fallback is
    returned.

    :param delay: Retry delay in sec, forwarded to the MAC address lookup
    :type delay: int
    :returns: bytes of the version-5 UUID of the MAC address, or the
              string ``'UnsafeStaticSecret'``
    """
    mac_addr = self.__get_mac_address(delay=delay)
    mac_is_usable = ':' in mac_addr and delay == 2
    if not mac_is_usable:
        message = '[%s] error: failed to get device id (%s)' % (
            self.addon_id, str(mac_addr))
        self.utils.log(message)
        self.dialogs.show_storing_credentials_failed()
        return 'UnsafeStaticSecret'
    device_uuid = uuid.uuid5(uuid.NAMESPACE_DNS, str(mac_addr))
    return device_uuid.bytes
def __init__(self):
    """Create the declarative ``Model`` base with a fixed naming convention.

    Builds a ``MetaData`` whose naming convention names foreign keys with a
    deterministic version-5 UUID token, then exposes the declarative base as
    ``self.Model`` and attaches a query property to it.
    """
    super(AlchemyBase, self).__init__()

    def fk_fixed_width(constraint, table):
        """Return a fixed-width, deterministic token for a foreign key."""
        str_tokens = [table.name] +\
                     [element.parent.name for element in constraint.elements] +\
                     [element.target_fullname for element in constraint.elements]
        joined = "_".join(str_tokens)
        # uuid5 rejects a bytes name on Python 3 before 3.12, so only encode
        # when the joined name is not already the native ``str`` type
        # (i.e. a Python 2 ``unicode`` value).
        if not isinstance(joined, str):
            joined = joined.encode('ascii')
        guid = uuid.uuid5(uuid.NAMESPACE_OID, joined)
        return str(guid)

    convention = {
        "fk_fixed_width": fk_fixed_width,
        "ix": 'ix_%(column_0_label)s',
        "uq": "uq_%(table_name)s_%(column_0_name)s",
        "ck": "ck_%(table_name)s_%(column_0_name)s",
        "fk": "fk_%(fk_fixed_width)s",
        "pk": "pk_%(table_name)s"
    }
    metadata = MetaData(naming_convention=convention)
    self.Model = declarative_base(metadata=metadata, cls=Model, name='Model', metaclass=_BoundDeclarativeMeta)
    self.Model.query = _QueryProperty(self)
def test_uuid5(self):
    """Check uuid5() against known version-5 UUIDs for each namespace."""
    # Generate every UUID first, then run the assertions on each pair.
    generated = [
        uuid.uuid5(uuid.NAMESPACE_DNS, 'python.org'),
        uuid.uuid5(uuid.NAMESPACE_URL, 'http://python.org/'),
        uuid.uuid5(uuid.NAMESPACE_OID, '1.3.6.1'),
        uuid.uuid5(uuid.NAMESPACE_X500, 'c=ca'),
    ]
    expected_strings = [
        '886313e1-3b8a-5372-9b90-0c9aee199e5d',
        '4c565f0d-3f5a-5890-b41b-20cf47701c5e',
        '1447fa61-5277-5fef-a9b3-fbc6e44f4af3',
        'cc957dd1-a972-5349-98cd-874190002798',
    ]
    for actual, text in zip(generated, expected_strings):
        self.assertEqual(actual.variant, uuid.RFC_4122)
        self.assertEqual(actual.version, 5)
        self.assertEqual(actual, uuid.UUID(text))
        self.assertEqual(str(actual), text)
def __init__(self, player, name, port=8000, post_callback=None):
    """Initialise the remote's state, then run the connection test.

    :param player: stored as ``self.player``.
    :param name: stored as ``self.name``; also the name the identifier
                 UUID is derived from.
    :param port: stored as ``self.port``.
    :param post_callback: stored as ``self._callback``.
    """
    device_uuid = uuid.uuid5(uuid.NAMESPACE_DNS, name)
    #: The unique identifier string for this device.
    self.identifier = str(device_uuid)
    #:
    self.name = name
    self.command_id = 0
    #:
    self.player = player
    #:
    self.port = port
    self.last_volume = 0
    # Server and thread handles all start out unset.
    self.server = None
    self.server_thread = None
    self.poll_thread = None
    #: True if the remote is subscribed to the timeline.
    self.subscribed = False
    self._callback = post_callback
    self._test_connection()
def __init__(self, user=None, password=None, token=None):
    """Initialise the session and log in when credentials are supplied.

    ``self.login(password)`` is called only when both ``user`` and
    ``password`` are not ``None``.
    """
    self.product = 'plexdevices-session'
    session_uuid = uuid.uuid5(uuid.NAMESPACE_DNS, self.product)
    self.identifier = str(session_uuid)
    self.version = __version__
    self.token = token
    self.user = user
    # List of Server objects (plexdevices.device.Server) accessible by the
    # current user.
    self.servers = []
    # List of Player objects (plexdevices.device.Player) accessible by the
    # current user.
    self.players = []
    # list of User objects (plexdevices.users.User) that can be switched to.
    self.users = []
    if user is None or password is None:
        return
    self.login(password)
def ResourceUUID(value, creator):
    """Convert a resource id into a ``uuid.UUID``.

    A ``uuid.UUID`` is returned unchanged and a string that parses as a UUID
    is parsed. Any other string of at most 255 characters is mapped to a
    version-5 UUID of ``value + "\\x00" + creator`` in
    ``RESOURCE_ID_NAMESPACE`` (``creator`` defaults to ``"\\x00"`` when None).

    :raises ValueError: if ``value`` contains ``/`` or is a non-UUID string
                        longer than 255 characters.
    """
    if isinstance(value, uuid.UUID):
        return value
    if '/' in value:
        raise ValueError("'/' is not supported in resource id")
    try:
        return uuid.UUID(value)
    except ValueError:
        if len(value) > 255:
            raise ValueError(
                'transformable resource id >255 max allowed characters')
        owner = "\x00" if creator is None else creator
        name = value
        # value/creator must be str (unicode) in Python 3 and str (bytes)
        # in Python 2, hence the conditional encoding.
        if six.PY2:
            name = name.encode('utf-8')
            owner = owner.encode('utf-8')
        return uuid.uuid5(RESOURCE_ID_NAMESPACE, name + "\x00" + owner)
def parse_sr_info(connection_data, description=''):
    """Return ``(sr_uuid, label, params)`` for the given connection data.

    When ``connection_data`` carries an ``sr_uuid`` it is used as-is and
    ``params`` is filled from the keys listed under ``introduce_sr_keys``.
    Otherwise ``params`` comes from ``_parse_volume_info`` and the UUID is a
    version-5 UUID of ``target/port/targetIQN`` in ``SR_NAMESPACE``.

    ``name_label`` is popped from ``connection_data`` (the dict is mutated).
    """
    if 'sr_uuid' in connection_data:
        sr_uuid = connection_data['sr_uuid']
        params = {key: connection_data[key]
                  for key in connection_data.get('introduce_sr_keys', {})}
    else:
        params = _parse_volume_info(connection_data)
        sr_identity = "%s/%s/%s" % (params['target'], params['port'],
                                    params['targetIQN'])
        # PY2 can only support taking an ascii string to uuid5
        if six.PY2 and isinstance(sr_identity, unicode):
            sr_identity = sr_identity.encode('utf-8')
        sr_uuid = str(uuid.uuid5(SR_NAMESPACE, sr_identity))

    default_label = 'tempSR-%s' % sr_uuid
    label = connection_data.pop('name_label', default_label)
    params['name_description'] = connection_data.get('name_description',
                                                     description)
    return (sr_uuid, label, params)
def save(self, *args, **kwargs):
    """Save the contact, generating its UUID the first time it is stored.

    The record is saved once with the caller's arguments. If it has no
    ``uuid`` yet, one is derived from the secret key, the id, the full name
    and the email, and the record is saved a second time.

    NOTE(review): block nesting was reconstructed from a whitespace-collapsed
    source; confirm that the second ``save()`` belongs inside the
    ``if not self.uuid`` branch.
    """
    # Fall back to gender 0 when the stored value does not parse as an int.
    try:
        int(self.gender)
    except ValueError:
        self.gender = 0
    # First save; presumably this is what assigns self.id, which the UUID
    # name below uses -- confirm.
    super(Contact, self).save(*args, **kwargs)
    if not self.uuid:
        # Reduce the name to plain ASCII, dropping characters with no
        # ASCII decomposition.
        ascii_name = unicodedata.normalize('NFKD', unicode(self.fullname)).encode("ascii", 'ignore')
        name = u'{0}-contact-{1}-{2}-{3}'.format(project_settings.SECRET_KEY, self.id, ascii_name, self.email)
        name = unicodedata.normalize('NFKD', unicode(name)).encode("ascii", 'ignore')
        self.uuid = unicode(uuid.uuid5(uuid.NAMESPACE_URL, name))
        # Second save persists the new uuid; note it is called without the
        # caller's args/kwargs, and the entity save below is skipped.
        return super(Contact, self).save()
    if self.entity.is_single_contact:
        # Force the entity name for ordering.
        self.entity.save()
def create_additional_data(sender, instance=None, created=False, **kwargs):
    """
    Creates the companion 'TheUser' row for a newly created User instance.

    Nothing happens unless ``created`` is true. The row's ``auth_token`` is
    the version-5 UUID of ``instance.username`` in the DNS namespace.
    """
    if not created:
        return
    auth_token = uuid.uuid5(uuid.NAMESPACE_DNS, instance.username)
    the_user = TheUser.objects.create(id_user=instance, auth_token=auth_token)
    the_user.save()
# ----------------------------------------------------------------------------------------------------------------------
def generate_service_config_filename(version):
    """Return the version-5 UUID string (DNS namespace) of ``str(version)``."""
    version_text = str(version)
    config_uuid = uuid.uuid5(uuid.NAMESPACE_DNS, version_text)
    return str(config_uuid)
# parse xff_trusted_proxy_list
def get_client_id(request):
    """Return a version-5 UUID fingerprinting the request.

    The name is the ``request.META`` values for every key in
    ``FINGERPRINT_PARTS`` joined with ``_``; missing keys contribute an
    empty string.
    """
    meta = request.META
    fingerprint = '_'.join(meta.get(key, '') for key in FINGERPRINT_PARTS)
    return uuid.uuid5(UUID_NAMESPACE, fingerprint)
def randomUUIDField(self):
    """
    Return the hex form of one UUID picked from a uuid1, uuid3, uuid4 and
    uuid5 candidate.

    The uuid3 and uuid5 names, and the final pick, are chosen with
    ``self.randomize``.
    """
    words = ('python', 'django', 'awesome')
    candidates = [uuid.uuid1().hex]
    candidates.append(
        uuid.uuid3(uuid.NAMESPACE_URL, self.randomize(list(words))).hex)
    candidates.append(uuid.uuid4().hex)
    candidates.append(
        uuid.uuid5(uuid.NAMESPACE_DNS, self.randomize(list(words))).hex)
    return self.randomize(candidates)
def create_uid(self, handle=None):
    """Return an upper-case hex UID.

    A truthy ``handle`` yields a deterministic version-5 UUID in the ``UUID``
    namespace; otherwise a random version-4 UUID is used.
    """
    generated = uuid.uuid5(UUID, handle) if handle else uuid.uuid4()
    return generated.hex.upper()
def parse_crontab(line):
    """Register one crontab line in the ``db`` status and cron stores.

    The line is split on single spaces: the first five fields are the cron
    expression, the rest is handed to ``Job``. The job id is the hex
    version-5 UUID of the whole line in the DNS namespace.
    """
    fields = line.split(' ')
    job_id = uuid.uuid5(uuid.NAMESPACE_DNS, line).hex
    cron = fields[:5]
    job = Job(fields[5:])
    db.status.push(job_id, ' '.join(cron))

    # (schedule key, bounds list passed to parse_schedule) per cron field.
    units = (
        ('minute', [0, 60]),
        ('hour', [0, 24]),
        ('day', [1, 32]),
        ('month', [1, 13]),
        ('weekday', [0, 7]),
    )
    schedule = {}
    for position, (unit, bounds) in enumerate(units):
        parse_schedule(cron[position], unit, bounds, schedule)
    db.cron.push(job_id, job.path, job.args, job.method, schedule)
def _project_uuid(self, target):
    """Return the version-5 UUID of the target's project file path.

    The path comes from ``self._project_file_path(target)`` and is hashed in
    a fixed, hard-coded namespace UUID.
    """
    namespace = uuid.UUID('7a64fe6e-cba3-5019-90fa-640c295a343e')
    project_path = str(self._project_file_path(target))
    return uuid.uuid5(namespace, project_path)
def getUUIDfromString(string):
    """Return the version-5 UUID (URL namespace) of ``string`` as text."""
    derived = uuid.uuid5(uuid.NAMESPACE_URL, string)
    return str(derived)
def to_uuid(string):
    """Return the version-5 UUID of ``str(string)`` in
    ``UUID_NAMESPACE_ANSIBLE``, as text."""
    name = str(string)
    derived = uuid.uuid5(UUID_NAMESPACE_ANSIBLE, name)
    return str(derived)
def test_parse_uuid_invalid(self):
    """_is_uuid4 must reject malformed, non-UUID and non-v4 UUID strings."""
    rejected = [
        # Invalid uuid4 taken from https://gist.github.com/ShawnMilo/7777304
        '89eb3586-8a82-47a4-c911-758a62601cf7',
        # Not a UUID.
        'abc123',
    ]
    for candidate in rejected:
        self.assertFalse(_ZipArchive._is_uuid4(candidate))

    # Other UUID versions.
    other_versions = (
        uuid.uuid1(),
        uuid.uuid3(uuid.NAMESPACE_DNS, 'foo'),
        uuid.uuid5(uuid.NAMESPACE_DNS, 'bar'),
    )
    for other in other_versions:
        self.assertFalse(_ZipArchive._is_uuid4(str(other)))
def uuid(self, playlist):
    """Return a version-5 UUID (URL namespace) fingerprinting a playlist.

    The name is the ``str()`` of the list of ``(url, duration, active,
    kind)`` tuples of the playlist's assets, ordered by ``asset_order``.
    """
    ordered_assets = playlist.asset_set.order_by('asset_order')
    fingerprint = [(asset.url, asset.duration, asset.active, asset.kind)
                   for asset in ordered_assets]
    return uuid5(NAMESPACE_URL, str(fingerprint))
def generate_id(*args) -> str:
    """Return the version-5 UUID (OID namespace) of the concatenated args."""
    joined = ''.join(args)
    derived = uuid5(NAMESPACE_OID, joined)
    return str(derived)
# dead code
def from_configparser(cfg):
    """Build an ``SSHCertAuthorityManager`` from a config parser.

    ``enabled_authorities`` in the ``janus:global`` section is split on
    commas; each name is looked up in a ``janus:ca_<name>`` section. A
    missing authority section is reported with ``print`` and skipped. The
    authority id is the section's ``uuid`` option when present, otherwise a
    version-5 UUID of the name in ``AUTHORITY_NAMESPACE``.

    :raises Exception: if the ``janus:global`` section or its
                       ``enabled_authorities`` option is missing.
    """
    global_section = 'janus:global'
    if not cfg.has_section(global_section):
        raise Exception('Config File does not have a janus:global section')
    if not cfg.has_option(global_section, 'enabled_authorities'):
        raise Exception('enabled_authorities not defined in janus:global section')

    authorities = {}
    enabled = cfg.get(global_section, 'enabled_authorities')
    for authority_name in enabled.split(','):
        authority_section = "janus:ca_{}".format(authority_name)
        if not cfg.has_section(authority_section):
            print("Authority section for {} not found".format(authority_section))
            continue
        if cfg.has_option(authority_section, 'uuid'):
            configured = cfg.get(authority_section, 'uuid')
            authority_id = str(uuid.UUID(configured))
        else:
            authority_id = str(uuid.uuid5(AUTHORITY_NAMESPACE, authority_name))
        authorities[authority_id] = SSHCertAuthority.from_configparser(
            cfg, authority_section)
    return SSHCertAuthorityManager(authorities=authorities)
# vim: ts=4 expandtab
def get_image_unique_id(self, image_href):
    """Get unique ID of the resource.

    The ID is the version-5 UUID of the image reference in the URL
    namespace, so it depends only on the reference text.

    :param image_href: Image reference.
    :returns: Unique ID of the resource.
    """
    # uuid5 needs a byte string on Python 2 but rejects bytes on Python 3
    # before 3.12. Only encode when the reference is not already the native
    # ``str`` type (i.e. a Python 2 ``unicode`` value); a text name hashes
    # to the same UUID as its UTF-8 encoding.
    name = image_href
    if not isinstance(name, str):
        name = name.encode('utf-8')
    return str(uuid.uuid5(uuid.NAMESPACE_URL, name))
def process_item(self, item, spider):
    """Fill in a missing feed ``id`` and ``title`` on the item.

    A missing ``id`` becomes the URN of the version-5 UUID (DNS namespace)
    of the item's ``link``. A missing ``title`` becomes an empty string.

    :raises DropItem: if the item has neither ``id`` nor ``link``.
    """
    if 'id' not in item:
        if 'link' not in item:
            raise DropItem('A link is required to autogenerate the feed '
                           'id for: {}'.format(item))
        link_uuid = uuid.uuid5(uuid.NAMESPACE_DNS, item['link'])
        item['id'] = link_uuid.urn

    # Having a title is mandatory, so we use an empty string if none
    # is set.
    if 'title' not in item:
        item['title'] = ''
    return item
def to_uuid(string):
    """
    Mirroring the jinja filter implemented in ansible

    Input a string.
    Returns the uuid ansible would generate as a string.
    """
    # uuid5 needs a byte string on Python 2 but rejects bytes on Python 3
    # before 3.12. Only encode when the input is not already the native
    # ``str`` type; a text name hashes to the same UUID as its UTF-8 bytes.
    name = string
    if not isinstance(name, str):
        name = name.encode('utf-8')
    return str(uuid.uuid5(uuid.UUID('361E6D51-FAEC-444A-9079-341386DA8E2E'), name))
def uuid(self):
    """Generate the user's UUID from the primary email.

    The name is ``mailto:`` followed by ``self.email``, or by ``self.eppn``
    when the email is falsy, hashed as a version-5 UUID in the URL namespace.
    """
    address = self.email or self.eppn
    mail_url = "mailto:" + address
    return uuid.uuid5(uuid.NAMESPACE_URL, mail_url)
def setUp(self):
    """Prepare a document model with a UUID field plus matching fixtures.

    Stores the model class, the expected decoded data, the JSON input and
    the object hook generated for the model on the test instance.
    """
    from uuid import uuid5, NAMESPACE_DNS

    class UUIDModel(db.Document):
        uuid = db.UUIDField()

    self.model_cls = UUIDModel
    sample = uuid5(NAMESPACE_DNS, "This is a test")
    self.expected_data = {"uuid": sample}
    # The JSON payload carries the UUID in its string form.
    payload = {"uuid": str(sample)}
    self.data = json.dumps(payload)
    self.hook = generate_object_hook(UUIDModel)
def setUp(self):
    """Create the encoder and a sample UUID with its expected string form."""
    from uuid import uuid5, NAMESPACE_DNS

    self.encoder = GoodJSONEncoder()
    sample = uuid5(NAMESPACE_DNS, "This is a test")
    self.data = sample
    self.expected = str(self.data)
def __init__(self, name, uuid=None):
    """Store the name and a UUID.

    When ``uuid`` is None it defaults to the hex version-5 UUID of ``name``
    in the DNS namespace; otherwise the given value is stored as-is.
    """
    self.uuid = uuid5(NAMESPACE_DNS, name).hex if uuid is None else uuid
    self.name = name
def get_deterministic_id(external_host, sid):
    """
    Get deterministic FTS job id.

    The id is a version-5 UUID of ``sid`` inside a namespace that is itself
    the version-5 UUID of the VO name under the server's base id.

    :param external_host: FTS server as a string.
    :param sid: FTS seed id.
    :returns: FTS transfer identifier, or None when the base id or the VO
              name is unavailable.
    """
    baseid, voname = get_transfer_baseid_voname(external_host)
    if baseid is not None and voname is not None:
        vo_namespace = uuid.uuid5(uuid.UUID(baseid), voname)
        return str(uuid.uuid5(vo_namespace, sid))
    return None
def create_uuid(self):
    """Create a UUID of the version configured on ``self.version``.

    A falsy version or 4 gives a random UUID; 1 uses ``self.node`` and
    ``self.clock_seq``; 3 and 5 use ``self.namespace`` and ``self.name``.

    :raises UUIDVersionError: for version 2 (unsupported) and for any other
                              unknown version.
    """
    version = self.version
    if not version or version == 4:
        return uuid.uuid4()
    if version == 1:
        return uuid.uuid1(self.node, self.clock_seq)
    if version == 2:
        raise UUIDVersionError("UUID version 2 is not supported.")
    if version == 3:
        return uuid.uuid3(self.namespace, self.name)
    if version == 5:
        return uuid.uuid5(self.namespace, self.name)
    raise UUIDVersionError("UUID version %s is not valid." % version)
def test_create_timeuuid_with_uuid4_string_should_fail(self):
    ''' creating a TimeUUID from the hex of a uuid4, uuid3 or uuid5 should fail '''
    expected_message = 'Invalid UUID type'

    # Random (version 4) UUIDs.
    for _ in range(1,100):
        candidate = uuid.uuid4()
        with self.assertRaises(ValueError) as cm:
            t = timeuuid.TimeUUID(s=candidate.hex)
        self.assertEqual(str(cm.exception), expected_message)

    # Name-based (version 3 and 5) UUIDs built from random names.
    for factory in [uuid.uuid3, uuid.uuid5]:
        for _ in range(1,100):
            candidate = factory(uuid.NAMESPACE_DNS,str(os.urandom(10)))
            with self.assertRaises(ValueError) as cm:
                t = timeuuid.TimeUUID(s=candidate.hex)
            self.assertEqual(str(cm.exception), expected_message)
def generate_container_id(container_name):
    """Return the id used for a container (or other non-host target).

    Outside container mode the id is a uuid in the namespace of the machine:
    the version-5 UUID of the container name under the machine id, as a
    string. In container mode the UTF-8 encoded container name is returned
    unchanged.

    :param container_name: name of the container/image being analysed.
    """
    if config["container_mode"]:
        return container_name.encode('utf8')
    # uuid5 needs a byte string on Python 2 but rejects bytes on Python 3
    # before 3.12. Only encode when the name is not already the native
    # ``str`` type; a text name hashes to the same UUID as its UTF-8 bytes.
    name = container_name
    if not isinstance(name, str):
        name = name.encode('utf8')
    return str(uuid.uuid5(uuid.UUID(generate_machine_id()), name))
def random_id(stream_arn, kinesis_shard_id):
    """Return a deterministic hex id for a shard of a stream.

    Despite the name, nothing here is random: the namespace UUID is built
    from the first 16 bytes of the SHA-1 of ``stream_arn`` and the result is
    the version-5 UUID of ``kinesis_shard_id`` in that namespace.

    :param stream_arn: stream ARN (text).
    :param kinesis_shard_id: shard id (text).
    :returns: 32-character hex string.
    """
    namespace = uuid.UUID(bytes=hashlib.sha1(stream_arn.encode('utf-8')).digest()[:16])
    # uuid5 needs a byte string on Python 2 but rejects bytes on Python 3
    # before 3.12. Only encode when the id is not already the native ``str``
    # type; a text name hashes to the same UUID as its UTF-8 bytes.
    name = kinesis_shard_id
    if not isinstance(name, str):
        name = name.encode('utf-8')
    return uuid.uuid5(namespace, name).hex
def build_id(*names):
    """Return a UUID string built by chaining version-5 UUIDs.

    Starting from ``NAMESPACE``, each name (converted with ``str``) is hashed
    in the namespace produced by the previous step. With no names the
    result is ``str(NAMESPACE)``.
    """
    current = NAMESPACE
    for part in names:
        current = uuid.uuid5(current, str(part))
    return str(current)
def set_ready(cls, environment, version, resource_uuid, resource_id, status):
    """
        Mark a resource as deployed in the configuration model status.

        The status is stored under ``status.<uuid>``, where the uuid is the
        version-5 UUID of ``resource_id`` in the ``resource_uuid`` namespace.
        The result of the collection update is yielded to the caller.
    """
    resource_key = "status.%s" % uuid.uuid5(resource_uuid, resource_id)
    selector = {"environment": environment, "version": version}
    entry = {"status": cls._value_to_dict(status), "id": resource_id}
    yield cls._coll.update_one(selector, {"$set": {resource_key: entry}})
def update_resource(cls, dryrun_id, resource_id, dryrun_data):
    """
        Register a resource update for a dry run.

        The query only matches while ``resources.<uuid>`` does not exist yet,
        so the data is set and the todo counter decremented only if the
        resource has not been saved before. The uuid is the version-5 UUID
        of ``resource_id`` in the ``dryrun_id`` namespace.
    """
    resource_key = "resources.%s" % uuid.uuid5(dryrun_id, resource_id)
    not_saved_yet = {"_id": dryrun_id, resource_key: {"$exists": False}}
    changes = {
        "$inc": {"todo": int(-1)},
        "$set": {resource_key: cls._value_to_dict(dryrun_data)},
    }
    yield cls._coll.update_one(not_saved_yet, changes)
def make_token(user):
    """Return a deterministic token for ``user`` that changes every hour.

    The token is the version-5 UUID (X500 namespace) of
    ``<user.id>-<user.username>-<YYYYMMDDHH>``, using the current value of
    ``localized_date()``.

    NOTE(review): the token is derived only from the user's id, username and
    the current hour, so anyone who knows those can recompute it. Confirm
    this is acceptable for how the token is used.

    :param user: object exposing ``id`` and ``username``.
    :returns: the token as a UUID string.
    """
    date = localized_date()
    # Automatically expires after an hour: the hour (%H, 00-23) is part of
    # the hashed name. The previous '%h' is a month-abbreviation alias on
    # some platforms, so the token only changed once a day.
    hour_stamp = date.strftime('%Y%m%d%H')
    return str(uuid5(NAMESPACE_X500, "-".join([str(user.id), user.username, hour_stamp])))
def _id(self):
    """Return a deterministic UUID string derived from ``self.__hash__()``.

    The hash value is used as the name of a version-5 UUID in
    ``NAMESPACE_UUID``.
    """
    name = self.__hash__()
    # uuid5 only accepts a string name; an integer hash would raise
    # TypeError. A hash that is already a string is passed through unchanged.
    if isinstance(name, int):
        name = str(name)
    return str(uuid.uuid5(NAMESPACE_UUID, name))
def save(self, *args, **kwargs):
    """Save the action, keeping done flags, number and uuid in sync.

    Before saving: ``done`` follows the status / action type, ``done_date``
    follows ``done``, and an auto-generated number is taken from the type.
    After saving: the uuid is generated or cleared according to the type's
    ``generate_uuid`` flag, each change followed by another save.

    Returns the result of the first ``save()`` call.

    NOTE(review): block nesting was reconstructed from a whitespace-collapsed
    source; confirm that both uuid branches sit inside ``if self.type``.
    """
    # A final status marks the action done; otherwise, if the type has any
    # final status available, the action is marked not done.
    if self.status and self.status.is_final:
        self.done = True
    elif self.type and self.type.allowed_status.filter(is_final=True).exists():
        self.done = False
    # Keep done_date consistent with done: set it when newly done, clear it
    # when no longer done.
    if not self.done_date and self.done:
        self.done_date = now_rounded()
    elif self.done_date and not self.done:
        self.done_date = None
    # Generate the number automatically based on the action type; the
    # type's counter is incremented and saved as well.
    if self.number == 0 and self.type and self.type.number_auto_generated:
        self.number = self.type.last_number = self.type.last_number + 1
        self.type.save()
    ret = super(Action, self).save(*args, **kwargs)
    if self.type:
        if self.type.generate_uuid and not self.uuid:
            # The uuid name combines the secret key, this action's id and
            # its type id, reduced to plain ASCII.
            name = u'{0}-action-{1}-{2}'.format(
                project_settings.SECRET_KEY, self.id, self.type.id if self.type else 0
            )
            name = unicodedata.normalize('NFKD', unicode(name)).encode("ascii", 'ignore')
            self.uuid = unicode(uuid.uuid5(uuid.NAMESPACE_URL, name))
            # Persist the new uuid (called without the caller's arguments).
            super(Action, self).save()
        if not self.type.generate_uuid and self.uuid:
            # The type no longer wants a uuid: clear it and persist.
            self.uuid = ''
            super(Action, self).save()
    return ret
def save(self, *args, **kwargs):
    """Save the magic link, generating its uuid the first time it is stored.

    The record is saved once with the caller's arguments. If it has no
    ``uuid`` yet, one is derived from the secret key, the id and the url,
    and the record is saved a second time.

    NOTE(review): block nesting was reconstructed from a whitespace-collapsed
    source; confirm that the second ``save()`` belongs inside the
    ``if not self.uuid`` branch.
    """
    # First save; presumably this is what assigns self.id, which the uuid
    # name below uses -- confirm.
    super(MagicLink, self).save(*args, **kwargs)
    if not self.uuid:
        name = u'{0}-magic-link-{1}-{2}'.format(settings.SECRET_KEY, self.id, self.url)
        # Reduce the name to plain ASCII, dropping characters with no ASCII
        # decomposition.
        name = unicodedata.normalize('NFKD', unicode(name)).encode("ascii", 'ignore')
        # Stored as a uuid.UUID object here, not converted to text.
        self.uuid = uuid.uuid5(uuid.NAMESPACE_URL, name)
        # Second save persists the uuid (called without the caller's
        # arguments).
        return super(MagicLink, self).save()
def generate_analysis_target_id(analysis_target, name):
    """Generate the 'machine-id' for an analysis target that may not be a host.

    :param analysis_target: one of "host", "docker_image",
                            "docker_container", "compressed_file" or
                            "mountpoint".
    :param name: name of the container or image; ignored for "host".
    :raises ValueError: for any other analysis target.
    """
    # 'machine_id' is what Insights uses to uniquely identify the
    # thing-to-be-analysed. Primarily it determines when two uploads are for
    # the 'same thing', so that the latest upload replaces the earlier one.
    # Until now that has only been hosts (machines): a random uuid (uuid4)
    # was generated as the host's machine-id, written to a file on the host,
    # and reused for all insights uploads for that host.
    #
    # For docker images and containers it is difficult or impossible to save
    # a machine id anywhere. Also, while containers change over time just
    # like hosts, images don't change over time, though they can be rebuilt.
    # So for images we want the 'machine-id' to follow the rebuilt image, not
    # change every time the image is rebuilt. Typically a rebuilt image has
    # the same name as its predecessor, but a different version (tag).
    #
    # So for images and containers, instead of random uuids, we use namespace
    # uuids (uuid5's). These generate a new uuid from a combination of
    # another uuid and a name (a character string), and always generate the
    # same uuid for the same base uuid and name. This saves us from having to
    # store the image's uuid anywhere, and lets us control the created uuid
    # by controlling the name used to generate it: keep the name and the base
    # uuid the same, and we get the same uuid.
    #
    # For the base uuid we use the uuid of the host we are running on. For
    # containers this is the obvious choice; for images it is less obvious
    # which base uuid is correct. For now we just go with the host's uuid
    # there too.
    #
    # The name is left outside this function, but in general it should be
    # the name of the container or the name of the image, and to replace the
    # results on the insights server you have to use the same name.
    if analysis_target == "host":
        return generate_machine_id()

    named_targets = ("docker_image", "docker_container",
                     "compressed_file", "mountpoint")
    if analysis_target in named_targets:
        return generate_container_id(name)

    raise ValueError("Unknown analysis target: %s" % analysis_target)