我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用wave.open()。
def play_file(fname):
    """Play a WAV file through a PyAudio output stream.

    Args:
        fname: Path of the WAV file to play.
    """
    chunk = 1024  # number of frames requested from the file per write

    wf = wave.open(fname, 'rb')
    try:
        p = pyaudio.PyAudio()
        try:
            # Open an output stream that matches the file's format.
            stream = p.open(format=p.get_format_from_width(wf.getsampwidth()),
                            channels=wf.getnchannels(),
                            rate=wf.getframerate(),
                            output=True)
            try:
                data = wf.readframes(chunk)
                # Test truthiness instead of comparing with '': on Python 3
                # readframes() returns bytes, and b'' != '' is always True,
                # which made the original loop spin forever at end of file.
                while data:
                    # Writing to the stream is what actually plays the sound.
                    stream.write(data)
                    data = wf.readframes(chunk)
            finally:
                stream.close()
        finally:
            p.terminate()
    finally:
        wf.close()
def get_config(cls):
    """Read the ``baidu_yuyin`` credentials from ``profile.yml``.

    Returns:
        A dict holding ``api_key`` and/or ``secret_key`` when the profile
        defines them; an empty dict when the profile file does not exist
        or has no ``baidu_yuyin`` section.
    """
    # FIXME: Replace this as soon as we have a config module
    settings = {}
    profile_file = dingdangpath.config('profile.yml')
    if not os.path.exists(profile_file):
        return settings

    with open(profile_file, 'r') as handle:
        profile = yaml.safe_load(handle)

    if 'baidu_yuyin' in profile:
        # Copy over only the options that are actually present.
        for option in ('api_key', 'secret_key'):
            if option in profile['baidu_yuyin']:
                settings[option] = profile['baidu_yuyin'][option]
    return settings
def audio_int(num_samples=50):
    """Measure the average audio intensity picked up by the microphone.

    Reads ``num_samples`` chunks from the input stream, computes one
    intensity value per chunk and averages the loudest 20% of them.

    Args:
        num_samples: Number of chunks to read from the microphone.

    Returns:
        The mean of the top 20% intensity values (at least one value is
        always used, so small ``num_samples`` no longer divides by zero).
    """
    print("Getting intensity values from mic.")
    p = pyaudio.PyAudio()
    try:
        stream = p.open(format=FORMAT,
                        channels=CHANNELS,
                        rate=RATE,
                        input=True,
                        frames_per_buffer=CHUNK)
        try:
            # NOTE(review): audioop.avg is called with a sample width of 4
            # bytes - confirm that this matches FORMAT.
            values = [math.sqrt(abs(audioop.avg(stream.read(CHUNK), 4)))
                      for _ in range(num_samples)]
        finally:
            stream.close()
    finally:
        p.terminate()

    values.sort(reverse=True)
    # int(num_samples * 0.2) is 0 for fewer than 5 samples; clamp to 1 so
    # the average below stays defined.
    top = max(1, int(num_samples * 0.2))
    r = sum(values[:top]) / top
    print(" Finished ")
    print(" Average audio intensity is ", r)
    return r
def read_json_file(file_path):
    '''
    Load the word list stored in a JSON file.

    Args:
        file_path: Path of a JSON file whose top-level object has a
            'words' entry. An example of the expected layout is at
            https://gist.github.com/pandeydivesh15/2012ab10562cc85e796e1f57554aca33

    Returns:
        The value stored under the 'words' key.
    '''
    with open(file_path, 'r') as handle:
        payload = json.load(handle)
    return payload['words']
def __init__(self, codes=None, chunk_size=2**15):
    """Build the list of piano sample paths for the requested key codes.

    Args:
        codes: Iterable of key numbers in the range 1..88. Defaults to
            ``[40]``.
        chunk_size: Accepted for interface compatibility; not used in
            this part of the initialiser.

    Side effects:
        Sets ``self._wfs`` to a list of file paths (not opened files).
        Prints "out of code index" and exits the interpreter when a code
        is outside 1..88.
    """
    # A fresh list per call avoids sharing one mutable default object.
    if codes is None:
        codes = [40]
    cwd = os.path.dirname(os.path.realpath(__file__))
    self._wfs = []
    for code in codes:
        # The original only rejected codes above 88; zero and negative
        # codes produced nonsensical file names.
        if not 0 < code < 89:
            print("out of code index")
            exit()
        c = str(code).zfill(2)  # one-digit codes are zero-padded
        # Raw strings keep the backslashes literal without relying on
        # invalid escape sequences (\p, \P).
        self._wfs.append(cwd + r"\piano88\Piano 0" + c + ".wav")
def save(self, path):
    """Save waveform to file path as a WAV file.

    The extension of ``path`` is replaced by ``self.extension``.

    :param path: Destination path; its file name must not be empty.
    :returns: Path to the saved file.
    :raises ValueError: If ``path`` has no file name component.
    """
    (folder, filename) = os.path.split(path)
    (name, extension) = os.path.splitext(filename)
    if not name:
        # Call syntax is valid on Python 2 and 3; the old
        # ``raise ValueError, "..."`` form is a syntax error on Python 3.
        raise ValueError("name is required")
    path = os.path.join(folder, name + self.extension)
    # The context manager closes the file even if the write fails.
    with open(path, "wb") as f:
        f.write(self.contents)
    return path

#-- Import submodules --#
def save_values_to_wave_file(
    values=None,
    filename=None,
    maximum_amplitude=65535,  # maximum value of unsigned short 16 bit number
    sample_rate=44100,  # Hz
    number_of_channels=1,
    sample_width=2  # bytes per frame
):
    """Normalize ``values`` and write them to a WAV file.

    Args:
        values: Sequence of numbers to write as samples.
        filename: Path of the WAV file to create.
        maximum_amplitude: Full peak-to-peak range; values are scaled to
            ``[-maximum_amplitude / 2, maximum_amplitude / 2]``.
        sample_rate: Frame rate written to the WAV header.
        number_of_channels: Channel count written to the WAV header.
        sample_width: Sample width in bytes written to the WAV header.
            NOTE(review): samples are always packed as 16-bit ("<h"), so
            only a width of 2 matches the data.
    """
    values = datavision.normalize_to_range(
        values,
        minimum=-(maximum_amplitude / 2),
        maximum=maximum_amplitude / 2
    )
    # struct's "h" code needs integers; normalized values may be floats.
    samples = [int(value) for value in values]
    frames = struct.pack("<%dh" % len(samples), *samples)

    file_output = wave.open(filename, "w")
    try:
        file_output.setnchannels(number_of_channels)
        file_output.setsampwidth(sample_width)
        file_output.setframerate(sample_rate)
        # A single writeframes() call writes the data and fixes up the
        # header; the old trailing writeframes("") passed a str, which
        # Python 3 rejects.
        file_output.writeframes(frames)
    finally:
        file_output.close()
def load_wav_file(name):
    """Load a WAV file as a fixed-length list of scaled byte values.

    The raw frame bytes are read as uint8, scaled, and the result is
    trimmed or zero-padded to exactly ``CHUNK * 2`` entries.

    Args:
        name: Path of the WAV file.

    Returns:
        A list with ``CHUNK * 2`` numeric entries.
    """
    wanted = CHUNK * 2
    chunk = []
    f = wave.open(name, "rb")
    try:
        data0 = f.readframes(CHUNK)
        while data0:
            # frombuffer replaces the deprecated binary-mode fromstring.
            data = numpy.frombuffer(data0, dtype='uint8')
            # Scale for better convergence.
            # NOTE(review): the addition runs on a uint8 array and so may
            # wrap modulo 256 - confirm that this is intended.
            data = (data + 128) / 255.
            chunk.extend(data)
            data0 = f.readframes(CHUNK)
    finally:
        f.close()
    # Trim to the fixed length, then pad with zeros if the file was short.
    chunk = chunk[0:wanted]
    chunk.extend(numpy.zeros(wanted - len(chunk)))
    return chunk
def get_config(cls):
    """Read the pocketsphinx ``hmm_dir`` setting from ``profile.yml``.

    Returns:
        ``{'hmm_dir': ...}`` when the profile defines
        ``pocketsphinx.hmm_dir``; otherwise an empty dict.
    """
    # FIXME: Replace this as soon as we have a config module
    settings = {}
    profile_file = dingdangpath.config('profile.yml')
    if not os.path.exists(profile_file):
        return settings

    with open(profile_file, 'r') as handle:
        profile = yaml.safe_load(handle)

    # A missing section or key simply leaves the result empty.
    try:
        hmm_dir = profile['pocketsphinx']['hmm_dir']
    except KeyError:
        return settings
    settings['hmm_dir'] = hmm_dir
    return settings
def get_config(cls):
    """Read the ``iflytek_yuyin`` settings from ``profile.yml``.

    Returns:
        A dict with whichever of ``api_id``, ``api_key`` and ``url`` the
        profile defines; empty when the profile file or the
        ``iflytek_yuyin`` section is missing.
    """
    # FIXME: Replace this as soon as we have a config module
    result = {}
    path = dingdangpath.config('profile.yml')
    if os.path.exists(path):
        with open(path, 'r') as stream:
            profile = yaml.safe_load(stream)
        if 'iflytek_yuyin' in profile:
            section = profile['iflytek_yuyin']
            result.update(
                (key, section[key])
                for key in ('api_id', 'api_key', 'url')
                if key in section
            )
    return result
def get_config(cls):
    """Read the ``ali_yuyin`` credentials from ``profile.yml``.

    Returns:
        A dict with ``ak_id`` and/or ``ak_secret`` when present in the
        profile; empty when the profile file or section is missing.
    """
    # FIXME: Replace this as soon as we have a config module
    found = {}
    location = dingdangpath.config('profile.yml')
    if not os.path.exists(location):
        return found

    with open(location, 'r') as fh:
        profile = yaml.safe_load(fh)

    if 'ali_yuyin' not in profile:
        return found

    if 'ak_id' in profile['ali_yuyin']:
        found['ak_id'] = profile['ali_yuyin']['ak_id']
    if 'ak_secret' in profile['ali_yuyin']:
        found['ak_secret'] = profile['ali_yuyin']['ak_secret']
    return found
def get_config(cls):
    """Read the snowboy hotword settings from ``profile.yml``.

    Returns a dict that may contain ``model``, ``sensitivity`` and
    ``hotword``. The dict is empty when the profile file does not exist.
    """
    # FIXME: Replace this as soon as we have a config module
    config = {}
    # Try to get snowboy config from config
    profile_path = dingdangpath.config('profile.yml')
    if os.path.exists(profile_path):
        with open(profile_path, 'r') as f:
            profile = yaml.safe_load(f)
            if 'snowboy' in profile:
                # Fall back to the bundled model when none is configured.
                if 'model' in profile['snowboy']:
                    config['model'] = \
                        profile['snowboy']['model']
                else:
                    config['model'] = os.path.join(
                        dingdangpath.LIB_PATH, 'snowboy/dingdang.pmdl')
                # Sensitivity defaults to the string "0.5".
                if 'sensitivity' in profile['snowboy']:
                    config['sensitivity'] = \
                        profile['snowboy']['sensitivity']
                else:
                    config['sensitivity'] = "0.5"
                # NOTE(review): the source arrived with its indentation
                # collapsed; this branch is assumed to sit inside the
                # 'snowboy' check - confirm against the original file.
                if 'robot_name' in profile:
                    config['hotword'] = profile['robot_name']
                else:
                    config['hotword'] = 'DINGDANG'
    return config
def cache(func):
    """Decorator that replays cached audio instead of calling ``func``.

    On a cache hit the stored base64 audio is decoded and played. On a
    miss ``func`` is called, then ``output.wav`` is read, base64-encoded
    and stored for seven days. The wrapper always returns ``None``.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        store = CacheHandler()
        key = unique_id(func, *args, **kwargs)
        cached_audio = store.get(key)
        if not cached_audio:
            # Miss: run the wrapped function, then cache the file it is
            # expected to have produced.
            func(*args, **kwargs)
            with open('output.wav', 'rb') as f:
                encoded_audio = base64.b64encode(f.read())
                store.set(key, encoded_audio, 86400*7)
            return
        # Hit: play the cached buffer directly.
        player = AudioHandler()
        player.aplay(base64.b64decode(cached_audio), is_buffer=True)
    return wrapper
def testWavDataToSamples(self):
    """Check that wav_data_to_samples resamples to the expected length
    and yields a plausible signal for both the stereo and mono fixtures.
    """
    # Read the reference frame counts and rates, then close the files so
    # the test does not leak open handles.
    w = wave.open(self.wav_filename, 'rb')
    try:
        frames, rate = w.getnframes(), w.getframerate()
    finally:
        w.close()
    w_mono = wave.open(self.wav_filename_mono, 'rb')
    try:
        frames_mono, rate_mono = w_mono.getnframes(), w_mono.getframerate()
    finally:
        w_mono.close()

    # Check content size.
    y = audio_io.wav_data_to_samples(self.wav_data, sample_rate=16000)
    y_mono = audio_io.wav_data_to_samples(self.wav_data_mono, sample_rate=22050)
    # assertEqual replaces the assertEquals alias removed in Python 3.12.
    self.assertEqual(
        round(16000.0 * frames / rate), y.shape[0])
    self.assertEqual(
        round(22050.0 * frames_mono / rate_mono), y_mono.shape[0])

    # Check a few obvious failure modes.
    self.assertLess(0.01, y.std())
    self.assertLess(0.01, y_mono.std())
    self.assertGreater(-0.1, y.min())
    self.assertGreater(-0.1, y_mono.min())
    self.assertLess(0.1, y.max())
    self.assertLess(0.1, y_mono.max())
def make_audio(tensor, sample_rate, length_frames, num_channels):
    """Convert a numpy representation of audio to an Audio protobuf.

    Args:
        tensor: Array of samples; it is cast to int16 before encoding.
        sample_rate: Sample rate stored in the WAV header and the proto.
        length_frames: Frame count stored in the proto.
        num_channels: Channel count stored in the WAV header and proto.

    Returns:
        A ``Summary.Audio`` holding the WAV-encoded bytes.
    """
    import io

    # WAV data is binary, so the buffer must be a BytesIO; a text
    # StringIO cannot receive bytes on Python 3.
    output = io.BytesIO()
    wav_out = wave.open(output, "w")
    wav_out.setframerate(float(sample_rate))
    wav_out.setsampwidth(2)
    wav_out.setcomptype('NONE', 'not compressed')
    wav_out.setnchannels(num_channels)
    # tobytes() is the supported spelling of the deprecated tostring().
    wav_out.writeframes(tensor.astype("int16").tobytes())
    wav_out.close()
    audio_string = output.getvalue()
    return Summary.Audio(sample_rate=float(sample_rate),
                         num_channels=num_channels,
                         length_frames=length_frames,
                         encoded_audio_string=audio_string,
                         content_type="audio/wav")
def read(self):
    """Return audio file as array of integer.

    Also records ``frame_num``, ``sampling_rate``, ``channels`` and
    ``sample_size`` on the instance.

    Returns:
        audio_data: np.ndarray, shape of (frame_num,). Mono data is
            decoded as int16; stereo data as int32, i.e. one element per
            frame holding both channels.

    Raises:
        ValueError: If the file has a channel count other than 1 or 2.
    """
    # Read wav file
    with wave.open(self.file_path, "r") as wav:
        # Move to head of the audio file
        wav.rewind()

        self.frame_num = wav.getnframes()
        self.sampling_rate = wav.getframerate()
        self.channels = wav.getnchannels()
        # NOTE(review): the dtypes below assume 2-byte samples - confirm.
        self.sample_size = wav.getsampwidth()

        # Read to buffer as binary format
        buf = wav.readframes(self.frame_num)

    if self.channels == 1:
        dtype = "int16"
    elif self.channels == 2:
        dtype = "int32"
    else:
        # Previously this fell through and failed with an
        # UnboundLocalError on the return statement.
        raise ValueError(
            "unsupported channel count: %r" % (self.channels,))
    return np.frombuffer(buf, dtype=dtype)
def record_to_file(filename, FORMAT=pyaudio.paInt16, CHANNELS=1, RATE=8000,
                   CHUNK=1024, RECORD_SECONDS=1):
    """Record from the default input device and save the result as WAV.

    Args:
        filename: Path of the WAV file to write.
        FORMAT: PyAudio sample format.
        CHANNELS: Number of channels to record.
        RATE: Sampling rate in Hz.
        CHUNK: Frames read from the stream per call.
        RECORD_SECONDS: Recording duration in seconds.
    """
    recorder = pyaudio.PyAudio()

    # Capture phase: read whole chunks until the duration is covered.
    mic = recorder.open(format=FORMAT, channels=CHANNELS,
                        rate=RATE, input=True,
                        frames_per_buffer=CHUNK)
    chunk_count = int(RATE / CHUNK * RECORD_SECONDS)
    recorded = [mic.read(CHUNK) for _ in range(0, chunk_count)]

    mic.stop_stream()
    mic.close()
    recorder.terminate()

    # Save phase: dump the collected chunks into one WAV file.
    out = wave.open(filename, 'wb')
    out.setnchannels(CHANNELS)
    out.setsampwidth(recorder.get_sample_size(FORMAT))
    out.setframerate(RATE)
    out.writeframes(b''.join(recorded))
    out.close()
def extract_sound(self, start=0, end=None):
    """Return a new Sound holding the slice of this sound's raw data.

    Args:
        start: Start position, converted with ``self.to_index``.
        end: End position, converted with ``self.to_index``; when falsy
            the slice runs to the end of ``self.raw``.

    Returns:
        A ``Sound`` built from an in-memory WAV of the selected data.

    Raises:
        ValueError: If both ``start`` and ``end`` are falsy.
    """
    if not start and not end:
        raise ValueError(
            "extract_sound() needs a non-zero start or an end position")

    start_pos = self.to_index(start)
    end_pos = self.to_index(end) if end else len(self.raw)

    _buffer = io.BytesIO()
    _output = wave.open(_buffer, "wb")
    _output.setnchannels(self.channels)
    _output.setsampwidth(self.samplewidth)
    _output.setframerate(self.framerate)
    # Slice once; the original copied the same range twice and left one
    # copy unused.
    _output.writeframes(self.raw[start_pos:end_pos])
    _output.close()
    # Rewind so the new Sound reads the WAV from its beginning.
    _buffer.seek(0)
    return Sound(_buffer)
def read_wav(source, start=0, end=None):
    """Read a time range from a WAV source (deprecated).

    Args:
        source: File name or file-like object accepted by ``wave.open``.
        start: Start of the range in seconds.
        end: End of the range in seconds; ``None`` means end of file.

    Returns:
        A dict with the keys ``framerate``, ``channels``, ``samplewidth``,
        ``length`` (seconds), ``state`` (always 0) and ``data`` (raw
        frame bytes).
    """
    warnings.warn(
        "read_wav() is deprecated, use Sound() class instead",
        DeprecationWarning)
    in_wav = wave.open(source, "rb")
    try:
        fr = in_wav.getframerate()
        chan = in_wav.getnchannels()
        sw = in_wav.getsampwidth()
        in_wav.setpos(int(start * fr))
        if end is None:
            # Total duration in seconds. The original computed
            # ``nframes - start / fr``, mixing frames with seconds and
            # making the reported "length" wrong.
            end = in_wav.getnframes() / float(fr)
        data = in_wav.readframes(int((end - start) * fr))
    finally:
        in_wav.close()
    d = {"framerate": fr,
         "channels": chan,
         "samplewidth": sw,
         "length": end - start,
         "state": 0,
         "data": data}
    return d
def speak(self, text, is_phonetic=False):
    """Synthesize ``text`` to ``temp.wav`` and play it back.

    Args:
        text: Text handed to ``self.save_wav``.
        is_phonetic: Passed through to ``self.save_wav``.
    """
    temp = 'temp.wav'
    self.save_wav(text, temp, is_phonetic)

    chunk = 1024  # frames per write to the output stream
    w = wave.open(temp)
    try:
        p = pyaudio.PyAudio()
        try:
            stream = p.open(
                format=p.get_format_from_width(w.getsampwidth()),
                channels=w.getnchannels(),
                rate=w.getframerate(),
                output=True)
            try:
                data = w.readframes(chunk)
                while data:
                    stream.write(data)
                    data = w.readframes(chunk)
            finally:
                stream.close()
        finally:
            p.terminate()
    finally:
        # The original never closed the wave file.
        w.close()
def _play_audio(sound, delay):
    """Play ``sounds/<sound>.wav`` after waiting ``delay`` seconds.

    Playback is best-effort: failures are logged and not raised.

    Args:
        sound: Base name of the WAV file inside the ``sounds`` directory.
        delay: Seconds to sleep before playback starts.
    """
    import logging

    try:
        time.sleep(delay)
        wf = wave.open("sounds/"+sound+".wav", 'rb')
        try:
            p = pyaudio.PyAudio()
            try:
                stream = p.open(format=p.get_format_from_width(wf.getsampwidth()),
                                channels=wf.getnchannels(),
                                rate=wf.getframerate(),
                                output=True)
                try:
                    data = wf.readframes(TextToSpeech.CHUNK)
                    while data:
                        stream.write(data)
                        data = wf.readframes(TextToSpeech.CHUNK)
                    stream.stop_stream()
                finally:
                    stream.close()
            finally:
                p.terminate()
        finally:
            wf.close()
    except Exception:
        # Still best-effort, but no longer silent, and KeyboardInterrupt /
        # SystemExit are no longer swallowed as the bare ``except`` did.
        logging.getLogger(__name__).warning(
            "could not play sound %r", sound, exc_info=True)
def writeScottFile(output_name, header, data):
    """
    Create the scott file 'output_name' from header and data chunks.

    'header' and 'data' are iterables of byte objects; every header item
    is written first, followed by every data item, in order.
    """
    with open(output_name, 'wb') as out:
        out.writelines(header)
        out.writelines(data)
def wavFileType(filename):
    """Classify a file as SCOT WAV, plain WAV, or not a WAV at all.

    Returns 'notwav' when bytes 8..11 are not 'WAVE', 'scottwav' when
    bytes 60..63 are 'scot', 'wav' otherwise, and 'error' (after
    printing a message) when the file cannot be read.
    """
    try:
        with open(filename, 'rb') as fh:
            fh.seek(8)
            if fh.read(4) != b'WAVE':
                return 'notwav'
            fh.seek(60)
            return 'scottwav' if fh.read(4) == b'scot' else 'wav'
    except IOError:
        print("--wavFileType Error--")
        return 'error'
def editScottWav(filename, edit):
    """Overwrite header fields of the scott file 'filename' in place.

    'edit' is an iterable of (field_name, value) pairs. Each value is
    written at the fixed byte offset of its field: strings are encoded
    as UTF-8, other values are written little-endian using as many
    bytes as the value has decimal digits. Prints a message when the
    file cannot be opened.
    """
    field_offsets = {
        "note": 369,
        "title": 72,
        "artist": 335,
        "audio_id": 115,
        "year": 406,
        "end": 405,
        "intro": 403,
        "eom": 152,
        "s_date": 133,
        "e_date": 139,
        "s_hour": 145,
        "e_hour": 146,
    }

    try:
        with open(filename, 'rb+') as scott:
            for field, value in edit:
                scott.seek(field_offsets[field])
                if isinstance(value, str):
                    payload = bytes(value, 'utf-8')
                else:
                    width = len(str(abs(value)))
                    payload = value.to_bytes(width, byteorder='little')
                scott.write(payload)
    except IOError:
        print("---EditScott cannot open {}. ---".format(filename))
def audio(tag, tensor, sample_rate=44100):
    """Build an audio Summary from a one-dimensional tensor.

    Samples are scaled by 32767 and written as 16-bit mono WAV.

    Args:
        tag: Tag attached to the summary value.
        tensor: Input converted with ``makenp``; must be one-dimensional
            after squeezing.
        sample_rate: Sample rate stored in the WAV header and the proto.

    Returns:
        A ``Summary`` with a single audio value.
    """
    import io
    import struct
    import wave

    tensor = makenp(tensor)
    tensor = tensor.squeeze()
    assert(tensor.ndim==1), 'input tensor should be 1 dimensional.'
    tensor_list = [int(32767.0*x) for x in tensor]

    # Pack all samples in one call; growing a bytes object with += in a
    # per-sample loop copies the buffer every time (quadratic).
    tensor_enc = struct.pack('<%dh' % len(tensor_list), *tensor_list)

    fio = io.BytesIO()
    Wave_write = wave.open(fio, 'wb')
    Wave_write.setnchannels(1)
    Wave_write.setsampwidth(2)
    Wave_write.setframerate(sample_rate)
    Wave_write.writeframes(tensor_enc)
    Wave_write.close()
    audio_string = fio.getvalue()
    fio.close()
    audio = Summary.Audio(sample_rate=sample_rate,
                          num_channels=1,
                          length_frames=len(tensor_list),
                          encoded_audio_string=audio_string,
                          content_type='audio/wav')
    return Summary(value=[Summary.Value(tag=tag, audio=audio)])
def test_it(self, test_rounding=False):
    """Write a silent WAV file and verify it reads back unchanged.

    With ``test_rounding`` the frame rate is written as
    ``framerate - 0.1``; the read-back rate is still expected to equal
    ``framerate``.
    """
    rate_to_write = framerate - 0.1 if test_rounding else framerate
    output = b'\0' * nframes * nchannels * sampwidth

    # Write phase.
    self.f = wave.open(TESTFN, 'wb')
    self.f.setnchannels(nchannels)
    self.f.setsampwidth(sampwidth)
    self.f.setframerate(rate_to_write)
    self.f.setnframes(nframes)
    self.f.writeframes(output)
    self.f.close()

    # Read-back phase.
    self.f = wave.open(TESTFN, 'rb')
    self.assertEqual(nchannels, self.f.getnchannels())
    self.assertEqual(sampwidth, self.f.getsampwidth())
    self.assertEqual(framerate, self.f.getframerate())
    self.assertEqual(nframes, self.f.getnframes())
    self.assertEqual(self.f.readframes(nframes), output)
def fetch_sample_speech_fruit(n_samples=None):
    """Download (if needed) and load the sample speech archive.

    Args:
        n_samples: Maximum number of WAV files to load; ``None`` loads
            all of them.

    Returns:
        ``(fs, speech)``: the sample rate reported for the last file
        read (``None`` when no file was loaded) and a list of float32
        arrays scaled by 1 / 2**15.
    """
    url = 'https://dl.dropboxusercontent.com/u/15378192/audio.tar.gz'
    wav_path = "audio.tar.gz"
    if not os.path.exists(wav_path):
        download(url, wav_path)

    # Defined up front so an empty selection returns (None, []) and does
    # not fail with an UnboundLocalError.
    fs = None
    speech = []
    # The context manager closes the archive, which the original leaked.
    with tarfile.open(wav_path) as tf:
        wav_names = [fname for fname in tf.getnames()
                     if ".wav" in fname.split(os.sep)[-1]]
        print("Loading speech files...")
        for wav_name in wav_names[:n_samples]:
            f = tf.extractfile(wav_name)
            fs, d = wavfile.read(f)
            d = d.astype('float32') / (2 ** 15)
            speech.append(d)
    return fs, speech
def play_audio_file(fname=DETECT_DONG):
    """Simple callback function to play a wave file. By default it plays
    the sound stored at DETECT_DONG.

    :param str fname: wave file name
    :return: None
    """
    # Read everything needed from the file, then close it; the original
    # left the wave file open.
    ding_wav = wave.open(fname, 'rb')
    try:
        ding_data = ding_wav.readframes(ding_wav.getnframes())
        sample_width = ding_wav.getsampwidth()
        channels = ding_wav.getnchannels()
        rate = ding_wav.getframerate()
    finally:
        ding_wav.close()

    audio = pyaudio.PyAudio()
    try:
        stream_out = audio.open(
            format=audio.get_format_from_width(sample_width),
            channels=channels,
            rate=rate, input=False, output=True)
        try:
            stream_out.start_stream()
            stream_out.write(ding_data)
            # Short pause before the stream is stopped.
            time.sleep(0.2)
            stream_out.stop_stream()
        finally:
            stream_out.close()
    finally:
        audio.terminate()
def __init__(self, clocks):
    """Set up the stretch sound state and the per-channel WAV outputs.

    Opens one WAV output file per channel, opens the sound device via
    ``self.openSound()`` and initialises the stretch bookkeeping.
    """
    super(PygameStretchTIA_Sound, self).__init__(clocks)

    # Flag to indicate if samples should be stretched in frequency, or
    # more outputs generated.
    self._maintain_pitch = True

    # One mono, 1-byte-per-sample, uncompressed WAV file per channel.
    channel_files = ('pytari_stretch_chan0.wav', 'pytari_stretch_chan1.wav')
    self._wav_output = [wave.open(name, 'w') for name in channel_files]
    for wav_out in self._wav_output:
        wav_out.setparams((1, 1, self.SAMPLERATE, 0, 'NONE', 'not compressed'))

    self._sound_chunk_size = 1024*4
    self.openSound()
    self._test_accumulated_sound = self._sound_chunk_size * 2

    # Hold 'stretch' state for each channel.
    self._stretcher = tiasound.Stretch()
    self._stretched = [[], []]

    self._last_update_time = self.clocks.system_clock
def audio(tag, tensor, sample_rate=44100):
    """Build an audio Summary from a one-dimensional tensor.

    Samples are scaled by 32767 and written as 16-bit mono WAV.

    Args:
        tag: Tag attached to the summary value.
        tensor: Array-like object providing ``squeeze()`` and ``ndim``;
            must be one-dimensional after squeezing.
        sample_rate: Sample rate stored in the WAV header and the proto.

    Returns:
        A ``Summary`` with a single audio value.
    """
    import io
    import wave
    import struct

    tensor = tensor.squeeze()
    assert tensor.ndim==1, 'input tensor should be 1 dimensional.'
    tensor_list = [int(32767.0*x) for x in tensor]

    # join() assembles the frames in linear time; the original grew a
    # bytes object with += once per sample, which is quadratic.
    pack_sample = struct.Struct('<h').pack
    tensor_enc = b''.join(pack_sample(v) for v in tensor_list)

    fio = io.BytesIO()
    Wave_write = wave.open(fio, 'wb')
    Wave_write.setnchannels(1)
    Wave_write.setsampwidth(2)
    Wave_write.setframerate(sample_rate)
    Wave_write.writeframes(tensor_enc)
    Wave_write.close()
    audio_string = fio.getvalue()
    fio.close()
    audio = Summary.Audio(sample_rate=sample_rate,
                          num_channels=1,
                          length_frames=len(tensor_list),
                          encoded_audio_string=audio_string,
                          content_type='audio/wav')
    return Summary(value=[Summary.Value(tag=tag, audio=audio)])
def __init__(self, data=None, *args, **kwargs):
    """Create a segment from WAV data or from pre-parsed metadata.

    When a truthy ``metadata`` keyword is given, ``data`` is stored as
    the raw frame data and every metadata item is set as an attribute.
    Otherwise ``data`` (a string, or an object with ``read()``) is
    parsed as a WAV file to fill in ``channels``, ``sample_width``,
    ``frame_rate``, ``frame_width`` and the raw frames.
    """
    if kwargs.get('metadata', False):
        # internal use only
        self._data = data
        # pop() removes 'metadata' so it is not forwarded to the parent
        # initialiser below.
        for attr, val in kwargs.pop('metadata').items():
            setattr(self, attr, val)
    else:
        # normal construction
        # NOTE(review): basestring and a str-backed StringIO look
        # Python 2 specific - confirm the supported interpreter.
        data = data if isinstance(data, basestring) else data.read()
        raw = wave.open(StringIO(data), 'rb')

        raw.rewind()
        self.channels = raw.getnchannels()
        self.sample_width = raw.getsampwidth()
        self.frame_rate = raw.getframerate()
        self.frame_width = self.channels * self.sample_width

        raw.rewind()
        # Presumably float('inf') is a way of asking for every remaining
        # frame - TODO confirm it is accepted by the wave module in use.
        self._data = raw.readframes(float('inf'))

    super(AudioSegment, self).__init__(*args, **kwargs)
def play_raw(self, raw_data, rate=16000, channels=1, width=2, block=True):
    """Play raw audio data through a callback-driven output stream.

    Args:
        raw_data: Audio data stored on the instance for the callback.
        rate: Sampling rate in Hz.
        channels: Number of channels.
        width: Sample width in bytes.
        block: When true, wait for ``self.event``, sleep two more
            seconds and close the stream before returning.
    """
    self.raw, self.width, self.channels = raw_data, width, channels
    self.event.clear()

    sample_format = self.pa.get_format_from_width(width)
    self.stream = self.pa.open(format=sample_format,
                               channels=channels,
                               rate=rate,
                               output=True,
                               frames_per_buffer=CHUNK_SIZE,
                               stream_callback=self.raw_callback)
    if not block:
        return

    self.event.wait()
    # wait for playing audio data in buffer, an alsa driver bug
    time.sleep(2)
    self.stream.close()
def play_audio_file(fname=DETECT_DING):
    """Simple callback function to play a wave file. By default it plays
    a Ding sound.

    :param str fname: wave file name
    :return: None
    """
    ding_wav = wave.open(fname, 'rb')
    audio = None
    stream_out = None
    try:
        ding_data = ding_wav.readframes(ding_wav.getnframes())
        audio = pyaudio.PyAudio()
        stream_out = audio.open(
            format=audio.get_format_from_width(ding_wav.getsampwidth()),
            channels=ding_wav.getnchannels(),
            rate=ding_wav.getframerate(), input=False, output=True)
        stream_out.start_stream()
        stream_out.write(ding_data)
        # Short pause before the stream is stopped.
        time.sleep(0.2)
        stream_out.stop_stream()
    finally:
        # Release everything that was acquired, even after an error; the
        # original never closed the wave file at all.
        if stream_out is not None:
            stream_out.close()
        if audio is not None:
            audio.terminate()
        ding_wav.close()
def _is_good_wave(self, filename):
    """Tell whether ``filename`` is a WAV file in the format MARF needs.

    The file qualifies when it is mono, 2 bytes per sample, 8000 Hz and
    its compression name is 'not compressed'. A ``wave.Error`` while
    reading is printed and reported as ``False``.
    """
    try:
        w_file = wave.open(filename)
        params = w_file.getparams()
        w_file.close()
    except wave.Error as exc:
        print (exc)
        return False

    nchannels, sampwidth, framerate = params[:3]
    return ((nchannels, sampwidth, framerate) == (1, 2, 8000)
            and params[-1] == 'not compressed')
def real_signal():
    """Fit a 5-state, 3-mixture HMM to the samples of 'helloworld.wav'.

    The samples are standardised (zero mean, unit variance) and passed
    to the model with shape (1, T, 1).
    """
    # Extract raw audio from the WAV file and close it again; the
    # original left the file open.
    spf = wave.open('helloworld.wav', 'r')
    try:
        raw = spf.readframes(-1)
    finally:
        spf.close()

    # The original author's notes: the file is sampled at 16000 Hz with
    # 16 bits per sample (2^16 = 65536 levels) - quantization in time and
    # in amplitude respectively.
    # frombuffer with np.int16 replaces the deprecated fromstring call
    # and the 'Int16' dtype alias that newer NumPy no longer accepts.
    signal = np.frombuffer(raw, dtype=np.int16)
    T = len(signal)
    signal = (signal - signal.mean()) / signal.std()

    hmm = HMM(5, 3)
    hmm.fit(signal.reshape(1, T, 1))
def real_signal():
    """Fit a 5-state, 3-mixture HMM to the samples of 'helloworld.wav'.

    The samples are standardised and passed to ``fit`` with shape
    (1, T, 1), a learning rate of 10e-6 and at most 20 iterations.
    """
    spf = wave.open('helloworld.wav', 'r')
    try:
        # Extract raw audio from the WAV file.
        frames = spf.readframes(-1)
    finally:
        # Previously the file handle was never released.
        spf.close()

    # Per the original author's notes the file uses a 16000 Hz sampling
    # rate (quantization in time) and 16 bits per sample (quantization in
    # amplitude, 2^16 = 65536 levels).
    # np.frombuffer / np.int16 replace the deprecated np.fromstring and
    # the removed 'Int16' alias.
    signal = np.frombuffer(frames, dtype=np.int16)
    T = len(signal)
    signal = (signal - signal.mean()) / signal.std()

    hmm = HMM(5, 3)
    # signal needs to be of shape N x T(n) x D
    hmm.fit(signal.reshape(1, T, 1), learning_rate=10e-6, max_iter=20)
def real_signal():
    """Fit a 10-state HMM to the raw samples of 'helloworld.wav'.

    The samples are passed to ``fit`` unnormalised with shape (1, T).
    """
    spf = wave.open('helloworld.wav', 'r')
    try:
        # Extract raw audio from the WAV file.
        frames = spf.readframes(-1)
    finally:
        spf.close()  # the original never closed the file

    # Per the original author's notes: 16000 Hz sampling rate and 16 bits
    # per sample, i.e. 2^16 = 65536 distinct levels.
    # frombuffer replaces the deprecated fromstring / 'Int16' spelling.
    # The copy keeps the array writable, as the fromstring result was.
    signal = np.frombuffer(frames, dtype=np.int16).copy()
    T = len(signal)

    hmm = HMM(10)
    hmm.fit(signal.reshape(1, T))
def use_cloud(token):
    """POST the audio in 'output.wav' to the Baidu speech server API.

    The response body is handed to ``dump_res``.

    Args:
        token: Access token appended to the request URL.
    """
    fp = wave.open('output.wav', 'r')
    try:
        audio_data = fp.readframes(fp.getnframes())
    finally:
        fp.close()
    # Use the real payload size; ``nframes * 2`` is only right for
    # 16-bit mono files.
    f_len = len(audio_data)

    cuid = "123456"  # my xiaomi phone MAC
    srv_url = 'http://vop.baidu.com/server_api' + '?cuid=' + cuid + '&token=' + token
    # NOTE(review): the header announces rate=8000 regardless of the
    # file's actual frame rate - confirm the recording matches.
    http_header = [
        'Content-Type: audio/pcm; rate=8000',
        'Content-Length: %d' % f_len
    ]
    print(srv_url)

    c = pycurl.Curl()
    try:
        c.setopt(pycurl.URL, str(srv_url))  # curl doesn't support unicode
        c.setopt(c.HTTPHEADER, http_header)  # must be list, not dict
        c.setopt(c.POST, 1)
        c.setopt(c.CONNECTTIMEOUT, 30)
        c.setopt(c.TIMEOUT, 30)
        c.setopt(c.WRITEFUNCTION, dump_res)
        c.setopt(c.POSTFIELDS, audio_data)
        c.setopt(c.POSTFIELDSIZE, f_len)
        c.perform()
    finally:
        c.close()
def use_cloud(token):
    """Send 'output.wav' to the Baidu speech server API via HTTP POST.

    The server's reply is passed to ``dump_res``.

    Args:
        token: Access token appended to the request URL.
    """
    fp = wave.open('output.wav', 'rb')
    try:
        nf = fp.getnframes()
        audio_data = fp.readframes(nf)
    finally:
        fp.close()  # the original leaked this handle
    # The byte count is taken from the data itself; the former
    # ``nf * 2`` assumed 2 bytes per frame.
    f_len = len(audio_data)

    cuid = "xxxxxxxxxx"  # my xiaomi phone MAC
    srv_url = 'http://vop.baidu.com/server_api' + '?cuid=' + cuid + '&token=' + token
    http_header = [
        'Content-Type: audio/pcm; rate=8000',
        'Content-Length: %d' % f_len
    ]
    # Function-call form works on both Python 2 and Python 3.
    print(srv_url)

    c = pycurl.Curl()
    try:
        c.setopt(pycurl.URL, str(srv_url))  # curl doesn't support unicode
        c.setopt(c.HTTPHEADER, http_header)  # must be list, not dict
        c.setopt(c.POST, 1)
        c.setopt(c.CONNECTTIMEOUT, 30)
        c.setopt(c.TIMEOUT, 30)
        c.setopt(c.WRITEFUNCTION, dump_res)
        c.setopt(c.POSTFIELDS, audio_data)
        c.setopt(c.POSTFIELDSIZE, f_len)
        c.perform()  # pycurl.perform() has no return val
    finally:
        c.close()
def use_cloud(token):
    """Upload the frames of 'output.wav' to the Baidu speech server API.

    The response body is delivered to ``dump_res``.

    Args:
        token: Access token appended to the request URL.
    """
    wav = wave.open('output.wav', 'r')
    try:
        audio_data = wav.readframes(wav.getnframes())
    finally:
        wav.close()
    # Measure the payload directly; the original's ``nframes * 2``
    # hard-coded a 2-byte frame size.
    f_len = len(audio_data)

    cuid = "123456"  # my xiaomi phone MAC
    srv_url = 'http://vop.baidu.com/server_api' + '?cuid=' + cuid + '&token=' + token
    http_header = [
        'Content-Type: audio/pcm; rate=8000',
        'Content-Length: %d' % f_len
    ]

    c = pycurl.Curl()
    try:
        c.setopt(pycurl.URL, str(srv_url))  # curl doesn't support unicode
        c.setopt(c.HTTPHEADER, http_header)  # must be list, not dict
        c.setopt(c.POST, 1)
        c.setopt(c.CONNECTTIMEOUT, 30)
        c.setopt(c.TIMEOUT, 30)
        c.setopt(c.WRITEFUNCTION, dump_res)
        c.setopt(c.POSTFIELDS, audio_data)
        c.setopt(c.POSTFIELDSIZE, f_len)
        c.perform()
    finally:
        # Release the curl handle even when the transfer fails.
        c.close()
def split(split_file_path, main_file_path, transcript_path, split_info):
    '''
    Cut one segment out of a WAV file and store its transcript.

    Args:
        split_file_path: File path for the new split file.
        main_file_path: File path of the original .wav file.
        transcript_path: File path where the transcript is written.
        split_info: A tuple of the form (x, (y, z)): x is the transcript
            written to transcript_path, y and z are the start and end
            of the segment in seconds.
    '''
    source = wave.open(main_file_path, 'rb')
    target = wave.open(split_file_path, 'wb')

    # Convert the segment bounds from seconds to frame indices.
    t0, t1 = split_info[1]
    rate = source.getframerate()
    first_frame = int(t0 * rate)
    last_frame = int(t1 * rate)

    source.readframes(first_frame)  # skip everything before the segment
    segment = source.readframes(last_frame - first_frame)

    target.setparams(source.getparams())
    target.writeframes(segment)
    target.close()

    # Store transcript
    with open(transcript_path, 'wb') as transcript_file:
        transcript_file.write(split_info[0])

    # TODO: Get rid of multiple opening and closing of the same main audio file.
    source.close()
def create_csv(data_dir):
    '''
    Generates CSV file (as required by DeepSpeech_RHL.py) in the given dir.

    Every .wav file in the directory is paired, in sorted order, with
    the .txt transcript at the same position; the result is written to
    "data.csv" inside the directory with the columns wav_filename,
    wav_filesize and transcript.

    Args:
        data_dir: Directory where all .wav files and their associated
            timescripts are stored. A trailing path separator is
            optional.
    '''
    # os.path.join handles a missing trailing separator; plain string
    # concatenation turned "dir" + "*.wav" into a pattern that matched
    # siblings of the directory and not its contents.
    audio_file_paths = sorted(glob.glob(os.path.join(data_dir, "*.wav")))
    transcript_file_paths = sorted(glob.glob(os.path.join(data_dir, "*.txt")))

    audio_file_sizes = []
    transcripts = []

    for x, y in zip(audio_file_paths, transcript_file_paths):
        # NOTE(review): transcripts are read as bytes - confirm this is
        # what the CSV consumer expects on Python 3.
        with open(y, "rb") as f:
            transcripts.append(f.read())

        # Get file size.
        metadata = os.stat(x)
        audio_file_sizes.append(metadata.st_size)

    # Create pandas dataframe
    df = pandas.DataFrame(columns=["wav_filename", "wav_filesize", "transcript"])
    df["wav_filename"] = audio_file_paths
    df["wav_filesize"] = audio_file_sizes
    df["transcript"] = transcripts

    df.to_csv(os.path.join(data_dir, "data.csv"), sep=",", index=None)  # Save CSV
def audio(tag, tensor, sample_rate=44100):
    """Encode a one-dimensional tensor as a WAV audio Summary.

    Each sample is multiplied by 32767 and stored as a 16-bit
    little-endian value in a mono WAV stream.

    Args:
        tag: Tag attached to the summary value.
        tensor: Input converted with ``makenp``; must be one-dimensional
            after squeezing.
        sample_rate: Sample rate stored in the WAV header and the proto.

    Returns:
        A ``Summary`` with a single audio value.
    """
    import io
    import wave
    import struct

    tensor = makenp(tensor)
    tensor = tensor.squeeze()
    assert (tensor.ndim == 1), 'input tensor should be 1 dimensional.'
    tensor_list = [int(32767.0 * x) for x in tensor]

    # Encode every sample with one pack() call. The original appended to
    # a bytes object once per sample, re-copying the buffer each time.
    frame_format = '<' + 'h' * len(tensor_list)
    tensor_enc = struct.pack(frame_format, *tensor_list)

    fio = io.BytesIO()
    Wave_write = wave.open(fio, 'wb')
    Wave_write.setnchannels(1)
    Wave_write.setsampwidth(2)
    Wave_write.setframerate(sample_rate)
    Wave_write.writeframes(tensor_enc)
    Wave_write.close()
    audio_string = fio.getvalue()
    fio.close()
    audio = Summary.Audio(sample_rate=sample_rate,
                          num_channels=1,
                          length_frames=len(tensor_list),
                          encoded_audio_string=audio_string,
                          content_type='audio/wav')
    return Summary(value=[Summary.Value(tag=tag, audio=audio)])
def load_sound(file_name):
    """Load a mono 16-bit WAV file.

    Args:
        file_name: Path of the WAV file.

    Returns:
        ``(sound_data, sampling_frequency)``: the raw frame bytes and
        the frame rate in Hz.

    Raises:
        AssertionError: If the file is not mono, not 2 bytes per sample,
            or not sampled at 8000, 16000 or 32000 Hz.
    """
    fp = wave.open(file_name, 'rb')
    try:
        assert fp.getnchannels() == 1, \
            '{0}: sound format is incorrect! Sound must be mono.'.format(file_name)
        assert fp.getsampwidth() == 2, \
            '{0}: sound format is incorrect! ' \
            'Sample width of sound must be 2 bytes.'.format(file_name)
        # The original message for this check was missing its .format()
        # call and printed a literal "{0}" in place of the file name.
        assert fp.getframerate() in (8000, 16000, 32000), \
            '{0}: sound format is incorrect! ' \
            'Sampling frequency must be 8000 Hz, 16000 Hz or 32000 Hz.'.format(file_name)
        sampling_frequency = fp.getframerate()
        sound_data = fp.readframes(fp.getnframes())
    finally:
        fp.close()
    return sound_data, sampling_frequency