我们从Python开源项目中提取了以下37个代码示例，用于说明如何使用audioop.rms()。
def test_issue7673(self):
    """Regression test for issue #7673: every audioop entry point must
    reject each invalid (data, size) combination with audioop.error."""
    state = None
    for data, size in INVALID_DATA:
        size2 = size
        # One (function, argument-tuple) row per audioop entry point.
        attempts = [
            (audioop.getsample, (data, size, 0)),
            (audioop.max, (data, size)),
            (audioop.minmax, (data, size)),
            (audioop.avg, (data, size)),
            (audioop.rms, (data, size)),
            (audioop.avgpp, (data, size)),
            (audioop.maxpp, (data, size)),
            (audioop.cross, (data, size)),
            (audioop.mul, (data, size, 1.0)),
            (audioop.tomono, (data, size, 0.5, 0.5)),
            (audioop.tostereo, (data, size, 0.5, 0.5)),
            (audioop.add, (data, data, size)),
            (audioop.bias, (data, size, 0)),
            (audioop.reverse, (data, size)),
            (audioop.lin2lin, (data, size, size2)),
            (audioop.ratecv, (data, size, 1, 1, 1, state)),
            (audioop.lin2ulaw, (data, size)),
            (audioop.lin2alaw, (data, size)),
            (audioop.lin2adpcm, (data, size, state)),
        ]
        for func, args in attempts:
            self.assertRaises(audioop.error, func, *args)
def main():
    """Stream microphone audio through an RMS meter while looping an alarm
    clip on two alternating players until interrupted with Ctrl-C."""
    capture = Audio(frames_size=1600)
    meter = RMS()
    capture.link(meter)
    capture.start()
    # Resolve the alarm clip relative to this file and express it as a URI.
    alarm_path = os.path.realpath(
        os.path.join(os.path.dirname(__file__), 'resources/alarm.mp3'))
    uri = 'file://{}'.format(alarm_path)
    first, second = Player(), Player()
    while True:
        try:
            first.play(uri)
            time.sleep(1)
            second.play(uri)
            time.sleep(3)
        except KeyboardInterrupt:
            break
    capture.stop()
async def detect_squelch_level(self, detect_time=10, threshold=.8):
    """Listen to the audio source for ``detect_time`` seconds and derive a squelch level.

    The level is the RMS value at the ``threshold`` quantile of all sampled
    chunks (e.g. 0.8 means 80% of observed chunks fall below it).  Stores the
    result on ``self.squelch_level`` and returns it.
    """
    start_time = time.time()
    end_time = start_time + detect_time
    audio_chunks = collections.deque()
    async with self._source.listen():
        async for block in self._source:
            # Stop pulling blocks once the sampling window has elapsed.
            if time.time() > end_time:
                break
            # Re-chunk the block into fixed-size pieces so RMS values are comparable.
            even_iter = EvenChunkIterator(block, self._sample_size)
            try:
                while time.time() < end_time:
                    audio_chunks.append(await even_iter.__anext__())
            except StopAsyncIteration:
                pass
    # Measure only full-size chunks; a trailing partial chunk would skew the stats.
    rms_vals = [audioop.rms(x.audio, self._sample_width)
                for x in audio_chunks
                if len(x.audio) == self._sample_size * self._sample_width]
    # Value at the requested quantile of the sorted RMS values.
    # NOTE(review): raises IndexError if no chunks were captured or
    # threshold >= 1 — confirm callers guard against this.
    level = sorted(rms_vals)[int(threshold * len(rms_vals)):][0]
    self.squelch_level = level
    return level
def read(self, frame_size):
    """Read one frame from the buffer, applying volume scaling and
    (when drawing is enabled) updating the on-screen level meter."""
    self.frame_count += 1
    chunk = self.buff.read(frame_size)
    if self.volume != 1:
        chunk = self._frame_vol(chunk, self.volume, maxv=2)
    # Meter updates are throttled to every `frame_skip` frames to limit overhead.
    if self.draw and not self.frame_count % self.frame_skip:
        level = audioop.rms(chunk, 2)
        self.rmss.append(level)
        peak = max(self.rmss)
        label = 'avg rms: {:.2f}, max rms: {:.2f} '.format(self._avg(self.rmss), peak)
        self._pprint_meter(level / max(1, peak), text=label, shift=True)
    return chunk
def test_string(self):
    """str input (not bytes) must be rejected with TypeError by every
    audioop entry point."""
    data = 'abcd'
    size = 2
    # One (function, argument-tuple) row per audioop entry point.
    checks = [
        (audioop.getsample, (data, size, 0)),
        (audioop.max, (data, size)),
        (audioop.minmax, (data, size)),
        (audioop.avg, (data, size)),
        (audioop.rms, (data, size)),
        (audioop.avgpp, (data, size)),
        (audioop.maxpp, (data, size)),
        (audioop.cross, (data, size)),
        (audioop.mul, (data, size, 1.0)),
        (audioop.tomono, (data, size, 0.5, 0.5)),
        (audioop.tostereo, (data, size, 0.5, 0.5)),
        (audioop.add, (data, data, size)),
        (audioop.bias, (data, size, 0)),
        (audioop.reverse, (data, size)),
        (audioop.lin2lin, (data, size, size)),
        (audioop.ratecv, (data, size, 1, 1, 1, None)),
        (audioop.lin2ulaw, (data, size)),
        (audioop.lin2alaw, (data, size)),
        (audioop.lin2adpcm, (data, size, None)),
    ]
    for func, args in checks:
        self.assertRaises(TypeError, func, *args)
def __db_level(self, rms_mode=False):
    """
    Returns the average audio volume level measured in dB (range -60 db to 0 db)
    If the sample is stereo, you get back a tuple: (left_level, right_level)
    If the sample is mono, you still get a tuple but both values will be the same.
    This method is probably only useful if processed on very short sample fragments in sequence,
    so the db levels could be used to show a level meter for the duration of the sample.
    With rms_mode=True the RMS (average power) is measured instead of the peak.
    """
    # Largest representable sample magnitude for this sample width.
    maxvalue = 2**(8*self.__samplewidth-1)
    if self.nchannels == 1:
        # Mono: measure once; report the same level for both channels.
        # The +1 keeps the ratio strictly positive so math.log never sees 0.
        if rms_mode:
            peak_left = peak_right = (audioop.rms(self.__frames, self.__samplewidth)+1)/maxvalue
        else:
            peak_left = peak_right = (audioop.max(self.__frames, self.__samplewidth)+1)/maxvalue
    else:
        # Stereo: split channels via tomono with (1, 0) and (0, 1) weights.
        left_frames = audioop.tomono(self.__frames, self.__samplewidth, 1, 0)
        right_frames = audioop.tomono(self.__frames, self.__samplewidth, 0, 1)
        if rms_mode:
            peak_left = (audioop.rms(left_frames, self.__samplewidth)+1)/maxvalue
            peak_right = (audioop.rms(right_frames, self.__samplewidth)+1)/maxvalue
        else:
            peak_left = (audioop.max(left_frames, self.__samplewidth)+1)/maxvalue
            peak_right = (audioop.max(right_frames, self.__samplewidth)+1)/maxvalue
    # cut off at the bottom at -60 instead of all the way down to -infinity
    return max(20.0*math.log(peak_left, 10), -60.0), max(20.0*math.log(peak_right, 10), -60.0)
def adjust_for_ambient_noise(self, source, duration=1):
    """
    Dynamically adjusts the energy threshold using ambient audio read from
    ``source`` (an ``AudioSource`` instance), to calibrate against background
    noise.  Feed it a period of audio without speech.

    ``duration`` caps how many seconds of audio are sampled before returning;
    at least 0.5 is recommended for a representative reading.
    """
    assert isinstance(source, AudioSource), "Source must be an audio source"
    assert self.pause_threshold >= self.non_speaking_duration >= 0
    seconds_per_buffer = (source.CHUNK + 0.0) / source.SAMPLE_RATE
    elapsed = seconds_per_buffer
    # Keep sampling until `duration` seconds of audio have been consumed.
    while elapsed <= duration:
        chunk = source.stream.read(source.CHUNK, exception_on_overflow=False)
        # Energy (loudness) of this chunk.
        energy = audioop.rms(chunk, source.SAMPLE_WIDTH)
        # Asymmetric weighted moving average; damping is scaled by buffer
        # duration so different chunk sizes/rates behave consistently.
        damping = self.dynamic_energy_adjustment_damping ** seconds_per_buffer
        target = energy * self.dynamic_energy_ratio
        self.energy_threshold = self.energy_threshold * damping + target * (1 - damping)
        elapsed += seconds_per_buffer
def adjust_for_ambient_noise(self, source, duration=1):
    """
    Calibrate the energy threshold against ambient noise read from ``source``
    (an ``AudioSource`` instance that has already been entered).  Use it on a
    period of audio without speech.

    ``duration`` caps the number of seconds sampled before returning; a value
    of at least 0.5 gives a representative reading of the ambient noise.
    """
    assert isinstance(source, AudioSource), "Source must be an audio source"
    assert source.stream is not None, "Audio source must be entered before adjusting, see documentation for `AudioSource`; are you using `source` outside of a `with` statement?"
    assert self.pause_threshold >= self.non_speaking_duration >= 0
    buffer_seconds = (source.CHUNK + 0.0) / source.SAMPLE_RATE
    listened = 0
    while True:
        listened += buffer_seconds
        # Stop once we have listened for `duration` seconds in total.
        if listened > duration:
            break
        sample = source.stream.read(source.CHUNK)
        # Energy (loudness) of this buffer.
        loudness = audioop.rms(sample, source.SAMPLE_WIDTH)
        # Asymmetric weighted moving average; decay is scaled by buffer
        # duration to account for different chunk sizes and sample rates.
        decay = self.dynamic_energy_adjustment_damping ** buffer_seconds
        goal = loudness * self.dynamic_energy_ratio
        self.energy_threshold = self.energy_threshold * decay + goal * (1 - decay)
def _calc_energy(self, sound_chunk):
    """Calculates how loud the sound is (root-mean-square of the chunk)."""
    loudness = audioop.rms(sound_chunk, self.sample_width)
    return loudness
def getScore(self, data):
    """Score a 16-bit audio chunk: its RMS amplitude scaled down by a factor of 3."""
    return audioop.rms(data, 2) / 3
def test_rms(self):
    """The reference fragments all have an RMS of exactly 1."""
    for width, fragment in ((1, data[0]), (2, data[1]), (4, data[2])):
        self.assertEqual(audioop.rms(fragment, width), 1)
def put(self, data):
    """Print the RMS level of an incoming chunk of 16-bit audio."""
    level = audioop.rms(data, 2)
    print('RMS: {}'.format(level))
def test_rms(self):
    """Check audioop.rms across sample widths 1, 2 and 4."""
    for width in (1, 2, 4):
        pack = packs[width]
        # Empty input has zero energy.
        self.assertEqual(audioop.rms(b'', width), 0)
        self.assertEqual(audioop.rms(pack(*range(100)), width), 57)
        # A constant full-scale signal has RMS ~= the full-scale magnitude.
        self.assertAlmostEqual(audioop.rms(pack(maxvalues[width]) * 5, width),
                               maxvalues[width], delta=1)
        self.assertAlmostEqual(audioop.rms(pack(minvalues[width]) * 5, width),
                               -minvalues[width], delta=1)
    # Known RMS values for the shared test payloads.
    for width, expected in ((1, 77), (2, 20001), (4, 1310854152)):
        self.assertEqual(audioop.rms(datas[width], width), expected)
def get_score(self, data):
    """Return the chunk's loudness score (16-bit RMS amplitude divided by 3)."""
    amplitude = audioop.rms(data, 2)
    score = amplitude / 3
    return score
def check_squelch(level, is_triggered, chunks):
    """Decide whether the squelch gate should be open, with hysteresis.

    An open (triggered) gate stays open until the median RMS of `chunks`
    drops below 80% of `level`; a closed gate opens only when the median
    RMS rises above `level`.
    """
    ordered = sorted(audioop.rms(c.audio, c.width) for c in chunks)
    median = ordered[int(len(ordered) * .5)]
    if is_triggered:
        # Hold the gate open unless we've fallen well below the threshold.
        return not (median < (level * .8))
    # Gate closed: open only when the signal exceeds the threshold.
    return median > level
def test_rms(self):
    """Check audioop.rms for all widths (1-4) and all bytes-like input types."""
    for width in (1, 2, 3, 4):
        # Every empty bytes-like object has zero energy.
        self.assertEqual(audioop.rms(b'', width), 0)
        self.assertEqual(audioop.rms(bytearray(), width), 0)
        self.assertEqual(audioop.rms(memoryview(b''), width), 0)
        pack = packs[width]
        self.assertEqual(audioop.rms(pack(*range(100)), width), 57)
        # A constant full-scale signal has RMS ~= the full-scale magnitude.
        self.assertAlmostEqual(audioop.rms(pack(maxvalues[width]) * 5, width),
                               maxvalues[width], delta=1)
        self.assertAlmostEqual(audioop.rms(pack(minvalues[width]) * 5, width),
                               -minvalues[width], delta=1)
    # Known RMS values for the shared test payloads.
    for width, expected in ((1, 77), (2, 20001), (3, 5120523), (4, 1310854152)):
        self.assertEqual(audioop.rms(datas[width], width), expected)
def _snr(self, frames):
    """Signal-to-noise ratio (in dB) of `frames` relative to the current threshold.

    Returns 0 when either the signal level or the threshold is non-positive,
    since the log ratio would be undefined there.
    """
    sample_width = int(self._input_bits / 8)
    level = audioop.rms(b''.join(frames), sample_width)
    if level <= 0 or self._threshold <= 0:
        return 0
    return 20.0 * math.log(level / self._threshold, 10)
def rms(self):
    """Return the RMS (average power) of this sample's raw frame data."""
    frames = self.__frames
    return audioop.rms(frames, self.samplewidth)
def _apiai_stt(self):
    """Stream microphone audio to the api.ai voice-request API.

    Opens a PyAudio input stream and, from its callback, resamples each
    buffer, feeds it through api.ai's VAD, and sends it to the pending voice
    request; the stream ends when the VAD stops reporting speech.
    """
    from math import log
    import audioop
    import pyaudio
    import time
    resampler = apiai.Resampler(source_samplerate=settings['RATE'])
    request = self.ai.voice_request()
    vad = apiai.VAD()
    def callback(in_data, frame_count):
        # Convert the raw buffer to the sample rate api.ai expects.
        frames, data = resampler.resample(in_data, frame_count)
        if settings.show_decibels:
            # The +1 avoids log(0) on a perfectly silent buffer.
            decibel = 20 * log(audioop.rms(data, 2) + 1, 10)
            click.echo(decibel)
        state = vad.processFrame(frames)
        request.send(data)
        # VAD state 1 means "still speaking"; anything else ends the stream.
        state_signal = pyaudio.paContinue if state == 1 else pyaudio.paComplete
        return in_data, state_signal
    p = pyaudio.PyAudio()
    stream = p.open(format=pyaudio.paInt32,
                    input=True,
                    output=False,
                    stream_callback=callback,
                    channels=settings['CHANNELS'],
                    rate=settings['RATE'],
                    frames_per_buffer=settings['CHUNK'])
    stream.start_stream()
    click.echo("Speak!")
    # Block until the callback signals paComplete.
    while stream.is_active():
        time.sleep(0.1)
    stream.stop_stream()
    stream.close()
    p.terminate()
def check_silence(self, buf):
    """Classify one audio buffer as speech or silence and drive the recording state.

    Loud buffers (RMS >= config.THRESHOLD) reset the silence timer and, on the
    first loud buffer, switch into "append mode", replaying the few retained
    pre-speech buffers into the preparer so the start of the utterance is not
    clipped.  Append mode ends via self.stop() after sustained silence or when
    a hard time cap is exceeded.
    """
    volume = audioop.rms(buf, 2)
    if (volume >= config.THRESHOLD):
        # Loud enough to count as speech: keep the silence timer fresh.
        self.silence_timer = time.time()
        if (self.append == False):
            if (self.hatch.get('debug') == True):
                print ('starting append mode')
            self.timer = time.time()
            # Replay the retained pre-speech buffers so the utterance start is kept.
            for sbuf in self.silence_buffer:
                self.prepare.prepare(sbuf, volume)
            self.silence_buffer = [ ]
            self.append = True
        self.silence_counter = 0
    else:
        self.silence_counter += 1
    # Keep a short rolling window (3 buffers) of recent audio for replay.
    self.silence_buffer.append(buf)
    if (len(self.silence_buffer) > 3):
        del self.silence_buffer[0]
    if (self.out != None and self.out.closed != True):
        self.out.write(buf)
    if (self.append == True):
        self.prepare.prepare(buf, volume)
    # End append mode when silence has lasted past MAX_SILENCE_AFTER_START...
    if (self.append == True and self.silence_timer > 0
            and self.silence_timer + config.MAX_SILENCE_AFTER_START < time.time()
            and self.live == True):
        self.stop("stop append mode because of silence")
    # ...or when the overall recording exceeds MAX_TIME.
    if (self.append == True and self.timer + config.MAX_TIME < time.time()
            and self.live == True):
        self.stop("stop append mode because time is up")
def rms(audio_fragment, sample_width_in_bytes=2):
    """Given audio fragment, return the root mean square.

    ``audio_fragment`` is a NumPy integer array whose itemsize matches
    ``sample_width_in_bytes``.  Equivalent to
    ``int(np.sqrt(np.mean(np.square(np.absolute(audio_fragment.astype(np.float))))))``.
    """
    # The previous np.abs() call was redundant: squaring already removes the
    # sign, and even the int16 wraparound case (abs(-32768) == -32768) squares
    # to the same value.  ascontiguousarray keeps audioop's buffer-protocol
    # input happy when the caller passes a non-contiguous view.
    return audioop.rms(np.ascontiguousarray(audio_fragment), sample_width_in_bytes)
def record(self):
    # NOTE: Python 2 code (print statements). The leading triple-quoted
    # string is dead `arecord`-based recording code kept as a docstring.
    '''
    time.sleep(1)
    args = ['arecord', '-d', '5', self.voicefile]
    proc = subprocess.Popen(args)
    time.sleep(5)
    '''
    # Record 5 seconds of stereo 44.1 kHz audio to self.voicefile via PyAudio.
    FORMAT = pyaudio.paInt16
    CHANNELS = 2
    RATE = 44100
    CHUNK = 1024
    RECORD_SECONDS = 5
    WAVE_OUTPUT_FILENAME = self.voicefile
    audio = pyaudio.PyAudio()
    # start Recording
    stream = audio.open(format=FORMAT, channels=CHANNELS,
                        rate=RATE, input=True,
                        frames_per_buffer=CHUNK)
    print "recording..."
    frames = []
    threshold = 1000
    for i in range(0, int(RATE / CHUNK * RECORD_SECONDS)):
        data = stream.read(CHUNK)
        rms = audioop.rms(data,2)
        # Only chunks louder than the RMS threshold are kept.
        # NOTE(review): indentation of the append was lost in extraction —
        # confirm whether quiet chunks were also meant to be recorded.
        if rms > threshold:
            print "I am hearing you now"
            frames.append(data)
    print "finished recording"
    # stop Recording
    stream.stop_stream()
    stream.close()
    audio.terminate()
    # Write the captured frames out as a WAV file.
    waveFile = wave.open(WAVE_OUTPUT_FILENAME, 'wb')
    waveFile.setnchannels(CHANNELS)
    waveFile.setsampwidth(audio.get_sample_size(FORMAT))
    waveFile.setframerate(RATE)
    waveFile.writeframes(b''.join(frames))
    waveFile.close()
def wait_for_keyword(self, keyword=None):
    """Block until `keyword` is heard on the input device.

    Spawns worker threads that consume candidate recordings from a queue and
    set `keyword_uttered` when the keyword is recognized.  The main loop
    reads frames from the input device, starts a recording when the SNR rises
    above 10 dB (prepending the last 10 buffered frames), and finishes it
    when the SNR falls to <= 3 dB or the recording reaches 60 frames; while
    idle it keeps re-estimating the background-noise threshold.
    """
    if not keyword:
        keyword = self._keyword
    frame_queue = queue.Queue()
    keyword_uttered = threading.Event()
    # FIXME: not configurable yet
    num_worker_threads = 2
    for i in range(num_worker_threads):
        t = threading.Thread(target=self.check_for_keyword,
                             args=(frame_queue, keyword_uttered, keyword))
        t.daemon = True
        t.start()
    frames = collections.deque([], 30)
    recording = False
    recording_frames = []
    self._logger.info("Waiting for keyword '%s'...", keyword)
    for frame in self._input_device.record(self._input_chunksize,
                                           self._input_bits,
                                           self._input_channels,
                                           self._input_rate):
        if keyword_uttered.is_set():
            self._logger.info("Keyword %s has been uttered", keyword)
            return
        frames.append(frame)
        if not recording:
            snr = self._snr([frame])
            if snr >= 10:  # 10dB
                # Loudness is higher than normal, start recording and use
                # the last 10 frames to start
                self._logger.debug("Started recording on device '%s'",
                                   self._input_device.slug)
                self._logger.debug("Triggered on SNR of %sdB", snr)
                recording = True
                recording_frames = list(frames)[-10:]
            elif len(frames) >= frames.maxlen:
                # Threshold SNR not reached. Update threshold with
                # background noise.
                # BUGFIX: was `"".join(frames)` — frames are bytes, so a str
                # join raises TypeError on Python 3; use b"" like the
                # identical update at the end of a recording below.
                self._threshold = float(audioop.rms(b"".join(frames), 2))
        else:
            # We're recording
            recording_frames.append(frame)
            if len(recording_frames) > 20:
                # If we recorded at least 20 frames, check if we're below
                # threshold again
                last_snr = self._snr(recording_frames[-10:])
                self._logger.debug("Recording's SNR dB: %f", last_snr)
                if last_snr <= 3 or len(recording_frames) >= 60:
                    # The loudness of the sound is not at least as high as
                    # the the threshold, or we've been waiting too long;
                    # we'll stop recording now
                    recording = False
                    self._logger.debug("Recorded %d frames",
                                       len(recording_frames))
                    frame_queue.put(tuple(recording_frames))
                    self._threshold = float(audioop.rms(b"".join(frames), 2))