The following code examples, extracted from open-source Python projects, illustrate how to use uuid.uuid3().
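uuid.uuid3() builds a version-3 UUID from the MD5 hash of a namespace UUID plus a name, so the result is fully deterministic. A minimal sketch using the standard namespace constants (the expected value below comes from the CPython test suite, reproduced further down on this page):

import uuid

u = uuid.uuid3(uuid.NAMESPACE_DNS, 'python.org')
# Deterministic: the same namespace and name always yield the same UUID.
assert u == uuid.uuid3(uuid.NAMESPACE_DNS, 'python.org')
assert u.version == 3
print(u)  # 6fa459ea-ee8a-3ca4-894e-db77e160355e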
def get_swift_hash():
    if os.path.isfile(SWIFT_HASH_FILE):
        with open(SWIFT_HASH_FILE, 'r') as hashfile:
            swift_hash = hashfile.read().strip()
    elif config('swift-hash'):
        swift_hash = config('swift-hash')
        with open(SWIFT_HASH_FILE, 'w') as hashfile:
            hashfile.write(swift_hash)
    else:
        model_uuid = os.environ.get("JUJU_ENV_UUID",
                                    os.environ.get("JUJU_MODEL_UUID"))
        swift_hash = str(uuid.uuid3(uuid.UUID(model_uuid),
                                    service_name()))
        with open(SWIFT_HASH_FILE, 'w') as hashfile:
            hashfile.write(swift_hash)
    return swift_hash
def cluster_config(self):
    """
    Provide the default configuration for a cluster
    """
    if self.cluster:
        cluster_dir = "{}/config/stack/default/{}".format(self.root_dir, self.cluster)
        if not os.path.isdir(cluster_dir):
            _create_dirs(cluster_dir, self.root_dir)
        filename = "{}/cluster.yml".format(cluster_dir)
        contents = {}
        contents['fsid'] = str(uuid.uuid3(uuid.NAMESPACE_DNS, os.urandom(32)))
        public_networks_str = ", ".join([str(n) for n in self.public_networks])
        cluster_networks_str = ", ".join([str(n) for n in self.cluster_networks])
        contents['public_network'] = public_networks_str
        contents['cluster_network'] = cluster_networks_str
        contents['available_roles'] = self.available_roles
        self.writer.write(filename, contents)
def test_uuid3(self):
    equal = self.assertEqual

    # Test some known version-3 UUIDs.
    for u, v in [(uuid.uuid3(uuid.NAMESPACE_DNS, 'python.org'),
                  '6fa459ea-ee8a-3ca4-894e-db77e160355e'),
                 (uuid.uuid3(uuid.NAMESPACE_URL, 'http://python.org/'),
                  '9fe8e8c4-aaa8-32a9-a55c-4535a88b748d'),
                 (uuid.uuid3(uuid.NAMESPACE_OID, '1.3.6.1'),
                  'dd1a1cef-13d5-368a-ad82-eca71acd4cd1'),
                 (uuid.uuid3(uuid.NAMESPACE_X500, 'c=ca'),
                  '658d3002-db6b-3040-a1d1-8ddd7d189a4d'),
                 ]:
        equal(u.variant, uuid.RFC_4122)
        equal(u.version, 3)
        equal(u, uuid.UUID(v))
        equal(str(u), v)
def __set_identity(self):
    node = None
    if sys.platform == 'win32':
        for getter in [uuid._netbios_getnode, uuid._ipconfig_getnode]:
            node = getter()
            if node:
                break
    else:
        # Linux only, find mac address using ifconfig command.
        # Taken from uuid._ifconfig_getnode
        for args in ('eth0', 'wlan0', 'en0'):  # TODO: other possible network interface names
            node = uuid._find_mac('ifconfig', args, ['hwaddr', 'ether'], lambda i: i + 1)
            if node:
                break

    if node is None:
        raise RuntimeError("No network interface found.")

    self.__mac_address = ':'.join([str('%012x' % node)[x:x + 2] for x in range(0, 12, 2)])
    url = 'xiboside://%s/%s/%s' % (sys.platform, os.name, self.__mac_address)
    self.__keys['hardware'] = uuid.uuid3(uuid.NAMESPACE_URL, url)
def upload_avatar():
    u = g.user
    if 'photo' in request.files:
        form = request.form
        filename = str(uuid3(NAMESPACE_DNS,
                             str(u.id) + u.username + str(time.time()))).replace('-', '')
        try:
            x = int(form['x'])
            y = int(form['y'])
            w = int(form['nw'])
            h = int(form['nh'])
            img = Image.open(request.files['photo'])
            format = img.format
            croped_img = crop_img(img, x, y, w, h)
            filename = save_avatar(croped_img, filename, format)
            url_path = current_app.config['UPLOADED_PHOTOS_URL']
            old_name = u.avatar.split(url_path)[1]
            remove_avatar(old_name)
            u.avatar = url_path + filename
            u.save()
        except Exception as e:
            print(e)
            # Original flash message was mojibake; only "2Mb" survived.
            flash('Upload failed: the image must be no larger than 2Mb', 'error')
    return redirect(url_for('user.setting_view'))
def generateUuid(self, email_id, machine_name):
    """
    return a uuid which uniquely identifies machine name and email id
    """
    uuidstr = None
    if machine_name not in self.d:
        myNamespace = uuid.uuid3(uuid.NAMESPACE_URL, machine_name)
        uuidstr = str(uuid.uuid3(myNamespace, email_id))
        self.d[machine_name] = (machine_name, uuidstr, email_id)
        self.d[uuidstr] = (machine_name, uuidstr, email_id)
    else:
        (machine_name, uuidstr, email_id) = self.d[machine_name]
    return uuidstr
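The snippet above chains two uuid3() calls: it first derives a per-machine namespace from the machine name, then hashes the email id inside that namespace. A small sketch of the invariant this relies on (the host and email values are made up):

import uuid

ns = uuid.uuid3(uuid.NAMESPACE_URL, 'build-host-01')  # derived namespace
token = uuid.uuid3(ns, 'dev@example.com')             # id within that namespace
# uuid3 is deterministic in both arguments, so recomputing always agrees:
assert token == uuid.uuid3(uuid.uuid3(uuid.NAMESPACE_URL, 'build-host-01'),
                           'dev@example.com')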
def _fabric_server_uuid(host, port):
    """Create a UUID using host and port"""
    return uuid.uuid3(uuid.NAMESPACE_URL, _fabric_xmlrpc_uri(host, port))
def randomUUIDField(self):
    """
    Return the unique uuid from uuid1, uuid3, uuid4, or uuid5.
    """
    uuid1 = uuid.uuid1().hex
    uuid3 = uuid.uuid3(
        uuid.NAMESPACE_URL,
        self.randomize(['python', 'django', 'awesome'])
    ).hex
    uuid4 = uuid.uuid4().hex
    uuid5 = uuid.uuid5(
        uuid.NAMESPACE_DNS,
        self.randomize(['python', 'django', 'awesome'])
    ).hex
    return self.randomize([uuid1, uuid3, uuid4, uuid5])
def generate_uuid(uuid_str):
    return str(uuid.uuid3(uuid.NAMESPACE_OID, uuid_str.encode('utf8')))
def generate_uuid(self, metadata):
    """
    Generate a unique identifier given a dictionary of matrix metadata.

    :param metadata: metadata for the matrix
    :type metadata: dict
    :return: unique name for the file
    :rtype: str
    """
    identifier = ''
    for key in sorted(metadata.keys()):
        identifier = '{0}_{1}'.format(identifier, str(metadata[key]))
    name_uuid = str(uuid.uuid3(uuid.NAMESPACE_DNS, identifier))
    return name_uuid
def test_parse_uuid_invalid(self):
    # Invalid uuid4 taken from https://gist.github.com/ShawnMilo/7777304
    uuid_str = '89eb3586-8a82-47a4-c911-758a62601cf7'
    self.assertFalse(_ZipArchive._is_uuid4(uuid_str))

    # Not a UUID.
    uuid_str = 'abc123'
    self.assertFalse(_ZipArchive._is_uuid4(uuid_str))

    # Other UUID versions.
    for uuid_ in (uuid.uuid1(),
                  uuid.uuid3(uuid.NAMESPACE_DNS, 'foo'),
                  uuid.uuid5(uuid.NAMESPACE_DNS, 'bar')):
        uuid_str = str(uuid_)
        self.assertFalse(_ZipArchive._is_uuid4(uuid_str))
def generate_uuid_by_element_id(element_id):
    return str(uuid.uuid3(uuid.NAMESPACE_DNS, element_id))
def generate_uuid(self, depend_mac=True):
    if depend_mac is False:
        self.logger.debug('uuid creating randomly')
        return uuid.uuid4()  # make a random UUID
    else:
        self.logger.debug('uuid creating according to mac address')
        # make a UUID using an MD5 hash of a namespace UUID and a mac address
        return uuid.uuid3(uuid.NAMESPACE_DNS, str(get_mac()))
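get_mac here is presumably uuid.getnode imported under another name, the stdlib helper that returns the hardware address as a 48-bit integer; under that assumption, the MAC-dependent branch boils down to this sketch:

import uuid

# Stable per-machine identifier - with the caveat, noted in the uuid docs,
# that getnode() may fall back to a random 48-bit number when no hardware
# address can be determined.
machine_id = uuid.uuid3(uuid.NAMESPACE_DNS, str(uuid.getnode()))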
def hashed_id():
    try:
        salt = getstatusoutput("git config --get user.name")[1]
    except:
        import uuid
        # str() is needed here: sha() cannot hash a UUID object directly.
        salt = str(uuid.uuid3(uuid.NAMESPACE_DNS, ""))
    return sha(salt).hexdigest()
def save(self):
    buffers = self._buffers
    self._buffers = []
    try:
        filename = str(uuid.uuid3(uuid.NAMESPACE_DNS,
                                  "%s_%s" % (time.time(), randint(0, 100000))))
        filename = os.path.join(self._cache, filename)
        with open(filename, 'w') as fp:
            for event in buffers:
                fp.write(str(event))
        return filename
    except Exception as e:
        return 'Save Failed, cause: %s' % e
def __init__(self, level):
    self.level = level
    self.trace_id = str(uuid.uuid3(uuid.NAMESPACE_DNS,
                                   "%s_%s" % (time.time(), randint(0, 100000))))
def _translate_target(self, c, a, v):
    # Set some things up
    target = self.targets[a]

    def relative_path(p):
        p = str(p)
        if p[0:1] != '/':
            return os.path.relpath(p, c['_output_wrt'])
        else:
            return p

    # recurse for list like things
    if isinstance(v, list) or isinstance(v, SCons.Node.NodeList):
        return [self._translate_target(c, a, v_) for v_ in v]
    elif isinstance(v, dict):
        v = copy.deepcopy(v)
    elif isinstance(v, SCons.Node.FS.Entry) or isinstance(v, SCons.Node.FS.File):
        # Can add more metadata here as needed
        ident = uuid.uuid3(c[self.nest_levels[self.current_nest]['ident_attr']], str(v))
        self.file_idents[str(v)] = ident
        v = {'db:ident': ident,
             'tripl.file:path': relative_path(v),
             # This will be super cool...
             'tripl.file:sources': [{'tripl.file:path': relative_path(p)} for p in v.sources]}

    # This is where the metadata function gets called if it is callable
    metadata = target['metadata'](c, v) if callable(target['metadata']) else (target['metadata'] or {})
    if isinstance(v, dict):
        # Here we merge in the metadata
        v.update(metadata)
    elif metadata:
        metadata['tripl.nestly.target:value'] = v
        v = metadata
    # TODO namespace all keywords
    return v
def from_offline_player(cls, display_name):
    class FakeNamespace():
        bytes = b'OfflinePlayer:'
    base_uuid = uuid.uuid3(FakeNamespace(), display_name)
    return UUID(bytes=base_uuid.bytes)
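The FakeNamespace shim works because uuid3() only ever reads the .bytes attribute of its namespace argument, prepending it to the UTF-8-encoded name before MD5-hashing. A minimal equivalent that computes the same offline-player UUID directly (the helper name is mine):

import hashlib
import uuid

def offline_player_uuid(display_name):
    # md5(b'OfflinePlayer:' + name) with the version bits forced to 3,
    # which is exactly what uuid3 does with the fake namespace above.
    digest = hashlib.md5(b'OfflinePlayer:' + display_name.encode('utf-8')).digest()
    return uuid.UUID(bytes=digest[:16], version=3)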
def gennerate_uuid(rootname, lineno):
    name = rootname + str(lineno)
    return uuid.uuid3(uuid.NAMESPACE_DNS, name).hex
def __init__(self, email, password):
    self.email = email
    self.password = password
    self.session = requests.Session()
    self.sessionToken = None
    self.id = str(uuid.uuid3(uuid.NAMESPACE_DNS, 'rienafairefr.pynYNAB'))
    self.lastrequest_elapsed = None
    self.logger = logging.getLogger('pynYnab')
    self._init_session()
def get_uuid(cur_dict):
    od = collections.OrderedDict(sorted(cur_dict.items()))
    json_data_obj = json.dumps(od)
    digest = uuid.uuid3(uuid.NAMESPACE_DNS, json_data_obj)
    return str(digest)
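Sorting the keys before serializing is what makes the resulting UUID independent of dictionary insertion order; a quick sketch of that invariant:

import collections
import json
import uuid

def stable_uuid(d):
    od = collections.OrderedDict(sorted(d.items()))
    return uuid.uuid3(uuid.NAMESPACE_DNS, json.dumps(od))

# Same content, different insertion order -> same UUID.
assert stable_uuid({'a': 1, 'b': 2}) == stable_uuid({'b': 2, 'a': 1})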
def id_from_name(name):
    """Generate a UUID from a name within the DNS namespace

    :type name: str
    :rtype: str
    """
    return str(uuid.uuid3(uuid.NAMESPACE_DNS, name)).upper()
def create_uuid(self):
    if not self.version or self.version == 4:
        return uuid.uuid4()
    elif self.version == 1:
        return uuid.uuid1(self.node, self.clock_seq)
    elif self.version == 2:
        raise UUIDVersionError("UUID version 2 is not supported.")
    elif self.version == 3:
        return uuid.uuid3(self.namespace, self.name)
    elif self.version == 5:
        return uuid.uuid5(self.namespace, self.name)
    else:
        raise UUIDVersionError("UUID version %s is not valid." % self.version)
def test_create_timeuuid_with_uuid4_string_should_fail(self):
    ''' creating a TimeUUID with a hex uuid4 should fail'''
    for i in range(1, 100):
        u = uuid.uuid4()
        with self.assertRaises(ValueError) as cm:
            t = timeuuid.TimeUUID(s=u.hex)
        self.assertEqual(str(cm.exception), 'Invalid UUID type')
    for fn in [uuid.uuid3, uuid.uuid5]:
        for i in range(1, 100):
            u = fn(uuid.NAMESPACE_DNS, str(os.urandom(10)))
            with self.assertRaises(ValueError) as cm:
                t = timeuuid.TimeUUID(s=u.hex)
            self.assertEqual(str(cm.exception), 'Invalid UUID type')
def generate(authorization_id):
    """
    Generate an access token based on an (unexpired) authorization id.
    """
    auth = authorizationcode.Authorizationcode.get('id=%s', authorization_id)
    consumer_key = uuid.uuid3(uuid.NAMESPACE_DNS,
                              base36encode(auth.id) + '-' + base36encode(auth.app_id))
    consumer_secret = sha224("%s%s" % (str(uuid.uuid1()), time.time())).hexdigest()

    if auth.expires_at > datetime.utcnow():
        access_token = Accesstoken(user_id=auth.user_id,
                                   app_id=auth.app_id,
                                   consumer_key=str(consumer_key),
                                   consumer_secret=str(consumer_secret))
        access_token.save()
        return access_token
    else:
        return None
def uuid(self, name):
    """Get an ID"""
    if not name:
        name = self.def_name
    try:
        return str(uuid.uuid3(uuid.uuid4(), str(name) + str(time.time())))
    except (NameError, AttributeError):
        return "%s-%s-%s-%s-%s" % (self._rnd(8), self._rnd(4), self._rnd(4),
                                   self._rnd(4), self._rnd(12))
def filename(self, filename):
    """Get a filename"""
    prefix = self.def_name + "-" + str(os.getpid()) + "-"
    try:
        return prefix + str(uuid.uuid3(uuid.uuid4(), str(time.time()))) + "-" + str(filename)
    except (NameError, AttributeError):
        return prefix + self.uuid(filename) + "-" + str(filename)
def uuid_for_string(s):
    return uuid.uuid3(uuid.NAMESPACE_OID, s)
def get_mount_point(block):
    global collection
    global messages
    bus = dbus.SystemBus()
    obj = bus.get_object('org.freedesktop.UDisks2', block)
    iface = dbus.Interface(obj, 'org.freedesktop.DBus.Properties')  # Here we use this 'magic' interface
    dbus_mount_point = iface.Get('org.freedesktop.UDisks2.Filesystem', 'MountPoints')
    mount_point = ''
    while not len(dbus_mount_point):
        time.sleep(0.5)
        dbus_mount_point = iface.Get('org.freedesktop.UDisks2.Filesystem', 'MountPoints')
    dbus_id = iface.Get('org.freedesktop.UDisks2.Block', 'Id')
    dbus_name = iface.Get('org.freedesktop.UDisks2.Block', 'IdLabel')
    dbus_space = iface.Get('org.freedesktop.UDisks2.Block', 'Size')
    for letter in dbus_mount_point[0]:
        mount_point += chr(letter)
    if not dbus_name:
        dbus_name = mount_point[:-1].split(os.sep)
        dbus_name = dbus_name[len(dbus_name) - 1]
    if not dbus_id:
        dbus_id = uuid.uuid3(uuid.uuid4(), dbus_name)
    collection[block] = [str(mount_point[:-1]), str(dbus_id), str(dbus_name), None, None, dbus_space]
    messages.append(
        'You have a new device connected (' + dbus_name + ', ' +
        extra_functions.convert_to_human_readable(dbus_space) +
        '). To have JF track it, execute:' + '\n' +
        ' jf ' + '-i ' + str(mount_point[:-1]))
    return dbus_id, block, dbus_name
def attachment_marker(raw_stream_id: uuid, stream_name: str, owner_id: uuid, dd_stream_name,
                      CC: CerebralCortex, config: dict):
    """
    Label sensor data as sensor-on-body, sensor-off-body, or improper-attachment.
    All the labeled data (st, et, label) with its metadata are then stored in a datastore.
    """
    # TODO: quality streams could be multiple so find the one computed with CC
    # using stream_id, data-diagnostic-stream-id, and owner id to generate a unique stream ID for the attachment marker
    attachment_marker_stream_id = uuid.uuid3(uuid.NAMESPACE_DNS, str(raw_stream_id + dd_stream_name + owner_id))

    stream_days = get_stream_days(raw_stream_id, attachment_marker_stream_id, CC)

    for day in stream_days:
        # load stream data to be diagnosed
        raw_stream = CC.get_datastream(raw_stream_id, day, data_type=DataSet.COMPLETE)

        if len(raw_stream.data) > 0:
            windowed_data = window(raw_stream.data, config['general']['window_size'], True)
            results = process_windows(windowed_data, config)
            merged_windows = merge_consective_windows(results)

            input_streams = [{"owner_id": owner_id, "id": str(raw_stream_id), "name": stream_name}]
            output_stream = {"id": attachment_marker_stream_id, "name": dd_stream_name,
                             "algo_type": config["algo_type"]["attachment_marker"]}
            metadata = get_metadata(dd_stream_name, input_streams, config)
            store(merged_windows, input_streams, output_stream, metadata, CC, config)
def phone_screen_touch_marker(raw_stream_id: uuid, raw_stream_name: str, owner_id, dd_stream_name,
                              CC: CerebralCortex, config: dict, start_time=None, end_time=None):
    """
    This is not part of the core data diagnostic suite. It only counts how many screen touches there are.

    :param raw_stream_id:
    :param CC:
    :param config:
    """
    try:
        # using stream_id, data-diagnostic-stream-id, and owner id to generate a unique stream ID for the screen-touch marker
        screen_touch_stream_id = uuid.uuid3(uuid.NAMESPACE_DNS, str(
            raw_stream_id + dd_stream_name + owner_id + "mobile phone screen touch marker"))

        stream_days = get_stream_days(raw_stream_id, screen_touch_stream_id, CC)

        for day in stream_days:
            stream = CC.get_datastream(raw_stream_id, data_type=DataSet.COMPLETE, day=day,
                                       start_time=start_time, end_time=end_time)
            if len(stream.data) > 0:
                windowed_data = window(stream.data, config['general']['window_size'], True)
                results = process_windows(windowed_data)
                merged_windows = merge_consective_windows(results)
                if len(merged_windows) > 0:
                    input_streams = [{"owner_id": owner_id, "id": str(raw_stream_id), "name": raw_stream_name}]
                    output_stream = {"id": screen_touch_stream_id, "name": dd_stream_name,
                                     "algo_type": config["algo_type"]["app_availability_marker"]}
                    metadata = get_metadata(dd_stream_name, input_streams, config)
                    store(merged_windows, input_streams, output_stream, metadata, CC, config)
    except Exception as e:
        print(e)
def battery_marker(raw_stream_id: uuid, stream_name: str, owner_id, dd_stream_name,
                   CC: CerebralCortex, config: dict, start_time=None, end_time=None):
    """
    This algorithm uses battery percentages to decide whether the device was powered off or the battery was low.
    All the labeled data (st, et, label) with its metadata are then stored in a datastore.

    :param raw_stream_id:
    :param CC:
    :param config:
    """
    try:
        # using stream_id, data-diagnostic-stream-id, and owner id to generate a unique stream ID for the battery marker
        battery_marker_stream_id = uuid.uuid3(uuid.NAMESPACE_DNS, str(raw_stream_id + dd_stream_name + owner_id))

        stream_days = get_stream_days(raw_stream_id, battery_marker_stream_id, CC)

        for day in stream_days:
            stream = CC.get_datastream(raw_stream_id, data_type=DataSet.COMPLETE, day=day)
            if len(stream.data) > 0:
                windowed_data = window(stream.data, config['general']['window_size'], True)
                results = process_windows(windowed_data, stream_name, config)
                merged_windows = merge_consective_windows(results)
                if len(merged_windows) > 0:
                    input_streams = [{"owner_id": owner_id, "id": str(raw_stream_id), "name": stream_name}]
                    output_stream = {"id": battery_marker_stream_id, "name": dd_stream_name,
                                     "algo_type": config["algo_type"]["battery_marker"]}
                    labelled_windows = mark_windows(battery_marker_stream_id, merged_windows, CC, config)
                    metadata = get_metadata(dd_stream_name, input_streams, config)
                    store(labelled_windows, input_streams, output_stream, metadata, CC, config)
    except Exception as e:
        print(e)
def sensor_availability(raw_stream_id: uuid, stream_name: str, owner_id: uuid, dd_stream_name,
                        phone_physical_activity, CC: CerebralCortex, config: dict):
    """
    Mark missing data as wireless disconnection if a participant walks away from phone or sensor.

    :param raw_stream_id:
    :param stream_name:
    :param owner_id:
    :param dd_stream_name:
    :param phone_physical_activity:
    :param CC:
    :param config:
    """
    # using stream_id, data-diagnostic-stream-id, and owner id to generate a unique stream ID for the wireless marker
    wireless_marker_stream_id = uuid.uuid3(uuid.NAMESPACE_DNS, str(raw_stream_id + dd_stream_name + owner_id))

    stream_days = get_stream_days(raw_stream_id, wireless_marker_stream_id, CC)

    for day in stream_days:
        # load stream data to be diagnosed
        raw_stream = CC.get_datastream(raw_stream_id, day, data_type=DataSet.COMPLETE)

        if len(raw_stream.data) > 0:
            windowed_data = window(raw_stream.data, config['general']['window_size'], True)
            results = process_windows(windowed_data, day, CC, phone_physical_activity, config)
            merged_windows = merge_consective_windows(results)
            if len(merged_windows) > 0:
                input_streams = [{"owner_id": owner_id, "id": str(raw_stream_id), "name": stream_name}]
                output_stream = {"id": wireless_marker_stream_id, "name": dd_stream_name,
                                 "algo_type": config["algo_type"]["sensor_unavailable_marker"]}
                metadata = get_metadata(dd_stream_name, input_streams, config)
                store(merged_windows, input_streams, output_stream, metadata, CC, config)
def analyze_quality(streams, owner_id, led_right_wrist_quality_stream_name, wrist, CC):
    led_stream_quality_id = uuid.uuid3(uuid.NAMESPACE_DNS, str(
        led_right_wrist_quality_stream_name + owner_id + "LED quality computed on CerebralCortex"))
    if wrist == "right":
        if "LED--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST" in streams:
            led_wrist_stream_id = streams["LED--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST"]["identifier"]
            led_wrist_stream_name = streams["LED--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST"]["name"]
        else:
            led_wrist_stream_id = None
    else:
        if "LED--org.md2k.motionsense--MOTION_SENSE_HRV--LEFT_WRIST" in streams:
            led_wrist_stream_id = streams["LED--org.md2k.motionsense--MOTION_SENSE_HRV--LEFT_WRIST"]["identifier"]
            led_wrist_stream_name = streams["LED--org.md2k.motionsense--MOTION_SENSE_HRV--LEFT_WRIST"]["name"]
        else:
            led_wrist_stream_id = None

    if led_wrist_stream_id:
        stream_end_days = CC.get_stream_start_end_time(led_wrist_stream_id)
        if stream_end_days["start_time"] and stream_end_days["end_time"]:
            days = stream_end_days["end_time"] - stream_end_days["start_time"]
            for day in range(days.days + 1):
                day = (stream_end_days["start_time"] + timedelta(days=day)).strftime('%Y%m%d')
                stream = CC.get_datastream(led_wrist_stream_id, data_type=DataSet.COMPLETE, day=day)
                if len(stream.data) > 0:
                    windowed_data = window(stream.data, 3, False)
                    led_quality_windows = data_quality_led(windowed_data)
                    input_streams = [{"owner_id": str(owner_id), "id": str(led_wrist_stream_id),
                                      "name": led_wrist_stream_name}]
                    output_stream = {"id": str(led_stream_quality_id),
                                     "name": led_right_wrist_quality_stream_name,
                                     "algo_type": ""}
                    store(led_quality_windows, input_streams, output_stream, CC)
def migrate(folder_path: str, data_block_size):
    """
    Migrate data from the old CerebralCortex structure to the new CerebralCortex structure.

    :param folder_path:
    """
    configuration_file = os.path.join(os.path.dirname(__file__), '../../cerebralcortex.yml')
    CC = CerebralCortex(configuration_file, master="local[*]", name="Data Migrator API",
                        time_zone="US/Central", load_spark=True)

    if not folder_path:
        raise ValueError("Path to the data directory cannot be empty.")

    for filename in glob.iglob(folder_path + '/**/*.json', recursive=True):
        print(str(datetime.datetime.now()) + " -- Started processing file " + filename)
        tmp = filename.split("/")
        tmp = tmp[len(tmp) - 1].split("+")
        owner_id = tmp[0]
        stream_id = str(uuid.uuid3(uuid.NAMESPACE_DNS, str(tmp[0] + " " + tmp[1])))
        name = ''
        for i in tmp[3:]:
            name += i + " "
        name = name.strip().replace(".json", "")
        name = tmp[1] + " " + name
        pm_algo_name = tmp[2]
        data_filename = filename.replace(".json", ".csv.bz2")
        old_schema = read_file(filename)
        execution_context = get_execution_context(pm_algo_name, old_schema)
        data_descriptor = get_data_descriptor(old_schema)
        annotations = get_annotations()
        print(str(datetime.datetime.now()) + " -- Schema building is complete ")
        print(str(datetime.datetime.now()) + " -- Started unzipping file and adding records in Cassandra ")
        for data_block in bz2file_to_datapoints(data_filename, data_block_size):
            persist_data(execution_context, data_descriptor, annotations, stream_id, name,
                         owner_id, data_block, CC)
        print(str(datetime.datetime.now()) + " -- Completed processing file " + filename)
def generate_token(username):
    """generate a short token based on given username"""
    username = str(username)
    token = str(uuid.uuid3(uuid.NAMESPACE_URL, username))[:6]
    return token
def cluster_config(self):
    """
    Provide the default configuration for a cluster
    """
    if self.cluster:
        cluster_dir = "{}/config/stack/default/{}".format(self.root_dir, self.cluster)
        if not os.path.isdir(cluster_dir):
            create_dirs(cluster_dir, self.root_dir)
        filename = "{}/cluster.yml".format(cluster_dir)
        contents = {}
        contents['fsid'] = str(uuid.uuid3(uuid.NAMESPACE_DNS, self.keyring_roles['admin']))
        contents['admin_method'] = "default"
        contents['configuration_method'] = "default"
        contents['mds_method'] = "default"
        contents['mon_method'] = "default"
        contents['osd_method'] = "default"
        contents['package_method'] = "default"
        contents['pool_method'] = "default"
        contents['repo_method'] = "default"
        contents['rgw_method'] = "default"
        contents['update_method'] = "default"
        contents['public_network'] = self.public_network
        contents['cluster_network'] = self.cluster_network
        self.writer.write(filename, contents)
def get_host_info():
    """
    Returns an object with unique information about the host
    """
    obj = {}
    # uuid.getnode() can return a random number, we need to fix it
    obj['uuid'] = str(uuid.uuid3(uuid.NAMESPACE_DNS, str(uuid.getnode())))
    obj['host_name'] = socket.gethostname()
    obj['operative_system'] = '{0}-{1}'.format(
        platform.system(),
        platform.release()
    )
    return obj
def recognize(self, path, lang="zh-CN"):
    if isinstance(path, str):
        file = open(path, 'rb')
    else:
        return ["ERROR!", "File must be a path string."]
    if lang not in self.lang_list:
        return ["ERROR!", "Invalid language."]
    audio = pydub.AudioSegment.from_file(file)
    audio = audio.set_frame_rate(16000)
    audio.export("%s.wav" % path, format="wav")
    header = {
        "Authorization": "Bearer %s" % self.access_token,
        "Content-Type": "audio/wav; samplerate=16000"
    }
    d = {
        "version": "3.0",
        "requestid": str(uuid.uuid1()),
        "appID": "D4D52672-91D7-4C74-8AD8-42B1D98141A5",
        "format": "json",
        "locale": lang,
        "device.os": "Telegram",
        "scenarios": "ulm",
        "instanceid": uuid.uuid3(uuid.NAMESPACE_DNS, 'com.1a23.eh_telegram_master'),
        "maxnbest": 5
    }
    with open("%s.wav" % path, 'rb') as f:
        r = requests.post("https://speech.platform.bing.com/recognize",
                          params=d, data=f.read(), headers=header)
    os.remove("%s.wav" % path)
    try:
        rjson = r.json()
    except:
        return ["ERROR!", r.text]
    if r.status_code == 200:
        return [i['name'] for i in rjson['results']]
    else:
        return ["ERROR!", r.text]
def type_coerce(expr, type_):
    """Coerce the given expression into the given type,
    on the Python side only.

    :func:`.type_coerce` is roughly similar to :func:`.cast`, except no
    "CAST" expression is rendered - the given type is only applied towards
    expression typing and against received result values.

    e.g.::

        from sqlalchemy.types import TypeDecorator
        import uuid

        class AsGuid(TypeDecorator):
            impl = String

            def process_bind_param(self, value, dialect):
                if value is not None:
                    return str(value)
                else:
                    return None

            def process_result_value(self, value, dialect):
                if value is not None:
                    return uuid.UUID(value)
                else:
                    return None

        conn.execute(
            select([type_coerce(mytable.c.ident, AsGuid)]).\\
                where(
                    type_coerce(mytable.c.ident, AsGuid) ==
                    uuid.uuid3(uuid.NAMESPACE_URL, 'bar')
                )
        )

    """
    type_ = sqltypes.to_instance(type_)

    if hasattr(expr, '__clause_element__'):
        return type_coerce(expr.__clause_element__(), type_)
    elif isinstance(expr, BindParameter):
        bp = expr._clone()
        bp.type = type_
        return bp
    elif not isinstance(expr, Visitable):
        if expr is None:
            return null()
        else:
            return literal(expr, type_=type_)
    else:
        return Label(None, expr, type_=type_)