Python uuid 模块,UUID 实例源码


项目:ekko    作者:openstack    | 项目源码 | 文件源码
def print_header(o):
    formated_out = """Version:\t%(version)s
Creation Date:\t%(timestamp)s
Volume Size:\t%(volume_size)s
Segment Size:\t%(segment_size)s
Backup Set:\t%(backup_set)s"""

    dict_out = dict()
    dict_out['backup_set'] = UUID(bytes=o.metadata['bases'][-1])
    dict_out['version'] = o.metadata['version']
    dict_out['incremental'] = o.metadata['incremental']
    dict_out['timestamp'] = get_timestamp(o)
    dict_out['volume_size'] = convert_size(o.metadata['sectors'] * 512)
    dict_out['segment_size'] = convert_size(o.metadata['segment_size'])

    print(formated_out % dict_out)
项目:ekko    作者:openstack    | 项目源码 | 文件源码
def put_data(self, data_segment):
        segment = data_segment[1]
        data = self.wrap_data(data_segment[0], segment)

        file_path = os.path.join(
        file_output = os.path.join(

        with open(file_output, 'wb') as f:

        return segment
项目:pscheduler    作者:perfsonar    | 项目源码 | 文件源码
def api_is_task(url):
    """Determine if a URL looks like a valid task URL"""
    # Note that this generates an extra array element because of the
    # leading slash.
    url_parts = urlparse.urlparse(url).path.split('/')

    if len(url_parts) != 4 \
            or (url_parts[:3] != ['', 'pscheduler', 'tasks' ]):
        return False

    except ValueError:
        return False

    return True
项目:pscheduler    作者:perfsonar    | 项目源码 | 文件源码
def api_is_run(url):
    """Determine if a URL looks like a valid run URL"""
    # Note that this generates an extra array element because of the
    # leading slash.
    url_parts = urlparse.urlparse(url).path.split('/')
    if len(url_parts) != 6 \
            or (url_parts[:3] != ['', 'pscheduler', 'tasks' ]) \
            or (url_parts[4] != 'runs'):
        return False

    except ValueError:
        return False

    return True
项目:mongodb-monitoring    作者:jruaux    | 项目源码 | 文件源码
def _encode_uuid(name, value, dummy, opts):
    """Encode uuid.UUID."""
    uuid_representation = opts.uuid_representation
    # Python Legacy Common Case
    if uuid_representation == OLD_UUID_SUBTYPE:
        return b"\x05" + name + b'\x10\x00\x00\x00\x03' + value.bytes
    # Java Legacy
    elif uuid_representation == JAVA_LEGACY:
        from_uuid = value.bytes
        data = from_uuid[0:8][::-1] + from_uuid[8:16][::-1]
        return b"\x05" + name + b'\x10\x00\x00\x00\x03' + data
    # C# legacy
    elif uuid_representation == CSHARP_LEGACY:
        # Microsoft GUID representation.
        return b"\x05" + name + b'\x10\x00\x00\x00\x03' + value.bytes_le
    # New
        return b"\x05" + name + b'\x10\x00\x00\x00\x04' + value.bytes
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def default(self, o):
        """Implement this method in a subclass such that it returns a
        serializable object for ``o``, or calls the base implementation (to
        raise a ``TypeError``).

        For example, to support arbitrary iterators, you could implement
        default like this::

            def default(self, o):
                    iterable = iter(o)
                except TypeError:
                    return list(iterable)
                return JSONEncoder.default(self, o)
        if isinstance(o, datetime):
            return http_date(o)
        if isinstance(o, uuid.UUID):
            return str(o)
        if hasattr(o, '__html__'):
            return text_type(o.__html__())
        return _json.JSONEncoder.default(self, o)
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def loads(self, value):
        def object_hook(obj):
            if len(obj) != 1:
                return obj
            the_key, the_value = next(iteritems(obj))
            if the_key == ' t':
                return tuple(the_value)
            elif the_key == ' u':
                return uuid.UUID(the_value)
            elif the_key == ' b':
                return b64decode(the_value)
            elif the_key == ' m':
                return Markup(the_value)
            elif the_key == ' d':
                return parse_date(the_value)
            return obj
        return json.loads(value, object_hook=object_hook)
项目:flocker-driver    作者:CoprHD    | 项目源码 | 文件源码
def detach_volume(self, blockdevice_id):
        :param: volume id = blockdevice_id
        :raises: unknownvolume exception if not found
         dataset_id = UUID(blockdevice_id[6:])
        except ValueError:
         raise UnknownVolume(blockdevice_id)
        volumesdetails = self.coprhdcli.get_volume_details("flocker-{}".format(dataset_id))
        if not volumesdetails:
         raise UnknownVolume(blockdevice_id)
        if volumesdetails[volumesdetails.keys()[0]]['attached_to'] is not None:"coprhd detach_volume" + str(blockdevice_id)).write(_logger)
         dataset_id = UUID(blockdevice_id[6:])
  "Volume" + blockdevice_id + "not attached").write(_logger)
            raise UnattachedVolume(blockdevice_id)
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def default(self, o):
        """Implement this method in a subclass such that it returns a
        serializable object for ``o``, or calls the base implementation (to
        raise a :exc:`TypeError`).

        For example, to support arbitrary iterators, you could implement
        default like this::

            def default(self, o):
                    iterable = iter(o)
                except TypeError:
                    return list(iterable)
                return JSONEncoder.default(self, o)
        if isinstance(o, date):
            return http_date(o.timetuple())
        if isinstance(o, uuid.UUID):
            return str(o)
        if hasattr(o, '__html__'):
            return text_type(o.__html__())
        return _json.JSONEncoder.default(self, o)
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def _tag(value):
    if isinstance(value, tuple):
        return {' t': [_tag(x) for x in value]}
    elif isinstance(value, uuid.UUID):
        return {' u': value.hex}
    elif isinstance(value, bytes):
        return {' b': b64encode(value).decode('ascii')}
    elif callable(getattr(value, '__html__', None)):
        return {' m': text_type(value.__html__())}
    elif isinstance(value, list):
        return [_tag(x) for x in value]
    elif isinstance(value, datetime):
        return {' d': http_date(value)}
    elif isinstance(value, dict):
        return dict((k, _tag(v)) for k, v in iteritems(value))
    elif isinstance(value, str):
            return text_type(value)
        except UnicodeError:
            from flask.debughelpers import UnexpectedUnicodeError
            raise UnexpectedUnicodeError(u'A byte string with '
                u'non-ASCII data was passed to the session system '
                u'which can only store unicode strings.  Consider '
                u'base64 encoding your string (String was %r)' % value)
    return value
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def validate(self, value):
        if value is None:
            return True
        if isinstance(value, UUID):
            return True

        if isinstance(value, six.string_types):
                return True
            except TypeError:
            except ValueError:

        return False
项目:thorn    作者:robinhood    | 项目源码 | 文件源码
def default(self, o,
                textual=(decimal.Decimal, uuid.UUID, DjangoPromise),
        if isinstance(o, dates):
            if not isinstance(o, datetime):
                o = datetime(o.year, o.month,, 0, 0, 0, 0)
            r = o.isoformat()
            if r.endswith("+00:00"):
                r = r[:-6] + "Z"
            return r
        elif isinstance(o, times):
            return o.isoformat()
        elif isinstance(o, textual):
            return text_type(o)
            return super(JsonEncoder, self).default(o)
项目:python-keylime    作者:mit-ll    | 项目源码 | 文件源码
def __new__(cls, uuid, aik_path, vtpm, added=False):
        uuid: str
            The UUID for this VTPM Group
        aik_path: str
            The path to the .pem for this group's AIK
        if uuid in VTPMGroup._groups:
            return VTPMGroup._groups[uuid]

        obj = object.__new__(cls)
        obj.uuid = uuid
        obj.aik_path = aik_path
        obj.vtpm = vtpm

        if added:
            VTPMGroup._groups[uuid] = obj
        return obj
项目:python-keylime    作者:mit-ll    | 项目源码 | 文件源码
def show_group(group_num):
    """ Returns info about group `group_num` using VTPM_ORD_GROUP_SHOW"""
    out = {'num': group_num, 'vtpms': []}
    body = vtpm_raw(0x1C2, struct.pack('>II', 0x02000107, group_num))
    (uuid, pk, cfg) = struct.unpack('16s 256s 16s', body)
    uuid = stringify_uuid(uuid)'Group [%d] UUID: %s', group_num, uuid)
    pk_hash = hashlib.sha1(pk).hexdigest()'  PK Hash:  %s', pk_hash)'  Cfg list: %s', cfg.encode('hex'))
    body = vtpm_cmd(VTPM_ORD_VTPM_LIST, struct.pack('>II', group_num, 0))
    ((num_vtpms,), body) = unpack('>I', body)
    if num_vtpms > 0:'  vTPMs:  ')
        vtpms = struct.unpack('16s' * num_vtpms, body)
        vtpms = [stringify_uuid(vtpm) for vtpm in vtpms]
        for i, vtpm in enumerate(vtpms):
  '    [%d]: %s', i, vtpm)
    out['uuid'] = uuid
    return out
项目:python-keylime    作者:mit-ll    | 项目源码 | 文件源码
def get_group_info(num):
    """Returns UUID and path to the group AIK file for vtpm group `num`."""
    # Get info for group `num`
    ginfo = show_group(num)
    uuid = ginfo['uuid']
    aikname = '{0}_aik'.format(uuid)
    pubaik_path = '{0}.pub'.format(aikname)

    # Check that we have the group's AIK
    if not os.path.exists(pubaik_path):
        logger.error('Group %d AIK Path %r doesn\'t exist', num, pubaik_path)
        raise OSError()

    aikbase = '{0}'.format(aikname)
    aikpem = aikbase + '.pem'

    # Convert group AIK to PEM
    check_call('tpmconv -ik {0} -ok {1}'.format(pubaik_path, aikbase),
    return {'aikpem': aikpem, 'uuid': uuid}
项目:python-keylime    作者:mit-ll    | 项目源码 | 文件源码
def add_vtpm_group(rsa_mod=None):
    """ Add new vtpm group"""
    if common.STUB_TPM:
        return (common.TEST_GROUP_UUID,common.TEST_HAIK,1,None)

    logger.debug('Adding group')

    if rsa_mod is None:
        rsa_mod = '\x00' * 256
    assert len(rsa_mod) == 256
    ca_digest = '\x00' * 20
    rsp = vtpm_cmd(VTPM_ORD_GROUP_NEW,  ca_digest + rsa_mod)

    (uuid, aik_pub, aik_priv_ca) = struct.unpack('16s256s256s', rsp)
    uuid = struct.unpack(uuid_fmt, uuid)
    uuid = '-'.join([part.encode('hex') for part in uuid])'Created group with UUID: %s', uuid)

    aikpem = tpmconv(aik_pub)

    # return the group
    group_num = get_group_num(uuid)

    return (uuid,aikpem,group_num,aik_priv_ca)
项目:Tinychat-Bot--Discontinued    作者:Tinychat    | 项目源码 | 文件源码
def encodeSmallAttribute(self, attr):
        @since: 0.5
        obj = getattr(self, attr)

        if not obj:
            return obj

        if attr in ['timestamp', 'timeToLive']:
            return pyamf.util.get_timestamp(obj) * 1000.0
        elif attr in ['clientId', 'messageId']:
            if isinstance(obj, uuid.UUID):
                return None

        return obj
项目:psycopg2-for-aws-lambda    作者:iwitaly    | 项目源码 | 文件源码
def testUUIDARRAY(self):
        import uuid
        u = [uuid.UUID('9c6d5a77-7256-457e-9461-347b4358e350'),
        s = self.execute("SELECT %s AS foo", (u,))
        self.assertTrue(u == s)
        # array with a NULL element
        u = [uuid.UUID('9c6d5a77-7256-457e-9461-347b4358e350'), None]
        s = self.execute("SELECT %s AS foo", (u,))
        self.assertTrue(u == s)
        # must survive NULL cast to a uuid[]
        s = self.execute("SELECT NULL::uuid[] AS foo")
        self.assertTrue(s is None)
        # what about empty arrays?
        s = self.execute("SELECT '{}'::uuid[] AS foo")
        self.assertTrue(type(s) == list and len(s) == 0)
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def default(self, o):
        """Implement this method in a subclass such that it returns a
        serializable object for ``o``, or calls the base implementation (to
        raise a ``TypeError``).

        For example, to support arbitrary iterators, you could implement
        default like this::

            def default(self, o):
                    iterable = iter(o)
                except TypeError:
                    return list(iterable)
                return JSONEncoder.default(self, o)
        if isinstance(o, datetime):
            return http_date(o)
        if isinstance(o, uuid.UUID):
            return str(o)
        if hasattr(o, '__html__'):
            return text_type(o.__html__())
        return _json.JSONEncoder.default(self, o)
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def loads(self, value):
        def object_hook(obj):
            if len(obj) != 1:
                return obj
            the_key, the_value = next(iteritems(obj))
            if the_key == ' t':
                return tuple(the_value)
            elif the_key == ' u':
                return uuid.UUID(the_value)
            elif the_key == ' b':
                return b64decode(the_value)
            elif the_key == ' m':
                return Markup(the_value)
            elif the_key == ' d':
                return parse_date(the_value)
            return obj
        return json.loads(value, object_hook=object_hook)
项目:queue-messaging    作者:socialwifi    | 项目源码 | 文件源码
def test_send(header_timestamp_mock, pubsub_client_mock):
    messaging = queue_messaging.Messaging.create_from_dict({
        'TOPIC': 'test-topic',
    model = FancyEvent(
        string_field='Just testing!'
    header_timestamp_mock.return_value = datetime.datetime(
        2016, 12, 10, 11, 15, 45, 123456, tzinfo=datetime.timezone.utc)


    topic_mock = pubsub_client_mock.return_value.topic
    publish_mock = topic_mock.return_value.publish
            "uuid_field": "cd1d3a03-7b04-4a35-97f8-ee5f3eb04c8e",
            "string_field": "Just testing!"
项目:queue-messaging    作者:socialwifi    | 项目源码 | 文件源码
def test_if_encode_raises_exception_with_invalid_data_and_strict_schema():
    class StrictSchema(marshmallow.Schema):
        uuid_field = fields.UUID(required=True)

        class Meta:
            strict = True

    class Event(structures.Model):
        class Meta:
            schema = StrictSchema
            type_name = 'Event'

    data = Event(uuid_field='not an uuid')
    with pytest.raises(exceptions.EncodingError) as excinfo:
    assert str(excinfo.value) == (
        "({'uuid_field': ['Not a valid UUID.']}, '')")
项目:instagram_private_api    作者:ping    | 项目源码 | 文件源码
def generate_uuid(cls, return_hex=False, seed=None):
        Generate uuid

        :param return_hex: Return in hex format
        :param seed: Seed value to generate a consistent uuid
        if seed:
            m = hashlib.md5()
            new_uuid = uuid.UUID(m.hexdigest())
            new_uuid = uuid.uuid1()
        if return_hex:
            return new_uuid.hex
        return str(new_uuid)
项目:tcrudge    作者:CodeTeam    | 项目源码 | 文件源码
def json_serial(obj):
    JSON serializer for objects not serializable by default json code.

    :param obj: object to serialize
    :type obj: date, datetime or UUID

    :return: formatted and serialized object
    :rtype: str
    if isinstance(obj, datetime.datetime) or isinstance(obj,
        # Datetime serializer
        return obj.isoformat()
    elif isinstance(obj, uuid.UUID):
        return str(obj)
    raise TypeError("Type %s not serializable" % type(obj))
项目:properties    作者:aranzgeo    | 项目源码 | 文件源码
def test_uid(self):

        class UidModel(properties.HasProperties):
            uid = properties.Uuid('my uuid')

        model = UidModel()
        assert isinstance(model.uid, uuid.UUID)
        with self.assertRaises(AttributeError):
            model.uid = uuid.uuid4()
        assert model.validate()
        model._backend['uid'] = 'hi'
        with self.assertRaises(ValueError):

        json_uuid = uuid.uuid4()
        json_uuid_str = str(json_uuid)

        assert properties.Uuid.to_json(json_uuid) == json_uuid_str
        assert str(properties.Uuid.from_json(json_uuid_str)) == json_uuid_str

        assert properties.Uuid('').equal(uuid.UUID(int=0), uuid.UUID(int=0))
项目:gluon    作者:openstack    | 项目源码 | 文件源码
def bind_port(self, uuid, model, changes):
        """Called to bind port to VM.

        :param uuid: UUID of Port
        :param model: Model object
        :returns: dict of vif parameters (vif_type, vif_details)
        """"bind_port: %s" % uuid)

        port = model.ports.get(uuid, None)
        if not port:
            LOG.error("Cannot find port")
            return dict()

        retval = {'vif_type': 'ovs',
                  'vif_details': {'port_filter': False,
                                  'bridge_name': 'br-int'}}
        return retval
项目:gluon    作者:openstack    | 项目源码 | 文件源码
def modify_service(self, uuid, model, changes):
        """Called when attributes change on a bound port's service

        :param uuid: UUID of Service
        :param model: Model Object
        :param changes: dictionary of changed attributes
        :returns: None
        """"modify_service: %s" % uuid)"Creating or updating VPN instance")
        vpn_instance = model.vpn_instances.get(uuid)
        if vpn_instance:
            LOG.error("VPN instance %s not found" % uuid)
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def default(self, o):
        """Implement this method in a subclass such that it returns a
        serializable object for ``o``, or calls the base implementation (to
        raise a :exc:`TypeError`).

        For example, to support arbitrary iterators, you could implement
        default like this::

            def default(self, o):
                    iterable = iter(o)
                except TypeError:
                    return list(iterable)
                return JSONEncoder.default(self, o)
        if isinstance(o, date):
            return http_date(o.timetuple())
        if isinstance(o, uuid.UUID):
            return str(o)
        if hasattr(o, '__html__'):
            return text_type(o.__html__())
        return _json.JSONEncoder.default(self, o)
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def _tag(value):
    if isinstance(value, tuple):
        return {' t': [_tag(x) for x in value]}
    elif isinstance(value, uuid.UUID):
        return {' u': value.hex}
    elif isinstance(value, bytes):
        return {' b': b64encode(value).decode('ascii')}
    elif callable(getattr(value, '__html__', None)):
        return {' m': text_type(value.__html__())}
    elif isinstance(value, list):
        return [_tag(x) for x in value]
    elif isinstance(value, datetime):
        return {' d': http_date(value)}
    elif isinstance(value, dict):
        return dict((k, _tag(v)) for k, v in iteritems(value))
    elif isinstance(value, str):
            return text_type(value)
        except UnicodeError:
            from flask.debughelpers import UnexpectedUnicodeError
            raise UnexpectedUnicodeError(u'A byte string with '
                u'non-ASCII data was passed to the session system '
                u'which can only store unicode strings.  Consider '
                u'base64 encoding your string (String was %r)' % value)
    return value
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def default(self, o):
        """Implement this method in a subclass such that it returns a
        serializable object for ``o``, or calls the base implementation (to
        raise a :exc:`TypeError`).

        For example, to support arbitrary iterators, you could implement
        default like this::

            def default(self, o):
                    iterable = iter(o)
                except TypeError:
                    return list(iterable)
                return JSONEncoder.default(self, o)
        if isinstance(o, date):
            return http_date(o.timetuple())
        if isinstance(o, uuid.UUID):
            return str(o)
        if hasattr(o, '__html__'):
            return text_type(o.__html__())
        return _json.JSONEncoder.default(self, o)
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def _tag(value):
    if isinstance(value, tuple):
        return {' t': [_tag(x) for x in value]}
    elif isinstance(value, uuid.UUID):
        return {' u': value.hex}
    elif isinstance(value, bytes):
        return {' b': b64encode(value).decode('ascii')}
    elif callable(getattr(value, '__html__', None)):
        return {' m': text_type(value.__html__())}
    elif isinstance(value, list):
        return [_tag(x) for x in value]
    elif isinstance(value, datetime):
        return {' d': http_date(value)}
    elif isinstance(value, dict):
        return dict((k, _tag(v)) for k, v in iteritems(value))
    elif isinstance(value, str):
            return text_type(value)
        except UnicodeError:
            from flask.debughelpers import UnexpectedUnicodeError
            raise UnexpectedUnicodeError(u'A byte string with '
                u'non-ASCII data was passed to the session system '
                u'which can only store unicode strings.  Consider '
                u'base64 encoding your string (String was %r)' % value)
    return value
项目:Brightside    作者:BrighterCommand    | 项目源码 | 文件源码
def create_message_header(self) -> Dict:

        def _add_correlation_id(brightstide_message_header: Dict, correlation_id: UUID) -> None:
            if correlation_id is not None:
                brightstide_message_header[message_correlation_id_header] = str(correlation_id)

        def _add_message_id(brightside_message_header: Dict, identity: UUID) -> None:
            if identity is None:
                raise MessagingException("Missing id on message, this is a required field")
            brightside_message_header[message_id_header] = str(identity)

        def _add_message_type(brightside_message_header: Dict, brightside_message_type: str) -> None:
            if brightside_message_type is None:
                raise MessagingException("Missing type on message, this is a required field")
            brightside_message_header[message_type_header] = brightside_message_type

        header = {}
        _add_correlation_id(header, self._message.header.correlation_id)

        return header
项目:vsphere-storage-for-docker    作者:vmware    | 项目源码 | 文件源码
def test_remove_vms(self):

        privileges = self.get_privileges()
        default_datastore = self.get_default_datastore()
        default_datastore_url = self.get_datastore_url(default_datastore)

        vms = [(self.vm1_uuid, self.vm1_name), (self.vm2_uuid, self.vm2_name)]
        error_info, tenant1 = self.auth_mgr.create_tenant(name=self.tenant_name,
                                                          description='Some tenant',

        self.assertEqual(error_info, None)
        error_info = tenant1.remove_vms(self.auth_mgr.conn, vms)
        self.assertEqual(error_info, None)
        error_info, vms_row = auth.get_row_from_vms_table(self.auth_mgr.conn,
        self.assertEqual(error_info, None)
        self.assertEqual(vms_row, [])
项目:vsphere-storage-for-docker    作者:vmware    | 项目源码 | 文件源码
def test_set_name(self):
        vms = [(self.vm1_uuid, self.vm1_name)]

        privileges = self.get_privileges()
        default_datastore = self.get_default_datastore()
        default_datastore_url = self.get_datastore_url(default_datastore)
        error_info, tenant1 = self.auth_mgr.create_tenant(name=self.tenant_name,
                                                          description='Some tenant',

        self.assertEqual(error_info, None)

        error_info = tenant1.set_name(self.auth_mgr.conn, self.tenant_name, self.tenant_2_name)
        self.assertEqual(error_info, None)
        error_info, tenants_row = auth.get_row_from_tenants_table(self.auth_mgr.conn,
        self.assertEqual(error_info, None)
        expected_output = self.tenant_2_name
        actual_output = tenants_row[auth_data_const.COL_NAME]
        self.assertEqual(actual_output, expected_output)
项目:vsphere-storage-for-docker    作者:vmware    | 项目源码 | 文件源码
def test_set_description(self):
        vms = [(self.vm1_uuid, self.vm1_name)]
        privileges = self.get_privileges()
        error_info, tenant1 = self.auth_mgr.create_tenant(name=self.tenant_name,
                                                          description='Some tenant',

        self.assertEqual(error_info, None)
        error_info = tenant1.set_description(self.auth_mgr.conn, 'new description')
        self.assertEqual(error_info, None)
        error_info, tenants_row = auth.get_row_from_tenants_table(self.auth_mgr.conn,
        self.assertEqual(error_info, None)
        expected_output = 'new description'
        actual_output = tenants_row[auth_data_const.COL_DESCRIPTION]
        self.assertEqual(actual_output, expected_output)
项目:vsphere-storage-for-docker    作者:vmware    | 项目源码 | 文件源码
def test_set_default_datastore(self):
        vms = [(self.vm1_uuid, self.vm1_name)]
        privileges = self.get_privileges()
        default_datastore = self.get_default_datastore()
        default_datastore_url = self.get_datastore_url(default_datastore)
        error_info, tenant1 = self.auth_mgr.create_tenant(name=self.tenant_name,
                                                          description='Some tenant',

        self.assertEqual(error_info, None)

        default_datastore = 'new_default_ds'
        default_datastore_url = self.get_datastore_url(default_datastore)
        error_info = tenant1.set_default_datastore(self.auth_mgr.conn, default_datastore_url)
        self.assertEqual(error_info, None)
        # Check tenants table
        error_info, tenants_row = auth.get_row_from_tenants_table(self.auth_mgr.conn,
        self.assertEqual(error_info, None)
        expected_output = 'new_default_ds_url'
        actual_output = tenants_row[auth_data_const.COL_DEFAULT_DATASTORE_URL]
        self.assertEqual(actual_output, expected_output)
项目:facerecognition    作者:guoxiaolu    | 项目源码 | 文件源码
def get_mac_address():
    import uuid
    node = uuid.getnode()
    mac = uuid.UUID(int=node).hex[-12:]
    return mac
项目:PyPlanet    作者:PyPlanet    | 项目源码 | 文件源码
def handle(self, player, action, values, **kwargs):
        if not action.startswith(

        if not self._is_global_shown and player.login not in self._is_player_shown.keys():
            # Ignore if id is unique (uuid4)
                uuid.UUID(, version=4)
                raise ManialinkMemoryLeakException(
                    'Old view instance (ml-id: {}) is not yet destroyed, but is receiving player callbacks!, '
                    'Make sure you are not removing old view instances with .destroy() and del variable! '
                    'Potential Memory Leak!! Should be fixed asap!'.format(

        action_name = action[len(]
        if action_name not in self.receivers:
            return await self.handle_catch_all(player, action_name, values)

        # Call receivers.
        for rec in self.receivers[action_name]:
                if iscoroutinefunction(rec):
                    await rec(player, action, values)
                    rec(player, action, values)
            except Exception as e:
                if self.throw_exceptions:
                    logging.exception('Exception has been silenced in ManiaLink Action receiver:', exc_info=e)
项目:data_pipeline    作者:Yelp    | 项目源码 | 文件源码
def _set_uuid(self, uuid):
        if uuid is None:
            # UUID generation is expensive.  Using FastUUID instead of the built
            # in UUID methods increases Messages that can be instantiated per
            # second from ~25,000 to ~185,000.  Not generating UUIDs at all
            # increases the throughput further still to about 730,000 per
            # second.
            uuid = self._fast_uuid.uuid4()
        elif len(uuid) != 16:
            raise TypeError(
                "UUIDs should be exactly 16 bytes.  Conforming UUID's can be "
                "generated with `import uuid; uuid.uuid4().bytes`."
        self._uuid = uuid
项目:data_pipeline    作者:Yelp    | 项目源码 | 文件源码
def uuid_hex(self):
        # TODO: DATAPIPE-848
        return UUID(bytes=self.uuid).hex
项目:data_pipeline    作者:Yelp    | 项目源码 | 文件源码
def run(self):
            "Starting to consume from {}".format(self.topic_to_offsets_map)

        with Consumer(
            # The tailer name should be unique - if it's not, partitions will
            # be split between multiple tailer instances
        ) as consumer:
            message_count = 0
            while self.keep_running(message_count):
                message = consumer.get_message(blocking=True, timeout=0.1)
                if message is not None:
                    if self.options.end_timestamp is None or message.timestamp < self.options.end_timestamp:
                        print self._format_message(message)
                        message_count += 1
                        self._running = False
                            "Latest message surpasses --end-timestamp. Stopping tailer..."
项目:Plamber    作者:OlegKlimenko    | 项目源码 | 文件源码
def convert_uuidfield_value(self, value, expression, connection, context):
        # New in Django 1.8
        if value is not None:
            value = uuid.UUID(value)
        return value
项目:jx-sqlite    作者:mozilla    | 项目源码 | 文件源码
def generateGuid():
    """Gets a random GUID.
    Note: python's UUID generation library is used here. 
    Basically UUID is the same as GUID when represented as a string.
        str, the generated random GUID.

    import uuid
    print a
    print uuid.UUID(a).hex

    return str(uuid4())
项目:pscheduler    作者:perfsonar    | 项目源码 | 文件源码
def arg_uuid(name):
    """Fetch and validate an argument as a UUID"""
    argval = request.args.get(name)
    if argval is None:
        return None
    # This will throw a ValueError if something's wrong.
    return str(uuid.UUID(argval))
项目:pscheduler    作者:perfsonar    | 项目源码 | 文件源码
def uuid_is_valid(test_uuid):
    Determine if a UUID is valid
        uuid_object = uuid.UUID(test_uuid)
    except ValueError:
        return False
    return True
项目:mongodb-monitoring    作者:jruaux    | 项目源码 | 文件源码
def __new__(cls, obj):
        if not isinstance(obj, UUID):
            raise TypeError("obj must be an instance of uuid.UUID")
        self = Binary.__new__(cls, obj.bytes, OLD_UUID_SUBTYPE)
        self.__uuid = obj
        return self
项目:mongodb-monitoring    作者:jruaux    | 项目源码 | 文件源码
def uuid(self):
        """UUID instance wrapped by this UUIDLegacy instance.
        return self.__uuid
项目:mongodb-monitoring    作者:jruaux    | 项目源码 | 文件源码
def _get_binary(data, position, dummy0, opts, dummy1):
    """Decode a BSON binary to bson.binary.Binary or python UUID."""
    length, subtype = _UNPACK_LENGTH_SUBTYPE(data[position:position + 5])
    position += 5
    if subtype == 2:
        length2 = _UNPACK_INT(data[position:position + 4])[0]
        position += 4
        if length2 != length - 4:
            raise InvalidBSON("invalid binary (st 2) - lengths don't match!")
        length = length2
    end = position + length
    if subtype in (3, 4):
        # Java Legacy
        uuid_representation = opts.uuid_representation
        if uuid_representation == JAVA_LEGACY:
            java = data[position:end]
            value = uuid.UUID(bytes=java[0:8][::-1] + java[8:16][::-1])
        # C# legacy
        elif uuid_representation == CSHARP_LEGACY:
            value = uuid.UUID(bytes_le=data[position:end])
        # Python
            value = uuid.UUID(bytes=data[position:end])
        return value, end
    # Python3 special case. Decode subtype 0 to 'bytes'.
    if PY3 and subtype == 0:
        value = data[position:end]
        value = Binary(data[position:end], subtype)
    return value, end
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def test_can_insert_udts_with_all_datatypes(self):
        Test for inserting all column types into a UserType

        test_can_insert_udts_with_all_datatypes tests that each cqlengine column type can be inserted into a UserType.
        It first creates a UserType that has each cqlengine column type, and a corresponding table/Model. It then creates
        a UserType instance where all the fields have corresponding data, and inserts the UserType as an instance of the Model.
        Finally, it verifies that each column read from the UserType from Cassandra is the same as the input parameters.

        @since 2.5.0
        @jira_ticket PYTHON-251
        @expected_result The UserType is inserted with each column type, and the resulting read yields proper data for each column.

        @test_category data_types:udt
        self.addCleanup(drop_table, AllDatatypesModel)

        input = AllDatatypes(a='ascii', b=2 ** 63 - 1, c=bytearray(b'hello world'), d=True,
                             e=datetime.utcfromtimestamp(872835240), f=Decimal('12.3E+7'), g=2.39,
                             h=3.4028234663852886e+38, i='', j=2147483647, k='text',
                             m=UUID('067e6162-3b6f-4ae2-a171-2470b63dff00'), n=int(str(2147483647) + '000'))
        AllDatatypesModel.create(id=0, data=input)

        self.assertEqual(1, AllDatatypesModel.objects.count())
        output = AllDatatypesModel.objects.first().data

        for i in range(ord('a'), ord('a') + 14):
            self.assertEqual(input[chr(i)], output[chr(i)])
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def test_nested_udts_inserts(self):
        Test for inserting collections of user types using cql engine.

        test_nested_udts_inserts Constructs a model that contains a list of usertypes. It will then attempt to insert
        them. The expectation is that no exception is thrown during insert. For sanity sake we also validate that our
        input and output values match. This combination of model, and UT produces a syntax error in 2.5.1 due to
        improper quoting around the names collection.

        @since 2.6.0
        @jira_ticket PYTHON-311
        @expected_result No syntax exception thrown

        @test_category data_types:udt

        class Name(UserType):
            type_name__ = "header"

            name = columns.Text()
            value = columns.Text()

        class Container(Model):
            id = columns.UUID(primary_key=True, default=uuid4)
            names = columns.List(columns.UserDefinedType(Name))

        # Construct the objects and insert them
        names = []
        for i in range(0, 10):
            names.append(Name(name="name{0}".format(i), value="value{0}".format(i)))

        # Create table, insert data
        self.addCleanup(drop_table, Container)

        Container.create(id=UUID('FE2B4360-28C6-11E2-81C1-0800200C9A66'), names=names)

        # Validate input and output matches
        self.assertEqual(1, Container.objects.count())
        names_output = Container.objects.first().names
        self.assertEqual(names_output, names)