我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用pyaudio.PyAudio()。
def play_file(fname):
    """Play a WAV file on the default output device, blocking until done.

    :param fname: path of the WAV file to play.
    """
    chunk = 1024  # frames read from the file per write to the stream
    wf = wave.open(fname, 'rb')
    try:
        p = pyaudio.PyAudio()
        try:
            # Open an output stream that matches the wave file's format.
            stream = p.open(format=p.get_format_from_width(wf.getsampwidth()),
                            channels=wf.getnchannels(),
                            rate=wf.getframerate(),
                            output=True)
            try:
                data = wf.readframes(chunk)
                # readframes() returns an empty bytes object at end of file.
                # Comparing against '' never matches bytes on Python 3, so
                # test truthiness, which works for both str and bytes.
                while data:
                    # Writing to the stream is what actually plays the sound.
                    stream.write(data)
                    data = wf.readframes(chunk)
            finally:
                stream.close()
        finally:
            p.terminate()
    finally:
        wf.close()
def audio_interfaces():
    """
    Collect the audio devices reported by PyAudio.

    Devices whose name contains ``'hw'`` are left out.

    :return list[AudioInterface]: one AudioInterface per remaining device
    """
    pa = pyaudio.PyAudio()
    found = []
    for index in range(pa.get_device_count()):
        info = pa.get_device_info_by_index(index)
        if 'hw' in info['name']:
            continue
        found.append(AudioInterface(info))
    pa.terminate()
    return found
def audio_int(num_samples=50):
    """Measure the average audio intensity picked up by the microphone.

    Reads ``num_samples`` chunks of ``CHUNK`` frames, computes one intensity
    value per chunk and averages the loudest 20% of them. Useful for
    comparing intensities while talking and while silent.

    :param num_samples: number of chunks to read from the input stream.
    :return: mean of the largest 20% of the intensity values (at least one
        value is always used, so small ``num_samples`` no longer divides
        by zero).
    """
    print("Getting intensity values from mic.")
    p = pyaudio.PyAudio()
    try:
        stream = p.open(format=FORMAT,
                        channels=CHANNELS,
                        rate=RATE,
                        input=True,
                        frames_per_buffer=CHUNK)
        try:
            # NOTE(review): audioop.avg is given a sample width of 4 bytes;
            # confirm this matches FORMAT.
            values = [math.sqrt(abs(audioop.avg(stream.read(CHUNK), 4)))
                      for _ in range(num_samples)]
        finally:
            stream.close()
    finally:
        p.terminate()

    values.sort(reverse=True)
    # int(num_samples * 0.2) is 0 for num_samples < 5; clamp to 1 so the
    # average is always defined.
    top_count = max(1, int(num_samples * 0.2))
    r = sum(values[:top_count]) / top_count
    print(" Finished ")
    print(" Average audio intensity is ", r)
    return r
def play_all():
    """Play the module-level ``orig_au`` samples as 16-bit mono audio.

    Reads the module-level ``orig_au`` (sample array) and ``orig_rate``
    (sample rate) and blocks until the whole buffer has been written.
    """
    pya = pyaudio.PyAudio()
    try:
        stream = pya.open(
            format=pyaudio.paInt16,
            channels=1,
            # Use a plain int: np.int16 cannot represent rates above
            # 32767 Hz (e.g. 44100), which would overflow.
            rate=int(np.round(orig_rate)),
            output=True)
        try:
            # tobytes() replaces the deprecated ndarray.tostring() alias.
            audata = orig_au.astype(np.int16).tobytes()
            stream.write(audata)
        finally:
            stream.close()
    finally:
        pya.terminate()
def __enter__(self):
    """Open the callback-driven input stream and return ``self``."""
    self._audio_interface = pyaudio.PyAudio()
    stream_options = dict(
        format=pyaudio.paInt16,
        # The API currently only supports 1-channel (mono) audio
        # https://goo.gl/z757pE
        channels=1,
        rate=self._rate,
        input=True,
        frames_per_buffer=self._chunk,
        # The stream runs asynchronously and hands data to the buffer
        # callback, so the input device's buffer does not overflow while
        # the calling thread is busy (network requests, etc.).
        stream_callback=self._fill_buffer,
    )
    self._audio_stream = self._audio_interface.open(**stream_options)
    self.closed = False
    return self
def record_to_file(filename, FORMAT=pyaudio.paInt16, CHANNELS=1, RATE=8000,
                   CHUNK=1024, RECORD_SECONDS=1):
    """Record from the default input device and save the result as a WAV file.

    :param filename: path of the WAV file to write.
    :param FORMAT: PyAudio sample format constant.
    :param CHANNELS: number of channels to record.
    :param RATE: sampling rate passed to the stream and the WAV header.
    :param CHUNK: frames read per ``stream.read`` call.
    :param RECORD_SECONDS: recording length in seconds.
    """
    audio = pyaudio.PyAudio()
    try:
        # Query the sample width while the PyAudio instance is still alive
        # instead of after terminate().
        sample_width = audio.get_sample_size(FORMAT)
        stream = audio.open(format=FORMAT, channels=CHANNELS,
                            rate=RATE, input=True,
                            frames_per_buffer=CHUNK)
        try:
            frames = [stream.read(CHUNK)
                      for _ in range(0, int(RATE / CHUNK * RECORD_SECONDS))]
            stream.stop_stream()
        finally:
            stream.close()
    finally:
        audio.terminate()

    waveFile = wave.open(filename, 'wb')
    try:
        waveFile.setnchannels(CHANNELS)
        waveFile.setsampwidth(sample_width)
        waveFile.setframerate(RATE)
        waveFile.writeframes(b''.join(frames))
    finally:
        waveFile.close()
def start(self):
    """Open a 16-bit stereo 44.1 kHz output stream and yield it.

    Generator: the stream is stopped and closed and PyAudio is terminated
    when the generator is resumed or closed after the ``yield``.
    NOTE(review): presumably wrapped by ``contextlib.contextmanager`` via a
    decorator outside this view - confirm.
    """
    # Created outside the try blocks so that cleanup never refers to a name
    # that was not bound because an earlier setup step failed.
    p = pyaudio.PyAudio()
    try:
        self.decoder = Decoder(CHUNK_SIZE*20)
        self.stream = p.open(format=p.get_format_from_width(2),
                             channels=2,
                             rate=44100,
                             output=True)
        try:
            yield self.stream
        finally:
            self.stream.stop_stream()
            self.stream.close()
    finally:
        p.terminate()
def record_iter(total=10, chunk=5):
    """Record audio and yield it in consecutive pieces.

    ``int(total) / int(chunk)`` pieces are produced; each piece holds
    ``int(chunk)`` rounds of ``int(RATE / CHUNK)`` reads of ``CHUNK`` frames.

    :param total: total recording length.
    :param chunk: length of each yielded piece.
    :return: generator of ``(sample_size, CHANNELS, RATE, data)`` tuples,
        where ``data`` is the raw bytes of one piece.
    """
    # Setup happens before the matching try so that cleanup never touches an
    # unbound ``audio`` or ``stream`` when opening fails.
    audio = pyaudio.PyAudio()
    try:
        stream = audio.open(format=FORMAT, channels=CHANNELS,
                            rate=RATE, input=True,
                            frames_per_buffer=CHUNK)
        try:
            print("recording...")
            reads_per_round = int(RATE / CHUNK * 1)
            for _ in range(0, int(int(total) / int(chunk))):
                # Collect the reads and join once instead of growing a
                # bytes object with += (quadratic copying).
                pieces = []
                for _round in range(0, int(chunk)):
                    for _read in range(0, reads_per_round):
                        pieces.append(stream.read(CHUNK))
                yield (audio.get_sample_size(FORMAT), CHANNELS, RATE,
                       b"".join(pieces))
            print("finished recording")
        finally:
            stream.stop_stream()
            stream.close()
    finally:
        audio.terminate()
def speak(self, text, is_phonetic=False):
    """Render ``text`` to ``temp.wav`` via ``self.save_wav`` and play it.

    :param text: text handed to ``self.save_wav``.
    :param is_phonetic: forwarded unchanged to ``self.save_wav``.
    """
    temp = 'temp.wav'
    self.save_wav(text, temp, is_phonetic)

    chunk = 1024  # frames read from the file per write to the stream
    w = wave.open(temp)
    try:
        p = pyaudio.PyAudio()
        try:
            stream = p.open(
                format=p.get_format_from_width(w.getsampwidth()),
                channels=w.getnchannels(),
                rate=w.getframerate(),
                output=True)
            try:
                data = w.readframes(chunk)
                while data:
                    stream.write(data)
                    data = w.readframes(chunk)
            finally:
                stream.close()
        finally:
            p.terminate()
    finally:
        # Close the wave reader so the temporary file handle is released.
        w.close()
def _play_audio(sound, delay): try: time.sleep(delay) wf = wave.open("sounds/"+sound+".wav", 'rb') p = pyaudio.PyAudio() stream = p.open(format=p.get_format_from_width(wf.getsampwidth()), channels=wf.getnchannels(), rate=wf.getframerate(), output=True) data = wf.readframes(TextToSpeech.CHUNK) while data: stream.write(data) data = wf.readframes(TextToSpeech.CHUNK) stream.stop_stream() stream.close() p.terminate() return except: pass
def play_audio_file(fname=DETECT_DONG):
    """Simple callback function to play a wave file.

    By default it plays the file named by ``DETECT_DONG``.

    :param str fname: wave file name
    :return: None
    """
    ding_wav = wave.open(fname, 'rb')
    try:
        ding_data = ding_wav.readframes(ding_wav.getnframes())
        sample_width = ding_wav.getsampwidth()
        channels = ding_wav.getnchannels()
        rate = ding_wav.getframerate()
    finally:
        # Everything needed has been read; release the file handle.
        ding_wav.close()

    audio = pyaudio.PyAudio()
    try:
        stream_out = audio.open(
            format=audio.get_format_from_width(sample_width),
            channels=channels, rate=rate, input=False, output=True)
        try:
            stream_out.start_stream()
            stream_out.write(ding_data)
            # Short pause before stopping; presumably lets the tail of the
            # sound play out - TODO confirm.
            time.sleep(0.2)
            stream_out.stop_stream()
        finally:
            stream_out.close()
    finally:
        audio.terminate()
def _play_with_pyaudio(seg):
    """Play an audio segment through PyAudio, blocking until it finishes.

    :param seg: segment providing ``sample_width``, ``channels`` and
        ``frame_rate``; it is split into pieces with ``make_chunks``.
    """
    import pyaudio

    p = pyaudio.PyAudio()
    try:
        stream = p.open(format=p.get_format_from_width(seg.sample_width),
                        channels=seg.channels,
                        rate=seg.frame_rate,
                        output=True)
        try:
            # Break audio into half-second chunks (to allow keyboard
            # interrupts). The surrounding try/finally makes sure an
            # interrupt still releases the stream and PyAudio.
            for chunk in make_chunks(seg, 500):
                stream.write(chunk._data)
        finally:
            stream.stop_stream()
            stream.close()
    finally:
        p.terminate()
def play_audio_file(fname=DETECT_DING):
    """Play a wave file from start to end; usable as a simple callback.

    The default file is the one named by ``DETECT_DING``.

    :param str fname: wave file name
    :return: None
    """
    wav = wave.open(fname, 'rb')
    payload = wav.readframes(wav.getnframes())

    pa = pyaudio.PyAudio()
    stream_settings = dict(
        format=pa.get_format_from_width(wav.getsampwidth()),
        channels=wav.getnchannels(),
        rate=wav.getframerate(),
        input=False,
        output=True,
    )
    speaker = pa.open(**stream_settings)

    speaker.start_stream()
    speaker.write(payload)
    time.sleep(0.2)
    speaker.stop_stream()
    speaker.close()
    pa.terminate()
def __init__(self, speaker, passive_stt_engine, active_stt_engine):
    """
    Store the collaborators and create the PyAudio instance.

    Arguments:
    speaker -- handles platform-independent audio output
    passive_stt_engine -- performs STT while Jasper is in passive listen
                          mode
    active_stt_engine -- performs STT while Jasper is in active listen mode
    """
    log = logging.getLogger(__name__)
    self._logger = log
    self.speaker = speaker
    self.passive_stt_engine = passive_stt_engine
    self.active_stt_engine = active_stt_engine

    # Warn up front: creating PyAudio tends to print backend noise.
    log.info("Initializing PyAudio. ALSA/Jack error messages "
             "that pop up during this process are normal and "
             "can usually be safely ignored.")
    self._audio = pyaudio.PyAudio()
    log.info("Initialization of PyAudio completed.")
def __init__(self):
    """Load the apex_playground config files and open a float32 output stream."""
    self.rospack = RosPack()

    def config_file(filename):
        # Path of a file inside the package's 'config' directory.
        return join(self.rospack.get_path('apex_playground'), 'config', filename)

    with open(config_file('environment.json')) as env_file:
        self.params = json.load(env_file)
    with open(config_file('bounds.json')) as bounds_file:
        self.bounds = json.load(bounds_file)["sensory"]["sound"][0]

    self.p = pyaudio.PyAudio()
    self.fs = 44100  # sampling rate, Hz, must be integer
    self.duration = 1./self.params['rate']

    # for paFloat32 sample values must be in range [-1.0, 1.0]
    output_settings = dict(format=pyaudio.paFloat32,
                           channels=1,
                           rate=self.fs,
                           output=True)
    self.stream = self.p.open(**output_settings)
def open(self):
    """Create the PyAudio instance and store it on ``self.pa``.

    Call this before play(), play_rec() or rec(). Either manage the
    lifetime explicitly:

        snd = PA()
        snd.open()
        try:
            snd.play(x)
        finally:
            snd.close()

    or let the 'with' statement do it:

        with PA() as snd:
            snd.play(x)
    """
    self._logger.debug("creating pyaudio instance")
    instance = pyaudio.PyAudio()
    self.pa = instance
def my_record():
    """Record from the microphone and save the audio to ``01.wav``.

    Reads ``NUM_SAMPLES`` frames per iteration until ``TIME*5`` reads have
    been made, printing a dot per read, then passes the buffers to
    ``save_wave_file``.
    """
    pa = PyAudio()
    try:
        stream = pa.open(format=paInt16, channels=1,
                         rate=framerate, input=True,
                         frames_per_buffer=NUM_SAMPLES)
        try:
            my_buf = []
            count = 0
            print("* start recoding *")
            while count < TIME*5:
                my_buf.append(stream.read(NUM_SAMPLES))
                count += 1
                print('.')
            # Alternative, timestamped name:
            # filename=datetime.now().strftime("%Y-%m-%d_%H_%M_%S")+".wav"
            filename = "01.wav"
            save_wave_file(filename, my_buf)
        finally:
            stream.close()
    finally:
        # The PyAudio instance was previously never terminated.
        pa.terminate()
    print("* "+filename, "is saved! *")
def kayurecord(woodname, duration):
    """Record audio and save it to a WAV file.

    :param woodname: label embedded in the generated file name.
    :param duration: recording length; ``int(RATE / CHUNK * duration)``
        chunks of ``CHUNK`` frames are read.
    :return: name of the file passed to ``kayurecord_save``
        (``<time_now()>_<woodname>.wav``).
    """
    filename = time_now() + "_" + woodname + ".wav"
    container = pyaudio.PyAudio()
    try:
        stream = container.open(format=FORMAT,
                                channels=CHANNELS,
                                rate=RATE,
                                input=True,
                                frames_per_buffer=CHUNK)
        try:
            print("* start recording...")
            frames = [stream.read(CHUNK)
                      for _ in range(0, int(RATE / CHUNK * duration))]
            stream.stop_stream()
        finally:
            stream.close()
        print("* done recording!")
        # Save before terminating: the saver receives the PyAudio instance,
        # so hand it over while it is still alive.
        kayurecord_save(filename, frames, container)
    finally:
        container.terminate()
    return filename
async def play(self):
    """Play audio from source.

    Runs until the source runs out of audio. Declared ``async`` because the
    body uses ``async with`` / ``async for``, which are a SyntaxError inside
    a plain ``def``; callers must await it.

    NOTE(review): ``stream.write`` looks like a blocking call, so the event
    loop is presumably held while each chunk plays - confirm this is
    acceptable.
    """
    p = pyaudio.PyAudio()
    try:
        stream = p.open(format=p.get_format_from_width(self._width),
                        channels=self._channels,
                        rate=self._freq,
                        output=True)
        try:
            async with self._source.listen():
                async for block in self._source:
                    async for chunk in block:
                        stream.write(chunk.audio)
            stream.stop_stream()
        finally:
            stream.close()
    finally:
        p.terminate()
def run(self):
    """Run a callback-driven mono input stream until it stops or input arrives.

    Frames are delivered to ``self._process_frame``. The loop ends when the
    stream is no longer active or ``raw_input()`` returns a non-empty value.
    NOTE(review): ``raw_input`` only exists on Python 2 - confirm the target
    interpreter.
    """
    pya = PyAudio()
    stream_settings = dict(
        format=paInt16,
        channels=1,
        rate=SAMPLE_RATE,
        input=True,
        frames_per_buffer=WINDOW_SIZE,
        stream_callback=self._process_frame,
    )
    self._stream = pya.open(**stream_settings)
    self._stream.start_stream()

    while True:
        if not self._stream.is_active():
            break
        if raw_input():
            break
        time.sleep(0.1)

    self._stream.stop_stream()
    self._stream.close()
    pya.terminate()
def __init__(self, sonic=None, block_size=None, **kwargs):
    """Set up buffers and open an input stream described by ``sonic``.

    :param sonic: object providing ``channels``, ``sample_width``,
        ``sample_frequency`` and ``sample_length``; a fresh ``Sonic()`` is
        used when it is missing or falsy.
    :param block_size: frames per buffer; falls back to
        ``sonic.sample_length`` when missing or falsy.
    :param kwargs: extra keyword arguments forwarded to ``open``.
    """
    sonic = sonic or Sonic()
    self.sonic = sonic
    self.channels = sonic.channels
    self.sample_width = sonic.sample_width  # presumably in bytes - TODO confirm
    self.sample_frequency = sonic.sample_frequency
    # Frames per buffer for the stream.
    self.block_size = block_size if block_size else sonic.sample_length
    self.wave_buffer = []
    self.record_cache = queue.Queue()

    self.player = pyaudio.PyAudio()
    sample_format = self.player.get_format_from_width(self.sample_width)
    self.stream = self.player.open(
        format=sample_format,
        channels=self.channels,
        rate=self.sample_frequency,
        frames_per_buffer=self.block_size,
        input=True,
        **kwargs
    )
def play(file):
    """Play a WAV file on the default output device, blocking until done.

    :param file: path (or file object) accepted by ``wave.open``.
    """
    chunk = 1024  # frames read from the file per write to the stream
    f = wave.open(file, "rb")
    try:
        p = pyaudio.PyAudio()
        try:
            stream = p.open(format=p.get_format_from_width(f.getsampwidth()),
                            channels=f.getnchannels(),
                            rate=f.getframerate(),
                            output=True)
            try:
                data = f.readframes(chunk)
                while data:
                    stream.write(data)
                    data = f.readframes(chunk)
                stream.stop_stream()
            finally:
                stream.close()
        finally:
            # terminate() was commented out, leaving a PortAudio instance
            # behind on every call.
            p.terminate()
    finally:
        f.close()
def play(file):
    """Stream a WAV file to the default output device in 1024-frame pieces."""
    frames_per_read = 1024
    wav = wave.open(file, "rb")
    pa = pyaudio.PyAudio()
    out = pa.open(format=pa.get_format_from_width(wav.getsampwidth()),
                  channels=wav.getnchannels(),
                  rate=wav.getframerate(),
                  output=True)

    # Keep reading until readframes() hands back an empty buffer.
    for frames in iter(lambda: wav.readframes(frames_per_read), b''):
        out.write(frames)

    out.stop_stream()
    out.close()
    pa.terminate()
def __init__(self, filename):
    """
    Set up the recorder: input stream, output WAV file and writer thread.

    :param filename: Filename to record into
    """
    self._config = HQCConfig.get_instance('conn.conf')
    settings = self._config.get_section('HQRecordingSettings')

    self._p = pyaudio.PyAudio()
    width = int(settings['width'])
    channels = int(settings['channels'])
    rate = int(settings['rate'])

    # Callback-driven input stream; frames are delivered to self._callback.
    self._stream = self._p.open(format=self._p.get_format_from_width(width),
                                channels=channels,
                                rate=rate,
                                input=True,
                                stream_callback=self._callback)

    # WAV header mirrors the stream settings.
    self._output = wave.open(filename, 'wb')
    self._output.setnchannels(channels)
    self._output.setsampwidth(width)
    self._output.setframerate(rate)

    # Daemon thread running self._write_queue_to_file.
    writer = Thread(target=self._write_queue_to_file)
    writer.daemon = True
    self._audio_writer = writer
    self._audio_writer.start()
def valid_input_devices(self):
    """
    See which devices can be opened for microphone input.

    Every index below ``self.p.get_device_count()`` is checked with
    ``self.valid_test``; a summary line is printed.

    :return: list of device indices for which ``self.valid_test`` is truthy.
    """
    mics = [device
            for device in range(self.p.get_device_count())
            if self.valid_test(device)]
    if mics:
        print("found %d microphone devices: %s"%(len(mics),mics))
    else:
        print("no microphone devices found!")
    return mics

### SETUP AND SHUTDOWN
def record(self, chunksize, *args):
    """Endlessly yield frames of ``chunksize`` read from an input stream.

    The stream comes from ``self.open_stream(*args, chunksize=chunksize,
    output=False)``. A read that raises IOError is logged as a warning and
    skipped; the loop then carries on with the next read.
    """
    with self.open_stream(*args, chunksize=chunksize,
                          output=False) as stream:
        while True:
            try:
                frame = stream.read(chunksize)
            except IOError as e:
                if type(e.errno) is int:
                    errno, strerror = e.errno, e.strerror
                else:
                    # Older PyAudio versions passed the errno/strerror
                    # arguments in swapped order (fixed in upstream commit
                    # 1783aaf9bcc6f8bffc478cb5120ccb6f5091b3fb), so swap
                    # them back here.
                    errno, strerror = e.strerror, e.errno
                self._logger.warning("IO error while reading from device" +
                                     " '%s': '%s' (Errno: %d)", self.slug,
                                     strerror, errno)
                continue
            yield frame
def pya():
    """Return the shared PyAudio instance, creating it on first use.

    While the instance is created, file descriptor 2 (stderr) is pointed at
    the null device to hide Jack and ALSA error messages on Linux; it is
    restored afterwards even if creation fails.
    """
    global global_pya
    if global_pya is None:
        # Imported lazily: only needed when the instance must be created.
        import pyaudio

        nullfd = os.open(os.devnull, os.O_WRONLY)
        oerr = os.dup(2)
        try:
            os.dup2(nullfd, 2)
            global_pya = pyaudio.PyAudio()
        finally:
            # Always put the real stderr back and release both descriptors.
            os.dup2(oerr, 2)
            os.close(oerr)
            os.close(nullfd)
    return global_pya

# find the lowest supported input rate >= rate.
# needed on Linux but not the Mac (which converts as needed).
def __init__(self, wave_prefix, total_seconds=-1, partial_seconds=None, incremental_mode = True, stop_callback=None, save_callback=None):
    """Store the recording settings and create the PyAudio instance.

    :param wave_prefix: stored as ``self.wave_prefix``.
    :param total_seconds: stored as ``self.record_seconds``.
    :param partial_seconds: when set and positive, turns on
        ``self.multiple_waves_mode``.
    :param incremental_mode: stored as ``self.incremental_mode``.
    :param stop_callback: stored as ``self.stop_callback``.
    :param save_callback: stored as ``self.save_callback``.
    """
    # Fixed stream settings.
    self.chunk = 1600
    self.format = pyaudio.paInt16
    self.channels = 1
    self.rate = 16000

    self.partial_seconds = partial_seconds
    self.record_seconds = total_seconds
    self.wave_prefix = wave_prefix
    self.data = None
    self.stream = None
    self.p = pyaudio.PyAudio()

    # Identity test for None (instead of != None), and the flag is simply
    # the value of the condition.
    self.multiple_waves_mode = (partial_seconds is not None
                                and partial_seconds > 0)
    self.incremental_mode = incremental_mode
    self._stop_signal = True
    self.stop_callback = stop_callback
    self.save_callback = save_callback
def Destroy(sound_path=None):
    """Play the "Destroy" sound effect, blocking until playback finishes.

    :param sound_path: optional path of the WAV file to play. When omitted,
        the original hard-coded location is used, so ``Destroy()`` behaves
        as before.
    """
    if sound_path is None:
        sound_path = ("C:\\Users\\braro\\Documents\\UTN\\I Cuatri\\Principios de Programación\\PycharmProjects\\CO-Proyect"
                      "\\BattleshipsSounds\\Sounds\\Destroy.wav")
    Frames = 1024  # frames read from the file per write to the stream
    Sound = wave.open(sound_path, 'rb')
    try:
        Repro = pyaudio.PyAudio()
        try:
            stream = Repro.open(
                format=Repro.get_format_from_width(Sound.getsampwidth()),
                channels=Sound.getnchannels(),
                rate=Sound.getframerate(),
                output=True)
            try:
                data = Sound.readframes(Frames)
                while data:
                    stream.write(data)
                    data = Sound.readframes(Frames)
            finally:
                stream.close()
        finally:
            Repro.terminate()
    finally:
        Sound.close()
def Lanz(sound_path=None):
    """Play the "Lanzamiento" sound effect, blocking until playback finishes.

    :param sound_path: optional path of the WAV file to play. When omitted,
        the original hard-coded location is used, so ``Lanz()`` behaves as
        before.
    """
    if sound_path is None:
        sound_path = ("C:\\Users\\braro\\Documents\\UTN\\I Cuatri\\Principios de Programación\\PycharmProjects\\CO-Proyect"
                      "\\BattleshipsSounds\\Sounds\\Lanzamiento.wav")
    Frames = 1024  # frames read from the file per write to the stream
    Sound = wave.open(sound_path, 'rb')
    try:
        Repro = pyaudio.PyAudio()
        try:
            stream = Repro.open(
                format=Repro.get_format_from_width(Sound.getsampwidth()),
                channels=Sound.getnchannels(),
                rate=Sound.getframerate(),
                output=True)
            try:
                data = Sound.readframes(Frames)
                while data:
                    stream.write(data)
                    data = Sound.readframes(Frames)
            finally:
                stream.close()
        finally:
            Repro.terminate()
    finally:
        Sound.close()
def Water(sound_path=None):
    """Play the "Water" sound effect, blocking until playback finishes.

    :param sound_path: optional path of the WAV file to play. When omitted,
        the original hard-coded location is used, so ``Water()`` behaves as
        before.
    """
    if sound_path is None:
        sound_path = ("C:\\Users\\braro\\Documents\\UTN\\I Cuatri\\Principios de Programación\\PycharmProjects\\CO-Proyect"
                      "\\BattleshipsSounds\\Sounds\\Water.wav")
    Frames = 1024  # frames read from the file per write to the stream
    Sound = wave.open(sound_path, 'rb')
    try:
        Repro = pyaudio.PyAudio()
        try:
            stream = Repro.open(
                format=Repro.get_format_from_width(Sound.getsampwidth()),
                channels=Sound.getnchannels(),
                rate=Sound.getframerate(),
                output=True)
            try:
                data = Sound.readframes(Frames)
                while data:
                    stream.write(data)
                    data = Sound.readframes(Frames)
            finally:
                stream.close()
        finally:
            Repro.terminate()
    finally:
        Sound.close()
def Looser(sound_path=None):
    """Play the "Looser" sound effect, blocking until playback finishes.

    :param sound_path: optional path of the WAV file to play. When omitted,
        the original hard-coded location is used, so ``Looser()`` behaves as
        before.
    """
    if sound_path is None:
        sound_path = ("C:\\Users\\braro\\Documents\\UTN\\I Cuatri\\Principios de Programación\\PycharmProjects\\CO-Proyect"
                      "\\BattleshipsSounds\\Sounds\\Looser.wav")
    Frames = 1024  # frames read from the file per write to the stream
    Sound = wave.open(sound_path, 'rb')
    try:
        Repro = pyaudio.PyAudio()
        try:
            stream = Repro.open(
                format=Repro.get_format_from_width(Sound.getsampwidth()),
                channels=Sound.getnchannels(),
                rate=Sound.getframerate(),
                output=True)
            try:
                data = Sound.readframes(Frames)
                while data:
                    stream.write(data)
                    data = Sound.readframes(Frames)
            finally:
                stream.close()
        finally:
            Repro.terminate()
    finally:
        Sound.close()
def Boton(sound_path=None):
    """Play the button sound effect, blocking until playback finishes.

    :param sound_path: optional path of the WAV file to play. When omitted,
        the original hard-coded location is used, so ``Boton()`` behaves as
        before.
    """
    if sound_path is None:
        sound_path = ("C:\\Users\\braro\\Documents\\UTN\\I Cuatri\\Principios de Programación\\PycharmProjects\\CO-Proyect"
                      "\\BattleshipsSounds\\Sounds\\Botton.wav")
    Frames = 1024  # frames read from the file per write to the stream
    Sound = wave.open(sound_path, 'rb')
    try:
        Repro = pyaudio.PyAudio()
        try:
            stream = Repro.open(
                format=Repro.get_format_from_width(Sound.getsampwidth()),
                channels=Sound.getnchannels(),
                rate=Sound.getframerate(),
                output=True)
            try:
                data = Sound.readframes(Frames)
                while data:
                    stream.write(data)
                    data = Sound.readframes(Frames)
            finally:
                stream.close()
        finally:
            Repro.terminate()
    finally:
        Sound.close()
def __init__(self, device_index = None, sample_rate = None, chunk_size = None):
    """Validate and store the microphone settings; no stream is opened here.

    :param device_index: index of the input device, or None.
    :param sample_rate: sampling rate in Hertz; defaults to 16000.
    :param chunk_size: frames stored in each buffer; defaults to 1024.
    :raises AssertionError: if an argument has the wrong type or range.
    """
    assert device_index is None or isinstance(device_index, int), "Device index must be None or an integer"
    if device_index is not None:
        # Ensure the device index is in range.
        audio = pyaudio.PyAudio()
        try:
            count = audio.get_device_count()  # obtain device count
        finally:
            audio.terminate()
        assert 0 <= device_index < count, "Device index out of range"
    assert sample_rate is None or isinstance(sample_rate, int) and sample_rate > 0, "Sample rate must be None or a positive integer"
    assert chunk_size is None or isinstance(chunk_size, int) and chunk_size > 0, "Chunk size must be None or a positive integer"

    # A missing sample rate previously overwrote chunk_size with 16000 and
    # left SAMPLE_RATE as None; give each setting its own default.
    if sample_rate is None:
        sample_rate = 16000
    if chunk_size is None:
        chunk_size = 1024

    self.device_index = device_index
    self.format = pyaudio.paInt16  # 16-bit int sampling
    self.SAMPLE_WIDTH = pyaudio.get_sample_size(self.format)  # size of each sample
    self.SAMPLE_RATE = sample_rate  # sampling rate in Hertz
    self.CHUNK = chunk_size  # number of frames stored in each buffer

    self.audio = None
    self.stream = None
def __init__(self):
    """Create the two queues and open a callback-driven input stream.

    Frames are delivered to ``self.callback``; ``self.shutdown`` is
    registered to run at interpreter exit.
    """
    self.miso = Queue()
    self.mosi = Queue()
    # print() with one argument behaves the same on Python 2 and 3; the
    # former print statements were a SyntaxError on Python 3.
    print("starting worker thread")

    CHUNK = 1024
    FORMAT = pyaudio.paInt16  # alternatives tried: paUint16, paInt8
    CHANNELS = 1
    RATE = 44100  # sample rate

    self.paw = pyaudio.PyAudio()
    self.stream = self.paw.open(format=FORMAT,
                                channels=CHANNELS,
                                # input_device_index = 4,  # rocketfish
                                rate=RATE,
                                input=True,
                                stream_callback=self.callback,
                                frames_per_buffer=CHUNK)  # buffer
    # Disabled FFT worker process:
    # self.fort_proc = Process(target = fft_worker,
    #                          args=(self.mosi, self.miso))
    # self.fort_proc.start()
    atexit.register(self.shutdown)
    print("allegedly started worker")
def __init__(self, name, chunks, chunksize=1024, channels=2, rate="auto"):
    """Set up the ring buffer and look up the default output device.

    :param name: stored as ``self.name``.
    :param chunks: number of elements held by the ring buffer.
    :param chunksize: frames per ring-buffer element.
    :param channels: samples per frame.
    :param rate: sample rate, or ``"auto"`` to use the default output
        device's ``defaultSampleRate``.
    """
    self.elements_per_ringbuffer = chunks
    self.frames_per_element = chunksize
    self.samples_per_frame = channels
    self.bytes_per_sample = 2  # int16
    self.name = name
    self.channels = channels

    # NOTE(review): elements are filled with the ASCII character b"0"
    # (0x30), not zero bytes - confirm this is intended.
    filler = b"0"*self.bytes_per_sample*channels*chunksize
    self._ringbuffer = collections.deque(
        [filler]*chunks,
        maxlen=self.elements_per_ringbuffer)

    self.p = pyaudio.PyAudio()  # pylint: disable=invalid-name
    default_output = self.p.get_default_output_device_info()
    self.deviceindex = default_output['index']
    self.rate = default_output['defaultSampleRate'] if rate == "auto" else rate
    self.has_new_audio = False
def _record():
    """Read chunks into the global ``frames`` list until the stream is stopped.

    ``stop_record`` is installed as the SIGINT handler; the loop ends once
    ``stream.is_stopped()`` is true, after which the stream is closed and
    PyAudio is terminated.
    """
    global p
    global stream
    global frames

    p = pyaudio.PyAudio()
    stream = p.open(format=FORMAT,
                    channels=CHANNELS,
                    rate=RATE,
                    input=True,
                    frames_per_buffer=CHUNK)
    print("????...")
    frames = []
    signal.signal(signal.SIGINT, stop_record)

    while not stream.is_stopped():
        frames.append(stream.read(CHUNK))

    stream.close()
    p.terminate()
def __init__(self,device=None,rate=None,chunk=4096,maxMemorySec=5):
    """
    Store the settings and create the PyAudio instance.

    Per the original notes: recording won't start until stream_start()
    is called.
    - if device is none, will use first valid input (not system default)
    - if a rate isn't specified, the lowest usable rate will be used.
    """
    # configuration
    self.chunk, self.maxMemorySec = chunk, maxMemorySec
    # chunk doesn't have to be a power of 2;
    # maxMemorySec: delete if more than this around
    self.device, self.rate = device, rate

    # internal variables
    self.chunksRecorded = 0
    self.p = pyaudio.PyAudio()  # keep this forever
    self.t = False  # later will become threads

### SOUND CARD TESTING
def play_audio_file(fname=None):
    """Simple callback function to play a wave file.

    There is no usable default sound: ``fname`` must be given.

    :param str fname: wave file name
    :return: None
    :raises ValueError: if ``fname`` is None.
    """
    if fname is None:
        # wave.open(None) cannot work; fail with a clear message instead.
        raise ValueError("fname must be the name of a wave file")

    ding_wav = wave.open(fname, 'rb')
    try:
        ding_data = ding_wav.readframes(ding_wav.getnframes())
        sample_width = ding_wav.getsampwidth()
        channels = ding_wav.getnchannels()
        rate = ding_wav.getframerate()
    finally:
        # Everything needed has been read; release the file handle.
        ding_wav.close()

    audio = pyaudio.PyAudio()
    try:
        stream_out = audio.open(
            format=audio.get_format_from_width(sample_width),
            channels=channels, rate=rate, input=False, output=True)
        try:
            stream_out.start_stream()
            stream_out.write(ding_data)
            time.sleep(0.2)
            stream_out.stop_stream()
        finally:
            stream_out.close()
    finally:
        audio.terminate()
def play_audio_file_and_record(fname=DETECT_DING):
    """Play a wave file, then hand over to ``recognize_speech``.

    The default file is the one named by ``DETECT_DING``.

    :param str fname: wave file name
    :return: None
    """
    print("start of ding")
    wav = wave.open(fname, 'rb')
    payload = wav.readframes(wav.getnframes())

    pa = pyaudio.PyAudio()
    stream_settings = dict(
        format=pa.get_format_from_width(wav.getsampwidth()),
        channels=wav.getnchannels(),
        rate=wav.getframerate(),
        input=False,
        output=True,
    )
    speaker = pa.open(**stream_settings)

    speaker.start_stream()
    speaker.write(payload)
    time.sleep(0.2)
    speaker.stop_stream()
    speaker.close()
    pa.terminate()

    # Both messages are printed back to back before recognition starts.
    print("end of ding, recording")
    print("end of recording")
    recognize_speech()
def play(self, wave):
    """Play an opened wave reader until it ends or ``self.playing`` is cleared.

    :param wave: wave reader object (note: shadows the ``wave`` module name
        inside this method).
    """
    self.playing = True
    aud = pyaudio.PyAudio()
    sample_format = aud.get_format_from_width(wave.getsampwidth())
    stream = aud.open(format=sample_format,
                      channels=wave.getnchannels(),
                      rate=wave.getframerate(),
                      output=True)

    while self.playing:
        frames = wave.readframes(1024)
        if frames:
            stream.write(frames)
            continue
        # An empty read means the end of the data.
        break

    stream.stop_stream()
    stream.close()
    aud.terminate()
def __init__(self, rt):
    """Read the recognizer settings from ``rt.config`` and open the input stream."""
    super().__init__(rt)
    config = rt.config['frontends']['speech']['recognizers']

    self.listener_config = config
    self.chunk_size = config['chunk_size']
    self.format = pyaudio.paInt16
    self.sample_width = pyaudio.get_sample_size(self.format)
    self.sample_rate = config['sample_rate']
    self.channels = config['channels']

    self.p = pyaudio.PyAudio()
    self.stream = self.p.open(format=self.format, channels=self.channels,
                              rate=self.sample_rate, input=True,
                              frames_per_buffer=self.chunk_size)

    # Attribute name -> configuration key it is copied from.
    copied_settings = (
        ('buffer_sec', 'wake_word_length'),
        ('talking_volume_ratio', 'talking_volume_ratio'),
        ('required_integral', 'required_noise_integral'),
        ('max_di_dt', 'max_di_dt'),
        ('noise_max_out_sec', 'noise_max_out_sec'),
        ('sec_between_ww_checks', 'sec_between_ww_checks'),
        ('recording_timeout', 'recording_timeout'),
    )
    for attribute, key in copied_settings:
        setattr(self, attribute, config[key])

    # Length of one chunk: chunk size divided by the sample rate.
    chunk_sec = self.chunk_size / self.sample_rate
    self.energy_weight = 1.0 - pow(1.0 - config['ambient_adjust_speed'],
                                   chunk_sec)

    # For convenience
    self.chunk_sec = chunk_sec

    self.av_energy = None
    self.integral = 0
    self.noise_level = 0
    self._intercept = None