Python uuid 模块,uuid3() 实例源码


项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def get_swift_hash():
    if os.path.isfile(SWIFT_HASH_FILE):
        with open(SWIFT_HASH_FILE, 'r') as hashfile:
            swift_hash =
    elif config('swift-hash'):
        swift_hash = config('swift-hash')
        with open(SWIFT_HASH_FILE, 'w') as hashfile:
        model_uuid = os.environ.get("JUJU_ENV_UUID",
        swift_hash = str(uuid.uuid3(uuid.UUID(model_uuid),
        with open(SWIFT_HASH_FILE, 'w') as hashfile:

    return swift_hash
项目:DeepSea    作者:SUSE    | 项目源码 | 文件源码
def cluster_config(self):
    """
    Provide the default configuration for a cluster.

    Writes ``<root_dir>/config/stack/default/<cluster>/cluster.yml`` through
    ``self.writer`` with a newly generated ``fsid``, the public and cluster
    networks (each rendered as a comma-separated string) and the available
    roles.  Does nothing when ``self.cluster`` is falsy.
    """
    if not self.cluster:
        return

    cluster_dir = "{}/config/stack/default/{}".format(self.root_dir, self.cluster)
    if not os.path.isdir(cluster_dir):
        _create_dirs(cluster_dir, self.root_dir)
    filename = "{}/cluster.yml".format(cluster_dir)

    contents = {}
    # The fsid only has to be a fresh random UUID.  uuid4() is the random
    # generator; feeding os.urandom() bytes to uuid3() raises TypeError on
    # Python 3 releases that only accept text names.
    contents['fsid'] = str(uuid.uuid4())
    contents['public_network'] = ", ".join(str(n) for n in self.public_networks)
    contents['cluster_network'] = ", ".join(str(n) for n in self.cluster_networks)
    contents['available_roles'] = self.available_roles

    self.writer.write(filename, contents)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_uuid3(self):
        equal = self.assertEqual

        # Test some known version-3 UUIDs.
        for u, v in [(uuid.uuid3(uuid.NAMESPACE_DNS, ''),
                     (uuid.uuid3(uuid.NAMESPACE_URL, ''),
                     (uuid.uuid3(uuid.NAMESPACE_OID, ''),
                     (uuid.uuid3(uuid.NAMESPACE_X500, 'c=ca'),
            equal(u.variant, uuid.RFC_4122)
            equal(u.version, 3)
            equal(u, uuid.UUID(v))
            equal(str(u), v)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_uuid3(self):
        equal = self.assertEqual

        # Test some known version-3 UUIDs.
        for u, v in [(uuid.uuid3(uuid.NAMESPACE_DNS, ''),
                     (uuid.uuid3(uuid.NAMESPACE_URL, ''),
                     (uuid.uuid3(uuid.NAMESPACE_OID, ''),
                     (uuid.uuid3(uuid.NAMESPACE_X500, 'c=ca'),
            equal(u.variant, uuid.RFC_4122)
            equal(u.version, 3)
            equal(u, uuid.UUID(v))
            equal(str(u), v)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_uuid3(self):
        equal = self.assertEqual

        # Test some known version-3 UUIDs.
        for u, v in [(uuid.uuid3(uuid.NAMESPACE_DNS, ''),
                     (uuid.uuid3(uuid.NAMESPACE_URL, ''),
                     (uuid.uuid3(uuid.NAMESPACE_OID, ''),
                     (uuid.uuid3(uuid.NAMESPACE_X500, 'c=ca'),
            equal(u.variant, uuid.RFC_4122)
            equal(u.version, 3)
            equal(u, uuid.UUID(v))
            equal(str(u), v)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_uuid3(self):
        equal = self.assertEqual

        # Test some known version-3 UUIDs.
        for u, v in [(uuid.uuid3(uuid.NAMESPACE_DNS, ''),
                     (uuid.uuid3(uuid.NAMESPACE_URL, ''),
                     (uuid.uuid3(uuid.NAMESPACE_OID, ''),
                     (uuid.uuid3(uuid.NAMESPACE_X500, 'c=ca'),
            equal(u.variant, uuid.RFC_4122)
            equal(u.version, 3)
            equal(u, uuid.UUID(v))
            equal(str(u), v)
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_uuid3(self):
        equal = self.assertEqual

        # Test some known version-3 UUIDs.
        for u, v in [(uuid.uuid3(uuid.NAMESPACE_DNS, ''),
                     (uuid.uuid3(uuid.NAMESPACE_URL, ''),
                     (uuid.uuid3(uuid.NAMESPACE_OID, ''),
                     (uuid.uuid3(uuid.NAMESPACE_X500, 'c=ca'),
            equal(u.variant, uuid.RFC_4122)
            equal(u.version, 3)
            equal(u, uuid.UUID(v))
            equal(str(u), v)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_uuid3(self):
        equal = self.assertEqual

        # Test some known version-3 UUIDs.
        for u, v in [(uuid.uuid3(uuid.NAMESPACE_DNS, ''),
                     (uuid.uuid3(uuid.NAMESPACE_URL, ''),
                     (uuid.uuid3(uuid.NAMESPACE_OID, ''),
                     (uuid.uuid3(uuid.NAMESPACE_X500, 'c=ca'),
            equal(u.variant, uuid.RFC_4122)
            equal(u.version, 3)
            equal(u, uuid.UUID(v))
            equal(str(u), v)
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_uuid3(self):
        equal = self.assertEqual

        # Test some known version-3 UUIDs.
        for u, v in [(uuid.uuid3(uuid.NAMESPACE_DNS, ''),
                     (uuid.uuid3(uuid.NAMESPACE_URL, ''),
                     (uuid.uuid3(uuid.NAMESPACE_OID, ''),
                     (uuid.uuid3(uuid.NAMESPACE_X500, 'c=ca'),
            equal(u.variant, uuid.RFC_4122)
            equal(u.version, 3)
            equal(u, uuid.UUID(v))
            equal(str(u), v)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_uuid3(self):
        equal = self.assertEqual

        # Test some known version-3 UUIDs.
        for u, v in [(uuid.uuid3(uuid.NAMESPACE_DNS, ''),
                     (uuid.uuid3(uuid.NAMESPACE_URL, ''),
                     (uuid.uuid3(uuid.NAMESPACE_OID, ''),
                     (uuid.uuid3(uuid.NAMESPACE_X500, 'c=ca'),
            equal(u.variant, uuid.RFC_4122)
            equal(u.version, 3)
            equal(u, uuid.UUID(v))
            equal(str(u), v)
项目:xiboside    作者:ajiwo    | 项目源码 | 文件源码
def __set_identity(self):
        node = None
        if sys.platform == 'win32':
            for getter in [uuid._netbios_getnode, uuid._ipconfig_getnode]:
                node = getter()
                if node:
            # Linux only, find mac address using ifconfig command. taken from uuid._ifconfig_getnode
            for args in ('eth0', 'wlan0', 'en0'):  # TODO: other possible network interface name
                node = uuid._find_mac('ifconfig', args, ['hwaddr', 'ether'], lambda i: i + 1)
                if node:

        if node is None:
            raise RuntimeError("No network interface found.")
        self.__mac_address = ':'.join([str('%012x' % node)[x:x + 2] for x in range(0, 12, 2)])
        url = 'xiboside://%s/%s/%s' % (sys.platform,, self.__mac_address)
        self.__keys['hardware'] = uuid.uuid3(uuid.NAMESPACE_URL, url)
项目:forum    作者:ElfenSterben    | 项目源码 | 文件源码
def upload_avatar():
    u = g.user
    if 'photo' in request.files:
        form = request.form
        filename = str(uuid3(NAMESPACE_DNS, str( + u.username + str(time.time()))).replace('-','')
            x = int(form['x'])
            y = int(form['y'])
            w = int(form['nw'])
            h = int(form['nh'])
            img =['photo'])
            format = img.format
            croped_img = crop_img(img, x, y, w, h)
            filename = save_avatar(croped_img, filename, format)
            url_path = current_app.config['UPLOADED_PHOTOS_URL']
            old_name = u.avatar.split(url_path)[1]
            u.avatar = url_path + filename
        except Exception as e:
            flash('????????2Mb?????', 'error')
    return redirect(url_for('user.setting_view'))
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def generateUuid(self, email_id, machine_name):
    """ return a uuid which uniquely identifies machinename and email id

    The first call for a ``machine_name`` derives a version-3 UUID from the
    machine name and ``email_id`` and records it in ``self.d`` under both the
    machine name and the UUID string.  Later calls for the same machine name
    return the recorded UUID string, whatever ``email_id`` is passed.
    """
    if machine_name in self.d:
        # Known machine: reuse the uuid stored at registration time.
        return self.d[machine_name][1]

    # The machine name yields a private namespace; the e-mail is hashed in it.
    namespace = uuid.uuid3(uuid.NAMESPACE_URL, machine_name)
    uuidstr = str(uuid.uuid3(namespace, email_id))

    record = (machine_name, uuidstr, email_id)
    self.d[machine_name] = record
    self.d[uuidstr] = record
    return uuidstr
项目:python-mysql-pool    作者:LuciferJack    | 项目源码 | 文件源码
def _fabric_server_uuid(host, port):
    """Return a version-3 UUID for the given host and port.

    The name hashed (in the URL namespace) is whatever
    ``_fabric_xmlrpc_uri(host, port)`` returns.
    """
    server_uri = _fabric_xmlrpc_uri(host, port)
    return uuid.uuid3(uuid.NAMESPACE_URL, server_uri)
项目:djipsum    作者:agusmakmun    | 项目源码 | 文件源码
def randomUUIDField(self):
        Return the unique uuid from uuid1, uuid3, uuid4, or uuid5.
        uuid1 = uuid.uuid1().hex
        uuid3 = uuid.uuid3(
            self.randomize(['python', 'django', 'awesome'])
        uuid4 = uuid.uuid4().hex
        uuid5 = uuid.uuid5(
            self.randomize(['python', 'django', 'awesome'])
        return self.randomize([uuid1, uuid3, uuid4, uuid5])
项目:blacklib    作者:halfss    | 项目源码 | 文件源码
def generate_uuid(uuid_str):
    """Return the version-3 UUID of ``uuid_str`` in the OID namespace.

    :param uuid_str: name to hash; text is hashed as UTF-8, bytes are taken
        to be UTF-8 already
    :return: the UUID in canonical string form
    """
    if not isinstance(uuid_str, str):
        # uuid3() wants the interpreter's native ``str``: UTF-8 bytes on
        # Python 2, text on Python 3 (where it does the UTF-8 encoding
        # itself; bytes names are rejected before Python 3.12).
        if str is bytes:
            uuid_str = uuid_str.encode('utf8')
        else:
            uuid_str = uuid_str.decode('utf8')
    return str(uuid.uuid3(uuid.NAMESPACE_OID, uuid_str))
项目:johnson-county-ddj-public    作者:dssg    | 项目源码 | 文件源码
def generate_uuid(self, metadata):
    """ Generate a deterministic identifier from a dictionary of matrix metadata.

    The values are concatenated in sorted-key order (each prefixed with
    ``_``) and hashed into a version-3 UUID in the DNS namespace, so equal
    metadata always yields the same identifier.

    :param metadata: metadata for the matrix
    :type metadata: dict
    :return: unique name for the file
    :rtype: str
    """
    identifier = ''.join('_' + str(metadata[key]) for key in sorted(metadata))
    return str(uuid.uuid3(uuid.NAMESPACE_DNS, identifier))
项目:importacsv    作者:rasertux    | 项目源码 | 文件源码
def _fabric_server_uuid(host, port):
    """Return a version-3 UUID for the given host and port.

    The name hashed (in the URL namespace) is whatever
    ``_fabric_xmlrpc_uri(host, port)`` returns.
    """
    server_uri = _fabric_xmlrpc_uri(host, port)
    return uuid.uuid3(uuid.NAMESPACE_URL, server_uri)
项目:qiime2    作者:qiime2    | 项目源码 | 文件源码
def test_parse_uuid_invalid(self):
        """Build strings that should not be accepted as version-4 UUIDs.

        NOTE(review): as it stands the method only assigns ``uuid_str`` and
        never asserts anything; presumably each assignment was followed by a
        check against the parser under test -- confirm against upstream.
        """
        # Invalid uuid4 (the source reference for this value is missing here).
        # NOTE(review): the version nibble is 4 but the variant nibble is 'c',
        # which looks like the reason it is invalid -- confirm.
        uuid_str = '89eb3586-8a82-47a4-c911-758a62601cf7'

        # Not a UUID.
        uuid_str = 'abc123'

        # Other UUID versions.
        for uuid_ in (uuid.uuid1(), uuid.uuid3(uuid.NAMESPACE_DNS, 'foo'),
                      uuid.uuid5(uuid.NAMESPACE_DNS, 'bar')):
            uuid_str = str(uuid_)
项目:valence    作者:openstack    | 项目源码 | 文件源码
def generate_uuid_by_element_id(element_id):
    """Return the DNS-namespace version-3 UUID of ``element_id`` as a string."""
    derived = uuid.uuid3(uuid.NAMESPACE_DNS, element_id)
    return str(derived)
项目:ahenk    作者:Pardus-LiderAhenk    | 项目源码 | 文件源码
def generate_uuid(self, depend_mac=True):
    """Create a UUID, either random or derived from ``get_mac()``.

    :param depend_mac: when exactly ``False`` a random (version-4) UUID is
        returned; otherwise the UUID is a version-3 UUID in the DNS
        namespace of ``str(get_mac())``
    :return: a ``uuid.UUID`` instance
    """
    if depend_mac is False:
        self.logger.debug('uuid creating randomly')
        return uuid.uuid4()  # make a random UUID

    self.logger.debug('uuid creating according to mac address')
    # make a UUID using an MD5 hash of a namespace UUID and a mac address
    return uuid.uuid3(uuid.NAMESPACE_DNS, str(get_mac()))
项目:smhr    作者:andycasey    | 项目源码 | 文件源码
def hashed_id():
        salt = getstatusoutput("git config --get")[1]
        import uuid
        salt = uuid.uuid3(uuid.NAMESPACE_DNS, "")
    return sha(salt).hexdigest()
项目:python-agent    作者:sandabuliu    | 项目源码 | 文件源码
def save(self):
        buffers = self._buffers
        self._buffers = []
            filename = str(uuid.uuid3(uuid.NAMESPACE_DNS, "%s_%s" % (time.time(), randint(0, 100000))))
            filename = os.path.join(self._cache, filename)
            fp = open(filename, 'w')
            for event in buffers:
            return filename
        except Exception, e:
            return 'Save Failed, cause: %s' % e
项目:python-agent    作者:sandabuliu    | 项目源码 | 文件源码
def __init__(self, level):
    """Store ``level`` and assign a new ``trace_id``.

    The trace id is the string form of a version-3 UUID (DNS namespace)
    of the current time joined with a random integer in [0, 100000].
    """
    self.level = level
    seed = "%s_%s" % (time.time(), randint(0, 100000))
    self.trace_id = str(uuid.uuid3(uuid.NAMESPACE_DNS, seed))
项目:tripl    作者:metasoarous    | 项目源码 | 文件源码
def _translate_target(self, c, a, v):
        # Set some things up
        target = self.targets[a]
        def relative_path(p):
            p = str(p)
            if p[0:1] != '/':
                return os.path.relpath(p, c['_output_wrt'])
                return p
        # recurse for list like things
        if isinstance(v, list) or isinstance(v, SCons.Node.NodeList):
            return [self._translate_target(c, a, v_) for v_ in v]
        elif isinstance(v, dict):
            v = copy.deepcopy(v)
        elif isinstance(v, SCons.Node.FS.Entry) or isinstance(v, SCons.Node.FS.File):
            # Can add more metadata here as needed
            ident = uuid.uuid3(c[self.nest_levels[self.current_nest]['ident_attr']], str(v))
            self.file_idents[str(v)] = ident
            v = {'db:ident': ident,
                 'tripl.file:path': relative_path(v),
                 # This will be super cool...
                 'tripl.file:sources': [{'tripl.file:path': relative_path(p)} for p in v.sources]}
        # This is where the metadata function gets called if it is callable
        metadata = target['metadata'](c, v) if callable(target['metadata']) else (target['metadata'] or {})
        if isinstance(v, dict):
            # Here we merge in the metadata
        elif metadata:
            metadata[''] = v
            v = metadata
        # TODO namespace all keywords
        return v
项目:YATE    作者:GarethNelson    | 项目源码 | 文件源码
def from_offline_player(cls, display_name):
    """Build the UUID obtained by hashing ``'OfflinePlayer:' + display_name``.

    ``uuid.uuid3`` only reads ``.bytes`` from its namespace argument, so a
    stand-in object carrying the literal prefix is used in place of a real
    namespace UUID.
    """
    class _Prefix():
        bytes = b'OfflinePlayer:'

    hashed = uuid.uuid3(_Prefix(), display_name)
    raw = hashed.bytes
    return UUID(bytes=raw)
项目:pyblade    作者:younggege    | 项目源码 | 文件源码
def gennerate_uuid(rootname, lineno):
    """Return the 32-digit hex of the DNS-namespace version-3 UUID of
    ``rootname`` followed by ``str(lineno)``."""
    return uuid.uuid3(uuid.NAMESPACE_DNS, rootname + str(lineno)).hex
项目:pyDashboard    作者:jtsmith2    | 项目源码 | 文件源码
def __init__(self, email, password): = email
        self.password = password
        self.session = requests.Session()
        self.sessionToken = None = str(uuid.uuid3(uuid.NAMESPACE_DNS, 'rienafairefr.pynYNAB'))
        self.logger = logging.getLogger('pynYnab')
项目:sun-dance    作者:RInvestments    | 项目源码 | 文件源码
def get_uuid(cur_dict):
    """Return a version-3 UUID string derived from the contents of ``cur_dict``.

    The top-level items are sorted before being serialised to JSON, so the
    result does not depend on the dict's insertion order.
    """
    ordered = collections.OrderedDict(sorted(cur_dict.items()))
    serialised = json.dumps(ordered)
    return str(uuid.uuid3(uuid.NAMESPACE_DNS, serialised))
项目:sun-dance    作者:RInvestments    | 项目源码 | 文件源码
def get_uuid(cur_dict):
    """Return a version-3 UUID string derived from the contents of ``cur_dict``.

    The top-level items are sorted before being serialised to JSON, so the
    result does not depend on the dict's insertion order.
    """
    ordered = collections.OrderedDict(sorted(cur_dict.items()))
    serialised = json.dumps(ordered)
    return str(uuid.uuid3(uuid.NAMESPACE_DNS, serialised))
项目:blender_addons_collection    作者:manorius    | 项目源码 | 文件源码
def id_from_name(name):
    """Generate an upper-case UUID string from a name.

    The name is hashed into a version-3 UUID within the DNS namespace, so
    the same name always produces the same id.

    :type name: str
    :rtype: str
    """
    return str(uuid.uuid3(uuid.NAMESPACE_DNS, name)).upper()
项目:gougo    作者:amaozhao    | 项目源码 | 文件源码
def create_uuid(self):
        if not self.version or self.version == 4:
            return uuid.uuid4()
        elif self.version == 1:
            return uuid.uuid1(self.node, self.clock_seq)
        elif self.version == 2:
            raise UUIDVersionError("UUID version 2 is not supported.")
        elif self.version == 3:
            return uuid.uuid3(self.namespace,
        elif self.version == 5:
            return uuid.uuid5(self.namespace,
            raise UUIDVersionError("UUID version %s is not valid." % self.version)
项目:komlogd    作者:komlog-io    | 项目源码 | 文件源码
def test_create_timeuuid_with_uuid4_string_should_fail(self):
    ''' creating a TimeUUID from the hex of a uuid4, uuid3 or uuid5 should fail '''
    def expect_rejected(candidate):
        # Every candidate must be refused with the same ValueError message.
        with self.assertRaises(ValueError) as cm:
            timeuuid.TimeUUID(s=candidate.hex)
        self.assertEqual(str(cm.exception), 'Invalid UUID type')

    for _ in range(1, 100):
        expect_rejected(uuid.uuid4())
    for fn in [uuid.uuid3, uuid.uuid5]:
        for _ in range(1, 100):
            expect_rejected(fn(uuid.NAMESPACE_DNS, str(os.urandom(10))))
项目:mltshp    作者:MLTSHP    | 项目源码 | 文件源码
def generate(authorization_id):
        Generate an access token based on an (unexpired) authorization id.
        auth = authorizationcode.Authorizationcode.get('id=%s', authorization_id)
        consumer_key = uuid.uuid3(uuid.NAMESPACE_DNS, base36encode( + '-' + base36encode(auth.app_id))
        consumer_secret = sha224("%s%s" % (str(uuid.uuid1()), time.time())).hexdigest()

        if auth.expires_at > datetime.utcnow():
            access_token = Accesstoken(user_id=auth.user_id, app_id=auth.app_id, consumer_key=str(consumer_key), consumer_secret=str(consumer_secret))
            return access_token
            return None
项目:udocker    作者:indigo-dc    | 项目源码 | 文件源码
def uuid(self, name):
    """Get an ID

    Returns a UUID-shaped string.  ``name`` falls back to ``self.def_name``
    when empty.  If the ``uuid`` module cannot be used (NameError or
    AttributeError) the string is assembled from ``self._rnd`` pieces of
    8-4-4-4-12 characters instead.
    """
    if not name:
        name = self.def_name
    try:
        return str(uuid.uuid3(uuid.uuid4(), str(name) + str(time.time())))
    except (NameError, AttributeError):
        return "%s-%s-%s-%s-%s" % (
            self._rnd(8), self._rnd(4), self._rnd(4),
            self._rnd(4), self._rnd(12))
项目:udocker    作者:indigo-dc    | 项目源码 | 文件源码
def filename(self, filename):
    """Get a filename

    Returns ``<def_name>-<pid>-<unique id>-<filename>``.  The unique id
    comes from the ``uuid`` module when it is usable, otherwise from
    ``self.uuid(filename)``.
    """
    prefix = self.def_name + "-" + str(os.getpid()) + "-"
    try:
        unique = str(uuid.uuid3(uuid.uuid4(), str(time.time())))
    except (NameError, AttributeError):
        unique = self.uuid(filename)
    return prefix + unique + "-" + str(filename)
项目:monique    作者:monique-dashboards    | 项目源码 | 文件源码
def uuid_for_string(s):
    """Return the version-3 ``uuid.UUID`` of ``s`` in the OID namespace."""
    namespace = uuid.NAMESPACE_OID
    return uuid.uuid3(namespace, s)
项目:jf    作者:rolycg    | 项目源码 | 文件源码
def get_mount_point(block):
    global collection
    global messages
    bus = dbus.SystemBus()
    obj = bus.get_object('org.freedesktop.UDisks2', block)
    iface = dbus.Interface(obj, 'org.freedesktop.DBus.Properties')  # Here we use this 'magic' interface
    dbus_mount_point = iface.Get('org.freedesktop.UDisks2.Filesystem', 'MountPoints')
    mount_point = ''
    while not len(dbus_mount_point):
        dbus_mount_point = iface.Get('org.freedesktop.UDisks2.Filesystem', 'MountPoints')
    dbus_id = iface.Get('org.freedesktop.UDisks2.Block', 'Id')
    dbus_name = iface.Get('org.freedesktop.UDisks2.Block', 'IdLabel')
    dbus_space = iface.Get('org.freedesktop.UDisks2.Block', 'Size')
    for letter in dbus_mount_point[0]:
        mount_point += chr(letter)
    if not dbus_name:
        dbus_name = mount_point[:-1].split(os.sep)
        dbus_name = dbus_name[len(dbus_name) - 1]
    if not dbus_id:
        dbus_id = uuid.uuid3(uuid.uuid4(), dbus_name)
    collection[block] = [str(mount_point[:-1]), str(dbus_id), str(dbus_name), None, None, dbus_space]
        'You have a new device connected (' + dbus_name + ', ' + extra_functions.convert_to_human_readable(
            dbus_space) + '). To have JF track it, execute:' + '\n' + '      jf ' + '-i ' + str(mount_point[:-1]))
    return dbus_id, block, dbus_name
项目:CerebralCortex-2.0-legacy    作者:MD2Korg    | 项目源码 | 文件源码
def attachment_marker(raw_stream_id: uuid, stream_name: str, owner_id: uuid, dd_stream_name, CC: CerebralCortex,
                      config: dict):
    Label sensor data as sensor-on-body, sensor-off-body, or improper-attachment.
    All the labeled data (st, et, label) with its metadata are then stored in a datastore

    # TODO: quality streams could be multiple so find the one computed with CC
    # using stream_id, data-diagnostic-stream-id, and owner id to generate a unique stream ID for battery-marker
    attachment_marker_stream_id = uuid.uuid3(uuid.NAMESPACE_DNS, str(raw_stream_id + dd_stream_name + owner_id))

    stream_days = get_stream_days(raw_stream_id, attachment_marker_stream_id, CC)

    for day in stream_days:
        # load stream data to be diagnosed
        raw_stream = CC.get_datastream(raw_stream_id, day, data_type=DataSet.COMPLETE)

        if len( > 0:
            windowed_data = window(, config['general']['window_size'], True)
            results = process_windows(windowed_data, config)
            merged_windows = merge_consective_windows(results)

            input_streams = [{"owner_id": owner_id, "id": str(raw_stream_id), "name": stream_name}]
            output_stream = {"id": attachment_marker_stream_id, "name": dd_stream_name,
                             "algo_type": config["algo_type"]["attachment_marker"]}
            metadata = get_metadata(dd_stream_name, input_streams, config)
            store(merged_windows, input_streams, output_stream, metadata, CC, config)
项目:CerebralCortex-2.0-legacy    作者:MD2Korg    | 项目源码 | 文件源码
def phone_screen_touch_marker(raw_stream_id: uuid, raw_stream_name: str, owner_id, dd_stream_name, CC: CerebralCortex,
                              config: dict, start_time=None, end_time=None):
    This is not part of core data diagnostic suite.
    It only calculates how many screen touches are there.
    :param raw_stream_id:
    :param CC:
    :param config:

        # using stream_id, data-diagnostic-stream-id, and owner id to generate a unique stream ID for battery-marker
        screen_touch_stream_id = uuid.uuid3(uuid.NAMESPACE_DNS, str(
            raw_stream_id + dd_stream_name + owner_id + "mobile phone screen touch marker"))

        stream_days = get_stream_days(raw_stream_id, screen_touch_stream_id, CC)

        for day in stream_days:
            stream = CC.get_datastream(raw_stream_id, data_type=DataSet.COMPLETE, day=day, start_time=start_time,
            if len( > 0:
                windowed_data = window(, config['general']['window_size'], True)
                results = process_windows(windowed_data)

                merged_windows = merge_consective_windows(results)
                if len(merged_windows) > 0:
                    input_streams = [{"owner_id": owner_id, "id": str(raw_stream_id), "name": raw_stream_name}]
                    output_stream = {"id": screen_touch_stream_id, "name": dd_stream_name,
                                     "algo_type": config["algo_type"]["app_availability_marker"]}
                    metadata = get_metadata(dd_stream_name, input_streams, config)
                    store(merged_windows, input_streams, output_stream, metadata, CC, config)

    except Exception as e:
项目:CerebralCortex-2.0-legacy    作者:MD2Korg    | 项目源码 | 文件源码
def battery_marker(raw_stream_id: uuid, stream_name: str, owner_id, dd_stream_name, CC: CerebralCortex, config: dict,
                   start_time=None, end_time=None):
    This algorithm uses battery percentages to decide whether device was powered-off or battery was low.
    All the labeled data (st, et, label) with its metadata are then stored in a datastore.
    :param raw_stream_id:
    :param CC:
    :param config:

        # using stream_id, data-diagnostic-stream-id, and owner id to generate a unique stream ID for battery-marker
        battery_marker_stream_id = uuid.uuid3(uuid.NAMESPACE_DNS, str(raw_stream_id + dd_stream_name + owner_id))

        stream_days = get_stream_days(raw_stream_id, battery_marker_stream_id, CC)

        for day in stream_days:
            stream = CC.get_datastream(raw_stream_id, data_type=DataSet.COMPLETE, day=day)

            if len( > 0:
                windowed_data = window(, config['general']['window_size'], True)
                results = process_windows(windowed_data, stream_name, config)

                merged_windows = merge_consective_windows(results)
                if len(merged_windows) > 0:
                    input_streams = [{"owner_id": owner_id, "id": str(raw_stream_id), "name": stream_name}]
                    output_stream = {"id": battery_marker_stream_id, "name": dd_stream_name,
                                     "algo_type": config["algo_type"]["battery_marker"]}
                    labelled_windows = mark_windows(battery_marker_stream_id, merged_windows, CC, config)
                    metadata = get_metadata(dd_stream_name, input_streams, config)
                    store(labelled_windows, input_streams, output_stream, metadata, CC, config)
    except Exception as e:
项目:CerebralCortex-2.0-legacy    作者:MD2Korg    | 项目源码 | 文件源码
def sensor_availability(raw_stream_id: uuid, stream_name: str, owner_id: uuid, dd_stream_name,
                        phone_physical_activity, CC: CerebralCortex, config: dict):
    Mark missing data as wireless disconnection if a participate walks away from phone or sensor
    :param raw_stream_id:
    :param stream_name:
    :param owner_id:
    :param dd_stream_name:
    :param phone_physical_activity:
    :param CC:
    :param config:

    # using stream_id, data-diagnostic-stream-id, and owner id to generate a unique stream ID for battery-marker
    wireless_marker_stream_id = uuid.uuid3(uuid.NAMESPACE_DNS, str(raw_stream_id + dd_stream_name + owner_id))

    stream_days = get_stream_days(raw_stream_id, wireless_marker_stream_id, CC)

    for day in stream_days:
        # load stream data to be diagnosed
        raw_stream = CC.get_datastream(raw_stream_id, day, data_type=DataSet.COMPLETE)
        if len( > 0:

            windowed_data = window(, config['general']['window_size'], True)
            results = process_windows(windowed_data, day, CC, phone_physical_activity, config)
            merged_windows = merge_consective_windows(results)

            if len(merged_windows) > 0:
                input_streams = [{"owner_id": owner_id, "id": str(raw_stream_id), "name": stream_name}]
                output_stream = {"id": wireless_marker_stream_id, "name": dd_stream_name,
                                 "algo_type": config["algo_type"]["sensor_unavailable_marker"]}
                metadata = get_metadata(dd_stream_name, input_streams, config)
                store(merged_windows, input_streams, output_stream, metadata, CC, config)
项目:CerebralCortex-2.0-legacy    作者:MD2Korg    | 项目源码 | 文件源码
def analyze_quality(streams, owner_id, led_right_wrist_quality_stream_name, wrist, CC):
    led_stream_quality_id = uuid.uuid3(uuid.NAMESPACE_DNS, str(led_right_wrist_quality_stream_name + owner_id+"LED quality computed on CerebralCortex"))
    if wrist=="right":
        if "LED--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST" in streams:
            led_wrist_stream_id = streams["LED--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST"][
            led_wrist_stream_name = streams["LED--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST"]["name"]
            led_wrist_stream_id = None
        if "LED--org.md2k.motionsense--MOTION_SENSE_HRV--LEFT_WRIST" in streams:
            led_wrist_stream_id = streams["LED--org.md2k.motionsense--MOTION_SENSE_HRV--LEFT_WRIST"][
            led_wrist_stream_name = streams["LED--org.md2k.motionsense--MOTION_SENSE_HRV--LEFT_WRIST"]["name"]
            led_wrist_stream_id = None

    if led_wrist_stream_id:
        stream_end_days = CC.get_stream_start_end_time(led_wrist_stream_id)
        if stream_end_days["start_time"] and stream_end_days["end_time"]:
            days = stream_end_days["end_time"] - stream_end_days["start_time"]
            for day in range(days.days + 1):
                day = (stream_end_days["start_time"]+timedelta(days=day)).strftime('%Y%m%d')
                stream = CC.get_datastream(led_wrist_stream_id, data_type=DataSet.COMPLETE, day=day)
                if len( > 0:
                    windowed_data = window(, 3, False)
                    led_quality_windows = data_quality_led(windowed_data)

                    input_streams = [{"owner_id": str(owner_id), "id": str(led_wrist_stream_id),
                                      "name": led_wrist_stream_name}]
                    output_stream = {"id": str(led_stream_quality_id), "name": led_right_wrist_quality_stream_name, "algo_type": ""}

                    store(led_quality_windows, input_streams, output_stream, CC)
项目:CerebralCortex-2.0-legacy    作者:MD2Korg    | 项目源码 | 文件源码
def migrate(folder_path: str, data_block_size):
    Migrate data from old CerebralCortex structure to new CerebralCortex structure
    :param folder_path:

    configuration_file = os.path.join(os.path.dirname(__file__), '../../cerebralcortex.yml')
    CC = CerebralCortex(configuration_file, master="local[*]", name="Data Migrator API", time_zone="US/Central", load_spark=True)

    if not folder_path:
        raise ValueError("Path to the data directory cannot be empty.")

    for filename in glob.iglob(folder_path + '/**/*.json', recursive=True):
        print(str( + " -- Started processing file " + filename)

        tmp = filename.split("/")
        tmp = tmp[len(tmp) - 1].split("+")
        owner_id = tmp[0]
        stream_id = str(uuid.uuid3(uuid.NAMESPACE_DNS, str(tmp[0] + " " + tmp[1])))

        name = ''
        for i in tmp[3:]:
            name += i + " "

        name = name.strip().replace(".json", "")
        name = tmp[1] + " " + name

        pm_algo_name = tmp[2]

        data_filename = filename.replace(".json", ".csv.bz2")
        old_schema = read_file(filename)
        execution_context = get_execution_context(pm_algo_name, old_schema)
        data_descriptor = get_data_descriptor(old_schema)
        annotations = get_annotations()
        print(str( + " -- Schema building is complete ")
        print(str( + " -- Started unzipping file and adding records in Cassandra ")
        for data_block in bz2file_to_datapoints(data_filename, data_block_size):
            persist_data(execution_context, data_descriptor, annotations, stream_id, name, owner_id, data_block, CC)
        print(str( + " -- Completed processing file " + filename)
项目:videoplusplus    作者:ArthurChiao    | 项目源码 | 文件源码
def generate_token(username):
    """Return a six-character token derived from ``username``.

    The token is the first six characters of the URL-namespace version-3
    UUID of ``str(username)``; the same username always gives the same
    token.
    """
    derived = uuid.uuid3(uuid.NAMESPACE_URL, str(username))
    return str(derived)[:6]
项目:pillar-prototype    作者:swiftgist    | 项目源码 | 文件源码
def cluster_config(self):
    """
    Provide the default configuration for a cluster.

    Writes ``<root_dir>/config/stack/default/<cluster>/cluster.yml`` through
    ``self.writer``.  The ``fsid`` is a version-3 UUID (DNS namespace) of
    ``self.keyring_roles['admin']``, every ``*_method`` entry is set to
    ``"default"``, and the public and cluster networks are copied from the
    instance.  Does nothing when ``self.cluster`` is falsy.
    """
    if not self.cluster:
        return

    cluster_dir = "{}/config/stack/default/{}".format(self.root_dir, self.cluster)
    if not os.path.isdir(cluster_dir):
        create_dirs(cluster_dir, self.root_dir)
    filename = "{}/cluster.yml".format(cluster_dir)

    contents = {}
    contents['fsid'] = str(uuid.uuid3(uuid.NAMESPACE_DNS, self.keyring_roles['admin']))
    # Every component starts out with the "default" method.
    for component in ('admin', 'configuration', 'mds', 'mon', 'osd',
                      'package', 'pool', 'repo', 'rgw', 'update'):
        contents[component + '_method'] = "default"

    contents['public_network'] = self.public_network
    contents['cluster_network'] = self.cluster_network

    self.writer.write(filename, contents)
项目:bluesteel    作者:imvu    | 项目源码 | 文件源码
def get_host_info():
    """ Returns an object with unique information about the host """
    obj = {}
    # uuid.getnode() can return a random number, we need to fix it
    obj['uuid'] = str(uuid.uuid3(uuid.NAMESPACE_DNS, str(uuid.getnode())))
    obj['host_name'] = socket.gethostname()
    obj['operative_system'] = '{0}-{1}'.format(
    return obj
项目:Chorus    作者:DonaldBough    | 项目源码 | 文件源码
def _fabric_server_uuid(host, port):
    """Return a version-3 UUID for the given host and port.

    The name hashed (in the URL namespace) is whatever
    ``_fabric_xmlrpc_uri(host, port)`` returns.
    """
    server_uri = _fabric_xmlrpc_uri(host, port)
    return uuid.uuid3(uuid.NAMESPACE_URL, server_uri)
项目:ehForwarderBot    作者:blueset    | 项目源码 | 文件源码
def recognize(self, path, lang="zh-CN"):
        if isinstance(path, str):
            file = open(path, 'rb')
            return ["ERROR!", "File must by a path string."]
        if lang not in self.lang_list:
            return ["ERROR!", "Invalid language."]

        audio = pydub.AudioSegment.from_file(file)
        audio = audio.set_frame_rate(16000)
        audio.export("%s.wav" % path, format="wav")
        header = {
            "Authorization": "Bearer %s" % self.access_token,
            "Content-Type": "audio/wav; samplerate=16000"
        d = {
            "version": "3.0",
            "requestid": str(uuid.uuid1()),
            "appID": "D4D52672-91D7-4C74-8AD8-42B1D98141A5",
            "format": "json",
            "locale": lang,
            "device.os": "Telegram",
            "scenarios": "ulm",
            "instanceid": uuid.uuid3(uuid.NAMESPACE_DNS, 'com.1a23.eh_telegram_master'),
            "maxnbest": 5
        with open("%s.wav" % path, 'rb') as f:
            r ="", params=d,, headers=header)
        os.remove("%s.wav" % path)
            rjson = r.json()
            return ["ERROR!", r.text]
        if r.status_code == 200:
            return [i['name'] for i in rjson['results']]
            return ["ERROR!", r.text]
项目:pyetje    作者:rorlika    | 项目源码 | 文件源码
def type_coerce(expr, type_):
    """Coerce the given expression into the given type,
    on the Python side only.

    :func:`.type_coerce` is roughly similar to :func:`.cast`, except no
    "CAST" expression is rendered - the given type is only applied towards
    expression typing and against received result values.


        from sqlalchemy.types import TypeDecorator
        import uuid

        class AsGuid(TypeDecorator):
            impl = String

            def process_bind_param(self, value, dialect):
                if value is not None:
                    return str(value)
                    return None

            def process_result_value(self, value, dialect):
                if value is not None:
                    return uuid.UUID(value)
                    return None

            select([type_coerce(mytable.c.ident, AsGuid)]).\\
                        type_coerce(mytable.c.ident, AsGuid) ==
                        uuid.uuid3(uuid.NAMESPACE_URL, 'bar')

    type_ = sqltypes.to_instance(type_)

    if hasattr(expr, '__clause_element__'):
        return type_coerce(expr.__clause_element__(), type_)
    elif isinstance(expr, BindParameter):
        bp = expr._clone()
        bp.type = type_
        return bp
    elif not isinstance(expr, Visitable):
        if expr is None:
            return null()
            return literal(expr, type_=type_)
        return Label(None, expr, type_=type_)